package se.sics.nstream.storage.durable;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.javatuples.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Handler;
import se.sics.kompics.Kill;
import se.sics.kompics.Killed;
import se.sics.kompics.Negative;
import se.sics.kompics.Positive;
import se.sics.kompics.Start;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.util.Identifier;
import se.sics.ktoolbox.util.network.ports.One2NChannel;
import se.sics.nstream.StreamId;
import se.sics.nstream.storage.durable.events.DStreamConnect;
import se.sics.nstream.storage.durable.events.DStreamDisconnect;

/* loaded from: input_file:se/sics/nstream/storage/durable/DStreamMngrComp.class */
public class DStreamMngrComp extends ComponentDefinition {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) DStreamMngrComp.class);
    private final String logPrefix;
    private final One2NChannel storageChannel;
    private final Identifier self;
    private final DurableStorageProvider storageProvider;
    private final Positive<Timer> timerPort = requires(Timer.class);
    private final Negative<DStoragePort> storagePort = provides(DStoragePort.class);
    private final Negative<DStreamControlPort> streamControlPort = provides(DStreamControlPort.class);
    private final Map<UUID, StreamId> compIdToStreamId = new HashMap();
    private final Map<StreamId, Component> streamStorage = new HashMap();
    Handler handleStart = new Handler<Start>() { // from class: se.sics.nstream.storage.durable.DStreamMngrComp.1
        @Override // se.sics.kompics.Handler
        public void handle(Start start) {
            DStreamMngrComp.LOG.info("{}starting", DStreamMngrComp.this.logPrefix);
        }
    };
    Handler handleKilled = new Handler<Killed>() { // from class: se.sics.nstream.storage.durable.DStreamMngrComp.2
        @Override // se.sics.kompics.Handler
        public void handle(Killed killed) {
            DStreamMngrComp.LOG.info("{}killed stream comp:{}", DStreamMngrComp.this.logPrefix, killed.component.id());
        }
    };
    Handler handleConnect = new Handler<DStreamConnect.Request>() { // from class: se.sics.nstream.storage.durable.DStreamMngrComp.3
        @Override // se.sics.kompics.Handler
        public void handle(DStreamConnect.Request request) {
            Pair initiate = DStreamMngrComp.this.storageProvider.initiate(request.stream.getValue1().resource);
            DStreamMngrComp.LOG.info("{}connecting stream:{} pos:{}", DStreamMngrComp.this.logPrefix, request.stream.getValue0(), initiate.getValue1());
            Component create = DStreamMngrComp.this.create(DStreamMngrComp.this.storageProvider.getStorageDefinition(), (se.sics.kompics.Init) initiate.getValue0());
            DStreamMngrComp.this.storageChannel.addChannel(request.stream.getValue0(), create.getPositive(DStoragePort.class));
            DStreamMngrComp.this.connect(DStreamMngrComp.this.timerPort, create.getNegative(Timer.class), Channel.TWO_WAY);
            DStreamMngrComp.this.compIdToStreamId.put(create.id(), request.stream.getValue0());
            DStreamMngrComp.this.streamStorage.put(request.stream.getValue0(), create);
            DStreamMngrComp.this.trigger(Start.event, create.control());
            DStreamMngrComp.this.answer(request, request.success(((Long) initiate.getValue1()).longValue()));
        }
    };
    Handler handleDisconnect = new Handler<DStreamDisconnect.Request>() { // from class: se.sics.nstream.storage.durable.DStreamMngrComp.4
        @Override // se.sics.kompics.Handler
        public void handle(DStreamDisconnect.Request request) {
            DStreamMngrComp.LOG.info("{}disconnecting stream:{}", DStreamMngrComp.this.logPrefix, request.streamId);
            Component component = (Component) DStreamMngrComp.this.streamStorage.remove(request.streamId);
            if (component == null) {
                throw new RuntimeException("TODO Alex - probbably mismatch between things keeping track of resources");
            }
            DStreamMngrComp.this.compIdToStreamId.remove(component.id());
            DStreamMngrComp.this.storageChannel.removeChannel(request.streamId, component.getPositive(DStoragePort.class));
            DStreamMngrComp.this.disconnect(DStreamMngrComp.this.timerPort, component.getNegative(Timer.class));
            DStreamMngrComp.this.trigger(Kill.event, component.control());
            DStreamMngrComp.this.answer(request, request.success());
        }
    };

    /* loaded from: input_file:se/sics/nstream/storage/durable/DStreamMngrComp$Init.class */
    public static class Init extends se.sics.kompics.Init<DStreamMngrComp> {
        public final Identifier self;
        public final DurableStorageProvider storageProvider;

        public Init(Identifier identifier, DurableStorageProvider durableStorageProvider) {
            this.self = identifier;
            this.storageProvider = durableStorageProvider;
        }
    }

    public DStreamMngrComp(Init init) {
        this.self = init.self;
        this.logPrefix = "<nid:" + this.self + ">";
        LOG.info("{}initiating...", this.logPrefix);
        this.storageProvider = init.storageProvider;
        this.storageChannel = One2NChannel.getChannel(this.logPrefix + ":dstream_storage", this.storagePort, new DStreamIdExtractor());
        subscribe(this.handleStart, this.control);
        subscribe(this.handleKilled, this.control);
        subscribe(this.handleConnect, this.streamControlPort);
        subscribe(this.handleDisconnect, this.streamControlPort);
    }
}
