package com.logicalclocks.hsfs.spark.engine;

import com.google.common.collect.Maps;
import com.logicalclocks.hsfs.DataFormat;
import com.logicalclocks.hsfs.EntityEndpointType;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.Split;
import com.logicalclocks.hsfs.TrainingDatasetFeature;
import com.logicalclocks.hsfs.TrainingDatasetType;
import com.logicalclocks.hsfs.engine.FeatureViewEngineBase;
import com.logicalclocks.hsfs.metadata.Statistics;
import com.logicalclocks.hsfs.spark.FeatureStore;
import com.logicalclocks.hsfs.spark.FeatureView;
import com.logicalclocks.hsfs.spark.StreamFeatureGroup;
import com.logicalclocks.hsfs.spark.TrainingDataset;
import com.logicalclocks.hsfs.spark.TrainingDatasetBundle;
import com.logicalclocks.hsfs.spark.constructor.Query;
import java.io.IOException;
import java.text.ParseException;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.mapred.InvalidInputException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

/**
 * Spark-side engine for feature views.
 *
 * <p>Builds on {@link FeatureViewEngineBase} and adds the operations that need a Spark
 * {@code Dataset<Row>}: creating, writing, reading and re-creating training datasets, computing
 * their statistics, and reading batch data for a feature view.
 */
public class FeatureViewEngine extends FeatureViewEngineBase<Query, FeatureView, FeatureStore, StreamFeatureGroup, Dataset<Row>> {
    private TrainingDatasetEngine trainingDatasetEngine = new TrainingDatasetEngine();
    private StatisticsEngine statisticsEngine = new StatisticsEngine(EntityEndpointType.TRAINING_DATASET);

    /**
     * Sends the given feature view to the feature view API for update.
     *
     * @param featureView the feature view to update
     * @return the same {@code featureView} instance that was passed in
     * @throws FeatureStoreException if the API call fails
     * @throws IOException if the API call fails
     */
    @Override // com.logicalclocks.hsfs.engine.FeatureViewEngineBase
    public FeatureView update(FeatureView featureView) throws FeatureStoreException, IOException {
        this.featureViewApi.update(featureView, FeatureView.class);
        return featureView;
    }

    /**
     * Fetches a feature view through the inherited typed {@code get} and attaches the feature
     * store it was requested from.
     *
     * @param featureStore the feature store to look in; also set on the returned feature view
     * @param str the feature view name
     * @param num the feature view version
     * @return the fetched feature view
     * @throws FeatureStoreException if the lookup fails
     * @throws IOException if the lookup fails
     */
    @Override // com.logicalclocks.hsfs.engine.FeatureViewEngineBase
    public FeatureView get(FeatureStore featureStore, String str, Integer num) throws FeatureStoreException, IOException {
        FeatureView featureView = get(featureStore, str, num, FeatureView.class);
        featureView.setFeatureStore(featureStore);
        return featureView;
    }

    /**
     * Registers training dataset metadata for the feature view and writes the data.
     *
     * @param featureView the feature view the training dataset belongs to
     * @param trainingDataset the training dataset definition to register
     * @param map user supplied write options
     * @return a bundle carrying only the version of the created training dataset
     * @throws IOException if metadata creation or the write fails
     * @throws FeatureStoreException if metadata creation or the write fails
     */
    public TrainingDatasetBundle createTrainingDataset(FeatureView featureView, TrainingDataset trainingDataset, Map<String, String> map) throws IOException, FeatureStoreException {
        TrainingDataset createdMetadata = createTrainingDataMetadata(featureView, trainingDataset);
        writeTrainingDataset(featureView, createdMetadata, map);
        return new TrainingDatasetBundle(createdMetadata.getVersion());
    }

    /**
     * Writes the training dataset with {@link SaveMode#Overwrite} and then computes statistics
     * on the datasets returned by the write.
     *
     * @param featureView the feature view the training dataset belongs to
     * @param trainingDataset the training dataset metadata (event time range, version, format)
     * @param map user supplied write options, merged via {@code SparkEngine#getWriteOptions}
     * @throws IOException if the query lookup, write or statistics computation fails
     * @throws FeatureStoreException if the query lookup, write or statistics computation fails
     */
    public void writeTrainingDataset(FeatureView featureView, TrainingDataset trainingDataset, Map<String, String> map) throws IOException, FeatureStoreException {
        SparkEngine sparkEngine = SparkEngine.getInstance();
        Map<String, String> writeOptions = sparkEngine.getWriteOptions(map, trainingDataset.getDataFormat());
        Query query = getBatchQuery(featureView, trainingDataset.getEventStartTime(), trainingDataset.getEventEndTime(), true, trainingDataset.getVersion(), Query.class);
        computeStatistics(featureView, trainingDataset, sparkEngine.write(trainingDataset, query, Maps.newHashMap(), writeOptions, SaveMode.Overwrite));
    }

