package org.apache.spark.sql.kinesis;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import java.nio.ByteBuffer;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.Cast;
import org.apache.spark.sql.catalyst.expressions.Cast$;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection$;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.kinesis.shaded.amazonaws.SDKGlobalConfiguration;
import org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.producer.KinesisProducer;
import org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.producer.UserRecordResult;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StringType$;
import org.slf4j.Logger;
import scala.Function0;
import scala.Predef$;
import scala.collection.Iterator;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.Map;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.util.Try$;

/* compiled from: KinesisWriteTask.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0005c!\u0002\f\u0018\u0001]\t\u0003\u0002\u0003\u0018\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0019\t\u0011y\u0002!\u0011!Q\u0001\n}BQ\u0001\u0015\u0001\u0005\u0002EC\u0011B\u0016\u0001A\u0002\u0003\u0007I\u0011B,\t\u0013\u0011\u0004\u0001\u0019!a\u0001\n\u0013)\u0007\"C6\u0001\u0001\u0004\u0005\t\u0015)\u0003Y\u0011\u001da\u0007A1A\u0005\n5Da!\u001d\u0001!\u0002\u0013q\u0007b\u0002:\u0001\u0005\u0004%Ia\u001d\u0005\u0007i\u0002\u0001\u000b\u0011B\u001e\t\u000fU\u0004!\u0019!C\u0005m\"1!\u0010\u0001Q\u0001\n]D\u0011b\u001f\u0001A\u0002\u0003\u0007I\u0011\u0002?\t\u0017\u0005\u0005\u0001\u00011AA\u0002\u0013%\u00111\u0001\u0005\u000b\u0003\u000f\u0001\u0001\u0019!A!B\u0013i\bbBA\u0005\u0001\u0011\u0005\u00111\u0002\u0005\b\u0003?\u0001A\u0011AA\u0011\u0011\u001d\t9\u0004\u0001C\u0005\u0003sAq!a\u000f\u0001\t\u0003\tI\u0004C\u0004\u0002>\u0001!\t!!\u000f\t\r\u0005}\u0002\u0001\"\u0003n\u0005AY\u0015N\\3tSN<&/\u001b;f)\u0006\u001c8N\u0003\u0002\u00193\u000591.\u001b8fg&\u001c(B\u0001\u000e\u001c\u0003\r\u0019\u0018\u000f\u001c\u0006\u00039u\tQa\u001d9be.T!AH\u0010\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005\u0001\u0013aA8sON\u0019\u0001A\t\u0015\u0011\u0005\r2S\"\u0001\u0013\u000b\u0003\u0015\nQa]2bY\u0006L!a\n\u0013\u0003\r\u0005s\u0017PU3g!\tIC&D\u0001+\u0015\tY3$\u0001\u0005j]R,'O\\1m\u0013\ti#FA\u0004M_\u001e<\u0017N\\4\u0002+A\u0014x\u000eZ;dKJ\u001cuN\u001c4jOV\u0014\u0018\r^5p]\u000e\u0001\u0001\u0003B\u00199wmr!A\r\u001c\u0011\u0005M\"S\"\u0001\u001b\u000b\u0005Uz\u0013A\u0002\u001fs_>$h(\u0003\u00028I\u00051\u0001K]3eK\u001aL!!\u000f\u001e\u0003\u00075\u000b\u0007O\u0003\u00028IA\u0011\u0011\u0007P\u0005\u0003{i\u0012aa\u0015;sS:<\u0017aC5oaV$8k\u00195f[\u0006\u00042\u0001Q#I\u001d\t\t5I\u0004\u00024\u0005&\tQ%\u0003\u0002EI\u00059\u0001/Y2lC\u001e,\u0017B\u0001$H\u0005\r\u0019V-\u001d\u0006\u0003\t\u0012\u0002\"!\u0013(\u000e\u0003)S!a\u0013'\u0002\u0017\u0015D\bO]3tg&|gn\u001d\u0006\u0003\u001bf\t\u0001bY1uC2L8\u000f^\u0005\u0003\u001f*\u0013\u0011\"\u0011;ue&\u0014W\u000f^3\u0002\rqJg.\u001b;?)\r\u0011F+\u0016\t\u0003'\u0002i\u0011a\u0006\u0005\u0006]\r\u0001\r\u0001\r\u0005\u0006}\r\u0001\raP\u0001\taJ|G-^2feV\t\u0001\f\u0005\u0002ZE6\t!L\u0003\u0002W7*\u0011\u0001\u0004\u0018\u0006\u0003;z\u000b\u0001b]3sm&\u001cWm\u001d\u0006\u0003?\u0002\f\u0011\"Y7bu>t\u0017m^:\u000b\u0003\u0005\f1aY8n\u0013\t\u0019'LA\bLS:,7/[:Qe>$WoY3s\u00031\u0001(o\u001c3vG\u0016\u0014x\fJ3r)\t1\u0017\u000e\u0005\u0002$O&\u0011\u0001\u000e\n\u0002\u0005+:LG\u000fC\u0004k\u000b\u0005\u0005\t\u0019\u0001-\u0002\u0007a$\u0013'A\u0005qe>$WoY3sA\u0005Q\u0001O]8kK\u000e$\u0018n\u001c8\u0016\u00039\u0004\"!S8\n\u0005AT%\u0001E+og\u00064W\r\u0015:pU\u0016\u001cG/[8o\u0003-\u0001(o\u001c6fGRLwN\u001c\u0011\u0002\u0015M$(/Z1n\u001d\u0006lW-F\u0001<\u0003-\u0019HO]3b[:\u000bW.\u001a\u0011\u0002%\u0019dWo\u001d5XC&$H+[7f\u001b&dGn]\u000b\u0002oB\u00111\u0005_\u0005\u0003s\u0012\u0012A\u0001T8oO\u0006\u0019b\r\\;tQ^\u000b\u0017\u000e\u001e+j[\u0016l\u0015\u000e\u001c7tA\u0005Ya-Y5mK\u0012<&/\u001b;f+\u0005i\bC\u0001!\u007f\u0013\tyxIA\u0005UQJ|w/\u00192mK\u0006ya-Y5mK\u0012<&/\u001b;f?\u0012*\u0017\u000fF\u0002g\u0003\u000bAqA\u001b\b\u0002\u0002\u0003\u0007Q0\u0001\u0007gC&dW\rZ,sSR,\u0007%A\u0004fq\u0016\u001cW\u000f^3\u0015\u0007\u0019\fi\u0001C\u0004\u0002\u0010A\u0001\r!!\u0005\u0002\u0011%$XM]1u_J\u0004R\u0001QA\n\u0003/I1!!\u0006H\u0005!IE/\u001a:bi>\u0014\b\u0003BA\r\u00037i\u0011\u0001T\u0005\u0004\u0003;a%aC%oi\u0016\u0014h.\u00197S_^\f\u0001b]3oI\u0012\u000bG/\u0019\u000b\u0006w\u0005\r\u0012q\u0005\u0005\u0007\u0003K\t\u0002\u0019A\u001e\u0002\u0019A\f'\u000f^5uS>t7*Z=\t\u000f\u0005%\u0012\u00031\u0001\u0002,\u0005!A-\u0019;b!\u0015\u0019\u0013QFA\u0019\u0013\r\ty\u0003\n\u0002\u0006\u0003J\u0014\u0018-\u001f\t\u0004G\u0005M\u0012bAA\u001bI\t!!)\u001f;f\u0003]1G.^:i%\u0016\u001cwN\u001d3t\u0013\u001atUmY3tg\u0006\u0014\u0018\u0010F\u0001g\u00039\u0019\u0007.Z2l\r>\u0014XI\u001d:peN\fQa\u00197pg\u0016\f\u0001c\u0019:fCR,\u0007K]8kK\u000e$\u0018n\u001c8")
/* loaded from: input_file:org/apache/spark/sql/kinesis/KinesisWriteTask.class */
public class KinesisWriteTask implements Logging {
    private final Map<String, String> producerConfiguration;
    private final Seq<Attribute> inputSchema;
    private KinesisProducer producer;
    private final UnsafeProjection projection;
    private final String org$apache$spark$sql$kinesis$KinesisWriteTask$$streamName;
    private final long flushWaitTimeMills;
    private Throwable org$apache$spark$sql$kinesis$KinesisWriteTask$$failedWrite;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    private KinesisProducer producer() {
        return this.producer;
    }

