package org.apache.hudi.utilities.streamer;

import java.io.IOException;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.timeline.TimelineLayout;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpgradeDowngradeException;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.exception.HoodieStreamerException;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.class */
/**
 * Static helpers that decide which {@link Checkpoint} a streamer run should resume from, based on
 * the streamer configuration ({@code config.checkpoint} / {@code config.ignoreCheckpoint}) and the
 * checkpoint entries stored in the commit metadata of a timeline.
 */
public class StreamerCheckpointUtils {
    private static final Logger LOG = LoggerFactory.getLogger(StreamerCheckpointUtils.class);

    /**
     * Resolves the checkpoint to resume from.
     *
     * <p>First validates the configuration via
     * {@link #assertNoCheckpointOverrideDuringUpgradeForHoodieIncSource}. If a timeline is supplied, the
     * checkpoint is resolved between the configuration and the commit metadata on that timeline. If that
     * yields nothing and {@code config.checkpoint} is non-null, a checkpoint is built from the
     * configured override.
     *
     * @param option                timeline to inspect for previously committed checkpoints; may be empty
     * @param config                streamer configuration
     * @param typedProperties       properties used to look up the write table version; may be mutated, see
     *                              {@link #resolveCheckpointBetweenConfigAndPrevCommit}
     * @param hoodieTableMetaClient meta client passed to the upgrade/downgrade check
     * @return the checkpoint to resume from, or an empty option when none applies
     * @throws IOException declared by the timeline-reading helpers
     * @throws HoodieUpgradeDowngradeException if the configuration check rejects the override settings
     */
    public static Option<Checkpoint> resolveCheckpointToResumeFrom(Option<HoodieTimeline> option, HoodieStreamer.Config config, TypedProperties typedProperties, HoodieTableMetaClient hoodieTableMetaClient) throws IOException {
        assertNoCheckpointOverrideDuringUpgradeForHoodieIncSource(hoodieTableMetaClient, config, typedProperties);
        Option<Checkpoint> resolved = Option.empty();
        if (option.isPresent()) {
            resolved = resolveCheckpointBetweenConfigAndPrevCommit(option.get(), config, typedProperties);
        }
        return useCkpFromOverrideConfigIfAny(config, typedProperties, resolved);
    }

    /**
     * Rejects a checkpoint override when the table is about to be upgraded or downgraded.
     *
     * <p>The check only applies when {@code config.checkpoint} or {@code config.ignoreCheckpoint} is
     * non-empty and {@code config.sourceClassName} is listed in
     * {@link CheckpointUtils#HOODIE_INCREMENTAL_SOURCES}.
     *
     * @param hoodieTableMetaClient meta client passed to {@link UpgradeDowngrade#needsUpgradeOrDowngrade}
     * @param config                streamer configuration to validate
     * @param typedProperties       properties providing the write table version and write config
     * @throws HoodieUpgradeDowngradeException if auto-upgrade is enabled and an upgrade/downgrade is needed
     */
    @VisibleForTesting
    static void assertNoCheckpointOverrideDuringUpgradeForHoodieIncSource(HoodieTableMetaClient hoodieTableMetaClient, HoodieStreamer.Config config, TypedProperties typedProperties) {
        boolean hasCheckpointOverride = !StringUtils.isNullOrEmpty(config.checkpoint) || !StringUtils.isNullOrEmpty(config.ignoreCheckpoint);
        // Evaluated unconditionally, exactly as before, so lookup behaviour is unchanged.
        boolean isHoodieIncrementalSource = CheckpointUtils.HOODIE_INCREMENTAL_SOURCES.contains(config.sourceClassName);
        if (!hasCheckpointOverride || !isHoodieIncrementalSource) {
            return;
        }
        HoodieTableVersion writeTableVersion = HoodieTableVersion.fromVersionCode(ConfigUtils.getIntWithAltKeys(typedProperties, HoodieWriteConfig.WRITE_TABLE_VERSION));
        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(config.targetBasePath).withProps(typedProperties).build();
        if (writeConfig.autoUpgrade() && UpgradeDowngrade.needsUpgradeOrDowngrade(hoodieTableMetaClient, writeConfig, writeTableVersion)) {
            throw new HoodieUpgradeDowngradeException(String.format("When upgrade/downgrade is happening, please avoid setting --checkpoint option and --ignore-checkpoint for your delta streamers. Detected invalid streamer configuration:\n%s", config));
        }
    }

