package com.logicalclocks.onlinefs.kafka;

import com.google.common.collect.Iterables;
import com.logicalclocks.onlinefs.DatabaseType;
import com.logicalclocks.onlinefs.hopsworks.api.ProjectApi;
import com.logicalclocks.onlinefs.hopsworks.dto.ProjectDto;
import com.logicalclocks.onlinefs.rondb.KafkaOffsets;
import com.logicalclocks.onlinefs.rondb.SharedCommitHelper;
import com.logicalclocks.onlinefs.util.LogArgument;
import com.logicalclocks.onlinefs.util.LogArgumentKey;
import com.logicalclocks.onlinefs.util.LoggingUtils;
import com.logicalclocks.onlinefs.util.OnlineFeatureStoreException;
import com.mysql.clusterj.Query;
import com.mysql.clusterj.Session;
import com.mysql.clusterj.query.QueryDomainType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

/* loaded from: input_file:com/logicalclocks/onlinefs/kafka/ManageOffsetsOnRebalance.class */
public class ManageOffsetsOnRebalance implements ConsumerRebalanceListener {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) ManageOffsetsOnRebalance.class);
    protected static final List<ProjectDto> projectDtoList = new ArrayList();
    private static final ReadWriteLock projectListLock = new ReentrantReadWriteLock();
    private final Consumer<?, ?> consumer;
    private final DatabaseType databaseType;

    public ManageOffsetsOnRebalance(Consumer<?, ?> consumer, DatabaseType databaseType) {
        this.consumer = consumer;
        this.databaseType = databaseType;
    }

    @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        try {
            projectListLock.writeLock().lock();
            projectDtoList.clear();
            projectListLock.writeLock().unlock();
        } catch (Throwable th) {
            projectListLock.writeLock().unlock();
            throw th;
        }
    }

    private void setProjectDtoList() {
        try {
            try {
                projectListLock.writeLock().lock();
                if (projectDtoList.isEmpty()) {
                    projectDtoList.addAll(ProjectApi.getProjects());
                }
                projectListLock.writeLock().unlock();
            } catch (OnlineFeatureStoreException | IOException e) {
                LoggingUtils.log(LOGGER, Level.ERROR, "Failed to get all projects", e, new LogArgument(LogArgumentKey.THREAD_NAME, Thread.currentThread().getName()));
                projectListLock.writeLock().unlock();
            }
        } catch (Throwable th) {
            projectListLock.writeLock().unlock();
            throw th;
        }
    }

    /* JADX WARN: Finally extract failed */
    @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        setProjectDtoList();
        Collection<TopicPartition> arrayList = new ArrayList(collection);
        try {
            projectListLock.readLock().lock();
            Iterator<ProjectDto> it = projectDtoList.iterator();
            while (it.hasNext()) {
                String lowerCase = it.next().getName().toLowerCase();
                LoggingUtils.log(LOGGER, Level.DEBUG, "Getting kafka offsets", new LogArgument(LogArgumentKey.THREAD_NAME, Thread.currentThread().getName()), new LogArgument(LogArgumentKey.DATABASE_NAME, lowerCase));
                Session session = null;
                try {
                    try {
                        session = SharedCommitHelper.getInstance().getSession(lowerCase);
                        arrayList = setOffset(session, arrayList);
                        if (session != null) {
                            SharedCommitHelper.getInstance().closeSession(session, lowerCase);
                        }
                    } catch (Throwable th) {
                        if (session != null) {
                            SharedCommitHelper.getInstance().closeSession(session, lowerCase);
                        }
                        throw th;
                    }
                } catch (Exception e) {
                    LoggingUtils.log(LOGGER, Level.ERROR, "Error getting kafka offset", e, new LogArgument(LogArgumentKey.THREAD_NAME, Thread.currentThread().getName()), new LogArgument(LogArgumentKey.DATABASE_NAME, lowerCase));
                    if (session != null) {
                        SharedCommitHelper.getInstance().closeSession(session, lowerCase);
                    }
                }
            }
            projectListLock.readLock().unlock();
        } catch (Throwable th2) {
            projectListLock.readLock().unlock();
            throw th2;
        }
    }

    private Collection<TopicPartition> setOffset(Session session, Collection<TopicPartition> collection) {
        ArrayList arrayList = new ArrayList();
        for (TopicPartition topicPartition : collection) {
            KafkaOffsets offset = getOffset(session, topicPartition);
            if (offset == null) {
                LoggingUtils.log(LOGGER, Level.DEBUG, "No kafka offset was found", new LogArgument(LogArgumentKey.CONSUMER_GROUP_TYPE, getConsumerGroup()), new LogArgument(LogArgumentKey.TOPIC, topicPartition.topic()), new LogArgument(LogArgumentKey.PARTITION, Integer.valueOf(topicPartition.partition())));
                arrayList.add(topicPartition);
            } else {
                this.consumer.seek(topicPartition, offset.getOffset());
            }
        }
        return arrayList;
    }

    protected KafkaOffsets getOffset(Session session, TopicPartition topicPartition) {
        QueryDomainType createQueryDefinition = session.getQueryBuilder().createQueryDefinition(KafkaOffsets.class);
        createQueryDefinition.where(createQueryDefinition.get("topic").equal(createQueryDefinition.param("topic_param")).and(createQueryDefinition.get("partition").equal(createQueryDefinition.param("partition_param"))).and(createQueryDefinition.get("consumerGroup").equal(createQueryDefinition.param("consumer_group_param"))));
        Query createQuery = session.createQuery(createQueryDefinition);
        createQuery.setParameter("topic_param", topicPartition.topic());
        createQuery.setParameter("partition_param", Short.valueOf((short) topicPartition.partition()));
        createQuery.setParameter("consumer_group_param", getConsumerGroup());
        LoggingUtils.log(LOGGER, Level.DEBUG, "Getting offset", new LogArgument(LogArgumentKey.CONSUMER_GROUP_TYPE, getConsumerGroup()), new LogArgument(LogArgumentKey.TOPIC, topicPartition.topic()), new LogArgument(LogArgumentKey.PARTITION, Integer.valueOf(topicPartition.partition())));
        return (KafkaOffsets) Iterables.getFirst(createQuery.getResultList(), null);
    }

    String getConsumerGroup() {
        return this.databaseType.getConsumerGroupType();
    }
}