    private void producer_$eq(KinesisProducer kinesisProducer) {
        this.producer = kinesisProducer;
    }

    private UnsafeProjection projection() {
        return this.projection;
    }

    public String org$apache$spark$sql$kinesis$KinesisWriteTask$$streamName() {
        return this.org$apache$spark$sql$kinesis$KinesisWriteTask$$streamName;
    }

    private long flushWaitTimeMills() {
        return this.flushWaitTimeMills;
    }

    public Throwable org$apache$spark$sql$kinesis$KinesisWriteTask$$failedWrite() {
        return this.org$apache$spark$sql$kinesis$KinesisWriteTask$$failedWrite;
    }

    public void org$apache$spark$sql$kinesis$KinesisWriteTask$$failedWrite_$eq(Throwable th) {
        this.org$apache$spark$sql$kinesis$KinesisWriteTask$$failedWrite = th;
    }

    public void execute(Iterator<InternalRow> iterator) {
        producer_$eq(CachedKinesisProducer$.MODULE$.getOrCreate(this.producerConfiguration));
        while (iterator.hasNext() && org$apache$spark$sql$kinesis$KinesisWriteTask$$failedWrite() == null) {
            UnsafeRow apply = projection().apply((InternalRow) iterator.next());
            sendData(apply.getString(0), apply.getBinary(1));
        }
    }