    /**
     * Reads the training dataset with the given version.
     *
     * @param featureView the feature view the training dataset belongs to
     * @param num the training dataset version
     * @param list the split names the caller expects; its size is checked against the stored
     *     splits, {@code null} skips the check
     * @param map user supplied read options
     * @return the training dataset bundle
     * @throws IOException if reading fails
     * @throws FeatureStoreException if reading fails or the wrong {@code get} method was used
     * @throws ParseException declared for callers; not thrown by the visible code path
     */
    public TrainingDatasetBundle getTrainingDataset(FeatureView featureView, Integer num, List<String> list, Map<String, String> map) throws IOException, FeatureStoreException, ParseException {
        return getTrainingDataset(featureView, featureView.getFeatureStore().createTrainingDataset().version(num).build(), list, map);
    }

    /**
     * Reads (or creates and reads) a training dataset without validating the requested splits.
     *
     * @param featureView the feature view the training dataset belongs to
     * @param trainingDataset the training dataset definition; a non-null version means "fetch"
     * @param map user supplied read options
     * @return the training dataset bundle
     * @throws IOException if reading fails
     * @throws FeatureStoreException if reading fails
     */
    public TrainingDatasetBundle getTrainingDataset(FeatureView featureView, TrainingDataset trainingDataset, Map<String, String> map) throws IOException, FeatureStoreException {
        return getTrainingDataset(featureView, trainingDataset, (List<String>) null, map);
    }

    /**
     * Resolves the training dataset metadata and returns its data as a bundle.
     *
     * <p>If {@code trainingDataset} has a version, the existing metadata is fetched; otherwise
     * new metadata is created. In-memory training datasets are computed from the batch query
     * (statistics are computed as a side effect); all other types are read back through the
     * training dataset engine, one dataset per split when splits are defined.
     *
     * @param featureView the feature view the training dataset belongs to
     * @param trainingDataset the training dataset definition
     * @param list the split names the caller expects, or {@code null} to skip validation
     * @param map user supplied read options
     * @return the training dataset bundle
     * @throws IOException if reading fails
     * @throws FeatureStoreException if the number of requested splits does not match the number
     *     of stored splits, or reading fails
     * @throws IllegalStateException if the materialized files cannot be found
     */
    public TrainingDatasetBundle getTrainingDataset(FeatureView featureView, TrainingDataset trainingDataset, List<String> list, Map<String, String> map) throws IOException, FeatureStoreException {
        TrainingDataset metadata = trainingDataset.getVersion() != null
                ? getTrainingDataMetadata(featureView, trainingDataset.getVersion())
                : createTrainingDataMetadata(featureView, trainingDataset);
        List<Split> splits = metadata.getSplits();
        boolean hasSplits = splits != null && !splits.isEmpty();

        if (list != null) {
            // A missing splits list is treated as "no splits", matching the checks further down.
            int storedSplitCount = splits == null ? 0 : splits.size();
            if (storedSplitCount != list.size()) {
                throw new FeatureStoreException(String.format("Incorrect `get` method is used. Use `FeatureView.%s` instead.", expectedGetMethodName(storedSplitCount)));
            }
        }

        if (TrainingDatasetType.IN_MEMORY_TRAINING_DATASET.equals(metadata.getTrainingDatasetType())) {
            if (!hasSplits) {
                Dataset<Row> dataset = readDataset(featureView, metadata, map);
                TrainingDatasetBundle bundle = new TrainingDatasetBundle(metadata.getVersion(), dataset, featureView.getLabels());
                computeStatistics(featureView, metadata, new Dataset[]{dataset});
                return bundle;
            }
            // NOTE(review): the query below is built from the caller's `trainingDataset`, while the
            // non-split branch uses the resolved metadata - confirm this difference is intended.
            Query query = getBatchQuery(featureView, trainingDataset.getEventStartTime(), trainingDataset.getEventEndTime(), true, trainingDataset.getVersion(), Query.class);
            Dataset<Row>[] splitDatasets = SparkEngine.getInstance().splitDataset(metadata, query, map);
            TrainingDatasetBundle bundle = new TrainingDatasetBundle(metadata.getVersion(), convertSplitDatasetsToMap(splits, splitDatasets), featureView.getLabels());
            computeStatistics(featureView, metadata, splitDatasets);
            return bundle;
        }

        try {
            if (!hasSplits) {
                Dataset<Row> dataset = this.trainingDatasetEngine.read(metadata, "", map);
                return new TrainingDatasetBundle(metadata.getVersion(), castColumnType(metadata.getDataFormat(), dataset, featureView.getFeatures()), featureView.getLabels());
            }
            Map<String, Dataset<Row>> datasetsBySplit = new HashMap<>();
            for (Split split : splits) {
                Dataset<Row> dataset = this.trainingDatasetEngine.read(metadata, split.getName(), map);
                datasetsBySplit.put(split.getName(), castColumnType(metadata.getDataFormat(), dataset, featureView.getFeatures()));
            }
            return new TrainingDatasetBundle(metadata.getVersion(), datasetsBySplit, featureView.getLabels());
        } catch (InvalidInputException e) {
            // Keep the cause so the missing path reported by Hadoop is not lost.
            throw new IllegalStateException("Failed to read datasets. Check if path exists or recreate a training dataset.", e);
        }
    }

