package com.logicalclocks.onlinefs.handler;

import com.logicalclocks.onlinefs.DatabaseType;
import com.logicalclocks.onlinefs.hopsworks.HopsworksCacheKey;
import com.logicalclocks.onlinefs.rondb.SharedCommitHelper;
import com.logicalclocks.onlinefs.util.LogArgument;
import com.logicalclocks.onlinefs.util.LogArgumentKey;
import com.logicalclocks.onlinefs.util.LoggingUtils;
import com.mysql.clusterj.Session;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

/* loaded from: input_file:com/logicalclocks/onlinefs/handler/OnlineFsHandler.class */
public class OnlineFsHandler {
    private final BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(new byte[0], (BinaryDecoder) null);
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) OnlineFsHandler.class);

    public List<Row> filterAndDeserialize(ConsumerRecords<String, ByteBuffer> consumerRecords, DatabaseType databaseType) {
        ArrayList arrayList = new ArrayList();
        HashMap hashMap = new HashMap();
        Iterator<ConsumerRecord<String, ByteBuffer>> it = consumerRecords.iterator();
        while (it.hasNext()) {
            ConsumerRecord<String, ByteBuffer> next = it.next();
            try {
                Row row = getRow(next);
                try {
                    row.setFeatureStoreDto(SharedCommitHelper.getInstance().getFeatureStoreMap().get(row.getProjectId()));
                    if (isBlackList(row)) {
                        setOffset(hashMap, row);
                    } else {
                        try {
                            SharedCommitHelper.HopsworksMetadata hopsworksMetadata = SharedCommitHelper.getInstance().getFeatureGroupMap().get(new HopsworksCacheKey(row).getKey());
                            row.setEmbedding(hopsworksMetadata.getEmbedding());
                            row.setFeatureGroupDto(hopsworksMetadata.getFeatureGroupDto());
                            row.setTableName(hopsworksMetadata.getSchema().getName());
                            try {
                            } catch (IOException e) {
                                LoggingUtils.log(LOGGER, Level.ERROR, "Error deserializing", e, new LogArgument(LogArgumentKey.PROJECT_ID, row.getProjectId()), new LogArgument(LogArgumentKey.FEATURE_STORE_ID, row.getFeatureStoreDto().getFeaturestoreId()), new LogArgument(LogArgumentKey.FEATURE_GROUP_ID, row.getFeatureGroupId()), new LogArgument(LogArgumentKey.SUBJECT_ID, row.getSubjectId()), new LogArgument(LogArgumentKey.RECORD, next));
                            }
                            if (isDatabaseType(row, databaseType).booleanValue()) {
                                row.setGenericRecord(deserialize(hopsworksMetadata, next));
                                LoggingUtils.log(LOGGER, Level.DEBUG, "Deserialized row", new LogArgument(LogArgumentKey.PROJECT_ID, row.getProjectId()), new LogArgument(LogArgumentKey.FEATURE_STORE_ID, row.getFeatureStoreDto().getFeaturestoreId()), new LogArgument(LogArgumentKey.FEATURE_GROUP_ID, row.getFeatureGroupId()), new LogArgument(LogArgumentKey.SUBJECT_ID, row.getSubjectId()), new LogArgument(LogArgumentKey.RECORD, next), new LogArgument(LogArgumentKey.DESERIALIZED_RECORD, row.getGenericRecord()), new LogArgument(LogArgumentKey.DESERIALIZATION_TIME, row.getDeserializationNanos()));
                                arrayList.add(row);
                            } else {
                                setOffset(hashMap, row);
                            }
                        } catch (ExecutionException e2) {
                            LoggingUtils.log(LOGGER, Level.ERROR, "Failed to get hopsworks metadata", e2, new LogArgument(LogArgumentKey.PROJECT_ID, row.getProjectId()), new LogArgument(LogArgumentKey.FEATURE_STORE_ID, row.getFeatureStoreDto().getFeaturestoreId()), new LogArgument(LogArgumentKey.FEATURE_GROUP_ID, row.getFeatureGroupId()), new LogArgument(LogArgumentKey.SUBJECT_ID, row.getSubjectId()), new LogArgument(LogArgumentKey.RECORD, next));
                            setOffset(hashMap, row);
                        }
                    }
                } catch (ExecutionException e3) {
                    LoggingUtils.log(LOGGER, Level.ERROR, "Failed to get feature store for project", e3, new LogArgument(LogArgumentKey.PROJECT_ID, row.getProjectId()), new LogArgument(LogArgumentKey.FEATURE_GROUP_ID, row.getFeatureGroupId()), new LogArgument(LogArgumentKey.SUBJECT_ID, row.getSubjectId()), new LogArgument(LogArgumentKey.RECORD, next));
                }
            } catch (Exception e4) {
                LoggingUtils.log(LOGGER, Level.ERROR, "Failed to create Row from ConsumerRecord", e4, new LogArgument(LogArgumentKey.THREAD_NAME, Thread.currentThread().getName()), new LogArgument(LogArgumentKey.RECORD, next), new LogArgument(LogArgumentKey.TOPIC, next.topic()));
            }
        }
        saveOffset(hashMap, databaseType);
        return arrayList;
    }

    private void setOffset(Map<String, Map<TopicPartition, Long>> map, Row row) {
        String databaseName = row.getDatabaseName();
        Map<TopicPartition, Long> orDefault = map.getOrDefault(databaseName, new HashMap());
        orDefault.put(new TopicPartition(row.getTopic(), row.getPartition()), Long.valueOf(row.getOffset() + 1));
        map.put(databaseName, orDefault);
    }

    private void saveOffset(Map<String, Map<TopicPartition, Long>> map, DatabaseType databaseType) {
        for (Map.Entry<String, Map<TopicPartition, Long>> entry : map.entrySet()) {
            String key = entry.getKey();
            ArrayList arrayList = new ArrayList();
            Session session = null;
            try {
                try {
                    session = SharedCommitHelper.getInstance().getSession(key);
                    for (Map.Entry<TopicPartition, Long> entry2 : entry.getValue().entrySet()) {
                        SharedCommitHelper.getInstance().saveOffset(session, entry2.getKey(), entry2.getValue(), databaseType.getConsumerGroupType(), arrayList);
                    }
                    Iterator it = arrayList.iterator();
                    while (it.hasNext()) {
                        SharedCommitHelper.getInstance().releaseSessionObject(session, it.next());
                    }
                    if (session != null) {
                        SharedCommitHelper.getInstance().closeSession(session, key);
                    }
                } catch (Exception e) {
                    LoggingUtils.log(LOGGER, Level.WARN, "Failed to save offsets", e, new LogArgument(LogArgumentKey.THREAD_NAME, Thread.currentThread().getName()), new LogArgument(LogArgumentKey.DATABASE_TYPE, databaseType), new LogArgument(LogArgumentKey.DATABASE_NAME, key));
                    Iterator it2 = arrayList.iterator();
                    while (it2.hasNext()) {
                        SharedCommitHelper.getInstance().releaseSessionObject(session, it2.next());
                    }
                    if (session != null) {
                        SharedCommitHelper.getInstance().closeSession(session, key);
                    }
                }
            } catch (Throwable th) {
                Iterator it3 = arrayList.iterator();
                while (it3.hasNext()) {
                    SharedCommitHelper.getInstance().releaseSessionObject(session, it3.next());
                }
                if (session != null) {
                    SharedCommitHelper.getInstance().closeSession(session, key);
                }
                throw th;
            }
        }
    }

    private Row getRow(ConsumerRecord<String, ByteBuffer> consumerRecord) {
        Row row = new Row(consumerRecord.serializedValueSize(), consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), consumerRecord.key());
        Headers headers = consumerRecord.headers();
        row.setProjectId(Integer.valueOf(getHeader(headers, "projectId")));
        row.setSubjectId(Integer.valueOf(getHeader(headers, "subjectId")));
        row.setFeatureGroupId(Integer.valueOf(getHeader(headers, "featureGroupId")));
        return row;
    }

    private Boolean isDatabaseType(Row row, DatabaseType databaseType) {
        if (row.getEmbedding() != null) {
            return Boolean.valueOf(databaseType == DatabaseType.VECTORDB);
        }
        return Boolean.valueOf(databaseType == DatabaseType.RONDB);
    }

    private boolean isBlackList(Row row) {
        return SharedCommitHelper.getInstance().getSubjectBlacklist().contains(row.getSubjectId());
    }

    private GenericRecord deserialize(SharedCommitHelper.HopsworksMetadata hopsworksMetadata, ConsumerRecord<String, ByteBuffer> consumerRecord) throws IOException {
        BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(consumerRecord.value().array(), this.binaryDecoder);
        return hopsworksMetadata.getDatumReader().read(new GenericData.Record(hopsworksMetadata.getSchema()), binaryDecoder);
    }

    private static String getHeader(Headers headers, String str) {
        Header lastHeader = headers.lastHeader(str);
        if (lastHeader != null) {
            return new String(lastHeader.value(), StandardCharsets.UTF_8);
        }
        return null;
    }
}
