package se.sics.nstream.torrent.transfer;

import com.google.common.base.Optional;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.javatuples.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import se.sics.kompics.ClassMatchedHandler;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Handler;
import se.sics.kompics.Negative;
import se.sics.kompics.Positive;
import se.sics.kompics.Start;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.Transport;
import se.sics.kompics.timer.CancelPeriodicTimeout;
import se.sics.kompics.timer.SchedulePeriodicTimeout;
import se.sics.kompics.timer.Timeout;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.util.Identifiable;
import se.sics.kompics.util.Identifier;
import se.sics.kompics.util.PatternExtractorHelper;
import se.sics.ktoolbox.util.config.impl.SystemKCWrapper;
import se.sics.ktoolbox.util.network.KAddress;
import se.sics.ktoolbox.util.network.KContentMsg;
import se.sics.ktoolbox.util.network.KHeader;
import se.sics.ktoolbox.util.network.basic.BasicContentMsg;
import se.sics.ktoolbox.util.network.basic.BasicHeader;
import se.sics.ledbat.core.AppCongestionWindow;
import se.sics.ledbat.core.LedbatConfig;
import se.sics.ledbat.ncore.msg.LedbatMsg;
import se.sics.nstream.ConnId;
import se.sics.nstream.old.torrent.event.TorrentTimeout;
import se.sics.nstream.torrent.old.TransferConfig;
import se.sics.nstream.torrent.transfer.dwnl.event.CompletedBlocks;
import se.sics.nstream.torrent.transfer.dwnl.event.DownloadBlocks;
import se.sics.nstream.torrent.transfer.dwnl.event.FPDControl;
import se.sics.nstream.torrent.transfer.msg.CacheHint;
import se.sics.nstream.torrent.transfer.msg.DownloadHash;
import se.sics.nstream.torrent.transfer.msg.DownloadPiece;
import se.sics.nstream.torrent.transfer.tracking.DownloadTrackingReport;
import se.sics.nstream.torrent.transfer.tracking.DownloadTrackingTrace;
import se.sics.nstream.torrent.transfer.tracking.DwnlConnTracker;
import se.sics.nstream.torrent.transfer.tracking.TransferTrackingPort;
import se.sics.nstream.torrent.transfer.tracking.event.TrackingConnection;
import se.sics.nstream.util.BlockDetails;
import se.sics.nutil.network.bestEffort.event.BestEffortMsg;
import se.sics.nutil.tracking.load.NetworkQueueLoadProxy;