    /**
     * Falls back to the configured checkpoint override when nothing has been resolved yet.
     *
     * @param config          streamer configuration holding the optional {@code checkpoint} override
     * @param typedProperties properties providing the write table version
     * @param option          checkpoint resolved so far
     * @return {@code option} unchanged when it is present or no override is configured; otherwise a
     *         checkpoint built from {@code config.checkpoint}
     */
    private static Option<Checkpoint> useCkpFromOverrideConfigIfAny(HoodieStreamer.Config config, TypedProperties typedProperties, Option<Checkpoint> option) {
        LOG.debug("Checkpoint from config: {}", config.checkpoint);
        if (option.isPresent() || config.checkpoint == null) {
            return option;
        }
        int writeTableVersion = ConfigUtils.getIntWithAltKeys(typedProperties, HoodieWriteConfig.WRITE_TABLE_VERSION);
        return Option.of(CheckpointUtils.buildCheckpointFromConfigOverride(config.sourceClassName, writeTableVersion, config.checkpoint));
    }

    /**
     * Resolves the checkpoint when both the streamer configuration and previous commits may specify one.
     *
     * <p>For a {@code MERGE_ON_READ} table the timeline is narrowed to {@code deltacommit} instants when
     * at least one exists. When commit metadata with checkpoint info is found, the candidates are tried
     * in this order: the ignore-checkpoint setting (result is empty), the configured checkpoint override,
     * then the checkpoint stored in the commit.
     *
     * <p>Side effect: when the selected commit metadata carries a non-empty
     * {@link HoodieStreamer#CHECKPOINT_RESET_KEY} entry, {@link KafkaSourceConfig#KAFKA_CHECKPOINT_TYPE}
     * is removed from {@code typedProperties}.
     *
     * @param hoodieTimeline  timeline whose commit metadata is inspected
     * @param config          streamer configuration
     * @param typedProperties properties providing the write table version; may be mutated as described
     * @return the resolved checkpoint, or an empty option
     * @throws IOException declared by the commit-metadata lookup
     * @throws HoodieStreamerException if no candidate applies and the last instant's requested time is
     *                                 later than {@code 00000000000002}
     */
    @VisibleForTesting
    static Option<Checkpoint> resolveCheckpointBetweenConfigAndPrevCommit(HoodieTimeline hoodieTimeline, HoodieStreamer.Config config, TypedProperties typedProperties) throws IOException {
        HoodieTimeline effectiveTimeline = hoodieTimeline;
        if (config.tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
            HoodieTimeline deltaCommits = hoodieTimeline.filter(instant -> instant.getAction().equals("deltacommit"));
            if (!deltaCommits.empty()) {
                effectiveTimeline = deltaCommits;
            }
        }

        Option<HoodieInstant> lastInstant = effectiveTimeline.lastInstant();
        if (!lastInstant.isPresent()) {
            return Option.empty();
        }

        Option<HoodieCommitMetadata> commitMetadataOpt = getLatestCommitMetadataWithValidCheckpointInfo(effectiveTimeline);
        int writeTableVersion = ConfigUtils.getIntWithAltKeys(typedProperties, HoodieWriteConfig.WRITE_TABLE_VERSION);
        if (!commitMetadataOpt.isPresent()) {
            // No commit carries checkpoint info: only the configured override (if any) can apply.
            if (config.checkpoint == null) {
                return Option.empty();
            }
            return Option.of(CheckpointUtils.buildCheckpointFromConfigOverride(config.sourceClassName, writeTableVersion, config.checkpoint));
        }

        HoodieCommitMetadata commitMetadata = commitMetadataOpt.get();
        Checkpoint committedCheckpoint = CheckpointUtils.getCheckpoint(commitMetadata);
        LOG.debug("Checkpoint reset from metadata: {}", committedCheckpoint.getCheckpointResetKey());

        Option<Checkpoint> resumeFrom = Option.empty();
        if (ignoreCkpCfgPrevailsOverCkpFromPrevCommit(config, committedCheckpoint)) {
            // The ignore-checkpoint setting wins: deliberately resume with no checkpoint.
            resumeFrom = Option.empty();
        } else if (ckpOverrideCfgPrevailsOverCkpFromPrevCommit(config, committedCheckpoint)) {
            resumeFrom = Option.of(CheckpointUtils.buildCheckpointFromConfigOverride(config.sourceClassName, writeTableVersion, config.checkpoint));
        } else if (shouldUseCkpFromPrevCommit(committedCheckpoint)) {
            resumeFrom = Option.of(committedCheckpoint);
        } else if (InstantComparison.compareTimestamps("00000000000002", InstantComparison.LESSER_THAN, lastInstant.get().requestedTime())) {
            // NOTE(review): the timestamp literal looks like an inlined timeline constant - confirm which one.
            throw new HoodieStreamerException("Unable to find previous checkpoint. Please double check if this table was indeed built via delta streamer. Last Commit :" + lastInstant + ", Instants :" + effectiveTimeline.getInstants());
        }

        if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_RESET_KEY))) {
            ConfigUtils.removeConfigFromProps(typedProperties, KafkaSourceConfig.KAFKA_CHECKPOINT_TYPE);
        }
        return resumeFrom;
    }

    /** Returns {@code true} when the committed checkpoint has a non-empty checkpoint key. */
    private static boolean shouldUseCkpFromPrevCommit(Checkpoint checkpoint) {
        return !StringUtils.isNullOrEmpty(checkpoint.getCheckpointKey());
    }

    /**
     * Returns {@code true} when {@code config.checkpoint} is set and the committed checkpoint's reset key
     * is empty or differs from it.
     */
    private static boolean ckpOverrideCfgPrevailsOverCkpFromPrevCommit(HoodieStreamer.Config config, Checkpoint checkpoint) {
        if (config.checkpoint == null) {
            return false;
        }
        return StringUtils.isNullOrEmpty(checkpoint.getCheckpointResetKey()) || !config.checkpoint.equals(checkpoint.getCheckpointResetKey());
    }

    /**
     * Returns {@code true} when {@code config.ignoreCheckpoint} is set and the committed checkpoint's
     * ignore key is empty or differs from it.
     */
    private static boolean ignoreCkpCfgPrevailsOverCkpFromPrevCommit(HoodieStreamer.Config config, Checkpoint checkpoint) {
        if (config.ignoreCheckpoint == null) {
            return false;
        }
        return StringUtils.isNullOrEmpty(checkpoint.getCheckpointIgnoreKey()) || !config.ignoreCheckpoint.equals(checkpoint.getCheckpointIgnoreKey());
    }

    /**
     * Walks the timeline in reverse order and returns the first instant whose commit metadata contains
     * any checkpoint entry.
     *
     * @param hoodieTimeline timeline to scan
     * @return a pair of the instant's string form and its commit metadata, or an empty option when no
     *         instant carries checkpoint info
     * @throws IOException declared for callers; metadata parse failures surface as {@link HoodieIOException}
     * @throws HoodieIOException if the commit metadata of an inspected instant cannot be deserialized
     */
    public static Option<Pair<String, HoodieCommitMetadata>> getLatestInstantAndCommitMetadataWithValidCheckpointInfo(HoodieTimeline hoodieTimeline) throws IOException {
        return hoodieTimeline.getReverseOrderedInstants()
            .map(instant -> readCommitMetadataWithCheckpointInfo(hoodieTimeline, instant))
            .filter(Option::isPresent)
            .findFirst()
            .orElse(Option.empty());
    }

    /**
     * Deserializes the commit metadata of one instant and keeps it only if at least one of the
     * checkpoint keys (current or {@code v2}, checkpoint or reset) has a non-empty value.
     */
    private static Option<Pair<String, HoodieCommitMetadata>> readCommitMetadataWithCheckpointInfo(HoodieTimeline timeline, HoodieInstant instant) {
        try {
            // The instant details are unwrapped with an unchecked get(), as in the original code.
            HoodieCommitMetadata commitMetadata = TimelineLayout.fromVersion(timeline.getTimelineLayoutVersion())
                .getCommitMetadataSerDe()
                .deserialize(instant, timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
            boolean hasCheckpointInfo = !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_KEY))
                || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_RESET_KEY))
                || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata("streamer.checkpoint.key.v2"))
                || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata("streamer.checkpoint.reset.key.v2"));
            if (!hasCheckpointInfo) {
                return Option.empty();
            }
            return Option.of(Pair.of(instant.toString(), commitMetadata));
        } catch (IOException e) {
            throw new HoodieIOException("Failed to parse HoodieCommitMetadata for " + instant.toString(), e);
        }
    }

    /**
     * Returns the commit metadata of the latest instant that carries checkpoint info.
     *
     * @param hoodieTimeline timeline to scan
     * @return the commit metadata, or an empty option when no instant carries checkpoint info
     * @throws IOException declared by the underlying lookup
     */
    public static Option<HoodieCommitMetadata> getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline hoodieTimeline) throws IOException {
        return getLatestInstantAndCommitMetadataWithValidCheckpointInfo(hoodieTimeline).map(Pair::getRight);
    }

    /**
     * Returns the string form of the latest instant that carries checkpoint info.
     *
     * @param option timeline to scan; may be empty
     * @return the instant string, or an empty option when the timeline is absent or no instant carries
     *         checkpoint info
     * @throws HoodieIOException if the underlying lookup fails with an {@link IOException}
     */
    public static Option<String> getLatestInstantWithValidCheckpointInfo(Option<HoodieTimeline> option) {
        if (!option.isPresent()) {
            return Option.empty();
        }
        try {
            return getLatestInstantAndCommitMetadataWithValidCheckpointInfo(option.get()).map(Pair::getLeft);
        } catch (IOException e) {
            throw new HoodieIOException("failed to get latest instant with ValidCheckpointInfo", e);
        }
    }
}
