/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.index.bucket;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.index.bucket.BucketIdentifier;
import org.apache.hudi.index.bucket.HoodieBucketIndex;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.HoodieTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bucket index that resolves, per partition, a mapping from bucket id to the
 * location of the single file group serving that bucket. The bucket id of a
 * file group is derived from its file id via
 * {@link BucketIdentifier#bucketIdFromFileId(String)}.
 *
 * <p>If two latest file slices of one partition resolve to the same bucket id,
 * loading the mapping fails and the error message lists the pending instants
 * that appear to have produced the conflicting files.
 */
public class HoodieSimpleBucketIndex
extends HoodieBucketIndex {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieSimpleBucketIndex.class);

    /**
     * Creates the index from the given write config, which is passed to the superclass unchanged.
     *
     * @param config write configuration forwarded to {@link HoodieBucketIndex}
     */
    public HoodieSimpleBucketIndex(HoodieWriteConfig config) {
        super(config);
    }

    /**
     * Builds the bucket id to record location mapping for one partition from
     * the partition's latest file slices.
     *
     * <p>The active timeline is reloaded first, and the requested times of its
     * in-flight instants are collected so that a conflict, if found, can be
     * attributed to pending instants in the error message.
     *
     * @param hoodieTable table whose file slices and timeline are inspected
     * @param partition   partition path to load the mapping for
     * @return mapping from bucket id to the (base instant time, file id) location
     * @throws HoodieIOException if more than one latest file slice in the
     *                           partition maps to the same bucket id
     */
    public Map<Integer, HoodieRecordLocation> loadBucketIdToFileIdMappingForPartition(HoodieTable hoodieTable, String partition) {
        Map<Integer, HoodieRecordLocation> bucketIdToFileIdMapping = new HashMap<>();
        HoodieActiveTimeline hoodieActiveTimeline = hoodieTable.getMetaClient().reloadActiveTimeline();
        // Typed as Set<String> (not raw) so it can be handed to findConflictInstantsInPartition without unchecked conversion.
        Set<String> pendingInstants = hoodieActiveTimeline.filterInflights()
                .getInstantsAsStream()
                .map(HoodieInstant::requestedTime)
                .collect(Collectors.toSet());
        HoodieIndexUtils.getLatestFileSlicesForPartition(partition, hoodieTable).forEach(fileSlice -> {
            String fileId = fileSlice.getFileId();
            String commitTime = fileSlice.getBaseInstantTime();
            int bucketId = BucketIdentifier.bucketIdFromFileId(fileId);
            if (bucketIdToFileIdMapping.containsKey(bucketId)) {
                // A bucket must be served by exactly one file group; report which pending instants look responsible.
                List<String> instants = this.findConflictInstantsInPartition(hoodieTable, partition, bucketId, pendingInstants);
                throw new HoodieIOException("Find multiple files at partition path=" + partition + " that belong to the same bucket id = " + bucketId + ", these instants need to rollback: " + instants + ", you can use 'rollback_to_instant' procedure to revert the conflicts.");
            }
            bucketIdToFileIdMapping.put(bucketId, new HoodieRecordLocation(commitTime, fileId));
        });
        return bucketIdToFileIdMapping;
    }

    /**
     * Finds the pending instants that have written data files for the given
     * bucket in the given partition.
     *
     * <p>Candidates are the latest instant times of the partition's latest file
     * slices (including in-flight ones) that are contained in
     * {@code pendingInstants}. A candidate is reported when
     * {@link #hasPendingDataFilesForInstant(List, String, int)} finds a matching
     * file among the files listed under the partition path. Candidates are not
     * de-duplicated, so an instant may appear more than once in the result.
     *
     * @param hoodieTable     table providing the meta client and slice view
     * @param partition       partition path, resolved against the table base path
     * @param bucketId        bucket id whose files are looked for
     * @param pendingInstants instant times considered pending
     * @return the candidate instants that have a matching data file; empty if none
     */
    public List<String> findConflictInstantsInPartition(HoodieTable hoodieTable, String partition, int bucketId, Set<String> pendingInstants) {
        HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
        StoragePath partitionPath = new StoragePath(metaClient.getBasePath(), partition);
        List<StoragePathInfo> filesInPartition = HoodieSimpleBucketIndex.listFilesFromPartition(metaClient, partitionPath);
        Stream<FileSlice> latestFileSlicesIncludingInflight = hoodieTable.getSliceView().getLatestFileSlicesIncludingInflight(partition);
        // Must be List<String>: iterating a raw List as String does not compile.
        List<String> candidates = latestFileSlicesIncludingInflight
                .map(FileSlice::getLatestInstantTime)
                .filter(pendingInstants::contains)
                .collect(Collectors.toList());
        List<String> instants = new ArrayList<>();
        for (String candidate : candidates) {
            if (this.hasPendingDataFilesForInstant(filesInPartition, candidate, bucketId)) {
                instants.add(candidate);
            }
        }
        return instants;
    }

    /**
     * Lists the entries directly under a partition path.
     *
     * <p>This is best-effort: the result only feeds the diagnostic message of
     * a conflict error, so a listing failure is logged and an empty list is
     * returned instead of propagating the exception.
     *
     * @param metaClient    meta client whose storage is used for listing
     * @param partitionPath absolute path of the partition
     * @return the listed entries, or an empty list if listing failed
     */
    private static List<StoragePathInfo> listFilesFromPartition(HoodieTableMetaClient metaClient, StoragePath partitionPath) {
        try {
            return metaClient.getStorage().listFiles(partitionPath);
        }
        catch (IOException e) {
            // Do not hide the failure completely: without this, an empty conflict list is indistinguishable from a failed listing.
            LOG.warn("Failed to list files under partition path {}", partitionPath, e);
            return Collections.emptyList();
        }
    }

    /**
     * Checks whether any of the given entries is a file that belongs to the
     * given bucket and whose name contains the given instant.
     *
     * @param filesInPartition entries to scan
     * @param instant          instant time expected to appear in the file name
     * @param bucketId         bucket id the file name must resolve to
     * @return {@code true} if at least one matching file exists, {@code false} otherwise
     */
    public Boolean hasPendingDataFilesForInstant(List<StoragePathInfo> filesInPartition, String instant, int bucketId) {
        for (StoragePathInfo status : filesInPartition) {
            if (!status.isFile()) {
                continue;
            }
            String fileName = status.getPath().getName();
            // Cheap substring test first: files unrelated to this instant are skipped
            // without attempting to parse a bucket id, which avoids needless warnings.
            if (!fileName.contains(instant)) {
                continue;
            }
            try {
                if (BucketIdentifier.bucketIdFromFileId(fileName) == bucketId) {
                    return true;
                }
            }
            catch (NumberFormatException e) {
                // The name does not start with a parsable bucket id; keep scanning the remaining files.
                LOG.warn("File is not bucket file: {}", fileName);
            }
        }
        return false;
    }

    /**
     * Computes the bucket id for a record key by delegating to
     * {@link BucketIdentifier}, using this index's key fields and bucket count
     * ({@code indexKeyFields} and {@code numBuckets} are declared outside this class).
     *
     * @param key record key to hash into a bucket
     * @return the bucket id for the key
     */
    public int getBucketID(HoodieKey key) {
        return BucketIdentifier.getBucketId(key, this.indexKeyFields, this.numBuckets);
    }

    /**
     * @return always {@code false}
     */
    @Override
    public boolean canIndexLogFiles() {
        return false;
    }

    /**
     * Creates the record-to-location lookup function for one partition. The
     * bucket mapping for the partition is loaded when the function is created.
     *
     * @param table         table to load the bucket mapping from
     * @param partitionPath partition the function serves
     * @return function mapping a record to its existing location, if any
     */
    @Override
    protected Function<HoodieRecord, Option<HoodieRecordLocation>> getIndexLocationFunctionForPartition(HoodieTable table, String partitionPath) {
        return new SimpleBucketIndexLocationFunction(table, partitionPath);
    }

    /**
     * Lookup function bound to a single partition. It captures the bucket
     * mapping at construction time and resolves each record through its bucket id.
     * It is an inner (non-static) class because it calls back into the enclosing index.
     */
    private class SimpleBucketIndexLocationFunction
    implements Function<HoodieRecord, Option<HoodieRecordLocation>> {
        private final Map<Integer, HoodieRecordLocation> bucketIdToFileIdMapping;

        /**
         * Loads the bucket mapping of the given partition once, up front.
         *
         * @param table         table to load the bucket mapping from
         * @param partitionPath partition whose mapping is loaded
         */
        public SimpleBucketIndexLocationFunction(HoodieTable table, String partitionPath) {
            this.bucketIdToFileIdMapping = HoodieSimpleBucketIndex.this.loadBucketIdToFileIdMappingForPartition(table, partitionPath);
        }

        /**
         * Resolves the location of a record from its bucket id.
         *
         * @param record record to locate
         * @return the mapped location, or an empty option if the bucket has no entry in the mapping
         */
        @Override
        public Option<HoodieRecordLocation> apply(HoodieRecord record) {
            int bucketId = HoodieSimpleBucketIndex.this.getBucketID(record.getKey());
            return Option.ofNullable(this.bucketIdToFileIdMapping.get(bucketId));
        }
    }
}

