package org.apache.hudi.hadoop.realtime;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.serialization.DefaultSerializer;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.class */
public class HoodieMergeOnReadSnapshotReader extends AbstractRealtimeRecordReader implements Iterator<HoodieRecord>, AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieMergeOnReadSnapshotReader.class);
    private final String tableBasePath;
    private final List<HoodieLogFile> logFilePaths;
    private final String latestInstantTime;
    private final Schema readerSchema;
    private final JobConf jobConf;
    private final HoodieMergedLogRecordScanner logRecordScanner;
    private final HoodieFileReader baseFileReader;
    private final Map<String, HoodieRecord> logRecordsByKey;
    private final Iterator<HoodieRecord> recordsIterator;
    private final ExternalSpillableMap<String, HoodieRecord> mergedRecordsByKey;

    public HoodieMergeOnReadSnapshotReader(String str, String str2, List<HoodieLogFile> list, String str3, Schema schema, JobConf jobConf, long j, long j2) throws IOException {
        super(getRealtimeSplit(str, str2, list, str3, j, j2, new String[0]), jobConf);
        this.tableBasePath = str;
        this.logFilePaths = list;
        this.latestInstantTime = str3;
        this.readerSchema = schema;
        this.jobConf = jobConf;
        HoodieTimer startTimer = new HoodieTimer().startTimer();
        this.logRecordScanner = getMergedLogRecordScanner();
        LOG.debug("Time taken to scan log records: {}", Long.valueOf(startTimer.endTimer()));
        this.baseFileReader = HoodieRealtimeRecordReaderUtils.getBaseFileReader(new Path(str2), jobConf);
        this.logRecordsByKey = this.logRecordScanner.getRecords();
        HashSet hashSet = new HashSet(this.logRecordsByKey.keySet());
        this.mergedRecordsByKey = new ExternalSpillableMap<>(HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf), jobConf.get(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(), FileIOUtils.getDefaultSpillableMapBasePath()), new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema), (ExternalSpillableMap.DiskMapType) jobConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue()), new DefaultSerializer(), jobConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue().booleanValue()), getClass().getSimpleName());
        ClosableIterator<String> recordKeyIterator = this.baseFileReader.getRecordKeyIterator();
        Throwable th = null;
        try {
            try {
                startTimer.startTimer();
                while (recordKeyIterator.hasNext()) {
                    String next = recordKeyIterator.next();
                    if (hashSet.contains(next)) {
                        hashSet.remove(next);
                        Option<HoodieAvroIndexedRecord> buildGenericRecordWithCustomPayload = buildGenericRecordWithCustomPayload(this.logRecordsByKey.get(next));
                        if (buildGenericRecordWithCustomPayload.isPresent()) {
                            this.mergedRecordsByKey.put((ExternalSpillableMap<String, HoodieRecord>) next, (String) buildGenericRecordWithCustomPayload.get().copy());
                        }
                    }
                }
                if (recordKeyIterator != null) {
                    if (0 != 0) {
                        try {
                            recordKeyIterator.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        recordKeyIterator.close();
                    }
                }
                LOG.debug("Time taken to merge base file and log file records: {}", Long.valueOf(startTimer.endTimer()));
                this.recordsIterator = this.mergedRecordsByKey.values().iterator();
            } finally {
            }
        } catch (Throwable th3) {
            if (recordKeyIterator != null) {
                if (th != null) {
                    try {
                        recordKeyIterator.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    recordKeyIterator.close();
                }
            }
            throw th3;
        }
    }

    @Override // java.util.Iterator
    public boolean hasNext() {
        return this.recordsIterator.hasNext();
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.Iterator
    public HoodieRecord next() {
        return this.recordsIterator.next();
    }

    public Map<String, HoodieRecord> getRecordsByKey() {
        return this.mergedRecordsByKey;
    }

    public Iterator<HoodieRecord> getRecordsIterator() {
        return this.recordsIterator;
    }

    public Map<String, HoodieRecord> getLogRecordsByKey() {
        return this.logRecordsByKey;
    }

    private static HoodieRealtimeFileSplit getRealtimeSplit(String str, String str2, List<HoodieLogFile> list, String str3, long j, long j2, String[] strArr) {
        return HoodieInputFormatUtils.createRealtimeFileSplit(new HoodieRealtimePath(new Path(str2).getParent(), str2, str, list, str3, false, Option.empty()), j, j2, strArr);
    }

    private HoodieMergedLogRecordScanner getMergedLogRecordScanner() {
        return HoodieMergedLogRecordScanner.newBuilder().withStorage(HoodieStorageUtils.getStorage(this.split.getPath().toString(), (StorageConfiguration<?>) HadoopFSUtils.getStorageConf(this.jobConf))).withBasePath(this.tableBasePath).withLogFilePaths((List<String>) this.logFilePaths.stream().map(hoodieLogFile -> {
            return hoodieLogFile.getPath().toString();
        }).collect(Collectors.toList())).withReaderSchema(this.readerSchema).withLatestInstantTime(this.latestInstantTime).withMaxMemorySizeInBytes(Long.valueOf(HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(this.jobConf))).withReverseReader(false).withBufferSize(this.jobConf.getInt(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), 1048576)).withSpillableMapBasePath(this.jobConf.get(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(), FileIOUtils.getDefaultSpillableMapBasePath())).withDiskMapType((ExternalSpillableMap.DiskMapType) this.jobConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())).withBitCaskDiskMapCompressionEnabled(this.jobConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue().booleanValue())).withOptimizedLogBlocksScan(this.jobConf.getBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key(), Boolean.parseBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))).withInternalSchema(this.schemaEvolutionContext.internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema())).build();
    }

    private Option<HoodieAvroIndexedRecord> buildGenericRecordWithCustomPayload(HoodieRecord hoodieRecord) throws IOException {
        return this.usesCustomPayload ? hoodieRecord.toIndexedRecord(getWriterSchema(), this.payloadProps) : hoodieRecord.toIndexedRecord(this.readerSchema, this.payloadProps);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.baseFileReader != null) {
            this.baseFileReader.close();
        }
        if (this.logRecordScanner != null) {
            this.logRecordScanner.close();
        }
        if (this.mergedRecordsByKey != null) {
            this.mergedRecordsByKey.close();
        }
    }
}
