Database Permission Management

Original: Big Data Platform Permission Design - HDFS Directory Permissions

2019-05-15  无色的叶

Permission Goals

Through a management UI, an administrator can add, modify, and assign the permissions a user holds on an HDFS directory. HDFS directory permissions come in three types: read, write, and execute.

Implementation Approach

The core idea is to intercept each HDFS directory operation before it runs: determine the type of operation and the current user, then check whether that user is authorized, throwing an access-control exception if not. Walking through the HDFS permission-check source code shows that the real check is performed by the checkPermission method of the AccessControlEnforcer interface, which is nested inside the abstract class INodeAttributeProvider configured via the dfs.namenode.inode.attributes.provider.class property.
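For reference, a custom provider is registered with the NameNode through that property in hdfs-site.xml. A minimal sketch, assuming the package and class name used in the implementation below (the NameNode must be restarted for it to take effect):

<!-- hdfs-site.xml: register the custom INodeAttributeProvider on the NameNode -->
<property>
  <name>dfs.namenode.inode.attributes.provider.class</name>
  <value>org.apache.hadoop.hdfs.server.namenode.permissions.check.MyPermissionCheck</value>
</property>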

package org.apache.hadoop.hdfs.server.namenode.permissions.check;

import com.google.common.collect.Sets;
import org.apache.common.permissions.model.CheckPermissionModel;
import org.apache.common.permissions.model.ClusterType;
import org.apache.common.permissions.model.RequestType;
import org.apache.common.permissions.model.Result;
import org.apache.common.permissions.utils.HttpTools;
import org.apache.common.permissions.utils.JacksonTools;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeAttributeProvider;
import org.apache.hadoop.hdfs.server.namenode.INodeAttributes;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;

import java.util.*;

public class MyPermissionCheck extends INodeAttributeProvider {

    static final Log log = LogFactory.getLog(MyPermissionCheck.class);

    private static final String SUPER_USER = "hadoop";

    private static final String REGEX_CHAR = "&";

    /**
     * Status code indicating failure
     */
    public final static String ZERO = "0";

    /**
     * Status code indicating success
     */
    public final static String ONE = "1";

    private static String checkUrl;
    private static String clusterType;

    private Map<FsAction, Set<String>> access2ActionListMapper = new HashMap<FsAction, Set<String>>();

    @Override
    public void start() {
        Configuration configuration = new Configuration();
        //checkUrl: address of the backend permission-check service
        checkUrl = configuration.get(PermissionsConstants.PERMISSIONS_CHECK_URL);
        //cluster type
        clusterType = configuration.get(PermissionsConstants.CLUSTER_TYPE);
        if (log.isDebugEnabled()) {
            log.debug("------------");
            log.debug("----checkUrl------" + checkUrl);
            log.debug("------------");
        }

        access2ActionListMapper.put(FsAction.NONE, new HashSet<String>());
        access2ActionListMapper.put(FsAction.ALL, Sets.newHashSet(PermissionsConstants.READ_ACCESS_TYPE, PermissionsConstants.WRITE_ACCESS_TYPE, PermissionsConstants.EXECUTE_ACCESS_TYPE));
        access2ActionListMapper.put(FsAction.READ, Sets.newHashSet(PermissionsConstants.READ_ACCESS_TYPE));
        access2ActionListMapper.put(FsAction.READ_WRITE, Sets.newHashSet(PermissionsConstants.READ_ACCESS_TYPE, PermissionsConstants.WRITE_ACCESS_TYPE));
        access2ActionListMapper.put(FsAction.READ_EXECUTE, Sets.newHashSet(PermissionsConstants.READ_ACCESS_TYPE, PermissionsConstants.EXECUTE_ACCESS_TYPE));
        access2ActionListMapper.put(FsAction.WRITE, Sets.newHashSet(PermissionsConstants.WRITE_ACCESS_TYPE));
        access2ActionListMapper.put(FsAction.WRITE_EXECUTE, Sets.newHashSet(PermissionsConstants.WRITE_ACCESS_TYPE, PermissionsConstants.EXECUTE_ACCESS_TYPE));
        access2ActionListMapper.put(FsAction.EXECUTE, Sets.newHashSet(PermissionsConstants.EXECUTE_ACCESS_TYPE));

    }

    @Override
    public void stop() {
        log.info("------MyPermissionCheck----stop-----");
    }

    @Override
    public INodeAttributes getAttributes(String[] pathElements, INodeAttributes inode) {
        if (log.isDebugEnabled()) {
            log.debug("<== MyPermissionCheck.getAttributes(pathElementsCount=" + (pathElements == null ? 0 : pathElements.length) + "): " + inode);
        }
        return inode;
    }

    @Override
    public INodeAttributes getAttributes(String fullPath, INodeAttributes inode) {
        if (log.isDebugEnabled()) {
            log.debug("<== MyPermissionCheck.getAttributes(" + fullPath + "): " + inode);
        }
        return inode;
    }

    @Override
    public AccessControlEnforcer getExternalAccessControlEnforcer(AccessControlEnforcer defaultEnforcer) {
        if (log.isDebugEnabled()) {
            log.debug("==> MyPermissionCheck.getExternalAccessControlEnforcer()");
        }
        MyPermissionsControlEnforcer permissionsControlEnforcer = new MyPermissionsControlEnforcer();
        return permissionsControlEnforcer;
    }

    private enum AuthzStatus {ALLOW, DENY, NOT_DETERMINED}

    class MyPermissionsControlEnforcer implements AccessControlEnforcer {

        public MyPermissionsControlEnforcer() {

        }

        /**
         * Check whether the current user has permission to access the path.
         * Traverse is always checked.
         * <p>
         * Parent path means the parent directory for the path.
         * Ancestor path means the last (the closest) existing ancestor directory
         * of the path.
         * Note that if the parent path exists,
         * then the parent path and the ancestor path are the same.
         * <p>
         * For example, suppose the path is "/foo/bar/baz".
         * Whether baz is a file or a directory,
         * the parent path is "/foo/bar".
         * If bar exists, then the ancestor path is also "/foo/bar".
         * If bar does not exist and foo exists,
         * then the ancestor path is "/foo".
         * Further, if both foo and bar do not exist,
         * then the ancestor path is "/".
         *
         * @param doCheckOwner   Require user to be the owner of the path?
         * @param ancestorAccess The access required by the ancestor of the path.
         * @param parentAccess   The access required by the parent of the path.
         * @param access         The access required by the path.
         * @param subAccess      If path is a directory,
         *                       it is the access required of the path and all the sub-directories.
         *                       If path is not a directory, there is no effect.
         * @param ignoreEmptyDir Ignore permission checking for empty directory?
         * @throws AccessControlException
         */

        @Override
        public void checkPermission(String fsOwner, String superGroup, UserGroupInformation ugi, INodeAttributes[] inodeAttrs,
                                    INode[] inodes, byte[][] pathByNameArr, int snapshotId, String path,
                                    int ancestorIndex, boolean doCheckOwner, FsAction ancestorAccess,
                                    FsAction parentAccess, FsAction access, FsAction subAccess, boolean ignoreEmptyDir)
                throws AccessControlException {

            String userName = ugi != null ? ugi.getUserName() : null;
            if (log.isDebugEnabled()) {
                log.debug("==> MyPermissionsControlEnforcer.checkPermission("
                        + "fsOwner=" + fsOwner + "; superGroup=" + superGroup
                        + ", inodesCount=" + (inodes != null ? inodes.length : 0)
                        + ", snapshotId=" + snapshotId + ", userName=" + userName
                        + ", provided-path=" + path + ", ancestorIndex=" + ancestorIndex
                        + ", doCheckOwner=" + doCheckOwner + ", ancestorAccess=" + ancestorAccess
                        + ", parentAccess=" + parentAccess + ", access=" + access + ", subAccess="
                        + subAccess + ", ignoreEmptyDir=" + ignoreEmptyDir + ")");
            }

            if (userName == null || userName.isEmpty()) {
                throw new AccessControlException("callerUgi.getUserName() is null or empty");
            }

            String checkUserName = userName;
            if (SUPER_USER.equals(checkUserName)) {
                log.info("userName: " + checkUserName + " is super user ,has all permissions");
                return;
            }

            //validate checkUserName's permission on the target directory
            AuthzStatus authzStatus = AuthzStatus.NOT_DETERMINED;
            boolean isTraverseOnlyCheck = (access == null) && (parentAccess == null) && (ancestorAccess == null) && (subAccess == null);
            INode ancestor = null;
            INode parent = null;
            INode inode = null;
            if (!ArrayUtils.isEmpty(inodes)) {
                if (ancestorIndex >= inodes.length) {
                    ancestorIndex = inodes.length - 1;
                }
                while ((ancestorIndex >= 0) && (inodes[ancestorIndex] == null)) {
                    ancestorIndex--;
                }
                authzStatus = AuthzStatus.ALLOW;

                ancestor = (inodes.length > ancestorIndex) && (ancestorIndex >= 0) ? inodes[ancestorIndex] : null;
                parent = inodes.length > 1 ? inodes[(inodes.length - 2)] : null;
                inode = inodes[(inodes.length - 1)];

                if (isTraverseOnlyCheck) {
                    INode nodeToCheck = inode;
                    INodeAttributes nodeAttribs = inodeAttrs.length > 0 ? inodeAttrs[(inodeAttrs.length - 1)] : null;
                    if ((nodeToCheck == null) || (nodeToCheck.isFile())) {
                        if (parent != null) {
                            nodeToCheck = parent;
                            nodeAttribs = inodeAttrs.length > 1 ? inodeAttrs[(inodeAttrs.length - 2)] : null;
                        } else if (ancestor != null) {
                            nodeToCheck = ancestor;
                            nodeAttribs = inodeAttrs.length > ancestorIndex ? inodeAttrs[ancestorIndex] : null;
                        }
                    }
                    if (nodeToCheck != null) {
                        authzStatus = isAccessAllowed(nodeToCheck, nodeAttribs, FsAction.EXECUTE, checkUserName);
                    }
                }

                // checkStickyBit
                if ((authzStatus == AuthzStatus.ALLOW) && (parentAccess != null) && (parentAccess.implies(FsAction.WRITE)) && (parent != null) && (inode != null) &&
                        (parent.getFsPermission() != null) && (parent.getFsPermission().getStickyBit())) {
                    authzStatus = (StringUtils.equals(parent.getUserName(), checkUserName)) || (StringUtils.equals(inode.getUserName(), checkUserName)) ? AuthzStatus.ALLOW : AuthzStatus.NOT_DETERMINED;
                }

                // checkAncestorAccess
                if ((authzStatus == AuthzStatus.ALLOW) && (ancestorAccess != null) && (ancestor != null)) {
                    INodeAttributes ancestorAttribs = inodeAttrs.length > ancestorIndex ? inodeAttrs[ancestorIndex] : null;
                    authzStatus = isAccessAllowed(ancestor, ancestorAttribs, ancestorAccess, checkUserName);
                }

                // checkParentAccess
                if ((authzStatus == AuthzStatus.ALLOW) && (parentAccess != null) && (parent != null)) {
                    INodeAttributes parentAttribs = inodeAttrs.length > 1 ? inodeAttrs[(inodeAttrs.length - 2)] : null;
                    authzStatus = isAccessAllowed(parent, parentAttribs, parentAccess, checkUserName);
                }

                // checkINodeAccess
                if ((authzStatus == AuthzStatus.ALLOW) && (access != null) && (inode != null)) {
                    INodeAttributes inodeAttribs = inodeAttrs.length > 0 ? inodeAttrs[(inodeAttrs.length - 1)] : null;
                    authzStatus = isAccessAllowed(inode, inodeAttribs, access, checkUserName);
                }

                // checkSubAccess
                if ((authzStatus == AuthzStatus.ALLOW) && (subAccess != null) && (inode != null) && (inode.isDirectory())) {
                    Stack<INodeDirectory> directories = new Stack<>();
                    for (directories.push(inode.asDirectory()); !directories.isEmpty(); ) {
                        INodeDirectory dir = directories.pop();
                        ReadOnlyList<INode> cList = dir.getChildrenList(snapshotId);
                        if ((!cList.isEmpty()) || (!ignoreEmptyDir)) {
                            INodeAttributes dirAttribs = dir.getSnapshotINode(snapshotId);
                            authzStatus = isAccessAllowed(dir, dirAttribs, subAccess, checkUserName);
                            if (authzStatus != AuthzStatus.ALLOW) {
                                break;
                            }
                        }
                        for (INode child : cList) {
                            if (child.isDirectory()) {
                                directories.push(child.asDirectory());
                            }
                        }
                    }

                }

                // checkOwnerAccess
                if ((authzStatus == AuthzStatus.ALLOW) && (doCheckOwner)) {
                    INodeAttributes inodeAttribs = inodeAttrs.length > 0 ? inodeAttrs[(inodeAttrs.length - 1)] : null;
                    String owner = inodeAttribs != null ? inodeAttribs.getUserName() : null;
                    authzStatus = StringUtils.equals(checkUserName, owner) ? AuthzStatus.ALLOW : AuthzStatus.NOT_DETERMINED;
                }
            }


            if (authzStatus != AuthzStatus.ALLOW) {
                FsAction action = access;
                if (action == null) {
                    if (parentAccess != null) {
                        action = parentAccess;
                    } else if (ancestorAccess != null) {
                        action = ancestorAccess;
                    } else {
                        action = FsAction.EXECUTE;
                    }
                }
                throw new AccessControlException("Permission denied: user=" + checkUserName + ", access=" + action + ", inode=\"" + path + "\"");
            }


        }


        private AuthzStatus isAccessAllowed(INode inode, INodeAttributes inodeAttribs, FsAction access, String user) throws AccessControlException {
            String path = inode != null ? inode.getFullPathName() : null;

            if (PermissionsConstants.HDFS_ROOT_FOLDER_PATH_ALT.equals(path)) {
                path = PermissionsConstants.HDFS_ROOT_FOLDER_PATH;
            }

            if (log.isDebugEnabled()) {
                log.debug("==> MyPermissionsControlEnforcer.isAccessAllowed(" + path + ", " + access + ", " + user + ")");
            }

            Set<String> accessTypes = access2ActionListMapper.get(access);

            if (accessTypes == null) {
                log.warn("MyPermissionsControlEnforcer.isAccessAllowed(" + path + ", " + access + ", " + user + "): no accessType found for " + access);
                accessTypes = access2ActionListMapper.get(FsAction.NONE);
            }

            //send a request to the external service to check this operation's permission
            CheckPermissionModel checkPermissionModel = new CheckPermissionModel();
            checkPermissionModel.setClusterType(ClusterType.valueOf(clusterType));
            checkPermissionModel.setRequestType(RequestType.HDFS_REQUEST);
            checkPermissionModel.setUserName(user);
            checkPermissionModel.setCommand(path);
            checkPermissionModel.setOperationTypes(accessTypes);
            try {
                String post = HttpTools.sendPost(checkUrl, JacksonTools.obj2json(checkPermissionModel));
                Result result = JacksonTools.json2pojo(post, Result.class);
                String status = result.getStatus();

                if (ZERO.equals(status)) {
                    log.info(JacksonTools.obj2json(result));
                    return AuthzStatus.DENY;
                }

                Object obj = result.getObj();
                CheckPermissionModel model = JacksonTools.json2pojo(JacksonTools.obj2json(obj), CheckPermissionModel.class);
                log.info("check permissions result: " + JacksonTools.obj2json(obj));
                if (model.isValidateFlag()) {
                    return AuthzStatus.ALLOW;
                }

            } catch (Exception e) {
                log.error(e);
                throw new AccessControlException("validate permissions is has error");
            }

            return AuthzStatus.DENY;
        }

    }

}
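The PermissionsConstants class referenced throughout is not shown in the original; the following is a minimal sketch of what it presumably contains, and every key name and value in it is an assumption:

// Hypothetical reconstruction of PermissionsConstants; the config key names
// and access-type tokens are assumptions, not taken from the original source.
public final class PermissionsConstants {

    // hdfs-site.xml keys read in MyPermissionCheck.start()
    public static final String PERMISSIONS_CHECK_URL = "permissions.check.url";
    public static final String CLUSTER_TYPE = "permissions.cluster.type";

    // access-type tokens sent to the external permission-check service
    public static final String READ_ACCESS_TYPE = "read";
    public static final String WRITE_ACCESS_TYPE = "write";
    public static final String EXECUTE_ACCESS_TYPE = "execute";

    // alternate spelling of the HDFS root path seen in isAccessAllowed(),
    // normalized there to "/"; the actual value here is a guess
    public static final String HDFS_ROOT_FOLDER_PATH_ALT = "";
    public static final String HDFS_ROOT_FOLDER_PATH = "/";

    private PermissionsConstants() {
    }
}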

The custom MyPermissionCheck class extends INodeAttributeProvider, and its inner MyPermissionsControlEnforcer class implements the AccessControlEnforcer interface to carry out the permission check. For reference, here is the INodeAttributeProvider base class:

public abstract class INodeAttributeProvider {

  /**
   * The AccessControlEnforcer allows implementations to override the
   * default File System permission checking logic enforced on a file system
   * object
   */
  public interface AccessControlEnforcer {

    /**
     * Checks permission on a file system object. Has to throw an Exception
     * if the filesystem object is not accessible by the calling Ugi.
     * @param fsOwner Filesystem owner (The Namenode user)
     * @param supergroup super user group
     * @param callerUgi UserGroupInformation of the caller
     * @param inodeAttrs Array of INode attributes for each path element in
     *                   the path
     * @param inodes Array of INodes for each path element in the path
     * @param pathByNameArr Array of byte arrays of the LocalName
     * @param snapshotId the snapshotId of the requested path
     * @param path Path String
     * @param ancestorIndex Index of ancestor
     * @param doCheckOwner perform ownership check
     * @param ancestorAccess The access required by the ancestor of the path.
     * @param parentAccess The access required by the parent of the path.
     * @param access The access required by the path.
     * @param subAccess If path is a directory, It is the access required of
     *                  the path and all the sub-directories. If path is not a
     *                  directory, there should ideally be no effect.
     * @param ignoreEmptyDir Ignore permission checking for empty directory?
     * @throws AccessControlException
     */
    public abstract void checkPermission(String fsOwner, String supergroup,
        UserGroupInformation callerUgi, INodeAttributes[] inodeAttrs,
        INode[] inodes, byte[][] pathByNameArr, int snapshotId, String path,
        int ancestorIndex, boolean doCheckOwner, FsAction ancestorAccess,
        FsAction parentAccess, FsAction access, FsAction subAccess,
        boolean ignoreEmptyDir)
            throws AccessControlException;

  }
  /**
   * Initialize the provider. This method is called at NameNode startup
   * time.
   */
  public abstract void start();

  /**
   * Shutdown the provider. This method is called at NameNode shutdown time.
   */
  public abstract void stop();

  @Deprecated
  String[] getPathElements(String path) {
    path = path.trim();
    if (path.charAt(0) != Path.SEPARATOR_CHAR) {
      throw new IllegalArgumentException("It must be an absolute path: " +
          path);
    }
    int numOfElements = StringUtils.countMatches(path, Path.SEPARATOR);
    if (path.length() > 1 && path.endsWith(Path.SEPARATOR)) {
      numOfElements--;
    }
    String[] pathElements = new String[numOfElements];
    int elementIdx = 0;
    int idx = 0;
    int found = path.indexOf(Path.SEPARATOR_CHAR, idx);
    while (found > -1) {
      if (found > idx) {
        pathElements[elementIdx++] = path.substring(idx, found);
      }
      idx = found + 1;
      found = path.indexOf(Path.SEPARATOR_CHAR, idx);
    }
    if (idx < path.length()) {
      pathElements[elementIdx] = path.substring(idx);
    }
    return pathElements;
  }

  @Deprecated
  public INodeAttributes getAttributes(String fullPath, INodeAttributes inode) {
    return getAttributes(getPathElements(fullPath), inode);
  }

  public abstract INodeAttributes getAttributes(String[] pathElements,
      INodeAttributes inode);

  public INodeAttributes getAttributes(byte[][] components,
      INodeAttributes inode) {
    String[] elements = new String[components.length];
    for (int i = 0; i < elements.length; i++) {
      elements[i] = DFSUtil.bytes2String(components[i]);
    }
    return getAttributes(elements, inode);
  }

  /**
   * Can be over-ridden by implementations to provide a custom Access Control
   * Enforcer that can provide an alternate implementation of the
   * default permission checking logic.
   * @param defaultEnforcer The Default AccessControlEnforcer
   * @return The AccessControlEnforcer to use
   */
  public AccessControlEnforcer getExternalAccessControlEnforcer(
      AccessControlEnforcer defaultEnforcer) {
    return defaultEnforcer;
  }
}

Before each HDFS directory operation, the permission-check service is invoked to validate the operation. Following the HDFS permission-check flow further, AccessControlEnforcer.checkPermission is invoked from FSDirectory.checkPermission:

 /**
   * Check whether the current user has permission to access the path. For more
   * details of the parameters, see
   * {@link FSPermissionChecker#checkPermission}.
   */
  void checkPermission(FSPermissionChecker pc, INodesInPath iip,
      boolean doCheckOwner, FsAction ancestorAccess, FsAction parentAccess,
      FsAction access, FsAction subAccess, boolean ignoreEmptyDir)
      throws AccessControlException {
    if (!pc.isSuperUser()) {
      readLock();
      try {
        pc.checkPermission(iip, doCheckOwner, ancestorAccess,
            parentAccess, access, subAccess, ignoreEmptyDir);
      } finally {
        readUnlock();
      }
    }
  }

Notice the if (!pc.isSuperUser()) guard around pc.checkPermission: if the current user is a superuser, the permission check is skipped entirely and the user holds all permissions by default. Looking into pc.isSuperUser() shows that a user counts as a superuser when user.equals(fsOwner) || groups.contains(supergroup):

 private final boolean isSuper;
  private final INodeAttributeProvider attributeProvider;


  protected FSPermissionChecker(String fsOwner, String supergroup,
      UserGroupInformation callerUgi,
      INodeAttributeProvider attributeProvider) {
    this.fsOwner = fsOwner;
    this.supergroup = supergroup;
    this.callerUgi = callerUgi;
    this.groups = callerUgi.getGroups();
    user = callerUgi.getShortUserName();
    isSuper = user.equals(fsOwner) || groups.contains(supergroup);
    this.attributeProvider = attributeProvider;
  }

FileSystem.get() constructs a UserGroupInformation (the callerUgi) and runs its commit method, which records the current user, groups, and related login information. If a client sets export HADOOP_USER_NAME=hadoop, or passes the hadoop user in through the Java API, pc.isSuperUser() evaluates to true and no permission check is performed. In other words, anyone who learns which Linux user runs the HDFS processes can bypass the permission check and obtain full cluster privileges, which is a real security risk.
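To make the risk concrete, here is a minimal sketch of how a client on an insecure (non-Kerberos) cluster could impersonate the superuser; the NameNode URI and the target path are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ImpersonationDemo {
    public static void main(String[] args) throws Exception {
        // With security disabled, commit() falls back to HADOOP_USER_NAME
        // (environment variable or system property) to decide who the caller is.
        System.setProperty("HADOOP_USER_NAME", "hadoop"); // the HDFS process user
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://namenode:8020"); // placeholder NameNode URI
        FileSystem fs = FileSystem.get(conf);
        // The NameNode now treats the caller as the superuser, so the permission
        // check is skipped and this delete succeeds regardless of directory ACLs.
        fs.delete(new Path("/some/protected/dir"), true); // placeholder path
    }
}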
To address this, per our own permission requirements, we add a user-login check inside the commit method:

@Override
    public boolean commit() throws LoginException {
      if (LOG.isDebugEnabled()) {
        LOG.debug("hadoop login commit");
      }
      // if we already have a user, we are done.
      if (!subject.getPrincipals(User.class).isEmpty()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("using existing subject:"+subject.getPrincipals());
        }
        return true;
      }
      Principal user = getCanonicalUser(KerberosPrincipal.class);
      if (user != null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("using kerberos user:"+user);
        }
      }
      //If we don't have a kerberos user and security is disabled, check
      //if user is specified in the environment or properties
      if (!isSecurityEnabled() && (user == null)) {
        String envUser = System.getenv(HADOOP_USER_NAME);
        if (envUser == null) {
          envUser = System.getProperty(HADOOP_USER_NAME);
        }
        user = envUser == null ? null : new User(envUser);
      }
      // use the OS user
      if (user == null) {
        user = getCanonicalUser(OS_PRINCIPAL_CLASS);
        if (LOG.isDebugEnabled()) {
          LOG.debug("using local user:"+user);
        }
      }

      //get the OS login user name
      String osLoginName = getCanonicalUser(OS_PRINCIPAL_CLASS).getName();
      if (!SUPER_NAME.equals(osLoginName)) {
        if (user != null && SUPER_NAME.equals(user.getName())) {
          LOG.error("Not allowed to use 'export HADOOP_USER_NAME=hadoop' way");
          throw new LoginException("Not allowed to use 'export HADOOP_USER_NAME=hadoop' way to execute any operation");
        }
      }

      // if we found the user, add our principal
      if (user != null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Using user: \"" + user + "\" with name " + user.getName());
        }

        User userEntry = null;
        try {
          // LoginContext will be attached later unless it's an external
          // subject.
          AuthenticationMethod authMethod = (user instanceof KerberosPrincipal)
            ? AuthenticationMethod.KERBEROS : AuthenticationMethod.SIMPLE;
          userEntry = new User(user.getName(), authMethod, null);
        } catch (Exception e) {
          throw (LoginException)(new LoginException(e.toString()).initCause(e));
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("User entry: \"" + userEntry.toString() + "\"" );
        }

        subject.getPrincipals().add(userEntry);
        return true;
      }
      LOG.error("Can't find user in " + subject);
      throw new LoginException("Can't find user name");
    }

This addition forbids anyone who is not logged in to the cluster as the HDFS process user from performing any operation via export HADOOP_USER_NAME=hadoop (the user that runs the HDFS processes); if such an attempt is detected, a LoginException is thrown:

//get the OS login user name
      String osLoginName = getCanonicalUser(OS_PRINCIPAL_CLASS).getName();
      if (!SUPER_NAME.equals(osLoginName)) {
        if (user != null && SUPER_NAME.equals(user.getName())) {
          LOG.error("Not allowed to use 'export HADOOP_USER_NAME=hadoop' way");
          throw new LoginException("Not allowed to use 'export HADOOP_USER_NAME=hadoop' way to execute any operation");
        }
      }

However, forbidding the HDFS process user from executing operations remotely (via the Java API) outright would break the interaction between DataNodes and the NameNode, as well as communication between the active and standby NameNodes. Instead, we modify the getRemoteUser method in the NameNode class to check whether the remote IP belongs to a DataNode or a NameNode: if so, the caller is trusted with full privileges; if the request comes from outside the cluster and the executing user is hadoop, an exception is thrown:

/* optimize ugi lookup for RPC operations to avoid a trip through
     * UGI.getCurrentUser which is synch'ed
     */
    public static UserGroupInformation getRemoteUser() throws IOException {
        UserGroupInformation ugi = Server.getRemoteUser();

        if (Server.getRemoteIp() != null) {
            boolean isTrust = false;
            String hostName = Server.getRemoteIp().getHostName();

            getTrustHosts();

            //does the request come from a DataNode?
            if (slaves.contains(hostName)) {
                isTrust = true;
            }

            //does the request come from a NameNode?
            if (namenodeIps.contains(hostName)) {
                isTrust = true;
            }

            if (!isTrust) {

                String userName;
                if (ugi != null) {
                    userName = ugi.getUserName();
                } else {
                    userName = UserGroupInformation.getCurrentUser().getUserName();
                }

                //was the request submitted as the hadoop user?
                if (userName.equals("hadoop")) {
                    LOG.error("hdfs does not allow ip: " + hostName + " to execute commands as user " + userName);
                    throw new IOException("hdfs does not allow ip: " + hostName + " to execute commands as user " + userName);
                }
            }
        }
        return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser();
    }

    private static void getTrustHosts() throws IOException {
        if (slaves == null || slaves.isEmpty() || namenodeIps == null) {
            loadTrustHosts();
        }

        long diff = System.currentTimeMillis() - lastLoadTrustHostsNamesTime;
        if (diff > 60000) {
            if (slaves != null) {
                slaves.clear();
            }
            LOG.info("agin to load trust host");
            loadTrustHosts();
            lastLoadTrustHostsNamesTime = System.currentTimeMillis();
        }
    }

    private static void loadTrustHosts() throws IOException {
        Configuration configuration = new Configuration();
        String hadoopHome = configuration.get(HADOOP_HOME_CONFIG_KEY);
        slaves = Files.readAllLines(Paths.get(hadoopHome + "/workers"));
        namenodeIps = configuration.get(HADOOP_NAMENODE_IP_KEY);
    }
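The getRemoteUser/getTrustHosts/loadTrustHosts snippet above references several fields and configuration keys that are not declared in the excerpt. Here is a sketch of the assumed declarations; none of these names or values come from the original source:

// Assumed declarations backing the snippet above; the config key names and
// values are illustrative only. Requires java.util.List plus the
// java.nio.file.Files/Paths imports used in loadTrustHosts.
private static List<String> slaves;               // DataNode host names, read from $HADOOP_HOME/workers
private static String namenodeIps;                // comma-separated NameNode hosts from configuration
private static long lastLoadTrustHostsNamesTime;  // millis timestamp of the last trust-host reload

private static final String HADOOP_HOME_CONFIG_KEY = "hadoop.home.dir";        // assumed key
private static final String HADOOP_NAMENODE_IP_KEY = "dfs.namenode.trust.ips"; // hypothetical key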