package io.hops.hudi.org.apache.hadoop.hbase.procedure2.store.region;

import io.hops.hudi.org.apache.hadoop.hbase.Cell;
import io.hops.hudi.org.apache.hadoop.hbase.HBaseIOException;
import io.hops.hudi.org.apache.hadoop.hbase.HConstants;
import io.hops.hudi.org.apache.hadoop.hbase.Server;
import io.hops.hudi.org.apache.hadoop.hbase.client.Delete;
import io.hops.hudi.org.apache.hadoop.hbase.client.Mutation;
import io.hops.hudi.org.apache.hadoop.hbase.client.Put;
import io.hops.hudi.org.apache.hadoop.hbase.client.Scan;
import io.hops.hudi.org.apache.hadoop.hbase.ipc.RpcCall;
import io.hops.hudi.org.apache.hadoop.hbase.ipc.RpcServer;
import io.hops.hudi.org.apache.hadoop.hbase.log.HBaseMarkers;
import io.hops.hudi.org.apache.hadoop.hbase.master.assignment.AssignProcedure;
import io.hops.hudi.org.apache.hadoop.hbase.master.assignment.MoveRegionProcedure;
import io.hops.hudi.org.apache.hadoop.hbase.master.assignment.UnassignProcedure;
import io.hops.hudi.org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure;
import io.hops.hudi.org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import io.hops.hudi.org.apache.hadoop.hbase.master.region.MasterRegion;
import io.hops.hudi.org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import io.hops.hudi.org.apache.hadoop.hbase.procedure2.Procedure;
import io.hops.hudi.org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import io.hops.hudi.org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import io.hops.hudi.org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import io.hops.hudi.org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
import io.hops.hudi.org.apache.hadoop.hbase.procedure2.store.ProcedureTree;
import io.hops.hudi.org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import io.hops.hudi.org.apache.hadoop.hbase.regionserver.RegionScanner;
import io.hops.hudi.org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import io.hops.hudi.org.apache.hadoop.hbase.util.Bytes;
import io.hops.hudi.org.apache.hadoop.hbase.util.CommonFSUtils;
import io.hops.hudi.org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import io.hops.hudi.org.apache.hbase.thirdparty.com.google.common.collect.UnmodifiableIterator;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.debezium.DebeziumConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:io/hops/hudi/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.class */
public class RegionProcedureStore extends ProcedureStoreBase {
    /** Source of the {@link Configuration} used to locate the WAL file system and the old proc WAL directory. */
    private final Server server;
    /** Handed to the {@link WALProcedureStore} that is created while migrating old data. */
    private final LeaseRecovery leaseRecovery;
    /** Region that holds the procedure rows; every read and write in this class goes through it. */
    final MasterRegion region;
    /** Thread count recorded by {@link #start(int)}; also passed to the migration-time WAL store. */
    private int numThreads;
    private static final Logger LOG = LoggerFactory.getLogger(RegionProcedureStore.class);
    /**
     * Column qualifier, under {@link MasterRegionFactory#PROC_FAMILY}, of the single cell stored per procedure row.
     * NOTE(review): the value is taken from DebeziumConstants.DELETE_OP, a constant from an unrelated package;
     * this looks like a decompiler constant-substitution artefact. Confirm the intended literal and consider
     * inlining it so this class does not depend on that constant.
     */
    static final byte[] PROC_QUALIFIER = Bytes.toBytes(DebeziumConstants.DELETE_OP);
    /** Procedure classes that abort migration when any unfinished instance is found in the old store. */
    private static final ImmutableSet<Class<?>> UNSUPPORTED_PROCEDURES = ImmutableSet.of(RecoverMetaProcedure.class, AssignProcedure.class, UnassignProcedure.class, MoveRegionProcedure.class);

    /**
     * Creates a procedure store that persists procedures into the given master region.
     *
     * @param server        provides the configuration consulted during lease recovery and migration
     * @param masterRegion  region that receives all procedure reads and writes
     * @param leaseRecovery passed to the old WAL based store if a migration is performed
     */
    public RegionProcedureStore(Server server, MasterRegion masterRegion, LeaseRecovery leaseRecovery) {
        this.leaseRecovery = leaseRecovery;
        this.region = masterRegion;
        this.server = server;
    }

    /**
     * Flags the store as running and remembers the requested thread count.
     * Nothing is logged or recorded when {@code setRunning(true)} returns {@code false}.
     *
     * @param i number of threads to record
     * @throws IOException declared by the signature; the visible body does not throw it
     */
    public void start(int i) throws IOException {
        if (!setRunning(true)) {
            return;
        }
        LOG.info("Starting the Region Procedure Store, number threads={}", i);
        numThreads = i;
    }

