package org.apache.spark.shuffle.writer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.spark.Aggregator;
import org.apache.spark.Partitioner;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.shuffle.RssShuffleHandle;
import org.apache.spark.shuffle.RssShuffleManager;
import org.apache.spark.shuffle.RssSparkConfig;
import org.apache.spark.shuffle.RssSparkShuffleUtils;
import org.apache.spark.shuffle.ShuffleWriter;
import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;
import org.apache.spark.shuffle.handle.ShuffleHandleInfo;
import org.apache.spark.storage.BlockManagerId;
import org.apache.uniffle.client.api.ShuffleManagerClient;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.impl.FailedBlockSendTracker;
import org.apache.uniffle.client.impl.TrackingBlockStatus;
import org.apache.uniffle.client.request.RssReassignOnBlockSendFailureRequest;
import org.apache.uniffle.client.request.RssReassignServersRequest;
import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
import org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
import org.apache.uniffle.common.ReceivingFailureServer;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.exception.RssSendFailedException;
import org.apache.uniffle.common.exception.RssWaitFailedException;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.uniffle.shaded.com.google.common.collect.Lists;
import org.apache.uniffle.shaded.com.google.common.collect.Maps;
import org.apache.uniffle.shaded.com.google.common.collect.Sets;
import org.apache.uniffle.shaded.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.uniffle.shaded.org.apache.commons.collections4.CollectionUtils;
import org.apache.uniffle.storage.util.StorageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.Option;
import scala.Product2;
import scala.Tuple2;
import scala.collection.Iterator;

/**
 * Spark {@link ShuffleWriter} that buffers incoming records through a {@link WriteBufferManager},
 * hands the resulting {@link ShuffleBlockInfo} blocks to {@link RssShuffleManager#sendData},
 * waits until the blocks are acknowledged, and finally reports the written block ids through the
 * {@link ShuffleWriteClient}.
 *
 * <p>NOTE(review): this source looks like decompiler output (it was loaded from
 * org/apache/spark/shuffle/writer/RssShuffleWriter.class); several locals and parameters carry
 * synthetic names such as {@code str}, {@code i} and {@code j}.
 */
