package org.apache.uniffle.client.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.uniffle.client.api.CoordinatorClient;
import org.apache.uniffle.client.api.ShuffleServerClient;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.factory.CoordinatorClientFactory;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
import org.apache.uniffle.client.request.RssAppHeartBeatRequest;
import org.apache.uniffle.client.request.RssApplicationInfoRequest;
import org.apache.uniffle.client.request.RssFetchClientConfRequest;
import org.apache.uniffle.client.request.RssFetchRemoteStorageRequest;
import org.apache.uniffle.client.request.RssFinishShuffleRequest;
import org.apache.uniffle.client.request.RssGetShuffleAssignmentsRequest;
import org.apache.uniffle.client.request.RssGetShuffleResultForMultiPartRequest;
import org.apache.uniffle.client.request.RssGetShuffleResultRequest;
import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.request.RssUnregisterShuffleByAppIdRequest;
import org.apache.uniffle.client.request.RssUnregisterShuffleRequest;
import org.apache.uniffle.client.response.ClientResponse;
import org.apache.uniffle.client.response.RssFetchClientConfResponse;
import org.apache.uniffle.client.response.RssFetchRemoteStorageResponse;
import org.apache.uniffle.client.response.RssFinishShuffleResponse;
import org.apache.uniffle.client.response.RssGetShuffleAssignmentsResponse;
import org.apache.uniffle.client.response.RssGetShuffleResultResponse;
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssSendCommitResponse;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.exception.RssFetchFailedException;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.BlockIdLayout;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.ThreadUtils;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/uniffle/client/impl/ShuffleWriteClientImpl.class */
public class ShuffleWriteClientImpl implements ShuffleWriteClient {
    private static final Logger LOG = LoggerFactory.getLogger(ShuffleWriteClientImpl.class);
    // Client type name; parsed with ClientType.valueOf(...) when coordinators are registered.
    private String clientType;
    // Retry settings passed into every RssSendShuffleDataRequest built by this client.
    private int retryMax;
    private long retryIntervalMax;
    // Coordinator clients added by registerCoordinators(...); iterated in insertion order.
    private List<CoordinatorClient> coordinatorClients = Lists.newLinkedList();
    // appId -> shuffleId -> shuffle servers; read by unregisterShuffle(...).
    private Map<String, Map<Integer, Set<ShuffleServerInfo>>> shuffleServerInfoMap = JavaUtils.newConcurrentMap();
    private CoordinatorClientFactory coordinatorClientFactory;
    // Executor used for application registration and heartbeat tasks.
    private ExecutorService heartBeatExecutorService;
    // Replica count sent to the coordinator in shuffle assignment requests.
    private int replica;
    // Minimum number of successful sends/reports for a block to count as written.
    private int replicaWrite;
    // Minimum number of successful reads required when fetching shuffle results.
    private int replicaRead;
    // When true, sendShuffleData splits servers into a primary and a secondary round.
    private boolean replicaSkipEnabled;
    // Parallelism of the commit pool; -1 means "one thread per target server".
    private int dataCommitPoolSize;
    // Executor on which the per-server send tasks run.
    private final ExecutorService dataTransferPool;
    // Upper bound for the temporary pool created by unregisterShuffle(...).
    private final int unregisterThreadPoolSize;
    // Unregister timeout in seconds (multiplied by 1000 before use).
    private final int unregisterRequestTimeSec;
    // Servers whose last send failed; only allocated when replica > 1, otherwise null.
    private Set<ShuffleServerInfo> defectiveServers;
    private RssConf rssConf;
    // Derived from rssConf and attached to shuffle-result requests.
    private BlockIdLayout blockIdLayout;

    /**
     * Builds a write client from the given builder.
     *
     * <p>Missing builder values are defaulted in place first: a {@code null} conf becomes an empty
     * {@link RssConf}, and a zero unregister pool size / request timeout both become {@code 10}.
     * The builder is therefore mutated by this constructor.
     *
     * @param writeClientBuilder source of all client settings
     */
    public ShuffleWriteClientImpl(ShuffleClientFactory.WriteClientBuilder writeClientBuilder) {
        // Normalise the builder before reading anything back from it.
        if (writeClientBuilder.getRssConf() == null) {
            writeClientBuilder.rssConf(new RssConf());
        }
        if (writeClientBuilder.getUnregisterThreadPoolSize() == 0) {
            writeClientBuilder.unregisterThreadPoolSize(10);
        }
        if (writeClientBuilder.getUnregisterRequestTimeSec() == 0) {
            writeClientBuilder.unregisterRequestTimeSec(10);
        }

        // Identity and retry policy.
        this.clientType = writeClientBuilder.getClientType();
        this.retryMax = writeClientBuilder.getRetryMax();
        this.retryIntervalMax = writeClientBuilder.getRetryIntervalMax();

        // Coordinator access and heartbeat executor.
        this.coordinatorClientFactory = CoordinatorClientFactory.getInstance();
        this.heartBeatExecutorService =
                ThreadUtils.getDaemonFixedThreadPool(writeClientBuilder.getHeartBeatThreadNum(), "client-heartbeat");

        // Replication settings.
        this.replica = writeClientBuilder.getReplica();
        this.replicaWrite = writeClientBuilder.getReplicaWrite();
        this.replicaRead = writeClientBuilder.getReplicaRead();
        this.replicaSkipEnabled = writeClientBuilder.isReplicaSkipEnabled();

        // Pools and pool sizing.
        this.dataTransferPool =
                ThreadUtils.getDaemonFixedThreadPool(writeClientBuilder.getDataTransferPoolSize(), "client-data-transfer");
        this.dataCommitPoolSize = writeClientBuilder.getDataCommitPoolSize();
        this.unregisterThreadPoolSize = writeClientBuilder.getUnregisterThreadPoolSize();
        this.unregisterRequestTimeSec = writeClientBuilder.getUnregisterRequestTimeSec();

        // Defective-server tracking is only needed when data is replicated.
        if (this.replica > 1) {
            this.defectiveServers = Sets.newConcurrentHashSet();
        }

        this.rssConf = writeClientBuilder.getRssConf();
        this.blockIdLayout = BlockIdLayout.from(this.rssConf);
    }

