package org.apache.uniffle.client.impl.grpc;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.uniffle.client.api.CoordinatorClient;
import org.apache.uniffle.client.request.RssAccessClusterRequest;
import org.apache.uniffle.client.request.RssAppHeartBeatRequest;
import org.apache.uniffle.client.request.RssApplicationInfoRequest;
import org.apache.uniffle.client.request.RssFetchClientConfRequest;
import org.apache.uniffle.client.request.RssFetchRemoteStorageRequest;
import org.apache.uniffle.client.request.RssGetShuffleAssignmentsRequest;
import org.apache.uniffle.client.request.RssSendHeartBeatRequest;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
import org.apache.uniffle.client.response.RssApplicationInfoResponse;
import org.apache.uniffle.client.response.RssFetchClientConfResponse;
import org.apache.uniffle.client.response.RssFetchRemoteStorageResponse;
import org.apache.uniffle.client.response.RssGetShuffleAssignmentsResponse;
import org.apache.uniffle.client.response.RssSendHeartBeatResponse;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.common.util.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/uniffle/client/impl/grpc/CoordinatorGrpcRetryableClient.class */
/**
 * A {@link CoordinatorClient} that delegates every call to a list of underlying coordinator
 * clients.
 *
 * <p>Heartbeat-style calls ({@link #scheduleAtFixedRateToSendAppHeartBeat},
 * {@link #registerApplicationInfo}, {@link #sendHeartBeat}) are handed to
 * {@code ThreadUtils.executeTasks} together with the whole client list and a dedicated
 * executor; they report success if at least one coordinator answered with
 * {@link StatusCode#SUCCESS}.
 *
 * <p>Query-style calls ({@link #getShuffleAssignments}, {@link #accessCluster},
 * {@link #fetchClientConf}, {@link #fetchRemoteStorage}) walk the client list in order, return
 * the first successful response, and are wrapped in {@code RetryUtils.retry} so the whole walk is
 * repeated on failure.
 */
public class CoordinatorGrpcRetryableClient implements CoordinatorClient {
    private static final Logger LOG = LoggerFactory.getLogger(CoordinatorGrpcRetryableClient.class);

    /** Delegates, tried in list order by the query-style calls. */
    private final List<CoordinatorClient> coordinatorClients;
    /** Retry interval handed to {@code RetryUtils.retry} (milliseconds, judging by the name). */
    private final long retryIntervalMs;
    /** Retry count handed to {@code RetryUtils.retry}. */
    private final int retryTimes;
    /** Executor used for the heartbeat-style fan-out calls; shut down in {@link #close()}. */
    private final ExecutorService heartBeatExecutorService;

    /**
     * Creates a client that delegates to the given coordinator clients.
     *
     * @param coordinatorClients the delegate clients
     * @param retryIntervalMs interval passed to {@code RetryUtils.retry} for query-style calls
     * @param retryTimes retry count passed to {@code RetryUtils.retry} for query-style calls
     * @param heartBeatThreadNum size of the daemon thread pool used for heartbeat-style calls
     */
    public CoordinatorGrpcRetryableClient(
            List<CoordinatorClient> coordinatorClients,
            long retryIntervalMs,
            int retryTimes,
            int heartBeatThreadNum) {
        this.coordinatorClients = coordinatorClients;
        this.retryIntervalMs = retryIntervalMs;
        this.retryTimes = retryTimes;
        this.heartBeatExecutorService =
                ThreadUtils.getDaemonFixedThreadPool(heartBeatThreadNum, "client-heartbeat");
    }