    /**
     * Maps the number of stored splits to the {@code FeatureView} method name suggested in the
     * error message; unknown counts yield an empty string.
     */
    private static String expectedGetMethodName(int storedSplitCount) {
        switch (storedSplitCount) {
            case 0:
                return "getTrainingData";
            case 2:
                return "getTrainTestSplit";
            case 3:
                return "getTrainValidationTestSplit";
            default:
                return "";
        }
    }

    /**
     * Casts columns to the feature types for CSV and TSV data; other formats are returned as is.
     */
    private Dataset<Row> castColumnType(DataFormat dataFormat, Dataset<Row> dataset, List<TrainingDatasetFeature> list) throws FeatureStoreException {
        if (DataFormat.CSV.equals(dataFormat) || DataFormat.TSV.equals(dataFormat)) {
            return SparkEngine.getInstance().castColumnType(dataset, list);
        }
        return dataset;
    }

    /**
     * Sets the event time on the training dataset and registers it through the feature view API.
     */
    private TrainingDataset createTrainingDataMetadata(FeatureView featureView, TrainingDataset trainingDataset) throws IOException, FeatureStoreException {
        setEventTime(featureView, trainingDataset);
        return (TrainingDataset) this.featureViewApi.createTrainingData(featureView.getName(), featureView.getVersion(), trainingDataset, TrainingDataset.class);
    }

    /**
     * Fetches the metadata of an existing training dataset version through the feature view API.
     */
    private TrainingDataset getTrainingDataMetadata(FeatureView featureView, Integer num) throws IOException, FeatureStoreException {
        return (TrainingDataset) this.featureViewApi.getTrainingData(featureView.getFeatureStore(), featureView.getName(), featureView.getVersion(), num, TrainingDataset.class);
    }

    /**
     * Computes statistics for a training dataset when its statistics config is enabled.
     *
     * @param featureView the feature view the training dataset belongs to
     * @param trainingDataset the training dataset metadata
     * @param datasetArr the datasets to compute on: only the first element is used when there
     *     are no splits, otherwise one element per split in split order
     * @return the computed statistics, or {@code null} when statistics are disabled
     * @throws FeatureStoreException if the computation fails
     * @throws IOException if the computation fails
     */
    public Statistics computeStatistics(FeatureView featureView, TrainingDataset trainingDataset, Dataset<Row>[] datasetArr) throws FeatureStoreException, IOException {
        if (!trainingDataset.getStatisticsConfig().getEnabled().booleanValue()) {
            return null;
        }
        List<Split> splits = trainingDataset.getSplits();
        if (splits == null || splits.isEmpty()) {
            return this.statisticsEngine.computeStatistics(featureView, trainingDataset, datasetArr[0]);
        }
        return this.statisticsEngine.computeAndSaveSplitStatistics(featureView, trainingDataset, convertSplitDatasetsToMap(splits, datasetArr));
    }

    /**
     * Pairs each dataset with the name of the split at the same index.
     *
     * @param list the splits, in the same order as {@code datasetArr}
     * @param datasetArr the datasets, one per split
     * @return a map from split name to dataset
     */
    protected Map<String, Dataset<Row>> convertSplitDatasetsToMap(List<Split> list, Dataset<Row>[] datasetArr) {
        Map<String, Dataset<Row>> datasetsBySplit = new HashMap<>();
        for (int i = 0; i < datasetArr.length; i++) {
            datasetsBySplit.put(list.get(i).getName(), datasetArr[i]);
        }
        return datasetsBySplit;
    }