public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
    // Placeholder location values for the BlockManagerId built in stop().
    private static final String DUMMY_HOST = "dummy_host";
    private static final int DUMMY_PORT = 99999;
    // Fallback failure text used by getFirstBlockFailure() when no per-block status is available.
    public static final String DEFAULT_ERROR_MESSAGE = "Default Error Message";
    // Identity of the application / shuffle this writer serves; passed to the shuffle write client.
    private final String appId;
    private final int shuffleId;
    // Server assignment the writer was created with; seeds taskAttemptAssignment.
    private final ShuffleHandleInfo shuffleHandleInfo;
    // Buffers records and turns them into ShuffleBlockInfo blocks.
    private WriteBufferManager bufferManager;
    // Task identifier used as the key for per-task state kept by the shuffle manager.
    private String taskId;
    // Taken from the shuffle handle; forwarded to ShuffleWriteClient.sendCommit.
    private final int numMaps;
    private final ShuffleDependency<K, V, C> shuffleDependency;
    private final Partitioner partitioner;
    private final RssShuffleManager shuffleManager;
    // True when the partitioner has more than one partition; otherwise every record goes to partition 0.
    private final boolean shouldPartition;
    // Upper bound (ms) that checkBlockSendResult waits for outstanding blocks.
    private final long sendCheckTimeout;
    // Read from the Spark conf in the constructor; not referenced by any other code visible in this file.
    private final long sendCheckInterval;
    // Forwarded to ShuffleWriteClient.reportShuffleResult.
    private final int bitmapSplitNum;
    // server -> partition id -> ids of the blocks addressed to that server for that partition.
    private Map<ShuffleServerInfo, Map<Integer, Set<Long>>> serverToPartitionToBlockIds;
    private final ShuffleWriteClient shuffleWriteClient;
    // Servers taken from the initial handle info; target of sendCommit().
    private final Set<ShuffleServerInfo> shuffleServersForData;
    // Sum of ShuffleBlockInfo.getLength() per partition id; reported in the MapStatus.
    private final long[] partitionLengths;
    // When false, writeImpl() additionally issues sendCommit().
    protected final boolean isMemoryShuffleEnabled;
    // Invoked with the task id when write() fails.
    private final Function<String, Boolean> taskFailureCallback;
    // Ids of all blocks passed through processShuffleBlockInfos() (concurrent set).
    private final Set<Long> blockIds;
    private TaskContext taskContext;
    private SparkConf sparkConf;
    // Whether failed blocks are re-sent after reassignment instead of failing the task right away.
    private boolean blockFailSentRetryEnabled;
    // Retry limit read from configuration; NOTE(review): its only consumer appears to be the
    // non-decompiled collectFailedBlocksToResend() — confirm against the original source.
    private int blockFailSentRetryMaxTimes;
    protected final long taskAttemptId;
    protected final ShuffleWriteMetrics shuffleWriteMetrics;
    // Receives one token per finished AddBlockEvent so checkBlockSendResult() can wake up early.
    private final BlockingQueue<Object> finishEventQueue;
    // Per-task view of the partition -> server assignment; updated after a reassignment.
    private TaskAttemptAssignment taskAttemptAssignment;
    private final Supplier<ShuffleManagerClient> managerClientSupplier;
    private static final Logger LOG = LoggerFactory.getLogger(RssShuffleWriter.class);
    // NOTE(review): not referenced by any recovered code in this file; presumably consulted by
    // collectFailedBlocksToResend() to fail fast instead of re-sending — verify.
    private static final Set<StatusCode> STATUS_CODE_WITHOUT_BLOCK_RESEND = Sets.newHashSet(StatusCode.NO_REGISTER);

    /**
     * Test-oriented constructor that accepts a ready-made {@link WriteBufferManager} and an
     * explicit {@link ShuffleHandleInfo}. The task-failure callback is a no-op that always
     * answers {@code true}.
     *
     * @param str application id
     * @param i shuffle id
     * @param str2 task id
     * @param j task attempt id
     * @param writeBufferManager buffer manager the writer should use as-is
     * @param shuffleHandleInfo initial partition-to-server assignment
     */
    @VisibleForTesting
    public RssShuffleWriter(
            String str,
            int i,
            String str2,
            long j,
            WriteBufferManager writeBufferManager,
            ShuffleWriteMetrics shuffleWriteMetrics,
            RssShuffleManager rssShuffleManager,
            SparkConf sparkConf,
            ShuffleWriteClient shuffleWriteClient,
            Supplier<ShuffleManagerClient> supplier,
            RssShuffleHandle<K, V, C> rssShuffleHandle,
            ShuffleHandleInfo shuffleHandleInfo,
            TaskContext taskContext) {
        this(
                str,
                i,
                str2,
                j,
                shuffleWriteMetrics,
                rssShuffleManager,
                sparkConf,
                shuffleWriteClient,
                supplier,
                rssShuffleHandle,
                (Function<String, Boolean>) ignoredTaskId -> true,
                shuffleHandleInfo,
                taskContext);
        this.taskAttemptAssignment = new TaskAttemptAssignment(j, shuffleHandleInfo);
        this.bufferManager = writeBufferManager;
    }

    private RssShuffleWriter(String str, int i, String str2, long j, ShuffleWriteMetrics shuffleWriteMetrics, RssShuffleManager rssShuffleManager, SparkConf sparkConf, ShuffleWriteClient shuffleWriteClient, Supplier<ShuffleManagerClient> supplier, RssShuffleHandle<K, V, C> rssShuffleHandle, Function<String, Boolean> function, ShuffleHandleInfo shuffleHandleInfo, TaskContext taskContext) {
        this.blockIds = Sets.newConcurrentHashSet();
        this.blockFailSentRetryMaxTimes = 1;
        this.finishEventQueue = new LinkedBlockingQueue();
        LOG.info("RssShuffle start write taskAttemptId[{}] data with RssHandle[appId {}, shuffleId {}].", new Object[]{Long.valueOf(j), rssShuffleHandle.getAppId(), Integer.valueOf(rssShuffleHandle.getShuffleId())});
        this.shuffleManager = rssShuffleManager;
        this.appId = str;
        this.shuffleId = i;
        this.taskId = str2;
        this.taskAttemptId = j;
        this.numMaps = rssShuffleHandle.getNumMaps();
        this.shuffleWriteMetrics = shuffleWriteMetrics;
        this.shuffleDependency = rssShuffleHandle.getDependency();
        this.partitioner = this.shuffleDependency.partitioner();
        this.shouldPartition = this.partitioner.numPartitions() > 1;
        this.sendCheckTimeout = ((Long) sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS)).longValue();
        this.sendCheckInterval = ((Long) sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS)).longValue();
        this.bitmapSplitNum = ((Integer) sparkConf.get(RssSparkConfig.RSS_CLIENT_BITMAP_SPLIT_NUM)).intValue();
        this.serverToPartitionToBlockIds = Maps.newHashMap();
        this.shuffleWriteClient = shuffleWriteClient;
        this.shuffleServersForData = shuffleHandleInfo.getServers();
        this.partitionLengths = new long[this.partitioner.numPartitions()];
        Arrays.fill(this.partitionLengths, 0L);
        this.isMemoryShuffleEnabled = isMemoryShuffleEnabled(sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key()));
        this.taskFailureCallback = function;
        this.shuffleHandleInfo = shuffleHandleInfo;
        this.taskContext = taskContext;
        this.sparkConf = sparkConf;
        this.managerClientSupplier = supplier;
        this.blockFailSentRetryEnabled = sparkConf.getBoolean("spark." + RssClientConf.RSS_CLIENT_REASSIGN_ENABLED.key(), RssClientConf.RSS_CLIENT_REASSIGN_ENABLED.defaultValue().booleanValue());
        this.blockFailSentRetryMaxTimes = ((Integer) RssSparkConfig.toRssConf(sparkConf).get(RssSparkConfig.RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES)).intValue();
    }

    public RssShuffleWriter(String str, int i, String str2, long j, ShuffleWriteMetrics shuffleWriteMetrics, RssShuffleManager rssShuffleManager, SparkConf sparkConf, ShuffleWriteClient shuffleWriteClient, Supplier<ShuffleManagerClient> supplier, RssShuffleHandle<K, V, C> rssShuffleHandle, Function<String, Boolean> function, TaskContext taskContext) {
        this(str, i, str2, j, shuffleWriteMetrics, rssShuffleManager, sparkConf, shuffleWriteClient, supplier, rssShuffleHandle, function, rssShuffleManager.getShuffleHandleInfo(rssShuffleHandle), taskContext);
        this.bufferManager = new WriteBufferManager(i, str2, j, new BufferManagerOptions(sparkConf), rssShuffleHandle.getDependency().serializer(), taskContext.taskMemoryManager(), shuffleWriteMetrics, RssSparkConfig.toRssConf(sparkConf), (Function<List<ShuffleBlockInfo>, List<CompletableFuture<Long>>>) this::processShuffleBlockInfos, (Function<Integer, List<ShuffleServerInfo>>) (v1) -> {
            return getPartitionAssignedServers(v1);
        });
        this.taskAttemptAssignment = new TaskAttemptAssignment(j, this.shuffleHandleInfo);
    }

    /**
     * Returns the servers currently assigned to the given partition, as held by this task's
     * {@link TaskAttemptAssignment}.
     *
     * @param i partition id
     * @return whatever {@code taskAttemptAssignment.retrieve(i)} yields for that partition
     */
    @VisibleForTesting
    protected List<ShuffleServerInfo> getPartitionAssignedServers(int i) {
        return this.taskAttemptAssignment.retrieve(i);
    }

    /**
     * Tells whether the configured storage type includes memory.
     *
     * @param str name of a {@link StorageType} constant
     * @return result of {@code StorageType.withMemory} for the parsed storage type
     */
    private boolean isMemoryShuffleEnabled(String str) {
        StorageType configuredType = StorageType.valueOf(str);
        return StorageType.withMemory(configuredType);
    }

    /**
     * Writes all records of the task. On any failure the task-failure callback is invoked with the
     * task id; then, depending on the stage-resubmit setting, the exception is either rethrown
     * unchanged or routed through {@code throwFetchFailedIfNecessary}.
     *
     * @param iterator records to write
     */
    public void write(Iterator<Product2<K, V>> iterator) {
        try {
            writeImpl(iterator);
        } catch (Exception e) {
            this.taskFailureCallback.apply(this.taskId);
            boolean resubmitStageOnWriteFailure =
                    ((Boolean) RssSparkConfig.toRssConf(this.sparkConf).get(RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED)).booleanValue();
            if (resubmitStageOnWriteFailure) {
                throwFetchFailedIfNecessary(e);
            } else {
                throw e;
            }
        }
    }

    /**
     * Core write path: optionally applies map-side combine, feeds every record into the buffer
     * manager, flushes the remaining buffers, verifies that nothing was lost, waits for all
     * blocks to be acknowledged and, when memory shuffle is disabled, sends the commit.
     *
     * @param iterator records to write
     */
    protected void writeImpl(Iterator<Product2<K, V>> iterator) {
        Iterator<Product2<K, V>> records = iterator;
        if (this.shuffleDependency.mapSideCombine()) {
            boolean combineOnMapSide =
                    ((Boolean) RssSparkConfig.toRssConf(this.sparkConf).get(RssSparkConfig.RSS_CLIENT_MAP_SIDE_COMBINE_ENABLED)).booleanValue();
            Aggregator aggregator = (Aggregator) this.shuffleDependency.aggregator().get();
            if (combineOnMapSide) {
                records = aggregator.combineValuesByKey(iterator, this.taskContext);
            } else {
                // Combine is not performed here; each value is only converted to its combiner form.
                Function1 createCombiner = aggregator.createCombiner();
                records = iterator.map(product2 -> new Tuple2(product2._1(), createCombiner.apply(product2._2())));
            }
        }

        long recordCount = 0;
        while (records.hasNext()) {
            recordCount++;
            checkDataIfAnyFailure();
            Product2 record = (Product2) records.next();
            int partition = getPartition(record._1());
            List<ShuffleBlockInfo> readyBlocks = this.bufferManager.addRecord(partition, record._1(), record._2());
            if (readyBlocks != null && !readyBlocks.isEmpty()) {
                processShuffleBlockInfos(readyBlocks);
            }
        }

        // Flush whatever is still buffered.
        final long flushStart = System.currentTimeMillis();
        List<ShuffleBlockInfo> remainingBlocks = this.bufferManager.clear(1.0d);
        if (remainingBlocks != null && !remainingBlocks.isEmpty()) {
            processShuffleBlockInfos(remainingBlocks);
        }

        // Consistency checks, then wait for the outstanding blocks (on a copy of the id set).
        final long checkStart = System.currentTimeMillis();
        checkAllBufferSpilled();
        checkSentRecordCount(recordCount);
        checkBlockSendResult(new HashSet<>(this.blockIds));
        checkSentBlockCount();
        final long commitStart = System.currentTimeMillis();
        final long checkDuration = commitStart - checkStart;

        if (!this.isMemoryShuffleEnabled) {
            sendCommit();
        }

        long writeDurationMs = this.bufferManager.getWriteTime() + (System.currentTimeMillis() - flushStart);
        this.shuffleWriteMetrics.incWriteTime(TimeUnit.MILLISECONDS.toNanos(writeDurationMs));
        LOG.info("Finish write shuffle for appId[" + this.appId + "], shuffleId[" + this.shuffleId + "], taskId[" + this.taskId + "] with write " + writeDurationMs + " ms, include checkSendResult[" + checkDuration + "], commit[" + (System.currentTimeMillis() - commitStart) + "], " + this.bufferManager.getManagerCostInfo());
    }

    /**
     * Fails the write if the buffer manager still holds any buffer after the final flush.
     *
     * @throws RssSendFailedException when at least one buffer remains
     */
    private void checkAllBufferSpilled() {
        int remainingBuffers = this.bufferManager.getBuffers().size();
        if (remainingBuffers <= 0) {
            return;
        }
        throw new RssSendFailedException("Potential data loss due to existing remaining data buffers that are not flushed. This should not happen.");
    }

    /**
     * Verifies that the buffer manager saw exactly as many records as the writer iterated over.
     *
     * @param j number of records consumed from the input iterator
     * @throws RssSendFailedException when the two counts differ
     */
    private void checkSentRecordCount(long j) {
        long bufferedRecords = this.bufferManager.getRecordCount();
        if (j == bufferedRecords) {
            return;
        }
        throw new RssSendFailedException("Potential record loss may have occurred while preparing to send blocks for task[" + this.taskId + "]");
    }

    /**
     * Cross-checks three block counts that must agree: the ids recorded by this writer, the
     * distinct ids tracked per server/partition, and the block count of the buffer manager.
     *
     * @throws RssException when the server tracking map is {@code null}
     * @throws RssSendFailedException when the counts disagree
     */
    private void checkSentBlockCount() {
        long expected = this.blockIds.size();
        long bufferManagerTracked = this.bufferManager.getBlockCount();
        if (this.serverToPartitionToBlockIds == null) {
            throw new RssException("serverToPartitionToBlockIds should not be null");
        }

        // The same block id can appear under several servers, so de-duplicate before counting.
        Set<Long> distinctServerTrackedIds = new HashSet<>();
        for (Map<Integer, Set<Long>> partitionToBlockIds : this.serverToPartitionToBlockIds.values()) {
            for (Set<Long> ids : partitionToBlockIds.values()) {
                distinctServerTrackedIds.addAll(ids);
            }
        }
        long serverTracked = distinctServerTrackedIds.size();

        if (expected == serverTracked && expected == bufferManagerTracked) {
            return;
        }
        throw new RssSendFailedException("Potential block loss may occur for task[" + this.taskId + "]. BlockId number expected: " + expected + ", serverTracked: " + serverTracked + ", bufferManagerTracked: " + bufferManagerTracked);
    }

    /**
     * Always returns a fresh empty array; the per-partition lengths accumulated in
     * {@code partitionLengths} are not exposed here (they are reported through the MapStatus
     * built in {@code stop}).
     *
     * @return a new zero-length array
     */
    public long[] getPartitionLengths() {
        return new long[0];
    }

    /**
     * Registers the given blocks in the writer's bookkeeping (block id set, per-server /
     * per-partition block ids, per-partition byte lengths) and then posts them for sending.
     *
     * <p>Fix: the nested lambda previously re-declared the enclosing lambda's parameter name
     * ({@code shuffleServerInfo}), which Java rejects at compile time. The bookkeeping is now
     * written with plain loops and uniquely named lambda parameters.
     *
     * @param list blocks to register and send; may be {@code null} or empty
     * @return one future per posted block event, or an empty list when there is nothing to send
     */
    @VisibleForTesting
    protected List<CompletableFuture<Long>> processShuffleBlockInfos(List<ShuffleBlockInfo> list) {
        if (list == null || list.isEmpty()) {
            return Collections.emptyList();
        }
        for (ShuffleBlockInfo block : list) {
            long blockId = block.getBlockId();
            int partitionId = block.getPartitionId();
            this.blockIds.add(blockId);
            // A block may be addressed to several servers; track its id under each of them.
            for (ShuffleServerInfo server : block.getShuffleServerInfos()) {
                this.serverToPartitionToBlockIds
                        .computeIfAbsent(server, unusedServer -> Maps.newHashMap())
                        .computeIfAbsent(partitionId, unusedPartition -> Sets.newHashSet())
                        .add(blockId);
            }
            this.partitionLengths[partitionId] += block.getLength();
        }
        return postBlockEvent(list);
    }

    /**
     * Groups the blocks into {@link AddBlockEvent}s via the buffer manager and submits each event
     * to the shuffle manager.
     *
     * <p>When resend-on-failure is enabled, every block gets a completion callback that releases
     * its buffer resources once the callback is invoked with {@code true}. Every event gets a
     * callback that drops a token into {@code finishEventQueue} so that a waiting
     * {@code checkBlockSendResult} wakes up.
     *
     * @param list blocks to send
     * @return the futures returned by {@code shuffleManager.sendData}, one per event
     */
    protected List<CompletableFuture<Long>> postBlockEvent(List<ShuffleBlockInfo> list) {
        List<CompletableFuture<Long>> sendFutures = new ArrayList<>();
        for (AddBlockEvent event : this.bufferManager.buildBlockEvents(list)) {
            if (this.blockFailSentRetryEnabled) {
                for (ShuffleBlockInfo block : event.getShuffleDataInfoList()) {
                    block.withCompletionCallback((completedBlock, succeeded) -> {
                        if (succeeded) {
                            this.bufferManager.releaseBlockResource(completedBlock);
                        }
                    });
                }
            }
            event.addCallback(() -> {
                boolean queued = this.finishEventQueue.add(new Object());
                if (!queued) {
                    LOG.error("Add event " + event + " to finishEventQueue fail");
                }
            });
            sendFutures.add(this.shuffleManager.sendData(event));
        }
        return sendFutures;
    }

    /**
     * Waits until every block recorded by this writer has been acknowledged.
     *
     * <p>{@code checkBlockSendResult} removes acknowledged ids from the set it is given. Passing
     * the live {@code blockIds} set would therefore drain the writer's own bookkeeping and make a
     * later {@code checkSentBlockCount} fail, so a snapshot is passed instead, exactly as
     * {@code writeImpl} does.
     */
    protected void internalCheckBlockSendResult() {
        checkBlockSendResult(new HashSet<>(this.blockIds));
    }

    /**
     * Blocks until all given block ids are reported as successfully sent, a failure is detected,
     * or no send event finishes before the {@code sendCheckTimeout} deadline.
     *
     * <p>Fix: the timed {@code poll} on {@code finishEventQueue} is the only call here that can
     * throw {@link InterruptedException}, but it was located outside the {@code try} that catches
     * it. That does not compile and left the interrupt handling dead. The wait now lives inside
     * the {@code try}; an interrupt is remembered, waiting continues, and the interrupt flag is
     * restored before returning.
     *
     * @param set ids still awaited; acknowledged ids are REMOVED from this set, so callers that
     *     need to keep their own set intact must pass a copy
     * @throws RssWaitFailedException when some blocks are still outstanding after the wait ends
     */
    @VisibleForTesting
    protected void checkBlockSendResult(Set<Long> set) {
        boolean interrupted = false;
        try {
            final long deadline = System.currentTimeMillis() + this.sendCheckTimeout;
            while (true) {
                try {
                    // Drop stale wake-up tokens; state is re-read from the shuffle manager below.
                    this.finishEventQueue.clear();
                    checkDataIfAnyFailure();
                    set.removeAll(this.shuffleManager.getSuccessBlockIds(this.taskId));
                    if (set.isEmpty()) {
                        break;
                    }
                    // Only sleep when no event finished while we were checking.
                    if (this.finishEventQueue.isEmpty()) {
                        long remainingMs = Math.max(deadline - System.currentTimeMillis(), 0L);
                        Object finished = this.finishEventQueue.poll(remainingMs, TimeUnit.MILLISECONDS);
                        if (finished == null) {
                            // Nothing finished before the deadline.
                            break;
                        }
                    }
                } catch (InterruptedException e) {
                    // Keep waiting, but remember to restore the interrupt status on exit.
                    interrupted = true;
                }
            }
            if (!set.isEmpty()) {
                String message = "Timeout: Task[" + this.taskId + "] failed because " + set.size() + " blocks can't be sent to shuffle server in " + this.sendCheckTimeout + " ms.";
                LOG.error(message);
                throw new RssWaitFailedException(message);
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Reacts to block send failures. With resend enabled, failed blocks are collected for
     * resending; otherwise the first recorded failure aborts the write.
     *
     * @throws RssSendFailedException when resend is disabled and a block failed to send
     */
    private void checkDataIfAnyFailure() {
        if (!this.blockFailSentRetryEnabled) {
            String failure = getFirstBlockFailure();
            if (failure != null) {
                throw new RssSendFailedException("Fail to send the block. Error: " + failure);
            }
            return;
        }
        collectFailedBlocksToResend();
    }

    /**
     * Looks up the failed block ids of this task and describes the failure of one of them.
     *
     * @return {@code null} when no block failed; otherwise the status-code name of the first
     *     tracked status of one failed block, or {@link #DEFAULT_ERROR_MESSAGE} when no status
     *     is tracked for it
     */
    private String getFirstBlockFailure() {
        Set<Long> failedIds = this.shuffleManager.getFailedBlockIds(this.taskId);
        if (failedIds.isEmpty()) {
            return null;
        }
        Long sampleBlockId = failedIds.iterator().next();
        List<TrackingBlockStatus> statuses =
                this.shuffleManager.getBlockIdsFailedSendTracker(this.taskId).getFailedBlockStatus(sampleBlockId);
        String reason = CollectionUtils.isNotEmpty(statuses)
                ? statuses.get(0).getStatusCode().name()
                : DEFAULT_ERROR_MESSAGE;
        LOG.error(
                "Errors on sending blocks for task[{}]. {} blocks can't be sent to remote servers: {}",
                this.taskId,
                failedIds.size(),
                this.shuffleManager.getBlockIdsFailedSendTracker(this.taskId).getFaultyShuffleServers());
        return reason;
    }

    /*
     * WARNING: the body of this method was NOT recovered by the decompiler (JADX reported
     * "Code restructure failed: missing block: B:32:0x0094, code lost"; the skipped method dump
     * had 430 instructions). What follows is a stub that always throws, so any code path that
     * reaches it (checkDataIfAnyFailure() with blockFailSentRetryEnabled == true) fails with
     * UnsupportedOperationException.
     *
     * The only fragment the decompiler salvaged is an error log of the form
     *   "Partial blocks for taskId: [{}] retry exceeding the max retry times: [{}]. Fast fail!
     *    faulty server list: {}"
     * parameterised with taskId, blockFailSentRetryMaxTimes and a set collected from a stream,
     * followed by setting a boolean flag to true.
     *
     * NOTE(review): presumably the real implementation inspects the failed-block tracker, fails
     * fast once a block exceeds blockFailSentRetryMaxTimes, and otherwise delegates to
     * reassignOnPartitionNeedSplit(...) / reassignAndResendBlocks(...), which have no other
     * caller in this file. This must be restored from the original source or bytecode; do not
     * rely on this description.
     */
    private void collectFailedBlocksToResend() {
        // Stub emitted by the decompiler in place of the lost method body.
        throw new UnsupportedOperationException("Method not decompiled: org.apache.spark.shuffle.writer.RssShuffleWriter.collectFailedBlocksToResend():void");
    }

    /**
     * Drains the partitions tracked by the given tracker and, if there are any, requests a
     * reassignment for them with the partition-split flag set.
     *
     * <p>Each drained entry is reported as a {@link ReceivingFailureServer} carrying
     * {@link StatusCode#SUCCESS}.
     *
     * @param failedBlockSendTracker tracker whose tracked partitions are removed and processed
     */
    private void reassignOnPartitionNeedSplit(FailedBlockSendTracker failedBlockSendTracker) {
        Map<Integer, List<ReceivingFailureServer>> partitionToServers = new HashMap<>();
        failedBlockSendTracker.removeAllTrackedPartitions().forEach(tracked -> {
            String serverId = tracked.getShuffleServerInfo().getId();
            partitionToServers
                    .computeIfAbsent(tracked.getPartitionId(), unusedPartition -> new ArrayList<>())
                    .add(new ReceivingFailureServer(serverId, StatusCode.SUCCESS));
        });
        if (!partitionToServers.isEmpty()) {
            doReassignOnBlockSendFailure(partitionToServers, true);
        }
    }

    /**
     * Asks the shuffle manager (via the manager client) for a new server assignment for the given
     * partitions and applies the returned handle to this task's assignment.
     *
     * @param map partition id to the servers that failed for that partition
     * @param z flag forwarded as the last argument of the reassign request (callers pass
     *     {@code true} for the partition-split path and {@code false} for block send failures)
     * @throws RssException when the request fails or returns a non-success status; any exception
     *     raised inside is wrapped
     */
    private void doReassignOnBlockSendFailure(Map<Integer, List<ReceivingFailureServer>> map, boolean z) {
        LOG.info("Initiate reassignOnBlockSendFailure. failure partition servers: {}", map);
        try {
            ShuffleManagerClient managerClient = this.managerClientSupplier.get();
            RssReassignOnBlockSendFailureRequest request = new RssReassignOnBlockSendFailureRequest(
                    this.shuffleId,
                    map,
                    SparkEnv.get().executorId(),
                    this.taskContext.taskAttemptId(),
                    this.taskContext.stageId(),
                    this.taskContext.stageAttemptNumber(),
                    z);
            RssReassignOnBlockSendFailureResponse response = managerClient.reassignOnBlockSendFailure(request);
            if (response.getStatusCode() != StatusCode.SUCCESS) {
                String detail = String.format("Reassign failed. statusCode: %s, msg: %s", response.getStatusCode(), response.getMessage());
                throw new RssException(detail);
            }
            MutableShuffleHandleInfo latestHandle = MutableShuffleHandleInfo.fromProto(response.getHandle());
            this.taskAttemptAssignment.update(latestHandle);
            LOG.info("Success to reassign. The latest available assignment is {}", latestHandle.getAvailablePartitionServersForWriter());
        } catch (Exception e) {
            throw new RssException("Errors on reassign on block send failure. failure partition->servers : " + map, e);
        }
    }

    /**
     * Requests replacement servers for the partitions of the given failed blocks and re-submits
     * those blocks to the newly assigned servers.
     *
     * <p>Fix: the previous body used raw {@code Map}/{@code HashMap} types and iterated them as
     * {@code Map.Entry<K, V>} (the class's own type parameters), which neither compiles nor
     * reflects the real element types. The collections are now properly typed and the
     * "first failure per server" lookup is a single {@code toMap} that keeps the first status.
     *
     * @param set failed block statuses to resend
     * @throws RssException when reassignment fails or a block's failed server is still the
     *     first assigned server of its partition afterwards (no replacement available)
     */
    private void reassignAndResendBlocks(Set<TrackingBlockStatus> set) {
        Map<Integer, List<TrackingBlockStatus>> failedByPartition = set.stream()
                .collect(Collectors.groupingBy(status -> status.getShuffleBlockInfo().getPartitionId()));

        Map<Integer, List<ReceivingFailureServer>> failurePartitionToServers = new HashMap<>();
        for (Map.Entry<Integer, List<TrackingBlockStatus>> partitionEntry : failedByPartition.entrySet()) {
            int partitionId = partitionEntry.getKey();
            // One representative failure (the first encountered) per failing server.
            Map<ShuffleServerInfo, TrackingBlockStatus> firstFailureByServer = partitionEntry.getValue().stream()
                    .collect(Collectors.toMap(
                            TrackingBlockStatus::getShuffleServerInfo,
                            Function.identity(),
                            (first, later) -> first));
            for (Map.Entry<ShuffleServerInfo, TrackingBlockStatus> serverEntry : firstFailureByServer.entrySet()) {
                String failedServerId = serverEntry.getKey().getId();
                String currentServerId = getPartitionAssignedServers(partitionId).get(0).getId();
                // Only report servers that are still the current assignment; otherwise the
                // partition has already been reassigned away from the failed server.
                if (failedServerId.equals(currentServerId)) {
                    failurePartitionToServers
                            .computeIfAbsent(partitionId, unusedPartition -> new ArrayList<>())
                            .add(new ReceivingFailureServer(failedServerId, serverEntry.getValue().getStatusCode()));
                }
            }
        }
        if (!failurePartitionToServers.isEmpty()) {
            doReassignOnBlockSendFailure(failurePartitionToServers, false);
        }

        List<ShuffleBlockInfo> resendCandidates = Lists.newArrayList();
        for (TrackingBlockStatus status : set) {
            ShuffleBlockInfo block = status.getShuffleBlockInfo();
            ShuffleServerInfo replacement = getPartitionAssignedServers(block.getPartitionId()).get(0);
            String failedServerId = status.getShuffleServerInfo().getId();
            if (failedServerId.equals(replacement.getId())) {
                throw new RssException("No available replacement server for: " + status.getShuffleServerInfo().getId());
            }
            // Undo the bookkeeping of the failed attempt before the block is registered again.
            clearFailedBlockState(block);
            block.incrRetryCnt();
            block.reassignShuffleServers(Arrays.asList(replacement));
            resendCandidates.add(block);
        }
        processShuffleBlockInfos(resendCandidates);
        LOG.info("Failed blocks have been resent to data pusher queue since reassignment has been finished successfully");
    }

    /**
     * Reverts everything {@code processShuffleBlockInfos} recorded for the given block and removes
     * it from the failed-send tracker, so the block can be registered again on resend.
     *
     * @param shuffleBlockInfo block whose failed attempt is being forgotten
     */
    private void clearFailedBlockState(ShuffleBlockInfo shuffleBlockInfo) {
        long blockId = shuffleBlockInfo.getBlockId();
        int partitionId = shuffleBlockInfo.getPartitionId();

        this.shuffleManager.getBlockIdsFailedSendTracker(this.taskId).remove(blockId);
        for (ShuffleServerInfo server : shuffleBlockInfo.getShuffleServerInfos()) {
            this.serverToPartitionToBlockIds.get(server).get(partitionId).remove(blockId);
        }
        this.partitionLengths[partitionId] -= shuffleBlockInfo.getLength();
        this.blockIds.remove(blockId);
    }

    /**
     * Sends the commit for this shuffle to the data servers on a helper thread and waits for it,
     * logging progress with an exponentially growing sleep (200 ms doubling up to 5000 ms).
     *
     * <p>Fix: the single-thread executor was not shut down when the commit reported failure or
     * threw, leaking its worker thread. Shutdown now happens in a {@code finally} block on every
     * path. The future is also properly typed.
     *
     * @throws RssException when the commit reports failure or fails with an exception (the
     *     "failed to commit" case is wrapped as well, as before)
     */
    @VisibleForTesting
    protected void sendCommit() {
        final int maxWaitMs = 5000;
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> commitResult = executor.submit(
                    () -> this.shuffleWriteClient.sendCommit(this.shuffleServersForData, this.appId, this.shuffleId, this.numMaps));
            int waitMs = 200;
            long start = System.currentTimeMillis();
            while (!commitResult.isDone()) {
                LOG.info("Wait commit to shuffle server for task[" + this.taskAttemptId + "] cost " + (System.currentTimeMillis() - start) + " ms");
                Uninterruptibles.sleepUninterruptibly(waitMs, TimeUnit.MILLISECONDS);
                waitMs = Math.min(waitMs * 2, maxWaitMs);
            }
            try {
                if (!commitResult.get()) {
                    throw new RssException("Failed to commit task to shuffle server");
                }
            } catch (InterruptedException e) {
                // Deliberately tolerated: the task is being killed, so the commit outcome is moot.
                LOG.warn("Ignore the InterruptedException which should be caused by internal killed");
            } catch (Exception e) {
                throw new RssException("Exception happened when get commit status", e);
            }
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Resolves the partition for a record key.
     *
     * @param t record key
     * @return the partitioner's answer when there is more than one partition, otherwise 0
     */
    @VisibleForTesting
    protected <T> int getPartition(T t) {
        return this.shouldPartition ? this.partitioner.getPartition(t) : 0;
    }

    /**
     * Finishes the writer. On success the written block ids are reported to the shuffle servers
     * and a {@link MapStatus} is returned; on failure an empty option is returned. Task resources
     * are released in both cases.
     *
     * <p>Fixes compared to the previous body:
     * <ul>
     *   <li>The cleanup sequence was copied three times (with unreachable branches) and is now a
     *       single helper.</li>
     *   <li>When leftover failed block ids were detected, the method returned early and skipped
     *       {@code freeAllMemory()} / {@code clearTaskMeta()}, leaking task memory and metadata.
     *       Cleanup now always runs; the returned value is unchanged.</li>
     * </ul>
     *
     * @param z whether the task succeeded
     * @return the map status on success; empty on failure, or when resend is enabled and failed
     *     block ids remain on a successful stop (in that case a pending exception is dropped, as
     *     before)
     */
    public Option<MapStatus> stop(boolean z) {
        Option<MapStatus> result;
        try {
            result = z ? reportAndBuildMapStatus() : Option.empty();
        } catch (Throwable th) {
            if (releaseWriterResources(z)) {
                return Option.empty();
            }
            throw th;
        }
        return releaseWriterResources(z) ? Option.empty() : result;
    }

    /** Reports the written block ids to the shuffle servers and builds the resulting MapStatus. */
    private Option<MapStatus> reportAndBuildMapStatus() {
        long start = System.currentTimeMillis();
        this.shuffleWriteClient.reportShuffleResult(this.serverToPartitionToBlockIds, this.appId, this.shuffleId, this.taskAttemptId, this.bitmapSplitNum);
        LOG.info("Report shuffle result for task[{}] with bitmapNum[{}] cost {} ms", this.taskAttemptId, this.bitmapSplitNum, System.currentTimeMillis() - start);
        BlockManagerId blockManagerId = BlockManagerId.apply(this.appId + "_" + this.taskId, DUMMY_HOST, DUMMY_PORT, Option.apply(Long.toString(this.taskAttemptId)));
        return Option.apply(MapStatus.apply(blockManagerId, this.partitionLengths, this.taskAttemptId));
    }

    /**
     * Releases per-task resources; returns true when a successful stop still has failed block ids
     * left, in which case the caller must answer with an empty option.
     */
    private boolean releaseWriterResources(boolean success) {
        try {
            if (this.blockFailSentRetryEnabled) {
                if (!success) {
                    this.shuffleManager.getBlockIdsFailedSendTracker(this.taskId).clearAndReleaseBlockResources();
                } else if (CollectionUtils.isNotEmpty(this.shuffleManager.getFailedBlockIds(this.taskId))) {
                    LOG.error("Errors on stopping writer due to the remaining failed blockIds. This should not happen.");
                    return true;
                }
            }
            return false;
        } finally {
            // Always free buffers and task metadata, even when the checks above bail out or throw.
            if (this.bufferManager != null) {
                this.bufferManager.freeAllMemory();
            }
            if (this.shuffleManager != null) {
                this.shuffleManager.clearTaskMeta(this.taskId);
            }
        }
    }

    /**
     * Flattens the per-server tracking map into partition id to block ids.
     *
     * <p>When a partition appears under several servers, the sets are unioned into a new
     * {@link HashSet}; a partition seen only once maps to the tracked set itself (not a copy).
     *
     * @return partition id to the block ids tracked for it across all servers
     */
    @VisibleForTesting
    Map<Integer, Set<Long>> getPartitionToBlockIds() {
        Map<Integer, Set<Long>> partitionToBlockIds = new HashMap<>();
        for (Map<Integer, Set<Long>> perServer : this.serverToPartitionToBlockIds.values()) {
            for (Map.Entry<Integer, Set<Long>> entry : perServer.entrySet()) {
                partitionToBlockIds.merge(entry.getKey(), entry.getValue(), (existing, incoming) -> {
                    Set<Long> union = new HashSet<>(existing);
                    union.addAll(incoming);
                    return union;
                });
            }
        }
        return partitionToBlockIds;
    }

    /** Returns the buffer manager this writer feeds records into. */
    @VisibleForTesting
    public WriteBufferManager getBufferManager() {
        return this.bufferManager;
    }

    /**
     * Translates a write failure into the exception the task should fail with. This method never
     * returns normally.
     *
     * <p>For {@link RssSendFailedException} the failure and the faulty servers are reported to the
     * shuffle manager. If it answers that the whole stage must be resubmitted, a reassignment for
     * the stage is requested and a fetch-failed exception (wrapped in {@link RssException}) is
     * thrown. In every other case the original exception is wrapped in {@link RssException}.
     *
     * @param exc the failure raised while writing
     */
    private void throwFetchFailedIfNecessary(Exception exc) {
        if (!(exc instanceof RssSendFailedException)) {
            throw new RssException(exc);
        }

        ShuffleManagerClient reportClient = this.managerClientSupplier.get();
        RssReportShuffleWriteFailureRequest failureReport = new RssReportShuffleWriteFailureRequest(
                this.appId,
                this.shuffleId,
                this.taskContext.stageAttemptNumber(),
                Lists.newArrayList(this.shuffleManager.getBlockIdsFailedSendTracker(this.taskId).getFaultyShuffleServers()),
                exc.getMessage());
        boolean resubmitWholeStage = reportClient.reportShuffleWriteFailure(failureReport).getReSubmitWholeStage();
        if (!resubmitWholeStage) {
            throw new RssException(exc);
        }

        ShuffleManagerClient reassignClient = this.managerClientSupplier.get();
        RssReassignServersRequest reassignRequest = new RssReassignServersRequest(
                this.taskContext.stageId(),
                this.taskContext.stageAttemptNumber(),
                this.shuffleId,
                this.partitioner.numPartitions());
        LOG.info("Whether the reassignment is successful: {}", Boolean.valueOf(reassignClient.reassignOnStageResubmit(reassignRequest).isNeedReassign()));
        throw new RssException((Throwable) RssSparkShuffleUtils.createFetchFailedException(this.shuffleId, -1, this.taskContext.stageAttemptNumber(), exc));
    }

    /** Test hook: switches on resending of failed blocks regardless of the configured value. */
    @VisibleForTesting
    protected void enableBlockFailSentRetry() {
        this.blockFailSentRetryEnabled = true;
    }

    /**
     * Test hook: overrides the retry limit read from configuration.
     *
     * @param i new value for {@code blockFailSentRetryMaxTimes}
     */
    @VisibleForTesting
    protected void setBlockFailSentRetryMaxTimes(int i) {
        this.blockFailSentRetryMaxTimes = i;
    }

    /**
     * Test hook: replaces the task id used to key per-task state in the shuffle manager.
     *
     * @param str new task id
     */
    @VisibleForTesting
    protected void setTaskId(String str) {
        this.taskId = str;
    }

    /**
     * Returns the live server to partition to block-id tracking map (not a copy), so changes made
     * by the caller are visible to the writer.
     */
    @VisibleForTesting
    protected Map<ShuffleServerInfo, Map<Integer, Set<Long>>> getServerToPartitionToBlockIds() {
        return this.serverToPartitionToBlockIds;
    }

    /** Returns the shuffle manager this writer sends blocks through. */
    @VisibleForTesting
    protected RssShuffleManager getShuffleManager() {
        return this.shuffleManager;
    }

    /** Returns this task attempt's current partition-to-server assignment holder. */
    public TaskAttemptAssignment getTaskAttemptAssignment() {
        return this.taskAttemptAssignment;
    }
}
