package com.logicalclocks.hsfs.engine;

import com.amazon.deequ.profiles.ColumnProfilerRunBuilder;
import com.amazon.deequ.profiles.ColumnProfilerRunner;
import com.amazon.deequ.profiles.ColumnProfiles;
import com.damnhandy.uri.template.UriTemplate;
import com.google.common.base.Strings;
import com.logicalclocks.hsfs.DataFormat;
import com.logicalclocks.hsfs.Feature;
import com.logicalclocks.hsfs.FeatureGroup;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.HudiOperationType;
import com.logicalclocks.hsfs.OnDemandFeatureGroup;
import com.logicalclocks.hsfs.Split;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.TimeTravelFormat;
import com.logicalclocks.hsfs.TrainingDataset;
import com.logicalclocks.hsfs.metadata.HopsworksClient;
import com.logicalclocks.hsfs.metadata.Option;
import com.logicalclocks.hsfs.util.Constants;
import java.io.IOException;
import java.text.ParseException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.avro.functions;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import scala.collection.Iterator;
import scala.collection.JavaConverters;

/* loaded from: input_file:com/logicalclocks/hsfs/engine/SparkEngine.class */
/**
 * Spark-backed engine used to read, write, validate and profile feature data.
 *
 * <p>A single instance is created lazily by {@link #getInstance()}. It wraps the
 * {@link SparkSession} returned by {@code SparkSession.builder().enableHiveSupport().getOrCreate()}.
 */
public class SparkEngine {

    /** Lazily-created singleton; only read and written inside the synchronized {@link #getInstance()}. */
    private static SparkEngine INSTANCE = null;

    /** Formatter for the timestamp suffix of generated streaming query names. */
    private static final DateTimeFormatter STREAM_QUERY_TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    private final Utils utils = new Utils();
    private final HudiEngine hudiEngine = new HudiEngine();
    private final SparkSession sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate();

    /**
     * Returns the shared engine, creating it on first use.
     *
     * @return the singleton {@code SparkEngine}
     */
    public static synchronized SparkEngine getInstance() {
        if (INSTANCE == null) {
            INSTANCE = new SparkEngine();
        }
        return INSTANCE;
    }

    /** Enables non-strict Hive dynamic partitioning and disables Spark's native metastore Parquet conversion. */
    private SparkEngine() {
        this.sparkSession.conf().set("hive.exec.dynamic.partition", "true");
        this.sparkSession.conf().set("hive.exec.dynamic.partition.mode", "nonstrict");
        this.sparkSession.conf().set("spark.sql.hive.convertMetastoreParquet", "false");
    }

    /**
     * Checks that the Spark session carries the properties needed to talk to Hopsworks.
     *
     * <p>A property mapped to {@code null} only has to be present. A property mapped to a value
     * has to be present and equal to that value.
     *
     * @throws FeatureStoreException naming the first property found missing or mismatched
     */
    public void validateSparkConfiguration() throws FeatureStoreException {
        Map<String, String> requiredConfig = new HashMap<>();
        requiredConfig.put("spark.hadoop.hops.ssl.trustore.name", null);
        requiredConfig.put("spark.hadoop.hops.rpc.socket.factory.class.default", "io.hops.hadoop.shaded.org.apache.hadoop.net.HopsSSLSocketFactory");
        requiredConfig.put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        requiredConfig.put("spark.hadoop.hops.ssl.hostname.verifier", "ALLOW_ALL");
        requiredConfig.put("spark.hadoop.hops.ssl.keystore.name", null);
        requiredConfig.put("spark.hadoop.fs.hopsfs.impl", "io.hops.hopsfs.client.HopsFileSystem");
        requiredConfig.put("spark.hadoop.hops.ssl.keystores.passwd.name", null);
        requiredConfig.put("spark.hadoop.hops.ipc.server.ssl.enabled", "true");
        requiredConfig.put("spark.sql.hive.metastore.jars", null);
        requiredConfig.put("spark.hadoop.client.rpc.ssl.enabled.protocol", "TLSv1.2");
        requiredConfig.put("spark.hadoop.hive.metastore.uris", null);
        for (Map.Entry<String, String> entry : requiredConfig.entrySet()) {
            String key = entry.getKey();
            String expected = entry.getValue();
            boolean missing = !this.sparkSession.conf().contains(key);
            // Compare from the expected side so a null actual value can never throw.
            boolean mismatched = !missing && expected != null
                    && !expected.equals(this.sparkSession.conf().get(key, (String) null));
            if (missing || mismatched) {
                throw new FeatureStoreException("Spark is misconfigured for communication with Hopsworks, missing or invalid property: " + key);
            }
        }
    }