    public String sendData(String str, byte[] bArr) {
        final ObjectRef create = ObjectRef.create(new String());
        Futures.addCallback(producer().addUserRecord(org$apache$spark$sql$kinesis$KinesisWriteTask$$streamName(), str, ByteBuffer.wrap(bArr)), new FutureCallback<UserRecordResult>(this, create) { // from class: org.apache.spark.sql.kinesis.KinesisWriteTask$$anon$1
            private final /* synthetic */ KinesisWriteTask $outer;
            private final ObjectRef sentSeqNumbers$1;

            public void onFailure(Throwable th) {
                if (this.$outer.org$apache$spark$sql$kinesis$KinesisWriteTask$$failedWrite() != null || th == null) {
                    return;
                }
                this.$outer.org$apache$spark$sql$kinesis$KinesisWriteTask$$failedWrite_$eq(th);
                this.$outer.logError(() -> {
                    return new StringBuilder(27).append("Writing to  ").append(this.$outer.org$apache$spark$sql$kinesis$KinesisWriteTask$$streamName()).append(" failed due to ").append(th.getCause()).toString();
                });
            }

            public void onSuccess(UserRecordResult userRecordResult) {
                userRecordResult.getShardId();
                this.sentSeqNumbers$1.elem = userRecordResult.getSequenceNumber();
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                this.sentSeqNumbers$1 = create;
            }
        }, MoreExecutors.directExecutor());
        return (String) create.elem;
    }

    private void flushRecordsIfNecessary() {
        if (producer() != null) {
            while (producer().getOutstandingRecordsCount() > 0) {
                try {
                    producer().flush();
                    Thread.sleep(flushWaitTimeMills());
                } catch (InterruptedException e) {
                } catch (Throwable th) {
                    checkForErrors();
                    throw th;
                }
                checkForErrors();
            }
        }
    }

    public void checkForErrors() {
        if (org$apache$spark$sql$kinesis$KinesisWriteTask$$failedWrite() != null) {
            throw org$apache$spark$sql$kinesis$KinesisWriteTask$$failedWrite();
        }
    }

    public void close() {
        checkForErrors();
        flushRecordsIfNecessary();
        checkForErrors();
        producer_$eq(null);
    }

