package org.apache.hudi.table;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.SparkFileFormatInternalRowReaderContext;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.SparkInternalSchemaConverter;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.execution.datasources.FileFormat;
import org.apache.spark.sql.execution.datasources.parquet.SparkParquetReader;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.util.SerializableConfiguration;
import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.collection.mutable.Buffer;

/**
 * Spark implementation of {@link EngineBroadcastManager}.
 *
 * <p>{@link #prepareAndBroadcast()} builds the session {@link SQLConf}, a Hadoop
 * {@link Configuration} and a {@link SparkParquetReader}, and broadcasts all three through the
 * engine context's {@link JavaSparkContext}. The {@code retrieve*} methods then serve their
 * results from those broadcast values and fail with a {@link HoodieException} if
 * {@link #prepareAndBroadcast()} has not been called.
 */
public class SparkBroadcastManager extends EngineBroadcastManager {
    /** Engine context; must be a {@link HoodieSparkEngineContext} for broadcasting to work. */
    private final transient HoodieEngineContext context;
    /** Meta client used to resolve the table schema, completed timeline and base path. */
    private final transient HoodieTableMetaClient metaClient;
    /** Parquet reader created by {@link #prepareAndBroadcast()}; empty until that runs. */
    protected Option<SparkParquetReader> parquetReaderOpt = Option.empty();
    protected Broadcast<SQLConf> sqlConfBroadcast;
    protected Broadcast<SparkParquetReader> parquetReaderBroadcast;
    protected Broadcast<SerializableConfiguration> configurationBroadcast;

    /**
     * Creates a manager; nothing is broadcast until {@link #prepareAndBroadcast()} is called.
     *
     * @param hoodieEngineContext engine context used to obtain the Spark contexts
     * @param hoodieTableMetaClient meta client of the table being read
     */
    public SparkBroadcastManager(HoodieEngineContext hoodieEngineContext, HoodieTableMetaClient hoodieTableMetaClient) {
        this.context = hoodieEngineContext;
        this.metaClient = hoodieTableMetaClient;
    }

    /**
     * Builds the reader configuration and broadcasts the SQL conf, the Hadoop configuration and the
     * Spark Parquet reader.
     *
     * @throws HoodieIOException if the engine context is not a {@link HoodieSparkEngineContext}
     */
    @Override
    public void prepareAndBroadcast() {
        if (!(this.context instanceof HoodieSparkEngineContext)) {
            throw new HoodieIOException("Expected to be called using Engine's context and not local context");
        }
        HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) this.context;
        SQLConf sqlConf = sparkEngineContext.getSqlContext().sessionState().conf();
        JavaSparkContext jsc = sparkEngineContext.jsc();

        // Whether the reader returns columnar batches follows the session's vectorized-reader flag.
        scala.collection.immutable.Map<String, String> readerOptions =
            scala.collection.immutable.Map$.MODULE$.<String, String>empty().$plus(
                new Tuple2<>(FileFormat.OPTION_RETURNING_BATCH(),
                    Boolean.toString(sqlConf.parquetVectorizedReaderEnabled())));

        java.util.Map<String, String> schemaEvolutionConfigs = getSchemaEvolutionConfigs(
            new TableSchemaResolver(this.metaClient),
            this.metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(),
            this.metaClient.getTimelineLayout().getInstantFileNameGenerator(),
            this.metaClient.getBasePath().toString());

        this.sqlConfBroadcast = jsc.broadcast(sqlConf);

        Configuration hadoopConf = getHadoopConfiguration(jsc.hadoopConfiguration());
        addSchemaEvolutionConfigs(hadoopConf, schemaEvolutionConfigs);
        this.configurationBroadcast = jsc.broadcast(new SerializableConfiguration(hadoopConf));