    /**
     * Sends the blocks of every server in {@code map} on the data transfer pool and waits for the
     * outcome.
     *
     * <p>Each task first consults {@code supplier}; when it reports {@code true} the task is
     * skipped and counted as done. A failed or throwing send records all blocks of that server in
     * {@code failedBlockSendTracker} and, if defective-server tracking is enabled, marks the server
     * as defective. A successful send increments the counter of every block id sent to that server
     * and clears the server's defective mark.
     *
     * @param str application id placed into the send request
     * @param map server -> shuffleId -> partitionId -> blocks to send
     * @param map2 server -> ids of the blocks sent to it; {@code null} short-circuits to {@code true}
     * @param map3 block id -> number of servers that accepted the block
     * @param failedBlockSendTracker collects blocks whose send failed
     * @param z forwarded to {@code ClientUtils.waitUntilDoneOrFail} as its second argument
     * @param supplier reports whether the upstream task has already failed
     * @return the result of {@code ClientUtils.waitUntilDoneOrFail} for all send tasks
     */
    private boolean sendShuffleDataAsync(String str, Map<ShuffleServerInfo, Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>> map, Map<ShuffleServerInfo, List<Long>> map2, Map<Long, AtomicInteger> map3, FailedBlockSendTracker failedBlockSendTracker, boolean z, Supplier<Boolean> supplier) {
        if (map2 == null) {
            return true;
        }
        List<CompletableFuture<Boolean>> sendTasks = new ArrayList<>();
        for (Map.Entry<ShuffleServerInfo, Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>> serverEntry : map.entrySet()) {
            CompletableFuture<Boolean> sendTask = CompletableFuture.supplyAsync(() -> {
                if (supplier.get()) {
                    LOG.info("The upstream task has been failed. Abort this data send.");
                    return true;
                }
                ShuffleServerInfo target = serverEntry.getKey();
                try {
                    RssSendShuffleDataRequest request =
                            new RssSendShuffleDataRequest(str, this.retryMax, this.retryIntervalMax, serverEntry.getValue());
                    long startMs = System.currentTimeMillis();
                    RssSendShuffleDataResponse response = getShuffleServerClient(target).sendShuffleData(request);
                    String summary = String.format(
                            "ShuffleWriteClientImpl sendShuffleData with %s blocks to %s cost: %s(ms)",
                            map2.get(target).size(),
                            target.getId(),
                            System.currentTimeMillis() - startMs);

                    if (response.getStatusCode() == StatusCode.SUCCESS) {
                        // Credit every block that reached this server.
                        for (Long blockId : map2.get(target)) {
                            map3.get(blockId).incrementAndGet();
                        }
                        if (this.defectiveServers != null) {
                            this.defectiveServers.remove(target);
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("{} successfully.", summary);
                        }
                        return true;
                    }

                    // Server answered but rejected the data.
                    recordFailedBlocks(failedBlockSendTracker, map, target, response.getStatusCode());
                    if (this.defectiveServers != null) {
                        this.defectiveServers.add(target);
                    }
                    LOG.warn("{}, it failed wth statusCode[{}]", summary, response.getStatusCode());
                    return false;
                } catch (Exception e) {
                    // Transport or unexpected failure: treat as an internal error for every block.
                    recordFailedBlocks(failedBlockSendTracker, map, target, StatusCode.INTERNAL_ERROR);
                    if (this.defectiveServers != null) {
                        this.defectiveServers.add(target);
                    }
                    LOG.warn("Send: " + map2.get(target).size() + " blocks to [" + target.getId() + "] failed.", e);
                    return false;
                }
            }, this.dataTransferPool);
            sendTasks.add(sendTask);
        }

        boolean allSent = ClientUtils.waitUntilDoneOrFail(sendTasks, z);
        if (!allSent) {
            LOG.error("Some shuffle data can't be sent to shuffle-server, is fast fail: {}, cancelled task size: {}", z, sendTasks.size());
        }
        return allSent;
    }

    /**
     * Registers every block that was destined for {@code shuffleServerInfo} as failed.
     *
     * @param failedBlockSendTracker tracker receiving one entry per block
     * @param map server -> shuffleId -> partitionId -> blocks; a missing server contributes nothing
     * @param shuffleServerInfo the server whose send failed
     * @param statusCode the failure status recorded alongside each block
     */
    void recordFailedBlocks(FailedBlockSendTracker failedBlockSendTracker, Map<ShuffleServerInfo, Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>> map, ShuffleServerInfo shuffleServerInfo, StatusCode statusCode) {
        Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> blocksByShuffle =
                map.getOrDefault(shuffleServerInfo, Collections.emptyMap());
        for (Map<Integer, List<ShuffleBlockInfo>> blocksByPartition : blocksByShuffle.values()) {
            for (List<ShuffleBlockInfo> blocks : blocksByPartition.values()) {
                for (ShuffleBlockInfo block : blocks) {
                    failedBlockSendTracker.add(block, shuffleServerInfo, statusCode);
                }
            }
        }
    }

    /**
     * Assigns one block to up to {@code i} servers and records the assignment in both output maps.
     *
     * <p>Candidate servers are taken from {@code list} in order. When {@code z} is set and some
     * servers are currently marked defective, the non-defective servers are tried first and the
     * defective ones are appended afterwards. Servers already present in {@code collection} are
     * skipped, and every server that is picked is added to {@code collection}.
     *
     * @param shuffleBlockInfo the block to place
     * @param list candidate servers for the block, in preference order
     * @param i maximum number of servers to pick; values {@code <= 0} make this a no-op
     * @param collection servers to exclude, updated with the picked servers; may be {@code null}
     * @param map output: server -> shuffleId -> partitionId -> blocks
     * @param map2 output: server -> block ids
     * @param z whether defective servers should be pushed to the end of the candidate order
     */
    void genServerToBlocks(ShuffleBlockInfo shuffleBlockInfo, List<ShuffleServerInfo> list, int i, Collection<ShuffleServerInfo> collection, Map<ShuffleServerInfo, Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>> map, Map<ShuffleServerInfo, List<Long>> map2, boolean z) {
        if (i <= 0) {
            return;
        }

        Stream<ShuffleServerInfo> candidates;
        if (z && CollectionUtils.isNotEmpty(this.defectiveServers)) {
            // Healthy servers first, defective ones only as a fallback.
            final Set<ShuffleServerInfo> defective = this.defectiveServers;
            candidates = Stream.concat(
                    list.stream().filter(server -> !defective.contains(server)),
                    list.stream().filter(defective::contains));
        } else {
            candidates = list.stream();
        }

        if (collection != null) {
            candidates = candidates.filter(server -> !collection.contains(server));
        }
        candidates = candidates.limit(i);
        if (collection != null) {
            // The stream is lazy and sequential, so each pick is visible to the exclusion filter
            // above before the next candidate is examined.
            candidates = candidates.peek(collection::add);
        }

        candidates.forEach(server -> {
            map2.computeIfAbsent(server, key -> Lists.newArrayList())
                    .add(shuffleBlockInfo.getBlockId());
            map.computeIfAbsent(server, key -> Maps.newHashMap())
                    .computeIfAbsent(shuffleBlockInfo.getShuffleId(), key -> Maps.newHashMap())
                    .computeIfAbsent(shuffleBlockInfo.getPartitionId(), key -> Lists.newArrayList())
                    .add(shuffleBlockInfo);
        });
    }

