package se.sics.ktoolbox.netmngr.chunk;

import com.google.common.base.Optional;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.javatuples.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import se.sics.kompics.ClassMatchedHandler;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Handler;
import se.sics.kompics.Negative;
import se.sics.kompics.Positive;
import se.sics.kompics.Start;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.netty.serialization.Serializers;
import se.sics.kompics.timer.CancelTimeout;
import se.sics.kompics.timer.ScheduleTimeout;
import se.sics.kompics.timer.Timeout;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.util.Identifiable;
import se.sics.kompics.util.Identifier;
import se.sics.ktoolbox.netmngr.chunk.util.ChunkPrefixHelper;
import se.sics.ktoolbox.netmngr.chunk.util.CompleteChunkTracker;
import se.sics.ktoolbox.netmngr.chunk.util.IncompleteChunkTracker;
import se.sics.ktoolbox.util.config.impl.SystemKCWrapper;
import se.sics.ktoolbox.util.identifiable.BasicIdentifiers;
import se.sics.ktoolbox.util.network.KAddress;
import se.sics.ktoolbox.util.network.KContentMsg;
import se.sics.ktoolbox.util.network.KHeader;
import se.sics.ktoolbox.util.network.basic.BasicContentMsg;
import se.sics.ktoolbox.util.network.other.Chunkable;

/* loaded from: input_file:se/sics/ktoolbox/netmngr/chunk/ChunkMngrComp.class */
/**
 * Kompics component that sits between a provided and a required {@link Network} port and
 * splits large {@link Chunkable} payloads into {@link Chunk} messages on the way out, and
 * reassembles received {@link Chunk} messages into the original payload on the way in.
 *
 * <p>Every tracker (outgoing or incoming) is paired with the id of a cleanup timeout; when that
 * timeout fires the tracker is dropped. Incoming trackers cancel their timeout once the message
 * has been rebuilt.
 */
public class ChunkMngrComp extends ComponentDefinition {
    private static final Logger LOG = LoggerFactory.getLogger(ChunkMngrComp.class);

    /*
     * NOTE(review): these two values appear as inlined literals in the compiled class, each next
     * to a bare access of chunkMngrConfig - presumably they are compile-time constants declared
     * on ChunkMngrKCWrapper. Confirm, and reference the config fields directly if they exist.
     */
    /** Byte budget of one datagram, shared by the serialized header, the chunk prefix and the payload. */
    private static final int DATAGRAM_USABLE_SIZE = 1000;
    /** Delay, in milliseconds, after which a chunk tracker is discarded. */
    private static final long CLEANUP_TIMEOUT_MS = 30000L;

    private final Positive<Network> requiredNetwork = requires(Network.class);
    private final Negative<Network> providedNetwork = provides(Network.class);
    private final Positive<Timer> timer = requires(Timer.class);

    private final SystemKCWrapper systemConfig = new SystemKCWrapper(config());
    private final ChunkMngrKCWrapper chunkMngrConfig = new ChunkMngrKCWrapper(config());
    private final String logPrefix = "<nid:" + this.systemConfig.id + "> ";

    /** Trackers of messages that were split and sent, keyed by message id, with their cleanup timeout id. */
    private final Map<Identifier, Pair<CompleteChunkTracker, UUID>> outgoingChunks = new HashMap<>();
    /** Trackers of messages being reassembled, keyed by origin message id, with their cleanup timeout id. */
    private final Map<Identifier, Pair<IncompleteChunkTracker, UUID>> incomingChunks = new HashMap<>();

    /** Logs component start; no other start-up work is done. */
    private final Handler<Start> handleStart = new Handler<Start>() {
        @Override
        public void handle(Start start) {
            LOG.info("{}starting...", logPrefix);
        }
    };

