package org.apache.spark.shuffle;

import java.util.HashMap;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.uniffle.client.api.CoordinatorClient;
import org.apache.uniffle.client.request.RssAccessClusterRequest;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
import org.apache.uniffle.com.google.common.collect.Lists;
import org.apache.uniffle.com.google.common.collect.Maps;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.RetryUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/spark/shuffle/DelegationRssShuffleManager.class */
/**
 * A {@link ShuffleManager} that forwards every call to a delegate chosen at construction time.
 *
 * <p>On the driver, the coordinators are asked for access; when access is granted an
 * {@code RssShuffleManager} is created, otherwise (or when creating it fails) the manager named by
 * {@code Constants.SORT_SHUFFLE_MANAGER_NAME} is loaded instead. The decision is written back into
 * the {@link SparkConf} ({@code RssSparkConfig.RSS_ENABLED} and {@code spark.shuffle.manager}).
 * On executors, the delegate is picked by reading {@code RssSparkConfig.RSS_ENABLED} from the conf.
 */
public class DelegationRssShuffleManager implements ShuffleManager {
    private static final Logger LOG = LoggerFactory.getLogger(DelegationRssShuffleManager.class);

    /** The shuffle manager that actually serves all calls; never null after construction. */
    private final ShuffleManager delegate;
    /** Coordinator clients used for the access check; empty on executors. */
    private final List<CoordinatorClient> coordinatorClients;
    private final int accessTimeoutMs;
    private final SparkConf sparkConf;
    /** Short user name sent with the access request; only set on the driver path. */
    private String user;
    /** Uuid returned by a successful access request, or a timestamp fallback; driver path only. */
    private String uuid;

    /**
     * Creates the manager and selects its delegate.
     *
     * @param sparkConf the Spark configuration; mutated on the driver path to record the decision
     * @param z {@code true} to run the driver-side selection (with coordinator access check),
     *     {@code false} to run the executor-side selection
     * @throws Exception if coordinator clients or the delegate cannot be created
     */
    public DelegationRssShuffleManager(SparkConf sparkConf, boolean z) throws Exception {
        this.sparkConf = sparkConf;
        this.accessTimeoutMs = ((Integer) sparkConf.get(RssSparkConfig.RSS_ACCESS_TIMEOUT_MS)).intValue();
        if (z) {
            this.coordinatorClients = RssSparkShuffleUtils.createCoordinatorClients(sparkConf);
            this.delegate = createShuffleManagerInDriver();
        } else {
            this.coordinatorClients = Lists.newArrayList();
            this.delegate = createShuffleManagerInExecutor();
        }
        if (this.delegate == null) {
            throw new RssException("Fail to create shuffle manager!");
        }
    }

    /**
     * Driver-side delegate selection: tries the remote shuffle service first and falls back to the
     * sort shuffle manager.
     *
     * @return the selected shuffle manager
     * @throws RssException if the fallback sort shuffle manager cannot be loaded
     */
    private ShuffleManager createShuffleManagerInDriver() throws RssException {
        // Placeholder user name, kept when the UGI lookup below fails.
        this.user = "user";
        try {
            this.user = UserGroupInformation.getCurrentUser().getShortUserName();
        } catch (Exception e) {
            // Pass the throwable so the stack trace is logged, not just its toString().
            LOG.error("Error on getting user from ugi.", e);
        }
        boolean accessGranted = tryAccessCluster();
        // tryAccessCluster() only sets uuid on success; fall back to a timestamp otherwise.
        if (StringUtils.isEmpty(this.uuid)) {
            this.uuid = String.valueOf(System.currentTimeMillis());
        }
        if (accessGranted) {
            try {
                this.sparkConf.set("spark.rss.quota.user", this.user);
                this.sparkConf.set("spark.rss.quota.uuid", this.uuid);
                RssShuffleManager rssShuffleManager = new RssShuffleManager(this.sparkConf, true);
                // Only record the decision in the conf once the manager was created successfully.
                this.sparkConf.set(RssSparkConfig.RSS_ENABLED.key(), "true");
                this.sparkConf.set("spark.shuffle.manager", RssShuffleManager.class.getCanonicalName());
                LOG.info("Use RssShuffleManager");
                return rssShuffleManager;
            } catch (Exception e) {
                // Best effort: fall through to the sort shuffle manager, but keep the stack trace.
                LOG.warn("Fail to create RssShuffleManager, fallback to SortShuffleManager {}", e.getMessage(), e);
            }
        }
        try {
            ShuffleManager sortShuffleManager =
                    RssSparkShuffleUtils.loadShuffleManager(Constants.SORT_SHUFFLE_MANAGER_NAME, this.sparkConf, true);
            this.sparkConf.set(RssSparkConfig.RSS_ENABLED.key(), "false");
            this.sparkConf.set("spark.shuffle.manager", "sort");
            LOG.info("Use SortShuffleManager");
            return sortShuffleManager;
        } catch (Exception e) {
            // Wrap the exception itself so the original cause and stack trace are preserved.
            throw new RssException(e);
        }
    }