    /**
     * Sends the given blocks to their assigned shuffle servers and reports which blocks reached
     * the write quorum.
     *
     * <p>With replica skipping enabled, each block is first assigned to {@code replicaWrite}
     * servers (primary round) and the remaining {@code replica - replicaWrite} servers are kept
     * for a secondary round that only runs when the primary round failed partially and the
     * upstream task is still alive. Otherwise every block goes to all of its servers in a single
     * round.
     *
     * @param str application id
     * @param list blocks to send
     * @param supplier reports whether the upstream task has already failed
     * @return the ids of blocks accepted by at least {@code replicaWrite} servers, plus the
     *     tracker of blocks whose send failed
     */
    @Override // org.apache.uniffle.client.api.ShuffleWriteClient
    public SendShuffleDataResult sendShuffleData(String str, List<ShuffleBlockInfo> list, Supplier<Boolean> supplier) {
        Map<ShuffleServerInfo, Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>> primaryServerToBlocks = Maps.newHashMap();
        Map<ShuffleServerInfo, Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>> secondaryServerToBlocks = Maps.newHashMap();
        Map<ShuffleServerInfo, List<Long>> primaryServerToBlockIds = Maps.newHashMap();
        Map<ShuffleServerInfo, List<Long>> secondaryServerToBlockIds = Maps.newHashMap();

        for (ShuffleBlockInfo block : list) {
            List<ShuffleServerInfo> servers = block.getShuffleServerInfos();
            if (this.replicaSkipEnabled) {
                // Servers picked for the primary round are excluded from the secondary round.
                Set<ShuffleServerInfo> pickedServers = Sets.newHashSet();
                genServerToBlocks(block, servers, this.replicaWrite, pickedServers, primaryServerToBlocks, primaryServerToBlockIds, true);
                genServerToBlocks(block, servers, this.replica - this.replicaWrite, pickedServers, secondaryServerToBlocks, secondaryServerToBlockIds, false);
            } else {
                genServerToBlocks(block, servers, servers.size(), null, primaryServerToBlocks, primaryServerToBlockIds, false);
            }
        }

        // Every block id must have a counter before the send tasks start: the tasks look the
        // counter up and increment it, and a missing entry would turn a successful send into a
        // failure. The map is shared with the transfer pool threads, hence a concurrent map.
        Map<Long, AtomicInteger> sendSuccessTracker = JavaUtils.newConcurrentMap();
        for (List<Long> blockIds : primaryServerToBlockIds.values()) {
            for (Long blockId : blockIds) {
                sendSuccessTracker.computeIfAbsent(blockId, id -> new AtomicInteger(0));
            }
        }
        for (List<Long> blockIds : secondaryServerToBlockIds.values()) {
            for (Long blockId : blockIds) {
                sendSuccessTracker.computeIfAbsent(blockId, id -> new AtomicInteger(0));
            }
        }

        FailedBlockSendTracker failedBlockSendTracker = new FailedBlockSendTracker();
        boolean hasSecondaryRound = !secondaryServerToBlocks.isEmpty();
        // Fast-fail is only allowed in the primary round when there is no secondary round to
        // fall back on.
        boolean primaryOk = sendShuffleDataAsync(str, primaryServerToBlocks, primaryServerToBlockIds, sendSuccessTracker, failedBlockSendTracker, !hasSecondaryRound, supplier);
        if (!primaryOk && hasSecondaryRound && !supplier.get()) {
            LOG.info("The sending of primary round is failed partially, so start the secondary round");
            sendShuffleDataAsync(str, secondaryServerToBlocks, secondaryServerToBlockIds, sendSuccessTracker, failedBlockSendTracker, true, supplier);
        }

        // A block that reached the write quorum is a success even if some replicas failed.
        Set<Long> successBlockIds = Sets.newHashSet();
        for (Map.Entry<Long, AtomicInteger> entry : sendSuccessTracker.entrySet()) {
            if (entry.getValue().get() >= this.replicaWrite) {
                long blockId = entry.getKey();
                successBlockIds.add(blockId);
                failedBlockSendTracker.remove(blockId);
            }
        }
        return new SendShuffleDataResult(successBlockIds, failedBlockSendTracker);
    }

