Spark core RDD API

2019-03-19  本文已影响0人  王龙江_3c83

1. RDD 的概述

1.1 RDD 的优势

1.2 RDD 的方法

函数 功能
compute(split: Partition, context: TaskContext): Iterator[T] 一个计算每个分区数据的函数
getPartitions: Array[Partition] 一个分区列表,用于并行计算
getDependencies: Seq[Dependency[_]] = deps 一个依赖列表,这个rdd依赖的父rdd是哪些(在计算的时候可以通过这个依赖来容错)
getPreferredLocations(split: Partition): Seq[String] = Nil 分区数据的优先存储地址。
val partitioner: Option[Partitioner] = None RDD 是如何分区的,比如某个rdd是通过hash partitioner得到的

1.3 常见类型的 RDD

1.3.1 MapPartitionsRDD

1.3.2 ParallelCollectionRDD

1.3.3 ShuffleRDD

2. RDD 的创建

函数 功能
sc.textFile(path) 从文件系统中(本地文件系统或 HDFS ),得到行数据的 HadoopRDD。
sc.sequenceFile[KeyClass,ValueClass](path) 加载 HDFS sequenceFile 文件。
sc.parallelize(Seq(1,2,3)) 从内存中已经存在的序列列表(Seq、List、Array)中,得到 ParallelCollectionRDD
sc.makeRDD(Seq(1,2)) parallelize 别名
sc.range(start,end) 创建区间为 [start,end) 的 MapPartitionsRDD

3. 依赖

3.1 窄依赖

父亲 RDD 的一个分区数据只能被子 RDD 的一个分区消费,子 RDD 的一个分区可以对应父 RDD 的多个分区。

3.2 宽依赖

父亲 RDD 的一个分区数据被子 RDD 的多个分区消费。

4. Partitioner

给这个RDD数据进行分区的分区器。

4.1 实现

4.2 分区优化

4.3 对比

HashPartitioner RangePartitioner
将可以排序的 key 分到几个大概相等的范围分区内的一个分区中。
不支持 Array 类型的 key。 不支持不能排序的 Key。
可能导致数据倾斜。 可以解决分区数据倾斜的问题。
分区后的数据不会排序。 分区后分区之间的数据是有序的。

5. 单类型 RDD 操作 API

5.1 Transformations

方法 功能
map(func) 接收函数,将函数应用到 RDD 中到每一个元素,返回新的 RDD。
mapPartition(func) 类似于 map,但独立地在RDD的每一个分片上运行,因此在类型为 T 的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
mapPartitionsWithIndex 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是 (Int, Interator[T]) => Iterator[U]
flatMap 对每个输入元素,可以输出多个元素。
sample
filter 接收函数,返回只包含满足 filter 函数的元素的新 RDD。
distinct 去重

5.2 Actions

方法 功能
collect 返回 RDD 的所有元素。
count 计数。
countByValue 返回一个 Map,表示唯一元素出现的次数。
take 返回几个元素。
top 返回前几个元素。
takeOrdered 返回基于提供的排序算法前几个元素。
takeSample(withReplacement,num,[seed]) 取样
reduce 合并 RDD 中的元素
fold 与 reduce 相似,提供 zero value,rdd.flod(0)(+)
aggregate()
foreach() 遍历 rdd 中的每个元素。

6. key-value 类型 RDD 操作 API

6.1 key-value 类型 RDD 生成方法

6.2 KeyValue 对 RDD

函数 功能
combinerByKey
aggregateByKey aggregateByKey((0,0))(mergeValue,mergeCombiner)可以实现 value 值、和词频统计功能。
reduceByKey createCombiner 不对数据进行任何处理,mergeValue 和 mergeCombiner 调用传入的 reduce 函数。
distinct 基于 reduceByKey 实现,键值对都相同,则去重。reduce=((x,y)=>x)
foldByKey(n) createCombiner=mergeValue(n,value)
groupByKey createCombiner:元素转 ArrayBuffer 集合;mergeValue:将新元素添加进集合;mergeCombiner:集合合并。
sortByKey
sortBy

7. 二元操作 API

方法 功能
union 并集
intersection 交集
subtract 差集
方法 功能
persist(StorageLevel) 给 RDD 的 StorageLevel成员变量(默认为 None)赋值,存储级别:MEMORY_ONLY(默认)、DISK_ONLY、MEMORY_AND_DISK、OFF_HEAP。
cache 分布式缓存,等于 cache(StorageLevel.MEMORY_ONLY)。
unpersist 移除持久化数据。
ietrator 获得当前 RDD 的输出。
localCheckpoint() 本地磁盘文件,等于 cache(StorageLevel.MEMORY_AND_DISK)。
checkpoint() HDFS 文件系统。

8.集成 Spark SQL

8.1 集成步骤

8.1.1 SparkSession & SQLContext

    val conf = new SparkConf().setAppName("RDDToDataFrame").setMaster("local")
    val sc = new SparkContext(conf)
    val spark = SparkSession.builder.config(conf).getOrCreate()
    import spark.implicits._
    val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

8.1.2 将外部数据映射为 RDD

    val fileRDD = sc.textFile("/Users/dreamaker/Downloads/data/people.txt")
    // 将 RDD 数据映射成 Row,需要 import org.apache.spark.sql.Row
    val rowRDD: RDD[Row] = fileRDD.map(line => {
      val fields = line.split(",")
      Row(fields(0), fields(1).trim.toInt)
    })

8.1.3 使用 class 或 StructType 封装数据

val peopleRDD = sc.textFile("/Users/dreamaker/Downloads/data/people.txt")
      .map(line => People(line.split(",")(0), line.split(",")(1).trim.toInt))
    // 将RDD 转换成 DataFrames
    val df = peopleRDD.toDF
val structType: StructType = StructType(
      //字段名,字段类型,是否可以为空
      StructField("name", StringType, true) ::
        StructField("age", IntegerType, true) :: Nil
    )
    /**
      * rows: java.util.List[Row],
      * schema: StructType
      * */
    val df: DataFrame = sqlContext.createDataFrame(rowRDD,structType)

8.1.4 使用 DataFrame 创建临时表

    df.createOrReplaceTempView("people")
    sqlContext.sql("select * from people").show()

8.1.5 将 df 写入外部数据源

df.write.format("parquet").save("/Users/dreamaker/Downloads/data/people.parquet")

参考资料

上一篇 下一篇

猜你喜欢

热点阅读