package se.sics.nstream.storage.durable;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import se.sics.kompics.Channel;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Fault;
import se.sics.kompics.Handler;
import se.sics.kompics.Kill;
import se.sics.kompics.Killed;
import se.sics.kompics.Negative;
import se.sics.kompics.Positive;
import se.sics.kompics.Start;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.util.Identifier;
import se.sics.ktoolbox.util.identifiable.overlay.OverlayId;
import se.sics.ktoolbox.util.network.ports.One2NChannel;
import se.sics.ktoolbox.util.result.Result;
import se.sics.nstream.storage.durable.DStreamMngrComp;
import se.sics.nstream.storage.durable.events.DEndpoint;

/* loaded from: input_file:se/sics/nstream/storage/durable/DStorageMngrComp.class */
/**
 * Kompics component that owns one {@link DStreamMngrComp} child per storage endpoint.
 *
 * <p>On {@link DEndpoint.Connect} the child for the requested endpoint is created on first use
 * and wired to the shared storage / stream-control channels and to the timer port. On
 * {@link DEndpoint.Disconnect} the child is unwired and killed once its last client (keyed by
 * torrent id) has disconnected. When a child faults, the fault cause is recorded for its
 * endpoint, every registered client is answered with a failure, and the child is killed.
 *
 * <p>Entries are never removed from the faulty-endpoint record in this class, so an endpoint
 * that has faulted once rejects all subsequent connect requests.
 */
public class DStorageMngrComp extends ComponentDefinition {
    private static final Logger LOG = LoggerFactory.getLogger(DStorageMngrComp.class);
    private final String logPrefix;
    private final One2NChannel streamControlChannel;
    private final One2NChannel storageChannel;
    private final Identifier self;
    private final Positive<Timer> timerPort = requires(Timer.class);
    private final Negative<DStoragePort> storagePort = provides(DStoragePort.class);
    private final Negative<DStreamControlPort> streamControlPort = provides(DStreamControlPort.class);
    private final Negative<DEndpointCtrlPort> endpointControlPort = provides(DEndpointCtrlPort.class);
    /** Child component id -> endpoint id served by that child; used to resolve fault sources. */
    private final Map<UUID, Identifier> compIdToEndpointId = new HashMap<>();
    /** Endpoint id -> (torrent id -> the connect request that registered that client). */
    private final Map<Identifier, Map<OverlayId, DEndpoint.Connect>> clients = new HashMap<>();
    /** Endpoint id -> child component currently serving it. */
    private final Map<Identifier, Component> storageEndpoints = new HashMap<>();
    /** Endpoint id -> cause of the fault reported by its child component. */
    private final Map<Identifier, Throwable> reportedFaulty = new HashMap<>();

    /** Logs component start. */
    Handler<Start> handleStart = new Handler<Start>() {
        @Override
        public void handle(Start event) {
            LOG.info("{}starting", logPrefix);
        }
    };

    /** Logs the id of a child component that has been killed. */
    Handler<Killed> handleKilled = new Handler<Killed>() {
        @Override
        public void handle(Killed event) {
            LOG.info("{}killed endpoint component:{}", logPrefix, event.component.id());
        }
    };

    /**
     * Registers a client on an endpoint, creating and starting the endpoint's child component
     * if none exists yet. Requests for an endpoint that previously faulted are answered with
     * the recorded failure and nothing else happens.
     */
    Handler<DEndpoint.Connect> handleConnect = new Handler<DEndpoint.Connect>() {
        @Override
        public void handle(DEndpoint.Connect req) {
            LOG.info("{}connecting endpoint:{}", logPrefix, req.endpointId);
            if (reportedFaulty.containsKey(req.endpointId)) {
                answer(req, req.fail2(Result.internalFailure(asException(reportedFaulty.get(req.endpointId)))));
                // Stop here: falling through would also (re)create the endpoint and answer the
                // same request a second time with success.
                return;
            }

            Map<OverlayId, DEndpoint.Connect> endpointClients;
            if (storageEndpoints.containsKey(req.endpointId)) {
                endpointClients = clients.get(req.endpointId);
            } else {
                Component endpoint = create(DStreamMngrComp.class, new DStreamMngrComp.Init(self, req.endpointProvider));
                streamControlChannel.addChannel(req.endpointId, endpoint.getPositive(DStreamControlPort.class));
                storageChannel.addChannel(req.endpointId, endpoint.getPositive(DStoragePort.class));
                connect(timerPort, endpoint.getNegative(Timer.class), Channel.TWO_WAY);
                compIdToEndpointId.put(endpoint.id(), req.endpointId);
                storageEndpoints.put(req.endpointId, endpoint);
                trigger(Start.event, endpoint.control());
                endpointClients = new HashMap<>();
                clients.put(req.endpointId, endpointClients);
            }
            endpointClients.put(req.torrentId, req);
            answer(req, req.success2(Result.success(true)));
        }
    };