    /**
     * Sends a commit for the given shuffle to every server in parallel and, once a server reports
     * that all maps have committed, also asks that server to finish the shuffle.
     *
     * <p>Failures on individual servers are logged and counted, not propagated.
     *
     * @param set servers to commit to
     * @param str application id
     * @param i shuffle id
     * @param i2 number of maps in the stage; finish is triggered when the committed count reaches it
     * @return {@code true} when every server committed (and, where required, finished)
     *     successfully; trivially {@code true} for an empty server set
     */
    @Override // org.apache.uniffle.client.api.ShuffleWriteClient
    public boolean sendCommit(Set<ShuffleServerInfo> set, String str, int i, int i2) {
        if (set.isEmpty()) {
            // Nothing to commit. Also avoids new ForkJoinPool(0), which rejects a zero parallelism.
            return true;
        }
        int parallelism = this.dataCommitPoolSize == -1 ? set.size() : this.dataCommitPoolSize;
        ForkJoinPool forkJoinPool = new ForkJoinPool(parallelism);
        AtomicInteger successCount = new AtomicInteger(0);
        try {
            // Running the parallel stream from inside the pool makes it use this pool's threads.
            forkJoinPool.submit(() -> {
                set.parallelStream().forEach(server -> {
                    RssSendCommitRequest commitRequest = new RssSendCommitRequest(str, i);
                    String failureMessage = "Failed to commit shuffle data to " + server + " for shuffleId[" + i + "]";
                    long startMs = System.currentTimeMillis();
                    try {
                        RssSendCommitResponse commitResponse = getShuffleServerClient(server).sendCommit(commitRequest);
                        if (commitResponse.getStatusCode() != StatusCode.SUCCESS) {
                            String detail = failureMessage + " with statusCode " + commitResponse.getStatusCode();
                            LOG.error(detail);
                            throw new RssException(detail);
                        }
                        int commitCount = commitResponse.getCommitCount();
                        LOG.info("Successfully sendCommit for appId[" + str + "], shuffleId[" + i + "] to ShuffleServer[" + server.getId() + "], cost " + (System.currentTimeMillis() - startMs) + " ms, got committed maps[" + commitCount + "], map number of stage is " + i2);
                        if (commitCount >= i2) {
                            RssFinishShuffleResponse finishResponse = getShuffleServerClient(server).finishShuffle(new RssFinishShuffleRequest(str, i));
                            if (finishResponse.getStatusCode() != StatusCode.SUCCESS) {
                                String detail = "Failed to finish shuffle to " + server + " for shuffleId[" + i + "] with statusCode " + finishResponse.getStatusCode();
                                LOG.error(detail);
                                throw new RssException(detail);
                            }
                            LOG.info("Successfully finish shuffle to " + server + " for shuffleId[" + i + "]");
                        }
                        successCount.incrementAndGet();
                    } catch (Exception e) {
                        // Best effort per server: a failure only prevents this server from counting.
                        LOG.error(failureMessage, e);
                    }
                });
            }).join();
            return successCount.get() == set.size();
        } finally {
            forkJoinPool.shutdownNow();
        }
    }

    /**
     * Registers a shuffle on one shuffle server and remembers the server for this shuffle.
     *
     * <p>The current user's short name is resolved through {@link UserGroupInformation}; if that
     * lookup fails the error is logged and the request is sent with a {@code null} user.
     *
     * @param shuffleServerInfo server to register on
     * @param str application id
     * @param i shuffle id
     * @param list partition ranges to register
     * @param remoteStorageInfo remote storage passed through to the request
     * @param shuffleDataDistributionType distribution type passed through to the request
     * @param i2 passed through to the request as its last argument
     */
    @Override // org.apache.uniffle.client.api.ShuffleWriteClient
    public void registerShuffle(ShuffleServerInfo shuffleServerInfo, String str, int i, List<PartitionRange> list, RemoteStorageInfo remoteStorageInfo, ShuffleDataDistributionType shuffleDataDistributionType, int i2) {
        String user = null;
        try {
            user = UserGroupInformation.getCurrentUser().getShortUserName();
        } catch (Exception e) {
            LOG.error("Error on getting user from ugi.", e);
        }

        ShuffleServerClient serverClient = getShuffleServerClient(shuffleServerInfo);
        RssRegisterShuffleRequest registerRequest =
                new RssRegisterShuffleRequest(str, i, list, remoteStorageInfo, user, shuffleDataDistributionType, i2);
        ClientResponse registerResponse = serverClient.registerShuffle(registerRequest);
        String errorMessage = "Error happened when registerShuffle with appId[" + str + "], shuffleId[" + i + "], " + shuffleServerInfo;
        throwExceptionIfNecessary(registerResponse, errorMessage);

        // Only reached when registration succeeded.
        addShuffleServer(str, i, shuffleServerInfo);
    }

    /**
     * Creates coordinator clients for the given coordinator string and appends them to the
     * clients already known to this instance.
     *
     * @param str coordinator specification handed to the coordinator client factory
     */
    @Override // org.apache.uniffle.client.api.ShuffleWriteClient
    public void registerCoordinators(String str) {
        ClientType type = ClientType.valueOf(this.clientType);
        Collection<? extends CoordinatorClient> created = this.coordinatorClientFactory.createCoordinatorClient(type, str);
        this.coordinatorClients.addAll(created);
    }

    /**
     * Asks the registered coordinators, in order, for the client configuration and stops at the
     * first one that answers successfully.
     *
     * @param i value passed to the fetch request
     * @return the configuration carried by the last response received; when no coordinator is
     *     registered, the configuration of a synthetic {@code INTERNAL_ERROR} response
     */
    @Override // org.apache.uniffle.client.api.ShuffleWriteClient
    public Map<String, String> fetchClientConf(int i) {
        RssFetchClientConfResponse lastResponse =
                new RssFetchClientConfResponse(StatusCode.INTERNAL_ERROR, "Empty coordinator clients");
        for (CoordinatorClient coordinator : this.coordinatorClients) {
            lastResponse = coordinator.fetchClientConf(new RssFetchClientConfRequest(i));
            if (lastResponse.getStatusCode() != StatusCode.SUCCESS) {
                LOG.warn("Fail to get conf from {}", coordinator.getDesc());
                continue;
            }
            LOG.info("Success to get conf from {}", coordinator.getDesc());
            break;
        }
        return lastResponse.getClientConf();
    }

    /**
     * Asks the registered coordinators, in order, for the remote storage of an application and
     * stops at the first one that answers successfully.
     *
     * @param str application id
     * @return the remote storage reported by the first successful coordinator, or a
     *     {@link RemoteStorageInfo} built from an empty path when none succeeds
     */
    @Override // org.apache.uniffle.client.api.ShuffleWriteClient
    public RemoteStorageInfo fetchRemoteStorage(String str) {
        RemoteStorageInfo remoteStorageInfo = new RemoteStorageInfo("");
        for (CoordinatorClient coordinator : this.coordinatorClients) {
            RssFetchRemoteStorageResponse response = coordinator.fetchRemoteStorage(new RssFetchRemoteStorageRequest(str));
            if (response.getStatusCode() == StatusCode.SUCCESS) {
                remoteStorageInfo = response.getRemoteStorageInfo();
                LOG.info("Success to get storage {} from {}", remoteStorageInfo, coordinator.getDesc());
                break;
            }
            // The message names remote storage so it is not confused with fetchClientConf failures.
            LOG.warn("Fail to get remote storage from {}", coordinator.getDesc());
        }
        return remoteStorageInfo;
    }