    /**
     * Returns the {@code spark.hadoop.hops.ssl.trustore.name} property.
     * Spark's {@code RuntimeConfig.get(String)} throws if the key is not set.
     */
    public String getTrustStorePath() {
        return this.sparkSession.conf().get("spark.hadoop.hops.ssl.trustore.name");
    }

    /**
     * Returns the {@code spark.hadoop.hops.ssl.keystore.name} property.
     * Spark's {@code RuntimeConfig.get(String)} throws if the key is not set.
     */
    public String getKeyStorePath() {
        return this.sparkSession.conf().get("spark.hadoop.hops.ssl.keystore.name");
    }

    /**
     * Returns the {@code spark.hadoop.hops.ssl.keystores.passwd.name} property.
     * Spark's {@code RuntimeConfig.get(String)} throws if the key is not set.
     */
    public String getCertKey() {
        return this.sparkSession.conf().get("spark.hadoop.hops.ssl.keystores.passwd.name");
    }

    /** Runs a SQL statement on the wrapped session. */
    public Dataset<Row> sql(String query) {
        return this.sparkSession.sql(query);
    }

    /**
     * Reads an on-demand feature group through its storage connector and registers it as a temp view.
     *
     * @param onDemandFeatureGroup feature group providing connector, query, format, options and path
     * @param alias name of the temporary view to create or replace
     * @return the dataframe backing the view
     */
    public Dataset<Row> registerOnDemandTemporaryTable(OnDemandFeatureGroup onDemandFeatureGroup, String alias) throws FeatureStoreException, IOException {
        StorageConnector connector = onDemandFeatureGroup.getStorageConnector();
        String dataFormat = onDemandFeatureGroup.getDataFormat() != null
                ? onDemandFeatureGroup.getDataFormat().toString()
                : null;
        Dataset<Row> dataset = connector.read(
                onDemandFeatureGroup.getQuery(),
                dataFormat,
                getOnDemandOptions(onDemandFeatureGroup),
                connector.getPath(onDemandFeatureGroup.getPath()));
        if (!Strings.isNullOrEmpty(onDemandFeatureGroup.getLocation())) {
            // The collected lines are discarded, so this appears to run only for a side effect
            // (for example, forcing file-system access). NOTE(review): confirm intent; it pulls
            // every line of the location to the driver.
            this.sparkSession.sparkContext().textFile(onDemandFeatureGroup.getLocation(), 0).collect();
        }
        dataset.createOrReplaceTempView(alias);
        return dataset;
    }

    /** Converts the feature group's option list into a name-to-value map (empty when there are none). */
    private Map<String, String> getOnDemandOptions(OnDemandFeatureGroup onDemandFeatureGroup) {
        if (onDemandFeatureGroup.getOptions() == null) {
            return new HashMap<>();
        }
        return onDemandFeatureGroup.getOptions().stream()
                .collect(Collectors.toMap(Option::getName, Option::getValue));
    }

    /** Delegates to the Hudi engine to register a Hudi feature group as a temporary table. */
    public void registerHudiTemporaryTable(FeatureGroup featureGroup, String alias, Long leftFeatureGroupStartTimestamp,
                                           Long leftFeatureGroupEndTimestamp, Map<String, String> readOptions) {
        this.hudiEngine.registerTemporaryTable(this.sparkSession, featureGroup, alias,
                leftFeatureGroupStartTimestamp, leftFeatureGroupEndTimestamp, readOptions);
    }

