/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.transaction;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.timeline.TimelineLayout;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.internal.schema.HoodieSchemaException;
import org.apache.hudi.util.Lazy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Resolves the Avro schema of a table by walking the completed, schema-carrying instants of the
 * active timeline from newest to oldest and memoizing every schema it finds per instant.
 *
 * <p>The per-instant cache is a {@link ConcurrentHashMap}; the "latest commit with a valid schema"
 * marker is a plain field with no synchronization of its own.
 */
class ConcurrentSchemaEvolutionTableSchemaGetter {
    private static final Logger LOG = LoggerFactory.getLogger(ConcurrentSchemaEvolutionTableSchemaGetter.class);
    protected final HoodieTableMetaClient metaClient;
    /** Schemas already resolved, keyed by the instant they were requested for or found at. */
    private final Lazy<ConcurrentHashMap<HoodieInstant, Schema>> tableSchemaCache;
    /** Instant remembered by the last lookup that was made without an explicit instant. */
    private Option<HoodieInstant> latestCommitWithValidSchema = Option.empty();

    /**
     * Exposes the underlying per-instant schema cache (live map, not a copy).
     *
     * @return the cache instance, created on first access
     */
    @VisibleForTesting
    public ConcurrentHashMap<HoodieInstant, Schema> getTableSchemaCache() {
        return this.tableSchemaCache.get();
    }

    /**
     * Creates a getter bound to the given meta client; the schema cache is created lazily.
     *
     * @param metaClient meta client used to read table config, timeline and commit metadata
     */
    public ConcurrentSchemaEvolutionTableSchemaGetter(HoodieTableMetaClient metaClient) {
        this.metaClient = metaClient;
        this.tableSchemaCache = Lazy.lazily(ConcurrentHashMap::new);
    }

    /**
     * Appends the configured partition fields to {@code schema} when the table config reports that
     * partition columns are dropped; otherwise returns {@code schema} untouched.
     *
     * @param schema schema to post-process
     * @return the schema, possibly extended with partition columns
     */
    private Schema handlePartitionColumnsIfNeeded(Schema schema) {
        if (!this.metaClient.getTableConfig().shouldDropPartitionColumns().booleanValue()) {
            return schema;
        }
        return this.metaClient.getTableConfig().getPartitionFields()
                .map(partitionFields -> ConcurrentSchemaEvolutionTableSchemaGetter.appendPartitionColumns(schema, Option.ofNullable(partitionFields)))
                .or(() -> Option.of(schema))
                .get();
    }

    /**
     * Returns the table schema, if one can be determined.
     *
     * <p>The schema is first looked up on the timeline (through the cache); when nothing is found the
     * table-create schema from the table config is used. Metadata fields are then added or removed
     * according to {@code includeMetadataFields}, and partition columns are appended if needed.
     *
     * @param includeMetadataFields whether the returned schema should carry the metadata fields
     * @param instant               optional instant bounding the timeline lookup
     * @return the resolved schema, or empty when neither the timeline nor the table config has one
     */
    public Option<Schema> getTableAvroSchemaIfPresent(boolean includeMetadataFields, Option<HoodieInstant> instant) {
        return this.getTableAvroSchemaFromTimelineWithCache(instant)
                .or(this::getTableCreateSchemaWithoutMetaField)
                .map(tableSchema -> includeMetadataFields
                        ? HoodieAvroUtils.addMetadataFields(tableSchema, false)
                        : HoodieAvroUtils.removeMetadataFields(tableSchema))
                .map(this::handlePartitionColumnsIfNeeded);
    }

    /** Returns the table-create schema stored in the table config, if any. */
    private Option<Schema> getTableCreateSchemaWithoutMetaField() {
        return this.metaClient.getTableConfig().getTableCreateSchema();
    }

    /** Remembers the instant that the last "no explicit instant" lookup resolved to. */
    private void setCachedLatestCommitWithValidSchema(Option<HoodieInstant> instantOption) {
        this.latestCommitWithValidSchema = instantOption;
    }

    /** Returns the instant remembered by {@link #setCachedLatestCommitWithValidSchema(Option)}. */
    private Option<HoodieInstant> getCachedLatestCommitWithValidSchema() {
        return this.latestCommitWithValidSchema;
    }