    /**
     * Convenience overload that requests shuffle assignments with an initially empty set as the
     * last argument of the full overload.
     *
     * @return the assignments returned by the full overload
     */
    @Override // org.apache.uniffle.client.api.ShuffleWriteClient
    public ShuffleAssignmentsInfo getShuffleAssignments(String str, int i, int i2, int i3, Set<String> set, int i4, int i5) {
        Set<String> emptyServerIds = Sets.newConcurrentHashSet();
        return getShuffleAssignments(str, i, i2, i3, set, i4, i5, emptyServerIds);
    }

    /**
     * Requests shuffle server assignments from the registered coordinators, in order, stopping at
     * the first coordinator that answers successfully.
     *
     * <p>A coordinator that throws is logged and skipped; the previously held response is kept in
     * that case. After the loop the last response is validated via
     * {@code throwExceptionIfNecessary}.
     *
     * @param str application id
     * @param i shuffle id
     * @param i2 reported as {@code numMaps} in the error message
     * @param i3 reported as {@code partitionNumPerRange} in the error message
     * @param set passed through to the assignment request
     * @param i4 passed through to the assignment request
     * @param i5 passed through to the assignment request
     * @param set2 passed through to the assignment request
     * @return partition-to-server and server-to-partition-range assignments
     */
    @Override // org.apache.uniffle.client.api.ShuffleWriteClient
    public ShuffleAssignmentsInfo getShuffleAssignments(String str, int i, int i2, int i3, Set<String> set, int i4, int i5, Set<String> set2) {
        RssGetShuffleAssignmentsRequest request = new RssGetShuffleAssignmentsRequest(str, i, i2, i3, this.replica, set, i4, i5, set2);
        RssGetShuffleAssignmentsResponse response = new RssGetShuffleAssignmentsResponse(StatusCode.INTERNAL_ERROR);
        for (CoordinatorClient coordinator : this.coordinatorClients) {
            try {
                response = coordinator.getShuffleAssignments(request);
            } catch (Exception e) {
                // Keep the stack trace and the failing coordinator; fall through to the next one.
                LOG.error("Failed to get shuffle assignments from " + coordinator.getDesc() + ": " + e.getMessage(), e);
            }
            if (response.getStatusCode() == StatusCode.SUCCESS) {
                LOG.info("Success to get shuffle server assignment from {}", coordinator.getDesc());
                break;
            }
        }
        String errorMessage = "Error happened when getShuffleAssignments with appId[" + str + "], shuffleId[" + i + "], numMaps[" + i2 + "], partitionNumPerRange[" + i3 + "] to coordinator. Error message: " + response.getMessage();
        throwExceptionIfNecessary(response, errorMessage);
        return new ShuffleAssignmentsInfo(response.getPartitionToServers(), response.getServerToPartitionRanges());
    }

    /**
     * Reports the written block ids to each shuffle server and verifies that every block was
     * reported to at least {@code replicaWrite} servers.
     *
     * <p>Servers with no partitions to report are skipped. A non-success response or an exception
     * from a server is logged and the affected block ids are passed to
     * {@code recordFailedBlockIds}.
     *
     * @param map server -> partitionId -> block ids to report
     * @param str application id
     * @param i shuffle id
     * @param j passed through to the report request as its third argument
     * @param i2 passed through to the report request as its last argument
     * @throws RssException when any block's count in the report tracker is below {@code replicaWrite}
     */
    @Override // org.apache.uniffle.client.api.ShuffleWriteClient
    public void reportShuffleResult(Map<ShuffleServerInfo, Map<Integer, Set<Long>>> map, String str, int i, long j, int i2) {
        Map<Long, Integer> blockReportTracker = createBlockReportTracker(map);
        for (Map.Entry<ShuffleServerInfo, Map<Integer, Set<Long>>> serverEntry : map.entrySet()) {
            Map<Integer, Set<Long>> partitionToBlockIds = serverEntry.getValue();
            if (partitionToBlockIds.isEmpty()) {
                continue;
            }

            // The request carries lists, so copy each partition's set.
            Map<Integer, List<Long>> requestBlockIds = new HashMap<>();
            for (Map.Entry<Integer, Set<Long>> partitionEntry : partitionToBlockIds.entrySet()) {
                requestBlockIds.put(partitionEntry.getKey(), new ArrayList<>(partitionEntry.getValue()));
            }
            RssReportShuffleResultRequest request = new RssReportShuffleResultRequest(str, i, j, requestBlockIds, i2);

            ShuffleServerInfo server = serverEntry.getKey();
            try {
                RssReportShuffleResultResponse response = getShuffleServerClient(server).reportShuffleResult(request);
                if (response.getStatusCode() == StatusCode.SUCCESS) {
                    LOG.info("Report shuffle result to " + server + " for appId[" + str + "], shuffleId[" + i + "] successfully");
                } else {
                    LOG.warn("Report shuffle result to " + server + " for appId[" + str + "], shuffleId[" + i + "] failed with " + response.getStatusCode());
                    recordFailedBlockIds(blockReportTracker, partitionToBlockIds);
                }
            } catch (Exception e) {
                // Include the cause so a failing server can be diagnosed from the log.
                LOG.warn("Report shuffle result is failed to " + server + " for appId[" + str + "], shuffleId[" + i + "]", e);
                recordFailedBlockIds(blockReportTracker, partitionToBlockIds);
            }
        }

        boolean quorumMissed = blockReportTracker.values().stream().anyMatch(count -> count < this.replicaWrite);
        if (quorumMissed) {
            throw new RssException("Quorum check of report shuffle result is failed for appId[" + str + "], shuffleId[" + i + "]");
        }
    }