    /**
     * Writes a training dataset, optionally coalesced to one partition and randomly split.
     *
     * <p>Without splits, data goes to {@code location/name}. With splits, each split goes to
     * {@code location/<split name>}, weighted by the split percentages and seeded when a seed is set.
     *
     * @param trainingDataset dataset metadata (connector, format, location, splits, seed, coalesce)
     * @param dataset rows to write
     * @param writeOptions Spark writer options
     * @param saveMode Spark save mode
     */
    public void write(TrainingDataset trainingDataset, Dataset<Row> dataset, Map<String, String> writeOptions, SaveMode saveMode) {
        setupConnectorHadoopConf(trainingDataset.getStorageConnector());
        // An unset coalesce flag means "do not coalesce" rather than a NullPointerException on unboxing.
        if (Boolean.TRUE.equals(trainingDataset.getCoalesce())) {
            dataset = dataset.coalesce(1);
        }
        if (trainingDataset.getSplits() == null) {
            writeSingle(dataset, trainingDataset.getDataFormat(), writeOptions, saveMode,
                    new Path(trainingDataset.getLocation(), trainingDataset.getName()).toString());
            return;
        }
        double[] weights = trainingDataset.getSplits().stream()
                .mapToDouble(split -> split.getPercentage())
                .toArray();
        Dataset<Row>[] splitDatasets = trainingDataset.getSeed() != null
                ? dataset.randomSplit(weights, trainingDataset.getSeed().longValue())
                : dataset.randomSplit(weights);
        writeSplits(splitDatasets, trainingDataset.getDataFormat(), writeOptions, saveMode,
                trainingDataset.getLocation(), trainingDataset.getSplits());
    }

    /**
     * Builds writer options: format-specific defaults, then caller-supplied options (which win).
     *
     * @param providedOptions caller options; may be null or empty
     * @param dataFormat target format (must not be null; it is switched on)
     * @return a new mutable map of options
     */
    public Map<String, String> getWriteOptions(Map<String, String> providedOptions, DataFormat dataFormat) {
        Map<String, String> options = new HashMap<>();
        switch (dataFormat) {
            case CSV:
                options.put(Constants.HEADER, "true");
                // NOTE(review): UriTemplate.DEFAULT_SEPARATOR is presumably ","; a local literal would decouple this.
                options.put(Constants.DELIMITER, UriTemplate.DEFAULT_SEPARATOR);
                break;
            case TSV:
                options.put(Constants.HEADER, "true");
                options.put(Constants.DELIMITER, "\t");
                break;
            case TFRECORDS:
            case TFRECORD:
                options.put(Constants.TF_CONNECTOR_RECORD_TYPE, "Example");
                break;
            default:
                break;
        }
        if (providedOptions != null && !providedOptions.isEmpty()) {
            options.putAll(providedOptions);
        }
        return options;
    }

    /**
     * Builds reader options: format-specific defaults (CSV/TSV also infer the schema), then
     * caller-supplied options (which win).
     *
     * @param providedOptions caller options; may be null or empty
     * @param dataFormat source format (must not be null; it is switched on)
     * @return a new mutable map of options
     */
    public Map<String, String> getReadOptions(Map<String, String> providedOptions, DataFormat dataFormat) {
        Map<String, String> options = new HashMap<>();
        switch (dataFormat) {
            case CSV:
                options.put(Constants.HEADER, "true");
                // NOTE(review): UriTemplate.DEFAULT_SEPARATOR is presumably ","; a local literal would decouple this.
                options.put(Constants.DELIMITER, UriTemplate.DEFAULT_SEPARATOR);
                options.put(Constants.INFER_SCHEMA, "true");
                break;
            case TSV:
                options.put(Constants.HEADER, "true");
                options.put(Constants.DELIMITER, "\t");
                options.put(Constants.INFER_SCHEMA, "true");
                break;
            case TFRECORDS:
            case TFRECORD:
                options.put(Constants.TF_CONNECTOR_RECORD_TYPE, "Example");
                break;
            default:
                break;
        }
        if (providedOptions != null && !providedOptions.isEmpty()) {
            options.putAll(providedOptions);
        }
        return options;
    }

    /** Writes the i-th dataset under {@code basePath/<name of the i-th split>}. */
    private void writeSplits(Dataset<Row>[] datasets, DataFormat dataFormat, Map<String, String> writeOptions,
                             SaveMode saveMode, String basePath, List<Split> splits) {
        for (int i = 0; i < datasets.length; i++) {
            writeSingle(datasets[i], dataFormat, writeOptions, saveMode, new Path(basePath, splits.get(i).getName()).toString());
        }
    }