        // The reader is built from the broadcast values and is then broadcast itself.
        this.parquetReaderOpt = Option.of(SparkAdapterSupport$.MODULE$.sparkAdapter().createParquetFileReader(
            false,
            this.sqlConfBroadcast.getValue(),
            readerOptions,
            this.configurationBroadcast.getValue().value()));
        this.parquetReaderBroadcast = jsc.broadcast(this.parquetReaderOpt.get());
    }

    /**
     * Creates a {@link SparkFileFormatInternalRowReaderContext} backed by the broadcast Parquet
     * reader, with empty filter lists.
     *
     * @param storagePath not used by this implementation
     * @return the reader context (always present)
     * @throws HoodieException if {@link #prepareAndBroadcast()} has not run, or the broadcast
     *     reader value is {@code null}
     */
    @Override
    public Option<HoodieReaderContext> retrieveFileGroupReaderContext(StoragePath storagePath) {
        if (this.parquetReaderBroadcast == null) {
            throw new HoodieException("Spark Parquet reader broadcast is not initialized.");
        }
        SparkParquetReader sparkParquetReader = this.parquetReaderBroadcast.getValue();
        if (sparkParquetReader == null) {
            throw new HoodieException("Cannot get the broadcast Spark Parquet reader.");
        }
        // No filters are available at this point, so both filter sequences are empty.
        List<org.apache.spark.sql.sources.Filter> noFilters = new ArrayList<>();
        return Option.of(new SparkFileFormatInternalRowReaderContext(
            sparkParquetReader,
            JavaConverters.asScalaBufferConverter(noFilters).asScala().toSeq(),
            JavaConverters.asScalaBufferConverter(noFilters).asScala().toSeq()));
    }

    /**
     * Returns the broadcast Hadoop configuration.
     *
     * @return the configuration built by {@link #prepareAndBroadcast()}
     * @throws HoodieException if {@link #prepareAndBroadcast()} has not run
     */
    @Override
    public Option<Configuration> retrieveStorageConfig() {
        if (this.configurationBroadcast == null) {
            // Mirrors the guard in retrieveFileGroupReaderContext instead of failing with an NPE.
            throw new HoodieException("Configuration broadcast is not initialized.");
        }
        return Option.of(this.configurationBroadcast.getValue().value());
    }

    /**
     * Returns a copy of {@code configuration} with a fixed set of Spark SQL / Parquet reader flags.
     *
     * <p>The input is copied before any flag is set, so the caller's instance is not modified. The
     * copy is wrapped in a {@link HadoopStorageConfiguration}, passed through
     * {@code getInline()}, and unwrapped back to a Hadoop {@link Configuration}.
     *
     * @param configuration base Hadoop configuration (not modified)
     * @return the adjusted configuration
     */
    static Configuration getHadoopConfiguration(Configuration configuration) {
        Configuration hadoopConf = new Configuration(configuration);
        hadoopConf.setBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED().key(), false);
        hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
        hadoopConf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key(), false);
        hadoopConf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), true);
        // Set by its string key rather than a SQLConf constant.
        hadoopConf.setBoolean("spark.sql.legacy.parquet.nanosAsLong", false);
        if (HoodieSparkUtils.gteqSpark3_4()) {
            // Only set on Spark 3.4 and above.
            hadoopConf.setBoolean("spark.sql.parquet.inferTimestampNTZ.enabled", false);
        }
        return new HadoopStorageConfiguration(hadoopConf).getInline().unwrap();
    }

    /**
     * Collects the schema-evolution entries to add to the reader's Hadoop configuration.
     *
     * <p>If the table has an internal schema in its commit metadata, the returned map contains:
     * <ul>
     *   <li>{@code HOODIE_VALID_COMMITS_LIST}: the file names of all instants in
     *       {@code hoodieTimeline}, joined with {@code ","};</li>
     *   <li>{@code HOODIE_TABLE_PATH}: {@code tablePath}.</li>
     * </ul>
     * Otherwise the returned map is empty.
     *
     * @param tableSchemaResolver resolver used to look up the internal schema
     * @param hoodieTimeline timeline whose instants are listed
     * @param instantFileNameGenerator maps each instant to its file name
     * @param tablePath table base path
     * @return a new mutable map with the entries above
     */
    static java.util.Map<String, String> getSchemaEvolutionConfigs(TableSchemaResolver tableSchemaResolver, HoodieTimeline hoodieTimeline, InstantFileNameGenerator instantFileNameGenerator, String tablePath) {
        Option<InternalSchema> internalSchemaOpt = tableSchemaResolver.getTableInternalSchemaFromCommitMetadata();
        java.util.Map<String, String> configs = new HashMap<>();
        if (internalSchemaOpt.isPresent()) {
            String validCommits = hoodieTimeline.getInstants().stream()
                .map(instantFileNameGenerator::getFileName)
                .collect(Collectors.joining(","));
            configs.put(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits);
            configs.put(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, tablePath);
        }
        return configs;
    }

    /**
     * Copies every entry of {@code map} into {@code configuration}.
     *
     * @param configuration target configuration (modified in place)
     * @param map entries to set
     */
    static void addSchemaEvolutionConfigs(Configuration configuration, java.util.Map<String, String> map) {
        map.forEach(configuration::set);
    }
}
