package com.logicalclocks.hsfs.spark.engine;

import com.logicalclocks.hsfs.Feature;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.FeatureGroupCommit;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.HudiOperationType;
import com.logicalclocks.hsfs.JobConfiguration;
import com.logicalclocks.hsfs.StatisticsConfig;
import com.logicalclocks.hsfs.Storage;
import com.logicalclocks.hsfs.TimeTravelFormat;
import com.logicalclocks.hsfs.engine.FeatureGroupEngineBase;
import com.logicalclocks.hsfs.spark.ExternalFeatureGroup;
import com.logicalclocks.hsfs.spark.FeatureGroup;
import com.logicalclocks.hsfs.spark.FeatureStore;
import com.logicalclocks.hsfs.spark.StreamFeatureGroup;
import com.logicalclocks.hsfs.spark.engine.hudi.HudiEngine;
import java.io.IOException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

/* loaded from: input_file:com/logicalclocks/hsfs/spark/engine/FeatureGroupEngine.class */
public class FeatureGroupEngine extends FeatureGroupEngineBase {
    /** Engine that {@code commitDelete} delegates to for deleting records. */
    private HudiEngine hudiEngine = new HudiEngine();

    /**
     * Saves the metadata of a feature group and then appends the given dataframe to it.
     *
     * <p>The feature names of {@code dataset} are sanitized first; the sanitized dataframe is used
     * both when saving the metadata and when inserting. The insert is issued with a {@code null}
     * storage and {@link SaveMode#Append}. A {@link HudiOperationType#BULK_INSERT} operation is
     * requested only when the saved group's time travel format is {@link TimeTravelFormat#HUDI}.
     *
     * @param featureGroup the feature group to save
     * @param dataset the dataframe to write
     * @param list forwarded to {@code saveFeatureGroupMetaData} and {@code insert}
     * @param str forwarded to {@code saveFeatureGroupMetaData} and {@code insert}
     * @param map forwarded to {@code insert}
     * @return the feature group returned by {@code saveFeatureGroupMetaData}
     */
    public FeatureGroup save(FeatureGroup featureGroup, Dataset<Row> dataset, List<String> list, String str, Map<String, String> map) throws FeatureStoreException, IOException, ParseException {
        Dataset<Row> sanitized = SparkEngine.getInstance().sanitizeFeatureNames(dataset);
        FeatureGroup saved = saveFeatureGroupMetaData(featureGroup, list, str, sanitized, false);

        HudiOperationType operation = null;
        if (saved.getTimeTravelFormat() == TimeTravelFormat.HUDI) {
            operation = HudiOperationType.BULK_INSERT;
        }

        insert(saved, sanitized, null, operation, SaveMode.Append, list, str, map);
        return saved;
    }

    /**
     * Saves the metadata of a stream feature group and then appends the given dataframe to it.
     *
     * <p>The metadata is saved with the unsanitized {@code dataset}; the dataframe handed to
     * {@code insert} has its feature names sanitized.
     *
     * @param streamFeatureGroup the stream feature group to save
     * @param dataset the dataframe to write
     * @param list forwarded to {@code saveFeatureGroupMetaData} and {@code insert}
     * @param str forwarded to {@code saveFeatureGroupMetaData} and {@code insert}
     * @param map forwarded to {@code saveFeatureGroupMetaData} and {@code insert}
     * @param jobConfiguration forwarded to {@code saveFeatureGroupMetaData} and {@code insert}
     * @return the {@code streamFeatureGroup} argument
     */
    public StreamFeatureGroup save(StreamFeatureGroup streamFeatureGroup, Dataset<Row> dataset, List<String> list, String str, Map<String, String> map, JobConfiguration jobConfiguration) throws FeatureStoreException, IOException, ParseException {
        // Metadata is saved before the dataframe is sanitized, matching argument evaluation order.
        StreamFeatureGroup saved = saveFeatureGroupMetaData(streamFeatureGroup, list, str, map, jobConfiguration, dataset);
        Dataset<Row> sanitized = SparkEngine.getInstance().sanitizeFeatureNames(dataset);
        insert(saved, sanitized, SaveMode.Append, list, str, map, jobConfiguration);
        return streamFeatureGroup;
    }

