package org.apache.uniffle.common.metrics.prometheus;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.uniffle.common.ReconfigurableRegistry;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.metrics.AbstractMetricReporter;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.uniffle.shaded.com.google.common.collect.Sets;
import org.apache.uniffle.shaded.io.prometheus.client.CollectorRegistry;
import org.apache.uniffle.shaded.io.prometheus.client.exporter.PushGateway;
import org.apache.uniffle.shaded.org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporter.class */
public class PrometheusPushGatewayMetricReporter extends AbstractMetricReporter implements ReconfigurableRegistry.ReconfigureListener {
    private static final Logger LOG = LoggerFactory.getLogger(PrometheusPushGatewayMetricReporter.class);
    static final String PUSHGATEWAY_ADDR = "rss.metrics.prometheus.pushgateway.addr";
    static final String GROUPING_KEY = "rss.metrics.prometheus.pushgateway.groupingkey";
    static final String JOB_NAME = "rss.metrics.prometheus.pushgateway.jobname";
    static final String REPORT_INTEVAL = "rss.metrics.prometheus.pushgateway.report.interval.seconds";
    private ScheduledExecutorService scheduledExecutorService;
    private PushGateway pushGateway;

    public PrometheusPushGatewayMetricReporter(RssConf rssConf, String str) {
        super(rssConf, str);
    }

    @Override // org.apache.uniffle.common.metrics.MetricReporter
    public void start() {
        startInternal();
        ReconfigurableRegistry.register(Sets.newHashSet(REPORT_INTEVAL), this);
    }

    private void startInternal() {
        if (this.pushGateway == null) {
            String string = this.conf.getString(PUSHGATEWAY_ADDR, (String) null);
            if (StringUtils.isEmpty(string)) {
                throw new RssException("rss.metrics.prometheus.pushgateway.addr should not be empty!");
            }
            this.pushGateway = new PushGateway(string);
        }
        String string2 = this.conf.getString(JOB_NAME, (String) null);
        if (StringUtils.isEmpty(string2)) {
            throw new RssException("rss.metrics.prometheus.pushgateway.jobname should not be empty!");
        }
        Map<String, String> parseGroupingKey = parseGroupingKey(this.conf.getString(GROUPING_KEY, ""));
        parseGroupingKey.put("instance", this.instanceId);
        int integer = this.conf.getInteger(REPORT_INTEVAL, 10);
        this.scheduledExecutorService = ThreadUtils.getDaemonSingleThreadScheduledExecutor("PrometheusPushGatewayMetricReporter");
        this.scheduledExecutorService.scheduleWithFixedDelay(() -> {
            Iterator<CollectorRegistry> it = this.registryList.iterator();
            while (it.hasNext()) {
                try {
                    this.pushGateway.pushAdd(it.next(), string2, (Map<String, String>) parseGroupingKey);
                } catch (Throwable th) {
                    LOG.error("Failed to send metrics to push gateway.", th);
                }
            }
        }, 0L, integer, TimeUnit.SECONDS);
    }

    @Override // org.apache.uniffle.common.metrics.MetricReporter
    public void stop() {
        stopInternal();
        ReconfigurableRegistry.unregister(this);
    }

    private void stopInternal() {
        if (this.scheduledExecutorService != null) {
            this.scheduledExecutorService.shutdownNow();
        }
    }

    private void restart() {
        stopInternal();
        startInternal();
    }

    @VisibleForTesting
    void setPushGateway(PushGateway pushGateway) {
        this.pushGateway = pushGateway;
    }

    static Map<String, String> parseGroupingKey(String str) {
        HashMap hashMap = new HashMap();
        if (str.isEmpty()) {
            return hashMap;
        }
        for (String str2 : str.split(";")) {
            int indexOf = str2.indexOf(Constants.EQUAL_SPLIT_CHAR);
            if (indexOf < 0) {
                LOG.warn("Invalid prometheusPushGateway groupingKey:{}, will be ignored", str2);
            } else {
                String substring = str2.substring(0, indexOf);
                String substring2 = str2.substring(indexOf + 1);
                if (StringUtils.isEmpty(substring) || StringUtils.isEmpty(substring2)) {
                    LOG.warn("Invalid groupingKey {labelKey:{}, labelValue:{}} must not be empty", substring, substring2);
                } else {
                    hashMap.put(substring, substring2);
                }
            }
        }
        return hashMap;
    }

    @Override // org.apache.uniffle.common.ReconfigurableRegistry.ReconfigureListener
    public void update(RssConf rssConf, Set<String> set) {
        if (set != null && set.contains(REPORT_INTEVAL)) {
            restart();
        }
    }
}
