package se.sics.nstream.hops.libmngr.fsm;

import com.google.common.base.Optional;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.javatuples.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import se.sics.kompics.ComponentProxy;
import se.sics.kompics.Promise;
import se.sics.kompics.fsm.FSMBasicStateNames;
import se.sics.kompics.fsm.FSMException;
import se.sics.kompics.fsm.FSMStateName;
import se.sics.kompics.fsm.handler.FSMBasicEventHandler;
import se.sics.kompics.util.Identifier;
import se.sics.ktoolbox.util.Either;
import se.sics.ktoolbox.util.identifiable.overlay.OverlayId;
import se.sics.ktoolbox.util.network.KAddress;
import se.sics.ktoolbox.util.result.Result;
import se.sics.ktoolbox.util.trysf.Try;
import se.sics.ktoolbox.util.trysf.TryHelper;
import se.sics.nstream.FileId;
import se.sics.nstream.StreamId;
import se.sics.nstream.TorrentIds;
import se.sics.nstream.gcp.DelaGCPHelper;
import se.sics.nstream.gcp.GCPComp;
import se.sics.nstream.gcp.GCPFED;
import se.sics.nstream.hops.HopsFED;
import se.sics.nstream.hops.hdfs.HDFSComp;
import se.sics.nstream.hops.hdfs.disk.DiskComp;
import se.sics.nstream.hops.hdfs.disk.DiskFED;
import se.sics.nstream.hops.kafka.KafkaComp;
import se.sics.nstream.hops.kafka.KafkaEndpoint;
import se.sics.nstream.hops.kafka.KafkaResource;
import se.sics.nstream.hops.libmngr.fsm.LibTInternal;
import se.sics.nstream.hops.library.Details;
import se.sics.nstream.hops.library.event.core.HopsTorrentDownloadEvent;
import se.sics.nstream.hops.library.event.core.HopsTorrentStopEvent;
import se.sics.nstream.hops.library.event.core.HopsTorrentUploadEvent;
import se.sics.nstream.hops.manifest.DiskHelper;
import se.sics.nstream.hops.storage.disk.DiskEndpoint;
import se.sics.nstream.hops.storage.disk.DiskResource;
import se.sics.nstream.hops.storage.gcp.GCPConfig;
import se.sics.nstream.hops.storage.gcp.GCPEndpoint;
import se.sics.nstream.hops.storage.gcp.GCPResource;
import se.sics.nstream.hops.storage.hdfs.HDFSEndpoint;
import se.sics.nstream.hops.storage.hdfs.HDFSHelper;
import se.sics.nstream.hops.storage.hdfs.HDFSResource;
import se.sics.nstream.hops.storage.hops.ManifestHelper;
import se.sics.nstream.hops.storage.hops.ManifestJSON;
import se.sics.nstream.library.event.torrent.TorrentExtendedStatusEvent;
import se.sics.nstream.library.restart.TorrentRestart;
import se.sics.nstream.storage.durable.DurableStorageProvider;
import se.sics.nstream.storage.durable.events.DEndpoint;
import se.sics.nstream.storage.durable.util.MyStream;
import se.sics.nstream.torrent.event.StartTorrent;
import se.sics.nstream.torrent.event.StopTorrent;
import se.sics.nstream.torrent.status.event.DownloadSummaryEvent;
import se.sics.nstream.torrent.tracking.event.StatusSummaryEvent;
import se.sics.nstream.torrent.transfer.event.ctrl.GetRawTorrent;
import se.sics.nstream.torrent.transfer.event.ctrl.SetupTransfer;
import se.sics.nstream.transfer.MyTorrent;

