package org.apache.uniffle.common.netty.handle;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.uniffle.common.netty.client.RpcResponseCallback;
import org.apache.uniffle.common.netty.protocol.RpcResponse;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.NettyUtils;
import org.apache.uniffle.shaded.io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/uniffle/common/netty/handle/TransportResponseHandler.class */
/**
 * Client-side handler that matches incoming {@link RpcResponse} messages to the
 * {@link RpcResponseCallback} registered for the same request id.
 *
 * <p>Callbacks are registered with {@link #addResponseCallback(long, RpcResponseCallback)} and
 * leave the registry in exactly one of three ways: a response arrives ({@link #handle}), the
 * caller withdraws the request ({@link #removeRpcRequest(long)}), or the channel fails or closes
 * ({@link #exceptionCaught(Throwable)} / {@link #channelInactive()}).
 *
 * <p>NOTE(review): the registry is obtained from {@code JavaUtils.newConcurrentMap()}; the code
 * below assumes it offers atomic per-key {@code put}/{@code remove} like
 * {@code ConcurrentHashMap} - confirm against that helper.
 */
public class TransportResponseHandler extends MessageHandler<RpcResponse> {
    private static final Logger logger = LoggerFactory.getLogger(TransportResponseHandler.class);

    /** Channel this handler is attached to; only used to report the remote address in logs. */
    private final Channel channel;

    /** Callbacks of requests that have not been answered, withdrawn or failed yet, by request id. */
    private final Map<Long, RpcResponseCallback> outstandingRpcRequests = JavaUtils.newConcurrentMap();

    /** {@link System#nanoTime()} reading taken when the most recent request was registered. */
    private final AtomicLong timeOfLastRequestNs = new AtomicLong(0);

    /**
     * Creates a handler bound to the given channel.
     *
     * @param channel the channel whose remote address is reported in log messages
     */
    public TransportResponseHandler(Channel channel) {
        this.channel = channel;
    }

    /**
     * Registers the callback to notify when the response for request {@code j} arrives and
     * refreshes the last-request timestamp.
     *
     * <p>If a callback is already registered under the same id it is replaced and a warning is
     * logged.
     *
     * @param j the request id
     * @param rpcResponseCallback the callback to invoke for that request
     */
    public void addResponseCallback(long j, RpcResponseCallback rpcResponseCallback) {
        updateTimeOfLastRequest();
        // Use the value returned by put() instead of a separate containsKey() probe so the
        // duplicate check and the insertion are a single map operation.
        RpcResponseCallback previous = this.outstandingRpcRequests.put(j, rpcResponseCallback);
        if (previous != null) {
            logger.warn("[addRpcRequest] requestId {} already exists!", j);
        }
    }

    /**
     * Unregisters the callback of request {@code j}, if any. Does nothing when the id is not
     * registered (for example because the response has already been delivered).
     *
     * @param j the request id
     */
    public void removeRpcRequest(long j) {
        this.outstandingRpcRequests.remove(j);
    }

    /**
     * Delivers a response to the callback registered under its request id.
     *
     * <p>The callback is unregistered before it is invoked, so an answered request is no longer
     * counted by {@link #numOutstandingRequests()} and cannot receive an additional
     * {@code onFailure} if the channel fails afterwards. Responses with an unknown request id are
     * logged and dropped.
     *
     * @param rpcResponse the response received from the remote side
     * @throws Exception if the callback's {@code onSuccess} throws
     */
    @Override
    public void handle(RpcResponse rpcResponse) throws Exception {
        long requestId = rpcResponse.getRequestId();
        RpcResponseCallback callback = this.outstandingRpcRequests.remove(requestId);
        if (callback == null) {
            logger.error(
                    "Ignoring response from {} since it is not outstanding, {} {}",
                    NettyUtils.getRemoteAddress(this.channel),
                    rpcResponse.type(),
                    requestId);
            return;
        }
        callback.onSuccess(rpcResponse);
    }

    /** No action is taken when the channel becomes active. */
    @Override
    public void channelActive() {
    }

    /**
     * Fails every outstanding request with the given cause. Does nothing when no request is
     * outstanding.
     *
     * @param th the error reported for the channel; passed to each callback's {@code onFailure}
     */
    @Override
    public void exceptionCaught(Throwable th) {
        // Read the size once so the logged count is the one the decision was based on.
        int outstanding = numOutstandingRequests();
        if (outstanding > 0) {
            logger.error(
                    "Still have {} requests outstanding when connection from {} is closed",
                    outstanding,
                    NettyUtils.getRemoteAddress(this.channel));
            failOutstandingRequests(th);
        }
    }

    /**
     * Fails every outstanding request with an {@link IOException} naming the remote address.
     * Does nothing when no request is outstanding.
     */
    @Override
    public void channelInactive() {
        // Read the size once so the logged count is the one the decision was based on.
        int outstanding = numOutstandingRequests();
        if (outstanding > 0) {
            String remoteAddress = NettyUtils.getRemoteAddress(this.channel);
            logger.error(
                    "Still have {} requests outstanding when connection from {} is closed",
                    outstanding,
                    remoteAddress);
            failOutstandingRequests(new IOException("Connection from " + remoteAddress + " closed"));
        }
    }

    /**
     * Returns the number of callbacks currently registered.
     *
     * @return the size of the outstanding-request registry
     */
    public int numOutstandingRequests() {
        return this.outstandingRpcRequests.size();
    }

    /**
     * Unregisters the callbacks that are registered when this method starts and invokes
     * {@code onFailure(th)} on each of them.
     *
     * <p>Each callback is claimed with a per-key {@code remove} before it is invoked, so a
     * callback that was already taken by {@link #handle} or {@link #removeRpcRequest(long)} is
     * skipped, and a callback registered while this method runs is left in place rather than
     * being discarded without notification. An exception thrown by one callback is logged and
     * does not prevent the remaining callbacks from being notified.
     *
     * @param th the failure cause handed to every callback
     */
    private void failOutstandingRequests(Throwable th) {
        // Snapshot the ids so the registry is not modified while its key set is being traversed.
        Long[] requestIds = this.outstandingRpcRequests.keySet().toArray(new Long[0]);
        for (Long requestId : requestIds) {
            RpcResponseCallback callback = this.outstandingRpcRequests.remove(requestId);
            if (callback == null) {
                // Already answered or withdrawn since the snapshot was taken.
                continue;
            }
            try {
                callback.onFailure(th);
            } catch (Exception e) {
                logger.warn("RpcResponseCallback.onFailure throws exception", e);
            }
        }
    }

    /**
     * Returns the timestamp recorded by the last call to {@link #updateTimeOfLastRequest()}.
     *
     * @return a {@link System#nanoTime()} reading, or {@code 0} if no request was ever registered
     */
    public long getTimeOfLastRequestNs() {
        return this.timeOfLastRequestNs.get();
    }

    /** Records the current {@link System#nanoTime()} as the time of the last request. */
    public void updateTimeOfLastRequest() {
        this.timeOfLastRequestNs.set(System.nanoTime());
    }
}
