package com.logicalclocks.onlinefs;

import com.logicalclocks.onlinefs.conf.ConfigurationBuilder;
import com.logicalclocks.onlinefs.conf.OnlineFSConf;
import com.logicalclocks.onlinefs.kafka.KafkaConsumer;
import com.logicalclocks.onlinefs.notification.NotificationManager;
import com.logicalclocks.onlinefs.rondb.Committer;
import com.logicalclocks.onlinefs.rondb.SharedCommitHelper;
import com.logicalclocks.onlinefs.util.LoggingUtils;
import io.prometheus.client.exporter.HTTPServer;
import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

/* loaded from: input_file:com/logicalclocks/onlinefs/OnlineFS.class */
public class OnlineFS implements Serializable {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) OnlineFS.class);

    public OnlineFS() throws ConfigurationException, IOException {
        Configuration configuration = ConfigurationBuilder.getConfiguration();
        new HTTPServer(configuration.getInt(OnlineFSConf.METRICS_PORT, OnlineFSConf.METRICS_PORT_DEFAULT.intValue()));
        SharedCommitHelper.initInstance(configuration);
        int i = configuration.getInt("service.ronDbThreadNumber", OnlineFSConf.SERVICE_RON_DB_THREAD_NUMBER_DEFAULT.intValue());
        int i2 = configuration.getInt("service.ronDbThreadNumber", OnlineFSConf.SERVICE_VECTOR_DB_THREAD_NUMBER_DEFAULT.intValue());
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(i);
        ExecutorService newFixedThreadPool2 = Executors.newFixedThreadPool(i2);
        for (int i3 = 0; i3 < i - 1; i3++) {
            newFixedThreadPool.submit(() -> {
                run(configuration, DatabaseType.RONDB);
            });
        }
        for (int i4 = 0; i4 < i2; i4++) {
            newFixedThreadPool2.submit(() -> {
                run(configuration, DatabaseType.VECTORDB);
            });
        }
        run(configuration, DatabaseType.RONDB);
    }

    public void run(Configuration configuration, DatabaseType databaseType) {
        try {
            new KafkaConsumer(configuration, new Committer(configuration, databaseType), databaseType).run();
        } catch (IOException e) {
            LoggingUtils.log(LOGGER, Level.ERROR, "Problem when initialising KafkaConsumer", e);
        }
    }

    public static void main(String[] strArr) {
        LoggingUtils.log(LOGGER, Level.INFO, "Starting online feature store pipe");
        try {
            try {
                new OnlineFS();
                SharedCommitHelper.getInstance().closeVectorDatabase();
                NotificationManager.close();
            } catch (Exception e) {
                LoggingUtils.log(LOGGER, Level.ERROR, "Problem while starting online feature store pipe", e);
                SharedCommitHelper.getInstance().closeVectorDatabase();
                NotificationManager.close();
            }
            System.exit(0);
        } catch (Throwable th) {
            SharedCommitHelper.getInstance().closeVectorDatabase();
            NotificationManager.close();
            throw th;
        }
    }
}
