package org.apache.uniffle.common.netty;

import java.util.Iterator;
import java.util.LinkedList;
import org.apache.uniffle.common.netty.protocol.Message;
import org.apache.uniffle.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.uniffle.shaded.com.google.common.base.Preconditions;
import org.apache.uniffle.shaded.io.netty.buffer.ByteBuf;
import org.apache.uniffle.shaded.io.netty.buffer.CompositeByteBuf;
import org.apache.uniffle.shaded.io.netty.buffer.EmptyByteBuf;
import org.apache.uniffle.shaded.io.netty.buffer.Unpooled;
import org.apache.uniffle.shaded.io.netty.channel.ChannelHandlerContext;
import org.apache.uniffle.shaded.io.netty.channel.ChannelInboundHandlerAdapter;

/* loaded from: input_file:org/apache/uniffle/common/netty/TransportFrameDecoder.class */
/**
 * Inbound handler that re-assembles length-prefixed frames from the stream of {@link ByteBuf}
 * chunks delivered by the channel and forwards each decoded {@link Message} down the pipeline.
 *
 * <p>Every frame starts with a {@value #FRAME_HEADER_SIZE}-byte header that is read as: an
 * {@code int} message size, the message type (decoded by {@link Message.Type#decode}; presumably a
 * single byte, given the total header size), and an {@code int} body size. The frame payload that
 * follows is {@code msgSize + bodySize} bytes long.
 *
 * <p>The decoder keeps mutable per-channel state and performs no synchronization.
 */
public class TransportFrameDecoder extends ChannelInboundHandlerAdapter implements FrameDecoder {
    /** Exclusive upper bound for the payload size of a single frame. */
    private static final int MAX_FRAME_SIZE = Integer.MAX_VALUE;
    /** Sentinel meaning "the header of the next frame has not been decoded yet". */
    private static final int UNKNOWN_FRAME_SIZE = -1;
    /** Number of header bytes preceding each frame payload. */
    private static final int FRAME_HEADER_SIZE = 9;

    private int msgSize = UNKNOWN_FRAME_SIZE;
    private int bodySize = UNKNOWN_FRAME_SIZE;
    private Message.Type curType = Message.Type.UNKNOWN_TYPE;
    /** Scratch buffer used only when a header is split across several incoming buffers. */
    private final ByteBuf headerBuf = Unpooled.buffer(FRAME_HEADER_SIZE, FRAME_HEADER_SIZE);
    /** Received buffers that have not been fully consumed yet, oldest first. */
    private final LinkedList<ByteBuf> buffers = new LinkedList<>();
    /** Readable bytes currently held in {@link #buffers}, excluding an already parsed header. */
    private long totalSize = 0;
    /** Payload size of the frame being assembled, or {@link #UNKNOWN_FRAME_SIZE}. */
    private long nextFrameSize = UNKNOWN_FRAME_SIZE;

    /**
     * Queues the incoming buffer and forwards every frame that has become complete.
     *
     * @param channelHandlerContext context used to pass decoded messages to the next handler
     * @param obj the received data; must be a {@link ByteBuf}
     * @throws IllegalArgumentException if a frame header announces a non-positive or too large size
     */
    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        ByteBuf in = (ByteBuf) obj;
        buffers.add(in);
        totalSize += in.readableBytes();