    /**
     * Writes a dataframe to a feature group, saving the group's metadata first when it has no id.
     *
     * <p>With {@link SaveMode#Overwrite} the existing content of the feature group is deleted
     * through the feature group API before the write. The write itself is delegated to
     * {@code saveDataframe}.
     *
     * @param featureGroup the target feature group
     * @param dataset the dataframe to write
     * @param storage the storage to write to; passed through to {@code saveDataframe}
     * @param hudiOperationType the Hudi operation; passed through to {@code saveDataframe}
     * @param saveMode only {@link SaveMode#Overwrite} changes the behaviour of this method
     * @param list forwarded to {@code saveFeatureGroupMetaData} when the group is not yet saved
     * @param str forwarded to {@code saveFeatureGroupMetaData} when the group is not yet saved
     * @param map forwarded to {@code saveDataframe} and used to build the Kafka configuration
     */
    public void insert(FeatureGroup featureGroup, Dataset<Row> dataset, Storage storage, HudiOperationType hudiOperationType, SaveMode saveMode, List<String> list, String str, Map<String, String> map) throws FeatureStoreException, IOException, ParseException {
        FeatureGroup target = featureGroup;
        if (target.getId() == null) {
            // A missing id means the group has not been saved yet.
            target = saveFeatureGroupMetaData(target, list, str, dataset, false);
        }

        if (SaveMode.Overwrite == saveMode) {
            this.featureGroupApi.deleteContent(target);
        }

        Map<String, String> kafkaConfig = SparkEngine.getInstance().getKafkaConfig(target, map);
        saveDataframe(target, dataset, storage, hudiOperationType, map, kafkaConfig, null);
    }

    /**
     * Writes a dataframe to the online topic of a stream feature group, saving the group's
     * metadata first when it has no id.
     *
     * <p>With {@link SaveMode#Overwrite} the existing content of the feature group is deleted
     * through the feature group API before the write.
     *
     * @param streamFeatureGroup the target stream feature group
     * @param dataset the dataframe to write
     * @param saveMode only {@link SaveMode#Overwrite} changes the behaviour of this method
     * @param list forwarded to {@code saveFeatureGroupMetaData} when the group is not yet saved
     * @param str forwarded to {@code saveFeatureGroupMetaData} when the group is not yet saved
     * @param map forwarded to {@code saveFeatureGroupMetaData} and used to build the Kafka
     *     configuration
     * @param jobConfiguration forwarded to {@code saveFeatureGroupMetaData} when the group is not
     *     yet saved
     */
    public void insert(StreamFeatureGroup streamFeatureGroup, Dataset<Row> dataset, SaveMode saveMode, List<String> list, String str, Map<String, String> map, JobConfiguration jobConfiguration) throws FeatureStoreException, IOException, ParseException {
        StreamFeatureGroup target = streamFeatureGroup;
        if (target.getId() == null) {
            // A missing id means the group has not been saved yet.
            target = saveFeatureGroupMetaData(target, list, str, map, jobConfiguration, dataset);
        }

        if (SaveMode.Overwrite == saveMode) {
            this.featureGroupApi.deleteContent(target);
        }

        String topicName = target.getOnlineTopicName();
        Map<String, String> kafkaConfig = SparkEngine.getInstance().getKafkaConfig(target, map);
        SparkEngine.getInstance().writeOnlineDataframe(target, dataset, topicName, kafkaConfig);
    }

    /**
     * Writes a dataframe to the online topic of an external feature group.
     *
     * <p>The group is saved through {@code saveExternalFeatureGroup} first when it has no id.
     *
     * @param externalFeatureGroup the target external feature group
     * @param dataset the dataframe to write
     * @param map used to build the Kafka configuration
     * @throws FeatureStoreException if the feature group is not online enabled
     */
    public void insert(ExternalFeatureGroup externalFeatureGroup, Dataset<Row> dataset, Map<String, String> map) throws FeatureStoreException, IOException {
        boolean onlineEnabled = externalFeatureGroup.getOnlineEnabled().booleanValue();
        if (!onlineEnabled) {
            throw new FeatureStoreException("Online storage is not enabled for this feature group. External feature groups can only store data in online storage. To create an offline only external feature group, use the `save` method.");
        }

        ExternalFeatureGroup target = externalFeatureGroup;
        if (target.getId() == null) {
            target = saveExternalFeatureGroup(target);
        }

        String topicName = target.getOnlineTopicName();
        Map<String, String> kafkaConfig = SparkEngine.getInstance().getKafkaConfig(target, map);
        SparkEngine.getInstance().writeOnlineDataframe(target, dataset, topicName, kafkaConfig);
    }

