package com.logicalclocks.onlinefs.rondb;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.EvictingQueue;
import com.logicalclocks.onlinefs.conf.OnlineFSConf;
import com.logicalclocks.onlinefs.handler.Embedding;
import com.logicalclocks.onlinefs.handler.Row;
import com.logicalclocks.onlinefs.hopsworks.HopsworksCacheKey;
import com.logicalclocks.onlinefs.hopsworks.api.FeatureGroupApi;
import com.logicalclocks.onlinefs.hopsworks.api.FeatureStoreApi;
import com.logicalclocks.onlinefs.hopsworks.api.FeatureViewApi;
import com.logicalclocks.onlinefs.hopsworks.api.SchemaApi;
import com.logicalclocks.onlinefs.hopsworks.dto.FeatureGroupDto;
import com.logicalclocks.onlinefs.hopsworks.dto.FeatureStoreDto;
import com.logicalclocks.onlinefs.hopsworks.dto.FeatureViewDto;
import com.logicalclocks.onlinefs.util.LogArgument;
import com.logicalclocks.onlinefs.util.LogArgumentKey;
import com.logicalclocks.onlinefs.util.LoggingUtils;
import com.logicalclocks.onlinefs.util.OnlineFeatureStoreException;
import com.mysql.clusterj.ClusterJHelper;
import com.mysql.clusterj.Constants;
import com.mysql.clusterj.Session;
import com.mysql.clusterj.SessionFactory;
import io.hops.hopsworks.vectordb.VectorDatabase;
import io.hops.hopsworks.vectordb.VectorDatabaseFactory;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Summary;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javassist.CannotCompileException;
import javassist.NotFoundException;
import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

/* loaded from: input_file:com/logicalclocks/onlinefs/rondb/SharedCommitHelper.class */
public class SharedCommitHelper {
    /** Marker inside an Avro namespace; the text before it is used as the database name. */
    private static final String FEATURE_STORE_SUFFIX = "_featurestore.db";
    /** ClusterJ session factory, created on first call to {@link #getSessionFactory()}. */
    private SessionFactory sessionFactory;
    /** ClusterJ connection properties, filled in by the constructor. */
    private Properties sessionProperties;
    /** OpenSearch connection settings used when the vector database client is created. */
    private Properties vectorDatabaseProperties;
    /** Executor on which {@link #recordProcessingTime(List, Instant)} runs its metric updates. */
    private ThreadPoolExecutor timeThreadPool;
    /** Milliseconds to sleep between attempts in {@link #getSession(String)}. */
    private Integer getSessionRetrySleepMs;
    /** Vector database client, created on first call to {@link #getVectorDatabase()}. */
    private VectorDatabase vectorDatabase;
    /** Feature group metadata (schema, reader, embedding) keyed by a {@link HopsworksCacheKey} string. */
    private LoadingCache<String, HopsworksMetadata> featureGroupMap;
    /** Feature store DTOs keyed by project id. */
    private LoadingCache<Integer, FeatureStoreDto> featureStoreMap;
    /** Feature views of a feature group, keyed by a {@link HopsworksCacheKey} string. */
    private LoadingCache<String, List<FeatureViewDto>> featureViewMap;
    /** Bounded queue of subject ids whose metadata lookup failed with the "404 / 270009" error. */
    private Queue<Integer> subjectBlacklist;
    /** Selects {@code releaseCache} over {@code release} in {@link #releaseSessionObject(Session, Object)}. */
    private boolean useDynamicObjectCache;
    /** Selects {@code closeCache} over {@code close} in {@link #closeSession(Session, String)}. */
    private boolean useSessionCache;
    private static final Logger LOGGER = LoggerFactory.getLogger(SharedCommitHelper.class);
    /** Singleton instance; set once by {@link #initInstance(Configuration)}. */
    private static SharedCommitHelper instance = null;
    /** Generated row classes keyed by {@code "<database>_<table>"}. */
    protected final ConcurrentHashMap<String, Class<?>> dynamicObjects = new ConcurrentHashMap<>();
    /** Avro types that {@link #convertComplexType(Schema, FeatureGroupDto)} replaces with a nullable bytes field. */
    private final List<Schema.Type> complexTypes = new ArrayList<>(Arrays.asList(Schema.Type.ARRAY, Schema.Type.MAP, Schema.Type.RECORD));
    private final Counter successCounter = Counter.build().name("onlinefs_clusterj_success_write_counter").labelNames("feature_group_id").help("Number of ndb success row writes").register();
    private final Counter errorCounter = Counter.build().name("onlinefs_clusterj_error_write_counter").labelNames("feature_group_id").help("Number of ndb error row writes").register();
    /** Incremented in {@link #getSession(String)} and decremented when a session is closed or returned. */
    private final Gauge sessionGauge = Gauge.build().name("onlinefs_clusterj_session").labelNames("db").help("Number of open sessions").register();
    private final Counter rowFeatureGroupCounter = Counter.build().name("onlinefs_row_counter").labelNames("feature_group_id").help("Number of messages seen for each feature group").register();
    private final Counter rowFeatureGroupBytesCounter = Counter.build().name("onlinefs_row_total_bytes").labelNames("feature_group_id").help("Total bytes of row in kafka message.").register();
    private final Summary featureGroupDeserTimeSummary = Summary.build().quantile(0.5d, 0.01d).quantile(0.9d, 0.01d).quantile(0.95d, 0.01d).quantile(0.99d, 0.01d).name("onlinefs_row_deserialization_time").labelNames("feature_group_id").help("Total time to deserialize a row in seconds.").register();
    private final Summary featureGroupProcessingTimeSummary = Summary.build().quantile(0.5d, 0.01d).quantile(0.9d, 0.01d).quantile(0.95d, 0.01d).quantile(0.99d, 0.01d).name("onlinefs_row_processing_time").labelNames("feature_group_id").help("Total time needed from reading row from Kafka until committed to RonDB in seconds.").register();
    private final Conversions.DecimalConversion decimalConversion = new Conversions.DecimalConversion();
    private final TimeConversions.TimestampMicrosConversion timestampConversion = new TimeConversions.TimestampMicrosConversion();
    private final TimeConversions.DateConversion dateConversion = new TimeConversions.DateConversion();
    /** Generator used by {@link #getRowClass(String, String)} to build row classes on demand. */
    protected ClassGenerator classGenerator = new ClassGenerator();

