package org.apache.spark.shuffle;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.spark.MapOutputTracker;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.apache.spark.SparkException;
import org.apache.spark.TaskContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.executor.ShuffleReadMetrics;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.shuffle.reader.RssShuffleReader;
import org.apache.spark.shuffle.writer.AddBlockEvent;
import org.apache.spark.shuffle.writer.DataPusher;
import org.apache.spark.shuffle.writer.RssShuffleWriter;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.storage.BlockManagerId;
import org.apache.uniffle.client.api.ShuffleManagerClient;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.factory.ShuffleManagerClientFactory;
import org.apache.uniffle.client.impl.FailedBlockSendTracker;
import org.apache.uniffle.client.request.RssPartitionToShuffleServerRequest;
import org.apache.uniffle.client.response.RssPartitionToShuffleServerResponse;
import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.client.util.RssClientConfig;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.exception.RssFetchFailedException;
import org.apache.uniffle.common.rpc.GrpcServer;
import org.apache.uniffle.common.util.BlockIdLayout;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.uniffle.shaded.com.google.common.collect.Sets;
import org.apache.uniffle.shaded.org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.shuffle.manager.RssShuffleManagerBase;
import org.apache.uniffle.shuffle.manager.ShuffleManagerGrpcService;
import org.apache.uniffle.shuffle.manager.ShuffleManagerServerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.collection.Iterator;

/* loaded from: input_file:org/apache/spark/shuffle/RssShuffleManager.class */
public class RssShuffleManager extends RssShuffleManagerBase {
    private static final Logger LOG = LoggerFactory.getLogger(RssShuffleManager.class);
    // Value of RssSparkConfig.RSS_CLIENT_TYPE; handed to the write-client builder and added to the assignment tags.
    private final String clientType;
    // Heartbeat settings read from the Spark conf; the timeout defaults to half of the interval.
    private final long heartbeatInterval;
    private final long heartbeatTimeout;
    // Replica/quorum settings; validated in the constructors with RssUtils.checkQuorumSetting.
    private final int dataReplica;
    private final int dataReplicaWrite;
    private final int dataReplicaRead;
    private final boolean dataReplicaSkipEnabled;
    // Pool sizes forwarded to the shuffle write client builder.
    private final int dataTransferPoolSize;
    private final int dataCommitPoolSize;
    // Per-task bookkeeping shared with the DataPusher; entries are dropped by clearTaskMeta(String).
    private final Map<String, Set<Long>> taskToSuccessBlockIds;
    private final Map<String, FailedBlockSendTracker> taskToFailedBlockSendTracker;
    // Only created when the public constructor's boolean flag is true; shut down in stop().
    private ScheduledExecutorService heartBeatScheduledExecutorService;
    private boolean dynamicConfEnabled;
    private final ShuffleDataDistributionType dataDistributionType;
    // Layout derived from the RssConf; registerShuffle checks partition counts against its maxNumPartitions.
    private final BlockIdLayout blockIdLayout;
    private final int maxConcurrencyPerPartitionToWrite;
    // Values of "spark.task.maxFailures" (default 4) and "spark.speculation" (default false).
    private final int maxFailures;
    private final boolean speculation;
    // Values of "spark.rss.quota.user" / "spark.rss.quota.uuid"; uuid is appended to the Spark app id in registerShuffle.
    private String user;
    private String uuid;
    private DataPusher dataPusher;
    // gRPC service and server; only created by the public constructor when the RPC service is enabled.
    private ShuffleManagerGrpcService service;
    private GrpcServer shuffleManagerServer;
    protected SparkConf sparkConf;
    protected ShuffleWriteClient shuffleWriteClient;
    // NOTE(review): not assigned anywhere in the visible part of this file.
    private ShuffleManagerClient shuffleManagerClient;
    // Filled by registerShuffle when shuffleManagerRpcServiceEnabled is true.
    private Map<Integer, ShuffleHandleInfo> shuffleIdToShuffleHandleInfo;
    private boolean rssResubmitStage;
    private boolean taskBlockSendFailureRetryEnabled;
    // True when either block-send-failure retry or stage resubmit is enabled.
    private boolean shuffleManagerRpcServiceEnabled;
    // NOTE(review): initialised by the public constructor; usage is outside the visible part of this file.
    private Set<String> failuresShuffleServerIds;
    private Map<String, Boolean> serverAssignedInfos;
    // RSS application id; set lazily by registerShuffle or setPusherAppId.
    private AtomicReference<String> id = new AtomicReference<>();
    // NOTE(review): presumably guards startHeartbeat(); that method is not visible here.
    private boolean heartbeatStarted = false;
    // Shared with the DataPusher.
    private Set<String> failedTaskIds = Sets.newConcurrentHashSet();
    // Populated by registerShuffle and cleared per shuffle by unregisterShuffle.
    private final Map<Integer, Integer> shuffleIdToPartitionNum = JavaUtils.newConcurrentMap();
    private final Map<Integer, Integer> shuffleIdToNumMapTasks = JavaUtils.newConcurrentMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/spark/shuffle/RssShuffleManager$ReadMetrics.class */
    /**
     * Read-metrics adapter that passes every increment it receives straight through to the
     * {@link ShuffleReadMetricsReporter} supplied at construction time.
     */
    public static class ReadMetrics extends ShuffleReadMetrics {
        private ShuffleReadMetricsReporter reporter;

        /**
         * @param delegate reporter that receives all forwarded increments
         */
        ReadMetrics(ShuffleReadMetricsReporter delegate) {
            reporter = delegate;
        }

        /** Forwards the remote-bytes-read increment to the wrapped reporter. */
        public void incRemoteBytesRead(long bytes) {
            reporter.incRemoteBytesRead(bytes);
        }

        /** Forwards the fetch-wait-time increment to the wrapped reporter. */
        public void incFetchWaitTime(long waitTime) {
            reporter.incFetchWaitTime(waitTime);
        }

        /** Forwards the records-read increment to the wrapped reporter. */
        public void incRecordsRead(long records) {
            reporter.incRecordsRead(records);
        }
    }

    /* loaded from: input_file:org/apache/spark/shuffle/RssShuffleManager$WriteMetrics.class */
    /**
     * Write-metrics adapter that passes every increment it receives straight through to the
     * {@link ShuffleWriteMetricsReporter} supplied at construction time.
     */
    public static class WriteMetrics extends ShuffleWriteMetrics {
        private ShuffleWriteMetricsReporter reporter;

        /**
         * @param delegate reporter that receives all forwarded increments
         */
        public WriteMetrics(ShuffleWriteMetricsReporter delegate) {
            reporter = delegate;
        }

        /** Forwards the bytes-written increment to the wrapped reporter. */
        public void incBytesWritten(long bytes) {
            reporter.incBytesWritten(bytes);
        }

        /** Forwards the records-written increment to the wrapped reporter. */
        public void incRecordsWritten(long records) {
            reporter.incRecordsWritten(records);
        }

        /** Forwards the write-time increment to the wrapped reporter. */
        public void incWriteTime(long writeTime) {
            reporter.incWriteTime(writeTime);
        }
    }

