package org.apache.uniffle.client.impl;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssFetchFailedException;
import org.apache.uniffle.common.util.BlockIdLayout;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.IdHelper;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.uniffle.shaded.com.google.common.collect.Lists;
import org.apache.uniffle.shaded.com.google.common.collect.Queues;
import org.apache.uniffle.shaded.com.google.common.collect.Sets;
import org.apache.uniffle.shaded.org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.storage.factory.ShuffleHandlerFactory;
import org.apache.uniffle.storage.handler.api.ClientReadHandler;
import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/uniffle/client/impl/ShuffleReadClientImpl.class */
public class ShuffleReadClientImpl implements ShuffleReadClient {
    private static final Logger LOG = LoggerFactory.getLogger(ShuffleReadClientImpl.class);
    private List<ShuffleServerInfo> shuffleServerInfoList;
    private int shuffleId;
    private int partitionId;
    private ByteBuffer readBuffer;
    private ShuffleDataResult sdr;
    private Roaring64NavigableMap blockIdBitmap;
    private Roaring64NavigableMap taskIdBitmap;
    private Roaring64NavigableMap pendingBlockIds;
    private Roaring64NavigableMap processedBlockIds = Roaring64NavigableMap.bitmapOf(new long[0]);
    private Queue<BufferSegment> bufferSegmentQueue = Queues.newLinkedBlockingQueue();
    private AtomicLong readDataTime = new AtomicLong(0);
    private AtomicLong copyTime = new AtomicLong(0);
    private AtomicLong crcCheckTime = new AtomicLong(0);
    private ClientReadHandler clientReadHandler;
    private IdHelper idHelper;
    private BlockIdLayout blockIdLayout;

    public ShuffleReadClientImpl(ShuffleClientFactory.ReadClientBuilder readClientBuilder) {
        if (readClientBuilder.getShuffleDataDistributionType() == null) {
            readClientBuilder.shuffleDataDistributionType(ShuffleDataDistributionType.NORMAL);
        }
        if (readClientBuilder.getHadoopConf() == null) {
            readClientBuilder.hadoopConf(new Configuration());
        }
        if (readClientBuilder.getRssConf() == null || readClientBuilder.getRssConf().getKeySet().equals(Sets.newHashSet(RssClientConf.BLOCKID_SEQUENCE_NO_BITS.key(), RssClientConf.BLOCKID_PARTITION_ID_BITS.key(), RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS.key()))) {
            RssConf rssConf = readClientBuilder.getRssConf() == null ? new RssConf() : readClientBuilder.getRssConf();
            rssConf.set(RssClientConf.RSS_STORAGE_TYPE, readClientBuilder.getStorageType());
            rssConf.set(RssClientConf.RSS_INDEX_READ_LIMIT, Integer.valueOf(readClientBuilder.getIndexReadLimit()));
            rssConf.set(RssClientConf.RSS_CLIENT_READ_BUFFER_SIZE, String.valueOf(readClientBuilder.getReadBufferSize()));
            if (!rssConf.contains(RssClientConf.BLOCKID_SEQUENCE_NO_BITS)) {
                rssConf.setInteger(RssClientConf.BLOCKID_SEQUENCE_NO_BITS, BlockIdLayout.DEFAULT.sequenceNoBits);
            }
            if (!rssConf.contains(RssClientConf.BLOCKID_PARTITION_ID_BITS)) {
                rssConf.setInteger(RssClientConf.BLOCKID_PARTITION_ID_BITS, BlockIdLayout.DEFAULT.partitionIdBits);
            }
            if (!rssConf.contains(RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS)) {
                rssConf.setInteger(RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS, BlockIdLayout.DEFAULT.taskAttemptIdBits);
            }
            readClientBuilder.rssConf(rssConf);
            readClientBuilder.offHeapEnable(false);
            readClientBuilder.expectedTaskIdsBitmapFilterEnable(false);
            if (readClientBuilder.getClientType() == null) {
                readClientBuilder.clientType((ClientType) rssConf.get(RssClientConf.RSS_CLIENT_TYPE));
            }
        } else {
            int intValue = ((Integer) readClientBuilder.getRssConf().get(RssClientConf.RSS_INDEX_READ_LIMIT)).intValue();
            String str = (String) readClientBuilder.getRssConf().get(RssClientConf.RSS_STORAGE_TYPE);
            long sizeAsBytes = readClientBuilder.getRssConf().getSizeAsBytes(RssClientConf.RSS_CLIENT_READ_BUFFER_SIZE.key(), RssClientConf.RSS_CLIENT_READ_BUFFER_SIZE.defaultValue());
            if (sizeAsBytes > 2147483647L) {
                LOG.warn(RssClientConf.RSS_CLIENT_READ_BUFFER_SIZE.key() + " can support 2g as max");
                sizeAsBytes = 2147483647L;
            }
            boolean booleanValue = ((Boolean) readClientBuilder.getRssConf().get(RssClientConf.OFF_HEAP_MEMORY_ENABLE)).booleanValue();
            readClientBuilder.indexReadLimit(intValue);
            readClientBuilder.storageType(str);
            readClientBuilder.readBufferSize(sizeAsBytes);
            readClientBuilder.offHeapEnable(booleanValue);
            if (readClientBuilder.getClientType() == null) {
                readClientBuilder.clientType((ClientType) readClientBuilder.getRssConf().get(RssClientConf.RSS_CLIENT_TYPE));
            }
        }
        if (readClientBuilder.getIdHelper() == null) {
            readClientBuilder.idHelper(new DefaultIdHelper(BlockIdLayout.from(readClientBuilder.getRssConf())));
        }
        init(readClientBuilder);
    }