    /**
     * Starts a streaming write of the given dataframe to an online enabled feature group.
     *
     * <p>When the group has no id its metadata is saved first, with the flag that makes
     * {@code saveFeatureGroupMetaData} also write an empty offline dataframe. The dataframe is
     * converted with {@code convertToDefaultDataframe} before it is handed to the stream writer.
     *
     * @param featureGroup the target feature group
     * @param dataset the streaming dataframe
     * @param str forwarded to {@code writeStreamDataframe}
     * @param str2 forwarded to {@code writeStreamDataframe}
     * @param z forwarded to {@code writeStreamDataframe}
     * @param l forwarded to {@code writeStreamDataframe}
     * @param str3 forwarded to {@code writeStreamDataframe}
     * @param list forwarded to {@code saveFeatureGroupMetaData} when the group is not yet saved
     * @param str4 forwarded to {@code saveFeatureGroupMetaData} when the group is not yet saved
     * @param map used to build the Kafka configuration
     * @return the streaming query returned by {@code writeStreamDataframe}
     * @throws FeatureStoreException if the feature group is not online enabled
     */
    @Deprecated
    public StreamingQuery insertStream(FeatureGroup featureGroup, Dataset<Row> dataset, String str, String str2, boolean z, Long l, String str3, List<String> list, String str4, Map<String, String> map) throws FeatureStoreException, IOException, StreamingQueryException, TimeoutException, ParseException {
        boolean onlineEnabled = featureGroup.getOnlineEnabled().booleanValue();
        if (!onlineEnabled) {
            throw new FeatureStoreException("Online storage is not enabled for this feature group. It is currently only possible to stream to the online storage.");
        }

        FeatureGroup target = featureGroup;
        if (target.getId() == null) {
            target = saveFeatureGroupMetaData(target, list, str4, dataset, true);
        }

        return SparkEngine.getInstance().writeStreamDataframe(
                target,
                SparkEngine.getInstance().convertToDefaultDataframe(dataset),
                str,
                str2,
                z,
                l,
                str3,
                SparkEngine.getInstance().getKafkaConfig(target, map));
    }

    /**
     * Starts a streaming write of the given dataframe to a stream feature group.
     *
     * <p>A {@code null} {@code map} is replaced by an empty map. When the group has no id its
     * metadata is saved first. The dataframe's feature names are sanitized before it is handed to
     * the stream writer.
     *
     * <p>NOTE(review): this method declares no checked exceptions although
     * {@code saveFeatureGroupMetaData} declares {@code FeatureStoreException} and
     * {@code IOException}. The original source presumably suppressed them with a
     * sneaky-throw mechanism; confirm before recompiling this file. The signature is left
     * untouched so existing callers are unaffected.
     *
     * @param streamFeatureGroup the target stream feature group
     * @param dataset the streaming dataframe
     * @param str forwarded to {@code writeStreamDataframe}
     * @param str2 forwarded to {@code writeStreamDataframe}
     * @param z forwarded to {@code writeStreamDataframe}
     * @param l forwarded to {@code writeStreamDataframe}
     * @param str3 forwarded to {@code writeStreamDataframe}
     * @param list forwarded to {@code saveFeatureGroupMetaData} when the group is not yet saved
     * @param str4 forwarded to {@code saveFeatureGroupMetaData} when the group is not yet saved
     * @param map forwarded to {@code saveFeatureGroupMetaData} and used to build the Kafka
     *     configuration; may be {@code null}
     * @param jobConfiguration forwarded to {@code saveFeatureGroupMetaData} when the group is not
     *     yet saved
     * @return the streaming query returned by {@code writeStreamDataframe}
     */
    public StreamingQuery insertStream(StreamFeatureGroup streamFeatureGroup, Dataset<Row> dataset, String str, String str2, boolean z, Long l, String str3, List<String> list, String str4, Map<String, String> map, JobConfiguration jobConfiguration) {
        Map<String, String> options = map;
        if (options == null) {
            // Diamond instead of a raw HashMap so the assignment stays type-checked.
            options = new HashMap<>();
        }

        StreamFeatureGroup target = streamFeatureGroup;
        if (target.getId() == null) {
            target = saveFeatureGroupMetaData(target, list, str4, options, jobConfiguration, dataset);
        }

        return SparkEngine.getInstance().writeStreamDataframe(
                target,
                SparkEngine.getInstance().sanitizeFeatureNames(dataset),
                str,
                str2,
                z,
                l,
                str3,
                SparkEngine.getInstance().getKafkaConfig(target, options));
    }