    /**
     * Re-writes an existing training dataset version from its stored metadata.
     *
     * @param featureView the feature view the training dataset belongs to
     * @param num the training dataset version to recreate
     * @param map user supplied write options
     * @throws IOException if the metadata lookup or write fails
     * @throws FeatureStoreException if the metadata lookup or write fails
     */
    public void recreateTrainingDataset(FeatureView featureView, Integer num, Map<String, String> map) throws IOException, FeatureStoreException {
        writeTrainingDataset(featureView, getTrainingDataMetadata(featureView, num), map);
    }

    /**
     * Reads the batch query for the training dataset's event time range and version.
     */
    private Dataset<Row> readDataset(FeatureView featureView, TrainingDataset trainingDataset, Map<String, String> map) throws IOException, FeatureStoreException {
        Query query = getBatchQuery(featureView, trainingDataset.getEventStartTime(), trainingDataset.getEventEndTime(), true, trainingDataset.getVersion(), Query.class);
        return query.read(false, map);
    }

    /**
     * Returns the SQL string of the batch query for the given time range.
     *
     * @param featureView the feature view to query
     * @param date start of the event time range
     * @param date2 end of the event time range
     * @param num training dataset version passed to the query lookup
     * @return the SQL of the batch query
     * @throws FeatureStoreException if the query lookup fails
     * @throws IOException if the query lookup fails
     */
    public String getBatchQueryString(FeatureView featureView, Date date, Date date2, Integer num) throws FeatureStoreException, IOException {
        return getBatchQuery(featureView, date, date2, false, num, Query.class).sql();
    }

    /**
     * Reads the batch query for the given time range into a Spark dataset.
     *
     * @param featureView the feature view to query
     * @param date start of the event time range
     * @param date2 end of the event time range
     * @param map user supplied read options
     * @param num training dataset version passed to the query lookup
     * @return the batch data
     * @throws FeatureStoreException if the query lookup or read fails
     * @throws IOException if the query lookup or read fails
     */
    public Dataset<Row> getBatchData(FeatureView featureView, Date date, Date date2, Map<String, String> map, Integer num) throws FeatureStoreException, IOException {
        return getBatchQuery(featureView, date, date2, false, num, Query.class).read(false, map);
    }

    /**
     * Returns the batch query for the given time range.
     *
     * @param featureView the feature view to query
     * @param date start of the event time range
     * @param date2 end of the event time range
     * @param bool flag forwarded to the typed {@code getBatchQuery}; {@code null} is treated as
     *     {@code false}
     * @param num training dataset version passed to the query lookup
     * @return the batch query
     * @throws FeatureStoreException if the query lookup fails
     * @throws IOException if the query lookup fails
     */
    public Query getBatchQuery(FeatureView featureView, Date date, Date date2, Boolean bool, Integer num) throws FeatureStoreException, IOException {
        // Honour the caller's flag instead of always forwarding false.
        return getBatchQuery(featureView, date, date2, Boolean.TRUE.equals(bool), num, Query.class);
    }

    /**
     * Fetches the feature view, or builds a new one when the lookup fails with a 404 response
     * carrying error code 270181.
     *
     * @param featureStore the feature store to look in
     * @param str the feature view name
     * @param num the feature view version
     * @param query the query used when a new feature view is built
     * @param str2 the description used when a new feature view is built
     * @param list the labels used when a new feature view is built
     * @return the existing or newly built feature view
     * @throws FeatureStoreException if the lookup fails for any other reason
     * @throws IOException if the lookup fails for any other reason
     */
    public FeatureView getOrCreateFeatureView(FeatureStore featureStore, String str, Integer num, Query query, String str2, List<String> list) throws FeatureStoreException, IOException {
        try {
            return get(featureStore, str, num, FeatureView.class);
        } catch (FeatureStoreException | IOException e) {
            String message = e.getMessage();
            // A missing message cannot be the "not found" response; surface the original error.
            boolean notFound = message != null && message.contains("Error: 404") && message.contains("\"errorCode\":270181");
            if (!notFound) {
                throw e;
            }
            return new FeatureView.FeatureViewBuilder(featureStore).name(str).version(num).query(query).description(str2).labels(list).build();
        }
    }
}