    /**
     * Lowers the report count of every block id in {@code map2} by one.
     *
     * <p>Called when reporting to one server failed, so that the tracker reflects the number of
     * servers that actually accepted each block and the quorum check can detect the shortfall.
     *
     * @param map block id -> number of servers the block counts as reported to; updated in place
     * @param map2 partitionId -> block ids whose report to a single server failed
     */
    private void recordFailedBlockIds(Map<Long, Integer> map, Map<Integer, Set<Long>> map2) {
        for (Set<Long> blockIds : map2.values()) {
            for (Long blockId : blockIds) {
                map.merge(blockId, -1, Integer::sum);
            }
        }
    }

    /**
     * Counts, for every block id, how many (server, partition) entries of {@code map} contain it.
     *
     * @param map server -> partitionId -> block ids
     * @return a new map from block id to its number of occurrences
     */
    private Map<Long, Integer> createBlockReportTracker(Map<ShuffleServerInfo, Map<Integer, Set<Long>>> map) {
        Map<Long, Integer> occurrences = new HashMap<>();
        for (Map<Integer, Set<Long>> partitionToBlockIds : map.values()) {
            for (Set<Long> blockIds : partitionToBlockIds.values()) {
                for (Long blockId : blockIds) {
                    occurrences.merge(blockId, 1, Integer::sum);
                }
            }
        }
        return occurrences;
    }

    /**
     * Fetches the block id bitmap of one partition, merging the answers of shuffle servers until
     * {@code replicaRead} of them have answered successfully.
     *
     * <p>Servers that answer with a non-success status are skipped silently; servers that throw
     * are logged and skipped.
     *
     * @param str not used by this method
     * @param set servers to query, in iteration order
     * @param str2 application id
     * @param i shuffle id
     * @param i2 passed through to the result request as its third argument
     * @return the union of the bitmaps of the servers that answered successfully
     * @throws RssFetchFailedException when fewer than {@code replicaRead} servers succeeded
     */
    @Override // org.apache.uniffle.client.api.ShuffleWriteClient
    public Roaring64NavigableMap getShuffleResult(String str, Set<ShuffleServerInfo> set, String str2, int i, int i2) {
        RssGetShuffleResultRequest request = new RssGetShuffleResultRequest(str2, i, i2, this.blockIdLayout);
        Roaring64NavigableMap mergedBlockIds = Roaring64NavigableMap.bitmapOf();
        int successfulReads = 0;
        for (ShuffleServerInfo server : set) {
            try {
                RssGetShuffleResultResponse response = getShuffleServerClient(server).getShuffleResult(request);
                if (response.getStatusCode() != StatusCode.SUCCESS) {
                    continue;
                }
                mergedBlockIds.or(response.getBlockIdBitmap());
                successfulReads++;
                if (successfulReads >= this.replicaRead) {
                    // Read quorum reached; no need to ask the remaining servers.
                    return mergedBlockIds;
                }
            } catch (Exception e) {
                LOG.warn("Get shuffle result is failed from " + server + " for appId[" + str2 + "], shuffleId[" + i + "]");
            }
        }
        throw new RssFetchFailedException("Get shuffle result is failed for appId[" + str2 + "], shuffleId[" + i + "]");
    }

    /**
     * Fetches the block id bitmap for several partitions, asking each server only for those of
     * its partitions that have not yet been read successfully {@code replicaRead} times.
     *
     * <p>When a server throws, the partitions requested from it are added to {@code set} and the
     * failure is logged. A non-success response is ignored without touching {@code set}.
     *
     * @param str not used by this method
     * @param map server -> partition ids served by it
     * @param str2 application id
     * @param i shuffle id
     * @param set output: partitions whose request raised an exception
     * @return the union of the bitmaps of all successful responses
     * @throws RssFetchFailedException when any seen partition has fewer than {@code replicaRead}
     *     successful reads
     */
    @Override // org.apache.uniffle.client.api.ShuffleWriteClient
    public Roaring64NavigableMap getShuffleResultForMultiPart(String str, Map<ShuffleServerInfo, Set<Integer>> map, String str2, int i, Set<Integer> set) {
        Map<Integer, Integer> readsPerPartition = Maps.newHashMap();
        Roaring64NavigableMap mergedBlockIds = Roaring64NavigableMap.bitmapOf();

        for (Map.Entry<ShuffleServerInfo, Set<Integer>> serverEntry : map.entrySet()) {
            ShuffleServerInfo server = serverEntry.getKey();

            // Only request partitions that still need more successful reads.
            Set<Integer> requestPartitions = Sets.newHashSet();
            for (Integer partitionId : serverEntry.getValue()) {
                int readsSoFar = readsPerPartition.computeIfAbsent(partitionId, key -> 0);
                if (readsSoFar < this.replicaRead) {
                    requestPartitions.add(partitionId);
                }
            }

            try {
                RssGetShuffleResultForMultiPartRequest request =
                        new RssGetShuffleResultForMultiPartRequest(str2, i, requestPartitions, this.blockIdLayout);
                RssGetShuffleResultResponse response = getShuffleServerClient(server).getShuffleResultForMultiPart(request);
                if (response.getStatusCode() == StatusCode.SUCCESS) {
                    mergedBlockIds.or(response.getBlockIdBitmap());
                    for (Integer partitionId : requestPartitions) {
                        readsPerPartition.merge(partitionId, 1, Integer::sum);
                    }
                }
            } catch (Exception e) {
                set.addAll(requestPartitions);
                LOG.warn("Get shuffle result is failed from " + server + " for appId[" + str2 + "], shuffleId[" + i + "], requestPartitions" + requestPartitions);
            }
        }

        boolean anyPartitionShort = readsPerPartition.values().stream().anyMatch(reads -> reads < this.replicaRead);
        if (anyPartitionShort) {
            throw new RssFetchFailedException("Get shuffle result is failed for appId[" + str2 + "], shuffleId[" + i + "]");
        }
        return mergedBlockIds;
    }