    /**
     * Writes a dataframe to the offline storage, the online storage, or both.
     *
     * <p>Outcome by feature group state and requested {@code storage}:
     * <ul>
     *   <li>not online enabled, {@link Storage#ONLINE}: {@link FeatureStoreException};</li>
     *   <li>not online enabled, any other value: offline write only;</li>
     *   <li>online enabled, {@link Storage#OFFLINE}: offline write only;</li>
     *   <li>online enabled, {@link Storage#ONLINE}: online write only;</li>
     *   <li>online enabled, {@code null}: offline write followed by online write;</li>
     *   <li>online enabled, any other non-null value: {@link FeatureStoreException}.</li>
     * </ul>
     *
     * @param featureGroup the target feature group
     * @param dataset the dataframe to write
     * @param storage the requested storage, or {@code null} for both
     * @param hudiOperationType forwarded to the offline write
     * @param map forwarded to the offline write
     * @param map2 forwarded to the online write
     * @param num forwarded to the offline write
     * @throws FeatureStoreException for the unsupported combinations listed above
     */
    public void saveDataframe(FeatureGroup featureGroup, Dataset<Row> dataset, Storage storage, HudiOperationType hudiOperationType, Map<String, String> map, Map<String, String> map2, Integer num) throws IOException, FeatureStoreException, ParseException {
        boolean onlineEnabled = featureGroup.getOnlineEnabled().booleanValue();

        if (!onlineEnabled) {
            if (storage == Storage.ONLINE) {
                throw new FeatureStoreException("Online storage is not enabled for this feature group. Set `online=false` to write to the offline storage.");
            }
            // Offline-only group: every remaining storage value results in an offline write.
            SparkEngine.getInstance().writeOfflineDataframe(featureGroup, dataset, hudiOperationType, map, num);
            return;
        }

        if (storage == Storage.OFFLINE) {
            SparkEngine.getInstance().writeOfflineDataframe(featureGroup, dataset, hudiOperationType, map, num);
            return;
        }

        if (storage == Storage.ONLINE) {
            SparkEngine.getInstance().writeOnlineDataframe(featureGroup, dataset, featureGroup.getOnlineTopicName(), map2);
            return;
        }

        if (storage != null) {
            throw new FeatureStoreException("Error writing to offline and online feature store.");
        }

        // No storage requested on an online enabled group: write offline first, then online.
        SparkEngine.getInstance().writeOfflineDataframe(featureGroup, dataset, hudiOperationType, map, num);
        SparkEngine.getInstance().writeOnlineDataframe(featureGroup, dataset, featureGroup.getOnlineTopicName(), map2);
    }

    /**
     * Saves the metadata of a feature group through the feature group API.
     *
     * <p>When the group has no features they are parsed from the schema of {@code dataset}. The
     * attribute key names are verified, the metadata is saved, and the online topic name of the
     * API response is copied onto {@code featureGroup}. When {@code z} is {@code true} an empty
     * dataframe derived from {@code dataset} is additionally written to the offline storage, using
     * {@link HudiOperationType#BULK_INSERT} for {@link TimeTravelFormat#HUDI} groups.
     *
     * @param featureGroup the feature group to save; updated in place
     * @param list forwarded to {@code verifyAttributeKeyNames} and the API call
     * @param str forwarded to {@code verifyAttributeKeyNames} and the API call
     * @param dataset the dataframe the schema is taken from
     * @param z whether to write an empty dataframe to the offline storage after saving
     * @return the {@code featureGroup} argument
     */
    public FeatureGroup saveFeatureGroupMetaData(FeatureGroup featureGroup, List<String> list, String str, Dataset<Row> dataset, boolean z) throws FeatureStoreException, IOException, ParseException {
        if (featureGroup.getFeatures() == null) {
            featureGroup.setFeatures(
                    SparkEngine.getInstance().parseFeatureGroupSchema(dataset, featureGroup.getTimeTravelFormat()));
        }
        FeatureGroupEngineBase.LOGGER.info("Featuregroup features: " + featureGroup.getFeatures());

        this.utils.verifyAttributeKeyNames(featureGroup, list, str);

        FeatureGroup apiResponse = (FeatureGroup) this.featureGroupApi.saveFeatureGroupMetaData(featureGroup, list, str, null, null, FeatureGroup.class);
        featureGroup.setOnlineTopicName(apiResponse.getOnlineTopicName());

        if (!z) {
            return featureGroup;
        }

        Dataset emptyFrame = (Dataset) SparkEngine.getInstance().createEmptyDataFrame(dataset);
        HudiOperationType operation = null;
        if (featureGroup.getTimeTravelFormat() == TimeTravelFormat.HUDI) {
            operation = HudiOperationType.BULK_INSERT;
        }
        SparkEngine.getInstance().writeOfflineDataframe(featureGroup, emptyFrame, operation, null, null);
        return featureGroup;
    }

