package org.apache.spark.streaming.kafka010;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.class */
public class JavaDirectKafkaStreamSuite implements Serializable {
    private transient JavaStreamingContext ssc = null;
    private transient KafkaTestUtils kafkaTestUtils = null;

    @Before
    public void setUp() {
        this.kafkaTestUtils = new KafkaTestUtils();
        this.kafkaTestUtils.setup();
        this.ssc = new JavaStreamingContext(new SparkConf().setMaster("local[4]").setAppName(getClass().getSimpleName()), Durations.milliseconds(200L));
    }

    @After
    public void tearDown() {
        if (this.ssc != null) {
            this.ssc.stop();
            this.ssc = null;
        }
        if (this.kafkaTestUtils != null) {
            this.kafkaTestUtils.teardown();
            this.kafkaTestUtils = null;
        }
    }

    @Test
    public void testKafkaStream() throws InterruptedException {
        final AtomicReference atomicReference = new AtomicReference();
        String[] createTopicAndSendData = createTopicAndSendData("topic1");
        String[] createTopicAndSendData2 = createTopicAndSendData("topic2");
        HashSet hashSet = new HashSet();
        hashSet.addAll(Arrays.asList(createTopicAndSendData));
        hashSet.addAll(Arrays.asList(createTopicAndSendData2));
        Random random = new Random();
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", this.kafkaTestUtils.brokerAddress());
        hashMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        hashMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        hashMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        hashMap.put("group.id", "java-test-consumer-" + random.nextInt() + "-" + System.currentTimeMillis());
        JavaDStream map = KafkaUtils.createDirectStream(this.ssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(Arrays.asList("topic1"), hashMap)).transform(new Function<JavaRDD<ConsumerRecord<String, String>>, JavaRDD<ConsumerRecord<String, String>>>() { // from class: org.apache.spark.streaming.kafka010.JavaDirectKafkaStreamSuite.2
            public JavaRDD<ConsumerRecord<String, String>> call(JavaRDD<ConsumerRecord<String, String>> javaRDD) {
                OffsetRange[] offsetRanges = javaRDD.rdd().offsetRanges();
                atomicReference.set(offsetRanges);
                Assert.assertEquals("topic1", offsetRanges[0].topic());
                return javaRDD;
            }
        }).map(new Function<ConsumerRecord<String, String>, String>() { // from class: org.apache.spark.streaming.kafka010.JavaDirectKafkaStreamSuite.1
            public String call(ConsumerRecord<String, String> consumerRecord) {
                return consumerRecord.value();
            }
        });
        HashMap hashMap2 = new HashMap(hashMap);
        hashMap2.put("group.id", "java-test-consumer-" + random.nextInt() + "-" + System.currentTimeMillis());
        JavaDStream union = map.union(KafkaUtils.createDirectStream(this.ssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(Arrays.asList("topic2"), hashMap2)).transform(new Function<JavaRDD<ConsumerRecord<String, String>>, JavaRDD<ConsumerRecord<String, String>>>() { // from class: org.apache.spark.streaming.kafka010.JavaDirectKafkaStreamSuite.4
            public JavaRDD<ConsumerRecord<String, String>> call(JavaRDD<ConsumerRecord<String, String>> javaRDD) {
                OffsetRange[] offsetRanges = javaRDD.rdd().offsetRanges();
                atomicReference.set(offsetRanges);
                Assert.assertEquals("topic2", offsetRanges[0].topic());
                return javaRDD;
            }
        }).map(new Function<ConsumerRecord<String, String>, String>() { // from class: org.apache.spark.streaming.kafka010.JavaDirectKafkaStreamSuite.3
            public String call(ConsumerRecord<String, String> consumerRecord) {
                return consumerRecord.value();
            }
        }));
        final Set synchronizedSet = Collections.synchronizedSet(new HashSet());
        union.foreachRDD(new VoidFunction<JavaRDD<String>>() { // from class: org.apache.spark.streaming.kafka010.JavaDirectKafkaStreamSuite.5
            public void call(JavaRDD<String> javaRDD) {
                synchronizedSet.addAll(javaRDD.collect());
            }
        });
        this.ssc.start();
        long currentTimeMillis = System.currentTimeMillis();
        boolean z = false;
        while (!z && System.currentTimeMillis() - currentTimeMillis < 20000) {
            z = hashSet.size() == synchronizedSet.size();
            Thread.sleep(50L);
        }
        Assert.assertEquals(hashSet, synchronizedSet);
        this.ssc.stop();
    }

    private String[] createTopicAndSendData(String str) {
        String[] strArr = {str + "-1", str + "-2", str + "-3"};
        this.kafkaTestUtils.createTopic(str);
        this.kafkaTestUtils.sendMessages(str, strArr);
        return strArr;
    }
}
