大数据平台技术笔记

AM注册与取消注册

2023-07-23  本文已影响0人  OkGogogooo

1. 背景

  在笔者开发的大数据平台(XSailboat)中,包含一个任务开发、调度、监控的模块,叫XTaskWorks。这里的任务是使用Java,在XTaskFrame框架之上开发的数据集成、数据计算、数据分析等类型的任务,并且在通用的任务框架基础之上,抽象出可视化编辑的ETL工具。所以任务数量是很多的,通常在一个项目中能达到几百的规模。每个任务可以视为一个Java进程(有一些调度频率不高的任务,出于节约内存的目的,也会合并到一个进程中)。
  为了维护、部署、更新方便,笔者基于YARN的容器化和AM 架构,开发了任务引擎。任务引擎并不是一个ApplicationMaster、多个Application的模式,而是每一个任务进程都是一个ApplicationMater,主要原因是:

  1. XTaskFrame是先于大数据平台XSailboat的产品,它本身已经有了任务注册和管理服务(类似于AM),注册和管理服务不存在,不会影响任务的正常调度执行,只是暂时暂时失去了从界面管控任务的能力。
  2. 如果采用一个AM,多个APP的模式,一旦AM因为某些原因,停止了,与它相关的几百个任务可能会因为失去了AM,而被回收,即使没有回收,AM重启后也需要在YARN的框架下重新将这些容器纳入到新AM的管理之下,这会比较复杂。

  笔者的处理方法是:

  1. 在原来的任务注册和管理服务基础之上,增加了容器引擎的扩展模块。
  2. 通过任务注册和管理服务实现定义任务,一键启停任务。启动任务会在YARN上以AM形式运行起来。
  3. 任务启动成功后,会向任务注册和管理服务注册。

2. 出现的问题

  1. 任务进程运行起来以后,一直没有达到RUNNING状态。
  2. 停止进程之后,YARN又会自动启动一次。

3. 原因:

  运行起来的AM需要主动连接RM,进行注册,发送心跳和取消注册。提供下面代码做参考:

        mLogger.info("开始启动 ApplicationMaster ...");

        Map<String, String> envMap = System.getenv() ;
        String ctnIdString = envMap.get(ApplicationConstants.Environment.CONTAINER_ID.key());
        mLogger.info("容器ID:{}", ctnIdString);
        mLogger.info("JAVA_HOME:{}", envMap.get(ApplicationConstants.Environment.JAVA_HOME.key()));
        try
        {
            ContainerId ctnId = ContainerId.fromString(ctnIdString);
            ApplicationAttemptId attempId = ctnId.getApplicationAttemptId();
            mAmCtx.setApplicationAttemptId(attempId) ;
    
            RMCallbackHandler rmCall = new RMCallbackHandler(mAmCtx);
    
            mAMRMClient = AMRMClientAsync.createAMRMClientAsync(1000, rmCall);
            Configuration conf = mAmCtx.getHadoopConf() ;
            Assert.notNull(conf) ;
            conf = new Configuration(conf) ;
            conf.set(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS , Integer.toString(YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS));
            mAMRMClient.init(conf);
        
        
            mUgf = UserGroupInformation.getCurrentUser();
            mUgf.addToken(mAmCtx.getAMRMToken()) ;
            mUgf.doAs((PrivilegedExceptionAction<Void>) () -> {
                try
                {
                    mAMRMClient.start();
                }
                catch(Exception e)
                {
                    mLogger.error("启动AMRMClientAsync出现异常,异常消息:{}" , ExceptionAssist.getStackTrace(e)) ;
                    System.exit(0) ;
                }
                return null;
            }) ;
        }
        catch (IOException | InterruptedException e)
        {
            mLogger.error("安全执行出现异常,异常消息:{}" , ExceptionAssist.getStackTrace(e)) ;
            System.exit(0) ;
        }
        
        // AM必须向RM发出心跳信号,以便让它知道AM已存在且仍在运行。
        // RM的超时到期间隔由可通过YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS访问的配置设置定义,
        // 默认值由YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS定义。
        // ApplicationMaster需要使用ResourceManager注册自己以开始心跳。
        String hostName = NetUtils.getHostname();
        try
        {
            // 服务地址
            String serviceAddr = "http://"+XNet.getPreferedIpv4()+":"
                    + MSApp.instance().getHttpPort() ;
            //注册服务地址
            RegisterApplicationMasterResponse resp = mAMRMClient.registerApplicationMaster(hostName
                    , 0 , serviceAddr);

            long maxMem = resp.getMaximumResourceCapability().getMemorySize();
            mLogger.info("集群资源分配的最大内存是:{}MB" , maxMem);

            int maxVCores = resp.getMaximumResourceCapability().getVirtualCores();
            mLogger.info("集群资源分配的最大虚拟核数是:{}个 " , maxVCores);
            mLogger.info("AM启动完成。");
            // 取消注册
            Runnable stopPerformer = ()->{
                try
                {
                    mAMRMClient.unregisterApplicationMaster(FinalApplicationStatus.ENDED , "用户主动关闭应用。", "N/A");
                }
                catch (YarnException | IOException e)
                {
                    e.printStackTrace();
                }
            } ;
            // 任务停止时,取消注册。
            MSApp.instance().withStopPerformer(stopPerformer) ;
            // ShutdownHook形式不可靠
//          Runtime.getRuntime().addShutdownHook(new Thread(stopPerformer)) ;
        }
        catch (YarnException | IOException e)
        {
            mLogger.error("AM运行过程中出现异常。异常消息:"+ExceptionAssist.getStackTrace(e)) ;
        }
上一篇下一篇

猜你喜欢

热点阅读