    /** Saves one dataset to {@code path} after mapping it through {@link #sparkPath(String)}. */
    private void writeSingle(Dataset<Row> dataset, DataFormat dataFormat, Map<String, String> writeOptions,
                             SaveMode saveMode, String path) {
        dataset.write().format(dataFormat.toString()).options(writeOptions).mode(saveMode).save(sparkPath(path));
    }

    /**
     * Loads data through a storage connector.
     *
     * @param storageConnector connector whose Hadoop settings are applied first; may be null
     * @param dataFormat Spark data source format
     * @param readOptions Spark reader options
     * @param path base path; a {@code **} child is appended before loading. If null, {@code load()}
     *     is called without a path.
     * @return the loaded dataframe
     */
    public Dataset<Row> read(StorageConnector storageConnector, String dataFormat, Map<String, String> readOptions, String path) {
        setupConnectorHadoopConf(storageConnector);
        String loadPath = path != null ? sparkPath(new Path(path, "**").toString()) : null;
        DataFrameReader reader = this.sparkSession.read().format(dataFormat).options(readOptions);
        return Strings.isNullOrEmpty(loadPath) ? reader.load() : reader.load(loadPath);
    }

    /**
     * Serializes the dataframe to Avro key/value columns and batch-writes it to the feature group's
     * online topic using the {@code Constants.KAFKA_FORMAT} data source.
     */
    public void writeOnlineDataframe(FeatureGroup featureGroup, Dataset<Row> dataset, Map<String, String> writeOptions) throws FeatureStoreException, IOException {
        onlineFeatureGroupToAvro(featureGroup, encodeComplexFeatures(featureGroup, dataset))
                .write()
                .format(Constants.KAFKA_FORMAT)
                .options(writeOptions)
                .option("topic", featureGroup.getOnlineTopicName())
                .save();
    }

    /**
     * Starts a streaming write of the dataframe to the feature group's online topic.
     *
     * @param featureGroup target feature group
     * @param dataset streaming dataframe to write
     * @param queryName name used for the checkpoint directory; generated from the topic name and
     *     current time when null or empty
     * @param outputMode Spark streaming output mode
     * @param awaitTermination whether to block until the query terminates
     * @param timeout wait limit passed to {@code awaitTermination(long)}; when null, waits without a limit
     * @param writeOptions extra streaming writer options
     * @return the started query
     */
    public StreamingQuery writeStreamDataframe(FeatureGroup featureGroup, Dataset<Row> dataset, String queryName, String outputMode,
                                               boolean awaitTermination, Long timeout, Map<String, String> writeOptions)
            throws FeatureStoreException, IOException, StreamingQueryException, TimeoutException {
        if (Strings.isNullOrEmpty(queryName)) {
            queryName = "insert_stream_" + featureGroup.getOnlineTopicName() + "_"
                    + LocalDateTime.now().format(STREAM_QUERY_TIMESTAMP_FORMAT);
        }
        StreamingQuery query = onlineFeatureGroupToAvro(featureGroup, encodeComplexFeatures(featureGroup, dataset))
                .writeStream()
                .format(Constants.KAFKA_FORMAT)
                .outputMode(outputMode)
                .option("checkpointLocation", "/Projects/" + HopsworksClient.getInstance().getProject().getProjectName()
                        + "/Resources/" + queryName + "-checkpoint")
                .options(writeOptions)
                .option("topic", featureGroup.getOnlineTopicName())
                .start();
        if (awaitTermination) {
            // A null timeout previously failed with a NullPointerException on unboxing; treat it as "no limit".
            if (timeout == null) {
                query.awaitTermination();
            } else {
                query.awaitTermination(timeout.longValue());
            }
        }
        return query;
    }

    /**
     * Projects the dataframe onto the feature group's Avro schema fields, in schema order. Columns
     * listed as complex features are Avro-encoded with their per-feature schema.
     */
    public Dataset<Row> encodeComplexFeatures(FeatureGroup featureGroup, Dataset<Row> dataset) throws FeatureStoreException, IOException {
        List<Column> columns = new ArrayList<>();
        for (Schema.Field field : featureGroup.getDeserializedAvroSchema().getFields()) {
            String name = field.name();
            Column column = org.apache.spark.sql.functions.col(name);
            if (featureGroup.getComplexFeatures().contains(name)) {
                column = functions.to_avro(column, featureGroup.getFeatureAvroSchema(name)).alias(name);
            }
            columns.add(column);
        }
        return dataset.select(columns.toArray(new Column[0]));
    }

