
Job Processing Flow on the Flink Client Side

2021-04-23  todd5167

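Anyone with Flink development experience knows that, once the job parameters are set in flink-conf.yaml, a job can be submitted to a YARN cluster in per-job mode with the commands below.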

Flink 1.12, command-line submission:
./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar

Earlier versions, command-line submission:
./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar

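This article walks through the code behind these commands, so that readers understand how Flink, on the client side, parses the command-line arguments, builds the JobGraph, and submits it to a remote YARN cluster for execution.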

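Runtime architecture

When a Flink cluster starts, it first launches one JobManager and one or more TaskManagers. The Client submits a job to the JobManager, the JobManager schedules the job's tasks onto the TaskManagers, and the TaskManagers report heartbeats and statistics back to the JobManager. TaskManagers exchange data with each other as streams.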


As this shows, the Flink Client builds the execution plan (the JobGraph) from the user's job and hands that JobGraph to the JobManager for scheduling and execution.

YARN deployment modes

On YARN, Flink supports three job execution modes: Application Mode, Per-Job Cluster Mode, and Session Mode.

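CliFrontend processing logic

After the flink run command is issued, CliFrontend#main handles the submission. The submission goes through the following stages: building the final Configuration, building the JobGraph from the user program, building a ClusterDescriptor according to the submission mode, and having the ClusterDescriptor deploy the generated JobGraph.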

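Building the configuration

When a submission starts, Flink first builds the global default Configuration from flink-conf.yaml, then overrides the defaults with the options received on the command line. Arguments related to program execution are collected at the same time.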
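The layering described above can be sketched with plain maps. This is only an illustration: the class `LayeredConfig` and its `effective` method are hypothetical names, Flink's own Configuration class is not used, and only `-Dkey=value` style overrides are handled.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration: file defaults overridden by -Dkey=value arguments. */
final class LayeredConfig {

    /** Returns a new map: defaults first, then every -Dkey=value override applied on top. */
    static Map<String, String> effective(Map<String, String> defaults, List<String> args) {
        Map<String, String> result = new LinkedHashMap<>(defaults);
        for (String arg : args) {
            if (!arg.startsWith("-D")) {
                continue; // not a dynamic property, ignored in this sketch
            }
            int eq = arg.indexOf('=');
            if (eq > 2) { // require a non-empty key between "-D" and "="
                result.put(arg.substring(2, eq), arg.substring(eq + 1));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> defaults = new LinkedHashMap<>();
        defaults.put("parallelism.default", "1");
        defaults.put("taskmanager.numberOfTaskSlots", "2");
        System.out.println(effective(defaults, Arrays.asList("-Dparallelism.default=4")));
    }
}
```

The defaults map is never mutated, so the same file-based configuration can be reused for several submissions with different overrides.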

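  1. find the configuration directory, searching in this order:
    1. The value of the FLINK_CONF_DIR environment variable.
    2. ../conf, if it exists.
    3. ./conf, if it exists.
  2. load the global configuration:
    1. Load flink-conf.yaml from the configuration directory as the default configuration.
  3. load the custom command lines: build the CustomCommandLine instances that receive command-line arguments, one of which becomes the active one.
    1. add GenericCLI: the new-style command line, which accepts the -t and -D options.
    2. add FlinkYarnSessionCli: accepts the options used when submitting a job to a YARN session cluster.
    3. add DefaultCLI: accepts the options used when submitting a job to a standalone cluster.
  4. validate and get active commandLine: pick the active command line based on the arguments. With the new-style option -t yarn-per-job, the active command line is GenericCLI.
  5. build ProgramOptions: parse the program-related command-line arguments into ProgramOptions.
  6. get effective Configuration: call toConfiguration on the active command line to build the final effective Configuration.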
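
The configuration directory lookup and the choice of the active command line can be sketched as below. Everything here is a stand-in: `CliSelectionSketch`, `Handler`, `findConfDir`, and `activeHandler` are hypothetical names, and the activation rules (a `-t` option for GenericCLI, `-m yarn-cluster` for FlinkYarnSessionCli, a catch-all DefaultCLI) are simplifying assumptions rather than Flink's actual checks.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical illustration; none of these names are Flink APIs. */
final class CliSelectionSketch {

    /** Lookup order from the text: FLINK_CONF_DIR, then ../conf, then ./conf. */
    static String findConfDir(Map<String, String> env, Predicate<String> dirExists) {
        String fromEnv = env.get("FLINK_CONF_DIR");
        if (fromEnv != null) {
            // Assumption of this sketch: a set but missing directory is an error.
            if (!dirExists.test(fromEnv)) {
                throw new IllegalStateException("FLINK_CONF_DIR does not exist: " + fromEnv);
            }
            return fromEnv;
        }
        for (String candidate : Arrays.asList("../conf", "./conf")) {
            if (dirExists.test(candidate)) {
                return candidate;
            }
        }
        throw new IllegalStateException("No configuration directory found.");
    }

    /** Stand-in for a command line that knows whether it applies to the arguments. */
    static final class Handler {
        final String name;
        final Predicate<List<String>> active;

        Handler(String name, Predicate<List<String>> active) {
            this.name = name;
            this.active = active;
        }
    }

    /** Registration order from the text: GenericCLI, FlinkYarnSessionCli, DefaultCLI. */
    static List<Handler> handlers() {
        return Arrays.asList(
                new Handler("GenericCLI", args -> args.contains("-t")),
                new Handler("FlinkYarnSessionCli", args -> {
                    int i = args.indexOf("-m");
                    return i >= 0 && i + 1 < args.size() && "yarn-cluster".equals(args.get(i + 1));
                }),
                // Assumption: the default handler accepts anything left over.
                new Handler("DefaultCLI", args -> true));
    }

    /** The first handler that reports itself active wins. */
    static String activeHandler(List<String> args) {
        for (Handler handler : handlers()) {
            if (handler.active.test(args)) {
                return handler.name;
            }
        }
        throw new IllegalStateException("No active command line found.");
    }
}
```

Because the handlers are tested in registration order, putting the catch-all handler last is what lets the more specific ones take precedence.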

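Building the JobGraph

Building the configuration produced programOptions, which holds the arguments for program execution, and effectiveConfiguration, which holds the configuration for job execution. From these two, Flink creates a PackagedProgram, which represents the external inputs used to generate the JobGraph, such as the JarFile, the mainClass, and userCodeClassLoader, the custom class loader built to load the user program.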

  1. getPackagedProgram(programOptions, effectiveConfiguration), which delegates to buildProgram:
  PackagedProgram buildProgram(final ProgramOptions runOptions, final Configuration configuration)
            throws FileNotFoundException, ProgramInvocationException, CliArgsException {
        runOptions.validate();

        String[] programArgs = runOptions.getProgramArgs();
        String jarFilePath = runOptions.getJarFilePath();
        List<URL> classpaths = runOptions.getClasspaths();

        // Get assembler class
        String entryPointClass = runOptions.getEntryPointClassName();
        File jarFile = jarFilePath != null ? getJarFile(jarFilePath) : null;

        return PackagedProgram.newBuilder()
                .setJarFile(jarFile)
                .setUserClassPaths(classpaths)
                .setEntryPointClassName(entryPointClass)
                .setConfiguration(configuration)
                .setSavepointRestoreSettings(runOptions.getSavepointRestoreSettings())
                .setArguments(programArgs)
                .build();
    }
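  2. executeProgram: run the user program, which generates the JobGraph and submits it.

    In earlier Flink versions, the JobGraph was built solely by PackagedProgramUtils#createJobGraph, and the submission was performed from outside by the CliFrontend process. In newer versions, the CliFrontend process only prepares the external submission environment, while both JobGraph generation and submission are triggered by StreamExecutionEnvironment#execute.
    The logic with which the CliFrontend process prepares the external submission environment: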

// execute Program
protected void executeProgram(.....) {
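  // The DefaultExecutorServiceLoader loads the PipelineExecutor through SPI according
  // to the execution environment (YarnJobClusterExecutor here), which eventually
  // creates the ClusterDescriptor that submits the job.
  ClientUtils.executeProgram(
    new DefaultExecutorServiceLoader(),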
    configuration, 
    program, 
    false, 
    false
  );
}

 public static void executeProgram(....) {
    final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    try {
        Thread.currentThread().setContextClassLoader(userCodeClassLoader);
        // 1. Prepare the external execution environment the job depends on
        ContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);

        StreamContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);

        try {
            // 2. Trigger execution of the user program via reflection
            program.invokeInteractiveModeForExecution();
        } finally {
            ContextEnvironment.unsetAsContext();
            StreamContextEnvironment.unsetAsContext();
        }
    } finally {
        Thread.currentThread().setContextClassLoader(contextClassLoader);
    }
}  
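
The pattern used by executeProgram above (swap the thread's context class loader, call the user's main method reflectively, always restore the previous loader) can be shown in isolation. This is a minimal sketch: `InvokeMainSketch`, `UserProgram`, and `invokeWithLoader` are hypothetical names, and the "user program" here only records which loader it observed.

```java
import java.lang.reflect.Method;

/** Hypothetical illustration of running a main method under a given context class loader. */
final class InvokeMainSketch {

    /** Records the context class loader that was active while the user program ran. */
    static ClassLoader seenDuringMain;

    /** Stand-in for a user program entry point. */
    public static final class UserProgram {
        public static void main(String[] args) {
            seenDuringMain = Thread.currentThread().getContextClassLoader();
        }
    }

    /** Sets userLoader as context class loader, invokes main reflectively, then restores. */
    static void invokeWithLoader(Class<?> entryClass, ClassLoader userLoader, String[] args) {
        ClassLoader previous = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(userLoader);
            Method main = entryClass.getMethod("main", String[].class);
            // The cast keeps the array from being expanded into varargs.
            main.invoke(null, (Object) args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not invoke main of " + entryClass.getName(), e);
        } finally {
            // Restore even if the user program throws.
            Thread.currentThread().setContextClassLoader(previous);
        }
    }
}
```

Restoring in a finally block matters because the calling thread typically keeps running client code after the user program returns or fails.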

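The user program triggers JobGraph construction and submission:

  1. Load the PipelineExecutorFactory through SPI.
  2. Create the PipelineExecutor from the PipelineExecutorFactory.
  3. The PipelineExecutor generates the JobGraph from the StreamGraph and submits it to the remote cluster.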
 public JobExecutionResult execute(String jobName) throws Exception {
      return execute(getStreamGraph(jobName));
 }
 
 public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
    // 1. Load the PipelineExecutorFactory through SPI
    final PipelineExecutorFactory executorFactory =
            executorServiceLoader.getExecutorFactory(configuration);
    // 2. Create the PipelineExecutor: getExecutor(configuration)
    // 3. Build the JobGraph and submit it: execute
    CompletableFuture<JobClient> jobClientFuture =
            executorFactory
                    .getExecutor(configuration)
                    .execute(streamGraph, configuration, userClassloader);
    // .....
}
AbstractJobClusterExecutor#execute:
public CompletableFuture<JobClient> execute(...)
        throws Exception {
    // 1. Extract the JobGraph from the StreamGraph.
    final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);

    // 2. Submit the JobGraph through the ClusterDescriptor built here.
    try (final ClusterDescriptor<ClusterID> clusterDescriptor =
            clusterClientFactory.createClusterDescriptor(configuration)) {
        final ExecutionConfigAccessor configAccessor =
                ExecutionConfigAccessor.fromConfiguration(configuration);

        final ClusterSpecification clusterSpecification =
                clusterClientFactory.getClusterSpecification(configuration);

        final ClusterClientProvider<ClusterID> clusterClientProvider =
                clusterDescriptor.deployJobCluster(
                        clusterSpecification, jobGraph, configAccessor.getDetachedMode());
        LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());

        return CompletableFuture.completedFuture(
                new ClusterClientJobClientAdapter<>(
                        clusterClientProvider, jobGraph.getJobID(), userCodeClassloader));
    }
}
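
The shape of AbstractJobClusterExecutor#execute (translate the pipeline, deploy through a descriptor that is closed afterwards, hand back an already completed future) can be reproduced with stand-in types. This is a sketch under assumptions: `JobClusterExecutorSketch` and `Descriptor` are hypothetical, strings take the place of the JobGraph and the job client, and the event list exists only to make the ordering visible.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical illustration of the translate, deploy, close sequence. */
final class JobClusterExecutorSketch {

    /** Stand-in for a cluster descriptor: deploys a job and must be closed afterwards. */
    static final class Descriptor implements AutoCloseable {
        private final List<String> events;

        Descriptor(List<String> events) {
            this.events = events;
        }

        String deploy(String jobGraph, boolean detached) {
            events.add("deploy " + jobGraph + " detached=" + detached);
            return "job-" + jobGraph; // hypothetical job handle
        }

        @Override
        public void close() {
            events.add("close");
        }
    }

    static CompletableFuture<String> execute(String pipeline, boolean detached, List<String> events) {
        // 1. Translate the pipeline into a job graph (here: just a string).
        events.add("translate " + pipeline);
        String jobGraph = "graph(" + pipeline + ")";

        // 2. Deploy through the descriptor; try-with-resources closes it before returning.
        try (Descriptor descriptor = new Descriptor(events)) {
            String handle = descriptor.deploy(jobGraph, detached);
            return CompletableFuture.completedFuture(handle);
        }
    }
}
```

The future is completed before it is returned, so the asynchronous signature does not imply that deployment runs in the background in this sketch.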

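Building the ClusterDescriptor

While the JobGraph was being built in the previous stage, a ClusterDescriptor was created as well. A ClusterDescriptor describes the cluster (YARN, Mesos, or Kubernetes) that the job is deployed to and carries the information needed to submit to it.
Taking YarnClusterDescriptor as an example, it is created as follows:

  1. DefaultExecutorServiceLoader#getExecutorFactory creates the YarnJobClusterExecutorFactory.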
public PipelineExecutorFactory getExecutorFactory(final Configuration configuration) {
    checkNotNull(configuration);
    // 1. Load the PipelineExecutorFactory implementations available in the current environment through SPI
    final ServiceLoader<PipelineExecutorFactory> loader =
            ServiceLoader.load(PipelineExecutorFactory.class);

    final List<PipelineExecutorFactory> compatibleFactories = new ArrayList<>();
    final Iterator<PipelineExecutorFactory> factories = loader.iterator();
    while (factories.hasNext()) {
        try {
            final PipelineExecutorFactory factory = factories.next();
            if (factory != null && factory.isCompatibleWith(configuration)) {
                compatibleFactories.add(factory);
            }
        } catch (Throwable e) {
            if (e.getCause() instanceof NoClassDefFoundError) {
                LOG.info("Could not load factory due to missing dependencies.");
            } else {
                throw e;
            }
        }
    }

    return compatibleFactories.get(0);
}

2. YarnJobClusterExecutorFactory#getExecutor creates the YarnJobClusterExecutor. The createClusterDescriptor call made by the YarnJobClusterExecutor then creates the YarnClusterDescriptor.

   YarnJobClusterExecutorFactory
   @Override
    public PipelineExecutor getExecutor(@Nonnull final Configuration configuration) {
        try {
            return new YarnJobClusterExecutor();
        } catch (NoClassDefFoundError e) {
            throw new IllegalStateException(YarnDeploymentTarget.ERROR_MESSAGE);
        }
    }

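3. How YarnClusterClientFactory#createClusterDescriptor builds the descriptor.

  1. Locate the log4j.properties file on the local machine and store its path in $internal.yarn.log-config-file.
  2. Create a YarnClient for the target cluster and build the YarnClusterDescriptor.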
public YarnClusterDescriptor createClusterDescriptor(Configuration configuration) {
  // 1. Locate the logging configuration file and store it in $internal.yarn.log-config-file
  final String configurationDirectory = configuration.get(DeploymentOptionsInternal.CONF_DIR);
  YarnLogConfigUtil.setLogConfigFileInConfig(configuration, configurationDirectory);
  return getClusterDescriptor(configuration);
}

private YarnClusterDescriptor getClusterDescriptor(Configuration configuration) {
    // 2. Create the YarnClient
    final YarnClient yarnClient = YarnClient.createYarnClient();
    final YarnConfiguration yarnConfiguration =
            Utils.getYarnAndHadoopConfiguration(configuration);
    // 3. Initialize it with the configuration of the cluster to connect to
    yarnClient.init(yarnConfiguration);
    yarnClient.start();
    // 4. Create the YarnClusterDescriptor
    return new YarnClusterDescriptor(
            configuration,
            yarnConfiguration,
            yarnClient,
            YarnClientYarnClusterInformationRetriever.create(yarnClient),
            false);
}
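
The factory lookup in DefaultExecutorServiceLoader#getExecutorFactory, shown earlier in this section, boils down to filtering candidates by compatibility with the configuration. The sketch below assumes a plain list of candidates instead of ServiceLoader (which would need provider registration files), matches on the `execution.target` key, and, as an extra assumption not visible in the excerpt, rejects both "no match" and "more than one match". `FactorySelectionSketch`, `Factory`, `forTarget`, and `select` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of picking the one factory compatible with a configuration. */
final class FactorySelectionSketch {

    /** Stand-in for an executor factory. */
    interface Factory {
        String name();

        boolean isCompatibleWith(Map<String, String> configuration);
    }

    /** Creates a factory that is compatible when execution.target equals the given target. */
    static Factory forTarget(final String target) {
        return new Factory() {
            @Override
            public String name() {
                return target;
            }

            @Override
            public boolean isCompatibleWith(Map<String, String> configuration) {
                return target.equals(configuration.get("execution.target"));
            }
        };
    }

    /** Returns the single compatible factory, failing loudly when there is none or several. */
    static Factory select(Iterable<Factory> candidates, Map<String, String> configuration) {
        List<Factory> compatible = new ArrayList<>();
        for (Factory factory : candidates) {
            if (factory != null && factory.isCompatibleWith(configuration)) {
                compatible.add(factory);
            }
        }
        if (compatible.isEmpty()) {
            throw new IllegalStateException("No compatible factory for " + configuration);
        }
        if (compatible.size() > 1) {
            throw new IllegalStateException("Multiple compatible factories for " + configuration);
        }
        return compatible.get(0);
    }
}
```

Failing on an empty result gives a clearer error than the IndexOutOfBoundsException that a bare get(0) on an empty list would raise.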

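Next, how does Flink obtain the Hadoop configuration so that the job is submitted to the intended cluster? The sources are read in this order:

  1. Configuration entries that start with flink.yarn.
  2. The directory given by the HADOOP_HOME environment variable.
  3. The fs.hdfs.hdfsdefault entry of the Flink configuration (deprecated).
  4. The directory given by the HADOOP_CONF_DIR environment variable.
  5. Entries of the Flink Configuration with the flink.hadoop prefix.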
Utils.getYarnAndHadoopConfiguration(configuration);

public static YarnConfiguration getYarnAndHadoopConfiguration(..) {
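     // 1. Read the configuration entries that start with flink.yarn.
     YarnConfiguration yarnConfig = getYarnConfiguration(flinkConfig);
     // 2-5. Merge in the Hadoop configuration collected by HadoopUtils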
     yarnConfig.addResource(HadoopUtils.getHadoopConfiguration(flinkConfig));
    return yarnConfig;
}

public static Configuration getHadoopConfiguration(
        org.apache.flink.configuration.Configuration flinkConfiguration) {
    Configuration result = new HdfsConfiguration();
    boolean foundHadoopConfiguration = false;

    // Approach 1: HADOOP_HOME environment variables
    String[] possibleHadoopConfPaths = new String[2];
    final String hadoopHome = System.getenv("HADOOP_HOME");
    if (hadoopHome != null) {
        possibleHadoopConfPaths[0] = hadoopHome + "/conf";
        possibleHadoopConfPaths[1] = hadoopHome + "/etc/hadoop"; // hadoop 2.2
    }
    for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
        if (possibleHadoopConfPath != null) {
            foundHadoopConfiguration = addHadoopConfIfFound(result, possibleHadoopConfPath);
        }
    }

    // Approach 2: Flink configuration (deprecated)
    final String hdfsDefaultPath =
            flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
    if (hdfsDefaultPath != null) {
        result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
        LOG.debug(
                "Using hdfs-default configuration-file path from Flink config: {}",
                hdfsDefaultPath);
        foundHadoopConfiguration = true;
    }

    final String hdfsSitePath =
            flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
    if (hdfsSitePath != null) {
        result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
        LOG.debug(
                "Using hdfs-site configuration-file path from Flink config: {}", hdfsSitePath);
        foundHadoopConfiguration = true;
    }

    final String hadoopConfigPath =
            flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
    if (hadoopConfigPath != null) {
        LOG.debug("Searching Hadoop configuration files in Flink config: {}", hadoopConfigPath);
        foundHadoopConfiguration =
                addHadoopConfIfFound(result, hadoopConfigPath) || foundHadoopConfiguration;
    }

    // Approach 3: HADOOP_CONF_DIR environment variable
    String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
    if (hadoopConfDir != null) {
        LOG.debug("Searching Hadoop configuration files in HADOOP_CONF_DIR: {}", hadoopConfDir);
        foundHadoopConfiguration =
                addHadoopConfIfFound(result, hadoopConfDir) || foundHadoopConfiguration;
    }

    // Approach 4: Flink configuration
    // add all configuration key with prefix 'flink.hadoop.' in flink conf to hadoop conf
    for (String key : flinkConfiguration.keySet()) {
        for (String prefix : FLINK_CONFIG_PREFIXES) {
            if (key.startsWith(prefix)) {
                String newKey = key.substring(prefix.length());
                String value = flinkConfiguration.getString(key, null);
                result.set(newKey, value);
                LOG.debug(
                        "Adding Flink config entry for {} as {}={} to Hadoop config",
                        key,
                        newKey,
                        value);
                foundHadoopConfiguration = true;
            }
        }
    }

    return result;
}
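
Approach 4 in the code above copies prefixed Flink entries into the Hadoop configuration with the prefix stripped, and because it runs last it overrides values read from files. A minimal sketch with plain maps follows. `HadoopPrefixSketch` and `overlay` are hypothetical names, and a single prefix `flink.hadoop.` is assumed instead of the FLINK_CONFIG_PREFIXES array.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of overlaying prefixed Flink entries onto a Hadoop configuration. */
final class HadoopPrefixSketch {

    /** Assumed prefix; the original code iterates over an array of prefixes. */
    static final String PREFIX = "flink.hadoop.";

    /** Returns the file-based Hadoop settings with prefixed Flink entries applied on top. */
    static Map<String, String> overlay(
            Map<String, String> hadoopFromFiles, Map<String, String> flinkConfig) {
        Map<String, String> result = new LinkedHashMap<>(hadoopFromFiles);
        for (Map.Entry<String, String> entry : flinkConfig.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(PREFIX)) {
                // Strip the prefix so Hadoop sees its own key name.
                result.put(key.substring(PREFIX.length()), entry.getValue());
            }
        }
        return result;
    }
}
```

For example, a Flink entry `flink.hadoop.dfs.replication` ends up as `dfs.replication` in the result, while unprefixed Flink entries are left out.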

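Submitting and deploying the JobGraph

ClusterDescriptor#deployJobCluster submits the JobGraph to the remote cluster. The YarnClusterDescriptor submission flow follows the usual steps for creating a YARN application; compare it with Hadoop: Writing YARN Applications.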

### References
Flink 原理与实现:架构和拓扑概览 (Flink Principles and Implementation: Architecture and Topology Overview)
