package org.apache.hudi.cli;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.Serializable;
import java.io.StringReader;
import java.lang.invoke.SerializedLambda;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.security.visibility.VisibilityConstants;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.HoodieJsonPayload;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:org/apache/hudi/cli/HDFSParquetImporterUtils.class */
/**
 * Imports Parquet files from a source path into a Hudi table using Spark.
 *
 * <p>The write operation is selected by {@code command}: {@code "upsert"} and {@code "bulkinsert"}
 * (compared after lower-casing) pick the matching write-client call; any other value results in a
 * plain insert. For non-upsert commands the target path must not exist when the import starts.
 *
 * <p>The class is {@link Serializable} because the record-mapping function handed to Spark in
 * {@link #buildHoodieRecordsForImport(JavaSparkContext, String)} captures {@code this} to read the
 * row-key and partition-key field names.
 */
public class HDFSParquetImporterUtils implements Serializable {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(HDFSParquetImporterUtils.class);
    /** Renders numeric partition values as {@code yyyy/MM/dd} in the JVM's default time zone. */
    private static final DateTimeFormatter PARTITION_FORMATTER = DateTimeFormatter.ofPattern("yyyy/MM/dd").withZone(ZoneId.systemDefault());
    private final String command;
    private final String srcPath;
    private final String targetPath;
    private final String tableName;
    private final String tableType;
    private final String rowKey;
    private final String partitionKey;
    private final int parallelism;
    private final String schemaFile;
    /** Remaining retry budget; decremented by {@link #dataImport(JavaSparkContext)}. */
    private int retry;
    private final String propsFilePath;
    /** Extra {@code key=value} overrides; nothing in this class adds to it. */
    private final List<String> configs = new ArrayList<>();
    private TypedProperties props;

    /**
     * Creates an importer.
     *
     * @param command       write operation name ({@code upsert}, {@code bulkinsert}, otherwise insert)
     * @param srcPath       path of the Parquet input
     * @param targetPath    base path of the Hudi table to write
     * @param tableName     name of the Hudi table
     * @param tableType     Hudi table type, passed to the table builder as-is
     * @param rowKey        name of the record field used as the record key
     * @param partitionKey  name of the record field used to derive the partition path
     * @param parallelism   parallelism used for the write client and for coalescing the input
     * @param schemaFile    path of the file holding the Avro schema text
     * @param retry         number of additional attempts after a failed import
     * @param propsFilePath optional properties file; {@code null} or empty means "use global props"
     */
    public HDFSParquetImporterUtils(String command, String srcPath, String targetPath, String tableName, String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile, int retry, String propsFilePath) {
        this.command = command;
        this.srcPath = srcPath;
        this.targetPath = targetPath;
        this.tableName = tableName;
        this.tableType = tableType;
        this.rowKey = rowKey;
        this.partitionKey = partitionKey;
        this.parallelism = parallelism;
        this.schemaFile = schemaFile;
        this.retry = retry;
        this.propsFilePath = propsFilePath;
    }

    /**
     * @return {@code true} when the configured command is {@code "upsert"}, ignoring case
     */
    public boolean isUpsert() {
        return "upsert".equalsIgnoreCase(this.command);
    }

    /**
     * Runs the import, retrying failed attempts while the retry budget lasts.
     *
     * <p>Any failure raised while checking the target path or while importing is logged and
     * reported through the return value rather than thrown. The retry budget lives on the instance,
     * so it is consumed across calls.
     *
     * @param jsc Spark context used for the import
     * @return {@code 0} on success, {@code -1} otherwise
     */
    public int dataImport(JavaSparkContext jsc) {
        FileSystem fs = HadoopFSUtils.getFs(this.targetPath, jsc.hadoopConfiguration());
        this.props = (this.propsFilePath == null || this.propsFilePath.isEmpty())
            ? buildProperties(this.configs)
            : readConfig(fs.getConf(), new StoragePath(this.propsFilePath), this.configs).getProps(true);
        LOG.info("Starting data import with configs : {}", this.props);
        int ret = -1;
        try {
            // A non-upsert import must start from a target path that does not exist yet.
            if (fs.exists(new Path(this.targetPath)) && !isUpsert()) {
                throw new HoodieIOException(String.format("Make sure %s is not present.", this.targetPath));
            }
            do {
                ret = dataImport(jsc, fs);
            } while (ret != 0 && this.retry-- > 0);
        } catch (Throwable t) {
            LOG.error("dataImport failed", t);
        }
        return ret;
    }