    /**
     * Flags the store as no longer running.
     * The message is logged only when {@code setRunning(false)} returns {@code true}.
     *
     * @param z whether the stop is an abort; only used in the log message
     */
    public void stop(boolean z) {
        if (!setRunning(false)) {
            return;
        }
        LOG.info("Stopping the Region Procedure Store, isAbort={}", z);
    }

    /**
     * @return the thread count most recently recorded by {@code start(int)}
     */
    public int getNumThreads() {
        return numThreads;
    }

    /**
     * Accepts the requested running-procedure count without storing it.
     *
     * @param i requested count
     * @return {@code i}, unchanged
     */
    public int setRunningProcedureCount(int i) {
        final int accepted = i;
        return accepted;
    }

    /**
     * Rejects procedure sets that contain procedures this version cannot run.
     * <p>
     * Fails if the map has an entry for any class in {@code UNSUPPORTED_PROCEDURES}, or if any
     * {@link ServerCrashProcedure} in the map reports {@code isInRecoverMetaState()}.
     *
     * @param map procedures grouped by their concrete class
     * @throws HBaseIOException if an unsupported procedure type or state is present
     */
    private void checkUnsupportedProcedure(Map<Class<?>, List<Procedure<?>>> map) throws HBaseIOException {
        for (Class<?> unsupportedType : UNSUPPORTED_PROCEDURES) {
            List<Procedure<?>> offenders = map.get(unsupportedType);
            if (offenders == null) {
                continue;
            }
            LOG.error("Unsupported procedure type {} found, please rollback your master to the old version to finish them, and then try to upgrade again. See https://hbase.apache.org/book.html#upgrade2.2 for more details. The full procedure list: {}", unsupportedType, offenders);
            throw new HBaseIOException("Unsupported procedure type " + unsupportedType + " found");
        }

        List<Procedure<?>> crashProcedures = map.getOrDefault(ServerCrashProcedure.class, Collections.emptyList());
        boolean schedulesRecoverMeta = crashProcedures.stream()
            .map(candidate -> (ServerCrashProcedure) candidate)
            .anyMatch(ServerCrashProcedure::isInRecoverMetaState);
        if (!schedulesRecoverMeta) {
            return;
        }
        LOG.error("At least one ServerCrashProcedure is going to schedule a RecoverMetaProcedure, which is not supported any more. Please rollback your master to the old version to finish them, and then try to upgrade again. See https://hbase.apache.org/book.html#upgrade2.2 for more details.");
        throw new HBaseIOException("Unsupported procedure state found for ServerCrashProcedure");
    }

