package org.apache.hudi.utilities.sources;

import java.lang.invoke.SerializedLambda;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

/* loaded from: input_file:org/apache/hudi/utilities/sources/AvroKafkaSource.class */
public class AvroKafkaSource extends AvroSource {
    private static final Logger LOG = LogManager.getLogger(AvroKafkaSource.class);
    private static final String NATIVE_KAFKA_KEY_DESERIALIZER_PROP = "key.deserializer";
    private static final String NATIVE_KAFKA_VALUE_DESERIALIZER_PROP = "value.deserializer";
    public static final String KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX = "hoodie.deltastreamer.source.kafka.value.deserializer.";
    public static final String KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA = "hoodie.deltastreamer.source.kafka.value.deserializer.schema";
    private final KafkaOffsetGen offsetGen;
    private final HoodieDeltaStreamerMetrics metrics;
    private final SchemaProvider schemaProvider;
    private final String deserializerClassName;

    public AvroKafkaSource(TypedProperties typedProperties, JavaSparkContext javaSparkContext, SparkSession sparkSession, SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics hoodieDeltaStreamerMetrics) {
        super(typedProperties, javaSparkContext, sparkSession, schemaProvider);
        typedProperties.put("key.deserializer", StringDeserializer.class.getName());
        this.deserializerClassName = typedProperties.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(), DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue());
        try {
            typedProperties.put("value.deserializer", Class.forName(this.deserializerClassName).getName());
            if (this.deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
                if (schemaProvider == null) {
                    throw new HoodieIOException("SchemaProvider has to be set to use KafkaAvroSchemaDeserializer");
                }
                typedProperties.put(KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA, schemaProvider.getSourceSchema().toString());
            }
            this.schemaProvider = schemaProvider;
            this.metrics = hoodieDeltaStreamerMetrics;
            this.offsetGen = new KafkaOffsetGen(typedProperties);
        } catch (ClassNotFoundException e) {
            String str = "Could not load custom avro kafka deserializer: " + this.deserializerClassName;
            LOG.error(str);
            throw new HoodieException(str, e);
        }
    }

    @Override // org.apache.hudi.utilities.sources.Source
    protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> option, long j) {
        try {
            OffsetRange[] nextOffsetRanges = this.offsetGen.getNextOffsetRanges(option, j, this.metrics);
            long j2 = KafkaOffsetGen.CheckpointUtils.totalNewMessages(nextOffsetRanges);
            LOG.info("About to read " + j2 + " from Kafka for topic :" + this.offsetGen.getTopicName());
            return j2 <= 0 ? new InputBatch<>(Option.empty(), KafkaOffsetGen.CheckpointUtils.offsetsToStr(nextOffsetRanges)) : new InputBatch<>(Option.of(toRDD(nextOffsetRanges)), KafkaOffsetGen.CheckpointUtils.offsetsToStr(nextOffsetRanges));
        } catch (TimeoutException e) {
            throw new HoodieSourceTimeoutException("Kafka Source timed out " + e.getMessage());
        }
    }

    private JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRangeArr) {
        if (!this.deserializerClassName.equals(ByteArrayDeserializer.class.getName())) {
            return KafkaUtils.createRDD(this.sparkContext, this.offsetGen.getKafkaParams(), offsetRangeArr, LocationStrategies.PreferConsistent()).map(consumerRecord -> {
                return (GenericRecord) consumerRecord.value();
            });
        }
        if (this.schemaProvider == null) {
            throw new HoodieException("Please provide a valid schema provider class when use ByteArrayDeserializer!");
        }
        AvroConvertor avroConvertor = new AvroConvertor(this.schemaProvider.getSourceSchema());
        return KafkaUtils.createRDD(this.sparkContext, this.offsetGen.getKafkaParams(), offsetRangeArr, LocationStrategies.PreferConsistent()).map(consumerRecord2 -> {
            return avroConvertor.fromAvroBinary((byte[]) consumerRecord2.value());
        });
    }

    @Override // org.apache.hudi.utilities.callback.SourceCommitCallback
    public void onCommit(String str) {
        if (this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(), KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue().booleanValue())) {
            this.offsetGen.commitOffsetToKafka(str);
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -112450425:
                if (implMethodName.equals("lambda$toRDD$3b364b3c$1")) {
                    z = true;
                    break;
                }
                break;
            case 1173961342:
                if (implMethodName.equals("lambda$toRDD$29bf7102$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/utilities/sources/AvroKafkaSource") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/hudi/utilities/sources/helpers/AvroConvertor;Lorg/apache/kafka/clients/consumer/ConsumerRecord;)Lorg/apache/avro/generic/GenericRecord;")) {
                    AvroConvertor avroConvertor = (AvroConvertor) serializedLambda.getCapturedArg(0);
                    return consumerRecord2 -> {
                        return avroConvertor.fromAvroBinary((byte[]) consumerRecord2.value());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/utilities/sources/AvroKafkaSource") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/kafka/clients/consumer/ConsumerRecord;)Lorg/apache/avro/generic/GenericRecord;")) {
                    return consumerRecord -> {
                        return (GenericRecord) consumerRecord.value();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