    private UnsafeProjection createProjection() {
        Attribute attribute = (Attribute) this.inputSchema.find(attribute2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$createProjection$1(attribute2));
        }).getOrElse(() -> {
            throw new IllegalStateException(new StringBuilder(31).append("Required attribute ").append("'").append(KinesisWriter$.MODULE$.PARTITION_KEY_ATTRIBUTE_NAME()).append("' not found").toString());
        });
        DataType dataType = attribute.dataType();
        if (!(StringType$.MODULE$.equals(dataType) ? true : BinaryType$.MODULE$.equals(dataType))) {
            throw new IllegalStateException(new StringBuilder(46).append(KinesisWriter$.MODULE$.PARTITION_KEY_ATTRIBUTE_NAME()).append(" ").append("attribute type must be a String or BinaryType").toString());
        }
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
        Attribute attribute3 = (Attribute) this.inputSchema.find(attribute4 -> {
            return BoxesRunTime.boxToBoolean($anonfun$createProjection$3(attribute4));
        }).getOrElse(() -> {
            throw new IllegalStateException(new StringBuilder(31).append("Required attribute ").append("'").append(KinesisWriter$.MODULE$.DATA_ATTRIBUTE_NAME()).append("' not found").toString());
        });
        DataType dataType2 = attribute3.dataType();
        if (!(StringType$.MODULE$.equals(dataType2) ? true : BinaryType$.MODULE$.equals(dataType2))) {
            throw new IllegalStateException(new StringBuilder(46).append(KinesisWriter$.MODULE$.DATA_ATTRIBUTE_NAME()).append(" ").append("attribute type must be a String or BinaryType").toString());
        }
        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        return UnsafeProjection$.MODULE$.create(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Cast[]{new Cast(attribute, StringType$.MODULE$, Cast$.MODULE$.apply$default$3()), new Cast(attribute3, StringType$.MODULE$, Cast$.MODULE$.apply$default$3())})), this.inputSchema);
    }

    public static final /* synthetic */ boolean $anonfun$createProjection$1(Attribute attribute) {
        String name = attribute.name();
        String PARTITION_KEY_ATTRIBUTE_NAME = KinesisWriter$.MODULE$.PARTITION_KEY_ATTRIBUTE_NAME();
        return name != null ? name.equals(PARTITION_KEY_ATTRIBUTE_NAME) : PARTITION_KEY_ATTRIBUTE_NAME == null;
    }

    public static final /* synthetic */ boolean $anonfun$createProjection$3(Attribute attribute) {
        String name = attribute.name();
        String DATA_ATTRIBUTE_NAME = KinesisWriter$.MODULE$.DATA_ATTRIBUTE_NAME();
        return name != null ? name.equals(DATA_ATTRIBUTE_NAME) : DATA_ATTRIBUTE_NAME == null;
    }

    public KinesisWriteTask(Map<String, String> map, Seq<Attribute> seq) {
        this.producerConfiguration = map;
        this.inputSchema = seq;
        Logging.$init$(this);
        this.projection = createProjection();
        this.org$apache$spark$sql$kinesis$KinesisWriteTask$$streamName = (String) map.getOrElse(KinesisSourceProvider$.MODULE$.SINK_STREAM_NAME_KEY(), () -> {
            return SDKGlobalConfiguration.DEFAULT_AWS_CSM_CLIENT_ID;
        });
        this.flushWaitTimeMills = BoxesRunTime.unboxToLong(Try$.MODULE$.apply(() -> {
            return new StringOps(Predef$.MODULE$.augmentString((String) this.producerConfiguration.getOrElse(KinesisSourceProvider$.MODULE$.SINK_FLUSH_WAIT_TIME_MILLIS(), () -> {
                return KinesisSourceProvider$.MODULE$.DEFAULT_FLUSH_WAIT_TIME_MILLIS();
            }))).toLong();
        }).getOrElse(() -> {
            throw new IllegalArgumentException(new StringBuilder(29).append(KinesisSourceProvider$.MODULE$.SINK_FLUSH_WAIT_TIME_MILLIS()).append(" has to be a positive integer").toString());
        }));
    }
}
