package ai.hopsworks.tutorials.flink.tiktok.pipelines;

import ai.hopsworks.tutorials.flink.tiktok.features.SourceInteractions;
import ai.hopsworks.tutorials.flink.tiktok.simulators.InteractionsGenerator;
import ai.hopsworks.tutorials.flink.tiktok.utils.InteractionsEventKafkaSync;
import ai.hopsworks.tutorials.flink.tiktok.utils.TikTokInteractions;
import ai.hopsworks.tutorials.flink.tiktok.utils.Utils;
import com.logicalclocks.hsfs.util.Constants;
import java.lang.invoke.SerializedLambda;
import java.util.Properties;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.tukaani.xz.common.Util;

/* loaded from: input_file:ai/hopsworks/tutorials/flink/tiktok/pipelines/InteractionsEventsGenerator.class */
public class InteractionsEventsGenerator {
    Utils utils = new Utils();

    public void run(String str, Long l, Integer num) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(num.intValue());
        SingleOutputStreamOperator map = executionEnvironment.fromSource(new DataGeneratorSource(new InteractionsGenerator(), Util.VLI_MAX, RateLimiterStrategy.perSecond(l.longValue()), TypeInformation.of(TikTokInteractions.class)), WatermarkStrategy.noWatermarks(), "Generator Source").rescale().rebalance().keyBy((v0) -> {
            return v0.getUserId();
        }).map(new MapFunction<TikTokInteractions, SourceInteractions>() { // from class: ai.hopsworks.tutorials.flink.tiktok.pipelines.InteractionsEventsGenerator.1
            public SourceInteractions map(TikTokInteractions tikTokInteractions) throws Exception {
                SourceInteractions sourceInteractions = new SourceInteractions();
                sourceInteractions.setInteractionId(tikTokInteractions.getInteractionId());
                sourceInteractions.setUserId(tikTokInteractions.getUserId());
                sourceInteractions.setVideoId(tikTokInteractions.getVideoId());
                sourceInteractions.setCategoryId(tikTokInteractions.getCategoryId());
                sourceInteractions.setInteractionType(tikTokInteractions.getInteractionType());
                sourceInteractions.setInteractionDate(tikTokInteractions.getInteractionDate());
                sourceInteractions.setInteractionMonth(tikTokInteractions.getInteractionMonth());
                sourceInteractions.setWatchTime(tikTokInteractions.getWatchTime());
                return sourceInteractions;
            }
        });
        Properties kafkaProperties = this.utils.getKafkaProperties(str);
        map.sinkTo(KafkaSink.builder().setKafkaProducerConfig(kafkaProperties).setBootstrapServers(kafkaProperties.getProperty(Constants.KAFKA_BOOTSTRAP_SERVERS)).setRecordSerializer(new InteractionsEventKafkaSync(str)).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build());
        executionEnvironment.execute();
    }

    public static void main(String[] strArr) throws Exception {
        Options options = new Options();
        options.addOption(Option.builder("topicName").argName("topicName").required(true).hasArg().build());
        options.addOption(Option.builder("recordsPerSecond").argName("recordsPerSecond").required(true).hasArg().build());
        options.addOption(Option.builder("parallelism").argName("parallelism").required(true).hasArg().build());
        CommandLine parse = new DefaultParser().parse(options, strArr);
        new InteractionsEventsGenerator().run(parse.getOptionValue("topicName"), Long.valueOf(Long.parseLong(parse.getOptionValue("recordsPerSecond"))), Integer.valueOf(Integer.parseInt(parse.getOptionValue("parallelism"))));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 859984188:
                if (implMethodName.equals("getUserId")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("ai/hopsworks/tutorials/flink/tiktok/utils/TikTokInteractions") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Long;")) {
                    return (v0) -> {
                        return v0.getUserId();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