    /**
     * Handles a {@link Chunkable} message: forwards it unchanged when its serialized payload is
     * smaller than the per-chunk payload budget, otherwise splits it into chunks, sends every
     * chunk with the original header and keeps the tracker until its cleanup timeout fires.
     */
    ClassMatchedHandler handleOutgoing = new ClassMatchedHandler<Chunkable, BasicContentMsg<?, ?, Chunkable>>() {
        @Override
        public void handle(Chunkable content, BasicContentMsg<?, ?, Chunkable> container) {
            LOG.trace("{}received outgoing:{}", logPrefix, container);

            ByteBuf payloadBuf = Unpooled.buffer();
            Serializers.toBinary(content, payloadBuf);
            // The header is serialized only to measure how many bytes it takes in each datagram.
            ByteBuf headerBuf = Unpooled.buffer();
            Serializers.toBinary(container.getHeader(), headerBuf);

            int headerSize = headerBuf.readableBytes();
            int chunkPayloadSize = DATAGRAM_USABLE_SIZE - headerSize - ChunkPrefixHelper.getChunkPrefixSize();
            if (chunkPayloadSize <= 0) {
                LOG.error("{}chunk manager is badly configured", logPrefix);
                throw new RuntimeException("chunk manager is badly configured");
            }

            if (payloadBuf.readableBytes() < chunkPayloadSize) {
                LOG.trace("{}forwarding to UDP - small message:{}", logPrefix, container);
                trigger(container, requiredNetwork);
                return;
            }

            Identifier msgId = content instanceof Identifiable
                    ? ((Identifiable) content).getId()
                    : BasicIdentifiers.eventId();
            CompleteChunkTracker tracker = new CompleteChunkTracker(msgId, payloadBuf, chunkPayloadSize);
            for (Chunk chunk : tracker.chunks.values()) {
                // Raw BasicContentMsg: the header's wildcard type parameters cannot be named here.
                BasicContentMsg chunkMsg = new BasicContentMsg(container.getHeader(), chunk);
                LOG.trace("{}sending chunk nr:{}", logPrefix, chunk.chunkNr);
                trigger(chunkMsg, requiredNetwork);
            }
            outgoingChunks.put(msgId, Pair.with(tracker, scheduleCleanupTimeout(msgId)));
        }
    };

    /**
     * Handles a received {@link Chunk}: adds it to the tracker of its origin message (creating
     * the tracker and its cleanup timeout on the first chunk) and, once the tracker reports the
     * message complete, cancels the timeout, deserializes the payload and delivers the rebuilt
     * message on the provided network port.
     */
    ClassMatchedHandler handleIncoming = new ClassMatchedHandler<Chunk, KContentMsg<KAddress, KHeader<KAddress>, Chunk>>() {
        @Override
        public void handle(Chunk chunk, KContentMsg<KAddress, KHeader<KAddress>, Chunk> container) {
            LOG.trace("{}received incoming:{}", logPrefix, container);

            Pair<IncompleteChunkTracker, UUID> entry = incomingChunks.get(chunk.originId);
            if (entry == null) {
                entry = Pair.with(new IncompleteChunkTracker(chunk.lastChunk), scheduleCleanupTimeout(chunk.originId));
                incomingChunks.put(chunk.originId, entry);
            }

            IncompleteChunkTracker tracker = entry.getValue0();
            tracker.add(chunk);
            if (!tracker.isComplete()) {
                return;
            }

            cancelCleanupTimeout(entry.getValue1());
            incomingChunks.remove(chunk.originId);
            Object rebuiltContent = Serializers.fromBinary(
                    Unpooled.wrappedBuffer(tracker.getMsg()), Optional.<Object>absent());
            BasicContentMsg rebuilt = new BasicContentMsg((KHeader) container.getHeader(), rebuiltContent);
            LOG.debug("{}rebuilt chunked message:{}", logPrefix, rebuilt);
            trigger(rebuilt, providedNetwork);
        }
    };

