package org.apache.spark.sql.kinesis;

import java.util.Locale;
import org.apache.spark.Partition;
import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.kinesis.shaded.amazonaws.SDKGlobalConfiguration;
import org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.model.GetRecordsResult;
import org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.model.Record;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.util.NextIterator;
import org.apache.spark.util.SerializableConfiguration;
import scala.Array$;
import scala.MatchError;
import scala.Predef$;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.LazyRef;
import scala.runtime.LongRef;
import scala.runtime.ObjectRef;

/* compiled from: KinesisSourceRDD.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005]d!\u0002\u000b\u0016\u0001Uy\u0002\u0002C\u001a\u0001\u0005\u0003\u0005\u000b\u0011B\u001b\t\u0011e\u0002!\u0011!Q\u0001\niB\u0001B\u0013\u0001\u0003\u0002\u0003\u0006Ia\u0012\u0005\t\u0017\u0002\u0011\t\u0011)A\u0005\u0019\"A\u0001\u000b\u0001B\u0001B\u0003%\u0011\u000b\u0003\u0005_\u0001\t\u0005\t\u0015!\u0003`\u0011!\u0011\u0007A!A!\u0002\u00139\u0005\u0002C2\u0001\u0005\u0003\u0005\u000b\u0011\u00023\t\u0011)\u0004!\u0011!Q\u0001\n\u001dC\u0001b\u001b\u0001\u0003\u0002\u0003\u0006I\u0001\u001c\u0005\u0006_\u0002!\t\u0001\u001d\u0005\u0006y\u0002!\t% \u0005\b\u0003\u001f\u0001A\u0011IA\t\u0011\u001d\ty\u0002\u0001C!\u0003C9!\"a\u000e\u0016\u0003\u0003E\t!FA\u001d\r%!R#!A\t\u0002U\tY\u0004\u0003\u0004p!\u0011\u0005\u0011\u0011\n\u0005\n\u0003\u0017\u0002\u0012\u0013!C\u0001\u0003\u001bB\u0011\"a\u0019\u0011\u0003\u0003%I!!\u001a\u0003!-Kg.Z:jgN{WO]2f%\u0012#%B\u0001\f\u0018\u0003\u001dY\u0017N\\3tSNT!\u0001G\r\u0002\u0007M\fHN\u0003\u0002\u001b7\u0005)1\u000f]1sW*\u0011A$H\u0001\u0007CB\f7\r[3\u000b\u0003y\t1a\u001c:h'\t\u0001\u0001\u0005E\u0002\"I\u0019j\u0011A\t\u0006\u0003Ge\t1A\u001d3e\u0013\t)#EA\u0002S\t\u0012\u0003\"aJ\u0019\u000e\u0003!R!!\u000b\u0016\u0002\u000b5|G-\u001a7\u000b\u0005YY#B\u0001\u0017.\u0003!\u0019XM\u001d<jG\u0016\u001c(B\u0001\u00180\u0003%\tW.\u0019>p]\u0006<8OC\u00011\u0003\r\u0019w.\\\u0005\u0003e!\u0012aAU3d_J$\u0017\u0001D:qCJ\\7i\u001c8uKb$8\u0001\u0001\t\u0003m]j\u0011!G\u0005\u0003qe\u0011Ab\u00159be.\u001cuN\u001c;fqR\fQb]8ve\u000e,w\n\u001d;j_:\u001c\b\u0003B\u001eE\u000f\u001es!\u0001\u0010\"\u0011\u0005u\u0002U\"\u0001 
\u000b\u0005}\"\u0014A\u0002\u001fs_>$hHC\u0001B\u0003\u0015\u00198-\u00197b\u0013\t\u0019\u0005)\u0001\u0004Qe\u0016$WMZ\u0005\u0003\u000b\u001a\u00131!T1q\u0015\t\u0019\u0005\t\u0005\u0002<\u0011&\u0011\u0011J\u0012\u0002\u0007'R\u0014\u0018N\\4\u0002\u0015M$(/Z1n\u001d\u0006lW-A\u0004cCR\u001c\u0007.\u00133\u0011\u00055sU\"\u0001!\n\u0005=\u0003%\u0001\u0002'p]\u001e\f!b\u001d5be\u0012LeNZ8t!\r\u0011vK\u0017\b\u0003'Vs!!\u0010+\n\u0003\u0005K!A\u0016!\u0002\u000fA\f7m[1hK&\u0011\u0001,\u0017\u0002\u0004'\u0016\f(B\u0001,A!\tYF,D\u0001\u0016\u0013\tiVCA\u0005TQ\u0006\u0014H-\u00138g_\u0006!2.\u001b8fg&\u001c8I]3egB\u0013xN^5eKJ\u0004\"a\u00171\n\u0005\u0005,\"aE*qCJ\\\u0017iV*De\u0016$WM\u001c;jC2\u001c\u0018aC3oIB|\u0017N\u001c;Ve2\fAaY8oMB\u0011Q\r[\u0007\u0002M*\u0011q-G\u0001\u0005kRLG.\u0003\u0002jM\nI2+\u001a:jC2L'0\u00192mK\u000e{gNZ5hkJ\fG/[8o\u00031iW\r^1eCR\f\u0007+\u0019;i\u000391\u0017-\u001b7P]\u0012\u000bG/\u0019'pgN\u0004\"!T7\n\u00059\u0004%a\u0002\"p_2,\u0017M\\\u0001\u0007y%t\u0017\u000e\u001e 
\u0015\u0017E\u00148\u000f^;wobL(p\u001f\t\u00037\u0002AQaM\u0006A\u0002UBQ!O\u0006A\u0002iBQAS\u0006A\u0002\u001dCQaS\u0006A\u00021CQ\u0001U\u0006A\u0002ECQAX\u0006A\u0002}CQAY\u0006A\u0002\u001dCQaY\u0006A\u0002\u0011DQA[\u0006A\u0002\u001dCqa[\u0006\u0011\u0002\u0003\u0007A.A\u0004qKJ\u001c\u0018n\u001d;\u0015\u0005y|X\"\u0001\u0001\t\u000f\u0005\u0005A\u00021\u0001\u0002\u0004\u0005Aa.Z<MKZ,G\u000e\u0005\u0003\u0002\u0006\u0005-QBAA\u0004\u0015\r\tI!G\u0001\bgR|'/Y4f\u0013\u0011\ti!a\u0002\u0003\u0019M#xN]1hK2+g/\u001a7\u0002\u001b\u001d,G\u000fU1si&$\u0018n\u001c8t+\t\t\u0019\u0002E\u0003N\u0003+\tI\"C\u0002\u0002\u0018\u0001\u0013Q!\u0011:sCf\u00042ANA\u000e\u0013\r\ti\"\u0007\u0002\n!\u0006\u0014H/\u001b;j_:\fqaY8naV$X\r\u0006\u0004\u0002$\u0005%\u0012Q\u0006\t\u0005%\u0006\u0015b%C\u0002\u0002(e\u0013\u0001\"\u0013;fe\u0006$xN\u001d\u0005\b\u0003Wq\u0001\u0019AA\r\u0003\u001d!\b.\u001a)beRDq!a\f\u000f\u0001\u0004\t\t$A\u0004d_:$X\r\u001f;\u0011\u0007Y\n\u0019$C\u0002\u00026e\u00111\u0002V1tW\u000e{g\u000e^3yi\u0006\u00012*\u001b8fg&\u001c8k\\;sG\u0016\u0014F\t\u0012\t\u00037B\u0019R\u0001EA\u001f\u0003\u0007\u00022!TA \u0013\r\t\t\u0005\u0011\u0002\u0007\u0003:L(+\u001a4\u0011\u00075\u000b)%C\u0002\u0002H\u0001\u0013AbU3sS\u0006d\u0017N_1cY\u0016$\"!!\u000f\u00029\u0011bWm]:j]&$He\u001a:fCR,'\u000f\n3fM\u0006,H\u000e\u001e\u00132aU\u0011\u0011q\n\u0016\u0004Y\u0006E3FAA*!\u0011\t)&a\u0018\u000e\u0005\u0005]#\u0002BA-\u00037\n\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0007\u0005u\u0003)\u0001\u0006b]:|G/\u0019;j_:LA!!\u0019\u0002X\t\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3\u0002\u0017I,\u0017\r\u001a*fg>dg/\u001a\u000b\u0003\u0003O\u0002B!!\u001b\u0002t5\u0011\u00111\u000e\u0006\u0005\u0003[\ny'\u0001\u0003mC:<'BAA9\u0003\u0011Q\u0017M^1\n\t\u0005U\u00141\u000e\u0002\u0007\u001f\nTWm\u0019;")
/* loaded from: input_file:org/apache/spark/sql/kinesis/KinesisSourceRDD.class */
public class KinesisSourceRDD extends RDD<Record> {
    // Source options: compute() and the metadata helpers look up "executor.*" keys here,
    // and the map is also handed to KinesisReader and HDFSMetadataCommitter.
    private final Map<String, String> sourceOptions;
    // Stream name passed to the KinesisReader created in compute().
    private final String streamName;
    // Batch id under which updateMetadata$1 commits the shard position.
    private final long batchId;
    // Shards to read; getPartitions() creates one partition per element.
    private final Seq<ShardInfo> shardInfos;
    // Credentials object passed to the KinesisReader created in compute().
    private final SparkAWSCredentials kinesisCredsProvider;
    // Endpoint URL passed to the KinesisReader created in compute().
    private final String endpointUrl;
    // Configuration passed to HDFSMetadataCommitter when the committer is first built.
    private final SerializableConfiguration conf;
    // Fallback committer path used when "executor.metadata.path" is absent from sourceOptions.
    private final String metadataPath;
    // Read by the anonymous iterator in compute() through its $outer reference.
    // NOTE(review): the public visibility and mangled name look like scalac's encoding of a
    // private constructor parameter accessed from an inner class - confirm against the Scala source.
    public final boolean org$apache$spark$sql$kinesis$KinesisSourceRDD$$failOnDataLoss;