    /**
     * Sends the application heartbeat to every coordinator client.
     *
     * @param rssAppHeartBeatRequest the heartbeat request; its timeout bounds the fan-out
     * @return the response of a coordinator that answered {@link StatusCode#SUCCESS} (when several
     *     succeed, whichever was stored last), otherwise an {@link StatusCode#INTERNAL_ERROR}
     *     response
     */
    @Override // org.apache.uniffle.client.api.CoordinatorClient
    public RssAppHeartBeatResponse scheduleAtFixedRateToSendAppHeartBeat(RssAppHeartBeatRequest rssAppHeartBeatRequest) {
        // Start pessimistic; any successful delegate overwrites this.
        AtomicReference<RssAppHeartBeatResponse> result =
                new AtomicReference<>(new RssAppHeartBeatResponse(StatusCode.INTERNAL_ERROR));
        ThreadUtils.executeTasks(this.heartBeatExecutorService, this.coordinatorClients, coordinatorClient -> {
            try {
                RssAppHeartBeatResponse response =
                        coordinatorClient.scheduleAtFixedRateToSendAppHeartBeat(rssAppHeartBeatRequest);
                if (response.getStatusCode() == StatusCode.SUCCESS) {
                    result.set(response);
                    LOG.info("Successfully send heartbeat to {}", coordinatorClient.getDesc());
                } else {
                    LOG.warn("Failed to send heartbeat to {}", coordinatorClient.getDesc());
                }
            } catch (Exception e) {
                // Best effort: one unreachable coordinator must not fail the others.
                LOG.warn("Error happened when send heartbeat to " + coordinatorClient.getDesc(), e);
            }
            return null;
        }, rssAppHeartBeatRequest.getTimeoutMs(), "send heartbeat to coordinator");
        return result.get();
    }

    /**
     * Registers the application info with every coordinator client.
     *
     * @param rssApplicationInfoRequest the registration request; its timeout bounds the fan-out
     * @return the response of a coordinator that answered {@link StatusCode#SUCCESS} (when several
     *     succeed, whichever was stored last), otherwise an {@link StatusCode#INTERNAL_ERROR}
     *     response
     */
    @Override // org.apache.uniffle.client.api.CoordinatorClient
    public RssApplicationInfoResponse registerApplicationInfo(RssApplicationInfoRequest rssApplicationInfoRequest) {
        // Start pessimistic; any successful delegate overwrites this.
        AtomicReference<RssApplicationInfoResponse> result =
                new AtomicReference<>(new RssApplicationInfoResponse(StatusCode.INTERNAL_ERROR));
        ThreadUtils.executeTasks(this.heartBeatExecutorService, this.coordinatorClients, coordinatorClient -> {
            try {
                RssApplicationInfoResponse response =
                        coordinatorClient.registerApplicationInfo(rssApplicationInfoRequest);
                if (response.getStatusCode() == StatusCode.SUCCESS) {
                    result.set(response);
                    LOG.info("Successfully send applicationInfo to {}", coordinatorClient.getDesc());
                } else {
                    LOG.error("Failed to send applicationInfo to {}", coordinatorClient.getDesc());
                }
            } catch (Exception e) {
                // Best effort: one unreachable coordinator must not fail the others.
                LOG.warn("Error happened when send applicationInfo to " + coordinatorClient.getDesc(), e);
            }
            return null;
        }, rssApplicationInfoRequest.getTimeoutMs(), "register application");
        return result.get();
    }

    /**
     * Sends a heartbeat to every coordinator client.
     *
     * @param rssSendHeartBeatRequest the heartbeat request; twice its timeout is used both for the
     *     fan-out and for waiting on each individual result
     * @return a {@link StatusCode#SUCCESS} response if at least one coordinator answered with
     *     success, otherwise an {@link StatusCode#INTERNAL_ERROR} response
     */
    @Override // org.apache.uniffle.client.api.CoordinatorClient
    public RssSendHeartBeatResponse sendHeartBeat(RssSendHeartBeatRequest rssSendHeartBeatRequest) {
        AtomicBoolean anySucceeded = new AtomicBoolean(false);
        long waitMs = rssSendHeartBeatRequest.getTimeout() * 2;
        ThreadUtils.executeTasks(
                this.heartBeatExecutorService,
                this.coordinatorClients,
                coordinatorClient -> coordinatorClient.sendHeartBeat(rssSendHeartBeatRequest),
                waitMs,
                "send heartbeat",
                future -> {
                    try {
                        RssSendHeartBeatResponse response =
                                (RssSendHeartBeatResponse) future.get(waitMs, TimeUnit.MILLISECONDS);
                        if (response.getStatusCode() == StatusCode.SUCCESS) {
                            anySucceeded.set(true);
                        }
                    } catch (Exception e) {
                        // Keep the throwable: e.getMessage() alone is often null (e.g. timeouts)
                        // and loses the stack trace.
                        LOG.error("Failed to send heartbeat to coordinator", e);
                    }
                    return null;
                });
        return new RssSendHeartBeatResponse(
                anySucceeded.get() ? StatusCode.SUCCESS : StatusCode.INTERNAL_ERROR);
    }

