AM注册与取消注册
2023-07-23 本文已影响0人
OkGogogooo
1. 背景
在笔者开发的大数据平台(XSailboat)中,包含一个任务开发、调度、监控的模块,叫XTaskWorks。这里的任务是使用Java,在XTaskFrame框架之上开发的数据集成、数据计算、数据分析等类型的任务,并且在通用的任务框架基础之上,抽象出可视化编辑的ETL工具。所以任务数量是很多的,通常在一个项目中能达到几百的规模。每个任务可以视为一个Java进程(有一些调度频率不高的任务,出于节约内存的目的,也会合并到一个进程中)。
为了维护、部署、更新方便,笔者基于YARN的容器化和AM 架构,开发了任务引擎。任务引擎并不是一个ApplicationMaster、多个Application的模式,而是每一个任务进程都是一个ApplicationMater,主要原因是:
- XTaskFrame是先于大数据平台XSailboat的产品,它本身已经有了任务注册和管理服务(类似于AM),注册和管理服务不存在,不会影响任务的正常调度执行,只是暂时暂时失去了从界面管控任务的能力。
- 如果采用一个AM,多个APP的模式,一旦AM因为某些原因,停止了,与它相关的几百个任务可能会因为失去了AM,而被回收,即使没有回收,AM重启后也需要在YARN的框架下重新将这些容器纳入到新AM的管理之下,这会比较复杂。
笔者的处理方法是:
- 在原来的任务注册和管理服务基础之上,增加了容器引擎的扩展模块。
- 通过任务注册和管理服务实现定义任务,一键启停任务。启动任务会在YARN上以AM形式运行起来。
- 任务启动成功后,会向任务注册和管理服务注册。
2. 出现的问题
- 任务进程运行起来以后,一直没有达到RUNNING状态。
- 停止进程之后,YARN又会自动启动一次。
3. 原因:
运行起来的AM需要主动连接RM,进行注册,发送心跳和取消注册。提供下面代码做参考:
mLogger.info("开始启动 ApplicationMaster ...");
Map<String, String> envMap = System.getenv() ;
String ctnIdString = envMap.get(ApplicationConstants.Environment.CONTAINER_ID.key());
mLogger.info("容器ID:{}", ctnIdString);
mLogger.info("JAVA_HOME:{}", envMap.get(ApplicationConstants.Environment.JAVA_HOME.key()));
try
{
ContainerId ctnId = ContainerId.fromString(ctnIdString);
ApplicationAttemptId attempId = ctnId.getApplicationAttemptId();
mAmCtx.setApplicationAttemptId(attempId) ;
RMCallbackHandler rmCall = new RMCallbackHandler(mAmCtx);
mAMRMClient = AMRMClientAsync.createAMRMClientAsync(1000, rmCall);
Configuration conf = mAmCtx.getHadoopConf() ;
Assert.notNull(conf) ;
conf = new Configuration(conf) ;
conf.set(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS , Integer.toString(YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS));
mAMRMClient.init(conf);
mUgf = UserGroupInformation.getCurrentUser();
mUgf.addToken(mAmCtx.getAMRMToken()) ;
mUgf.doAs((PrivilegedExceptionAction<Void>) () -> {
try
{
mAMRMClient.start();
}
catch(Exception e)
{
mLogger.error("启动AMRMClientAsync出现异常,异常消息:{}" , ExceptionAssist.getStackTrace(e)) ;
System.exit(0) ;
}
return null;
}) ;
}
catch (IOException | InterruptedException e)
{
mLogger.error("安全执行出现异常,异常消息:{}" , ExceptionAssist.getStackTrace(e)) ;
System.exit(0) ;
}
// AM必须向RM发出心跳信号,以便让它知道AM已存在且仍在运行。
// RM的超时到期间隔由可通过YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS访问的配置设置定义,
// 默认值由YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS定义。
// ApplicationMaster需要使用ResourceManager注册自己以开始心跳。
String hostName = NetUtils.getHostname();
try
{
// 服务地址
String serviceAddr = "http://"+XNet.getPreferedIpv4()+":"
+ MSApp.instance().getHttpPort() ;
//注册服务地址
RegisterApplicationMasterResponse resp = mAMRMClient.registerApplicationMaster(hostName
, 0 , serviceAddr);
long maxMem = resp.getMaximumResourceCapability().getMemorySize();
mLogger.info("集群资源分配的最大内存是:{}MB" , maxMem);
int maxVCores = resp.getMaximumResourceCapability().getVirtualCores();
mLogger.info("集群资源分配的最大虚拟核数是:{}个 " , maxVCores);
mLogger.info("AM启动完成。");
// 取消注册
Runnable stopPerformer = ()->{
try
{
mAMRMClient.unregisterApplicationMaster(FinalApplicationStatus.ENDED , "用户主动关闭应用。", "N/A");
}
catch (YarnException | IOException e)
{
e.printStackTrace();
}
} ;
// 任务停止时,取消注册。
MSApp.instance().withStopPerformer(stopPerformer) ;
// ShutdownHook形式不可靠
// Runtime.getRuntime().addShutdownHook(new Thread(stopPerformer)) ;
}
catch (YarnException | IOException e)
{
mLogger.error("AM运行过程中出现异常。异常消息:"+ExceptionAssist.getStackTrace(e)) ;
}