/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.index;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.function.BiConsumer;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieIndexingConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieIndexMetadata;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.exception.HoodieMetadataIndexException;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.storage.StorageSchemes;
import org.apache.hudi.table.action.index.BaseHoodieIndexClient;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
import scala.collection.Map;

public class HoodieSparkIndexClient
extends BaseHoodieIndexClient {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieSparkIndexClient.class);
    private Option<SparkSession> sparkSessionOpt = Option.empty();
    private Option<HoodieWriteConfig> writeConfigOpt = Option.empty();
    private Option<HoodieEngineContext> engineContextOpt = Option.empty();

    public HoodieSparkIndexClient(SparkSession sparkSession) {
        this(Option.of(sparkSession), Option.empty(), Option.empty());
    }

    public HoodieSparkIndexClient(HoodieWriteConfig writeConfig, HoodieEngineContext engineContext) {
        this(Option.empty(), Option.of(writeConfig), Option.of(engineContext));
    }

    public HoodieSparkIndexClient(Option<SparkSession> sparkSessionOpt, Option<HoodieWriteConfig> writeConfig, Option<HoodieEngineContext> engineContext) {
        this.sparkSessionOpt = sparkSessionOpt;
        this.writeConfigOpt = writeConfig;
        this.engineContextOpt = engineContext;
    }

    @Override
    public void create(HoodieTableMetaClient metaClient, String userIndexName, String indexType, java.util.Map<String, java.util.Map<String, String>> columns, java.util.Map<String, String> options, java.util.Map<String, String> tableProperties) throws Exception {
        if (indexType.equals("secondary_index") || indexType.equals("bloom_filters") || indexType.equals("column_stats")) {
            this.createExpressionOrSecondaryIndex(metaClient, userIndexName, indexType, columns, options, tableProperties);
        } else {
            this.createRecordIndex(metaClient, userIndexName, indexType);
        }
    }

    private void createRecordIndex(HoodieTableMetaClient metaClient, String userIndexName, String indexType) {
        block17: {
            if (!userIndexName.equals("record_index")) {
                throw new HoodieIndexException("Record index should be named as record_index");
            }
            String fullIndexName = "record_index";
            if (HoodieIndexUtils.indexExists(metaClient, fullIndexName)) {
                throw new HoodieMetadataIndexException("Index already exists: " + userIndexName);
            }
            LOG.info("Creating index {} of using {}", (Object)fullIndexName, (Object)indexType);
            try (SparkRDDWriteClient writeClient = this.getWriteClient(metaClient, Option.empty(), Option.of(indexType));){
                Option<String> indexInstantTime = HoodieSparkIndexClient.doSchedule(writeClient, metaClient, fullIndexName, MetadataPartitionType.RECORD_INDEX);
                if (indexInstantTime.isPresent()) {
                    writeClient.index(indexInstantTime.get());
                    break block17;
                }
                throw new HoodieMetadataIndexException("Scheduling of index action did not return any instant.");
            }
            catch (Throwable t) {
                this.drop(metaClient, fullIndexName, Option.empty());
                throw t;
            }
        }
    }

    @Override
    public void createOrUpdateColumnStatsIndexDefinition(HoodieTableMetaClient metaClient, List<String> columnsToIndex) {
        HoodieIndexDefinition indexDefinition = HoodieIndexDefinition.newBuilder().withIndexName("column_stats").withIndexType("column_stats").withIndexFunction("column_stats").withSourceFields(columnsToIndex).withIndexOptions(Collections.EMPTY_MAP).build();
        LOG.info("Registering Or Updating the index column_stats");
        HoodieIndexUtils.register(metaClient, indexDefinition);
    }

    private void createExpressionOrSecondaryIndex(HoodieTableMetaClient metaClient, String userIndexName, String indexType, java.util.Map<String, java.util.Map<String, String>> columns, java.util.Map<String, String> options, java.util.Map<String, String> tableProperties) throws Exception {
        block16: {
            HoodieIndexDefinition indexDefinition = HoodieIndexUtils.getSecondaryOrExpressionIndexDefinition(metaClient, userIndexName, indexType, columns, options, tableProperties);
            if (!(metaClient.getTableConfig().getRelativeIndexDefinitionPath().isPresent() && metaClient.getIndexMetadata().isPresent() && metaClient.getIndexMetadata().get().getIndexDefinitions().containsKey(indexDefinition.getIndexName()))) {
                LOG.info("Index definition is not present. Registering the index first");
                HoodieIndexUtils.register(metaClient, indexDefinition);
            }
            ValidationUtils.checkState(metaClient.getIndexMetadata().isPresent(), "Index definition is not present");
            LOG.info("Creating index {} of using {}", (Object)indexDefinition.getIndexName(), (Object)indexType);
            Option<HoodieIndexDefinition> expressionIndexDefinitionOpt = Option.ofNullable(indexDefinition);
            try (SparkRDDWriteClient writeClient = this.getWriteClient(metaClient, expressionIndexDefinitionOpt, Option.of(indexType));){
                MetadataPartitionType partitionType = indexType.equals("secondary_index") ? MetadataPartitionType.SECONDARY_INDEX : MetadataPartitionType.EXPRESSION_INDEX;
                Option<String> indexInstantTime = HoodieSparkIndexClient.doSchedule(writeClient, metaClient, indexDefinition.getIndexName(), partitionType);
                if (indexInstantTime.isPresent()) {
                    writeClient.index(indexInstantTime.get());
                    break block16;
                }
                throw new HoodieMetadataIndexException("Scheduling of index action did not return any instant.");
            }
            catch (Throwable t) {
                LOG.warn("Error while creating index: {}. So drop it.", (Object)indexDefinition.getIndexName(), (Object)t);
                this.drop(metaClient, indexDefinition.getIndexName(), Option.ofNullable(indexDefinition));
                throw t;
            }
        }
    }

    private void drop(HoodieTableMetaClient metaClient, String indexName, Option<HoodieIndexDefinition> indexDefinitionOpt) {
        LOG.info("Dropping index {}", (Object)indexName);
        try (SparkRDDWriteClient writeClient = this.getWriteClient(metaClient, indexDefinitionOpt, Option.empty());){
            writeClient.dropIndex(Collections.singletonList(indexName));
        }
    }

    @Override
    public void drop(HoodieTableMetaClient metaClient, String indexName, boolean ignoreIfNotExists) {
        LOG.info("Dropping index {}", (Object)indexName);
        Option<HoodieIndexDefinition> indexDefinitionOpt = metaClient.getIndexMetadata().map(HoodieIndexMetadata::getIndexDefinitions).map(definition -> (HoodieIndexDefinition)definition.get(indexName));
        try (SparkRDDWriteClient writeClient = this.getWriteClient(metaClient, indexDefinitionOpt, Option.empty());){
            writeClient.dropIndex(Collections.singletonList(indexName));
        }
    }

    private SparkRDDWriteClient getWriteClient(HoodieTableMetaClient metaClient, Option<HoodieIndexDefinition> indexDefinitionOpt, Option<String> indexTypeOpt) {
        try {
            TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
            String schemaStr = schemaUtil.getTableAvroSchema(false).toString();
            TypedProperties props = this.getProps(metaClient, indexDefinitionOpt, indexTypeOpt, schemaStr);
            if (!this.engineContextOpt.isPresent()) {
                this.engineContextOpt = Option.of(new HoodieSparkEngineContext(new JavaSparkContext(this.sparkSessionOpt.get().sparkContext())));
            }
            HoodieWriteConfig localWriteConfig = HoodieWriteConfig.newBuilder().withPath(metaClient.getBasePath()).withProperties(props).withEmbeddedTimelineServerEnabled(false).withAutoCommit(false).withSchema(schemaStr).withEngineType(EngineType.SPARK).build();
            return new SparkRDDWriteClient(this.engineContextOpt.get(), localWriteConfig, Option.empty());
        }
        catch (Exception e) {
            throw new HoodieException("Failed to create write client while performing index operation ", e);
        }
    }

    private TypedProperties getProps(HoodieTableMetaClient metaClient, Option<HoodieIndexDefinition> indexDefinitionOpt, Option<String> indexTypeOpt, String schemaStr) {
        if (this.writeConfigOpt.isPresent()) {
            return this.writeConfigOpt.get().getProps();
        }
        TypedProperties typedProperties = metaClient.getTableConfig().getProps();
        ((java.util.Map)JavaConverters.mapAsJavaMapConverter((Map)this.sparkSessionOpt.get().sqlContext().getAllConfs()).asJava()).forEach((k, v) -> {
            if (k.startsWith("hoodie.")) {
                typedProperties.put(k, v);
            }
        });
        typedProperties.putAll(HoodieSparkIndexClient.buildWriteConfig(metaClient, indexDefinitionOpt, indexTypeOpt));
        typedProperties.put(HoodieWriteConfig.AVRO_SCHEMA_STRING.key(), schemaStr);
        return typedProperties;
    }

    private static Option<String> doSchedule(SparkRDDWriteClient<HoodieRecordPayload> client, HoodieTableMetaClient metaClient, String indexName, MetadataPartitionType partitionType) {
        List<MetadataPartitionType> partitionTypes = Collections.singletonList(partitionType);
        if (metaClient.getTableConfig().getMetadataPartitions().isEmpty()) {
            throw new HoodieException("Metadata table is not yet initialized. Initialize FILES partition before any other partition " + Arrays.toString(partitionTypes.toArray()));
        }
        return client.scheduleIndexing(partitionTypes, Collections.singletonList(indexName));
    }

    private static java.util.Map<String, String> buildWriteConfig(HoodieTableMetaClient metaClient, Option<HoodieIndexDefinition> indexDefinitionOpt, Option<String> indexTypeOpt) {
        HashMap<String, String> writeConfig = new HashMap<String, String>();
        if (metaClient.getTableConfig().isMetadataTableAvailable()) {
            String indexType;
            writeConfig.put(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
            writeConfig.putAll(HoodieSparkIndexClient.getLockOptions(metaClient.getBasePath().toString(), metaClient.getBasePath().toUri().getScheme(), new TypedProperties()));
            metaClient.getTableConfig().getMetadataPartitions().forEach(partitionPath -> {
                if (partitionPath.equals(MetadataPartitionType.RECORD_INDEX.getPartitionPath())) {
                    writeConfig.put(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key(), "true");
                }
                if (partitionPath.equals(MetadataPartitionType.BLOOM_FILTERS.getPartitionPath())) {
                    writeConfig.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_BLOOM_FILTER.key(), "true");
                }
                if (partitionPath.equals(MetadataPartitionType.COLUMN_STATS.getPartitionPath())) {
                    writeConfig.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true");
                }
            });
            if (indexTypeOpt.isPresent() && (indexType = indexTypeOpt.get()).equals("record_index")) {
                writeConfig.put(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key(), "true");
            }
        }
        indexDefinitionOpt.ifPresent(indexDefinition -> HoodieIndexingConfig.fromIndexDefinition(indexDefinition).getProps().forEach((BiConsumer<? super Object, ? super Object>)((BiConsumer<Object, Object>)(key, value) -> writeConfig.put(key.toString(), value.toString()))));
        return writeConfig;
    }

    static java.util.Map<String, String> getLockOptions(String tablePath, String scheme2, TypedProperties lockConfig) {
        List<String> customSupportedFSs = lockConfig.getStringList(HoodieCommonConfig.HOODIE_FS_ATOMIC_CREATION_SUPPORT.key(), ",", new ArrayList<String>());
        if (scheme2 == null || customSupportedFSs.contains(scheme2) || StorageSchemes.isAtomicCreationSupported(scheme2)) {
            TypedProperties props = FileSystemBasedLockProvider.getLockConfig(tablePath);
            HashMap<String, String> toReturn = new HashMap<String, String>();
            props.stringPropertyNames().stream().forEach(key -> toReturn.put((String)key, props.getString((String)key)));
            return toReturn;
        }
        return Collections.emptyMap();
    }
}

