package org.apache.uniffle.client.impl.grpc;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.uniffle.client.api.ShuffleServerClient;
import org.apache.uniffle.client.request.RetryableRequest;
import org.apache.uniffle.client.request.RssAppHeartBeatRequest;
import org.apache.uniffle.client.request.RssFinishShuffleRequest;
import org.apache.uniffle.client.request.RssGetInMemoryShuffleDataRequest;
import org.apache.uniffle.client.request.RssGetShuffleDataRequest;
import org.apache.uniffle.client.request.RssGetShuffleIndexRequest;
import org.apache.uniffle.client.request.RssGetShuffleResultForMultiPartRequest;
import org.apache.uniffle.client.request.RssGetShuffleResultRequest;
import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.request.RssUnregisterShuffleByAppIdRequest;
import org.apache.uniffle.client.request.RssUnregisterShuffleRequest;
import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
import org.apache.uniffle.client.response.RssFinishShuffleResponse;
import org.apache.uniffle.client.response.RssGetInMemoryShuffleDataResponse;
import org.apache.uniffle.client.response.RssGetShuffleDataResponse;
import org.apache.uniffle.client.response.RssGetShuffleIndexResponse;
import org.apache.uniffle.client.response.RssGetShuffleResultResponse;
import org.apache.uniffle.client.response.RssRegisterShuffleResponse;
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssSendCommitResponse;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
import org.apache.uniffle.client.response.RssUnregisterShuffleByAppIdResponse;
import org.apache.uniffle.client.response.RssUnregisterShuffleResponse;
import org.apache.uniffle.com.google.common.annotations.VisibleForTesting;
import org.apache.uniffle.com.google.common.collect.Lists;
import org.apache.uniffle.com.google.protobuf.ByteString;
import org.apache.uniffle.com.google.protobuf.UnsafeByteOperations;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.NotRetryException;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.exception.RssFetchFailedException;
import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.io.netty.buffer.Unpooled;
import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.proto.ShuffleServerGrpc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.class */
public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServerClient {
    /** Logger shared by all instances of this client. */
    private static final Logger LOG = LoggerFactory.getLogger(ShuffleServerGrpcClient.class);
    /**
     * Sentinel buffer id with value -1.
     * NOTE(review): presumably the "no buffer reserved" result of requirePreAllocation, which
     * returns the literal -1 on failure in this file — confirm.
     */
    protected static final long FAILED_REQUIRE_ID = -1;
    /** Deadline in milliseconds that {@link #getBlockingStub()} attaches to each stub it returns. */
    protected long rpcTimeout;
    /** Blocking gRPC stub created from this client's channel in the constructor. */
    private ShuffleServerGrpc.ShuffleServerBlockingStub blockingStub;
    /** Source of the random jitter added to the back-off sleep in requirePreAllocation. */
    protected Random random;
    /** Upper bound (exclusive, in milliseconds) of the random jitter added to the back-off sleep. */
    protected static final int BACK_OFF_BASE = 2000;

    /**
     * Creates a client that uses the default values of {@code RssClientConf.RPC_MAX_ATTEMPTS}
     * and {@code RssClientConf.RPC_TIMEOUT_MS}.
     *
     * @param str forwarded to the four-argument constructor (presumably the server host)
     * @param i forwarded to the four-argument constructor (presumably the server port)
     */
    @VisibleForTesting
    public ShuffleServerGrpcClient(String str, int i) {
        this(
            str,
            i,
            RssClientConf.RPC_MAX_ATTEMPTS.defaultValue().intValue(),
            RssClientConf.RPC_TIMEOUT_MS.defaultValue().longValue());
    }

    /**
     * Creates a client whose retry count and RPC timeout are read from {@code rssConf}.
     * When {@code rssConf} is {@code null}, the default values of
     * {@code RssClientConf.RPC_MAX_ATTEMPTS} and {@code RssClientConf.RPC_TIMEOUT_MS} are used.
     *
     * @param rssConf configuration to read from; may be {@code null}
     * @param str forwarded to the four-argument constructor (presumably the server host)
     * @param i forwarded to the four-argument constructor (presumably the server port)
     */
    public ShuffleServerGrpcClient(RssConf rssConf, String str, int i) {
        this(
            str,
            i,
            rssConf != null
                ? rssConf.getInteger(RssClientConf.RPC_MAX_ATTEMPTS)
                : RssClientConf.RPC_MAX_ATTEMPTS.defaultValue().intValue(),
            rssConf != null
                ? rssConf.getLong(RssClientConf.RPC_TIMEOUT_MS)
                : RssClientConf.RPC_TIMEOUT_MS.defaultValue().longValue());
    }

    /**
     * Creates a client by delegating to the five-argument constructor with {@code true} as the
     * final flag.
     *
     * @param str forwarded unchanged (presumably the server host)
     * @param i forwarded unchanged (presumably the server port)
     * @param i2 forwarded unchanged (presumably the maximum number of retry attempts)
     * @param j RPC timeout in milliseconds
     */
    public ShuffleServerGrpcClient(String str, int i, int i2, long j) {
        this(
            str,
            i,
            i2,
            j,
            true);
    }

    /**
     * Initializes the parent {@code GrpcClient}, then creates the blocking stub on the parent's
     * channel and records the RPC timeout.
     *
     * @param str passed to the superclass constructor (presumably the server host)
     * @param i passed to the superclass constructor (presumably the server port)
     * @param i2 passed to the superclass constructor (presumably the maximum retry attempts)
     * @param j RPC timeout in milliseconds, used as the deadline by {@link #getBlockingStub()}
     * @param z passed to the superclass constructor; its meaning is defined there
     */
    public ShuffleServerGrpcClient(String str, int i, int i2, long j, boolean z) {
        super(str, i, i2, z);
        this.rpcTimeout = j;
        this.random = new Random();
        this.blockingStub = ShuffleServerGrpc.newBlockingStub(this.channel);
    }

    /**
     * Returns the blocking stub configured with a deadline of {@code rpcTimeout} milliseconds,
     * measured from this call.
     *
     * @return a stub derived from the shared blocking stub with the deadline applied
     */
    public ShuffleServerGrpc.ShuffleServerBlockingStub getBlockingStub() {
        final long deadlineMs = this.rpcTimeout;
        return (ShuffleServerGrpc.ShuffleServerBlockingStub)
            this.blockingStub.withDeadlineAfter(deadlineMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Builds a {@code ShuffleRegisterRequest} from the arguments and sends it through the
     * deadline-bound blocking stub.
     *
     * @param str application id
     * @param i shuffle id
     * @param list partition ranges to register
     * @param remoteStorageInfo remote storage path and configuration items
     * @param str2 user name
     * @param shuffleDataDistributionType distribution type; mapped to the proto enum by name
     * @param i2 maximum concurrency per partition to write
     * @return the raw gRPC response
     */
    private RssProtos.ShuffleRegisterResponse doRegisterShuffle(String str, int i, List<PartitionRange> list, RemoteStorageInfo remoteStorageInfo, String str2, ShuffleDataDistributionType shuffleDataDistributionType, int i2) {
        RssProtos.ShuffleRegisterRequest.Builder requestBuilder = RssProtos.ShuffleRegisterRequest.newBuilder();
        requestBuilder
            .setAppId(str)
            .setShuffleId(i)
            .setUser(str2)
            .setShuffleDataDistribution(RssProtos.DataDistribution.valueOf(shuffleDataDistributionType.name()))
            .setMaxConcurrencyPerPartitionToWrite(i2)
            .addAllPartitionRanges(toShufflePartitionRanges(list));

        RssProtos.RemoteStorage.Builder storageBuilder = RssProtos.RemoteStorage.newBuilder();
        storageBuilder.setPath(remoteStorageInfo.getPath());
        // One conf item message per configuration entry; an empty map adds nothing.
        for (Map.Entry<String, String> confEntry : remoteStorageInfo.getConfItems().entrySet()) {
            storageBuilder.addRemoteStorageConf(
                RssProtos.RemoteStorageConfItem.newBuilder()
                    .setKey(confEntry.getKey())
                    .setValue(confEntry.getValue())
                    .build());
        }
        requestBuilder.setRemoteStorage(storageBuilder.build());

        return getBlockingStub().registerShuffle(requestBuilder.build());
    }

    /**
     * Sends a commit request, retrying on any exception until the failure count exceeds
     * {@code maxRetryAttempts}.
     *
     * @param str application id
     * @param i shuffle id
     * @return the raw gRPC response of the first successful call
     * @throws RssException when every attempt failed
     */
    private RssProtos.ShuffleCommitResponse doSendCommit(String str, int i) {
        RssProtos.ShuffleCommitRequest commitRequest =
            RssProtos.ShuffleCommitRequest.newBuilder().setAppId(str).setShuffleId(i).build();
        // The counter is advanced only in the catch block, i.e. once per failed attempt.
        for (int failures = 0; failures <= this.maxRetryAttempts; ) {
            try {
                return getBlockingStub().commitShuffleTask(commitRequest);
            } catch (Exception e) {
                failures++;
                String failure = "Send commit to host[" + this.host + "], port[" + this.port + "] failed";
                LOG.warn(failure + ", try again, retryNum[" + failures + "]", e);
            }
        }
        throw new RssException("Send commit to host[" + this.host + "], port[" + this.port + "] failed");
    }

    /**
     * Sends an application heartbeat using a caller-supplied deadline instead of
     * {@code rpcTimeout}.
     *
     * @param str application id
     * @param j deadline for this call, in milliseconds
     * @return the raw gRPC response
     */
    private RssProtos.AppHeartBeatResponse doSendHeartBeat(String str, long j) {
        ShuffleServerGrpc.ShuffleServerBlockingStub deadlineStub =
            (ShuffleServerGrpc.ShuffleServerBlockingStub) this.blockingStub.withDeadlineAfter(j, TimeUnit.MILLISECONDS);
        RssProtos.AppHeartBeatRequest heartBeat =
            RssProtos.AppHeartBeatRequest.newBuilder().setAppId(str).build();
        return deadlineStub.appHeartbeat(heartBeat);
    }

    /**
     * Test-only shortcut that requires a buffer for shuffle id 0 with no partition ids.
     *
     * @param str application id
     * @param i number of bytes to require
     * @param i2 maximum number of retries while the server reports it has no buffer
     * @param j upper bound of the back-off sleep, in milliseconds
     * @return whatever the six-argument overload returns for these arguments
     * @throws Exception declared for callers; the delegate declares no checked exception
     */
    @VisibleForTesting
    public long requirePreAllocation(String str, int i, int i2, long j) throws Exception {
        final int defaultShuffleId = 0;
        final List<Integer> noPartitions = Collections.emptyList();
        return requirePreAllocation(str, defaultShuffleId, noPartitions, i, i2, j);
    }

    /**
     * Asks the shuffle server to pre-allocate a buffer, backing off and retrying while the
     * server answers {@code NO_BUFFER} or {@code NO_BUFFER_FOR_HUGE_PARTITION}.
     *
     * <p>Only the RPC call itself is guarded by the catch-all handler. The
     * {@link NotRetryException} raised for {@code NO_REGISTER} therefore reaches the caller
     * instead of being turned into a {@code -1} result, which callers treat as retryable.
     *
     * @param str application id
     * @param i shuffle id
     * @param list partition ids the buffer is required for
     * @param i2 number of bytes to require
     * @param i3 maximum number of retries while the server has no buffer
     * @param j upper bound of a single back-off sleep, in milliseconds
     * @return the require-buffer id on {@code SUCCESS}; {@code -1} when the RPC threw, the
     *     retries were exhausted, or the server answered any other status
     * @throws NotRetryException when the server answers {@code NO_REGISTER}
     */
    public long requirePreAllocation(String str, int i, List<Integer> list, int i2, int i3, long j) {
        RssProtos.RequireBufferRequest request = RssProtos.RequireBufferRequest.newBuilder().setShuffleId(i).addAllPartitionIds(list).setAppId(str).setRequireSize(i2).build();
        long start = System.currentTimeMillis();
        int retry = 0;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Requiring buffer for appId: {}, shuffleId: {}, partitionIds: {} with {} bytes from {}:{}", new Object[]{str, Integer.valueOf(i), list, Integer.valueOf(i2), this.host, Integer.valueOf(this.port)});
        }
        RssProtos.RequireBufferResponse response;
        while (true) {
            try {
                response = getBlockingStub().requireBuffer(request);
            } catch (Exception e) {
                // A transport-level failure is reported as "no buffer" so the caller may retry.
                LOG.error("Exception happened when requiring pre-allocated buffer from {}:{}", new Object[]{this.host, Integer.valueOf(this.port), e});
                return FAILED_REQUIRE_ID;
            }
            if (response.getStatus() != RssProtos.StatusCode.NO_BUFFER && response.getStatus() != RssProtos.StatusCode.NO_BUFFER_FOR_HUGE_PARTITION) {
                break;
            }
            if (retry >= i3) {
                LOG.warn("ShuffleServer " + this.host + ":" + this.port + " is full and can't send shuffle data successfully due to " + response.getStatus() + " after retry " + i3 + " times, cost: {}(ms)", Long.valueOf(System.currentTimeMillis() - start));
                return FAILED_REQUIRE_ID;
            }
            try {
                LOG.info("Can't require buffer for appId: {}, shuffleId: {}, partitionIds: {} with {} bytes from {}:{} due to {}, sleep and try[{}] again", new Object[]{str, Integer.valueOf(i), list, Integer.valueOf(i2), this.host, Integer.valueOf(this.port), response.getStatus(), Integer.valueOf(retry)});
                // Exponential back-off (exponent capped at 16) plus random jitter, bounded by j.
                long backoffMs = Math.min(j, (BACK_OFF_BASE * (1 << Math.min(retry, 16))) + this.random.nextInt(BACK_OFF_BASE));
                Thread.sleep(backoffMs);
            } catch (Exception e) {
                // Best effort: a failed or interrupted sleep only shortens the wait.
                LOG.warn("Exception happened when requiring pre-allocated buffer from {}:{}", new Object[]{this.host, Integer.valueOf(this.port), e});
            }
            retry++;
        }

        if (response.getStatus() == RssProtos.StatusCode.SUCCESS) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Require preAllocated size of {} from {}:{}, cost: {}(ms)", new Object[]{Integer.valueOf(i2), this.host, Integer.valueOf(this.port), Long.valueOf(System.currentTimeMillis() - start)});
            }
            return response.getRequireBufferId();
        }
        if (response.getStatus() == RssProtos.StatusCode.NO_REGISTER) {
            // Not guarded by the RPC catch block above, so this propagates to the caller.
            throw new NotRetryException("Can't require " + i2 + " bytes from " + this.host + ":" + this.port + ", statusCode=" + response.getStatus() + ", errorMsg:" + response.getRetMsg());
        }
        return FAILED_REQUIRE_ID;
    }

    /**
     * Sends an unregister-by-application request on the shared blocking stub; no deadline is
     * attached here.
     *
     * @param str application id
     * @return the raw gRPC response
     */
    private RssProtos.ShuffleUnregisterByAppIdResponse doUnregisterShuffleByAppId(String str) {
        RssProtos.ShuffleUnregisterByAppIdRequest unregisterRequest =
            RssProtos.ShuffleUnregisterByAppIdRequest.newBuilder().setAppId(str).build();
        return this.blockingStub.unregisterShuffleByAppId(unregisterRequest);
    }

    /**
     * Unregisters every shuffle of an application on the server.
     *
     * @param rssUnregisterShuffleByAppIdRequest request carrying the application id
     * @return a response with {@code StatusCode.SUCCESS}
     * @throws RssException when the server answers with any status other than {@code SUCCESS}
     */
    @Override // org.apache.uniffle.client.api.ShuffleServerClient
    public RssUnregisterShuffleByAppIdResponse unregisterShuffleByAppId(RssUnregisterShuffleByAppIdRequest rssUnregisterShuffleByAppIdRequest) {
        RssProtos.ShuffleUnregisterByAppIdResponse rpcResponse =
            doUnregisterShuffleByAppId(rssUnregisterShuffleByAppIdRequest.getAppId());
        if (rpcResponse.getStatus() == RssProtos.StatusCode.SUCCESS) {
            return new RssUnregisterShuffleByAppIdResponse(StatusCode.SUCCESS);
        }
        String errorMsg = String.format(
            "Errors on unregister app to %s:%s for appId[%s], error: %s",
            this.host,
            Integer.valueOf(this.port),
            rssUnregisterShuffleByAppIdRequest.getAppId(),
            rpcResponse.getRetMsg());
        LOG.error(errorMsg);
        throw new RssException(errorMsg);
    }

    /**
     * Sends an unregister request for one shuffle on the shared blocking stub; no deadline is
     * attached here.
     *
     * @param str application id
     * @param i shuffle id
     * @return the raw gRPC response
     */
    private RssProtos.ShuffleUnregisterResponse doUnregisterShuffle(String str, int i) {
        RssProtos.ShuffleUnregisterRequest unregisterRequest =
            RssProtos.ShuffleUnregisterRequest.newBuilder().setAppId(str).setShuffleId(i).build();
        return this.blockingStub.unregisterShuffle(unregisterRequest);
    }

    /**
     * Unregisters a single shuffle on the server.
     *
     * @param rssUnregisterShuffleRequest request carrying the application id and shuffle id
     * @return a response with {@code StatusCode.SUCCESS}
     * @throws RssException when the server answers with any status other than {@code SUCCESS}
     */
    @Override // org.apache.uniffle.client.api.ShuffleServerClient
    public RssUnregisterShuffleResponse unregisterShuffle(RssUnregisterShuffleRequest rssUnregisterShuffleRequest) {
        RssProtos.ShuffleUnregisterResponse doUnregisterShuffle = doUnregisterShuffle(rssUnregisterShuffleRequest.getAppId(), rssUnregisterShuffleRequest.getShuffleId());
        switch (doUnregisterShuffle.getStatus()) {
            case SUCCESS:
                return new RssUnregisterShuffleResponse(StatusCode.SUCCESS);
            default:
                // The shuffleId placeholder must be "%s": a bare "%]" is an invalid conversion and
                // would make String.format throw instead of reporting the server error.
                String format = String.format("Errors on unregister shuffle to %s:%s for appId[%s].shuffleId[%s], error: %s", this.host, Integer.valueOf(this.port), rssUnregisterShuffleRequest.getAppId(), Integer.valueOf(rssUnregisterShuffleRequest.getShuffleId()), doUnregisterShuffle.getRetMsg());
                LOG.error(format);
                throw new RssException(format);
        }
    }

    /**
     * Registers a shuffle on the server.
     *
     * @param rssRegisterShuffleRequest request carrying application id, shuffle id, partition
     *     ranges, remote storage info, user, distribution type and write concurrency
     * @return a response with {@code StatusCode.SUCCESS}
     * @throws RssException when the server answers with any status other than {@code SUCCESS}
     */
    @Override // org.apache.uniffle.client.api.ShuffleServerClient
    public RssRegisterShuffleResponse registerShuffle(RssRegisterShuffleRequest rssRegisterShuffleRequest) {
        RssProtos.ShuffleRegisterResponse rpcResponse = doRegisterShuffle(
            rssRegisterShuffleRequest.getAppId(),
            rssRegisterShuffleRequest.getShuffleId(),
            rssRegisterShuffleRequest.getPartitionRanges(),
            rssRegisterShuffleRequest.getRemoteStorageInfo(),
            rssRegisterShuffleRequest.getUser(),
            rssRegisterShuffleRequest.getDataDistributionType(),
            rssRegisterShuffleRequest.getMaxConcurrencyPerPartitionToWrite());
        if (rpcResponse.getStatus() == RssProtos.StatusCode.SUCCESS) {
            return new RssRegisterShuffleResponse(StatusCode.SUCCESS);
        }
        String target = "Can't register shuffle to " + this.host + ":" + this.port;
        String errorMsg = target + " for appId[" + rssRegisterShuffleRequest.getAppId() + "], shuffleId[" + rssRegisterShuffleRequest.getShuffleId() + "], errorMsg:" + rpcResponse.getRetMsg();
        LOG.error(errorMsg);
        throw new RssException(errorMsg);
    }

    /**
     * Sends the blocks of every shuffle in the request to the server, one RPC per shuffle id.
     *
     * <p>For each shuffle id the blocks are converted to protobuf messages grouped by partition.
     * The send is then run through {@code RetryUtils.retryWithCondition}: each attempt first
     * requires a pre-allocated buffer for the total block size and then sends the data with the
     * returned buffer id. A failure for one shuffle id is logged and does not stop the
     * remaining shuffle ids from being sent.
     *
     * @param rssSendShuffleDataRequest request carrying the application id, the blocks grouped
     *     by shuffle id and partition id, and the retry settings
     * @return {@code SUCCESS} when every shuffle id was sent; otherwise the most recently
     *     recorded server status, or {@code INTERNAL_ERROR} if no server status was recorded
     */
    @Override // org.apache.uniffle.client.api.ShuffleServerClient
    public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest rssSendShuffleDataRequest) {
        String appId = rssSendShuffleDataRequest.getAppId();
        Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleIdToBlocks = rssSendShuffleDataRequest.getShuffleIdToBlocks();
        boolean allSent = true;
        AtomicReference<StatusCode> failedStatus = new AtomicReference<>(StatusCode.INTERNAL_ERROR);
        for (Map.Entry<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleEntry : shuffleIdToBlocks.entrySet()) {
            List<RssProtos.ShuffleData> shuffleDataList = Lists.newArrayList();
            int totalSize = 0;
            int blockCount = 0;
            int shuffleId = shuffleEntry.getKey().intValue();
            List<Integer> partitionIds = new ArrayList<>();
            for (Map.Entry<Integer, List<ShuffleBlockInfo>> partitionEntry : shuffleEntry.getValue().entrySet()) {
                List<RssProtos.ShuffleBlock> blocks = Lists.newArrayList();
                for (ShuffleBlockInfo shuffleBlockInfo : partitionEntry.getValue()) {
                    blocks.add(RssProtos.ShuffleBlock.newBuilder().setBlockId(shuffleBlockInfo.getBlockId()).setCrc(shuffleBlockInfo.getCrc()).setLength(shuffleBlockInfo.getLength()).setTaskAttemptId(shuffleBlockInfo.getTaskAttemptId()).setUncompressLength(shuffleBlockInfo.getUncompressLength()).setData(UnsafeByteOperations.unsafeWrap(shuffleBlockInfo.getData().nioBuffer())).build());
                    totalSize += shuffleBlockInfo.getSize();
                    blockCount++;
                }
                shuffleDataList.add(RssProtos.ShuffleData.newBuilder().setPartitionId(partitionEntry.getKey().intValue()).addAllBlock(blocks).build());
                partitionIds.add(partitionEntry.getKey());
            }
            // Effectively-final copies for use inside the retry lambda.
            final int requireSize = totalSize;
            final int blockNum = blockCount;
            try {
                RetryUtils.retryWithCondition(() -> {
                    long requireId = requirePreAllocation(appId, shuffleId, partitionIds, requireSize, rssSendShuffleDataRequest.getRetryMax() / this.maxRetryAttempts, rssSendShuffleDataRequest.getRetryIntervalMax());
                    if (requireId == FAILED_REQUIRE_ID) {
                        throw new RssException(String.format("requirePreAllocation failed! size[%s], host[%s], port[%s]", Integer.valueOf(requireSize), this.host, Integer.valueOf(this.port)));
                    }
                    long sendStart = System.currentTimeMillis();
                    RssProtos.SendShuffleDataResponse response = getBlockingStub().sendShuffleData(RssProtos.SendShuffleDataRequest.newBuilder().setAppId(appId).setShuffleId(shuffleId).setRequireBufferId(requireId).addAllShuffleData(shuffleDataList).setTimestamp(sendStart).build());
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Do sendShuffleData to {}:{} rpc cost:" + (System.currentTimeMillis() - sendStart) + " ms for " + requireSize + " bytes with " + blockNum + " blocks", this.host, Integer.valueOf(this.port));
                    }
                    if (response.getStatus() == RssProtos.StatusCode.SUCCESS) {
                        return response;
                    }
                    String errorMsg = "Can't send shuffle data with " + blockNum + " blocks to " + this.host + ":" + this.port + ", statusCode=" + response.getStatus() + ", errorMsg:" + response.getRetMsg();
                    // Remember the server status so it can be reported if all retries fail.
                    failedStatus.set(StatusCode.fromCode(Integer.valueOf(response.getStatus().getNumber())));
                    if (response.getStatus() == RssProtos.StatusCode.NO_REGISTER) {
                        throw new NotRetryException(errorMsg);
                    }
                    throw new RssException(errorMsg);
                }, null, rssSendShuffleDataRequest.getRetryIntervalMax(), this.maxRetryAttempts, t -> {
                    // Every failure except OutOfMemoryError is eligible for a retry.
                    return Boolean.valueOf(!(t instanceof OutOfMemoryError));
                });
            } catch (Throwable failure) {
                LOG.warn("Failed to send shuffle data due to ", failure);
                allSent = false;
            }
        }
        return allSent ? new RssSendShuffleDataResponse(StatusCode.SUCCESS) : new RssSendShuffleDataResponse(failedStatus.get());
    }

    /**
     * Commits the shuffle data of one shuffle on the server.
     *
     * @param rssSendCommitRequest request carrying the application id and shuffle id
     * @return a {@code SUCCESS} response carrying the commit count reported by the server
     * @throws RssException when the server answers with any status other than {@code SUCCESS},
     *     or when every commit attempt failed
     */
    @Override // org.apache.uniffle.client.api.ShuffleServerClient
    public RssSendCommitResponse sendCommit(RssSendCommitRequest rssSendCommitRequest) {
        RssProtos.ShuffleCommitResponse rpcResponse =
            doSendCommit(rssSendCommitRequest.getAppId(), rssSendCommitRequest.getShuffleId());
        if (rpcResponse.getStatus() == RssProtos.StatusCode.SUCCESS) {
            RssSendCommitResponse committed = new RssSendCommitResponse(StatusCode.SUCCESS);
            committed.setCommitCount(rpcResponse.getCommitCount());
            return committed;
        }
        String target = "Can't commit shuffle data to " + this.host + ":" + this.port;
        String errorMsg = target + " for [appId=" + rssSendCommitRequest.getAppId() + ", shuffleId=" + rssSendCommitRequest.getShuffleId() + "], errorMsg:" + rpcResponse.getRetMsg();
        LOG.error(errorMsg);
        throw new RssException(errorMsg);
    }

    /**
     * Sends an application heartbeat. A non-success status is logged and reported through the
     * response rather than thrown.
     *
     * @param rssAppHeartBeatRequest request carrying the application id and the call timeout
     * @return {@code SUCCESS} when the server accepted the heartbeat, otherwise
     *     {@code INTERNAL_ERROR}
     */
    @Override // org.apache.uniffle.client.api.ShuffleServerClient
    public RssAppHeartBeatResponse sendHeartBeat(RssAppHeartBeatRequest rssAppHeartBeatRequest) {
        RssProtos.AppHeartBeatResponse rpcResponse =
            doSendHeartBeat(rssAppHeartBeatRequest.getAppId(), rssAppHeartBeatRequest.getTimeoutMs());
        if (rpcResponse.getStatus() != RssProtos.StatusCode.SUCCESS) {
            String target = "Can't send heartbeat to " + this.host + ":" + this.port;
            LOG.error(target + " for [appId=" + rssAppHeartBeatRequest.getAppId() + ", timeout=" + rssAppHeartBeatRequest.getTimeoutMs() + "ms], errorMsg:" + rpcResponse.getRetMsg());
            return new RssAppHeartBeatResponse(StatusCode.INTERNAL_ERROR);
        }
        return new RssAppHeartBeatResponse(StatusCode.SUCCESS);
    }

    /**
     * Tells the server that a shuffle is finished and logs how long the call took.
     *
     * @param rssFinishShuffleRequest request carrying the application id and shuffle id
     * @return a response with {@code StatusCode.SUCCESS}
     * @throws RssException when the server answers with any status other than {@code SUCCESS}
     */
    @Override // org.apache.uniffle.client.api.ShuffleServerClient
    public RssFinishShuffleResponse finishShuffle(RssFinishShuffleRequest rssFinishShuffleRequest) {
        RssProtos.FinishShuffleRequest rpcRequest = RssProtos.FinishShuffleRequest.newBuilder()
            .setAppId(rssFinishShuffleRequest.getAppId())
            .setShuffleId(rssFinishShuffleRequest.getShuffleId())
            .build();
        long start = System.currentTimeMillis();
        RssProtos.FinishShuffleResponse rpcResponse = getBlockingStub().finishShuffle(rpcRequest);
        if (rpcResponse.getStatus() == RssProtos.StatusCode.SUCCESS) {
            String requestInfo = "appId[" + rssFinishShuffleRequest.getAppId() + "], shuffleId[" + rssFinishShuffleRequest.getShuffleId() + "]";
            LOG.info("FinishShuffle to {}:{} for {} cost {} ms", new Object[]{this.host, Integer.valueOf(this.port), requestInfo, Long.valueOf(System.currentTimeMillis() - start)});
            return new RssFinishShuffleResponse(StatusCode.SUCCESS);
        }
        String target = "Can't finish shuffle process to " + this.host + ":" + this.port;
        String errorMsg = target + " for [appId=" + rssFinishShuffleRequest.getAppId() + ", shuffleId=" + rssFinishShuffleRequest.getShuffleId() + "], errorMsg:" + rpcResponse.getRetMsg();
        LOG.error(errorMsg);
        throw new RssException(errorMsg);
    }

    /**
     * Reports the block ids written per partition for one task attempt.
     *
     * <p>Partitions whose block id list is {@code null} or empty are left out of the request.
     *
     * @param rssReportShuffleResultRequest request carrying the application id, shuffle id, task
     *     attempt id, bitmap number and the partition-to-block-ids map
     * @return a response with {@code StatusCode.SUCCESS}
     * @throws RssException when the server answers with any status other than {@code SUCCESS},
     *     or when every report attempt failed
     */
    @Override // org.apache.uniffle.client.api.ShuffleServerClient
    public RssReportShuffleResultResponse reportShuffleResult(RssReportShuffleResultRequest rssReportShuffleResultRequest) {
        List<RssProtos.PartitionToBlockIds> partitionToBlockIds = Lists.newArrayList();
        for (Map.Entry<Integer, List<Long>> entry : rssReportShuffleResultRequest.getPartitionToBlockIds().entrySet()) {
            List<Long> blockIds = entry.getValue();
            if (blockIds != null && !blockIds.isEmpty()) {
                partitionToBlockIds.add(RssProtos.PartitionToBlockIds.newBuilder().setPartitionId(entry.getKey().intValue()).addAllBlockIds(blockIds).build());
            }
        }
        RssProtos.ReportShuffleResultResponse rpcResponse = doReportShuffleResult(RssProtos.ReportShuffleResultRequest.newBuilder().setAppId(rssReportShuffleResultRequest.getAppId()).setShuffleId(rssReportShuffleResultRequest.getShuffleId()).setTaskAttemptId(rssReportShuffleResultRequest.getTaskAttemptId()).setBitmapNum(rssReportShuffleResultRequest.getBitmapNum()).addAllPartitionToBlockIds(partitionToBlockIds).build());
        switch (rpcResponse.getStatus()) {
            case SUCCESS:
                return new RssReportShuffleResultResponse(StatusCode.SUCCESS);
            default:
                // The "[appId=..." bracket is closed before errorMsg, as in sendCommit/finishShuffle.
                String str = "Can't report shuffle result to " + this.host + ":" + this.port + " for [appId=" + rssReportShuffleResultRequest.getAppId() + ", shuffleId=" + rssReportShuffleResultRequest.getShuffleId() + "], errorMsg:" + rpcResponse.getRetMsg();
                LOG.error(str);
                throw new RssException(str);
        }
    }

    /**
     * Sends a report-shuffle-result request, retrying on any exception while the failure count
     * is below {@code maxRetryAttempts}.
     *
     * @param reportShuffleResultRequest the fully built protobuf request
     * @return the raw gRPC response of the first successful call
     * @throws RssException when no attempt succeeded
     */
    private RssProtos.ReportShuffleResultResponse doReportShuffleResult(RssProtos.ReportShuffleResultRequest reportShuffleResultRequest) {
        // The counter is advanced only in the catch block, i.e. once per failed attempt.
        for (int failures = 0; failures < this.maxRetryAttempts; ) {
            try {
                return getBlockingStub().reportShuffleResult(reportShuffleResultRequest);
            } catch (Exception e) {
                failures++;
                String failure = "Report shuffle result to host[" + this.host + "], port[" + this.port + "] failed";
                LOG.warn(failure + ", try again, retryNum[" + failures + "]", e);
            }
        }
        throw new RssException("Report shuffle result to host[" + this.host + "], port[" + this.port + "] failed");
    }

    /**
     * Fetches the serialized block id bitmap of one partition.
     *
     * @param rssGetShuffleResultRequest request carrying the application id, shuffle id,
     *     partition id and block id layout
     * @return a {@code SUCCESS} response built from the serialized bitmap bytes
     * @throws RssFetchFailedException when the server answers with any status other than
     *     {@code SUCCESS}
     * @throws RssException when building the response from the bitmap bytes fails
     */
    @Override // org.apache.uniffle.client.api.ShuffleServerClient
    public RssGetShuffleResultResponse getShuffleResult(RssGetShuffleResultRequest rssGetShuffleResultRequest) {
        RssProtos.GetShuffleResultResponse shuffleResult = getBlockingStub().getShuffleResult(RssProtos.GetShuffleResultRequest.newBuilder().setAppId(rssGetShuffleResultRequest.getAppId()).setShuffleId(rssGetShuffleResultRequest.getShuffleId()).setPartitionId(rssGetShuffleResultRequest.getPartitionId()).setBlockIdLayout(RssProtos.BlockIdLayout.newBuilder().setSequenceNoBits(rssGetShuffleResultRequest.getBlockIdLayout().sequenceNoBits).setPartitionIdBits(rssGetShuffleResultRequest.getBlockIdLayout().partitionIdBits).setTaskAttemptIdBits(rssGetShuffleResultRequest.getBlockIdLayout().taskAttemptIdBits).build()).build());
        switch (shuffleResult.getStatus()) {
            case SUCCESS:
                try {
                    return new RssGetShuffleResultResponse(StatusCode.SUCCESS, shuffleResult.getSerializedBitmap().toByteArray());
                } catch (Exception e) {
                    throw new RssException(e);
                }
            default:
                // The "[appId=..." bracket is closed before errorMsg, as in sendCommit/finishShuffle.
                String str = "Can't get shuffle result from " + this.host + ":" + this.port + " for [appId=" + rssGetShuffleResultRequest.getAppId() + ", shuffleId=" + rssGetShuffleResultRequest.getShuffleId() + "], errorMsg:" + shuffleResult.getRetMsg();
                LOG.error(str);
                throw new RssFetchFailedException(str);
        }
    }

    /**
     * Fetches the serialized block id bitmap covering several partitions in one call.
     *
     * @param rssGetShuffleResultForMultiPartRequest request carrying the application id, shuffle
     *     id, partition ids and block id layout
     * @return a {@code SUCCESS} response built from the serialized bitmap bytes
     * @throws RssFetchFailedException when the server answers with any status other than
     *     {@code SUCCESS}
     * @throws RssException when building the response from the bitmap bytes fails
     */
    @Override // org.apache.uniffle.client.api.ShuffleServerClient
    public RssGetShuffleResultResponse getShuffleResultForMultiPart(RssGetShuffleResultForMultiPartRequest rssGetShuffleResultForMultiPartRequest) {
        RssProtos.GetShuffleResultForMultiPartResponse shuffleResultForMultiPart = getBlockingStub().getShuffleResultForMultiPart(RssProtos.GetShuffleResultForMultiPartRequest.newBuilder().setAppId(rssGetShuffleResultForMultiPartRequest.getAppId()).setShuffleId(rssGetShuffleResultForMultiPartRequest.getShuffleId()).addAllPartitions(rssGetShuffleResultForMultiPartRequest.getPartitions()).setBlockIdLayout(RssProtos.BlockIdLayout.newBuilder().setSequenceNoBits(rssGetShuffleResultForMultiPartRequest.getBlockIdLayout().sequenceNoBits).setPartitionIdBits(rssGetShuffleResultForMultiPartRequest.getBlockIdLayout().partitionIdBits).setTaskAttemptIdBits(rssGetShuffleResultForMultiPartRequest.getBlockIdLayout().taskAttemptIdBits).build()).build());
        switch (shuffleResultForMultiPart.getStatus()) {
            case SUCCESS:
                try {
                    return new RssGetShuffleResultResponse(StatusCode.SUCCESS, shuffleResultForMultiPart.getSerializedBitmap().toByteArray());
                } catch (Exception e) {
                    throw new RssException(e);
                }
            default:
                // The "[appId=..." bracket is closed before errorMsg, as in sendCommit/finishShuffle.
                String str = "Can't get shuffle result from " + this.host + ":" + this.port + " for [appId=" + rssGetShuffleResultForMultiPartRequest.getAppId() + ", shuffleId=" + rssGetShuffleResultForMultiPartRequest.getShuffleId() + "], errorMsg:" + shuffleResultForMultiPart.getRetMsg();
                LOG.error(str);
                throw new RssFetchFailedException(str);
        }
    }

    /**
     * Reads a segment of locally stored shuffle data from the server.
     *
     * <p>While the server answers {@code NO_BUFFER}, {@code waitOrThrow} is invoked with the
     * current retry index and the same request is then sent again.
     *
     * @param rssGetShuffleDataRequest request carrying the application id, shuffle id, partition
     *     id, partition counts, offset and length
     * @return a {@code SUCCESS} response wrapping the returned bytes
     * @throws RssFetchFailedException when the final status is not {@code SUCCESS}
     */
    @Override // org.apache.uniffle.client.api.ShuffleServerClient
    public RssGetShuffleDataResponse getShuffleData(RssGetShuffleDataRequest rssGetShuffleDataRequest) {
        long start = System.currentTimeMillis();
        RssProtos.GetLocalShuffleDataRequest rpcRequest = RssProtos.GetLocalShuffleDataRequest.newBuilder()
            .setAppId(rssGetShuffleDataRequest.getAppId())
            .setShuffleId(rssGetShuffleDataRequest.getShuffleId())
            .setPartitionId(rssGetShuffleDataRequest.getPartitionId())
            .setPartitionNumPerRange(rssGetShuffleDataRequest.getPartitionNumPerRange())
            .setPartitionNum(rssGetShuffleDataRequest.getPartitionNum())
            .setOffset(rssGetShuffleDataRequest.getOffset())
            .setLength(rssGetShuffleDataRequest.getLength())
            .setTimestamp(start)
            .build();
        String requestInfo = "appId[" + rssGetShuffleDataRequest.getAppId() + "], shuffleId[" + rssGetShuffleDataRequest.getShuffleId() + "], partitionId[" + rssGetShuffleDataRequest.getPartitionId() + "]";

        int retry = 0;
        RssProtos.GetLocalShuffleDataResponse rpcResponse = getBlockingStub().getLocalShuffleData(rpcRequest);
        while (rpcResponse.getStatus() == RssProtos.StatusCode.NO_BUFFER) {
            waitOrThrow(rssGetShuffleDataRequest, retry, requestInfo, StatusCode.fromProto(rpcResponse.getStatus()), start);
            retry++;
            rpcResponse = getBlockingStub().getLocalShuffleData(rpcRequest);
        }

        if (rpcResponse.getStatus() == RssProtos.StatusCode.SUCCESS) {
            LOG.info("GetShuffleData from {}:{} for {} cost {} ms", new Object[]{this.host, Integer.valueOf(this.port), requestInfo, Long.valueOf(System.currentTimeMillis() - start)});
            return new RssGetShuffleDataResponse(StatusCode.SUCCESS, ByteBuffer.wrap(rpcResponse.getData().toByteArray()));
        }
        String errorMsg = "Can't get shuffle data from " + this.host + ":" + this.port + " for " + requestInfo + ", errorMsg:" + rpcResponse.getRetMsg();
        LOG.error(errorMsg);
        throw new RssFetchFailedException(errorMsg);
    }

    /**
     * Reads the locally stored shuffle index of a partition from the server.
     *
     * <p>While the server answers {@code NO_BUFFER}, {@code waitOrThrow} is invoked with the
     * current retry index and the same request is then sent again.
     *
     * @param rssGetShuffleIndexRequest request carrying the application id, shuffle id,
     *     partition id and partition counts
     * @return a {@code SUCCESS} response wrapping the index bytes and the data file length
     * @throws RssFetchFailedException when the final status is not {@code SUCCESS}
     */
    @Override // org.apache.uniffle.client.api.ShuffleServerClient
    public RssGetShuffleIndexResponse getShuffleIndex(RssGetShuffleIndexRequest rssGetShuffleIndexRequest) {
        RssProtos.GetLocalShuffleIndexRequest rpcRequest = RssProtos.GetLocalShuffleIndexRequest.newBuilder()
            .setAppId(rssGetShuffleIndexRequest.getAppId())
            .setShuffleId(rssGetShuffleIndexRequest.getShuffleId())
            .setPartitionId(rssGetShuffleIndexRequest.getPartitionId())
            .setPartitionNumPerRange(rssGetShuffleIndexRequest.getPartitionNumPerRange())
            .setPartitionNum(rssGetShuffleIndexRequest.getPartitionNum())
            .build();
        String requestInfo = "appId[" + rssGetShuffleIndexRequest.getAppId() + "], shuffleId[" + rssGetShuffleIndexRequest.getShuffleId() + "], partitionId[" + rssGetShuffleIndexRequest.getPartitionId() + "]";
        long start = System.currentTimeMillis();

        int retry = 0;
        RssProtos.GetLocalShuffleIndexResponse rpcResponse = getBlockingStub().getLocalShuffleIndex(rpcRequest);
        while (rpcResponse.getStatus() == RssProtos.StatusCode.NO_BUFFER) {
            waitOrThrow(rssGetShuffleIndexRequest, retry, requestInfo, StatusCode.fromProto(rpcResponse.getStatus()), start);
            retry++;
            rpcResponse = getBlockingStub().getLocalShuffleIndex(rpcRequest);
        }

        if (rpcResponse.getStatus() == RssProtos.StatusCode.SUCCESS) {
            LOG.info("GetShuffleIndex from {}:{} for {} cost {} ms", new Object[]{this.host, Integer.valueOf(this.port), requestInfo, Long.valueOf(System.currentTimeMillis() - start)});
            return new RssGetShuffleIndexResponse(StatusCode.SUCCESS, new NettyManagedBuffer(Unpooled.wrappedBuffer(rpcResponse.getIndexData().toByteArray())), rpcResponse.getDataFileLen());
        }
        String errorMsg = "Can't get shuffle index from " + this.host + ":" + this.port + " for " + requestInfo + ", errorMsg:" + rpcResponse.getRetMsg();
        LOG.error(errorMsg);
        throw new RssFetchFailedException(errorMsg);
    }

    /**
     * Reads shuffle data still held in server memory for one partition.
     *
     * <p>Only the serialization of the expected task id bitmap is wrapped in the
     * "Errors on serializing task ids bitmap." handler. Failures of the RPC itself, of
     * {@code waitOrThrow}, and the {@link RssFetchFailedException} raised for a non-success
     * status propagate unchanged, as they do in {@code getShuffleData} and
     * {@code getShuffleIndex}.
     *
     * <p>While the server answers {@code NO_BUFFER}, {@code waitOrThrow} is invoked with the
     * current retry index and the same request is then sent again.
     *
     * @param rssGetInMemoryShuffleDataRequest request carrying the application id, shuffle id,
     *     partition id, last block id, read buffer size and optional expected task ids
     * @return a {@code SUCCESS} response wrapping the data bytes and their buffer segments
     * @throws RssException when the expected task id bitmap cannot be serialized
     * @throws RssFetchFailedException when the final status is not {@code SUCCESS}
     */
    @Override // org.apache.uniffle.client.api.ShuffleServerClient
    public RssGetInMemoryShuffleDataResponse getInMemoryShuffleData(RssGetInMemoryShuffleDataRequest rssGetInMemoryShuffleDataRequest) {
        long start = System.currentTimeMillis();
        ByteString serializedTaskIds = ByteString.EMPTY;
        try {
            if (rssGetInMemoryShuffleDataRequest.getExpectedTaskIds() != null) {
                serializedTaskIds = UnsafeByteOperations.unsafeWrap(RssUtils.serializeBitMap(rssGetInMemoryShuffleDataRequest.getExpectedTaskIds()));
            }
        } catch (Exception e) {
            throw new RssException("Errors on serializing task ids bitmap.", e);
        }

        RssProtos.GetMemoryShuffleDataRequest rpcRequest = RssProtos.GetMemoryShuffleDataRequest.newBuilder().setAppId(rssGetInMemoryShuffleDataRequest.getAppId()).setShuffleId(rssGetInMemoryShuffleDataRequest.getShuffleId()).setPartitionId(rssGetInMemoryShuffleDataRequest.getPartitionId()).setLastBlockId(rssGetInMemoryShuffleDataRequest.getLastBlockId()).setReadBufferSize(rssGetInMemoryShuffleDataRequest.getReadBufferSize()).setSerializedExpectedTaskIdsBitmap(serializedTaskIds).setTimestamp(start).build();
        String requestInfo = "appId[" + rssGetInMemoryShuffleDataRequest.getAppId() + "], shuffleId[" + rssGetInMemoryShuffleDataRequest.getShuffleId() + "], partitionId[" + rssGetInMemoryShuffleDataRequest.getPartitionId() + "]";

        int retry = 0;
        RssProtos.GetMemoryShuffleDataResponse rpcResponse;
        while (true) {
            rpcResponse = getBlockingStub().getMemoryShuffleData(rpcRequest);
            if (rpcResponse.getStatus() != RssProtos.StatusCode.NO_BUFFER) {
                break;
            }
            waitOrThrow(rssGetInMemoryShuffleDataRequest, retry, requestInfo, StatusCode.fromProto(rpcResponse.getStatus()), start);
            retry++;
        }

        switch (rpcResponse.getStatus()) {
            case SUCCESS:
                LOG.info("GetInMemoryShuffleData from {}:{} for " + requestInfo + " cost " + (System.currentTimeMillis() - start) + " ms", this.host, Integer.valueOf(this.port));
                return new RssGetInMemoryShuffleDataResponse(StatusCode.SUCCESS, ByteBuffer.wrap(rpcResponse.getData().toByteArray()), toBufferSegments(rpcResponse.getShuffleDataBlockSegmentsList()));
            default:
                String errorMsg = "Can't get shuffle in memory data from " + this.host + ":" + this.port + " for " + requestInfo + ", errorMsg:" + rpcResponse.getRetMsg();
                LOG.error(errorMsg);
                throw new RssFetchFailedException(errorMsg);
        }
    }

    /**
     * Builds a short human-readable description of this client from its host and port.
     *
     * @return a string of the form {@code ShuffleServerGrpcClient for host[<host>], port[<port>]}
     */
    @Override // org.apache.uniffle.client.api.ShuffleServerClient
    public String getClientInfo() {
        StringBuilder description = new StringBuilder("ShuffleServerGrpcClient for host[");
        description.append(this.host);
        description.append("], port[");
        description.append(this.port);
        description.append("]");
        return description.toString();
    }

    /**
     * Backs off before the next retry of a request, or fails once the retry limit is reached.
     *
     * <p>The sleep time is {@code 2000 * 2^min(i, 16)} plus a random component drawn with
     * {@code random.nextInt(BACK_OFF_BASE)}, capped at the request's maximum retry interval.
     *
     * <p>If the sleep is interrupted, the interruption is logged and the thread's interrupt flag
     * is restored before returning, so that callers and later blocking calls can observe it.
     *
     * @param retryableRequest supplies the retry limit, the maximum retry interval and the
     *     operation type used in log and error messages
     * @param i number of retries already performed
     * @param str description of the request, used in the warning log
     * @param statusCode the status that caused this retry
     * @param j start time of the overall operation in epoch milliseconds, used to report the
     *     elapsed time on failure
     * @throws RssFetchFailedException if {@code i} has reached the request's retry limit
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public void waitOrThrow(RetryableRequest retryableRequest, int i, String str, StatusCode statusCode, long j) {
        if (i >= retryableRequest.getRetryMax()) {
            String errorMsg = String.format("ShuffleServer %s:%s is full when %s due to %s, after %d retries, cost %d ms", this.host, this.port, retryableRequest.operationType(), statusCode, retryableRequest.getRetryMax(), System.currentTimeMillis() - j);
            LOG.error(errorMsg);
            throw new RssFetchFailedException(errorMsg);
        }
        try {
            // The shift is clamped to 16 so the exponential term stays well inside int range.
            long backoffMillis = Math.min(retryableRequest.getRetryIntervalMax(), (2000 * (1 << Math.min(i, 16))) + this.random.nextInt(BACK_OFF_BASE));
            LOG.warn("Can't acquire buffer for {} from {}:{} when executing {}, due to {}. Will retry {} more time(s) after waiting {} milliseconds.", str, this.host, this.port, retryableRequest.operationType(), statusCode, retryableRequest.getRetryMax() - i, backoffMillis);
            Thread.sleep(backoffMillis);
        } catch (InterruptedException e) {
            LOG.warn("Exception happened when executing {} from {}:{}", retryableRequest.operationType(), this.host, this.port, e);
            // Thread.sleep clears the interrupt status when it throws; put it back so the
            // interruption is not lost to the code that asked this thread to stop.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Converts partition ranges into their protobuf {@code ShufflePartitionRange} form.
     *
     * @param list the partition ranges to convert
     * @return a new list with one protobuf range per input range, in iteration order, carrying
     *     the same start and end values
     */
    private List<RssProtos.ShufflePartitionRange> toShufflePartitionRanges(List<PartitionRange> list) {
        List<RssProtos.ShufflePartitionRange> protoRanges = new ArrayList<>();
        for (PartitionRange range : list) {
            RssProtos.ShufflePartitionRange.Builder rangeBuilder = RssProtos.ShufflePartitionRange.newBuilder();
            rangeBuilder.setStart(range.getStart());
            rangeBuilder.setEnd(range.getEnd());
            protoRanges.add(rangeBuilder.build());
        }
        return protoRanges;
    }

    /**
     * Converts protobuf shuffle data block segments into {@code BufferSegment} objects.
     *
     * @param list the protobuf block segments to convert
     * @return a new list with one {@code BufferSegment} per input segment, in iteration order,
     *     built from the segment's block id, offset, length, uncompressed length, CRC and task
     *     attempt id
     */
    protected List<BufferSegment> toBufferSegments(List<RssProtos.ShuffleDataBlockSegment> list) {
        List<BufferSegment> segments = new ArrayList<>();
        for (RssProtos.ShuffleDataBlockSegment protoSegment : list) {
            BufferSegment segment = new BufferSegment(
                    protoSegment.getBlockId(),
                    protoSegment.getOffset(),
                    protoSegment.getLength(),
                    protoSegment.getUncompressLength(),
                    protoSegment.getCrc(),
                    protoSegment.getTaskAttemptId());
            segments.add(segment);
        }
        return segments;
    }

    /**
     * Replaces this client's {@code rpcTimeout} with the given value.
     *
     * <p>Exposed for tests, as indicated by the {@code @VisibleForTesting} annotation.
     *
     * @param j the new value for {@code rpcTimeout} (NOTE(review): unit is not visible here,
     *     presumably milliseconds; confirm against where the field is consumed)
     */
    @VisibleForTesting
    public void adjustTimeout(long j) {
        rpcTimeout = j;
    }
}
