package se.sics.nstream.gcp;

import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.ReadChannel;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import java.io.IOException;
import org.javatuples.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Handler;
import se.sics.kompics.Negative;
import se.sics.kompics.Positive;
import se.sics.kompics.Start;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.util.Identifier;
import se.sics.ktoolbox.util.result.Result;
import se.sics.ktoolbox.util.trysf.Try;
import se.sics.nstream.hops.storage.gcp.GCPEndpoint;
import se.sics.nstream.hops.storage.gcp.GCPHelper;
import se.sics.nstream.hops.storage.gcp.GCPResource;
import se.sics.nstream.storage.durable.DStoragePort;
import se.sics.nstream.storage.durable.DurableStorageProvider;
import se.sics.nstream.storage.durable.events.DStorageRead;
import se.sics.nstream.storage.durable.events.DStorageWrite;
import se.sics.nstream.storage.durable.util.StreamEndpoint;
import se.sics.nstream.storage.durable.util.StreamResource;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException
    */
/* loaded from: input_file:se/sics/nstream/gcp/GCPComp.class */
public class GCPComp extends ComponentDefinition {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) GCPComp.class);
    private String logPrefix;
    private final Identifier self;
    private final String projectName;
    private final BlobId blobId;
    private final GoogleCredentials credentials;
    private final WriteChannel writer;
    private ReadChannel reader;
    Positive<Timer> timerPort = requires(Timer.class);
    private final Negative storagePort = provides(DStoragePort.class);
    private long writePos = 0;
    Handler handleStart = new Handler<Start>() { // from class: se.sics.nstream.gcp.GCPComp.1
        AnonymousClass1() {
        }

        @Override // se.sics.kompics.Handler
        public void handle(Start start) {
            GCPComp.LOG.info("{}starting", GCPComp.this.logPrefix);
        }
    };
    Handler handleRead = new Handler<DStorageRead.Request>() { // from class: se.sics.nstream.gcp.GCPComp.2
        AnonymousClass2() {
        }

        @Override // se.sics.kompics.Handler
        public void handle(DStorageRead.Request request) {
            if (!GCPComp.this.reader.isOpen()) {
                GCPComp.this.reader = GCPHelper.readChannel(GCPComp.this.credentials, GCPComp.this.projectName, GCPComp.this.blobId);
            }
            GCPComp.LOG.debug("{}read:{}", GCPComp.this.logPrefix, request);
            int upperAbsEndpoint = (int) ((request.readRange.upperAbsEndpoint() - request.readRange.lowerAbsEndpoint()) + 1);
            int lowerAbsEndpoint = (int) request.readRange.lowerAbsEndpoint();
            GCPComp.LOG.debug("{}reading at pos:{} amount:{}", GCPComp.this.logPrefix, Integer.valueOf(lowerAbsEndpoint), Integer.valueOf(upperAbsEndpoint));
            try {
                GCPComp.this.answer(request, request.respond(Result.success(new Try.Success(true).flatMap(GCPHelper.readFromBlob(GCPComp.this.reader, lowerAbsEndpoint, upperAbsEndpoint)).checkedGet())));
            } catch (Throwable th) {
                GCPComp.this.answer(request, request.respond(Result.internalFailure((Exception) th)));
            }
        }
    };
    Handler handleReadComplete = new Handler<DStorageRead.Complete>() { // from class: se.sics.nstream.gcp.GCPComp.3
        AnonymousClass3() {
        }

        @Override // se.sics.kompics.Handler
        public void handle(DStorageRead.Complete complete) {
            if (GCPComp.this.reader.isOpen()) {
                GCPComp.this.reader.close();
            }
        }
    };
    Handler handleWrite = new Handler<DStorageWrite.Request>() { // from class: se.sics.nstream.gcp.GCPComp.4
        AnonymousClass4() {
        }

        @Override // se.sics.kompics.Handler
        public void handle(DStorageWrite.Request request) {
            GCPComp.LOG.debug("{}write:{}", GCPComp.this.logPrefix, request);
            Try flatMap = GCPComp.this.skipExistingBytes(request).flatMap(GCPHelper.writeToBlob(GCPComp.this.writer));
            try {
                long j = GCPComp.this.writePos;
                GCPComp.access$1002(GCPComp.this, GCPComp.this.writePos + ((Integer) flatMap.checkedGet()).intValue());
                GCPComp.LOG.debug("{}write from:{} to:{}", GCPComp.this.logPrefix, Long.valueOf(j), Long.valueOf(GCPComp.this.writePos));
                GCPComp.this.answer(request, request.respond(Result.success(true)));
            } catch (Throwable th) {
                GCPComp.this.answer(request, request.respond(Result.internalFailure((Exception) th)));
            }
        }
    };
    Handler handleWriteComplete = new Handler<DStorageWrite.Complete>() { // from class: se.sics.nstream.gcp.GCPComp.5
        AnonymousClass5() {
        }

        @Override // se.sics.kompics.Handler
        public void handle(DStorageWrite.Complete complete) {
            if (GCPComp.this.writer.isOpen()) {
                try {
                    GCPComp.this.writer.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    };

    /* renamed from: se.sics.nstream.gcp.GCPComp$1 */
    /* loaded from: input_file:se/sics/nstream/gcp/GCPComp$1.class */
    class AnonymousClass1 extends Handler<Start> {
        AnonymousClass1() {
        }

        @Override // se.sics.kompics.Handler
        public void handle(Start start) {
            GCPComp.LOG.info("{}starting", GCPComp.this.logPrefix);
        }
    }

    /* renamed from: se.sics.nstream.gcp.GCPComp$2 */
    /* loaded from: input_file:se/sics/nstream/gcp/GCPComp$2.class */
    class AnonymousClass2 extends Handler<DStorageRead.Request> {
        AnonymousClass2() {
        }

        @Override // se.sics.kompics.Handler
        public void handle(DStorageRead.Request request) {
            if (!GCPComp.this.reader.isOpen()) {
                GCPComp.this.reader = GCPHelper.readChannel(GCPComp.this.credentials, GCPComp.this.projectName, GCPComp.this.blobId);
            }
            GCPComp.LOG.debug("{}read:{}", GCPComp.this.logPrefix, request);
            int upperAbsEndpoint = (int) ((request.readRange.upperAbsEndpoint() - request.readRange.lowerAbsEndpoint()) + 1);
            int lowerAbsEndpoint = (int) request.readRange.lowerAbsEndpoint();
            GCPComp.LOG.debug("{}reading at pos:{} amount:{}", GCPComp.this.logPrefix, Integer.valueOf(lowerAbsEndpoint), Integer.valueOf(upperAbsEndpoint));
            try {
                GCPComp.this.answer(request, request.respond(Result.success(new Try.Success(true).flatMap(GCPHelper.readFromBlob(GCPComp.this.reader, lowerAbsEndpoint, upperAbsEndpoint)).checkedGet())));
            } catch (Throwable th) {
                GCPComp.this.answer(request, request.respond(Result.internalFailure((Exception) th)));
            }
        }
    }

    /* renamed from: se.sics.nstream.gcp.GCPComp$3 */
    /* loaded from: input_file:se/sics/nstream/gcp/GCPComp$3.class */
    class AnonymousClass3 extends Handler<DStorageRead.Complete> {
        AnonymousClass3() {
        }

        @Override // se.sics.kompics.Handler
        public void handle(DStorageRead.Complete complete) {
            if (GCPComp.this.reader.isOpen()) {
                GCPComp.this.reader.close();
            }
        }
    }

    /* renamed from: se.sics.nstream.gcp.GCPComp$4 */
    /* loaded from: input_file:se/sics/nstream/gcp/GCPComp$4.class */
    class AnonymousClass4 extends Handler<DStorageWrite.Request> {
        AnonymousClass4() {
        }

        @Override // se.sics.kompics.Handler
        public void handle(DStorageWrite.Request request) {
            GCPComp.LOG.debug("{}write:{}", GCPComp.this.logPrefix, request);
            Try flatMap = GCPComp.this.skipExistingBytes(request).flatMap(GCPHelper.writeToBlob(GCPComp.this.writer));
            try {
                long j = GCPComp.this.writePos;
                GCPComp.access$1002(GCPComp.this, GCPComp.this.writePos + ((Integer) flatMap.checkedGet()).intValue());
                GCPComp.LOG.debug("{}write from:{} to:{}", GCPComp.this.logPrefix, Long.valueOf(j), Long.valueOf(GCPComp.this.writePos));
                GCPComp.this.answer(request, request.respond(Result.success(true)));
            } catch (Throwable th) {
                GCPComp.this.answer(request, request.respond(Result.internalFailure((Exception) th)));
            }
        }
    }

    /* renamed from: se.sics.nstream.gcp.GCPComp$5 */
    /* loaded from: input_file:se/sics/nstream/gcp/GCPComp$5.class */
    class AnonymousClass5 extends Handler<DStorageWrite.Complete> {
        AnonymousClass5() {
        }

        @Override // se.sics.kompics.Handler
        public void handle(DStorageWrite.Complete complete) {
            if (GCPComp.this.writer.isOpen()) {
                try {
                    GCPComp.this.writer.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    /* loaded from: input_file:se/sics/nstream/gcp/GCPComp$Init.class */
    public static class Init extends se.sics.kompics.Init<GCPComp> {
        public final Identifier self;
        public final String projectName;
        public final BlobId blobId;
        public final GoogleCredentials credentials;

        public Init(Identifier identifier, String str, BlobId blobId, GoogleCredentials googleCredentials) {
            this.self = identifier;
            this.projectName = str;
            this.blobId = blobId;
            this.credentials = googleCredentials;
        }
    }

    /* loaded from: input_file:se/sics/nstream/gcp/GCPComp$StorageProvider.class */
    public static class StorageProvider implements DurableStorageProvider<GCPComp> {
        public final Identifier self;
        public final GCPEndpoint endpoint;

        public StorageProvider(Identifier identifier, GCPEndpoint gCPEndpoint) {
            this.self = identifier;
            this.endpoint = gCPEndpoint;
        }

        @Override // se.sics.nstream.storage.durable.DurableStorageProvider
        public Pair<Init, Long> initiate(StreamResource streamResource) {
            BlobId blobId = ((GCPResource) streamResource).getBlobId();
            checkCreateBlob(blobId);
            return Pair.with(new Init(this.self, this.endpoint.projectName, blobId, this.endpoint.credentials), 0L);
        }

        private Blob checkCreateBlob(BlobId blobId) {
            try {
                return (Blob) new Try.Success(GCPHelper.getStorage(this.endpoint.credentials, this.endpoint.projectName)).flatMap(GCPHelper.getBlob(blobId)).recoverWith(GCPHelper.rCreateBlob(blobId)).checkedGet();
            } catch (Throwable th) {
                throw new RuntimeException(th);
            }
        }

        @Override // se.sics.nstream.storage.durable.DurableStorageProvider
        public String getName() {
            return this.endpoint.getEndpointName();
        }

        @Override // se.sics.nstream.storage.durable.DurableStorageProvider
        public Class<GCPComp> getStorageDefinition() {
            return GCPComp.class;
        }

        @Override // se.sics.nstream.storage.durable.DurableStorageProvider
        public StreamEndpoint getEndpoint() {
            return this.endpoint;
        }
    }

    public GCPComp(Init init) {
        this.logPrefix = "";
        this.self = init.self;
        this.projectName = init.projectName;
        this.blobId = init.blobId;
        this.credentials = init.credentials;
        this.writer = GCPHelper.writeChannel(this.credentials, this.projectName, this.blobId);
        this.reader = GCPHelper.readChannel(this.credentials, this.projectName, this.blobId);
        this.logPrefix = "<nid:" + this.self.toString() + ">gcp:" + this.projectName + "/" + init.blobId + " ";
        LOG.info("{}init", this.logPrefix);
        subscribe(this.handleStart, this.control);
        subscribe(this.handleRead, this.storagePort);
        subscribe(this.handleReadComplete, this.storagePort);
        subscribe(this.handleWrite, this.storagePort);
        subscribe(this.handleWriteComplete, this.storagePort);
    }

    @Override // se.sics.kompics.ComponentDefinition
    public void tearDown() {
        LOG.info("{}tearing down", this.logPrefix);
        if (this.reader.isOpen()) {
            this.reader.close();
        }
        if (this.writer.isOpen()) {
            try {
                this.writer.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public Try<byte[]> skipExistingBytes(DStorageWrite.Request request) {
        if (this.writePos >= request.pos + request.value.length) {
            LOG.debug("{}write with pos:{} skipped", this.logPrefix, Long.valueOf(request.pos));
            answer(request, request.respond(Result.success(true)));
            return new Try.Success(new byte[0]);
        }
        if (this.writePos <= request.pos) {
            return this.writePos == request.pos ? new Try.Success(request.value) : new Try.Failure(new IllegalArgumentException("GCPComp can only append - writePos:" + this.writePos + " - reqPos:" + request.pos));
        }
        long j = this.writePos;
        int i = (int) (j - request.pos);
        int length = request.value.length - i;
        byte[] bArr = new byte[length];
        System.arraycopy(request.value, i, bArr, 0, length);
        LOG.debug("{}convert write pos from:{} to:{} write amount from:{} to:{}", this.logPrefix, Long.valueOf(request.pos), Long.valueOf(j), Integer.valueOf(request.value.length), Integer.valueOf(length));
        return new Try.Success(bArr);
    }

    /*  JADX ERROR: Failed to decode insn: 0x0002: MOVE_MULTI, method: se.sics.nstream.gcp.GCPComp.access$1002(se.sics.nstream.gcp.GCPComp, long):long
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    static /* synthetic */ long access$1002(se.sics.nstream.gcp.GCPComp r6, long r7) {
        /*
            r0 = r6
            r1 = r7
            // decode failed: arraycopy: source index -1 out of bounds for object array[6]
            r0.writePos = r1
            return r-1
        */
        throw new UnsupportedOperationException("Method not decompiled: se.sics.nstream.gcp.GCPComp.access$1002(se.sics.nstream.gcp.GCPComp, long):long");
    }

    static {
    }
}
