package se.sics.nstream.hops.kafka;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import se.sics.kompics.ComponentProxy;
import se.sics.kompics.Handler;
import se.sics.kompics.Negative;
import se.sics.nstream.storage.durable.DStoragePort;
import se.sics.nstream.storage.durable.events.DStorageRead;
import se.sics.nstream.storage.durable.events.DStorageWrite;

/* loaded from: input_file:se/sics/nstream/hops/kafka/KafkaProxy.class */
/**
 * Bridges a {@link DStoragePort} to Kafka: write requests arriving on the port are forwarded to a
 * {@link KafkaProducerMngr}, while read requests are rejected because Kafka is used as a
 * write-only sink here.
 *
 * <p>The proxy subscribes its handlers on construction; producers are created lazily on the first
 * write and released by {@link #close()}.
 */
public class KafkaProxy {
    // NOTE(review): the logger is registered under KafkaComp rather than KafkaProxy. Kept as-is so
    // existing logging configuration keeps matching - confirm whether this is intentional.
    private static final Logger LOG = LoggerFactory.getLogger(KafkaComp.class);

    private final ComponentProxy proxy;
    /** Port on which storage read/write requests are received. */
    Negative<DStoragePort> streamPort;
    private final KafkaEndpoint kafkaEndpoint;
    private final KafkaResource kafkaResource;
    private String logPrefix = "";
    /** Producer managers keyed by topic name; populated lazily by the write handler. */
    private final Map<String, KafkaProducerMngr> producers = new HashMap<>();

    /** Rejects every read request: this storage backend only supports writes. */
    Handler<DStorageRead.Request> handleReadRequest = new Handler<DStorageRead.Request>() {
        @Override
        public void handle(DStorageRead.Request request) {
            // UnsupportedOperationException is a RuntimeException, so existing catchers still match.
            throw new UnsupportedOperationException("Kafka does not support reads");
        }
    };

    /**
     * Forwards a write request to the producer manager of the configured topic, creating and
     * starting that manager on first use.
     */
    Handler<DStorageWrite.Request> handleWriteRequest = new Handler<DStorageWrite.Request>() {
        @Override
        public void handle(DStorageWrite.Request request) {
            LOG.trace("{}received:{}", logPrefix, request);
            String topicName = kafkaResource.topicName;
            KafkaProducerMngr producer = producers.get(topicName);
            if (producer == null) {
                producer = new KafkaProducerMngr(proxy, kafkaEndpoint, kafkaResource);
                // Start before registering so a failed start does not leave a dead entry in the map.
                producer.start();
                producers.put(topicName, producer);
            }
            producer.write(request);
            LOG.debug("{}produced:{}", logPrefix, producer.producedMsgs);
        }
    };

    /**
     * Creates the proxy and subscribes the read and write handlers on the storage port.
     *
     * @param componentProxy proxy of the hosting component, used to obtain the port and subscribe
     * @param kafkaEndpoint  endpoint handed to every producer manager created by this proxy
     * @param kafkaResource  resource whose {@code topicName} selects the producer manager
     */
    public KafkaProxy(ComponentProxy componentProxy, KafkaEndpoint kafkaEndpoint, KafkaResource kafkaResource) {
        LOG.info("{}init", this.logPrefix);
        this.proxy = componentProxy;
        this.kafkaEndpoint = kafkaEndpoint;
        this.kafkaResource = kafkaResource;
        this.streamPort = componentProxy.getPositive(DStoragePort.class).getPair();
        componentProxy.subscribe(this.handleReadRequest, this.streamPort);
        componentProxy.subscribe(this.handleWriteRequest, this.streamPort);
    }

    /** No-op: producers are started lazily by the write handler. */
    public void start() {
    }

    /** Closes every producer manager created so far and forgets them. */
    public void close() {
        for (KafkaProducerMngr producer : this.producers.values()) {
            producer.close();
        }
        this.producers.clear();
    }
}
