flink1.8 JobGraph生成流程

2019-09-27  本文已影响0人  todd5167

用户编写的可执行Jar包,在本地客户端生成JobGraph,先看下flink生成并提交JobGraph的大致流程。

提交过程

  1. 首先,生成PackagedProgram。
  2. 接着,通过PackagedProgramUtils从PackagedProgram中提取JobGraph。
  3. 最后,通过ClusterDescriptor将JobGraph提交到集群中。

关于任务提交这部分可以参考flinkStreamSQL任务提交流程代码,比直接看源码要简单很多。此处,我参考flink1.8源码进行说明。

生成PackagedProgram

buildProgram通过提取命令行中传递的参数,将用户程序jarFile、程序参数programArgs、程序依赖的classpaths传递给PackagedProgram。entryPointClass如果不传递,可以通过程序解析出来。

CliFrontend#buildProgram

PackagedProgram buildProgram(ProgramOptions options) throws FileNotFoundException, ProgramInvocationException {
    String[] programArgs = options.getProgramArgs();
    String jarFilePath = options.getJarFilePath();
    List<URL> classpaths = options.getClasspaths();

    File jarFile = new File(jarFilePath);
    // main方法类
    String entryPointClass = options.getEntryPointClassName();

    PackagedProgram program = entryPointClass == null ?
            new PackagedProgram(jarFile, classpaths, programArgs) :
            new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);
    return program;
}


public PackagedProgram(File jarFile, List<URL> classpaths, @Nullable String entryPointClassName, String... args) throws ProgramInvocationException {
    //  用户jar路径 
    URL jarFileUrl = jarFile.getAbsoluteFile().toURI().toURL();
    this.jarFile = jarFileUrl;
    this.args = args == null ? new String[0] : args;

    // 如果没有传递entryPointClassName, 可以通过manifest提取
    if (entryPointClassName == null) {
        entryPointClassName = getEntryPointClassNameFromJar(jarFileUrl);
    }
    
    //抽取当前jar中包含的jar,包含/lib且名称以.jar结尾
    this.extractedTempLibraries = extractContainedLibraries(jarFileUrl);
    this.classpaths = classpaths;
    // 使用FlinkUserCodeClassLoaders加载getAllLibraries中的jar
    this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(getAllLibraries(), classpaths, getClass().getClassLoader());

    // 使用FlinkUserCodeClassLoaders加载器加载该类,然后切换回父类加载器
    this.mainClass = loadMainClass(entryPointClassName, userCodeClassLoader);

    // if the entry point is a program, instantiate the class and get the plan
    if (Program.class.isAssignableFrom(this.mainClass)) {
        Program prg = null;
        try {
            prg = InstantiationUtil.instantiate(this.mainClass.asSubclass(Program.class), Program.class);
        } catch (Exception e) {
            // validate that the class has a main method at least.
            // the main method possibly instantiates the program properly
            if (!hasMainMethod(mainClass)) {
                throw new ProgramInvocationException("The given program class implements the " +
                        Program.class.getName() + " interface, but cannot be instantiated. " +
                        "It also declares no main(String[]) method as alternative entry point", e);
            }
        } catch (Throwable t) {
            throw new ProgramInvocationException("Error while trying to instantiate program class.", t);
        }
        this.program = prg;
    } else if (hasMainMethod(mainClass)) {
        //用户类不为Program的子类
        this.program = null;
    } else {
        throw new ProgramInvocationException("The given program class neither has a main(String[]) method, nor does it implement the " +
                Program.class.getName() + " interface.");
    }
}
提取JobGraph

通过生成的PackagedProgram、flinkconf中参数配置、任务并行度生成jobGraph,并通过clusterDescriptor进行提交。

CliFrontend#runProgram

## 部署任务到新集群
if (clusterId == null && runOptions.getDetachedMode()) {
    int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism();
    // 构建jobGraph
    final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);

    final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
    //部署
    client = clusterDescriptor.deployJobCluster(
        clusterSpecification,
        jobGraph,
        runOptions.getDetachedMode());

    logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID());

    try {
        client.shutdown();
    } catch (Exception e) {
        LOG.info("Could not properly shut down the client.", e);
    }
}

## createJobGraph

public static JobGraph createJobGraph(
        PackagedProgram packagedProgram,
        Configuration configuration,
        int defaultParallelism,
        @Nullable JobID jobID) throws ProgramInvocationException {
    Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
    // 优化器
    final Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
    final FlinkPlan flinkPlan;

    final OptimizerPlanEnvironment optimizerPlanEnvironment = new OptimizerPlanEnvironment(optimizer);

    optimizerPlanEnvironment.setParallelism(defaultParallelism);
    
    // 形成StreamingPlan
    flinkPlan = optimizerPlanEnvironment.getOptimizedPlan(packagedProgram);

    final JobGraph jobGraph;

    if (flinkPlan instanceof StreamingPlan) {
        // 从StreamingPlan中提取jobGraph
        jobGraph = ((StreamingPlan) flinkPlan).getJobGraph(jobID);
        jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());
    } else {
        final JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(configuration);
        jobGraph = jobGraphGenerator.compileJobGraph((OptimizedPlan) flinkPlan, jobID);
    }
    //  用户Jar中的依赖jar包传递给jobGraph
    for (URL url : packagedProgram.getAllLibraries()) {
        try {
            jobGraph.addJar(new Path(url.toURI()));
        } catch (URISyntaxException e) {
            throw new ProgramInvocationException("Invalid URL for jar file: " + url + '.', jobGraph.getJobID(), e);
        }
    }
    // classpaths的传递有多中方式:
    // 1.通过packagedProgram构造函数直接传递
    // 2.先产生jobGraph,然后调用setClasspaths绑定。
    // 3. ContextEnvironment.getClasspaths.add(url)传递。
    jobGraph.setClasspaths(packagedProgram.getClasspaths());ContextEnvironment

    return jobGraph;
}


