package com.logicalclocks.onlinefs.kafka;

import com.google.common.collect.Sets;
import com.logicalclocks.onlinefs.util.LoggingUtils;
import io.prometheus.client.Gauge;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.common.KafkaFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

/* loaded from: input_file:com/logicalclocks/onlinefs/kafka/KafkaAdmin.class */
public class KafkaAdmin implements Runnable {
    private final AdminClient adminClient;
    private static KafkaAdmin instance;
    private final Gauge consumerGroupMemberCount = Gauge.build().name("onlinefs_members_in_consumer_group").labelNames("consumer_group").help("Total number of consumer group members.").register();
    private final Gauge consumerGroupState = Gauge.build().name("onlinefs_consumer_group_state").labelNames("consumer_group").help("Consumer group state.").register();
    private static final Object $LOCK = new Object[0];
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) KafkaAdmin.class);
    private static final Set<String> consumerGroups = Sets.newConcurrentHashSet();

    /* loaded from: input_file:com/logicalclocks/onlinefs/kafka/KafkaAdmin$ConsumerGroupStateEnum.class */
    private enum ConsumerGroupStateEnum {
        STABLE(0),
        DEAD(1),
        EMPTY(2),
        COMPLETING_REBALANCE(3),
        PREPARING_REBALANCE(4),
        UNKNOWN(5);

        private final int value;

        ConsumerGroupStateEnum(int i) {
            this.value = i;
        }

        public int getValue() {
            return this.value;
        }

        public static ConsumerGroupStateEnum fromString(String str) {
            for (ConsumerGroupStateEnum consumerGroupStateEnum : values()) {
                if (consumerGroupStateEnum.name().equalsIgnoreCase(str)) {
                    return consumerGroupStateEnum;
                }
            }
            throw new IllegalArgumentException("Invalid ConsumerGroupStateEnum: " + str);
        }
    }

    public static void createInstance(Properties properties) {
        synchronized ($LOCK) {
            consumerGroups.add(properties.getProperty("group.id"));
            if (instance == null) {
                instance = new KafkaAdmin(properties);
            }
        }
    }

    public KafkaAdmin(Properties properties) {
        this.adminClient = AdminClient.create(properties);
        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(this, 0L, 15L, TimeUnit.SECONDS);
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            for (Map.Entry<String, KafkaFuture<ConsumerGroupDescription>> entry : this.adminClient.describeConsumerGroups(consumerGroups).describedGroups().entrySet()) {
                ConsumerGroupDescription consumerGroupDescription = entry.getValue().get(3000L, TimeUnit.MILLISECONDS);
                this.consumerGroupMemberCount.labels(entry.getKey()).set(consumerGroupDescription.members().size());
                this.consumerGroupState.labels(entry.getKey()).set(ConsumerGroupStateEnum.fromString(consumerGroupDescription.state().name()).getValue());
            }
        } catch (InterruptedException e) {
            LoggingUtils.log(LOGGER, Level.WARN, "Interrupted admin thread!", e);
            Thread.currentThread().interrupt();
        } catch (ExecutionException | TimeoutException e2) {
            LoggingUtils.log(LOGGER, Level.ERROR, "Failed to describe consumer group", e2);
        }
    }

    public Gauge getConsumerGroupMemberCount() {
        return this.consumerGroupMemberCount;
    }

    public Gauge getConsumerGroupState() {
        return this.consumerGroupState;
    }
}
