package org.apache.uniffle.storage.handler.impl;

import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.exception.RssFetchFailedException;
import org.apache.uniffle.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.uniffle.shaded.com.google.common.base.Preconditions;
import org.apache.uniffle.storage.handler.ClientReadHandlerMetric;
import org.apache.uniffle.storage.handler.api.ClientReadHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link ClientReadHandler} that reads from an ordered list of underlying handlers, one per
 * {@link Tier}.
 *
 * <p>Reading starts at the first tier. Whenever the current tier's handler returns {@code null} or
 * an empty result, the next configured tier is tried; once advanced, the handler never goes back
 * to an earlier tier. Handlers may be passed in directly or created on first use from suppliers.
 *
 * <p>Consumed/skipped block statistics are tracked both in total (by the superclass) and per tier.
 */
public class ComposedClientReadHandler extends AbstractClientReadHandler {
    private static final Logger LOG = LoggerFactory.getLogger(ComposedClientReadHandler.class);

    /** Server this handler reads from; only used when formatting the metric log lines. */
    private final ShuffleServerInfo serverInfo;

    /** Factories for handlers that are created on first use of their tier. */
    private final Map<Tier, Supplier<ClientReadHandler>> supplierMap = new EnumMap<>(Tier.class);

    /** Handlers that were passed in eagerly or have already been created from a supplier. */
    private final Map<Tier, ClientReadHandler> handlerMap = new EnumMap<>(Tier.class);

    /** Per-tier statistics; holds an entry for every tier, including unconfigured ones. */
    private final Map<Tier, ClientReadHandlerMetric> metricsMap = new EnumMap<>(Tier.class);

    /** Tier currently being read; only ever moves forward. */
    private Tier currentTier = Tier.VALUES[0];

    /** Number of tiers actually configured (at most {@code Tier.VALUES.length}). */
    private final int numTiers;

    /** Ordered storage tiers; the declaration order is the order in which tiers are read. */
    public enum Tier {
        HOT,
        WARM,
        COLD,
        FROZEN;

        /** Cached copy of {@link #values()} so the array is not re-allocated on every use. */
        static final Tier[] VALUES = values();

        /**
         * Returns the tier declared after this one.
         *
         * @throws ArrayIndexOutOfBoundsException if this is the last tier; callers must check
         *     the bound first
         */
        Tier next() {
            return VALUES[ordinal() + 1];
        }
    }

    // Every tier gets a metric up front so the log lines can report all tiers unconditionally.
    {
        for (Tier tier : Tier.VALUES) {
            this.metricsMap.put(tier, new ClientReadHandlerMetric());
        }
    }

    /**
     * Creates a composed handler from already-constructed handlers.
     *
     * @param shuffleServerInfo server the data is read from
     * @param clientReadHandlerArr handlers in tier order; at most one per {@link Tier}
     * @throws IllegalArgumentException if more handlers than tiers are supplied
     */
    public ComposedClientReadHandler(ShuffleServerInfo shuffleServerInfo, ClientReadHandler... clientReadHandlerArr) {
        // Guava's Preconditions only substitutes %s placeholders; %d would be printed verbatim.
        Preconditions.checkArgument(
                clientReadHandlerArr.length <= Tier.VALUES.length,
                "Too many handlers, got %s, max %s",
                clientReadHandlerArr.length,
                Tier.VALUES.length);
        this.serverInfo = shuffleServerInfo;
        this.numTiers = clientReadHandlerArr.length;
        for (int i = 0; i < this.numTiers; i++) {
            this.handlerMap.put(Tier.VALUES[i], clientReadHandlerArr[i]);
        }
    }

    /**
     * Creates a composed handler whose per-tier handlers are created on first use.
     *
     * @param shuffleServerInfo server the data is read from
     * @param list handler suppliers in tier order; at most one per {@link Tier}
     * @throws IllegalArgumentException if more suppliers than tiers are supplied
     */
    public ComposedClientReadHandler(ShuffleServerInfo shuffleServerInfo, List<Supplier<ClientReadHandler>> list) {
        // Guava's Preconditions only substitutes %s placeholders; %d would be printed verbatim.
        Preconditions.checkArgument(
                list.size() <= Tier.VALUES.length,
                "Too many suppliers, got %s, max %s",
                list.size(),
                Tier.VALUES.length);
        this.serverInfo = shuffleServerInfo;
        this.numTiers = list.size();
        for (int i = 0; i < this.numTiers; i++) {
            this.supplierMap.put(Tier.VALUES[i], list.get(i));
        }
    }

