package io.hops.examples.spark.kafka;

import io.hops.util.Hops;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.avro.package$;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructField$;
import org.apache.spark.sql.types.StructType;
import scala.Array$;
import scala.Predef$;
import scala.Symbol;
import scala.collection.Seq$;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.runtime.SymbolLiteral;

/* compiled from: StructuredStreamingKafka.scala */
/* loaded from: input_file:io/hops/examples/spark/kafka/StructuredStreamingKafka$.class */
public final class StructuredStreamingKafka$ {
    public static StructuredStreamingKafka$ MODULE$;

    static {
        new StructuredStreamingKafka$();
    }

    public void main(String[] strArr) {
        Logger logger = LogManager.getLogger(getClass());
        logger.setLevel(Level.INFO);
        logger.info("Starting Structured Streaming Kafka with Avro Job");
        if (strArr.length != 2) {
            logger.error("Program expects at two arguments, type<producer|consumer> and a topic name");
            System.exit(1);
        } else {
            String str = strArr[0];
            if (str != null ? !str.equals("producer") : "producer" != 0) {
                String str2 = strArr[0];
                if (str2 != null ? !str2.equals("consumer") : "consumer" != 0) {
                    logger.error("Valid types are <producer|consumer>");
                    System.exit(1);
                }
            }
        }
        SparkSession orCreate = SparkSession$.MODULE$.builder().appName(Hops.getJobName()).config(new SparkConf()).enableHiveSupport().getOrCreate();
        orCreate.sparkContext();
        String str3 = strArr[0];
        String str4 = strArr[1];
        String str5 = strArr[0];
        if (str5 != null ? !str5.equals("producer") : "producer" != 0) {
            orCreate.readStream().format("kafka").option("kafka.bootstrap.servers", Hops.getBrokerEndpoints()).option("subscribe", str4).option("startingOffsets", "earliest").option("kafka.security.protocol", "SSL").option("kafka.ssl.truststore.location", Hops.getTrustStore()).option("kafka.ssl.truststore.password", Hops.getKeystorePwd()).option("kafka.ssl.keystore.location", Hops.getKeyStore()).option("kafka.ssl.keystore.password", Hops.getKeystorePwd()).option("kafka.ssl.key.password", Hops.getKeystorePwd()).option("kafka.ssl.endpoint.identification.algorithm", "").load().select(Predef$.MODULE$.wrapRefArray(new Column[]{package$.MODULE$.from_avro(functions$.MODULE$.col("value"), Hops.getSchema(str4)).as((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "logs").dynamicInvoker().invoke() /* invoke-custom */)})).writeStream().format("parquet").option("path", new StringBuilder(29).append("/Projects/").append(Hops.getProjectName()).append("/Resources/").append(str4).append("-parquet").toString()).option("checkpointLocation", new StringBuilder(32).append("/Projects/").append(Hops.getProjectName()).append("/Resources/").append(str4).append("-checkpoint").toString()).start().awaitTermination();
        } else {
            Dataset createDataFrame = orCreate.createDataFrame(orCreate.sparkContext().parallelize(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Row[]{Row$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{new SimpleDateFormat("yyyy/MM/dd HH:mm:ss:SSS").format(new Date()), "INFO", "org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl", "Container container_e01_1494850115055_0016_01_000002 succeeded"})), Row$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{new SimpleDateFormat("yyyy/MM/dd HH:mm:ss:SSS").format(new Date()), "DEBUG", "org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AppLogAggregatorImpl", "Cannot create writer for app application_1494433225517_0008. Skip log upload this time."})), Row$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{new SimpleDateFormat("yyyy/MM/dd HH:mm:ss:SSS").format(new Date()), "WARN", "org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl", "\"Sending out 2 container statuses: \"\n          + \"[ContainerStatus: [ContainerId: container_e01_1494850115055_0016_01_000001, State: RUNNING, \"\n          + \"Diagnostics: , ExitStatus: -1000, ], \"\n          + \"ContainerStatus: [ContainerId: container_e01_1494850115055_0016_01_000002, \"\n+ \"State: RUNNING, Diagnostics: , ExitStatus: -1000, ]]\""})), Row$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{new SimpleDateFormat("yyyy/MM/dd HH:mm:ss:SSS").format(new Date()), "INFO", "org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl", "Container container_e01_1494850115055_0016_01_000002 succeeded"}))})), orCreate.sparkContext().parallelize$default$2(), ClassTag$.MODULE$.apply(Row.class)), new StructType(new StructField[]{new StructField("timestamp", StringType$.MODULE$, false, StructField$.MODULE$.apply$default$4()), new StructField("priority", StringType$.MODULE$, false, StructField$.MODULE$.apply$default$4()), new StructField("logger", StringType$.MODULE$, false, StructField$.MODULE$.apply$default$4()), new StructField("message", StringType$.MODULE$, false, StructField$.MODULE$.apply$default$4())}));
            createDataFrame.select(Predef$.MODULE$.wrapRefArray(new Column[]{package$.MODULE$.to_avro(functions$.MODULE$.struct(Predef$.MODULE$.wrapRefArray((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(createDataFrame.columns())).map(str6 -> {
                return functions$.MODULE$.column(str6);
            }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Column.class)))))).alias("value")})).write().format("kafka").option("kafka.bootstrap.servers", Hops.getBrokerEndpoints()).option("kafka.security.protocol", "SSL").option("kafka.ssl.truststore.location", Hops.getTrustStore()).option("kafka.ssl.truststore.password", Hops.getKeystorePwd()).option("kafka.ssl.keystore.location", Hops.getKeyStore()).option("kafka.ssl.keystore.password", Hops.getKeystorePwd()).option("kafka.ssl.key.password", Hops.getKeystorePwd()).option("kafka.ssl.endpoint.identification.algorithm", "").option("topic", str4).save();
        }
        logger.info("Shutting down spark job");
        orCreate.close();
    }

    private StructuredStreamingKafka$() {
        MODULE$ = this;
    }
}