/* loaded from: input_file:se/sics/nstream/torrent/transfer/DwnlConnComp.class */
public class DwnlConnComp extends ComponentDefinition {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) DwnlConnComp.class);
    private String logPrefix;
    private static final long ADVANCE_DOWNLOAD = 1000;
    private static final long CACHE_BASE_TIMEOUT = 1000;
    private static final int CACHE_RETRY = 30;
    private static final long REPORT_PERIOD = 200;
    private final ConnId connId;
    private final List<KAddress> self;
    private final List<KAddress> target;
    private final int parallelPorts;
    private final NetworkQueueLoadProxy networkQueueLoad;
    private final AppCongestionWindow cwnd;
    private final DwnlConnWorkCtrl workController;
    private final Optional<DwnlConnTracker> tracker;
    private UUID advanceDownloadTid;
    private UUID cacheTid;
    private UUID reportTid;
    private final LedbatConfig ledbatConfig;
    Negative<DwnlConnPort> connPort = provides(DwnlConnPort.class);
    Negative<TransferTrackingPort> reportPort = provides(TransferTrackingPort.class);
    Positive<Network> networkPort = requires(Network.class);
    Positive<Timer> timerPort = requires(Timer.class);
    private final Map<Identifier, Identifiable> pendingMsgs = new HashMap();
    Handler handleStart = new Handler<Start>() { // from class: se.sics.nstream.torrent.transfer.DwnlConnComp.1
        @Override // se.sics.kompics.Handler
        public void handle(Start start) {
            DwnlConnComp.LOG.info("{}starting conn to:{}", DwnlConnComp.this.logPrefix, DwnlConnComp.this.target);
            DwnlConnComp.this.networkQueueLoad.start();
            DwnlConnComp.this.scheduleAdvanceDownload();
            DwnlConnComp.this.scheduleReport();
        }
    };
    Handler handleReport = new Handler<ReportTimeout>() { // from class: se.sics.nstream.torrent.transfer.DwnlConnComp.2
        @Override // se.sics.kompics.Handler
        public void handle(ReportTimeout reportTimeout) {
            DwnlConnComp.LOG.trace("{}reporting", DwnlConnComp.this.logPrefix);
            DwnlConnComp.this.networkQueueLoad.queueDelay();
            DwnlConnComp.this.trigger(new DownloadTrackingReport(DwnlConnComp.this.connId, new DownloadTrackingTrace(DwnlConnComp.this.cwnd.report(), DwnlConnComp.this.workController.blockSize(), DwnlConnComp.this.cwnd.cwnd())), DwnlConnComp.this.reportPort);
        }
    };
    Handler handleAdvanceDownload = new Handler<TorrentTimeout.AdvanceDownload>() { // from class: se.sics.nstream.torrent.transfer.DwnlConnComp.3
        @Override // se.sics.kompics.Handler
        public void handle(TorrentTimeout.AdvanceDownload advanceDownload) {
            DwnlConnComp.LOG.info("{}advance download", DwnlConnComp.this.logPrefix);
            long currentTimeMillis = System.currentTimeMillis();
            DwnlConnComp.this.cwnd.adjustState(currentTimeMillis, DwnlConnComp.this.networkQueueLoad.adjustment());
            DwnlConnComp.this.tryDownload(currentTimeMillis);
        }
    };
    Handler handleFPDControl = new Handler<FPDControl>() { // from class: se.sics.nstream.torrent.transfer.DwnlConnComp.4
        @Override // se.sics.kompics.Handler
        public void handle(FPDControl fPDControl) {
            DwnlConnComp.LOG.error("{}fpd", DwnlConnComp.this.logPrefix);
            double min = Math.min(fPDControl.appCwndAdjustment, DwnlConnComp.this.networkQueueLoad.adjustment());
            DwnlConnComp.this.cwnd.adjustState(System.currentTimeMillis(), min);
        }
    };
    private Handler handleNewBlocks = new Handler<DownloadBlocks>() { // from class: se.sics.nstream.torrent.transfer.DwnlConnComp.5
        @Override // se.sics.kompics.Handler
        public void handle(DownloadBlocks downloadBlocks) {
            DwnlConnComp.LOG.trace("{}new blocks:{}", DwnlConnComp.this.logPrefix, downloadBlocks.blocks);
            DwnlConnComp.this.workController.add(downloadBlocks.blocks, downloadBlocks.irregularBlocks);
        }
    };
    ClassMatchedHandler handleNetworkTimeouts = new ClassMatchedHandler<BestEffortMsg.Timeout, KContentMsg<KAddress, KHeader<KAddress>, BestEffortMsg.Timeout>>() { // from class: se.sics.nstream.torrent.transfer.DwnlConnComp.6
        @Override // se.sics.kompics.MatchedHandler
        public void handle(BestEffortMsg.Timeout timeout, KContentMsg<KAddress, KHeader<KAddress>, BestEffortMsg.Timeout> kContentMsg) {
            Identifiable identifiable = (Identifiable) PatternExtractorHelper.peelAllLayers(timeout);
            if (identifiable instanceof CacheHint.Request) {
                DwnlConnComp.this.handleCacheTimeout((CacheHint.Request) identifiable);
                return;
            }
            if (identifiable instanceof DownloadPiece.Request) {
                DwnlConnComp.this.handlePieceTimeout((DownloadPiece.Request) identifiable);
                DwnlConnComp.this.reportTimeout(System.currentTimeMillis(), identifiable, timeout.req.rto);
            } else if (!(identifiable instanceof DownloadHash.Request)) {
                DwnlConnComp.LOG.warn("{}!!!possible performance issue - fix:{}", DwnlConnComp.this.logPrefix, identifiable);
            } else {
                DwnlConnComp.this.handleHashTimeout((DownloadHash.Request) identifiable);
                DwnlConnComp.this.reportTimeout(System.currentTimeMillis(), identifiable, timeout.req.rto);
            }
        }
    };
    ClassMatchedHandler handleCache = new ClassMatchedHandler<CacheHint.Response, KContentMsg<KAddress, KHeader<KAddress>, CacheHint.Response>>() { // from class: se.sics.nstream.torrent.transfer.DwnlConnComp.7
        @Override // se.sics.kompics.MatchedHandler
        public void handle(CacheHint.Response response, KContentMsg<KAddress, KHeader<KAddress>, CacheHint.Response> kContentMsg) {
            CacheHint.Request request = (CacheHint.Request) DwnlConnComp.this.pendingMsgs.remove(response.getId());
            if (request != null) {
                DwnlConnComp.LOG.debug("{}cache confirm ts:{}", DwnlConnComp.this.logPrefix, Long.valueOf(request.requestCache.lStamp));
                DwnlConnComp.this.workController.cacheConfirmed(request.requestCache.lStamp);
                DwnlConnComp.this.tryDownload(System.currentTimeMillis());
            }
        }
    };
    ClassMatchedHandler handleLedbat = new ClassMatchedHandler<LedbatMsg.Response, KContentMsg<KAddress, KHeader<KAddress>, LedbatMsg.Response>>() { // from class: se.sics.nstream.torrent.transfer.DwnlConnComp.8
        @Override // se.sics.kompics.MatchedHandler
        public void handle(LedbatMsg.Response response, KContentMsg<KAddress, KHeader<KAddress>, LedbatMsg.Response> kContentMsg) {
            Identifiable extractValue = response.extractValue();
            if (extractValue instanceof DownloadPiece.Success) {
                DwnlConnComp.this.handlePiece(response);
                return;
            }
            if (extractValue instanceof DownloadHash.Success) {
                DwnlConnComp.this.handleHash(response);
            } else if (extractValue instanceof DownloadPiece.BadRequest) {
                DwnlConnComp.LOG.warn("{}dropping bad request:{} - if this is due to retransmission - it should be fine", DwnlConnComp.this.logPrefix, extractValue);
            } else {
                if (!(extractValue instanceof DownloadHash.BadRequest)) {
                    throw new RuntimeException("ups");
                }
                DwnlConnComp.LOG.warn("{}dropping bad request:{} - if this is due to retransmission - it should be fine", DwnlConnComp.this.logPrefix, extractValue);
            }
        }
    };

    /* loaded from: input_file:se/sics/nstream/torrent/transfer/DwnlConnComp$Init.class */
    public static class Init extends se.sics.kompics.Init<DwnlConnComp> {
        public final ConnId connId;
        public final KAddress self;
        public final KAddress target;
        public final BlockDetails defaultBlockDetails;
        public final boolean withHashes;

        public Init(ConnId connId, KAddress kAddress, KAddress kAddress2, BlockDetails blockDetails, boolean z) {
            this.connId = connId;
            this.self = kAddress;
            this.target = kAddress2;
            this.defaultBlockDetails = blockDetails;
            this.withHashes = z;
        }
    }

    /* loaded from: input_file:se/sics/nstream/torrent/transfer/DwnlConnComp$ReportTimeout.class */
    public static class ReportTimeout extends Timeout {
        public ReportTimeout(SchedulePeriodicTimeout schedulePeriodicTimeout) {
            super(schedulePeriodicTimeout);
        }
    }

    public DwnlConnComp(Init init) {
        this.connId = init.connId;
        SystemKCWrapper systemKCWrapper = new SystemKCWrapper(config());
        if (systemKCWrapper.parallelPorts.isPresent()) {
            this.parallelPorts = systemKCWrapper.parallelPorts.get().intValue();
        } else {
            this.parallelPorts = 1;
        }
        this.self = new ArrayList(this.parallelPorts);
        this.target = new ArrayList(this.parallelPorts);
        this.self.add(0, init.self);
        this.target.add(0, init.target);
        for (int i = 1; i < this.parallelPorts; i++) {
            KAddress withPort = init.self.withPort(init.self.getPort() + i);
            this.self.add(i, withPort);
            KAddress withPort2 = init.target.withPort(init.target.getPort() + i);
            this.target.add(i, withPort2);
            LOG.info("{}setting s:{} t:{}", this.logPrefix, withPort, withPort2);
        }
        this.logPrefix = "<" + this.connId.toString() + ">";
        DwnlConnConfig dwnlConnConfig = new DwnlConnConfig(config());
        this.ledbatConfig = new LedbatConfig(config());
        this.networkQueueLoad = NetworkQueueLoadProxy.instance("load_dwnl_" + this.logPrefix, this.proxy, config(), dwnlConnConfig.reportDir);
        this.cwnd = new AppCongestionWindow(this.ledbatConfig, this.connId, dwnlConnConfig.minRTO, dwnlConnConfig.reportDir);
        this.workController = new DwnlConnWorkCtrl(init.defaultBlockDetails, init.withHashes);
        if (dwnlConnConfig.reportDir.isPresent()) {
            this.tracker = Optional.fromNullable(DwnlConnTracker.onDisk(dwnlConnConfig.reportDir.get(), this.connId, this.parallelPorts));
        } else {
            this.tracker = Optional.absent();
        }
        subscribe(this.handleStart, this.control);
        subscribe(this.handleReport, this.timerPort);
        subscribe(this.handleAdvanceDownload, this.timerPort);
        subscribe(this.handleFPDControl, this.connPort);
        subscribe(this.handleNewBlocks, this.connPort);
        subscribe(this.handleNetworkTimeouts, this.networkPort);
        subscribe(this.handleCache, this.networkPort);
        subscribe(this.handleLedbat, this.networkPort);
    }

    @Override // se.sics.kompics.ComponentDefinition
    public void tearDown() {
        LOG.info("{}tear down", this.logPrefix);
        Iterator<Identifiable> it = this.pendingMsgs.values().iterator();
        while (it.hasNext()) {
            cancelMsg(it.next());
        }
        this.networkQueueLoad.tearDown();
        cancelAdvanceDownload();
        cancelReport();
        this.cwnd.close();
        if (this.tracker.isPresent()) {
            this.tracker.get().close();
        }
        trigger(new TrackingConnection.Close(this.connId), this.reportPort);
    }

    public void handleCacheTimeout(CacheHint.Request request) {
        LOG.error("{}cache timeout on hint:{} blocks:{}", this.logPrefix, Long.valueOf(request.requestCache.lStamp), request.requestCache.blocks);
        throw new RuntimeException("ups");
    }

    public void handlePieceTimeout(DownloadPiece.Request request) {
        LOG.debug("{}piece timeout:<{},{}>", this.logPrefix, request.piece.getValue0(), request.piece.getValue1());
        if (this.pendingMsgs.remove(request.msgId) != null) {
            this.workController.pieceTimeout(request.piece);
        }
        long currentTimeMillis = System.currentTimeMillis();
        this.cwnd.timeout(currentTimeMillis, this.ledbatConfig.mss);
        tryDownload(currentTimeMillis);
    }

    public void handleHashTimeout(DownloadHash.Request request) {
        LOG.debug("{}hash timeout:<{}>", this.logPrefix, request.hashes);
        if (this.pendingMsgs.remove(request.msgId) != null) {
            this.workController.hashTimeout(request.hashes);
        }
        long currentTimeMillis = System.currentTimeMillis();
        this.cwnd.timeout(currentTimeMillis, this.ledbatConfig.mss);
        tryDownload(currentTimeMillis);
    }

    private Pair<KAddress, KAddress> getSrcDst(Identifiable identifiable) {
        int partition = identifiable.getId().partition(this.parallelPorts);
        return Pair.with(this.self.get(partition), this.target.get(partition));
    }

    private void cancelMsg(Identifiable identifiable) {
        BestEffortMsg.Cancel cancel = new BestEffortMsg.Cancel(identifiable);
        Pair<KAddress, KAddress> srcDst = getSrcDst(identifiable);
        BasicContentMsg basicContentMsg = new BasicContentMsg(new BasicHeader(srcDst.getValue0(), srcDst.getValue1(), Transport.UDP), cancel);
        LOG.trace("{}canceling:{}", this.logPrefix, identifiable);
        trigger(basicContentMsg, this.networkPort);
    }

    private void sendSimpleUDP(Identifiable identifiable, int i, long j) {
        BestEffortMsg.Request request = new BestEffortMsg.Request(identifiable, i, j);
        Pair<KAddress, KAddress> srcDst = getSrcDst(identifiable);
        BasicContentMsg basicContentMsg = new BasicContentMsg(new BasicHeader(srcDst.getValue0(), srcDst.getValue1(), Transport.UDP), request);
        LOG.trace("{}sending:{}", this.logPrefix, identifiable);
        trigger(basicContentMsg, this.networkPort);
        reportPortEvent(identifiable);
    }

    private void sendSimpleLedbat(Identifiable identifiable, int i) {
        sendSimpleUDP(new LedbatMsg.Request(identifiable), i, this.cwnd.getRTT());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handlePiece(LedbatMsg.Response<DownloadPiece.Success> response) {
        LOG.trace("{}received:{}", this.logPrefix, response);
        DownloadPiece.Success extractValue = response.extractValue();
        long currentTimeMillis = System.currentTimeMillis();
        if (this.pendingMsgs.remove(extractValue.msgId) != null) {
            this.workController.piece(extractValue.piece, extractValue.val.getRight());
            this.cwnd.success(currentTimeMillis, this.ledbatConfig.mss, response);
            tryDownload(currentTimeMillis);
        } else {
            LOG.debug("{}late piece:<{},{}>", this.logPrefix, extractValue.piece.getValue0(), extractValue.piece.getValue1());
            this.workController.latePiece(extractValue.piece, extractValue.val.getRight());
            this.cwnd.late(currentTimeMillis, this.ledbatConfig.mss, response);
            reportLate(System.currentTimeMillis(), response);
        }
        if (this.workController.hasComplete()) {
            Pair<Map<Integer, byte[]>, Map<Integer, byte[]>> complete = this.workController.getComplete();
            LOG.debug("{}completed hashes:{} blocks:{}", this.logPrefix, complete.getValue0().keySet(), complete.getValue1().keySet());
            trigger(new CompletedBlocks(this.connId, complete.getValue0(), complete.getValue1()), this.connPort);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleHash(LedbatMsg.Response<DownloadHash.Success> response) {
        LOG.trace("{}received:{}", this.logPrefix, response);
        DownloadHash.Success extractValue = response.extractValue();
        long currentTimeMillis = System.currentTimeMillis();
        if (this.pendingMsgs.remove(extractValue.msgId) != null) {
            this.workController.hashes(extractValue.hashValues);
            this.cwnd.success(currentTimeMillis, this.ledbatConfig.mss, response);
            tryDownload(currentTimeMillis);
        } else {
            LOG.debug("{}late hashes", this.logPrefix);
            this.workController.lateHashes(extractValue.hashValues);
            this.cwnd.late(currentTimeMillis, this.ledbatConfig.mss, response);
            reportLate(System.currentTimeMillis(), response);
        }
        if (this.workController.hasComplete()) {
            Pair<Map<Integer, byte[]>, Map<Integer, byte[]>> complete = this.workController.getComplete();
            LOG.trace("{}completed hashes:{} blocks:{}", this.logPrefix, complete.getValue0().keySet(), complete.getValue1().keySet());
            trigger(new CompletedBlocks(this.connId, complete.getValue0(), complete.getValue1()), this.connPort);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void tryDownload(long j) {
        if (this.workController.hasNewHint()) {
            CacheHint.Request request = new CacheHint.Request(this.connId.fileId, this.workController.newHint());
            LOG.debug("{}cache hint:{} ts:{} blocks:{}", this.logPrefix, request.getId(), Long.valueOf(request.requestCache.lStamp), request.requestCache.blocks);
            sendSimpleUDP(request, 30, 1000L);
            this.pendingMsgs.put(request.getId(), request);
        }
        while (this.workController.hasHashes() && this.cwnd.canSend()) {
            Identifiable request2 = new DownloadHash.Request(this.connId.fileId, this.workController.nextHashes());
            sendSimpleLedbat(request2, 5);
            this.pendingMsgs.put(request2.getId(), request2);
            this.cwnd.request(j, this.ledbatConfig.mss);
        }
        int i = 2;
        while (this.workController.hasPiece() && this.cwnd.canSend() && i > 0) {
            i--;
            Identifiable request3 = new DownloadPiece.Request(this.connId.fileId, this.workController.nextPiece());
            sendSimpleLedbat(request3, 1);
            this.pendingMsgs.put(request3.getId(), request3);
            this.cwnd.request(j, this.ledbatConfig.mss);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void reportTimeout(long j, Identifiable identifiable, long j2) {
        if (this.tracker.isPresent()) {
            this.tracker.get().reportTimeout(j, identifiable.getId(), j2);
        }
    }

    private void reportLate(long j, LedbatMsg.Response response) {
        if (this.tracker.isPresent()) {
            this.tracker.get().reportLate(j, response);
        }
    }

    private void reportPortEvent(Identifiable identifiable) {
        if (this.tracker.isPresent()) {
            this.tracker.get().reportPortEvent(System.currentTimeMillis(), identifiable.getId().partition(this.parallelPorts));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void scheduleAdvanceDownload() {
        SchedulePeriodicTimeout schedulePeriodicTimeout = new SchedulePeriodicTimeout(TransferConfig.advanceDownloadPeriod, TransferConfig.advanceDownloadPeriod);
        TorrentTimeout.AdvanceDownload advanceDownload = new TorrentTimeout.AdvanceDownload(schedulePeriodicTimeout);
        schedulePeriodicTimeout.setTimeoutEvent(advanceDownload);
        this.advanceDownloadTid = advanceDownload.getTimeoutId();
        trigger(schedulePeriodicTimeout, this.timerPort);
    }

    private void cancelAdvanceDownload() {
        trigger(new CancelPeriodicTimeout(this.advanceDownloadTid), this.timerPort);
        this.advanceDownloadTid = null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void scheduleReport() {
        SchedulePeriodicTimeout schedulePeriodicTimeout = new SchedulePeriodicTimeout(REPORT_PERIOD, REPORT_PERIOD);
        ReportTimeout reportTimeout = new ReportTimeout(schedulePeriodicTimeout);
        schedulePeriodicTimeout.setTimeoutEvent(reportTimeout);
        this.reportTid = reportTimeout.getTimeoutId();
        trigger(schedulePeriodicTimeout, this.timerPort);
    }

    private void cancelReport() {
        trigger(new CancelPeriodicTimeout(this.reportTid), this.timerPort);
        this.reportTid = null;
    }
}