    /**
     * Reads the next chunk of shuffle data, falling through to later tiers while the current one
     * yields nothing.
     *
     * @return the first non-empty result, or {@code null} once the last configured tier is
     *     exhausted
     * @throws RssException if no handler can be obtained for the tier being read
     * @throws RssFetchFailedException if the underlying handler fails while reading
     */
    @Override
    public ShuffleDataResult readShuffleData() {
        // Iterative on purpose: the try/catch must cover exactly one delegated read, otherwise a
        // failure in a later tier is re-wrapped once per tier that was skipped before it.
        while (true) {
            ClientReadHandler handler = this.handlerMap.computeIfAbsent(this.currentTier, this::instantiateHandler);
            if (handler == null) {
                throw new RssException("Unexpected null when getting " + this.currentTier.name() + " handler");
            }
            ShuffleDataResult result = readFromCurrentTier(handler);
            if (result != null && !result.isEmpty()) {
                return result;
            }
            if (this.currentTier.ordinal() + 1 >= this.numTiers) {
                // Last configured tier is exhausted: nothing more to read.
                return null;
            }
            this.currentTier = this.currentTier.next();
        }
    }

    /** Creates the handler for {@code tier} from its supplier, or returns null if there is none. */
    private ClientReadHandler instantiateHandler(Tier tier) {
        Supplier<ClientReadHandler> supplier = this.supplierMap.get(tier);
        return supplier == null ? null : supplier.get();
    }

    /** Performs one read on {@code handler}, tagging any failure with the current tier's name. */
    private ShuffleDataResult readFromCurrentTier(ClientReadHandler handler) {
        try {
            return handler.readShuffleData();
        } catch (RssFetchFailedException e) {
            // Keep the root cause when there is one; otherwise keep the exception itself so its
            // stack trace is not lost.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            throw new RssFetchFailedException(
                    "Failed to read shuffle data from " + this.currentTier.name() + " handler, error: " + e.getMessage(),
                    cause);
        } catch (Exception e) {
            throw new RssFetchFailedException(
                    "Failed to read shuffle data from " + this.currentTier.name() + " handler", e);
        }
    }

    /** Closes every handler that has been supplied or created so far. */
    @Override
    public void close() {
        this.handlerMap.values().stream().filter(Objects::nonNull).forEach(ClientReadHandler::close);
    }

    /**
     * Records a block in both the overall metric (via the superclass) and the metric of the tier
     * currently being read.
     *
     * @param bufferSegment the block to record; ignored when {@code null}
     * @param z forwarded unchanged to the superclass and to {@code updateBlockMetric}
     */
    @Override
    public void updateConsumedBlockInfo(BufferSegment bufferSegment, boolean z) {
        if (bufferSegment == null) {
            return;
        }
        super.updateConsumedBlockInfo(bufferSegment, z);
        updateBlockMetric(this.metricsMap.get(this.currentTier), bufferSegment, z);
    }

    /** Logs the block-count, byte and uncompressed-byte summaries at INFO level. */
    @Override
    public void logConsumedBlockInfo() {
        LOG.info(getReadBlockNumInfo());
        LOG.info(getReadLengthInfo());
        LOG.info(getReadUncompressLengthInfo());
    }

    /** Returns the summary line for read and skipped block counts. */
    @VisibleForTesting
    public String getReadBlockNumInfo() {
        return getMetricsInfo(
                "blocks", ClientReadHandlerMetric::getReadBlockNum, ClientReadHandlerMetric::getSkippedReadBlockNum);
    }

    /** Returns the summary line for read and skipped byte lengths. */
    @VisibleForTesting
    public String getReadLengthInfo() {
        return getMetricsInfo(
                "bytes", ClientReadHandlerMetric::getReadLength, ClientReadHandlerMetric::getSkippedReadLength);
    }

    /** Returns the summary line for read and skipped uncompressed byte lengths. */
    @VisibleForTesting
    public String getReadUncompressLengthInfo() {
        return getMetricsInfo(
                "uncompressed bytes",
                ClientReadHandlerMetric::getReadUncompressLength,
                ClientReadHandlerMetric::getSkippedReadUncompressLength);
    }

    /**
     * Builds one summary line: the overall consumed value followed by the per-tier consumed and
     * skipped values.
     *
     * @param str unit label inserted into the message (e.g. {@code "blocks"})
     * @param function extracts the consumed value from a metric
     * @param function2 extracts the skipped value from a metric
     */
    private String getMetricsInfo(String str, Function<ClientReadHandlerMetric, Long> function, Function<ClientReadHandlerMetric, Long> function2) {
        StringBuilder sb = new StringBuilder("Client read ")
                .append(function.apply(this.readHandlerMetric))
                .append(" ")
                .append(str)
                .append(" from [")
                .append(this.serverInfo)
                .append("], Consumed[");
        appendPerTier(sb, function);
        sb.append(" ], Skipped[");
        appendPerTier(sb, function2);
        sb.append(" ]");
        return sb.toString();
    }

    /** Appends {@code " <tier>:<value>"} for every tier, in declaration order. */
    private void appendPerTier(StringBuilder sb, Function<ClientReadHandlerMetric, Long> extractor) {
        for (Tier tier : Tier.VALUES) {
            sb.append(" ")
                    .append(tier.name().toLowerCase())
                    .append(":")
                    .append(extractor.apply(this.metricsMap.get(tier)));
        }
    }
}
