package se.sics.kompics.network.netty;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.udt.UdtChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.MDC;
import se.sics.kompics.network.Address;
import se.sics.kompics.network.MessageNotify;
import se.sics.kompics.network.Msg;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:se/sics/kompics/network/netty/MessageQueueManager.class */
/**
 * Sends outgoing messages for a {@link NettyNetwork} component, holding back TCP/UDT messages
 * while no channel to the destination is available and reporting send results for messages
 * that carry a {@link MessageNotify.Req}.
 *
 * <p>Messages are dispatched by the protocol reported by the message itself:
 * <ul>
 *   <li>TCP / UDT: written to the channel obtained from {@code component.channels}; if no
 *       channel can be obtained the message is appended to a per-destination delay queue,
 *       which is later flushed by {@link #retry(SendDelayed)} or discarded by
 *       {@link #drop(DropDelayed)}.</li>
 *   <li>UDP: handed straight to {@code component.sendUdpMessage}.</li>
 * </ul>
 *
 * <p>NOTE(review): the delay maps are plain {@link HashMap}s while {@code awaitingDelivery} is a
 * {@link ConcurrentHashMap}; presumably the delay maps are only touched from the component's own
 * thread while listeners run on Netty threads - confirm against callers.
 */
public class MessageQueueManager {
    private final NettyNetwork component;
    /** Messages waiting for a TCP channel, keyed by the destination's socket address. */
    private final HashMap<InetSocketAddress, Queue<MessageWrapper>> tcpDelays = new HashMap<>();
    /** Messages waiting for a UDT channel, keyed by the destination's socket address. */
    private final HashMap<InetSocketAddress, Queue<MessageWrapper>> udtDelays = new HashMap<>();
    /** Successfully sent requests that asked for a delivery notification, keyed by message id. */
    private final ConcurrentHashMap<UUID, MessageNotify.Req> awaitingDelivery = new ConcurrentHashMap<>();

    /**
     * Channel-future listener that turns the outcome of a write into a response for the
     * associated {@link MessageNotify.Req}.
     */
    public class NotifyListener implements ChannelFutureListener {
        public final MessageNotify.Req notify;

        NotifyListener(MessageNotify.Req req) {
            this.notify = req;
        }