    /**
     * Moves procedures from the old WAL based store into the region, if the old directory exists.
     * <p>
     * Steps, in order: load everything from a {@link WALProcedureStore}; refuse to continue if any
     * corrupted or unsupported procedure is found; write each loaded procedure via {@code update};
     * add an empty-valued row for the old store's max procedure id when it is larger than every
     * loaded id; stop the old store and delete its directory.
     *
     * @param fileSystem file system that holds the {@code MasterProcWALs} directory
     * @throws IOException if loading fails, a corrupted or unsupported procedure is found, or the
     *                     old directory cannot be deleted
     */
    private void tryMigrate(FileSystem fileSystem) throws IOException {
        Configuration conf = server.getConfiguration();
        Path oldWalDir = new Path(CommonFSUtils.getWALRootDir(conf), "MasterProcWALs");
        if (!fileSystem.exists(oldWalDir)) {
            // No old store on disk, so there is nothing to migrate.
            return;
        }
        LOG.info("The old WALProcedureStore wal directory {} exists, migrating...", oldWalDir);

        WALProcedureStore oldStore = new WALProcedureStore(conf, leaseRecovery);
        oldStore.start(numThreads);
        oldStore.recoverLease();

        final MutableLong storeMaxPid = new MutableLong(-1L);
        final List<Procedure<?>> loaded = new ArrayList<>();
        final Map<Class<?>, List<Procedure<?>>> unfinishedByType = new HashMap<>();
        oldStore.load(new ProcedureStore.ProcedureLoader() {
            public void setMaxProcId(long j) {
                storeMaxPid.setValue(j);
            }

            public void load(ProcedureStore.ProcedureIterator procedureIterator) throws IOException {
                while (procedureIterator.hasNext()) {
                    Procedure<?> proc = procedureIterator.next();
                    loaded.add(proc);
                    if (proc.isFinished()) {
                        continue;
                    }
                    // Only unfinished procedures are grouped for the unsupported-type check.
                    unfinishedByType.computeIfAbsent(proc.getClass(), type -> new ArrayList<>()).add(proc);
                }
            }

            public void handleCorrupted(ProcedureStore.ProcedureIterator procedureIterator) throws IOException {
                long corrupted = 0;
                while (procedureIterator.hasNext()) {
                    LOG.error("Corrupted procedure {}", procedureIterator.next());
                    corrupted++;
                }
                if (corrupted > 0) {
                    throw new IOException("There are " + corrupted + " corrupted procedures when migrating from the old WAL based store to the new region based store, please fix them before upgrading again.");
                }
            }
        });

        // Validate before anything is written to the region.
        checkUnsupportedProcedure(unfinishedByType);

        long loadedMaxPid = -1L;
        for (Procedure<?> proc : loaded) {
            update(proc);
            loadedMaxPid = Math.max(loadedMaxPid, proc.getProcId());
        }
        LOG.info("Migrated {} existing procedures from the old storage format.", loaded.size());

        final long walMaxPid = storeMaxPid.longValue();
        LOG.info("The WALProcedureStore max pid is {}, and the max pid of all loaded procedures is {}", walMaxPid, loadedMaxPid);
        if (walMaxPid > loadedMaxPid) {
            if (walMaxPid > 0) {
                // Row with an empty value keeps the old store's max procedure id present in the region.
                region.update(r -> r.put(
                    new Put(Bytes.toBytes(walMaxPid)).addColumn(MasterRegionFactory.PROC_FAMILY, PROC_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY)));
            }
        } else if (walMaxPid < loadedMaxPid) {
            LOG.warn("The WALProcedureStore max pid is less than the max pid of all loaded procedures");
        }

        oldStore.stop(false);
        boolean removed = fileSystem.delete(oldWalDir, true);
        if (!removed) {
            throw new IOException("Failed to delete the WALProcedureStore migrated proc wal directory " + oldWalDir);
        }
        LOG.info("Migration of WALProcedureStore finished");
    }

    /**
     * Logs the start of lease recovery, then runs the migration check against the WAL file system
     * resolved from the server configuration.
     *
     * @throws IOException if resolving the file system or the migration fails
     */
    public void recoverLease() throws IOException {
        LOG.info("Starting Region Procedure Store lease recovery...");
        Configuration conf = server.getConfiguration();
        FileSystem walFileSystem = CommonFSUtils.getWALFileSystem(conf);
        tryMigrate(walFileSystem);
    }

