package org.apache.hudi.utilities.sources.helpers.gcs;

import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;
import org.apache.hudi.exception.HoodieException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/utilities/sources/helpers/gcs/PubsubMessagesFetcher.class */
public class PubsubMessagesFetcher {
    private static final int DEFAULT_BATCH_SIZE_ACK_API = 10;
    private static final int ACK_PRODUCER_THREAD_POOL_SIZE = 3;
    private final ExecutorService threadPool;
    private final String googleProjectId;
    private final String pubsubSubscriptionId;
    private final int batchSize;
    private final int maxMessagesPerSync;
    private final long maxFetchTimePerSyncSecs;
    private final SubscriberStubSettings subscriberStubSettings;
    private final PubsubQueueClient pubsubQueueClient;
    private static final long MAX_WAIT_TIME_TO_ACK_MESSAGES = TimeUnit.MINUTES.toMillis(1);
    private static final Logger LOG = LoggerFactory.getLogger(PubsubMessagesFetcher.class);

    public PubsubMessagesFetcher(String str, String str2, int i, int i2, long j, PubsubQueueClient pubsubQueueClient) {
        this.threadPool = Executors.newFixedThreadPool(3);
        this.googleProjectId = str;
        this.pubsubSubscriptionId = str2;
        this.batchSize = i;
        this.maxMessagesPerSync = i2;
        this.maxFetchTimePerSyncSecs = j;
        try {
            this.subscriberStubSettings = SubscriberStubSettings.newBuilder().setTransportChannelProvider(SubscriberStubSettings.defaultGrpcTransportProviderBuilder().setMaxInboundMessageSize(Integer.valueOf(GcsIngestionConfig.DEFAULT_MAX_INBOUND_MESSAGE_SIZE)).build()).build();
            this.pubsubQueueClient = pubsubQueueClient;
        } catch (IOException e) {
            throw new HoodieException("Error creating subscriber stub settings", e);
        }
    }

    public PubsubMessagesFetcher(String str, String str2, int i, int i2, long j) {
        this(str, str2, i, i2, j, new PubsubQueueClient());
    }

    public List<ReceivedMessage> fetchMessages() {
        ArrayList arrayList = new ArrayList();
        try {
            SubscriberStub subscriber = this.pubsubQueueClient.getSubscriber(this.subscriberStubSettings);
            Throwable th = null;
            try {
                try {
                    String format = ProjectSubscriptionName.format(this.googleProjectId, this.pubsubSubscriptionId);
                    long currentTimeMillis = System.currentTimeMillis();
                    long numUnAckedMessages = this.pubsubQueueClient.getNumUnAckedMessages(this.pubsubSubscriptionId);
                    LOG.info("Found unacked messages " + numUnAckedMessages);
                    while (arrayList.size() < numUnAckedMessages && arrayList.size() < this.maxMessagesPerSync && System.currentTimeMillis() - currentTimeMillis < this.maxFetchTimePerSyncSecs * 1000) {
                        arrayList.addAll(this.pubsubQueueClient.makePullRequest(subscriber, format, this.batchSize).getReceivedMessagesList());
                    }
                    if (subscriber != null) {
                        if (0 != 0) {
                            try {
                                subscriber.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            subscriber.close();
                        }
                    }
                    return arrayList;
                } finally {
                }
            } finally {
            }
        } catch (Exception e) {
            throw new HoodieException("Error when fetching metadata", e);
        }
    }

    public void sendAcks(List<String> list) throws IOException {
        try {
            SubscriberStub subscriber = this.pubsubQueueClient.getSubscriber(this.subscriberStubSettings);
            Throwable th = null;
            try {
                try {
                    CompletableFuture.allOf((CompletableFuture[]) IntStream.range(0, (int) Math.ceil(list.size() / 10.0d)).parallel().boxed().map(num -> {
                        return getTask(subscriber, list, num.intValue());
                    }).toArray(i -> {
                        return new CompletableFuture[i];
                    })).get(MAX_WAIT_TIME_TO_ACK_MESSAGES, TimeUnit.MILLISECONDS);
                    LOG.debug("Flushed out all outstanding acknowledged messages: " + list.size());
                    if (subscriber != null) {
                        if (0 != 0) {
                            try {
                                subscriber.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            subscriber.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IOException("Failed to ack messages from PubSub", e);
        }
    }

    private CompletableFuture<Void> getTask(SubscriberStub subscriberStub, List<String> list, int i) {
        String format = ProjectSubscriptionName.format(this.googleProjectId, this.pubsubSubscriptionId);
        List<String> subList = list.subList(i, Math.min(i + 10, list.size()));
        return CompletableFuture.runAsync(() -> {
            this.pubsubQueueClient.makeAckRequest(subscriberStub, format, subList);
        }, this.threadPool);
    }
}
