package org.apache.uniffle.client.record.reader;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.uniffle.client.api.ShuffleServerClient;
import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
import org.apache.uniffle.client.record.Record;
import org.apache.uniffle.client.record.RecordBlob;
import org.apache.uniffle.client.record.RecordBuffer;
import org.apache.uniffle.client.record.metrics.MetricsReporter;
import org.apache.uniffle.client.record.writer.Combiner;
import org.apache.uniffle.client.request.RssGetSortedShuffleDataRequest;
import org.apache.uniffle.client.response.RssGetSortedShuffleDataResponse;
import org.apache.uniffle.client.util.RssClientConfig;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.merger.MergeState;
import org.apache.uniffle.common.merger.Merger;
import org.apache.uniffle.common.netty.buffer.ManagedBuffer;
import org.apache.uniffle.common.records.RecordsReader;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.serializer.SerInputStream;
import org.apache.uniffle.common.serializer.Serializer;
import org.apache.uniffle.common.serializer.SerializerFactory;
import org.apache.uniffle.common.serializer.SerializerInstance;
import org.apache.uniffle.common.serializer.writable.ComparativeOutputBuffer;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.uniffle.shaded.io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/uniffle/client/record/reader/RMRecordsReader.class */
public class RMRecordsReader<K, V, C> {
    private static final Logger LOG;
    private String appId;
    private final int shuffleId;
    private final Set<Integer> partitionIds;
    private final RssConf rssConf;
    private final Class<K> keyClass;
    private final Class<V> valueClass;
    private final Comparator comparator;
    private boolean raw;
    private final Combiner combiner;
    private boolean isMapCombine;
    private final MetricsReporter metrics;
    private final String clientType;
    private SerializerInstance serializerInstance;
    private final int retryMax;
    private final long retryIntervalMax;
    private final long initFetchSleepTime;
    private final long maxFetchSleepTime;
    private final int maxBufferPerPartition;
    private final int maxRecordsNumPerBuffer;
    private Map<Integer, List<ShuffleServerInfo>> shuffleServerInfoMap;
    private volatile boolean stop;
    private volatile Throwable error;
    private Map<Integer, RMRecordsReader<K, V, C>.Queue<RecordBuffer>> combineBuffers;
    private Map<Integer, RMRecordsReader<K, V, C>.Queue<RecordBuffer>> mergeBuffers;
    private RMRecordsReader<K, V, C>.Queue<Record> results;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* renamed from: org.apache.uniffle.client.record.reader.RMRecordsReader$4, reason: invalid class name */
    /* loaded from: input_file:org/apache/uniffle/client/record/reader/RMRecordsReader$4.class */
    class AnonymousClass4 extends KeyValuesReader {
        private Record<K, C> start = null;

        AnonymousClass4() {
        }

        @Override // org.apache.uniffle.client.record.reader.KeyValuesReader
        public boolean next() throws IOException {
            try {
                if (this.start != null) {
                    return true;
                }
                this.start = (Record) RMRecordsReader.this.results.take();
                return this.start != null;
            } catch (InterruptedException e) {
                throw new IOException(e);
            }
        }

        @Override // org.apache.uniffle.client.record.reader.KeyValuesReader
        public K getCurrentKey() throws IOException {
            if (!RMRecordsReader.this.raw) {
                return this.start.getKey();
            }
            ComparativeOutputBuffer comparativeOutputBuffer = (ComparativeOutputBuffer) this.start.getKey();
            DataInputBuffer dataInputBuffer = new DataInputBuffer();
            dataInputBuffer.reset(comparativeOutputBuffer.getData(), 0, comparativeOutputBuffer.getLength());
            return (K) RMRecordsReader.this.serializerInstance.deserialize(dataInputBuffer, RMRecordsReader.this.keyClass);
        }

