package org.apache.spark.shuffle;

import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.deploy.SparkHadoopUtil;
import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo;
import org.apache.spark.storage.BlockManagerId;
import org.apache.uniffle.client.api.ShuffleManagerClient;
import org.apache.uniffle.client.factory.CoordinatorClientFactory;
import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient;
import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.exception.RssFetchFailedException;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.shaded.org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

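/**
 * Static helpers for the RSS Spark shuffle client: Hadoop configuration construction, shuffle
 * manager loading, coordinator client creation, client conf handling and fetch-failure reporting.
 *
 * <p>Decompiled from: input_file:org/apache/spark/shuffle/RssSparkShuffleUtils.class
 */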
public class RssSparkShuffleUtils {
    private static final Logger LOG = LoggerFactory.getLogger(RssSparkShuffleUtils.class);
    public static final ClassTag<SimpleShuffleHandleInfo> DEFAULT_SHUFFLE_HANDLER_INFO_CLASS_TAG = ClassTag$.MODULE$.apply(SimpleShuffleHandleInfo.class);
    public static final ClassTag<byte[]> BYTE_ARRAY_CLASS_TAG = ClassTag$.MODULE$.apply(byte[].class);

    /**
     * Builds a Hadoop {@link Configuration} from the given Spark conf and, when the Ozone ODFS
     * switch is enabled, copies the Ozone-related RSS settings into it.
     *
     * <p>Each copied entry is stored under the RSS config key with its first
     * {@code "spark.rss.ozone.".length()} characters removed.
     *
     * @param sparkConf Spark configuration the Hadoop configuration is derived from
     * @return the newly created Hadoop configuration
     */
    public static Configuration newHadoopConfiguration(SparkConf sparkConf) {
        SparkHadoopUtil hadoopUtil = new SparkHadoopUtil();
        Configuration hadoopConf = hadoopUtil.newConfiguration(sparkConf);

        boolean odfsEnabled = ((Boolean) sparkConf.get(RssSparkConfig.RSS_OZONE_DFS_NAMENODE_ODFS_ENABLE)).booleanValue();
        if (!odfsEnabled) {
            return hadoopConf;
        }

        // Strip the RSS-specific prefix so Hadoop sees its own native key names.
        int prefixLength = "spark.rss.ozone.".length();

        String odfsEnableKey = RssSparkConfig.RSS_OZONE_DFS_NAMENODE_ODFS_ENABLE.key().substring(prefixLength);
        hadoopConf.setBoolean(odfsEnableKey, odfsEnabled);

        String hdfsImplKey = RssSparkConfig.RSS_OZONE_FS_HDFS_IMPL.key().substring(prefixLength);
        hadoopConf.set(hdfsImplKey, (String) sparkConf.get(RssSparkConfig.RSS_OZONE_FS_HDFS_IMPL));

        String abstractFsImplKey = RssSparkConfig.RSS_OZONE_FS_ABSTRACT_FILE_SYSTEM_HDFS_IMPL.key().substring(prefixLength);
        hadoopConf.set(abstractFsImplKey, (String) sparkConf.get(RssSparkConfig.RSS_OZONE_FS_ABSTRACT_FILE_SYSTEM_HDFS_IMPL));

        return hadoopConf;
    }

    /**
     * Reflectively instantiates the shuffle manager class with the given name.
     *
     * <p>A {@code (conf, boolean)} constructor is tried first; if the class does not declare one,
     * the single-argument {@code (conf)} constructor is used instead. The conf parameter type is
     * looked up using the runtime class of {@code sparkConf}.
     *
     * @param str fully qualified class name of the shuffle manager to load
     * @param sparkConf configuration passed to the constructor
     * @param z boolean passed as the second constructor argument when that constructor exists
     * @return the instantiated shuffle manager
     * @throws Exception if the class cannot be found or neither constructor can be invoked
     */
    public static ShuffleManager loadShuffleManager(String str, SparkConf sparkConf, boolean z) throws Exception {
        Class<?> managerClass = Class.forName(str);
        try {
            return (ShuffleManager) managerClass
                    .getConstructor(sparkConf.getClass(), boolean.class)
                    .newInstance(sparkConf, Boolean.valueOf(z));
        } catch (NoSuchMethodException noTwoArgConstructor) {
            // No (conf, boolean) constructor declared: fall back to the (conf) one.
            return (ShuffleManager) managerClass
                    .getConstructor(sparkConf.getClass())
                    .newInstance(sparkConf);
        }
    }

    /**
     * Creates a coordinator client from the client type, coordinator quorum, retry interval,
     * retry count and heartbeat thread number found in the given Spark conf.
     *
     * @param sparkConf Spark configuration holding the RSS client settings
     * @return the client returned by {@link CoordinatorClientFactory}
     */
    public static CoordinatorGrpcRetryableClient createCoordinatorClients(SparkConf sparkConf) {
        String clientTypeName = (String) sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
        CoordinatorClientFactory factory = CoordinatorClientFactory.getInstance();
        ClientType clientType = ClientType.valueOf(clientTypeName);
        String coordinatorQuorum = (String) sparkConf.get(RssSparkConfig.RSS_COORDINATOR_QUORUM);
        long retryIntervalMax = ((Long) sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX)).longValue();
        int retryMax = ((Integer) sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_MAX)).intValue();
        int heartbeatThreadNum = ((Integer) sparkConf.get(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM)).intValue();
        return factory.createCoordinatorClient(clientType, coordinatorQuorum, retryIntervalMax, retryMax, heartbeatThreadNum);
    }

    /**
     * Merges dynamically supplied conf items into the given Spark conf.
     *
     * <p>Keys that do not start with {@code "spark."} get that prefix added. An item is applied
     * when the Spark conf does not contain the key yet, or when the key is listed in
     * {@code RssSparkConfig.RSS_MANDATORY_CLUSTER_CONF} (in which case an existing value is
     * overridden). Nothing is changed when either argument is null or the map is empty.
     *
     * @param sparkConf Spark configuration to update in place
     * @param map conf items to apply
     */
    public static void applyDynamicClientConf(SparkConf sparkConf, Map<String, String> map) {
        if (sparkConf == null) {
            LOG.warn("Spark conf is null");
            return;
        }
        if (map == null || map.isEmpty()) {
            LOG.warn("Empty conf items");
            return;
        }
        for (Map.Entry<String, String> item : map.entrySet()) {
            String rawKey = item.getKey();
            String sparkKey = rawKey.startsWith("spark.") ? rawKey : "spark." + rawKey;
            String confValue = item.getValue();

            boolean mandatory = RssSparkConfig.RSS_MANDATORY_CLUSTER_CONF.contains(sparkKey);
            boolean alreadySet = sparkConf.contains(sparkKey);
            if (alreadySet && !mandatory) {
                // A value already present wins unless the key is mandatory.
                continue;
            }

            if (alreadySet) {
                LOG.warn("Override with mandatory dynamic conf {} = {}", sparkKey, confValue);
            } else {
                LOG.info("Use dynamic conf {} = {}", sparkKey, confValue);
            }
            sparkConf.set(sparkKey, confValue);
        }
    }

    /**
     * Validates the RSS client settings in the given Spark conf.
     *
     * <p>Checks that the storage type is present, delegates the test-mode check to
     * {@link ClientUtils#validateTestModeConf}, and requires that retry max multiplied by the
     * max retry interval does not exceed the send-check timeout.
     *
     * @param sparkConf Spark configuration to validate
     * @throws IllegalArgumentException if the storage type is missing or the retry settings
     *     exceed the send-check timeout
     */
    public static void validateRssClientConf(SparkConf sparkConf) {
        String storageTypeKey = RssSparkConfig.RSS_STORAGE_TYPE.key();
        if (!sparkConf.contains(storageTypeKey)) {
            String message = String.format("%s must be set by the client or fetched from coordinators.", "Storage type");
            LOG.error(message);
            throw new IllegalArgumentException(message);
        }

        boolean testModeEnabled = sparkConf.getBoolean(RssSparkConfig.RSS_TEST_MODE_ENABLE.key(), false);
        ClientUtils.validateTestModeConf(testModeEnabled, sparkConf.get(storageTypeKey));

        int retryMax = ((Integer) sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_MAX)).intValue();
        long retryIntervalMax = ((Long) sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX)).longValue();
        long sendCheckTimeout = ((Long) sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS)).longValue();
        if (retryIntervalMax * retryMax <= sendCheckTimeout) {
            return;
        }
        throw new IllegalArgumentException(
                String.format(
                        "%s(%s) * %s(%s) should not bigger than %s(%s)",
                        RssSparkConfig.RSS_CLIENT_RETRY_MAX.key(),
                        Integer.valueOf(retryMax),
                        RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX.key(),
                        Long.valueOf(retryIntervalMax),
                        RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.key(),
                        Long.valueOf(sendCheckTimeout)));
    }

    /**
     * Builds a Hadoop configuration via {@link #newHadoopConfiguration(SparkConf)} and overlays
     * the conf items carried by the remote storage info, if any.
     *
     * @param sparkConf Spark configuration the base Hadoop configuration is derived from
     * @param remoteStorageInfo remote storage description whose conf items are applied on top
     * @return the resulting Hadoop configuration
     */
    public static Configuration getRemoteStorageHadoopConf(SparkConf sparkConf, RemoteStorageInfo remoteStorageInfo) {
        Configuration hadoopConf = newHadoopConfiguration(sparkConf);
        Map<String, String> storageConf = remoteStorageInfo.getConfItems();
        if (storageConf == null || storageConf.isEmpty()) {
            return hadoopConf;
        }
        for (Map.Entry<String, String> item : storageConf.entrySet()) {
            hadoopConf.set(item.getKey(), item.getValue());
        }
        return hadoopConf;
    }

    /**
     * Collects the assignment tags configured for the client.
     *
     * <p>The configured value is split on {@code Constants.COMMA_SPLIT_CHAR}; each tag is trimmed
     * individually and empty tags are dropped, so inputs such as {@code "a, b"}, {@code "a,,b"}
     * or a whitespace-only value never produce blank or whitespace-padded tags.
     * {@code Constants.SHUFFLE_SERVER_VERSION} is always added to the result.
     *
     * @param sparkConf Spark configuration holding the assignment tags setting
     * @return a mutable set with the configured tags plus the shuffle server version tag
     */
    public static Set<String> getAssignmentTags(SparkConf sparkConf) {
        Set<String> tags = new HashSet<>();
        String rawTags = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_TAGS.key(), "");
        if (StringUtils.isNotEmpty(rawTags)) {
            for (String rawTag : rawTags.split(Constants.COMMA_SPLIT_CHAR)) {
                String tag = rawTag.trim();
                // Skip blanks so a stray comma or whitespace-only value adds no empty tag.
                if (!tag.isEmpty()) {
                    tags.add(tag);
                }
            }
        }
        tags.add(Constants.SHUFFLE_SERVER_VERSION);
        return tags;
    }

    /**
     * Estimates how many tasks may run concurrently for the application.
     *
     * <p>Tasks per executor is {@code floorDiv(executor cores, task cpus)}. Without dynamic
     * allocation the estimate is {@code executor instances * tasks per executor}, or 0 when the
     * instance count is not positive. With dynamic allocation the executor count is interpolated
     * between the min and max executors (max capped at 10000) using the dynamic factor:
     * {@code (int) ((max - min) * factor + min) * tasks per executor}.
     *
     * @param sparkConf Spark configuration to read the executor settings from
     * @return the estimated task concurrency
     * @throws RssException if the dynamic factor is outside {@code [0, 1]}
     * @throws ArithmeticException if the configured task cpus value is 0 (from {@link Math#floorDiv(int, int)})
     */
    public static int estimateTaskConcurrency(SparkConf sparkConf) {
        double dynamicFactor = ((Double) sparkConf.get(RssSparkConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR)).doubleValue();
        if (dynamicFactor > 1.0d || dynamicFactor < 0.0d) {
            throw new RssException("dynamicAllocationFactor is not valid: " + dynamicFactor);
        }

        int executorCores = sparkConf.getInt(Constants.SPARK_EXECUTOR_CORES, 1);
        int taskCpus = sparkConf.getInt(Constants.SPARK_TASK_CPUS, 1);
        int tasksPerExecutor = Math.floorDiv(executorCores, taskCpus);

        if (sparkConf.getBoolean(Constants.SPARK_DYNAMIC_ENABLED, false)) {
            int maxExecutors = Math.min(sparkConf.getInt(Constants.SPARK_MAX_DYNAMIC_EXECUTOR, 0), 10000);
            // Read once and reuse: the same min-executor value is both subtracted and added back.
            int minExecutors = sparkConf.getInt(Constants.SPARK_MIN_DYNAMIC_EXECUTOR, 0);
            int estimatedExecutors = (int) (((maxExecutors - minExecutors) * dynamicFactor) + minExecutors);
            return estimatedExecutors * tasksPerExecutor;
        }

        int executorInstances = sparkConf.getInt(Constants.SPARK_EXECUTOR_INSTANTS, -1);
        return executorInstances > 0 ? executorInstances * tasksPerExecutor : 0;
    }

    /**
     * Determines how many shuffle servers to request.
     *
     * <p>The explicitly configured server number is returned when it is positive or when
     * estimation is disabled. Otherwise the number is the estimated task concurrency divided by
     * the configured per-server concurrency, rounded up.
     *
     * @param sparkConf Spark configuration to read the assignment settings from
     * @return the required shuffle server number
     */
    public static int getRequiredShuffleServerNumber(SparkConf sparkConf) {
        boolean estimationEnabled = ((Boolean) sparkConf.get(RssSparkConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED)).booleanValue();
        int configuredNumber = ((Integer) sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER)).intValue();
        if (estimationEnabled && configuredNumber <= 0) {
            int taskConcurrency = estimateTaskConcurrency(sparkConf);
            int concurrencyPerServer = ((Integer) sparkConf.get(RssSparkConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER)).intValue();
            return (int) Math.ceil((taskConcurrency * 1.0d) / concurrencyPerServer);
        }
        return configuredNumber;
    }

    /**
     * Returns the Spark context obtained from {@code SparkContext.getOrCreate()}.
     *
     * @return the Spark context handed back by {@code getOrCreate()}
     */
    public static SparkContext getActiveSparkContext() {
        SparkContext context = SparkContext.getOrCreate();
        return context;
    }

    /**
     * Wraps the given values in a {@link SimpleShuffleHandleInfo} and broadcasts it through the
     * Spark context.
     *
     * @param sparkContext Spark context used to create the broadcast
     * @param i first constructor argument of the handle info (presumably the shuffle id — confirm)
     * @param map shuffle server lists keyed by integer (presumably partition id — confirm)
     * @param remoteStorageInfo remote storage description stored in the handle info
     * @return the broadcast variable holding the handle info
     */
    public static Broadcast<SimpleShuffleHandleInfo> broadcastShuffleHdlInfo(SparkContext sparkContext, int i, Map<Integer, List<ShuffleServerInfo>> map, RemoteStorageInfo remoteStorageInfo) {
        SimpleShuffleHandleInfo handleInfo = new SimpleShuffleHandleInfo(i, map, remoteStorageInfo);
        return sparkContext.broadcast(handleInfo, DEFAULT_SHUFFLE_HANDLER_INFO_CLASS_TAG);
    }

    /**
     * Reflectively creates a {@link FetchFailedException}, coping with two constructor shapes.
     *
     * <p>The six-argument constructor {@code (BlockManagerId, int, long, int, int, Throwable)} is
     * tried first, with {@code i2} supplied as both the long and the following int argument. If
     * that fails with a reflection-related exception, the five-argument constructor
     * {@code (BlockManagerId, int, int, int, Throwable)} is used.
     *
     * @param blockManagerId block manager id passed to the constructor; its runtime class is
     *     used for the constructor lookup
     * @param i first int argument (presumably the shuffle id — confirm)
     * @param i2 second int argument (presumably the map index — confirm)
     * @param i3 third int argument (presumably the reduce id — confirm)
     * @param th cause passed to the constructor
     * @return the new exception instance, cast unchecked to {@code T}
     * @throws RssException if the class cannot be loaded or the fallback constructor also fails
     */
    private static <T> T instantiateFetchFailedException(BlockManagerId blockManagerId, int i, int i2, int i3, Throwable th) {
        final Class<?> exceptionClass;
        try {
            exceptionClass = Class.forName(FetchFailedException.class.getName());
        } catch (ClassNotFoundException classMissing) {
            throw new RssException(classMissing);
        }

        Class<?> blockManagerIdClass = blockManagerId.getClass();
        Object instance;
        try {
            instance = exceptionClass
                    .getConstructor(blockManagerIdClass, int.class, long.class, int.class, int.class, Throwable.class)
                    .newInstance(blockManagerId, Integer.valueOf(i), Long.valueOf(i2), Integer.valueOf(i2), Integer.valueOf(i3), th);
        } catch (IllegalAccessException | IllegalArgumentException | InstantiationException | NoSuchMethodException | InvocationTargetException primaryFailure) {
            // Deliberate fallback: try the constructor shape without the long argument.
            try {
                instance = exceptionClass
                        .getConstructor(blockManagerIdClass, int.class, int.class, int.class, Throwable.class)
                        .newInstance(blockManagerId, Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(i3), th);
            } catch (Exception fallbackFailure) {
                LOG.error("Fail to new instance.", fallbackFailure);
                throw new RssException(fallbackFailure);
            }
        }
        return (T) instance;
    }

    /**
     * Creates a {@link FetchFailedException} that carries a placeholder block manager id
     * ({@code "exec-dummy"} / {@code "dummy_host"} / port 9999).
     *
     * @param i first int forwarded to the exception constructor (presumably the shuffle id — confirm)
     * @param i2 second int forwarded to the exception constructor (presumably the map index — confirm)
     * @param i3 third int forwarded to the exception constructor (presumably the reduce id — confirm)
     * @param th cause of the failure; a {@code Throwable("No cause")} is substituted when null
     * @return the created exception
     */
    public static FetchFailedException createFetchFailedException(int i, int i2, int i3, Throwable th) {
        BlockManagerId placeholderId = BlockManagerId.apply("exec-dummy", "dummy_host", 9999, Option.empty());
        Throwable cause = th;
        if (cause == null) {
            cause = new Throwable("No cause");
        }
        return (FetchFailedException) instantiateFetchFailedException(placeholderId, i, i2, i3, cause);
    }

    /**
     * Tells whether stage resubmission is supported by the running Spark version.
     *
     * @return {@code true} for Spark 3, or for Spark 2 with a minor version of at least 3
     */
    public static boolean isStageResubmitSupported() {
        if (SparkVersionUtils.isSpark3()) {
            return true;
        }
        return SparkVersionUtils.isSpark2() && SparkVersionUtils.MINOR_VERSION >= 3;
    }

    /**
     * Reports a fetch failure for each given partition and decides which exception the caller
     * should surface.
     *
     * <p>Reporting only happens when the resubmit-stage-on-fetch-failure setting is enabled and
     * {@link #isStageResubmitSupported()} is true. A client is obtained from the supplier for
     * every partition. As soon as one response asks to resubmit the whole stage, an
     * {@link RssException} wrapping a {@link FetchFailedException} (created with {@code -1} as
     * its second int argument) is returned. Otherwise the original exception is returned.
     *
     * @param supplier source of the shuffle manager client used for reporting
     * @param rssFetchFailedException the original failure; its message is sent in each report
     * @param sparkConf Spark configuration converted to an RSS conf for the feature check
     * @param str first request argument (presumably the application id — confirm)
     * @param i second request argument, also forwarded to the created exception (presumably the shuffle id — confirm)
     * @param i2 third request argument (presumably the stage attempt id — confirm)
     * @param set values reported one request each (presumably failed partition ids — confirm)
     * @return the wrapping exception when a whole-stage resubmit is requested, else the original
     */
    public static RssException reportRssFetchFailedException(Supplier<ShuffleManagerClient> supplier, RssFetchFailedException rssFetchFailedException, SparkConf sparkConf, String str, int i, int i2, Set<Integer> set) {
        boolean resubmitEnabled = RssSparkConfig.toRssConf(sparkConf).getBoolean(RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED);
        if (!resubmitEnabled || !isStageResubmitSupported()) {
            return rssFetchFailedException;
        }
        for (Integer partition : set) {
            int partitionId = partition.intValue();
            RssReportShuffleFetchFailureRequest request =
                    new RssReportShuffleFetchFailureRequest(str, i, i2, partitionId, rssFetchFailedException.getMessage());
            if (supplier.get().reportShuffleFetchFailure(request).getReSubmitWholeStage()) {
                Throwable fetchFailed = createFetchFailedException(i, -1, partitionId, rssFetchFailedException);
                return new RssException(fetchFailed);
            }
        }
        return rssFetchFailedException;
    }
}