    /* loaded from: input_file:com/logicalclocks/onlinefs/rondb/SharedCommitHelper$HopsworksMetadata.class */
    /**
     * Mutable holder for everything cached per feature group: the Avro schema, the reader built
     * from it, the optional embedding description and the feature group DTO.
     */
    public static class HopsworksMetadata {
        private Schema schema;
        private DatumReader<GenericRecord> datumReader;
        private Embedding embedding;
        private FeatureGroupDto featureGroupDto;

        /** Creates an empty holder; every property starts out {@code null}. */
        public HopsworksMetadata() {
            this(null, null, null, null);
        }

        /**
         * Creates a fully populated holder.
         *
         * @param schema Avro schema of the feature group rows
         * @param datumReader reader used to decode rows with that schema
         * @param embedding embedding description, may be {@code null}
         * @param featureGroupDto feature group metadata
         */
        public HopsworksMetadata(Schema schema, DatumReader<GenericRecord> datumReader, Embedding embedding, FeatureGroupDto featureGroupDto) {
            this.schema = schema;
            this.datumReader = datumReader;
            this.embedding = embedding;
            this.featureGroupDto = featureGroupDto;
        }

        // --- accessors ---

        public Schema getSchema() {
            return schema;
        }

        public DatumReader<GenericRecord> getDatumReader() {
            return datumReader;
        }

        public Embedding getEmbedding() {
            return embedding;
        }

        public FeatureGroupDto getFeatureGroupDto() {
            return featureGroupDto;
        }

        // --- mutators ---

        public void setSchema(Schema schema) {
            this.schema = schema;
        }

        public void setDatumReader(DatumReader<GenericRecord> datumReader) {
            this.datumReader = datumReader;
        }

        public void setEmbedding(Embedding embedding) {
            this.embedding = embedding;
        }

        public void setFeatureGroupDto(FeatureGroupDto featureGroupDto) {
            this.featureGroupDto = featureGroupDto;
        }
    }

    /**
     * Returns the singleton created by {@link #initInstance(Configuration)}.
     *
     * @return the initialized helper
     * @throws NullPointerException if {@code initInstance} has not been called yet
     */
    public static SharedCommitHelper getInstance() {
        SharedCommitHelper current = instance;
        if (current != null) {
            return current;
        }
        throw new NullPointerException("SharedCommitHelper not initialized.");
    }

    /**
     * Creates the singleton from the given configuration and logs that it is ready.
     *
     * @param configuration service configuration handed to the constructor
     * @throws UnsupportedOperationException if the singleton already exists
     */
    public static void initInstance(Configuration configuration) {
        boolean alreadyInitialized = instance != null;
        if (alreadyInitialized) {
            throw new UnsupportedOperationException("SharedCommitHelper can only be initialized once.");
        }
        SharedCommitHelper created = new SharedCommitHelper(configuration);
        instance = created;
        LoggingUtils.log(LOGGER, Level.INFO, "SharedCommitHelper initialized");
    }

