/*
 * Decompiled with CFR 0.152.
 */
package org.apache.uniffle.storage.handler.impl;

import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.storage.handler.impl.AbstractClientReadHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client read handler that can optionally read shuffle data ahead of the caller.
 *
 * <p>When constructed with a {@link PrefetchOption}, {@link #readShuffleData()} schedules
 * {@link #doReadShuffleData()} on a single background thread and hands the results to the
 * caller through a bounded queue. Without an option, {@link #readShuffleData()} simply
 * delegates to {@link #doReadShuffleData()} on the calling thread.
 *
 * <p>NOTE(review): the free-slot accounting in {@link #readShuffleData()} assumes a single
 * consumer thread calls it; confirm against callers.
 */
public abstract class PrefetchableClientReadHandler extends AbstractClientReadHandler {
    private static final Logger LOG = LoggerFactory.getLogger(PrefetchableClientReadHandler.class);

    /** How long one poll on the result queue waits before failure/timeout state is re-checked. */
    private static final long POLL_INTERVAL_MS = 10L;

    private final boolean prefetchEnabled;
    private final int prefetchQueueCapacity;
    private final int prefetchTimeoutSec;
    /** Prefetched results; an empty {@link Optional} marks the end of the data. */
    private final LinkedBlockingQueue<Optional<ShuffleDataResult>> prefetchResultQueue;
    private final ExecutorService prefetchExecutors;
    /** Set when a background read threw; makes the consumer fail fast. */
    private final AtomicBoolean abnormalFetchTag;
    /** Set once a background read returned {@code null}. */
    private final AtomicBoolean finishedTag;
    /** Number of prefetch tasks submitted but not yet completed. */
    private final AtomicInteger queueingNumber;
    /** Accumulated wall-clock time (ms) spent inside background prefetch tasks. */
    private final AtomicLong fetchTime;

    /**
     * Creates the handler, enabling prefetch only when an option is supplied.
     *
     * @param prefetchOptional prefetch settings; an empty value disables prefetching
     * @throws RssException if the configured capacity is not positive
     */
    public PrefetchableClientReadHandler(Optional<PrefetchOption> prefetchOptional) {
        if (prefetchOptional.isPresent()) {
            PrefetchOption option = prefetchOptional.get();
            if (option.capacity <= 0) {
                throw new RssException("Illegal prefetch capacity: " + option.capacity);
            }
            LOG.info("Prefetch is enabled, capacity: {}", option.capacity);
            this.prefetchEnabled = true;
            this.prefetchQueueCapacity = option.capacity;
            this.prefetchTimeoutSec = option.timeoutSec;
            this.prefetchResultQueue = new LinkedBlockingQueue<>(option.capacity);
            this.prefetchExecutors = Executors.newFixedThreadPool(1);
            this.abnormalFetchTag = new AtomicBoolean(false);
            this.finishedTag = new AtomicBoolean(false);
            this.queueingNumber = new AtomicInteger(0);
            this.fetchTime = new AtomicLong(0L);
        } else {
            this.prefetchEnabled = false;
            this.prefetchQueueCapacity = 0;
            this.prefetchTimeoutSec = 0;
            this.prefetchResultQueue = null;
            this.prefetchExecutors = null;
            this.abnormalFetchTag = null;
            this.finishedTag = null;
            this.queueingNumber = null;
            this.fetchTime = null;
        }
    }

    /**
     * Performs one actual read of shuffle data.
     *
     * <p>With prefetch enabled this is invoked on the background prefetch thread, and a
     * {@code null} return is treated as the end of the data.
     *
     * @return the next chunk of shuffle data, or {@code null} when there is nothing more to read
     */
    protected abstract ShuffleDataResult doReadShuffleData();

    /**
     * Returns the next chunk of shuffle data, from the prefetch queue when prefetch is enabled.
     *
     * @return the next result, or {@code null} once the data is exhausted
     * @throws RssException if a background read failed, the calling thread was interrupted while
     *     waiting, or no result arrived within the configured timeout
     */
    @Override
    public ShuffleDataResult readShuffleData() {
        if (!prefetchEnabled) {
            return doReadShuffleData();
        }
        schedulePrefetchTasks();
        return awaitPrefetchedResult();
    }

    /** Submits one prefetch task per free slot (capacity minus queued results and in-flight tasks). */
    private void schedulePrefetchTasks() {
        if (finishedTag.get()) {
            // Tasks submitted after the end of data would return immediately; skip them.
            return;
        }
        int free = prefetchQueueCapacity - prefetchResultQueue.size() - queueingNumber.get();
        Runnable task = this::prefetchOnce;
        for (int i = 0; i < free; i++) {
            queueingNumber.incrementAndGet();
            prefetchExecutors.submit(task);
        }
    }

    /** Body of a single background prefetch task: reads once and enqueues the outcome. */
    private void prefetchOnce() {
        long start = System.currentTimeMillis();
        try {
            if (abnormalFetchTag.get() || finishedTag.get()) {
                return;
            }
            ShuffleDataResult result = doReadShuffleData();
            if (result == null) {
                finishedTag.set(true);
            }
            // add() rather than offer(): a rejected insert would silently drop data, so let it
            // throw and be handled as a fetch failure below.
            prefetchResultQueue.add(Optional.ofNullable(result));
        } catch (Exception e) {
            abnormalFetchTag.set(true);
            LOG.error("Errors on doing readShuffleData", e);
        } finally {
            queueingNumber.decrementAndGet();
            fetchTime.addAndGet(System.currentTimeMillis() - start);
        }
    }

    /** Polls the result queue until a result arrives, a failure is flagged, or the timeout elapses. */
    private ShuffleDataResult awaitPrefetchedResult() {
        // Convert in long arithmetic; an int multiplication by 1000 overflows for large timeouts.
        long timeoutMs = TimeUnit.SECONDS.toMillis(prefetchTimeoutSec);
        long start = System.currentTimeMillis();
        do {
            if (abnormalFetchTag.get()) {
                throw new RssException("Fast fail due to the fetch failure");
            }
            try {
                Optional<ShuffleDataResult> fetched =
                    prefetchResultQueue.poll(POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
                if (fetched != null) {
                    // An empty Optional is the end-of-data marker and maps to null.
                    return fetched.orElse(null);
                }
            } catch (InterruptedException e) {
                // Returning null here would be read by the caller as "no more data"; restore the
                // interrupt flag and fail instead.
                Thread.currentThread().interrupt();
                throw new RssException("Interrupted while waiting for the prefetched shuffle data");
            }
            if (isDrainedAfterFinish()) {
                // The end-of-data marker was already consumed by an earlier call.
                return null;
            }
        } while (System.currentTimeMillis() - start <= timeoutMs);
        throw new RssException("Unexpected duration of reading shuffle data. Fast fail!");
    }

    /**
     * Tells whether the data has ended and nothing is left to hand out.
     *
     * <p>The in-flight counter is read before the queue: a task decrements it only after it has
     * enqueued its result, so zero in-flight tasks plus an empty queue means nothing is pending.
     */
    private boolean isDrainedAfterFinish() {
        return finishedTag.get() && queueingNumber.get() == 0 && prefetchResultQueue.isEmpty();
    }

    /** Closes the parent handler, then initiates an orderly shutdown of the prefetch executor. */
    @Override
    public void close() {
        super.close();
        if (prefetchExecutors != null) {
            prefetchExecutors.shutdown();
        }
    }

    /** Logs the accumulated background fetch time (prefetch only), then the parent's block info. */
    @Override
    public void logConsumedBlockInfo() {
        if (prefetchEnabled) {
            // fetchTime only exists when prefetch is enabled; avoid logging a "null ms" metric.
            LOG.info(
                "Metrics for shuffleId[{}], partitionId[{}], background fetch cost {} ms",
                this.shuffleId, this.partitionId, fetchTime);
        }
        super.logConsumedBlockInfo();
    }

    /** Prefetch settings: result queue capacity and how long a read may wait for a result. */
    public static class PrefetchOption {
        private final int capacity;
        private final int timeoutSec;

        /**
         * @param capacity maximum number of prefetched results held at once; must be positive for
         *     the handler to accept it
         * @param timeoutSec maximum time, in seconds, a read waits for a prefetched result
         */
        public PrefetchOption(int capacity, int timeoutSec) {
            this.capacity = capacity;
            this.timeoutSec = timeoutSec;
        }
    }
}

