Java Spark Simple Examples (Part 1)

2018-06-22  憨人Zoe

The previous article gave a brief introduction to some of Spark's basic concepts. It was fairly abstract, and much of it was drawn from around the web; if you're interested, have a look: 初识Apache Spark(附示例)

I've prepared two introductory examples. Both actually appear on the official site, just explained separately; I've combined them here and run them on my local machine.

The two examples are very simple; the point is to distinguish the two ways of creating an RDD. More complex Spark applications will have to wait for a future post (once I've learned them myself).

Maven dependency:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
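
For reference, the _2.11 suffix on the artifactId is the Scala version this Spark build targets; 2.3.1 is the Spark version itself.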

Method 1: parallelized collections. (See the official documentation for details.)

The example in the previous article was created this way; here I'll show another one that closely follows the official example.

Example 1: sum a list of numbers

package com.yzy.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;

import java.util.Arrays;
import java.util.List;

public class demo {
    private static String appName = "spark.demo";
    private static String master = "local[*]";

    public static void main(String[] args) {
        JavaSparkContext sc = null;
        try {
            // Initialize the JavaSparkContext
            SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
            sc = new JavaSparkContext(conf);

            // Build the data source
            List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);

            // Create the RDD by parallelizing the collection
            JavaRDD<Integer> rdd = sc.parallelize(data);

            // map && reduce: the map is an identity function; reduce sums the elements
            Integer result = rdd.map(new Function<Integer, Integer>() {
                public Integer call(Integer integer) throws Exception {
                    return integer;
                }
            }).reduce(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer o, Integer o2) throws Exception {
                    return o + o2;
                }
            });

            System.out.println("Result: " + result);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (sc != null) {
                sc.close();
            }
        }
    }
}

Output:

// ... lines omitted ...
18/06/22 19:15:33 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 81 ms on localhost (executor driver) (1/4)
18/06/22 19:15:33 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 70 ms on localhost (executor driver) (2/4)
18/06/22 19:15:33 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 72 ms on localhost (executor driver) (3/4)
18/06/22 19:15:33 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 72 ms on localhost (executor driver) (4/4)
18/06/22 19:15:33 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
18/06/22 19:15:33 INFO DAGScheduler: ResultStage 0 (reduce at demo.java:37) finished in 0.256 s
18/06/22 19:15:33 INFO DAGScheduler: Job 0 finished: reduce at demo.java:37, took 0.319634 s
Result: 15
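
The map here is an identity function, so the reduce simply sums the source list: 1 + 2 + 3 + 4 + 5 = 15.

Incidentally, Spark's Java Function interfaces each declare a single abstract method, so on Java 8+ the same job can be written with lambdas. Here is a minimal sketch under the same dependency as above; the class name demoLambda is mine, not from the official docs:

// Hypothetical demoLambda: the same job as demo, with the anonymous
// classes replaced by Java 8 lambdas.
package com.yzy.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

public class demoLambda {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("spark.demo").setMaster("local[*]");
        // JavaSparkContext is Closeable, so try-with-resources replaces the finally block
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
            Integer result = rdd
                    .map(i -> i)               // identity map, kept to mirror the example
                    .reduce((a, b) -> a + b);  // sum the elements
            System.out.println("Result: " + result);
        }
    }
}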

Method 2: external datasets. (See the official documentation for details.)

This example uses textFile as the data source.

Example 2: read a text file and sum the lengths of the lines that contain "spark"

// Contents of test.txt
spark demo
this is a spark demo file
hello world

package com.yzy.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;

public class demo2 {
    private static String appName = "spark.demo";
    private static String master = "local[*]";

    public static void main(String[] args) {
        JavaSparkContext sc = null;
        try {
            // Initialize the JavaSparkContext
            SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
            sc = new JavaSparkContext(conf);

            // Build the RDD from test.txt, which sits in the project root
            JavaRDD<String> rdd = sc.textFile("test.txt");

            // Keep only the lines that contain "spark"
            rdd = rdd.filter(new Function<String, Boolean>() {
                public Boolean call(String s) throws Exception {
                    return s.contains("spark");
                }
            });

            // map && reduce: map each line to its length, then sum the lengths
            Integer result = rdd.map(new Function<String, Integer>() {
                public Integer call(String s) throws Exception {
                    return s.length();
                }
            }).reduce(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer integer, Integer integer2) throws Exception {
                    return integer + integer2;
                }
            });

            System.out.println("Result: " + result);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (sc != null) {
                sc.close();
            }
        }
    }
}

Output:

// ... lines omitted ...
18/06/22 19:27:33 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 78 ms on localhost (executor driver) (1/2)
18/06/22 19:27:33 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 94 ms on localhost (executor driver) (2/2)
18/06/22 19:27:33 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
18/06/22 19:27:33 INFO DAGScheduler: ResultStage 0 (reduce at demo2.java:38) finished in 0.203 s
18/06/22 19:27:33 INFO DAGScheduler: Job 0 finished: reduce at demo2.java:38, took 0.243153 s
Result: 35
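
As a sanity check, two lines of test.txt contain "spark": "spark demo" (10 characters) and "this is a spark demo file" (25 characters), and 10 + 25 = 35.

One more note on textFile: per the Spark documentation, it is not limited to a single local file; it also accepts directories, compressed files, and wildcards, as well as URIs for distributed storage such as HDFS. A sketch, meant to sit inside the same try block as demo2; both paths are illustrative placeholders, not from this article:

// textFile also accepts directories, globs, and distributed file system URIs.
// Both paths below are placeholders.
JavaRDD<String> localGlob = sc.textFile("data/*.txt");
JavaRDD<String> hdfsDir = sc.textFile("hdfs://namenode:9000/logs/");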