spark中rdd的Transformation API(一)
2020-02-19 本文已影响0人
小草莓子桑
上一篇已经简单说过了RDD。今天开始,我们来了解一下RDD的Transformation算子的各个API
模拟一个场景,来讲各个Transformation的API
在这里我们模拟一个统计网站页面浏览情况的需求。
业务场景
我们这里使用网站浏览日志作为业务场景,就用简单的三个字段页面埋点id、用户id、停留时长
业务场景
日志格式
假定网站浏览日志以HDFS形式通过我们的系统埋点到我们数据仓库中了,埋点日志的数据格式如下:
日志格式
假定我们代码中已经使用这样的格式,把日志记录到服务器上,并通过flume上传到了HDFS服务器上
页面id|用户id|停留时长
index|2|6
表示:用户id为2的用户访问了index页面,并停留了6秒钟
map(func)
返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
直接上代码
package com.edu.spark.rdd.transformation
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* @description: .
* @author: XiaoCaoMei .
* @createdTime: 2019/12/21 16:41.
* @version: 1.0 .
*/
object LogProcess {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
/**
* 如果这个参数不设置,默认认为你运行的是集群模式
* 如果设置成local代表运行的是local模式
*/
conf.setMaster("local")
//设置任务名
conf.setAppName("EduSpark")
//创建SparkCore的程序入口
val sc = new SparkContext(conf)
//读取文件 生成RDD
val file : RDD[String] = sc.textFile("F:\\hdfs\\hello.log")
println(file.collect().toBuffer)
//把每一行数据按照 | 分割
val traceRdd : RDD[(String, String)] = file.map(s => {
//把每一行数据按照 | 分割
//注意这里 | 需要使用转义字符
var sArray: Array[String] = s.split("\\|")
//split函数的作用是 通过|分隔字符串返回数组
//埋点页面 用户id
(sArray(0), sArray(1))
})
println(traceRdd.collect().toBuffer)
sc.stop()
}
}
先来看结果
第一处日志打印
第一处打印
第二处日志打印
第二处打印
我们给map算子传入的函数就是把每一行按|分隔,返回分隔后数组 0 ,1索引上的元素,也就是页面id与用户id。所以第二处,我们得到的rdd就相当于一个 key-value 键值对的一个数据集。
flatMap(func)
flatmap是将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD
直接上代码
package com.edu.spark.rdd.transformation
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* @description: .
* @author: XiaoCaoMei .
* @createdTime: 2019/12/21 16:41.
* @version: 1.0 .
*/
object LogProcess {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
/**
* 如果这个参数不设置,默认认为你运行的是集群模式
* 如果设置成local代表运行的是local模式
*/
conf.setMaster("local")
//设置任务名
conf.setAppName("EduSpark")
//创建SparkCore的程序入口
val sc = new SparkContext(conf)
//读取文件 生成RDD
val file: RDD[String] = sc.textFile("F:\\hdfs\\hello.log")
println(file.collect().toBuffer)
//把每一行数据按照 | 分割
val traceRdd1: RDD[String] = file.flatMap(_.split("\\|"))
println(traceRdd1.collect().toBuffer)
sc.stop()
}
}
先来看结果
第二处打印
第二处打印
看出,我们给flatMap算子传入的函数是把我们每一行文件按|分隔,返回分隔后数组所有索引上的元素,也就是页面id与用户id与停留时长。所以第二处,我们得到的rdd就相当于一个是每一行数据中的三个元素聚合到一起的序列。
说说map与flatmap的区别吧
- map(func)将原RDD的每个元素使用func进行格式化,返回一个新的RDD
- flatmap(func) 跟map(func)类似,但是每个输入项会成为0个或多个输出项(所以func函数应该返回的是一个序列化的数据而不是单个数据项)
- 怎么理解,map对集合中每个元素进行操作,最后得到的RDD会与原始RDD的长度一致
- 而flatmap对集合中每个元素进行操作然后再扁平化,最后得到的RDD与原始RDD长度不一致
- 如上图的结果,map只是对于原始日志文件的每一行,做了映射,把原始文件映射成了一个页面id与用户id的对象序列。
- 而flatmap,确实把每一行文件按|分隔后,把三个元素都加入到了一个序列中。
- 所以flatmap在词频统计方面有很大优势。
Transformation API中的map与flatmap就说到这了,欢迎大家来交流,指出文中一些说错的地方,让我加深认识,愿大家没有bug,谢谢!