spark学习笔记-RDD(win+java)

2020-03-09  本文已影响0人  Legents

1、def:

​ RDD,弹性分布式数据集(Resilient Distributed Datasets),表示一个只读、分区且不变的数据集合,是spark应用中最核心的部分。spark处理数据时会将一整块数据分割成由多个分块数据组成的数据集(RDD),然后找到多于数据集分块个数的执行器进行数据处理,最终将计算的结果进行汇总。

主要优势 :RDD中的批量错做会根据数据存放的位置来调度任务;对于扫描类型的操作,如果内存不足以缓存整个RDD,就进行部分缓存,避免内存溢出。

2、创建

1)通过已存在的并行集合创建(调用SparkContext的parallelize方法将一个已存在的集合变成RDD):

        //初始化
        private static JavaSparkContext sc;
        //初始化本地配置
        SparkConf conf = new SparkConf().setMaster("local").setAppName("RDDDemo");
        //初始化sparkContext对象
        sc = new JavaSparkContext(conf);
        sc.setLogLevel("ERROR");
        // 原始数据转换成RDD
        List<Integer> list1 = Arrays.asList(5, 4, 3, 2, 1, 5);
        JavaRDD<Integer> rdd1 = sc.parallelize(list1);
        List<Integer> list2 = Arrays.asList(3,4,5,6,7,8);
        JavaRDD<Integer> rdd2 = sc.parallelize(list2);
        System.out.println("rdd1原始数据:" + rdd1.collect());
        System.out.println("rdd2原始数据:" + rdd2.collect());

运行结果:

rdd1原始数据:[5, 4, 3, 2, 1, 5]
rdd2原始数据:[3, 4, 5, 6, 7, 8]

​ parallelize方法还有一个参数是分区(Partitions)数量,它可以用来指定数据集的分区个数,例如上述代码中sc.parallelize(list)可以写成sc.parallelize(list,10),其中10就是数据集分区的个数。集群中的每一个分区对应一个spark任务,每一个cpu计算2-4个分区时较好,若不设置spark就会根据集群的 情况来自动设置分区数量,一般默认与cpu核心数相同。

2)从外部数据集(Dataset)创建

​ spark可以从本地文件系统、文本文件、sequenceFiles、HDFS、Cassandra、HBase、Amazon S3以及Hadoop所支持的任何存储源中创建RDD。通过SparkContext的textFile方法将数据源文件转换成RDD,此方法的参数为文件地址。转换后的数据将会以行集合的方式进行存储,例如:

 JavaRDD<String> rdd = sc.textFile("wordCount");
 System.out.println("原始数据:" + rdd.collect());

运行结果:

原始数据:[here, where, my, your, hello, world, test, file, jump, you, can, you, jump]

注:当使用本地文件系统进行读取操作转换时,必须保证所有工作节点在相同路径下能够访问该文件,可以将文件复制到所有工作节点的相同目录下,或者使用共享文件系统。

3、操作

  1. 转换(transformations):在一个已存在的RDD上创建一个新的RDD,但实际的计算并没有执行,仅仅记录操作规程,所有的计算都发生在actions环节。

    • map转换

      依次取出RDD中的每一个元素,传给表达式进行转换,返回转换后的结果。

         /**
           * 对每个元素进行操作(+10),返回一个新的RDD
           */
          public static void map(JavaRDD<Integer> rdd) {
              System.out.println("RDD每个元素加10:" + rdd.map(v -> v + 10).collect());
          }System.out.println("RDD每个元素乘10:" + rdd.map(v -> v + 10).collect());
      

      运行结果:

      RDD每个元素加10:[15, 14, 13, 12, 11, 15]
      
  1. 动作(actions):执行记录的所有transformations操作并计算结果,结果可返回到driver程序,也可保存到相关存储系统中。

PS :lambda表达式不支持解决方法:

case1:maven配置文件setting.xml中JDK改为8以上:

    <profile>
      <id>jdk-1.8</id>

      <activation>
        <activeByDefault>true</activeByDefault>
        <jdk>1.8</jdk>
      </activation>

       <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
    </properties>

      <repositories>
        <repository>
          <id>jdk18</id>
          <name>Repository for JDK 1.8 builds</name>
          <url>http://www.myhost.com/maven/jdk18</url>
          <layout>default</layout>
          <snapshotPolicy>always</snapshotPolicy>
        </repository>
      </repositories>
    </profile>

case2:

file --> Project Structure -->modules

将language level改为8以上

apply-->ok

感谢:

https://www.cnblogs.com/diaozhaojian/p/9152530.html

https://www.jianshu.com/p/d573573dd97f

https://www.cnblogs.com/dhrwawa/p/10981167.html

https://blog.csdn.net/wolf2s/article/details/78958275?depth_1-utm_source=distribute.pc_relevant.none-task&utm_source=distribute.pc_relevant.none-task

上一篇 下一篇

猜你喜欢

热点阅读