    /**
     * Builds the two-column {@code key}/{@code value} frame written to the online topic.
     *
     * <p>{@code key} is the Avro encoding of the string-cast primary keys concatenated without a
     * separator. {@code value} is the Avro encoding of a struct of all schema fields, using the
     * feature group's encoded Avro schema.
     */
    private Dataset<Row> onlineFeatureGroupToAvro(FeatureGroup featureGroup, Dataset<Row> dataset) throws FeatureStoreException, IOException {
        // NOTE(review): this sorts the feature group's own primary-key list in place, which is visible
        // to other code reading getPrimaryKeys(). Kept as-is because callers may depend on that order.
        Collections.sort(featureGroup.getPrimaryKeys());
        Column[] keyColumns = featureGroup.getPrimaryKeys().stream()
                .map(name -> org.apache.spark.sql.functions.col(name).cast("string"))
                .toArray(Column[]::new);
        Column[] valueColumns = featureGroup.getDeserializedAvroSchema().getFields().stream()
                .map(field -> org.apache.spark.sql.functions.col(field.name()))
                .toArray(Column[]::new);
        return dataset.select(
                functions.to_avro(org.apache.spark.sql.functions.concat(keyColumns)).alias("key"),
                functions.to_avro(org.apache.spark.sql.functions.struct(valueColumns), featureGroup.getEncodedAvroSchema()).alias("value"));
    }

    /**
     * Writes to the offline store: through the Hudi engine for HUDI feature groups, otherwise as a
     * Hive table append.
     */
    public void writeOfflineDataframe(FeatureGroup featureGroup, Dataset<Row> dataset, HudiOperationType operation,
                                      Map<String, String> writeOptions, Integer validationId) throws IOException, FeatureStoreException, ParseException {
        if (featureGroup.getTimeTravelFormat() == TimeTravelFormat.HUDI) {
            this.hudiEngine.saveHudiFeatureGroup(this.sparkSession, featureGroup, dataset, operation, writeOptions, validationId);
        } else {
            writeSparkDataset(featureGroup, dataset, writeOptions);
        }
    }

    /** Appends the dataset to the feature group's Hive table, partitioned by its partition columns. */
    private void writeSparkDataset(FeatureGroup featureGroup, Dataset<Row> dataset, Map<String, String> writeOptions) {
        dataset.write()
                .format(Constants.HIVE_FORMAT)
                .mode(SaveMode.Append)
                .options(writeOptions == null ? new HashMap<>() : writeOptions)
                .partitionBy(this.utils.getPartitionColumns(featureGroup))
                .saveAsTable(this.utils.getTableName(featureGroup));
    }

    /**
     * Profiles the dataframe with Deequ's column profiler and returns the profiles as JSON.
     *
     * @param dataset data to profile
     * @param restrictToColumns columns to profile; null or empty profiles all columns
     * @param correlations enables correlations (null means true); passed with the value 100
     * @param histograms enables histograms (null means true); passed with the value 20
     * @param exactUniqueness enables exact uniqueness (null means true)
     * @return JSON produced by {@code ColumnProfiles.toJson}
     */
    public String profile(Dataset<Row> dataset, List<String> restrictToColumns, Boolean correlations, Boolean histograms, Boolean exactUniqueness) {
        boolean withCorrelations = correlations == null || correlations;
        boolean withHistograms = histograms == null || histograms;
        boolean withExactUniqueness = exactUniqueness == null || exactUniqueness;
        ColumnProfilerRunBuilder runBuilder = new ColumnProfilerRunner()
                .onData(dataset)
                .withCorrelation(withCorrelations, 100)
                .withHistogram(withHistograms, 20)
                .withExactUniqueness(withExactUniqueness);
        if (restrictToColumns != null && !restrictToColumns.isEmpty()) {
            runBuilder.restrictToColumns(JavaConverters.asScalaIteratorConverter(restrictToColumns.iterator()).asScala().toSeq());
        }
        return ColumnProfiles.toJson(runBuilder.run().profiles().values().toSeq());
    }

    /** Same as the five-argument overload, with exact uniqueness enabled. */
    public String profile(Dataset<Row> dataset, List<String> restrictToColumns, Boolean correlations, Boolean histograms) {
        return profile(dataset, restrictToColumns, correlations, histograms, true);
    }

