RDD编程
2020-03-26 本文已影响0人
雨滴滴滴答答
RDD基础概念:
- RDD:弹性分布式数据集(Resilient Distributed Dataset),spark对数据的核心抽象。分布式的元素集合。
- RDD操作:创建、转化、调用,spark会自动将RDD中的数据分发到集群上,并将操作并行化执行。
1. RDD基础
spark中的rdd就是一个不可变的分布式对象集合。每个RDD都被分成多个分区,这些分区运行在集群的不同节点上。
创建RDD的两种方法
- 读取一个外部的数据集(文件、MySQL、hive等)
- 在驱动器程序里分发驱动器程序中的对象集合(list和set)
RDD支持两种类型的操作
- 转化操作(transformation):由一个RDD生成一个新的RDD。filter
- 行动操作(action):对RDD计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部系统(如HDFS)中。会触发实际的计算。first、count
转化操作和行动操作的区别:spark计算RDD的方式不同。可以在任何时候定义新的RDD,但spark指挥惰性计算这些RDD。只有在第一个行动操作中用到时才会真正计算。可以避免计算过程中存储数据,消耗存储空间。spark在了解完整的转化操作链之后,可以只计算求结果时真正需要的数据。
以rdd.filter().first()方法为例, spark只需要扫描文件,直到找到第一个匹配的数据即可,不需要读整个文件。
spark缓存
spark 的rdd会在每次进行行动操作时重新计算。如果想在多个行动操作中重用一个RDD,可以使用RDD.persist() 让spark把这个RDD缓存下来。缓存方法有多种,可以把数据持久化到不同的地方。
在第一次对RDD持久化计算之后,spark会把RDD的内容保存到内存中(以分区的方式存储到集群中的各个机器上),这样就可以重用这些数据了。
spark大体流程
- 从外部数据创建出输入RDD
- 使用诸如filter()这样的转化操作对RDD进行转化,以定义新的RDD
- 中间结果缓存
- 使用行动操作出发一次并行计算,spark会对计算进行优化后再执行。
2. 创建RDD
- 基于已有集合,传给SparkContext.parallelize()方法。
JavaRDD<String> listRdd = javaSparkContext.parallelize(javaRDDList);
- 基于外部数据集。
JavaRDD<String> sourceInputRDD = javaSparkContext.textFile(inputFilePath);
//或者hive
spark.sql(hiveSql)
3. RDD操作
区分是转化操作还是行动操作,可以看方法的返回值:转化操作返回的是RDD, 而行动操作返回的是其他的数据类型。
转化出来的RDD是惰性求值的,只有再行动操作中用到这些RDD时才会被计算。
一个RDD经转化操作得到新的RDD1, RDD还可以用于其他操作。
spark使用谱系图(lineage graph)来记录不同RDD之间的依赖关系。spark需要用这些信息按需计算每个RDD, 也可以依靠谱系图再持久化的RDD丢失部分数据时回复所丢失的数据(有分析工具可以画吗?可以研究一下)。
行动操作:
要确保整个数据集合能在单台机器的内存中放得下才能使用。如collect, 不能再大规模数据集上使用
javaRdd.take(num),手机RDD的元素。
插播spark创建dataFrame
Dataset<Row> schemaInputRddDF = spark.createDataFrame(schemaInputRdd, OlapUserEventAgg.class);