    /**
     * Persists this RDD at the given storage level, after logging an error that Kinesis
     * {@code Record} objects are not serializable.
     *
     * <p>The decompiler renamed this method from {@code persist} because it was merged with
     * its bridge method.
     *
     * @param storageLevel storage level forwarded unchanged to {@code super.persist}
     * @return the value returned by {@code super.persist}, cast to {@code KinesisSourceRDD}
     */
    public KinesisSourceRDD m13persist(StorageLevel storageLevel) {
        // The error is only logged; persisting still goes ahead.
        logError(() -> "Kinesis Record is not serializable. Use .map to extract fields before calling .persist or .window");
        return (KinesisSourceRDD) super.persist(storageLevel);
    }

    /**
     * Builds the partitions of this RDD: one {@code KinesisSourceRDDPartition} per entry of
     * {@code shardInfos}, using the entry's position in the sequence as the partition index.
     *
     * @return array of partitions in the same order as {@code shardInfos}
     */
    public Partition[] getPartitions() {
        // Pair every shard with its index in the sequence.
        TraversableLike indexedShards = (TraversableLike) this.shardInfos.zipWithIndex(Seq$.MODULE$.canBuildFrom());
        TraversableOnce partitions = (TraversableOnce) indexedShards.map(indexedShard -> {
            if (indexedShard != null) {
                return new KinesisSourceRDDPartition(indexedShard._2$mcI$sp(), (ShardInfo) indexedShard._1());
            }
            // Mirrors the compiled pattern match: a null pair is a match failure.
            throw new MatchError(indexedShard);
        }, Seq$.MODULE$.canBuildFrom());
        return (Partition[]) partitions.toArray(ClassTag$.MODULE$.apply(Partition.class));
    }

