package se.sics.nstream.torrent.resourceMngr;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.javatuples.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Handler;
import se.sics.kompics.Negative;
import se.sics.kompics.Positive;
import se.sics.kompics.Start;
import se.sics.kompics.util.Identifier;
import se.sics.ktoolbox.util.identifiable.overlay.OverlayId;
import se.sics.nstream.FileId;
import se.sics.nstream.StreamId;
import se.sics.nstream.storage.durable.DStreamControlPort;
import se.sics.nstream.storage.durable.events.DStreamConnect;
import se.sics.nstream.storage.durable.util.FileExtendedDetails;
import se.sics.nstream.storage.durable.util.MyStream;
import se.sics.nstream.torrent.resourceMngr.PrepareResources;

/* loaded from: input_file:se/sics/nstream/torrent/resourceMngr/ResourceMngrComp.class */
public class ResourceMngrComp extends ComponentDefinition {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ResourceMngrComp.class);
    private final String logPrefix;
    private final Identifier self;
    private Negative<ResourceMngrPort> resourceMngrPort = provides(ResourceMngrPort.class);
    private Positive<DStreamControlPort> streamControlPort = requires(DStreamControlPort.class);
    private final Map<OverlayId, Op> preparingResources = new HashMap();
    private final Map<Identifier, Op> eventsToOp = new HashMap();
    Handler handleStart = new Handler<Start>() { // from class: se.sics.nstream.torrent.resourceMngr.ResourceMngrComp.1
        @Override // se.sics.kompics.Handler
        public void handle(Start start) {
            ResourceMngrComp.LOG.info("{}starting...", ResourceMngrComp.this.logPrefix);
        }
    };
    Handler handlePrepare = new Handler<PrepareResources.Request>() { // from class: se.sics.nstream.torrent.resourceMngr.ResourceMngrComp.2
        @Override // se.sics.kompics.Handler
        public void handle(PrepareResources.Request request) {
            ResourceMngrComp.LOG.info("{}preparing resources for torrent:{}", ResourceMngrComp.this.logPrefix, request.torrentId);
            Op op = new Op(request);
            ResourceMngrComp.this.preparingResources.put(op.getId(), op);
            for (Map.Entry<FileId, FileExtendedDetails> entry : request.torrent.extended.entrySet()) {
                ResourceMngrComp.this.prepareResource(op, entry.getKey(), entry.getValue().getMainStream());
                Iterator<Pair<StreamId, MyStream>> it = entry.getValue().getSecondaryStreams().iterator();
                while (it.hasNext()) {
                    ResourceMngrComp.this.prepareResource(op, entry.getKey(), it.next());
                }
            }
        }
    };
    Handler handlePrepareSuccess = new Handler<DStreamConnect.Success>() { // from class: se.sics.nstream.torrent.resourceMngr.ResourceMngrComp.3
        @Override // se.sics.kompics.Handler
        public void handle(DStreamConnect.Success success) {
            ResourceMngrComp.LOG.info("{}prepared:{}", ResourceMngrComp.this.logPrefix, success.req.stream.getValue0());
            Op op = (Op) ResourceMngrComp.this.eventsToOp.remove(success.getId());
            if (op == null) {
                ResourceMngrComp.LOG.warn("{}weird - investigate", ResourceMngrComp.this.logPrefix);
                return;
            }
            op.prepared(success.getId(), success.req.stream.getValue0(), success.streamPos);
            if (op.ready()) {
                ResourceMngrComp.LOG.info("{}prepared all resources for:{}", ResourceMngrComp.this.logPrefix, op.getId());
                ResourceMngrComp.this.preparingResources.remove(op.getId());
                ResourceMngrComp.this.answer(op.req, op.req.success(op.streamsInfo));
            }
        }
    };

    /* loaded from: input_file:se/sics/nstream/torrent/resourceMngr/ResourceMngrComp$Init.class */
    public static class Init extends se.sics.kompics.Init<ResourceMngrComp> {
        public Identifier self;

        public Init(Identifier identifier) {
            this.self = identifier;
        }
    }

    /* loaded from: input_file:se/sics/nstream/torrent/resourceMngr/ResourceMngrComp$Op.class */
    public static class Op {
        public final PrepareResources.Request req;
        public final Set<Identifier> preparingResources = new HashSet();
        public final Map<StreamId, Long> streamsInfo = new HashMap();

        public Op(PrepareResources.Request request) {
            this.req = request;
        }

        public OverlayId getId() {
            return this.req.torrentId;
        }

        public void preparing(Identifier identifier) {
            this.preparingResources.add(identifier);
        }

        public void prepared(Identifier identifier, StreamId streamId, long j) {
            this.preparingResources.remove(identifier);
            this.streamsInfo.put(streamId, Long.valueOf(j));
        }

        public boolean ready() {
            return this.preparingResources.isEmpty();
        }
    }

    public ResourceMngrComp(Init init) {
        this.self = init.self;
        this.logPrefix = "<nid:" + this.self + ">";
        LOG.info("{}initiating...", this.logPrefix);
        subscribe(this.handleStart, this.control);
        subscribe(this.handlePrepare, this.resourceMngrPort);
        subscribe(this.handlePrepareSuccess, this.streamControlPort);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void prepareResource(Op op, FileId fileId, Pair<StreamId, MyStream> pair) {
        LOG.info("{}preparing file:{} resource:{} endpoint:{}", this.logPrefix, fileId, pair.getValue1().resource.getSinkName(), pair.getValue1().endpoint.getEndpointName());
        DStreamConnect.Request request = new DStreamConnect.Request(pair);
        op.preparing(request.getId());
        this.eventsToOp.put(request.getId(), op);
        trigger(request, this.streamControlPort);
    }
}
