package org.apache.uniffle.storage.handler.impl;

import java.io.FileNotFoundException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.shaded.com.google.common.collect.Lists;
import org.apache.uniffle.shaded.org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.storage.util.ShuffleStorageUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/uniffle/storage/handler/impl/HadoopClientReadHandler.class */
/**
 * Client read handler that reads shuffle data through a set of per-index-file
 * {@link HadoopShuffleReadHandler}s.
 *
 * <p>The per-file handlers are created lazily on the first call to {@link #readShuffleData()}:
 * the shuffle data folder is listed for index files, one handler is created per index file, and
 * the handlers are then consumed one after another in a randomized order.
 */
public class HadoopClientReadHandler extends AbstractClientReadHandler {
    private static final Logger LOG = LoggerFactory.getLogger(HadoopClientReadHandler.class);
    protected final int partitionNumPerRange;
    protected final int partitionNum;
    protected final int readBufferSize;
    /** When non-null, only index files whose name starts with this id are read. */
    private final String shuffleServerId;
    protected Roaring64NavigableMap expectBlockIds;
    protected Roaring64NavigableMap processBlockIds;
    protected final String storageBasePath;
    protected final Configuration hadoopConf;
    /** One handler per discovered index file; populated by {@link #init(String)}. */
    protected final List<HadoopShuffleReadHandler> readHandlers = Lists.newArrayList();
    /** Position in {@link #readHandlers} of the handler currently being drained. */
    private int readHandlerIndex;
    private final ShuffleDataDistributionType distributionType;
    private final Roaring64NavigableMap expectTaskIds;
    private final boolean offHeapEnable;

    /**
     * Creates a handler with every option specified.
     *
     * @param str application id
     * @param i shuffle id
     * @param i2 partition id
     * @param i3 not used by this class
     * @param i4 number of partitions per range, used to derive the shuffle data folder
     * @param i5 total number of partitions, used to derive the shuffle data folder
     * @param i6 read buffer size handed to every per-file handler
     * @param roaring64NavigableMap expected block ids, handed to every per-file handler
     * @param roaring64NavigableMap2 processed block ids, handed to every per-file handler
     * @param str2 storage base path under which the shuffle data folder is resolved
     * @param configuration Hadoop configuration used to obtain the filesystem
     * @param shuffleDataDistributionType distribution type handed to every per-file handler
     * @param roaring64NavigableMap3 expected task ids, handed to every per-file handler
     * @param str3 shuffle server id used as a file-name prefix filter; {@code null} disables
     *     the filter
     * @param z off-heap flag handed to every per-file handler
     */
    public HadoopClientReadHandler(String str, int i, int i2, int i3, int i4, int i5, int i6, Roaring64NavigableMap roaring64NavigableMap, Roaring64NavigableMap roaring64NavigableMap2, String str2, Configuration configuration, ShuffleDataDistributionType shuffleDataDistributionType, Roaring64NavigableMap roaring64NavigableMap3, String str3, boolean z) {
        this.appId = str;
        this.shuffleId = i;
        this.partitionId = i2;
        this.partitionNumPerRange = i4;
        this.partitionNum = i5;
        this.readBufferSize = i6;
        this.expectBlockIds = roaring64NavigableMap;
        this.processBlockIds = roaring64NavigableMap2;
        this.storageBasePath = str2;
        this.hadoopConf = configuration;
        this.readHandlerIndex = 0;
        this.distributionType = shuffleDataDistributionType;
        this.expectTaskIds = roaring64NavigableMap3;
        this.shuffleServerId = str3;
        this.offHeapEnable = z;
    }

    /**
     * Creates a handler with {@link ShuffleDataDistributionType#NORMAL} distribution, an empty
     * expected-task-id bitmap, no shuffle server filter and the off-heap flag disabled.
     * The remaining parameters have the same meaning as in the full constructor.
     */
    public HadoopClientReadHandler(String str, int i, int i2, int i3, int i4, int i5, int i6, Roaring64NavigableMap roaring64NavigableMap, Roaring64NavigableMap roaring64NavigableMap2, String str2, Configuration configuration) {
        this(str, i, i2, i3, i4, i5, i6, roaring64NavigableMap, roaring64NavigableMap2, str2, configuration, ShuffleDataDistributionType.NORMAL, Roaring64NavigableMap.bitmapOf(), null, false);
    }