    /**
     * Creates the record iterator for one partition (one Kinesis shard).
     *
     * <p>The method builds a {@code KinesisReader}, reads the executor tuning options from
     * {@code sourceOptions}, returns a {@code NextIterator} that pulls records from the shard,
     * and registers a task completion listener that logs and calls {@code updateMetadata$1}
     * (via {@code $anonfun$compute$10}).
     *
     * @param partition   partition to read; cast to {@code KinesisSourceRDDPartition}
     * @param taskContext task context on which the completion listener is registered
     * @return iterator over the records fetched for this partition's shard
     */
    public Iterator<Record> compute(Partition partition, TaskContext taskContext) {
        // Holder for the metadata committer, which is only built on first use.
        LazyRef lazyRef = new LazyRef();
        final KinesisSourceRDDPartition kinesisSourceRDDPartition = (KinesisSourceRDDPartition) partition;
        String shardId = kinesisSourceRDDPartition.shardInfo().shardId();
        final KinesisReader kinesisReader = new KinesisReader(this.sourceOptions, this.streamName, this.kinesisCredsProvider, this.endpointUrl);
        // Option keys are lower-cased with Locale.ROOT before lookup; each has a string default.
        // j: maximum time in ms this task keeps fetching (default "1000").
        final long j = new StringOps(Predef$.MODULE$.augmentString((String) this.sourceOptions.getOrElse("executor.maxFetchTimeInMs".toLowerCase(Locale.ROOT), () -> {
            return "1000";
        }))).toLong();
        // j2: record count after which no further batches are fetched (default "100000").
        final long j2 = new StringOps(Predef$.MODULE$.augmentString((String) this.sourceOptions.getOrElse("executor.maxFetchRecordsPerShard".toLowerCase(Locale.ROOT), () -> {
            return "100000";
        }))).toLong();
        // i: limit passed to each getKinesisRecords call (default "10000").
        final int i = new StringOps(Predef$.MODULE$.augmentString((String) this.sourceOptions.getOrElse("executor.maxRecordPerRead".toLowerCase(Locale.ROOT), () -> {
            return "10000";
        }))).toInt();
        // z: whether to sleep between consecutive reads (default "false").
        final boolean z = new StringOps(Predef$.MODULE$.augmentString((String) this.sourceOptions.getOrElse("executor.addIdleTimeBetweenReads".toLowerCase(Locale.ROOT), () -> {
            return "false";
        }))).toBoolean();
        // j3: minimum spacing in ms between reads when z is true (default "1000").
        final long j3 = new StringOps(Predef$.MODULE$.augmentString((String) this.sourceOptions.getOrElse("executor.idleTimeBetweenReadsInMs".toLowerCase(Locale.ROOT), () -> {
            return "1000";
        }))).toLong();
        // Start time against which the fetch-time budget (j) is measured.
        final long currentTimeMillis = System.currentTimeMillis();
        // Mutable cells shared between the iterator and the completion listener:
        // create = time of the last read, create2 = last sequence number returned,
        // create3 = number of records returned, create4 = whether the shard was found closed.
        final LongRef create = LongRef.create(0L);
        // NOTE(review): updateMetadata$1 treats an empty string here as "no records read", so this
        // constant is presumably the decompiler's rendering of an empty-string literal - confirm.
        final ObjectRef create2 = ObjectRef.create(SDKGlobalConfiguration.DEFAULT_AWS_CSM_CLIENT_ID);
        final LongRef create3 = LongRef.create(0L);
        final BooleanRef create4 = BooleanRef.create(false);
        // NOTE(review): the constructor arguments below are the captured variables as emitted by the
        // decompiler; they are assigned to the synthetic fields in the initializer block at the end.
        NextIterator<Record> nextIterator = new NextIterator<Record>(this, kinesisReader, kinesisSourceRDDPartition, currentTimeMillis, j, z, create, j3, i, create4, create3, j2, create2) { // from class: org.apache.spark.sql.kinesis.KinesisSourceRDD$$anon$1
            // Current shard iterator token; null until first requested, and null again once
            // getNextShardIterator() returns null.
            private String _shardIterator;
            // Batch of records from the most recent fetch.
            private Record[] fetchedRecords;
            // Index of the next record to return from fetchedRecords.
            private int currentIndex;
            // False once no further fetches should be attempted.
            private boolean fetchNext;
            private final /* synthetic */ KinesisSourceRDD $outer;
            private final KinesisReader kinesisReader$1;
            private final KinesisSourceRDDPartition sourcePartition$1;
            private final long startTimestamp$1;
            private final long maxFetchTimeInMs$1;
            private final boolean enableIdleTimeBetweenReads$1;
            private final LongRef lastReadTimeMs$1;
            private final long idleTimeBetweenReads$1;
            private final int recordPerRequest$1;
            private final BooleanRef hasShardClosed$1;
            private final LongRef numRecordRead$1;
            private final long maxRecordsPerShard$1;
            private final ObjectRef lastReadSequenceNumber$1;

            public String _shardIterator() {
                return this._shardIterator;
            }

            public void _shardIterator_$eq(String str) {
                this._shardIterator = str;
            }

            public Record[] fetchedRecords() {
                return this.fetchedRecords;
            }

            public void fetchedRecords_$eq(Record[] recordArr) {
                this.fetchedRecords = recordArr;
            }

            public int currentIndex() {
                return this.currentIndex;
            }

            public void currentIndex_$eq(int i2) {
                this.currentIndex = i2;
            }

            public boolean fetchNext() {
                return this.fetchNext;
            }

            public void fetchNext_$eq(boolean z2) {
                this.fetchNext = z2;
            }

            /**
             * Returns the current shard iterator, requesting one from the reader when none is held.
             *
             * @return the shard iterator; null only when failOnDataLoss is false and the reader
             *         returned null for this shard
             */
            public String getShardIterator() {
                if (_shardIterator() == null) {
                    _shardIterator_$eq(this.kinesisReader$1.getShardIterator(this.sourcePartition$1.shardInfo().shardId(), this.sourcePartition$1.shardInfo().iteratorType(), this.sourcePartition$1.shardInfo().iteratorPosition(), this.$outer.org$apache$spark$sql$kinesis$KinesisSourceRDD$$failOnDataLoss));
                    // With failOnDataLoss disabled a missing iterator is only warned about and
                    // null is returned to the caller.
                    if (!this.$outer.org$apache$spark$sql$kinesis$KinesisSourceRDD$$failOnDataLoss && _shardIterator() == null) {
                        this.$outer.logWarning(() -> {
                            return new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(356).append("\n                 | Some data may have been lost because ").append(this.sourcePartition$1.shardInfo().shardId()).append("\n                 | is not available in Kinesis any more. The shard has\n                 | we have processed all records in it. We would ignore th\n                 | processing. If you want your streaming query to\n                 |  set the source option \"failOnDataLoss\" to \"true\"\n                ").toString())).stripMargin();
                        });
                        return _shardIterator();
                    }
                }
                // In every other case an iterator is required.
                Predef$.MODULE$.assert(_shardIterator() != null);
                return _shardIterator();
            }