    /**
     * Builds the helper from the service configuration.
     *
     * <p>Sets up the metrics thread pool, the ClusterJ session properties, the OpenSearch
     * properties, the three Hopsworks metadata caches and the subject blacklist. The session
     * factory and the vector database client are not created here; {@link #getSessionFactory()}
     * and {@link #getVectorDatabase()} create them on first use.
     *
     * @param configuration service configuration providing the {@link OnlineFSConf} keys read below
     */
    protected SharedCommitHelper(Configuration configuration) {
        int serviceThreads = configuration.getInt(OnlineFSConf.SERVICE_THREAD_NUMBER, OnlineFSConf.SERVICE_THREADS_DEFAULT.intValue());
        int reportingQueueSize = configuration.getInt(OnlineFSConf.SERVICE_REPORTING_QUEUE_SIZE, OnlineFSConf.SERVICE_REPORTING_QUEUE_SIZE_DEFAULT.intValue());
        // Bounded queue plus CallerRunsPolicy: when the queue is full the submitting thread runs
        // the task itself instead of the task being rejected.
        this.timeThreadPool = new ThreadPoolExecutor(
            1,
            serviceThreads,
            30L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(reportingQueueSize * serviceThreads),
            new ThreadPoolExecutor.CallerRunsPolicy());
        this.getSessionRetrySleepMs = configuration.getInt(OnlineFSConf.SERVICE_GET_SESSION_RETRY_SLEEP_MS, OnlineFSConf.SERVICE_GET_SESSION_RETRY_SLEEP_MS_DEFAULT.intValue());
        this.useDynamicObjectCache = configuration.getBoolean(OnlineFSConf.RONDB_USE_DYNAMIC_OBJECT_CACHE, true);
        this.useSessionCache = configuration.getBoolean(OnlineFSConf.RONDB_USE_SESSION_CACHE, true);

        // ClusterJ (RonDB) connection settings.
        this.sessionProperties = new Properties();
        this.sessionProperties.setProperty(Constants.PROPERTY_CONNECTION_POOL_SIZE, configuration.getString(OnlineFSConf.RONDB_CONNECTION_POOL_SIZE, OnlineFSConf.RONDB_CONNECTION_POOL_SIZE_DEFAULT));
        this.sessionProperties.setProperty(Constants.PROPERTY_CLUSTER_MAX_CACHED_SESSIONS, configuration.getString(OnlineFSConf.RONDB_MAX_CACHED_SESSIONS, OnlineFSConf.RONDB_MAX_CACHED_SESSIONS_DEFAULT));
        this.sessionProperties.setProperty(Constants.PROPERTY_CLUSTER_MAX_CACHED_INSTANCES, configuration.getString(OnlineFSConf.RONDB_CONNECTION_MAX_CACHED_INSTANCES, "1024"));
        this.sessionProperties.setProperty(Constants.PROPERTY_CONNECTION_RECONNECT_TIMEOUT, configuration.getString(OnlineFSConf.RONDB_RECONNECT_TIMEOUT, OnlineFSConf.RONDB_RECONNECT_TIMEOUT_DEFAULT));
        this.sessionProperties.setProperty(Constants.PROPERTY_CLUSTER_MAX_TRANSACTIONS, configuration.getString(OnlineFSConf.RONDB_CONNECTION_MAX_TRANSACTIONS, "1024"));
        this.sessionProperties.setProperty(Constants.PROPERTY_CLUSTER_CONNECTSTRING, configuration.getString(OnlineFSConf.RONDB_CONNECTION_STR, OnlineFSConf.RONDB_CONNECTION_STR_DEFAULT));

        // OpenSearch (vector database) connection settings.
        this.vectorDatabaseProperties = new Properties();
        this.vectorDatabaseProperties.setProperty(OnlineFSConf.OPENSEARCH_HOST, configuration.getString(OnlineFSConf.OPENSEARCH_HOST, OnlineFSConf.OPENSEARCH_HOST_DEFAULT));
        this.vectorDatabaseProperties.setProperty(OnlineFSConf.OPENSEARCH_USER_NAME, configuration.getString(OnlineFSConf.OPENSEARCH_USER_NAME, "onlinefs"));
        this.vectorDatabaseProperties.setProperty(OnlineFSConf.OPENSEARCH_PASSWORD, configuration.getString(OnlineFSConf.OPENSEARCH_PASSWORD, "onlinefs"));
        this.vectorDatabaseProperties.setProperty(OnlineFSConf.HOPSWORKS_TRUSTSTORE_LOCATION, configuration.getString(OnlineFSConf.HOPSWORKS_TRUSTSTORE_LOCATION, OnlineFSConf.HOPSWORKS_TRUSTSTORE_LOCATION_DEFAULT));

        // Feature group metadata cache: loads the DTO and Avro schema and derives the reader/embedding.
        this.featureGroupMap = CacheBuilder.newBuilder()
            .maximumSize(configuration.getInt(OnlineFSConf.SERVICE_FEATURE_GROUP_CACHE_MAX_SIZE, OnlineFSConf.SERVICE_FEATURE_GROUP_CACHE_MAX_SIZE_DEFAULT.intValue()))
            .expireAfterWrite(configuration.getInt(OnlineFSConf.SERVICE_FEATURE_GROUP_CACHE_EXPIRE, OnlineFSConf.SERVICE_FEATURE_GROUP_CACHE_EXPIRE_DEFAULT.intValue()), TimeUnit.MINUTES)
            .removalListener(removal -> {
                HopsworksCacheKey removedKey = HopsworksCacheKey.parseKey((String) removal.getKey());
                LoggingUtils.log(LOGGER, Level.DEBUG, "Invalidating cache for schema",
                    new LogArgument(LogArgumentKey.PROJECT_ID, removedKey.getSubKey(HopsworksCacheKey.SubKey.PROJECT_ID)),
                    new LogArgument(LogArgumentKey.FEATURE_STORE_ID, removedKey.getSubKey(HopsworksCacheKey.SubKey.FEATURE_STORE_ID)),
                    new LogArgument(LogArgumentKey.SUBJECT_ID, removedKey.getSubKey(HopsworksCacheKey.SubKey.SUBJECT_ID)),
                    new LogArgument(LogArgumentKey.FEATURE_GROUP_ID, removedKey.getSubKey(HopsworksCacheKey.SubKey.FEATURE_GROUP_ID)));
            })
            .build(new CacheLoader<String, HopsworksMetadata>() {
                @Override
                public HopsworksMetadata load(String cacheKey) throws Exception {
                    HopsworksCacheKey parsedKey = HopsworksCacheKey.parseKey(cacheKey);
                    Integer projectId = Integer.parseInt(parsedKey.getSubKey(HopsworksCacheKey.SubKey.PROJECT_ID));
                    Integer featureStoreId = Integer.parseInt(parsedKey.getSubKey(HopsworksCacheKey.SubKey.FEATURE_STORE_ID));
                    Integer featureGroupId = Integer.parseInt(parsedKey.getSubKey(HopsworksCacheKey.SubKey.FEATURE_GROUP_ID));
                    Integer subjectId = Integer.parseInt(parsedKey.getSubKey(HopsworksCacheKey.SubKey.SUBJECT_ID));
                    HopsworksMetadata metadata = new HopsworksMetadata();
                    try {
                        FeatureGroupDto featureGroup = FeatureGroupApi.getFeatureGroupById(projectId, featureStoreId, featureGroupId);
                        LoggingUtils.log(SharedCommitHelper.LOGGER, Level.DEBUG, "Retrieved feature group",
                            new LogArgument(LogArgumentKey.PROJECT_ID, projectId),
                            new LogArgument(LogArgumentKey.FEATURE_STORE_ID, featureStoreId),
                            new LogArgument(LogArgumentKey.FEATURE_GROUP_ID, featureGroupId));
                        metadata.setFeatureGroupDto(featureGroup);

                        Schema avroSchema = SharedCommitHelper.this.getAvroSchema(projectId, subjectId);
                        Schema storageSchema = SharedCommitHelper.this.convertComplexType(avroSchema, featureGroup);

                        FeatureGroupDto.EmbeddingDto embeddingIndex = featureGroup.getEmbeddingIndex();
                        if (embeddingIndex != null) {
                            Embedding embedding = new Embedding();
                            embedding.setColPrefix(embeddingIndex.getColPrefix());
                            embedding.setIndexName(embeddingIndex.getIndexName());
                            // Embedding features keep the field schema from the original Avro schema,
                            // not from the converted one.
                            embedding.setFeatures((Map) embeddingIndex.getFeatures().stream()
                                .map(featureDto -> new Embedding.EmbeddingFeature(featureDto.getName(), avroSchema.getField(featureDto.getName()).schema()))
                                .collect(Collectors.toMap(feature -> feature.getName(), feature -> feature)));
                            String colPrefix = embedding.getColPrefix() == null ? "" : embedding.getColPrefix();
                            featureGroup.setPrimaryKeyWithPrefix((List) featureGroup.getFeatures().stream()
                                .filter(feature -> feature.getPrimary())
                                .map(feature -> colPrefix + feature.getName())
                                .collect(Collectors.toList()));
                            // Relies on every field being a union whose second branch is the value type.
                            featureGroup.setBytesFieldsWithPrefix((Set) storageSchema.getFields().stream()
                                .filter(field -> field.schema().getTypes().get(1).getType().equals(Schema.Type.BYTES))
                                .map(Schema.Field::name)
                                .map(fieldName -> colPrefix + fieldName)
                                .collect(Collectors.toSet()));
                            metadata.setEmbedding(embedding);
                        }

                        SharedCommitHelper.this.unloadSchema(SharedCommitHelper.getDatabaseName(storageSchema.getNamespace()), storageSchema.getName());
                        metadata.setSchema(storageSchema);
                        metadata.setDatumReader(new GenericDatumReader<>(storageSchema));
                        return metadata;
                    } catch (IOException e) {
                        // getMessage() may be null; guard so the original IOException is rethrown
                        // instead of being replaced by a NullPointerException.
                        String message = e.getMessage();
                        if (message != null && message.contains("error: 404 errorCode=270009")) {
                            // NOTE(review): presumably this error means the subject no longer exists — confirm.
                            SharedCommitHelper.getInstance().getSubjectBlacklist().add(subjectId);
                        }
                        throw e;
                    }
                }
            });

        // Feature store cache keyed by project id.
        this.featureStoreMap = CacheBuilder.newBuilder()
            .maximumSize(configuration.getInt(OnlineFSConf.SERVICE_FEATURE_STORE_CACHE_MAX_SIZE, OnlineFSConf.SERVICE_FEATURE_STORE_CACHE_MAX_SIZE_DEFAULT.intValue()))
            .expireAfterWrite(configuration.getInt(OnlineFSConf.SERVICE_FEATURE_STORE_CACHE_EXPIRE, OnlineFSConf.SERVICE_FEATURE_STORE_CACHE_EXPIRE_DEFAULT.intValue()), TimeUnit.MINUTES)
            .removalListener(removal -> {
                LoggingUtils.log(LOGGER, Level.DEBUG, "Invalidating cache for feature store",
                    new LogArgument(LogArgumentKey.PROJECT_ID, (Integer) removal.getKey()),
                    new LogArgument(LogArgumentKey.FEATURE_STORE_ID, ((FeatureStoreDto) removal.getValue()).getFeaturestoreId()));
            })
            .build(new CacheLoader<Integer, FeatureStoreDto>() {
                @Override
                public FeatureStoreDto load(Integer projectId) throws Exception {
                    return FeatureStoreApi.getProjectFeatureStore(projectId);
                }
            });

        // Feature view cache keyed by the same composite key as the feature group cache.
        this.featureViewMap = CacheBuilder.newBuilder()
            .maximumSize(configuration.getInt(OnlineFSConf.SERVICE_FEATURE_VIEW_CACHE_MAX_SIZE, OnlineFSConf.SERVICE_FEATURE_VIEW_CACHE_MAX_SIZE_DEFAULT.intValue()))
            .expireAfterWrite(configuration.getInt(OnlineFSConf.SERVICE_FEATURE_VIEW_CACHE_EXPIRE, OnlineFSConf.SERVICE_FEATURE_VIEW_CACHE_EXPIRE_DEFAULT.intValue()), TimeUnit.MINUTES)
            .removalListener(removal -> {
                HopsworksCacheKey removedKey = HopsworksCacheKey.parseKey((String) removal.getKey());
                LoggingUtils.log(LOGGER, Level.DEBUG, "Invalidating cache for feature view",
                    new LogArgument(LogArgumentKey.PROJECT_ID, removedKey.getSubKey(HopsworksCacheKey.SubKey.PROJECT_ID)),
                    new LogArgument(LogArgumentKey.FEATURE_STORE_ID, removedKey.getSubKey(HopsworksCacheKey.SubKey.FEATURE_STORE_ID)),
                    new LogArgument(LogArgumentKey.SUBJECT_ID, removedKey.getSubKey(HopsworksCacheKey.SubKey.SUBJECT_ID)),
                    new LogArgument(LogArgumentKey.FEATURE_GROUP_ID, removedKey.getSubKey(HopsworksCacheKey.SubKey.FEATURE_GROUP_ID)));
            })
            .build(new CacheLoader<String, List<FeatureViewDto>>() {
                @Override
                public List<FeatureViewDto> load(String cacheKey) throws Exception {
                    HopsworksCacheKey parsedKey = HopsworksCacheKey.parseKey(cacheKey);
                    Integer projectId = Integer.parseInt(parsedKey.getSubKey(HopsworksCacheKey.SubKey.PROJECT_ID));
                    Integer featureStoreId = Integer.parseInt(parsedKey.getSubKey(HopsworksCacheKey.SubKey.FEATURE_STORE_ID));
                    Integer featureGroupId = Integer.parseInt(parsedKey.getSubKey(HopsworksCacheKey.SubKey.FEATURE_GROUP_ID));
                    return FeatureViewApi.getFeatureViewsByfeatureGroupId(projectId, featureStoreId, featureGroupId).getItems();
                }
            });

        this.subjectBlacklist = EvictingQueue.create(configuration.getInt(OnlineFSConf.SERVICE_MAX_BLACKLIST_SIZE, OnlineFSConf.SERVICE_MAX_BLACKLIST_SIZE_DEFAULT.intValue()));
    }

