Submitting Spark jobs on a schedule with Java code

2018-10-22  zxydut

When doing batch processing with Spark, you sometimes need to submit Spark jobs on a schedule. This can be done either with a shell script or with Java code; this article covers the Java approach.
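
The scheduled method below relies on Spring's @Scheduled annotation, so scheduling has to be enabled somewhere in the application context. A minimal sketch of such a configuration class (the name SchedulingConfig is just an illustration, not part of the original project):

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

@Configuration
@EnableScheduling   // without this, @Scheduled methods such as tnvDSumSchd() never fire
public class SchedulingConfig {
}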

@Scheduled(cron = "0 20 0 * * *")   // run every day at 00:20:00
    public void tnvDSumSchd(){
        try{
            // first element is the task type, the remaining elements are the HDFS files to process
            List<String> taskType_files_list = new ArrayList<>();
            taskType_files_list.add("TESTIN_VIDEO_D_SUMMAY");
            String legalDir = tools.getLegalDir(testinHdfsDir);
            taskType_files_list.addAll(tools.fileLists(legalDir, new TestInFilterPath(FileType.VIDEO.getType() + getDay())));

            // resource settings: driver memory, executor memory, executor cores, driver cores
            String[] sparkParam = new String[4];
            sparkParam[0] = "1g";
            sparkParam[1] = "2g";
            sparkParam[2] = "2";
            sparkParam[3] = "1";
            // only submit when at least one matching file was found
            // (the list always contains the task type as its first element)
            if (taskType_files_list.size() > 1){
                submitLancher.submit(taskType_files_list, sparkParam, "TESTIN_VIDEO_D_SUMMAY");
            } else {
                log.info("{} does not have any testin video file", legalDir);
            }
        } catch (InterruptedException e) {
            log.error(e.getMessage(), e);
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
    }
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class SubmitLancher {
    @Value("${spark.home}")
    private String sparkHome;           // path to the Spark installation (SPARK_HOME)

    @Value("${spark.submit.appResource}")
    private String submitAppResource;   // application jar to submit to Spark

    public void submit(List<String> taskType_files_list, String[] sparkParam, String appName) throws IOException, InterruptedException {
        long start = System.currentTimeMillis();
        String[] taskType_files = taskType_files_list.toArray(new String[taskType_files_list.size()]);

        // released by the listener once the application reaches a final state
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        new SparkLauncher()
                .setSparkHome(sparkHome)
                .setMaster("yarn")
                .setAppName(appName)
                .setConf("spark.driver.memory", sparkParam[0])
                .setConf("spark.executor.memory", sparkParam[1])
                .setConf("spark.executor.cores", sparkParam[2])
                .setConf("spark.driver.cores", sparkParam[3])
                .setAppResource(submitAppResource)
                .setMainClass("submit.SubmitMain") // main entry point of the submitted application
                .setDeployMode("cluster")
                .addAppArgs(taskType_files)
                .startApplication(new SparkAppHandle.Listener(){
                    @Override
                    public void stateChanged(SparkAppHandle handle) {
                        if (handle.getState().isFinal()){
                            countDownLatch.countDown();
                        }
                    }

                    @Override
                    public void infoChanged(SparkAppHandle handle) {
                    }
                });
        countDownLatch.await();
        long end = System.currentTimeMillis();
        log.info("{} summary finished, and used time: {} ms", taskType_files_list.get(0), end - start);
    }
}
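
The anonymous listener above only releases the latch. If you also want to know whether the job actually succeeded, the handle exposes the final state (FINISHED, FAILED or KILLED) and the application id. The standalone listener below is only a sketch of such a variant, not part of the original post:

import java.util.concurrent.CountDownLatch;
import org.apache.spark.launcher.SparkAppHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative variant of the anonymous listener: besides releasing the
// latch, it logs whether the application ended in FINISHED, FAILED or KILLED.
public class LoggingAppListener implements SparkAppHandle.Listener {
    private static final Logger log = LoggerFactory.getLogger(LoggingAppListener.class);
    private final CountDownLatch latch;

    public LoggingAppListener(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void stateChanged(SparkAppHandle handle) {
        SparkAppHandle.State state = handle.getState();
        if (state.isFinal()) {
            if (state == SparkAppHandle.State.FINISHED) {
                log.info("spark application {} finished successfully", handle.getAppId());
            } else {
                log.error("spark application {} ended in state {}", handle.getAppId(), state);
            }
            latch.countDown();
        }
    }

    @Override
    public void infoChanged(SparkAppHandle handle) {
        // no-op
    }
}

It would be passed as startApplication(new LoggingAppListener(countDownLatch)) in place of the anonymous class.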
  1. countDownLatch.await(); blocks until the latch has been counted down to 0. This keeps the launcher process alive, so the job is not ended abnormally because the driver-side code has already stopped while the Spark job is still running on the executors.
  2. When scheduling with Java code, try not to use the Spring Boot framework for the Spark job itself, because Spark sends tasks from the master to the executors, and a Spring container cannot be created on the executors (a sketch of a plain main class follows this list).
  3. Since the Spark dependencies come from the cluster's Spark environment, the launcher program also needs the SparkLauncher class at startup, so spark-launcher_2.10-1.6.2.jar has to be submitted along with it as a dependency.
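
Following note 2, the entry class named in setMainClass ("submit.SubmitMain") is best kept free of any Spring container. The original post does not show that class; the sketch below is only an assumed outline of it, reading the arguments passed via addAppArgs (task type first, then the file paths) with the Spark 1.6-era RDD API:

package submit;

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical sketch of the submitted entry class: plain Java, no Spring,
// so it runs on the cluster without trying to build a Spring container.
public class SubmitMain {
    public static void main(String[] args) {
        // argument layout produced by the launcher:
        // args[0] = task type, args[1..n] = HDFS files to process
        String taskType = args[0];
        String[] files = Arrays.copyOfRange(args, 1, args.length);

        SparkConf conf = new SparkConf().setAppName(taskType);
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            for (String file : files) {
                long lines = sc.textFile(file).count();   // placeholder processing
                System.out.println(file + " has " + lines + " lines");
            }
        } finally {
            sc.stop();
        }
    }
}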