Flink Source Code: The yarn-session Startup Process
Background
This article analyzes how Flink's yarn-session.sh deploys a Flink cluster to YARN.
A YARN session is started with the yarn-session.sh script. The script shows that the entry class is org.apache.flink.yarn.cli.FlinkYarnSessionCli, so the analysis starts there.
FlinkYarnSessionCli
FlinkYarnSessionCli is the command-line tool for interacting with a YARN session. Its main method creates a FlinkYarnSessionCli object and calls its run method.
public static void main(final String[] args) {
// Locate the Flink conf directory
final String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();
// Load flink-conf.yaml from FLINK_CONF_DIR; if FLINK_CONF_DIR is not set, an empty configuration is returned
final Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
int retCode;
try {
// Create the FlinkYarnSessionCli
final FlinkYarnSessionCli cli =
new FlinkYarnSessionCli(
flinkConfiguration,
configurationDirectory,
"",
""); // no prefix for the YARN session
// Install the security setup (authentication, etc.)
SecurityUtils.install(new SecurityConfiguration(flinkConfiguration));
// Execute the run method inside the installed security context
retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.run(args));
} catch (CliArgsException e) {
retCode = handleCliArgsException(e, LOG);
} catch (Throwable t) {
final Throwable strippedThrowable =
ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
retCode = handleError(strippedThrowable, LOG);
}
System.exit(retCode);
}
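The FlinkYarnSessionCli constructor defines the YARN session command-line options (used later to parse the command line) and then checks whether the YARN properties file exists. The four-argument constructor called in main delegates to the full constructor shown below.
The YARN properties file is located in the system temp directory by default, e.g. /tmp/.yarn-properties-<username>. Its content looks like this:
dynamicPropertiesString=
applicationID=application_xxxxxxxxxxxxx_xxxx
It records the application ID of the YARN session most recently submitted by that user.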
public FlinkYarnSessionCli(
Configuration configuration,
ClusterClientServiceLoader clusterClientServiceLoader,
String configurationDirectory,
String shortPrefix,
String longPrefix,
boolean acceptInteractiveInput)
throws FlinkException {
super(configuration, shortPrefix, longPrefix);
this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
this.configurationDirectory = checkNotNull(configurationDirectory);
this.acceptInteractiveInput = acceptInteractiveInput;
// Create the command line options
// The options are built with Apache Commons CLI
query =
new Option(
shortPrefix + "q",
longPrefix + "query",
false,
"Display available YARN resources (memory, cores)");
queue = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
shipPath =
new Option(
shortPrefix + "t",
longPrefix + "ship",
true,
"Ship files in the specified directory (t for transfer)");
flinkJar =
new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
jmMemory =
new Option(
shortPrefix + "jm",
longPrefix + "jobManagerMemory",
true,
"Memory for JobManager Container with optional unit (default: MB)");
tmMemory =
new Option(
shortPrefix + "tm",
longPrefix + "taskManagerMemory",
true,
"Memory per TaskManager Container with optional unit (default: MB)");
slots =
new Option(
shortPrefix + "s",
longPrefix + "slots",
true,
"Number of slots per TaskManager");
dynamicproperties =
Option.builder(shortPrefix + "D")
.argName("property=value")
.numberOfArgs(2)
.valueSeparator()
.desc("use value for given property")
.build();
name =
new Option(
shortPrefix + "nm",
longPrefix + "name",
true,
"Set a custom name for the application on YARN");
applicationType =
new Option(
shortPrefix + "at",
longPrefix + "applicationType",
true,
"Set a custom application type for the application on YARN");
zookeeperNamespace =
new Option(
shortPrefix + "z",
longPrefix + "zookeeperNamespace",
true,
"Namespace to create the Zookeeper sub-paths for high availability mode");
nodeLabel =
new Option(
shortPrefix + "nl",
longPrefix + "nodeLabel",
true,
"Specify YARN node label for the YARN application");
help =
new Option(
shortPrefix + "h",
longPrefix + "help",
false,
"Help for the Yarn session CLI.");
allOptions = new Options();
allOptions.addOption(flinkJar);
allOptions.addOption(jmMemory);
allOptions.addOption(tmMemory);
allOptions.addOption(queue);
allOptions.addOption(query);
allOptions.addOption(shipPath);
allOptions.addOption(slots);
allOptions.addOption(dynamicproperties);
allOptions.addOption(DETACHED_OPTION);
allOptions.addOption(YARN_DETACHED_OPTION);
allOptions.addOption(name);
allOptions.addOption(applicationId);
allOptions.addOption(applicationType);
allOptions.addOption(zookeeperNamespace);
allOptions.addOption(nodeLabel);
allOptions.addOption(help);
// try loading a potential yarn properties file
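// Read the location of the yarn properties file from yarn.properties-file.location
// If it is not configured, the system temp directory is used
// The file stores the application ID of the YARN session submitted by the current user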
this.yarnPropertiesFileLocation =
configuration.getString(YarnConfigOptions.PROPERTIES_FILE_LOCATION);
final File yarnPropertiesLocation = getYarnPropertiesLocation(yarnPropertiesFileLocation);
yarnPropertiesFile = new Properties();
// If the yarn properties file exists
if (yarnPropertiesLocation.exists()) {
// Load the file's properties into yarnPropertiesFile
LOG.info(
"Found Yarn properties file under {}.",
yarnPropertiesLocation.getAbsolutePath());
try (InputStream is = new FileInputStream(yarnPropertiesLocation)) {
yarnPropertiesFile.load(is);
} catch (IOException ioe) {
throw new FlinkException(
"Could not read the Yarn properties file "
+ yarnPropertiesLocation
+ ". Please delete the file at "
+ yarnPropertiesLocation.getAbsolutePath()
+ '.',
ioe);
}
// Read the YARN application ID
final String yarnApplicationIdString =
yarnPropertiesFile.getProperty(YARN_APPLICATION_ID_KEY);
if (yarnApplicationIdString == null) {
throw new FlinkException(
"Yarn properties file found but doesn't contain a "
+ "Yarn application id. Please delete the file at "
+ yarnPropertiesLocation.getAbsolutePath());
}
try {
// try converting id to ApplicationId
// Convert it into an ApplicationId object
yarnApplicationIdFromYarnProperties =
ConverterUtils.toApplicationId(yarnApplicationIdString);
} catch (Exception e) {
throw new FlinkException(
"YARN properties contain an invalid entry for "
+ "application id: "
+ yarnApplicationIdString
+ ". Please delete the file at "
+ yarnPropertiesLocation.getAbsolutePath(),
e);
}
} else {
// If the yarn properties file does not exist, set it to null
yarnApplicationIdFromYarnProperties = null;
}
}
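To make the validation at the end of the constructor concrete, here is a minimal Java sketch that loads the properties file content with java.util.Properties and checks the application ID. The class and method names are hypothetical (this is not Flink code). The regular expression assumes the usual YARN ID layout application_<clusterTimestamp>_<sequenceNumber>, whereas Flink itself delegates the parsing to ConverterUtils.toApplicationId.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Flink) mirroring the constructor's checks.
final class YarnPropertiesReaderSketch {

    // Key of the application ID in the yarn properties file.
    static final String APPLICATION_ID_KEY = "applicationID";

    // Assumed layout: application_<clusterTimestamp>_<sequenceNumber>
    private static final Pattern APP_ID_PATTERN = Pattern.compile("application_(\\d+)_(\\d+)");

    /** Returns {clusterTimestamp, sequenceNumber} parsed from the file content. */
    static long[] readApplicationId(String fileContent) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(fileContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String id = props.getProperty(APPLICATION_ID_KEY);
        if (id == null) {
            // Flink reports this case as a FlinkException asking to delete the file.
            throw new IllegalStateException("No " + APPLICATION_ID_KEY + " entry found");
        }
        Matcher m = APP_ID_PATTERN.matcher(id.trim());
        if (!m.matches()) {
            // Flink reports an invalid entry the same way.
            throw new IllegalArgumentException("Invalid application id: " + id);
        }
        return new long[] {Long.parseLong(m.group(1)), Long.parseLong(m.group(2))};
    }
}
```

Like the constructor, the sketch refuses a file whose application ID is missing or malformed instead of guessing a value.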
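Having covered the constructor, we now turn to the startup logic in the run method. It first parses the command-line arguments and then deploys the cluster to YARN: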
public int run(String[] args) throws CliArgsException, FlinkException {
//
// Command Line Options
//
// Parse the command line
final CommandLine cmd = parseCommandLineOptions(args, true);
// If the help option is present, print usage information
if (cmd.hasOption(help.getOpt())) {
printUsage();
return 0;
}
// Copy the configuration passed in above (flinkConfiguration)
final Configuration effectiveConfiguration = new Configuration(configuration);
// Convert the relevant command-line options into a Configuration
final Configuration commandLineConfiguration = toConfiguration(cmd);
// Merge the two configurations; command-line values override those loaded from flink-conf.yaml
effectiveConfiguration.addAll(commandLineConfiguration);
LOG.debug("Effective configuration: {}", effectiveConfiguration);
final ClusterClientFactory<ApplicationId> yarnClusterClientFactory =
clusterClientServiceLoader.getClusterClientFactory(effectiveConfiguration);
effectiveConfiguration.set(
DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName());
// Create the YARN cluster descriptor
// It holds the YarnClient, the Flink configuration, the YARN configuration, etc.
final YarnClusterDescriptor yarnClusterDescriptor =
(YarnClusterDescriptor)
yarnClusterClientFactory.createClusterDescriptor(effectiveConfiguration);
try {
// Query cluster for metrics
if (cmd.hasOption(query.getOpt())) {
// For the query option, only print the cluster description
final String description = yarnClusterDescriptor.getClusterDescription();
System.out.println(description);
return 0;
} else {
final ClusterClientProvider<ApplicationId> clusterClientProvider;
final ApplicationId yarnApplicationId;
if (cmd.hasOption(applicationId.getOpt())) {
yarnApplicationId =
ConverterUtils.toApplicationId(
cmd.getOptionValue(applicationId.getOpt()));
// Retrieve an existing Flink cluster running on YARN
clusterClientProvider = yarnClusterDescriptor.retrieve(yarnApplicationId);
} else {
// Get the cluster resource specification:
// JobManager memory, TaskManager memory and the number of slots per TaskManager
final ClusterSpecification clusterSpecification =
yarnClusterClientFactory.getClusterSpecification(
effectiveConfiguration);
// Deploy the Flink cluster
clusterClientProvider =
yarnClusterDescriptor.deploySessionCluster(clusterSpecification);
// Get the client for the cluster
ClusterClient<ApplicationId> clusterClient =
clusterClientProvider.getClusterClient();
// ------------------ ClusterClient deployed, handle connection details
// Get the YARN application ID
yarnApplicationId = clusterClient.getClusterId();
try {
// Print the address of the web UI
System.out.println(
"JobManager Web Interface: " + clusterClient.getWebInterfaceURL());
// Write the YARN properties file (analyzed below)
writeYarnPropertiesFile(yarnApplicationId, dynamicPropertiesEncoded);
} catch (Exception e) {
try {
// On failure, close the cluster client
clusterClient.close();
} catch (Exception ex) {
LOG.info("Could not properly shutdown cluster client.", ex);
}
try {
// Kill the YARN application
yarnClusterDescriptor.killCluster(yarnApplicationId);
} catch (FlinkException fe) {
LOG.info("Could not properly terminate the Flink cluster.", fe);
}
throw new FlinkException(
"Could not write the Yarn connection information.", e);
}
}
if (!effectiveConfiguration.getBoolean(DeploymentOptions.ATTACHED)) {
// In detached mode, log information about the detached cluster
YarnClusterDescriptor.logDetachedClusterInformation(yarnApplicationId, LOG);
} else {
ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor();
// Create a YARN application status monitor
// that periodically checks the application state in a separate thread
final YarnApplicationStatusMonitor yarnApplicationStatusMonitor =
new YarnApplicationStatusMonitor(
yarnClusterDescriptor.getYarnClient(),
yarnApplicationId,
new ScheduledExecutorServiceAdapter(scheduledExecutorService));
// Register a shutdown hook that shuts down the cluster when the JVM exits
Thread shutdownHook =
ShutdownHookUtil.addShutdownHook(
() ->
shutdownCluster(
clusterClientProvider.getClusterClient(),
scheduledExecutorService,
yarnApplicationStatusMonitor),
getClass().getSimpleName(),
LOG);
try {
// Run the interactive CLI
runInteractiveCli(yarnApplicationStatusMonitor, acceptInteractiveInput);
} finally {
// Shut down the cluster
shutdownCluster(
clusterClientProvider.getClusterClient(),
scheduledExecutorService,
yarnApplicationStatusMonitor);
if (shutdownHook != null) {
// we do not need the hook anymore as we have just tried to shutdown the
// cluster.
ShutdownHookUtil.removeShutdownHook(
shutdownHook, getClass().getSimpleName(), LOG);
}
tryRetrieveAndLogApplicationReport(
yarnClusterDescriptor.getYarnClient(), yarnApplicationId);
}
}
}
} finally {
try {
yarnClusterDescriptor.close();
} catch (Exception e) {
LOG.info("Could not properly close the yarn cluster descriptor.", e);
}
}
return 0;
}
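The attached branch of run combines a JVM shutdown hook with a finally block, so the cluster is shut down both on a normal exit and when the JVM is terminated (for example by Ctrl+C). The sketch below isolates that pattern with plain JDK calls. All names are hypothetical, and the compare-and-set guard that makes the shutdown run only once is an addition of this sketch rather than something copied from Flink.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the attached-mode cleanup pattern in run().
final class AttachedSessionSketch {

    /** Runs the session and returns how many times the cluster shutdown was executed. */
    static int runAttached(Runnable interactiveSession, Runnable shutdownCluster) {
        AtomicBoolean done = new AtomicBoolean(false);
        AtomicInteger shutdownCount = new AtomicInteger();
        // Guard so that the hook and the finally block never shut down twice.
        Runnable shutdownOnce = () -> {
            if (done.compareAndSet(false, true)) {
                shutdownCount.incrementAndGet();
                shutdownCluster.run();
            }
        };
        // Safety net: runs if the JVM exits while the session is still active.
        Thread hook = new Thread(shutdownOnce, "AttachedSessionSketch-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        try {
            interactiveSession.run();
        } finally {
            // Normal path: shut down explicitly, then drop the hook that is no longer needed.
            shutdownOnce.run();
            try {
                Runtime.getRuntime().removeShutdownHook(hook);
            } catch (IllegalStateException jvmAlreadyShuttingDown) {
                // A hook cannot be removed once JVM shutdown has started.
            }
        }
        return shutdownCount.get();
    }
}
```

Removing the hook after the explicit shutdown keeps long-running client JVMs from accumulating stale hooks, which is the same reason run calls ShutdownHookUtil.removeShutdownHook in its finally block.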
The writeYarnPropertiesFile method writes the YARN properties file described above:
private void writeYarnPropertiesFile(
ApplicationId yarnApplicationId, @Nullable String dynamicProperties) {
// file that we write into the conf/ dir containing the jobManager address and the dop.
// Determine the system temp directory and the current user name; the file is named "{temp dir}/.yarn-properties-{user name}"
// e.g. with temp dir /tmp and user root, the file is "/tmp/.yarn-properties-root"
final File yarnPropertiesFile = getYarnPropertiesLocation(yarnPropertiesFileLocation);
Properties yarnProps = new Properties();
// Write the applicationID
yarnProps.setProperty(YARN_APPLICATION_ID_KEY, yarnApplicationId.toString());
// add dynamic properties
// Write the dynamicPropertiesString
if (dynamicProperties != null) {
yarnProps.setProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING, dynamicProperties);
}
// Write the properties to the file
writeYarnProperties(yarnProps, yarnPropertiesFile);
}
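A minimal round trip of the properties file could look like the following. It assumes, as the comments above describe, that the file is named .yarn-properties-<user> and lives in the directory from yarn.properties-file.location or, if that is unset, in java.io.tmpdir. The keys applicationID and dynamicPropertiesString are the ones shown in the sample file content earlier. The helper class is hypothetical, and the user name is passed in explicitly (a caller would presumably use System.getProperty("user.name")).

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper; the naming and directory rules follow the comments in the article.
final class YarnPropertiesFileSketch {

    static final String APPLICATION_ID_KEY = "applicationID";
    static final String DYNAMIC_PROPERTIES_KEY = "dynamicPropertiesString";

    /** Configured directory if set, otherwise java.io.tmpdir; file ".yarn-properties-<user>". */
    static File location(String configuredDir, String userName) {
        String dir = configuredDir != null ? configuredDir : System.getProperty("java.io.tmpdir");
        return new File(dir, ".yarn-properties-" + userName);
    }

    /** Writes the application ID and, if present, the dynamic properties string. */
    static void write(File file, String applicationId, String dynamicProperties) {
        Properties props = new Properties();
        props.setProperty(APPLICATION_ID_KEY, applicationId);
        if (dynamicProperties != null) {
            props.setProperty(DYNAMIC_PROPERTIES_KEY, dynamicProperties);
        }
        try (OutputStream out = new FileOutputStream(file)) {
            props.store(out, "YARN properties file (sketch)");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the file back, as the FlinkYarnSessionCli constructor does. */
    static Properties read(File file) {
        Properties props = new Properties();
        try (InputStream in = new FileInputStream(file)) {
            props.load(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }
}
```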
YarnClusterDescriptor
YarnClusterDescriptor is responsible for deploying the Flink cluster to YARN. A session cluster is deployed by its deploySessionCluster method:
@Override
public ClusterClientProvider<ApplicationId> deploySessionCluster(
ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
try {
return deployInternal(
clusterSpecification,
"Flink session cluster",
getYarnSessionClusterEntrypoint(),
null,
false);
} catch (Exception e) {
throw new ClusterDeploymentException("Couldn't deploy Yarn session cluster", e);
}
}
The actual deployment logic lives in deployInternal:
private ClusterClientProvider<ApplicationId> deployInternal(
ClusterSpecification clusterSpecification,
String applicationName,
String yarnClusterEntrypoint,
@Nullable JobGraph jobGraph,
boolean detached)
throws Exception {
// Get the user who submits the application
final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
// Kerberos is enabled
// Check whether Flink is configured to use the ticket cache
boolean useTicketCache =
flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
// Check that the current login credentials (ticket cache or delegation tokens) are valid
if (!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
throw new RuntimeException(
"Hadoop security with Kerberos is enabled but the login user "
+ "does not have Kerberos credentials or delegation tokens!");
}
// Check that the fetch-delegation-token and yarn access settings are consistent
final boolean fetchToken =
flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
final boolean yarnAccessFSEnabled =
!CollectionUtil.isNullOrEmpty(
flinkConfiguration.get(YarnConfigOptions.YARN_ACCESS));
if (!fetchToken && yarnAccessFSEnabled) {
throw new IllegalConfigurationException(
String.format(
"When %s is disabled, %s must be disabled as well.",
SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN.key(),
YarnConfigOptions.YARN_ACCESS.key()));
}
}
// Check that the jar, the configuration and the resource requirements are usable
isReadyForDeployment(clusterSpecification);
// ------------------ Check if the specified queue exists --------------------
// Check that the configured YARN queue exists (only if the cluster reports any queues)
checkYarnQueues(yarnClient);
// ------------------ Check if the YARN ClusterClient has the requested resources
// --------------
// Create application via yarnClient
// Create a YARN application
final YarnClientApplication yarnApplication = yarnClient.createApplication();
final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
// Get the maximum vcores and memory that a single container may request
Resource maxRes = appResponse.getMaximumResourceCapability();
final ClusterResourceDescription freeClusterMem;
try {
// Get the free memory: total free memory, the largest container that still fits, and the free memory of each NodeManager
freeClusterMem = getCurrentFreeClusterResources(yarnClient);
} catch (YarnException | IOException e) {
failSessionDuringDeployment(yarnClient, yarnApplication);
throw new YarnDeploymentException(
"Could not retrieve information about free cluster resources.", e);
}
// Get the minimum allocation memory
final int yarnMinAllocationMB =
yarnConfiguration.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
if (yarnMinAllocationMB <= 0) {
throw new YarnDeploymentException(
"The minimum allocation memory "
+ "("
+ yarnMinAllocationMB
+ " MB) configured via '"
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
+ "' should be greater than 0.");
}
final ClusterSpecification validClusterSpecification;
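// Check the memory requested for the JobManager and TaskManagers against the limits
// Exceeding the maximum container capability throws an exception; exceeding the currently free memory only logs a warning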
try {
validClusterSpecification =
validateClusterResources(
clusterSpecification, yarnMinAllocationMB, maxRes, freeClusterMem);
} catch (YarnDeploymentException yde) {
failSessionDuringDeployment(yarnClient, yarnApplication);
throw yde;
}
LOG.info("Cluster specification: {}", validClusterSpecification);
// Choose detached or normal mode based on the argument
final ClusterEntrypoint.ExecutionMode executionMode =
detached
? ClusterEntrypoint.ExecutionMode.DETACHED
: ClusterEntrypoint.ExecutionMode.NORMAL;
// Store the execution mode in the Flink configuration
flinkConfiguration.setString(
ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE, executionMode.toString());
// Start the ApplicationMaster (JobManager)
ApplicationReport report =
startAppMaster(
flinkConfiguration,
applicationName,
yarnClusterEntrypoint,
jobGraph,
yarnClient,
yarnApplication,
validClusterSpecification);
// print the application id for user to cancel themselves.
// In detached mode, print hints for the user
if (detached) {
final ApplicationId yarnApplicationId = report.getApplicationId();
logDetachedClusterInformation(yarnApplicationId, LOG);
}
// Write the runtime information (JobManager address, application ID, etc.) into the Flink configuration
setClusterEntrypointInfoToConfig(report);
return () -> {
try {
// Return a RestClusterClient for interacting with the YARN application
return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
} catch (Exception e) {
throw new RuntimeException("Error while creating RestClusterClient.", e);
}
};
}
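The resource checks in deployInternal boil down to one rule: a request larger than the biggest container YARN can ever grant is a hard error, while a request larger than what is currently free only produces a warning. The sketch below models that rule with plain integers in MB. It is a simplified, hypothetical stand-in for validateClusterResources: raising a JobManager request below the minimum allocation up to that minimum is an assumption about the real method, and further checks in the real code are omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical stand-in for validateClusterResources (memory only, in MB).
final class ClusterResourceCheckSketch {

    static final class Result {
        final int jobManagerMemoryMb;
        final int taskManagerMemoryMb;
        final List<String> warnings;

        Result(int jobManagerMemoryMb, int taskManagerMemoryMb, List<String> warnings) {
            this.jobManagerMemoryMb = jobManagerMemoryMb;
            this.taskManagerMemoryMb = taskManagerMemoryMb;
            this.warnings = warnings;
        }
    }

    static Result validate(
            int jobManagerMb, int taskManagerMb, int minAllocationMb, int maxContainerMb, int freeContainerLimitMb) {
        if (minAllocationMb <= 0) {
            // Same precondition as the yarnMinAllocationMB check in deployInternal.
            throw new IllegalArgumentException("minimum allocation must be greater than 0");
        }
        // Assumption: a JobManager request below the minimum allocation is raised to it.
        int jm = Math.max(jobManagerMb, minAllocationMb);
        // Hard limit: YARN can never grant a container larger than its maximum capability.
        if (jm > maxContainerMb) {
            throw new IllegalStateException(
                    "JobManager memory " + jm + " MB exceeds the maximum container size " + maxContainerMb + " MB");
        }
        if (taskManagerMb > maxContainerMb) {
            throw new IllegalStateException(
                    "TaskManager memory " + taskManagerMb + " MB exceeds the maximum container size " + maxContainerMb + " MB");
        }
        // Soft limit: resources may be freed later, so only warn.
        List<String> warnings = new ArrayList<>();
        if (jm > freeContainerLimitMb) {
            warnings.add("JobManager memory " + jm + " MB exceeds the largest free container (" + freeContainerLimitMb + " MB)");
        }
        if (taskManagerMb > freeContainerLimitMb) {
            warnings.add("TaskManager memory " + taskManagerMb + " MB exceeds the largest free container (" + freeContainerLimitMb + " MB)");
        }
        return new Result(jm, taskManagerMb, warnings);
    }
}
```

Treating "not enough free memory right now" as a warning rather than an error matches the comment in the code: the application can still be accepted and wait until YARN frees resources.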
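startAppMaster configures the ApplicationMaster (i.e. the JobManager) and launches it on YARN. The method is very long; its main steps are:
- Initialize the file systems
- Upload the Flink jar and the user jars
- Upload the ship files (specified with -t)
- Upload flink-conf.yaml, the log4j configuration, Kerberos configuration files, etc.
- Set the environment variables of the AM container
- Set the launch command of the AM container; the entry class is YarnSessionClusterEntrypoint
- Set the resources of the AM container
- Start the AM (JobManager)
The annotated code follows: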
private ApplicationReport startAppMaster(
Configuration configuration,
String applicationName,
String yarnClusterEntrypoint,
JobGraph jobGraph,
YarnClient yarnClient,
YarnClientApplication yarnApplication,
ClusterSpecification clusterSpecification)
throws Exception {
// ------------------ Initialize the file systems -------------------------
// Initialize the shared file systems
org.apache.flink.core.fs.FileSystem.initialize(
configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
final FileSystem fs = FileSystem.get(yarnConfiguration);
// hard coded check for the GoogleHDFS client because its not overriding the getScheme()
// method.
// Special-case GoogleHadoopFileSystem, which does not override getScheme()
if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem")
&& fs.getScheme().startsWith("file")) {
LOG.warn(
"The file system scheme is '"
+ fs.getScheme()
+ "'. This indicates that the "
+ "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values."
+ "The Flink YARN client needs to store its files in a distributed file system");
}
// Get the YARN application submission context
ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
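// Resolve the remote directories configured in yarn.provided.lib.dirs
// They cache the jars Flink needs at runtime, so these jars need not be uploaded from the local machine on every submission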
final List<Path> providedLibDirs =
Utils.getQualifiedRemoteSharedPaths(configuration, yarnConfiguration);
// Get the staging directory for files uploaded with the YARN application (yarn.staging-directory)
// It defaults to the home directory of the file system
Path stagingDirPath = getStagingDir(fs);
FileSystem stagingDirFs = stagingDirPath.getFileSystem(yarnConfiguration);
// Create the YARN application file uploader
final YarnApplicationFileUploader fileUploader =
YarnApplicationFileUploader.from(
stagingDirFs,
stagingDirPath,
providedLibDirs,
appContext.getApplicationId(),
getFileReplication());
// The files need to be shipped and added to classpath.
// Collect the absolute paths of the files to upload and add to the classpath
// They are specified with the -t option on submission
Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
for (File file : shipFiles) {
systemShipFiles.add(file.getAbsoluteFile());
}
// Get the path of the log configuration file (internal.yarn.log-config-file)
// This internal option is not meant to be changed
final String logConfigFilePath =
configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
if (logConfigFilePath != null) {
// Add the log configuration file to the set of files to ship
systemShipFiles.add(new File(logConfigFilePath));
}
// Set-up ApplicationSubmissionContext for the application
// Get the application ID
final ApplicationId appId = appContext.getApplicationId();
// ------------------ Add Zookeeper namespace to local flinkConfiguraton ------
// Set the cluster ID used by the high-availability configuration
setHAClusterIdIfNotSet(configuration, appId);
// Check whether high-availability mode is enabled
if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
// activate re-execution of failed applications
// Set the maximum number of AM attempts from yarn.application-attempts
// If it is not set, default to YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS (2)
appContext.setMaxAppAttempts(
configuration.getInteger(
YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
// Activate HA support
// This applies yarn.application-attempt-failures-validity-interval (only failed attempts within this interval count toward the limit)
activateHighAvailabilitySupport(appContext);
} else {
// HA is not enabled: only set the number of attempts, defaulting to 1
// set number of application retries to 1 in the default case
appContext.setMaxAppAttempts(
configuration.getInteger(YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 1));
}
// Collect the URIs of all user jars of the job (only when a JobGraph is given)
final Set<Path> userJarFiles = new HashSet<>();
if (jobGraph != null) {
userJarFiles.addAll(
jobGraph.getUserJars().stream()
.map(f -> f.toUri())
.map(Path::new)
.collect(Collectors.toSet()));
}
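// Get the URIs of the jars configured in pipeline.jars (e.g. UDF jars); they are added only when the entrypoint is YarnApplicationClusterEntryPoint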
final List<URI> jarUrls =
ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URI::create);
if (jarUrls != null
&& YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) {
userJarFiles.addAll(jarUrls.stream().map(Path::new).collect(Collectors.toSet()));
}
// only for per job mode
// The JobGraph is non-null only in per-job mode
if (jobGraph != null) {
// Upload local distributed-cache files and write their remote paths into the configuration
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
jobGraph.getUserArtifacts().entrySet()) {
// only upload local files
if (!Utils.isRemotePath(entry.getValue().filePath)) {
Path localPath = new Path(entry.getValue().filePath);
Tuple2<Path, Long> remoteFileInfo =
fileUploader.uploadLocalFileToRemote(localPath, entry.getKey());
jobGraph.setUserArtifactRemotePath(
entry.getKey(), remoteFileInfo.f0.toString());
}
}
jobGraph.writeUserArtifactEntriesToConfiguration();
}
// If yarn.provided.lib.dirs is not configured, ship the local Flink lib directory instead
if (providedLibDirs == null || providedLibDirs.isEmpty()) {
addLibFoldersToShipFiles(systemShipFiles);
}
// Register all files in provided lib dirs as local resources with public visibility
// and upload the remaining dependencies as local resources with APPLICATION visibility.
// Register the provided lib dirs as YARN local resources with PUBLIC visibility, so they can be shared across nodes and across YARN applications
final List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
// Register systemShipFiles (the files to ship) as local resources visible only to this application
final List<String> uploadedDependencies =
fileUploader.registerMultipleLocalResources(
systemShipFiles.stream()
.map(e -> new Path(e.toURI()))
.collect(Collectors.toSet()),
Path.CUR_DIR,
LocalResourceType.FILE);
// Both kinds of files belong to the system classpath
systemClassPaths.addAll(uploadedDependencies);
// upload and register ship-only files
// Plugin files only need to be shipped and should not be added to classpath.
// Upload the Flink plugins (only when no provided lib dirs are configured)
// They only need to be shipped and are not added to the classpath
if (providedLibDirs == null || providedLibDirs.isEmpty()) {
Set<File> shipOnlyFiles = new HashSet<>();
addPluginsFoldersToShipFiles(shipOnlyFiles);
fileUploader.registerMultipleLocalResources(
shipOnlyFiles.stream()
.map(e -> new Path(e.toURI()))
.collect(Collectors.toSet()),
Path.CUR_DIR,
LocalResourceType.FILE);
}
// Handle the archives to ship, configured via yarn.ship-archives
// Unlike yarn.ship-files, archives are extracted automatically after upload
if (!shipArchives.isEmpty()) {
fileUploader.registerMultipleLocalResources(
shipArchives.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()),
Path.CUR_DIR,
LocalResourceType.ARCHIVE);
}
// Upload and register user jars
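// Upload the user jars and collect the user classpath
// With UserJarInclusion.DISABLED they are placed in the usrlib directory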
final List<String> userClassPaths =
fileUploader.registerMultipleLocalResources(
userJarFiles,
userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED
? ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR
: Path.CUR_DIR,
LocalResourceType.FILE);
// usrlib will be automatically shipped if it exists.
// If a usrlib directory exists, upload it as well and add it to the user classpath
if (ClusterEntrypointUtils.tryFindUserLibDirectory().isPresent()) {
final Set<File> usrLibShipFiles = new HashSet<>();
addUsrLibFolderToShipFiles(usrLibShipFiles);
final List<String> usrLibClassPaths =
fileUploader.registerMultipleLocalResources(
usrLibShipFiles.stream()
.map(e -> new Path(e.toURI()))
.collect(Collectors.toSet()),
Path.CUR_DIR,
LocalResourceType.FILE);
userClassPaths.addAll(usrLibClassPaths);
}
// ORDER: user jars join the system classpath and are sorted by name together with it
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
systemClassPaths.addAll(userClassPaths);
}
// normalize classpath by sorting
// Sort the system classpath and the user classpath
Collections.sort(systemClassPaths);
Collections.sort(userClassPaths);
// classpath assembler
StringBuilder classPathBuilder = new StringBuilder();
// FIRST: user classes come first, so append the user classpath first
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
for (String userClassPath : userClassPaths) {
classPathBuilder.append(userClassPath).append(File.pathSeparator);
}
}
// Append the system classpath
for (String classPath : systemClassPaths) {
classPathBuilder.append(classPath).append(File.pathSeparator);
}
// Setup jar for ApplicationMaster
// Upload the Flink dist jar (yarn.flink-dist-jar) and add it to the classpath
final YarnLocalResourceDescriptor localResourceDescFlinkJar =
fileUploader.uploadFlinkDist(flinkJarPath);
classPathBuilder
.append(localResourceDescFlinkJar.getResourceKey())
.append(File.pathSeparator);
// write job graph to tmp file and add it to local resource
// TODO: server use user main method to generate job graph
// Write the JobGraph to a file, register it as a YARN local resource and add it to the classpath
if (jobGraph != null) {
File tmpJobGraphFile = null;
try {
tmpJobGraphFile = File.createTempFile(appId.toString(), null);
try (FileOutputStream output = new FileOutputStream(tmpJobGraphFile);
ObjectOutputStream obOutput = new ObjectOutputStream(output)) {
obOutput.writeObject(jobGraph);
}
final String jobGraphFilename = "job.graph";
configuration.setString(JOB_GRAPH_FILE_PATH, jobGraphFilename);
fileUploader.registerSingleLocalResource(
jobGraphFilename,
new Path(tmpJobGraphFile.toURI()),
"",
LocalResourceType.FILE,
true,
false);
classPathBuilder.append(jobGraphFilename).append(File.pathSeparator);
} catch (Exception e) {
LOG.warn("Add job graph to local resource fail.");
throw e;
} finally {
if (tmpJobGraphFile != null && !tmpJobGraphFile.delete()) {
LOG.warn("Fail to delete temporary file {}.", tmpJobGraphFile.toPath());
}
}
}
// Upload the flink configuration
// write out configuration file
// Upload flink-conf.yaml to YARN
File tmpConfigurationFile = null;
try {
tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);
String flinkConfigKey = "flink-conf.yaml";
fileUploader.registerSingleLocalResource(
flinkConfigKey,
new Path(tmpConfigurationFile.getAbsolutePath()),
"",
LocalResourceType.FILE,
true,
true);
classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
} finally {
if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) {
LOG.warn("Fail to delete temporary file {}.", tmpConfigurationFile.toPath());
}
}
// LAST: the user classpath goes last, so it is appended to classPathBuilder only now
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
for (String userClassPath : userClassPaths) {
classPathBuilder.append(userClassPath).append(File.pathSeparator);
}
}
// To support Yarn Secure Integration Test Scenario
// In Integration test setup, the Yarn containers created by YarnMiniCluster does not have
// the Yarn site XML
// and KRB5 configuration files. We are adding these files as container local resources for
// the container
// applications (JM/TMs) to have proper secure cluster setup
// In IN_TESTS mode, upload yarn-site.xml and set the krb5.conf path in the Flink configuration
Path remoteYarnSiteXmlPath = null;
if (System.getenv("IN_TESTS") != null) {
File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
LOG.info(
"Adding Yarn configuration {} to the AM container local resource bucket",
f.getAbsolutePath());
Path yarnSitePath = new Path(f.getAbsolutePath());
remoteYarnSiteXmlPath =
fileUploader
.registerSingleLocalResource(
Utils.YARN_SITE_FILE_NAME,
yarnSitePath,
"",
LocalResourceType.FILE,
false,
false)
.getPath();
if (System.getProperty("java.security.krb5.conf") != null) {
configuration.set(
SecurityOptions.KERBEROS_KRB5_PATH,
System.getProperty("java.security.krb5.conf"));
}
}
// Locate and upload the krb5.conf file
Path remoteKrb5Path = null;
boolean hasKrb5 = false;
String krb5Config = configuration.get(SecurityOptions.KERBEROS_KRB5_PATH);
if (!StringUtils.isNullOrWhitespaceOnly(krb5Config)) {
final File krb5 = new File(krb5Config);
LOG.info(
"Adding KRB5 configuration {} to the AM container local resource bucket",
krb5.getAbsolutePath());
final Path krb5ConfPath = new Path(krb5.getAbsolutePath());
remoteKrb5Path =
fileUploader
.registerSingleLocalResource(
Utils.KRB5_FILE_NAME,
krb5ConfPath,
"",
LocalResourceType.FILE,
false,
false)
.getPath();
hasKrb5 = true;
}
// Handle the Kerberos keytab
Path remotePathKeytab = null;
String localizedKeytabPath = null;
String keytab = configuration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
if (keytab != null) {
boolean localizeKeytab =
flinkConfiguration.getBoolean(YarnConfigOptions.SHIP_LOCAL_KEYTAB);
localizedKeytabPath =
flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
// If the keytab should be shipped (yarn.security.kerberos.ship-local-keytab),
// upload it as a YARN local resource under the name given by yarn.security.kerberos.localized-keytab-path
if (localizeKeytab) {
// Localize the keytab to YARN containers via local resource.
LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
remotePathKeytab =
fileUploader
.registerSingleLocalResource(
localizedKeytabPath,
new Path(keytab),
"",
LocalResourceType.FILE,
false,
false)
.getPath();
} else {
// // Assume Keytab is pre-installed in the container.
localizedKeytabPath =
flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
}
}
// Derive the JobManager process memory spec from the Flink configuration
final JobManagerProcessSpec processSpec =
JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
flinkConfiguration, JobManagerOptions.TOTAL_PROCESS_MEMORY);
// Create the AM container launch context with its launch command and arguments
final ContainerLaunchContext amContainer =
setupApplicationMasterContainer(yarnClusterEntrypoint, hasKrb5, processSpec);
// setup security tokens
// Set up HDFS delegation tokens
if (UserGroupInformation.isSecurityEnabled()) {
// set HDFS delegation tokens when security is enabled
LOG.info("Adding delegation token to the AM container.");
final List<Path> pathsToObtainToken = new ArrayList<>();
boolean fetchToken =
configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
if (fetchToken) {
List<Path> yarnAccessList =
ConfigUtils.decodeListFromConfig(
configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
pathsToObtainToken.addAll(yarnAccessList);
pathsToObtainToken.addAll(fileUploader.getRemotePaths());
}
Utils.setTokensFor(amContainer, pathsToObtainToken, yarnConfiguration, fetchToken);
}
// Set the local resources of the AM container
amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
// Close the file uploader
fileUploader.close();
// Setup CLASSPATH and environment variables for ApplicationMaster
// Build the AM environment variables, including the classpath
final Map<String, String> appMasterEnv =
generateApplicationMasterEnv(
fileUploader,
classPathBuilder.toString(),
localResourceDescFlinkJar.toString(),
appId.toString());
// Set the keytab-related environment variables
if (localizedKeytabPath != null) {
appMasterEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localizedKeytabPath);
String principal = configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
if (remotePathKeytab != null) {
appMasterEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, remotePathKeytab.toString());
}
}
// To support Yarn Secure Integration Test Scenario
if (remoteYarnSiteXmlPath != null) {
appMasterEnv.put(
YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
}
if (remoteKrb5Path != null) {
appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
}
// Set the environment variables of the AM container
amContainer.setEnvironment(appMasterEnv);
// Set up resource type requirements for ApplicationMaster
// From here on, configure the resources of the AM container
Resource capability = Records.newRecord(Resource.class);
// Set the memory
capability.setMemory(clusterSpecification.getMasterMemoryMB());
// Set the CPU vcores
capability.setVirtualCores(
flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES));
// Set the application name
final String customApplicationName = customName != null ? customName : applicationName;
appContext.setApplicationName(customApplicationName);
// Set the application type
appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
// Set the container spec and the resource spec
appContext.setAMContainerSpec(amContainer);
appContext.setResource(capability);
// Set priority for application
// Set the application priority
int priorityNum = flinkConfiguration.getInteger(YarnConfigOptions.APPLICATION_PRIORITY);
if (priorityNum >= 0) {
Priority priority = Priority.newInstance(priorityNum);
appContext.setPriority(priority);
}
// Set the queue the application runs in
if (yarnQueue != null) {
appContext.setQueue(yarnQueue);
}
// Set the node label and the tags
setApplicationNodeLabel(appContext);
setApplicationTags(appContext);
// add a hook to clean up in case deployment fails:
// a JVM shutdown hook that kills the application and deletes its uploaded files
Thread deploymentFailureHook =
new DeploymentFailureHook(yarnApplication, fileUploader.getApplicationDir());
Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
LOG.info("Submitting application master " + appId);
// Submit the application
yarnClient.submitApplication(appContext);
LOG.info("Waiting for the cluster to be allocated");
// Wait until the application has started
final long startTime = System.currentTimeMillis();
ApplicationReport report;
YarnApplicationState lastAppState = YarnApplicationState.NEW;
loop:
while (true) {
// Poll the application state every 250 ms
try {
report = yarnClient.getApplicationReport(appId);
} catch (IOException e) {
throw new YarnDeploymentException("Failed to deploy the cluster.", e);
}
YarnApplicationState appState = report.getYarnApplicationState();
LOG.debug("Application State: {}", appState);
switch (appState) {
case FAILED:
case KILLED:
throw new YarnDeploymentException(
"The YARN application unexpectedly switched to state "
+ appState
+ " during deployment. \n"
+ "Diagnostics from YARN: "
+ report.getDiagnostics()
+ "\n"
+ "If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n"
+ "yarn logs -applicationId "
+ appId);
// break ..
case RUNNING:
LOG.info("YARN application has been deployed successfully.");
break loop;
case FINISHED:
LOG.info("YARN application has been finished successfully.");
break loop;
default:
if (appState != lastAppState) {
LOG.info("Deploying cluster, current state " + appState);
}
if (System.currentTimeMillis() - startTime > 60000) {
LOG.info(
"Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
}
}
lastAppState = appState;
Thread.sleep(250);
}
// since deployment was successful, remove the shutdown hook
ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
return report;
}
At this point the AM (JobManager) of the Flink on Yarn cluster has started.
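The register-hook / submit / poll / remove-hook sequence above can be reproduced in isolation. The sketch below is a simplified model, not Flink code: it assumes a hypothetical `AppState` enum in place of YARN's `YarnApplicationState`, an `Iterator` in place of repeated `yarnClient.getApplicationReport(appId)` calls, and a cleanup hook that only prints instead of killing the application and deleting its files. `Runtime.addShutdownHook` and `Runtime.removeShutdownHook` are the standard JDK calls; Flink wraps the removal in `ShutdownHookUtil.removeShutdownHook`.

```java
import java.util.Iterator;
import java.util.List;

public class DeploymentPollingSketch {

    // Hypothetical stand-in for YarnApplicationState (subset of its values).
    enum AppState { NEW, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED }

    // Polls the state source until RUNNING or FINISHED; throws on FAILED or KILLED,
    // mirroring the switch in the deployment loop above.
    static AppState waitForDeployment(Iterator<AppState> states, long pollIntervalMs)
            throws InterruptedException {
        final long startTime = System.currentTimeMillis();
        AppState lastState = AppState.NEW;
        while (true) {
            AppState state = states.next(); // stands in for getApplicationReport(appId)
            switch (state) {
                case FAILED:
                case KILLED:
                    throw new IllegalStateException(
                            "The application unexpectedly switched to state " + state);
                case RUNNING:
                case FINISHED:
                    return state;
                default:
                    // Only log when the state actually changes.
                    if (state != lastState) {
                        System.out.println("Deploying cluster, current state " + state);
                    }
                    if (System.currentTimeMillis() - startTime > 60_000) {
                        System.out.println("Deployment took more than 60 seconds.");
                    }
            }
            lastState = state;
            Thread.sleep(pollIntervalMs);
        }
    }

    static AppState deployWithCleanupHook(Iterator<AppState> states)
            throws InterruptedException {
        // Register the cleanup hook before waiting, so a JVM exit mid-deployment still runs it.
        Thread failureHook = new Thread(() -> System.out.println("cleaning up failed deployment"));
        Runtime.getRuntime().addShutdownHook(failureHook);
        AppState finalState = waitForDeployment(states, 10);
        // Only reached on success: the hook is no longer needed.
        Runtime.getRuntime().removeShutdownHook(failureHook);
        return finalState;
    }

    public static void main(String[] args) throws InterruptedException {
        List<AppState> timeline =
                List.of(AppState.SUBMITTED, AppState.ACCEPTED, AppState.ACCEPTED, AppState.RUNNING);
        System.out.println(deployWithCleanupHook(timeline.iterator()));
    }
}
```

With this timeline the state-change message is logged once for SUBMITTED and once for ACCEPTED (the repeated ACCEPTED is suppressed), then RUNNING is returned and the hook is removed, so the cleanup message never appears. If `waitForDeployment` throws, the hook stays registered, which is the point: cleanup then happens when the JVM exits.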
Three deployment modes
Flink on Yarn supports three deployment modes:
- Yarn Session mode
- Application mode
- Per-job mode
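Yarn Session mode only sets up an empty Flink cluster on Yarn. Right after startup only the JobManager is running; TaskManagers are created and released automatically as jobs are submitted and finish.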
@Override
public ClusterClientProvider<ApplicationId> deploySessionCluster(
ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
try {
return deployInternal(
clusterSpecification,
"Flink session cluster",
getYarnSessionClusterEntrypoint(),
null, // no user job has been submitted when the session cluster is deployed, so the JobGraph is null
false);
} catch (Exception e) {
throw new ClusterDeploymentException("Couldn't deploy Yarn session cluster", e);
}
}
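Application Cluster mode creates a cluster dedicated to a single user application: the cluster starts with the application and is torn down when it finishes. The user program's main method runs inside the cluster rather than on the client machine that submits the job.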
@Override
public ClusterClientProvider<ApplicationId> deployApplicationCluster(
final ClusterSpecification clusterSpecification,
final ApplicationConfiguration applicationConfiguration)
throws ClusterDeploymentException {
checkNotNull(clusterSpecification);
checkNotNull(applicationConfiguration);
// Check that the deployment target is APPLICATION
final YarnDeploymentTarget deploymentTarget =
YarnDeploymentTarget.fromConfig(flinkConfiguration);
if (YarnDeploymentTarget.APPLICATION != deploymentTarget) {
throw new ClusterDeploymentException(
"Couldn't deploy Yarn Application Cluster."
+ " Expected deployment.target="
+ YarnDeploymentTarget.APPLICATION.getName()
+ " but actual one was \""
+ deploymentTarget.getName()
+ "\"");
}
// Merge the application configuration into the Flink configuration
applicationConfiguration.applyToConfiguration(flinkConfiguration);
// Exactly one pipeline jar is allowed
final List<String> pipelineJars =
flinkConfiguration
.getOptional(PipelineOptions.JARS)
.orElse(Collections.emptyList());
Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar");
try {
return deployInternal(
clusterSpecification,
"Flink Application Cluster",
YarnApplicationClusterEntryPoint.class.getName(),
null, // the JobGraph is built inside the Yarn cluster, so it is null when the cluster is deployed
false);
} catch (Exception e) {
throw new ClusterDeploymentException("Couldn't deploy Yarn Application Cluster", e);
}
}
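The two preconditions that `deployApplicationCluster` checks before calling `deployInternal` can be illustrated with plain Java. This is a sketch under stated assumptions: `checkApplicationDeployment` is a hypothetical helper, the target is passed as the string `yarn-application` (the value used with `-t yarn-application`) instead of a `YarnDeploymentTarget`, and plain JDK exceptions replace `ClusterDeploymentException` and `Preconditions.checkArgument`.

```java
import java.util.List;

public class ApplicationModeChecks {

    // Assumed string form of YarnDeploymentTarget.APPLICATION.
    static final String YARN_APPLICATION = "yarn-application";

    // Hypothetical helper mirroring the two checks in deployApplicationCluster.
    static void checkApplicationDeployment(String deploymentTarget, List<String> pipelineJars) {
        // 1. The configured deployment target must be Application mode.
        if (!YARN_APPLICATION.equals(deploymentTarget)) {
            throw new IllegalStateException(
                    "Couldn't deploy Yarn Application Cluster. Expected deployment.target="
                            + YARN_APPLICATION
                            + " but actual one was \"" + deploymentTarget + "\"");
        }
        // 2. Exactly one user jar: the cluster runs the main method of that jar.
        if (pipelineJars.size() != 1) {
            throw new IllegalArgumentException("Should only have one jar");
        }
    }

    public static void main(String[] args) {
        checkApplicationDeployment(YARN_APPLICATION, List.of("my-job.jar"));
        System.out.println("checks passed");
        try {
            checkApplicationDeployment("yarn-session", List.of("my-job.jar"));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Both checks fail fast on the client, before anything is uploaded to HDFS or submitted to YARN.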
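Per-job mode is like Application mode in that the cluster is dedicated to a single user job. The difference is that the user's main method runs on the client machine that submits the job, and the Yarn cluster receives the JobGraph generated by the client. As the warning in the code states, job clusters are deprecated since Flink 1.15 in favor of Application mode.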
@Override
public ClusterClientProvider<ApplicationId> deployJobCluster(
ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached)
throws ClusterDeploymentException {
LOG.warn(
"Job Clusters are deprecated since Flink 1.15. Please use an Application Cluster/Application Mode instead.");
try {
return deployInternal(
clusterSpecification,
"Flink per-job cluster",
getYarnJobClusterEntrypoint(),
jobGraph, // pass in the JobGraph already generated by the client
detached);
} catch (Exception e) {
throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
}
}
The deployInternal method has already been analyzed above and is not repeated here.
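Since all three modes end in the same `deployInternal` call, what distinguishes them in the code above is mostly the arguments. The sketch below condenses those differences into a small lookup table; the `Mode` enum and its field names are hypothetical, and the values are read directly off the three `deployInternal` calls and the descriptions of each mode.

```java
public class DeployModeSummary {

    // One row per deploy* method shown above (hypothetical summary type, not Flink code).
    enum Mode {
        SESSION("Flink session cluster", false, false),
        APPLICATION("Flink Application Cluster", false, true),
        PER_JOB("Flink per-job cluster", true, true);

        final String applicationName;    // second argument passed to deployInternal
        final boolean jobGraphAtDeploy;  // true only when a non-null JobGraph is passed
        final boolean dedicatedToOneJob; // cluster lives and dies with one user job/application

        Mode(String applicationName, boolean jobGraphAtDeploy, boolean dedicatedToOneJob) {
            this.applicationName = applicationName;
            this.jobGraphAtDeploy = jobGraphAtDeploy;
            this.dedicatedToOneJob = dedicatedToOneJob;
        }
    }

    public static void main(String[] args) {
        for (Mode m : Mode.values()) {
            System.out.printf("%-12s %-26s jobGraphAtDeploy=%-5b dedicated=%b%n",
                    m, m.applicationName, m.jobGraphAtDeploy, m.dedicatedToOneJob);
        }
    }
}
```

Only per-job mode hands a ready-made JobGraph to YARN; Application mode shares the dedicated-cluster property with it but builds the JobGraph inside the cluster.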
This blog post is the author's original work. Discussion and corrections are welcome; please credit the source when reposting.