    /**
     * Returns the ClusterJ session factory, creating it from the session properties on first use.
     *
     * @return the shared session factory
     */
    public synchronized SessionFactory getSessionFactory() {
        if (this.sessionFactory != null) {
            return this.sessionFactory;
        }
        this.sessionFactory = ClusterJHelper.getSessionFactory(this.sessionProperties);
        return this.sessionFactory;
    }

    /**
     * Obtains a session for the given database, retrying until one is available.
     *
     * <p>Any exception raised while getting the session is logged and followed by a sleep of
     * {@code getSessionRetrySleepMs} milliseconds before the next attempt. The session gauge for
     * the database is incremented once a session has been obtained.
     *
     * @param str database name
     * @return an open session
     * @throws InterruptedException if the thread is interrupted while sleeping between attempts
     */
    public Session getSession(String str) throws InterruptedException {
        for (;;) {
            Session acquired;
            try {
                acquired = getSessionFactory().getSession(str);
                getSessionGauge().labels(str).inc();
            } catch (Exception failure) {
                LoggingUtils.log(LOGGER, Level.ERROR, "ClusterJ exception getting session - Wait", failure, new LogArgument(LogArgumentKey.DATABASE_NAME, str));
                Thread.sleep(this.getSessionRetrySleepMs.intValue());
                continue;
            }
            return acquired;
        }
    }

