Flink支持用户自定义资源

2020-11-17  本文已影响0人  BigDatavid

一、构建JobGraph

JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, flinkConfiguration, parallelism, false);

在这个入口中,构建program

PackagedProgram program = PackagedProgram.newBuilder()
                .setJarFile(runJarFile)
                .setArguments(execArgs)
                .setUserClassPaths(classPaths)
                .setSavepointRestoreSettings(savepointRestoreSettings)
                .build();

classPaths中需指定用户资源:

List<URL> classPaths = new ArrayList<URL>();
        classPaths.add(new URL("file://D:\\tmp\\flink\\jar\\udf_test-1.0-SNAPSHOT.jar"));//jar包classpath为jar包本身
        classPaths.add(new URL("file://D:\\tmp\\flink\\jar\\"));//非jar文件classpath为文件目录

二、添加文件

1、standalone模式

public static void fillDependFilesJobGraph(JobGraph jobGraph, String[] dependFiles) {
        Arrays.stream(dependFiles).forEach(path -> jobGraph.addJar(new Path("file://" + path)));
    }

将user资源通过jobGraph.addJar()方法添加到jobGraph

2、yarn-cluster模式

YarnClusterDescriptor clusterDescriptor = (YarnClusterDescriptor) YarnClusterClientFactory.INSTANCE
                .createClusterDescriptor(jobParamsInfo.getYarnConfDir(), flinkConfiguration);
clusterDescriptor.addShipFiles(shipFiles);

通过addShipFiles()方法将用户资源添加到yarn所需的资源列表中

上一篇 下一篇

猜你喜欢

热点阅读