package se.sics.nstream.torrent;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Handler;
import se.sics.kompics.Kill;
import se.sics.kompics.Negative;
import se.sics.kompics.Positive;
import se.sics.kompics.Start;
import se.sics.kompics.network.Network;
import se.sics.kompics.timer.Timer;
import se.sics.ktoolbox.util.identifiable.overlay.OverlayId;
import se.sics.ktoolbox.util.idextractor.EventOverlayIdExtractor;
import se.sics.ktoolbox.util.idextractor.MsgOverlayIdExtractor;
import se.sics.ktoolbox.util.network.KAddress;
import se.sics.ktoolbox.util.network.ports.One2NChannel;
import se.sics.ktoolbox.util.result.Result;
import se.sics.nstream.storage.durable.DStoragePort;
import se.sics.nstream.storage.durable.DStreamControlPort;
import se.sics.nstream.torrent.TorrentComp;
import se.sics.nstream.torrent.event.StartTorrent;
import se.sics.nstream.torrent.event.StopTorrent;
import se.sics.nstream.torrent.resourceMngr.ResourceMngrComp;
import se.sics.nstream.torrent.resourceMngr.ResourceMngrPort;
import se.sics.nstream.torrent.status.event.TorrentReady;
import se.sics.nstream.torrent.tracking.TorrentStatusPort;
import se.sics.nstream.torrent.transfer.TransferCtrlPort;