    /**
     * Reads every procedure row from the region and passes the result to the loader.
     * <p>
     * The largest row key seen (decoded as a long) is reported through {@code setMaxProcId}; rows
     * with an empty value contribute to that maximum but are not parsed. Parsed procedures are
     * arranged by {@link ProcedureTree}, whose valid and corrupted iterators are then passed to the
     * loader in that order.
     * <p>
     * The scanner is closed exactly once, before any loader callback runs, so a failing callback
     * can no longer trigger a second {@code close()}.
     *
     * @param procedureLoader receiver of the max procedure id and the loaded procedures
     * @throws IOException if scanning, parsing, or a loader callback fails
     */
    public void load(ProcedureStore.ProcedureLoader procedureLoader) throws IOException {
        List<ProcedureProtos.Procedure> protos = new ArrayList<>();
        long maxProcId = 0;
        Scan scan = new Scan().addColumn(MasterRegionFactory.PROC_FAMILY, PROC_QUALIFIER);
        try (RegionScanner scanner = region.getScanner(scan)) {
            List<Cell> cells = new ArrayList<>();
            boolean moreRows;
            do {
                // The return value is the loop's "keep going" flag; a call may still fill cells
                // on the final iteration, so the batch is processed before the flag is checked.
                moreRows = scanner.next(cells);
                if (cells.isEmpty()) {
                    continue;
                }
                // Only one column is requested, so the first cell is the procedure cell.
                Cell cell = cells.get(0);
                cells.clear();
                long procId = Bytes.toLong(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
                maxProcId = Math.max(maxProcId, procId);
                if (cell.getValueLength() > 0) {
                    protos.add(ProcedureProtos.Procedure.parser()
                        .parseFrom(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                }
            } while (moreRows);
        }
        procedureLoader.setMaxProcId(maxProcId);
        ProcedureTree tree = ProcedureTree.build(protos);
        procedureLoader.load(tree.getValidProcs());
        procedureLoader.handleCorrupted(tree.getCorruptedProcs());
    }

    /**
     * Appends a put carrying the serialized procedure to {@code list}, and the row key it targets
     * to {@code list2}. The row key is the procedure id encoded with {@code Bytes.toBytes(long)}.
     *
     * @param procedure procedure to serialize
     * @param list      receives the {@link Put}
     * @param list2     receives the row key
     * @throws IOException if the procedure cannot be converted to its protobuf form
     */
    private void serializePut(Procedure<?> procedure, List<Mutation> list, List<byte[]> list2) throws IOException {
        ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(procedure);
        byte[] rowKey = Bytes.toBytes(procedure.getProcId());
        Put put = new Put(rowKey);
        put.addColumn(MasterRegionFactory.PROC_FAMILY, PROC_QUALIFIER, proto.toByteArray());
        list.add(put);
        list2.add(rowKey);
    }

    /**
     * Appends a "deleted" marker for the given procedure id: a put with an empty value, not an
     * actual {@code Delete}. The row key is added to {@code list2}.
     *
     * @param j     procedure id to mark as deleted
     * @param list  receives the marker {@link Put}
     * @param list2 receives the row key
     */
    private void serializeDelete(long j, List<Mutation> list, List<byte[]> list2) {
        byte[] rowKey = Bytes.toBytes(j);
        Put marker = new Put(rowKey);
        marker.addColumn(MasterRegionFactory.PROC_FAMILY, PROC_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY);
        list.add(marker);
        list2.add(rowKey);
    }

    /**
     * Runs the action with the current RPC call unset, and puts the call back afterwards (even if
     * the action throws) when one was present.
     *
     * @param runnable action to execute
     */
    private void runWithoutRpcCall(Runnable runnable) {
        Optional<RpcCall> suspendedCall = RpcServer.unsetCurrentCall();
        try {
            runnable.run();
        } finally {
            if (suspendedCall.isPresent()) {
                RpcServer.setCurrentCall(suspendedCall.get());
            }
        }
    }

    /**
     * Stores a procedure together with its sub procedures in one {@code mutateRowsWithLocks} call.
     * When there are no sub procedures this is the same as {@code update(procedure)}.
     *
     * @param procedure    parent procedure
     * @param procedureArr sub procedures; may be {@code null} or empty
     * @throws UncheckedIOException if serialization or the region write fails
     */
    public void insert(Procedure<?> procedure, Procedure<?>[] procedureArr) {
        boolean hasNoChildren = procedureArr == null || procedureArr.length == 0;
        if (hasNoChildren) {
            update(procedure);
            return;
        }
        int rowCount = procedureArr.length + 1;
        List<Mutation> mutations = new ArrayList<>(rowCount);
        List<byte[]> rowKeys = new ArrayList<>(rowCount);
        runWithoutRpcCall(() -> {
            try {
                // Parent first, then the children, all applied by a single region call.
                serializePut(procedure, mutations, rowKeys);
                for (Procedure<?> child : procedureArr) {
                    serializePut(child, mutations, rowKeys);
                }
                region.update(r -> r.mutateRowsWithLocks(mutations, rowKeys, 0L, 0L));
            } catch (IOException e) {
                LOG.error(HBaseMarkers.FATAL, "Failed to insert proc {}, sub procs {}", procedure, Arrays.toString(procedureArr), e);
                throw new UncheckedIOException(e);
            }
        });
    }

    /**
     * Stores all given procedures in one {@code mutateRowsWithLocks} call.
     *
     * @param procedureArr procedures to store
     * @throws UncheckedIOException if serialization or the region write fails
     */
    public void insert(Procedure<?>[] procedureArr) {
        int rowCount = procedureArr.length;
        List<Mutation> mutations = new ArrayList<>(rowCount);
        List<byte[]> rowKeys = new ArrayList<>(rowCount);
        runWithoutRpcCall(() -> {
            try {
                for (Procedure<?> proc : procedureArr) {
                    serializePut(proc, mutations, rowKeys);
                }
                region.update(r -> r.mutateRowsWithLocks(mutations, rowKeys, 0L, 0L));
            } catch (IOException e) {
                LOG.error(HBaseMarkers.FATAL, "Failed to insert procs {}", Arrays.toString(procedureArr), e);
                throw new UncheckedIOException(e);
            }
        });
    }

    /**
     * Writes the serialized procedure to the row keyed by its procedure id.
     *
     * @param procedure procedure to persist
     * @throws UncheckedIOException if serialization or the region write fails
     */
    public void update(Procedure<?> procedure) {
        runWithoutRpcCall(() -> {
            try {
                // Convert before entering the region callback so conversion errors surface first.
                ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(procedure);
                region.update(r -> {
                    byte[] rowKey = Bytes.toBytes(procedure.getProcId());
                    Put put = new Put(rowKey);
                    put.addColumn(MasterRegionFactory.PROC_FAMILY, PROC_QUALIFIER, proto.toByteArray());
                    r.put(put);
                });
            } catch (IOException e) {
                LOG.error(HBaseMarkers.FATAL, "Failed to update proc {}", procedure, e);
                throw new UncheckedIOException(e);
            }
        });
    }

    /**
     * Marks one procedure as deleted by overwriting its cell with an empty value. The row itself
     * is kept; {@code cleanup()} is what issues real deletes for empty-valued rows.
     *
     * @param j id of the procedure to mark as deleted
     * @throws UncheckedIOException if the region write fails
     */
    public void delete(long j) {
        try {
            region.update(r -> {
                Put marker = new Put(Bytes.toBytes(j));
                marker.addColumn(MasterRegionFactory.PROC_FAMILY, PROC_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY);
                r.put(marker);
            });
        } catch (IOException e) {
            LOG.error(HBaseMarkers.FATAL, "Failed to delete pid={}", j, e);
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Rewrites the parent procedure and marks the given sub procedure ids as deleted, all in one
     * {@code mutateRowsWithLocks} call. The parent is written with its serialized state, not a
     * delete marker.
     *
     * @param procedure parent procedure whose current state is stored
     * @param jArr      ids of the sub procedures to mark as deleted
     * @throws UncheckedIOException if serialization or the region write fails
     */
    public void delete(Procedure<?> procedure, long[] jArr) {
        int rowCount = jArr.length + 1;
        List<Mutation> mutations = new ArrayList<>(rowCount);
        List<byte[]> rowKeys = new ArrayList<>(rowCount);
        try {
            serializePut(procedure, mutations, rowKeys);
            for (long subProcId : jArr) {
                serializeDelete(subProcId, mutations, rowKeys);
            }
            region.update(r -> r.mutateRowsWithLocks(mutations, rowKeys, 0L, 0L));
        } catch (IOException e) {
            LOG.error(HBaseMarkers.FATAL, "Failed to delete parent proc {}, sub pids={}", procedure, Arrays.toString(jArr), e);
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Marks {@code i2} procedures as deleted, taking their ids from {@code jArr} starting at
     * index {@code i}. A count of zero does nothing and a count of one delegates to
     * {@code delete(long)}; otherwise all markers go through one {@code mutateRowsWithLocks} call.
     *
     * @param jArr array holding the procedure ids
     * @param i    index of the first id to use
     * @param i2   number of ids to use
     * @throws UncheckedIOException if the region write fails
     */
    public void delete(long[] jArr, int i, int i2) {
        switch (i2) {
            case 0:
                return;
            case 1:
                delete(jArr[i]);
                return;
            default:
                break;
        }
        List<Mutation> mutations = new ArrayList<>(i2);
        List<byte[]> rowKeys = new ArrayList<>(i2);
        for (int offset = 0; offset < i2; offset++) {
            serializeDelete(jArr[i + offset], mutations, rowKeys);
        }
        try {
            region.update(r -> r.mutateRowsWithLocks(mutations, rowKeys, 0L, 0L));
        } catch (IOException e) {
            LOG.error(HBaseMarkers.FATAL, "Failed to delete pids={}", Arrays.toString(jArr), e);
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Physically removes rows that were previously marked as deleted (rows with an empty value).
     * <p>
     * Rows are scanned in reverse order and the first row returned is always left in place, even
     * if it is a delete marker. Presumably this keeps the highest procedure id present in the
     * region so {@code load} can still report it; confirm against the callers.
     * <p>
     * This is best effort: an {@link IOException} is logged at WARN level and not propagated.
     */
    public void cleanup() {
        List<Cell> cells = new ArrayList<>();
        Scan reversedScan = new Scan().addColumn(MasterRegionFactory.PROC_FAMILY, PROC_QUALIFIER).setReversed(true);
        try (RegionScanner scanner = region.getScanner(reversedScan)) {
            // Read the first row of the reversed scan only to skip it.
            boolean moreRows = scanner.next(cells);
            if (cells.isEmpty()) {
                return;
            }
            cells.clear();
            while (moreRows) {
                moreRows = scanner.next(cells);
                if (cells.isEmpty()) {
                    continue;
                }
                Cell cell = cells.get(0);
                cells.clear();
                if (cell.getValueLength() == 0) {
                    // An empty value is the delete marker, so remove the row itself.
                    region.update(r -> r.delete(new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())));
                }
            }
        } catch (IOException e) {
            LOG.warn("Failed to clean up delete procedures", e);
        }
    }
}
