package com.logicalclocks.onlinefs.notification;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.logicalclocks.onlinefs.conf.OnlineFSConf;
import com.logicalclocks.onlinefs.handler.Row;
import com.logicalclocks.onlinefs.util.LogArgument;
import com.logicalclocks.onlinefs.util.LogArgumentKey;
import com.logicalclocks.onlinefs.util.LoggingUtils;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.opensearch.common.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

/* loaded from: input_file:com/logicalclocks/onlinefs/notification/NotificationManager.class */
public class NotificationManager {
    private static NotificationManager instance;
    private final Producer<String, String> producer;
    private final ThreadPoolExecutor notificationThreadPool;
    private static final Object $LOCK = new Object[0];
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) NotificationManager.class);
    private static final ObjectMapper objectMapper = new ObjectMapper();

    public static void createInstance(Configuration configuration, Properties properties) {
        synchronized ($LOCK) {
            if (instance == null) {
                instance = new NotificationManager(configuration, properties);
            }
        }
    }

    protected NotificationManager() {
        this.producer = null;
        this.notificationThreadPool = null;
    }

    public NotificationManager(Configuration configuration, Properties properties) {
        this.producer = new KafkaProducer(properties, (Serializer) new StringSerializer(), (Serializer) new StringSerializer());
        int i = configuration.getInt(OnlineFSConf.SERVICE_THREAD_NUMBER, OnlineFSConf.SERVICE_THREADS_DEFAULT.intValue());
        this.notificationThreadPool = new ThreadPoolExecutor(1, i, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue(configuration.getInt(OnlineFSConf.SERVICE_REPORTING_QUEUE_SIZE, OnlineFSConf.SERVICE_REPORTING_QUEUE_SIZE_DEFAULT.intValue()) * i), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public void sendNotifications(List<Row> list) {
        this.notificationThreadPool.submit(() -> {
            String notificationTopicName = ((Row) list.get(0)).getFeatureGroupDto().getNotificationTopicName();
            if (Strings.isNullOrEmpty(notificationTopicName)) {
                return;
            }
            Iterator it = list.iterator();
            while (it.hasNext()) {
                Row row = (Row) it.next();
                try {
                    this.producer.send(new ProducerRecord<>(notificationTopicName, row.getKey(), getValue(row)));
                    LoggingUtils.log(LOGGER, Level.DEBUG, "Sent notification to topic", new LogArgument(LogArgumentKey.TOPIC, notificationTopicName), new LogArgument(LogArgumentKey.PROJECT_ID, row.getProjectId()), new LogArgument(LogArgumentKey.FEATURE_STORE_ID, row.getFeatureGroupDto().getFeaturestoreId()), new LogArgument(LogArgumentKey.FEATURE_GROUP_ID, row.getFeatureGroupId()), new LogArgument(LogArgumentKey.SUBJECT_ID, row.getSubjectId()));
                } catch (Exception e) {
                    LoggingUtils.log(LOGGER, Level.ERROR, "Failed to send notification to topic", e, new LogArgument(LogArgumentKey.TOPIC, notificationTopicName), new LogArgument(LogArgumentKey.PROJECT_ID, row.getProjectId()), new LogArgument(LogArgumentKey.FEATURE_STORE_ID, row.getFeatureGroupDto().getFeaturestoreId()), new LogArgument(LogArgumentKey.FEATURE_GROUP_ID, row.getFeatureGroupId()), new LogArgument(LogArgumentKey.SUBJECT_ID, row.getSubjectId()));
                }
            }
        });
    }

    protected String getValue(Row row) throws ExecutionException, IOException {
        return objectMapper.writeValueAsString(new Notification(row));
    }

    public static void close() {
        synchronized ($LOCK) {
            instance.producer.close();
            instance.notificationThreadPool.shutdown();
        }
    }

    public static NotificationManager getInstance() {
        return instance;
    }
}
