/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.shuffle;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.MapOutputTracker;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.apache.spark.TaskContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.executor.ShuffleReadMetrics;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.shuffle.RssShuffleHandle;
import org.apache.spark.shuffle.RssSparkConfig;
import org.apache.spark.shuffle.RssSparkShuffleUtils;
import org.apache.spark.shuffle.RssStageResubmitManager;
import org.apache.spark.shuffle.ShuffleBlockResolver;
import org.apache.spark.shuffle.ShuffleHandle;
import org.apache.spark.shuffle.ShuffleHandleInfoManager;
import org.apache.spark.shuffle.ShuffleReadMetricsReporter;
import org.apache.spark.shuffle.ShuffleReader;
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
import org.apache.spark.shuffle.ShuffleWriter;
import org.apache.spark.shuffle.Spark3VersionUtils;
import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;
import org.apache.spark.shuffle.handle.ShuffleHandleInfo;
import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo;
import org.apache.spark.shuffle.handle.StageAttemptShuffleHandleInfo;
import org.apache.spark.shuffle.reader.RssShuffleReader;
import org.apache.spark.shuffle.writer.AddBlockEvent;
import org.apache.spark.shuffle.writer.DataPusher;
import org.apache.spark.shuffle.writer.RssShuffleWriter;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.storage.BlockManagerId;
import org.apache.uniffle.client.PartitionDataReplicaRequirementTracking;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.FailedBlockSendTracker;
import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.exception.RssFetchFailedException;
import org.apache.uniffle.common.rpc.GrpcServer;
import org.apache.uniffle.common.util.BlockIdLayout;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.uniffle.shaded.com.google.common.collect.Sets;
import org.apache.uniffle.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.uniffle.shaded.org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.shuffle.RssShuffleClientFactory;
import org.apache.uniffle.shuffle.manager.RssShuffleManagerBase;
import org.apache.uniffle.shuffle.manager.ShuffleManagerGrpcService;
import org.apache.uniffle.shuffle.manager.ShuffleManagerServerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.collection.Iterator;

