package com.logicalclocks.onlinefs.rondb;

import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.logicalclocks.onlinefs.DatabaseType;
import com.logicalclocks.onlinefs.conf.OnlineFSConf;
import com.logicalclocks.onlinefs.handler.Embedding;
import com.logicalclocks.onlinefs.handler.Row;
import com.logicalclocks.onlinefs.notification.NotificationManager;
import com.logicalclocks.onlinefs.util.LogArgument;
import com.logicalclocks.onlinefs.util.LogArgumentKey;
import com.logicalclocks.onlinefs.util.LoggingUtils;
import com.mysql.clusterj.ClusterJDatastoreException;
import com.mysql.clusterj.DynamicObject;
import com.mysql.clusterj.Session;
import io.hops.hopsworks.vectordb.Index;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

/* loaded from: input_file:com/logicalclocks/onlinefs/rondb/Committer.class */
public class Committer {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) Committer.class);
    private static final String HIDDEN_PK_COLUMN_NAME = "$PK";
    private final Integer maxBatchSize;
    private final Boolean isVectorDb;
    private final String consumerGroupType;

    public Committer(Configuration configuration, DatabaseType databaseType) {
        this.maxBatchSize = Integer.valueOf(configuration.getInt(OnlineFSConf.RONDB_BATCH_SIZE, OnlineFSConf.RONDB_BATCH_SIZE_DEFAULT.intValue()));
        this.isVectorDb = Boolean.valueOf(databaseType.equals(DatabaseType.VECTORDB));
        this.consumerGroupType = databaseType.getConsumerGroupType();
    }

    public void commitRows(List<Row> list) {
        ((Map) list.stream().collect(Collectors.groupingBy((v0) -> {
            return v0.getSubjectId();
        }))).forEach((num, list2) -> {
            try {
                LoggingUtils.log(LOGGER, Level.DEBUG, "Committing rows", new LogArgument(LogArgumentKey.THREAD_NAME, Thread.currentThread().getName()), new LogArgument(LogArgumentKey.SUBJECT_ID, num), new LogArgument(LogArgumentKey.COUNT, Integer.valueOf(list2.size())));
                if (this.isVectorDb.booleanValue()) {
                    commitFeatureGroupRowsVectorDb(list2);
                } else {
                    commitFeatureGroupRowsClusterj(list2);
                }
            } catch (Exception e) {
                LoggingUtils.log(LOGGER, Level.ERROR, "Error committing", e, new LogArgument(LogArgumentKey.THREAD_NAME, Thread.currentThread().getName()), new LogArgument(LogArgumentKey.COUNT, Integer.valueOf(list2.size())));
            }
        });
    }

    protected void commitFeatureGroupRowsVectorDb(List<Row> list) throws Exception {
        Row row = list.get(0);
        String databaseName = row.getDatabaseName();
        int i = 0;
        Session session = null;
        while (i < 3) {
            try {
                session = SharedCommitHelper.getInstance().getSession(databaseName);
                commitFeatureGroupRows(session, list, null);
                LoggingUtils.log(LOGGER, Level.DEBUG, "Committed rows to vector db", new LogArgument(LogArgumentKey.THREAD_NAME, Thread.currentThread().getName()), new LogArgument(LogArgumentKey.SUBJECT_ID, list.get(0)), new LogArgument(LogArgumentKey.COUNT, Integer.valueOf(list.size())));
                if (session != null) {
                    SharedCommitHelper.getInstance().closeSession(session, databaseName);
                    return;
                }
                return;
            } catch (InterruptedException e) {
                try {
                    LoggingUtils.log(LOGGER, Level.WARN, "Interrupted commit of feature group rows!", e, new LogArgument(LogArgumentKey.THREAD_NAME, Thread.currentThread().getName()), new LogArgument(LogArgumentKey.ATTEMPT, Integer.valueOf(i)), new LogArgument(LogArgumentKey.PROJECT_ID, row.getProjectId()), new LogArgument(LogArgumentKey.FEATURE_GROUP_ID, row.getFeatureGroupId()), new LogArgument(LogArgumentKey.SUBJECT_ID, row.getSubjectId()), new LogArgument(LogArgumentKey.DATABASE_NAME, databaseName));
                    Thread.currentThread().interrupt();
                    Thread.sleep(getWaitTimeExp(i));
                    i++;
                    if (session != null) {
                        SharedCommitHelper.getInstance().closeSession(session, databaseName);
                    }
                } catch (Throwable th) {
                    if (session != null) {
                        SharedCommitHelper.getInstance().closeSession(session, databaseName);
                    }
                    throw th;
                }
            }
        }
    }

    protected void commitFeatureGroupRowsClusterj(List<Row> list) throws Exception {
        Row row = list.get(0);
        int i = 0;
        String databaseName = row.getDatabaseName();
        String tableName = row.getTableName();
        Class<?> rowClass = SharedCommitHelper.getInstance().getRowClass(databaseName, tableName);
        Exception exc = null;
        while (i < 3) {
            Session session = null;
            try {
                session = SharedCommitHelper.getInstance().getSession(databaseName);
                commitFeatureGroupRows(session, list, rowClass);
                SharedCommitHelper.getInstance().closeSession(session, databaseName);
                return;
            } catch (InterruptedException e) {
                LoggingUtils.log(LOGGER, Level.WARN, "Interrupted commit of feature group rows!", e, new LogArgument(LogArgumentKey.THREAD_NAME, Thread.currentThread().getName()), new LogArgument(LogArgumentKey.ATTEMPT, Integer.valueOf(i)), new LogArgument(LogArgumentKey.PROJECT_ID, row.getProjectId()), new LogArgument(LogArgumentKey.FEATURE_GROUP_ID, row.getFeatureGroupId()), new LogArgument(LogArgumentKey.SUBJECT_ID, row.getSubjectId()), new LogArgument(LogArgumentKey.DATABASE_NAME, databaseName), new LogArgument(LogArgumentKey.TABLE_NAME, tableName));
                Thread.currentThread().interrupt();
            } catch (Exception e2) {
                LoggingUtils.log(LOGGER, Level.WARN, "Exception committing rows- Reset schema, dynamic object and session", e2, new LogArgument(LogArgumentKey.THREAD_NAME, Thread.currentThread().getName()), new LogArgument(LogArgumentKey.ATTEMPT, Integer.valueOf(i)), new LogArgument(LogArgumentKey.PROJECT_ID, row.getProjectId()), new LogArgument(LogArgumentKey.FEATURE_GROUP_ID, row.getFeatureGroupId()), new LogArgument(LogArgumentKey.SUBJECT_ID, row.getSubjectId()), new LogArgument(LogArgumentKey.DATABASE_NAME, databaseName), new LogArgument(LogArgumentKey.TABLE_NAME, tableName));
                if (session != null) {
                    if (rowClass != null && (e2 instanceof ClusterJDatastoreException)) {
                        session.unloadSchema(rowClass);
                    }
                    SharedCommitHelper.getInstance().closeSession(session, databaseName);
                }
                if ((e2 instanceof ClusterJDatastoreException) && e2.getMessage().contains("No such table existed")) {
                    SharedCommitHelper.getInstance().getSubjectBlacklist().add(row.getSubjectId());
                    throw e2;
                }
                exc = e2;
                Thread.sleep(getWaitTimeExp(i));
                i++;
            }
        }
        throw exc;
    }

    protected void commitFeatureGroupRows(Session session, List<Row> list, Class<?> cls) throws Exception {
        for (List<Row> list2 : Iterables.partition((Iterable) list.stream().sorted(Comparator.comparingLong((v0) -> {
            return v0.getOffset();
        })).collect(Collectors.toList()), this.maxBatchSize.intValue())) {
            if (session.isClosed()) {
                LoggingUtils.log(LOGGER, Level.INFO, "The session is closed, no point in continuing hammering it");
                throw new NullPointerException("Session is already closed");
            }
            ArrayList arrayList = new ArrayList(list2.size());
            try {
                session.currentTransaction().begin();
                HashMap hashMap = new HashMap();
                ArrayList newArrayList = Lists.newArrayList();
                HashMap newHashMap = Maps.newHashMap();
                for (Row row : list2) {
                    hashMap.put(new TopicPartition(row.getTopic(), row.getPartition()), Long.valueOf(row.getOffset() + 1));
                    if (this.isVectorDb.booleanValue()) {
                        Map<String, Object> rowVectorDb = getRowVectorDb(row);
                        String docId = getDocId(row.getFeatureGroupDto().getPrimaryKeyWithPrefix(), rowVectorDb, row.getFeatureGroupDto().getBytesFieldsWithPrefix(), row.getFeatureGroupId());
                        if (docId != null) {
                            newHashMap.put(docId, rowVectorDb);
                        } else {
                            newArrayList.add(rowVectorDb);
                        }
                    } else {
                        LoggingUtils.log(LOGGER, Level.DEBUG, "Committing row", new LogArgument(LogArgumentKey.PROJECT_ID, row.getProjectId()), new LogArgument(LogArgumentKey.FEATURE_GROUP_ID, row.getFeatureGroupId()), new LogArgument(LogArgumentKey.SUBJECT_ID, row.getSubjectId()), new LogArgument(LogArgumentKey.TABLE_NAME, cls.getName()), new LogArgument(LogArgumentKey.THREAD_NAME, Thread.currentThread().getName()));
                        commitRowClusterj(session, arrayList, row, cls);
                    }
                }
                if (this.isVectorDb.booleanValue() && list.size() > 0) {
                    if (newArrayList.size() > 0) {
                        SharedCommitHelper.getInstance().getVectorDatabase().batchWriteMap(new Index(list.get(0).getEmbedding().getIndexName()), newArrayList);
                    }
                    if (newHashMap.size() > 0) {
                        SharedCommitHelper.getInstance().getVectorDatabase().batchWriteMap(new Index(list.get(0).getEmbedding().getIndexName()), newHashMap);
                    }
                }
                for (Map.Entry entry : hashMap.entrySet()) {
                    SharedCommitHelper.getInstance().saveOffset(session, (TopicPartition) entry.getKey(), (Long) entry.getValue(), this.consumerGroupType, arrayList);
                }
                commitAndReport(session, list2);
                Iterator<Object> it = arrayList.iterator();
                while (it.hasNext()) {
                    SharedCommitHelper.getInstance().releaseSessionObject(session, it.next());
                }
                if (session.currentTransaction().isActive()) {
                    session.currentTransaction().rollback();
                }
            } catch (Throwable th) {
                Iterator<Object> it2 = arrayList.iterator();
                while (it2.hasNext()) {
                    SharedCommitHelper.getInstance().releaseSessionObject(session, it2.next());
                }
                if (session.currentTransaction().isActive()) {
                    session.currentTransaction().rollback();
                }
                throw th;
            }
        }
    }

    protected String getDocId(List<String> list, Map<String, Object> map, Set<String> set, Integer num) {
        if (list == null || list.isEmpty()) {
            return null;
        }
        ArrayList newArrayList = Lists.newArrayList(num.toString());
        for (String str : list) {
            Object obj = map.get(str);
            if (obj == null) {
                throw new IllegalStateException(String.format("Primary key '%s' not found from available set [%s].", str, Joiner.on(", ").join(list)));
            }
            if (set == null || !set.contains(str)) {
                newArrayList.add(obj.toString());
            } else {
                newArrayList.add(Base64.getEncoder().encodeToString((byte[]) obj));
            }
        }
        return Joiner.on("|").join(newArrayList);
    }

    protected void commitAndReport(Session session, List<Row> list) {
        String num = list.get(0).getFeatureGroupId().toString();
        try {
            session.currentTransaction().commit();
            SharedCommitHelper.getInstance().recordProcessingTime(list, Instant.now());
            SharedCommitHelper.getInstance().getSuccessCounter().labels(num).inc(list.size());
            NotificationManager.getInstance().sendNotifications(list);
        } catch (RuntimeException e) {
            SharedCommitHelper.getInstance().getErrorCounter().labels(num).inc(list.size());
            throw e;
        }
    }

    protected void commitRowClusterj(Session session, List<Object> list, Row row, Class<?> cls) throws Exception {
        DynamicObject dynamicObject = (DynamicObject) session.newInstance(cls);
        list.add(dynamicObject);
        boolean z = false;
        for (int i = 0; i < dynamicObject.columnMetadata().length; i++) {
            try {
                String name = dynamicObject.columnMetadata()[i].name();
                if (name.equals(HIDDEN_PK_COLUMN_NAME)) {
                    z = true;
                } else {
                    dynamicObject.set(i, getAndCastValue(row, name));
                }
            } catch (Exception e) {
                LoggingUtils.log(LOGGER, Level.ERROR, "Could not commit the row for feature group", new LogArgument(LogArgumentKey.PROJECT_ID, row.getProjectId()), new LogArgument(LogArgumentKey.FEATURE_GROUP_ID, row.getFeatureGroupId()), new LogArgument(LogArgumentKey.SUBJECT_ID, row.getSubjectId()), new LogArgument(LogArgumentKey.TABLE_NAME, cls.getName()), new LogArgument(LogArgumentKey.THREAD_NAME, Thread.currentThread().getName()));
                throw e;
            }
        }
        if (z) {
            session.makePersistent(dynamicObject);
        } else {
            session.savePersistent(dynamicObject);
        }
    }

    protected Map<String, Object> getRowVectorDb(Row row) throws Exception {
        HashMap newHashMap = Maps.newHashMap();
        try {
            Iterator<Schema.Field> it = row.getGenericRecord().getSchema().getFields().iterator();
            while (it.hasNext()) {
                String name = it.next().name();
                Embedding.EmbeddingFeature orDefault = row.getEmbedding().getFeatures().getOrDefault(name, null);
                String colPrefix = row.getEmbedding().getColPrefix();
                String str = colPrefix != null ? colPrefix + name : name;
                if (orDefault != null) {
                    newHashMap.put(str, deserializeEmbedding(row, orDefault));
                } else {
                    newHashMap.put(str, getAndCastValue(row, name));
                }
            }
            return newHashMap;
        } catch (Exception e) {
            LoggingUtils.log(LOGGER, Level.ERROR, "Failed to index to opensearch", new LogArgument(LogArgumentKey.PROJECT_ID, row.getProjectId()), new LogArgument(LogArgumentKey.FEATURE_GROUP_ID, row.getFeatureGroupId()), new LogArgument(LogArgumentKey.SUBJECT_ID, row.getSubjectId()), new LogArgument(LogArgumentKey.THREAD_NAME, Thread.currentThread().getName()));
            throw e;
        }
    }

    protected Object getAndCastValue(Row row, String str) {
        Object obj;
        GenericRecord genericRecord = row.getGenericRecord();
        if (!genericRecord.hasField(str) || (obj = genericRecord.get(str)) == null) {
            return null;
        }
        Schema schema = genericRecord.getSchema().getField(str).schema().getTypes().get(1);
        switch (schema.getType()) {
            case STRING:
                return obj.toString();
            case BOOLEAN:
                return BooleanUtils.toIntegerObject((Boolean) obj);
            case BYTES:
                return schema.getLogicalType() instanceof LogicalTypes.Decimal ? SharedCommitHelper.getInstance().getDecimalConversion().fromBytes((ByteBuffer) obj, schema, schema.getLogicalType()) : ((ByteBuffer) obj).array();
            case LONG:
                return schema.getLogicalType() instanceof LogicalTypes.TimestampMicros ? Timestamp.from(SharedCommitHelper.getInstance().getTimestampConversion().fromLong((Long) obj, schema, schema.getLogicalType())) : obj;
            case INT:
                return schema.getLogicalType() instanceof LogicalTypes.Date ? Date.valueOf(SharedCommitHelper.getInstance().getDateConversion().fromInt((Integer) obj, schema, schema.getLogicalType())) : obj;
            case DOUBLE:
                if (Double.isNaN(((Double) obj).doubleValue())) {
                    return null;
                }
                return obj;
            case FLOAT:
                if (Float.isNaN(((Float) obj).floatValue())) {
                    return null;
                }
                return obj;
            default:
                return obj;
        }
    }

    protected List<Object> deserializeEmbedding(Row row, Embedding.EmbeddingFeature embeddingFeature) throws IOException {
        Object obj;
        GenericRecord genericRecord = row.getGenericRecord();
        if (genericRecord.hasField(embeddingFeature.getName()) && (obj = genericRecord.get(embeddingFeature.getName())) != null) {
            return (List) new GenericDatumReader(embeddingFeature.getSchema()).read(null, DecoderFactory.get().binaryDecoder(new ByteArrayInputStream(((ByteBuffer) obj).array()), (BinaryDecoder) null));
        }
        return null;
    }

    private long getWaitTimeExp(int i) {
        long pow = ((long) Math.pow(2.0d, i)) * 100;
        LoggingUtils.log(LOGGER, Level.DEBUG, "Sleeping", new LogArgument(LogArgumentKey.THREAD_NAME, Thread.currentThread().getName()), new LogArgument(LogArgumentKey.WAIT_TIME, Long.valueOf(pow)));
        return pow;
    }
}
