/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.SparkFileFormatInternalRowReaderContext;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.table.EngineBroadcastManager;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.execution.datasources.FileFormat;
import org.apache.spark.sql.execution.datasources.parquet.SparkParquetReader;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.util.SerializableConfiguration;
import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.collection.mutable.Buffer;

public class SparkBroadcastManager
extends EngineBroadcastManager {
    private final transient HoodieEngineContext context;
    private final transient HoodieTableMetaClient metaClient;
    protected Option<SparkParquetReader> parquetReaderOpt = Option.empty();
    protected Broadcast<SQLConf> sqlConfBroadcast;
    protected Broadcast<SparkParquetReader> parquetReaderBroadcast;
    protected Broadcast<SerializableConfiguration> configurationBroadcast;

    public SparkBroadcastManager(HoodieEngineContext context, HoodieTableMetaClient metaClient) {
        this.context = context;
        this.metaClient = metaClient;
    }

    @Override
    public void prepareAndBroadcast() {
        if (!(this.context instanceof HoodieSparkEngineContext)) {
            throw new HoodieIOException("Expected to be called using Engine's context and not local context");
        }
        HoodieSparkEngineContext hoodieSparkEngineContext = (HoodieSparkEngineContext)this.context;
        SQLConf sqlConf = hoodieSparkEngineContext.getSqlContext().sessionState().conf();
        JavaSparkContext jsc = hoodieSparkEngineContext.jsc();
        boolean returningBatch = sqlConf.parquetVectorizedReaderEnabled();
        Map options = Map$.MODULE$.empty().$plus(new Tuple2((Object)FileFormat.OPTION_RETURNING_BATCH(), (Object)Boolean.toString(returningBatch)));
        TableSchemaResolver resolver2 = new TableSchemaResolver(this.metaClient);
        InstantFileNameGenerator fileNameGenerator = this.metaClient.getTimelineLayout().getInstantFileNameGenerator();
        HoodieTimeline timeline = this.metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
        java.util.Map<String, String> schemaEvolutionConfigs = SparkBroadcastManager.getSchemaEvolutionConfigs(resolver2, timeline, fileNameGenerator, this.metaClient.getBasePath().toString());
        this.sqlConfBroadcast = jsc.broadcast((Object)sqlConf);
        Configuration configs = SparkBroadcastManager.getHadoopConfiguration(jsc.hadoopConfiguration());
        SparkBroadcastManager.addSchemaEvolutionConfigs(configs, schemaEvolutionConfigs);
        this.configurationBroadcast = jsc.broadcast((Object)new SerializableConfiguration(configs));
        this.parquetReaderOpt = Option.of(SparkAdapterSupport$.MODULE$.sparkAdapter().createParquetFileReader(false, (SQLConf)this.sqlConfBroadcast.getValue(), (Map<String, String>)options, ((SerializableConfiguration)this.configurationBroadcast.getValue()).value()));
        this.parquetReaderBroadcast = jsc.broadcast((Object)this.parquetReaderOpt.get());
    }

    @Override
    public Option<HoodieReaderContext> retrieveFileGroupReaderContext(StoragePath basePath) {
        if (this.parquetReaderBroadcast == null) {
            throw new HoodieException("Spark Parquet reader broadcast is not initialized.");
        }
        SparkParquetReader sparkParquetReader = (SparkParquetReader)this.parquetReaderBroadcast.getValue();
        if (sparkParquetReader != null) {
            ArrayList filters = new ArrayList();
            return Option.of(new SparkFileFormatInternalRowReaderContext(sparkParquetReader, (Seq<Filter>)((Buffer)JavaConverters.asScalaBufferConverter(filters).asScala()).toSeq(), (Seq<Filter>)((Buffer)JavaConverters.asScalaBufferConverter(filters).asScala()).toSeq()));
        }
        throw new HoodieException("Cannot get the broadcast Spark Parquet reader.");
    }

    @Override
    public Option<Configuration> retrieveStorageConfig() {
        return Option.of(((SerializableConfiguration)this.configurationBroadcast.getValue()).value());
    }

    static Configuration getHadoopConfiguration(Configuration configuration) {
        Configuration hadoopConf = new Configuration(configuration);
        hadoopConf.setBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED().key(), false);
        hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
        hadoopConf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key(), false);
        hadoopConf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), true);
        hadoopConf.setBoolean("spark.sql.legacy.parquet.nanosAsLong", false);
        if (HoodieSparkUtils.gteqSpark3_4()) {
            hadoopConf.setBoolean("spark.sql.parquet.inferTimestampNTZ.enabled", false);
        }
        return new HadoopStorageConfiguration(hadoopConf).getInline().unwrap();
    }

    static java.util.Map<String, String> getSchemaEvolutionConfigs(TableSchemaResolver schemaResolver, HoodieTimeline timeline, InstantFileNameGenerator fileNameGenerator, String basePath) {
        Option<InternalSchema> internalSchemaOpt = schemaResolver.getTableInternalSchemaFromCommitMetadata();
        HashMap<String, String> configs = new HashMap<String, String>();
        if (internalSchemaOpt.isPresent()) {
            List instantFiles = timeline.getInstants().stream().map(fileNameGenerator::getFileName).collect(Collectors.toList());
            configs.put("hoodie.valid.commits.list", String.join((CharSequence)",", instantFiles));
            configs.put("hoodie.tablePath", basePath);
        }
        return configs;
    }

    static void addSchemaEvolutionConfigs(Configuration configs, java.util.Map<String, String> schemaEvolutionConfigs) {
        for (Map.Entry<String, String> entry : schemaEvolutionConfigs.entrySet()) {
            configs.set(entry.getKey(), entry.getValue());
        }
    }
}