    /**
     * Same as {@link #getTableAvroSchemaFromTimelineWithCache(Stream, Option)} using the stream produced by
     * {@link #computeSchemaEvolutionTimelineInReverseOrder()}.
     *
     * @param instantTime optional instant bounding the lookup
     * @return the resolved schema, or empty when none is found
     */
    @VisibleForTesting
    Option<Schema> getTableAvroSchemaFromTimelineWithCache(Option<HoodieInstant> instantTime) {
        return this.getTableAvroSchemaFromTimelineWithCache(this.computeSchemaEvolutionTimelineInReverseOrder(), instantTime);
    }

    /**
     * Resolves the table schema for {@code instantTime}, consulting the cache before the timeline.
     *
     * <p>When {@code instantTime} is empty, the previously remembered "latest commit with a valid
     * schema" (if any) is used as the target, and the target this call ends up with is remembered
     * for the next such call.
     *
     * @param reversedTimelineStream candidate instants; only consumed when the cache misses
     * @param instantTime            optional instant bounding the lookup
     * @return the resolved schema, or empty when neither cache nor timeline yields one
     */
    Option<Schema> getTableAvroSchemaFromTimelineWithCache(Stream<HoodieInstant> reversedTimelineStream, Option<HoodieInstant> instantTime) {
        ConcurrentHashMap<HoodieInstant, Schema> cache = this.tableSchemaCache.get();
        boolean fetchFromLastValidCommit = instantTime.isEmpty();
        Option<HoodieInstant> targetInstant = instantTime.or(this.getCachedLatestCommitWithValidSchema());

        Schema cachedTableSchema = null;
        if (targetInstant.isPresent()) {
            cachedTableSchema = cache.get(targetInstant.get());
        }
        if (cachedTableSchema == null) {
            // Cache miss: scan the timeline and retarget to the instant that actually carries the schema.
            Option<Pair<HoodieInstant, Schema>> instantWithSchema =
                    this.getLastCommitMetadataWithValidSchemaFromTimeline(reversedTimelineStream, targetInstant);
            if (instantWithSchema.isPresent()) {
                targetInstant = Option.of(instantWithSchema.get().getLeft());
                cachedTableSchema = instantWithSchema.get().getRight();
            }
        }
        if (fetchFromLastValidCommit) {
            this.setCachedLatestCommitWithValidSchema(targetInstant);
        }
        if (cachedTableSchema == null) {
            return Option.empty();
        }
        // Record the schema under both the requested instant and the instant it was found at.
        if (instantTime.isPresent()) {
            cache.putIfAbsent(instantTime.get(), cachedTableSchema);
        }
        if (targetInstant.isPresent()) {
            cache.putIfAbsent(targetInstant.get(), cachedTableSchema);
        }
        return Option.of(cachedTableSchema);
    }

    /**
     * Finds the first instant of {@code reversedTimelineStream} that has a usable schema.
     *
     * <p>When {@code instant} is present, only candidates whose completion time is less than or
     * equal to that instant's completion time are considered. Candidates are inspected in stream
     * order and inspection stops at the first one that yields a schema.
     *
     * @param reversedTimelineStream candidate instants, in the order they should be tried
     * @param instant                optional upper bound (by completion time)
     * @return the matching instant paired with its schema, or empty when no candidate has one
     */
    @VisibleForTesting
    Option<Pair<HoodieInstant, Schema>> getLastCommitMetadataWithValidSchemaFromTimeline(Stream<HoodieInstant> reversedTimelineStream, Option<HoodieInstant> instant) {
        return Option.fromJavaOptional(reversedTimelineStream
                .filter(candidate -> instant.isEmpty()
                        || InstantComparison.compareTimestamps(candidate.getCompletionTime(), InstantComparison.LESSER_THAN_OR_EQUALS, instant.get().getCompletionTime()))
                .map(this::resolveSchemaAtInstant)
                .filter(Option::isPresent)
                .map(Option::get)
                .findFirst());
    }

