package ai.hopsworks.tutorials.flink.tiktok.utils;

import ai.hopsworks.tutorials.flink.tiktok.features.SourceInteractions;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import javax.annotation.Nullable;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
import org.apache.kafka.clients.producer.ProducerRecord;

/* loaded from: input_file:ai/hopsworks/tutorials/flink/tiktok/utils/InteractionsEventKafkaSync.class */
public class InteractionsEventKafkaSync implements KafkaRecordSerializationSchema<SourceInteractions> {
    private static final int EVENT_TIME_LAG_WINDOW_SIZE = 10000;
    private transient DescriptiveStatisticsHistogram eventTimeLag;
    private final String topic;

    public InteractionsEventKafkaSync(String str) {
        this.topic = str;
    }

    public byte[] serializeValue(SourceInteractions sourceInteractions) {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
        new SpecificDatumWriter(SourceInteractions.class).write(sourceInteractions, binaryEncoder);
        binaryEncoder.flush();
        return byteArrayOutputStream.toByteArray();
    }

    public byte[] serializeKey(SourceInteractions sourceInteractions) {
        return String.valueOf(sourceInteractions.getUserId()).getBytes(StandardCharsets.UTF_8);
    }

    public void open(SerializationSchema.InitializationContext initializationContext, KafkaRecordSerializationSchema.KafkaSinkContext kafkaSinkContext) throws Exception {
        super.open(initializationContext, kafkaSinkContext);
        this.eventTimeLag = initializationContext.getMetricGroup().histogram("interactionsEventKafkaSyncLag", new DescriptiveStatisticsHistogram(10000));
    }

    @Nullable
    public ProducerRecord<byte[], byte[]> serialize(SourceInteractions sourceInteractions, KafkaRecordSerializationSchema.KafkaSinkContext kafkaSinkContext, Long l) {
        byte[] serializeKey = serializeKey(sourceInteractions);
        byte[] serializeValue = serializeValue(sourceInteractions);
        this.eventTimeLag.update(Instant.now().toEpochMilli() - sourceInteractions.getInteractionDate().longValue());
        return new ProducerRecord<>(this.topic, (Integer) null, l, serializeKey, serializeValue);
    }
}
