package se.sics.nstream.hops.hdfs;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.javatuples.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import se.sics.kompics.Component;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Handler;
import se.sics.kompics.Negative;
import se.sics.kompics.Positive;
import se.sics.kompics.Start;
import se.sics.kompics.timer.CancelTimeout;
import se.sics.kompics.timer.SchedulePeriodicTimeout;
import se.sics.kompics.timer.Timeout;
import se.sics.kompics.timer.Timer;
import se.sics.kompics.util.Identifier;
import se.sics.ktoolbox.util.network.ports.One2NChannel;
import se.sics.ktoolbox.util.result.Result;
import se.sics.ktoolbox.util.trysf.Try;
import se.sics.ktoolbox.util.trysf.TryHelper;
import se.sics.nstream.hops.storage.hdfs.HDFSEndpoint;
import se.sics.nstream.hops.storage.hdfs.HDFSHelper;
import se.sics.nstream.hops.storage.hdfs.HDFSResource;
import se.sics.nstream.storage.durable.DStoragePort;
import se.sics.nstream.storage.durable.DurableStorageProvider;
import se.sics.nstream.storage.durable.events.DStorageRead;
import se.sics.nstream.storage.durable.events.DStorageWrite;
import se.sics.nstream.storage.durable.util.StreamEndpoint;
import se.sics.nstream.storage.durable.util.StreamResource;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException
    */