    private void init(ShuffleClientFactory.ReadClientBuilder readClientBuilder) {
        this.shuffleId = readClientBuilder.getShuffleId();
        this.partitionId = readClientBuilder.getPartitionId();
        this.blockIdBitmap = readClientBuilder.getBlockIdBitmap();
        this.taskIdBitmap = readClientBuilder.getTaskIdBitmap();
        this.idHelper = readClientBuilder.getIdHelper();
        this.shuffleServerInfoList = readClientBuilder.getShuffleServerInfoList();
        this.blockIdLayout = BlockIdLayout.from(readClientBuilder.getRssConf());
        CreateShuffleReadHandlerRequest createShuffleReadHandlerRequest = new CreateShuffleReadHandlerRequest();
        createShuffleReadHandlerRequest.setStorageType(readClientBuilder.getStorageType());
        createShuffleReadHandlerRequest.setAppId(readClientBuilder.getAppId());
        createShuffleReadHandlerRequest.setShuffleId(this.shuffleId);
        createShuffleReadHandlerRequest.setPartitionId(this.partitionId);
        createShuffleReadHandlerRequest.setIndexReadLimit(readClientBuilder.getIndexReadLimit());
        createShuffleReadHandlerRequest.setPartitionNumPerRange(readClientBuilder.getPartitionNumPerRange());
        createShuffleReadHandlerRequest.setPartitionNum(readClientBuilder.getPartitionNum());
        createShuffleReadHandlerRequest.setReadBufferSize((int) readClientBuilder.getReadBufferSize());
        createShuffleReadHandlerRequest.setStorageBasePath(readClientBuilder.getBasePath());
        createShuffleReadHandlerRequest.setShuffleServerInfoList(this.shuffleServerInfoList);
        createShuffleReadHandlerRequest.setHadoopConf(readClientBuilder.getHadoopConf());
        createShuffleReadHandlerRequest.setExpectBlockIds(this.blockIdBitmap);
        createShuffleReadHandlerRequest.setProcessBlockIds(this.processedBlockIds);
        createShuffleReadHandlerRequest.setDistributionType(readClientBuilder.getShuffleDataDistributionType());
        createShuffleReadHandlerRequest.setIdHelper(this.idHelper);
        createShuffleReadHandlerRequest.setExpectTaskIds(this.taskIdBitmap);
        createShuffleReadHandlerRequest.setClientConf(readClientBuilder.getRssConf());
        createShuffleReadHandlerRequest.setClientType(readClientBuilder.getClientType());
        createShuffleReadHandlerRequest.setRetryMax(readClientBuilder.getRetryMax());
        createShuffleReadHandlerRequest.setRetryIntervalMax(readClientBuilder.getRetryIntervalMax());
        if (readClientBuilder.isExpectedTaskIdsBitmapFilterEnable()) {
            createShuffleReadHandlerRequest.useExpectedTaskIdsBitmapFilter();
        }
        if (readClientBuilder.isOffHeapEnable()) {
            createShuffleReadHandlerRequest.enableOffHeap();
        }
        ArrayList newArrayList = Lists.newArrayList();
        this.blockIdBitmap.forEach(j -> {
            if (this.taskIdBitmap.contains(this.idHelper.getTaskAttemptId(j))) {
                return;
            }
            newArrayList.add(Long.valueOf(j));
        });
        Iterator it = newArrayList.iterator();
        while (it.hasNext()) {
            this.blockIdBitmap.removeLong(((Long) it.next()).longValue());
        }
        this.pendingBlockIds = RssUtils.cloneBitMap(this.blockIdBitmap);
        this.clientReadHandler = ShuffleHandlerFactory.getInstance().createShuffleReadHandler(createShuffleReadHandlerRequest);
    }

