package org.apache.spark.shuffle.reader;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.Aggregator;
import org.apache.spark.InterruptibleIterator;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleReadMetrics;
import org.apache.spark.serializer.Serializer;
import org.apache.spark.shuffle.RssShuffleHandle;
import org.apache.spark.shuffle.ShuffleReader;
import org.apache.spark.util.CompletionIterator;
import org.apache.spark.util.CompletionIterator$;
import org.apache.spark.util.collection.ExternalSorter;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.util.RssClientConfig;
import org.apache.uniffle.com.google.common.annotations.VisibleForTesting;
import org.apache.uniffle.com.google.common.collect.Lists;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Product2;
import scala.collection.AbstractIterator;
import scala.runtime.AbstractFunction0;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

/* loaded from: input_file:org/apache/spark/shuffle/reader/RssShuffleReader.class */
public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
    private static final Logger LOG = LoggerFactory.getLogger(RssShuffleReader.class);
    private final Map<Integer, List<ShuffleServerInfo>> partitionToShuffleServers;
    private String appId;
    private int shuffleId;
    private int startPartition;
    private int endPartition;
    private TaskContext context;
    private ShuffleDependency<K, ?, C> shuffleDependency;
    private int numMaps;
    private Serializer serializer;
    private String taskId;
    private String basePath;
    private int partitionNum;
    private Map<Integer, Roaring64NavigableMap> partitionToExpectBlocks;
    private Roaring64NavigableMap taskIdBitmap;
    private Configuration hadoopConf;
    private int mapStartIndex;
    private int mapEndIndex;
    private ShuffleReadMetrics readMetrics;
    private RssConf rssConf;
    private ShuffleDataDistributionType dataDistributionType;

    /* loaded from: input_file:org/apache/spark/shuffle/reader/RssShuffleReader$MultiPartitionIterator.class */
    class MultiPartitionIterator<K, C> extends AbstractIterator<Product2<K, C>> {
        Iterator<CompletionIterator<Product2<K, C>, RssShuffleDataIterator<K, C>>> iterator;
        CompletionIterator<Product2<K, C>, RssShuffleDataIterator<K, C>> dataIterator;

        MultiPartitionIterator() {
            ArrayList newArrayList = Lists.newArrayList();
            if (RssShuffleReader.this.numMaps <= 0) {
                return;
            }
            for (int i = RssShuffleReader.this.startPartition; i < RssShuffleReader.this.endPartition; i++) {
                if (((Roaring64NavigableMap) RssShuffleReader.this.partitionToExpectBlocks.get(Integer.valueOf(i))).isEmpty()) {
                    RssShuffleReader.LOG.info("{} partition is empty partition", Integer.valueOf(i));
                } else {
                    List<ShuffleServerInfo> list = (List) RssShuffleReader.this.partitionToShuffleServers.get(Integer.valueOf(i));
                    boolean z = (RssShuffleReader.this.mapStartIndex == 0 && RssShuffleReader.this.mapEndIndex == Integer.MAX_VALUE && list.size() <= 1) ? false : true;
                    RssShuffleDataIterator rssShuffleDataIterator = new RssShuffleDataIterator(RssShuffleReader.this.shuffleDependency.serializer(), ShuffleClientFactory.getInstance().createShuffleReadClient(ShuffleClientFactory.newReadBuilder().appId(RssShuffleReader.this.appId).shuffleId(RssShuffleReader.this.shuffleId).partitionId(i).basePath(RssShuffleReader.this.basePath).partitionNumPerRange(1).partitionNum(RssShuffleReader.this.partitionNum).blockIdBitmap((Roaring64NavigableMap) RssShuffleReader.this.partitionToExpectBlocks.get(Integer.valueOf(i))).taskIdBitmap(RssShuffleReader.this.taskIdBitmap).shuffleServerInfoList(list).hadoopConf(RssShuffleReader.this.hadoopConf).shuffleDataDistributionType(RssShuffleReader.this.dataDistributionType).expectedTaskIdsBitmapFilterEnable(z).retryMax(RssShuffleReader.this.rssConf.getInteger(RssClientConfig.RSS_CLIENT_RETRY_MAX, 50)).retryIntervalMax(RssShuffleReader.this.rssConf.getLong(RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX, 10000L)).rssConf(RssShuffleReader.this.rssConf)), RssShuffleReader.this.readMetrics, RssShuffleReader.this.rssConf);
                    newArrayList.add(CompletionIterator$.MODULE$.apply(rssShuffleDataIterator, () -> {
                        RssShuffleReader.this.context.taskMetrics().mergeShuffleReadMetrics();
                        return rssShuffleDataIterator.cleanup();
                    }));
                }
            }
            this.iterator = newArrayList.iterator();
            if (this.iterator.hasNext()) {
                this.dataIterator = this.iterator.next();
                this.iterator.remove();
            }
            RssShuffleReader.this.context.addTaskCompletionListener(taskContext -> {
                if (this.dataIterator != null) {
                    this.dataIterator.completion();
                }
                this.iterator.forEachRemaining((v0) -> {
                    v0.completion();
                });
            });
        }

        public boolean hasNext() {
            if (this.dataIterator == null) {
                return false;
            }
            while (!this.dataIterator.hasNext()) {
                if (!this.iterator.hasNext()) {
                    return false;
                }
                this.dataIterator = this.iterator.next();
                this.iterator.remove();
            }
            return this.dataIterator.hasNext();
        }

        /* renamed from: next, reason: merged with bridge method [inline-methods] */
        public Product2<K, C> m1184next() {
            return (Product2) this.dataIterator.next();
        }
    }

    public RssShuffleReader(int i, int i2, int i3, int i4, TaskContext taskContext, RssShuffleHandle<K, ?, C> rssShuffleHandle, String str, Configuration configuration, int i5, Map<Integer, Roaring64NavigableMap> map, Roaring64NavigableMap roaring64NavigableMap, ShuffleReadMetrics shuffleReadMetrics, RssConf rssConf, ShuffleDataDistributionType shuffleDataDistributionType, Map<Integer, List<ShuffleServerInfo>> map2) {
        this.appId = rssShuffleHandle.getAppId();
        this.startPartition = i;
        this.endPartition = i2;
        this.mapStartIndex = i3;
        this.mapEndIndex = i4;
        this.context = taskContext;
        this.numMaps = rssShuffleHandle.getNumMaps();
        this.shuffleDependency = rssShuffleHandle.getDependency();
        this.shuffleId = this.shuffleDependency.shuffleId();
        this.serializer = rssShuffleHandle.getDependency().serializer();
        this.taskId = "" + taskContext.taskAttemptId() + "_" + taskContext.attemptNumber();
        this.basePath = str;
        this.partitionNum = i5;
        this.partitionToExpectBlocks = map;
        this.taskIdBitmap = roaring64NavigableMap;
        this.hadoopConf = configuration;
        this.readMetrics = shuffleReadMetrics;
        this.partitionToShuffleServers = map2;
        this.rssConf = rssConf;
        this.dataDistributionType = shuffleDataDistributionType;
    }

    public scala.collection.Iterator<Product2<K, C>> read() {
        scala.collection.Iterator iterator;
        LOG.info("Shuffle read started:" + getReadInfo());
        scala.collection.Iterator multiPartitionIterator = new MultiPartitionIterator();
        scala.collection.Iterator combineCombinersByKey = this.shuffleDependency.aggregator().isDefined() ? this.shuffleDependency.mapSideCombine() ? ((Aggregator) this.shuffleDependency.aggregator().get()).combineCombinersByKey(multiPartitionIterator, this.context) : ((Aggregator) this.shuffleDependency.aggregator().get()).combineValuesByKey(multiPartitionIterator, this.context) : multiPartitionIterator;
        if (this.shuffleDependency.keyOrdering().isDefined()) {
            final ExternalSorter externalSorter = new ExternalSorter(this.context, Option.empty(), Option.empty(), this.shuffleDependency.keyOrdering(), this.serializer);
            LOG.info("Inserting aggregated records to sorter");
            long currentTimeMillis = System.currentTimeMillis();
            externalSorter.insertAll(combineCombinersByKey);
            LOG.info("Inserted aggregated records to sorter: millis:" + (System.currentTimeMillis() - currentTimeMillis));
            this.context.taskMetrics().incMemoryBytesSpilled(externalSorter.memoryBytesSpilled());
            this.context.taskMetrics().incPeakExecutionMemory(externalSorter.peakMemoryUsedBytes());
            this.context.taskMetrics().incDiskBytesSpilled(externalSorter.diskBytesSpilled());
            AbstractFunction0<BoxedUnit> abstractFunction0 = new AbstractFunction0<BoxedUnit>() { // from class: org.apache.spark.shuffle.reader.RssShuffleReader.1
                /* renamed from: apply, reason: merged with bridge method [inline-methods] */
                public BoxedUnit m1183apply() {
                    externalSorter.stop();
                    return BoxedUnit.UNIT;
                }
            };
            this.context.addTaskCompletionListener(new AbstractFunction1<TaskContext, Void>() { // from class: org.apache.spark.shuffle.reader.RssShuffleReader.2
                public Void apply(TaskContext taskContext) {
                    externalSorter.stop();
                    return null;
                }
            });
            iterator = CompletionIterator$.MODULE$.apply(externalSorter.iterator(), abstractFunction0);
        } else {
            iterator = combineCombinersByKey;
        }
        if (!(iterator instanceof InterruptibleIterator)) {
            iterator = new InterruptibleIterator(this.context, iterator);
        }
        if (this.rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false) && this.rssConf.getInteger(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT, 0) > 0) {
            iterator = RssFetchFailedIterator.newBuilder().appId(this.appId).shuffleId(this.shuffleId).partitionId(this.startPartition).stageAttemptId(this.context.stageAttemptNumber()).reportServerHost(this.rssConf.getString(Constants.DRIVER_HOST, "")).port(((Integer) this.rssConf.get(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT)).intValue()).build(iterator);
        }
        return iterator;
    }

    private String getReadInfo() {
        return "appId=" + this.appId + ", shuffleId=" + this.shuffleId + ",taskId=" + this.taskId + ", partitions: [" + this.startPartition + ", " + this.endPartition + "), maps: [" + this.mapStartIndex + ", " + this.mapEndIndex + ")";
    }

    @VisibleForTesting
    public Configuration getHadoopConf() {
        return this.hadoopConf;
    }
}