        @Override // org.apache.uniffle.client.record.reader.KeyValuesReader
        public Iterable<C> getCurrentValues() throws IOException {
            return new Iterable<C>() { // from class: org.apache.uniffle.client.record.reader.RMRecordsReader.4.1
                @Override // java.lang.Iterable
                public Iterator<C> iterator() {
                    return new Iterator<C>() { // from class: org.apache.uniffle.client.record.reader.RMRecordsReader.4.1.1
                        Record<K, C> curr;

                        {
                            this.curr = AnonymousClass4.this.start;
                        }

                        @Override // java.util.Iterator
                        public boolean hasNext() {
                            if (this.curr != null && RMRecordsReader.this.isSameKey(this.curr.getKey(), AnonymousClass4.this.start.getKey())) {
                                return true;
                            }
                            AnonymousClass4.this.start = this.curr;
                            return false;
                        }

                        /* JADX WARN: Multi-variable type inference failed */
                        @Override // java.util.Iterator
                        public C next() {
                            C value;
                            try {
                                if (RMRecordsReader.this.raw) {
                                    ComparativeOutputBuffer comparativeOutputBuffer = (ComparativeOutputBuffer) this.curr.getValue();
                                    DataInputBuffer dataInputBuffer = new DataInputBuffer();
                                    dataInputBuffer.reset(comparativeOutputBuffer.getData(), 0, comparativeOutputBuffer.getLength());
                                    value = RMRecordsReader.this.serializerInstance.deserialize(dataInputBuffer, RMRecordsReader.this.valueClass);
                                } else {
                                    value = this.curr.getValue();
                                }
                                this.curr = (Record) RMRecordsReader.this.results.take();
                                return value;
                            } catch (IOException | InterruptedException e) {
                                throw new RssException(e);
                            }
                        }
                    };
                }
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/uniffle/client/record/reader/RMRecordsReader$Queue.class */
    public class Queue<E> {
        private LinkedBlockingQueue<E> queue;
        private volatile boolean producerDone = false;

        Queue(int i) {
            this.queue = new LinkedBlockingQueue<>(i);
        }

        public void setProducerDone(boolean z) {
            this.producerDone = z;
        }

        public void put(E e) throws InterruptedException {
            this.queue.put(e);
        }

        public E take() throws InterruptedException {
            while (!this.producerDone && !RMRecordsReader.this.stop) {
                E poll = this.queue.poll(100L, TimeUnit.MILLISECONDS);
                if (poll != null) {
                    return poll;
                }
            }
            if (RMRecordsReader.this.error != null) {
                throw new RssException("RMShuffleReader fetch record failed, caused by " + RMRecordsReader.this.error);
            }
            return this.queue.poll(100L, TimeUnit.MILLISECONDS);
        }

        public void clear() {
            this.queue.clear();
            this.producerDone = false;
        }
    }

    /* loaded from: input_file:org/apache/uniffle/client/record/reader/RMRecordsReader$RecordsCombiner.class */
    class RecordsCombiner extends Thread {
        private int partitionId;
        private RecordBuffer cached;
        private Queue nextQueue;

        RecordsCombiner(int i) {
            this.partitionId = i;
            this.cached = new RecordBuffer(i);
            this.nextQueue = (Queue) RMRecordsReader.this.mergeBuffers.get(Integer.valueOf(i));
            setName("RecordsCombiner-" + i);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (!RMRecordsReader.this.stop) {
                try {
                    RecordBuffer<K, V> recordBuffer = (RecordBuffer) ((Queue) RMRecordsReader.this.combineBuffers.get(Integer.valueOf(this.partitionId))).take();
                    if (recordBuffer == null) {
                        if (this.cached.size() > 0) {
                            sendCachedBuffer(this.cached);
                        }
                        this.nextQueue.setProducerDone(true);
                        return;
                    }
                    if (this.cached.size() > 0 && !RMRecordsReader.this.isSameKey(this.cached.getLastKey(), recordBuffer.getFirstKey())) {
                        sendCachedBuffer(this.cached);
                        this.cached = new RecordBuffer(this.partitionId);
                    }
                    RecordBlob recordBlob = new RecordBlob(this.partitionId);
                    recordBlob.addRecords(recordBuffer);
                    recordBlob.combine(RMRecordsReader.this.combiner, RMRecordsReader.this.isMapCombine);
                    for (Record<K, C> record : recordBlob.getResult()) {
                        if (this.cached.size() >= RMRecordsReader.this.maxRecordsNumPerBuffer && !RMRecordsReader.this.isSameKey(record.getKey(), this.cached.getLastKey())) {
                            sendCachedBuffer(this.cached);
                            this.cached = new RecordBuffer(this.partitionId);
                        }
                        this.cached.addRecord(record);
                    }
                } catch (InterruptedException e) {
                    throw new RssException(e);
                }
            }
        }

        private void sendCachedBuffer(RecordBuffer<K, C> recordBuffer) throws InterruptedException {
            RecordBlob recordBlob = new RecordBlob(this.partitionId);
            recordBlob.addRecords(recordBuffer);
            recordBlob.combine(RMRecordsReader.this.combiner, true);
            RecordBuffer recordBuffer2 = new RecordBuffer(this.partitionId);
            recordBuffer2.addRecords(recordBlob.getResult());
            this.nextQueue.put(recordBuffer2);
        }
    }

    /* loaded from: input_file:org/apache/uniffle/client/record/reader/RMRecordsReader$RecordsFetcher.class */
    class RecordsFetcher extends Thread {
        private int partitionId;
        private long sleepTime;
        private long blockId = 1;
        private RecordBuffer recordBuffer;
        private Queue nextQueue;
        private List<ShuffleServerInfo> serverInfos;
        private ShuffleServerClient client;
        private int choose;
        private String fetchError;

        RecordsFetcher(int i) {
            this.partitionId = i;
            this.sleepTime = RMRecordsReader.this.initFetchSleepTime;
            this.recordBuffer = new RecordBuffer(i);
            this.nextQueue = RMRecordsReader.this.combiner == null ? (Queue) RMRecordsReader.this.mergeBuffers.get(Integer.valueOf(i)) : (Queue) RMRecordsReader.this.combineBuffers.get(Integer.valueOf(i));
            this.serverInfos = (List) RMRecordsReader.this.shuffleServerInfoMap.get(Integer.valueOf(i));
            this.choose = this.serverInfos.size() - 1;
            this.client = RMRecordsReader.this.createShuffleServerClient(this.serverInfos.get(this.choose));
            setName("RecordsFetcher-" + i);
        }

        private void nextShuffleServerInfo() {
            if (this.choose <= 0) {
                throw new RssException("Fetch sorted record failed, last error message is " + this.fetchError);
            }
            this.choose--;
            this.client = RMRecordsReader.this.createShuffleServerClient(this.serverInfos.get(this.choose));
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            RssGetSortedShuffleDataResponse sortedShuffleData;
            while (!RMRecordsReader.this.stop) {
                try {
                    sortedShuffleData = this.client.getSortedShuffleData(new RssGetSortedShuffleDataRequest(RMRecordsReader.this.appId, RMRecordsReader.this.shuffleId, this.partitionId, this.blockId, RMRecordsReader.this.retryMax, RMRecordsReader.this.retryIntervalMax));
                } catch (Throwable th) {
                    RMRecordsReader.this.error = th;
                    RMRecordsReader.this.stop = true;
                    RMRecordsReader.LOG.info("Found exception when fetch sorted record, caused by ", th);
                }
                if (sortedShuffleData.getStatusCode() != StatusCode.SUCCESS || sortedShuffleData.getMergeState() == MergeState.INTERNAL_ERROR.code()) {
                    this.fetchError = sortedShuffleData.getMessage();
                    nextShuffleServerInfo();
                    return;
                }
                if (sortedShuffleData.getMergeState() == MergeState.INITED.code()) {
                    this.fetchError = "Remote merge should be started!";
                    nextShuffleServerInfo();
                    return;
                }
                if (sortedShuffleData.getMergeState() == MergeState.MERGING.code() && sortedShuffleData.getNextBlockId() == -1) {
                    RMRecordsReader.LOG.info("RMRecordsFetcher will sleep {} ms", Long.valueOf(this.sleepTime));
                    Thread.sleep(this.sleepTime);
                    this.sleepTime = Math.min(this.sleepTime * 2, RMRecordsReader.this.maxFetchSleepTime);
                } else {
                    if (sortedShuffleData.getMergeState() == MergeState.DONE.code() && sortedShuffleData.getNextBlockId() == -1) {
                        if (this.recordBuffer.size() > 0) {
                            this.nextQueue.put(this.recordBuffer);
                        }
                        this.nextQueue.setProducerDone(true);
                        return;
                    }
                    if (sortedShuffleData.getMergeState() != MergeState.DONE.code() && sortedShuffleData.getMergeState() != MergeState.MERGING.code()) {
                        this.fetchError = "Receive wrong offset from server, offset is " + sortedShuffleData.getNextBlockId();
                        nextShuffleServerInfo();
                        return;
                    }
                    this.sleepTime = RMRecordsReader.this.initFetchSleepTime;
                    this.blockId = sortedShuffleData.getNextBlockId();
                    ManagedBuffer managedBuffer = null;
                    ByteBuf byteBuf = null;
                    RecordsReader recordsReader = null;
                    try {
                        managedBuffer = sortedShuffleData.getData();
                        byteBuf = managedBuffer.byteBuf();
                        recordsReader = new RecordsReader(RMRecordsReader.this.rssConf, SerInputStream.newInputStream(byteBuf), RMRecordsReader.this.keyClass, RMRecordsReader.this.valueClass, RMRecordsReader.this.raw, false);
                        recordsReader.init();
                        while (recordsReader.next()) {
                            if (RMRecordsReader.this.metrics != null) {
                                RMRecordsReader.this.metrics.incRecordsRead(1L);
                            }
                            if (this.recordBuffer.size() >= RMRecordsReader.this.maxRecordsNumPerBuffer) {
                                this.nextQueue.put(this.recordBuffer);
                                this.recordBuffer = new RecordBuffer(this.partitionId);
                            }
                            this.recordBuffer.addRecord(recordsReader.getCurrentKey(), recordsReader.getCurrentValue());
                        }
                        if (recordsReader != null) {
                            recordsReader.close();
                        }
                        if (byteBuf != null) {
                            byteBuf.release();
                        }
                        if (managedBuffer != null) {
                            managedBuffer.release();
                        }
                    } catch (Throwable th2) {
                        if (recordsReader != null) {
                            recordsReader.close();
                        }
                        if (byteBuf != null) {
                            byteBuf.release();
                        }
                        if (managedBuffer != null) {
                            managedBuffer.release();
                        }
                        throw th2;
                    }
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/uniffle/client/record/reader/RMRecordsReader$RecordsMerger.class */
    class RecordsMerger extends Thread {
        RecordsMerger() {
            setName("RecordsMerger");
        }

        /* JADX WARN: Finally extract failed */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                ArrayList arrayList = new ArrayList();
                Iterator it = RMRecordsReader.this.partitionIds.iterator();
                while (it.hasNext()) {
                    RecordBuffer recordBuffer = (RecordBuffer) ((Queue) RMRecordsReader.this.mergeBuffers.get(Integer.valueOf(((Integer) it.next()).intValue()))).take();
                    if (recordBuffer != null) {
                        arrayList.add(new BufferedSegment(recordBuffer));
                    }
                }
                Merger.MergeQueue mergeQueue = new Merger.MergeQueue(RMRecordsReader.this.rssConf, arrayList, RMRecordsReader.this.keyClass, RMRecordsReader.this.valueClass, RMRecordsReader.this.comparator, RMRecordsReader.this.raw, false);
                try {
                    mergeQueue.init();
                    mergeQueue.setPopSegmentHook(obj -> {
                        try {
                            RecordBuffer recordBuffer2 = (RecordBuffer) ((Queue) RMRecordsReader.this.mergeBuffers.get(obj)).take();
                            if (recordBuffer2 == null) {
                                return null;
                            }
                            return new BufferedSegment(recordBuffer2);
                        } catch (InterruptedException e) {
                            throw new RssException(e);
                        }
                    });
                    while (!RMRecordsReader.this.stop && mergeQueue.next()) {
                        RMRecordsReader.this.results.put(Record.create(mergeQueue.getCurrentKey(), mergeQueue.getCurrentValue()));
                    }
                    mergeQueue.close();
                    if (!RMRecordsReader.this.stop) {
                        RMRecordsReader.this.results.setProducerDone(true);
                    }
                } catch (Throwable th) {
                    mergeQueue.close();
                    throw th;
                }
            } catch (IOException | InterruptedException e) {
                RMRecordsReader.this.error = e;
                RMRecordsReader.this.stop = true;
            }
        }
    }

    public RMRecordsReader(String str, int i, Set<Integer> set, Map<Integer, List<ShuffleServerInfo>> map, RssConf rssConf, Class<K> cls, Class<V> cls2, Comparator<K> comparator, boolean z, Combiner combiner, boolean z2, MetricsReporter metricsReporter) {
        this(str, i, set, map, rssConf, cls, cls2, comparator, z, combiner, z2, metricsReporter, ClientType.GRPC.name());
    }

    public RMRecordsReader(String str, int i, Set<Integer> set, Map<Integer, List<ShuffleServerInfo>> map, RssConf rssConf, Class<K> cls, Class<V> cls2, Comparator<K> comparator, boolean z, Combiner combiner, boolean z2, MetricsReporter metricsReporter, String str2) {
        this.stop = false;
        this.error = null;
        this.combineBuffers = JavaUtils.newConcurrentMap();
        this.mergeBuffers = JavaUtils.newConcurrentMap();
        this.appId = str;
        this.shuffleId = i;
        this.partitionIds = set;
        this.shuffleServerInfoMap = map;
        this.rssConf = rssConf;
        this.keyClass = cls;
        this.valueClass = cls2;
        this.raw = z;
        if (z && comparator == null) {
            throw new RssException("RawComparator must be set!");
        }
        this.comparator = comparator != null ? comparator : new Comparator<K>() { // from class: org.apache.uniffle.client.record.reader.RMRecordsReader.1
            @Override // java.util.Comparator
            public int compare(K k, K k2) {
                int hashCode = k == null ? 0 : k.hashCode();
                int hashCode2 = k2 == null ? 0 : k2.hashCode();
                if (hashCode < hashCode2) {
                    return -1;
                }
                return hashCode == hashCode2 ? 0 : 1;
            }
        };
        this.combiner = combiner;
        this.isMapCombine = z2;
        this.metrics = metricsReporter;
        this.clientType = str2;
        if (this.raw) {
            SerializerFactory serializerFactory = new SerializerFactory(rssConf);
            Serializer serializer = serializerFactory.getSerializer(cls);
            if (!$assertionsDisabled && !serializerFactory.getSerializer(cls2).getClass().equals(serializer.getClass())) {
                throw new AssertionError();
            }
            this.serializerInstance = serializer.newInstance();
        }
        this.initFetchSleepTime = ((Integer) rssConf.get(RssClientConf.RSS_CLIENT_REMOTE_MERGE_FETCH_INIT_SLEEP_MS)).intValue();
        this.maxFetchSleepTime = ((Integer) rssConf.get(RssClientConf.RSS_CLIENT_REMOTE_MERGE_FETCH_MAX_SLEEP_MS)).intValue();
        this.maxBufferPerPartition = Math.max(1, ((Integer) rssConf.get(RssClientConf.RSS_CLIENT_REMOTE_MERGE_READER_MAX_BUFFER)).intValue() / set.size());
        this.maxRecordsNumPerBuffer = ((Integer) rssConf.get(RssClientConf.RSS_CLIENT_REMOTE_MERGE_READER_MAX_RECORDS_PER_BUFFER)).intValue();
        this.results = new Queue<>(this.maxBufferPerPartition * this.maxRecordsNumPerBuffer * set.size());
        this.retryMax = rssConf.getInteger(RssClientConfig.RSS_CLIENT_RETRY_MAX, 50);
        this.retryIntervalMax = rssConf.getLong(RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX, 10000L);
        LOG.info("RMRecordsReader constructed for partitions {}", set);
    }

    public void start() {
        Iterator<Integer> it = this.partitionIds.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            this.mergeBuffers.put(Integer.valueOf(intValue), new Queue<>(this.maxBufferPerPartition));
            if (this.combiner != null) {
                this.combineBuffers.put(Integer.valueOf(intValue), new Queue<>(this.maxBufferPerPartition));
            }
            new RecordsFetcher(intValue).start();
            if (this.combiner != null) {
                new RecordsCombiner(intValue).start();
            }
        }
        new RecordsMerger().start();
    }

    public void close() {
        this.error = null;
        this.stop = true;
        Iterator<RMRecordsReader<K, V, C>.Queue<RecordBuffer>> it = this.mergeBuffers.values().iterator();
        while (it.hasNext()) {
            it.next().clear();
        }
        this.mergeBuffers.clear();
        if (this.combiner != null) {
            Iterator<RMRecordsReader<K, V, C>.Queue<RecordBuffer>> it2 = this.combineBuffers.values().iterator();
            while (it2.hasNext()) {
                it2.next().clear();
            }
            this.combineBuffers.clear();
        }
        if (this.results != null) {
            this.results.clear();
            this.results = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean isSameKey(Object obj, Object obj2) {
        if (!this.raw) {
            return this.comparator.compare(obj, obj2) == 0;
        }
        ComparativeOutputBuffer comparativeOutputBuffer = (ComparativeOutputBuffer) obj;
        ComparativeOutputBuffer comparativeOutputBuffer2 = (ComparativeOutputBuffer) obj2;
        return this.comparator.compare(comparativeOutputBuffer.getData(), 0, comparativeOutputBuffer.getLength(), comparativeOutputBuffer2.getData(), 0, comparativeOutputBuffer2.getLength()) == 0;
    }

    public KeyValueReader<ComparativeOutputBuffer, ComparativeOutputBuffer> rawKeyValueReader() {
        if (this.raw) {
            return new KeyValueReader<ComparativeOutputBuffer, ComparativeOutputBuffer>() { // from class: org.apache.uniffle.client.record.reader.RMRecordsReader.2
                private Record<ComparativeOutputBuffer, ComparativeOutputBuffer> curr = null;

                @Override // org.apache.uniffle.client.record.reader.KeyValueReader
                public boolean next() throws IOException {
                    try {
                        this.curr = (Record) RMRecordsReader.this.results.take();
                        return this.curr != null;
                    } catch (InterruptedException e) {
                        throw new IOException(e);
                    }
                }

                /* JADX WARN: Can't rename method to resolve collision */
                @Override // org.apache.uniffle.client.record.reader.KeyValueReader
                public ComparativeOutputBuffer getCurrentKey() throws IOException {
                    return this.curr.getKey();
                }

                /* JADX WARN: Can't rename method to resolve collision */
                @Override // org.apache.uniffle.client.record.reader.KeyValueReader
                public ComparativeOutputBuffer getCurrentValue() throws IOException {
                    return this.curr.getValue();
                }
            };
        }
        throw new RssException("rawKeyValueReader is not supported!");
    }

    public KeyValueReader<K, C> keyValueReader() {
        return new KeyValueReader<K, C>() { // from class: org.apache.uniffle.client.record.reader.RMRecordsReader.3
            private Record<K, C> curr = null;

            @Override // org.apache.uniffle.client.record.reader.KeyValueReader
            public boolean next() throws IOException {
                try {
                    this.curr = (Record) RMRecordsReader.this.results.take();
                    return this.curr != null;
                } catch (InterruptedException e) {
                    throw new IOException(e);
                }
            }

            @Override // org.apache.uniffle.client.record.reader.KeyValueReader
            public K getCurrentKey() throws IOException {
                if (!RMRecordsReader.this.raw) {
                    return this.curr.getKey();
                }
                ComparativeOutputBuffer comparativeOutputBuffer = (ComparativeOutputBuffer) this.curr.getKey();
                DataInputBuffer dataInputBuffer = new DataInputBuffer();
                dataInputBuffer.reset(comparativeOutputBuffer.getData(), 0, comparativeOutputBuffer.getLength());
                return (K) RMRecordsReader.this.serializerInstance.deserialize(dataInputBuffer, RMRecordsReader.this.keyClass);
            }

            @Override // org.apache.uniffle.client.record.reader.KeyValueReader
            public C getCurrentValue() throws IOException {
                if (!RMRecordsReader.this.raw) {
                    return this.curr.getValue();
                }
                ComparativeOutputBuffer comparativeOutputBuffer = (ComparativeOutputBuffer) this.curr.getValue();
                DataInputBuffer dataInputBuffer = new DataInputBuffer();
                dataInputBuffer.reset(comparativeOutputBuffer.getData(), 0, comparativeOutputBuffer.getLength());
                return (C) RMRecordsReader.this.serializerInstance.deserialize(dataInputBuffer, RMRecordsReader.this.valueClass);
            }
        };
    }

    public KeyValuesReader<K, C> keyValuesReader() {
        return new AnonymousClass4();
    }

    @VisibleForTesting
    public ShuffleServerClient createShuffleServerClient(ShuffleServerInfo shuffleServerInfo) {
        return ShuffleServerClientFactory.getInstance().getShuffleServerClient(this.clientType, shuffleServerInfo, this.rssConf);
    }

    static {
        $assertionsDisabled = !RMRecordsReader.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(RMRecordsReader.class);
    }
}