    /**
     * Lists the index files in the given folder and creates one read handler per file.
     *
     * <p>A missing folder or a failed listing is logged and leaves {@link #readHandlers}
     * untouched. A handler that cannot be created is logged and skipped. The resulting handler
     * list is shuffled so the reading order is randomized.
     *
     * @param str full path of the shuffle data folder
     * @throws RssException if the filesystem for the folder cannot be obtained
     */
    protected void init(String str) {
        Path folder = new Path(str);

        // Resolve the filesystem on its own so that a failure here surfaces as an RssException
        // instead of being reported as a listing problem and silently producing no data.
        FileSystem fileSystem;
        try {
            fileSystem = HadoopFilesystemProvider.getFilesystem(folder, this.hadoopConf);
        } catch (Exception e) {
            throw new RssException("Can't get FileSystem for " + folder, e);
        }

        FileStatus[] indexFiles = null;
        try {
            indexFiles = fileSystem.listStatus(folder, this::isExpectedIndexFile);
        } catch (FileNotFoundException e) {
            LOG.info("Directory[{}] not found. The data may not be flushed to this directory. Nothing will be read.", folder);
        } catch (Exception e) {
            LOG.error("Can't list index file in  {}", folder, e);
        }
        if (indexFiles == null || indexFiles.length == 0) {
            return;
        }

        for (FileStatus indexFile : indexFiles) {
            LOG.info("Find index file for shuffleId[{}], partitionId[{}] {}", this.shuffleId, this.partitionId, indexFile.getPath());
            String fileNamePrefix = getFileNamePrefix(indexFile.getPath().toUri().toString());
            try {
                this.readHandlers.add(new HadoopShuffleReadHandler(this.appId, this.shuffleId, this.partitionId, fileNamePrefix, this.readBufferSize, this.expectBlockIds, this.processBlockIds, this.hadoopConf, this.distributionType, this.expectTaskIds, this.offHeapEnable));
            } catch (Exception e) {
                // Best effort: one broken file must not prevent reading the others.
                LOG.warn("Can't create ShuffleReaderHandler for {}", fileNamePrefix, e);
            }
        }

        Collections.shuffle(this.readHandlers);
        LOG.info("Reading order of Hadoop files with name prefix: {}", this.readHandlers.stream().map(handler -> handler.filePrefix).collect(Collectors.toList()));
    }

    /**
     * Tells whether a file is an index file this handler should read: its name must end with
     * the index suffix and, when a shuffle server id is set, start with that id.
     */
    private boolean isExpectedIndexFile(Path candidate) {
        String name = candidate.getName();
        return name.endsWith(Constants.SHUFFLE_INDEX_FILE_SUFFIX) && (this.shuffleServerId == null || name.startsWith(this.shuffleServerId));
    }

    /**
     * Returns the next piece of shuffle data.
     *
     * <p>Per-file handlers are initialized on demand while the handler list is empty. The
     * current handler is queried; whenever it returns {@code null} the next handler is tried.
     *
     * @return the first non-null result from the remaining handlers, or a new empty
     *     {@link ShuffleDataResult} once all handlers have been tried
     */
    @Override // org.apache.uniffle.storage.handler.impl.AbstractClientReadHandler, org.apache.uniffle.storage.handler.api.ClientReadHandler
    public ShuffleDataResult readShuffleData() {
        if (this.readHandlers.isEmpty()) {
            String dataPath = ShuffleStorageUtils.getShuffleDataPathWithRange(this.appId, this.shuffleId, this.partitionId, this.partitionNumPerRange, this.partitionNum);
            init(ShuffleStorageUtils.getFullShuffleDataFolder(this.storageBasePath, dataPath));
        }
        if (this.readHandlerIndex >= this.readHandlers.size()) {
            return new ShuffleDataResult();
        }

        ShuffleDataResult result = this.readHandlers.get(this.readHandlerIndex).readShuffleData();
        // A null result means the current handler has nothing more to offer; advance to the next.
        while (result == null) {
            this.readHandlerIndex++;
            if (this.readHandlerIndex >= this.readHandlers.size()) {
                return new ShuffleDataResult();
            }
            result = this.readHandlers.get(this.readHandlerIndex).readShuffleData();
        }
        return result;
    }

    /**
     * Strips everything from the last {@code '.'} onwards.
     *
     * @param str file path or URI string; must contain a {@code '.'}
     * @return the part of {@code str} before its last {@code '.'}
     * @throws StringIndexOutOfBoundsException if {@code str} contains no {@code '.'}
     */
    protected String getFileNamePrefix(String str) {
        return str.substring(0, str.lastIndexOf("."));
    }

    /** Closes every per-file read handler created so far. */
    @Override // org.apache.uniffle.storage.handler.impl.AbstractClientReadHandler, org.apache.uniffle.storage.handler.api.ClientReadHandler
    public synchronized void close() {
        for (HadoopShuffleReadHandler handler : this.readHandlers) {
            handler.close();
        }
    }

    /** Returns the live (not copied) list of per-file read handlers. */
    protected List<HadoopShuffleReadHandler> getHdfsShuffleFileReadHandlers() {
        return this.readHandlers;
    }

    /** Returns the index of the per-file handler currently being read. */
    protected int getReadHandlerIndex() {
        return this.readHandlerIndex;
    }
}
