package org.apache.hudi.utilities.ingestion;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hudi.async.HoodieAsyncService;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/utilities/ingestion/HoodieIngestionService.class */
public abstract class HoodieIngestionService extends HoodieAsyncService {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieIngestionService.class);
    protected HoodieIngestionConfig ingestionConfig;

    /* loaded from: input_file:org/apache/hudi/utilities/ingestion/HoodieIngestionService$HoodieIngestionConfig.class */
    public static class HoodieIngestionConfig extends HoodieConfig {
        public static final ConfigProperty<Boolean> INGESTION_IS_CONTINUOUS = ConfigProperty.key("hoodie.utilities.ingestion.is.continuous").defaultValue(false).markAdvanced().withDocumentation("Indicate if the ingestion runs in a continuous loop.");
        public static final ConfigProperty<Integer> INGESTION_MIN_SYNC_INTERNAL_SECONDS = ConfigProperty.key("hoodie.utilities.ingestion.min.sync.internal.seconds").defaultValue(0).markAdvanced().withDocumentation("the minimum sync interval of each ingestion in continuous mode");

        /* loaded from: input_file:org/apache/hudi/utilities/ingestion/HoodieIngestionService$HoodieIngestionConfig$Builder.class */
        public static class Builder {
            private final HoodieIngestionConfig ingestionConfig = new HoodieIngestionConfig();

            public Builder isContinuous(boolean z) {
                this.ingestionConfig.setValue(HoodieIngestionConfig.INGESTION_IS_CONTINUOUS, String.valueOf(z));
                return this;
            }

            public Builder withMinSyncInternalSeconds(int i) {
                this.ingestionConfig.setValue(HoodieIngestionConfig.INGESTION_MIN_SYNC_INTERNAL_SECONDS, String.valueOf(i));
                return this;
            }

            public HoodieIngestionConfig build() {
                this.ingestionConfig.setDefaults(HoodieIngestionConfig.class.getName());
                return this.ingestionConfig;
            }
        }

        public static Builder newBuilder() {
            return new Builder();
        }
    }

    public HoodieIngestionService(HoodieIngestionConfig hoodieIngestionConfig) {
        this.ingestionConfig = hoodieIngestionConfig;
    }

    public void startIngestion() {
        if (!this.ingestionConfig.getBoolean(HoodieIngestionConfig.INGESTION_IS_CONTINUOUS).booleanValue()) {
            LOG.info("Ingestion service starts running in run-once mode");
            ingestOnce();
            LOG.info("Ingestion service (run-once mode) has been shut down.");
        } else {
            LOG.info("Ingestion service starts running in continuous mode");
            start((v1) -> {
                return onIngestionCompletes(v1);
            });
            try {
                waitForShutdown();
                LOG.info("Ingestion service (continuous mode) has been shut down.");
            } catch (Exception e) {
                throw new HoodieIngestionException("Ingestion service was shut down with exception.", e);
            }
        }
    }

    @Override // org.apache.hudi.async.HoodieAsyncService
    protected Pair<CompletableFuture, ExecutorService> startService() {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
        return Pair.of(CompletableFuture.supplyAsync(() -> {
            while (!isShutdownRequested()) {
                try {
                    long currentTimeMillis = System.currentTimeMillis();
                    ingestOnce();
                    if (!requestShutdownIfNeeded(Option.empty())) {
                        sleepBeforeNextIngestion(currentTimeMillis);
                    }
                } finally {
                    newFixedThreadPool.shutdownNow();
                }
            }
            return true;
        }, newFixedThreadPool), newFixedThreadPool);
    }

    public abstract void ingestOnce();

    protected boolean requestShutdownIfNeeded(Option<HoodieData<WriteStatus>> option) {
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sleepBeforeNextIngestion(long j) {
        try {
            long longValue = this.ingestionConfig.getLongOrDefault(HoodieIngestionConfig.INGESTION_MIN_SYNC_INTERNAL_SECONDS).longValue();
            long currentTimeMillis = (longValue * 1000) - (System.currentTimeMillis() - j);
            if (currentTimeMillis > 0) {
                LOG.info(String.format("Last ingestion took less than min sync interval: %d s; sleep for %.2f s", Long.valueOf(longValue), Double.valueOf(currentTimeMillis / 1000.0d)));
                Thread.sleep(currentTimeMillis);
            }
        } catch (InterruptedException e) {
            throw new HoodieIngestionException("Ingestion service (continuous mode) was interrupted during sleep.", e);
        }
    }

    protected boolean onIngestionCompletes(boolean z) {
        return true;
    }

    public abstract Option<HoodieIngestionMetrics> getMetrics();

    public void close() {
        if (isShutdown()) {
            return;
        }
        shutdown(true);
    }
}
