package ai.hopsworks.tutorials.flink.tiktok.pipelines;

import ai.hopsworks.tutorials.flink.tiktok.features.SourceInteractions;
import ai.hopsworks.tutorials.flink.tiktok.simulators.InteractionsGenerator;
import ai.hopsworks.tutorials.flink.tiktok.utils.TikTokInteractions;
import com.logicalclocks.hsfs.flink.FeatureStore;
import com.logicalclocks.hsfs.flink.HopsworksConnection;
import com.logicalclocks.hsfs.flink.StreamFeatureGroup;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.tukaani.xz.common.Util;
import software.amazon.awssdk.http.HttpStatusCode;

/* loaded from: input_file:ai/hopsworks/tutorials/flink/tiktok/pipelines/TikTokStreamFeatureAggregations.class */
/**
 * Flink streaming job that generates synthetic TikTok interaction events, converts each one to a
 * {@link SourceInteractions} record and streams the result into the Hopsworks
 * {@code interactions} stream feature group (version 1).
 *
 * <p>Note: despite the name of {@link #interactionSlidingWindow}, the pipeline built here contains
 * no window operator; it is a plain generate → keyBy → map → insert chain.
 */
public class TikTokStreamFeatureAggregations {
    /** Checkpoint interval in milliseconds. Not referenced inside this class (checkpointing is not enabled here). */
    public static final int CHECKPOINTING_INTERVAL_MS = 5000;
    private static final String JOB_NAME = "TikTok Streaming Pipeline";

    /** Parallelism used both as the environment default and for the generator source. */
    private static final int PARALLELISM = 200;
    /** Upper bound on the generator's emission rate, in records per second. */
    private static final long RECORDS_PER_SECOND = 1_000_000L;
    private static final String FEATURE_GROUP_NAME = "interactions";
    private static final int FEATURE_GROUP_VERSION = 1;

    /** Feature store handle obtained from a default-configured Hopsworks connection. */
    private final FeatureStore featureStore = HopsworksConnection.builder().build().getFeatureStore();

    /**
     * Builds the pipeline and runs it.
     *
     * @throws Exception if building the pipeline or executing the job fails
     */
    public void stream() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);
        // Must be configured before execute(): settings applied after the job has been
        // submitted cannot influence that job.
        env.setRestartStrategy(RestartStrategies.noRestart());

        interactionSlidingWindow(env, 60, 30, RECORDS_PER_SECOND, PARALLELISM);
        env.execute(JOB_NAME);
    }

    /**
     * Wires the generator source to the {@code interactions} stream feature group.
     *
     * @param env               execution environment the pipeline is attached to
     * @param windowSize        currently unused; presumably the intended window length — TODO confirm units
     * @param slideSize         currently unused; presumably the intended window slide — TODO confirm units
     * @param recordsPerSecond  rate limit applied to the generator source
     * @param sourceParallelism parallelism of the generator source operator
     * @throws Exception if the feature group lookup or stream insertion fails
     */
    private void interactionSlidingWindow(StreamExecutionEnvironment env, int windowSize, int slideSize,
                                          long recordsPerSecond, int sourceParallelism) throws Exception {
        StreamFeatureGroup featureGroup =
            this.featureStore.getStreamFeatureGroup(FEATURE_GROUP_NAME, FEATURE_GROUP_VERSION);

        // Long.MAX_VALUE records: effectively an unbounded stream of generated events.
        DataGeneratorSource<TikTokInteractions> generatorSource = new DataGeneratorSource<>(
            new InteractionsGenerator(),
            Long.MAX_VALUE,
            RateLimiterStrategy.perSecond(recordsPerSecond),
            TypeInformation.of(TikTokInteractions.class));

        // No event-time operator follows in this chain, so the source runs without watermarks.
        DataStream<SourceInteractions> features = env
            .fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "Generator Source")
            .setParallelism(sourceParallelism)
            .rescale()
            .rebalance()
            .keyBy(TikTokInteractions::getUserId)
            .map(TikTokStreamFeatureAggregations::toSourceInteractions);

        featureGroup.insertStream(features);
    }

    /**
     * Copies a generated interaction into the feature-group record type.
     *
     * <p>All fields are copied verbatim except the interaction date, which is multiplied by 1000
     * (presumably seconds → milliseconds; verify against the generator and feature group schema).
     *
     * @param interaction generated interaction event
     * @return the corresponding feature record
     */
    private static SourceInteractions toSourceInteractions(TikTokInteractions interaction) {
        SourceInteractions record = new SourceInteractions();
        record.setInteractionId(interaction.getInteractionId());
        record.setUserId(interaction.getUserId());
        record.setVideoId(interaction.getVideoId());
        record.setCategoryId(interaction.getCategoryId());
        record.setInteractionType(interaction.getInteractionType());
        record.setInteractionDate(Long.valueOf(interaction.getInteractionDate().longValue() * 1000));
        record.setInteractionMonth(interaction.getInteractionMonth());
        record.setWatchTime(interaction.getWatchTime());
        return record;
    }
}