    /**
     * Performs a single import attempt.
     *
     * <p>For non-upsert commands an existing target path is deleted recursively first (presumably
     * to clear what a previous failed attempt left behind). The table is initialised when the
     * target path is absent, then the records are written and the write statuses inspected.
     *
     * @param jsc Spark context used for the import
     * @param fs  file system holding the target path and the schema file
     * @return {@code 0} on success, {@code -1} if anything fails (the failure is logged)
     */
    public int dataImport(JavaSparkContext jsc, FileSystem fs) {
        try {
            Path target = new Path(this.targetPath);
            if (fs.exists(target) && !isUpsert()) {
                fs.delete(target, true);
            }
            if (!fs.exists(target)) {
                HoodieTableMetaClient.newTableBuilder()
                    .setTableName(this.tableName)
                    .setTableType(this.tableType)
                    .initTable((StorageConfiguration<?>) HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration()), this.targetPath);
            }
            String schemaStr = parseSchema(fs, this.schemaFile);
            // NOTE(review): the write client is never closed here; confirm whether it should be.
            SparkRDDWriteClient<HoodieRecordPayload> client = createHoodieClient(jsc, this.targetPath, schemaStr, this.parallelism, Option.empty(), this.props);
            JavaRDD<HoodieRecord<HoodieRecordPayload>> hoodieRecords = buildHoodieRecordsForImport(jsc, schemaStr);
            String instantTime = client.startCommit();
            JavaRDD<WriteStatus> writeResponse = load(client, instantTime, hoodieRecords);
            return handleErrors(jsc, instantTime, writeResponse);
        } catch (Throwable t) {
            LOG.error("Error occurred.", t);
            return -1;
        }
    }

    /**
     * Reads the Parquet input and turns every row into a Hudi record.
     *
     * @param jsc       Spark context used to read the input
     * @param schemaStr Avro schema text used as the Parquet read schema
     * @return RDD of records keyed by the row-key field and the derived partition path
     * @throws IOException if the Hadoop job cannot be created
     */
    public JavaRDD<HoodieRecord<HoodieRecordPayload>> buildHoodieRecordsForImport(JavaSparkContext jsc, String schemaStr) throws IOException {
        Job job = Job.getInstance(jsc.hadoopConfiguration());
        // Pick up input files in nested directories as well.
        job.getConfiguration().set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
        // Use many threads when listing the input files.
        job.getConfiguration().set("mapreduce.input.fileinputformat.list-status.num-threads", "1024");
        // NOTE(review): the read schema is set on the context's Hadoop configuration, while the
        // read below is given job.getConfiguration() - confirm this is intended.
        AvroReadSupport.setAvroReadSchema(jsc.hadoopConfiguration(), new Schema.Parser().parse(schemaStr));
        ParquetInputFormat.setReadSupportClass(job, AvroReadSupport.class);
        new HoodieSparkEngineContext(jsc).setJobStatus(getClass().getSimpleName(), "Build records for import: " + this.tableName);
        return jsc.newAPIHadoopFile(this.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class, job.getConfiguration())
            .coalesce(16 * this.parallelism)
            .map(entry -> toHoodieRecord(((Tuple2<Void, GenericRecord>) entry)._2()));
    }

    /**
     * Converts one input row into a Hudi record.
     *
     * <p>A numeric partition value is multiplied by 1000, read as epoch milliseconds and formatted
     * as {@code yyyy/MM/dd}; any other value is used through its {@code toString()}.
     *
     * @param genericRecord input row
     * @return record whose payload is the row's {@code toString()} wrapped in a JSON payload
     * @throws HoodieIOException if the partition-key or row-key field is {@code null}
     * @throws IOException       if the payload cannot be built
     */
    private HoodieRecord<HoodieRecordPayload> toHoodieRecord(GenericRecord genericRecord) throws IOException {
        Object partitionField = genericRecord.get(this.partitionKey);
        if (partitionField == null) {
            throw new HoodieIOException("partition key is missing. :" + this.partitionKey);
        }
        Object rowField = genericRecord.get(this.rowKey);
        if (rowField == null) {
            throw new HoodieIOException("row field is missing. :" + this.rowKey);
        }
        String partitionPath = partitionField.toString();
        LOG.debug("Row Key : {}, Partition Path is ({})", rowField, partitionPath);
        if (partitionField instanceof Number) {
            try {
                long epochMillis = (long) (Double.parseDouble(partitionField.toString()) * 1000.0d);
                partitionPath = PARTITION_FORMATTER.format(Instant.ofEpochMilli(epochMillis));
            } catch (NumberFormatException e) {
                // Fall back to the raw string form of the partition value.
                LOG.warn("Unable to parse date from partition field. Assuming partition as ({})", partitionField);
            }
        }
        return new HoodieAvroRecord<>(new HoodieKey(rowField.toString(), partitionPath), new HoodieJsonPayload(genericRecord.toString()));
    }

    /**
     * Writes the records with the operation selected by the configured command.
     *
     * @param client        write client to use
     * @param instantTime   instant time of the commit the records belong to
     * @param hoodieRecords records to write
     * @param <T>           payload type of the records
     * @return write statuses returned by the client
     */
    public <T extends HoodieRecordPayload> JavaRDD<WriteStatus> load(SparkRDDWriteClient<T> client, String instantTime, JavaRDD<HoodieRecord<T>> hoodieRecords) {
        switch (this.command.toLowerCase()) {
            case "upsert":
                return client.upsert(hoodieRecords, instantTime);
            case "bulkinsert":
                return client.bulkInsert(hoodieRecords, instantTime);
            default:
                return client.insert(hoodieRecords, instantTime);
        }
    }

    /**
     * Builds properties from the global props plus {@code key=value} overrides.
     *
     * @param overrides entries of the form {@code key=value}; each must split into exactly two
     *                  parts on {@code '='}
     * @return the global properties object with the overrides applied to it
     * @throws IllegalArgumentException if an entry does not split into exactly two parts
     */
    public static TypedProperties buildProperties(List<String> overrides) {
        TypedProperties properties = DFSPropertiesConfiguration.getGlobalProps();
        for (String override : overrides) {
            String[] keyValue = override.split("=");
            ValidationUtils.checkArgument(keyValue.length == 2);
            properties.setProperty(keyValue[0], keyValue[1]);
        }
        return properties;
    }

    /**
     * Loads a properties file and layers the given overrides on top of it.
     *
     * @param hadoopConf      Hadoop configuration used to read the file
     * @param cfgPath         path of the properties file
     * @param overriddenProps property lines added after the file contents; may be empty
     * @return the loaded configuration
     * @throws HoodieIOException if adding the overrides fails
     */
    public static DFSPropertiesConfiguration readConfig(Configuration hadoopConf, StoragePath cfgPath, List<String> overriddenProps) {
        DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(hadoopConf, cfgPath);
        if (overriddenProps.isEmpty()) {
            return conf;
        }
        try {
            LOG.info("Adding overridden properties to file properties.");
            conf.addPropsFromStream(new BufferedReader(new StringReader(String.join("\n", overriddenProps))), cfgPath);
        } catch (IOException e) {
            throw new HoodieIOException("Unexpected error adding config overrides", e);
        }
        return conf;
    }

    /**
     * Creates a write client for the table at {@code basePath}.
     *
     * <p>The client is configured with the given parallelism for all write operations, input
     * combining enabled, inline compaction disabled and a BLOOM index; {@code properties} are
     * applied last.
     *
     * @param jsc                     Spark context backing the client
     * @param basePath                base path of the Hudi table
     * @param schemaStr               Avro schema text of the records
     * @param parallelism             parallelism for insert, upsert, bulk insert and delete
     * @param compactionStrategyClass optional class name of the compaction strategy to load
     * @param properties              additional write-config properties
     * @return the configured write client
     */
    public static SparkRDDWriteClient<HoodieRecordPayload> createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr, int parallelism, Option<String> compactionStrategyClass, TypedProperties properties) {
        HoodieCompactionConfig compactionConfig = compactionStrategyClass
            .map(strategy -> HoodieCompactionConfig.newBuilder()
                .withInlineCompaction(false)
                .withCompactionStrategy(ReflectionUtils.loadClass(strategy))
                .build())
            .orElseGet(() -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build());
        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath(basePath)
            .withParallelism(parallelism, parallelism)
            .withBulkInsertParallelism(parallelism)
            .withDeleteParallelism(parallelism)
            .withSchema(schemaStr)
            .combineInput(true, true)
            .withCompactionConfig(compactionConfig)
            .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
            .withProps(properties)
            .build();
        return new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), writeConfig);
    }

    /**
     * Reads the whole schema file and returns its contents decoded as UTF-8.
     *
     * @param fs         file system holding the schema file
     * @param schemaFile path of the schema file
     * @return the file contents as a string
     * @throws Exception if the file does not exist or cannot be read
     */
    public static String parseSchema(FileSystem fs, String schemaFile) throws Exception {
        Path schemaPath = new Path(schemaFile);
        if (!fs.exists(schemaPath)) {
            throw new Exception(String.format("Could not find - %s - schema file.", schemaFile));
        }
        byte[] content = new byte[(int) fs.getFileStatus(schemaPath).getLen()];
        try (FSDataInputStream in = fs.open(schemaPath)) {
            in.readFully(0L, content, 0, content.length);
        }
        return StringUtils.fromUTF8Bytes(content);
    }

    /**
     * Counts the write statuses that report errors and logs the outcome.
     *
     * <p>Iterating the RDD with {@code foreach} is what triggers its evaluation here.
     *
     * @param jsc           Spark context used to create the error accumulator
     * @param instantTime   instant time of the commit, used in the success message
     * @param writeResponse write statuses produced by the load
     * @return {@code 0} if no status has errors, {@code -1} otherwise
     */
    public static int handleErrors(JavaSparkContext jsc, String instantTime, JavaRDD<WriteStatus> writeResponse) {
        LongAccumulator errors = jsc.sc().longAccumulator();
        writeResponse.foreach(writeStatus -> {
            if (writeStatus.hasErrors()) {
                errors.add(1L);
                LOG.error("Error processing records :writeStatus:{}", writeStatus.getStat());
            }
        });
        if (errors.value().longValue() == 0) {
            LOG.info("Table imported into hoodie with {} instant time.", instantTime);
            return 0;
        }
        LOG.error("Import failed with {} errors.", errors.value());
        return -1;
    }
}