/* loaded from: input_file:se/sics/nstream/torrent/TorrentMngrComp.class */
public class TorrentMngrComp extends ComponentDefinition {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) TorrentMngrComp.class);
    private String logPrefix;
    private final One2NChannel networkChannel;
    private final One2NChannel transferCtrlChannel;
    private final One2NChannel reportChannel;
    private final KAddress selfAdr;
    private Component resourceMngrComp;
    private final Positive<Timer> timerPort = requires(Timer.class);
    private final Positive<Network> networkPort = requires(Network.class);
    private final Positive<DStreamControlPort> streamControlPort = requires(DStreamControlPort.class);
    private final Positive<DStoragePort> storagePort = requires(DStoragePort.class);
    private final Negative<TorrentMngrPort> torrentMngrPort = provides(TorrentMngrPort.class);
    private final Negative<TransferCtrlPort> transferCtrlPort = provides(TransferCtrlPort.class);
    private final Negative<TorrentStatusPort> torrentStatusPort = provides(TorrentStatusPort.class);
    private Map<OverlayId, Component> torrentComps = new HashMap();
    private Map<UUID, OverlayId> compIdToTorrentId = new HashMap();
    private Map<OverlayId, StartTorrent.Request> pendingStarts = new HashMap();
    Handler handleStart = new Handler<Start>() { // from class: se.sics.nstream.torrent.TorrentMngrComp.1
        @Override // se.sics.kompics.Handler
        public void handle(Start start) {
            TorrentMngrComp.LOG.info("{}starting", TorrentMngrComp.this.logPrefix);
            TorrentMngrComp.this.baseSetup();
            TorrentMngrComp.this.baseStart();
        }
    };
    Handler handleTorrentStart = new Handler<StartTorrent.Request>() { // from class: se.sics.nstream.torrent.TorrentMngrComp.2
        @Override // se.sics.kompics.Handler
        public void handle(StartTorrent.Request request) {
            TorrentMngrComp.LOG.info("{}starting torrent:{}", TorrentMngrComp.this.logPrefix, request.torrentId);
            Component create = TorrentMngrComp.this.create(TorrentComp.class, new TorrentComp.Init(TorrentMngrComp.this.selfAdr, request.torrentId, request.partners));
            TorrentMngrComp.this.connect(create.getNegative(Timer.class), TorrentMngrComp.this.timerPort, Channel.TWO_WAY);
            TorrentMngrComp.this.networkChannel.addChannel(request.torrentId, create.getNegative(Network.class));
            TorrentMngrComp.this.connect(create.getNegative(ResourceMngrPort.class), TorrentMngrComp.this.resourceMngrComp.getPositive(ResourceMngrPort.class), Channel.TWO_WAY);
            TorrentMngrComp.this.connect(create.getNegative(DStoragePort.class), TorrentMngrComp.this.storagePort, Channel.TWO_WAY);
            TorrentMngrComp.this.transferCtrlChannel.addChannel(request.torrentId, create.getPositive(TransferCtrlPort.class));
            TorrentMngrComp.this.reportChannel.addChannel(request.torrentId, create.getPositive(TorrentStatusPort.class));
            TorrentMngrComp.this.torrentComps.put(request.torrentId, create);
            TorrentMngrComp.this.compIdToTorrentId.put(create.id(), request.torrentId);
            TorrentMngrComp.this.trigger(Start.event, create.control());
            TorrentMngrComp.this.pendingStarts.put(request.torrentId, request);
        }
    };
    Handler handleTorrentReady = new Handler<TorrentReady>() { // from class: se.sics.nstream.torrent.TorrentMngrComp.3
        @Override // se.sics.kompics.Handler
        public void handle(TorrentReady torrentReady) {
            TorrentMngrComp.LOG.info("{}started torrent:{}", TorrentMngrComp.this.logPrefix, torrentReady.torrentId);
            StartTorrent.Request request = (StartTorrent.Request) TorrentMngrComp.this.pendingStarts.remove(torrentReady.torrentId);
            if (request == null) {
                return;
            }
            TorrentMngrComp.this.answer(request, request.success2(Result.success(true)));
        }
    };
    Handler handleTorrentStop = new Handler<StopTorrent.Request>() { // from class: se.sics.nstream.torrent.TorrentMngrComp.4
        @Override // se.sics.kompics.Handler
        public void handle(StopTorrent.Request request) {
            TorrentMngrComp.LOG.info("{}stopping torrent:{}", TorrentMngrComp.this.logPrefix, request.torrentId);
            Component component = (Component) TorrentMngrComp.this.torrentComps.remove(request.torrentId);
            TorrentMngrComp.this.compIdToTorrentId.remove(component.id());
            TorrentMngrComp.this.disconnect(component.getNegative(Timer.class), TorrentMngrComp.this.timerPort);
            TorrentMngrComp.this.networkChannel.removeChannel(request.torrentId, component.getNegative(Network.class));
            TorrentMngrComp.this.disconnect(component.getNegative(ResourceMngrPort.class), TorrentMngrComp.this.resourceMngrComp.getPositive(ResourceMngrPort.class));
            TorrentMngrComp.this.disconnect(component.getNegative(DStoragePort.class), TorrentMngrComp.this.storagePort);
            TorrentMngrComp.this.transferCtrlChannel.removeChannel(request.torrentId, component.getPositive(TransferCtrlPort.class));
            TorrentMngrComp.this.reportChannel.removeChannel(request.torrentId, component.getPositive(TorrentStatusPort.class));
            TorrentMngrComp.this.trigger(Kill.event, component.control());
            TorrentMngrComp.this.answer(request, request.success());
        }
    };

    /* loaded from: input_file:se/sics/nstream/torrent/TorrentMngrComp$Init.class */
    public static class Init extends se.sics.kompics.Init<TorrentMngrComp> {
        public final KAddress selfAdr;

        public Init(KAddress kAddress) {
            this.selfAdr = kAddress;
        }
    }

    public TorrentMngrComp(Init init) {
        this.selfAdr = init.selfAdr;
        this.logPrefix = "<nid:" + this.selfAdr.getId() + ">";
        LOG.info("{}initiating...", this.logPrefix);
        this.networkChannel = One2NChannel.getChannel(this.logPrefix + "torrent", this.networkPort, new MsgOverlayIdExtractor());
        this.transferCtrlChannel = One2NChannel.getChannel(this.logPrefix + "transferCtrl", this.transferCtrlPort, new EventOverlayIdExtractor());
        this.reportChannel = One2NChannel.getChannel("hopsTorrentMngrReport", this.torrentStatusPort, new EventOverlayIdExtractor());
        subscribe(this.handleStart, this.control);
        subscribe(this.handleTorrentStart, this.torrentMngrPort);
        subscribe(this.handleTorrentReady, this.torrentStatusPort.getPair());
        subscribe(this.handleTorrentStop, this.torrentMngrPort);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void baseSetup() {
        this.resourceMngrComp = create(ResourceMngrComp.class, new ResourceMngrComp.Init(this.selfAdr.getId()));
        connect(this.resourceMngrComp.getNegative(DStreamControlPort.class), this.streamControlPort, Channel.TWO_WAY);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void baseStart() {
        trigger(Start.event, this.resourceMngrComp.control());
    }
}