    @Override // org.apache.uniffle.client.api.ShuffleReadClient
    public CompressedShuffleBlock readShuffleBlockData() {
        BufferSegment poll;
        while (!this.blockIdBitmap.isEmpty() && !this.pendingBlockIds.isEmpty()) {
            if (this.bufferSegmentQueue.isEmpty() && read() <= 0) {
                return null;
            }
            while (true) {
                poll = this.bufferSegmentQueue.poll();
                if (poll == null) {
                    break;
                }
                if (!this.processedBlockIds.contains(poll.getBlockId()) && this.blockIdBitmap.contains(poll.getBlockId()) && this.taskIdBitmap.contains(poll.getTaskAttemptId())) {
                    long j = -1;
                    long j2 = -1;
                    try {
                        long currentTimeMillis = System.currentTimeMillis();
                        j = poll.getCrc();
                        j2 = ChecksumUtils.getCrc32(this.readBuffer, poll.getOffset(), poll.getLength());
                        this.crcCheckTime.addAndGet(System.currentTimeMillis() - currentTimeMillis);
                    } catch (Exception e) {
                        LOG.warn("Can't read data for " + this.blockIdLayout.asBlockId(poll.getBlockId()), e);
                    }
                    if (j == j2) {
                        this.processedBlockIds.addLong(poll.getBlockId());
                        this.pendingBlockIds.removeLong(poll.getBlockId());
                        this.clientReadHandler.updateConsumedBlockInfo(poll, false);
                        break;
                    }
                    String str = "Unexpected crc value for " + this.blockIdLayout.asBlockId(poll.getBlockId()) + ", expected:" + j + ", actual:" + j2;
                    if (this.shuffleServerInfoList.size() <= 1) {
                        throw new RssFetchFailedException(str);
                    }
                    LOG.warn(str);
                    this.clientReadHandler.updateConsumedBlockInfo(poll, true);
                } else {
                    this.clientReadHandler.updateConsumedBlockInfo(poll, true);
                    this.processedBlockIds.addLong(poll.getBlockId());
                    this.pendingBlockIds.removeLong(poll.getBlockId());
                }
            }
            if (poll != null) {
                ByteBuffer duplicate = this.readBuffer.duplicate();
                duplicate.position(poll.getOffset());
                duplicate.limit(poll.getOffset() + poll.getLength());
                return new CompressedShuffleBlock(duplicate, poll.getUncompressLength());
            }
        }
        return null;
    }

    @VisibleForTesting
    protected Roaring64NavigableMap getProcessedBlockIds() {
        return this.processedBlockIds;
    }

    private int read() {
        long currentTimeMillis = System.currentTimeMillis();
        if (this.sdr != null) {
            this.sdr.release();
            this.sdr = null;
        }
        this.sdr = this.clientReadHandler.readShuffleData();
        this.readDataTime.addAndGet(System.currentTimeMillis() - currentTimeMillis);
        if (this.sdr == null) {
            return 0;
        }
        if (this.readBuffer != null) {
            RssUtils.releaseByteBuffer(this.readBuffer);
        }
        this.readBuffer = this.sdr.getDataBuffer();
        if (this.readBuffer == null || this.readBuffer.capacity() == 0) {
            return 0;
        }
        this.bufferSegmentQueue.addAll(this.sdr.getBufferSegments());
        return this.sdr.getBufferSegments().size();
    }

    @Override // org.apache.uniffle.client.api.ShuffleReadClient
    public void checkProcessedBlockIds() {
        RssUtils.checkProcessedBlockIds(this.blockIdBitmap, this.processedBlockIds);
    }

    @Override // org.apache.uniffle.client.api.ShuffleReadClient
    public void close() {
        if (this.sdr != null) {
            this.sdr.release();
        }
        if (this.readBuffer != null) {
            RssUtils.releaseByteBuffer(this.readBuffer);
        }
        if (this.clientReadHandler != null) {
            this.clientReadHandler.close();
        }
    }

    @Override // org.apache.uniffle.client.api.ShuffleReadClient
    public void logStatics() {
        LOG.info("Metrics for shuffleId[" + this.shuffleId + "], partitionId[" + this.partitionId + "], read data cost " + this.readDataTime + " ms, copy data cost " + this.copyTime + " ms, crc check cost " + this.crcCheckTime + " ms");
        this.clientReadHandler.logConsumedBlockInfo();
    }
}