    /**
     * Builds the shuffle manager from the Spark configuration.
     *
     * <p>The constructor reads the RSS client settings, creates the shuffle write client, registers
     * the coordinators, overrides several Spark settings that are turned off here, and creates the
     * {@link DataPusher}.
     *
     * @param sparkConf Spark configuration; it is mutated (several keys are overwritten below)
     * @param z when {@code true}, dynamic client conf may be fetched, the heartbeat executor is
     *     created and (if enabled) the shuffle manager gRPC server is started.
     *     NOTE(review): presumably an "is driver" flag - confirm against callers.
     * @throws RssException if the shuffle manager gRPC server fails to start
     */
    public RssShuffleManager(SparkConf sparkConf, boolean z) {
        this.sparkConf = sparkConf;
        // Only warn here; registerShuffle rejects non-relocatable serializers with an exception.
        // When SparkEnv is not available the check is treated as passed (orElse(true)).
        if (!((Boolean) Optional.ofNullable(SparkEnv.get()).map(sparkEnv -> {
            return Boolean.valueOf(sparkEnv.serializer().supportsRelocationOfSerializedObjects());
        }).orElse(true)).booleanValue()) {
            LOG.warn("RSSShuffleManager requires a serializer which supports relocations of serialized object. Please set spark.serializer to org.apache.spark.serializer.KryoSerializer instead");
        }
        this.user = this.sparkConf.get("spark.rss.quota.user", "user");
        // Defaults to the current wall-clock time in milliseconds when no uuid is configured.
        this.uuid = this.sparkConf.get("spark.rss.quota.uuid", Long.toString(System.currentTimeMillis()));
        this.dynamicConfEnabled = ((Boolean) this.sparkConf.get(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED)).booleanValue();
        // Must run before the settings below are read, since it is handed the conf to apply changes to.
        if (z && this.dynamicConfEnabled) {
            fetchAndApplyDynamicConf(this.sparkConf);
        }
        RssSparkShuffleUtils.validateRssClientConf(this.sparkConf);
        this.dataReplica = ((Integer) this.sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA)).intValue();
        this.dataReplicaWrite = ((Integer) this.sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA_WRITE)).intValue();
        this.dataReplicaRead = ((Integer) this.sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA_READ)).intValue();
        this.dataReplicaSkipEnabled = ((Boolean) this.sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED)).booleanValue();
        LOG.info("Check quorum config [" + this.dataReplica + ":" + this.dataReplicaWrite + ":" + this.dataReplicaRead + ":" + this.dataReplicaSkipEnabled + "]");
        RssUtils.checkQuorumSetting(this.dataReplica, this.dataReplicaWrite, this.dataReplicaRead);
        this.heartbeatInterval = ((Long) this.sparkConf.get(RssSparkConfig.RSS_HEARTBEAT_INTERVAL)).longValue();
        // Timeout falls back to half of the heartbeat interval when not configured explicitly.
        this.heartbeatTimeout = this.sparkConf.getLong(RssSparkConfig.RSS_HEARTBEAT_TIMEOUT.key(), this.heartbeatInterval / 2);
        int intValue = ((Integer) this.sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_MAX)).intValue();
        this.clientType = (String) this.sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
        this.dataDistributionType = getDataDistributionType(this.sparkConf);
        RssConf rssConf = RssSparkConfig.toRssConf(this.sparkConf);
        this.maxConcurrencyPerPartitionToWrite = ((Integer) rssConf.get(RssClientConf.MAX_CONCURRENCY_PER_PARTITION_TO_WRITE)).intValue();
        this.maxFailures = this.sparkConf.getInt("spark.task.maxFailures", 4);
        this.speculation = this.sparkConf.getBoolean("spark.speculation", false);
        // configureBlockIdLayout reads maxFailures and speculation, so it runs after they are assigned
        // and before the layout is derived from rssConf.
        configureBlockIdLayout(this.sparkConf, rssConf);
        this.blockIdLayout = BlockIdLayout.from(rssConf);
        long longValue = ((Long) this.sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX)).longValue();
        int intValue2 = ((Integer) this.sparkConf.get(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM)).intValue();
        this.dataTransferPoolSize = ((Integer) this.sparkConf.get(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE)).intValue();
        this.dataCommitPoolSize = ((Integer) this.sparkConf.get(RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE)).intValue();
        this.shuffleWriteClient = ShuffleClientFactory.getInstance().createShuffleWriteClient(ShuffleClientFactory.newWriteBuilder().clientType(this.clientType).retryMax(intValue).retryIntervalMax(longValue).heartBeatThreadNum(intValue2).replica(this.dataReplica).replicaWrite(this.dataReplicaWrite).replicaRead(this.dataReplicaRead).replicaSkipEnabled(this.dataReplicaSkipEnabled).dataTransferPoolSize(this.dataTransferPoolSize).dataCommitPoolSize(this.dataCommitPoolSize).unregisterThreadPoolSize(((Integer) this.sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_THREAD_POOL_SIZE)).intValue()).unregisterRequestTimeSec(((Integer) this.sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_REQUEST_TIMEOUT_SEC)).intValue()).rssConf(rssConf));
        registerCoordinator();
        // Force-disable Spark features that this shuffle manager turns off.
        this.sparkConf.set("spark.shuffle.service.enabled", "false");
        this.sparkConf.set("spark.dynamicAllocation.shuffleTracking.enabled", "false");
        LOG.info("Disable external shuffle service in RssShuffleManager.");
        this.sparkConf.set("spark.sql.adaptive.localShuffleReader.enabled", "false");
        LOG.info("Disable local shuffle reader in RssShuffleManager.");
        this.sparkConf.set("spark.shuffle.reduceLocality.enabled", "false");
        LOG.info("Disable shuffle data locality in RssShuffleManager.");
        this.taskToSuccessBlockIds = JavaUtils.newConcurrentMap();
        this.taskToFailedBlockSendTracker = JavaUtils.newConcurrentMap();
        // Stage resubmit needs both the config switch and support reported by RssSparkShuffleUtils.
        this.rssResubmitStage = rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false) && RssSparkShuffleUtils.isStageResubmitSupported();
        this.taskBlockSendFailureRetryEnabled = rssConf.getBoolean(RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED);
        this.shuffleManagerRpcServiceEnabled = this.taskBlockSendFailureRetryEnabled || this.rssResubmitStage;
        if (z) {
            this.heartBeatScheduledExecutorService = ThreadUtils.getDaemonSingleThreadScheduledExecutor("rss-heartbeat");
            if (this.shuffleManagerRpcServiceEnabled) {
                // NOTE(review): this message is also logged when only block-send-failure retry is enabled.
                LOG.info("stage resubmit is supported and enabled");
                // Port 0 is requested here; the port actually bound is read back from the server below.
                rssConf.set(RssBaseConf.RPC_SERVER_PORT, 0);
                // NOTE(review): `this` is handed to the factory before the fields assigned at the end
                // of this constructor (e.g. shuffleIdToShuffleHandleInfo) are initialised.
                ShuffleManagerServerFactory shuffleManagerServerFactory = new ShuffleManagerServerFactory(this, rssConf);
                this.service = shuffleManagerServerFactory.getService();
                this.shuffleManagerServer = shuffleManagerServerFactory.getServer(this.service);
                try {
                    this.shuffleManagerServer.start();
                    // Publish the bound port through the Spark conf.
                    this.sparkConf.set(RssSparkConfig.RSS_SHUFFLE_MANAGER_GRPC_PORT, Integer.valueOf(this.shuffleManagerServer.getPort()));
                } catch (Exception e) {
                    LOG.error("Failed to start shuffle manager server", e);
                    throw new RssException(e);
                }
            }
        }
        LOG.info("Rss data pusher is starting...");
        this.dataPusher = new DataPusher(this.shuffleWriteClient, this.taskToSuccessBlockIds, this.taskToFailedBlockSendTracker, this.failedTaskIds, ((Integer) this.sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_SIZE)).intValue(), ((Integer) this.sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_KEEPALIVE)).intValue());
        this.shuffleIdToShuffleHandleInfo = JavaUtils.newConcurrentMap();
        this.failuresShuffleServerIds = Sets.newHashSet();
        this.serverAssignedInfos = JavaUtils.newConcurrentMap();
    }

    /**
     * Hands a block event over to the data pusher.
     *
     * @param addBlockEvent the event to send; may be {@code null}
     * @return the future returned by the data pusher, or a fresh future when either the pusher or
     *     the event is {@code null}
     */
    public CompletableFuture<Long> sendData(AddBlockEvent addBlockEvent) {
        if (dataPusher != null && addBlockEvent != null) {
            return dataPusher.send(addBlockEvent);
        }
        // NOTE(review): this future is never completed by this method, so a caller that blocks on
        // it without a timeout would wait indefinitely.
        return new CompletableFuture<>();
    }

    /**
     * Resolves the shuffle data distribution type for the given Spark configuration.
     *
     * <p>When adaptive execution is enabled and the distribution type key is absent from the RSS
     * configuration, {@code LOCAL_ORDER} is returned; otherwise the value read from the RSS
     * configuration is returned.
     *
     * @param sparkConf Spark configuration to inspect
     * @return the distribution type to use
     */
    @VisibleForTesting
    protected static ShuffleDataDistributionType getDataDistributionType(SparkConf sparkConf) {
        RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
        boolean adaptiveExecutionEnabled = ((Boolean) sparkConf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED())).booleanValue();
        if (adaptiveExecutionEnabled && !rssConf.containsKey(RssClientConf.DATA_DISTRIBUTION_TYPE.key())) {
            return ShuffleDataDistributionType.LOCAL_ORDER;
        }
        return (ShuffleDataDistributionType) rssConf.get(RssClientConf.DATA_DISTRIBUTION_TYPE);
    }

    /**
     * Test-only constructor that accepts a pre-built {@link DataPusher} and the task bookkeeping
     * maps instead of creating them.
     *
     * <p>Unlike the public constructor, this one does not register coordinators, does not create a
     * heartbeat executor or gRPC server, and leaves {@code shuffleIdToShuffleHandleInfo},
     * {@code failuresShuffleServerIds} and {@code serverAssignedInfos} unassigned.
     *
     * @param sparkConf Spark configuration to read the RSS client settings from
     * @param z unused by this constructor
     * @param dataPusher pusher used by {@link #sendData}
     * @param map map stored as {@code taskToSuccessBlockIds}
     * @param map2 map stored as {@code taskToFailedBlockSendTracker}
     */
    @VisibleForTesting
    RssShuffleManager(SparkConf sparkConf, boolean z, DataPusher dataPusher, Map<String, Set<Long>> map, Map<String, FailedBlockSendTracker> map2) {
        this.sparkConf = sparkConf;
        this.clientType = (String) this.sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
        RssConf rssConf = RssSparkConfig.toRssConf(this.sparkConf);
        this.dataDistributionType = (ShuffleDataDistributionType) rssConf.get(RssClientConf.DATA_DISTRIBUTION_TYPE);
        this.maxConcurrencyPerPartitionToWrite = ((Integer) rssConf.get(RssClientConf.MAX_CONCURRENCY_PER_PARTITION_TO_WRITE)).intValue();
        this.maxFailures = this.sparkConf.getInt("spark.task.maxFailures", 4);
        this.speculation = this.sparkConf.getBoolean("spark.speculation", false);
        // Same order as the public constructor: configure the block id layout first (it reads
        // maxFailures and speculation), then derive the layout from rssConf, so that the layout
        // kept by this manager is built from the same rssConf state the write client receives.
        configureBlockIdLayout(this.sparkConf, rssConf);
        this.blockIdLayout = BlockIdLayout.from(rssConf);
        this.heartbeatInterval = ((Long) this.sparkConf.get(RssSparkConfig.RSS_HEARTBEAT_INTERVAL)).longValue();
        // Timeout falls back to half of the heartbeat interval when not configured explicitly.
        this.heartbeatTimeout = this.sparkConf.getLong(RssSparkConfig.RSS_HEARTBEAT_TIMEOUT.key(), this.heartbeatInterval / 2);
        this.dataReplica = ((Integer) this.sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA)).intValue();
        this.dataReplicaWrite = ((Integer) this.sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA_WRITE)).intValue();
        this.dataReplicaRead = ((Integer) this.sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA_READ)).intValue();
        this.dataReplicaSkipEnabled = ((Boolean) this.sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED)).booleanValue();
        LOG.info("Check quorum config [" + this.dataReplica + ":" + this.dataReplicaWrite + ":" + this.dataReplicaRead + ":" + this.dataReplicaSkipEnabled + "]");
        RssUtils.checkQuorumSetting(this.dataReplica, this.dataReplicaWrite, this.dataReplicaRead);
        int intValue = ((Integer) this.sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_MAX)).intValue();
        long longValue = ((Long) this.sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX)).longValue();
        int intValue2 = ((Integer) this.sparkConf.get(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM)).intValue();
        this.dataTransferPoolSize = ((Integer) this.sparkConf.get(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE)).intValue();
        this.dataCommitPoolSize = ((Integer) this.sparkConf.get(RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE)).intValue();
        this.shuffleWriteClient = ShuffleClientFactory.getInstance().createShuffleWriteClient(ShuffleClientFactory.newWriteBuilder().clientType(this.clientType).retryMax(intValue).retryIntervalMax(longValue).heartBeatThreadNum(intValue2).replica(this.dataReplica).replicaWrite(this.dataReplicaWrite).replicaRead(this.dataReplicaRead).replicaSkipEnabled(this.dataReplicaSkipEnabled).dataTransferPoolSize(this.dataTransferPoolSize).dataCommitPoolSize(this.dataCommitPoolSize).unregisterThreadPoolSize(((Integer) this.sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_THREAD_POOL_SIZE)).intValue()).unregisterRequestTimeSec(((Integer) this.sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_REQUEST_TIMEOUT_SEC)).intValue()).rssConf(rssConf));
        this.taskToSuccessBlockIds = map;
        // No heartbeat executor is created in the test constructor.
        this.heartBeatScheduledExecutorService = null;
        this.taskToFailedBlockSendTracker = map2;
        this.dataPusher = dataPusher;
    }

    /**
     * Registers a shuffle and returns the handle for it.
     *
     * <p>For a shuffle with zero partitions an empty handle is returned without requesting any
     * assignment. Otherwise shuffle servers are requested and registered (with retries), the
     * heartbeat is started, and the partition-to-server mapping is broadcast.
     *
     * @param i shuffle id
     * @param shuffleDependency the Spark shuffle dependency being registered
     * @return an {@code RssShuffleHandle} for the shuffle
     * @throws RssException if the partition count exceeds what the block id layout supports, or if
     *     assignment/registration fails
     * @throws IllegalArgumentException if the active serializer does not support object relocation
     */
    public <K, V, C> ShuffleHandle registerShuffle(int i, ShuffleDependency<K, V, C> shuffleDependency) {
        // Reject shuffles with more partitions than the configured block id layout allows.
        if (shuffleDependency.partitioner().numPartitions() > blockIdLayout.maxNumPartitions) {
            throw new RssException("Cannot register shuffle with " + shuffleDependency.partitioner().numPartitions() + " partitions because the configured block id layout supports at most " + blockIdLayout.maxNumPartitions + " partitions.");
        }
        if (!SparkEnv.get().serializer().supportsRelocationOfSerializedObjects()) {
            throw new IllegalArgumentException("Can't use serialized shuffle for shuffleId: " + i + ", because the serializer: " + SparkEnv.get().serializer().getClass().getName() + " does not support object relocation.");
        }

        // The RSS application id is derived once from the Spark app id and the uuid.
        if (id.get() == null) {
            id.compareAndSet(null, SparkEnv.get().conf().getAppId() + "_" + uuid);
            dataPusher.setRssAppId(id.get());
        }
        LOG.info("Generate application id used in rss: " + id.get());

        // Nothing to assign for an empty shuffle: record the bookkeeping and return an empty handle.
        if (shuffleDependency.partitioner().numPartitions() == 0) {
            shuffleIdToPartitionNum.putIfAbsent(Integer.valueOf(i), 0);
            shuffleIdToNumMapTasks.putIfAbsent(Integer.valueOf(i), Integer.valueOf(shuffleDependency.rdd().partitions().length));
            LOG.info("RegisterShuffle with ShuffleId[" + i + "], partitionNum is 0, return the empty RssShuffleHandle directly");
            Broadcast<ShuffleHandleInfo> emptyHandleInfo = RssSparkShuffleUtils.broadcastShuffleHdlInfo(RssSparkShuffleUtils.getActiveSparkContext(), i, Collections.emptyMap(), RemoteStorageInfo.EMPTY_REMOTE_STORAGE);
            return new RssShuffleHandle(i, id.get(), shuffleDependency.rdd().getNumPartitions(), shuffleDependency, emptyHandleInfo);
        }

        RemoteStorageInfo remoteStorage = ClientUtils.fetchRemoteStorage(id.get(), getDefaultRemoteStorageInfo(sparkConf), dynamicConfEnabled, sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key()), shuffleWriteClient);
        Set<String> tags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
        ClientUtils.validateClientType(clientType);
        tags.add(clientType);
        int requiredServerCount = RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);
        long retryInterval = ((Long) sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL)).longValue();
        int retryTimes = ((Integer) sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES)).intValue();
        int taskConcurrency = RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf);
        try {
            // Assignment and server registration are retried together as one unit.
            Map partitionToServers = (Map) RetryUtils.retry(() -> {
                ShuffleAssignmentsInfo assignments = shuffleWriteClient.getShuffleAssignments(id.get(), i, shuffleDependency.partitioner().numPartitions(), 1, tags, requiredServerCount, taskConcurrency);
                registerShuffleServers(id.get(), i, assignments.getServerToPartitionRanges(), remoteStorage);
                return assignments.getPartitionToServers();
            }, retryInterval, retryTimes);
            startHeartbeat();
            shuffleIdToPartitionNum.putIfAbsent(Integer.valueOf(i), Integer.valueOf(shuffleDependency.partitioner().numPartitions()));
            shuffleIdToNumMapTasks.putIfAbsent(Integer.valueOf(i), Integer.valueOf(shuffleDependency.rdd().partitions().length));
            // The handle info is only kept locally when the shuffle manager RPC service is enabled.
            if (shuffleManagerRpcServiceEnabled) {
                shuffleIdToShuffleHandleInfo.put(Integer.valueOf(i), new ShuffleHandleInfo(i, partitionToServers, remoteStorage));
            }
            Broadcast<ShuffleHandleInfo> handleInfo = RssSparkShuffleUtils.broadcastShuffleHdlInfo(RssSparkShuffleUtils.getActiveSparkContext(), i, partitionToServers, remoteStorage);
            LOG.info("RegisterShuffle with ShuffleId[" + i + "], partitionNum[" + partitionToServers.size() + "], shuffleServerForResult: " + partitionToServers);
            return new RssShuffleHandle(i, id.get(), shuffleDependency.rdd().getNumPartitions(), shuffleDependency, handleInfo);
        } catch (Throwable failure) {
            throw new RssException("registerShuffle failed!", failure);
        }
    }

    /**
     * Creates the shuffle writer for a map task.
     *
     * @param shuffleHandle handle returned by {@code registerShuffle}; must be an {@code RssShuffleHandle}
     * @param j not used by this implementation
     * @param taskContext context of the running task
     * @param shuffleWriteMetricsReporter reporter to forward write metrics to; when {@code null}
     *     the task's own shuffle write metrics are used
     * @return a new {@code RssShuffleWriter}
     * @throws RssException if the handle is not an {@code RssShuffleHandle}
     */
    public <K, V> ShuffleWriter<K, V> getWriter(ShuffleHandle shuffleHandle, long j, TaskContext taskContext, ShuffleWriteMetricsReporter shuffleWriteMetricsReporter) {
        if (!(shuffleHandle instanceof RssShuffleHandle)) {
            throw new RssException("Unexpected ShuffleHandle:" + shuffleHandle.getClass().getName());
        }
        RssShuffleHandle rssHandle = (RssShuffleHandle) shuffleHandle;
        setPusherAppId(rssHandle);
        int shuffleId = rssHandle.getShuffleId();

        ShuffleWriteMetrics metrics;
        if (shuffleWriteMetricsReporter != null) {
            metrics = new WriteMetrics(shuffleWriteMetricsReporter);
        } else {
            metrics = taskContext.taskMetrics().shuffleWriteMetrics();
        }

        // Task id string has the form "<taskAttemptId>_<attemptNumber>".
        String taskId = taskContext.taskAttemptId() + "_" + taskContext.attemptNumber();
        LOG.info("RssHandle appId {} shuffleId {} ", rssHandle.getAppId(), Integer.valueOf(rssHandle.getShuffleId()));
        long taskAttemptIdForBlockId = getTaskAttemptIdForBlockId(taskContext.partitionId(), taskContext.attemptNumber());
        return new RssShuffleWriter(rssHandle.getAppId(), shuffleId, taskId, taskAttemptIdForBlockId, metrics, this, sparkConf, shuffleWriteClient, rssHandle, this::markFailedTask, taskContext);
    }

    /**
     * Configures the block id layout, supplying this manager's {@code maxFailures} and
     * {@code speculation} settings to the four-argument overload.
     *
     * @param sparkConf Spark configuration passed through to the overload
     * @param rssConf RSS configuration passed through to the overload
     */
    @Override // org.apache.uniffle.shuffle.manager.RssShuffleManagerBase
    public void configureBlockIdLayout(SparkConf sparkConf, RssConf rssConf) {
        configureBlockIdLayout(sparkConf, rssConf, maxFailures, speculation);
    }

    /**
     * Computes the task attempt id to embed in block ids, supplying this manager's
     * {@code maxFailures}, {@code speculation} and the layout's {@code taskAttemptIdBits} to the
     * five-argument overload.
     *
     * @param i first argument passed through to the overload (callers in this class pass the task's partition id)
     * @param i2 second argument passed through to the overload (callers in this class pass the task's attempt number)
     * @return the value returned by the five-argument overload
     */
    @Override // org.apache.uniffle.shuffle.manager.RssShuffleManagerBase
    public long getTaskAttemptIdForBlockId(int i, int i2) {
        return getTaskAttemptIdForBlockId(i, i2, maxFailures, speculation, blockIdLayout.taskAttemptIdBits);
    }

    /**
     * Initialises the application id from the given handle if no id has been set yet, and passes
     * the resulting id on to the data pusher. Does nothing when an id is already present.
     *
     * @param rssShuffleHandle handle carrying the application id
     */
    public void setPusherAppId(RssShuffleHandle rssShuffleHandle) {
        if (id.get() != null) {
            return;
        }
        id.compareAndSet(null, rssShuffleHandle.getAppId());
        dataPusher.setRssAppId(id.get());
    }

    /**
     * Creates a reader for the partition range {@code [i, i2)} by delegating to the seven-argument
     * overload with {@code 0} and {@link Integer#MAX_VALUE} as its second and third arguments.
     *
     * @param shuffleHandle handle of the shuffle to read
     * @param i start partition
     * @param i2 end partition
     * @param taskContext context of the running task
     * @param shuffleReadMetricsReporter reporter for read metrics
     * @return the reader produced by the seven-argument overload
     */
    public <K, C> ShuffleReader<K, C> getReader(ShuffleHandle shuffleHandle, int i, int i2, TaskContext taskContext, ShuffleReadMetricsReporter shuffleReadMetricsReporter) {
        final int lowerBound = 0;
        final int upperBound = Integer.MAX_VALUE;
        return getReader(shuffleHandle, lowerBound, upperBound, i, i2, taskContext, shuffleReadMetricsReporter);
    }

    /**
     * Creates a reader after resolving the expected task ids through
     * {@code getExpectedTasksByExecutorId}.
     *
     * @param shuffleHandle handle of the shuffle to read
     * @param i passed on as the fourth argument of the task lookup and to the reader
     * @param i2 passed on as the fifth argument of the task lookup and to the reader
     * @param i3 start partition
     * @param i4 end partition
     * @param taskContext context of the running task
     * @param shuffleReadMetricsReporter reporter for read metrics
     * @return the reader built by {@code getReaderImpl}
     */
    public <K, C> ShuffleReader<K, C> getReader(ShuffleHandle shuffleHandle, int i, int i2, int i3, int i4, TaskContext taskContext, ShuffleReadMetricsReporter shuffleReadMetricsReporter) {
        final long lookupStart = System.currentTimeMillis();
        final Roaring64NavigableMap expectedTaskIds = getExpectedTasksByExecutorId(shuffleHandle.shuffleId(), i3, i4, i, i2);
        final long lookupCost = System.currentTimeMillis() - lookupStart;
        LOG.info("Get taskId cost " + lookupCost + " ms, and request expected blockIds from " + expectedTaskIds.getLongCardinality() + " tasks for shuffleId[" + shuffleHandle.shuffleId() + "], partitionId[" + i3 + ", " + i4 + "]");
        return getReaderImpl(shuffleHandle, i, i2, i3, i4, taskContext, shuffleReadMetricsReporter, expectedTaskIds);
    }

    /**
     * Creates a reader after resolving the expected task ids through
     * {@code getExpectedTasksByRange}.
     *
     * @param shuffleHandle handle of the shuffle to read
     * @param i passed on as the fourth argument of the task lookup and to the reader
     * @param i2 passed on as the fifth argument of the task lookup and to the reader
     * @param i3 start partition
     * @param i4 end partition
     * @param taskContext context of the running task
     * @param shuffleReadMetricsReporter reporter for read metrics
     * @return the reader built by {@code getReaderImpl}
     */
    public <K, C> ShuffleReader<K, C> getReaderForRange(ShuffleHandle shuffleHandle, int i, int i2, int i3, int i4, TaskContext taskContext, ShuffleReadMetricsReporter shuffleReadMetricsReporter) {
        final long lookupStart = System.currentTimeMillis();
        final Roaring64NavigableMap expectedTaskIds = getExpectedTasksByRange(shuffleHandle.shuffleId(), i3, i4, i, i2);
        final long lookupCost = System.currentTimeMillis() - lookupStart;
        LOG.info("Get taskId cost " + lookupCost + " ms, and request expected blockIds from " + expectedTaskIds.getLongCardinality() + " tasks for shuffleId[" + shuffleHandle.shuffleId() + "], partitionId[" + i3 + ", " + i4 + "]");
        return getReaderImpl(shuffleHandle, i, i2, i3, i4, taskContext, shuffleReadMetricsReporter, expectedTaskIds);
    }

    /**
     * Builds the shuffle reader for the partition range {@code [i3, i4)}.
     *
     * <p>The partition-to-server mapping of the handle is narrowed to the requested partitions,
     * the block ids for those partitions are fetched from the servers, and a
     * {@code RssShuffleReader} is created from the result.
     *
     * @param shuffleHandle handle of the shuffle to read; must be an {@code RssShuffleHandle}
     * @param i passed through to the reader as its third argument
     * @param i2 passed through to the reader as its fourth argument
     * @param i3 start partition (inclusive)
     * @param i4 end partition (exclusive)
     * @param taskContext context of the running task
     * @param shuffleReadMetricsReporter reporter to forward read metrics to; when {@code null} the
     *     task's own shuffle read metrics are used
     * @param roaring64NavigableMap expected task ids, passed through to the reader
     * @return a new {@code RssShuffleReader}
     * @throws RssException if the handle is not an {@code RssShuffleHandle}
     */
    public <K, C> ShuffleReader<K, C> getReaderImpl(ShuffleHandle shuffleHandle, int i, int i2, int i3, int i4, TaskContext taskContext, ShuffleReadMetricsReporter shuffleReadMetricsReporter, Roaring64NavigableMap roaring64NavigableMap) {
        if (!(shuffleHandle instanceof RssShuffleHandle)) {
            throw new RssException("Unexpected ShuffleHandle:" + shuffleHandle.getClass().getName());
        }
        RssShuffleHandle<?, ?, ?> rssHandle = (RssShuffleHandle) shuffleHandle;
        int partitionCount = rssHandle.getDependency().partitioner().numPartitions();
        int shuffleId = rssHandle.getShuffleId();
        Map<Integer, List<ShuffleServerInfo>> allPartitionToServers = getShuffleHandleInfo(rssHandle).getPartitionToServers();

        // Keep only the partitions inside the requested range before grouping them by server.
        Map<Integer, List<ShuffleServerInfo>> requestedPartitionToServers = allPartitionToServers.entrySet().stream()
                .filter(entry -> entry.getKey().intValue() >= i3 && entry.getKey().intValue() < i4)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        Map<ShuffleServerInfo, Set<Integer>> serverToPartitions = RssUtils.generateServerToPartitions(requestedPartitionToServers);

        long fetchStart = System.currentTimeMillis();
        Roaring64NavigableMap blockIds = getShuffleResultForMultiPart(clientType, serverToPartitions, rssHandle.getAppId(), shuffleId, taskContext.stageAttemptNumber());
        LOG.info("Get shuffle blockId cost " + (System.currentTimeMillis() - fetchStart) + " ms, and get " + blockIds.getLongCardinality() + " blockIds for shuffleId[" + shuffleId + "], startPartition[" + i3 + "], endPartition[" + i4 + "]");

        ShuffleReadMetrics metrics;
        if (shuffleReadMetricsReporter != null) {
            metrics = new ReadMetrics(shuffleReadMetricsReporter);
        } else {
            metrics = taskContext.taskMetrics().shuffleReadMetrics();
        }

        RemoteStorageInfo storageInfo = rssHandle.getRemoteStorage();
        LOG.info("Shuffle reader using remote storage {}", storageInfo);
        return new RssShuffleReader(
                i3,
                i4,
                i,
                i2,
                taskContext,
                rssHandle,
                storageInfo.getPath(),
                RssSparkShuffleUtils.getRemoteStorageHadoopConf(sparkConf, storageInfo),
                partitionCount,
                RssUtils.generatePartitionToBitmap(blockIds, i3, i4, blockIdLayout),
                roaring64NavigableMap,
                metrics,
                RssSparkConfig.toRssConf(sparkConf),
                dataDistributionType,
                allPartitionToServers);
    }

    /**
     * Collects the expected task ids for a shuffle by reflectively calling
     * {@code getMapSizesByExecutorId} on the map output tracker.
     *
     * <p>Which overload is looked up depends on the Spark version reported by
     * {@code Spark3VersionUtils}: a five-int overload declared on the tracker's runtime class, the
     * five-int overload declared on {@code MapOutputTracker} itself (Spark 3.2.0), or a three-int
     * overload on the runtime class. Each returned entry must carry a topology info string, which
     * is parsed as a {@code long}.
     *
     * @param i shuffle id
     * @param i2 start partition
     * @param i3 end partition
     * @param i4 fourth lookup argument, passed right after the shuffle id to the five-int overloads
     *     (presumably the start map index - confirm against the Spark API)
     * @param i5 fifth lookup argument, passed after {@code i4} to the five-int overloads
     *     (presumably the end map index - confirm against the Spark API)
     * @return bitmap of the parsed ids
     * @throws RssException wrapping any exception raised during lookup, invocation or parsing,
     *     including the one raised for a missing topology info
     */
    @SuppressFBWarnings({"REC_CATCH_EXCEPTION"})
    private Roaring64NavigableMap getExpectedTasksByExecutorId(int i, int i2, int i3, int i4, int i5) {
        final String methodName = "getMapSizesByExecutorId";
        Roaring64NavigableMap taskIds = Roaring64NavigableMap.bitmapOf(new long[0]);
        try {
            boolean fiveArgOnRuntimeClass = Spark3VersionUtils.MAJOR_VERSION > 3
                    || Spark3VersionUtils.MINOR_VERSION > 2
                    || (Spark3VersionUtils.MINOR_VERSION == 2 && !Spark3VersionUtils.isSpark320())
                    || Spark3VersionUtils.MINOR_VERSION == 1;
            Object lookupResult;
            if (fiveArgOnRuntimeClass) {
                lookupResult = SparkEnv.get().mapOutputTracker().getClass()
                        .getDeclaredMethod(methodName, int.class, int.class, int.class, int.class, int.class)
                        .invoke(SparkEnv.get().mapOutputTracker(), Integer.valueOf(i), Integer.valueOf(i4), Integer.valueOf(i5), Integer.valueOf(i2), Integer.valueOf(i3));
            } else if (Spark3VersionUtils.isSpark320()) {
                // On 3.2.0 the method is looked up on MapOutputTracker itself, not the runtime class.
                lookupResult = MapOutputTracker.class
                        .getDeclaredMethod(methodName, int.class, int.class, int.class, int.class, int.class)
                        .invoke(SparkEnv.get().mapOutputTracker(), Integer.valueOf(i), Integer.valueOf(i4), Integer.valueOf(i5), Integer.valueOf(i2), Integer.valueOf(i3));
            } else {
                // Remaining versions use the three-int overload, which takes no i4/i5 arguments.
                lookupResult = SparkEnv.get().mapOutputTracker().getClass()
                        .getDeclaredMethod(methodName, int.class, int.class, int.class)
                        .invoke(SparkEnv.get().mapOutputTracker(), Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(i3));
            }
            Iterator entries = (Iterator) lookupResult;
            while (entries.hasNext()) {
                Tuple2 entry = (Tuple2) entries.next();
                BlockManagerId blockManagerId = (BlockManagerId) entry._1();
                if (!blockManagerId.topologyInfo().isDefined()) {
                    throw new RssException("Can't get expected taskAttemptId");
                }
                taskIds.add(Long.parseLong((String) blockManagerId.topologyInfo().get()));
            }
            return taskIds;
        } catch (Exception e) {
            throw new RssException(e);
        }
    }

    /**
     * Collects the expected task ids for a shuffle by reflectively calling the five-int
     * {@code getMapSizesByRange} method declared on the map output tracker's runtime class. Each
     * returned entry must carry a topology info string, which is parsed as a {@code long}.
     *
     * @param i shuffle id
     * @param i2 start partition
     * @param i3 end partition
     * @param i4 fourth lookup argument, passed right after the shuffle id
     *     (presumably the start map index - confirm against the Spark API)
     * @param i5 fifth lookup argument, passed after {@code i4}
     *     (presumably the end map index - confirm against the Spark API)
     * @return bitmap of the parsed ids
     * @throws RssException wrapping any exception raised during lookup, invocation or parsing,
     *     including the one raised for a missing topology info
     */
    private Roaring64NavigableMap getExpectedTasksByRange(int i, int i2, int i3, int i4, int i5) {
        Roaring64NavigableMap taskIds = Roaring64NavigableMap.bitmapOf(new long[0]);
        try {
            Object lookupResult = SparkEnv.get().mapOutputTracker().getClass()
                    .getDeclaredMethod("getMapSizesByRange", int.class, int.class, int.class, int.class, int.class)
                    .invoke(SparkEnv.get().mapOutputTracker(), Integer.valueOf(i), Integer.valueOf(i4), Integer.valueOf(i5), Integer.valueOf(i2), Integer.valueOf(i3));
            Iterator entries = (Iterator) lookupResult;
            while (entries.hasNext()) {
                Tuple2 entry = (Tuple2) entries.next();
                BlockManagerId blockManagerId = (BlockManagerId) entry._1();
                if (!blockManagerId.topologyInfo().isDefined()) {
                    throw new RssException("Can't get expected taskAttemptId");
                }
                taskIds.add(Long.parseLong((String) blockManagerId.topologyInfo().get()));
            }
            return taskIds;
        } catch (Exception e) {
            throw new RssException(e);
        }
    }

    /**
     * Unregisters a shuffle. The work is only done when the current executor id is
     * {@code "driver"}: the shuffle is unregistered through the write client, the local
     * bookkeeping for it is removed, and the gRPC service (if any) is notified.
     *
     * <p>Failures are logged and otherwise ignored.
     *
     * @param i shuffle id
     * @return always {@code true}
     */
    public boolean unregisterShuffle(int i) {
        try {
            boolean onDriver = SparkEnv.get().executorId().equals("driver");
            if (onDriver) {
                shuffleWriteClient.unregisterShuffle(id.get(), i);
                Integer shuffleKey = Integer.valueOf(i);
                shuffleIdToPartitionNum.remove(shuffleKey);
                shuffleIdToNumMapTasks.remove(shuffleKey);
                if (service != null) {
                    service.unregisterShuffle(i);
                }
            }
        } catch (Exception e) {
            // Best effort: a failed unregister is reported but does not change the result.
            LOG.warn("Errors on unregister to remote shuffle-servers", e);
        }
        return true;
    }

    /**
     * Not supported by this shuffle manager.
     *
     * @return never returns normally
     * @throws RssException always
     */
    public ShuffleBlockResolver shuffleBlockResolver() {
        final String message = "RssShuffleManager.shuffleBlockResolver is not implemented";
        throw new RssException(message);
    }

    /**
     * Shuts down the components owned by this manager, each only if it is non-null:
     * the heartbeat executor, the shuffle write client (after unregistering the application's
     * shuffles), the data pusher and the shuffle manager server.
     *
     * <p>An {@link IOException} from closing the data pusher is logged and ignored. An
     * interruption while stopping the shuffle manager server is logged and the thread's
     * interrupt status is restored.
     */
    public void stop() {
        if (this.heartBeatScheduledExecutorService != null) {
            this.heartBeatScheduledExecutorService.shutdownNow();
        }
        if (this.shuffleWriteClient != null) {
            try {
                this.shuffleWriteClient.unregisterShuffle(getAppId());
            } finally {
                // Close the client even when unregistering fails, so it is never leaked.
                this.shuffleWriteClient.close();
            }
        }
        if (this.dataPusher != null) {
            try {
                this.dataPusher.close();
            } catch (IOException e) {
                LOG.warn("Errors on closing data pusher", e);
            }
        }
        if (this.shuffleManagerServer != null) {
            try {
                this.shuffleManagerServer.stop();
            } catch (InterruptedException e2) {
                LOG.info("shuffle manager server is interrupted during stop");
                // Catching InterruptedException clears the flag; set it again so code further
                // up the call stack can still observe the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Removes the entries stored under the given key from both the success-block-id map and the
     * failed-block send tracker map.
     *
     * @param str the task key whose bookkeeping is dropped
     */
    public void clearTaskMeta(String str) {
        taskToSuccessBlockIds.remove(str);
        taskToFailedBlockSendTracker.remove(str);
    }

    /**
     * Registers a shuffle with every server in the given assignment and logs the elapsed time.
     *
     * <p>Does nothing when the assignment is {@code null} or empty.
     *
     * @param str the application id passed to each registration
     * @param i the shuffle id
     * @param map the servers to register with, each mapped to its partition ranges
     * @param remoteStorageInfo the remote storage info passed to each registration
     */
    @VisibleForTesting
    protected void registerShuffleServers(String str, int i, Map<ShuffleServerInfo, List<PartitionRange>> map, RemoteStorageInfo remoteStorageInfo) {
        boolean nothingToRegister = map == null || map.isEmpty();
        if (nothingToRegister) {
            return;
        }
        LOG.info("Start to register shuffleId[" + i + "]");
        long startedAt = System.currentTimeMillis();
        for (Map.Entry<ShuffleServerInfo, List<PartitionRange>> assignment : map.entrySet()) {
            shuffleWriteClient.registerShuffle(assignment.getKey(), str, i, assignment.getValue(), remoteStorageInfo, dataDistributionType, maxConcurrencyPerPartitionToWrite);
        }
        long elapsed = System.currentTimeMillis() - startedAt;
        LOG.info("Finish register shuffleId[" + i + "] with " + elapsed + " ms");
    }

    /**
     * Reads the coordinator quorum from the Spark configuration, logs it, and registers it with
     * the shuffle write client.
     */
    @VisibleForTesting
    protected void registerCoordinator() {
        String quorumKey = RssSparkConfig.RSS_COORDINATOR_QUORUM.key();
        String coordinators = sparkConf.get(quorumKey);
        LOG.info("Start Registering coordinators {}", coordinators);
        shuffleWriteClient.registerCoordinators(coordinators);
    }

    /**
     * Exposes the Spark configuration held by this manager.
     *
     * @return the {@code sparkConf} field
     */
    @VisibleForTesting
    public SparkConf getSparkConf() {
        return sparkConf;
    }

    /**
     * Registers the application info with the write client and, on the first call only,
     * schedules the periodic application heartbeat.
     *
     * <p>The registration happens on every call; the scheduling is guarded by
     * {@code heartbeatStarted}. The heartbeat first fires after half of
     * {@code heartbeatInterval} and then every {@code heartbeatInterval} milliseconds. A failing
     * heartbeat is logged as a warning and does not propagate.
     */
    private synchronized void startHeartbeat() {
        shuffleWriteClient.registerApplicationInfo(id.get(), heartbeatTimeout, user);
        if (!heartbeatStarted) {
            Runnable beat = () -> {
                try {
                    shuffleWriteClient.sendAppHeartbeat(id.get(), heartbeatTimeout);
                    LOG.info("Finish send heartbeat to coordinator and servers");
                } catch (Exception e) {
                    LOG.warn("Fail to send heartbeat to coordinator and servers", e);
                }
            };
            heartBeatScheduledExecutorService.scheduleAtFixedRate(beat, heartbeatInterval / 2, heartbeatInterval, TimeUnit.MILLISECONDS);
            heartbeatStarted = true;
        }
    }

    /**
     * Returns the failed block ids tracked for the given task key.
     *
     * @param str the task key
     * @return the tracker's failed block ids, or an empty set when no tracker is stored for the key
     */
    public Set<Long> getFailedBlockIds(String str) {
        FailedBlockSendTracker tracker = getBlockIdsFailedSendTracker(str);
        if (tracker == null) {
            return Collections.emptySet();
        }
        return tracker.getFailedBlockIds();
    }

    /**
     * Returns the successfully sent block ids recorded for the given task key.
     *
     * @param str the task key
     * @return the recorded set, or an empty set when nothing is stored for the key
     */
    public Set<Long> getSuccessBlockIds(String str) {
        Set<Long> recorded = taskToSuccessBlockIds.get(str);
        return recorded != null ? recorded : Collections.emptySet();
    }

    /**
     * Returns the application id currently held in {@code id}.
     *
     * @return the value of the {@code id} reference
     */
    @Override
    public String getAppId() {
        return id.get();
    }

    /**
     * Derives the maximum number of fetch failures from {@code spark.task.maxFailures}.
     *
     * @return {@code spark.task.maxFailures} (default 4) minus one, but never less than 1
     */
    @Override
    public int getMaxFetchFailures() {
        int maxTaskFailures = sparkConf.getInt("spark.task.maxFailures", 4);
        return Math.max(1, maxTaskFailures - 1);
    }

    /**
     * Looks up the partition count recorded for a shuffle.
     *
     * @param i the shuffle id
     * @return the recorded partition count, or 0 when the shuffle id is unknown
     */
    @Override
    public int getPartitionNum(int i) {
        Integer shuffleKey = i;
        return shuffleIdToPartitionNum.getOrDefault(shuffleKey, 0).intValue();
    }

    /**
     * Looks up the number of map tasks recorded for a shuffle.
     *
     * @param i the shuffle id
     * @return the recorded map task count, or 0 when the shuffle id is unknown
     */
    @Override
    public int getNumMaps(int i) {
        Integer shuffleKey = i;
        return shuffleIdToNumMapTasks.getOrDefault(shuffleKey, 0).intValue();
    }

    /**
     * Replaces the application id holder with a new reference wrapping the given value.
     *
     * @param str the application id to store
     */
    @VisibleForTesting
    public void setAppId(String str) {
        id = new AtomicReference<>(str);
    }

    /**
     * Logs and records the given task id as failed.
     *
     * @param str the task id to add to {@code failedTaskIds}
     * @return always {@code true}
     */
    public boolean markFailedTask(String str) {
        LOG.info("Mark the task: {} failed.", str);
        failedTaskIds.add(str);
        return true;
    }

    /**
     * Tells whether the given task id has not been marked as failed.
     *
     * @param str the task id
     * @return {@code false} if {@code failedTaskIds} contains the id, {@code true} otherwise
     */
    public boolean isValidTask(String str) {
        boolean markedFailed = failedTaskIds.contains(str);
        return !markedFailed;
    }

    /**
     * Fetches the shuffle result for multiple partitions through the write client.
     *
     * <p>A fresh set is handed to the client call and, if the call fails with
     * {@link RssFetchFailedException}, to
     * {@code RssSparkShuffleUtils.reportRssFetchFailedException}, whose result is thrown.
     *
     * @param str forwarded as the client call's first argument
     * @param map servers mapped to partition ids, forwarded to the client
     * @param str2 forwarded to the client and to the failure reporter
     * @param i forwarded to the client and to the failure reporter
     * @param i2 forwarded to the failure reporter only
     * @return the bitmap returned by the client
     */
    private Roaring64NavigableMap getShuffleResultForMultiPart(String str, Map<ShuffleServerInfo, Set<Integer>> map, String str2, int i, int i2) {
        // Element type is not visible from this block, so the set stays raw as in the original.
        HashSet failedSet = Sets.newHashSet();
        Roaring64NavigableMap result;
        try {
            result = shuffleWriteClient.getShuffleResultForMultiPart(str, map, str2, i, failedSet);
        } catch (RssFetchFailedException e) {
            throw RssSparkShuffleUtils.reportRssFetchFailedException(e, sparkConf, str2, i, i2, failedSet);
        }
        return result;
    }

    /**
     * Looks up the failed-block send tracker stored for the given task key.
     *
     * @param str the task key
     * @return the result of the lookup in {@code taskToFailedBlockSendTracker}; a caller in this
     *     file checks it for {@code null}
     */
    public FailedBlockSendTracker getBlockIdsFailedSendTracker(String str) {
        return taskToFailedBlockSendTracker.get(str);
    }

    /**
     * Looks up the handle info stored for a shuffle.
     *
     * @param i the shuffle id
     * @return the result of the lookup in {@code shuffleIdToShuffleHandleInfo}
     */
    @Override
    public ShuffleHandleInfo getShuffleHandleInfoByShuffleId(int i) {
        Integer shuffleKey = i;
        return shuffleIdToShuffleHandleInfo.get(shuffleKey);
    }

    /**
     * Creates a GRPC shuffle manager client through the {@link ShuffleManagerClientFactory}
     * singleton.
     *
     * @param str the host argument (the caller in this file passes the configured driver host)
     * @param i the port argument (the caller in this file passes the configured gRPC port)
     * @return the client produced by the factory
     */
    private ShuffleManagerClient createShuffleManagerClient(String str, int i) {
        ShuffleManagerClientFactory factory = ShuffleManagerClientFactory.getInstance();
        return factory.createShuffleManagerClient(ClientType.GRPC, str, i);
    }

    /**
     * Resolves the handle info for a shuffle handle.
     *
     * <p>When {@code shuffleManagerRpcServiceEnabled} is set the info is fetched remotely by
     * shuffle id; otherwise it is built from the data carried by the handle itself.
     *
     * @param rssShuffleHandle the handle to resolve
     * @return the remote or locally built handle info
     */
    public ShuffleHandleInfo getShuffleHandleInfo(RssShuffleHandle<?, ?, ?> rssShuffleHandle) {
        if (shuffleManagerRpcServiceEnabled) {
            return getRemoteShuffleHandleInfo(rssShuffleHandle.getShuffleId());
        }
        return new ShuffleHandleInfo(rssShuffleHandle.getShuffleId(), rssShuffleHandle.getPartitionToServers(), rssShuffleHandle.getRemoteStorage());
    }

    /**
     * Fetches the partition-to-server assignment of a shuffle from the shuffle manager service
     * and wraps it in a {@link ShuffleHandleInfo}.
     *
     * <p>The driver host and gRPC port are read from the configuration on every call; the client
     * itself is created only when {@code shuffleManagerClient} is still {@code null}.
     *
     * @param i the shuffle id
     * @return handle info built from the service response
     */
    private synchronized ShuffleHandleInfo getRemoteShuffleHandleInfo(int i) {
        RssConf conf = RssSparkConfig.toRssConf(sparkConf);
        String driverHost = conf.getString(Constants.DRIVER_HOST, "");
        int grpcPort = ((Integer) conf.get(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT)).intValue();
        if (shuffleManagerClient == null) {
            shuffleManagerClient = createShuffleManagerClient(driverHost, grpcPort);
        }
        RssPartitionToShuffleServerRequest request = new RssPartitionToShuffleServerRequest(i);
        RssPartitionToShuffleServerResponse response = shuffleManagerClient.getPartitionToShufflerServer(request);
        return new ShuffleHandleInfo(i, response.getPartitionToServers(), response.getRemoteStorageInfo());
    }

    /**
     * Adds the given shuffle-server id to {@code failuresShuffleServerIds}.
     *
     * @param str the shuffle-server id to record
     */
    @Override
    public void addFailuresShuffleServerInfos(String str) {
        failuresShuffleServerIds.add(str);
    }

    /**
     * Reassigns all shuffle servers of a shuffle, at most once per {@code i + "_" + i2} key.
     *
     * <p>If the key is already flagged in {@code serverAssignedInfos} the call only logs and
     * returns. Otherwise it unregisters the shuffle, requests a new assignment that excludes
     * {@code failuresShuffleServerIds}, clears the map outputs, stores the new handle info and
     * flags the key.
     *
     * @param i the stage id (first part of the key)
     * @param i2 the attempt number (second part of the key)
     * @param i3 the shuffle id
     * @param i4 forwarded as the second argument of the assignment request
     * @return {@code true} if a reassignment was performed, {@code false} if it had already
     *     happened for this key
     * @throws RssException if clearing the map outputs fails with a {@link SparkException}
     */
    @Override
    public synchronized boolean reassignAllShuffleServersForWholeStage(int i, int i2, int i3, int i4) {
        String stageAttemptKey = i + "_" + i2;
        boolean alreadyReassigned = serverAssignedInfos.computeIfAbsent(stageAttemptKey, key -> false).booleanValue();
        if (alreadyReassigned) {
            LOG.info("The Stage:{} has been reassigned in an Attempt{},Return without performing any operation", i, i2);
            return false;
        }
        int serverCount = RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);
        int taskConcurrency = RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf);
        shuffleWriteClient.unregisterShuffle(id.get(), i3);
        Map<Integer, List<ShuffleServerInfo>> freshAssignment = requestShuffleAssignment(i3, i4, 1, serverCount, taskConcurrency, failuresShuffleServerIds, null);
        try {
            unregisterAllMapOutput(i3);
            ShuffleHandleInfo freshHandle = new ShuffleHandleInfo(i3, freshAssignment, getRemoteStorageInfo());
            shuffleIdToShuffleHandleInfo.put(i3, freshHandle);
            serverAssignedInfos.put(stageAttemptKey, true);
            return true;
        } catch (SparkException e) {
            LOG.error("Clear MapoutTracker Meta failed!");
            throw new RssException("Clear MapoutTracker Meta failed!", e);
        }
    }

    /**
     * Picks a replacement for a faulty shuffle server for the given partitions of a shuffle.
     *
     * <p>Runs under the monitor of the shuffle's {@link ShuffleHandleInfo}. If the handle
     * already knows the server as faulty, the existing reassignment is reused. Otherwise a new
     * server is requested and, when one is obtained, recorded on the handle.
     *
     * @param i the shuffle id
     * @param set the partition ids to reassign
     * @param str the id of the faulty shuffle server
     * @return the replacement server; may be {@code null} when a newly requested reassignment
     *     produced none
     * @throws RssException if no handle info is stored for the shuffle id
     */
    @Override
    public ShuffleServerInfo reassignFaultyShuffleServerForTasks(int i, Set<Integer> set, String str) {
        ShuffleHandleInfo shuffleHandleInfo = this.shuffleIdToShuffleHandleInfo.get(Integer.valueOf(i));
        if (shuffleHandleInfo == null) {
            // synchronized (null) would fail with a bare NullPointerException; fail with context.
            throw new RssException("No shuffle handle info found for shuffleId: " + i + " when reassigning faulty shuffle-server: " + str);
        }
        synchronized (shuffleHandleInfo) {
            if (shuffleHandleInfo.isExistingFaultyServer(str)) {
                return shuffleHandleInfo.useExistingReassignmentForMultiPartitions(set, str);
            }
            ShuffleServerInfo reassignShuffleServerForTask = reassignShuffleServerForTask(i, set, str);
            if (reassignShuffleServerForTask != null) {
                shuffleHandleInfo.createNewReassignmentForMultiPartitions(set, str, reassignShuffleServerForTask);
            }
            LOG.info("Reassign shuffle-server from {} -> {} for shuffleId: {}, partitionIds: {}", new Object[]{str, reassignShuffleServerForTask, Integer.valueOf(i), set});
            return reassignShuffleServerForTask;
        }
    }

    /**
     * Requests a single replacement shuffle server and registers it for the given partitions.
     *
     * <p>The faulty server and all ids in {@code failuresShuffleServerIds} are excluded from the
     * request. The handler passed to {@code requestShuffleAssignment} takes the first server of
     * the first partition in the response, remembers it, and rewrites the assignment so that
     * every requested partition maps to that one server.
     *
     * @param i the shuffle id
     * @param set the partition ids to move to the replacement server
     * @param str the id of the faulty shuffle server
     * @return the replacement server, or {@code null} if the handler never received a non-null
     *     assignment
     */
    private ShuffleServerInfo reassignShuffleServerForTask(int i, Set<Integer> set, String str) {
        Set<String> excludedServerIds = Sets.newHashSet(str);
        excludedServerIds.addAll(failuresShuffleServerIds);
        AtomicReference<ShuffleServerInfo> chosen = new AtomicReference<>();
        Function<ShuffleAssignmentsInfo, ShuffleAssignmentsInfo> narrowToRequested = assignment -> {
            if (assignment == null) {
                return null;
            }
            ShuffleServerInfo replacement = assignment.getPartitionToServers().values().stream().findFirst().get().get(0);
            chosen.set(replacement);
            Map<Integer, List<ShuffleServerInfo>> partitionToServers = new HashMap<>();
            List<PartitionRange> ranges = new ArrayList<>();
            for (Integer partitionId : set) {
                partitionToServers.put(partitionId, Arrays.asList(replacement));
                ranges.add(new PartitionRange(partitionId.intValue(), partitionId.intValue()));
            }
            Map<ShuffleServerInfo, List<PartitionRange>> serverToRanges = new HashMap<>();
            serverToRanges.put(replacement, ranges);
            return new ShuffleAssignmentsInfo(partitionToServers, serverToRanges);
        };
        requestShuffleAssignment(i, 1, 1, 1, 1, excludedServerIds, narrowToRequested);
        return chosen.get();
    }

    /**
     * Requests a shuffle assignment, optionally transforms it, and registers the shuffle on the
     * assigned servers, retrying the whole sequence through {@code RetryUtils.retry}.
     *
     * <p>Before the request the client type is validated and added to the assignment tags, and
     * {@code failuresShuffleServerIds} is added to the caller-supplied exclusion set (the set is
     * modified in place).
     *
     * @param i the shuffle id
     * @param i2 forwarded as the third argument of {@code getShuffleAssignments}
     * @param i3 forwarded as the fourth argument of {@code getShuffleAssignments}
     * @param i4 forwarded as the sixth argument of {@code getShuffleAssignments}
     * @param i5 forwarded as the seventh argument of {@code getShuffleAssignments}
     * @param set shuffle-server ids to exclude; extended with {@code failuresShuffleServerIds}
     * @param function optional transformation applied to the fetched assignment; may be
     *     {@code null}
     * @return the partition-to-servers mapping of the (possibly transformed) assignment
     * @throws RssException wrapping any failure after the retries are exhausted
     */
    private Map<Integer, List<ShuffleServerInfo>> requestShuffleAssignment(int i, int i2, int i3, int i4, int i5, Set<String> set, Function<ShuffleAssignmentsInfo, ShuffleAssignmentsInfo> function) {
        Set<String> tags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
        ClientUtils.validateClientType(clientType);
        tags.add(clientType);
        long retryInterval = ((Long) sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL)).longValue();
        int retryTimes = ((Integer) sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES)).intValue();
        set.addAll(failuresShuffleServerIds);
        try {
            return (Map) RetryUtils.retry(() -> {
                ShuffleAssignmentsInfo fetched = shuffleWriteClient.getShuffleAssignments(id.get(), i, i2, i3, tags, i4, i5, set);
                ShuffleAssignmentsInfo effective = function == null ? fetched : function.apply(fetched);
                registerShuffleServers(id.get(), i, effective.getServerToPartitionRanges(), getRemoteStorageInfo());
                return effective.getPartitionToServers();
            }, retryInterval, retryTimes);
        } catch (Throwable failure) {
            throw new RssException("registerShuffle failed!", failure);
        }
    }

    /**
     * Resolves the remote storage info for the current application.
     *
     * <p>Builds a default {@link RemoteStorageInfo} from the configured remote storage path
     * (empty string when unset) and passes it, with the configured storage type, to
     * {@code ClientUtils.fetchRemoteStorage}.
     *
     * @return the storage info returned by {@code ClientUtils.fetchRemoteStorage}
     */
    private RemoteStorageInfo getRemoteStorageInfo() {
        String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
        String appId = id.get();
        RemoteStorageInfo configured = new RemoteStorageInfo(sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), ""));
        return ClientUtils.fetchRemoteStorage(appId, configured, dynamicConfEnabled, storageType, shuffleWriteClient);
    }

    /**
     * Reports the {@code rssResubmitStage} flag.
     *
     * @return the current value of {@code rssResubmitStage}
     */
    public boolean isRssResubmitStage() {
        return rssResubmitStage;
    }

    /**
     * Replaces the data pusher held by this manager; exposed for tests.
     *
     * @param dataPusher the pusher to store in the {@code dataPusher} field
     */
    @VisibleForTesting
    public void setDataPusher(DataPusher dataPusher) {
        this.dataPusher = dataPusher;
    }
}