            /** Returns true while the time elapsed since the start timestamp is below maxFetchTimeInMs. */
            public boolean canFetchMoreRecords(long j4) {
                return j4 - this.startTimestamp$1 < this.maxFetchTimeInMs$1;
            }

            /**
             * Sleeps for the remainder of idleTimeBetweenReads, measured from the last read, when
             * idle time is enabled and at least one read has happened.
             *
             * @param j4 current time in ms, as sampled by the caller
             */
            public void addDelayInFetchingRecords(long j4) {
                // lastReadTimeMs stays 0 until the first read, so the first fetch is never delayed.
                if (!this.enableIdleTimeBetweenReads$1 || this.lastReadTimeMs$1.elem <= 0) {
                    return;
                }
                long j5 = this.idleTimeBetweenReads$1 - (j4 - this.lastReadTimeMs$1.elem);
                if (j5 > 0) {
                    this.$outer.logInfo(() -> {
                        return new StringBuilder(15).append("Sleeping for ").append(j5).append("ms").toString();
                    });
                    Thread.sleep(j5);
                }
            }

            /**
             * Returns the next record, fetching a new batch when the current one is used up.
             * When no record is available it marks the iterator finished and returns null.
             */
            /* renamed from: getNext, reason: merged with bridge method [inline-methods] */
            public Record m15getNext() {
                // Refill only when the current batch is empty or fully consumed.
                if (fetchedRecords().length == 0 || currentIndex() >= fetchedRecords().length) {
                    fetchedRecords_$eq((Record[]) Array$.MODULE$.empty(ClassTag$.MODULE$.apply(Record.class)));
                    currentIndex_$eq(0);
                    // Keep requesting until a non-empty batch arrives or fetching is switched off.
                    while (fetchedRecords().length == 0 && fetchNext()) {
                        long currentTimeMillis2 = System.currentTimeMillis();
                        // Stop when the time budget is spent or no shard iterator is available.
                        if (!canFetchMoreRecords(currentTimeMillis2) || getShardIterator() == null) {
                            fetchNext_$eq(false);
                        } else {
                            addDelayInFetchingRecords(currentTimeMillis2);
                            GetRecordsResult kinesisRecords = this.kinesisReader$1.getKinesisRecords(_shardIterator(), this.recordPerRequest$1);
                            // NOTE(review): the meaning of the null second argument to
                            // deaggregateRecords is not visible in this file - confirm in KinesisReader.
                            fetchedRecords_$eq((Record[]) ((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(this.kinesisReader$1.deaggregateRecords(kinesisRecords.getRecords(), null)).asScala()).toArray(ClassTag$.MODULE$.apply(Record.class)));
                            _shardIterator_$eq(kinesisRecords.getNextShardIterator());
                            this.lastReadTimeMs$1.elem = System.currentTimeMillis();
                            this.$outer.logDebug(() -> {
                                return new StringBuilder(21).append("Milli secs behind is ").append(kinesisRecords.getMillisBehindLatest().longValue()).toString();
                            });
                            // A null next iterator is recorded as a closed shard and ends fetching.
                            if (_shardIterator() == null) {
                                this.hasShardClosed$1.elem = true;
                                fetchNext_$eq(false);
                            }
                            // A reported lag of 0 ms also ends fetching for this task.
                            if (kinesisRecords.getMillisBehindLatest().longValue() == 0) {
                                fetchNext_$eq(false);
                            }
                        }
                    }
                }
                if (fetchedRecords().length == 0) {
                    finished_$eq(true);
                    return null;
                }
                Record record = fetchedRecords()[currentIndex()];
                currentIndex_$eq(currentIndex() + 1);
                this.numRecordRead$1.elem++;
                // Past the per-shard cap no new batch is fetched; records already fetched in the
                // current batch are still returned by later calls.
                if (this.numRecordRead$1.elem > this.maxRecordsPerShard$1) {
                    fetchNext_$eq(false);
                }
                // Remembered so the completion listener can commit the position after this record.
                this.lastReadSequenceNumber$1.elem = record.getSequenceNumber();
                return record;
            }

            /** Closes the underlying KinesisReader. */
            public synchronized void close() {
                this.kinesisReader$1.close();
            }

            // Initializer emitted by the decompiler: copies the captured variables into the
            // synthetic fields and sets the initial iterator state.
            {
                // NOTE(review): this null check appears to be the decompiler's rendering of the
                // compiler-generated outer-instance check - confirm.
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                this.kinesisReader$1 = kinesisReader;
                this.sourcePartition$1 = kinesisSourceRDDPartition;
                this.startTimestamp$1 = currentTimeMillis;
                this.maxFetchTimeInMs$1 = j;
                this.enableIdleTimeBetweenReads$1 = z;
                this.lastReadTimeMs$1 = create;
                this.idleTimeBetweenReads$1 = j3;
                this.recordPerRequest$1 = i;
                this.hasShardClosed$1 = create4;
                this.numRecordRead$1 = create3;
                this.maxRecordsPerShard$1 = j2;
                this.lastReadSequenceNumber$1 = create2;
                this._shardIterator = null;
                this.fetchedRecords = (Record[]) Array$.MODULE$.empty(ClassTag$.MODULE$.apply(Record.class));
                this.currentIndex = 0;
                this.fetchNext = true;
            }
        };
        // When the listener fires it logs "Task Completed" and commits the shard position
        // through updateMetadata$1.
        taskContext.addTaskCompletionListener(taskContext2 -> {
            $anonfun$compute$10(this, create4, kinesisSourceRDDPartition, create2, shardId, lazyRef, taskContext2);
            return BoxedUnit.UNIT;
        });
        return nextIterator;
    }

    /**
     * Builds the metadata committer held in {@code lazyRef} on first use, synchronizing on the
     * holder so the committer is initialized at most once.
     *
     * @param lazyRef holder for the lazily created committer
     * @return the already-initialized committer, or a new {@code HDFSMetadataCommitter}
     * @throws IllegalArgumentException if the configured committer type is not "hdfs"
     */
    private final /* synthetic */ MetadataCommitter metadataCommitter$lzycompute$1(LazyRef lazyRef) {
        synchronized (lazyRef) {
            // Another caller may have initialized the holder while we waited for the lock.
            if (lazyRef.initialized()) {
                return (MetadataCommitter) lazyRef.value();
            }
            String committerType = metaDataCommitterType$1().toLowerCase(Locale.ROOT);
            if ("hdfs".equals(committerType)) {
                return (MetadataCommitter) lazyRef.initialize(new HDFSMetadataCommitter(metaDataCommitterPath$1(), this.conf, this.sourceOptions, ClassTag$.MODULE$.apply(ShardInfo.class)));
            }
            throw new IllegalArgumentException("only HDFS is supported");
        }
    }

    /**
     * Returns the metadata committer held in {@code lazyRef}, creating it on first access.
     *
     * @param lazyRef holder for the lazily created committer
     * @return the committer stored in the holder
     */
    private final MetadataCommitter metadataCommitter$1(LazyRef lazyRef) {
        // Unsynchronized check first; the slow path takes the lock and initializes.
        if (!lazyRef.initialized()) {
            return metadataCommitter$lzycompute$1(lazyRef);
        }
        return (MetadataCommitter) lazyRef.value();
    }

    /**
     * Reads the committer type from the source option {@code executor.metadata.committer}.
     *
     * @return the configured value, or "hdfs" when the option is absent
     */
    private final String metaDataCommitterType$1() {
        String committerType = (String) this.sourceOptions.getOrElse("executor.metadata.committer", () -> "hdfs");
        return committerType.toString();
    }

    /**
     * Reads the committer path from the source option {@code executor.metadata.path}.
     *
     * @return the configured value, or this RDD's {@code metadataPath} when the option is absent
     */
    private final String metaDataCommitterPath$1() {
        String committerPath = (String) this.sourceOptions.getOrElse("executor.metadata.path", () -> this.metadataPath);
        return committerPath.toString();
    }

    /**
     * Commits the end position of a shard for the current batch through the metadata committer.
     *
     * <p>The committed position is {@code ShardEnd} when the shard was flagged closed, the
     * partition's original shard info when the last sequence number is empty, and
     * {@code AfterSequenceNumber} of the last sequence number otherwise.
     *
     * @param taskContext               task context of the completed task (not used here)
     * @param booleanRef                cell that is true when the shard was found closed
     * @param kinesisSourceRDDPartition partition whose shard position is committed
     * @param objectRef                 cell holding the last sequence number read, as a String
     * @param str                       shard id used as the commit key and in the log message
     * @param lazyRef                   holder for the lazily created metadata committer
     */
    private final void updateMetadata$1(TaskContext taskContext, BooleanRef booleanRef, KinesisSourceRDDPartition kinesisSourceRDDPartition, ObjectRef objectRef, String str, LazyRef lazyRef) {
        ShardInfo endPosition;
        if (!booleanRef.elem) {
            if (!((String) objectRef.elem).isEmpty()) {
                // Resume after the last record that was handed out.
                endPosition = new ShardInfo(kinesisSourceRDDPartition.shardInfo().shardId(), new AfterSequenceNumber((String) objectRef.elem));
            } else {
                // Nothing was read: keep the position the partition started with.
                logInfo(() -> "No Records were processed in this batch");
                endPosition = kinesisSourceRDDPartition.shardInfo();
            }
        } else {
            // The shard was found closed.
            endPosition = new ShardInfo(kinesisSourceRDDPartition.shardInfo().shardId(), new ShardEnd());
        }
        logInfo(() -> new StringBuilder(43).append("Batch ").append(this.batchId).append(" : Committing End Shard position for ").append(str).toString());
        metadataCommitter$1(lazyRef).add(this.batchId, str, endPosition);
    }

    /**
     * Body of the task completion listener registered in {@code compute}: logs that the task
     * completed and commits the shard position through {@code updateMetadata$1}.
     *
     * @param kinesisSourceRDD          RDD instance on which logging and the commit are invoked
     * @param booleanRef                cell that is true when the shard was found closed
     * @param kinesisSourceRDDPartition partition that was being read
     * @param objectRef                 cell holding the last sequence number read
     * @param str                       shard id of the partition
     * @param lazyRef                   holder for the lazily created metadata committer
     * @param taskContext               task context passed to the listener
     */
    public static final /* synthetic */ void $anonfun$compute$10(KinesisSourceRDD kinesisSourceRDD, BooleanRef booleanRef, KinesisSourceRDDPartition kinesisSourceRDDPartition, ObjectRef objectRef, String str, LazyRef lazyRef, TaskContext taskContext) {
        kinesisSourceRDD.logInfo(() -> "Task Completed");
        kinesisSourceRDD.updateMetadata$1(taskContext, booleanRef, kinesisSourceRDDPartition, objectRef, str, lazyRef);
    }

    /**
     * Creates an RDD over the given Kinesis shards with no parent dependencies
     * ({@code Nil} is passed to the {@code RDD} constructor).
     *
     * <p>JADX note: the decompiler moved the {@code super} call to the top of the constructor,
     * which it warns can break code semantics.
     *
     * @param sparkContext              Spark context passed to the {@code RDD} constructor
     * @param map                       source options
     * @param str                       Kinesis stream name
     * @param j                         batch id
     * @param seq                       shards to read, one partition each
     * @param sparkAWSCredentials       credentials handed to the Kinesis reader
     * @param str2                      Kinesis endpoint URL
     * @param serializableConfiguration configuration handed to the metadata committer
     * @param str3                      default metadata path
     * @param z                         failOnDataLoss flag
     */
    public KinesisSourceRDD(SparkContext sparkContext, Map<String, String> map, String str, long j, Seq<ShardInfo> seq, SparkAWSCredentials sparkAWSCredentials, String str2, SerializableConfiguration serializableConfiguration, String str3, boolean z) {
        super(sparkContext, Nil$.MODULE$, ClassTag$.MODULE$.apply(Record.class));
        // What to read.
        this.streamName = str;
        this.shardInfos = seq;
        this.batchId = j;
        // How to connect.
        this.endpointUrl = str2;
        this.kinesisCredsProvider = sparkAWSCredentials;
        // Options and metadata handling.
        this.sourceOptions = map;
        this.conf = serializableConfiguration;
        this.metadataPath = str3;
        this.org$apache$spark$sql$kinesis$KinesisSourceRDD$$failOnDataLoss = z;
    }
}