    /** Profiles the given columns with correlations and histograms enabled. */
    public String profile(Dataset<Row> dataset, List<String> restrictToColumns) {
        return profile(dataset, restrictToColumns, true, true);
    }

    /** Profiles all columns with the given correlation/histogram flags. */
    public String profile(Dataset<Row> dataset, boolean correlations, boolean histograms) {
        return profile(dataset, null, Boolean.valueOf(correlations), Boolean.valueOf(histograms));
    }

    /** Profiles all columns with correlations and histograms enabled. */
    public String profile(Dataset<Row> dataset) {
        return profile(dataset, null, true, true);
    }

    /** Copies S3 or ADLS connector settings into the Hadoop configuration; other connector types and null are ignored. */
    public void setupConnectorHadoopConf(StorageConnector storageConnector) {
        if (storageConnector == null) {
            return;
        }
        switch (storageConnector.getStorageConnectorType()) {
            case S3:
                setupS3ConnectorHadoopConf((StorageConnector.S3Connector) storageConnector);
                return;
            case ADLS:
                setupAdlsConnectorHadoopConf((StorageConnector.AdlsConnector) storageConnector);
                return;
            default:
                return;
        }
    }

    /**
     * Replaces a leading {@code Constants.S3_SCHEME} with {@code Constants.S3_SPARK_SCHEME}.
     *
     * @param path path to translate; may be null
     * @return the translated path, or null for null input
     */
    public static String sparkPath(String path) {
        if (path == null) {
            return null;
        }
        return path.startsWith(Constants.S3_SCHEME) ? path.replaceFirst(Constants.S3_SCHEME, Constants.S3_SPARK_SCHEME) : path;
    }

    /** Sets a Hadoop configuration key only when the value is non-null and non-empty. */
    private void setHadoopConfIfPresent(String key, String value) {
        if (!Strings.isNullOrEmpty(value)) {
            this.sparkSession.sparkContext().hadoopConfiguration().set(key, value);
        }
    }

    /**
     * Applies the S3 connector's credentials and encryption settings to the Hadoop configuration.
     * A session token also switches to the temporary-credential provider.
     */
    private void setupS3ConnectorHadoopConf(StorageConnector.S3Connector s3Connector) {
        setHadoopConfIfPresent(Constants.S3_ACCESS_KEY_ENV, s3Connector.getAccessKey());
        setHadoopConfIfPresent(Constants.S3_SECRET_KEY_ENV, s3Connector.getSecretKey());
        setHadoopConfIfPresent("fs.s3a.server-side-encryption-algorithm", s3Connector.getServerEncryptionAlgorithm());
        setHadoopConfIfPresent("fs.s3a.server-side-encryption-key", s3Connector.getServerEncryptionKey());
        if (Strings.isNullOrEmpty(s3Connector.getSessionToken())) {
            return;
        }
        this.sparkSession.sparkContext().hadoopConfiguration().set(Constants.S3_CREDENTIAL_PROVIDER_ENV, Constants.S3_TEMPORARY_CREDENTIAL_PROVIDER);
        this.sparkSession.sparkContext().hadoopConfiguration().set(Constants.S3_SESSION_KEY_ENV, s3Connector.getSessionToken());
    }

    /** Copies every Spark option of the ADLS connector into the Hadoop configuration. */
    private void setupAdlsConnectorHadoopConf(StorageConnector.AdlsConnector adlsConnector) {
        for (Option option : adlsConnector.getSparkOptions()) {
            this.sparkSession.sparkContext().hadoopConfiguration().set(option.getName(), option.getValue());
        }
    }

    /**
     * Returns an empty (limit 0) copy of the dataframe with one extra null column per feature, each
     * cast to the feature's type.
     */
    public Dataset<Row> getEmptyAppendedDataframe(Dataset<Row> dataset, List<Feature> features) {
        Dataset<Row> emptyDataset = dataset.limit(0);
        for (Feature feature : features) {
            emptyDataset = emptyDataset.withColumn(feature.getName(), org.apache.spark.sql.functions.lit((Object) null).cast(feature.getType()));
        }
        return emptyDataset;
    }

    /** Returns the wrapped Spark session. */
    public SparkSession getSparkSession() {
        return this.sparkSession;
    }
}