    /**
     * Unregisters a client from an endpoint and tears the endpoint's child component down when
     * no clients remain. Disconnecting from an endpoint with no registered clients succeeds
     * without doing anything.
     */
    Handler<DEndpoint.Disconnect> handleDisconnect = new Handler<DEndpoint.Disconnect>() {
        @Override
        public void handle(DEndpoint.Disconnect req) {
            LOG.info("{}disconnecting endpoint:{}", logPrefix, req.endpointId);
            Map<OverlayId, DEndpoint.Connect> endpointClients = clients.get(req.endpointId);
            if (endpointClients == null) {
                answer(req, req.success());
                return;
            }
            endpointClients.remove(req.torrentId);
            Component endpoint = storageEndpoints.get(req.endpointId);
            if (endpoint == null) {
                // clients and storageEndpoints are expected to be updated together
                throw new RuntimeException("weird behaviour - someone is not keeping track of things right");
            }
            if (endpointClients.isEmpty()) {
                streamControlChannel.removeChannel(req.endpointId, endpoint.getPositive(DStreamControlPort.class));
                storageChannel.removeChannel(req.endpointId, endpoint.getPositive(DStoragePort.class));
                disconnect(timerPort, endpoint.getNegative(Timer.class));
                clients.remove(req.endpointId);
                storageEndpoints.remove(req.endpointId);
                compIdToEndpointId.remove(endpoint.id());
                trigger(Kill.event, endpoint.control());
            }
            answer(req, req.success());
        }
    };

    /** Initialisation event carrying the identifier of the node this manager runs on. */
    public static class Init extends se.sics.kompics.Init<DStorageMngrComp> {
        public final Identifier self;

        public Init(Identifier identifier) {
            this.self = identifier;
        }
    }

    /**
     * Builds the one-to-many channels in front of the provided storage and stream-control ports
     * and subscribes the control and endpoint-control handlers.
     *
     * @param init carries the node identifier, used for the log prefix and passed on to children
     */
    public DStorageMngrComp(Init init) {
        this.self = init.self;
        this.logPrefix = "<nid:" + this.self + ">";
        LOG.info("{}initiating...", this.logPrefix);
        this.streamControlChannel = One2NChannel.getChannel(this.logPrefix + ":dstream_control", this.streamControlPort, new DEndpointIdExtractor());
        this.storageChannel = One2NChannel.getChannel(this.logPrefix + ":dstorage", this.storagePort, new DEndpointIdExtractor());
        subscribe(this.handleStart, this.control);
        subscribe(this.handleKilled, this.control);
        subscribe(this.handleConnect, this.endpointControlPort);
        subscribe(this.handleDisconnect, this.endpointControlPort);
    }

    /**
     * Handles a fault raised by a child endpoint component: records the cause for the endpoint,
     * answers every registered client's connect request with a failure, removes the endpoint's
     * channels and bookkeeping, and kills the child.
     *
     * @param fault the fault reported by Kompics
     * @return {@code RESOLVED} when the source is a tracked endpoint component; {@code ESCALATE}
     *     when the source is not tracked by this manager
     */
    @Override // se.sics.kompics.ComponentDefinition
    public Fault.ResolveAction handleFault(Fault fault) {
        UUID compId = fault.getSource().id();
        Throwable cause = fault.getCause();
        Identifier endpointId = this.compIdToEndpointId.get(compId);
        if (endpointId == null) {
            // Not one of our endpoint children: there is no state to clean up here, and
            // recording a null endpoint id would only pollute the faulty-endpoint map.
            LOG.warn("{}fault from untracked component:{}", this.logPrefix, compId, cause);
            return Fault.ResolveAction.ESCALATE;
        }
        this.reportedFaulty.put(endpointId, cause);

        Map<OverlayId, DEndpoint.Connect> endpointClients = this.clients.remove(endpointId);
        if (endpointClients != null) {
            Exception failure = asException(cause);
            for (DEndpoint.Connect req : endpointClients.values()) {
                answer(req, req.fail2(Result.internalFailure(failure)));
            }
        }

        Component endpoint = this.storageEndpoints.remove(endpointId);
        this.compIdToEndpointId.remove(compId);
        if (endpoint != null) {
            // Each channel is registered once per endpoint, so it is removed once as well
            // (not once per client).
            this.streamControlChannel.removeChannel(endpointId, endpoint.getPositive(DStreamControlPort.class));
            this.storageChannel.removeChannel(endpointId, endpoint.getPositive(DStoragePort.class));
            // NOTE(review): unlike the disconnect path, the timer port is not disconnected
            // here - confirm whether killing a faulty child is enough to release it.
            trigger(Kill.event, endpoint.control());
        }
        return Fault.ResolveAction.RESOLVED;
    }

    /**
     * Returns {@code cause} itself when it is already an {@link Exception}; otherwise wraps it
     * in a {@link RuntimeException} so that non-exception throwables (e.g. an {@link Error})
     * can be reported without a {@link ClassCastException}.
     */
    private static Exception asException(Throwable cause) {
        if (cause instanceof Exception) {
            return (Exception) cause;
        }
        return new RuntimeException(cause);
    }
}