    /**
     * Resolves the schema attached to a single instant.
     *
     * <p>A schema already present in the cache is reused; otherwise the instant's commit metadata is
     * read and its {@code "schema"} entry parsed. An {@link IOException} while reading the metadata
     * is logged and treated as "no schema".
     *
     * @param instant instant to inspect
     * @return the instant paired with its schema, or empty when the schema entry is null/empty or the
     *         metadata could not be read
     */
    private Option<Pair<HoodieInstant, Schema>> resolveSchemaAtInstant(HoodieInstant instant) {
        // Single lookup instead of containsKey + get: the map never holds null values.
        Schema alreadyCached = this.tableSchemaCache.get().get(instant);
        if (alreadyCached != null) {
            return Option.of(Pair.of(instant, alreadyCached));
        }
        try {
            HoodieCommitMetadata metadata = this.metaClient.getActiveTimeline().readCommitMetadata(instant);
            String schemaStr = metadata.getMetadata("schema");
            if (StringUtils.isNullOrEmpty(schemaStr)) {
                return Option.empty();
            }
            return Option.of(Pair.of(instant, new Schema.Parser().parse(schemaStr)));
        }
        catch (IOException e) {
            LOG.warn("Failed to parse commit metadata for instant {} ", instant, e);
            return Option.empty();
        }
    }

    /**
     * Appends the given partition fields to {@code dataSchema} as nullable string fields when none
     * of them is present in the schema yet.
     *
     * @param dataSchema      schema to extend
     * @param partitionFields partition field names; empty or zero-length leaves the schema untouched
     * @return {@code dataSchema} itself when nothing needs to be appended, otherwise the extended schema
     * @throws HoodieSchemaException when only some of the partition fields are present in the schema
     */
    public static Schema appendPartitionColumns(Schema dataSchema, Option<String[]> partitionFields) {
        if (!partitionFields.isPresent() || partitionFields.get().length == 0) {
            return dataSchema;
        }
        String[] fieldNames = partitionFields.get();
        boolean hasPartitionColNotInSchema = Arrays.stream(fieldNames).anyMatch(pf -> !AvroSchemaUtils.containsFieldInSchema(dataSchema, pf));
        boolean hasPartitionColInSchema = Arrays.stream(fieldNames).anyMatch(pf -> AvroSchemaUtils.containsFieldInSchema(dataSchema, pf));
        if (hasPartitionColNotInSchema && hasPartitionColInSchema) {
            throw new HoodieSchemaException("Partition columns could not be partially contained w/in the data schema");
        }
        if (!hasPartitionColNotInSchema) {
            // Every partition column is already part of the schema.
            return dataSchema;
        }
        ArrayList<Schema.Field> newFields = new ArrayList<>(fieldNames.length);
        for (String partitionField : fieldNames) {
            newFields.add(new Schema.Field(partitionField, AvroSchemaUtils.createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE));
        }
        return AvroSchemaUtils.appendFieldsToSchema(dataSchema, newFields);
    }

    /**
     * Builds the stream of instants that may carry a table schema, ordered by the reverse of the
     * timeline layout's completion-time comparator.
     *
     * <p>Kept instants have an action of {@code commit}/{@code replacecommit} (copy-on-write) or
     * {@code deltacommit}/{@code replacecommit} (merge-on-read), are not clustering instants, and
     * are completed.
     *
     * @return the filtered, reverse-ordered instant stream
     * @throws HoodieException when the table type is neither copy-on-write nor merge-on-read
     */
    public Stream<HoodieInstant> computeSchemaEvolutionTimelineInReverseOrder() {
        HashSet<String> actions;
        HoodieActiveTimeline timeline = this.metaClient.getActiveTimeline();
        Stream<HoodieInstant> timelineStream = timeline.getInstantsAsStream();
        switch (this.metaClient.getTableType()) {
            case COPY_ON_WRITE: {
                actions = new HashSet<>(Arrays.asList("commit", "replacecommit"));
                break;
            }
            case MERGE_ON_READ: {
                actions = new HashSet<>(Arrays.asList("deltacommit", "replacecommit"));
                break;
            }
            default: {
                throw new HoodieException("Unsupported table type :" + this.metaClient.getTableType());
            }
        }
        TimelineLayout timelineLayout = this.metaClient.getTimelineLayout();
        Comparator<HoodieInstant> reversedComparator = timelineLayout.getInstantComparator().completionTimeOrderedComparator().reversed();
        return timelineStream
                .filter(instant -> actions.contains(instant.getAction()))
                .filter(instant -> !ClusteringUtils.isClusteringInstant(timeline, instant, this.metaClient.getInstantGenerator()))
                .filter(HoodieInstant::isCompleted)
                .sorted(reversedComparator);
    }
}