    /**
     * Releases a session, either returning it to the cache or closing it, depending on
     * {@code useSessionCache}.
     *
     * @param session session to release
     * @param str database name the session belongs to
     */
    public void closeSession(Session session, String str) {
        if (!this.useSessionCache) {
            closeSessionWithoutCache(session, str);
            return;
        }
        closeSessionWithCache(session, str);
    }

    /**
     * Returns a still-open session via {@code closeCache()} and decrements the session gauge.
     * The gauge is decremented even when the session was already closed.
     */
    private void closeSessionWithCache(Session session, String str) {
        boolean stillOpen = !session.isClosed();
        if (stillOpen) {
            LoggingUtils.log(LOGGER, Level.DEBUG, "Returning session",
                new LogArgument(LogArgumentKey.SESSION, session),
                new LogArgument(LogArgumentKey.DATABASE_NAME, str));
            session.closeCache();
        }
        getSessionGauge().labels(str).dec();
    }

    /**
     * Closes a still-open session via {@code close()} and decrements the session gauge.
     * The gauge is decremented even when the session was already closed.
     */
    private void closeSessionWithoutCache(Session session, String str) {
        boolean stillOpen = !session.isClosed();
        if (stillOpen) {
            LoggingUtils.log(LOGGER, Level.DEBUG, "Closing session",
                new LogArgument(LogArgumentKey.SESSION, session),
                new LogArgument(LogArgumentKey.DATABASE_NAME, str));
            session.close();
        }
        getSessionGauge().labels(str).dec();
    }