    /**
     * Sends the application info to every registered coordinator on the heartbeat executor.
     *
     * <p>Per-coordinator failures are only logged.
     *
     * @param str application id
     * @param j timeout handed to {@code ThreadUtils.executeTasks}; also part of the request
     * @param str2 passed through to the request as its third argument
     */
    @Override // org.apache.uniffle.client.api.ShuffleWriteClient
    public void registerApplicationInfo(String str, long j, String str2) {
        RssApplicationInfoRequest request = new RssApplicationInfoRequest(str, j, str2);
        ThreadUtils.executeTasks(this.heartBeatExecutorService, this.coordinatorClients, coordinator -> {
            try {
                StatusCode status = coordinator.registerApplicationInfo(request).getStatusCode();
                if (status == StatusCode.SUCCESS) {
                    LOG.info("Successfully send applicationInfo to " + coordinator.getDesc());
                } else {
                    LOG.error("Failed to send applicationInfo to " + coordinator.getDesc());
                }
            } catch (Exception e) {
                LOG.warn("Error happened when send applicationInfo to " + coordinator.getDesc(), e);
            }
            return null;
        }, j, "register application");
    }

    /**
     * Sends an application heartbeat first to all shuffle servers known for the application and
     * then to all registered coordinators, both on the heartbeat executor.
     *
     * <p>Per-target failures are only logged.
     *
     * @param str application id
     * @param j timeout handed to {@code ThreadUtils.executeTasks}; also part of the request
     */
    @Override // org.apache.uniffle.client.api.ShuffleWriteClient
    public void sendAppHeartbeat(String str, long j) {
        RssAppHeartBeatRequest heartbeat = new RssAppHeartBeatRequest(str, j);

        // Shuffle servers.
        ThreadUtils.executeTasks(this.heartBeatExecutorService, getAllShuffleServers(str), server -> {
            try {
                ShuffleServerClient serverClient =
                        ShuffleServerClientFactory.getInstance().getShuffleServerClient(this.clientType, server, this.rssConf);
                StatusCode status = serverClient.sendHeartBeat(heartbeat).getStatusCode();
                if (status != StatusCode.SUCCESS) {
                    LOG.warn("Failed to send heartbeat to " + server);
                }
            } catch (Exception e) {
                LOG.warn("Error happened when send heartbeat to " + server, e);
            }
            return null;
        }, j, "send heartbeat to shuffle server");

        // Coordinators.
        ThreadUtils.executeTasks(this.heartBeatExecutorService, this.coordinatorClients, coordinator -> {
            try {
                StatusCode status = coordinator.sendAppHeartBeat(heartbeat).getStatusCode();
                if (status == StatusCode.SUCCESS) {
                    LOG.info("Successfully send heartbeat to " + coordinator.getDesc());
                } else {
                    LOG.warn("Failed to send heartbeat to " + coordinator.getDesc());
                }
            } catch (Exception e) {
                LOG.warn("Error happened when send heartbeat to " + coordinator.getDesc(), e);
            }
            return null;
        }, j, "send heartbeat to coordinator");
    }

    /**
     * Releases the resources held by this client, in order: stops the heartbeat executor,
     * closes every coordinator client, then stops the data transfer pool.
     *
     * <p>Both executors are stopped with {@code shutdownNow()}, i.e. without waiting for
     * running tasks to finish.
     */
    @Override // org.apache.uniffle.client.api.ShuffleWriteClient
    public void close() {
        this.heartBeatExecutorService.shutdownNow();
        this.coordinatorClients.forEach(CoordinatorClient::close);
        this.dataTransferPool.shutdownNow();
    }

    /**
     * Asks every shuffle server recorded for the given application/shuffle pair to unregister
     * that shuffle, then drops the pair from the local server bookkeeping.
     *
     * <p>Returns immediately when nothing is recorded for the application or for the shuffle.
     * Per-server failures (non-SUCCESS status or an exception) are logged and not propagated.
     * The local entry is removed even when dispatching the requests fails.
     *
     * @param str application id
     * @param i shuffle id
     */
    @Override // org.apache.uniffle.client.api.ShuffleWriteClient
    public void unregisterShuffle(String str, int i) {
        Map<Integer, Set<ShuffleServerInfo>> serversByShuffle = this.shuffleServerInfoMap.get(str);
        if (serversByShuffle == null) {
            return;
        }
        Set<ShuffleServerInfo> servers = serversByShuffle.get(Integer.valueOf(i));
        if (servers == null) {
            return;
        }
        if (servers.isEmpty()) {
            // Nothing to contact. Also avoids requesting a fixed pool of size 0, which
            // java.util.concurrent.Executors rejects with IllegalArgumentException
            // (assuming ThreadUtils.getDaemonFixedThreadPool delegates to it — TODO confirm).
            removeShuffleServer(str, i);
            return;
        }

        int timeoutMs = this.unregisterRequestTimeSec * 1000;
        RssUnregisterShuffleRequest request = new RssUnregisterShuffleRequest(str, i);
        ExecutorService executorService = null;
        try {
            // Never create more threads than there are servers to contact.
            executorService = ThreadUtils.getDaemonFixedThreadPool(
                    Math.min(this.unregisterThreadPoolSize, servers.size()), "unregister-shuffle");
            ThreadUtils.executeTasks(executorService, servers, shuffleServerInfo -> {
                try {
                    ShuffleServerClient client = ShuffleServerClientFactory.getInstance()
                            .getShuffleServerClient(this.clientType, shuffleServerInfo, this.rssConf);
                    if (client.unregisterShuffle(request).getStatusCode() != StatusCode.SUCCESS) {
                        LOG.warn("Failed to unregister shuffle to " + shuffleServerInfo);
                    }
                } catch (Exception e) {
                    LOG.warn("Error happened when unregistering to " + shuffleServerInfo, e);
                }
                return null;
            }, timeoutMs, "unregister shuffle server");
        } finally {
            // Single cleanup path for both normal and exceptional exit.
            if (executorService != null) {
                executorService.shutdownNow();
            }
            removeShuffleServer(str, i);
        }
    }

