Submitting Spark Jobs from Code

2019-12-04  Jorvi

Spark is run in Standalone mode; other deploy modes were not tested.


1. The Spark Job

1.1 jar
hdfs:/home/mr/example/spark-example-1.0.jar
1.2 main class
org.learn.example.jobs.SparkJob

import java.io.Serializable;
import java.lang.reflect.Method;

import org.apache.spark.sql.SparkSession;

public class SparkJob implements Serializable {
    public static void main(String[] args) {
        // The first program argument is the fully qualified name of the job class
        String fullClassName = args[0];
        String[] strArr = fullClassName.split("\\.");
        String appName = strArr[strArr.length - 1];

        SparkSession session = SparkSession.builder().appName("SparkJob_" + appName).getOrCreate();

        try {
            // Instantiate the job class via reflection and invoke its run(SparkSession) method
            Class<?> clazz = Class.forName(fullClassName);
            Method method = clazz.getDeclaredMethod("run", SparkSession.class);
            method.invoke(clazz.newInstance(), session);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
1.3 args passed to the main class

The argument is the fully qualified name of the job class, to which SparkJob dispatches via reflection:

org.learn.example.jobs.WordCount

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class WordCount implements ISparkJob {
    @Override
    public void run(SparkSession session) {
        // Build a small in-memory dataset and drop down to the RDD API
        JavaRDD<String> javaRDD = session.createDataset(Arrays.asList("aaa bbb", "bbb ccc", "aaa"), Encoders.STRING()).javaRDD();

        javaRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                // Split each line into words
                return Arrays.asList(line.split(" ")).iterator();
            }
        }).mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                // Pair each word with an initial count of 1
                return new Tuple2<>(word, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                // Sum the counts for each word
                return v1 + v2;
            }
        }).saveAsTextFile("/home/example/result");
    }
}
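The ISparkJob interface itself is not shown in the original post. A minimal sketch, reconstructed from the reflection call in SparkJob:

import java.io.Serializable;

import org.apache.spark.sql.SparkSession;

// Reconstructed sketch; the original definition is not shown. Extending
// Serializable matters here: the anonymous function classes in WordCount
// capture the enclosing instance, so the job object must be serializable
// for Spark to ship those closures to the executors.
public interface ISparkJob extends Serializable {
    void run(SparkSession session);
}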

2. Submitting the Job to the Spark Cluster

2.1 Submitting with SparkSubmit.main
// The same arguments one would pass on the spark-submit command line:
// options first, then the application jar, then the args for SparkJob.main
String[] args = {
        "--master", "spark://11.11.11.11:6066,22.22.22.22:6066",
        "--deploy-mode", "cluster",
        "--executor-memory", "1G",
        "--total-executor-cores", "2",
        "--class", "org.learn.example.jobs.SparkJob",
        "hdfs:/home/mr/example/spark-example-1.0.jar",
        "org.learn.example.jobs.WordCount"
};
SparkSubmit.main(args);
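Note that SparkSubmit.main executes in the calling JVM and, on a failed submission, can terminate the whole process via System.exit, so embedding it in a long-running service is riskier than the two approaches below.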
2.2 Submitting with SparkLauncher
try {
    SparkLauncher launcher = new SparkLauncher()
            .setSparkHome("/home/spark")
            .setConf("spark.driver.memory", "1G")
            .setConf("spark.executor.memory", "1G")
            .setConf("spark.executor.cores", "1")
            .setConf("spark.cores.max", "2")
            .setMaster("spark://11.11.11.11:6066,22.22.22.22:6066")
            .setDeployMode("cluster")
            .redirectOutput(new File("/home/example/logs/launch.log"))
            .setAppResource("hdfs:/home/mr/example/spark-example-1.0.jar")
            .setMainClass("org.learn.example.jobs.SparkJob")
            .addAppArgs("org.learn.example.jobs.WordCount");

    SparkAppHandle appHandle = launcher.startApplication(new SparkAppHandle.Listener() {
        @Override
        public void stateChanged(SparkAppHandle handle) {
            // Called whenever the application's state changes
        }

        @Override
        public void infoChanged(SparkAppHandle handle) {
            // Called whenever the application's info (e.g. the app ID) changes
        }
    });
} catch (Exception e) {
    e.printStackTrace();
}
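Besides the listener callbacks, the returned SparkAppHandle can be polled. A minimal sketch (placed inside the try block above, since Thread.sleep can throw) that blocks until the application reaches a final state:

// Poll until the application reaches a final state
// (FINISHED, FAILED, KILLED, or LOST); a real service would add a timeout
while (!appHandle.getState().isFinal()) {
    Thread.sleep(1000L);
}
System.out.println("appId: " + appHandle.getAppId()
        + ", final state: " + appHandle.getState());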
2.3 Submitting with RestSubmissionClient

Unlike the previous approaches, this one returns the submission result directly.

try {
    String appResource = "hdfs:/home/mr/example/spark-example-1.0.jar";
    String mainClass = "org.learn.example.jobs.SparkJob";
    String[] args = {
            "org.learn.example.jobs.WordCount"
    };
    SparkConf sparkConf = new SparkConf()
            .setMaster("spark://11.11.11.11:6066,22.22.22.22:6066")
            .set("spark.executor.cores", "1")
            .set("spark.submit.deployMode", "cluster")
            .set("spark.executor.memory", "1G")
            .set("spark.cores.max", "2")
            .set("spark.app.name", ""); // set later inside the job itself (SparkJob.main)

    // Note: this is scala.collection.immutable.HashMap, not java.util.HashMap
    CreateSubmissionResponse response = (CreateSubmissionResponse) RestSubmissionClient.run(appResource, mainClass,
            args, sparkConf, new HashMap<String, String>());

    logger.info("======> response: " + response.toJson());
} catch (Exception e) {
    e.printStackTrace();
}
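The response carries a submissionId that can be fed back to the REST gateway to poll the driver's status. A sketch, assuming a single-master REST endpoint and Scala's generated getters for the response fields:

// Assumption: requestSubmissionStatus(submissionId, quiet) against the same
// REST gateway; quiet = false also logs the raw server response
RestSubmissionClient client = new RestSubmissionClient("spark://11.11.11.11:6066");
SubmissionStatusResponse status = (SubmissionStatusResponse)
        client.requestSubmissionStatus(response.submissionId(), false);
logger.info("======> driverState: " + status.driverState());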