public class RssShuffleManager
extends RssShuffleManagerBase {
    private static final Logger LOG = LoggerFactory.getLogger(RssShuffleManager.class);
    private final long heartbeatInterval;
    private final long heartbeatTimeout;
    private final int dataReplica;
    private final int dataReplicaWrite;
    private final int dataReplicaRead;
    private final boolean dataReplicaSkipEnabled;
    private final int dataTransferPoolSize;
    private final int dataCommitPoolSize;
    private final Map<String, Set<Long>> taskToSuccessBlockIds;
    private final Map<String, FailedBlockSendTracker> taskToFailedBlockSendTracker;
    private ScheduledExecutorService heartBeatScheduledExecutorService;
    private boolean heartbeatStarted = false;
    private final BlockIdLayout blockIdLayout;
    private final int maxFailures;
    private final boolean speculation;
    private String user;
    private String uuid;
    private Set<String> failedTaskIds = Sets.newConcurrentHashSet();
    private DataPusher dataPusher;
    private final Map<Integer, Integer> shuffleIdToPartitionNum = JavaUtils.newConcurrentMap();
    private final Map<Integer, Integer> shuffleIdToNumMapTasks = JavaUtils.newConcurrentMap();
    private ShuffleManagerGrpcService service;
    private GrpcServer shuffleManagerServer;

    public RssShuffleManager(SparkConf conf, boolean isDriver) {
        this.sparkConf = conf;
        boolean supportsRelocation = Optional.ofNullable(SparkEnv.get()).map(env -> env.serializer().supportsRelocationOfSerializedObjects()).orElse(true);
        if (!supportsRelocation) {
            LOG.warn("RSSShuffleManager requires a serializer which supports relocations of serialized object. Please set spark.serializer to org.apache.spark.serializer.KryoSerializer instead");
        }
        this.user = this.sparkConf.get("spark.rss.quota.user", "user");
        this.uuid = this.sparkConf.get("spark.rss.quota.uuid", Long.toString(System.currentTimeMillis()));
        this.dynamicConfEnabled = (Boolean)this.sparkConf.get(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED);
        if (isDriver && this.dynamicConfEnabled) {
            RssShuffleManager.fetchAndApplyDynamicConf(this.sparkConf);
        }
        RssSparkShuffleUtils.validateRssClientConf(this.sparkConf);
        this.dataReplica = (Integer)this.sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA);
        this.dataReplicaWrite = (Integer)this.sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA_WRITE);
        this.dataReplicaRead = (Integer)this.sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA_READ);
        this.dataReplicaSkipEnabled = (Boolean)this.sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED);
        LOG.info("Check quorum config [" + this.dataReplica + ":" + this.dataReplicaWrite + ":" + this.dataReplicaRead + ":" + this.dataReplicaSkipEnabled + "]");
        RssUtils.checkQuorumSetting(this.dataReplica, this.dataReplicaWrite, this.dataReplicaRead);
        this.heartbeatInterval = (Long)this.sparkConf.get(RssSparkConfig.RSS_HEARTBEAT_INTERVAL);
        this.heartbeatTimeout = this.sparkConf.getLong(RssSparkConfig.RSS_HEARTBEAT_TIMEOUT.key(), this.heartbeatInterval / 2L);
        int retryMax = (Integer)this.sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_MAX);
        this.clientType = (String)this.sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
        this.dataDistributionType = RssShuffleManager.getDataDistributionType(this.sparkConf);
        RssConf rssConf = RssSparkConfig.toRssConf(this.sparkConf);
        RssUtils.setExtraJavaProperties(rssConf);
        this.maxConcurrencyPerPartitionToWrite = rssConf.get(RssClientConf.MAX_CONCURRENCY_PER_PARTITION_TO_WRITE);
        this.maxFailures = this.sparkConf.getInt("spark.task.maxFailures", 4);
        this.speculation = this.sparkConf.getBoolean("spark.speculation", false);
        this.configureBlockIdLayout(this.sparkConf, rssConf);
        this.blockIdLayout = BlockIdLayout.from(rssConf);
        this.dataTransferPoolSize = (Integer)this.sparkConf.get(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE);
        this.dataCommitPoolSize = (Integer)this.sparkConf.get(RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE);
        this.sparkConf.set("spark.shuffle.service.enabled", "false");
        this.sparkConf.set("spark.dynamicAllocation.shuffleTracking.enabled", "false");
        this.sparkConf.set(RssSparkConfig.RSS_ENABLED.key(), "true");
        LOG.info("Disable external shuffle service in RssShuffleManager.");
        this.sparkConf.set("spark.sql.adaptive.localShuffleReader.enabled", "false");
        LOG.info("Disable local shuffle reader in RssShuffleManager.");
        this.sparkConf.set("spark.shuffle.reduceLocality.enabled", "false");
        LOG.info("Disable shuffle data locality in RssShuffleManager.");
        this.taskToSuccessBlockIds = JavaUtils.newConcurrentMap();
        this.taskToFailedBlockSendTracker = JavaUtils.newConcurrentMap();
        this.rssStageRetryEnabled = rssConf.get(RssSparkConfig.RSS_RESUBMIT_STAGE_ENABLED);
        this.partitionReassignEnabled = rssConf.get(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);
        this.rssStageRetryForFetchFailureEnabled = rssConf.get(RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED);
        this.rssStageRetryForWriteFailureEnabled = rssConf.get(RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED);
        if (this.rssStageRetryForFetchFailureEnabled || this.rssStageRetryForWriteFailureEnabled) {
            this.rssStageRetryEnabled = true;
            ArrayList<String> logTips = new ArrayList<String>();
            if (this.rssStageRetryForWriteFailureEnabled) {
                logTips.add("write");
            }
            if (this.rssStageRetryForWriteFailureEnabled) {
                logTips.add("fetch");
            }
            LOG.info("Activate the stage retry mechanism that will resubmit stage on {} failure", (Object)StringUtils.join(logTips, "/"));
        }
        if (this.partitionReassignEnabled && this.dataReplica > 1) {
            throw new RssException("The feature of task partition reassign is incompatible with multiple replicas mechanism.");
        }
        this.blockIdSelfManagedEnabled = rssConf.getBoolean(RssSparkConfig.RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED);
        boolean bl = this.shuffleManagerRpcServiceEnabled = this.partitionReassignEnabled || this.rssStageRetryEnabled || this.blockIdSelfManagedEnabled;
        if (isDriver) {
            this.heartBeatScheduledExecutorService = ThreadUtils.getDaemonSingleThreadScheduledExecutor("rss-heartbeat");
            if (this.shuffleManagerRpcServiceEnabled) {
                LOG.info("stage resubmit is supported and enabled");
                rssConf.set(RssBaseConf.RPC_SERVER_PORT, 0);
                ShuffleManagerServerFactory factory = new ShuffleManagerServerFactory(this, rssConf);
                this.service = factory.getService();
                this.shuffleManagerServer = factory.getServer(this.service);
                try {
                    this.shuffleManagerServer.start();
                    this.sparkConf.set(RssSparkConfig.RSS_SHUFFLE_MANAGER_GRPC_PORT, (Object)this.shuffleManagerServer.getPort());
                }
                catch (Exception e) {
                    LOG.error("Failed to start shuffle manager server", (Throwable)e);
                    throw new RssException(e);
                }
            }
        }
        if (this.shuffleManagerRpcServiceEnabled) {
            this.getOrCreateShuffleManagerClientSupplier();
        }
        int unregisterThreadPoolSize = (Integer)this.sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_THREAD_POOL_SIZE);
        int unregisterTimeoutSec = (Integer)this.sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_TIMEOUT_SEC);
        int unregisterRequestTimeoutSec = (Integer)this.sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_REQUEST_TIMEOUT_SEC);
        long retryIntervalMax = (Long)this.sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX);
        int heartBeatThreadNum = (Integer)this.sparkConf.get(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM);
        this.shuffleWriteClient = RssShuffleClientFactory.getInstance().createShuffleWriteClient((RssShuffleClientFactory.ExtendWriteClientBuilder)((RssShuffleClientFactory.ExtendWriteClientBuilder)((RssShuffleClientFactory.ExtendWriteClientBuilder)((RssShuffleClientFactory.ExtendWriteClientBuilder)((RssShuffleClientFactory.ExtendWriteClientBuilder)((RssShuffleClientFactory.ExtendWriteClientBuilder)((RssShuffleClientFactory.ExtendWriteClientBuilder)((RssShuffleClientFactory.ExtendWriteClientBuilder)((RssShuffleClientFactory.ExtendWriteClientBuilder)((RssShuffleClientFactory.ExtendWriteClientBuilder)((RssShuffleClientFactory.ExtendWriteClientBuilder)((RssShuffleClientFactory.ExtendWriteClientBuilder)((RssShuffleClientFactory.ExtendWriteClientBuilder)((RssShuffleClientFactory.ExtendWriteClientBuilder)((ShuffleClientFactory.WriteClientBuilder)((RssShuffleClientFactory.ExtendWriteClientBuilder)RssShuffleClientFactory.newWriteBuilder().blockIdSelfManagedEnabled(this.blockIdSelfManagedEnabled)).managerClientSupplier(this.managerClientSupplier)).clientType(this.clientType)).retryMax(retryMax)).retryIntervalMax(retryIntervalMax)).heartBeatThreadNum(heartBeatThreadNum)).replica(this.dataReplica)).replicaWrite(this.dataReplicaWrite)).replicaRead(this.dataReplicaRead)).replicaSkipEnabled(this.dataReplicaSkipEnabled)).dataTransferPoolSize(this.dataTransferPoolSize)).dataCommitPoolSize(this.dataCommitPoolSize)).unregisterThreadPoolSize(unregisterThreadPoolSize)).unregisterTimeSec(unregisterTimeoutSec)).unregisterRequestTimeSec(unregisterRequestTimeoutSec)).rssConf(rssConf));
        this.registerCoordinator();
        LOG.info("Rss data pusher is starting...");
        int poolSize = (Integer)this.sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_SIZE);
        int keepAliveTime = (Integer)this.sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_KEEPALIVE);
        this.dataPusher = new DataPusher(this.shuffleWriteClient, this.taskToSuccessBlockIds, this.taskToFailedBlockSendTracker, this.failedTaskIds, poolSize, keepAliveTime);
        this.partitionReassignMaxServerNum = rssConf.get(RssSparkConfig.RSS_PARTITION_REASSIGN_MAX_REASSIGNMENT_SERVER_NUM);
        this.shuffleHandleInfoManager = new ShuffleHandleInfoManager();
        this.rssStageResubmitManager = new RssStageResubmitManager();
    }

    public CompletableFuture<Long> sendData(AddBlockEvent event) {
        if (this.dataPusher != null && event != null) {
            return this.dataPusher.send(event);
        }
        return new CompletableFuture<Long>();
    }

    @VisibleForTesting
    protected static ShuffleDataDistributionType getDataDistributionType(SparkConf sparkConf) {
        RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
        if (((Boolean)sparkConf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED())).booleanValue() && !rssConf.containsKey(RssClientConf.DATA_DISTRIBUTION_TYPE.key())) {
            return ShuffleDataDistributionType.LOCAL_ORDER;
        }
        return rssConf.get(RssClientConf.DATA_DISTRIBUTION_TYPE);
    }

    @VisibleForTesting
    RssShuffleManager(SparkConf conf, boolean isDriver, DataPusher dataPusher, Map<String, Set<Long>> taskToSuccessBlockIds, Map<String, FailedBlockSendTracker> taskToFailedBlockSendTracker) {
        this.sparkConf = conf;
        this.clientType = (String)this.sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
        RssConf rssConf = RssSparkConfig.toRssConf(this.sparkConf);
        this.dataDistributionType = rssConf.get(RssClientConf.DATA_DISTRIBUTION_TYPE);
        this.blockIdLayout = BlockIdLayout.from(rssConf);
        this.maxConcurrencyPerPartitionToWrite = rssConf.get(RssClientConf.MAX_CONCURRENCY_PER_PARTITION_TO_WRITE);
        this.maxFailures = this.sparkConf.getInt("spark.task.maxFailures", 4);
        this.speculation = this.sparkConf.getBoolean("spark.speculation", false);
        this.configureBlockIdLayout(this.sparkConf, rssConf);
        this.heartbeatInterval = (Long)this.sparkConf.get(RssSparkConfig.RSS_HEARTBEAT_INTERVAL);
        this.heartbeatTimeout = this.sparkConf.getLong(RssSparkConfig.RSS_HEARTBEAT_TIMEOUT.key(), this.heartbeatInterval / 2L);
        this.dataReplica = (Integer)this.sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA);
        this.dataReplicaWrite = (Integer)this.sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA_WRITE);
        this.dataReplicaRead = (Integer)this.sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA_READ);
        this.dataReplicaSkipEnabled = (Boolean)this.sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED);
        LOG.info("Check quorum config [" + this.dataReplica + ":" + this.dataReplicaWrite + ":" + this.dataReplicaRead + ":" + this.dataReplicaSkipEnabled + "]");
        RssUtils.checkQuorumSetting(this.dataReplica, this.dataReplicaWrite, this.dataReplicaRead);
        int retryMax = (Integer)this.sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_MAX);
        long retryIntervalMax = (Long)this.sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX);
        int heartBeatThreadNum = (Integer)this.sparkConf.get(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM);
        this.dataTransferPoolSize = (Integer)this.sparkConf.get(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE);
        this.dataCommitPoolSize = (Integer)this.sparkConf.get(RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE);
        int unregisterThreadPoolSize = (Integer)this.sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_THREAD_POOL_SIZE);
        int unregisterTimeoutSec = (Integer)this.sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_TIMEOUT_SEC);
        int unregisterRequestTimeoutSec = (Integer)this.sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_REQUEST_TIMEOUT_SEC);
        RssShuffleClientFactory rssShuffleClientFactory = RssShuffleClientFactory.getInstance();
        RssShuffleClientFactory.getInstance();
        this.shuffleWriteClient = rssShuffleClientFactory.createShuffleWriteClient((RssShuffleClientFactory.ExtendWriteClientBuilder)((RssShuffleClientFactory.ExtendWriteClientBuilder)((RssShuffleClientFactory.ExtendWriteClientBuilder)((RssShuffleClientFactory.ExtendWriteClientBuilder)((RssShuffleClientFactory.ExtendWriteClientBuilder)((RssShuffleClientFactory.ExtendWriteClientBuilder)((RssShuffleClientFactory.ExtendWriteClientBuilder)((RssShuffleClientFactory.ExtendWriteClientBuilder)((RssShuffleClientFactory.ExtendWriteClientBuilder)((RssShuffleClientFactory.ExtendWriteClientBuilder)((RssShuffleClientFactory.ExtendWriteClientBuilder)((RssShuffleClientFactory.ExtendWriteClientBuilder)((RssShuffleClientFactory.ExtendWriteClientBuilder)((RssShuffleClientFactory.ExtendWriteClientBuilder)RssShuffleClientFactory.newWriteBuilder().clientType(this.clientType)).retryMax(retryMax)).retryIntervalMax(retryIntervalMax)).heartBeatThreadNum(heartBeatThreadNum)).replica(this.dataReplica)).replicaWrite(this.dataReplicaWrite)).replicaRead(this.dataReplicaRead)).replicaSkipEnabled(this.dataReplicaSkipEnabled)).dataTransferPoolSize(this.dataTransferPoolSize)).dataCommitPoolSize(this.dataCommitPoolSize)).unregisterThreadPoolSize(unregisterThreadPoolSize)).unregisterTimeSec(unregisterTimeoutSec)).unregisterRequestTimeSec(unregisterRequestTimeoutSec)).rssConf(rssConf));
        this.taskToSuccessBlockIds = taskToSuccessBlockIds;
        this.heartBeatScheduledExecutorService = null;
        this.taskToFailedBlockSendTracker = taskToFailedBlockSendTracker;
        this.dataPusher = dataPusher;
        this.partitionReassignMaxServerNum = rssConf.get(RssSparkConfig.RSS_PARTITION_REASSIGN_MAX_REASSIGNMENT_SERVER_NUM);
        this.shuffleHandleInfoManager = new ShuffleHandleInfoManager();
        this.rssStageResubmitManager = new RssStageResubmitManager();
    }

    public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, ShuffleDependency<K, V, C> dependency) {
        MutableShuffleHandleInfo shuffleHandleInfo;
        if (dependency.partitioner().numPartitions() > this.blockIdLayout.maxNumPartitions) {
            throw new RssException("Cannot register shuffle with " + dependency.partitioner().numPartitions() + " partitions because the configured block id layout supports at most " + this.blockIdLayout.maxNumPartitions + " partitions.");
        }
        if (!SparkEnv.get().serializer().supportsRelocationOfSerializedObjects()) {
            throw new IllegalArgumentException("Can't use serialized shuffle for shuffleId: " + shuffleId + ", because the serializer: " + SparkEnv.get().serializer().getClass().getName() + " does not support object relocation.");
        }
        if (this.id.get() == null) {
            this.id.compareAndSet(null, SparkEnv.get().conf().getAppId() + "_" + this.uuid);
            this.appId = (String)this.id.get();
            this.dataPusher.setRssAppId((String)this.id.get());
        }
        LOG.info("Generate application id used in rss: " + (String)this.id.get());
        if (dependency.partitioner().numPartitions() == 0) {
            this.shuffleIdToPartitionNum.putIfAbsent(shuffleId, 0);
            this.shuffleIdToNumMapTasks.putIfAbsent(shuffleId, dependency.rdd().partitions().length);
            LOG.info("RegisterShuffle with ShuffleId[" + shuffleId + "], partitionNum is 0, return the empty RssShuffleHandle directly");
            Broadcast<SimpleShuffleHandleInfo> hdlInfoBd = RssSparkShuffleUtils.broadcastShuffleHdlInfo(RssSparkShuffleUtils.getActiveSparkContext(), shuffleId, Collections.emptyMap(), RemoteStorageInfo.EMPTY_REMOTE_STORAGE);
            return new RssShuffleHandle<K, V, C>(shuffleId, (String)this.id.get(), dependency.rdd().getNumPartitions(), dependency, hdlInfoBd);
        }
        String storageType = this.sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
        RemoteStorageInfo defaultRemoteStorage = RssShuffleManager.getDefaultRemoteStorageInfo(this.sparkConf);
        RemoteStorageInfo remoteStorage = ClientUtils.fetchRemoteStorage((String)this.id.get(), defaultRemoteStorage, this.dynamicConfEnabled, storageType, this.shuffleWriteClient);
        Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(this.sparkConf);
        ClientUtils.validateClientType(this.clientType);
        assignmentTags.add(this.clientType);
        int requiredShuffleServerNumber = RssSparkShuffleUtils.getRequiredShuffleServerNumber(this.sparkConf);
        int estimateTaskConcurrency = RssSparkShuffleUtils.estimateTaskConcurrency(this.sparkConf);
        Map<Integer, List<ShuffleServerInfo>> partitionToServers = this.requestShuffleAssignment(shuffleId, dependency.partitioner().numPartitions(), 1, requiredShuffleServerNumber, estimateTaskConcurrency, this.rssStageResubmitManager.getServerIdBlackList(), 0);
        this.startHeartbeat();
        this.shuffleIdToPartitionNum.putIfAbsent(shuffleId, dependency.partitioner().numPartitions());
        this.shuffleIdToNumMapTasks.putIfAbsent(shuffleId, dependency.rdd().partitions().length);
        if (this.shuffleManagerRpcServiceEnabled && this.rssStageRetryEnabled) {
            shuffleHandleInfo = new MutableShuffleHandleInfo(shuffleId, partitionToServers, remoteStorage);
            StageAttemptShuffleHandleInfo handleInfo = new StageAttemptShuffleHandleInfo(shuffleId, remoteStorage, shuffleHandleInfo);
            this.shuffleHandleInfoManager.register(shuffleId, handleInfo);
        } else if (this.shuffleManagerRpcServiceEnabled && this.partitionReassignEnabled) {
            shuffleHandleInfo = new MutableShuffleHandleInfo(shuffleId, partitionToServers, remoteStorage);
            this.shuffleHandleInfoManager.register(shuffleId, shuffleHandleInfo);
        }
        Broadcast<SimpleShuffleHandleInfo> hdlInfoBd = RssSparkShuffleUtils.broadcastShuffleHdlInfo(RssSparkShuffleUtils.getActiveSparkContext(), shuffleId, partitionToServers, remoteStorage);
        LOG.info("RegisterShuffle with ShuffleId[" + shuffleId + "], partitionNum[" + partitionToServers.size() + "], shuffleServerForResult: " + partitionToServers);
        return new RssShuffleHandle<K, V, C>(shuffleId, (String)this.id.get(), dependency.rdd().getNumPartitions(), dependency, hdlInfoBd);
    }

    public <K, V> ShuffleWriter<K, V> getWriter(ShuffleHandle handle, long mapId, TaskContext context, ShuffleWriteMetricsReporter metrics) {
        if (!(handle instanceof RssShuffleHandle)) {
            throw new RssException("Unexpected ShuffleHandle:" + handle.getClass().getName());
        }
        RssShuffleHandle rssHandle = (RssShuffleHandle)handle;
        this.setPusherAppId(rssHandle);
        int shuffleId = rssHandle.getShuffleId();
        ShuffleWriteMetrics writeMetrics = metrics != null ? new WriteMetrics(metrics) : context.taskMetrics().shuffleWriteMetrics();
        String taskId = "" + context.taskAttemptId() + "_" + context.attemptNumber();
        return new RssShuffleWriter(rssHandle.getAppId(), shuffleId, taskId, this.getTaskAttemptIdForBlockId(context.partitionId(), context.attemptNumber()), writeMetrics, this, this.sparkConf, this.shuffleWriteClient, this.managerClientSupplier, rssHandle, this::markFailedTask, context);
    }

    @Override
    public void configureBlockIdLayout(SparkConf sparkConf, RssConf rssConf) {
        RssShuffleManager.configureBlockIdLayout(sparkConf, rssConf, this.maxFailures, this.speculation);
    }

    @Override
    public long getTaskAttemptIdForBlockId(int mapIndex, int attemptNo) {
        return RssShuffleManager.getTaskAttemptIdForBlockId(mapIndex, attemptNo, this.maxFailures, this.speculation, this.blockIdLayout.taskAttemptIdBits);
    }

    public void setPusherAppId(RssShuffleHandle rssShuffleHandle) {
        if (this.id.get() == null) {
            this.id.compareAndSet(null, rssShuffleHandle.getAppId());
            this.dataPusher.setRssAppId((String)this.id.get());
        }
    }

    public <K, C> ShuffleReader<K, C> getReader(ShuffleHandle handle, int startPartition, int endPartition, TaskContext context, ShuffleReadMetricsReporter metrics) {
        return this.getReader(handle, 0, Integer.MAX_VALUE, startPartition, endPartition, context, metrics);
    }

    public <K, C> ShuffleReader<K, C> getReader(ShuffleHandle handle, int startMapIndex, int endMapIndex, int startPartition, int endPartition, TaskContext context, ShuffleReadMetricsReporter metrics) {
        long start = System.currentTimeMillis();
        Roaring64NavigableMap taskIdBitmap = this.getExpectedTasksByExecutorId(handle.shuffleId(), startPartition, endPartition, startMapIndex, endMapIndex);
        LOG.info("Get taskId cost " + (System.currentTimeMillis() - start) + " ms, and request expected blockIds from " + taskIdBitmap.getLongCardinality() + " tasks for shuffleId[" + handle.shuffleId() + "], partitionId[" + startPartition + ", " + endPartition + "]");
        return this.getReaderImpl(handle, startMapIndex, endMapIndex, startPartition, endPartition, context, metrics, taskIdBitmap);
    }

    public <K, C> ShuffleReader<K, C> getReaderForRange(ShuffleHandle handle, int startMapIndex, int endMapIndex, int startPartition, int endPartition, TaskContext context, ShuffleReadMetricsReporter metrics) {
        long start = System.currentTimeMillis();
        Roaring64NavigableMap taskIdBitmap = this.getExpectedTasksByRange(handle.shuffleId(), startPartition, endPartition, startMapIndex, endMapIndex);
        LOG.info("Get taskId cost " + (System.currentTimeMillis() - start) + " ms, and request expected blockIds from " + taskIdBitmap.getLongCardinality() + " tasks for shuffleId[" + handle.shuffleId() + "], partitionId[" + startPartition + ", " + endPartition + "]");
        return this.getReaderImpl(handle, startMapIndex, endMapIndex, startPartition, endPartition, context, metrics, taskIdBitmap);
    }

    public <K, C> ShuffleReader<K, C> getReaderImpl(ShuffleHandle handle, int startMapIndex, int endMapIndex, int startPartition, int endPartition, TaskContext context, ShuffleReadMetricsReporter metrics, Roaring64NavigableMap taskIdBitmap) {
        if (!(handle instanceof RssShuffleHandle)) {
            throw new RssException("Unexpected ShuffleHandle:" + handle.getClass().getName());
        }
        RssShuffleHandle rssShuffleHandle = (RssShuffleHandle)handle;
        int partitionNum = rssShuffleHandle.getDependency().partitioner().numPartitions();
        int shuffleId = rssShuffleHandle.getShuffleId();
        ShuffleHandleInfo shuffleHandleInfo = this.getShuffleHandleInfo(rssShuffleHandle);
        Map<ShuffleServerInfo, Set<Integer>> serverToPartitions = this.getPartitionDataServers(shuffleHandleInfo, startPartition, endPartition);
        long start = System.currentTimeMillis();
        Roaring64NavigableMap blockIdBitmap = this.getShuffleResultForMultiPart(this.clientType, serverToPartitions, rssShuffleHandle.getAppId(), shuffleId, context.stageAttemptNumber(), shuffleHandleInfo.createPartitionReplicaTracking());
        LOG.info("Get shuffle blockId cost " + (System.currentTimeMillis() - start) + " ms, and get " + blockIdBitmap.getLongCardinality() + " blockIds for shuffleId[" + shuffleId + "], startPartition[" + startPartition + "], endPartition[" + endPartition + "]");
        ShuffleReadMetrics readMetrics = metrics != null ? new ReadMetrics(metrics) : context.taskMetrics().shuffleReadMetrics();
        RemoteStorageInfo shuffleRemoteStorageInfo = rssShuffleHandle.getRemoteStorage();
        LOG.info("Shuffle reader using remote storage {}", (Object)shuffleRemoteStorageInfo);
        String shuffleRemoteStoragePath = shuffleRemoteStorageInfo.getPath();
        Configuration readerHadoopConf = RssSparkShuffleUtils.getRemoteStorageHadoopConf(this.sparkConf, shuffleRemoteStorageInfo);
        return new RssShuffleReader(startPartition, endPartition, startMapIndex, endMapIndex, context, rssShuffleHandle, shuffleRemoteStoragePath, readerHadoopConf, partitionNum, RssUtils.generatePartitionToBitmap(blockIdBitmap, startPartition, endPartition, this.blockIdLayout), taskIdBitmap, readMetrics, this.managerClientSupplier, RssSparkConfig.toRssConf(this.sparkConf), this.dataDistributionType, shuffleHandleInfo.getAllPartitionServersForReader());
    }

    private Map<ShuffleServerInfo, Set<Integer>> getPartitionDataServers(ShuffleHandleInfo shuffleHandleInfo, int startPartition, int endPartition) {
        Map<Integer, List<ShuffleServerInfo>> allPartitionToServers = shuffleHandleInfo.getAllPartitionServersForReader();
        Map<Integer, List<ShuffleServerInfo>> requirePartitionToServers = allPartitionToServers.entrySet().stream().filter(x -> (Integer)x.getKey() >= startPartition && (Integer)x.getKey() < endPartition).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        Map<ShuffleServerInfo, Set<Integer>> serverToPartitions = RssUtils.generateServerToPartitions(requirePartitionToServers);
        return serverToPartitions;
    }

    @SuppressFBWarnings(value={"REC_CATCH_EXCEPTION"})
    private Roaring64NavigableMap getExpectedTasksByExecutorId(int shuffleId, int startPartition, int endPartition, int startMapIndex, int endMapIndex) {
        Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(new long[0]);
        Iterator mapStatusIter = null;
        try {
            mapStatusIter = Spark3VersionUtils.MAJOR_VERSION > 3 || Spark3VersionUtils.MINOR_VERSION > 2 || Spark3VersionUtils.MINOR_VERSION == 2 && !Spark3VersionUtils.isSpark320() || Spark3VersionUtils.MINOR_VERSION == 1 ? (Iterator)SparkEnv.get().mapOutputTracker().getClass().getDeclaredMethod("getMapSizesByExecutorId", Integer.TYPE, Integer.TYPE, Integer.TYPE, Integer.TYPE, Integer.TYPE).invoke((Object)SparkEnv.get().mapOutputTracker(), shuffleId, startMapIndex, endMapIndex, startPartition, endPartition) : (Spark3VersionUtils.isSpark320() ? (Iterator)MapOutputTracker.class.getDeclaredMethod("getMapSizesByExecutorId", Integer.TYPE, Integer.TYPE, Integer.TYPE, Integer.TYPE, Integer.TYPE).invoke((Object)SparkEnv.get().mapOutputTracker(), shuffleId, startMapIndex, endMapIndex, startPartition, endPartition) : (Iterator)SparkEnv.get().mapOutputTracker().getClass().getDeclaredMethod("getMapSizesByExecutorId", Integer.TYPE, Integer.TYPE, Integer.TYPE).invoke((Object)SparkEnv.get().mapOutputTracker(), shuffleId, startPartition, endPartition));
        }
        catch (Exception e) {
            throw new RssException(e);
        }
        while (mapStatusIter.hasNext()) {
            Tuple2 tuple2 = (Tuple2)mapStatusIter.next();
            if (!((BlockManagerId)tuple2._1()).topologyInfo().isDefined()) {
                throw new RssException("Can't get expected taskAttemptId");
            }
            taskIdBitmap.add(Long.parseLong((String)((BlockManagerId)tuple2._1()).topologyInfo().get()));
        }
        return taskIdBitmap;
    }

    private Roaring64NavigableMap getExpectedTasksByRange(int shuffleId, int startPartition, int endPartition, int startMapIndex, int endMapIndex) {
        Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(new long[0]);
        Iterator mapStatusIter = null;
        try {
            mapStatusIter = (Iterator)SparkEnv.get().mapOutputTracker().getClass().getDeclaredMethod("getMapSizesByRange", Integer.TYPE, Integer.TYPE, Integer.TYPE, Integer.TYPE, Integer.TYPE).invoke((Object)SparkEnv.get().mapOutputTracker(), shuffleId, startMapIndex, endMapIndex, startPartition, endPartition);
        }
        catch (Exception e) {
            throw new RssException(e);
        }
        while (mapStatusIter.hasNext()) {
            Tuple2 tuple2 = (Tuple2)mapStatusIter.next();
            if (!((BlockManagerId)tuple2._1()).topologyInfo().isDefined()) {
                throw new RssException("Can't get expected taskAttemptId");
            }
            taskIdBitmap.add(Long.parseLong((String)((BlockManagerId)tuple2._1()).topologyInfo().get()));
        }
        return taskIdBitmap;
    }

    @Override
    public boolean unregisterShuffle(int shuffleId) {
        try {
            super.unregisterShuffle(shuffleId);
            if (SparkEnv.get().executorId().equals("driver")) {
                this.shuffleWriteClient.unregisterShuffle((String)this.id.get(), shuffleId);
                this.shuffleIdToPartitionNum.remove(shuffleId);
                this.shuffleIdToNumMapTasks.remove(shuffleId);
                if (this.service != null) {
                    this.service.unregisterShuffle(shuffleId);
                }
            }
        }
        catch (Exception e) {
            LOG.warn("Errors on unregistering from remote shuffle-servers", (Throwable)e);
        }
        return true;
    }

    public ShuffleBlockResolver shuffleBlockResolver() {
        throw new RssException("RssShuffleManager.shuffleBlockResolver is not implemented");
    }

    @Override
    public void stop() {
        super.stop();
        if (this.heartBeatScheduledExecutorService != null) {
            this.heartBeatScheduledExecutorService.shutdownNow();
        }
        if (this.shuffleWriteClient != null) {
            this.shuffleWriteClient.unregisterShuffle(this.getAppId());
            this.shuffleWriteClient.close();
        }
        if (this.dataPusher != null) {
            try {
                this.dataPusher.close();
            }
            catch (IOException e) {
                LOG.warn("Errors on closing data pusher", (Throwable)e);
            }
        }
        if (this.shuffleManagerServer != null) {
            try {
                this.shuffleManagerServer.stop();
            }
            catch (InterruptedException e) {
                LOG.info("shuffle manager server is interrupted during stop");
            }
        }
    }

    public void clearTaskMeta(String taskId) {
        this.taskToSuccessBlockIds.remove(taskId);
        this.taskToFailedBlockSendTracker.remove(taskId);
    }

    @VisibleForTesting
    protected void registerCoordinator() {
        String coordinators = this.sparkConf.get(RssSparkConfig.RSS_COORDINATOR_QUORUM.key());
        LOG.info("Start Registering coordinators {}", (Object)coordinators);
        this.shuffleWriteClient.registerCoordinators(coordinators, (Long)this.sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX), (Integer)this.sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_MAX));
    }

    private synchronized void startHeartbeat() {
        this.shuffleWriteClient.registerApplicationInfo((String)this.id.get(), this.heartbeatTimeout, this.user);
        if (!this.heartbeatStarted) {
            this.heartBeatScheduledExecutorService.scheduleAtFixedRate(() -> {
                try {
                    String appId = (String)this.id.get();
                    this.shuffleWriteClient.sendAppHeartbeat(appId, this.heartbeatTimeout);
                    LOG.info("Finish send heartbeat to coordinator and servers");
                }
                catch (Exception e) {
                    LOG.warn("Fail to send heartbeat to coordinator and servers", (Throwable)e);
                }
            }, this.heartbeatInterval / 2L, this.heartbeatInterval, TimeUnit.MILLISECONDS);
            this.heartbeatStarted = true;
        }
    }

    public Set<Long> getFailedBlockIds(String taskId) {
        FailedBlockSendTracker blockIdsFailedSendTracker = this.getBlockIdsFailedSendTracker(taskId);
        if (blockIdsFailedSendTracker == null) {
            return Collections.emptySet();
        }
        return blockIdsFailedSendTracker.getFailedBlockIds();
    }

    public Set<Long> getSuccessBlockIds(String taskId) {
        Set<Long> result = this.taskToSuccessBlockIds.get(taskId);
        if (result == null) {
            result = Collections.emptySet();
        }
        return result;
    }

    @Override
    public String getAppId() {
        return (String)this.id.get();
    }

    @Override
    public int getPartitionNum(int shuffleId) {
        return this.shuffleIdToPartitionNum.getOrDefault(shuffleId, 0);
    }

    @Override
    public int getNumMaps(int shuffleId) {
        return this.shuffleIdToNumMapTasks.getOrDefault(shuffleId, 0);
    }

    @VisibleForTesting
    public void setAppId(String appId) {
        this.id = new AtomicReference<String>(appId);
    }

    public boolean markFailedTask(String taskId) {
        LOG.info("Mark the task: {} failed.", (Object)taskId);
        this.failedTaskIds.add(taskId);
        return true;
    }

    public boolean isValidTask(String taskId) {
        return !this.failedTaskIds.contains(taskId);
    }

    private Roaring64NavigableMap getShuffleResultForMultiPart(String clientType, Map<ShuffleServerInfo, Set<Integer>> serverToPartitions, String appId, int shuffleId, int stageAttemptId, PartitionDataReplicaRequirementTracking replicaRequirementTracking) {
        HashSet<Integer> failedPartitions = Sets.newHashSet();
        try {
            return this.shuffleWriteClient.getShuffleResultForMultiPart(clientType, serverToPartitions, appId, shuffleId, failedPartitions, replicaRequirementTracking);
        }
        catch (RssFetchFailedException e) {
            throw RssSparkShuffleUtils.reportRssFetchFailedException(this.managerClientSupplier, e, this.sparkConf, appId, shuffleId, stageAttemptId, failedPartitions);
        }
    }

    public FailedBlockSendTracker getBlockIdsFailedSendTracker(String taskId) {
        return this.taskToFailedBlockSendTracker.get(taskId);
    }

    @VisibleForTesting
    public void setDataPusher(DataPusher dataPusher) {
        this.dataPusher = dataPusher;
    }

    @VisibleForTesting
    public Map<String, Set<Long>> getTaskToSuccessBlockIds() {
        return this.taskToSuccessBlockIds;
    }

    @VisibleForTesting
    public Map<String, FailedBlockSendTracker> getTaskToFailedBlockSendTracker() {
        return this.taskToFailedBlockSendTracker;
    }

    public static class WriteMetrics
    extends ShuffleWriteMetrics {
        private ShuffleWriteMetricsReporter reporter;

        public WriteMetrics(ShuffleWriteMetricsReporter reporter) {
            this.reporter = reporter;
        }

        public void incBytesWritten(long v) {
            this.reporter.incBytesWritten(v);
        }

        public void incRecordsWritten(long v) {
            this.reporter.incRecordsWritten(v);
        }

        public void incWriteTime(long v) {
            this.reporter.incWriteTime(v);
        }
    }

    static class ReadMetrics
    extends ShuffleReadMetrics {
        private ShuffleReadMetricsReporter reporter;

        ReadMetrics(ShuffleReadMetricsReporter reporter) {
            this.reporter = reporter;
        }

        public void incRemoteBytesRead(long v) {
            this.reporter.incRemoteBytesRead(v);
        }

        public void incFetchWaitTime(long v) {
            this.reporter.incFetchWaitTime(v);
        }

        public void incRecordsRead(long v) {
            this.reporter.incRecordsRead(v);
        }
    }
}