/* loaded from: input_file:se/sics/nstream/hops/libmngr/fsm/LibTHandlers.class */
public class LibTHandlers {
    // Shared logger for all handlers; note that it is registered under LibTFSM, not LibTHandlers.
    private static final Logger LOG = LoggerFactory.getLogger(LibTFSM.class);
    /**
     * Rejects a download start request: the request is answered with a logical failure stating
     * that the torrent is already active, and the FSM stays in the state it was in.
     */
    static FSMBasicEventHandler fallbackDownloadStart = new FSMBasicEventHandler<LibTExternal, LibTInternal, HopsTorrentDownloadEvent.StartRequest>() {
        @Override
        public FSMStateName handle(FSMStateName fSMStateName, LibTExternal libTExternal, LibTInternal libTInternal, HopsTorrentDownloadEvent.StartRequest startRequest) {
            // A space now separates the torrent id from the rest of the sentence; previously the
            // message rendered as "torrent:<id>is active already".
            String cause = "torrent:" + libTInternal.getTorrentId() + " is active already";
            libTExternal.getProxy().answer(startRequest, startRequest.fail2(Result.logicalFail(cause)));
            return fSMStateName;
        }
    };
    /**
     * Rejects an upload request: the request is answered with a logical failure stating that the
     * torrent is already active, and the FSM stays in the state it was in.
     */
    static FSMBasicEventHandler fallbackUploadStart = new FSMBasicEventHandler<LibTExternal, LibTInternal, HopsTorrentUploadEvent.Request>() {
        @Override
        public FSMStateName handle(FSMStateName fSMStateName, LibTExternal libTExternal, LibTInternal libTInternal, HopsTorrentUploadEvent.Request request) {
            // A space now separates the torrent id from the rest of the sentence; previously the
            // message rendered as "torrent:<id>is active already".
            String cause = "torrent:" + libTInternal.getTorrentId() + " is active already";
            libTExternal.getProxy().answer(request, request.fail2(Result.logicalFail(cause)));
            return fSMStateName;
        }
    };
    /**
     * Stop handler that terminates immediately: records the stop via {@code LibTHandlers.stop},
     * reports success, marks the torrent as killed in the library and moves the FSM to FINAL.
     */
    static FSMBasicEventHandler stop0 = new FSMBasicEventHandler<LibTExternal, LibTInternal, HopsTorrentStopEvent.Request>() {
        @Override
        public FSMStateName handle(FSMStateName state, LibTExternal external, LibTInternal internal, HopsTorrentStopEvent.Request stopReq) throws FSMException {
            LibTHandlers.LOG.info("<{}>stop received for torrent:{} - stopping", stopReq.getLibTFSMId(), stopReq.torrentId);

            LibTHandlers.stop(internal, external.getProxy(), stopReq);
            LibTHandlers.success(internal, external.getProxy(), Result.success(true));

            // Nothing to clean up at this point, so the torrent is reported as killed right away.
            external.library.killed(internal.getTorrentId());
            return FSMBasicStateNames.FINAL;
        }
    };
    /**
     * Stop handler that goes straight to storage cleanup: records the stop, triggers the
     * endpoint disconnects, marks the torrent as being killed and moves to CLEAN_STORAGE.
     */
    static FSMBasicEventHandler stop1 = new FSMBasicEventHandler<LibTExternal, LibTInternal, HopsTorrentStopEvent.Request>() {
        @Override
        public FSMStateName handle(FSMStateName state, LibTExternal external, LibTInternal internal, HopsTorrentStopEvent.Request stopReq) throws FSMException {
            LibTHandlers.LOG.info("<{}>stop received for torrent:{} - move to cleaning endpoints", stopReq.getLibTFSMId(), stopReq.torrentId);

            LibTHandlers.stop(internal, external.getProxy(), stopReq);
            LibTHandlers.cleanStorage(external, internal);

            external.library.killing(internal.getTorrentId());
            return LibTStates.CLEAN_STORAGE;
        }
    };
    /**
     * Stop handler that first cleans the transfer: records the stop, asks the torrent manager to
     * stop the torrent, marks the torrent as being killed and moves to CLEAN_TRANSFER.
     * The torrent id in the log line is taken from the request.
     */
    static FSMBasicEventHandler stop2 = new FSMBasicEventHandler<LibTExternal, LibTInternal, HopsTorrentStopEvent.Request>() {
        @Override
        public FSMStateName handle(FSMStateName state, LibTExternal external, LibTInternal internal, HopsTorrentStopEvent.Request stopReq) throws FSMException {
            LibTHandlers.LOG.info("<{}>stop received for torrent:{} - move to cleaning transfer", stopReq.getLibTFSMId(), stopReq.torrentId);

            LibTHandlers.stop(internal, external.getProxy(), stopReq);
            LibTHandlers.cleanTransfer(external, internal);

            external.library.killing(internal.getTorrentId());
            return LibTStates.CLEAN_TRANSFER;
        }
    };
    /**
     * Stop handler that first cleans the transfer: records the stop, asks the torrent manager to
     * stop the torrent, marks the torrent as being killed and moves to CLEAN_TRANSFER.
     * The torrent id in the log line is taken from the FSM's internal state.
     */
    static FSMBasicEventHandler stop3 = new FSMBasicEventHandler<LibTExternal, LibTInternal, HopsTorrentStopEvent.Request>() {
        @Override
        public FSMStateName handle(FSMStateName state, LibTExternal external, LibTInternal internal, HopsTorrentStopEvent.Request stopReq) throws FSMException {
            LibTHandlers.LOG.info("<{}>stop received for torrent:{} - move to cleaning transfer", stopReq.getLibTFSMId(), internal.getTorrentId());

            LibTHandlers.stop(internal, external.getProxy(), stopReq);
            LibTHandlers.cleanTransfer(external, internal);

            external.library.killing(internal.getTorrentId());
            return LibTStates.CLEAN_TRANSFER;
        }
    };
    /**
     * Stop handler used while cleaning is already in progress: the stop is recorded via
     * {@code LibTHandlers.stop} and the FSM remains in its current state.
     */
    static FSMBasicEventHandler stop4 = new FSMBasicEventHandler<LibTExternal, LibTInternal, HopsTorrentStopEvent.Request>() {
        @Override
        public FSMStateName handle(FSMStateName state, LibTExternal external, LibTInternal internal, HopsTorrentStopEvent.Request stopReq) throws FSMException {
            LibTHandlers.LOG.info("<{}>stop received for torrent:{} - working on cleaning", stopReq.getLibTFSMId(), internal.getTorrentId());
            LibTHandlers.stop(internal, external.getProxy(), stopReq);
            // No transition: the state passed in is handed back unchanged.
            return state;
        }
    };
    /**
     * Handles an extended status request: remembers the request in the internal state and asks
     * the torrent status port for a status summary. The FSM state is not changed.
     */
    static FSMBasicEventHandler status = new FSMBasicEventHandler<LibTExternal, LibTInternal, TorrentExtendedStatusEvent.Request>() {
        @Override
        public FSMStateName handle(FSMStateName state, LibTExternal external, LibTInternal internal, TorrentExtendedStatusEvent.Request statusReq) throws FSMException {
            // Kept so that statusReport can answer this request once the summary arrives.
            internal.statusReq = Optional.of(statusReq);

            StatusSummaryEvent.Request summaryReq = new StatusSummaryEvent.Request(internal.getTorrentId());
            external.getProxy().trigger(summaryReq, external.torrentStatusPort());
            return state;
        }
    };
    /**
     * Handles a status summary response: if a status request is stored in the internal state, it
     * is answered with the summary result. The FSM state is not changed either way.
     */
    static FSMBasicEventHandler statusReport = new FSMBasicEventHandler<LibTExternal, LibTInternal, StatusSummaryEvent.Response>() {
        @Override
        public FSMStateName handle(FSMStateName state, LibTExternal external, LibTInternal internal, StatusSummaryEvent.Response summary) throws FSMException {
            if (!internal.statusReq.isPresent()) {
                // Nobody is waiting for this summary.
                return state;
            }
            external.getProxy().answer(internal.statusReq.get(), internal.statusReq.get().succes(summary.result));
            return state;
        }
    };
    /**
     * Accepts a new download: stores the request in the internal state, builds the manifest
     * stream for the configured storage type and delegates to {@code downloadInit}, whose
     * resulting state is returned.
     */
    static FSMBasicEventHandler initDownload = new FSMBasicEventHandler<LibTExternal, LibTInternal, HopsTorrentDownloadEvent.StartRequest>() {
        @Override
        public FSMStateName handle(FSMStateName state, LibTExternal external, LibTInternal internal, HopsTorrentDownloadEvent.StartRequest req) throws FSMException {
            LibTHandlers.LOG.info("{}accepting new download:{}", req.getLibTFSMId(), req.torrentName);
            internal.setDownload(req);

            MyStream manifestStream = LibTHandlers.manifestStreamSetup(external, internal, req.hdfsEndpoint, req.manifest);
            return LibTHandlers.downloadInit(external, internal, req.torrentId, req.torrentName, req.projectId, req.datasetId, manifestStream, req.partners);
        }
    };
    /**
     * Restarts a download: stores the restart request in the internal state and delegates to
     * {@code downloadInit}, using the manifest stream carried by the request.
     */
    static FSMBasicEventHandler initDownloadRestart = new FSMBasicEventHandler<LibTExternal, LibTInternal, TorrentRestart.DwldReq>() {
        @Override
        public FSMStateName handle(FSMStateName state, LibTExternal external, LibTInternal internal, TorrentRestart.DwldReq restartReq) throws FSMException {
            LibTHandlers.LOG.info("{}restarting download:{}", restartReq.getLibTFSMId(), restartReq.torrentName);
            internal.setDownloadRestart(restartReq);

            return LibTHandlers.downloadInit(
                    external,
                    internal,
                    restartReq.torrentId,
                    restartReq.torrentName,
                    restartReq.projectId,
                    restartReq.datasetId,
                    restartReq.manifestStream,
                    restartReq.partners);
        }
    };
    /**
     * Accepts a new upload: stores the request in the internal state, builds the manifest stream
     * for the configured storage type and delegates to the static {@code initUpload} method,
     * whose resulting state is returned.
     */
    static FSMBasicEventHandler initUpload = new FSMBasicEventHandler<LibTExternal, LibTInternal, HopsTorrentUploadEvent.Request>() {
        @Override
        public FSMStateName handle(FSMStateName state, LibTExternal external, LibTInternal internal, HopsTorrentUploadEvent.Request req) throws FSMException {
            LibTHandlers.LOG.info("<{}>accepting new upload:{}", req.getLibTFSMId(), req.torrentName);
            internal.setUpload(req);

            MyStream manifestStream = LibTHandlers.manifestStreamSetup(external, internal, req.hdfsEndpoint, req.manifestResource);
            return LibTHandlers.initUpload(external, internal, req.torrentId, req.torrentName, req.projectId, req.datasetId, manifestStream);
        }
    };
    /**
     * Handles an upload restart request: stores it in the internal state and delegates to the
     * static {@code initUpload} method, using the manifest stream carried by the request.
     */
    static FSMBasicEventHandler initUploadRestart = new FSMBasicEventHandler<LibTExternal, LibTInternal, TorrentRestart.UpldReq>() {
        @Override
        public FSMStateName handle(FSMStateName state, LibTExternal external, LibTInternal internal, TorrentRestart.UpldReq restartReq) throws FSMException {
            LibTHandlers.LOG.info("<{}>accepting new upload:{}", restartReq.getLibTFSMId(), restartReq.torrentName);
            internal.setUploadRestart(restartReq);

            return LibTHandlers.initUpload(
                    external,
                    internal,
                    restartReq.torrentId,
                    restartReq.torrentName,
                    restartReq.projectId,
                    restartReq.datasetId,
                    restartReq.manifestStream);
        }
    };
    /**
     * Handles a successful endpoint connection while preparing manifest storage. The endpoint is
     * marked as connected; once the storage registry reports completion, the storage setup is
     * recorded, the transfer setup is triggered and the FSM moves to PREPARE_TRANSFER. Otherwise
     * it stays in PREPARE_MANIFEST_STORAGE.
     */
    static FSMBasicEventHandler prepareManifestStorage = new FSMBasicEventHandler<LibTExternal, LibTInternal, DEndpoint.Success>() {
        @Override
        public FSMStateName handle(FSMStateName state, LibTExternal external, LibTInternal internal, DEndpoint.Success connected) {
            LibTHandlers.LOG.debug("<{}>endpoint:{} prepared", connected.getLibTFSMId(), connected.req.endpointProvider.getName());
            internal.storageRegistry.connected(connected.req.endpointId);

            if (internal.storageRegistry.isComplete()) {
                internal.getSetupState().storageSetupComplete(internal.storageRegistry.getSetup());
                LibTHandlers.setupTransfer(external, internal);
                return LibTStates.PREPARE_TRANSFER;
            }
            // Still waiting for further endpoints to connect.
            return LibTStates.PREPARE_MANIFEST_STORAGE;
        }
    };
    /**
     * Handles the response to a torrent start. A failed start is logged and raised as a
     * {@link RuntimeException}. On success, a download setup requests the manifest and moves to
     * DOWNLOAD_MANIFEST; any other setup reads the manifest from storage, prepares basic details,
     * advances the transfer and moves to ADVANCE_TRANSFER.
     */
    static FSMBasicEventHandler prepareTransfer = new FSMBasicEventHandler<LibTExternal, LibTInternal, StartTorrent.Response>() {
        @Override
        public FSMStateName handle(FSMStateName state, LibTExternal external, LibTInternal internal, StartTorrent.Response started) {
            if (started.result.isSuccess()) {
                LibTHandlers.LOG.debug("<{}>torrent:{} - prepared", started.getLibTFSMId(), started.overlayId());
            } else {
                LibTHandlers.LOG.warn("<{}>torrent:{} - start failed", started.getLibTFSMId(), started.overlayId());
                throw new RuntimeException("todo deal with failure");
            }

            if (internal.getSetupState().isDownloadSetup()) {
                // Download path: the manifest has to be fetched first.
                LibTHandlers.getManifest(external, internal);
                return LibTStates.DOWNLOAD_MANIFEST;
            } else {
                // Non-download path: the manifest is read from storage.
                LibTHandlers.readManifest(external, internal);
                LibTHandlers.prepareBasicDetails(external, internal);
                internal.advanceTransfer();
                LibTHandlers.advanceTransfer(external, internal);
                return LibTStates.ADVANCE_TRANSFER;
            }
        }
    };
    /**
     * Handles the raw torrent (manifest) response. A failure is logged and rethrown wrapped in a
     * {@link RuntimeException}. On success the manifest is written; if extended details are
     * enabled, success is reported and the FSM moves to EXTENDED_DETAILS, otherwise basic details
     * are prepared, the transfer is advanced and the FSM moves to ADVANCE_TRANSFER.
     */
    static FSMBasicEventHandler downloadManifest = new FSMBasicEventHandler<LibTExternal, LibTInternal, GetRawTorrent.Response>() {
        @Override
        public FSMStateName handle(FSMStateName state, LibTExternal external, LibTInternal internal, GetRawTorrent.Response rawTorrent) throws FSMException {
            if (rawTorrent.result.isSuccess()) {
                LibTHandlers.LOG.debug("<{}>torrent:{} - advanced", rawTorrent.getLibTFSMId(), rawTorrent.overlayId());
            } else {
                LibTHandlers.LOG.warn("<{}>torrent:{} - start failed", rawTorrent.getLibTFSMId(), rawTorrent.overlayId());
                throw new RuntimeException(rawTorrent.result.getException());
            }

            LibTHandlers.writeManifest(external, internal, rawTorrent.result.getValue());

            if (!LibTHandlers.withExtendedDetails(external)) {
                LibTHandlers.prepareBasicDetails(external, internal);
                internal.advanceTransfer();
                LibTHandlers.advanceTransfer(external, internal);
                return LibTStates.ADVANCE_TRANSFER;
            }
            LibTHandlers.success(internal, external.getProxy(), Result.success(true));
            return LibTStates.EXTENDED_DETAILS;
        }
    };
    /**
     * Handles the download advance request carrying extended details. A failed request is logged
     * and rethrown wrapped in a {@link RuntimeException}. Otherwise the request is stored and the
     * file storage endpoints are set up: if any connect was triggered the FSM moves to
     * PREPARE_FILES_STORAGE, else basic details are prepared, the transfer is advanced and the
     * FSM moves to ADVANCE_TRANSFER.
     */
    static FSMBasicEventHandler extendedDetails = new FSMBasicEventHandler<LibTExternal, LibTInternal, HopsTorrentDownloadEvent.AdvanceRequest>() {
        @Override
        public FSMStateName handle(FSMStateName state, LibTExternal external, LibTInternal internal, HopsTorrentDownloadEvent.AdvanceRequest advanceReq) throws FSMException {
            if (advanceReq.result.isSuccess()) {
                LibTHandlers.LOG.debug("<{}>torrent:{} - extended details", advanceReq.getLibTFSMId(), advanceReq.torrentId);
            } else {
                LibTHandlers.LOG.warn("<{}>torrent:{} - start failed", advanceReq.getLibTFSMId(), advanceReq.torrentId);
                throw new RuntimeException(advanceReq.result.getException());
            }

            internal.setDownloadAdvance(advanceReq);

            boolean waitingForEndpoints = LibTHandlers.setupFileStorageEndpoints(external, internal, advanceReq.kafkaEndpoint);
            if (waitingForEndpoints) {
                return LibTStates.PREPARE_FILES_STORAGE;
            }
            LibTHandlers.prepareBasicDetails(external, internal);
            internal.advanceTransfer();
            LibTHandlers.advanceTransfer(external, internal);
            return LibTStates.ADVANCE_TRANSFER;
        }
    };
    /**
     * Handles a successful endpoint connection while preparing file storage. The endpoint is
     * marked as connected; once the storage registry reports completion, extended details are
     * prepared, the storage setup is recorded, the transfer is advanced and the FSM moves to
     * ADVANCE_TRANSFER. Otherwise it stays in PREPARE_FILES_STORAGE.
     */
    static FSMBasicEventHandler prepareFilesStorage = new FSMBasicEventHandler<LibTExternal, LibTInternal, DEndpoint.Success>() {
        @Override
        public FSMStateName handle(FSMStateName state, LibTExternal external, LibTInternal internal, DEndpoint.Success connected) {
            LibTHandlers.LOG.debug("<{}>endpoint:{} prepared", connected.getLibTFSMId(), connected.req.endpointProvider.getName());
            internal.storageRegistry.connected(connected.req.endpointId);

            if (internal.storageRegistry.isComplete()) {
                LibTHandlers.prepareExtendedDetails(external, internal);
                internal.getSetupState().storageSetupComplete(internal.storageRegistry.getSetup());
                internal.advanceTransfer();
                LibTHandlers.advanceTransfer(external, internal);
                return LibTStates.ADVANCE_TRANSFER;
            }
            // Still waiting for further endpoints to connect.
            return LibTStates.PREPARE_FILES_STORAGE;
        }
    };
    /**
     * Handles the transfer setup response. A failure is logged and rethrown wrapped in a
     * {@link RuntimeException}; a torrent unknown to the library is also raised as a
     * {@link RuntimeException}. Otherwise success is reported and the library is told to start
     * downloading or uploading, with the FSM moving to DOWNLOADING or UPLOADING accordingly.
     */
    static FSMBasicEventHandler advanceTransfer = new FSMBasicEventHandler<LibTExternal, LibTInternal, SetupTransfer.Response>() {
        @Override
        public FSMStateName handle(FSMStateName state, LibTExternal external, LibTInternal internal, SetupTransfer.Response setupResp) throws FSMException {
            if (setupResp.result.isSuccess()) {
                LibTHandlers.LOG.debug("<{}>torrent:{} - transfer - set up", setupResp.getLibTFSMId(), setupResp.overlayId());
            } else {
                LibTHandlers.LOG.debug("<{}>torrent:{} - transfer - set up failed", setupResp.getLibTFSMId(), setupResp.overlayId());
                throw new RuntimeException(setupResp.result.getException());
            }

            boolean knownToLibrary = external.library.containsTorrent(setupResp.overlayId());
            if (!knownToLibrary) {
                throw new RuntimeException("mismatch between library and fsm - broken");
            }

            LibTHandlers.success(internal, external.getProxy(), Result.success(true));

            LibTInternal.TComplete complete = internal.getCompleteState();
            if (!complete.download) {
                external.library.upload(internal.getTorrentId(), complete.manifestStream);
                return LibTStates.UPLOADING;
            }
            external.library.download(internal.getTorrentId(), complete.manifestStream);
            return LibTStates.DOWNLOADING;
        }
    };
    /**
     * Handles the download summary event: verifies the library knows the torrent (raising a
     * {@link RuntimeException} if not), marks the download as finished in the library and moves
     * the FSM to UPLOADING.
     */
    static FSMBasicEventHandler downloadCompleted = new FSMBasicEventHandler<LibTExternal, LibTInternal, DownloadSummaryEvent>() {
        @Override
        public FSMStateName handle(FSMStateName state, LibTExternal external, LibTInternal internal, DownloadSummaryEvent summary) {
            LibTHandlers.LOG.debug("<{}>torrent:{} - download completed", summary.getLibTFSMId(), summary.torrentId);

            if (external.library.containsTorrent(summary.torrentId)) {
                external.library.finishDownload(summary.torrentId);
                return LibTStates.UPLOADING;
            }
            throw new RuntimeException("mismatch between library and fsm - critical logical error");
        }
    };
    /**
     * Handles an endpoint disconnect during storage cleanup. The endpoint is marked as cleaned in
     * the storage registry and released in the endpoint id registry. While cleaning is incomplete
     * the FSM stays in CLEAN_STORAGE; once complete, success is reported, the torrent is marked
     * as killed in the library and the FSM moves to FINAL.
     */
    static FSMBasicEventHandler endpointCleaning = new FSMBasicEventHandler<LibTExternal, LibTInternal, DEndpoint.Disconnected>() {
        @Override
        public FSMStateName handle(FSMStateName state, LibTExternal external, LibTInternal internal, DEndpoint.Disconnected cleaned) throws FSMException {
            LibTHandlers.LOG.debug("<{}>endpoint:{} cleaned", cleaned.getLibTFSMId(), cleaned.req.endpointId);

            internal.storageRegistry.cleaned(cleaned.req.endpointId);
            external.endpointIdRegistry.release(cleaned.req.endpointId);

            LibTHandlers.LOG.debug("<{}> active:{} endpoints:{}", cleaned.getLibTFSMId(), Integer.valueOf(internal.storageRegistry.endpointView().size()), internal.storageRegistry.endpointView());

            if (internal.storageRegistry.cleaningComplete()) {
                LibTHandlers.success(internal, external.getProxy(), Result.success(true));
                external.library.killed(internal.getTorrentId());
                LibTHandlers.LOG.info("<{}>terminating fsm", cleaned.getLibTFSMId());
                return FSMBasicStateNames.FINAL;
            }
            // More endpoints still have to report their disconnect.
            return LibTStates.CLEAN_STORAGE;
        }
    };
    /**
     * Handles the torrent stop response during transfer cleanup. A failed stop is raised as a
     * {@link RuntimeException}; otherwise the storage cleanup is triggered and the FSM moves to
     * CLEAN_STORAGE.
     */
    static FSMBasicEventHandler transferCleaning = new FSMBasicEventHandler<LibTExternal, LibTInternal, StopTorrent.Response>() {
        @Override
        public FSMStateName handle(FSMStateName state, LibTExternal external, LibTInternal internal, StopTorrent.Response stopped) {
            // Logged before the result is inspected, so the line appears for failures as well.
            LibTHandlers.LOG.debug("<{}>torrent:{} cleaned", stopped.getLibTFSMId(), stopped.torrentId);

            if (stopped.result.isSuccess()) {
                LibTHandlers.cleanStorage(external, internal);
                return LibTStates.CLEAN_STORAGE;
            }
            throw new RuntimeException("TODO Alex - what do you do when the cleaning operation fails");
        }
    };

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Common start-up path for new and restarted downloads.
     *
     * <p>Registers the download with the library, stores the manifest stream in the setup state
     * and triggers the connection of the manifest storage endpoint. If the storage registry is
     * already complete afterwards, the storage setup is recorded and the transfer setup is
     * triggered.
     *
     * @return PREPARE_TRANSFER when storage is already complete, otherwise PREPARE_MANIFEST_STORAGE
     * @throws RuntimeException if the library already contains the torrent
     */
    public static LibTStates downloadInit(LibTExternal libTExternal, LibTInternal libTInternal, OverlayId overlayId, String str, Integer num, Integer num2, MyStream myStream, List<KAddress> list) {
        boolean alreadyKnown = libTExternal.library.containsTorrent(overlayId);
        if (alreadyKnown) {
            throw new RuntimeException("library and fsm do not agree - cannot fix it while running - logic error");
        }
        libTExternal.library.prepareDownload(overlayId, num, num2, str, list);

        Set<String> newEndpointNames = saveManifestStream(libTExternal, libTInternal, myStream);
        setupManifestStorageEndpoint(libTExternal, libTInternal, newEndpointNames);

        if (libTInternal.storageRegistry.isComplete()) {
            libTInternal.getSetupState().storageSetupComplete(libTInternal.storageRegistry.getSetup());
            setupTransfer(libTExternal, libTInternal);
            return LibTStates.PREPARE_TRANSFER;
        }
        return LibTStates.PREPARE_MANIFEST_STORAGE;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Common start-up path for new and restarted uploads.
     *
     * <p>Registers the upload with the library, stores the manifest stream in the setup state
     * and triggers the connection of the manifest storage endpoint. If the storage registry is
     * already complete afterwards, the storage setup is recorded and the transfer setup is
     * triggered.
     *
     * @return PREPARE_TRANSFER when storage is already complete, otherwise PREPARE_MANIFEST_STORAGE
     * @throws RuntimeException if the library already contains the torrent
     */
    public static LibTStates initUpload(LibTExternal libTExternal, LibTInternal libTInternal, OverlayId overlayId, String str, Integer num, Integer num2, MyStream myStream) {
        boolean alreadyKnown = libTExternal.library.containsTorrent(overlayId);
        if (alreadyKnown) {
            throw new RuntimeException("library and fsm do not agree - cannot fix it while running - logic error");
        }
        libTExternal.library.prepareUpload(overlayId, num, num2, str);

        Set<String> newEndpointNames = saveManifestStream(libTExternal, libTInternal, myStream);
        setupManifestStorageEndpoint(libTExternal, libTInternal, newEndpointNames);

        if (libTInternal.storageRegistry.isComplete()) {
            libTInternal.getSetupState().storageSetupComplete(libTInternal.storageRegistry.getSetup());
            setupTransfer(libTExternal, libTInternal);
            return LibTStates.PREPARE_TRANSFER;
        }
        return LibTStates.PREPARE_MANIFEST_STORAGE;
    }

    /**
     * Stores the manifest stream in the setup state under a stream id built from the endpoint id
     * and the torrent's file 0. If the endpoint name has no id in the endpoint id registry yet,
     * one is registered.
     *
     * @return the endpoint names newly registered by this call (empty or a single name)
     */
    private static Set<String> saveManifestStream(LibTExternal libTExternal, LibTInternal libTInternal, MyStream myStream) {
        Set<String> newlyRegistered = new HashSet<>();
        Identifier endpointId = libTExternal.endpointIdRegistry.lookup(myStream.endpoint.getEndpointName());
        if (endpointId == null) {
            // First time this endpoint name is seen: remember it and allocate an id for it.
            newlyRegistered.add(myStream.endpoint.getEndpointName());
            endpointId = libTExternal.endpointIdRegistry.register(myStream.endpoint.getEndpointName());
        }
        libTInternal.getSetupState().setManifestStream(
                TorrentIds.streamId(endpointId, TorrentIds.fileId(libTInternal.getTorrentId(), 0)),
                myStream);
        return newlyRegistered;
    }

    /**
     * Triggers the connection of the single storage endpoint that holds the manifest, by passing
     * the manifest storage provider to {@code setupStorageEndpoints}.
     */
    private static void setupManifestStorageEndpoint(LibTExternal libTExternal, LibTInternal libTInternal, Set<String> set) {
        List<DurableStorageProvider> manifestProviders = new LinkedList<>();
        DurableStorageProvider manifestProvider = getManifestStorageProvider(libTExternal, libTInternal);
        manifestProviders.add(manifestProvider);
        setupStorageEndpoints(libTExternal, libTInternal, manifestProviders, set);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Triggers the connection of the storage endpoints used for the torrent's files.
     *
     * @param optional Kafka endpoint, forwarded to {@code getFilesStorageProvider}
     * @return the result of {@code setupStorageEndpoints}: {@code true} if at least one endpoint
     *         connect was triggered
     */
    public static boolean setupFileStorageEndpoints(LibTExternal libTExternal, LibTInternal libTInternal, Optional<KafkaEndpoint> optional) {
        List<DurableStorageProvider> fileProviders = getFilesStorageProvider(libTExternal, libTInternal, optional);
        Set<String> noNewEndpoints = new HashSet<>();
        return setupStorageEndpoints(libTExternal, libTInternal, fileProviders, noNewEndpoints);
    }

    /**
     * For every provider: resolves (or registers) the endpoint id for the provider's name, adds
     * the endpoint to the storage registry as waiting, marks the id as used and triggers a
     * {@code DEndpoint.Connect} on the endpoint port.
     *
     * @param list providers whose endpoints should be connected
     * @param set  not read by this method
     * @return {@code true} if at least one connect was triggered, {@code false} for an empty list
     */
    private static boolean setupStorageEndpoints(LibTExternal libTExternal, LibTInternal libTInternal, List<DurableStorageProvider> list, Set<String> set) {
        boolean connectTriggered = false;
        for (DurableStorageProvider provider : list) {
            String endpointName = provider.getName();

            Identifier endpointId = libTExternal.endpointIdRegistry.lookup(endpointName);
            if (endpointId == null) {
                endpointId = libTExternal.endpointIdRegistry.register(endpointName);
            }

            libTInternal.storageRegistry.addWaiting(endpointName, endpointId, provider.getEndpoint());
            libTExternal.endpointIdRegistry.use(endpointId);
            libTExternal.getProxy().trigger(
                    new DEndpoint.Connect(libTInternal.getTorrentId(), endpointId, provider),
                    libTExternal.endpointPort());
            connectTriggered = true;
        }
        return connectTriggered;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Triggers a {@code DEndpoint.Disconnect} on the endpoint port for every endpoint id returned
     * by the storage registry's {@code selfCleaning()}.
     */
    public static void cleanStorage(LibTExternal libTExternal, LibTInternal libTInternal) {
        for (Iterator<Identifier> endpoints = libTInternal.storageRegistry.selfCleaning().iterator(); endpoints.hasNext();) {
            libTExternal.getProxy().trigger(
                    new DEndpoint.Disconnect(libTInternal.getTorrentId(), endpoints.next()),
                    libTExternal.endpointPort());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reads the manifest from the manifest stream recorded in the setup state and stores it in
     * the setup state as the right side of an {@code Either}.
     *
     * @throws RuntimeException if the manifest could not be read from storage
     */
    public static void readManifest(LibTExternal libTExternal, LibTInternal libTInternal) {
        MyStream manifestStream = libTInternal.getSetupState().getManifestStream();
        Result<ManifestJSON> manifest = readManifestFromStorage(libTExternal, manifestStream);
        if (!manifest.isSuccess()) {
            LOG.error("could not read manifest file:{}", manifestStream.resource);
            throw new RuntimeException("could not read manifest file:" + manifestStream.resource);
        }
        libTInternal.getSetupState().setManifest(Either.right(manifest.getValue()));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Writes the given manifest to storage and then stores it in the setup state as the left
     * side of an {@code Either}.
     */
    public static void writeManifest(LibTExternal libTExternal, LibTInternal libTInternal, MyTorrent.Manifest manifest) {
        // Storage write comes first; the setup state is only updated afterwards.
        writeManifestToStorage(libTExternal, libTInternal, manifest);
        libTInternal.getSetupState().setManifest(Either.left(manifest));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Triggers a {@code StartTorrent.Request} for this torrent and its partners on the torrent
     * manager port.
     */
    public static void setupTransfer(LibTExternal libTExternal, LibTInternal libTInternal) {
        StartTorrent.Request startReq = new StartTorrent.Request(libTInternal.getTorrentId(), libTInternal.getPartners());
        libTExternal.getProxy().trigger(startReq, libTExternal.torrentMngrPort());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** Triggers a {@code StopTorrent.Request} for this torrent on the torrent manager port. */
    public static void cleanTransfer(LibTExternal libTExternal, LibTInternal libTInternal) {
        StopTorrent.Request stopReq = new StopTorrent.Request(libTInternal.getTorrentId());
        libTExternal.getProxy().trigger(stopReq, libTExternal.torrentMngrPort());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** Triggers a {@code GetRawTorrent.Request} for this torrent on the transfer control port. */
    public static void getManifest(LibTExternal libTExternal, LibTInternal libTInternal) {
        GetRawTorrent.Request rawTorrentReq = new GetRawTorrent.Request(libTInternal.getTorrentId());
        libTExternal.getProxy().trigger(rawTorrentReq, libTExternal.transferCtrlPort());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Triggers a {@code SetupTransfer.Request} on the transfer control port, carrying the torrent
     * held by the internal complete state.
     */
    public static void advanceTransfer(LibTExternal libTExternal, LibTInternal libTInternal) {
        SetupTransfer.Request setupReq = new SetupTransfer.Request(libTInternal.getTorrentId(), libTInternal.getCompleteState().torrent);
        libTExternal.getProxy().trigger(setupReq, libTExternal.transferCtrlPort());
    }

    /**
     * Builds the storage provider for the manifest according to the FSM type: a disk provider
     * for DISK, a GCP provider for GCP, and an HDFS provider for every other type. The GCP and
     * HDFS providers use the endpoint of the manifest stream recorded in the setup state.
     */
    private static DurableStorageProvider getManifestStorageProvider(LibTExternal libTExternal, LibTInternal libTInternal) {
        DurableStorageProvider provider;
        if (libTExternal.fsmType.equals(Details.Types.DISK)) {
            provider = new DiskComp.StorageProvider(libTExternal.selfAdr.getId());
        } else if (libTExternal.fsmType.equals(Details.Types.GCP)) {
            provider = new GCPComp.StorageProvider(
                    libTExternal.selfAdr.getId(),
                    (GCPEndpoint) libTInternal.getSetupState().getManifestStream().endpoint);
        } else {
            provider = new HDFSComp.StorageProvider(
                    libTExternal.selfAdr.getId(),
                    (HDFSEndpoint) libTInternal.getSetupState().getManifestStream().endpoint);
        }
        return provider;
    }

    /**
     * Builds the list of storage providers used for the torrent files.
     *
     * <p>A {@code DISK} FSM gets a single disk provider and a {@code GCP} FSM a single GCP provider.
     * Any other type gets an HDFS provider, followed by a Kafka provider when a Kafka endpoint is present.
     * The GCP and HDFS providers use the endpoint of the manifest stream held in the setup state.
     *
     * @param libTExternal supplies the FSM type and the self address
     * @param libTInternal supplies the setup state with the manifest stream
     * @param optional     Kafka endpoint; only consulted on the HDFS path
     * @return the storage providers, in the order described above
     */
    private static List<DurableStorageProvider> getFilesStorageProvider(LibTExternal libTExternal, LibTInternal libTInternal, Optional<KafkaEndpoint> optional) {
        // Parameterized list: the raw LinkedList forced an unchecked conversion on return.
        List<DurableStorageProvider> providers = new LinkedList<>();
        if (libTExternal.fsmType.equals(Details.Types.DISK)) {
            providers.add(new DiskComp.StorageProvider(libTExternal.selfAdr.getId()));
        } else if (libTExternal.fsmType.equals(Details.Types.GCP)) {
            providers.add(new GCPComp.StorageProvider(libTExternal.selfAdr.getId(), (GCPEndpoint) libTInternal.getSetupState().getManifestStream().endpoint));
        } else {
            providers.add(new HDFSComp.StorageProvider(libTExternal.selfAdr.getId(), (HDFSEndpoint) libTInternal.getSetupState().getManifestStream().endpoint));
            if (optional.isPresent()) {
                providers.add(new KafkaComp.StorageProvider(libTExternal.selfAdr.getId(), optional.get()));
            }
        }
        return providers;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates the manifest stream for the FSM type.
     *
     * <ul>
     *   <li>{@code DISK}: a disk endpoint with a disk resource built from the directory path and
     *       file name of {@code hDFSResource};</li>
     *   <li>{@code GCP}: a GCP endpoint and resource built from the {@code GCPConfig} read from the
     *       component config, plus the directory path and file name of {@code hDFSResource};</li>
     *   <li>any other type: the given HDFS endpoint and resource, unchanged.</li>
     * </ul>
     *
     * @param libTExternal supplies the FSM type and the config
     * @param libTInternal not used by this method
     * @param hDFSEndpoint endpoint used on the HDFS path
     * @param hDFSResource resource used on the HDFS path and as source of path and file name otherwise
     * @return the manifest stream
     * @throws RuntimeException wrapping any throwable raised while reading the GCP config or
     *                          building the GCP stream
     */
    public static MyStream manifestStreamSetup(LibTExternal libTExternal, LibTInternal libTInternal, HDFSEndpoint hDFSEndpoint, HDFSResource hDFSResource) {
        if (libTExternal.fsmType.equals(Details.Types.DISK)) {
            DiskResource diskResource = new DiskResource(hDFSResource.dirPath, hDFSResource.fileName);
            return new MyStream(new DiskEndpoint(), diskResource);
        }
        if (libTExternal.fsmType.equals(Details.Types.GCP)) {
            try {
                GCPConfig gcpConfig = GCPConfig.read(libTExternal.getConfig()).checkedGet();
                GCPEndpoint gcpEndpoint = new GCPEndpoint(gcpConfig.credentials, gcpConfig.project);
                GCPResource gcpResource = new GCPResource(gcpConfig.bucket, hDFSResource.dirPath, hDFSResource.fileName);
                return new MyStream(gcpEndpoint, gcpResource);
            } catch (Throwable cause) {
                throw new RuntimeException(cause);
            }
        }
        return new MyStream(hDFSEndpoint, hDFSResource);
    }

    /**
     * Reads the manifest from the storage backing {@code myStream}.
     *
     * <p>{@code DISK} and {@code GCP} FSMs delegate to {@code DiskHelper} and {@code DelaGCPHelper}.
     * Any other type is treated as HDFS: a file system is obtained from the endpoint's HDFS config,
     * the manifest is read as the endpoint's remote user, and the file system is closed afterwards.
     *
     * @param libTExternal supplies the FSM type
     * @param myStream     manifest stream; its endpoint and resource are cast to the backend types
     * @return the result of the manifest read
     * @throws RuntimeException wrapping an {@link IOException} raised while obtaining or closing the
     *                          HDFS file system
     */
    private static Result<ManifestJSON> readManifestFromStorage(LibTExternal libTExternal, MyStream myStream) {
        if (libTExternal.fsmType.equals(Details.Types.DISK)) {
            return DiskHelper.readManifest((DiskResource) myStream.resource);
        }
        if (libTExternal.fsmType.equals(Details.Types.GCP)) {
            return DelaGCPHelper.readManifest((GCPEndpoint) myStream.endpoint, (GCPResource) myStream.resource);
        }
        HDFSEndpoint hdfsEndpoint = (HDFSEndpoint) myStream.endpoint;
        UserGroupInformation remoteUser = UserGroupInformation.createRemoteUser(hdfsEndpoint.user);
        // try-with-resources closes the file system on the exceptional path as well; the previous
        // form only closed it after a successful read.
        try (DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(hdfsEndpoint.hdfsConfig)) {
            return convert(HDFSHelper.readManifest(dfs, remoteUser, hdfsEndpoint, (HDFSResource) myStream.resource));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Converts a {@code Try} into a {@code Result}.
     *
     * @param r2  the try to convert
     * @param <O> value type
     * @return a success result holding the value when {@code r2} is a success, otherwise an
     *         internal failure built from the try's error cast to {@link Exception}
     */
    private static <O> Result<O> convert(Try<O> r2) {
        if (!r2.isSuccess()) {
            return Result.internalFailure((Exception) TryHelper.tryError(r2));
        }
        return Result.success(r2.get());
    }

    /**
     * Writes the manifest to the storage backing the manifest stream of the setup state.
     *
     * <p>{@code DISK} and {@code GCP} FSMs delegate to {@code DiskHelper} and {@code DelaGCPHelper}.
     * Any other type is treated as HDFS: a file system is obtained from the endpoint's HDFS config,
     * the manifest is written as the endpoint's remote user, and the file system is closed afterwards.
     *
     * @param libTExternal supplies the FSM type
     * @param libTInternal supplies the setup state with the manifest stream
     * @param manifest     manifest whose {@code manifestByte} is turned into the JSON to write
     * @return the result of the manifest write
     * @throws RuntimeException wrapping an {@link IOException} raised while obtaining or closing the
     *                          HDFS file system
     */
    private static Result<Boolean> writeManifestToStorage(LibTExternal libTExternal, LibTInternal libTInternal, MyTorrent.Manifest manifest) {
        MyStream manifestStream = libTInternal.getSetupState().getManifestStream();
        if (libTExternal.fsmType.equals(Details.Types.DISK)) {
            return DiskHelper.writeManifest((DiskResource) manifestStream.resource, ManifestHelper.getManifestJSON(manifest.manifestByte));
        }
        if (libTExternal.fsmType.equals(Details.Types.GCP)) {
            return DelaGCPHelper.writeManifest((GCPEndpoint) manifestStream.endpoint, (GCPResource) manifestStream.resource, ManifestHelper.getManifestJSON(manifest.manifestByte));
        }
        ManifestJSON manifestJSON = ManifestHelper.getManifestJSON(manifest.manifestByte);
        HDFSEndpoint hdfsEndpoint = (HDFSEndpoint) manifestStream.endpoint;
        HDFSResource hdfsResource = (HDFSResource) manifestStream.resource;
        UserGroupInformation remoteUser = UserGroupInformation.createRemoteUser(hdfsEndpoint.user);
        // try-with-resources closes the file system on the exceptional path as well; the previous
        // form only closed it after a successful write.
        try (DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(hdfsEndpoint.hdfsConfig)) {
            return convert(HDFSHelper.writeManifest(dfs, remoteUser, hdfsEndpoint, hdfsResource, manifestJSON));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds one details entry per file of the setup state and stores the map via {@code setDetails}.
     *
     * <p>Each entry is keyed by the file id. Its stream id is derived from the manifest stream id
     * and its stream from the manifest stream with the resource pointed at the file name. The entry
     * type follows the FSM type: {@code DiskFED} for {@code DISK}, {@code GCPFED} for {@code GCP},
     * and {@code HopsFED} with an absent second stream otherwise.
     *
     * @param libTExternal supplies the FSM type
     * @param libTInternal supplies the setup state that is read and updated
     */
    public static void prepareBasicDetails(LibTExternal libTExternal, LibTInternal libTInternal) {
        HashMap fileDetails = new HashMap();
        StreamId baseStreamId = libTInternal.getSetupState().getManifestStreamId();
        MyStream baseStream = libTInternal.getSetupState().getManifestStream();
        if (libTExternal.fsmType.equals(Details.Types.DISK)) {
            // The cast happens before the loop, so it is checked even when there are no files.
            DiskResource manifestOnDisk = (DiskResource) baseStream.resource;
            for (Map.Entry<String, FileId> file : libTInternal.getSetupState().getFiles().entrySet()) {
                FileId fileId = file.getValue();
                StreamId fileStreamId = baseStreamId.withFile(fileId);
                MyStream fileStream = baseStream.withResource(manifestOnDisk.withFile(file.getKey()));
                fileDetails.put(fileId, new DiskFED(fileStreamId, fileStream));
            }
        } else if (libTExternal.fsmType.equals(Details.Types.GCP)) {
            GCPResource manifestOnGcp = (GCPResource) baseStream.resource;
            for (Map.Entry<String, FileId> file : libTInternal.getSetupState().getFiles().entrySet()) {
                FileId fileId = file.getValue();
                StreamId fileStreamId = baseStreamId.withFile(fileId);
                MyStream fileStream = baseStream.withResource(manifestOnGcp.withFile(file.getKey()));
                fileDetails.put(fileId, new GCPFED(fileStreamId, fileStream));
            }
        } else {
            HDFSResource manifestOnHdfs = (HDFSResource) baseStream.resource;
            for (Map.Entry<String, FileId> file : libTInternal.getSetupState().getFiles().entrySet()) {
                FileId fileId = file.getValue();
                fileDetails.put(fileId, new HopsFED(
                        Pair.with(baseStreamId.withFile(fileId), baseStream.withResource(manifestOnHdfs.withFile(file.getKey()))),
                        Optional.absent()));
            }
        }
        libTInternal.getSetupState().setDetails(fileDetails);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Tells whether the FSM type uses extended details.
     *
     * @param libTExternal supplies the FSM type
     * @return {@code false} for {@code DISK} and {@code GCP}, {@code true} for any other type
     */
    public static boolean withExtendedDetails(LibTExternal libTExternal) {
        if (libTExternal.fsmType.equals(Details.Types.DISK)) {
            return false;
        }
        return !libTExternal.fsmType.equals(Details.Types.GCP);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds one {@code HopsFED} per file of the setup state and stores the map via {@code setDetails}.
     *
     * <p>The first stream of each entry is derived from the manifest stream, whose resource is cast
     * to {@code HDFSResource}. When the aux state holds a Kafka resource under the file name, a second
     * stream is attached; its id and endpoint are resolved through the storage registry from the
     * Kafka endpoint name. Otherwise the second stream is absent.
     *
     * @param libTExternal not used by this method
     * @param libTInternal supplies the setup state, the aux state and the storage registry
     */
    public static void prepareExtendedDetails(LibTExternal libTExternal, LibTInternal libTInternal) {
        HashMap fileDetails = new HashMap();
        StreamId baseStreamId = libTInternal.getSetupState().getManifestStreamId();
        MyStream baseStream = libTInternal.getSetupState().getManifestStream();
        HDFSResource manifestOnHdfs = (HDFSResource) baseStream.resource;
        LOG.debug("kafka files:{}", Integer.valueOf(libTInternal.auxState.getKafkaDetails().size()));
        for (Map.Entry<String, FileId> file : libTInternal.getSetupState().getFiles().entrySet()) {
            LOG.debug("preparing file:{}", file.getKey());
            StreamId fileStreamId = baseStreamId.withFile(file.getValue());
            MyStream fileStream = baseStream.withResource(manifestOnHdfs.withFile(file.getKey()));
            KafkaResource kafkaResource = libTInternal.auxState.getKafkaDetails().get(file.getKey());
            Optional kafkaStream;
            if (kafkaResource == null) {
                kafkaStream = Optional.absent();
            } else {
                LOG.debug("preparing kafka:{}", kafkaResource.getSinkName());
                Identifier kafkaEndpointId = libTInternal.storageRegistry.nameToId(libTInternal.auxState.getKafkaEndpointName());
                kafkaStream = Optional.of(Pair.with(
                        TorrentIds.streamId(kafkaEndpointId, file.getValue()),
                        new MyStream(libTInternal.storageRegistry.getEndpoint(kafkaEndpointId), kafkaResource)));
            }
            fileDetails.put(file.getValue(), new HopsFED(Pair.with(fileStreamId, fileStream), kafkaStream));
        }
        libTInternal.getSetupState().setDetails(fileDetails);
    }

    /**
     * Answers the active request with a failure built from {@code result}, then resets the
     * active-request holder.
     *
     * @param libTInternal   supplies the active-request holder {@code ar}
     * @param componentProxy proxy used to send the answer
     * @param result         result passed to the request's {@code fail2}
     * @throws FSMException when there is no active request
     */
    public static void failed(LibTInternal libTInternal, ComponentProxy componentProxy, Result result) throws FSMException {
        Optional<Promise> pending = libTInternal.ar.active();
        if (pending.isPresent()) {
            componentProxy.answer(pending.get(), pending.get().fail2(result));
            libTInternal.ar.reset();
            return;
        }
        throw new FSMException("no active req");
    }

    /**
     * Answers the active request with a success built from {@code result}, then resets the
     * active-request holder.
     *
     * @param libTInternal   supplies the active-request holder {@code ar}
     * @param componentProxy proxy used to send the answer
     * @param result         result passed to the request's {@code success2}
     * @throws FSMException when there is no active request
     */
    public static void success(LibTInternal libTInternal, ComponentProxy componentProxy, Result result) throws FSMException {
        Optional<Promise> pending = libTInternal.ar.active();
        if (pending.isPresent()) {
            componentProxy.answer(pending.get(), pending.get().success2(result));
            libTInternal.ar.reset();
            return;
        }
        throw new FSMException("no active req");
    }

    /**
     * Handles a stop request.
     *
     * <p>If a stop is already in progress, the request is answered with a logical failure and
     * nothing else changes. Otherwise the active request, if any, is failed with a "stop event"
     * result and the stop request is recorded via {@code setStop}.
     *
     * @param libTInternal   supplies the active-request holder {@code ar} and records the stop
     * @param componentProxy proxy used to send answers
     * @param request        the stop request
     * @throws FSMException declared for callers; the exception raised by {@code failed} is handled here
     */
    public static void stop(LibTInternal libTInternal, ComponentProxy componentProxy, HopsTorrentStopEvent.Request request) throws FSMException {
        if (libTInternal.ar.isStopping()) {
            componentProxy.answer(request, request.fail2(Result.logicalFail("stop already in progress")));
            return;
        }
        try {
            failed(libTInternal, componentProxy, Result.logicalFail("stop event"));
        } catch (FSMException e) {
            // Best effort: failed(...) raises FSMException when there is no active request, and
            // a stop must still proceed in that case. Logged instead of silently dropped.
            LOG.debug("stop: could not fail active request: {}", e.getMessage());
        }
        libTInternal.setStop(request);
    }
}
