/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CloudProvider;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.RemovedBlock;
import org.apache.hadoop.hdfs.server.common.CloudHelper;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.ProvidedReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.datanode.ProvidedReplicaUnderRecovery;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.CloudFsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.cloud.CloudPersistenceProvider;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.cloud.CloudPersistenceProviderFactory;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.cloud.PartRef;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.cloud.UploadID;
import org.apache.hadoop.hdfs.server.protocol.BlockReport;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker;

public class CloudFsDatasetImpl
extends FsDatasetImpl {
    public static final String GEN_STAMP = "GEN_STAMP";
    public static final String OBJECT_SIZE = "OBJECT_SIZE";
    public static final String META_FILE_SIZE = "META_FILE_SIZE";
    public static final String BLOCK_FILE_SIZE = "BLOCK_FILE_SIZE";
    static final Log LOG = LogFactory.getLog(CloudFsDatasetImpl.class);
    private CloudPersistenceProvider cloud;
    private final boolean bypassCache;
    private final int prefixSize;
    private ExecutorService threadPoolExecutor;
    private final boolean isVersioningSupported;
    private final int readCIDRetries;

    CloudFsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf) throws IOException {
        super(datanode, storage, conf);
        this.bypassCache = conf.getBoolean("dfs.dn.cloud.bypass.cache", false);
        this.prefixSize = conf.getInt("dfs.cloud.prefix.size", 500);
        this.readCIDRetries = conf.getInt("dfs.cloud.read.cid.retires", 60);
        this.cloud = CloudPersistenceProviderFactory.getCloudClient(conf);
        this.cloud.checkAllBuckets(CloudHelper.getBucketsFromConf(conf));
        this.isVersioningSupported = this.cloud.isVersioningSupported(CloudHelper.getBucketsFromConf(conf).get(0));
        this.threadPoolExecutor = Executors.newFixedThreadPool(this.cloud.getXferThreads());
        this.checkCID();
    }

    void checkCID() throws IOException {
        assert (CloudHelper.getBucketsFromConf(this.conf).size() == 1);
        String bucket = CloudHelper.getBucketsFromConf(this.conf).get(0);
        String cid = null;
        for (int i = 0; i < this.readCIDRetries; ++i) {
            try {
                cid = this.cloud.getCID(bucket);
                break;
            }
            catch (Exception e) {
                LOG.warn((Object)("Error reading CID. Exception: " + e));
                try {
                    Thread.sleep(1000L);
                    continue;
                }
                catch (InterruptedException ex) {
                    throw new RuntimeException(ex);
                }
            }
        }
        if (cid == null) {
            throw new IOException("Unable to read CID from the bucket");
        }
        if (cid.compareTo(this.dataStorage.clusterID) != 0) {
            String msg = "ClusterID does not match. Expecting: " + this.dataStorage.clusterID + " Got: " + cid;
            LOG.error((Object)msg);
            throw new IOException(msg);
        }
    }

    @Override
    public void syncToCloud(ExtendedBlock b) throws IOException {
        if (!b.isProvidedBlock()) {
            super.syncToCloud(b);
        } else {
            this.syncToCloudInternal(b);
        }
    }

    private void syncToCloudInternal(ExtendedBlock b) throws IOException {
        ReplicaInfo replicaInfo = this.getReplicaInfo(b);
        File blockFile = replicaInfo.getBlockFile();
        File metaFile = replicaInfo.getMetaFile();
        String blockFileKey = CloudHelper.getBlockKey(this.prefixSize, b.getLocalBlock());
        String metaFileKey = CloudHelper.getMetaFileKey(this.prefixSize, b.getLocalBlock());
        if (replicaInfo instanceof ProvidedReplicaBeingWritten) {
            ((ProvidedReplicaBeingWritten)replicaInfo).setSynced(true);
            ((ProvidedReplicaBeingWritten)replicaInfo).setCancellMultipart(true);
            boolean isMultiPart = ((ProvidedReplicaBeingWritten)replicaInfo).isMultipart();
            if (isMultiPart) {
                ((ProvidedReplicaBeingWritten)replicaInfo).setMultipart(false);
                this.cloud.abortMultipartUpload(b.getCloudBucket(), blockFileKey, ((ProvidedReplicaBeingWritten)replicaInfo).getUploadID());
            }
        }
        HashMap<String, String> metadata = this.getMetaMetadataRBW(replicaInfo, metaFile, blockFile);
        this.cloud.uploadObject(b.getCloudBucket(), metaFileKey, metaFile, metadata);
        this.cloud.uploadObject(b.getCloudBucket(), blockFileKey, blockFile, this.getBlockFileMetadata(b.getLocalBlock()));
        LOG.info((Object)("HopsFS-Cloud. Sync an open block to the cloud. Block: " + b.getLocalBlock() + " Block size: " + metadata.get(OBJECT_SIZE) + " Block file size " + metadata.get(META_FILE_SIZE) + " Meta file size " + metadata.get(BLOCK_FILE_SIZE)));
        if (this.isVersioningSupported) {
            if (this.cloud.objectExists(b.getCloudBucket(), blockFileKey)) {
                this.cloud.deleteOldVersions(b.getCloudBucket(), blockFileKey);
            }
            if (this.cloud.objectExists(b.getCloudBucket(), metaFileKey)) {
                this.cloud.deleteOldVersions(b.getCloudBucket(), metaFileKey);
            }
        }
    }

    @Override
    public void preFinalize(ExtendedBlock b) throws IOException {
        if (!b.isProvidedBlock()) {
            super.preFinalize(b);
        } else {
            this.preFinalizeInternal(b);
        }
    }

    public void preFinalizeInternal(ExtendedBlock b) throws IOException {
        LOG.debug((Object)("HopsFS-Cloud. Prefinalize Stage. Uploading... Block: " + b.getLocalBlock()));
        ReplicaInfo replicaInfo = this.getReplicaInfo(b);
        boolean isMultiPart = false;
        boolean isSynced = false;
        boolean isAppend = false;
        boolean isRecover = false;
        boolean expectedToExistInCloud = false;
        if (replicaInfo instanceof ProvidedReplicaBeingWritten) {
            isMultiPart = ((ProvidedReplicaBeingWritten)replicaInfo).isMultipart();
            isSynced = ((ProvidedReplicaBeingWritten)replicaInfo).isSynced();
            isAppend = ((ProvidedReplicaBeingWritten)replicaInfo).isAppend();
            isRecover = ((ProvidedReplicaBeingWritten)replicaInfo).isRecovered();
            boolean bl = expectedToExistInCloud = isSynced || isAppend || isRecover;
        }
        if (isMultiPart) assert (!expectedToExistInCloud);
        File blockFile = replicaInfo.getBlockFile();
        File metaFile = replicaInfo.getMetaFile();
        String blockFileKey = CloudHelper.getBlockKey(this.prefixSize, b.getLocalBlock());
        String metaFileKey = CloudHelper.getMetaFileKey(this.prefixSize, b.getLocalBlock());
        if (!expectedToExistInCloud && this.cloud.objectExists(b.getCloudBucket(), metaFileKey)) {
            LOG.error((Object)("HopsFS-Cloud. Block: " + b + " meta file already exists."));
            throw new IOException("Block: " + b + " meta file already exists.");
        }
        this.cloud.uploadObject(b.getCloudBucket(), metaFileKey, metaFile, this.getMetaMetadata(b.getLocalBlock(), metaFile, blockFile));
        if (isMultiPart) {
            while (!((ProvidedReplicaBeingWritten)replicaInfo).isMultipartComplete()) {
                try {
                    Thread.sleep(30L);
                }
                catch (InterruptedException interruptedException) {}
            }
        } else if (expectedToExistInCloud || !this.cloud.objectExists(b.getCloudBucket(), blockFileKey)) {
            this.cloud.uploadObject(b.getCloudBucket(), blockFileKey, blockFile, this.getBlockFileMetadata(b.getLocalBlock()));
        } else {
            LOG.error((Object)("HopsFS-Cloud. Block: " + b + " already exists."));
            throw new IOException("Block: " + b + " already exists.");
        }
        if (isAppend || isRecover) {
            for (long gs : ((ProvidedReplicaBeingWritten)replicaInfo).getOldGS()) {
                Block oldBlk = new Block(b.getLocalBlock());
                oldBlk.setGenerationStampNoPersistance(gs);
                String oldBlockObjKey = CloudHelper.getBlockKey(this.prefixSize, oldBlk);
                String oldMetaObjKey = CloudHelper.getMetaFileKey(this.prefixSize, oldBlk);
                this.cloud.deleteObject(oldBlk.getCloudBucket(), oldBlockObjKey);
                this.cloud.deleteObject(oldBlk.getCloudBucket(), oldMetaObjKey);
            }
            if (this.isVersioningSupported) {
                this.cloud.deleteOldVersions(b.getCloudBucket(), blockFileKey);
                this.cloud.deleteOldVersions(b.getCloudBucket(), metaFileKey);
            }
        }
    }

    @Override
    public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
        if (!b.isProvidedBlock()) {
            super.finalizeBlock(b);
        } else {
            this.finalizeBlockInternal(b);
        }
    }

    private synchronized void finalizeBlockInternal(ExtendedBlock b) throws IOException {
        LOG.debug((Object)("HopsFS-Cloud. Finalizing bloclk. Block: " + b.getLocalBlock()));
        if (Thread.interrupted()) {
            throw new IOException("Cannot finalize block from Interrupted Thread");
        }
        ReplicaInfo replicaInfo = this.getReplicaInfo(b);
        File blockFile = replicaInfo.getBlockFile();
        File metaFile = replicaInfo.getMetaFile();
        long dfsBytes = blockFile.length() + metaFile.length();
        FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
        v.releaseReservedSpace(replicaInfo.getBytesReserved());
        v.decDfsUsed(b.getBlockPoolId(), dfsBytes);
        this.volumeMap.remove(b.getBlockPoolId(), replicaInfo.getBlockId());
        if (this.bypassCache) {
            blockFile.delete();
            metaFile.delete();
        } else {
            FsVolumeImpl cloudVol = this.getCloudVolume();
            File cDir = cloudVol.getCacheDir(b.getBlockPoolId());
            File movedBlock = new File(cDir, CloudHelper.getBlockKey(this.prefixSize, b.getLocalBlock()));
            File movedMetaFile = new File(cDir, CloudHelper.getMetaFileKey(this.prefixSize, b.getLocalBlock()));
            this.moveToCache(blockFile, movedBlock, b.getBlockPoolId());
            this.moveToCache(metaFile, movedMetaFile, b.getBlockPoolId());
        }
    }

    @Override
    public InputStream getBlockInputStream(ExtendedBlock b, long seekOffset) throws IOException {
        if (!b.isProvidedBlock() || b.isProvidedBlock() && this.volumeMap.get(b.getBlockPoolId(), b.getBlockId()) != null) {
            if (b.isProvidedBlock() && LOG.isDebugEnabled()) {
                LOG.debug((Object)("HopsFS-Cloud. The block is being written. Get block inputstream " + b.getLocalBlock()));
            }
            return super.getBlockInputStream(b, seekOffset);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("HopsFS-Cloud. Get block inputstream " + b.getLocalBlock()));
        }
        FsVolumeImpl cloudVolume = this.getCloudVolume();
        File localBlkCopy = new File(cloudVolume.getCacheDir(b.getBlockPoolId()), CloudHelper.getBlockKey(this.prefixSize, b.getLocalBlock()));
        String blockFileKey = CloudHelper.getBlockKey(this.prefixSize, b.getLocalBlock());
        return this.getInputStreamInternal(b.getCloudBucket(), blockFileKey, localBlkCopy, b.getBlockPoolId(), seekOffset);
    }

    @Override
    public LengthInputStream getMetaDataInputStream(ExtendedBlock b) throws IOException {
        if (!b.isProvidedBlock() || b.isProvidedBlock() && this.volumeMap.get(b.getBlockPoolId(), b.getBlockId()) != null) {
            if (b.isProvidedBlock() && LOG.isDebugEnabled()) {
                LOG.debug((Object)("HopsFS-Cloud. The block is being written. Get block's metadata inputstream " + b.getLocalBlock()));
            }
            return super.getMetaDataInputStream(b);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("HopsFS-Cloud. Get block's metadata inputstream " + b.getLocalBlock()));
        }
        FsVolumeImpl cloudVolume = this.getCloudVolume();
        String metaFileKey = CloudHelper.getMetaFileKey(this.prefixSize, b.getLocalBlock());
        File localMetaFileCopy = new File(cloudVolume.getCacheDir(b.getBlockPoolId()), CloudHelper.getMetaFileKey(this.prefixSize, b.getLocalBlock()));
        InputStream is = this.getInputStreamInternal(b.getCloudBucket(), metaFileKey, localMetaFileCopy, b.getBlockPoolId(), 0L);
        LengthInputStream lis = new LengthInputStream(is, localMetaFileCopy.length());
        return lis;
    }

    private InputStream getInputStreamInternal(String cloudBucket, String objectKey, File localCopy, String bpid, long seekOffset) throws IOException {
        try {
            long startTime = System.currentTimeMillis();
            boolean download = this.bypassCache;
            if (!this.bypassCache) {
                if (localCopy.exists()) {
                    boolean sizeMatches;
                    long cloudBlockLen = this.cloud.getObjectSize(cloudBucket, objectKey);
                    boolean bl = sizeMatches = cloudBlockLen == localCopy.length();
                    if (!sizeMatches) {
                        LOG.warn((Object)("HopsFS-Cloud. Ignoring cached block. The size of the block in cache does not match with the block size in cloud. Block key: " + objectKey + "Block size in cache: " + localCopy.length() + " Block size in cloud: " + cloudBlockLen));
                        localCopy.delete();
                        download = true;
                    }
                } else {
                    download = true;
                }
            }
            if (download) {
                this.cloud.downloadObject(cloudBucket, objectKey, localCopy);
            } else if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("HopsFS-Cloud. Reading provided block from cache. Block: " + objectKey));
            }
            FileInputStream ioStream = new FileInputStream(localCopy);
            ((InputStream)ioStream).skip(seekOffset);
            this.providedBlocksCacheUpdateTS(bpid, localCopy);
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("HopsFS-Cloud. " + objectKey + " GetInputStream Fn Time(ms) :" + (System.currentTimeMillis() - startTime)));
            }
            return ioStream;
        }
        catch (IOException e) {
            LOG.warn((Object)("Could not read " + objectKey + ". "), (Throwable)e);
            throw e;
        }
    }

    @Override
    @Deprecated
    public ReplicaInfo getReplica(ExtendedBlock b) {
        if (!b.isProvidedBlock()) {
            return super.getReplica(b);
        }
        if (b.isProvidedBlock() && this.volumeMap.get(b.getBlockPoolId(), b.getBlockId()) != null) {
            return super.getReplica(b);
        }
        return this.getReplicaInternal(b);
    }

    public ReplicaInfo getReplicaInternal(ExtendedBlock b) {
        ReplicaInfo replicaInfo = super.getReplica(b);
        if (replicaInfo != null) {
            return replicaInfo;
        }
        try {
            String metaFileKey = CloudHelper.getMetaFileKey(this.prefixSize, b.getLocalBlock());
            Map<String, String> metadata = this.cloud.getUserMetaData(b.getCloudBucket(), metaFileKey);
            long genStamp = Long.parseLong(metadata.get(GEN_STAMP));
            long size = Long.parseLong(metadata.get(OBJECT_SIZE));
            FinalizedReplica info = new FinalizedReplica(b.getBlockId(), size, genStamp, b.getCloudBucket(), this.getCloudVolume(), this.getCloudVolume().getCacheDir(b.getBlockPoolId()));
            return info;
        }
        catch (IOException up) {
            LOG.info((Object)up, (Throwable)up);
            return null;
        }
    }

    public boolean isProvideBlockFinalized(ExtendedBlock b) {
        assert (b.isProvidedBlock());
        return super.getReplica(b) == null;
    }

    private String getCloudProviderName() {
        return this.conf.get("dfs.cloud.provider", "AWS");
    }

    @Override
    FsVolumeImpl getNewFsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir, Configuration conf, StorageType storageType) throws IOException {
        if (storageType == StorageType.CLOUD) {
            if (this.getCloudProviderName().compareToIgnoreCase(CloudProvider.AWS.name()) == 0 || this.getCloudProviderName().compareToIgnoreCase(CloudProvider.AZURE.name()) == 0 || this.getCloudProviderName().compareToIgnoreCase(CloudProvider.GCS.name()) == 0) {
                return new CloudFsVolumeImpl(this, storageID, currentDir, conf, storageType);
            }
            throw new UnsupportedOperationException("Cloud provider '" + this.getCloudProviderName() + "' is not supported");
        }
        return new FsVolumeImpl(this, storageID, currentDir, conf, storageType);
    }

    @Override
    public void invalidate(String bpid, RemovedBlock[] invalidBlks) throws IOException {
        ArrayList<String> errors = new ArrayList<String>();
        for (RemovedBlock b : invalidBlks) {
            if (!b.isProvidedBlock()) {
                super.invalidateBlock(bpid, (Block)b, errors);
                LOG.debug((Object)("HopsFS-Cloud. Not a provided block. Calling super to delete the block. Block: " + b));
                continue;
            }
            ReplicaInfo replicaInfo = this.volumeMap.get(bpid, (Block)b);
            boolean rbw = false;
            boolean inCloud = false;
            if (b.isProvidedBlock() && replicaInfo != null) {
                rbw = true;
                if (replicaInfo instanceof ProvidedReplicaBeingWritten) {
                    inCloud = ((ProvidedReplicaBeingWritten)replicaInfo).isSynced() || ((ProvidedReplicaBeingWritten)replicaInfo).isAppend() || ((ProvidedReplicaBeingWritten)replicaInfo).isRecovered();
                }
            } else if (b.isProvidedBlock() && replicaInfo == null) {
                inCloud = true;
            }
            if (rbw) {
                LOG.info((Object)("HopsFS-Cloud. Scheduling deletion of RBW Block: " + b));
                super.invalidateBlock(bpid, (Block)b, errors);
            }
            if (!inCloud) continue;
            LOG.info((Object)("HopsFS-Cloud. Scheduling deletion of Cloud Block: " + b));
            this.invalidateProvidedBlock(bpid, b, errors);
        }
        this.printInvalidationErrors(errors, invalidBlks.length);
    }

    private void invalidateProvidedBlock(String bpid, RemovedBlock invalidBlk, List<String> errors) throws IOException {
        FsVolumeImpl cloudVolume = this.getCloudVolume();
        if (cloudVolume == null) {
            errors.add("HopsFS-Cloud. Failed to delete replica " + invalidBlk);
        }
        File localBlkCopy = new File(cloudVolume.getCacheDir(bpid), CloudHelper.getBlockKey(this.prefixSize, (Block)invalidBlk));
        File localMetaFileCopy = new File(cloudVolume.getCacheDir(bpid), CloudHelper.getMetaFileKey(this.prefixSize, (Block)invalidBlk));
        LOG.info((Object)("HopsFS-Cloud. Scheduling async deletion of block: " + invalidBlk));
        File volumeDir = cloudVolume.getCurrentDir();
        this.asyncDiskService.deleteAsyncProvidedBlock(new ExtendedBlock(bpid, (Block)invalidBlk), invalidBlk.isDeleteCloudCopy(), this.cloud, localBlkCopy, localMetaFileCopy, volumeDir);
    }

    @Override
    FinalizedReplica updateReplicaUnderRecovery(String bpid, ReplicaUnderRecovery rur, long recoveryId, long newBlockId, long newlength, String cloudBucket) throws IOException {
        LOG.info((Object)("HopsFS-Cloud. update replica under recovery rur: " + rur));
        if (!rur.isProvidedBlock()) {
            return super.updateReplicaUnderRecovery(bpid, rur, recoveryId, newBlockId, newlength, cloudBucket);
        }
        boolean uploadedToTheCloud = true;
        ReplicaInfo ri = this.volumeMap.get(bpid, rur.getBlockId());
        if (ri != null) {
            try {
                this.checkReplicaFilesInternal(ri);
                uploadedToTheCloud = true;
            }
            catch (IOException e) {
                super.checkReplicaFiles(ri);
                uploadedToTheCloud = false;
            }
        }
        if (!uploadedToTheCloud) {
            if (ri instanceof ProvidedReplicaUnderRecovery && ((ProvidedReplicaUnderRecovery)ri).isPartiallyUploaded()) {
                String blockFileKey = CloudHelper.getBlockKey(this.prefixSize, ((ProvidedReplicaUnderRecovery)ri).getBlock());
                this.cloud.abortMultipartUpload(ri.getCloudBucket(), blockFileKey, ((ProvidedReplicaUnderRecovery)ri).getUploadID());
            }
            FinalizedReplica fr = super.updateReplicaUnderRecovery(bpid, rur, recoveryId, newBlockId, newlength, cloudBucket);
            this.uploadFinalizedBlockToCloud(bpid, fr);
            return fr;
        }
        return this.updateReplicaUnderRecoveryInternal(bpid, rur, recoveryId, newBlockId, newlength, cloudBucket);
    }

    private void uploadFinalizedBlockToCloud(String bpid, FinalizedReplica fr) throws IOException {
        ExtendedBlock eb = new ExtendedBlock(bpid, new Block(fr.getBlockId(), fr.getVisibleLength(), fr.getGenerationStamp(), fr.getCloudBucket()));
        this.preFinalizeInternal(eb);
        this.finalizeBlockInternal(eb);
    }

    FinalizedReplica updateReplicaUnderRecoveryInternal(String bpid, ReplicaUnderRecovery rur, long recoveryId, long newBlockId, long newlength, String cloudBlock) throws IOException {
        boolean copyOnTruncate;
        if (rur.getRecoveryID() != recoveryId) {
            throw new IOException("rur.getRecoveryID() != recoveryId = " + recoveryId + ", rur=" + rur);
        }
        boolean bl = copyOnTruncate = newBlockId > 0L && rur.getBlockId() != newBlockId;
        if (copyOnTruncate) {
            throw new UnsupportedOperationException("Truncate using copy is not supported");
        }
        if (rur.getNumBytes() < newlength) {
            throw new IOException("rur.getNumBytes() < newlength = " + newlength + ", rur=" + rur);
        }
        LOG.info((Object)("HopsFS-Cloud. update replica under recovery rur: " + rur + ". Creating a new replica in the cloud"));
        if (rur.getNumBytes() >= newlength) {
            this.truncateProvidedBlock(bpid, rur, rur.getNumBytes(), newlength, recoveryId);
            rur.setNumBytesNoPersistance(newlength);
            rur.setGenerationStampNoPersistance(recoveryId);
        }
        return new FinalizedReplica(rur, null, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void truncateProvidedBlock(String bpid, ReplicaInfo rur, long oldlen, long newlen, long newGS) throws IOException {
        LOG.info((Object)("HopsFS-Cloud. Truncating a block: " + rur.getBlockId() + "_" + rur.getGenerationStamp()));
        Block bOld = new Block(rur.getBlockId(), rur.getNumBytes(), rur.getGenerationStamp(), rur.getCloudBucket());
        String oldBlkKey = CloudHelper.getBlockKey(this.prefixSize, bOld);
        String oldBlkMetaKey = CloudHelper.getMetaFileKey(this.prefixSize, bOld);
        if (newlen > oldlen) {
            throw new IOException("Cannot truncate block to from oldlen (=" + oldlen + ") to newlen (=" + newlen + ")");
        }
        FsVolumeImpl vol = this.getCloudVolume();
        File blockFile = new File(vol.getCacheDir(bpid), oldBlkKey);
        File metaFile = new File(vol.getCacheDir(bpid), oldBlkMetaKey);
        if (!blockFile.exists() || blockFile.length() != bOld.getNumBytes()) {
            blockFile.delete();
            this.cloud.downloadObject(rur.getCloudBucket(), oldBlkKey, blockFile);
            this.providedBlocksCacheUpdateTS(bpid, blockFile);
        }
        if (!metaFile.exists() || metaFile.length() <= 0L) {
            metaFile.delete();
            this.cloud.downloadObject(rur.getCloudBucket(), oldBlkMetaKey, metaFile);
            this.providedBlocksCacheUpdateTS(bpid, metaFile);
        }
        DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
        int checksumsize = dcs.getChecksumSize();
        int bpc = dcs.getBytesPerChecksum();
        long n = (newlen - 1L) / (long)bpc + 1L;
        long newmetalen = (long)BlockMetadataHeader.getHeaderSize() + n * (long)checksumsize;
        long lastchunkoffset = (n - 1L) * (long)bpc;
        int lastchunksize = (int)(newlen - lastchunkoffset);
        byte[] b = new byte[Math.max(lastchunksize, checksumsize)];
        try (RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");){
            blockRAF.setLength(newlen);
            blockRAF.seek(lastchunkoffset);
            blockRAF.readFully(b, 0, lastchunksize);
        }
        dcs.update(b, 0, lastchunksize);
        dcs.writeValue(b, 0, false);
        try (RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");){
            metaRAF.setLength(newmetalen);
            metaRAF.seek(newmetalen - (long)checksumsize);
            metaRAF.write(b, 0, checksumsize);
        }
        LOG.info((Object)("HopsFS-Cloud. Truncated on disk copy of the block: " + bOld));
        Block bNew = new Block(rur.getBlockId(), newlen, newGS, rur.getCloudBucket());
        String newBlkKey = CloudHelper.getBlockKey(this.prefixSize, bNew);
        String newBlkMetaKey = CloudHelper.getMetaFileKey(this.prefixSize, bNew);
        if (this.cloud.objectExists(rur.getCloudBucket(), newBlkKey) || this.cloud.objectExists(rur.getCloudBucket(), newBlkMetaKey)) {
            LOG.error((Object)("HopsFS-Cloud. Block: " + b + " alreay exists."));
            throw new IOException("Block: " + b + " alreay exists.");
        }
        LOG.info((Object)("HopsFS-Cloud. Uploading Truncated Block: " + bNew));
        this.cloud.uploadObject(rur.getCloudBucket(), newBlkMetaKey, metaFile, this.getMetaMetadata(bNew, metaFile, blockFile));
        this.cloud.uploadObject(rur.getCloudBucket(), newBlkKey, blockFile, this.getBlockFileMetadata(bNew));
        LOG.info((Object)("HopsFS-Cloud. Deleting old block from cloud. Block: " + bOld));
        this.cloud.deleteObject(rur.getCloudBucket(), oldBlkKey);
        this.cloud.deleteObject(rur.getCloudBucket(), oldBlkMetaKey);
        LOG.info((Object)("HopsFS-Cloud. Deleting disk tmp copy: " + bOld));
        blockFile.delete();
        metaFile.delete();
        this.volumeMap.remove(bpid, bNew.getBlockId());
    }

    @Override
    public void checkReplicaFiles(ReplicaInfo r) throws IOException {
        try {
            this.checkReplicaFilesInternal(r);
        }
        catch (IOException e) {
            super.checkReplicaFiles(r);
        }
    }

    public void checkReplicaFilesInternal(ReplicaInfo r) throws IOException {
        Block b = new Block(r.getBlockId(), r.getNumBytes(), r.getGenerationStamp(), r.getCloudBucket());
        String blockKey = CloudHelper.getBlockKey(this.prefixSize, b);
        String metaKey = CloudHelper.getMetaFileKey(this.prefixSize, b);
        if (!this.cloud.objectExists(r.getCloudBucket(), blockKey)) {
            throw new IOException("Block: " + b + " not found in the cloud storage");
        }
        long blockSize = this.cloud.getObjectSize(r.getCloudBucket(), blockKey);
        if (blockSize != r.getNumBytes()) {
            throw new IOException("File length mismatched. Expected: " + r.getNumBytes() + " Got: " + blockSize);
        }
        if (!this.cloud.objectExists(r.getCloudBucket(), metaKey)) {
            throw new IOException("Meta Object for Block: " + b + " not found in the cloud storage");
        }
        long metaFileSize = this.cloud.getObjectSize(r.getCloudBucket(), metaKey);
        if (metaFileSize == 0L) {
            throw new IOException("Metafile is empty. Block: " + b);
        }
    }

    @Override
    public synchronized FsVolumeImpl getVolume(ExtendedBlock b) {
        if (!b.isProvidedBlock()) {
            return super.getVolume(b);
        }
        return this.getVolumeInternal(b);
    }

    @Override
    public Map<DatanodeStorage, BlockReport> getBlockReports(String bpid) {
        return super.getBlockReports(bpid);
    }

    public synchronized FsVolumeImpl getVolumeInternal(ExtendedBlock b) {
        if (!b.isProvidedBlock()) {
            return super.getVolume(b);
        }
        return this.getCloudVolume();
    }

    @Override
    public void shutdown() {
        super.shutdown();
        this.cloud.shutdown();
    }

    @Override
    public synchronized ReplicaHandler createRbw(StorageType storageType, ExtendedBlock b) throws IOException {
        ReplicaHandler handler = super.createRbw(storageType, b);
        FsVolumeReference ref = handler.getVolumeReference();
        ProvidedReplicaBeingWritten providedReplicaBeingWritten = new ProvidedReplicaBeingWritten((ReplicaBeingWritten)handler.getReplica(), this.cloud.getPartSize());
        this.volumeMap.add(b.getBlockPoolId(), providedReplicaBeingWritten);
        return new ReplicaHandler(providedReplicaBeingWritten, ref);
    }

    @Override
    public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block, StorageType targetStorageType) throws IOException {
        if (block.isProvidedBlock()) {
            throw new IOException("Moving files stored in the cloud is not supported");
        }
        if (targetStorageType != StorageType.CLOUD) {
            return super.moveBlockAcrossStorage(block, targetStorageType);
        }
        LOG.info((Object)("HopsFS-Cloud. Moving block: " + block + " to " + targetStorageType));
        ReplicaInfo replicaInfo = this.getReplicaInfo(block);
        if (replicaInfo.getState() != HdfsServerConstants.ReplicaState.FINALIZED) {
            throw new ReplicaNotFoundException("Cannot append to an unfinalized replica " + block);
        }
        if (replicaInfo.getNumBytes() != block.getNumBytes()) {
            throw new IOException("Corrupted replica " + replicaInfo + " with a length of " + replicaInfo.getNumBytes() + " expected length is " + block.getNumBytes());
        }
        String bucket = block.getCloudBucket();
        if (block.getCloudBucket().equals("")) {
            List<String> buckets = CloudHelper.getBucketsFromConf(this.conf);
            if (buckets.size() > 0) {
                bucket = buckets.get(0);
                block.setCloudBucket(bucket);
            } else {
                String error = "HopsFS-Cloud. Moving block: " + block + ". Bucket not set";
                LOG.error((Object)error);
                throw new IOException(error);
            }
        }
        FinalizedReplica newReplicaInfo = new FinalizedReplica(replicaInfo, this.getCloudVolume(), this.getCloudVolume().getCacheDir(block.getBlockPoolId()));
        newReplicaInfo.setCloudBucketNoPersistance(bucket);
        ExtendedBlock extendedBlock = new ExtendedBlock(block.getBlockPoolId(), (Block)newReplicaInfo);
        String blockFileKey = CloudHelper.getBlockKey(this.prefixSize, block.getLocalBlock());
        String metaFileKey = CloudHelper.getMetaFileKey(this.prefixSize, block.getLocalBlock());
        if (this.cloud.objectExists(block.getCloudBucket(), blockFileKey) && this.cloud.objectExists(block.getCloudBucket(), metaFileKey)) {
            LOG.info((Object)("HopsFS-Cloud. Block " + block + " has already been moved to the cloud"));
            return null;
        }
        LOG.info((Object)("HopsFS-Cloud. Moving Block: " + block + " to the cloud."));
        FsVolumeImpl vol = this.getCloudVolume();
        File cachedBlockFile = new File(vol.getCacheDir(block.getBlockPoolId()), blockFileKey);
        File cachedMetaFile = new File(vol.getCacheDir(block.getBlockPoolId()), metaFileKey);
        this.cloud.uploadObject(block.getCloudBucket(), metaFileKey, replicaInfo.getMetaFile(), this.getMetaMetadata(block.getLocalBlock(), replicaInfo.getMetaFile(), replicaInfo.getBlockFile()));
        this.cloud.uploadObject(block.getCloudBucket(), blockFileKey, replicaInfo.getBlockFile(), this.getBlockFileMetadata(block.getLocalBlock()));
        this.datanode.getShortCircuitRegistry().processBlockInvalidation(ExtendedBlockId.fromExtendedBlock(extendedBlock));
        this.datanode.notifyNamenodeBlockMovedToCloud(extendedBlock, replicaInfo.getStorageUuid(), newReplicaInfo.getStorageUuid());
        this.moveToCache(replicaInfo.getBlockFile(), cachedBlockFile, block.getBlockPoolId());
        this.moveToCache(replicaInfo.getMetaFile(), cachedMetaFile, block.getBlockPoolId());
        this.volumeMap.remove(block.getBlockPoolId(), block.getBlockId());
        return replicaInfo;
    }

    @Override
    public synchronized long getReplicaVisibleLength(ExtendedBlock b) throws IOException {
        if (!b.isProvidedBlock() || b.isProvidedBlock() && this.volumeMap.get(b.getBlockPoolId(), b.getBlockId()) != null) {
            return super.getReplicaVisibleLength(b);
        }
        try {
            String metaFileKey = CloudHelper.getMetaFileKey(this.prefixSize, b.getLocalBlock());
            Map<String, String> metadata = this.cloud.getUserMetaData(b.getCloudBucket(), metaFileKey);
            long size = Long.parseLong(metadata.get(OBJECT_SIZE));
            long genStamp = Long.parseLong(metadata.get(GEN_STAMP));
            if (genStamp < b.getGenerationStamp()) {
                throw new IOException("cloud.getGenerationStamp() < block.getGenerationStamp(), block=" + b + ", cloud GS =" + genStamp);
            }
            return size;
        }
        catch (IOException e) {
            LOG.info((Object)("HopsFS-Cloud. Unable to get the length of the replica from the cloud. " + e));
            throw new ReplicaNotFoundException("Cannot append to a non-existent replica " + b.getBlockPoolId() + ":" + b.getBlockId());
        }
    }

    @Override
    public synchronized ReplicaHandler append(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
        if (!b.isProvidedBlock()) {
            return super.append(b, newGS, expectedBlockLen);
        }
        this.checkRBWForAppend(b, newGS, expectedBlockLen);
        if (newGS < b.getGenerationStamp()) {
            throw new IOException("The new generation stamp " + newGS + " should be greater than the replica " + b + "'s generation stamp");
        }
        ReplicaInfo replicaInfo = this.getReplica(b);
        LOG.info((Object)("HopsFS-Cloud. Appending to " + replicaInfo));
        if (replicaInfo.getNumBytes() != expectedBlockLen) {
            throw new IOException("Corrupted replica " + replicaInfo + " with a length of " + replicaInfo.getNumBytes() + " expected length is " + expectedBlockLen);
        }
        FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
        ReplicaBeingWritten replica = null;
        replica = this.appendInternal(b.getBlockPoolId(), b, (FinalizedReplica)replicaInfo, newGS, expectedBlockLen);
        return new ReplicaHandler(replica, ref);
    }

    private void checkRBWForAppend(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
        ReplicaInfo replicaInfo = this.volumeMap.get(b.getBlockPoolId(), b.getBlockId());
        if (replicaInfo != null) {
            String metaFileKey = CloudHelper.getMetaFileKey(this.prefixSize, b.getLocalBlock());
            String blockFileKey = CloudHelper.getBlockKey(this.prefixSize, b.getLocalBlock());
            if (this.cloud.objectExists(b.getCloudBucket(), blockFileKey) && this.cloud.objectExists(b.getCloudBucket(), metaFileKey)) {
                Map<String, String> metadata = this.cloud.getUserMetaData(b.getCloudBucket(), metaFileKey);
                long genStamp = Long.parseLong(metadata.get(GEN_STAMP));
                long size = Long.parseLong(metadata.get(OBJECT_SIZE));
                if (genStamp == b.getGenerationStamp() && size == expectedBlockLen) {
                    this.deleteRBWImmediately(b, replicaInfo);
                    return;
                }
            }
            if (replicaInfo.getState() != HdfsServerConstants.ReplicaState.FINALIZED) {
                throw new ReplicaNotFoundException("Cannot append to an unfinalized replica " + b);
            }
        }
    }

    private void deleteRBWImmediately(ExtendedBlock b, ReplicaInfo rinfo) throws IOException {
        RemovedBlock[] rbs = new RemovedBlock[]{new RemovedBlock(b.getBlockId(), Long.MAX_VALUE, rinfo.getGenerationStamp(), b.getCloudBucket(), false)};
        this.invalidate(b.getBlockPoolId(), rbs);
        LOG.info((Object)("HopsFS-Cloud: Deleting existing RBW for append operation. blk: " + b));
        while (this.volumeMap.get(b.getBlockPoolId(), b.getBlockId()) != null) {
            try {
                Thread.sleep(100L);
                LOG.info((Object)("HopsFS-Cloud: waiting for the RBW block to be removed. blk: " + b));
            }
            catch (InterruptedException e) {
                LOG.info((Object)e, (Throwable)e);
            }
        }
    }

    private synchronized ReplicaBeingWritten appendInternal(String bpid, ExtendedBlock block, FinalizedReplica replicaInfo, long newGS, long expectedBlkLen) throws IOException {
        FsVolumeImpl v;
        String oldBlkKey = CloudHelper.getBlockKey(this.prefixSize, block.getLocalBlock());
        String oldBlkMetaKey = CloudHelper.getMetaFileKey(this.prefixSize, block.getLocalBlock());
        FsVolumeImpl vol = this.getCloudVolume();
        File oldBlockFile = new File(vol.getCacheDir(bpid), oldBlkKey);
        File oldMetaFile = new File(vol.getCacheDir(bpid), oldBlkMetaKey);
        if (!oldBlockFile.exists() || oldBlockFile.length() != expectedBlkLen) {
            LOG.info((Object)("HopsFS-Cloud. Downloading the block again from cloud for append op. Block: " + block + " Disk size: " + oldBlockFile.length() + " Expected bytes: " + expectedBlkLen));
            oldBlockFile.delete();
            this.cloud.downloadObject(block.getCloudBucket(), oldBlkKey, oldBlockFile);
            this.providedBlocksCacheUpdateTS(bpid, oldBlockFile);
            oldMetaFile.delete();
            this.cloud.downloadObject(block.getCloudBucket(), oldBlkMetaKey, oldMetaFile);
            this.providedBlocksCacheUpdateTS(bpid, oldMetaFile);
        }
        if ((v = (FsVolumeImpl)replicaInfo.getVolume()).getAvailable() < expectedBlkLen - replicaInfo.getNumBytes()) {
            throw new DiskChecker.DiskOutOfSpaceException("Insufficient space for appending to " + replicaInfo);
        }
        File newBlkFile = new File(v.getRbwDir(bpid), replicaInfo.getBlockName());
        ReplicaBeingWritten replicaBeingWritten = new ReplicaBeingWritten(replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS, replicaInfo.getCloudBucket(), v, newBlkFile.getParentFile(), Thread.currentThread(), expectedBlkLen);
        ProvidedReplicaBeingWritten newReplicaInfo = new ProvidedReplicaBeingWritten(replicaBeingWritten, this.cloud.getPartSize());
        File newmeta = newReplicaInfo.getMetaFile();
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("HopsFS-Cloud. Renaming " + oldMetaFile + " to " + newmeta));
        }
        Block newBlock = new Block(block.getBlockId(), oldBlockFile.length(), newGS, block.getCloudBucket());
        String newBlkMetaKey = CloudHelper.getMetaFileKey(this.prefixSize, newBlock);
        HashMap<String, String> newBlkMetaFileMetaData = this.getMetaMetadata(newBlock, oldMetaFile, oldBlockFile);
        try {
            this.copyCloudObject(block.getCloudBucket(), oldBlkMetaKey, newBlkMetaKey, oldMetaFile, newBlkMetaFileMetaData);
            NativeIO.renameTo((File)oldMetaFile, (File)newmeta);
        }
        catch (IOException e) {
            throw new IOException("Block " + replicaInfo + " reopen failed.  Unable to move meta file  " + oldMetaFile + " to rbw dir " + newmeta, e);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("HopsFS-Cloud. Renaming " + oldBlockFile + " to " + newBlkFile + ", file length=" + oldBlockFile.length()));
        }
        try {
            String newBlkKey = CloudHelper.getBlockKey(this.prefixSize, newBlock);
            HashMap<String, String> newBlkMetaData = this.getBlockFileMetadata(newBlock);
            this.copyCloudObject(block.getCloudBucket(), oldBlkKey, newBlkKey, oldBlockFile, newBlkMetaData);
            NativeIO.renameTo((File)oldBlockFile, (File)newBlkFile);
        }
        catch (IOException e) {
            try {
                NativeIO.renameTo((File)newmeta, (File)oldMetaFile);
                this.cloud.deleteObject(block.getCloudBucket(), newBlkMetaKey);
            }
            catch (IOException ex) {
                LOG.warn((Object)("Cannot move meta file " + newmeta + "back to the finalized directory " + oldMetaFile), (Throwable)ex);
            }
            throw new IOException("Block " + replicaInfo + " reopen failed.  Unable to move block file " + oldBlockFile + " to rbw dir " + newBlkFile, e);
        }
        newReplicaInfo.setAppend(true);
        newReplicaInfo.addOldGS(block.getGenerationStamp());
        this.volumeMap.add(bpid, newReplicaInfo);
        v.reserveSpaceForRbw(expectedBlkLen - replicaInfo.getNumBytes());
        return newReplicaInfo;
    }

    @Override
    public synchronized ReplicaHandler recoverAppend(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
        ReplicaBeingWritten replica;
        if (!b.isProvidedBlock()) {
            return super.recoverAppend(b, newGS, expectedBlockLen);
        }
        LOG.info((Object)("HopsFS-Cloud. Recover failed append operation. block " + b));
        ReplicaInfo replicaInfo = this.getReplica(b);
        this.recoverCheck(replicaInfo, b, newGS, expectedBlockLen);
        FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
        try {
            if (replicaInfo.getState() != HdfsServerConstants.ReplicaState.FINALIZED) {
                throw new UnsupportedOperationException("Appending to RBW replica is not supported for cloud");
            }
            replica = this.appendInternal(b.getBlockPoolId(), b, (FinalizedReplica)replicaInfo, newGS, expectedBlockLen);
        }
        catch (IOException e) {
            IOUtils.cleanup(null, (Closeable[])new Closeable[]{ref});
            throw e;
        }
        return new ReplicaHandler(replica, ref);
    }

    @Override
    public synchronized String recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
        ReplicaInfo replicaInfo = null;
        if (!b.isProvidedBlock()) {
            return super.recoverClose(b, newGS, expectedBlockLen);
        }
        LOG.info((Object)("HopsFS-Cloud. Recover Close RBW replica " + b));
        try {
            replicaInfo = this.getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
        }
        catch (ReplicaNotFoundException e) {
            replicaInfo = this.createRbw(b);
        }
        replicaInfo = this.recoverCheck(replicaInfo, b, newGS, expectedBlockLen);
        this.bumpReplicaGS(replicaInfo, newGS);
        Block bockNewGS = new Block(b.getBlockId(), expectedBlockLen, newGS, b.getCloudBucket());
        ExtendedBlock newEB = new ExtendedBlock(b.getBlockPoolId(), bockNewGS);
        this.finalizeBlock(newEB);
        return replicaInfo.getStorageUuid();
    }

    @Override
    public synchronized ReplicaHandler recoverRbw(ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException {
        ReplicaInfo replicaInfo = null;
        long oldGS = b.getGenerationStamp();
        if (!b.isProvidedBlock()) {
            return super.recoverRbw(b, newGS, minBytesRcvd, maxBytesRcvd);
        }
        LOG.info((Object)("HopsFS-Cloud. Recover RBW replica " + b));
        try {
            replicaInfo = this.getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
        }
        catch (ReplicaNotFoundException e) {
            replicaInfo = this.createRbw(b);
        }
        if (replicaInfo.getState() != HdfsServerConstants.ReplicaState.RBW) {
            throw new ReplicaNotFoundException("Cannot recover a non-RBW replica " + replicaInfo);
        }
        ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
        LOG.info((Object)("HopsFS-Cloud. Recovering " + rbw));
        rbw.stopWriter(this.datanode.getDnConf().getXceiverStopTimeout());
        rbw.setWriter(Thread.currentThread());
        long replicaGenerationStamp = rbw.getGenerationStamp();
        if (replicaGenerationStamp < b.getGenerationStamp() || replicaGenerationStamp > newGS) {
            throw new ReplicaNotFoundException("Cannot append to a replica with unexpected generation stamp " + b + ". Expected GS range is [" + b.getGenerationStamp() + ", " + newGS + "].");
        }
        long bytesAcked = rbw.getBytesAcked();
        long numBytes = rbw.getNumBytes();
        if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd) {
            throw new ReplicaNotFoundException("Unmatched length replica " + replicaInfo + ": BytesAcked = " + bytesAcked + " BytesRcvd = " + numBytes + " are not in the range of [" + minBytesRcvd + ", " + maxBytesRcvd + "].");
        }
        FsVolumeReference ref = rbw.getVolume().obtainReference();
        try {
            if (numBytes > bytesAcked) {
                File replicafile = rbw.getBlockFile();
                CloudFsDatasetImpl.truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
                rbw.setNumBytesNoPersistance(bytesAcked);
                rbw.setLastChecksumAndDataLen(bytesAcked, null);
            }
            this.bumpReplicaGS(rbw, newGS);
            try {
                Block newBlock = new Block(b.getBlockId(), rbw.getNumBytes(), newGS, b.getCloudBucket());
                String newBlkMetaKey = CloudHelper.getMetaFileKey(this.prefixSize, newBlock);
                HashMap<String, String> newBlkMetaFileMetaData = this.getMetaMetadata(newBlock, rbw.getMetaFile(), rbw.getBlockFile());
                this.cloud.uploadObject(b.getCloudBucket(), newBlkMetaKey, rbw.getMetaFile(), newBlkMetaFileMetaData);
                String newBlkKey = CloudHelper.getBlockKey(this.prefixSize, newBlock);
                HashMap<String, String> newBlkMetaData = this.getBlockFileMetadata(newBlock);
                this.cloud.uploadObject(b.getCloudBucket(), newBlkKey, rbw.getBlockFile(), newBlkMetaData);
                assert (rbw instanceof ProvidedReplicaBeingWritten);
                ((ProvidedReplicaBeingWritten)rbw).setRecovered(true);
                ((ProvidedReplicaBeingWritten)rbw).addOldGS(oldGS);
            }
            catch (IOException e) {
                throw new IOException("Failed to upload  to cloud. block " + b + " newGS: " + newGS, e);
            }
        }
        catch (IOException e) {
            IOUtils.cleanup(null, (Closeable[])new Closeable[]{ref});
            throw e;
        }
        return new ReplicaHandler(rbw, ref);
    }

    private ReplicaInfo createRbw(ExtendedBlock b) throws IOException {
        ReplicaInfo replicaInfoCloud = this.getReplica(b);
        String blkKey = CloudHelper.getBlockKey(this.prefixSize, b.getLocalBlock());
        String blkMetaKey = CloudHelper.getMetaFileKey(this.prefixSize, b.getLocalBlock());
        FsVolumeImpl vol = this.getCloudVolume();
        File blockFile = new File(vol.getCacheDir(b.getBlockPoolId()), blkKey);
        File metaFile = new File(vol.getCacheDir(b.getBlockPoolId()), blkMetaKey);
        if (!blockFile.exists() || blockFile.length() != b.getNumBytes()) {
            blockFile.delete();
            this.cloud.downloadObject(b.getCloudBucket(), blkKey, blockFile);
            this.providedBlocksCacheUpdateTS(b.getBlockPoolId(), blockFile);
        }
        if (!metaFile.exists() || metaFile.length() <= 0L) {
            metaFile.delete();
            this.cloud.downloadObject(b.getCloudBucket(), blkMetaKey, metaFile);
            this.providedBlocksCacheUpdateTS(b.getBlockPoolId(), metaFile);
        }
        FsVolumeImpl v = (FsVolumeImpl)replicaInfoCloud.getVolume();
        File newBlkFile = new File(v.getRbwDir(b.getBlockPoolId()), replicaInfoCloud.getBlockName());
        ReplicaBeingWritten replicaBeingWritten = new ReplicaBeingWritten(replicaInfoCloud.getBlockId(), replicaInfoCloud.getNumBytes(), replicaInfoCloud.getGenerationStamp(), replicaInfoCloud.getCloudBucket(), v, newBlkFile.getParentFile(), Thread.currentThread(), b.getNumBytes());
        ProvidedReplicaBeingWritten replicaInfo = new ProvidedReplicaBeingWritten(replicaBeingWritten, this.cloud.getPartSize());
        File newmeta = replicaInfo.getMetaFile();
        NativeIO.renameTo((File)metaFile, (File)newmeta);
        NativeIO.renameTo((File)blockFile, (File)newBlkFile);
        this.volumeMap.add(b.getBlockPoolId(), replicaInfo);
        return replicaInfo;
    }

    private void copyCloudObject(String bucket, String srcKey, String dstKey, File srcFile, Map<String, String> newObjMetadata) throws IOException {
        if (this.getCloudProviderName().compareToIgnoreCase(CloudProvider.AWS.name()) == 0) {
            this.cloud.copyObject(bucket, bucket, srcKey, dstKey, newObjMetadata);
        } else if (this.getCloudProviderName().compareToIgnoreCase(CloudProvider.AZURE.name()) == 0) {
            this.cloud.uploadObject(bucket, dstKey, srcFile, newObjMetadata);
        } else if (this.getCloudProviderName().compareToIgnoreCase(CloudProvider.GCS.name()) == 0) {
            this.cloud.copyObject(bucket, bucket, srcKey, dstKey, newObjMetadata);
        } else {
            throw new UnsupportedOperationException("Cloud provider '" + this.getCloudProviderName() + "' is not supported");
        }
    }

    private boolean moveToCache(File from, File to, String bpid) throws IOException {
        if (this.bypassCache) {
            from.delete();
            return false;
        }
        File toBlockParent = new File(to.getParent());
        if (!toBlockParent.exists()) {
            toBlockParent.mkdir();
        }
        if (from.renameTo(to)) {
            LOG.debug((Object)("HopsFS-Cloud. Block file " + from + " moved to cloud cache location " + to));
            this.providedBlocksCacheUpdateTS(bpid, to);
            return true;
        }
        String error = "HopsFS-Cloud. Moving file: " + from + " to " + to + " failed";
        LOG.error((Object)error);
        return false;
    }

    public boolean existsInCloud(ExtendedBlock b) throws IOException {
        String blockKey = CloudHelper.getBlockKey(this.prefixSize, b.getLocalBlock());
        return this.cloud.objectExists(b.getCloudBucket(), blockKey);
    }

    public boolean skipMultipartUpload(ExtendedBlock b) throws IOException {
        ReplicaInfo replicaInfo = this.getReplicaInfo(b);
        if (replicaInfo instanceof ProvidedReplicaBeingWritten) {
            return ((ProvidedReplicaBeingWritten)replicaInfo).isSynced() || ((ProvidedReplicaBeingWritten)replicaInfo).isAppend() || ((ProvidedReplicaBeingWritten)replicaInfo).isRecovered();
        }
        return false;
    }

    public void uploadPart(ExtendedBlock b) throws IOException {
        ProvidedReplicaBeingWritten pReplicaInfo = (ProvidedReplicaBeingWritten)this.getReplicaInfo(b);
        if (!pReplicaInfo.isPartAvailable()) {
            throw new IOException("Not enough data available for multipart upload");
        }
        String blockKey = CloudHelper.getBlockKey(this.prefixSize, b.getLocalBlock());
        int partId = pReplicaInfo.incrementAndGetNextPart();
        if (partId == 1) {
            if (!this.cloud.objectExists(b.getCloudBucket(), blockKey)) {
                UploadID uploadID = this.cloud.startMultipartUpload(b.getCloudBucket(), blockKey, this.getBlockFileMetadata(b.getLocalBlock()));
                pReplicaInfo.setUploadID(uploadID);
                pReplicaInfo.setMultipart(true);
            } else {
                LOG.error((Object)("HopsFS-Cloud. Block: " + b + " alreay exists."));
                throw new IOException("Block: " + b + " alreay exists.");
            }
        }
        File blockFile = pReplicaInfo.getBlockFile();
        long start = (long)(partId - 1) * pReplicaInfo.getPartSize();
        long end = (long)partId * pReplicaInfo.getPartSize();
        PartUploadWorker worker = new PartUploadWorker(pReplicaInfo, b.getCloudBucket(), blockKey, pReplicaInfo.getUploadID(), partId, blockFile, start, end);
        pReplicaInfo.addUploadTask(this.threadPoolExecutor.submit(worker));
    }

    public void finalizeMultipartUpload(ExtendedBlock b) throws IOException {
        ProvidedReplicaBeingWritten pReplicaInfo = (ProvidedReplicaBeingWritten)this.getReplicaInfo(b);
        if (pReplicaInfo.isCancellMultipart()) {
            return;
        }
        assert (pReplicaInfo.isMultipart());
        long currentPart = pReplicaInfo.getCurrentPart();
        String blockKey = CloudHelper.getBlockKey(this.prefixSize, b.getLocalBlock());
        if (pReplicaInfo.getBytesOnDisk() > currentPart * pReplicaInfo.getPartSize()) {
            File blockFile = pReplicaInfo.getBlockFile();
            int newPartID = pReplicaInfo.incrementAndGetNextPart();
            long start = currentPart * pReplicaInfo.getPartSize();
            long end = pReplicaInfo.getBytesOnDisk();
            PartUploadWorker worker = new PartUploadWorker(pReplicaInfo, b.getCloudBucket(), blockKey, pReplicaInfo.getUploadID(), newPartID, blockFile, start, end);
            pReplicaInfo.addUploadTask(this.threadPoolExecutor.submit(worker));
        }
        this.waitForPartsUpload(pReplicaInfo);
        this.cloud.finalizeMultipartUpload(b.getCloudBucket(), blockKey, pReplicaInfo.getUploadID(), pReplicaInfo.getPartETags());
        pReplicaInfo.setMultipartComplete(true);
        LOG.info((Object)"HopsFS-Cloud. Finalized the multipart upload ");
    }

    private void waitForPartsUpload(ProvidedReplicaBeingWritten prbw) throws IOException {
        if (prbw.isCancellMultipart()) {
            return;
        }
        for (Future future : prbw.getAllUploadTasks()) {
            try {
                PartRef tag = (PartRef)future.get();
                prbw.addEtag(tag);
            }
            catch (ExecutionException e) {
                LOG.error((Object)"Exception was thrown during uploading a block to cloud", (Throwable)e);
                Throwable throwable = e.getCause();
                if (throwable instanceof IOException) {
                    throw (IOException)throwable;
                }
                throw new IOException(e);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @VisibleForTesting
    public FsVolumeImpl getCloudVolume() {
        for (FsVolumeImpl vol : this.getVolumes()) {
            if (vol.getStorageType() != StorageType.CLOUD) continue;
            return vol;
        }
        return null;
    }

    @VisibleForTesting
    public HashMap<String, String> getBlockFileMetadata(Block b) {
        HashMap<String, String> metadata = new HashMap<String, String>();
        return metadata;
    }

    @VisibleForTesting
    public HashMap<String, String> getMetaMetadata(Block b, File metaFile, File blockFile) {
        HashMap<String, String> metadata = new HashMap<String, String>();
        metadata.put(GEN_STAMP, Long.toString(b.getGenerationStamp()));
        metadata.put(OBJECT_SIZE, Long.toString(b.getNumBytes()));
        metadata.put(META_FILE_SIZE, Long.toString(metaFile.length()));
        metadata.put(BLOCK_FILE_SIZE, Long.toString(blockFile.length()));
        return metadata;
    }

    @VisibleForTesting
    public HashMap<String, String> getMetaMetadataRBW(ReplicaInfo b, File metaFile, File blockFile) {
        HashMap<String, String> metadata = new HashMap<String, String>();
        metadata.put(GEN_STAMP, Long.toString(b.getGenerationStamp()));
        metadata.put(OBJECT_SIZE, Long.toString(b.getBlockFile().length()));
        metadata.put(META_FILE_SIZE, Long.toString(metaFile.length()));
        metadata.put(BLOCK_FILE_SIZE, Long.toString(blockFile.length()));
        return metadata;
    }

    public void providedBlocksCacheUpdateTS(String bpid, File f) throws IOException {
        FsVolumeImpl cloudVolume = this.getCloudVolume();
        cloudVolume.getBlockPoolSlice(bpid).fileAccessed(f);
    }

    public void providedBlocksCacheDelete(String bpid, File f) throws IOException {
        FsVolumeImpl cloudVolume = this.getCloudVolume();
        cloudVolume.getBlockPoolSlice(bpid).fileDeleted(f);
    }

    @VisibleForTesting
    public CloudPersistenceProvider getCloudConnector() {
        return this.cloud;
    }

    @VisibleForTesting
    public void installMockCloudConnector(CloudPersistenceProvider mock) {
        this.cloud = mock;
    }

    @VisibleForTesting
    public boolean replicaExistsInVolumeMap(String bpid, long blockID) {
        ReplicaInfo replicaInfo = this.volumeMap.get(bpid, blockID);
        return replicaInfo != null;
    }

    @VisibleForTesting
    public int getOpenReplicasCount(String bpid) {
        return this.volumeMap.size(bpid);
    }

    class PartUploadWorker
    implements Callable {
        private final String bucket;
        private final String key;
        private final UploadID uploadID;
        private final int partID;
        private final File file;
        private final long startPos;
        private final long endPos;
        private final ProvidedReplicaBeingWritten replicaInfo;

        PartUploadWorker(ProvidedReplicaBeingWritten replicaInfo, String bucket, String key, UploadID uploadID, int partID, File file, long startPos, long endPos) {
            this.replicaInfo = replicaInfo;
            this.bucket = bucket;
            this.key = key;
            this.uploadID = uploadID;
            this.partID = partID;
            this.file = file;
            this.startPos = startPos;
            this.endPos = endPos;
        }

        public Object call() throws Exception {
            if (this.replicaInfo.isCancellMultipart()) {
                return null;
            }
            PartRef etag = CloudFsDatasetImpl.this.cloud.uploadPart(this.bucket, this.key, this.uploadID, this.partID, this.file, this.startPos, this.endPos);
            LOG.info((Object)("HopsFS-Cloud. Part id to upload " + this.partID + " start " + this.startPos + " end " + this.endPos + " payload size " + (this.endPos - this.startPos) + " Src File " + this.file.getName()));
            if (this.replicaInfo.isCancellMultipart()) {
                CloudFsDatasetImpl.this.cloud.abortMultipartUpload(this.bucket, this.key, this.uploadID);
                return null;
            }
            return etag;
        }
    }
}

