package com.logicalclocks.onlinefs.kafka;

import com.logicalclocks.onlinefs.DatabaseType;
import com.logicalclocks.onlinefs.conf.OnlineFSConf;
import com.logicalclocks.onlinefs.handler.OnlineFsHandler;
import com.logicalclocks.onlinefs.notification.NotificationManager;
import com.logicalclocks.onlinefs.rondb.Committer;
import com.logicalclocks.onlinefs.util.LogArgument;
import com.logicalclocks.onlinefs.util.LogArgumentKey;
import com.logicalclocks.onlinefs.util.LoggingUtils;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.lucene.analysis.shingle.ShingleFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

/* loaded from: input_file:com/logicalclocks/onlinefs/kafka/KafkaConsumer.class */
public class KafkaConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) KafkaConsumer.class);
    private final Properties kafkaProperties = new Properties();
    private final Pattern topicPattern;
    private final List<String> topicList;
    private final OnlineFsHandler onlineFsHandler;
    private final long pollTimeoutMs;
    private final DatabaseType databaseType;
    private final Committer committer;

    public KafkaConsumer(Configuration configuration, Committer committer, DatabaseType databaseType) throws IOException {
        Path path;
        LoggingUtils.log(LOGGER, Level.INFO, "KafkaConsumer thread started");
        this.databaseType = databaseType;
        this.committer = committer;
        this.onlineFsHandler = new OnlineFsHandler();
        switch (databaseType) {
            case VECTORDB:
                path = Paths.get(System.getenv("ONLINEFS_HOME"), "etc", configuration.getString(OnlineFSConf.KAFKA_PROPERTIES_FILE_VECTOR_DB, OnlineFSConf.KAFKA_PROPERTIES_FILE_VECTOR_DB_DEFAULT));
                break;
            case RONDB:
                path = Paths.get(System.getenv("ONLINEFS_HOME"), "etc", configuration.getString(OnlineFSConf.KAFKA_PROPERTIES_FILE, OnlineFSConf.KAFKA_PROPERTIES_FILE_DEFAULT));
                break;
            default:
                throw new IllegalArgumentException(String.format("databaseType '%s' is not allowed.", databaseType));
        }
        FileInputStream fileInputStream = new FileInputStream(path.toString());
        try {
            this.kafkaProperties.load(fileInputStream);
            fileInputStream.close();
            this.kafkaProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            this.kafkaProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteBufferDeserializer.class.getName());
            overrideKafkaPropsFromEnv();
            KafkaAdmin.createInstance(this.kafkaProperties);
            NotificationManager.createInstance(configuration, this.kafkaProperties);
            this.topicPattern = Pattern.compile(configuration.getString(OnlineFSConf.KAFKA_CONSUMER_TOPIC_PATTERN, OnlineFSConf.KAFKA_CONSUMER_TOPIC_PATTERN_DEFAULT));
            this.topicList = (List) Arrays.stream(configuration.getString(OnlineFSConf.KAFKA_CONSUMER_TOPIC_LIST, "").split(",")).filter(str -> {
                return !str.isEmpty();
            }).collect(Collectors.toList());
            this.pollTimeoutMs = configuration.getLong(OnlineFSConf.KAFKA_CONSUMER_POLL_TIMEOUT_MS, OnlineFSConf.KAFKA_CONSUMER_POLL_TIMEOUT_MS_DEFAULT).longValue();
        } catch (Throwable th) {
            try {
                fileInputStream.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    public void run() {
        try {
            try {
                org.apache.kafka.clients.consumer.KafkaConsumer kafkaConsumer = new org.apache.kafka.clients.consumer.KafkaConsumer(this.kafkaProperties);
                try {
                    if (this.topicList.isEmpty()) {
                        kafkaConsumer.subscribe(this.topicPattern, new ManageOffsetsOnRebalance(kafkaConsumer, this.databaseType));
                    } else {
                        kafkaConsumer.subscribe(this.topicList, new ManageOffsetsOnRebalance(kafkaConsumer, this.databaseType));
                    }
                    while (true) {
                        ConsumerRecords<K, V> poll = kafkaConsumer.poll(Duration.ofMillis(this.pollTimeoutMs));
                        LoggingUtils.log(LOGGER, Level.DEBUG, "Consumed records", new LogArgument(LogArgumentKey.COUNT, Integer.valueOf(poll.count())), new LogArgument(LogArgumentKey.THREAD_NAME, Thread.currentThread().getName()));
                        this.committer.commitRows(this.onlineFsHandler.filterAndDeserialize(poll, this.databaseType));
                    }
                } finally {
                }
            } catch (Exception e) {
                LoggingUtils.log(LOGGER, Level.ERROR, "KafkaConsumer exception", e, new LogArgument(LogArgumentKey.THREAD_NAME, Thread.currentThread().getName()));
                LoggingUtils.log(LOGGER, Level.INFO, "KafkaConsumer thread closed", new LogArgument(LogArgumentKey.THREAD_NAME, Thread.currentThread().getName()));
            }
        } catch (Throwable th) {
            LoggingUtils.log(LOGGER, Level.INFO, "KafkaConsumer thread closed", new LogArgument(LogArgumentKey.THREAD_NAME, Thread.currentThread().getName()));
            throw th;
        }
    }

    private void overrideKafkaPropsFromEnv() {
        for (String str : this.kafkaProperties.stringPropertyNames()) {
            String str2 = System.getenv(String.format("KAFKA_%s", str.replaceAll("\\.", ShingleFilter.DEFAULT_FILLER_TOKEN).toUpperCase()));
            if (str2 != null) {
                this.kafkaProperties.setProperty(str, str2);
            }
        }
    }
}
