package com.logicalclocks.hsfs.spark.engine.hudi;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Map;
import org.apache.commons.lang3.EnumUtils;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

/* loaded from: input_file:com/logicalclocks/hsfs/spark/engine/hudi/DeltaStreamerConfig.class */
public class DeltaStreamerConfig implements Serializable {
    private HoodieDeltaStreamer.Config deltaStreamerConfig(final Map<String, String> map) {
        HoodieDeltaStreamer.Config config = new HoodieDeltaStreamer.Config();
        config.targetBasePath = map.get("hoodie.base.path");
        config.targetTableName = map.get("hoodie.table.name");
        config.tableType = map.get("hoodie.datasource.write.storage.type");
        if (map.containsKey("operation") && EnumUtils.isValidEnum(WriteOperationType.class, map.get("operation"))) {
            config.operation = WriteOperationType.valueOf(map.get("operation"));
        } else {
            config.operation = WriteOperationType.UPSERT;
        }
        if (map.containsKey("initialCheckPointString")) {
            config.checkpoint = map.get("initialCheckPointString");
        }
        config.enableHiveSync = true;
        config.sourceClassName = "com.logicalclocks.hsfs.spark.engine.hudi.DeltaStreamerKafkaSource";
        config.schemaProviderClassName = "com.logicalclocks.hsfs.spark.engine.hudi.DeltaStreamerSchemaProvider";
        if (map.get("minSyncIntervalSeconds") != null) {
            config.minSyncIntervalSeconds = Integer.valueOf(Integer.parseInt(map.get("minSyncIntervalSeconds")));
            config.continuousMode = true;
        }
        config.sparkMaster = "yarn";
        config.transformerClassNames = new ArrayList<String>() { // from class: com.logicalclocks.hsfs.spark.engine.hudi.DeltaStreamerConfig.1
            {
                add("com.logicalclocks.hsfs.spark.engine.hudi.DeltaStreamerTransformer");
            }
        };
        config.sourceOrderingField = map.get("sourceOrderingField");
        config.configs = new ArrayList<String>() { // from class: com.logicalclocks.hsfs.spark.engine.hudi.DeltaStreamerConfig.2
            {
                map.entrySet().stream().filter(entry -> {
                    return !((String) entry.getKey()).startsWith("kafka.");
                }).forEach(entry2 -> {
                    add(((String) entry2.getKey()) + "=" + ((String) entry2.getValue()));
                });
                map.entrySet().stream().filter(entry3 -> {
                    return ((String) entry3.getKey()).startsWith("kafka.");
                }).forEach(entry4 -> {
                    add(((String) entry4.getKey()).replace("kafka.", "") + "=" + ((String) entry4.getValue()));
                });
            }
        };
        return config;
    }

    public void streamToHoodieTable(Map<String, String> map, SparkSession sparkSession) throws Exception {
        JavaSparkContext fromSparkContext = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
        migrateTable(map, fromSparkContext);
        new HoodieDeltaStreamer(deltaStreamerConfig(map), fromSparkContext).sync();
    }

    private void migrateTable(Map<String, String> map, JavaSparkContext javaSparkContext) {
        HoodieTableMetaClient build = HoodieTableMetaClient.builder().setConf(HadoopFSUtils.getStorageConf(javaSparkContext.hadoopConfiguration())).setBasePath(map.get("hoodie.base.path")).setLoadActiveTimelineOnLoad(false).build();
        if (!build.getTableConfig().contains(HoodieTableConfig.VERSION) || build.getTableConfig().getTableVersion() == HoodieTableVersion.FIVE) {
            return;
        }
        build.getTableConfig().setValue("hoodie.datasource.write.operation", WriteOperationType.UPSERT.value());
        HoodieTableConfig.update(build.getStorage(), build.getMetaPath(), build.getTableConfig().getProps());
        new UpgradeDowngrade(build, HoodieWriteConfig.newBuilder().forTable(build.getTableConfig().getTableName()).withPath(map.get("hoodie.base.path")).withRollbackUsingMarkers(true).withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER).build()).withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build(), new HoodieSparkEngineContext(javaSparkContext), SparkUpgradeDowngradeHelper.getInstance()).run(HoodieTableVersion.FIVE, (String) null);
    }
}