    /**
     * Drops the tracker(s) whose cleanup timeout fired. A timeout that no longer owns any
     * tracker (already completed, or replaced by a newer tracker for the same id) is reported
     * as late and otherwise ignored.
     */
    final Handler<CleanupTrackerTimeout> handleCleanupTimeout = new Handler<CleanupTrackerTimeout>() {
        @Override
        public void handle(CleanupTrackerTimeout timeout) {
            boolean incomingExpired = expireTracker(incomingChunks, timeout);
            if (incomingExpired) {
                LOG.debug("{}incoming chunked message:{} timed out", logPrefix, timeout.originId);
            }
            boolean outgoingExpired = expireTracker(outgoingChunks, timeout);
            if (outgoingExpired) {
                LOG.debug("{}outgoing chunked message:{} timed out", logPrefix, timeout.originId);
            }
            // Only a timeout that expired nothing is actually "late".
            if (!incomingExpired && !outgoingExpired) {
                LOG.debug("{}late timeout for chunk message:{}", logPrefix, timeout.originId);
            }
        }
    };

    /** Timeout event that identifies the chunked message whose tracker should be discarded. */
    public static class CleanupTrackerTimeout extends Timeout {
        public final Identifier originId;

        public CleanupTrackerTimeout(ScheduleTimeout scheduleTimeout, Identifier identifier) {
            super(scheduleTimeout);
            this.originId = identifier;
        }

        @Override
        public String toString() {
            return "ChunkMngr_CleanupTracker<" + getTimeoutId() + ">";
        }
    }

    /** Init event of the component; it carries no parameters. */
    public static class Init extends se.sics.kompics.Init<ChunkMngrComp> {
    }

    /**
     * Creates the component and subscribes its handlers to the control, network and timer ports.
     *
     * @param init init event (carries no data)
     */
    public ChunkMngrComp(Init init) {
        LOG.info("{}initiating...", this.logPrefix);
        subscribe(this.handleStart, this.control);
        subscribe(this.handleOutgoing, this.providedNetwork);
        // NOTE(review): handleOutgoing is also subscribed on the required port, so a Chunkable
        // message arriving from the network is handled like an outgoing one - confirm intended.
        subscribe(this.handleOutgoing, this.requiredNetwork);
        subscribe(this.handleIncoming, this.requiredNetwork);
        subscribe(this.handleCleanupTimeout, this.timer);
    }

    /**
     * Schedules a {@link CleanupTrackerTimeout} for the given message id.
     *
     * @param identifier id of the chunked message the timeout refers to
     * @return id of the scheduled timeout, usable with {@link #cancelCleanupTimeout(UUID)}
     */
    public UUID scheduleCleanupTimeout(Identifier identifier) {
        ScheduleTimeout scheduleTimeout = new ScheduleTimeout(CLEANUP_TIMEOUT_MS);
        CleanupTrackerTimeout cleanupTimeout = new CleanupTrackerTimeout(scheduleTimeout, identifier);
        scheduleTimeout.setTimeoutEvent(cleanupTimeout);
        trigger(scheduleTimeout, this.timer);
        return cleanupTimeout.getTimeoutId();
    }

    /**
     * Requests cancellation of a previously scheduled cleanup timeout.
     *
     * @param uuid id returned by {@link #scheduleCleanupTimeout(Identifier)}
     */
    public void cancelCleanupTimeout(UUID uuid) {
        trigger(new CancelTimeout(uuid), this.timer);
    }

    /**
     * Removes the tracker registered for the timeout's origin id, but only when that tracker was
     * registered together with this very timeout; a tracker owned by another timeout is left alone.
     *
     * @return {@code true} when a tracker was removed
     */
    private boolean expireTracker(Map<Identifier, ? extends Pair<?, UUID>> trackers, CleanupTrackerTimeout timeout) {
        Pair<?, UUID> entry = trackers.get(timeout.originId);
        if (entry == null || !entry.getValue1().equals(timeout.getTimeoutId())) {
            return false;
        }
        trackers.remove(timeout.originId);
        return true;
    }
}