        /**
         * Prepares a success or failure response on the request depending on
         * {@code channelFuture.isSuccess()} and passes the request to the component.
         * On success, requests with {@code notifyOfDelivery} set are additionally remembered
         * so that {@link MessageQueueManager#ack(NotifyAck)} can find them later.
         *
         * @param channelFuture the completed write future
         */
        @Override
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            MessageQueueManager.this.component.setCustomMDC();
            try {
                if (channelFuture.isSuccess()) {
                    this.notify.prepareResponse(System.currentTimeMillis(), true, System.nanoTime());
                    if (this.notify.notifyOfDelivery) {
                        // Registered before the component is notified of the send result.
                        MessageQueueManager.this.awaitingDelivery.put(this.notify.getMsgId(), this.notify);
                    }
                } else {
                    MessageQueueManager.this.component.extLog.warn("Sending of message {} did not succeed :( : {}", this.notify.msg, channelFuture.cause());
                    this.notify.prepareResponse(System.currentTimeMillis(), false, System.nanoTime());
                }
                MessageQueueManager.this.component.notify(this.notify);
            } finally {
                // Always clear the MDC that setCustomMDC() populated above.
                MDC.clear();
            }
        }
    }

    /**
     * Creates a manager bound to the given network component.
     *
     * @param nettyNetwork the component whose channels, logger and notify hooks are used
     */
    public MessageQueueManager(NettyNetwork nettyNetwork) {
        this.component = nettyNetwork;
    }

    /**
     * Sends a plain message (no notification requested).
     *
     * @param msg the message to send
     */
    public void send(Msg msg) {
        send(new MessageWrapper(msg));
    }

    /**
     * Sends the message carried by a notify request; the request receives a response once the
     * send outcome is known.
     *
     * @param req the notify request wrapping the message
     */
    public void send(MessageNotify.Req req) {
        send(new MessageWrapper(req));
    }

    /**
     * Dispatches a wrapped message according to its protocol.
     *
     * @throws Error if the protocol is none of TCP, UDT or UDP
     */
    private void send(MessageWrapper messageWrapper) {
        switch (messageWrapper.msg.getProtocol()) {
            case TCP:
                sendTCP(messageWrapper);
                return;
            case UDT:
                sendUDT(messageWrapper);
                return;
            case UDP:
                ChannelFuture udpFuture = this.component.sendUdpMessage(messageWrapper);
                if (!messageWrapper.notify.isPresent()) {
                    return;
                }
                if (udpFuture != null) {
                    udpFuture.addListener(new NotifyListener(messageWrapper.notify.get()));
                } else {
                    // No future was returned for the UDP send: report failure right away.
                    notifyFailure(messageWrapper.notify.get());
                }
                return;
            default:
                throw new Error("Unknown Transport type");
        }
    }

    /**
     * Sends over TCP, or queues the message when a delay queue already exists for the
     * destination or no channel could be obtained/created.
     */
    private void sendTCP(MessageWrapper messageWrapper) {
        Address destination = messageWrapper.msg.getDestination();
        Queue<MessageWrapper> pending = this.tcpDelays.get(destination.asSocket());
        if (pending != null) {
            // A queue already exists for this destination; append behind the earlier messages.
            this.component.extLog.debug("Delaying message while establishing connection: {}", messageWrapper);
            pending.add(messageWrapper);
            return;
        }
        SocketChannel channel = this.component.channels.getTCPChannel(destination);
        if (channel == null) {
            channel = this.component.channels.createTCPChannel(destination, this.component.bootstrapTCPClient);
        }
        if (channel == null) {
            Queue<MessageWrapper> fresh = new LinkedList<>();
            this.tcpDelays.put(destination.asSocket(), fresh);
            this.component.extLog.debug("Delaying message while establishing connection: {}", messageWrapper);
            fresh.add(messageWrapper);
            return;
        }
        this.component.extLog.debug("Sending message {}. Local {}, Remote {}", messageWrapper, channel.localAddress(), channel.remoteAddress());
        attachNotify(channel.writeAndFlush(messageWrapper), messageWrapper);
    }

    /**
     * Sends over UDT, or queues the message when a delay queue already exists for the
     * destination or no channel could be obtained/created.
     */
    private void sendUDT(MessageWrapper messageWrapper) {
        Address destination = messageWrapper.msg.getDestination();
        Queue<MessageWrapper> pending = this.udtDelays.get(destination.asSocket());
        if (pending != null) {
            // A queue already exists for this destination; append behind the earlier messages.
            this.component.extLog.debug("Delaying message while establishing connection: {}", messageWrapper);
            pending.add(messageWrapper);
            return;
        }
        UdtChannel channel = this.component.channels.getUDTChannel(destination);
        if (channel == null) {
            channel = this.component.channels.createUDTChannel(destination, this.component.bootstrapUDTClient);
        }
        if (channel == null) {
            Queue<MessageWrapper> fresh = new LinkedList<>();
            this.udtDelays.put(destination.asSocket(), fresh);
            this.component.extLog.debug("Delaying message while establishing connection: {}", messageWrapper);
            fresh.add(messageWrapper);
            return;
        }
        this.component.extLog.debug("Sending message {}. Local {}, Remote {}", messageWrapper, channel.localAddress(), channel.remoteAddress());
        attachNotify(channel.writeAndFlush(messageWrapper), messageWrapper);
    }

    /**
     * Attempts to flush the delay queue of the given peer for the given protocol.
     * Protocols other than TCP and UDT are ignored.
     *
     * @param sendDelayed identifies the peer and protocol to retry
     */
    public void retry(SendDelayed sendDelayed) {
        this.component.extLog.info("Trying to send delayed messages: {} on {}", sendDelayed.peer, sendDelayed.protocol);
        Address peer = sendDelayed.peer;
        switch (sendDelayed.protocol) {
            case TCP:
                retryTCP(peer);
                return;
            case UDT:
                retryUDT(peer);
                return;
            default:
                return;
        }
    }

    /**
     * Writes every queued TCP message for {@code address}, flushes once, and removes the queue.
     * Does nothing if there is no queue; leaves the queue intact if no channel is available yet.
     */
    private void retryTCP(Address address) {
        Queue<MessageWrapper> pending = this.tcpDelays.get(address.asSocket());
        if (pending == null) {
            return;
        }
        SocketChannel channel = this.component.channels.getTCPChannel(address);
        if (channel == null) {
            this.component.extLog.warn("Connection to {} still not available. Not retrying anything.", address);
            return;
        }
        while (!pending.isEmpty()) {
            MessageWrapper next = pending.poll();
            this.component.extLog.debug("Sending message {}. Local {}, Remote {}", next, channel.localAddress(), channel.remoteAddress());
            // write() only; a single flush() follows after the whole queue has been written.
            attachNotify(channel.write(next), next);
        }
        channel.flush();
        this.tcpDelays.remove(address.asSocket());
    }

    /**
     * Writes every queued UDT message for {@code address}, flushes once, and removes the queue.
     * Does nothing if there is no queue; leaves the queue intact if no channel is available yet.
     */
    private void retryUDT(Address address) {
        Queue<MessageWrapper> pending = this.udtDelays.get(address.asSocket());
        if (pending == null) {
            return;
        }
        UdtChannel channel = this.component.channels.getUDTChannel(address);
        if (channel == null) {
            this.component.extLog.warn("Connection to {} still not available. Not retrying anything.", address);
            return;
        }
        while (!pending.isEmpty()) {
            MessageWrapper next = pending.poll();
            this.component.extLog.debug("Sending message {}. Local {}, Remote {}", next, channel.localAddress(), channel.remoteAddress());
            // write() only; a single flush() follows after the whole queue has been written.
            attachNotify(channel.write(next), next);
        }
        channel.flush();
        this.udtDelays.remove(address.asSocket());
    }

    /**
     * Discards the delay queue of the given peer for the given protocol, sending a failure
     * response for every queued message that carries a notify request.
     * Protocols other than TCP and UDT are ignored.
     *
     * @param dropDelayed identifies the peer and protocol whose queue is dropped
     */
    public void drop(DropDelayed dropDelayed) {
        Queue<MessageWrapper> dropped;
        Address peer = dropDelayed.peer;
        switch (dropDelayed.protocol) {
            case TCP:
                dropped = this.tcpDelays.remove(peer.asSocket());
                break;
            case UDT:
                dropped = this.udtDelays.remove(peer.asSocket());
                break;
            default:
                return;
        }
        if (dropped == null) {
            return;
        }
        for (MessageWrapper wrapper : dropped) {
            this.component.extLog.warn("Dropping message {} (with notify) because connection could not be established.", wrapper);
            if (wrapper.notify.isPresent()) {
                notifyFailure(wrapper.notify.get());
            }
        }
        dropped.clear();
    }

    /**
     * Handles a delivery acknowledgement: looks up the pending request by id, removes it, and
     * passes a delivery response to the component. Logs a warning if no request is registered
     * under that id.
     *
     * @param notifyAck the acknowledgement carrying the message id
     */
    public void ack(NotifyAck notifyAck) {
        MessageNotify.Req pendingReq = this.awaitingDelivery.remove(notifyAck.id);
        if (pendingReq == null) {
            this.component.extLog.warn("Could not find MessageNotify.Req with id: {}!", notifyAck.id);
            return;
        }
        this.component.notify(pendingReq, pendingReq.deliveryResponse(System.currentTimeMillis(), true, System.nanoTime()));
    }

    /**
     * Empties both delay maps and the set of requests awaiting delivery acknowledgement.
     * No responses are generated for the discarded entries.
     */
    public void clear() {
        this.component.extLog.info("Cleaning message queues.");
        this.tcpDelays.clear();
        this.udtDelays.clear();
        this.awaitingDelivery.clear();
    }

    /** Registers a {@link NotifyListener} on the write future if the message carries a notify request. */
    private void attachNotify(ChannelFuture future, MessageWrapper wrapper) {
        if (wrapper.notify.isPresent()) {
            future.addListener(new NotifyListener(wrapper.notify.get()));
        }
    }

    /** Prepares a failure response on the request and passes it to the component. */
    private void notifyFailure(MessageNotify.Req req) {
        req.prepareResponse(System.currentTimeMillis(), false, System.nanoTime());
        this.component.notify(req);
    }
}