    /**
     * Releases a session-bound object, using {@code releaseCache} when the dynamic object cache
     * is enabled and plain {@code release} otherwise.
     *
     * @param session session the object belongs to
     * @param obj object to release
     */
    public void releaseSessionObject(Session session, Object obj) {
        if (!this.useDynamicObjectCache) {
            session.release(obj);
            return;
        }
        session.releaseCache(obj, obj.getClass());
    }

    /**
     * Persists a Kafka offset row through the given session.
     *
     * <p>The new {@code KafkaOffsets} instance is appended to {@code list} before it is populated,
     * so the caller keeps track of it even if a later step fails.
     *
     * @param session session used to create and save the row
     * @param topicPartition topic and partition the offset belongs to
     * @param l offset value
     * @param str consumer group
     * @param list collection that receives the created row object
     */
    public void saveOffset(Session session, TopicPartition topicPartition, Long l, String str, List<Object> list) {
        KafkaOffsets offsetRow = (KafkaOffsets) session.newInstance(KafkaOffsets.class);
        list.add(offsetRow);

        offsetRow.setConsumerGroup(str);
        offsetRow.setTopic(topicPartition.topic());
        // The partition number is narrowed to short for storage.
        offsetRow.setPartition((short) topicPartition.partition());
        offsetRow.setOffset(l.longValue());

        LoggingUtils.log(LOGGER, Level.DEBUG, "Saving kafka offset",
            new LogArgument(LogArgumentKey.CONSUMER_GROUP_TYPE, offsetRow.getConsumerGroup()),
            new LogArgument(LogArgumentKey.TOPIC, offsetRow.getTopic()),
            new LogArgument(LogArgumentKey.PARTITION, Short.valueOf(offsetRow.getPartition())),
            new LogArgument(LogArgumentKey.OFFSET, Long.valueOf(offsetRow.getOffset())));
        session.savePersistent(offsetRow);
    }

    /**
     * Returns the generated row class for a table, generating it on first request.
     *
     * @param str database name
     * @param str2 table name
     * @return the row class registered under {@code str + "_" + str2}
     * @throws NotFoundException if class generation fails
     * @throws CannotCompileException if class generation fails
     */
    public Class<?> getRowClass(String str, String str2) throws NotFoundException, CannotCompileException {
        String cacheKey = str + "_" + str2;
        Class<?> cached = this.dynamicObjects.get(cacheKey);
        if (cached != null) {
            return cached;
        }
        // Not generated yet: take the slow, synchronized path.
        return getRowClassSynchronized(cacheKey, str2);
    }

    /**
     * Generates and registers the row class for {@code str} unless it has been registered in the
     * meantime. Synchronized so that concurrent callers generate a given class only once.
     */
    private synchronized Class<?> getRowClassSynchronized(String str, String str2) throws NotFoundException, CannotCompileException {
        // Re-check under the lock: another thread may have generated the class already.
        Class<?> existing = this.dynamicObjects.get(str);
        if (existing != null) {
            return existing;
        }
        Class<?> generated = this.classGenerator.generateClass(str, str2);
        this.dynamicObjects.put(str, generated);
        return generated;
    }

