package org.apache.spark.shuffle.writer;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.impl.FailedBlockSendTracker;
import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.shaded.com.google.common.collect.Queues;
import org.apache.uniffle.shaded.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/spark/shuffle/writer/DataPusher.class */
/**
 * Asynchronously pushes shuffle blocks through a {@link ShuffleWriteClient} using a dedicated
 * thread pool, and records the per-task outcome of every send.
 *
 * <p>For each {@link AddBlockEvent} passed to {@link #send(AddBlockEvent)} the pusher:
 * <ol>
 *   <li>sends the event's blocks with the configured application id,</li>
 *   <li>merges the successful block ids and the failed-block tracker into the maps supplied
 *       at construction time, keyed by task id,</li>
 *   <li>runs each block's completion callback and then the event's processed-callback chain,
 *       whether or not the send succeeded.</li>
 * </ol>
 *
 * <p>The bookkeeping maps and the failed-task set are owned by the caller; this class only
 * reads and updates them.
 */
public class DataPusher implements Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger(DataPusher.class);

    /** Timeout handed to {@code ThreadUtils.shutdownThreadPool} on {@link #close()}. */
    private static final int SHUTDOWN_WAIT_TIME = 5;

    private final ExecutorService executorService;
    private final ShuffleWriteClient shuffleWriteClient;
    // taskId -> ids of the blocks that were sent successfully.
    private final Map<String, Set<Long>> taskToSuccessBlockIds;
    // taskId -> tracker of the blocks that failed to send. Visibility is intentionally left
    // package-private (as in the original) so same-package code keeps compiling.
    Map<String, FailedBlockSendTracker> taskToFailedBlockSendTracker;
    private String rssAppId;
    // Tasks listed here are reported to the write client as "stop sending".
    private final Set<String> failedTaskIds;

    /**
     * Creates a pusher backed by its own thread pool.
     *
     * <p>The pool is created with {@code threadPoolSize} core threads, a maximum of
     * {@code threadPoolSize * 2} and a work queue of capacity {@link Integer#MAX_VALUE}.
     *
     * @param shuffleWriteClient client used to send the blocks
     * @param taskToSuccessBlockIds caller-owned map updated with successfully sent block ids
     * @param taskToFailedBlockSendTracker caller-owned map updated with failed-send trackers
     * @param failedTaskIds caller-owned set of task ids considered failed
     * @param threadPoolSize core size of the sending thread pool
     * @param threadKeepAliveTime keep-alive time of idle pool threads, in seconds
     */
    public DataPusher(
            ShuffleWriteClient shuffleWriteClient,
            Map<String, Set<Long>> taskToSuccessBlockIds,
            Map<String, FailedBlockSendTracker> taskToFailedBlockSendTracker,
            Set<String> failedTaskIds,
            int threadPoolSize,
            int threadKeepAliveTime) {
        this.shuffleWriteClient = shuffleWriteClient;
        this.taskToSuccessBlockIds = taskToSuccessBlockIds;
        this.taskToFailedBlockSendTracker = taskToFailedBlockSendTracker;
        this.failedTaskIds = failedTaskIds;
        this.executorService = new ThreadPoolExecutor(
                threadPoolSize,
                threadPoolSize * 2,
                threadKeepAliveTime,
                TimeUnit.SECONDS,
                Queues.newLinkedBlockingQueue(Integer.MAX_VALUE),
                ThreadUtils.getThreadFactory(getClass().getName()));
    }

    /**
     * Submits the blocks of {@code addBlockEvent} for sending on the internal thread pool.
     *
     * <p>Every block's completion callback and the event's processed-callback chain are run
     * exactly once, even when the send throws; in that case all blocks are reported as not
     * sent. Failures are logged and surface as a {@code null} result rather than an
     * exceptionally completed future.
     *
     * @param addBlockEvent the event carrying the task id, the blocks and the callbacks
     * @return a future holding the sum of {@code getFreeMemory()} over the event's blocks
     *     (0 for an empty block list), or {@code null} if sending failed
     * @throws RssException if the application id has not been set via {@link #setRssAppId}
     */
    public CompletableFuture<Long> send(AddBlockEvent addBlockEvent) {
        if (this.rssAppId == null) {
            throw new RssException("RssAppId should be set.");
        }
        return CompletableFuture.supplyAsync(() -> {
            String taskId = addBlockEvent.getTaskId();
            List<ShuffleBlockInfo> blocks = addBlockEvent.getShuffleDataInfoList();
            SendShuffleDataResult result = null;
            try {
                result = this.shuffleWriteClient.sendShuffleData(
                        this.rssAppId, blocks, () -> !isValidTask(taskId));
                putBlockId(this.taskToSuccessBlockIds, taskId, result.getSuccessBlockIds());
                putFailedBlockSendTracker(
                        this.taskToFailedBlockSendTracker, taskId, result.getFailedBlockSendTracker());
            } finally {
                // 'result' is still null when sendShuffleData itself threw. Guard against it so
                // the original exception is not replaced by a NullPointerException and the
                // callbacks below still run.
                notifyCompletion(addBlockEvent, blocks, result);
            }
            // Plain summation instead of reduce(...).get(), which throws on an empty list.
            long freedMemory = 0L;
            for (ShuffleBlockInfo block : blocks) {
                freedMemory += block.getFreeMemory();
            }
            return Long.valueOf(freedMemory);
        }, this.executorService).exceptionally(th -> {
            LOGGER.error("Unexpected exceptions occurred while sending shuffle data", th);
            return null;
        });
    }

    /**
     * Runs each block's completion callback, then the event's processed-callback chain.
     *
     * @param addBlockEvent event whose processed-callback chain is executed
     * @param blocks blocks whose completion callbacks are executed
     * @param result outcome of the send, or {@code null} if the send threw before returning
     */
    private void notifyCompletion(
            AddBlockEvent addBlockEvent, List<ShuffleBlockInfo> blocks, SendShuffleDataResult result) {
        Set<Long> successBlockIds = result == null ? null : result.getSuccessBlockIds();
        if (successBlockIds == null) {
            successBlockIds = Collections.emptySet();
        }
        for (ShuffleBlockInfo block : blocks) {
            block.executeCompletionCallback(successBlockIds.contains(Long.valueOf(block.getBlockId())));
        }
        // A missing callback chain is treated as empty.
        List<Runnable> callbackChain = addBlockEvent.getProcessedCallbackChain();
        if (callbackChain != null) {
            for (Runnable callback : callbackChain) {
                callback.run();
            }
        }
    }

    /**
     * Adds {@code blockIds} to the set registered for {@code taskId}, creating the set on
     * first use. Does nothing when {@code blockIds} is {@code null} or empty.
     */
    private synchronized void putBlockId(
            Map<String, Set<Long>> taskToBlockIds, String taskId, Set<Long> blockIds) {
        if (blockIds == null || blockIds.isEmpty()) {
            return;
        }
        taskToBlockIds.computeIfAbsent(taskId, key -> Sets.newConcurrentHashSet()).addAll(blockIds);
    }

    /**
     * Merges {@code failedBlockSendTracker} into the tracker registered for {@code taskId},
     * creating the tracker on first use. Does nothing when the given tracker is {@code null}.
     */
    private synchronized void putFailedBlockSendTracker(
            Map<String, FailedBlockSendTracker> taskToTracker,
            String taskId,
            FailedBlockSendTracker failedBlockSendTracker) {
        if (failedBlockSendTracker == null) {
            return;
        }
        taskToTracker.computeIfAbsent(taskId, key -> new FailedBlockSendTracker())
                .merge(failedBlockSendTracker);
    }

    /**
     * Tells whether a task may still send data.
     *
     * @param taskId id of the task to check
     * @return {@code true} unless {@code taskId} is in the failed-task set
     */
    public boolean isValidTask(String taskId) {
        return !this.failedTaskIds.contains(taskId);
    }

    /**
     * Sets the application id used for all subsequent sends; must be called before
     * {@link #send(AddBlockEvent)}.
     *
     * @param rssAppId the application id
     */
    public void setRssAppId(String rssAppId) {
        this.rssAppId = rssAppId;
    }

    /**
     * Shuts down the sending thread pool via {@code ThreadUtils.shutdownThreadPool}.
     *
     * <p>If the calling thread is interrupted during shutdown, the error is logged and the
     * thread's interrupt status is restored so callers can still observe it.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            ThreadUtils.shutdownThreadPool(this.executorService, SHUTDOWN_WAIT_TIME);
        } catch (InterruptedException e) {
            LOGGER.error("Errors on shutdown thread pool of [{}].", getClass().getSimpleName(), e);
            // Do not swallow the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