    /**
     * Saves the metadata of a stream feature group through the feature group API.
     *
     * <p>When the group has no features they are parsed from the schema of {@code dataset} after
     * its feature names have been sanitized. The attribute key names are verified, the metadata is
     * saved, and the online topic name of the API response is copied onto
     * {@code streamFeatureGroup}.
     *
     * @param streamFeatureGroup the stream feature group to save; updated in place
     * @param list forwarded to {@code verifyAttributeKeyNames} and the API call
     * @param str forwarded to {@code verifyAttributeKeyNames} and the API call
     * @param map forwarded to the API call
     * @param jobConfiguration forwarded to the API call
     * @param dataset the dataframe the schema is taken from
     * @return the {@code streamFeatureGroup} argument
     */
    public StreamFeatureGroup saveFeatureGroupMetaData(StreamFeatureGroup streamFeatureGroup, List<String> list, String str, Map<String, String> map, JobConfiguration jobConfiguration, Dataset<Row> dataset) throws FeatureStoreException, IOException {
        if (streamFeatureGroup.getFeatures() == null) {
            streamFeatureGroup.setFeatures(
                    SparkEngine.getInstance().parseFeatureGroupSchema(
                            SparkEngine.getInstance().sanitizeFeatureNames(dataset),
                            streamFeatureGroup.getTimeTravelFormat()));
        }
        FeatureGroupEngineBase.LOGGER.info("Featuregroup features: " + streamFeatureGroup.getFeatures());

        this.utils.verifyAttributeKeyNames(streamFeatureGroup, list, str);

        StreamFeatureGroup apiResponse = (StreamFeatureGroup) this.featureGroupApi.saveFeatureGroupMetaData(streamFeatureGroup, list, str, map, jobConfiguration, StreamFeatureGroup.class);
        streamFeatureGroup.setOnlineTopicName(apiResponse.getOnlineTopicName());
        return streamFeatureGroup;
    }

    /**
     * Returns the feature group with the given name and version, or builds a new, unsaved
     * {@link FeatureGroup} object when the lookup reports that it does not exist.
     *
     * <p>A lookup failure counts as "not found" only when the exception message contains both
     * {@code Error: 404} and {@code "errorCode":270009}. Any other failure, including an
     * exception without a message, is rethrown unchanged.
     *
     * @param featureStore the feature store to look in
     * @param str the feature group name
     * @param num the feature group version
     * @param str2 description for a newly built group
     * @param list primary keys for a newly built group
     * @param list2 partition keys for a newly built group
     * @param str3 Hudi precombine key for a newly built group
     * @param z whether a newly built group is online enabled
     * @param timeTravelFormat time travel format for a newly built group
     * @param statisticsConfig statistics configuration for a newly built group
     * @param str4 topic name for a newly built group
     * @param str5 event time for a newly built group
     * @return the existing feature group, or a newly built one that has not been saved
     * @throws IOException if the lookup fails for a reason other than "not found"
     * @throws FeatureStoreException if the lookup fails for a reason other than "not found"
     */
    public FeatureGroup getOrCreateFeatureGroup(FeatureStore featureStore, String str, Integer num, String str2, List<String> list, List<String> list2, String str3, boolean z, TimeTravelFormat timeTravelFormat, StatisticsConfig statisticsConfig, String str4, String str5) throws IOException, FeatureStoreException {
        try {
            return getFeatureGroup(featureStore, str, num);
        } catch (FeatureStoreException | IOException e) {
            // getMessage() may be null; without this guard a NullPointerException would mask
            // the real failure instead of rethrowing it.
            String message = e.getMessage();
            boolean notFound = message != null
                    && message.contains("Error: 404")
                    && message.contains("\"errorCode\":270009");
            if (!notFound) {
                throw e;
            }
        }

        FeatureGroup created = FeatureGroup.builder()
                .featureStore(featureStore)
                .name(str)
                .version(num)
                .description(str2)
                .primaryKeys(list)
                .partitionKeys(list2)
                .hudiPrecombineKey(str3)
                .onlineEnabled(z)
                .timeTravelFormat(timeTravelFormat)
                .statisticsConfig(statisticsConfig)
                .eventTime(str5)
                .topicName(str4)
                .build();
        created.setFeatureStore(featureStore);
        return created;
    }