    /**
     * Asks every shuffle server recorded for the given application to unregister all of that
     * application's shuffles, then drops the application from the local server bookkeeping.
     *
     * <p>Returns immediately when {@code str} is {@code null} or nothing is recorded for it.
     * Per-server failures (non-SUCCESS status or an exception) are logged and not propagated.
     * The local entry is removed even when dispatching the requests fails.
     *
     * @param str application id; may be {@code null}, in which case the call is a no-op
     */
    @Override // org.apache.uniffle.client.api.ShuffleWriteClient
    public void unregisterShuffle(String str) {
        // Check for null before touching the map or building a request from the id.
        if (str == null || this.shuffleServerInfoMap.get(str) == null) {
            return;
        }
        Set<ShuffleServerInfo> allShuffleServers = getAllShuffleServers(str);
        if (allShuffleServers.isEmpty()) {
            // The application entry can exist with no servers left, e.g. after every shuffle
            // was removed individually via removeShuffleServer. There is nobody to contact,
            // and a fixed pool of size 0 is rejected by java.util.concurrent.Executors
            // (assuming ThreadUtils.getDaemonFixedThreadPool delegates to it — TODO confirm).
            this.shuffleServerInfoMap.remove(str);
            return;
        }

        int timeoutMs = this.unregisterRequestTimeSec * 1000;
        RssUnregisterShuffleByAppIdRequest request = new RssUnregisterShuffleByAppIdRequest(str);
        ExecutorService executorService = null;
        try {
            // Never create more threads than there are servers to contact.
            executorService = ThreadUtils.getDaemonFixedThreadPool(
                    Math.min(this.unregisterThreadPoolSize, allShuffleServers.size()), "unregister-shuffle");
            ThreadUtils.executeTasks(executorService, allShuffleServers, shuffleServerInfo -> {
                try {
                    ShuffleServerClient client = ShuffleServerClientFactory.getInstance()
                            .getShuffleServerClient(this.clientType, shuffleServerInfo, this.rssConf);
                    if (client.unregisterShuffleByAppId(request).getStatusCode() != StatusCode.SUCCESS) {
                        LOG.warn("Failed to unregister shuffle to " + shuffleServerInfo);
                    }
                } catch (Exception e) {
                    LOG.warn("Error happened when unregistering to " + shuffleServerInfo, e);
                }
                return null;
            }, timeoutMs, "unregister shuffle server");
        } finally {
            // Single cleanup path for both normal and exceptional exit.
            if (executorService != null) {
                executorService.shutdownNow();
            }
            this.shuffleServerInfoMap.remove(str);
        }
    }

    /**
     * Logs {@code str} at error level and throws an {@link RssException} carrying it when the
     * response is present and its status is anything other than SUCCESS.
     *
     * @param clientResponse response to inspect; {@code null} is treated as "nothing to report"
     * @param str message used for both the log entry and the exception
     * @throws RssException if {@code clientResponse} is non-null and not successful
     */
    private void throwExceptionIfNecessary(ClientResponse clientResponse, String str) {
        boolean failed = clientResponse != null && clientResponse.getStatusCode() != StatusCode.SUCCESS;
        if (failed) {
            LOG.error(str);
            throw new RssException(str);
        }
    }

    /**
     * Collects the shuffle servers recorded for an application across all of its shuffles.
     *
     * @param str application id
     * @return {@link Collections#emptySet()} when nothing is recorded for the application;
     *     otherwise a newly created set holding the union of the per-shuffle server sets
     *     (possibly empty if no shuffle is recorded any more)
     */
    Set<ShuffleServerInfo> getAllShuffleServers(String str) {
        Map<Integer, Set<ShuffleServerInfo>> serversByShuffle = this.shuffleServerInfoMap.get(str);
        if (serversByShuffle == null) {
            return Collections.emptySet();
        }
        // Union into a fresh set so callers never hold the internal per-shuffle sets.
        Set<ShuffleServerInfo> servers = Sets.newHashSet();
        serversByShuffle.values().forEach(servers::addAll);
        return servers;
    }

    /**
     * Obtains the client for the given shuffle server from the shared
     * {@link ShuffleServerClientFactory}, using this instance's client type and configuration.
     *
     * @param shuffleServerInfo server to obtain a client for
     * @return whatever client the factory returns for that server
     */
    @VisibleForTesting
    public ShuffleServerClient getShuffleServerClient(ShuffleServerInfo shuffleServerInfo) {
        ShuffleServerClientFactory factory = ShuffleServerClientFactory.getInstance();
        return factory.getShuffleServerClient(this.clientType, shuffleServerInfo, this.rssConf);
    }

    /**
     * Exposes the {@code defectiveServers} field for tests.
     *
     * @return the field itself, not a copy
     */
    @VisibleForTesting
    Set<ShuffleServerInfo> getDefectiveServers() {
        return defectiveServers;
    }

    /**
     * Records that the given shuffle server holds data for an application/shuffle pair,
     * creating the per-application map and the per-shuffle set on first use.
     *
     * <p>The nested containers are created with {@code computeIfAbsent} rather than a separate
     * get-then-put, so two callers racing on the same key share one container instead of one
     * silently overwriting the other's. NOTE(review): the atomicity relies on
     * {@code shuffleServerInfoMap} being a concurrent map like the nested ones created
     * here — confirm at the field declaration.
     *
     * @param str application id
     * @param i shuffle id
     * @param shuffleServerInfo server to record
     */
    void addShuffleServer(String str, int i, ShuffleServerInfo shuffleServerInfo) {
        Map<Integer, Set<ShuffleServerInfo>> serversByShuffle =
                this.shuffleServerInfoMap.computeIfAbsent(str, appId -> JavaUtils.newConcurrentMap());
        Set<ShuffleServerInfo> servers =
                serversByShuffle.computeIfAbsent(Integer.valueOf(i), shuffleId -> Sets.newConcurrentHashSet());
        servers.add(shuffleServerInfo);
    }

    /**
     * Forgets the servers recorded for one shuffle of an application.
     *
     * <p>Only the shuffle's entry is removed; the per-application map itself stays in place,
     * even if it becomes empty. Does nothing when the application is not recorded.
     *
     * @param str application id
     * @param i shuffle id
     */
    @VisibleForTesting
    void removeShuffleServer(String str, int i) {
        Map<Integer, Set<ShuffleServerInfo>> serversByShuffle = this.shuffleServerInfoMap.get(str);
        if (serversByShuffle == null) {
            return;
        }
        Integer shuffleKey = Integer.valueOf(i);
        serversByShuffle.remove(shuffleKey);
    }
}