/* loaded from: input_file:se/sics/nstream/hops/hdfs/HDFSComp.class */
public class HDFSComp extends ComponentDefinition {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) HDFSComp.class);
    One2NChannel resourceChannel;
    private final HDFSEndpoint hdfsEndpoint;
    private final HDFSResource hdfsResource;
    private final DistributedFileSystem dfs;
    private final UserGroupInformation ugi;
    private long writePos;
    private UUID flushTimer;
    private int forceFlushCounter;
    private String logPrefix = "";
    Positive<Timer> timerPort = requires(Timer.class);
    Negative<DStoragePort> resourcePort = provides(DStoragePort.class);
    private Map<Identifier, Component> components = new HashMap();
    private final TreeMap<Long, DStorageWrite.Request> pending = new TreeMap<>();
    private boolean progressed = false;
    Handler handleStart = new Handler<Start>() { // from class: se.sics.nstream.hops.hdfs.HDFSComp.1
        AnonymousClass1() {
        }

        @Override // se.sics.kompics.Handler
        public void handle(Start start) {
            HDFSComp.LOG.info("{}starting", HDFSComp.this.logPrefix);
            HDFSComp.this.schedulePeriodicCheck();
        }
    };
    Handler handleReadRequest = new Handler<DStorageRead.Request>() { // from class: se.sics.nstream.hops.hdfs.HDFSComp.2
        AnonymousClass2() {
        }

        @Override // se.sics.kompics.Handler
        public void handle(DStorageRead.Request request) {
            HDFSComp.LOG.trace("{}received:{}", HDFSComp.this.logPrefix, request);
            DStorageRead.Response respond = request.respond(HDFSComp.this.convert(HDFSHelper.read(HDFSComp.this.dfs, HDFSComp.this.ugi, HDFSComp.this.hdfsEndpoint, HDFSComp.this.hdfsResource, request.readRange)));
            HDFSComp.LOG.trace("{}answering:{}", HDFSComp.this.logPrefix, respond);
            HDFSComp.this.answer(request, respond);
        }
    };
    Handler handleWriteRequest = new Handler<DStorageWrite.Request>() { // from class: se.sics.nstream.hops.hdfs.HDFSComp.3
        AnonymousClass3() {
        }

        @Override // se.sics.kompics.Handler
        public void handle(DStorageWrite.Request request) {
            byte[] bArr;
            HDFSComp.LOG.info("{}write:{}", HDFSComp.this.logPrefix, request);
            if (HDFSComp.this.writePos >= request.pos + request.value.length) {
                HDFSComp.LOG.info("{}write with pos:{} skipped", HDFSComp.this.logPrefix, Long.valueOf(request.pos));
                HDFSComp.this.answer(request, request.respond(Result.success(true)));
                return;
            }
            long j = request.pos;
            byte[] bArr2 = request.value;
            if (HDFSComp.this.writePos > request.pos) {
                long j2 = HDFSComp.this.writePos;
                int i = (int) (j2 - HDFSComp.this.writePos);
                int length = request.value.length - i;
                bArr = new byte[length];
                System.arraycopy(request.value, i, bArr, 0, length);
                HDFSComp.LOG.info("{}convert write pos from:{} to:{} write amount from:{} to:{}", HDFSComp.this.logPrefix, Long.valueOf(request.pos), Long.valueOf(j2), Integer.valueOf(request.value.length), Integer.valueOf(length));
            } else {
                bArr = request.value;
            }
            Try<Boolean> append = HDFSHelper.append(HDFSComp.this.dfs, HDFSComp.this.ugi, HDFSComp.this.hdfsEndpoint, HDFSComp.this.hdfsResource, bArr);
            if (!append.isSuccess()) {
                request.respond(HDFSComp.this.convert(append));
                HDFSComp.this.answer(request, request.respond(Result.success(true)));
                return;
            }
            HDFSComp.access$902(HDFSComp.this, HDFSComp.this.writePos + bArr.length);
            HDFSComp.this.pending.put(Long.valueOf(HDFSComp.this.writePos), request);
            if (HDFSComp.this.pending.size() > HDFSComp.this.forceFlushCounter) {
                HDFSComp.this.inspectHDFSFile();
            }
        }
    };
    Handler handleFlush = new Handler<FlushTimeout>() { // from class: se.sics.nstream.hops.hdfs.HDFSComp.4
        AnonymousClass4() {
        }

        @Override // se.sics.kompics.Handler
        public void handle(FlushTimeout flushTimeout) {
            if (HDFSComp.this.pending.isEmpty() || HDFSComp.this.progressed) {
                HDFSComp.this.progressed = false;
                return;
            }
            HDFSHelper.flush(HDFSComp.this.dfs, HDFSComp.this.ugi, HDFSComp.this.hdfsEndpoint, HDFSComp.this.hdfsResource);
            HDFSComp.this.inspectHDFSFile();
            HDFSComp.this.progressed = false;
        }
    };

    /* renamed from: se.sics.nstream.hops.hdfs.HDFSComp$1 */
    /* loaded from: input_file:se/sics/nstream/hops/hdfs/HDFSComp$1.class */
    class AnonymousClass1 extends Handler<Start> {
        AnonymousClass1() {
        }

        @Override // se.sics.kompics.Handler
        public void handle(Start start) {
            HDFSComp.LOG.info("{}starting", HDFSComp.this.logPrefix);
            HDFSComp.this.schedulePeriodicCheck();
        }
    }

    /* renamed from: se.sics.nstream.hops.hdfs.HDFSComp$2 */
    /* loaded from: input_file:se/sics/nstream/hops/hdfs/HDFSComp$2.class */
    class AnonymousClass2 extends Handler<DStorageRead.Request> {
        AnonymousClass2() {
        }

        @Override // se.sics.kompics.Handler
        public void handle(DStorageRead.Request request) {
            HDFSComp.LOG.trace("{}received:{}", HDFSComp.this.logPrefix, request);
            DStorageRead.Response respond = request.respond(HDFSComp.this.convert(HDFSHelper.read(HDFSComp.this.dfs, HDFSComp.this.ugi, HDFSComp.this.hdfsEndpoint, HDFSComp.this.hdfsResource, request.readRange)));
            HDFSComp.LOG.trace("{}answering:{}", HDFSComp.this.logPrefix, respond);
            HDFSComp.this.answer(request, respond);
        }
    }

    /* renamed from: se.sics.nstream.hops.hdfs.HDFSComp$3 */
    /* loaded from: input_file:se/sics/nstream/hops/hdfs/HDFSComp$3.class */
    class AnonymousClass3 extends Handler<DStorageWrite.Request> {
        AnonymousClass3() {
        }

        @Override // se.sics.kompics.Handler
        public void handle(DStorageWrite.Request request) {
            byte[] bArr;
            HDFSComp.LOG.info("{}write:{}", HDFSComp.this.logPrefix, request);
            if (HDFSComp.this.writePos >= request.pos + request.value.length) {
                HDFSComp.LOG.info("{}write with pos:{} skipped", HDFSComp.this.logPrefix, Long.valueOf(request.pos));
                HDFSComp.this.answer(request, request.respond(Result.success(true)));
                return;
            }
            long j = request.pos;
            byte[] bArr2 = request.value;
            if (HDFSComp.this.writePos > request.pos) {
                long j2 = HDFSComp.this.writePos;
                int i = (int) (j2 - HDFSComp.this.writePos);
                int length = request.value.length - i;
                bArr = new byte[length];
                System.arraycopy(request.value, i, bArr, 0, length);
                HDFSComp.LOG.info("{}convert write pos from:{} to:{} write amount from:{} to:{}", HDFSComp.this.logPrefix, Long.valueOf(request.pos), Long.valueOf(j2), Integer.valueOf(request.value.length), Integer.valueOf(length));
            } else {
                bArr = request.value;
            }
            Try<Boolean> append = HDFSHelper.append(HDFSComp.this.dfs, HDFSComp.this.ugi, HDFSComp.this.hdfsEndpoint, HDFSComp.this.hdfsResource, bArr);
            if (!append.isSuccess()) {
                request.respond(HDFSComp.this.convert(append));
                HDFSComp.this.answer(request, request.respond(Result.success(true)));
                return;
            }
            HDFSComp.access$902(HDFSComp.this, HDFSComp.this.writePos + bArr.length);
            HDFSComp.this.pending.put(Long.valueOf(HDFSComp.this.writePos), request);
            if (HDFSComp.this.pending.size() > HDFSComp.this.forceFlushCounter) {
                HDFSComp.this.inspectHDFSFile();
            }
        }
    }

    /* renamed from: se.sics.nstream.hops.hdfs.HDFSComp$4 */
    /* loaded from: input_file:se/sics/nstream/hops/hdfs/HDFSComp$4.class */
    class AnonymousClass4 extends Handler<FlushTimeout> {
        AnonymousClass4() {
        }

        @Override // se.sics.kompics.Handler
        public void handle(FlushTimeout flushTimeout) {
            if (HDFSComp.this.pending.isEmpty() || HDFSComp.this.progressed) {
                HDFSComp.this.progressed = false;
                return;
            }
            HDFSHelper.flush(HDFSComp.this.dfs, HDFSComp.this.ugi, HDFSComp.this.hdfsEndpoint, HDFSComp.this.hdfsResource);
            HDFSComp.this.inspectHDFSFile();
            HDFSComp.this.progressed = false;
        }
    }

    /* loaded from: input_file:se/sics/nstream/hops/hdfs/HDFSComp$FlushTimeout.class */
    public static class FlushTimeout extends Timeout {
        public FlushTimeout(SchedulePeriodicTimeout schedulePeriodicTimeout) {
            super(schedulePeriodicTimeout);
        }
    }

    /* loaded from: input_file:se/sics/nstream/hops/hdfs/HDFSComp$HardCodedConfig.class */
    public static class HardCodedConfig {
        public static final long flushPeriod = 1000;
        public static final long delaBlockSize = 10485760;
        public static final long minForceFlushDataSize = 104857600;
    }

    /* loaded from: input_file:se/sics/nstream/hops/hdfs/HDFSComp$Init.class */
    public static class Init extends se.sics.kompics.Init<HDFSComp> {
        public final HDFSEndpoint endpoint;
        public final HDFSResource resource;
        public final UserGroupInformation ugi;
        public final DistributedFileSystem dfs;
        public final long streamPos;

        public Init(DistributedFileSystem distributedFileSystem, UserGroupInformation userGroupInformation, HDFSEndpoint hDFSEndpoint, HDFSResource hDFSResource, long j) {
            this.endpoint = hDFSEndpoint;
            this.resource = hDFSResource;
            this.ugi = userGroupInformation;
            this.dfs = distributedFileSystem;
            this.streamPos = j;
        }
    }

    /* loaded from: input_file:se/sics/nstream/hops/hdfs/HDFSComp$StorageProvider.class */
    public static class StorageProvider implements DurableStorageProvider<HDFSComp> {
        public final Identifier self;
        public final HDFSEndpoint endpoint;

        public StorageProvider(Identifier identifier, HDFSEndpoint hDFSEndpoint) {
            this.self = identifier;
            this.endpoint = hDFSEndpoint;
        }

        @Override // se.sics.nstream.storage.durable.DurableStorageProvider
        public Pair<Init, Long> initiate(StreamResource streamResource) {
            try {
                DistributedFileSystem distributedFileSystem = (DistributedFileSystem) FileSystem.get(this.endpoint.hdfsConfig);
                HDFSResource hDFSResource = (HDFSResource) streamResource;
                UserGroupInformation createRemoteUser = UserGroupInformation.createRemoteUser(this.endpoint.user);
                Try<Long> length = HDFSHelper.length(distributedFileSystem, createRemoteUser, this.endpoint, hDFSResource);
                if (!length.isSuccess()) {
                    throw new RuntimeException(TryHelper.tryError(length));
                }
                if (length.get().longValue() == -1) {
                    Try<Boolean> simpleCreate = HDFSHelper.simpleCreate(distributedFileSystem, createRemoteUser, this.endpoint, hDFSResource);
                    if (simpleCreate.isFailure()) {
                        throw new RuntimeException(TryHelper.tryError(simpleCreate));
                    }
                }
                return Pair.with(new Init(distributedFileSystem, createRemoteUser, this.endpoint, hDFSResource, length.get().longValue()), length.get());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        @Override // se.sics.nstream.storage.durable.DurableStorageProvider
        public String getName() {
            return this.endpoint.getEndpointName();
        }

        @Override // se.sics.nstream.storage.durable.DurableStorageProvider
        public Class<HDFSComp> getStorageDefinition() {
            return HDFSComp.class;
        }

        @Override // se.sics.nstream.storage.durable.DurableStorageProvider
        public StreamEndpoint getEndpoint() {
            return this.endpoint;
        }
    }

    public HDFSComp(Init init) {
        LOG.info("{}init", this.logPrefix);
        this.hdfsEndpoint = init.endpoint;
        this.hdfsResource = init.resource;
        this.dfs = init.dfs;
        this.ugi = init.ugi;
        this.writePos = init.streamPos;
        this.forceFlushCounter = forceFlushCounter();
        subscribe(this.handleStart, this.control);
        subscribe(this.handleFlush, this.timerPort);
        subscribe(this.handleReadRequest, this.resourcePort);
        subscribe(this.handleWriteRequest, this.resourcePort);
    }

    private int forceFlushCounter() {
        Try<Long> blockSize = HDFSHelper.blockSize(this.dfs, this.ugi, this.hdfsEndpoint, this.hdfsResource);
        return (int) (Math.max(104857600L, blockSize.isSuccess() ? blockSize.get().longValue() : 0L) / 10485760);
    }

    @Override // se.sics.kompics.ComponentDefinition
    public void tearDown() {
        cancelPeriodicCheck();
    }

    public void inspectHDFSFile() {
        Try<Long> length = HDFSHelper.length(this.dfs, this.ugi, this.hdfsEndpoint, this.hdfsResource);
        if (length.isSuccess()) {
            Iterator<Map.Entry<Long, DStorageWrite.Request>> it = this.pending.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, DStorageWrite.Request> next = it.next();
                if (next.getKey().longValue() > length.get().longValue()) {
                    return;
                }
                answer(next.getValue(), next.getValue().respond(Result.success(true)));
                it.remove();
                this.progressed = true;
            }
        }
    }

    public void schedulePeriodicCheck() {
        if (this.flushTimer != null) {
            return;
        }
        SchedulePeriodicTimeout schedulePeriodicTimeout = new SchedulePeriodicTimeout(1000L, 1000L);
        FlushTimeout flushTimeout = new FlushTimeout(schedulePeriodicTimeout);
        schedulePeriodicTimeout.setTimeoutEvent(flushTimeout);
        this.flushTimer = flushTimeout.getTimeoutId();
        trigger(schedulePeriodicTimeout, this.timerPort);
    }

    private void cancelPeriodicCheck() {
        if (this.flushTimer == null) {
            return;
        }
        CancelTimeout cancelTimeout = new CancelTimeout(this.flushTimer);
        this.flushTimer = null;
        trigger(cancelTimeout, this.timerPort);
    }

    public <O> Result<O> convert(Try<O> r3) {
        return r3.isSuccess() ? Result.success(r3.get()) : Result.internalFailure((Exception) TryHelper.tryError(r3));
    }

    /*  JADX ERROR: Failed to decode insn: 0x0002: MOVE_MULTI, method: se.sics.nstream.hops.hdfs.HDFSComp.access$902(se.sics.nstream.hops.hdfs.HDFSComp, long):long
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    static /* synthetic */ long access$902(se.sics.nstream.hops.hdfs.HDFSComp r6, long r7) {
        /*
            r0 = r6
            r1 = r7
            // decode failed: arraycopy: source index -1 out of bounds for object array[6]
            r0.writePos = r1
            return r-1
        */
        throw new UnsupportedOperationException("Method not decompiled: se.sics.nstream.hops.hdfs.HDFSComp.access$902(se.sics.nstream.hops.hdfs.HDFSComp, long):long");
    }

    static {
    }
}
