package io.hops.hopsworks.expat.migrations.airflow;

import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.FsPermissions;
import io.hops.hopsworks.common.provenance.core.Provenance;
import io.hops.hopsworks.common.provenance.core.dto.ProvCoreDTO;
import io.hops.hopsworks.common.provenance.core.dto.ProvTypeDTO;
import io.hops.hopsworks.common.util.ProcessDescriptor;
import io.hops.hopsworks.common.util.ProcessResult;
import io.hops.hopsworks.expat.configuration.ConfigurationBuilder;
import io.hops.hopsworks.expat.configuration.ExpatConf;
import io.hops.hopsworks.expat.db.DbConnectionFactory;
import io.hops.hopsworks.expat.executor.ProcessExecutor;
import io.hops.hopsworks.expat.migrations.MigrateStep;
import io.hops.hopsworks.expat.migrations.MigrationException;
import io.hops.hopsworks.expat.migrations.RollbackException;
import io.hops.hopsworks.expat.migrations.projects.util.HopsClient;
import io.hops.hopsworks.expat.migrations.projects.util.XAttrException;
import io.hops.hopsworks.expat.migrations.projects.util.XAttrHelper;
import io.hops.hopsworks.persistence.entity.hdfs.inode.Inode;
import java.io.IOException;
import java.io.StringWriter;
import java.nio.charset.Charset;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclEntryScope;
import org.apache.hadoop.fs.permission.AclEntryType;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.persistence.jaxb.JAXBContextFactory;
import org.eclipse.persistence.oxm.MediaType;

/* loaded from: input_file:io/hops/hopsworks/expat/migrations/airflow/DagsMigration.class */
public class DagsMigration implements MigrateStep {
    private static final Logger LOGGER = LogManager.getLogger(DagsMigration.class);
    private static final String AIRFLOW_USER = "airflow";
    private static final String AIRFLOW_USER_EMAIL = "airflow@hopsworks.ai";
    private static final String AIRFLOW_DATASET_NAME = "Airflow";
    public static final String README_TEMPLATE = "*This is an auto-generated README.md file for your Dataset!*\nTo replace it, go into your DataSet and edit the README.md file.\n\n*%s* DataSet\n===\n\n## %s";
    private static final String AIRFLOW_DATASET_DESCRIPTION = "Contains airflow dags";
    private Connection connection;
    DistributedFileSystemOps dfso = null;
    String masterPassword = null;
    private String expatPath = null;
    private String hadoopHome = null;
    private String hopsClientUser = null;
    private boolean kubernetesInstalled = false;
    private boolean dryRun;

    private void setup() throws ConfigurationException, SQLException, IOException, MigrationException {
        this.connection = DbConnectionFactory.getConnection();
        Configuration configuration = ConfigurationBuilder.getConfiguration();
        this.expatPath = configuration.getString(ExpatConf.EXPAT_PATH);
        this.hopsClientUser = configuration.getString(ExpatConf.HOPS_CLIENT_USER);
        if (this.hopsClientUser == null) {
            throw new ConfigurationException("hops.client.user cannot be null");
        }
        this.dfso = HopsClient.getDFSO(this.hopsClientUser);
        this.hadoopHome = System.getenv("HADOOP_HOME");
        this.dryRun = configuration.getBoolean(ExpatConf.DRY_RUN);
        this.masterPassword = FileUtils.readFileToString(Paths.get(configuration.getString(ExpatConf.MASTER_PWD_FILE_KEY), new String[0]).toFile(), Charset.defaultCharset());
    }

