package se.sics.nstream.storage.cache;

import com.google.common.collect.Sets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import org.hibernate.hql.internal.classic.ParserHelper;
import org.javatuples.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import se.sics.kompics.ComponentProxy;
import se.sics.kompics.Handler;
import se.sics.kompics.Positive;
import se.sics.kompics.config.Config;
import se.sics.kompics.timer.CancelPeriodicTimeout;
import se.sics.kompics.timer.SchedulePeriodicTimeout;
import se.sics.kompics.timer.Timeout;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.util.Identifier;
import se.sics.ktoolbox.util.reference.KReference;
import se.sics.ktoolbox.util.reference.KReferenceException;
import se.sics.ktoolbox.util.reference.KReferenceFactory;
import se.sics.ktoolbox.util.result.DelayedExceptionSyncHandler;
import se.sics.ktoolbox.util.result.Result;
import se.sics.nstream.StreamId;
import se.sics.nstream.storage.cache.KHint;
import se.sics.nstream.storage.durable.DStoragePort;
import se.sics.nstream.storage.durable.events.DStorageRead;
import se.sics.nstream.storage.durable.util.MyStream;
import se.sics.nstream.util.actuator.ComponentLoadTracking;
import se.sics.nstream.util.range.KBlock;
import se.sics.nstream.util.range.KPiece;
import se.sics.nstream.util.range.KRange;
import se.sics.nstream.util.range.RangeKReference;
import se.sics.nstream.util.result.ReadCallback;

/* loaded from: input_file:se/sics/nstream/storage/cache/SimpleKCache.class */
public class SimpleKCache implements KCache {
    private static final Logger LOG;
    private String logPrefix;
    private final KCacheConfig cacheConfig;
    private final Pair<StreamId, MyStream> stream;
    private final Positive<DStoragePort> readPort;
    private final Positive<Timer> timerPort;
    private final ComponentProxy proxy;
    private final DelayedExceptionSyncHandler syncExHandling;
    private final ComponentLoadTracking loadTracker;
    private UUID extendedCacheCleanTid;
    static final /* synthetic */ boolean $assertionsDisabled;
    final TreeMap<Long, Pair<KBlock, CacheKReference>> cacheRef = new TreeMap<>();
    final Map<Long, Pair<KBlock, KReference<byte[]>>> systemRef = new HashMap();
    final Map<Identifier, ReaderHead> readerHeads = new HashMap();
    final TreeMap<Long, Pair<KBlock, List<Identifier>>> pendingCacheFetch = new TreeMap<>();
    final Map<Long, List<Pair<KRange, ReadCallback>>> delayedReads = new HashMap();
    Handler handleExtendedCacheClean = new Handler<ExtendedCacheClean>() { // from class: se.sics.nstream.storage.cache.SimpleKCache.1
        @Override // se.sics.kompics.Handler
        public void handle(ExtendedCacheClean extendedCacheClean) {
            SimpleKCache.LOG.trace("{}extended cache clean", SimpleKCache.this.logPrefix);
            Iterator<Map.Entry<Long, Pair<KBlock, KReference<byte[]>>>> it = SimpleKCache.this.systemRef.entrySet().iterator();
            while (it.hasNext()) {
                if (it.next().getValue().getValue1().isValid()) {
                    SimpleKCache.LOG.debug("{}ref count", SimpleKCache.this.logPrefix);
                } else {
                    it.remove();
                }
            }
            SimpleKCache.LOG.debug("{}cache size - cache ref:{}, system ref:{}", SimpleKCache.this.logPrefix, Integer.valueOf(SimpleKCache.this.cacheRef.size()), Integer.valueOf(SimpleKCache.this.systemRef.size()));
        }
    };
    Handler handleReadResp = new Handler<DStorageRead.Response>() { // from class: se.sics.nstream.storage.cache.SimpleKCache.2
        @Override // se.sics.kompics.Handler
        public void handle(DStorageRead.Response response) {
            SimpleKCache.LOG.debug("{}received:{}", SimpleKCache.this.logPrefix, response);
            SimpleKCache.this.loadTracker.setCacheSize(SimpleKCache.this.stream, SimpleKCache.this.cacheRef.size(), SimpleKCache.this.systemRef.size());
            if (!response.result.isSuccess()) {
                SimpleKCache.this.fail(response.result);
                return;
            }
            KBlock kBlock = response.req.readRange;
            long lowerAbsEndpoint = kBlock.lowerAbsEndpoint();
            KReference reference = KReferenceFactory.getReference(response.result.getValue());
            CacheKReference createInstance = CacheKReference.createInstance(reference);
            Pair<KBlock, List<Identifier>> remove = SimpleKCache.this.pendingCacheFetch.remove(Long.valueOf(lowerAbsEndpoint));
            if (remove != null) {
                Iterator<Identifier> it = remove.getValue1().iterator();
                while (it.hasNext()) {
                    ReaderHead readerHead = SimpleKCache.this.readerHeads.get(it.next());
                    if (readerHead != null) {
                        readerHead.add(lowerAbsEndpoint, createInstance);
                    }
                }
            }
            List<Pair<KRange, ReadCallback>> remove2 = SimpleKCache.this.delayedReads.remove(Long.valueOf(lowerAbsEndpoint));
            if (remove2 != null) {
                for (Pair<KRange, ReadCallback> pair : remove2) {
                    SimpleKCache.this.readFromBlock(lowerAbsEndpoint, pair.getValue0(), reference, pair.getValue1());
                }
            }
            SimpleKCache.this.silentRelease((KReference<byte[]>) reference);
            SimpleKCache.this.silentRelease(createInstance);
            if (createInstance.isValid()) {
                SimpleKCache.this.cacheRef.put(Long.valueOf(lowerAbsEndpoint), Pair.with(kBlock, createInstance));
            } else if (reference.isValid()) {
                SimpleKCache.this.systemRef.put(Long.valueOf(lowerAbsEndpoint), Pair.with(kBlock, reference));
            }
        }
    };