    /**
     * Updates the per-feature-group metrics for a batch of rows on the metrics thread pool.
     *
     * <p>For every row the byte and row counters are accumulated per feature group and added to
     * the counters once at the end. The two timing summaries are sampled for every tenth row of
     * the batch only (indices 0, 10, 20, ...).
     *
     * @param list rows of the batch
     * @param instant end of processing, used as the end point for the processing duration
     */
    public void recordProcessingTime(List<Row> list, Instant instant) {
        this.timeThreadPool.submit(() -> {
            Map<String, Long> bytesPerFeatureGroup = new HashMap<>();
            Map<String, Integer> rowsPerFeatureGroup = new HashMap<>();
            int index = 0;
            for (Row row : list) {
                String featureGroupId = row.getFeatureGroupId().toString();
                if (index % 10 == 0) {
                    // NOTE(review): both values are observed in nanoseconds although the metric
                    // help texts say "seconds" — confirm which unit dashboards expect.
                    this.featureGroupProcessingTimeSummary.labels(featureGroupId).observe(Duration.between(row.getStartProcessingTimestamp(), instant).toNanos());
                    this.featureGroupDeserTimeSummary.labels(featureGroupId).observe(row.getDeserializationNanos().longValue());
                }
                bytesPerFeatureGroup.put(featureGroupId, bytesPerFeatureGroup.getOrDefault(featureGroupId, 0L) + row.getByteSize());
                rowsPerFeatureGroup.merge(featureGroupId, 1, Integer::sum);
                index++;
            }
            bytesPerFeatureGroup.forEach((featureGroupId, bytes) -> this.rowFeatureGroupBytesCounter.labels(featureGroupId).inc(bytes.longValue()));
            rowsPerFeatureGroup.forEach((featureGroupId, rows) -> this.rowFeatureGroupCounter.labels(featureGroupId).inc(rows.intValue()));
        });
    }

    /**
     * Fetches the schema string of a subject and parses it into an Avro schema.
     *
     * @param num project id
     * @param num2 subject id
     * @return the parsed Avro schema
     */
    private Schema getAvroSchema(Integer num, Integer num2) throws IOException, OnlineFeatureStoreException {
        String schemaJson = SchemaApi.getSubjectById(num, num2).getSchema();
        LoggingUtils.log(LOGGER, Level.INFO, "Retrieved schema",
            new LogArgument(LogArgumentKey.PROJECT_ID, num),
            new LogArgument(LogArgumentKey.SUBJECT_ID, num2));
        LoggingUtils.log(LOGGER, Level.DEBUG, "Schema string",
            new LogArgument(LogArgumentKey.SUBJECT_ID, num2),
            new LogArgument(LogArgumentKey.SCHEMA, schemaJson));
        Schema.Parser parser = new Schema.Parser();
        return parser.parse(schemaJson);
    }

    /**
     * Extracts the database name from an Avro namespace: everything before the first occurrence
     * of {@code "_featurestore.db"}. For example {@code "demo_featurestore.db"} yields
     * {@code "demo"}.
     *
     * @param str Avro namespace
     * @return the part of {@code str} that precedes the suffix
     * @throws StringIndexOutOfBoundsException if {@code str} does not contain the suffix
     * @throws NullPointerException if {@code str} is {@code null}
     */
    public static String getDatabaseName(String str) {
        int suffixStart = str.indexOf(FEATURE_STORE_SUFFIX);
        if (suffixStart < 0) {
            // Same exception type the bare substring(0, -1) call raised, so existing handlers
            // keep working, but with a message that names the offending namespace.
            throw new StringIndexOutOfBoundsException("Namespace '" + str + "' does not contain the expected suffix '" + FEATURE_STORE_SUFFIX + "'");
        }
        return str.substring(0, suffixStart);
    }

    /**
     * Builds a copy of the schema in which every field whose union contains an array, map or
     * record branch is replaced by a {@code [null, bytes]} union.
     *
     * <p>The original schemas of the replaced fields are stored on the feature group DTO via
     * {@code setComplexFeatures}. Each field schema is read with {@code getTypes()}, i.e. it is
     * treated as a union.
     *
     * @param schema source Avro record schema
     * @param featureGroupDto DTO that receives the map of replaced field schemas
     * @return a new record schema with the same name and namespace and the converted fields
     */
    protected Schema convertComplexType(Schema schema, FeatureGroupDto featureGroupDto) {
        HashMap<String, Schema> complexFeatures = new HashMap<>();
        List<Schema.Field> convertedFields = new ArrayList<>();
        for (Schema.Field field : schema.getFields()) {
            boolean hasComplexBranch = field.schema().getTypes().stream()
                .map(Schema::getType)
                .anyMatch(this.complexTypes::contains);
            if (hasComplexBranch) {
                // Remember the real schema, then store the field as nullable bytes.
                complexFeatures.put(field.name(), field.schema());
                Schema nullableBytes = SchemaBuilder.builder().unionOf().nullType().and().bytesType().endUnion();
                convertedFields.add(new Schema.Field(field, nullableBytes));
            } else {
                convertedFields.add(new Schema.Field(field, field.schema()));
            }
        }
        featureGroupDto.setComplexFeatures(complexFeatures);
        Schema converted = Schema.createRecord(schema.getName(), null, schema.getNamespace(), schema.isError(), convertedFields);
        LoggingUtils.log(LOGGER, Level.DEBUG, "Encoded complex types for schema", new LogArgument(LogArgumentKey.SCHEMA, converted));
        return converted;
    }