## flinkstreamsql中classpath传递
main方法中通过env.registerCachedFile(url.getPath(),  classFileName, true);将插件包的url存放到cacheFile中,当提交时取出并绑定给jobGraph。


private static void fillJobGraphClassPath(JobGraph jobGraph) throws MalformedURLException {
    Map<String, String> jobCacheFileConfig = jobGraph.getJobConfiguration().toMap();
    Set<String> classPathKeySet = Sets.newHashSet();

    for(Map.Entry<String, String> tmp : jobCacheFileConfig.entrySet()){
        if(Strings.isNullOrEmpty(tmp.getValue())){
            continue;
        }

        if(tmp.getValue().startsWith("class_path")){
            //DISTRIBUTED_CACHE_FILE_NAME_1
            //DISTRIBUTED_CACHE_FILE_PATH_1
            String key = tmp.getKey();
            String[] array = key.split("_");
            if(array.length < 5){
                continue;
            }

            array[3] = "PATH";
            classPathKeySet.add(StringUtils.join(array, "_"));
        }
    }

    for(String key : classPathKeySet){
        String pathStr = jobCacheFileConfig.get(key);
        jobGraph.getClasspaths().add(new URL("file:" + pathStr));
    }
}

形成StreamingPlan

在提取StreamingPlan时,需要先构建OptimizerPlanEnvironment,由该Environment通过反射执行用户Main类。在Main方法中,通过执行execute形成StreamingPlan并主动抛出ProgramAbortException,被上层捕获到。

     final OptimizerPlanEnvironment optimizerPlanEnvironment = new OptimizerPlanEnvironment(optimizer);
     optimizerPlanEnvironment.setParallelism(defaultParallelism);
     ## 抽取StreamingPlan
     flinkPlan = optimizerPlanEnvironment.getOptimizedPlan(packagedProgram);

    public FlinkPlan getOptimizedPlan(PackagedProgram prog) throws ProgramInvocationException {
               
         // ExecutionEnvironmentFactory创建的对象为OptimizerPlanEnvironment
        setAsContext();
        try {
         // 在OptimizerPlanEnvironment环境下执行Main方法,形成optimizerPlan
            prog.invokeInteractiveModeForExecution();
        }
        catch (ProgramInvocationException e) {
            throw e;
        }
        catch (Throwable t) {
            // 捕获Main方法中抛出的异常,获取StreamingPlan
            if (optimizerPlan != null) {
                return optimizerPlan;
            } else {
                throw new ProgramInvocationException("The program caused an error: ", t);
            }
        }
        finally {
            unsetAsContext();
        }
    }

Main方法中的execute()方法:

    @Override
    public JobExecutionResult execute(String jobName) throws Exception {
        //生成streamGraph
        StreamGraph streamGraph = getStreamGraph();
        streamGraph.setJobName(jobName);
    
        transformations.clear();

        if (env instanceof OptimizerPlanEnvironment) {
            ((OptimizerPlanEnvironment) env).setPlan(streamGraph);
        } else if (env instanceof PreviewPlanEnvironment) {
            ((PreviewPlanEnvironment) env).setPreview(streamGraph.getStreamingPlanAsJSON());
        }

        throw new OptimizerPlanEnvironment.ProgramAbortException();
    }

StreamExecutionEnvironment.getExecutionEnvironment()方法获取执行环境:

由于在getOptimizedPlan中调用了setAsContext方法,ExecutionEnvironmentFactory创建OptimizerPlanEnvironment,因此返回StreamPlanEnvironment。

    public static StreamExecutionEnvironment getExecutionEnvironment() {
        if (contextEnvironmentFactory != null) {
            return contextEnvironmentFactory.createExecutionEnvironment();
        }

        // because the streaming project depends on "flink-clients" (and not the other way around)
        // we currently need to intercept the data set environment and create a dependent stream env.
        // this should be fixed once we rework the project dependencies

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        if (env instanceof ContextEnvironment) {
            return new StreamContextEnvironment((ContextEnvironment) env);
        } else if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) {
            return new StreamPlanEnvironment(env);
        } else {
            return createLocalEnvironment();
        }
    }
  // 根据contextEnvironmentFactory生成ExecutionEnvironment
   public static ExecutionEnvironment getExecutionEnvironment() {
        return (ExecutionEnvironment)(contextEnvironmentFactory == null ? createLocalEnvironment() : contextEnvironmentFactory.createExecutionEnvironment());
    }

总之,根据用户启动任务的环境,构造相应的执行环境。通过反射执行用户编写的Main方法,并抽取出StreamPlan,Main在执行时使用的是外部创建出的执行环境。

上一篇下一篇

猜你喜欢

热点阅读