package se.sics.nstream.storage.buffer;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.javatuples.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import se.sics.kompics.ComponentProxy;
import se.sics.kompics.Handler;
import se.sics.kompics.Positive;
import se.sics.kompics.config.Config;
import se.sics.kompics.util.Identifier;
import se.sics.ktoolbox.util.reference.KReference;
import se.sics.ktoolbox.util.reference.KReferenceException;
import se.sics.ktoolbox.util.result.DelayedExceptionSyncHandler;
import se.sics.ktoolbox.util.result.Result;
import se.sics.nstream.StreamId;
import se.sics.nstream.storage.durable.DStoragePort;
import se.sics.nstream.storage.durable.events.DStorageWrite;
import se.sics.nstream.storage.durable.util.MyStream;
import se.sics.nstream.util.actuator.ComponentLoadTracking;
import se.sics.nstream.util.range.KBlock;
import se.sics.nstream.util.result.WriteCallback;

/* loaded from: input_file:se/sics/nstream/storage/buffer/SimpleAppendKBuffer.class */
public class SimpleAppendKBuffer implements KBuffer {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) SimpleAppendKBuffer.class);
    private final KBufferConfig bufferConfig;
    private final Pair<StreamId, MyStream> stream;
    private final Positive<DStoragePort> writePort;
    private final ComponentProxy proxy;
    private final DelayedExceptionSyncHandler syncExHandling;
    private final ComponentLoadTracking loadTracker;
    private long appendPos;
    private String logPrefix = "";
    private final Map<Long, Pair<KReference<byte[]>, WriteCallback>> buffer = new HashMap();
    private final Set<Identifier> pendingWriteReqs = new HashSet();
    Handler handleWriteResp = new Handler<DStorageWrite.Response>() { // from class: se.sics.nstream.storage.buffer.SimpleAppendKBuffer.1
        @Override // se.sics.kompics.Handler
        public void handle(DStorageWrite.Response response) {
            if (SimpleAppendKBuffer.this.pendingWriteReqs.remove(response.getId())) {
                SimpleAppendKBuffer.LOG.debug("{}received:{}", SimpleAppendKBuffer.this.logPrefix, response);
                if (!response.result.isSuccess()) {
                    SimpleAppendKBuffer.this.fail(response.result);
                    return;
                }
                SimpleAppendKBuffer.access$308(SimpleAppendKBuffer.this);
                Pair pair = (Pair) SimpleAppendKBuffer.this.buffer.remove(Long.valueOf(response.req.pos));
                if (pair == null) {
                    SimpleAppendKBuffer.LOG.error("{}pos:{}", SimpleAppendKBuffer.this.logPrefix, Long.valueOf(response.req.pos));
                    SimpleAppendKBuffer.LOG.error("{}buf size:{}", SimpleAppendKBuffer.this.logPrefix, Integer.valueOf(SimpleAppendKBuffer.this.buffer.size()));
                    throw new RuntimeException("error");
                }
                SimpleAppendKBuffer.this.loadTracker.setBufferSize(SimpleAppendKBuffer.this.stream, SimpleAppendKBuffer.this.buffer.size());
                try {
                    ((KReference) pair.getValue0()).release();
                } catch (KReferenceException e) {
                    SimpleAppendKBuffer.this.fail(Result.internalFailure(e));
                }
                ((WriteCallback) pair.getValue1()).success(Result.success(new WriteResult(SimpleAppendKBuffer.this.stream, response.req.pos, response.req.value.length)));
                SimpleAppendKBuffer.this.addNewTasks();
            }
        }
    };
    private int blockPos = 0;
    private int answeredBlockPos = 0;

    public SimpleAppendKBuffer(Config config, ComponentProxy componentProxy, DelayedExceptionSyncHandler delayedExceptionSyncHandler, ComponentLoadTracking componentLoadTracking, Pair<StreamId, MyStream> pair, long j) {
        this.bufferConfig = new KBufferConfig(config);
        this.stream = pair;
        this.proxy = componentProxy;
        this.syncExHandling = delayedExceptionSyncHandler;
        this.writePort = componentProxy.getNegative(DStoragePort.class).getPair();
        this.loadTracker = componentLoadTracking;
        this.appendPos = j;
        componentProxy.subscribe(this.handleWriteResp, this.writePort);
    }

    @Override // se.sics.nstream.util.StreamControl
    public void start() {
    }

    @Override // se.sics.nstream.util.StreamControl
    public boolean isIdle() {
        return this.buffer.isEmpty();
    }

    @Override // se.sics.nstream.util.StreamControl
    public void close() {
        this.proxy.unsubscribe(this.handleWriteResp, this.writePort);
        try {
            clean();
        } catch (KReferenceException e) {
            this.syncExHandling.fail(Result.internalFailure(e));
        }
    }

    /* renamed from: write, reason: avoid collision after fix types in other method */
    public void write2(KBlock kBlock, KReference<byte[]> kReference, WriteCallback writeCallback) {
        if (!kReference.retain()) {
            fail(Result.internalFailure(new IllegalStateException("buffer can't retain ref")));
            return;
        }
        this.buffer.put(Long.valueOf(kBlock.lowerAbsEndpoint()), Pair.with(kReference, writeCallback));
        this.loadTracker.setBufferSize(this.stream, this.buffer.size());
        if (kBlock.lowerAbsEndpoint() == this.appendPos) {
            addNewTasks();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void addNewTasks() {
        while (true) {
            Pair<KReference<byte[]>, WriteCallback> pair = this.buffer.get(Long.valueOf(this.appendPos));
            if (pair == null) {
                return;
            }
            DStorageWrite.Request request = new DStorageWrite.Request(this.stream.getValue0(), this.appendPos, pair.getValue0().getValue().get());
            this.pendingWriteReqs.add(request.eventId);
            this.proxy.trigger(request, this.writePort);
            this.appendPos += pair.getValue0().getValue().get().length;
            this.blockPos++;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void fail(Result result) {
        Result result2 = null;
        try {
            clean();
        } catch (KReferenceException e) {
            result2 = Result.internalFailure(e);
        }
        this.syncExHandling.fail(result);
        if (result2 != null) {
            this.syncExHandling.fail(result2);
        }
    }

    private void clean() throws KReferenceException {
        Iterator<Pair<KReference<byte[]>, WriteCallback>> it = this.buffer.values().iterator();
        while (it.hasNext()) {
            it.next().getValue0().release();
        }
        this.buffer.clear();
    }

    @Override // se.sics.nstream.storage.buffer.KBuffer
    public KBufferReport report() {
        return new SimpleKBufferReport(this.blockPos, this.appendPos, this.buffer.size());
    }

    @Override // se.sics.nstream.storage.AsyncWriteOp
    public /* bridge */ /* synthetic */ void write(KBlock kBlock, KReference kReference, WriteCallback writeCallback) {
        write2(kBlock, (KReference<byte[]>) kReference, writeCallback);
    }

    static /* synthetic */ int access$308(SimpleAppendKBuffer simpleAppendKBuffer) {
        int i = simpleAppendKBuffer.answeredBlockPos;
        simpleAppendKBuffer.answeredBlockPos = i + 1;
        return i;
    }
}
