package se.sics.nstream.hops.kafka;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.LinkedList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import se.sics.kompics.ComponentProxy;
import se.sics.ktoolbox.util.result.Result;
import se.sics.nstream.hops.kafka.avro.AvroMsgProducer;
import se.sics.nstream.hops.kafka.avro.AvroParser;
import se.sics.nstream.storage.durable.events.DStorageWrite;

/* loaded from: input_file:se/sics/nstream/hops/kafka/KafkaProducerMngr.class */
/**
 * Turns the byte payloads of {@link DStorageWrite.Request}s into Avro records and appends them
 * to an {@link AvroMsgProducer}.
 *
 * <p>Request payloads are accumulated in an internal buffer. Every record that
 * {@link AvroParser#blobToAvro} can extract from that buffer is appended to the producer. A request
 * is answered with success once no unread bytes remain in the buffer after its payload was
 * processed; otherwise it is parked and answered as soon as a later write lets the next record be
 * parsed.
 *
 * <p>NOTE(review): this class does no synchronization of its own; presumably it is only driven
 * from its owning component's handlers - confirm.
 */
public class KafkaProducerMngr {
    // NOTE(review): the logger is registered under KafkaComp rather than this class; presumably so
    // that the manager logs under the component's category - confirm before changing.
    private static final Logger LOG = LoggerFactory.getLogger(KafkaComp.class);

    private final ComponentProxy proxy;
    private final AvroMsgProducer producer;
    private final String logPrefix = "";

    /** Number of records handed to the producer so far. */
    public int producedMsgs = 0;

    /** Requests whose payload ended with unread bytes still in the buffer; not yet answered. */
    private final List<DStorageWrite.Request> waitingOnLeftover = new LinkedList<>();

    /** Accumulated bytes that have not yet been parsed into a record. */
    private ByteBuf leftover = Unpooled.buffer();

    /**
     * @param componentProxy proxy used to answer write requests
     * @param kafkaEndpoint endpoint the producer is obtained from
     * @param kafkaResource resource passed to {@code kafkaEndpoint.getProducer}
     */
    public KafkaProducerMngr(ComponentProxy componentProxy, KafkaEndpoint kafkaEndpoint, KafkaResource kafkaResource) {
        this.proxy = componentProxy;
        this.producer = kafkaEndpoint.getProducer(kafkaResource);
    }

    /**
     * Appends the request payload to the internal buffer, produces every record that can now be
     * parsed, and answers the request (and any parked ones) when their bytes have been consumed.
     *
     * @param request write request; its {@code value} bytes are appended to the buffer
     */
    public void write(DStorageWrite.Request request) {
        Schema schema = producer.getSchema();
        leftover.writeBytes(request.value);
        getRidOfLeftovers(schema);
        parseWithLeftovers(schema, request);
    }

    /**
     * If requests are parked, tries to parse one record from the buffer; on success the record is
     * produced and all parked requests are answered with success.
     */
    private void getRidOfLeftovers(Schema schema) {
        if (waitingOnLeftover.isEmpty()) {
            return;
        }
        // A null result is treated as "no record available yet".
        // NOTE(review): presumably the parser leaves the reader index untouched in that case -
        // verify against AvroParser.
        GenericRecord record = AvroParser.blobToAvro(schema, leftover);
        if (record == null) {
            return;
        }
        produce(record);
        for (DStorageWrite.Request pending : waitingOnLeftover) {
            answerSuccess(pending);
        }
        waitingOnLeftover.clear();
    }

    /**
     * Produces every record that can be parsed from the buffer, then either answers
     * {@code request} (no unread bytes remain) or parks it until the remaining bytes are consumed.
     */
    private void parseWithLeftovers(Schema schema, DStorageWrite.Request request) {
        GenericRecord record;
        while ((record = AvroParser.blobToAvro(schema, leftover)) != null) {
            produce(record);
        }

        int remaining = leftover.readableBytes();
        if (remaining <= 0) {
            // Everything was consumed: start over with a fresh buffer and acknowledge the write.
            leftover = Unpooled.buffer();
            answerSuccess(request);
            return;
        }

        LOG.debug("{}leftover:{}", logPrefix, remaining);
        // Move the unread tail into a fresh buffer so already-parsed bytes are dropped.
        ByteBuf compacted = Unpooled.buffer();
        compacted.writeBytes(leftover);
        leftover = compacted;
        waitingOnLeftover.add(request);
    }

    /** Counts the record and appends it to the producer. */
    private void produce(GenericRecord record) {
        producedMsgs++;
        producer.append(record);
    }

    /** Answers {@code request} with a successful result through the proxy. */
    private void answerSuccess(DStorageWrite.Request request) {
        DStorageWrite.Response response = request.respond(Result.success(true));
        LOG.trace("{}answering:{}", logPrefix, response);
        proxy.answer(request, response);
    }

    /** Lifecycle hook; currently does nothing. */
    public void start() {
    }

    /** Lifecycle hook; currently does nothing (parked requests and buffered bytes are left as they are). */
    public void close() {
    }
}