    /**
     * Unloads the cached ClusterJ schema of a table's row class. Failures are logged and never
     * propagated to the caller.
     *
     * @param str database name
     * @param str2 table name
     */
    public void unloadSchema(String str, String str2) {
        Session session = null;
        try {
            session = getSession(str);
            // getRowClass generates the row class first if it does not exist yet.
            session.unloadSchema(getRowClass(str, str2));
            closeSession(session, str);
        } catch (InterruptedException e) {
            // Only getSession declares InterruptedException, so no session was obtained here.
            LoggingUtils.log(LOGGER, Level.WARN, "Interrupted unloading of schema!", e, new LogArgument(LogArgumentKey.DATABASE_NAME, str), new LogArgument(LogArgumentKey.TABLE_NAME, str2));
            // Restore the interrupt flag for callers further up the stack.
            Thread.currentThread().interrupt();
        } catch (Exception e2) {
            LoggingUtils.log(LOGGER, Level.ERROR, "Could not unload schema", e2, new LogArgument(LogArgumentKey.DATABASE_NAME, str), new LogArgument(LogArgumentKey.TABLE_NAME, str2));
            if (session != null) {
                // After a failure the session is closed outright, regardless of useSessionCache.
                closeSessionWithoutCache(session, str);
            }
        }
    }

    /**
     * Returns the vector database client, creating the OpenSearch client from the stored
     * properties on first use.
     *
     * @return the shared vector database client
     * @throws IOException declared by the signature; presumably raised when the client cannot be created
     */
    public synchronized VectorDatabase getVectorDatabase() throws IOException {
        if (this.vectorDatabase != null) {
            return this.vectorDatabase;
        }
        String host = this.vectorDatabaseProperties.getProperty(OnlineFSConf.OPENSEARCH_HOST);
        String userName = this.vectorDatabaseProperties.getProperty(OnlineFSConf.OPENSEARCH_USER_NAME);
        String password = this.vectorDatabaseProperties.getProperty(OnlineFSConf.OPENSEARCH_PASSWORD);
        String trustStoreLocation = this.vectorDatabaseProperties.getProperty(OnlineFSConf.HOPSWORKS_TRUSTSTORE_LOCATION);
        this.vectorDatabase = VectorDatabaseFactory.getOpensearchDatabase(host, userName, password, trustStoreLocation, null);
        return this.vectorDatabase;
    }

    /**
     * Closes the vector database client if one has been created. The reference is kept, so
     * later calls to {@link #getVectorDatabase()} return the same, closed, client.
     */
    public synchronized void closeVectorDatabase() {
        if (this.vectorDatabase == null) {
            return;
        }
        this.vectorDatabase.close();
    }

    /** @return the feature group metadata cache */
    public LoadingCache<String, HopsworksMetadata> getFeatureGroupMap() {
        return featureGroupMap;
    }

    /** @return the feature store cache, keyed by project id */
    public LoadingCache<Integer, FeatureStoreDto> getFeatureStoreMap() {
        return featureStoreMap;
    }

    /** @return the feature view cache */
    public LoadingCache<String, List<FeatureViewDto>> getFeatureViewMap() {
        return featureViewMap;
    }

    /** @return the queue of blacklisted subject ids (the live instance, not a copy) */
    public Queue<Integer> getSubjectBlacklist() {
        return subjectBlacklist;
    }

    /** @return counter of successful row writes, labelled by feature group id */
    public Counter getSuccessCounter() {
        return successCounter;
    }

    /** @return counter of failed row writes, labelled by feature group id */
    public Counter getErrorCounter() {
        return errorCounter;
    }

    /** @return gauge of open sessions, labelled by database */
    public Gauge getSessionGauge() {
        return sessionGauge;
    }

    /** @return counter of rows seen, labelled by feature group id */
    public Counter getRowFeatureGroupCounter() {
        return rowFeatureGroupCounter;
    }

    /** @return counter of row bytes, labelled by feature group id */
    public Counter getRowFeatureGroupBytesCounter() {
        return rowFeatureGroupBytesCounter;
    }

    /** @return summary of row deserialization times, labelled by feature group id */
    public Summary getFeatureGroupDeserTimeSummary() {
        return featureGroupDeserTimeSummary;
    }

    /** @return summary of row processing times, labelled by feature group id */
    public Summary getFeatureGroupProcessingTimeSummary() {
        return featureGroupProcessingTimeSummary;
    }

    /** @return the shared Avro decimal conversion */
    public Conversions.DecimalConversion getDecimalConversion() {
        return decimalConversion;
    }

    /** @return the shared Avro timestamp-micros conversion */
    public TimeConversions.TimestampMicrosConversion getTimestampConversion() {
        return timestampConversion;
    }

    /** @return the shared Avro date conversion */
    public TimeConversions.DateConversion getDateConversion() {
        return dateConversion;
    }
}