    /**
     * Fetches the feature group with the given name and version.
     *
     * <p>The first element of the array returned by the feature group API is used, and the given
     * feature store is set on it.
     *
     * @param featureStore the feature store to look in
     * @param str the feature group name
     * @param num the feature group version
     * @return the first feature group returned by the API
     */
    public FeatureGroup getFeatureGroup(FeatureStore featureStore, String str, Integer num) throws IOException, FeatureStoreException {
        FeatureGroup[] matches = (FeatureGroup[]) this.featureGroupApi.getInternal(featureStore, str, num, FeatureGroup[].class);
        FeatureGroup first = matches[0];
        first.setFeatureStore(featureStore);
        return first;
    }

    /**
     * Fetches the feature groups with the given name; the API is queried with a {@code null}
     * version.
     *
     * @param featureStore the feature store to look in
     * @param str the feature group name
     * @return a fixed-size list view over the array returned by the API
     */
    public List<FeatureGroup> getFeatureGroups(FeatureStore featureStore, String str) throws FeatureStoreException, IOException {
        FeatureGroup[] matches = (FeatureGroup[]) this.featureGroupApi.getInternal(featureStore, str, null, FeatureGroup[].class);
        return Arrays.asList(matches);
    }

    /**
     * Returns the stream feature group with the given name and version, or builds a new, unsaved
     * {@link StreamFeatureGroup} object when the lookup reports that it does not exist.
     *
     * <p>A lookup failure counts as "not found" only when the exception message contains both
     * {@code Error: 404} and {@code "errorCode":270009}. Any other failure, including an
     * exception without a message, is rethrown unchanged.
     *
     * @param featureStore the feature store to look in
     * @param str the feature group name
     * @param num the feature group version
     * @param str2 description for a newly built group
     * @param list primary keys for a newly built group
     * @param list2 partition keys for a newly built group
     * @param str3 Hudi precombine key for a newly built group
     * @param z whether a newly built group is online enabled
     * @param statisticsConfig statistics configuration for a newly built group
     * @param str4 event time for a newly built group
     * @return the existing stream feature group, or a newly built one that has not been saved
     * @throws IOException if the lookup fails for a reason other than "not found"
     * @throws FeatureStoreException if the lookup fails for a reason other than "not found"
     */
    public StreamFeatureGroup getOrCreateStreamFeatureGroup(FeatureStore featureStore, String str, Integer num, String str2, List<String> list, List<String> list2, String str3, boolean z, StatisticsConfig statisticsConfig, String str4) throws IOException, FeatureStoreException {
        try {
            return getStreamFeatureGroup(featureStore, str, num);
        } catch (FeatureStoreException | IOException e) {
            // getMessage() may be null; without this guard a NullPointerException would mask
            // the real failure instead of rethrowing it.
            String message = e.getMessage();
            boolean notFound = message != null
                    && message.contains("Error: 404")
                    && message.contains("\"errorCode\":270009");
            if (!notFound) {
                throw e;
            }
        }

        StreamFeatureGroup created = StreamFeatureGroup.builder()
                .featureStore(featureStore)
                .name(str)
                .version(num)
                .description(str2)
                .primaryKeys(list)
                .partitionKeys(list2)
                .hudiPrecombineKey(str3)
                .onlineEnabled(z)
                .statisticsConfig(statisticsConfig)
                .eventTime(str4)
                .build();
        created.setFeatureStore(featureStore);
        return created;
    }

