package org.apache.hudi.utilities.sources;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.hudi.HoodieConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ThreadUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.util.Lazy;
import org.apache.hudi.utilities.config.PulsarSourceConfig;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.streamer.NoNewDataTerminationStrategy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.pulsar.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/utilities/sources/PulsarSource.class */
/**
 * {@link RowSource} that reads a bounded batch of messages from a single Pulsar topic
 * through the Spark {@code "pulsar"} data source.
 *
 * <p>Each batch spans from the offset recorded in the last checkpoint (or an offset chosen
 * by the configured auto-reset strategy when there is no checkpoint) up to the last message
 * id reported by a Pulsar consumer subscribed to the topic. The JSON-encoded ending offset
 * becomes the next checkpoint, and {@link #onCommit(String)} acknowledges it cumulatively.
 *
 * <p>The Pulsar client and consumer are created lazily on first use.
 */
public class PulsarSource extends RowSource implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarSource.class);

    /** Subscription name template; the placeholder is filled with the current epoch millis. */
    private static final String HUDI_PULSAR_CONSUMER_ID_FORMAT = "hudi-pulsar-consumer-%d";

    /** Upper bound on how long {@link #shutdownPulsarClient} waits for the event loop to stop. */
    private static final Duration GRACEFUL_SHUTDOWN_TIMEOUT = Duration.ofSeconds(20);

    /** Columns dropped from every loaded dataset before it is handed downstream. */
    private static final String[] PULSAR_META_FIELDS = {"__key", "__topic", "__messageId", "__publishTime", "__eventTime", "__messageProperties"};

    private final String topicName;
    private final String serviceEndpointURL;
    private final String adminEndpointURL;
    private final Lazy<PulsarClient> pulsarClient;
    private final Lazy<Consumer<byte[]>> pulsarConsumer;

    /**
     * Creates the source and reads its Pulsar settings from the supplied properties.
     *
     * <p>The topic name and service endpoint URL are checked as required properties; the
     * admin endpoint URL is read without such a check. No connection is opened here.
     *
     * @param properties     source configuration
     * @param sparkContext   Spark context forwarded to the parent source
     * @param sparkSession   Spark session used to load the batches
     * @param schemaProvider schema provider forwarded to the parent source
     */
    public PulsarSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) {
        super(properties, sparkContext, sparkSession, schemaProvider);
        ConfigUtils.checkRequiredConfigProperties(properties, Arrays.asList(PulsarSourceConfig.PULSAR_SOURCE_TOPIC_NAME, PulsarSourceConfig.PULSAR_SOURCE_SERVICE_ENDPOINT_URL));
        // Round-trip through TopicName so the stored name is in the form Pulsar itself produces.
        this.topicName = TopicName.get(ConfigUtils.getStringWithAltKeys(properties, PulsarSourceConfig.PULSAR_SOURCE_TOPIC_NAME)).toString();
        this.serviceEndpointURL = ConfigUtils.getStringWithAltKeys(properties, PulsarSourceConfig.PULSAR_SOURCE_SERVICE_ENDPOINT_URL);
        this.adminEndpointURL = ConfigUtils.getStringWithAltKeys(properties, PulsarSourceConfig.PULSAR_SOURCE_ADMIN_ENDPOINT_URL);
        this.pulsarClient = Lazy.lazily(this::initPulsarClient);
        this.pulsarConsumer = Lazy.lazily(this::subscribeToTopic);
    }

    /**
     * Loads the messages between the checkpointed offset and the topic's latest offset.
     *
     * @param lastCheckpoint checkpoint of the previous batch, if any
     * @param sourceLimit    requested record limit (see the TODO in {@code computeOffsets})
     * @return the loaded rows (meta columns dropped) paired with a checkpoint holding the
     *         JSON-encoded ending offset
     */
    @Override // org.apache.hudi.utilities.sources.RowSource
    protected Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatch(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
        Pair<MessageId, MessageId> offsetRange = computeOffsets(lastCheckpoint, sourceLimit);
        String startingOffsets = convertToOffsetString(topicName, offsetRange.getLeft());
        String endingOffsets = convertToOffsetString(topicName, offsetRange.getRight());

        Dataset<Row> loaded = sparkSession.read()
            .format("pulsar")
            .option("service.url", serviceEndpointURL)
            .option("admin.url", adminEndpointURL)
            .option("topics", topicName)
            .option("startingOffsets", startingOffsets)
            .option("endingOffsets", endingOffsets)
            .load();

        return Pair.of(Option.of(transform(loaded)), new StreamerCheckpointV2(endingOffsets));
    }

    /**
     * Cumulatively acknowledges the offset stored for this topic in the committed checkpoint.
     *
     * @param lastCommittedCheckpoint JSON-encoded topic offsets, as produced by
     *                                {@link #fetchNextBatch}
     */
    @Override // org.apache.hudi.utilities.callback.SourceCommitCallback
    public void onCommit(String lastCommittedCheckpoint) {
        ackOffset((MessageId) JsonUtils.topicOffsets(lastCommittedCheckpoint).apply(topicName));
    }

    /** Drops the Pulsar meta columns listed in {@link #PULSAR_META_FIELDS}. */
    private Dataset<Row> transform(Dataset<Row> rows) {
        return rows.drop(PULSAR_META_FIELDS);
    }

    /**
     * Determines the (starting, ending) message ids of the next batch.
     *
     * @throws HoodieException if the latest offset precedes the starting offset
     */
    private Pair<MessageId, MessageId> computeOffsets(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
        MessageId startingOffset = decodeStartingOffset(lastCheckpoint);
        MessageId endingOffset = fetchLatestOffset();
        if (endingOffset.compareTo(startingOffset) < 0) {
            throw new HoodieException(String.format("Ending offset (%s) is preceding starting offset (%s) for '%s'", endingOffset, startingOffset, topicName));
        }
        // TODO: the record limit is computed but not applied yet, so a batch always runs up to
        //       the latest offset. The call is kept so the config lookup behaves as before.
        computeTargetRecordLimit(sourceLimit, props);
        return Pair.of(startingOffset, endingOffset);
    }

    /**
     * Returns the offset stored in the checkpoint, or falls back to the configured
     * auto-reset strategy when no checkpoint is present.
     */
    private MessageId decodeStartingOffset(Option<Checkpoint> lastCheckpoint) {
        return lastCheckpoint
            .map(checkpoint -> (MessageId) JsonUtils.topicOffsets(checkpoint.getCheckpointKey()).apply(topicName))
            .orElseGet(this::resolveOffsetFromAutoResetStrategy);
    }

    /**
     * Picks the starting offset according to the configured auto-reset strategy.
     *
     * @throws IllegalArgumentException      if the strategy is {@code FAIL}
     * @throws UnsupportedOperationException if the strategy is not handled here
     */
    private MessageId resolveOffsetFromAutoResetStrategy() {
        String defaultStrategy = ((PulsarSourceConfig.OffsetAutoResetStrategy) PulsarSourceConfig.PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY.defaultValue()).name();
        PulsarSourceConfig.OffsetAutoResetStrategy strategy = PulsarSourceConfig.OffsetAutoResetStrategy.valueOf(
            ConfigUtils.getStringWithAltKeys(props, PulsarSourceConfig.PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY, defaultStrategy));

        // Switch on the enum itself, not on ordinals or unrelated numeric constants.
        switch (strategy) {
            case LATEST:
                return fetchLatestOffset();
            case EARLIEST:
                return MessageId.earliest;
            case FAIL:
                throw new IllegalArgumentException("No checkpoint has been provided!");
            default:
                throw new UnsupportedOperationException("Unsupported offset auto-reset strategy");
        }
    }

    /**
     * Cumulatively acknowledges the given message id on the consumer.
     *
     * @throws HoodieReadFromSourceException if the acknowledgement fails
     */
    private void ackOffset(MessageId messageId) {
        try {
            pulsarConsumer.get().acknowledgeCumulative(messageId);
        } catch (PulsarClientException e) {
            LOG.error("Failed to ack messageId ({}) for topic '{}'", messageId, topicName, e);
            throw new HoodieReadFromSourceException("Failed to ack message for topic", e);
        }
    }

    /**
     * Asks the consumer for the id of the last message in the topic.
     *
     * @throws HoodieReadFromSourceException if the lookup fails
     */
    private MessageId fetchLatestOffset() {
        try {
            return pulsarConsumer.get().getLastMessageId();
        } catch (PulsarClientException e) {
            LOG.error("Failed to fetch latest messageId for topic '{}'", topicName, e);
            throw new HoodieReadFromSourceException("Failed to fetch latest messageId for topic", e);
        }
    }

    /**
     * Subscribes an exclusive consumer to the topic, starting from the earliest position,
     * under a subscription name derived from the current time.
     *
     * @throws HoodieIOException if the subscription cannot be created
     */
    private Consumer<byte[]> subscribeToTopic() {
        try {
            String subscriptionName = String.format(HUDI_PULSAR_CONSUMER_ID_FORMAT, System.currentTimeMillis());
            return pulsarClient.get()
                .newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscriptionType(SubscriptionType.Exclusive)
                .subscribe();
        } catch (PulsarClientException e) {
            LOG.error("Failed to subscribe to Pulsar topic '{}'", topicName, e);
            throw new HoodieIOException("Failed to subscribe to Pulsar topic", e);
        }
    }

    /**
     * Builds a Pulsar client for the configured service endpoint.
     *
     * @throws HoodieIOException if the client cannot be built
     */
    private PulsarClient initPulsarClient() {
        try {
            return PulsarClient.builder().serviceUrl(serviceEndpointURL).build();
        } catch (PulsarClientException e) {
            LOG.error("Failed to init Pulsar client connecting to '{}'", serviceEndpointURL, e);
            throw new HoodieIOException("Failed to init Pulsar client", e);
        }
    }

    /**
     * Shuts down the Pulsar client held by this source.
     *
     * <p>NOTE(review): {@code pulsarClient.get()} presumably creates the client if it was never
     * used, only for it to be closed right away — confirm whether {@code Lazy} offers a way
     * to check for initialization.
     *
     * @throws IOException if closing the client fails
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        shutdownPulsarClient(pulsarClient.get());
    }

    /**
     * Returns {@code sourceLimit} when it is below {@link Long#MAX_VALUE}, otherwise the
     * configured per-batch record threshold.
     */
    private static Long computeTargetRecordLimit(long sourceLimit, TypedProperties properties) {
        if (sourceLimit < Long.MAX_VALUE) {
            return Long.valueOf(sourceLimit);
        }
        return Long.valueOf(ConfigUtils.getLongWithAltKeys(properties, PulsarSourceConfig.PULSAR_SOURCE_MAX_RECORDS_PER_BATCH_THRESHOLD));
    }

    /** Encodes a single topic-to-offset mapping as the JSON string the Pulsar reader expects. */
    private static String convertToOffsetString(String topic, MessageId offset) {
        return JsonUtils.topicOffsets(HoodieConversionUtils.mapAsScalaImmutableMap(Collections.singletonMap(topic, offset)));
    }

    /**
     * Closes the client, then performs best-effort cleanup of its threads: waits up to
     * {@link #GRACEFUL_SHUTDOWN_TIMEOUT} for the client's event-loop group to shut down and
     * finally interrupts every active thread whose name starts with
     * {@code "pulsar-client-io"}.
     *
     * @throws PulsarClientException if closing the client fails
     */
    private static void shutdownPulsarClient(PulsarClient client) throws PulsarClientException {
        client.close();
        try {
            EventLoopGroup eventLoopGroup = ((PulsarClientImpl) client).eventLoopGroup();
            if (eventLoopGroup != null) {
                eventLoopGroup.shutdownGracefully().await(GRACEFUL_SHUTDOWN_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            // Stop waiting but keep the interrupt visible to the caller; the remaining
            // cleanup below does not block, so it is still safe to run.
            Thread.currentThread().interrupt();
        }
        ThreadUtils.collectActiveThreads().stream()
            .sequential()
            .filter(thread -> thread.getName().startsWith("pulsar-client-io"))
            .forEach(Thread::interrupt);
    }
}
