
Flink Source Code: The yarn-session Startup Flow

2022-04-06  AlienPaul


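Background

This article analyzes how yarn-session.sh deploys a Flink cluster to YARN.

The command that starts a YARN session lives in yarn-session.sh. That script shows that the entry class is org.apache.flink.yarn.cli.FlinkYarnSessionCli, so the analysis starts there.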

FlinkYarnSessionCli

FlinkYarnSessionCli is the command-line tool for YARN sessions. Its main method creates a FlinkYarnSessionCli object and calls its run method.

public static void main(final String[] args) {
    // Locate the Flink conf directory
    final String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();

    // Load flink-conf.yaml from FLINK_CONF_DIR; if FLINK_CONF_DIR is not set, an empty configuration is returned
    final Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();

    int retCode;

    try {
        // Create the FlinkYarnSessionCli
        final FlinkYarnSessionCli cli =
                new FlinkYarnSessionCli(
                        flinkConfiguration,
                        configurationDirectory,
                        "",
                        ""); // no prefix for the YARN session

        // Install the security context (authentication and related setup)
        SecurityUtils.install(new SecurityConfiguration(flinkConfiguration));

        // Execute run inside the secured context
        retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.run(args));
    } catch (CliArgsException e) {
        retCode = handleCliArgsException(e, LOG);
    } catch (Throwable t) {
        final Throwable strippedThrowable =
                ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
        retCode = handleError(strippedThrowable, LOG);
    }

    System.exit(retCode);
}

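The FlinkYarnSessionCli constructor defines the command-line options of the YARN session (used later to parse the command line) and then checks whether the yarn properties file exists. The four-argument constructor called from main reaches the full constructor shown below through overloads that supply the remaining arguments.

By default the yarn properties file is located at /tmp/.yarn-properties-<username>. Its content looks like this: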

dynamicPropertiesString=
applicationID=application_xxxxxxxxxxxxx_xxxx

It records the application ID of the YARN session most recently submitted by that user.
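To make the file format concrete, here is a minimal sketch that computes the default location and reads the application ID back with java.util.Properties. The class and method names are hypothetical and not part of Flink. The key applicationID and the .yarn-properties- prefix come from the sample above; the fallback to the java.io.tmpdir and user.name system properties is an assumption based on the comments in the constructor, not a copy of Flink's code.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;
import java.util.Properties;

// Hypothetical helper, not part of Flink: locates and reads a yarn properties file.
public class YarnPropertiesReaderSketch {

    // Assumed default: <temp dir>/.yarn-properties-<user>, unless a directory is configured.
    public static Path defaultLocation(String configuredDir) {
        String dir = configuredDir != null ? configuredDir : System.getProperty("java.io.tmpdir");
        return Paths.get(dir, ".yarn-properties-" + System.getProperty("user.name"));
    }

    // Returns the recorded application ID, or empty if the file does not exist.
    public static Optional<String> readApplicationId(Path file) throws IOException {
        if (!Files.exists(file)) {
            return Optional.empty();
        }
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(file)) {
            props.load(in);
        }
        String id = props.getProperty("applicationID");
        if (id == null) {
            // Mirrors the constructor's behaviour: a file without an ID is treated as an error.
            throw new IOException("No applicationID entry in " + file);
        }
        return Optional.of(id);
    }

    public static void main(String[] args) throws IOException {
        Path file = defaultLocation(null);
        System.out.println(file + " -> " + readApplicationId(file).orElse("<no session recorded>"));
    }
}
```

Running main on a machine where a session was started earlier should print that session's ID; otherwise it reports that no session is recorded.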

public FlinkYarnSessionCli(
        Configuration configuration,
        ClusterClientServiceLoader clusterClientServiceLoader,
        String configurationDirectory,
        String shortPrefix,
        String longPrefix,
        boolean acceptInteractiveInput)
        throws FlinkException {
    super(configuration, shortPrefix, longPrefix);
    this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
    this.configurationDirectory = checkNotNull(configurationDirectory);
    this.acceptInteractiveInput = acceptInteractiveInput;

    // Create the command line options
    // The options are defined with Apache Commons CLI
    query =
            new Option(
                    shortPrefix + "q",
                    longPrefix + "query",
                    false,
                    "Display available YARN resources (memory, cores)");
    queue = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
    shipPath =
            new Option(
                    shortPrefix + "t",
                    longPrefix + "ship",
                    true,
                    "Ship files in the specified directory (t for transfer)");
    flinkJar =
            new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
    jmMemory =
            new Option(
                    shortPrefix + "jm",
                    longPrefix + "jobManagerMemory",
                    true,
                    "Memory for JobManager Container with optional unit (default: MB)");
    tmMemory =
            new Option(
                    shortPrefix + "tm",
                    longPrefix + "taskManagerMemory",
                    true,
                    "Memory per TaskManager Container with optional unit (default: MB)");
    slots =
            new Option(
                    shortPrefix + "s",
                    longPrefix + "slots",
                    true,
                    "Number of slots per TaskManager");
    dynamicproperties =
            Option.builder(shortPrefix + "D")
                    .argName("property=value")
                    .numberOfArgs(2)
                    .valueSeparator()
                    .desc("use value for given property")
                    .build();
    name =
            new Option(
                    shortPrefix + "nm",
                    longPrefix + "name",
                    true,
                    "Set a custom name for the application on YARN");
    applicationType =
            new Option(
                    shortPrefix + "at",
                    longPrefix + "applicationType",
                    true,
                    "Set a custom application type for the application on YARN");
    zookeeperNamespace =
            new Option(
                    shortPrefix + "z",
                    longPrefix + "zookeeperNamespace",
                    true,
                    "Namespace to create the Zookeeper sub-paths for high availability mode");
    nodeLabel =
            new Option(
                    shortPrefix + "nl",
                    longPrefix + "nodeLabel",
                    true,
                    "Specify YARN node label for the YARN application");
    help =
            new Option(
                    shortPrefix + "h",
                    longPrefix + "help",
                    false,
                    "Help for the Yarn session CLI.");

    allOptions = new Options();
    allOptions.addOption(flinkJar);
    allOptions.addOption(jmMemory);
    allOptions.addOption(tmMemory);
    allOptions.addOption(queue);
    allOptions.addOption(query);
    allOptions.addOption(shipPath);
    allOptions.addOption(slots);
    allOptions.addOption(dynamicproperties);
    allOptions.addOption(DETACHED_OPTION);
    allOptions.addOption(YARN_DETACHED_OPTION);
    allOptions.addOption(name);
    allOptions.addOption(applicationId);
    allOptions.addOption(applicationType);
    allOptions.addOption(zookeeperNamespace);
    allOptions.addOption(nodeLabel);
    allOptions.addOption(help);

    // try loading a potential yarn properties file
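    // Read the location of the yarn properties file from yarn.properties-file.location
    // If it is not configured, the system temp directory is used
    // The properties file stores the application ID of the YARN session submitted by the current user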
    this.yarnPropertiesFileLocation =
            configuration.getString(YarnConfigOptions.PROPERTIES_FILE_LOCATION);
    final File yarnPropertiesLocation = getYarnPropertiesLocation(yarnPropertiesFileLocation);

    yarnPropertiesFile = new Properties();

    // If the yarn properties file exists
    if (yarnPropertiesLocation.exists()) {
        // Load its entries into yarnPropertiesFile
        LOG.info(
                "Found Yarn properties file under {}.",
                yarnPropertiesLocation.getAbsolutePath());

        try (InputStream is = new FileInputStream(yarnPropertiesLocation)) {
            yarnPropertiesFile.load(is);
        } catch (IOException ioe) {
            throw new FlinkException(
                    "Could not read the Yarn properties file "
                            + yarnPropertiesLocation
                            + ". Please delete the file at "
                            + yarnPropertiesLocation.getAbsolutePath()
                            + '.',
                    ioe);
        }

        // Read the YARN application ID
        final String yarnApplicationIdString =
                yarnPropertiesFile.getProperty(YARN_APPLICATION_ID_KEY);

        if (yarnApplicationIdString == null) {
            throw new FlinkException(
                    "Yarn properties file found but doesn't contain a "
                            + "Yarn application id. Please delete the file at "
                            + yarnPropertiesLocation.getAbsolutePath());
        }

        try {
            // Try converting the string to an ApplicationId object
            yarnApplicationIdFromYarnProperties =
                    ConverterUtils.toApplicationId(yarnApplicationIdString);
        } catch (Exception e) {
            throw new FlinkException(
                    "YARN properties contain an invalid entry for "
                            + "application id: "
                            + yarnApplicationIdString
                            + ". Please delete the file at "
                            + yarnPropertiesLocation.getAbsolutePath(),
                    e);
        }
    } else {
        // If the yarn properties file does not exist, set the ID to null
        yarnApplicationIdFromYarnProperties = null;
    }
}
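The constructor rejects a properties file whose ID cannot be converted. As a rough illustration of what a well-formed ID looks like, the sketch below validates the textual shape with a regular expression. It is not Hadoop's ConverterUtils implementation; the class name is hypothetical, and the assumed shape application_<cluster timestamp>_<sequence number> is inferred from the sample file shown earlier.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not Hadoop's ConverterUtils: checks the textual shape of an application ID.
public class ApplicationIdFormatSketch {

    // Assumed shape: application_<cluster timestamp>_<sequence number>
    private static final Pattern APP_ID = Pattern.compile("application_(\\d+)_(\\d+)");

    // Returns {clusterTimestamp, sequenceNumber}; throws IllegalArgumentException if malformed.
    public static long[] parse(String appId) {
        Matcher m = APP_ID.matcher(appId);
        if (!m.matches()) {
            throw new IllegalArgumentException("Invalid application id: " + appId);
        }
        return new long[] {Long.parseLong(m.group(1)), Long.parseLong(m.group(2))};
    }

    public static void main(String[] args) {
        long[] parts = parse("application_1649212345678_0001");
        System.out.println(parts[0] + " / " + parts[1]);
    }
}
```

A placeholder such as application_xxxxxxxxxxxxx_xxxx fails this check, which corresponds to the "invalid entry for application id" branch in the constructor.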

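With the constructor covered, we turn to the startup logic, which lives in the run method. It first parses the command-line arguments and then deploys the cluster to YARN.

The method is as follows: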

public int run(String[] args) throws CliArgsException, FlinkException {
    //
    //  Command Line Options
    //
    // Parse the command line
    final CommandLine cmd = parseCommandLineOptions(args, true);

    // If the help option is present, print the usage information
    if (cmd.hasOption(help.getOpt())) {
        printUsage();
        return 0;
    }
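    // Copy the configuration passed to the constructor (the flinkConfiguration loaded in main)
    final Configuration effectiveConfiguration = new Configuration(configuration);
    // Convert the relevant command-line options into a Configuration
    final Configuration commandLineConfiguration = toConfiguration(cmd);
    // Merge the two; command-line values override those loaded from the file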
    effectiveConfiguration.addAll(commandLineConfiguration);
    LOG.debug("Effective configuration: {}", effectiveConfiguration);

    final ClusterClientFactory<ApplicationId> yarnClusterClientFactory =
            clusterClientServiceLoader.getClusterClientFactory(effectiveConfiguration);
    effectiveConfiguration.set(
            DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName());

    // Create the YARN cluster descriptor
    // It holds the YarnClient, the Flink configuration, the YARN configuration, and so on
    final YarnClusterDescriptor yarnClusterDescriptor =
            (YarnClusterDescriptor)
                    yarnClusterClientFactory.createClusterDescriptor(effectiveConfiguration);

    try {
        // Query cluster for metrics
        if (cmd.hasOption(query.getOpt())) {
            // The query option was given: print the cluster description
            final String description = yarnClusterDescriptor.getClusterDescription();
            System.out.println(description);
            return 0;
        } else {
            final ClusterClientProvider<ApplicationId> clusterClientProvider;
            final ApplicationId yarnApplicationId;

            if (cmd.hasOption(applicationId.getOpt())) {
                yarnApplicationId =
                        ConverterUtils.toApplicationId(
                                cmd.getOptionValue(applicationId.getOpt()));

                // Retrieve a Flink cluster that already exists on YARN
                clusterClientProvider = yarnClusterDescriptor.retrieve(yarnApplicationId);
            } else {
                // Get the cluster resource specification
                // It contains the JobManager memory, the TaskManager memory and the number of slots per TM
                final ClusterSpecification clusterSpecification =
                        yarnClusterClientFactory.getClusterSpecification(
                                effectiveConfiguration);

                // Deploy the Flink cluster
                clusterClientProvider =
                        yarnClusterDescriptor.deploySessionCluster(clusterSpecification);
                // Get the client for the deployed cluster
                ClusterClient<ApplicationId> clusterClient =
                        clusterClientProvider.getClusterClient();

                // ------------------ ClusterClient deployed, handle connection details
                // Get the YARN application ID
                yarnApplicationId = clusterClient.getClusterId();

                try {
                    // Print the address of the web interface
                    System.out.println(
                            "JobManager Web Interface: " + clusterClient.getWebInterfaceURL());
                    
                    // Write the yarn properties file (analyzed later)
                    writeYarnPropertiesFile(yarnApplicationId, dynamicPropertiesEncoded);
                } catch (Exception e) {
                    try {
                        // On failure, close the cluster client
                        clusterClient.close();
                    } catch (Exception ex) {
                        LOG.info("Could not properly shutdown cluster client.", ex);
                    }

                    try {
                        // Kill the YARN application
                        yarnClusterDescriptor.killCluster(yarnApplicationId);
                    } catch (FlinkException fe) {
                        LOG.info("Could not properly terminate the Flink cluster.", fe);
                    }

                    throw new FlinkException(
                            "Could not write the Yarn connection information.", e);
                }
            }

            if (!effectiveConfiguration.getBoolean(DeploymentOptions.ATTACHED)) {
                // Detached mode: log information about the detached cluster
                YarnClusterDescriptor.logDetachedClusterInformation(yarnApplicationId, LOG);
            } else {
                ScheduledExecutorService scheduledExecutorService =
                        Executors.newSingleThreadScheduledExecutor();

                // Create the YARN application status monitor
                // It polls the application status periodically on a separate thread
                final YarnApplicationStatusMonitor yarnApplicationStatusMonitor =
                        new YarnApplicationStatusMonitor(
                                yarnClusterDescriptor.getYarnClient(),
                                yarnApplicationId,
                                new ScheduledExecutorServiceAdapter(scheduledExecutorService));
                // Register a shutdown hook that shuts the cluster down when the JVM exits
                Thread shutdownHook =
                        ShutdownHookUtil.addShutdownHook(
                                () ->
                                        shutdownCluster(
                                                clusterClientProvider.getClusterClient(),
                                                scheduledExecutorService,
                                                yarnApplicationStatusMonitor),
                                getClass().getSimpleName(),
                                LOG);
                try {
                    // Run the interactive client
                    runInteractiveCli(yarnApplicationStatusMonitor, acceptInteractiveInput);
                } finally {
                    // Shut down the cluster
                    shutdownCluster(
                            clusterClientProvider.getClusterClient(),
                            scheduledExecutorService,
                            yarnApplicationStatusMonitor);

                    if (shutdownHook != null) {
                        // we do not need the hook anymore as we have just tried to shutdown the
                        // cluster.
                        ShutdownHookUtil.removeShutdownHook(
                                shutdownHook, getClass().getSimpleName(), LOG);
                    }

                    tryRetrieveAndLogApplicationReport(
                            yarnClusterDescriptor.getYarnClient(), yarnApplicationId);
                }
            }
        }
    } finally {
        try {
            yarnClusterDescriptor.close();
        } catch (Exception e) {
            LOG.info("Could not properly close the yarn cluster descriptor.", e);
        }
    }

    return 0;
}
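The first part of run builds the effective configuration by overlaying the command-line settings on the file-based configuration and then forcing the deployment target. The following sketch reproduces that precedence with plain maps. The class and method names are hypothetical, and Flink's Configuration class is replaced by Map<String, String> so the example runs without Flink on the classpath. The key execution.target and the value yarn-session are what DeploymentOptions.TARGET and YarnDeploymentTarget.SESSION are assumed to resolve to.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the configuration overlay in run(); plain maps stand in for Configuration.
public class EffectiveConfigSketch {

    public static Map<String, String> merge(
            Map<String, String> fileConfig, Map<String, String> commandLineConfig) {
        // Copy first so the configuration loaded from the file is not mutated.
        Map<String, String> effective = new HashMap<>(fileConfig);
        // Values from the command line override values from the file.
        effective.putAll(commandLineConfig);
        // The session CLI always deploys a session cluster, whatever was configured.
        effective.put("execution.target", "yarn-session");
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> fileConfig = new HashMap<>();
        fileConfig.put("taskmanager.numberOfTaskSlots", "1");
        fileConfig.put("execution.target", "local");

        Map<String, String> commandLineConfig = new HashMap<>();
        commandLineConfig.put("taskmanager.numberOfTaskSlots", "4");

        System.out.println(merge(fileConfig, commandLineConfig));
    }
}
```

Copying before merging matters because the same base configuration object is held by the CLI instance; mutating it would leak command-line values into later uses.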

The writeYarnPropertiesFile method writes the yarn properties file described above. Its content is as follows:

private void writeYarnPropertiesFile(
        ApplicationId yarnApplicationId, @Nullable String dynamicProperties) {
    // file that we write into the conf/ dir containing the jobManager address and the dop.
    // Resolve the system temp directory and the current user name; the file is "{temp dir}/.yarn-properties-{user name}"
    // For example, with temp dir /tmp and user root, the file is "/tmp/.yarn-properties-root"
    final File yarnPropertiesFile = getYarnPropertiesLocation(yarnPropertiesFileLocation);

    Properties yarnProps = new Properties();
    // Set the applicationID entry
    yarnProps.setProperty(YARN_APPLICATION_ID_KEY, yarnApplicationId.toString());

    // add dynamic properties
    // Set the dynamicPropertiesString entry
    if (dynamicProperties != null) {
        yarnProps.setProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING, dynamicProperties);
    }
    
    // Write the properties to the file
    writeYarnProperties(yarnProps, yarnPropertiesFile);
}
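The body of writeYarnProperties is not shown in this article. A minimal sketch of what such a write could look like with the standard library is given below; it assumes the file is a plain java.util.Properties file (consistent with how the constructor loads it) and uses the two key names from the sample file. The class name, method name and header comment string are illustrative choices.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical helper, not Flink's writeYarnProperties: stores the session's application ID.
public class YarnPropertiesWriterSketch {

    public static void write(Path target, String applicationId, String dynamicProperties)
            throws IOException {
        Properties props = new Properties();
        props.setProperty("applicationID", applicationId);
        // The dynamic properties entry is optional, as in writeYarnPropertiesFile.
        if (dynamicProperties != null) {
            props.setProperty("dynamicPropertiesString", dynamicProperties);
        }
        try (OutputStream out = Files.newOutputStream(target)) {
            props.store(out, "Generated YARN properties file");
        }
    }

    public static void main(String[] args) throws IOException {
        Path target = Files.createTempFile("yarn-properties-demo", null);
        write(target, "application_1649212345678_0001", "");
        Files.readAllLines(target).forEach(System.out::println);
        Files.delete(target);
    }
}
```

Because Properties.store prepends a comment and a timestamp line, the file on disk contains a small header in addition to the two entries.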

YarnClusterDescriptor

YarnClusterDescriptor is responsible for deploying the Flink cluster to YARN. The method that deploys a session cluster is deploySessionCluster. Its code is as follows:

@Override
public ClusterClientProvider<ApplicationId> deploySessionCluster(
        ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
    try {
        return deployInternal(
                clusterSpecification,
                "Flink session cluster",
                getYarnSessionClusterEntrypoint(),
                null,
                false);
    } catch (Exception e) {
        throw new ClusterDeploymentException("Couldn't deploy Yarn session cluster", e);
    }
}

The actual deployment logic is in the deployInternal method, analyzed below:

private ClusterClientProvider<ApplicationId> deployInternal(
        ClusterSpecification clusterSpecification,
        String applicationName,
        String yarnClusterEntrypoint,
        @Nullable JobGraph jobGraph,
        boolean detached)
        throws Exception {

    // Get the user who is submitting the deployment
    final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
    if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
        // Kerberos is enabled
        // Read from the Flink configuration whether the ticket cache is used
        boolean useTicketCache =
                flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);

        // Check that the current login credentials are valid (ticket cache and delegation token)
        if (!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
            throw new RuntimeException(
                    "Hadoop security with Kerberos is enabled but the login user "
                            + "does not have Kerberos credentials or delegation tokens!");
        }

        // Check the fetch-delegation-token and yarn access settings for consistency
        final boolean fetchToken =
                flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
        final boolean yarnAccessFSEnabled =
                !CollectionUtil.isNullOrEmpty(
                        flinkConfiguration.get(YarnConfigOptions.YARN_ACCESS));
        if (!fetchToken && yarnAccessFSEnabled) {
            throw new IllegalConfigurationException(
                    String.format(
                            "When %s is disabled, %s must be disabled as well.",
                            SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN.key(),
                            YarnConfigOptions.YARN_ACCESS.key()));
        }
    }

    // Check that the jar, the configuration and the resource requirements are usable
    isReadyForDeployment(clusterSpecification);

    // ------------------ Check if the specified queue exists --------------------

    // Check whether the configured YARN queue exists among the queues reported by YARN (when that list is non-empty)
    checkYarnQueues(yarnClient);

    // ------------------ Check if the YARN ClusterClient has the requested resources
    // --------------

    // Create a YARN application via yarnClient
    final YarnClientApplication yarnApplication = yarnClient.createApplication();
    final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();

    // Get the maximum resource capability (vcores and memory)
    Resource maxRes = appResponse.getMaximumResourceCapability();

    final ClusterResourceDescription freeClusterMem;
    try {
        // Get the free memory resources: total free memory, the largest amount available to a single container, and the free memory of each NodeManager
        freeClusterMem = getCurrentFreeClusterResources(yarnClient);
    } catch (YarnException | IOException e) {
        failSessionDuringDeployment(yarnClient, yarnApplication);
        throw new YarnDeploymentException(
                "Could not retrieve information about free cluster resources.", e);
    }

    // Get the minimum allocation memory
    final int yarnMinAllocationMB =
            yarnConfiguration.getInt(
                    YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
                    YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
    if (yarnMinAllocationMB <= 0) {
        throw new YarnDeploymentException(
                "The minimum allocation memory "
                        + "("
                        + yarnMinAllocationMB
                        + " MB) configured via '"
                        + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
                        + "' should be greater than 0.");
    }

    final ClusterSpecification validClusterSpecification;
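    // Validate that the memory requested for the JobManager and the TaskManagers is within the limits
    // Exceeding the maximum allowed memory throws an exception; exceeding the currently free memory only logs a warning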
    try {
        validClusterSpecification =
                validateClusterResources(
                        clusterSpecification, yarnMinAllocationMB, maxRes, freeClusterMem);
    } catch (YarnDeploymentException yde) {
        failSessionDuringDeployment(yarnClient, yarnApplication);
        throw yde;
    }

    LOG.info("Cluster specification: {}", validClusterSpecification);

    // Derive the execution mode (detached or normal) from the argument
    final ClusterEntrypoint.ExecutionMode executionMode =
            detached
                    ? ClusterEntrypoint.ExecutionMode.DETACHED
                    : ClusterEntrypoint.ExecutionMode.NORMAL;

    // Store the execution mode in the Flink configuration
    flinkConfiguration.setString(
            ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE, executionMode.toString());

    // Start the application master (the JobManager)
    ApplicationReport report =
            startAppMaster(
                    flinkConfiguration,
                    applicationName,
                    yarnClusterEntrypoint,
                    jobGraph,
                    yarnClient,
                    yarnApplication,
                    validClusterSpecification);

    // In detached mode, print the application ID so that users can cancel the application themselves
    if (detached) {
        final ApplicationId yarnApplicationId = report.getApplicationId();
        logDetachedClusterInformation(yarnApplicationId, LOG);
    }

    // Store the runtime information (JM address, application ID, etc.) in the Flink configuration
    setClusterEntrypointInfoToConfig(report);

    return () -> {
        try {
            // Return a RestClusterClient, used to interact with the YARN application
            return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
        } catch (Exception e) {
            throw new RuntimeException("Error while creating RestClusterClient.", e);
        }
    };
}
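The resource validation step in deployInternal follows a simple rule stated in the comments: a request above the hard limit fails the deployment, while a request above what is currently free only produces a warning. The sketch below illustrates that rule for the JobManager memory alone. It is a simplified, hypothetical stand-in for validateClusterResources, whose body is not shown here; in particular, raising a too-small request to the YARN minimum allocation is an assumption, and the parameter names are invented for the example.

```java
// Hypothetical, simplified version of the memory checks described above; not Flink's code.
public class ClusterResourceCheckSketch {

    // Returns the JobManager memory (in MB) to request after validation.
    public static int checkJobManagerMemory(
            int requestedMb, int yarnMinAllocationMb, int maxContainerMb, int freeContainerMb) {
        if (yarnMinAllocationMb <= 0) {
            throw new IllegalArgumentException(
                    "The minimum allocation memory should be greater than 0.");
        }
        // Assumption: a request below the YARN minimum is raised to the minimum.
        int effectiveMb = Math.max(requestedMb, yarnMinAllocationMb);
        // Hard limit: YARN would never grant such a container, so fail early.
        if (effectiveMb > maxContainerMb) {
            throw new IllegalStateException(
                    "Requested " + effectiveMb + " MB exceeds the maximum of " + maxContainerMb + " MB.");
        }
        // Soft limit: the cluster may free up resources later, so only warn.
        if (effectiveMb > freeContainerMb) {
            System.err.println(
                    "WARN: requested " + effectiveMb + " MB, but only " + freeContainerMb + " MB is free right now.");
        }
        return effectiveMb;
    }

    public static void main(String[] args) {
        System.out.println(checkJobManagerMemory(512, 1024, 8192, 4096));
    }
}
```

Separating the hard and soft limits is what lets a session be submitted to a busy cluster: the request waits in the queue instead of being rejected outright.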

startAppMaster configures the Application Master (that is, the JobManager) and starts it on YARN. The method is very long; the annotated code is as follows:

private ApplicationReport startAppMaster(
        Configuration configuration,
        String applicationName,
        String yarnClusterEntrypoint,
        JobGraph jobGraph,
        YarnClient yarnClient,
        YarnClientApplication yarnApplication,
        ClusterSpecification clusterSpecification)
        throws Exception {

    // ------------------ Initialize the file systems -------------------------

    // Initialize the Flink file systems
    org.apache.flink.core.fs.FileSystem.initialize(
            configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));

    final FileSystem fs = FileSystem.get(yarnConfiguration);

    // Hard-coded check for the GoogleHDFS client because it does not override the
    // getScheme() method.
    if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem")
            && fs.getScheme().startsWith("file")) {
        LOG.warn(
                "The file system scheme is '"
                        + fs.getScheme()
                        + "'. This indicates that the "
                        + "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values."
                        + "The Flink YARN client needs to store its files in a distributed file system");
    }

    // Get the YARN application submission context
    ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();

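    // Resolve the shared remote lib directories from the yarn.provided.lib.dirs option
    // They hold the jars Flink needs at runtime, so these need not be read from the local machine on every submission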
    final List<Path> providedLibDirs =
            Utils.getQualifiedRemoteSharedPaths(configuration, yarnConfiguration);

    // Get the directory where files are stored when the YARN application is submitted (option yarn.staging-directory)
    // The default is the home directory of the file system
    Path stagingDirPath = getStagingDir(fs);
    FileSystem stagingDirFs = stagingDirPath.getFileSystem(yarnConfiguration);
    // Create the file uploader for the YARN application
    final YarnApplicationFileUploader fileUploader =
            YarnApplicationFileUploader.from(
                    stagingDirFs,
                    stagingDirPath,
                    providedLibDirs,
                    appContext.getApplicationId(),
                    getFileReplication());

    // The files need to be shipped and added to classpath.
    // Collect the absolute paths of the files to upload and add to the classpath
    // They are specified with the -t option of the submit command
    Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
    for (File file : shipFiles) {
        systemShipFiles.add(file.getAbsoluteFile());
    }

    // Get the path of the log configuration file (option internal.yarn.log-config-file)
    // Changing this option is not recommended
    final String logConfigFilePath =
            configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
    if (logConfigFilePath != null) {
        // Add the log configuration file to the set of files to ship
        systemShipFiles.add(new File(logConfigFilePath));
    }

    // Set-up ApplicationSubmissionContext for the application

    // Get the application ID
    final ApplicationId appId = appContext.getApplicationId();

    // ------------------ Add Zookeeper namespace to local flinkConfiguraton ------
    
    // Set the cluster ID used by the high-availability configuration, if it is not already set
    setHAClusterIdIfNotSet(configuration, appId);

    // Check whether high-availability mode is enabled (option high-availability)
    if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
        // activate re-execution of failed applications
        // Set the maximum number of application master attempts (option yarn.application-attempts)
        // If it is not configured, the default is DEFAULT_RM_AM_MAX_ATTEMPTS (2)
        appContext.setMaxAppAttempts(
                configuration.getInteger(
                        YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
                        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));

        // Activate HA support
        // This applies yarn.application-attempt-failures-validity-interval (only attempts failing within this interval count toward the limit)
        activateHighAvailabilitySupport(appContext);
    } else {
        // HA is not enabled: only set the number of attempts, which defaults to 1
        appContext.setMaxAppAttempts(
                configuration.getInteger(YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 1));
    }

    // Collect the paths of all user jars required by the job
    final Set<Path> userJarFiles = new HashSet<>();
    if (jobGraph != null) {
        userJarFiles.addAll(
                jobGraph.getUserJars().stream()
                        .map(f -> f.toUri())
                        .map(Path::new)
                        .collect(Collectors.toSet()));
    }

    // Collect the URIs of the jars that must be uploaded together with the job jar, such as UDFs
    final List<URI> jarUrls =
            ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URI::create);
    if (jarUrls != null
            && YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) {
        userJarFiles.addAll(jarUrls.stream().map(Path::new).collect(Collectors.toSet()));
    }

    // jobGraph is non-null only in per-job mode
    if (jobGraph != null) {
        // Upload the local files of the distributed cache and write their remote paths to the configuration
        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
                jobGraph.getUserArtifacts().entrySet()) {
            // only upload local files
            if (!Utils.isRemotePath(entry.getValue().filePath)) {
                Path localPath = new Path(entry.getValue().filePath);
                Tuple2<Path, Long> remoteFileInfo =
                        fileUploader.uploadLocalFileToRemote(localPath, entry.getKey());
                jobGraph.setUserArtifactRemotePath(
                        entry.getKey(), remoteFileInfo.f0.toString());
            }
        }

        jobGraph.writeUserArtifactEntriesToConfiguration();
    }

    // No provided lib dirs are configured: add the local lib folder to the set of files to ship
    if (providedLibDirs == null || providedLibDirs.isEmpty()) {
        addLibFoldersToShipFiles(systemShipFiles);
    }

    // Register all files in provided lib dirs as local resources with public visibility
    // and upload the remaining dependencies as local resources with APPLICATION visibility.
    // The provided lib dirs become YARN local resources with public visibility, so other YARN nodes and other YARN applications can access them too
    final List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
    // Register systemShipFiles (the set of files to ship) as local resources that only the current application can access
    final List<String> uploadedDependencies =
            fileUploader.registerMultipleLocalResources(
                    systemShipFiles.stream()
                            .map(e -> new Path(e.toURI()))
                            .collect(Collectors.toSet()),
                    Path.CUR_DIR,
                    LocalResourceType.FILE);
    // Both groups of files belong to the system classpath
    systemClassPaths.addAll(uploadedDependencies);

    // Upload and register ship-only files
    // Flink plugins only need to be shipped and must not be added to the classpath
    if (providedLibDirs == null || providedLibDirs.isEmpty()) {
        Set<File> shipOnlyFiles = new HashSet<>();
        addPluginsFoldersToShipFiles(shipOnlyFiles);
        fileUploader.registerMultipleLocalResources(
                shipOnlyFiles.stream()
                        .map(e -> new Path(e.toURI()))
                        .collect(Collectors.toSet()),
                Path.CUR_DIR,
                LocalResourceType.FILE);
    }

    // Handle the archives to ship
    // The option is yarn.ship-archives; unlike yarn.ship-files, archives are extracted automatically after upload
    if (!shipArchives.isEmpty()) {
        fileUploader.registerMultipleLocalResources(
                shipArchives.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()),
                Path.CUR_DIR,
                LocalResourceType.ARCHIVE);
    }

    // Upload and register user jars
    // The registered paths form the user classpath
    final List<String> userClassPaths =
            fileUploader.registerMultipleLocalResources(
                    userJarFiles,
                    userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED
                            ? ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR
                            : Path.CUR_DIR,
                    LocalResourceType.FILE);

    // usrlib will be automatically shipped if it exists.
    // If a usrlib directory exists, upload it as well and add it to the user classpath
    if (ClusterEntrypointUtils.tryFindUserLibDirectory().isPresent()) {
        final Set<File> usrLibShipFiles = new HashSet<>();
        addUsrLibFolderToShipFiles(usrLibShipFiles);
        final List<String> usrLibClassPaths =
                fileUploader.registerMultipleLocalResources(
                        usrLibShipFiles.stream()
                                .map(e -> new Path(e.toURI()))
                                .collect(Collectors.toSet()),
                        Path.CUR_DIR,
                        LocalResourceType.FILE);
        userClassPaths.addAll(usrLibClassPaths);
    }

    // ORDER mode: user jars join the system classpath and are sorted by name together with it
    if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
        systemClassPaths.addAll(userClassPaths);
    }

    // normalize classpath by sorting
    // Sort both the system classpath and the user classpath
    Collections.sort(systemClassPaths);
    Collections.sort(userClassPaths);

    // classpath assembler
    StringBuilder classPathBuilder = new StringBuilder();
    // FIRST mode: append the user classpath before the system classpath
    if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
        for (String userClassPath : userClassPaths) {
            classPathBuilder.append(userClassPath).append(File.pathSeparator);
        }
    }
    // Append the system classpath
    for (String classPath : systemClassPaths) {
        classPathBuilder.append(classPath).append(File.pathSeparator);
    }

    // Setup jar for ApplicationMaster
    // Upload the Flink dist jar (option yarn.flink-dist-jar) and add it to the classpath
    final YarnLocalResourceDescriptor localResourceDescFlinkJar =
            fileUploader.uploadFlinkDist(flinkJarPath);
    classPathBuilder
            .append(localResourceDescFlinkJar.getResourceKey())
            .append(File.pathSeparator);

    // write job graph to tmp file and add it to local resource
    // TODO: server use user main method to generate job graph
    // Write the job graph to a file, register it as a YARN local resource and add it to the classpath
    if (jobGraph != null) {
        File tmpJobGraphFile = null;
        try {
            tmpJobGraphFile = File.createTempFile(appId.toString(), null);
            try (FileOutputStream output = new FileOutputStream(tmpJobGraphFile);
                    ObjectOutputStream obOutput = new ObjectOutputStream(output)) {
                obOutput.writeObject(jobGraph);
            }

            final String jobGraphFilename = "job.graph";
            configuration.setString(JOB_GRAPH_FILE_PATH, jobGraphFilename);

            fileUploader.registerSingleLocalResource(
                    jobGraphFilename,
                    new Path(tmpJobGraphFile.toURI()),
                    "",
                    LocalResourceType.FILE,
                    true,
                    false);
            classPathBuilder.append(jobGraphFilename).append(File.pathSeparator);
        } catch (Exception e) {
            LOG.warn("Add job graph to local resource fail.");
            throw e;
        } finally {
            if (tmpJobGraphFile != null && !tmpJobGraphFile.delete()) {
                LOG.warn("Fail to delete temporary file {}.", tmpJobGraphFile.toPath());
            }
        }
    }

    // Upload the flink configuration
    // write out configuration file
    // upload flink-conf.yaml to YARN
    File tmpConfigurationFile = null;
    try {
        tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
        BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);

        String flinkConfigKey = "flink-conf.yaml";
        fileUploader.registerSingleLocalResource(
                flinkConfigKey,
                new Path(tmpConfigurationFile.getAbsolutePath()),
                "",
                LocalResourceType.FILE,
                true,
                true);
        classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
    } finally {
        if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) {
            LOG.warn("Fail to delete temporary file {}.", tmpConfigurationFile.toPath());
        }
    }

    // LAST mode: only now append the user classpath to classPathBuilder
    if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
        for (String userClassPath : userClassPaths) {
            classPathBuilder.append(userClassPath).append(File.pathSeparator);
        }
    }

    // To support Yarn Secure Integration Test Scenario
    // In Integration test setup, the Yarn containers created by YarnMiniCluster does not have
    // the Yarn site XML
    // and KRB5 configuration files. We are adding these files as container local resources for
    // the container
    // applications (JM/TMs) to have proper secure cluster setup
    // IN_TESTS mode: upload yarn-site.xml and record the krb5.conf path in the Flink configuration
    Path remoteYarnSiteXmlPath = null;
    if (System.getenv("IN_TESTS") != null) {
        File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
        LOG.info(
                "Adding Yarn configuration {} to the AM container local resource bucket",
                f.getAbsolutePath());
        Path yarnSitePath = new Path(f.getAbsolutePath());
        remoteYarnSiteXmlPath =
                fileUploader
                        .registerSingleLocalResource(
                                Utils.YARN_SITE_FILE_NAME,
                                yarnSitePath,
                                "",
                                LocalResourceType.FILE,
                                false,
                                false)
                        .getPath();
        if (System.getProperty("java.security.krb5.conf") != null) {
            configuration.set(
                    SecurityOptions.KERBEROS_KRB5_PATH,
                    System.getProperty("java.security.krb5.conf"));
        }
    }

    // locate and upload the krb5.conf file
    Path remoteKrb5Path = null;
    boolean hasKrb5 = false;
    String krb5Config = configuration.get(SecurityOptions.KERBEROS_KRB5_PATH);
    if (!StringUtils.isNullOrWhitespaceOnly(krb5Config)) {
        final File krb5 = new File(krb5Config);
        LOG.info(
                "Adding KRB5 configuration {} to the AM container local resource bucket",
                krb5.getAbsolutePath());
        final Path krb5ConfPath = new Path(krb5.getAbsolutePath());
        remoteKrb5Path =
                fileUploader
                        .registerSingleLocalResource(
                                Utils.KRB5_FILE_NAME,
                                krb5ConfPath,
                                "",
                                LocalResourceType.FILE,
                                false,
                                false)
                        .getPath();
        hasKrb5 = true;
    }

    // handle the Kerberos keytab
    Path remotePathKeytab = null;
    String localizedKeytabPath = null;
    String keytab = configuration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
    if (keytab != null) {
        boolean localizeKeytab =
                flinkConfiguration.getBoolean(YarnConfigOptions.SHIP_LOCAL_KEYTAB);
        localizedKeytabPath =
                flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
        // if shipping is enabled (yarn.security.kerberos.ship-local-keytab), upload the keytab as a
        // YARN local resource named by yarn.security.kerberos.localized-keytab-path
        if (localizeKeytab) {
            // Localize the keytab to YARN containers via local resource.
            LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
            remotePathKeytab =
                    fileUploader
                            .registerSingleLocalResource(
                                    localizedKeytabPath,
                                    new Path(keytab),
                                    "",
                                    LocalResourceType.FILE,
                                    false,
                                    false)
                            .getPath();
        } else {
            // Assume Keytab is pre-installed in the container.
            localizedKeytabPath =
                    flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
        }
    }

    // resolve the memory configuration of the JobManager process
    final JobManagerProcessSpec processSpec =
            JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
                    flinkConfiguration, JobManagerOptions.TOTAL_PROCESS_MEMORY);
    // build the launch command and its arguments for the AM container launch context
    final ContainerLaunchContext amContainer =
            setupApplicationMasterContainer(yarnClusterEntrypoint, hasKrb5, processSpec);

    // setup security tokens
    // configure HDFS delegation tokens
    if (UserGroupInformation.isSecurityEnabled()) {
        // set HDFS delegation tokens when security is enabled
        LOG.info("Adding delegation token to the AM container.");
        final List<Path> pathsToObtainToken = new ArrayList<>();
        boolean fetchToken =
                configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
        if (fetchToken) {
            List<Path> yarnAccessList =
                    ConfigUtils.decodeListFromConfig(
                            configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
            pathsToObtainToken.addAll(yarnAccessList);
            pathsToObtainToken.addAll(fileUploader.getRemotePaths());
        }
        Utils.setTokensFor(amContainer, pathsToObtainToken, yarnConfiguration, fetchToken);
    }

    // set the local resources of the AM container
    amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
    // close the file uploader
    fileUploader.close();

    // Set up the environment variables for the ApplicationMaster, including CLASSPATH
    final Map<String, String> appMasterEnv =
            generateApplicationMasterEnv(
                    fileUploader,
                    classPathBuilder.toString(),
                    localResourceDescFlinkJar.toString(),
                    appId.toString());

    // set the environment variables related to the keytab file
    if (localizedKeytabPath != null) {
        appMasterEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localizedKeytabPath);
        String principal = configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
        appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
        if (remotePathKeytab != null) {
            appMasterEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, remotePathKeytab.toString());
        }
    }

    // To support Yarn Secure Integration Test Scenario
    if (remoteYarnSiteXmlPath != null) {
        appMasterEnv.put(
                YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
    }
    if (remoteKrb5Path != null) {
        appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
    }

    // set the environment variables of the AM container
    amContainer.setEnvironment(appMasterEnv);

    // Set up resource type requirements for ApplicationMaster
    // from here on, configure the resources of the AM container
    Resource capability = Records.newRecord(Resource.class);
    // memory
    capability.setMemory(clusterSpecification.getMasterMemoryMB());
    // CPU vcores
    capability.setVirtualCores(
            flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES));

    // application name
    final String customApplicationName = customName != null ? customName : applicationName;

    appContext.setApplicationName(customApplicationName);
    // application type
    appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
    // set the container spec and the resource capability
    appContext.setAMContainerSpec(amContainer);
    appContext.setResource(capability);

    // Set priority for application (only applied when the configured value is non-negative)
    int priorityNum = flinkConfiguration.getInteger(YarnConfigOptions.APPLICATION_PRIORITY);
    if (priorityNum >= 0) {
        Priority priority = Priority.newInstance(priorityNum);
        appContext.setPriority(priority);
    }

    // set the queue the application runs in
    if (yarnQueue != null) {
        appContext.setQueue(yarnQueue);
    }

    // set the node label and the application tags
    setApplicationNodeLabel(appContext);

    setApplicationTags(appContext);

    // add a hook to clean up in case deployment fails
    // the JVM shutdown hook runs the cleanup if deployment fails:
    // it stops the application and deletes the application's files
    Thread deploymentFailureHook =
            new DeploymentFailureHook(yarnApplication, fileUploader.getApplicationDir());
    Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
    LOG.info("Submitting application master " + appId);
    // submit the application
    yarnClient.submitApplication(appContext);

    LOG.info("Waiting for the cluster to be allocated");
    // wait until the application has started
    final long startTime = System.currentTimeMillis();
    ApplicationReport report;
    YarnApplicationState lastAppState = YarnApplicationState.NEW;
    loop:
    while (true) {
        // poll the application state every 250 ms
        try {
            report = yarnClient.getApplicationReport(appId);
        } catch (IOException e) {
            throw new YarnDeploymentException("Failed to deploy the cluster.", e);
        }
        YarnApplicationState appState = report.getYarnApplicationState();
        LOG.debug("Application State: {}", appState);
        switch (appState) {
            case FAILED:
            case KILLED:
                throw new YarnDeploymentException(
                        "The YARN application unexpectedly switched to state "
                                + appState
                                + " during deployment. \n"
                                + "Diagnostics from YARN: "
                                + report.getDiagnostics()
                                + "\n"
                                + "If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n"
                                + "yarn logs -applicationId "
                                + appId);
                // break ..
            case RUNNING:
                LOG.info("YARN application has been deployed successfully.");
                break loop;
            case FINISHED:
                LOG.info("YARN application has been finished successfully.");
                break loop;
            default:
                if (appState != lastAppState) {
                    LOG.info("Deploying cluster, current state " + appState);
                }
                if (System.currentTimeMillis() - startTime > 60000) {
                    LOG.info(
                            "Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
                }
        }
        lastAppState = appState;
        Thread.sleep(250);
    }

    // since deployment was successful, remove the shutdown hook
    ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
    return report;
}
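
The classpath ordering in the method above depends on the user jar inclusion mode. The following is a minimal sketch of that ordering only, written as standalone Java. The class `ClasspathOrderSketch` and its enum are hypothetical stand-ins (the enum mirrors only the three values used above from `YarnConfigOptions.UserJarInclusion`), and the entries that the real method appends between the system and the trailing user entries (Flink dist jar, job.graph, flink-conf.yaml) are left out.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Illustrative sketch only; not part of Flink. */
final class ClasspathOrderSketch {

    /** Hypothetical stand-in for the three inclusion modes used in the listing. */
    enum UserJarInclusion { FIRST, LAST, ORDER }

    static String assemble(
            List<String> systemEntries, List<String> userEntries, UserJarInclusion mode) {
        List<String> system = new ArrayList<>(systemEntries);
        List<String> user = new ArrayList<>(userEntries);

        // ORDER: user entries join the system entries and are sorted together.
        if (mode == UserJarInclusion.ORDER) {
            system.addAll(user);
        }
        Collections.sort(system);
        Collections.sort(user);

        StringBuilder builder = new StringBuilder();
        // FIRST: user entries come before the system entries.
        if (mode == UserJarInclusion.FIRST) {
            for (String entry : user) {
                builder.append(entry).append(File.pathSeparator);
            }
        }
        for (String entry : system) {
            builder.append(entry).append(File.pathSeparator);
        }
        // LAST: user entries come after the system entries.
        if (mode == UserJarInclusion.LAST) {
            for (String entry : user) {
                builder.append(entry).append(File.pathSeparator);
            }
        }
        return builder.toString();
    }

    public static void main(String[] args) {
        // Made-up entry names, used only to make the ordering visible.
        List<String> system = Arrays.asList("lib/log4j.jar", "lib/flink-table.jar");
        List<String> user = Arrays.asList("usrlib/job.jar");
        for (UserJarInclusion mode : UserJarInclusion.values()) {
            System.out.println(mode + " -> " + assemble(system, user, mode));
        }
    }
}
```

In ORDER mode the user entries are no longer distinguishable from system entries once merged, which is why the listing sorts only after the merge.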

At this point the AM (JobManager) of the Flink on Yarn cluster has been started.
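
The wait loop at the end of the method is a simple polling state machine. Below is a rough sketch of the same control flow without any YARN dependency. Everything here is an assumption made for illustration: `DeploymentPollSketch`, its `AppState` enum (a subset of the states YARN reports) and the `Supplier` that stands in for `yarnClient.getApplicationReport(appId)`.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

/** Illustrative sketch only; not part of Flink or YARN. */
final class DeploymentPollSketch {

    /** Hypothetical subset of the application states used in the listing. */
    enum AppState { NEW, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED }

    static AppState waitUntilDeployed(Supplier<AppState> reportSource, long pollIntervalMs) {
        AppState last = AppState.NEW;
        while (true) {
            AppState current = reportSource.get();
            switch (current) {
                case FAILED:
                case KILLED:
                    // Terminal failure: abort the deployment.
                    throw new IllegalStateException(
                            "Application switched to state " + current + " during deployment");
                case RUNNING:
                case FINISHED:
                    // Terminal success: stop polling.
                    return current;
                default:
                    // Still starting: log only when the state changes.
                    if (current != last) {
                        System.out.println("Deploying cluster, current state " + current);
                    }
            }
            last = current;
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for deployment", e);
            }
        }
    }

    public static void main(String[] args) {
        // A scripted sequence of states replaces the real application report.
        Iterator<AppState> states =
                Arrays.asList(AppState.SUBMITTED, AppState.ACCEPTED, AppState.ACCEPTED, AppState.RUNNING)
                        .iterator();
        System.out.println("Final state: " + waitUntilDeployed(states::next, 10));
    }
}
```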

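Three deployment modes

Flink on Yarn supports three deployment modes: Yarn Session, Application, and Per-job.

Yarn Session mode only sets up an empty Flink cluster on Yarn. At deployment time only the JobManager runs; TaskManagers are created and released automatically as jobs are submitted.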

@Override
public ClusterClientProvider<ApplicationId> deploySessionCluster(
        ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
    try {
        return deployInternal(
                clusterSpecification,
                "Flink session cluster",
                getYarnSessionClusterEntrypoint(),
                null, // no user job has been submitted when the cluster is deployed, so the JobGraph is null
                false);
    } catch (Exception e) {
        throw new ClusterDeploymentException("Couldn't deploy Yarn session cluster", e);
    }
}

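Application Cluster mode creates a cluster dedicated to a single user application. The cluster starts when the application starts and is torn down when it finishes. The application's main method runs inside the cluster rather than on the machine of the submitting client.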

@Override
public ClusterClientProvider<ApplicationId> deployApplicationCluster(
        final ClusterSpecification clusterSpecification,
        final ApplicationConfiguration applicationConfiguration)
        throws ClusterDeploymentException {
    checkNotNull(clusterSpecification);
    checkNotNull(applicationConfiguration);

    // check that the deployment target is APPLICATION
    final YarnDeploymentTarget deploymentTarget =
            YarnDeploymentTarget.fromConfig(flinkConfiguration);
    if (YarnDeploymentTarget.APPLICATION != deploymentTarget) {
        throw new ClusterDeploymentException(
                "Couldn't deploy Yarn Application Cluster."
                        + " Expected deployment.target="
                        + YarnDeploymentTarget.APPLICATION.getName()
                        + " but actual one was \""
                        + deploymentTarget.getName()
                        + "\"");
    }

    // merge the application configuration into the Flink configuration
    applicationConfiguration.applyToConfiguration(flinkConfiguration);

    // check that exactly one pipeline jar is configured
    final List<String> pipelineJars =
            flinkConfiguration
                    .getOptional(PipelineOptions.JARS)
                    .orElse(Collections.emptyList());
    Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar");

    try {
        return deployInternal(
                clusterSpecification,
                "Flink Application Cluster",
                YarnApplicationClusterEntryPoint.class.getName(),
                null, // the JobGraph is generated inside the Yarn cluster, so it is null at deployment time
                false);
    } catch (Exception e) {
        throw new ClusterDeploymentException("Couldn't deploy Yarn Application Cluster", e);
    }
}
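
The two preconditions checked before deployInternal is called can be pictured with the following small sketch. It is not Flink code: `ApplicationModeCheckSketch` is a hypothetical class, the configuration is reduced to two plain arguments, and the target name `yarn-application` is an assumption about what `YarnDeploymentTarget.APPLICATION.getName()` returns.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative sketch only; not part of Flink. */
final class ApplicationModeCheckSketch {

    // Assumed string form of the APPLICATION deployment target.
    static final String APPLICATION_TARGET = "yarn-application";

    static void validate(String deploymentTarget, List<String> pipelineJars) {
        // Precondition 1: the configured target must be the application target.
        if (!APPLICATION_TARGET.equals(deploymentTarget)) {
            throw new IllegalStateException(
                    "Expected deployment target "
                            + APPLICATION_TARGET
                            + " but actual one was \""
                            + deploymentTarget
                            + "\"");
        }
        // Precondition 2: exactly one pipeline jar may be configured.
        if (pipelineJars.size() != 1) {
            throw new IllegalArgumentException("Should only have one jar");
        }
    }

    public static void main(String[] args) {
        // The jar path is a made-up example value.
        validate(APPLICATION_TARGET, Arrays.asList("hdfs:///jobs/my-job.jar"));
        System.out.println("configuration accepted");
    }
}
```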

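Per-job mode resembles Application mode in that the cluster is used exclusively by the current job. The difference is that the job's main method runs on the submitting client's machine, and the Yarn cluster receives the JobGraph generated by the client.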

@Override
public ClusterClientProvider<ApplicationId> deployJobCluster(
        ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached)
        throws ClusterDeploymentException {

    LOG.warn(
            "Job Clusters are deprecated since Flink 1.15. Please use an Application Cluster/Application Mode instead.");
    try {
        return deployInternal(
                clusterSpecification,
                "Flink per-job cluster",
                getYarnJobClusterEntrypoint(),
                jobGraph, // pass in the JobGraph that the client has already generated
                detached);
    } catch (Exception e) {
        throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
    }
}

The deployInternal method was analyzed earlier and is not repeated here.
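
As a compact recap, the differences between the three deploy methods come down to the arguments they hand to deployInternal. The sketch below records them in a hypothetical enum `DeployModeSketch.Mode`. The cluster names are taken from the listings above; the field names and the "client"/"cluster" labels are illustrative assumptions.

```java
/** Illustrative sketch only; not part of Flink. */
final class DeployModeSketch {

    enum Mode {
        // name passed to deployInternal, whether a JobGraph is passed, where main() runs
        SESSION("Flink session cluster", false, "client"),
        APPLICATION("Flink Application Cluster", false, "cluster"),
        PER_JOB("Flink per-job cluster", true, "client");

        final String clusterName;
        final boolean shipsJobGraph;
        final String mainRunsOn;

        Mode(String clusterName, boolean shipsJobGraph, String mainRunsOn) {
            this.clusterName = clusterName;
            this.shipsJobGraph = shipsJobGraph;
            this.mainRunsOn = mainRunsOn;
        }
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            System.out.println(
                    mode
                            + ": name=\""
                            + mode.clusterName
                            + "\", JobGraph passed at deployment="
                            + mode.shipsJobGraph
                            + ", user main() runs on the "
                            + mode.mainRunsOn);
        }
    }
}
```

Only Per-job mode passes a JobGraph at deployment time, which is why the job.graph branch of the deployment method is skipped for the other two modes.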

This blog post is the author's original work. Discussion and corrections are welcome. Please credit the source when reposting.