    /**
     * Asks the coordinators, in list order, for shuffle server assignments and returns the first
     * successful response. A coordinator that throws is logged and skipped so the next one is
     * still tried. The whole walk is retried via {@code RetryUtils.retry}.
     *
     * @param rssGetShuffleAssignmentsRequest the assignment request
     * @return the first response whose status is {@link StatusCode#SUCCESS}
     * @throws RssException if no coordinator returned a successful response after all retries
     */
    @Override // org.apache.uniffle.client.api.CoordinatorClient
    public RssGetShuffleAssignmentsResponse getShuffleAssignments(RssGetShuffleAssignmentsRequest rssGetShuffleAssignmentsRequest) {
        try {
            return (RssGetShuffleAssignmentsResponse) RetryUtils.retry(() -> {
                RssGetShuffleAssignmentsResponse lastResponse = null;
                Exception lastError = null;
                for (CoordinatorClient coordinatorClient : this.coordinatorClients) {
                    RssGetShuffleAssignmentsResponse response;
                    try {
                        response = coordinatorClient.getShuffleAssignments(rssGetShuffleAssignmentsRequest);
                    } catch (Exception e) {
                        // Skip this coordinator instead of dereferencing a missing (or stale)
                        // response, so failover to the next coordinator actually happens.
                        LOG.error("Failed to get shuffle server assignment from " + coordinatorClient.getDesc(), e);
                        lastError = e;
                        continue;
                    }
                    if (response.getStatusCode() == StatusCode.SUCCESS) {
                        LOG.info("Success to get shuffle server assignment from {}", coordinatorClient.getDesc());
                        return response;
                    }
                    lastResponse = response;
                }
                if (lastResponse != null) {
                    throw new RssException(lastResponse.getMessage());
                }
                // Either the client list is empty or every coordinator threw.
                throw new RssException("No coordinator returned a response for getShuffleAssignments", lastError);
            }, this.retryIntervalMs, this.retryTimes);
        } catch (Throwable th) {
            throw new RssException("getShuffleAssignments failed!", th);
        }
    }

    /**
     * Asks the coordinators, in list order, whether the cluster may be accessed and returns the
     * first successful response. The walk is retried via {@code RetryUtils.retry} using the retry
     * settings carried by the request (not the ones of this client).
     *
     * @param rssAccessClusterRequest the access request
     * @return the first response whose status is {@link StatusCode#SUCCESS}
     * @throws RssException if access was denied or no coordinator could be reached after all
     *     retries
     */
    @Override // org.apache.uniffle.client.api.CoordinatorClient
    public RssAccessClusterResponse accessCluster(RssAccessClusterRequest rssAccessClusterRequest) {
        try {
            return (RssAccessClusterResponse) RetryUtils.retry(() -> {
                RssAccessClusterResponse lastResponse = null;
                for (CoordinatorClient coordinatorClient : this.coordinatorClients) {
                    lastResponse = coordinatorClient.accessCluster(rssAccessClusterRequest);
                    if (lastResponse.getStatusCode() == StatusCode.SUCCESS) {
                        LOG.warn("Success to access cluster {} using {}", coordinatorClient.getDesc(), rssAccessClusterRequest.getAccessId());
                        return lastResponse;
                    }
                }
                if (lastResponse == null) {
                    // Empty client list: fail with a clear message instead of an NPE.
                    throw new RssException("No coordinator client is available to access cluster");
                }
                if (lastResponse.getStatusCode() == StatusCode.ACCESS_DENIED) {
                    throw new RssException("Request to access cluster is denied using " + rssAccessClusterRequest.getAccessId() + " for " + lastResponse.getMessage());
                }
                throw new RssException("Fail to reach cluster for " + lastResponse.getMessage());
            }, rssAccessClusterRequest.getRetryIntervalMs(), rssAccessClusterRequest.getRetryTimes());
        } catch (Throwable th) {
            throw new RssException("accessCluster failed!", th);
        }
    }

