package org.apache.uniffle.client.impl.grpc;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.uniffle.client.request.RssGetInMemoryShuffleDataRequest;
import org.apache.uniffle.client.request.RssGetShuffleDataRequest;
import org.apache.uniffle.client.request.RssGetShuffleIndexRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.response.RssGetInMemoryShuffleDataResponse;
import org.apache.uniffle.client.response.RssGetShuffleDataResponse;
import org.apache.uniffle.client.response.RssGetShuffleIndexResponse;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.NotRetryException;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.exception.RssFetchFailedException;
import org.apache.uniffle.common.netty.client.TransportClient;
import org.apache.uniffle.common.netty.client.TransportClientFactory;
import org.apache.uniffle.common.netty.client.TransportConf;
import org.apache.uniffle.common.netty.client.TransportContext;
import org.apache.uniffle.common.netty.protocol.GetLocalShuffleDataRequest;
import org.apache.uniffle.common.netty.protocol.GetLocalShuffleDataResponse;
import org.apache.uniffle.common.netty.protocol.GetLocalShuffleIndexRequest;
import org.apache.uniffle.common.netty.protocol.GetLocalShuffleIndexResponse;
import org.apache.uniffle.common.netty.protocol.GetMemoryShuffleDataRequest;
import org.apache.uniffle.common.netty.protocol.GetMemoryShuffleDataResponse;
import org.apache.uniffle.common.netty.protocol.RpcResponse;
import org.apache.uniffle.common.netty.protocol.SendShuffleDataRequest;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.RetryUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.class */
/**
 * Shuffle server client that sends and fetches shuffle data through a Netty
 * {@link TransportClient}, while inheriting everything else from
 * {@link ShuffleServerGrpcClient} (for example {@code requirePreAllocation} and
 * {@code waitOrThrow}, which are called here but defined in the superclass).
 *
 * <p>NOTE(review): nothing in this class closes {@code clientFactory}; confirm that its
 * lifecycle is handled elsewhere or add a {@code close()} override.
 */