    /* loaded from: input_file:se/sics/nstream/storage/cache/SimpleKCache$ExtendedCacheClean.class */
    public static class ExtendedCacheClean extends Timeout {
        public ExtendedCacheClean(SchedulePeriodicTimeout schedulePeriodicTimeout) {
            super(schedulePeriodicTimeout);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:se/sics/nstream/storage/cache/SimpleKCache$ReaderHead.class */
    public static class ReaderHead {
        public long hintLStamp;
        public final Set<Long> preCaching;
        public final Map<Long, CacheKReference> caching;

        private ReaderHead() {
            this.preCaching = new HashSet();
            this.caching = new HashMap();
        }

        public void add(long j, CacheKReference cacheKReference) {
            if (this.preCaching.remove(Long.valueOf(j))) {
                cacheKReference.retain();
                this.caching.put(Long.valueOf(j), cacheKReference);
            }
        }

        public void releaseAll() throws KReferenceException {
            Iterator<CacheKReference> it = this.caching.values().iterator();
            while (it.hasNext()) {
                it.next().release();
            }
            this.preCaching.clear();
            this.caching.clear();
        }

        public Pair<Map<Long, KBlock>, Set<Long>> processHint(KHint.Expanded expanded) throws KReferenceException {
            HashMap hashMap = new HashMap();
            HashSet hashSet = new HashSet();
            if (expanded.lStamp <= this.hintLStamp) {
                return Pair.with(hashMap, hashSet);
            }
            HashSet<Long> hashSet2 = new HashSet(Sets.difference(this.caching.keySet(), expanded.futureReads.keySet()));
            hashSet2.addAll(Sets.difference(this.preCaching, expanded.futureReads.keySet()));
            HashSet<Long> hashSet3 = new HashSet(Sets.difference(expanded.futureReads.keySet(), this.caching.keySet()));
            for (Long l : hashSet2) {
                this.preCaching.remove(l);
                CacheKReference remove = this.caching.remove(l);
                if (remove != null) {
                    remove.release();
                    if (!remove.isValid()) {
                        hashSet.add(l);
                    }
                }
            }
            for (Long l2 : hashSet3) {
                this.preCaching.add(l2);
                hashMap.put(l2, expanded.futureReads.get(l2));
            }
            return Pair.with(hashMap, hashSet);
        }
    }

    public SimpleKCache(Config config, ComponentProxy componentProxy, DelayedExceptionSyncHandler delayedExceptionSyncHandler, ComponentLoadTracking componentLoadTracking, Pair<StreamId, MyStream> pair) {
        this.logPrefix = "";
        this.cacheConfig = new KCacheConfig(config);
        this.proxy = componentProxy;
        this.stream = pair;
        this.syncExHandling = delayedExceptionSyncHandler;
        this.loadTracker = componentLoadTracking;
        this.readPort = componentProxy.getNegative(DStoragePort.class).getPair();
        this.timerPort = componentProxy.getNegative(Timer.class).getPair();
        this.proxy.subscribe(this.handleExtendedCacheClean, this.timerPort);
        this.proxy.subscribe(this.handleReadResp, this.readPort);
        this.logPrefix = "<" + pair.getValue1().endpoint.getEndpointName() + ParserHelper.HQL_VARIABLE_PREFIX + pair.getValue1().resource.getSinkName() + ">";
        LOG.info("{}initiating...", this.logPrefix);
    }

    @Override // se.sics.nstream.util.StreamControl
    public void start() {
        LOG.info("{}starting...", this.logPrefix);
        this.proxy.trigger(scheduleExtendedCacheClean(), this.timerPort);
    }

    private SchedulePeriodicTimeout scheduleExtendedCacheClean() {
        this.cacheConfig.getClass();
        this.cacheConfig.getClass();
        SchedulePeriodicTimeout schedulePeriodicTimeout = new SchedulePeriodicTimeout(1000L, 1000L);
        ExtendedCacheClean extendedCacheClean = new ExtendedCacheClean(schedulePeriodicTimeout);
        schedulePeriodicTimeout.setTimeoutEvent(extendedCacheClean);
        this.extendedCacheCleanTid = extendedCacheClean.getTimeoutId();
        return schedulePeriodicTimeout;
    }

    @Override // se.sics.nstream.util.StreamControl
    public boolean isIdle() {
        return true;
    }

    @Override // se.sics.nstream.util.StreamControl
    public void close() {
        this.proxy.trigger(new CancelPeriodicTimeout(this.extendedCacheCleanTid), this.timerPort);
        this.proxy.unsubscribe(this.handleExtendedCacheClean, this.timerPort);
        this.proxy.unsubscribe(this.handleReadResp, this.readPort);
        Result clean = clean();
        if (clean.isSuccess()) {
            return;
        }
        this.syncExHandling.fail(clean);
    }

    @Override // se.sics.nstream.storage.cache.CacheHint.Read
    public void setFutureReads(Identifier identifier, KHint.Expanded expanded) {
        ReaderHead readerHead = this.readerHeads.get(identifier);
        if (readerHead == null) {
            readerHead = new ReaderHead();
            this.readerHeads.put(identifier, readerHead);
        }
        try {
            Pair<Map<Long, KBlock>, Set<Long>> processHint = readerHead.processHint(expanded);
            Iterator<Long> it = processHint.getValue1().iterator();
            while (it.hasNext()) {
                cacheClean(it.next().longValue());
            }
            for (Map.Entry<Long, KBlock> entry : processHint.getValue0().entrySet()) {
                long longValue = entry.getKey().longValue();
                if (!checkCache(readerHead, longValue) && !checkSystem(readerHead, longValue)) {
                    addToPendingFetch(identifier, entry.getValue());
                }
            }
        } catch (KReferenceException e) {
            IllegalStateException illegalStateException = new IllegalStateException("OUR cRef problem");
            fail(Result.internalFailure(illegalStateException));
            throw illegalStateException;
        }
    }

    private boolean checkCache(ReaderHead readerHead, long j) {
        Pair<KBlock, CacheKReference> pair = this.cacheRef.get(Long.valueOf(j));
        if (pair == null) {
            return false;
        }
        CacheKReference value1 = pair.getValue1();
        checkCRef(value1);
        readerHead.add(j, value1);
        return true;
    }

    private boolean checkSystem(ReaderHead readerHead, long j) {
        Pair<KBlock, KReference<byte[]>> remove = this.systemRef.remove(Long.valueOf(j));
        if (remove == null) {
            return false;
        }
        KReference<byte[]> value1 = remove.getValue1();
        if (!value1.retain()) {
            return false;
        }
        CacheKReference createInstance = CacheKReference.createInstance(value1);
        readerHead.add(j, createInstance);
        this.cacheRef.put(Long.valueOf(j), Pair.with(remove.getValue0(), createInstance));
        silentRelease(createInstance);
        silentRelease(value1);
        return true;
    }

    private void addToPendingFetch(Identifier identifier, KBlock kBlock) {
        long lowerAbsEndpoint = kBlock.lowerAbsEndpoint();
        Pair<KBlock, List<Identifier>> pair = this.pendingCacheFetch.get(Long.valueOf(lowerAbsEndpoint));
        if (pair == null) {
            pair = Pair.with(kBlock, new LinkedList());
            this.pendingCacheFetch.put(Long.valueOf(lowerAbsEndpoint), pair);
            this.proxy.trigger(new DStorageRead.Request(this.stream.getValue0(), kBlock), this.readPort);
        }
        pair.getValue1().add(identifier);
    }

    @Override // se.sics.nstream.storage.cache.CacheHint.Read
    public void clean(Identifier identifier) {
        try {
            this.readerHeads.remove(identifier).releaseAll();
        } catch (KReferenceException e) {
            IllegalStateException illegalStateException = new IllegalStateException("bad internal cache ref manipulation");
            fail(Result.internalFailure(illegalStateException));
            throw illegalStateException;
        }
    }

    @Override // se.sics.nstream.storage.cache.KCache
    public void buffered(KBlock kBlock, KReference<byte[]> kReference) {
        this.systemRef.put(Long.valueOf(kBlock.lowerAbsEndpoint()), Pair.with(kBlock, kReference));
    }

    @Override // se.sics.nstream.storage.AsyncReadOp
    public void read(KRange kRange, ReadCallback readCallback) {
        if (!(kRange instanceof KBlock) && !(kRange instanceof KPiece)) {
            IllegalArgumentException illegalArgumentException = new IllegalArgumentException("only blocks or pieces are allowed");
            fail(Result.internalFailure(illegalArgumentException));
            throw illegalArgumentException;
        }
        this.loadTracker.setCacheSize(this.stream, this.cacheRef.size(), this.systemRef.size());
        Map.Entry<Long, Pair<KBlock, CacheKReference>> floorEntry = this.cacheRef.floorEntry(Long.valueOf(kRange.lowerAbsEndpoint()));
        if (floorEntry != null) {
            KBlock value0 = floorEntry.getValue().getValue0();
            long lowerAbsEndpoint = value0.lowerAbsEndpoint();
            CacheKReference value1 = floorEntry.getValue().getValue1();
            checkCRef(value1);
            if (value0.encloses(kRange)) {
                readFromBlock(lowerAbsEndpoint, kRange, value1.value(), readCallback);
                return;
            } else if (value0.isConnected(kRange)) {
                IllegalArgumentException illegalArgumentException2 = new IllegalArgumentException("external Block/Piece problem");
                fail(Result.internalFailure(illegalArgumentException2));
                throw illegalArgumentException2;
            }
        }
        Map.Entry<Long, Pair<KBlock, List<Identifier>>> floorEntry2 = this.pendingCacheFetch.floorEntry(Long.valueOf(kRange.lowerAbsEndpoint()));
        if (floorEntry2 == null) {
            IllegalArgumentException illegalArgumentException3 = new IllegalArgumentException("external Read/Hint problem");
            fail(Result.internalFailure(illegalArgumentException3));
            throw illegalArgumentException3;
        }
        KBlock value02 = floorEntry2.getValue().getValue0();
        if (!value02.encloses(kRange)) {
            IllegalArgumentException illegalArgumentException4 = new IllegalArgumentException("external Read/Hint problem or Block/Piece problem");
            fail(Result.internalFailure(illegalArgumentException4));
            throw illegalArgumentException4;
        }
        long lowerAbsEndpoint2 = value02.lowerAbsEndpoint();
        List<Pair<KRange, ReadCallback>> list = this.delayedReads.get(Long.valueOf(lowerAbsEndpoint2));
        if (list == null) {
            list = new LinkedList();
            this.delayedReads.put(Long.valueOf(lowerAbsEndpoint2), list);
        }
        list.add(Pair.with(kRange, readCallback));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void fail(Result result) {
        Result clean = clean();
        this.syncExHandling.fail(result);
        if (clean.isSuccess()) {
            return;
        }
        this.syncExHandling.fail(clean);
    }

    private void cacheClean(long j) {
        Pair<KBlock, CacheKReference> remove = this.cacheRef.remove(Long.valueOf(j));
        if (remove == null) {
            return;
        }
        CacheKReference value1 = remove.getValue1();
        KBlock value0 = remove.getValue0();
        if (value1.isValid()) {
            if (!$assertionsDisabled) {
                throw new AssertionError();
            }
            this.cacheRef.put(Long.valueOf(j), remove);
        } else {
            KReference<byte[]> kReference = value1.getValue().get();
            if (kReference.isValid() && kReference.retain()) {
                this.systemRef.put(Long.valueOf(j), Pair.with(value0, kReference));
                silentRelease(kReference);
            }
        }
    }

    private Result clean() {
        this.pendingCacheFetch.clear();
        this.delayedReads.clear();
        Iterator<ReaderHead> it = this.readerHeads.values().iterator();
        while (it.hasNext()) {
            try {
                it.next().releaseAll();
            } catch (KReferenceException e) {
                return Result.internalFailure(new IllegalStateException("OUR problem - cRef problem"));
            }
        }
        this.readerHeads.clear();
        Iterator<Pair<KBlock, CacheKReference>> it2 = this.cacheRef.values().iterator();
        while (it2.hasNext()) {
            if (it2.next().getValue1().isValid()) {
                return Result.internalFailure(new IllegalStateException("OUR problem - cRef problem"));
            }
        }
        this.cacheRef.clear();
        this.systemRef.clear();
        return Result.success(true);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void readFromBlock(long j, KRange kRange, KReference<byte[]> kReference, ReadCallback readCallback) {
        if (kRange instanceof KBlock) {
            readCallback.success(Result.success(kReference));
        } else if (kRange instanceof KPiece) {
            RangeKReference createInstance = RangeKReference.createInstance(kReference, j, (KPiece) kRange);
            readCallback.success(Result.success(createInstance));
            silentRelease(createInstance);
        }
    }

    private void checkCRef(CacheKReference cacheKReference) {
        if (cacheKReference.isValid()) {
            return;
        }
        IllegalStateException illegalStateException = new IllegalStateException("OUR cRef problem");
        fail(Result.internalFailure(illegalStateException));
        throw illegalStateException;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void silentRelease(KReference<byte[]> kReference) {
        try {
            kReference.release();
        } catch (KReferenceException e) {
            IllegalStateException illegalStateException = new IllegalStateException("external ref problem - someone double release");
            fail(Result.internalFailure(illegalStateException));
            throw illegalStateException;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void silentRelease(CacheKReference cacheKReference) {
        try {
            cacheKReference.release();
        } catch (KReferenceException e) {
            IllegalStateException illegalStateException = new IllegalStateException("OUR cRef problem");
            fail(Result.internalFailure(illegalStateException));
            throw illegalStateException;
        }
    }

    @Override // se.sics.nstream.storage.cache.KCache
    public KCacheReport report() {
        return new SimpleKCacheReport(this.cacheRef.size(), this.systemRef.size());
    }

    static {
        $assertionsDisabled = !SimpleKCache.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger((Class<?>) SimpleKCache.class);
    }
}
