package io.hops.kafka;

import io.hops.kafka.authorizer.tables.HopsAcl;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Pattern;
import kafka.network.RequestChannel;
import kafka.security.auth.Acl;
import kafka.security.auth.Authorizer;
import kafka.security.auth.Operation;
import kafka.security.auth.Resource;
import kafka.security.auth.ResourceType$;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.log4j.Logger;

/* loaded from: input_file:io/hops/kafka/HopsAclAuthorizer.class */
/**
 * Kafka {@link Authorizer} that answers authorization requests from an in-memory ACL cache
 * populated through a {@link DbConnection}.
 *
 * <p>The cache maps a resource (topic) name to a map from principal name to that principal's
 * {@link HopsAcl} entries. It is refreshed by a background poller started in
 * {@link #configure(Map)} and, on demand, when {@link #authorize} misses the cache. Every access
 * to the cache is guarded by {@code synchronized (acls)}.
 *
 * <p>ACL mutation and listing methods of the {@link Authorizer} contract are no-ops here.
 */
public class HopsAclAuthorizer implements Authorizer {
    private static final Logger LOG = Logger.getLogger("kafka.authorizer.logger");

    /** SQLExceptions whose message matches this expression are not logged by the cache refresh. */
    private static final String SQL_EXCEPTION_PATTERN = "HikariDataSource.+has been closed.";

    /** Compiled once; the expression never changes. */
    private static final Pattern CLOSED_POOL_PATTERN = Pattern.compile(SQL_EXCEPTION_PATTERN);

    /** Name given to the background polling thread. */
    private static final String POLLER_THREAD_NAME = "hops-acl-cache-poller";

    private Set<KafkaPrincipal> superUsers = new HashSet<>();
    DbConnection dbConnection;

    /** ACL cache: resource (topic) name -> principal name -> ACL entries. Guarded by itself. */
    final Map<String, Map<String, List<HopsAcl>>> acls;

    /** Executor running the background cache refresh; {@code null} until {@link #configure}. */
    private ExecutorService aclPoller;

    /** Creates an authorizer with an empty ACL cache. */
    public HopsAclAuthorizer() {
        this(new HashMap<>());
    }

    /**
     * Creates an authorizer backed by the given ACL cache.
     *
     * @param map cache instance to use (resource name -> principal name -> ACLs); it is used
     *            directly, not copied
     */
    public HopsAclAuthorizer(Map<String, Map<String, List<HopsAcl>>> map) {
        this.acls = map;
    }