    @Override // io.hops.hopsworks.expat.migrations.MigrateStep
    public void migrate() throws MigrationException {
        Statement statement = null;
        try {
            try {
                setup();
                statement = this.connection.createStatement();
                ResultSet executeQuery = statement.executeQuery("SELECT project.id, projectname,users.username FROM project JOIN users ON project.username=users.email;");
                while (executeQuery.next()) {
                    String string = executeQuery.getString("projectname");
                    Integer valueOf = Integer.valueOf(executeQuery.getInt("id"));
                    String hdfsUserName = getHdfsUserName(executeQuery.getString("username"), string);
                    String sha256Hex = DigestUtils.sha256Hex(Integer.toString(valueOf.intValue()));
                    createAirflowDataset(valueOf, string, hdfsUserName);
                    addAirflowUserToProject(valueOf);
                    try {
                        ProcessResult execute = ProcessExecutor.getExecutor().execute(new ProcessDescriptor.Builder().addCommand(this.expatPath + "/bin/dags_migrate.sh").addCommand(string).addCommand(sha256Hex).addCommand(hdfsUserName).addCommand(this.hopsClientUser).addCommand(this.hadoopHome).ignoreOutErrStreams(false).setWaitTimeout(30L, TimeUnit.MINUTES).build());
                        if (execute.getExitCode() == 0) {
                            LOGGER.info("Successfully moved dags for project: " + string);
                        } else if (execute.getExitCode() == 2) {
                            LOGGER.info("Dags directory for project: " + string + ", was not configured. So it does not have any dags.");
                        } else {
                            LOGGER.error("Failed to copy dags for project: " + string + " " + execute.getStdout());
                        }
                    } catch (IOException e) {
                        LOGGER.error("Failed to copy dags for project: " + string + " " + e.getMessage());
                    }
                }
                if (statement != null) {
                    try {
                        statement.close();
                    } catch (SQLException e2) {
                        LOGGER.error("failed to close stmt", e2);
                    }
                }
                close();
            } catch (Exception e3) {
                throw new MigrationException("Error in migration step " + DagsMigration.class.getSimpleName(), e3);
            }
        } catch (Throwable th) {
            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException e4) {
                    LOGGER.error("failed to close stmt", e4);
                }
            }
            close();
            throw th;
        }
    }

    private void addAirflowUserToProject(Integer num) throws Exception {
        Statement statement = null;
        try {
            statement = this.connection.createStatement();
            if (!statement.executeQuery("SELECT project_id FROM project_team WHERE project_id=" + num + " AND team_member='" + AIRFLOW_USER_EMAIL + "';").next()) {
                this.connection.setAutoCommit(false);
                PreparedStatement prepareStatement = this.connection.prepareStatement("INSERT INTO project_team (project_id, team_member, team_role, added) VALUES (?, ? ,?, ?)");
                prepareStatement.setInt(1, num.intValue());
                prepareStatement.setString(2, AIRFLOW_USER_EMAIL);
                prepareStatement.setString(3, "Data scientist");
                prepareStatement.setDate(4, new Date(System.currentTimeMillis()));
                prepareStatement.execute();
                prepareStatement.close();
                this.connection.commit();
                this.connection.setAutoCommit(true);
            }
            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException e) {
                    LOGGER.error("failed to close stmt", e);
                }
            }
        } catch (Throwable th) {
            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException e2) {
                    LOGGER.error("failed to close stmt", e2);
                }
            }
            throw th;
        }
    }

    private void createAirflowDataset(Integer num, String str, String str2) throws IOException, SQLException, MigrationException {
        Path airflowDatasetPath = getAirflowDatasetPath(str);
        if (this.dfso.exists(airflowDatasetPath) || this.dryRun) {
            LOGGER.info("Airflow dataset already exist for project: " + str);
            return;
        }
        try {
            LOGGER.info("Creating dataset Airflow in " + str);
            this.dfso.mkdir(airflowDatasetPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.NONE, false));
            createAirflowDatasetInDB(num);
            setAirflowDatasetPermissions(num, str, str2);
            createAirflowDatasetReadme(str2, str);
            setAirflowDatasetProvType(num, str);
        } catch (MigrationException | IOException | SQLException e) {
            LOGGER.error("Failed to create the airflow dataset in project: " + str, e);
            if (this.dfso.exists(airflowDatasetPath)) {
                LOGGER.info("Deleting the airflow dataset in project: " + str);
                try {
                    this.dfso.rm(airflowDatasetPath, true);
                } catch (IOException e2) {
                    LOGGER.error("Failed to delete the airflow dataset in project: " + str, e2);
                }
                deleteAirflowDatasetInDB(num);
            }
            throw e;
        }
    }

    private void createAirflowDatasetInDB(Integer num) throws SQLException {
        Statement statement = null;
        try {
            statement = this.connection.createStatement();
            if (!statement.executeQuery("SELECT id FROM dataset WHERE projectId=" + num + " AND inode_name='" + AIRFLOW_DATASET_NAME + "';").next()) {
                this.connection.setAutoCommit(false);
                PreparedStatement prepareStatement = this.connection.prepareStatement("INSERT INTO dataset (inode_name, projectId, description, searchable, permission) VALUES (?, ? ,?, ?, ?)");
                prepareStatement.setString(1, AIRFLOW_DATASET_NAME);
                prepareStatement.setInt(2, num.intValue());
                prepareStatement.setString(3, AIRFLOW_DATASET_DESCRIPTION);
                prepareStatement.setInt(4, 1);
                prepareStatement.setString(5, "EDITABLE");
                prepareStatement.execute();
                prepareStatement.close();
                this.connection.commit();
                this.connection.setAutoCommit(true);
            }
            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException e) {
                    LOGGER.error("failed to close stmt", e);
                }
            }
        } catch (Throwable th) {
            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException e2) {
                    LOGGER.error("failed to close stmt", e2);
                }
            }
            throw th;
        }
    }

    private void deleteAirflowDatasetInDB(Integer num) throws SQLException {
        PreparedStatement preparedStatement = null;
        try {
            try {
                preparedStatement = this.connection.prepareStatement("DELETE FROM dataset WHERE projectId=? AND inode_name=?");
                preparedStatement.setInt(1, num.intValue());
                preparedStatement.setString(2, AIRFLOW_DATASET_NAME);
                preparedStatement.execute();
                preparedStatement.close();
                if (preparedStatement != null) {
                    try {
                        preparedStatement.close();
                    } catch (SQLException e) {
                        LOGGER.error("failed to close stmt", e);
                    }
                }
            } catch (SQLException e2) {
                LOGGER.error("Failed to delete airflow database in DB", e2);
                if (preparedStatement != null) {
                    try {
                        preparedStatement.close();
                    } catch (SQLException e3) {
                        LOGGER.error("failed to close stmt", e3);
                    }
                }
            }
        } catch (Throwable th) {
            if (preparedStatement != null) {
                try {
                    preparedStatement.close();
                } catch (SQLException e4) {
                    LOGGER.error("failed to close stmt", e4);
                }
            }
            throw th;
        }
    }

    private void setAirflowDatasetPermissions(Integer num, String str, String str2) throws IOException, MigrationException, SQLException {
        Path airflowDatasetPath = getAirflowDatasetPath(str);
        this.dfso.setOwner(airflowDatasetPath, str2, getAirflowDatasetGroup(str));
        String airflowDatasetAclGroup = getAirflowDatasetAclGroup(str);
        addGroup(airflowDatasetAclGroup);
        addUserToGroup(str2, airflowDatasetAclGroup);
        this.dfso.setPermission(airflowDatasetPath, getDefaultDatasetAcl(airflowDatasetAclGroup));
        addProjectMembersToAirflowDataset(num, str);
        this.dfso.getFilesystem().modifyAclEntries(airflowDatasetPath, getAirflowAcls());
    }

    private void setAirflowDatasetProvType(Integer num, String str) throws MigrationException {
        ProvCoreDTO provCoreDTO = new ProvCoreDTO(Provenance.Type.META.dto, Long.valueOf(num.longValue()));
        try {
            this.dfso.setMetaStatus(getAirflowDatasetPath(str), Inode.MetaStatus.META_ENABLED);
            Marshaller createMarshaller = jaxbContext().createMarshaller();
            StringWriter stringWriter = new StringWriter();
            createMarshaller.marshal(provCoreDTO, stringWriter);
            XAttrHelper.upsertProvXAttr(this.dfso, "/Projects/" + str, "core", stringWriter.toString().getBytes());
        } catch (JAXBException | XAttrException | IOException e) {
            throw new MigrationException("Error setting airflow dataset provenance", e);
        }
    }

    private void createAirflowDatasetReadme(String str, String str2) {
        Path path = new Path(getAirflowDatasetPath(str2), "README.md");
        String format = String.format(README_TEMPLATE, AIRFLOW_DATASET_NAME, AIRFLOW_DATASET_DESCRIPTION);
        try {
            FSDataOutputStream create = this.dfso.create(path);
            Throwable th = null;
            try {
                try {
                    create.writeBytes(format);
                    create.flush();
                    this.dfso.setPermission(path, FsPermissions.rwxrwx___);
                    this.dfso.setOwner(path, str, getAirflowDatasetGroup(str2));
                    if (create != null) {
                        if (0 != 0) {
                            try {
                                create.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            create.close();
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } finally {
            }
        } catch (IOException e) {
            LOGGER.info("Failed to create README for project " + str2, e.getMessage());
        }
    }

    private void addProjectMembersToAirflowDataset(Integer num, String str) throws SQLException, IOException {
        Statement statement = null;
        try {
            statement = this.connection.createStatement();
            ResultSet executeQuery = statement.executeQuery("SELECT  username FROM users JOIN project_team WHERE users.email = project_team.team_member AND project_id=" + num);
            while (executeQuery.next()) {
                addUserToGroup(getHdfsUserName(executeQuery.getString("username"), str), getAirflowDatasetGroup(str));
            }
            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException e) {
                    LOGGER.error("failed to close stmt connection", e);
                }
            }
        } catch (Throwable th) {
            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException e2) {
                    LOGGER.error("failed to close stmt connection", e2);
                }
            }
            throw th;
        }
    }

    private void addGroup(String str) throws IOException {
        try {
            this.dfso.addGroup(str);
        } catch (IOException e) {
            if (!e.getMessage().contains(str + " already exists")) {
                throw e;
            }
            LOGGER.info("Group " + str + " already exist");
        }
    }

    private void addUserToGroup(String str, String str2) throws IOException {
        try {
            this.dfso.addUserToGroup(str, str2);
        } catch (IOException e) {
            if (!e.getMessage().contains(str + " is already part of Group: " + str2)) {
                throw e;
            }
            LOGGER.info(str + " is already part of Group: " + str2);
        }
    }

    private Path getAirflowDatasetPath(String str) {
        return new Path("hdfs:///Projects/" + str + "/" + AIRFLOW_DATASET_NAME);
    }

    private String getHdfsUserName(String str, String str2) {
        return str2 + "__" + str;
    }

    private String getAirflowDatasetGroup(String str) {
        return str + "__" + AIRFLOW_DATASET_NAME;
    }

    private String getAirflowDatasetAclGroup(String str) {
        return getAirflowDatasetGroup(str) + "__read";
    }

    private List<AclEntry> getAirflowAcls() {
        ArrayList arrayList = new ArrayList();
        AclEntry build = new AclEntry.Builder().setType(AclEntryType.USER).setName(AIRFLOW_USER).setScope(AclEntryScope.ACCESS).setPermission(FsAction.READ_EXECUTE).build();
        AclEntry build2 = new AclEntry.Builder().setType(AclEntryType.USER).setName(AIRFLOW_USER).setScope(AclEntryScope.DEFAULT).setPermission(FsAction.READ_EXECUTE).build();
        arrayList.add(build);
        arrayList.add(build2);
        return arrayList;
    }

    private List<AclEntry> getDefaultDatasetAcl(String str) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new AclEntry.Builder().setType(AclEntryType.USER).setScope(AclEntryScope.ACCESS).setPermission(FsAction.ALL).build());
        arrayList.add(new AclEntry.Builder().setType(AclEntryType.GROUP).setScope(AclEntryScope.ACCESS).setPermission(FsAction.ALL).build());
        arrayList.add(new AclEntry.Builder().setType(AclEntryType.GROUP).setName(str).setScope(AclEntryScope.ACCESS).setPermission(FsAction.READ_EXECUTE).build());
        arrayList.add(new AclEntry.Builder().setType(AclEntryType.OTHER).setScope(AclEntryScope.ACCESS).setPermission(FsAction.NONE).build());
        arrayList.add(new AclEntry.Builder().setType(AclEntryType.GROUP).setName(str).setScope(AclEntryScope.DEFAULT).setPermission(FsAction.READ_EXECUTE).build());
        return arrayList;
    }

    private JAXBContext jaxbContext() throws JAXBException {
        HashMap hashMap = new HashMap();
        hashMap.put("eclipselink.json.include-root", false);
        hashMap.put("eclipselink.media-type", MediaType.APPLICATION_JSON);
        return JAXBContextFactory.createContext(new Class[]{ProvCoreDTO.class, ProvTypeDTO.class}, hashMap);
    }

    protected void close() {
        if (this.connection != null) {
            try {
                this.connection.close();
            } catch (SQLException e) {
                LOGGER.error("failed to close jdbc connection", e);
            }
        }
        if (this.dfso != null) {
            this.dfso.close();
        }
    }

    @Override // io.hops.hopsworks.expat.migrations.MigrateStep
    public void rollback() throws RollbackException {
        try {
            setup();
            projectAirflowDatasetRollback();
            close();
            LOGGER.info("Finished external airflow dags rollback");
        } catch (ConfigurationException | MigrationException | IOException | SQLException e) {
            LOGGER.error("Rollback failed. Could not initialize database connection.");
            close();
            throw new RollbackException("Rollback failed. Could not initialize database connection.", e);
        }
    }

    public void projectAirflowDatasetRollback() throws RollbackException {
        Statement statement = null;
        try {
            try {
                statement = this.connection.createStatement();
                ResultSet executeQuery = statement.executeQuery("SELECT project.id, projectname,users.username FROM project JOIN users ON project.username=users.email;");
                while (executeQuery.next()) {
                    String string = executeQuery.getString("projectname");
                    LOGGER.info("Deleting airflow dataset for project: " + string);
                    this.dfso.rm(getAirflowDatasetPath(string), true);
                }
                if (statement != null) {
                    try {
                        statement.close();
                    } catch (SQLException e) {
                        LOGGER.error("failed to close jdbc connection", e);
                    }
                }
            } catch (Throwable th) {
                if (statement != null) {
                    try {
                        statement.close();
                    } catch (SQLException e2) {
                        LOGGER.error("failed to close jdbc connection", e2);
                    }
                }
                throw th;
            }
        } catch (IOException | SQLException e3) {
            throw new RollbackException("Failed airflow dataset rollback", e3);
        }
    }
}
