package org.apache.hudi.common.util.collection;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
import org.apache.hudi.common.fs.SizeAwareDataOutputStream;
import org.apache.hudi.common.serialization.CustomSerializer;
import org.apache.hudi.common.util.BinaryUtil;
import org.apache.hudi.common.util.BufferedRandomAccessFile;
import org.apache.hudi.common.util.SerializationUtils;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/common/util/collection/BitCaskDiskMap.class */
public final class BitCaskDiskMap<T extends Serializable, R> extends DiskMap<T, R> {
    public static final int BUFFER_SIZE = 131072;
    private static final Logger LOG = LoggerFactory.getLogger(BitCaskDiskMap.class);
    private static final ThreadLocal<CompressionHandler> DISK_COMPRESSION_REF = ThreadLocal.withInitial(CompressionHandler::new);
    private final Map<T, ValueMetadata> valueMetadataMap;
    private final boolean isCompressionEnabled;
    private final File writeOnlyFile;
    private final SizeAwareDataOutputStream writeOnlyFileHandle;
    private final FileOutputStream fileOutputStream;
    private final AtomicLong filePosition;
    private final String filePath;
    private final ThreadLocal<BufferedRandomAccessFile> randomAccessFile;
    private final Queue<BufferedRandomAccessFile> openedAccessFiles;
    private final List<ClosableIterator<R>> iterators;
    private final CustomSerializer<R> valueSerializer;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hudi/common/util/collection/BitCaskDiskMap$CompressionHandler.class */
    public static class CompressionHandler implements Serializable {
        private static final int DISK_COMPRESSION_INITIAL_BUFFER_SIZE = 1048576;
        private static final int DECOMPRESS_INTERMEDIATE_BUFFER_SIZE = 8192;
        private final ByteArrayOutputStream compressBaos = new ByteArrayOutputStream(1048576);
        private final ByteArrayOutputStream decompressBaos = new ByteArrayOutputStream(1048576);
        private final byte[] decompressIntermediateBuffer = new byte[8192];

        CompressionHandler() {
        }