    /**
     * Fetches the stream feature group with the given name and version.
     *
     * <p>The first element of the array returned by the feature group API is used, and the given
     * feature store is set on it.
     *
     * @param featureStore the feature store to look in
     * @param str the feature group name
     * @param num the feature group version
     * @return the first stream feature group returned by the API
     */
    public StreamFeatureGroup getStreamFeatureGroup(FeatureStore featureStore, String str, Integer num) throws IOException, FeatureStoreException {
        StreamFeatureGroup[] matches = (StreamFeatureGroup[]) this.featureGroupApi.getInternal(featureStore, str, num, StreamFeatureGroup[].class);
        StreamFeatureGroup first = matches[0];
        first.setFeatureStore(featureStore);
        return first;
    }

    /**
     * Fetches the stream feature groups with the given name; the API is queried with a
     * {@code null} version.
     *
     * @param featureStore the feature store to look in
     * @param str the feature group name
     * @return a fixed-size list view over the array returned by the API
     */
    public List<StreamFeatureGroup> getStreamFeatureGroups(FeatureStore featureStore, String str) throws FeatureStoreException, IOException {
        StreamFeatureGroup[] matches = (StreamFeatureGroup[]) this.featureGroupApi.getInternal(featureStore, str, null, StreamFeatureGroup[].class);
        return Arrays.asList(matches);
    }

    /**
     * Appends features to a feature group and pushes the updated metadata through the API.
     *
     * <p>The new features are added to the group's current feature list, the metadata is updated
     * through the feature group API, and the feature list from the API response replaces the
     * group's features. The group's subject is then unloaded. For a {@link FeatureGroup} instance
     * an empty dataframe is written afterwards.
     *
     * @param featureGroupBase the feature group to extend; updated in place
     * @param list the features to append
     * @param cls the class passed to the API's {@code updateMetadata} call
     */
    public <T extends FeatureGroupBase> void appendFeatures(FeatureGroupBase featureGroupBase, List<Feature> list, Class<T> cls) throws FeatureStoreException, IOException, ParseException {
        featureGroupBase.getFeatures().addAll(list);

        FeatureGroupBase updated = this.featureGroupApi.updateMetadata(featureGroupBase, "updateMetadata", cls);
        featureGroupBase.setFeatures(updated.getFeatures());
        featureGroupBase.unloadSubject();

        if (!(featureGroupBase instanceof FeatureGroup)) {
            return;
        }
        SparkEngine.getInstance().writeEmptyDataframe(featureGroupBase);
    }

    /**
     * Returns the commit details of a feature group.
     *
     * <p>Accepted are {@link FeatureGroup} instances whose time travel format is
     * {@link TimeTravelFormat#HUDI} and all {@link StreamFeatureGroup} instances.
     *
     * @param featureGroupBase the feature group to inspect
     * @param num forwarded to {@code getCommitDetails}
     * @return the map returned by {@code getCommitDetails}
     * @throws FeatureStoreException if the feature group is neither of the accepted kinds
     */
    public Map<Long, Map<String, String>> commitDetails(FeatureGroupBase featureGroupBase, Integer num) throws IOException, FeatureStoreException, ParseException {
        boolean hudiFeatureGroup = featureGroupBase instanceof FeatureGroup
                && featureGroupBase.getTimeTravelFormat() == TimeTravelFormat.HUDI;
        boolean timeTravelEnabled = hudiFeatureGroup || featureGroupBase instanceof StreamFeatureGroup;

        if (!timeTravelEnabled) {
            throw new FeatureStoreException("commitDetails function is only valid for time travel enabled feature group");
        }
        return this.utils.getCommitDetails(featureGroupBase, null, num);
    }

    /**
     * Returns the commit details of a feature group for the given wallclock time argument.
     *
     * <p>Unlike {@code commitDetails}, no check on the feature group type is performed here; the
     * call is delegated straight to {@code getCommitDetails}.
     *
     * @param featureGroupBase the feature group to inspect
     * @param str forwarded to {@code getCommitDetails}
     * @param num forwarded to {@code getCommitDetails}
     * @return the map returned by {@code getCommitDetails}
     */
    public Map<Long, Map<String, String>> commitDetailsByWallclockTime(FeatureGroupBase featureGroupBase, String str, Integer num) throws IOException, FeatureStoreException, ParseException {
        Map<Long, Map<String, String>> details = this.utils.getCommitDetails(featureGroupBase, str, num);
        return details;
    }