public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient {
    private static final Logger LOG = LoggerFactory.getLogger(ShuffleServerGrpcNettyClient.class);

    /** Shared by all instances; source of the ids returned by {@link #requestId()}. */
    private static final AtomicLong counter = new AtomicLong();

    /** Port used for the Netty transport connection (distinct from the inherited {@code port}). */
    private final int nettyPort;

    /** Factory used by {@link #getTransportClient()} to obtain Netty transport clients. */
    private final TransportClientFactory clientFactory;

    /**
     * Creates a client, taking the max RPC attempts and RPC timeout from {@code rssConf}, or from
     * the defaults of {@link RssClientConf#RPC_MAX_ATTEMPTS} and
     * {@link RssClientConf#RPC_TIMEOUT_MS} when {@code rssConf} is {@code null}.
     *
     * @param rssConf client configuration; may be {@code null} for the two settings above
     * @param str forwarded to the superclass constructor (presumably the server host)
     * @param i forwarded to the superclass constructor (presumably the gRPC port)
     * @param i2 Netty port of the shuffle server
     */
    public ShuffleServerGrpcNettyClient(RssConf rssConf, String str, int i, int i2) {
        this(
                rssConf,
                str,
                i,
                i2,
                rssConf == null
                        ? RssClientConf.RPC_MAX_ATTEMPTS.defaultValue().intValue()
                        : rssConf.getInteger(RssClientConf.RPC_MAX_ATTEMPTS),
                rssConf == null
                        ? RssClientConf.RPC_TIMEOUT_MS.defaultValue().longValue()
                        : rssConf.getLong(RssClientConf.RPC_TIMEOUT_MS));
    }

    /**
     * Creates a client with explicit retry and timeout settings.
     *
     * @param rssConf configuration handed to {@link TransportConf}.
     *     NOTE(review): a {@code null} value is passed through unchanged; confirm that
     *     {@code TransportConf} tolerates it.
     * @param str forwarded to the superclass constructor (presumably the server host)
     * @param i forwarded to the superclass constructor (presumably the gRPC port)
     * @param i2 Netty port of the shuffle server
     * @param i3 forwarded to the superclass constructor (max RPC attempts, per the other constructor)
     * @param j forwarded to the superclass constructor (RPC timeout in ms, per the other constructor)
     */
    public ShuffleServerGrpcNettyClient(RssConf rssConf, String str, int i, int i2, int i3, long j) {
        super(str, i, i3, j);
        this.nettyPort = i2;
        this.clientFactory = new TransportClientFactory(new TransportContext(new TransportConf(rssConf)));
    }

    /** Returns a description of this client containing host, port and Netty port. */
    @Override // org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient, org.apache.uniffle.client.api.ShuffleServerClient
    public String getClientInfo() {
        return "ShuffleServerGrpcNettyClient for host[" + this.host + "], port[" + this.port + "], nettyPort[" + this.nettyPort + "]";
    }

    /**
     * Sends the blocks of every shuffle id contained in the request, one Netty RPC per shuffle id.
     *
     * <p>For each shuffle id the attempt (pre-allocation followed by the send RPC) is executed
     * through {@link RetryUtils#retryWithCondition}. A failure for one shuffle id is logged and
     * does not stop the remaining shuffle ids from being sent.
     *
     * @param rssSendShuffleDataRequest the blocks to send, grouped by shuffle id and partition id
     * @return a response with {@link StatusCode#SUCCESS} when every shuffle id was sent
     *     successfully, otherwise {@link StatusCode#INTERNAL_ERROR}
     */
    @Override // org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient, org.apache.uniffle.client.api.ShuffleServerClient
    public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest rssSendShuffleDataRequest) {
        boolean allSucceeded = true;
        for (Map.Entry<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleEntry
                : rssSendShuffleDataRequest.getShuffleIdToBlocks().entrySet()) {
            int shuffleId = shuffleEntry.getKey();
            Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = shuffleEntry.getValue();

            // Sum the payload size and count the blocks across all partitions of this shuffle.
            int payloadSize = 0;
            int blockTotal = 0;
            for (List<ShuffleBlockInfo> blocks : partitionToBlocks.values()) {
                for (ShuffleBlockInfo block : blocks) {
                    payloadSize += block.getSize();
                    blockTotal++;
                }
            }

            SendShuffleDataRequest sendRequest = new SendShuffleDataRequest(
                    requestId(),
                    rssSendShuffleDataRequest.getAppId(),
                    shuffleId,
                    0L,
                    partitionToBlocks,
                    System.currentTimeMillis());
            // Values captured by the lambda below must be effectively final.
            final int allocateSize = payloadSize + sendRequest.encodedLength();
            final int blockCount = blockTotal;
            try {
                RetryUtils.retryWithCondition(() -> {
                    TransportClient transportClient = getTransportClient();
                    long requireId = requirePreAllocation(
                            rssSendShuffleDataRequest.getAppId(),
                            allocateSize,
                            rssSendShuffleDataRequest.getRetryMax(),
                            rssSendShuffleDataRequest.getRetryIntervalMax());
                    if (requireId == -1) {
                        throw new RssException(String.format(
                                "requirePreAllocation failed! size[%s], host[%s], port[%s]",
                                allocateSize, this.host, this.port));
                    }
                    // Each attempt carries its own pre-allocation id and a fresh timestamp.
                    sendRequest.setRequireId(requireId);
                    sendRequest.setTimestamp(System.currentTimeMillis());
                    long start = System.currentTimeMillis();
                    RpcResponse rpcResponse = transportClient.sendRpcSync(sendRequest, this.rpcTimeout);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(
                                "Do sendShuffleData to {}:{} rpc cost:{} ms for {} bytes with {} blocks",
                                this.host,
                                this.port,
                                System.currentTimeMillis() - start,
                                allocateSize,
                                blockCount);
                    }
                    if (rpcResponse.getStatusCode() == StatusCode.SUCCESS) {
                        return rpcResponse;
                    }
                    String failure = "Can't send shuffle data with " + blockCount + " blocks to " + this.host + ":" + this.port
                            + ", statusCode=" + rpcResponse.getStatusCode() + ", errorMsg:" + rpcResponse.getRetMessage();
                    if (rpcResponse.getStatusCode() == StatusCode.NO_REGISTER) {
                        throw new NotRetryException(failure);
                    }
                    throw new RssException(failure);
                },
                null,
                rssSendShuffleDataRequest.getRetryIntervalMax(),
                this.maxRetryAttempts,
                // The predicate is false only for OutOfMemoryError (presumably: do not retry on OOM).
                failure -> !(failure instanceof OutOfMemoryError));
            } catch (Throwable sendFailure) {
                // Deliberate best effort: record the failure and keep sending the other shuffle ids.
                LOG.warn("Failed to send shuffle data due to ", sendFailure);
                allSucceeded = false;
            }
        }
        return new RssSendShuffleDataResponse(allSucceeded ? StatusCode.SUCCESS : StatusCode.INTERNAL_ERROR);
    }

    /**
     * Fetches in-memory shuffle data over Netty, re-sending the request for as long as the
     * server answers {@link StatusCode#NO_BUFFER} and {@code waitOrThrow} does not throw.
     *
     * @param rssGetInMemoryShuffleDataRequest identifies the app, shuffle, partition and read window
     * @return the data and buffer segments returned by the server
     * @throws RssFetchFailedException if the final status is anything other than SUCCESS
     */
    @Override // org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient, org.apache.uniffle.client.api.ShuffleServerClient
    public RssGetInMemoryShuffleDataResponse getInMemoryShuffleData(RssGetInMemoryShuffleDataRequest rssGetInMemoryShuffleDataRequest) {
        TransportClient transportClient = getTransportClient();
        GetMemoryShuffleDataRequest memoryRequest = new GetMemoryShuffleDataRequest(
                requestId(),
                rssGetInMemoryShuffleDataRequest.getAppId(),
                rssGetInMemoryShuffleDataRequest.getShuffleId(),
                rssGetInMemoryShuffleDataRequest.getPartitionId(),
                rssGetInMemoryShuffleDataRequest.getLastBlockId(),
                rssGetInMemoryShuffleDataRequest.getReadBufferSize(),
                System.currentTimeMillis(),
                rssGetInMemoryShuffleDataRequest.getExpectedTaskIds());
        String requestInfo = "appId[" + rssGetInMemoryShuffleDataRequest.getAppId()
                + "], shuffleId[" + rssGetInMemoryShuffleDataRequest.getShuffleId()
                + "], partitionId[" + rssGetInMemoryShuffleDataRequest.getPartitionId()
                + "], lastBlockId[" + rssGetInMemoryShuffleDataRequest.getLastBlockId() + "]";
        long start = System.currentTimeMillis();

        RpcResponse rpcResponse;
        int retry = 0;
        while (true) {
            rpcResponse = transportClient.sendRpcSync(memoryRequest, this.rpcTimeout);
            if (rpcResponse.getStatusCode() != StatusCode.NO_BUFFER) {
                break;
            }
            waitOrThrow(rssGetInMemoryShuffleDataRequest, retry, requestInfo, rpcResponse.getStatusCode(), start);
            retry++;
        }

        GetMemoryShuffleDataResponse memoryResponse = (GetMemoryShuffleDataResponse) rpcResponse;
        if (rpcResponse.getStatusCode() == StatusCode.SUCCESS) {
            LOG.info("GetInMemoryShuffleData from {}:{} for {} cost {} ms",
                    this.host, this.nettyPort, requestInfo, System.currentTimeMillis() - start);
            return new RssGetInMemoryShuffleDataResponse(
                    StatusCode.SUCCESS, memoryResponse.body(), memoryResponse.getBufferSegments());
        }
        String failure = "Can't get shuffle in memory data from " + this.host + ":" + this.nettyPort
                + " for " + requestInfo + ", errorMsg:" + memoryResponse.getRetMessage();
        LOG.error(failure);
        throw new RssFetchFailedException(failure);
    }

    /**
     * Fetches the local shuffle index over Netty, re-sending the request for as long as the
     * server answers {@link StatusCode#NO_BUFFER} and {@code waitOrThrow} does not throw.
     *
     * @param rssGetShuffleIndexRequest identifies the app, shuffle and partition to read
     * @return the index data and file length returned by the server
     * @throws RssFetchFailedException if the final status is anything other than SUCCESS
     */
    @Override // org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient, org.apache.uniffle.client.api.ShuffleServerClient
    public RssGetShuffleIndexResponse getShuffleIndex(RssGetShuffleIndexRequest rssGetShuffleIndexRequest) {
        TransportClient transportClient = getTransportClient();
        GetLocalShuffleIndexRequest indexRequest = new GetLocalShuffleIndexRequest(
                requestId(),
                rssGetShuffleIndexRequest.getAppId(),
                rssGetShuffleIndexRequest.getShuffleId(),
                rssGetShuffleIndexRequest.getPartitionId(),
                rssGetShuffleIndexRequest.getPartitionNumPerRange(),
                rssGetShuffleIndexRequest.getPartitionNum());
        // The trailing "]" closes partitionId[...], matching the other fetch methods.
        String requestInfo = "appId[" + rssGetShuffleIndexRequest.getAppId()
                + "], shuffleId[" + rssGetShuffleIndexRequest.getShuffleId()
                + "], partitionId[" + rssGetShuffleIndexRequest.getPartitionId() + "]";
        long start = System.currentTimeMillis();

        RpcResponse rpcResponse;
        int retry = 0;
        while (true) {
            rpcResponse = transportClient.sendRpcSync(indexRequest, this.rpcTimeout);
            if (rpcResponse.getStatusCode() != StatusCode.NO_BUFFER) {
                break;
            }
            waitOrThrow(rssGetShuffleIndexRequest, retry, requestInfo, rpcResponse.getStatusCode(), start);
            retry++;
        }

        GetLocalShuffleIndexResponse indexResponse = (GetLocalShuffleIndexResponse) rpcResponse;
        if (rpcResponse.getStatusCode() == StatusCode.SUCCESS) {
            LOG.info("GetShuffleIndex from {}:{} for {} cost {} ms",
                    this.host, this.nettyPort, requestInfo, System.currentTimeMillis() - start);
            return new RssGetShuffleIndexResponse(
                    StatusCode.SUCCESS, indexResponse.body(), indexResponse.getFileLength());
        }
        String failure = "Can't get shuffle index from " + this.host + ":" + this.nettyPort
                + " for " + requestInfo + ", errorMsg:" + indexResponse.getRetMessage();
        LOG.error(failure);
        throw new RssFetchFailedException(failure);
    }

    /**
     * Fetches local shuffle data over Netty, re-sending the request for as long as the
     * server answers {@link StatusCode#NO_BUFFER} and {@code waitOrThrow} does not throw.
     *
     * @param rssGetShuffleDataRequest identifies the app, shuffle, partition, offset and length to read
     * @return the shuffle data returned by the server
     * @throws RssFetchFailedException if the final status is anything other than SUCCESS
     */
    @Override // org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient, org.apache.uniffle.client.api.ShuffleServerClient
    public RssGetShuffleDataResponse getShuffleData(RssGetShuffleDataRequest rssGetShuffleDataRequest) {
        TransportClient transportClient = getTransportClient();
        GetLocalShuffleDataRequest dataRequest = new GetLocalShuffleDataRequest(
                requestId(),
                rssGetShuffleDataRequest.getAppId(),
                rssGetShuffleDataRequest.getShuffleId(),
                rssGetShuffleDataRequest.getPartitionId(),
                rssGetShuffleDataRequest.getPartitionNumPerRange(),
                rssGetShuffleDataRequest.getPartitionNum(),
                rssGetShuffleDataRequest.getOffset(),
                rssGetShuffleDataRequest.getLength(),
                System.currentTimeMillis());
        String requestInfo = "appId[" + rssGetShuffleDataRequest.getAppId()
                + "], shuffleId[" + rssGetShuffleDataRequest.getShuffleId()
                + "], partitionId[" + rssGetShuffleDataRequest.getPartitionId() + "]";
        long start = System.currentTimeMillis();

        RpcResponse rpcResponse;
        int retry = 0;
        while (true) {
            rpcResponse = transportClient.sendRpcSync(dataRequest, this.rpcTimeout);
            if (rpcResponse.getStatusCode() != StatusCode.NO_BUFFER) {
                break;
            }
            waitOrThrow(rssGetShuffleDataRequest, retry, requestInfo, rpcResponse.getStatusCode(), start);
            retry++;
        }

        GetLocalShuffleDataResponse dataResponse = (GetLocalShuffleDataResponse) rpcResponse;
        if (rpcResponse.getStatusCode() == StatusCode.SUCCESS) {
            LOG.info("GetShuffleData from {}:{} for {} cost {} ms",
                    this.host, this.nettyPort, requestInfo, System.currentTimeMillis() - start);
            return new RssGetShuffleDataResponse(StatusCode.SUCCESS, dataResponse.body());
        }
        String failure = "Can't get shuffle data from " + this.host + ":" + this.nettyPort
                + " for " + requestInfo + ", errorMsg:" + dataResponse.getRetMessage();
        LOG.error(failure);
        throw new RssFetchFailedException(failure);
    }

    /**
     * Returns the current value of the shared request counter and then increments it.
     *
     * @return the next request id
     */
    public static long requestId() {
        return counter.getAndIncrement();
    }

    /**
     * Obtains a transport client for {@code host:nettyPort} from the client factory.
     *
     * @throws RssException wrapping any exception raised while creating the client
     */
    private TransportClient getTransportClient() {
        try {
            return this.clientFactory.createClient(this.host, this.nettyPort);
        } catch (Exception e) {
            throw new RssException("create transport client failed", e);
        }
    }
}