        /* JADX INFO: Access modifiers changed from: private */
        public byte[] compressBytes(byte[] bArr) throws IOException {
            this.compressBaos.reset();
            Deflater deflater = new Deflater(9);
            DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(this.compressBaos, deflater);
            try {
                deflaterOutputStream.write(bArr);
                deflaterOutputStream.close();
                deflater.end();
                return this.compressBaos.toByteArray();
            } catch (Throwable th) {
                deflaterOutputStream.close();
                deflater.end();
                throw th;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public byte[] decompressBytes(byte[] bArr) throws IOException {
            this.decompressBaos.reset();
            try {
                InflaterInputStream inflaterInputStream = new InflaterInputStream(new ByteArrayInputStream(bArr));
                Throwable th = null;
                while (true) {
                    try {
                        try {
                            int read = inflaterInputStream.read(this.decompressIntermediateBuffer);
                            if (read <= 0) {
                                break;
                            }
                            this.decompressBaos.write(this.decompressIntermediateBuffer, 0, read);
                        } finally {
                        }
                    } finally {
                    }
                }
                byte[] byteArray = this.decompressBaos.toByteArray();
                if (inflaterInputStream != null) {
                    if (0 != 0) {
                        try {
                            inflaterInputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        inflaterInputStream.close();
                    }
                }
                return byteArray;
            } catch (IOException e) {
                throw new HoodieIOException("IOException while decompressing bytes", e);
            }
        }
    }

    /* loaded from: input_file:org/apache/hudi/common/util/collection/BitCaskDiskMap$FileEntry.class */
    public static final class FileEntry {
        private final Long crc;
        private final Integer sizeOfKey;
        private final Integer sizeOfValue;
        private final byte[] key;
        private final byte[] value;
        private final Long timestamp;

        public FileEntry(long j, int i, int i2, byte[] bArr, byte[] bArr2, long j2) {
            this.crc = Long.valueOf(j);
            this.sizeOfKey = Integer.valueOf(i);
            this.sizeOfValue = Integer.valueOf(i2);
            this.key = bArr;
            this.value = bArr2;
            this.timestamp = Long.valueOf(j2);
        }

        public long getCrc() {
            return this.crc.longValue();
        }

        public int getSizeOfKey() {
            return this.sizeOfKey.intValue();
        }

        public int getSizeOfValue() {
            return this.sizeOfValue.intValue();
        }

        public byte[] getKey() {
            return this.key;
        }

        public byte[] getValue() {
            return this.value;
        }

        public long getTimestamp() {
            return this.timestamp.longValue();
        }
    }

    /* loaded from: input_file:org/apache/hudi/common/util/collection/BitCaskDiskMap$ValueMetadata.class */
    public static final class ValueMetadata implements Comparable<ValueMetadata> {
        private final String filePath;
        private final Integer sizeOfValue;
        private final Long offsetOfValue;
        private final Long timestamp;

        protected ValueMetadata(String str, int i, long j, long j2) {
            this.filePath = str;
            this.sizeOfValue = Integer.valueOf(i);
            this.offsetOfValue = Long.valueOf(j);
            this.timestamp = Long.valueOf(j2);
        }

        public String getFilePath() {
            return this.filePath;
        }

        public int getSizeOfValue() {
            return this.sizeOfValue.intValue();
        }

        public Long getOffsetOfValue() {
            return this.offsetOfValue;
        }

        public long getTimestamp() {
            return this.timestamp.longValue();
        }

        @Override // java.lang.Comparable
        public int compareTo(ValueMetadata valueMetadata) {
            return Long.compare(this.offsetOfValue.longValue(), valueMetadata.offsetOfValue.longValue());
        }
    }

    public BitCaskDiskMap(String str, CustomSerializer<R> customSerializer, boolean z) throws IOException {
        super(str, ExternalSpillableMap.DiskMapType.BITCASK.name());
        this.randomAccessFile = new ThreadLocal<>();
        this.openedAccessFiles = new ConcurrentLinkedQueue();
        this.iterators = new ArrayList();
        this.valueMetadataMap = new ConcurrentHashMap();
        this.isCompressionEnabled = z;
        this.writeOnlyFile = new File(this.diskMapPath, UUID.randomUUID().toString());
        this.filePath = this.writeOnlyFile.getPath();
        initFile(this.writeOnlyFile);
        this.fileOutputStream = new FileOutputStream(this.writeOnlyFile, true);
        this.writeOnlyFileHandle = new SizeAwareDataOutputStream(this.fileOutputStream, 131072);
        this.filePosition = new AtomicLong(0L);
        this.valueSerializer = customSerializer;
    }

    private BufferedRandomAccessFile getRandomAccessFile() {
        try {
            BufferedRandomAccessFile bufferedRandomAccessFile = this.randomAccessFile.get();
            if (bufferedRandomAccessFile == null) {
                bufferedRandomAccessFile = new BufferedRandomAccessFile(this.filePath, "r", 131072);
                bufferedRandomAccessFile.seek(0L);
                this.randomAccessFile.set(bufferedRandomAccessFile);
                this.openedAccessFiles.offer(bufferedRandomAccessFile);
            }
            return bufferedRandomAccessFile;
        } catch (IOException e) {
            throw new HoodieException(e);
        }
    }

    private void initFile(File file) throws IOException {
        if (file.exists()) {
            file.delete();
        }
        if (!file.getParentFile().exists()) {
            file.getParentFile().mkdir();
        }
        file.createNewFile();
        LOG.debug("Spilling to file location " + file.getAbsolutePath());
        file.deleteOnExit();
    }

    private void flushToDisk() {
        try {
            this.writeOnlyFileHandle.flush();
        } catch (IOException e) {
            throw new HoodieIOException("Failed to flush to BitCaskDiskMap file", e);
        }
    }

    @Override // java.lang.Iterable
    public Iterator<R> iterator() {
        ClosableIterator<R> it = new LazyFileIterable(this.filePath, this.valueMetadataMap, this.valueSerializer, this.isCompressionEnabled).iterator();
        this.iterators.add(it);
        return it;
    }

    @Override // org.apache.hudi.common.util.collection.KeyFilteringIterable
    public Iterator<R> iterator(Predicate<T> predicate) {
        ClosableIterator<R> it = new LazyFileIterable(this.filePath, (Map) this.valueMetadataMap.entrySet().stream().filter(entry -> {
            return predicate.test(entry.getKey());
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        })), this.valueSerializer, this.isCompressionEnabled).iterator();
        this.iterators.add(it);
        return it;
    }

    @Override // org.apache.hudi.common.util.collection.DiskMap
    public long sizeOfFileOnDiskInBytes() {
        return this.filePosition.get();
    }

    @Override // java.util.Map
    public int size() {
        return this.valueMetadataMap.size();
    }

    @Override // java.util.Map
    public boolean isEmpty() {
        return this.valueMetadataMap.isEmpty();
    }

    @Override // java.util.Map
    public boolean containsKey(Object obj) {
        return this.valueMetadataMap.containsKey(obj);
    }

    @Override // java.util.Map
    public boolean containsValue(Object obj) {
        throw new HoodieNotSupportedException("unable to compare values in map");
    }

    @Override // java.util.Map
    public R get(Object obj) {
        ValueMetadata valueMetadata = this.valueMetadataMap.get(obj);
        if (valueMetadata == null) {
            return null;
        }
        return get(valueMetadata);
    }

    private R get(ValueMetadata valueMetadata) {
        return (R) get(valueMetadata, getRandomAccessFile(), this.valueSerializer, this.isCompressionEnabled);
    }

    public static <V> V get(ValueMetadata valueMetadata, RandomAccessFile randomAccessFile, CustomSerializer<V> customSerializer, boolean z) {
        try {
            byte[] readBytesFromDisk = SpillableMapUtils.readBytesFromDisk(randomAccessFile, valueMetadata.getOffsetOfValue().longValue(), valueMetadata.getSizeOfValue());
            return z ? customSerializer.deserialize(DISK_COMPRESSION_REF.get().decompressBytes(readBytesFromDisk)) : customSerializer.deserialize(readBytesFromDisk);
        } catch (IOException e) {
            throw new HoodieIOException("Unable to readFromDisk Hoodie Record from disk", e);
        }
    }

    private synchronized R put(T t, R r, boolean z) {
        try {
            byte[] compressBytes = this.isCompressionEnabled ? DISK_COMPRESSION_REF.get().compressBytes(this.valueSerializer.serialize(r)) : this.valueSerializer.serialize(r);
            int length = compressBytes.length;
            long currentTimeMillis = System.currentTimeMillis();
            this.valueMetadataMap.put(t, new ValueMetadata(this.filePath, length, this.filePosition.get(), currentTimeMillis));
            byte[] serialize = SerializationUtils.serialize(t);
            this.filePosition.set(SpillableMapUtils.spillToDisk(this.writeOnlyFileHandle, new FileEntry(BinaryUtil.generateChecksum(compressBytes), serialize.length, length, serialize, compressBytes, currentTimeMillis)));
            if (z) {
                flushToDisk();
            }
            return r;
        } catch (IOException e) {
            throw new HoodieIOException("Unable to store data in Disk Based map", e);
        }
    }

    public R put(T t, R r) {
        return put(t, r, true);
    }

    @Override // java.util.Map
    public R remove(Object obj) {
        R r = get(obj);
        this.valueMetadataMap.remove(obj);
        return r;
    }

    @Override // java.util.Map
    public void putAll(Map<? extends T, ? extends R> map) {
        for (Map.Entry<? extends T, ? extends R> entry : map.entrySet()) {
            put(entry.getKey(), entry.getValue(), false);
        }
        flushToDisk();
    }

    @Override // java.util.Map
    public void clear() {
        this.valueMetadataMap.clear();
    }

    @Override // org.apache.hudi.common.util.collection.DiskMap, java.lang.AutoCloseable
    public void close() {
        this.valueMetadataMap.clear();
        try {
            if (this.writeOnlyFileHandle != null) {
                this.writeOnlyFileHandle.flush();
                this.fileOutputStream.getChannel().force(false);
                this.writeOnlyFileHandle.close();
            }
            this.fileOutputStream.close();
            while (!this.openedAccessFiles.isEmpty()) {
                BufferedRandomAccessFile poll = this.openedAccessFiles.poll();
                if (null != poll) {
                    try {
                        poll.close();
                    } catch (IOException e) {
                    }
                }
            }
            this.writeOnlyFile.delete();
            this.iterators.forEach((v0) -> {
                v0.close();
            });
        } catch (Exception e2) {
            this.writeOnlyFile.delete();
        } finally {
            super.close();
        }
    }

    @Override // java.util.Map
    public Set<T> keySet() {
        return this.valueMetadataMap.keySet();
    }

    @Override // java.util.Map
    public Collection<R> values() {
        throw new HoodieException("Unsupported Operation Exception");
    }

    @Override // org.apache.hudi.common.util.collection.DiskMap
    public Stream<R> valueStream() {
        BufferedRandomAccessFile randomAccessFile = getRandomAccessFile();
        return this.valueMetadataMap.values().stream().sorted().map(valueMetadata -> {
            return get(valueMetadata, randomAccessFile, this.valueSerializer, this.isCompressionEnabled);
        });
    }

    @Override // java.util.Map
    public Set<Map.Entry<T, R>> entrySet() {
        HashSet hashSet = new HashSet();
        for (T t : this.valueMetadataMap.keySet()) {
            hashSet.add(new AbstractMap.SimpleEntry(t, get(t)));
        }
        return hashSet;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // java.util.Map
    public /* bridge */ /* synthetic */ Object put(Object obj, Object obj2) {
        return put((BitCaskDiskMap<T, R>) obj, (Serializable) obj2);
    }
}
