package se.sics.nstream.torrent.transfer;

import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import se.sics.kompics.ClassMatchedHandler;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Handler;
import se.sics.kompics.Negative;
import se.sics.kompics.Positive;
import se.sics.kompics.Start;
import se.sics.kompics.network.Network;
import se.sics.kompics.timer.CancelPeriodicTimeout;
import se.sics.kompics.timer.SchedulePeriodicTimeout;
import se.sics.kompics.timer.Timeout;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.util.Identifiable;
import se.sics.ktoolbox.util.network.KAddress;
import se.sics.ktoolbox.util.network.KContentMsg;
import se.sics.ktoolbox.util.network.KHeader;
import se.sics.ktoolbox.util.reference.KReference;
import se.sics.ktoolbox.util.reference.KReferenceException;
import se.sics.ledbat.ncore.msg.LedbatMsg;
import se.sics.nstream.ConnId;
import se.sics.nstream.torrent.transfer.msg.CacheHint;
import se.sics.nstream.torrent.transfer.msg.DownloadHash;
import se.sics.nstream.torrent.transfer.msg.DownloadPiece;
import se.sics.nstream.torrent.transfer.upld.event.GetBlocks;
import se.sics.nstream.torrent.transfer.upld.event.UpldConnReport;
import se.sics.nstream.util.BlockDetails;
import se.sics.nstream.util.BlockHelper;
import se.sics.nstream.util.range.RangeKReference;
import se.sics.nutil.tracking.load.NetworkQueueLoadProxy;

/* loaded from: input_file:se/sics/nstream/torrent/transfer/UpldConnComp.class */
/**
 * Kompics component serving the upload side of one transfer connection ({@link ConnId}).
 *
 * <p>It keeps a cache of blocks (and, optionally, their hashes) that the remote peer announced it
 * is about to download via {@link CacheHint.Request}. Blocks missing from the cache are requested
 * over {@code connPort} with {@link GetBlocks.Request}; blocks no longer mentioned in the hint are
 * evicted and their references released. Piece and hash download requests, wrapped in
 * {@link LedbatMsg.Request}, are answered from this cache. A periodic {@link UpldConnReport} with
 * the network queue load is emitted on {@code connPort}.
 */
public class UpldConnComp extends ComponentDefinition {
    private static final Logger LOG = LoggerFactory.getLogger(UpldConnComp.class);
    /**
     * Initial delay and period handed to the periodic report timer (presumably milliseconds -
     * confirm against the Kompics timer API).
     */
    private static final long REPORT_PERIOD = 1000;

    private final String logPrefix;
    private final ConnId connId;
    private final KAddress self;
    /** Block layout used for every block that has no entry in {@link #irregularBlocks}. */
    private final BlockDetails defaultBlock;
    /** Forwarded to {@link GetBlocks.Request} when missing blocks are requested. */
    private final boolean withHashes;
    private final NetworkQueueLoadProxy networkQueueLoad;
    /** The cache hint currently being processed; {@code null} when none is outstanding. */
    private KContentMsg<?, ?, CacheHint.Request> pendingCacheReq;
    /** Id of the scheduled periodic report timeout; {@code null} when no timer is scheduled. */
    private UUID reportTid;

    Negative<UpldConnPort> connPort = provides(UpldConnPort.class);
    Positive<Network> networkPort = requires(Network.class);
    Positive<Timer> timerPort = requires(Timer.class);

    /** Per-block layout overrides, keyed by block number. */
    private final Map<Integer, BlockDetails> irregularBlocks = new HashMap<>();
    /** Cached block contents, keyed by block number; each reference is released on eviction. */
    private final Map<Integer, KReference<byte[]>> servedBlocks = new HashMap<>();
    /** Cached block hashes, keyed by block number. */
    private final Map<Integer, byte[]> servedHashes = new HashMap<>();

    /** Starts the network queue load tracking and the periodic report timer. */
    Handler handleStart = new Handler<Start>() {
        @Override
        public void handle(Start event) {
            LOG.info("{}starting", logPrefix);
            networkQueueLoad.start();
            scheduleReport();
        }
    };

    /** Publishes the current queue delay and adjustment on {@code connPort}. */
    Handler handleReport = new Handler<ReportTimeout>() {
        @Override
        public void handle(ReportTimeout timeout) {
            // adjustment() is read before queueDelay(), same order as before.
            double adjustment = networkQueueLoad.adjustment();
            trigger(new UpldConnReport(connId, networkQueueLoad.queueDelay(), adjustment), connPort);
        }
    };

