/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.shuffle.handle;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.spark.shuffle.handle.ShuffleHandleInfoBase;
import org.apache.uniffle.client.PartitionDataReplicaRequirementTracking;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.uniffle.shaded.org.apache.commons.collections4.CollectionUtils;
import org.apache.uniffle.shaded.org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MutableShuffleHandleInfo
extends ShuffleHandleInfoBase {
    private static final long serialVersionUID = 0L;
    private static final Logger LOGGER = LoggerFactory.getLogger(MutableShuffleHandleInfo.class);
    private Map<Integer, Map<Integer, List<ShuffleServerInfo>>> partitionReplicaAssignedServers;
    private Map<String, Set<ShuffleServerInfo>> excludedServerToReplacements;
    private Map<Integer, Map<String, Set<ShuffleServerInfo>>> excludedServerForPartitionToReplacements;

    public MutableShuffleHandleInfo(int shuffleId, Map<Integer, List<ShuffleServerInfo>> partitionToServers, RemoteStorageInfo storageInfo) {
        this(shuffleId, storageInfo, MutableShuffleHandleInfo.toPartitionReplicaMapping(partitionToServers));
    }

    @VisibleForTesting
    protected MutableShuffleHandleInfo(int shuffleId, RemoteStorageInfo storageInfo, Map<Integer, Map<Integer, List<ShuffleServerInfo>>> partitionReplicaAssignedServers) {
        super(shuffleId, storageInfo);
        this.excludedServerToReplacements = new HashMap<String, Set<ShuffleServerInfo>>();
        this.excludedServerForPartitionToReplacements = new HashMap<Integer, Map<String, Set<ShuffleServerInfo>>>();
        this.partitionReplicaAssignedServers = partitionReplicaAssignedServers;
    }

    public MutableShuffleHandleInfo(int shuffleId, RemoteStorageInfo storageInfo) {
        super(shuffleId, storageInfo);
    }

    private static Map<Integer, Map<Integer, List<ShuffleServerInfo>>> toPartitionReplicaMapping(Map<Integer, List<ShuffleServerInfo>> partitionToServers) {
        HashMap<Integer, Map<Integer, List<ShuffleServerInfo>>> partitionReplicaAssignedServers = new HashMap<Integer, Map<Integer, List<ShuffleServerInfo>>>();
        for (Map.Entry<Integer, List<ShuffleServerInfo>> partitionEntry : partitionToServers.entrySet()) {
            int partitionId = partitionEntry.getKey();
            Map replicaMapping = partitionReplicaAssignedServers.computeIfAbsent(partitionId, x -> new HashMap());
            List<ShuffleServerInfo> replicaServers = partitionEntry.getValue();
            for (int i = 0; i < replicaServers.size(); ++i) {
                int replicaIdx = i;
                replicaMapping.computeIfAbsent(replicaIdx, x -> new ArrayList()).add(replicaServers.get(i));
            }
        }
        return partitionReplicaAssignedServers;
    }

    public Set<ShuffleServerInfo> getReplacements(String faultyServerId) {
        return this.excludedServerToReplacements.get(faultyServerId);
    }

    public Set<ShuffleServerInfo> getReplacementsForPartition(int partitionId, String excludedServerId) {
        return this.excludedServerForPartitionToReplacements.getOrDefault(partitionId, Collections.emptyMap()).getOrDefault(excludedServerId, Collections.emptySet());
    }

    public Set<ShuffleServerInfo> updateAssignment(int partitionId, String receivingFailureServerId, Set<ShuffleServerInfo> replacements) {
        if (replacements == null || StringUtils.isEmpty(receivingFailureServerId)) {
            return Collections.emptySet();
        }
        this.excludedServerToReplacements.put(receivingFailureServerId, replacements);
        return this.updateAssignmentInternal(partitionId, receivingFailureServerId, replacements);
    }

    private Set<ShuffleServerInfo> updateAssignmentInternal(int partitionId, String receivingFailureServerId, Set<ShuffleServerInfo> replacements) {
        HashSet<ShuffleServerInfo> updatedServers = new HashSet<ShuffleServerInfo>();
        Map<Integer, List<ShuffleServerInfo>> replicaServers = this.partitionReplicaAssignedServers.get(partitionId);
        for (Map.Entry<Integer, List<ShuffleServerInfo>> serverEntry : replicaServers.entrySet()) {
            List<ShuffleServerInfo> servers = serverEntry.getValue();
            if (!servers.stream().map(x -> x.getId()).collect(Collectors.toSet()).contains(receivingFailureServerId)) continue;
            HashSet<ShuffleServerInfo> tempSet = new HashSet<ShuffleServerInfo>();
            tempSet.addAll(replacements);
            tempSet.removeAll(servers);
            if (!CollectionUtils.isNotEmpty(tempSet)) continue;
            updatedServers.addAll(tempSet);
            servers.addAll(tempSet);
        }
        return updatedServers;
    }

    public Set<ShuffleServerInfo> updateAssignmentOnPartitionSplit(int partitionId, String receivingFailureServerId, Set<ShuffleServerInfo> replacements) {
        if (replacements == null || StringUtils.isEmpty(receivingFailureServerId)) {
            return Collections.emptySet();
        }
        this.excludedServerForPartitionToReplacements.computeIfAbsent(partitionId, x -> new HashMap()).put(receivingFailureServerId, replacements);
        return this.updateAssignmentInternal(partitionId, receivingFailureServerId, replacements);
    }

    @Override
    public Set<ShuffleServerInfo> getServers() {
        return this.partitionReplicaAssignedServers.values().stream().flatMap(x -> x.values().stream().flatMap(k -> k.stream())).collect(Collectors.toSet());
    }

    @Override
    public Map<Integer, List<ShuffleServerInfo>> getAvailablePartitionServersForWriter() {
        HashMap<Integer, List<ShuffleServerInfo>> assignment = new HashMap<Integer, List<ShuffleServerInfo>>();
        for (Map.Entry<Integer, Map<Integer, List<ShuffleServerInfo>>> entry : this.partitionReplicaAssignedServers.entrySet()) {
            int partitionId = entry.getKey();
            Map<Integer, List<ShuffleServerInfo>> replicaServers = entry.getValue();
            for (Map.Entry<Integer, List<ShuffleServerInfo>> replicaServerEntry : replicaServers.entrySet()) {
                int candidateSize = replicaServerEntry.getValue().size();
                ShuffleServerInfo candidate = replicaServerEntry.getValue().get(candidateSize - 1);
                assignment.computeIfAbsent(partitionId, x -> new ArrayList()).add(candidate);
            }
        }
        return assignment;
    }

    @Override
    public Map<Integer, List<ShuffleServerInfo>> getAllPartitionServersForReader() {
        HashMap<Integer, List<ShuffleServerInfo>> assignment = new HashMap<Integer, List<ShuffleServerInfo>>();
        for (Map.Entry<Integer, Map<Integer, List<ShuffleServerInfo>>> entry : this.partitionReplicaAssignedServers.entrySet()) {
            int partitionId = entry.getKey();
            Map<Integer, List<ShuffleServerInfo>> replicaServers = entry.getValue();
            for (Map.Entry<Integer, List<ShuffleServerInfo>> replicaServerEntry : replicaServers.entrySet()) {
                assignment.computeIfAbsent(partitionId, x -> new ArrayList()).addAll((Collection)replicaServerEntry.getValue());
            }
        }
        return assignment;
    }

    @Override
    public PartitionDataReplicaRequirementTracking createPartitionReplicaTracking() {
        PartitionDataReplicaRequirementTracking replicaRequirement = new PartitionDataReplicaRequirementTracking(this.shuffleId, this.partitionReplicaAssignedServers);
        return replicaRequirement;
    }

    public Set<String> listExcludedServers() {
        return this.excludedServerToReplacements.keySet();
    }

    public void checkPartitionReassignServerNum(Set<Integer> partitionIds, int legalReassignServerNum) {
        for (int partitionId : partitionIds) {
            Map<Integer, List<ShuffleServerInfo>> replicas = this.partitionReplicaAssignedServers.get(partitionId);
            for (List<ShuffleServerInfo> servers : replicas.values()) {
                if (servers.size() - 1 <= legalReassignServerNum) continue;
                throw new RssException("Illegal reassignment servers for partitionId: " + partitionId + " that exceeding the max legal reassign server num: " + legalReassignServerNum);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static RssProtos.MutableShuffleHandleInfo toProto(MutableShuffleHandleInfo handleInfo) {
        MutableShuffleHandleInfo mutableShuffleHandleInfo = handleInfo;
        synchronized (mutableShuffleHandleInfo) {
            HashMap<Integer, RssProtos.PartitionReplicaServers> partitionToServers = new HashMap<Integer, RssProtos.PartitionReplicaServers>();
            for (Map.Entry<Integer, Map<Integer, List<ShuffleServerInfo>>> entry : handleInfo.partitionReplicaAssignedServers.entrySet()) {
                int partitionId = entry.getKey();
                HashMap<Integer, RssProtos.ReplicaServersItem> replicaServersProto = new HashMap<Integer, RssProtos.ReplicaServersItem>();
                Map<Integer, List<ShuffleServerInfo>> replicaServers = entry.getValue();
                for (Map.Entry<Integer, List<ShuffleServerInfo>> replicaServerEntry : replicaServers.entrySet()) {
                    RssProtos.ReplicaServersItem item = RssProtos.ReplicaServersItem.newBuilder().addAllServerId(ShuffleServerInfo.toProto(replicaServerEntry.getValue())).build();
                    replicaServersProto.put(replicaServerEntry.getKey(), item);
                }
                RssProtos.PartitionReplicaServers partitionReplicaServerProto = RssProtos.PartitionReplicaServers.newBuilder().putAllReplicaServers(replicaServersProto).build();
                partitionToServers.put(partitionId, partitionReplicaServerProto);
            }
            RssProtos.MutableShuffleHandleInfo handleProto = RssProtos.MutableShuffleHandleInfo.newBuilder().setShuffleId(handleInfo.shuffleId).setRemoteStorageInfo(RssProtos.RemoteStorageInfo.newBuilder().setPath(handleInfo.remoteStorage.getPath()).putAllConfItems(handleInfo.remoteStorage.getConfItems()).build()).putAllPartitionToServers(partitionToServers).build();
            return handleProto;
        }
    }

    public static MutableShuffleHandleInfo fromProto(RssProtos.MutableShuffleHandleInfo handleProto) {
        if (handleProto == null) {
            return null;
        }
        HashMap<Integer, Map<Integer, List<ShuffleServerInfo>>> partitionToServers = new HashMap<Integer, Map<Integer, List<ShuffleServerInfo>>>();
        for (Map.Entry<Integer, RssProtos.PartitionReplicaServers> entry : handleProto.getPartitionToServersMap().entrySet()) {
            Map replicaServers = partitionToServers.computeIfAbsent(entry.getKey(), x -> new HashMap());
            for (Map.Entry<Integer, RssProtos.ReplicaServersItem> serverEntry : entry.getValue().getReplicaServersMap().entrySet()) {
                int replicaIdx = serverEntry.getKey();
                List<ShuffleServerInfo> shuffleServerInfos = ShuffleServerInfo.fromProto(serverEntry.getValue().getServerIdList());
                replicaServers.put(replicaIdx, shuffleServerInfos);
            }
        }
        RemoteStorageInfo remoteStorageInfo = new RemoteStorageInfo(handleProto.getRemoteStorageInfo().getPath(), handleProto.getRemoteStorageInfo().getConfItemsMap());
        MutableShuffleHandleInfo handle = new MutableShuffleHandleInfo(handleProto.getShuffleId(), remoteStorageInfo);
        handle.partitionReplicaAssignedServers = partitionToServers;
        return handle;
    }

    public Set<String> listExcludedServersForPartition(int partitionId) {
        return this.excludedServerForPartitionToReplacements.getOrDefault(partitionId, Collections.emptyMap()).keySet();
    }
}