    /**
     * Fetches the client configuration from the coordinators, in list order, returning the first
     * successful response. The walk is retried via {@code RetryUtils.retry}.
     *
     * @param rssFetchClientConfRequest the fetch request
     * @return the first response whose status is {@link StatusCode#SUCCESS}
     * @throws RssException if no coordinator returned a successful response after all retries
     */
    @Override // org.apache.uniffle.client.api.CoordinatorClient
    public RssFetchClientConfResponse fetchClientConf(RssFetchClientConfRequest rssFetchClientConfRequest) {
        try {
            return (RssFetchClientConfResponse) RetryUtils.retry(() -> {
                RssFetchClientConfResponse lastResponse = null;
                for (CoordinatorClient coordinatorClient : this.coordinatorClients) {
                    lastResponse = coordinatorClient.fetchClientConf(rssFetchClientConfRequest);
                    if (lastResponse.getStatusCode() == StatusCode.SUCCESS) {
                        LOG.info("Success to get conf from {}", coordinatorClient.getDesc());
                        return lastResponse;
                    }
                }
                if (lastResponse == null) {
                    // Empty client list: fail with a clear message instead of an NPE.
                    throw new RssException("No coordinator client is available to fetch client conf");
                }
                throw new RssException(lastResponse.getMessage());
            }, this.retryIntervalMs, this.retryTimes);
        } catch (Throwable th) {
            throw new RssException("Fail to get conf", th);
        }
    }

    /**
     * Fetches the remote storage info from the coordinators, in list order, returning the first
     * successful response. The walk is retried via {@code RetryUtils.retry}.
     *
     * @param rssFetchRemoteStorageRequest the fetch request
     * @return the first response whose status is {@link StatusCode#SUCCESS}
     * @throws RssException if no coordinator returned a successful response after all retries
     */
    @Override // org.apache.uniffle.client.api.CoordinatorClient
    public RssFetchRemoteStorageResponse fetchRemoteStorage(RssFetchRemoteStorageRequest rssFetchRemoteStorageRequest) {
        try {
            return (RssFetchRemoteStorageResponse) RetryUtils.retry(() -> {
                RssFetchRemoteStorageResponse lastResponse = null;
                for (CoordinatorClient coordinatorClient : this.coordinatorClients) {
                    lastResponse = coordinatorClient.fetchRemoteStorage(rssFetchRemoteStorageRequest);
                    if (lastResponse.getStatusCode() == StatusCode.SUCCESS) {
                        LOG.info("Success to get storage {} from {}", lastResponse.getRemoteStorageInfo(), coordinatorClient.getDesc());
                        return lastResponse;
                    }
                }
                if (lastResponse == null) {
                    // Empty client list: fail with a clear message instead of an NPE.
                    throw new RssException("No coordinator client is available to fetch remote storage");
                }
                throw new RssException(lastResponse.getMessage());
            }, this.retryIntervalMs, this.retryTimes);
        } catch (Throwable th) {
            throw new RssException("Fail to get remote storage", th);
        }
    }

    /**
     * Describes this client as a header line followed by one line per delegate description.
     *
     * @return the multi-line description
     */
    @Override // org.apache.uniffle.client.api.CoordinatorClient
    public String getDesc() {
        StringBuilder sb = new StringBuilder("CoordinatorGrpcRetryableClient:");
        for (CoordinatorClient coordinatorClient : this.coordinatorClients) {
            sb.append('\n').append(coordinatorClient.getDesc());
        }
        return sb.toString();
    }

    /** Stops the heartbeat executor immediately, then closes every delegate client. */
    @Override // org.apache.uniffle.client.api.CoordinatorClient
    public void close() {
        this.heartBeatExecutorService.shutdownNow();
        this.coordinatorClients.forEach(CoordinatorClient::close);
    }
}