        while (!buffers.isEmpty()) {
            ByteBuf frame = decodeNext();
            if (frame == null) {
                break;
            }
            Message message = null;
            try {
                message = Message.decode(curType, frame);
            } finally {
                // Release exactly once, whether decoding succeeded or threw. Keeping
                // fireChannelRead outside of this block prevents a second release when a
                // downstream handler fails after the frame has already been released.
                if (shouldRelease(message)) {
                    frame.release();
                }
            }
            channelHandlerContext.fireChannelRead(message);
            clear();
        }
    }

    /**
     * Tells whether the decoder must release the frame buffer itself after decoding.
     *
     * @param message the decoded message, or {@code null} if decoding failed
     * @return {@code true} if the message is {@code null}, has no body, has no body buffer, or its
     *     body buffer is an {@link EmptyByteBuf}; {@code false} otherwise (presumably because the
     *     message body then keeps using the frame's memory)
     */
    @VisibleForTesting
    static boolean shouldRelease(Message message) {
        if (message == null || message.body() == null || message.body().byteBuf() == null) {
            return true;
        }
        return message.body().byteBuf() instanceof EmptyByteBuf;
    }

    /** Resets the per-frame header state once a message has been forwarded. */
    private void clear() {
        curType = Message.Type.UNKNOWN_TYPE;
        msgSize = UNKNOWN_FRAME_SIZE;
        bodySize = UNKNOWN_FRAME_SIZE;
        headerBuf.clear();
    }

    /**
     * Returns the payload size of the next frame, parsing its header if that has not happened yet.
     *
     * @return the payload size, or {@link #UNKNOWN_FRAME_SIZE} while fewer than
     *     {@value #FRAME_HEADER_SIZE} bytes are buffered
     * @throws IllegalArgumentException if the announced size is not within {@code (0, MAX_FRAME_SIZE)}
     */
    private long decodeFrameSize() {
        if (nextFrameSize != UNKNOWN_FRAME_SIZE || totalSize < FRAME_HEADER_SIZE) {
            return nextFrameSize;
        }

        ByteBuf first = buffers.getFirst();
        if (first.readableBytes() >= FRAME_HEADER_SIZE) {
            // Common case: the whole header sits in the first buffer.
            readHeader(first);
            if (!first.isReadable()) {
                buffers.removeFirst().release();
            }
        } else {
            // The header straddles buffers: gather its bytes into the scratch buffer first.
            while (headerBuf.readableBytes() < FRAME_HEADER_SIZE) {
                ByteBuf next = buffers.getFirst();
                int toCopy = Math.min(next.readableBytes(), FRAME_HEADER_SIZE - headerBuf.readableBytes());
                headerBuf.writeBytes(next, toCopy);
                if (!next.isReadable()) {
                    buffers.removeFirst().release();
                }
            }
            readHeader(headerBuf);
        }
        totalSize -= FRAME_HEADER_SIZE;

        // Add as long: an int addition would silently wrap for large sizes and defeat the
        // checks below.
        long frameSize = (long) msgSize + bodySize;
        // Validate before caching so that a bogus size is rejected immediately instead of
        // waiting for that much data, and so that a sum of -1 is never mistaken for
        // UNKNOWN_FRAME_SIZE.
        Preconditions.checkArgument(frameSize < MAX_FRAME_SIZE, "Too large frame: %s", frameSize);
        Preconditions.checkArgument(frameSize > 0, "Frame length should be positive: %s", frameSize);
        nextFrameSize = frameSize;
        return frameSize;
    }

    /** Reads message size, message type and body size, in that order, from {@code source}. */
    private void readHeader(ByteBuf source) {
        msgSize = source.readInt();
        curType = Message.Type.decode(source);
        bodySize = source.readInt();
    }

    /**
     * Extracts the payload of the next frame if all of its bytes have been received.
     *
     * @return a buffer holding exactly one frame payload, or {@code null} if more data is needed
     */
    private ByteBuf decodeNext() {
        long frameSize = decodeFrameSize();
        if (frameSize == UNKNOWN_FRAME_SIZE || totalSize < frameSize) {
            return null;
        }
        // The frame is about to be consumed; the next call has to parse a new header.
        nextFrameSize = UNKNOWN_FRAME_SIZE;

        // Safe narrowing: decodeFrameSize() only caches sizes below MAX_FRAME_SIZE.
        int remaining = (int) frameSize;
        if (buffers.getFirst().readableBytes() >= remaining) {
            return nextBufferForFrame(remaining);
        }

        // The payload spans several buffers: stitch them together without copying.
        CompositeByteBuf frame = buffers.getFirst().alloc().compositeBuffer(Integer.MAX_VALUE);
        while (remaining > 0) {
            ByteBuf next = nextBufferForFrame(remaining);
            remaining -= next.readableBytes();
            frame.addComponent(next).writerIndex(frame.writerIndex() + next.readableBytes());
        }
        assert remaining == 0;
        return frame;
    }

    /**
     * Takes up to {@code bytesToRead} bytes from the first queued buffer.
     *
     * @param bytesToRead number of payload bytes still missing from the current frame
     * @return a retained slice of the first buffer if it holds more than {@code bytesToRead}
     *     bytes, otherwise the first buffer itself (removed from the queue)
     */
    private ByteBuf nextBufferForFrame(int bytesToRead) {
        ByteBuf first = buffers.getFirst();
        ByteBuf part;
        if (first.readableBytes() > bytesToRead) {
            // The slice shares the parent's reference count, hence the retain().
            part = first.retain().readSlice(bytesToRead);
            totalSize -= bytesToRead;
        } else {
            part = first;
            buffers.removeFirst();
            totalSize -= part.readableBytes();
        }
        return part;
    }

    /** Delegates to the superclass; no decoder state is touched here. */
    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelInactive(channelHandlerContext);
    }

    /** Releases every buffer still queued as well as the header scratch buffer. */
    @Override
    public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {
        for (ByteBuf pending : buffers) {
            pending.release();
        }
        buffers.clear();
        headerBuf.release();
        super.handlerRemoved(channelHandlerContext);
    }

    /** Delegates to the superclass. */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        super.exceptionCaught(channelHandlerContext, th);
    }
}