    /**
     * Deletes the records contained in {@code dataset} from a feature group through the Hudi
     * engine.
     *
     * <p>Accepted are {@link FeatureGroup} instances whose time travel format is
     * {@link TimeTravelFormat#HUDI} and all {@link StreamFeatureGroup} instances.
     *
     * @param featureGroupBase the feature group to delete from
     * @param dataset the dataframe forwarded to {@code deleteRecord}
     * @param map forwarded to {@code deleteRecord}
     * @return the commit returned by {@code deleteRecord}
     * @throws FeatureStoreException if the feature group is neither of the accepted kinds
     */
    public FeatureGroupCommit commitDelete(FeatureGroupBase featureGroupBase, Dataset<Row> dataset, Map<String, String> map) throws IOException, FeatureStoreException, ParseException {
        boolean hudiFeatureGroup = featureGroupBase instanceof FeatureGroup
                && featureGroupBase.getTimeTravelFormat() == TimeTravelFormat.HUDI;
        boolean timeTravelEnabled = hudiFeatureGroup || featureGroupBase instanceof StreamFeatureGroup;

        if (!timeTravelEnabled) {
            throw new FeatureStoreException("delete function is only valid for time travel enabled feature group");
        }
        return this.hudiEngine.deleteRecord(SparkEngine.getInstance().getSparkSession(), featureGroupBase, dataset, map);
    }

    /**
     * Saves an external feature group and copies the assigned id and online topic name back
     * onto it.
     *
     * <p>When the group has no features they are parsed from the schema of the temporary table
     * registered for the group under the alias {@code read_ondmd}. Every feature whose name equals
     * one of the group's primary keys is flagged as primary before the metadata is saved.
     *
     * @param externalFeatureGroup the external feature group to save; updated in place
     * @return the {@code externalFeatureGroup} argument
     */
    public ExternalFeatureGroup saveExternalFeatureGroup(ExternalFeatureGroup externalFeatureGroup) throws FeatureStoreException, IOException {
        if (externalFeatureGroup.getFeatures() == null) {
            externalFeatureGroup.setFeatures(
                    SparkEngine.getInstance().parseFeatureGroupSchema(
                            SparkEngine.getInstance().registerOnDemandTemporaryTable(externalFeatureGroup, "read_ondmd"),
                            externalFeatureGroup.getTimeTravelFormat()));
        }

        this.utils.verifyAttributeKeyNames(externalFeatureGroup, null, null);

        if (externalFeatureGroup.getPrimaryKeys() != null) {
            // Flag every feature whose name matches one of the primary keys.
            externalFeatureGroup.getPrimaryKeys().forEach(primaryKey ->
                    externalFeatureGroup.getFeatures().forEach(candidate -> {
                        if (candidate.getName().equals(primaryKey)) {
                            candidate.setPrimary(true);
                        }
                    }));
        }

        ExternalFeatureGroup saved = (ExternalFeatureGroup) saveExtennalFeatureGroupMetaData(externalFeatureGroup, ExternalFeatureGroup.class);
        externalFeatureGroup.setId(saved.getId());
        externalFeatureGroup.setOnlineTopicName(saved.getOnlineTopicName());
        return externalFeatureGroup;
    }

    /**
     * Fetches the external feature groups with the given name; the API is queried with a
     * {@code null} version.
     *
     * @param featureStore the feature store to look in
     * @param str the feature group name
     * @return a fixed-size list view over the array returned by the API
     */
    public List<ExternalFeatureGroup> getExternalFeatureGroups(FeatureStore featureStore, String str) throws FeatureStoreException, IOException {
        ExternalFeatureGroup[] matches = (ExternalFeatureGroup[]) this.featureGroupApi.getInternal(featureStore, str, null, ExternalFeatureGroup[].class);
        return Arrays.asList(matches);
    }

    /**
     * Fetches the external feature group with the given name and version.
     *
     * <p>The first element of the array returned by the feature group API is used, and the given
     * feature store is set on it.
     *
     * @param featureStore the feature store to look in
     * @param str the feature group name
     * @param num the feature group version
     * @return the first external feature group returned by the API
     */
    public ExternalFeatureGroup getExternalFeatureGroup(FeatureStore featureStore, String str, Integer num) throws IOException, FeatureStoreException {
        ExternalFeatureGroup[] matches = (ExternalFeatureGroup[]) this.featureGroupApi.getInternal(featureStore, str, num, ExternalFeatureGroup[].class);
        ExternalFeatureGroup first = matches[0];
        first.setFeatureStore(featureStore);
        return first;
    }
}