    /**
     * Reads the super users and database settings from the configuration, opens the database
     * connection and starts the background ACL poller.
     *
     * <p>A failure to connect is logged and leaves {@link #dbConnection} unset. Calling this
     * method again stops the previously started poller before starting a new one.
     *
     * @param map authorizer configuration, keyed by the property names declared in {@code Consts}
     */
    public void configure(Map<String, ?> map) {
        Object superUsersProp = map.get(Consts.SUPERUSERS_PROP);
        if (superUsersProp != null) {
            for (String entry : ((String) superUsersProp).split(Consts.SEMI_COLON)) {
                String principalSpec = entry.trim();
                // Blank entries (e.g. produced by ";;") are skipped instead of being parsed.
                if (!principalSpec.isEmpty()) {
                    this.superUsers.add(KafkaPrincipal.fromString(principalSpec));
                }
            }
        } else {
            this.superUsers = new HashSet<>();
        }

        try {
            this.dbConnection = new DbConnection(
                    map.get(Consts.DATABASE_URL).toString(),
                    map.get(Consts.DATABASE_USERNAME).toString(),
                    map.get(Consts.DATABASE_PASSWORD).toString(),
                    Integer.parseInt(map.get(Consts.DATABASE_MAX_POOL_SIZE).toString()),
                    map.get(Consts.DATABASE_CACHE_PREPSTMTS).toString(),
                    map.get(Consts.DATABASE_PREPSTMT_CACHE_SIZE).toString(),
                    map.get(Consts.DATABASE_PREPSTMT_CACHE_SQL_LIMIT).toString());
        } catch (SQLException e) {
            LOG.error("HopsAclAuthorizer could not connect to database at:" + map.get(Consts.DATABASE_URL).toString(), e);
        }

        // Never leave an earlier poller running when the authorizer is configured again.
        stopAclPoller();
        final Object pollingFrequency = map.get(Consts.DATABASE_ACL_POLLING_FREQUENCY_MS);
        ExecutorService poller = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, POLLER_THREAD_NAME);
            // Daemon, so a missed close() cannot keep the JVM alive.
            thread.setDaemon(true);
            return thread;
        });
        poller.execute(() -> pollAcls(pollingFrequency));
        this.aclPoller = poller;
    }

    /**
     * Body of the background poller: refreshes the ACL cache, sleeps for the configured interval
     * and repeats until the thread is interrupted.
     *
     * @param pollingFrequency configured interval in milliseconds, as found in the configuration
     */
    private void pollAcls(Object pollingFrequency) {
        long intervalMs;
        try {
            intervalMs = Long.parseLong(String.valueOf(pollingFrequency));
        } catch (NumberFormatException e) {
            intervalMs = -1L;
        }
        if (intervalMs < 0) {
            // Thread.sleep rejects negative values; without a usable interval polling cannot run.
            LOG.error("HopsAclAuthorizer invalid ACL polling frequency: " + pollingFrequency
                    + ", background ACL polling is disabled");
            return;
        }

        while (!Thread.currentThread().isInterrupted()) {
            try {
                updateAclCache();
            } catch (RuntimeException e) {
                // One failed refresh (e.g. no database connection) must not end polling for good.
                LOG.error("HopsAclAuthorizer db polling exception", e);
            }
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                LOG.info("HopsAclAuthorizer db polling interrupted, stopping ACL polling");
                clearAclCache();
                // Restore the flag so the executor sees the interruption, then stop.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Decides whether the session's principal may perform the operation on the resource.
     *
     * <p>Checks are applied in this order: the anonymous principal is denied; super users are
     * allowed; cluster resources are denied; group resources are allowed unless the group name
     * contains the project/user delimiter and its prefix differs from the principal's prefix;
     * any other resource is looked up by name in the ACL cache (refreshing the cache once on a
     * miss) and evaluated against the principal's ACL entries.
     *
     * @param session   session carrying the principal and client address
     * @param operation requested operation
     * @param resource  resource being accessed
     * @return {@code true} if the request is authorized
     */
    public boolean authorize(RequestChannel.Session session, Operation operation, Resource resource) {
        KafkaPrincipal principal = session.principal();
        String host = session.clientAddress().getHostAddress();
        String topicName = resource.name();
        String principalName = principal.getName();

        if (LOG.isDebugEnabled()) {
            LOG.debug("authorize :: session:" + session);
            LOG.debug("authorize :: principal.name:" + principal.getName());
            LOG.debug("authorize :: principal.type:" + principal.getPrincipalType());
            LOG.debug("authorize :: operation:" + operation);
            LOG.debug("authorize :: host:" + host);
            LOG.debug("authorize :: resource:" + resource);
            LOG.debug("authorize :: topicName:" + topicName);
            LOG.debug("authorize :: projectName__userName:" + principalName);
        }

        if (principalName.equalsIgnoreCase(Consts.ANONYMOUS)) {
            LOG.info("No Acl found for cluster authorization, user:" + principalName);
            return false;
        }
        if (isSuperUser(principal)) {
            return true;
        }
        if (resource.resourceType().equals(ResourceType$.MODULE$.fromString(Consts.CLUSTER))) {
            LOG.info("This is cluster authorization for broker: " + principalName);
            return false;
        }
        if (resource.resourceType().equals(ResourceType$.MODULE$.fromString(Consts.GROUP))) {
            return authorizeGroup(principalName, resource);
        }

        Map<String, List<HopsAcl>> topicAcls = cachedAcls(topicName);
        if (topicAcls == null) {
            // Cache miss: the topic may be newer than the last poll, so refresh once and retry.
            updateAclCache();
            topicAcls = cachedAcls(topicName);
            if (topicAcls == null) {
                LOG.info("For principal: " + principalName + ", topic:" + topicName + ", operation:" + operation + ", resource:" + resource + ", Topic not found");
                return false;
            }
        }
        return authorizeProjectUser(operation, resource, host, topicAcls, principalName);
    }

    /**
     * Authorizes access to a consumer group: when the group name contains the project/user
     * delimiter, its prefix must equal the prefix of the principal name; other groups are allowed.
     */
    private boolean authorizeGroup(String principalName, Resource resource) {
        String principalProject = principalName.split(Consts.PROJECT_USER_DELIMITER)[0];
        if (resource.name().contains(Consts.PROJECT_USER_DELIMITER)) {
            String groupProject = resource.name().split(Consts.PROJECT_USER_DELIMITER)[0];
            LOG.debug("Consumer group :: projectCN:" + principalProject);
            LOG.debug("Consumer group :: projectConsumerGroup:" + groupProject);
            if (!principalProject.equals(groupProject)) {
                LOG.info("Principal:" + principalName + " is not allowed to access group:" + resource.name());
                return false;
            }
        }
        LOG.info("Principal:" + principalName + " is allowed to access group:" + resource.name());
        return true;
    }

    /** Returns the cached per-principal ACLs of a resource name, or {@code null} if absent. */
    private Map<String, List<HopsAcl>> cachedAcls(String resourceName) {
        synchronized (this.acls) {
            return this.acls.get(resourceName);
        }
    }

    /** Empties the ACL cache under the same lock used by readers and by the refresh. */
    private void clearAclCache() {
        synchronized (this.acls) {
            this.acls.clear();
        }
    }

    /**
     * Evaluates the principal's ACL entries for a resource: access is granted only when no DENY
     * entry matches and at least one ALLOW entry matches. The role compared against each entry is
     * the project role of the principal's first ACL entry.
     */
    private boolean authorizeProjectUser(Operation operation, Resource resource, String str, Map<String, List<HopsAcl>> map, String str2) {
        List<HopsAcl> principalAcls = map.get(str2);
        if (principalAcls == null || principalAcls.isEmpty()) {
            LOG.info("For principal: " + str2 + ", operation:" + operation + ", resource:" + resource + ", allowMatch: false - no ACL found");
            return false;
        }
        String projectRole = principalAcls.get(0).getProjectRole();
        boolean denyMatch = aclMatch(operation.name(), str2, str, Consts.DENY, projectRole, principalAcls);
        LOG.info("For principal: " + str2 + ", operation:" + operation + ", resource:" + resource + ", denyMatch:" + denyMatch);
        boolean allowMatch = aclMatch(operation.name(), str2, str, Consts.ALLOW, projectRole, principalAcls);
        LOG.info("For principal: " + str2 + ", operation:" + operation + ", resource:" + resource + ", allowMatch:" + allowMatch);
        return !denyMatch && allowMatch;
    }

    /**
     * Reports whether any ACL entry of the given permission type matches the request.
     *
     * @param str  operation name
     * @param str2 principal name
     * @param str3 client host address
     * @param str4 permission type to look for
     * @param str5 project role of the principal
     * @param list ACL entries to scan
     * @return {@code true} if an entry matches on permission type, principal, operation, host and
     *         role, where {@code "*"} in an entry's principal, operation, host or role matches
     *         anything
     */
    private boolean aclMatch(String str, String str2, String str3, String str4, String str5, List<HopsAcl> list) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("aclMatch :: Operation:" + str);
            LOG.debug("aclMatch :: principal:" + str2);
            LOG.debug("aclMatch :: host:" + str3);
            LOG.debug("aclMatch :: permissionType:" + str4);
            LOG.debug("aclMatch :: role:" + str5);
            LOG.debug("aclMatch :: acls:" + list);
        }
        for (HopsAcl hopsAcl : list) {
            LOG.debug("aclMatch.acl" + hopsAcl);
            if (hopsAcl.getPermissionType().equalsIgnoreCase(str4)
                    && matchesOrWildcard(hopsAcl.getPrincipal(), str2)
                    && matchesOrWildcard(hopsAcl.getOperationType(), str)
                    && matchesOrWildcard(hopsAcl.getHost(), str3)
                    && matchesOrWildcard(hopsAcl.getRole(), str5)) {
                return true;
            }
        }
        return false;
    }

    /** True when the ACL value equals the requested value ignoring case, or is the wildcard. */
    private static boolean matchesOrWildcard(String aclValue, String requested) {
        return aclValue.equalsIgnoreCase(requested) || aclValue.equals("*");
    }

    /**
     * Replaces the cache content with the ACLs currently returned by the database connection.
     * On an SQLException the cache is emptied; the exception is logged unless its message matches
     * {@link #SQL_EXCEPTION_PATTERN}.
     */
    private void updateAclCache() {
        try {
            Map<String, Map<String, List<HopsAcl>>> latest = this.dbConnection.getAcls();
            LOG.debug("Acls:" + latest);
            synchronized (this.acls) {
                this.acls.clear();
                this.acls.putAll(latest);
            }
        } catch (SQLException e) {
            String message = e.getMessage();
            // An exception without a message cannot match the pattern, so it is always logged.
            if (message == null || !CLOSED_POOL_PATTERN.matcher(message).find()) {
                LOG.error("HopsAclAuthorizer could not query database", e);
            }
            clearAclCache();
        }
    }

    /** Returns whether the principal is one of the configured super users. */
    private boolean isSuperUser(KafkaPrincipal kafkaPrincipal) {
        if (this.superUsers.contains(kafkaPrincipal)) {
            LOG.debug("principal = " + kafkaPrincipal + " is a super user, allowing operation without checking acls.");
            return true;
        }
        LOG.debug("principal = " + kafkaPrincipal + " is not a super user.");
        return false;
    }

    /** No-op: this authorizer does not add ACLs. */
    public void addAcls(scala.collection.immutable.Set<Acl> set, Resource resource) {
    }

    /** No-op: this authorizer does not remove ACLs; always returns {@code false}. */
    public boolean removeAcls(scala.collection.immutable.Set<Acl> set, Resource resource) {
        return false;
    }

    /** No-op: this authorizer does not remove ACLs; always returns {@code false}. */
    public boolean removeAcls(Resource resource) {
        return false;
    }

    /** Not implemented: always returns {@code null}. */
    public scala.collection.immutable.Set<Acl> getAcls(Resource resource) {
        return null;
    }

    /** Not implemented: always returns {@code null}. */
    public scala.collection.immutable.Map<Resource, scala.collection.immutable.Set<Acl>> getAcls(KafkaPrincipal kafkaPrincipal) {
        return null;
    }

    /** Not implemented: always returns {@code null}. */
    public scala.collection.immutable.Map<Resource, scala.collection.immutable.Set<Acl>> getAcls() {
        return null;
    }

    /**
     * Stops the background poller and closes the database connection, if either exists.
     * The poller is stopped first so it does not keep querying a connection being closed.
     */
    public void close() {
        stopAclPoller();
        if (this.dbConnection != null) {
            this.dbConnection.close();
        }
    }

    /** Interrupts and discards the background poller, if one was started. */
    private void stopAclPoller() {
        ExecutorService poller = this.aclPoller;
        if (poller != null) {
            this.aclPoller = null;
            poller.shutdownNow();
        }
    }

    /**
     * Replaces the set of super users.
     *
     * @param set principals that bypass ACL checks
     */
    public void setSuperUsers(Set<KafkaPrincipal> set) {
        this.superUsers = set;
    }

    /**
     * Replaces the database connection used to load ACLs.
     *
     * @param dbConnection connection to use for subsequent cache refreshes
     */
    public void setDbConnection(DbConnection dbConnection) {
        this.dbConnection = dbConnection;
    }
}