    /**
     * Asks each coordinator in turn for access to the cluster, using the configured access id.
     *
     * <p>On the first successful response the returned uuid is stored in {@link #uuid}. A
     * coordinator whose request keeps failing (after the configured retries) is logged and the
     * next one is tried.
     *
     * @return {@code true} if some coordinator granted access; {@code false} if the access id is
     *     empty, there are no coordinator clients, or every coordinator failed or denied access
     */
    private boolean tryAccessCluster() {
        String accessId = this.sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), "").trim();
        if (StringUtils.isEmpty(accessId)) {
            LOG.warn("Access id key is empty");
            return false;
        }
        long retryIntervalMs = ((Long) this.sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS)).longValue();
        int retryTimes = ((Integer) this.sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES)).intValue();
        int assignmentShuffleNodesNum =
                ((Integer) this.sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER)).intValue();
        HashMap<String, String> extraProperties = Maps.newHashMap();
        extraProperties.put(Constants.ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, String.valueOf(assignmentShuffleNodesNum));
        // The tags depend only on the conf, so compute them once rather than per coordinator.
        Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(this.sparkConf);
        for (CoordinatorClient coordinatorClient : this.coordinatorClients) {
            try {
                Boolean granted = (Boolean) RetryUtils.retry(() -> {
                    RssAccessClusterResponse response = coordinatorClient.accessCluster(
                            new RssAccessClusterRequest(accessId, assignmentTags, this.accessTimeoutMs, extraProperties, this.user));
                    if (response.getStatusCode() == StatusCode.SUCCESS) {
                        LOG.warn("Success to access cluster {} using {}", coordinatorClient.getDesc(), accessId);
                        this.uuid = response.getUuid();
                        return true;
                    }
                    if (response.getStatusCode() == StatusCode.ACCESS_DENIED) {
                        throw new RssException("Request to access cluster " + coordinatorClient.getDesc() + " is denied using " + accessId + " for " + response.getMessage());
                    }
                    throw new RssException("Fail to reach cluster " + coordinatorClient.getDesc() + " for " + response.getMessage());
                }, retryIntervalMs, retryTimes);
                return granted.booleanValue();
            } catch (Throwable th) {
                // Deliberate best effort: log and move on to the next coordinator.
                LOG.warn("Fail to access cluster {} using {} for {}", coordinatorClient.getDesc(), accessId, th.getMessage());
            }
        }
        return false;
    }

    /**
     * Executor-side delegate selection, driven by {@code RssSparkConfig.RSS_ENABLED} in the conf.
     *
     * @return an {@code RssShuffleManager} when RSS is enabled, otherwise the loaded sort shuffle
     *     manager
     * @throws RssException if the sort shuffle manager cannot be loaded
     */
    private ShuffleManager createShuffleManagerInExecutor() throws RssException {
        if (((Boolean) this.sparkConf.get(RssSparkConfig.RSS_ENABLED)).booleanValue()) {
            ShuffleManager rssShuffleManager = new RssShuffleManager(this.sparkConf, false);
            LOG.info("Use RssShuffleManager");
            return rssShuffleManager;
        }
        try {
            ShuffleManager sortShuffleManager =
                    RssSparkShuffleUtils.loadShuffleManager(Constants.SORT_SHUFFLE_MANAGER_NAME, this.sparkConf, false);
            LOG.info("Use SortShuffleManager");
            return sortShuffleManager;
        } catch (Exception e) {
            // Wrap the exception itself so the original cause and stack trace are preserved.
            throw new RssException(e);
        }
    }

    /** Returns the shuffle manager all calls are forwarded to. */
    public ShuffleManager getDelegate() {
        return this.delegate;
    }

    /** Forwards to the delegate's {@code registerShuffle}. */
    public <K, V, C> ShuffleHandle registerShuffle(int i, ShuffleDependency<K, V, C> shuffleDependency) {
        return this.delegate.registerShuffle(i, shuffleDependency);
    }

    /** Forwards to the delegate's {@code getWriter}. */
    public <K, V> ShuffleWriter<K, V> getWriter(ShuffleHandle shuffleHandle, long j, TaskContext taskContext, ShuffleWriteMetricsReporter shuffleWriteMetricsReporter) {
        return this.delegate.getWriter(shuffleHandle, j, taskContext, shuffleWriteMetricsReporter);
    }

    /** Forwards to the delegate's five-argument {@code getReader}. */
    public <K, C> ShuffleReader<K, C> getReader(ShuffleHandle shuffleHandle, int i, int i2, TaskContext taskContext, ShuffleReadMetricsReporter shuffleReadMetricsReporter) {
        return this.delegate.getReader(shuffleHandle, i, i2, taskContext, shuffleReadMetricsReporter);
    }

    /**
     * Forwards to the delegate's seven-argument {@code getReader}, looked up reflectively on the
     * delegate's class.
     *
     * @throws RssException if the method cannot be found or the invocation fails
     */
    public <K, C> ShuffleReader<K, C> getReader(ShuffleHandle shuffleHandle, int i, int i2, int i3, int i4, TaskContext taskContext, ShuffleReadMetricsReporter shuffleReadMetricsReporter) {
        return invokeRangeReader("getReader", shuffleHandle, i, i2, i3, i4, taskContext, shuffleReadMetricsReporter);
    }

    /**
     * Forwards to the delegate's {@code getReaderForRange}, looked up reflectively on the
     * delegate's class.
     *
     * @throws RssException if the method cannot be found or the invocation fails
     */
    public <K, C> ShuffleReader<K, C> getReaderForRange(ShuffleHandle shuffleHandle, int i, int i2, int i3, int i4, TaskContext taskContext, ShuffleReadMetricsReporter shuffleReadMetricsReporter) {
        return invokeRangeReader("getReaderForRange", shuffleHandle, i, i2, i3, i4, taskContext, shuffleReadMetricsReporter);
    }

    /**
     * Reflectively calls {@code methodName(ShuffleHandle, int, int, int, int, TaskContext,
     * ShuffleReadMetricsReporter)} declared on the delegate's class, passing the arguments through
     * in order.
     */
    @SuppressWarnings("unchecked") // The reflective call returns Object; the target methods return a ShuffleReader.
    private <K, C> ShuffleReader<K, C> invokeRangeReader(String methodName, ShuffleHandle shuffleHandle, int i, int i2, int i3, int i4, TaskContext taskContext, ShuffleReadMetricsReporter shuffleReadMetricsReporter) {
        try {
            return (ShuffleReader<K, C>) this.delegate.getClass()
                    .getDeclaredMethod(methodName, ShuffleHandle.class, Integer.TYPE, Integer.TYPE, Integer.TYPE, Integer.TYPE, TaskContext.class, ShuffleReadMetricsReporter.class)
                    // The first argument of invoke is the receiver: it must be the delegate, not the handle.
                    .invoke(this.delegate, shuffleHandle, i, i2, i3, i4, taskContext, shuffleReadMetricsReporter);
        } catch (Exception e) {
            throw new RssException(e);
        }
    }

    /** Forwards to the delegate's {@code unregisterShuffle}. */
    public boolean unregisterShuffle(int i) {
        return this.delegate.unregisterShuffle(i);
    }

    /** Stops the delegate, then closes every coordinator client even if stopping the delegate fails. */
    public void stop() {
        try {
            this.delegate.stop();
        } finally {
            this.coordinatorClients.forEach(CoordinatorClient::close);
        }
    }

    /** Forwards to the delegate's {@code shuffleBlockResolver}. */
    public ShuffleBlockResolver shuffleBlockResolver() {
        return this.delegate.shuffleBlockResolver();
    }
}