    /**
     * Handles a cache hint from the remote peer: requests blocks that are not cached yet (or
     * answers immediately when nothing is missing) and evicts blocks the hint no longer lists.
     * A hint that arrives while another one is still pending is ignored.
     */
    ClassMatchedHandler handleCache = new ClassMatchedHandler<CacheHint.Request, KContentMsg<KAddress, KHeader<KAddress>, CacheHint.Request>>() {
        @Override
        public void handle(CacheHint.Request content, KContentMsg<KAddress, KHeader<KAddress>, CacheHint.Request> context) {
            LOG.trace("{}received:{}", logPrefix, content);
            if (pendingCacheReq != null) {
                LOG.debug("{}cache:{} req ignored - previous cache hint still pending", logPrefix, content.getId());
                return;
            }
            LOG.debug("{}cache:{} req - ts:{} blocks:{}", logPrefix, content.getId(),
                    content.requestCache.lStamp, content.requestCache.blocks);
            pendingCacheReq = context;

            // Sets.difference returns a live view backed by servedBlocks.keySet(). Both sets are
            // snapshotted so that (a) the set handed to GetBlocks.Request does not change when
            // servedBlocks is modified below or when the response is stored, and (b) eviction can
            // remove from servedBlocks while iterating.
            HashSet<Integer> missingBlocks
                    = new HashSet<>(Sets.difference(content.requestCache.blocks, servedBlocks.keySet()));
            HashSet<Integer> staleBlocks
                    = new HashSet<>(Sets.difference(servedBlocks.keySet(), content.requestCache.blocks));

            if (missingBlocks.isEmpty()) {
                answerCacheHint();
            } else {
                trigger(new GetBlocks.Request(connId, missingBlocks, withHashes, content.requestCache), connPort);
            }

            for (Integer blockNr : staleBlocks) {
                KReference<byte[]> block = servedBlocks.remove(blockNr);
                servedHashes.remove(blockNr);
                irregularBlocks.remove(blockNr);
                silentRelease(block);
            }
        }
    };

    /** Stores the delivered blocks, hashes and irregular block details, then answers the pending hint. */
    Handler handleGetBlocks = new Handler<GetBlocks.Response>() {
        @Override
        public void handle(GetBlocks.Response resp) {
            LOG.debug("{}serving blocks:{} hashes:{}", logPrefix, resp.blocks.keySet(), resp.hashes.keySet());
            servedBlocks.putAll(resp.blocks);
            servedHashes.putAll(resp.hashes);
            irregularBlocks.putAll(resp.irregularBlocks);
            answerCacheHint();
        }
    };

    /** Dispatches a ledbat-wrapped request to the piece or hash handler based on its payload type. */
    ClassMatchedHandler handleLedbat = new ClassMatchedHandler<LedbatMsg.Request, KContentMsg<KAddress, KHeader<KAddress>, LedbatMsg.Request>>() {
        @Override
        public void handle(LedbatMsg.Request content, KContentMsg<KAddress, KHeader<KAddress>, LedbatMsg.Request> context) {
            Identifiable payload = content.extractValue();
            if (payload instanceof DownloadPiece.Request) {
                handlePiece(context, content);
            } else if (payload instanceof DownloadHash.Request) {
                handleHashes(context, content);
            } else {
                LOG.error("{}received:{}", logPrefix, content);
                throw new RuntimeException("ups");
            }
        }
    };

    /** Construction parameters of {@link UpldConnComp}. */
    public static class Init extends se.sics.kompics.Init<UpldConnComp> {
        public final ConnId connId;
        public final KAddress self;
        public final BlockDetails defaultBlock;
        public final boolean withHashes;

        public Init(ConnId connId, KAddress kAddress, BlockDetails blockDetails, boolean z) {
            this.connId = connId;
            this.self = kAddress;
            this.defaultBlock = blockDetails;
            this.withHashes = z;
        }
    }

    /** Periodic timeout that drives {@link #handleReport}. */
    public static class ReportTimeout extends Timeout {
        public ReportTimeout(SchedulePeriodicTimeout schedulePeriodicTimeout) {
            super(schedulePeriodicTimeout);
        }
    }

    /**
     * Copies the init parameters, creates the network queue load proxy and subscribes all handlers.
     *
     * @param init connection id, own address, default block layout and hash flag
     */
    public UpldConnComp(Init init) {
        this.connId = init.connId;
        this.self = init.self;
        this.defaultBlock = init.defaultBlock;
        this.withHashes = init.withHashes;
        this.logPrefix = "<" + this.connId.toString() + ">";
        this.networkQueueLoad = NetworkQueueLoadProxy.instance("load_upld" + this.logPrefix, this.proxy, config(), Optional.fromNullable((String) null));
        subscribe(this.handleStart, this.control);
        subscribe(this.handleReport, this.timerPort);
        subscribe(this.handleCache, this.networkPort);
        subscribe(this.handleGetBlocks, this.connPort);
        subscribe(this.handleLedbat, this.networkPort);
    }

    /**
     * Stops load tracking, cancels the report timer (if one was scheduled), releases every cached
     * block reference and empties all caches.
     */
    @Override
    public void tearDown() {
        LOG.info("{}tear down", logPrefix);
        networkQueueLoad.tearDown();
        cancelReport();
        for (KReference<byte[]> block : servedBlocks.values()) {
            silentRelease(block);
        }
        servedBlocks.clear();
        // Hashes and irregular block details describe the blocks just released; drop them too.
        servedHashes.clear();
        irregularBlocks.clear();
    }

    /**
     * Answers the pending cache hint with its success response and clears the pending slot.
     * Does nothing (besides logging) when no cache hint is pending.
     */
    public void answerCacheHint() {
        if (pendingCacheReq == null) {
            LOG.warn("{}no pending cache hint to answer", logPrefix);
            return;
        }
        answerMsg(pendingCacheReq, pendingCacheReq.getContent().success());
        pendingCacheReq = null;
    }

    /**
     * Answers a piece download request from the block cache.
     *
     * @param kContentMsg the network message that carried the request; used to build the reply
     * @param request     the ledbat-wrapped piece request
     */
    public void handlePiece(KContentMsg kContentMsg, LedbatMsg.Request<DownloadPiece.Request> request) {
        DownloadPiece.Request pieceReq = request.extractValue();
        int blockNr = pieceReq.piece.getValue0().intValue();
        KReference<byte[]> block = servedBlocks.get(Integer.valueOf(blockNr));
        if (block == null) {
            // The block is not cached for this connection.
            answerMsg(kContentMsg, request.answer(pieceReq.badRequest()));
            return;
        }
        BlockDetails blockDetails = irregularBlocks.containsKey(Integer.valueOf(blockNr))
                ? irregularBlocks.get(Integer.valueOf(blockNr))
                : defaultBlock;
        answerMsg(kContentMsg, request.answer(pieceReq.success(RangeKReference.createInstance(
                block,
                BlockHelper.getBlockPos(blockNr, defaultBlock),
                BlockHelper.getPieceRange(pieceReq.piece, blockDetails, defaultBlock)))));
    }

    /**
     * Answers a hash download request. If any requested hash is not cached, the whole request is
     * answered with a bad-request response and no hashes are returned.
     *
     * @param kContentMsg the network message that carried the request; used to build the reply
     * @param request     the ledbat-wrapped hash request
     */
    public void handleHashes(KContentMsg kContentMsg, LedbatMsg.Request<DownloadHash.Request> request) {
        DownloadHash.Request hashReq = request.extractValue();
        TreeMap<Integer, byte[]> hashValues = new TreeMap<>();
        for (Integer hashNr : hashReq.hashes) {
            byte[] hash = servedHashes.get(hashNr);
            if (hash == null) {
                LOG.warn("{}no hash for:{} - not serving incomplete", logPrefix, hashNr);
                LOG.warn("{}no hash - serving blocks:{}", logPrefix, servedBlocks.keySet());
                LOG.warn("{}no hash - serving hashes:{}", logPrefix, servedHashes.keySet());
                answerMsg(kContentMsg, request.answer(hashReq.badRequest()));
                return;
            }
            hashValues.put(hashNr, hash);
        }
        answerMsg(kContentMsg, request.answer(hashReq.success(hashValues)));
    }

    /** Sends {@code identifiable} on the network port as the answer to {@code kContentMsg}. */
    private void answerMsg(KContentMsg kContentMsg, Identifiable identifiable) {
        LOG.trace("{}answering with:{}", logPrefix, identifiable);
        trigger(kContentMsg.answer(identifiable), networkPort);
    }

    /**
     * Releases a block reference, converting the checked release failure into an unchecked one.
     *
     * @throws RuntimeException wrapping the {@link KReferenceException} if the release fails
     */
    public void silentRelease(KReference<byte[]> kReference) {
        try {
            kReference.release();
        } catch (KReferenceException e) {
            throw new RuntimeException(e);
        }
    }

    /** Schedules the periodic report timeout and remembers its id for later cancellation. */
    public void scheduleReport() {
        SchedulePeriodicTimeout schedule = new SchedulePeriodicTimeout(REPORT_PERIOD, REPORT_PERIOD);
        ReportTimeout timeout = new ReportTimeout(schedule);
        schedule.setTimeoutEvent(timeout);
        reportTid = timeout.getTimeoutId();
        trigger(schedule, timerPort);
    }

    /** Cancels the periodic report timeout; a no-op when none is scheduled. */
    private void cancelReport() {
        if (reportTid == null) {
            // Never started, or already cancelled: there is no timer id to cancel.
            return;
        }
        trigger(new CancelPeriodicTimeout(reportTid), timerPort);
        reportTid = null;
    }
}
