Spark Streaming之DStream转换
DStream上的操作与RDD的类似,分为
转换
和输出
两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。
无状态转化操作
无状态转化操作:就是把RDD转化操作应用到DStream每个批次上,每个批次相互独立,自己算自己的。
常规无状态转化操作
DStream的部分无状态转化操作列在了下表中,都是DStream自己的API。
注意,针对键值对的DStream转化操作,要添加import StreamingContext._才能在Scala中使用,比如reduceByKey()。
函数名称 | 目的 | Scala示例 | 函数签名 |
---|---|---|---|
map() | 对DStream中的每个元素应用给定函数,返回由各元素输出的元素组成的DStream。 | ds.map(x=>x + 1) | f: (T) -> U |
flatMap() | 对DStream中的每个元素应用给定函数,返回由各元素输出的迭代器组成的DStream。 | ds.flatMap(x => x.split(" ")) | f: T -> Iterable[U] |
filter() | 返回由给定DStream中通过筛选的元素组成的DStream | ds.filter(x => x != 1) | f: T -> Boolean |
repartition() | 改变DStream的分区数 | ds.repartition(10) | N / A |
reduceByKey() | 将每个批次中键相同的记录规约。 | ds.reduceByKey( (x, y) => x + y) | f: T, T -> T |
groupByKey() | 将每个批次中的记录根据键分组。 | ds.groupByKey() | N / A |
需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD
批次
组成,且无状态转化操作是分别应用到每个RDD批次
上的。
Transform
通过Transform可以将DStream每一批次的数据直接转换为RDD的算子操作。
还是通过一个程序来演示Transform
具体的使用吧
需求:
从mysql中读取数据,对数据进行操作
- 环境准备
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.25</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
- 自定
Receiver
读取数据库资源
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet, ResultSetMetaData}
import com.mysql.jdbc.Driver
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver
import scala.collection.mutable.ArrayBuffer
class MysqlReceiver(val sql:String) extends Receiver[ArrayBuffer[Any]](StorageLevel.MEMORY_ONLY){
// 建立连接
var conn: Connection=null
//
var statement: PreparedStatement=null
// receiver 启动时执行
override def onStart(): Unit = {
// 注册驱动
classOf[Driver]
// url
val url="jdbc:mysql://hadoop102:3306/gmall?useUnicode=true&characterEncoding=UTF-8"
// 账号
val username="root"
// m密码
val password="123321"
//建立连接
conn = DriverManager.getConnection(url, username, password)
// 获取 prepareStatement
statement = conn.prepareStatement(sql)
// 只需要执行查询sql
val rs: ResultSet = statement.executeQuery()
val metaData: ResultSetMetaData = rs.getMetaData
while (rs.next()) {
val array = ArrayBuffer[Any]()
for(i <- 1 to metaData.getColumnCount){
val columnName: String = metaData.getColumnName(i)
val className: String = metaData.getColumnClassName(i)
val value: Any = className match {
case "java.lang.Long" => rs.getLong(i)
case _ => rs.getString(i)
}
array.+=(value)
}
// 存储
store(array)
}
}
// receiver 关闭时执行
override def onStop(): Unit = {
import java.sql.ResultSetMetaData
// 关闭资源
if(statement!=null){
statement.close()
}
if(conn!=null){
conn.close()
}
}
}
- 编写
main
程序
获取用户的邮件长度
def main(args: Array[String]): Unit = {
// 创建 StreamingContext
val ssc =new StreamingContext(new SparkConf().setMaster("local[4]").setAppName("test"),Seconds(5))
// 设置日志级别
ssc.sparkContext.setLogLevel("warn")
// 准备sql
val sql="select * from user_info"
// 使用自定义Receiver
val value: ReceiverInputDStream[ArrayBuffer[Any]] = ssc.receiverStream(new MysqlReceiver(sql))
value.transform(rdd=>{
// 统计邮箱长度
rdd.map(e=>{
// 邮箱
val email: Any = e(6)
// 姓名
val name: Any = e(4)
(name,email.toString.length)
})
}).print()
// 启动
ssc.start()
// 等待
ssc.awaitTermination()
}
运行测试
-------------------------------------------
Time: 1625961725000 ms
-------------------------------------------
(孟河,16)
(令狐岚艺,20)
(邬羽,16)
(公孙昭,16)
(许超浩,19)
(茅馥,13)
(汤卿聪,18)
(钱固,19)
(汪云莲,18)
(司徒艺,18)
有状态转化操作
说有状态之前,修改一下上面的程序,统计男女人数。
设计到部分代码改动,大部分都是一样。
目前数据量比较少,只有一百条,为了不让程序马上跑完,读取一行数据时休眠2秒
。
Thread.sleep(2000)
while (rs.next()) {
val array = ArrayBuffer[Any]()
for(i <- 1 to metaData.getColumnCount){
val columnName: String = metaData.getColumnName(i)
val className: String = metaData.getColumnClassName(i)
val value: Any = className match {
case "java.lang.Long" => rs.getLong(i)
case _ => rs.getString(i)
}
array.+=(value)
}
// 存储
store(array)
// 休眠2秒
Thread.sleep(2000)
}
统计当前RDD 男女人数
value.transform(rdd=>{
// 统计邮箱长度
val value1: RDD[String] = rdd.map(e => {
// 性别
val gender: Any = e(10)
gender.toString
})
// 统计当前RDD男女人数
value1.groupBy(e => e).map(e=>{
(e._1,e._2.toList.size)
})
}).print()
运行结果
-------------------------------------------
Time: 1625963620000 ms
-------------------------------------------
(F,1)
(M,1)
-------------------------------------------
Time: 1625963625000 ms
-------------------------------------------
(F,2)
-------------------------------------------
Time: 1625963630000 ms
-------------------------------------------
(F,2)
(M,1)
-------------------------------------------
Time: 1625963635000 ms
-------------------------------------------
(F,1)
(M,1)
列出一部分,有没有发现无状态转化
每次只能统计当前批次
的数据。假设数据量比较大肯定需要一个批次一个批次的统计,然后对每个批次的数据进行汇总。
如何获取上一个批次数据呢?这里就需要使用有状态转化
。
- UpdateStateByKey
updateStateByKey()用于键值对形式的DStream,可以记录历史批次状态。
updateStateByKey()参数中需要传递一个函数,在函数内部可以根据需求对新数据和历史状态进行整合处理,返回一个新的DStream。
我们知道RDD是不存储数据的,既然要获取上一个批次数据,那么肯定需要将数据进行持久化,如何进行持久化?这里就需要配置checkpoint
,记录上一次的结果。
如果不配置,将会抛出如下异常错误
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().
更改一点点之前的程序
def main(args: Array[String]): Unit = {
// 创建 StreamingContext
val ssc =new StreamingContext(new SparkConf().setMaster("local[4]").setAppName("test"),Seconds(5))
// 设置日志级别
ssc.sparkContext.setLogLevel("warn")
// 持久化,保存上一个批次数据
ssc.checkpoint("output//updateStateByKey")
// 准备sql
val sql="select * from user_info"
// 使用自定义Receiver
val value: ReceiverInputDStream[ArrayBuffer[Any]] = ssc.receiverStream(new MysqlReceiver(sql))
// 获取性别
val gender: DStream[(String, Int)] = value.map(e => {
// 性别
(e(10).toString,1)
})
// currentValue 当前批次 value 值
// nextValues 上一个批次 value 值
val fun=(currentValue:Seq[Int],nextValues:Option[Int])=>{
Some(currentValue.sum+nextValues.getOrElse(0))
}
// 聚合
val result: DStream[(String, Int)] = gender.updateStateByKey(fun)
// 输出结果
result.print()
// 启动
ssc.start()
// 等待
ssc.awaitTermination()
}
输出结果
-------------------------------------------
Time: 1625965715000 ms
-------------------------------------------
(M,1)
-------------------------------------------
Time: 1625965720000 ms
-------------------------------------------
(M,1)
(F,2)
-------------------------------------------
Time: 1625965725000 ms
-------------------------------------------
(M,2)
(F,4)
21/07/11 09:08:46 WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped
-------------------------------------------
Time: 1625965730000 ms
-------------------------------------------
(M,3)
(F,5)
-------------------------------------------
Time: 1625965735000 ms
-------------------------------------------
(M,3)
(F,8)
这样也许看不太出来每次的变化,我们把当前批次的数据结果也打印出来。
改这部分代码即可
// 获取性别
val gender: DStream[(String, Int)] = value.map(e => {
println("当前批次数据结果:",(e(10).toString,1))
// 性别
(e(10).toString,1)
})
运行测试
-------------------------------------------
Time: 1625966020000 ms
-------------------------------------------
(当前批次数据结果:,(F,1))
(当前批次数据结果:,(F,1))
(当前批次数据结果:,(M,1))
-------------------------------------------
Time: 1625966025000 ms
-------------------------------------------
(M,1)
(F,2)
(当前批次数据结果:,(F,1))
(当前批次数据结果:,(M,1))
-------------------------------------------
Time: 1625966030000 ms
-------------------------------------------
(M,2)
(F,3)
21/07/11 09:13:53 WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped
(当前批次数据结果:,(F,1))
(当前批次数据结果:,(M,1))
(当前批次数据结果:,(F,1))
-------------------------------------------
Time: 1625966035000 ms
-------------------------------------------
(M,3)
(F,5)
(当前批次数据结果:,(F,1))
(当前批次数据结果:,(F,1))
-------------------------------------------
Time: 1625966040000 ms
-------------------------------------------
(M,3)
(F,7)
这样就可以很方便看出每个批次的数据累计情况。
功能也看到了,那么就该具体的了。
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)] = ssc.withScope {
updateStateByKey(updateFunc, defaultPartitioner())
}
updateStateByKey
有很多重载方法,其他的还不怎么熟悉,就说说这一个吧。
updateFunc
:需要我们指定函数,并且有两个参数,分别类型为(Seq[V],Option[S])返回的类型也需要为Option[S]
第一个参数,Seq[V] :表示当前批次的所有key对应的value数据,
第二个参数,Option[S]:表示上一个批次的数据,为什么是Option
呢?那是因为上一个批次的数据可能还没有。
// currentValue 当前批次 value 值
// nextValues 上一个批次 value 值
val fun=(currentValue:Seq[Int],nextValues:Option[Int])=>{
Some(currentValue.sum+nextValues.getOrElse(0))
}
// 聚合
val result: DStream[(String, Int)] = gender.updateStateByKey(fun)
- Window
updateStateByKey
用于统计全局数据,我们还是拿上一个案例(统计男女人数)为例,假设我的数据量越来越大,不停的有人向我网站注册会员,我需要统计10分钟,一个小时,男女注册人数,只需要统计这一个时间段的数据。
需求明白了之后,如何实现了?此时就需要使用到了WindowOperations
函数。
理解WindowOperations
函数,就需要先理解 scala中的sliding
(滑窗函数)
val list=List(1,2,3,4,5,6,7,8,9,10)
滑窗
list,sliding(窗口大小,步长)
例如
list,sliding(2,1)
结果
List(1,2)
List(2,3)
List(3,4)
List(4,5)
List(5,6)
List(7,8)
List(8,9)
List(9,10)
sliding
滑动的是数据,WindowOperations
滑动的就是时间(Streaming 是按照时间拉取一个批次数据)。
感觉有点抽象?用案例说话吧。
def main(args: Array[String]): Unit = {
// 创建 StreamingContext
val ssc =new StreamingContext(new SparkConf().setMaster("local[4]").setAppName("test"),Seconds(5))
// 设置日志级别
ssc.sparkContext.setLogLevel("warn")
// 持久化,保存上一个批次数据
ssc.checkpoint("output//updateStateByKey")
// 准备sql
val sql="select * from user_info"
// 使用自定义Receiver
val value: ReceiverInputDStream[ArrayBuffer[Any]] = ssc.receiverStream(new MysqlReceiver(sql))
// 获取性别
val gender: DStream[(String, Int)] = value.map(e => {
println("统计当前批次的数据:",(e(10).toString,1))
// 性别
(e(10).toString,1)
})
// 窗口
val windowValue: DStream[(String, Int)] = gender.window(Seconds(10), Seconds(5))
// 输出结果
windowValue.print()
// 聚合当前窗口的数据
val result: DStream[(String, Int)] = windowValue.reduceByKey(_ + _)
result.print()
println("*"*50)
// 启动
ssc.start()
// 等待
ssc.awaitTermination()
}
设置滑窗
val windowValue: DStream[(String, Int)] = gender.window(Seconds(10), Seconds(5))
Seconds(10):窗口时长:计算内容的时间范围;
Seconds(5):滑动步长:隔多久触发一次计算。
之前设置的,五秒拉取一次。
val windowValue: DStream[(String, Int)] = gender.window(Seconds(10), Seconds(5))
我想了半天,确实不知道怎么表达
看看这个数据,1~10;可以这么理解,集合中的每一个元素相当于存储5秒一个批次的数据,
1:第一个批次数据,2:表示第二个批次数据,以此类推。
val list=List(1,2,3,4,5,6,7,8,9,10)
设置的创建为Seconds(10)
也就是10秒,是不是相当窗口大小为2,也就是两个批次的数据。
设置步长为Seconds(5)
,也就是每次滑动一个元素的数据,也就是滑动一个一批次的数据。
最终的结果是不是应该也是这样,只是这里应该为数字,而是一个批次的数据。
List(1,2)
List(2,3)
List(3,4)
List(4,5)
List(5,6)
List(7,8)
List(8,9)
List(9,10)
我们再来看看数据结果
**************************************************
(统计当前批次的数据:,(M,1))
-------------------------------------------
Time: 1625969490000 ms
-------------------------------------------
(M,1)
-------------------------------------------
Time: 1625969490000 ms
-------------------------------------------
(M,1)
(统计当前批次的数据:,(F,1))
(统计当前批次的数据:,(F,1))
(统计当前批次的数据:,(F,1))
-------------------------------------------
Time: 1625969495000 ms
-------------------------------------------
(M,1)
(F,1)
(F,1)
(F,1)
-------------------------------------------
Time: 1625969495000 ms
-------------------------------------------
(M,1)
(F,3)
(统计当前批次的数据:,(M,1))
(统计当前批次的数据:,(F,1))
-------------------------------------------
Time: 1625969500000 ms
-------------------------------------------
(F,1)
(F,1)
(F,1)
(M,1)
(F,1)
-------------------------------------------
Time: 1625969500000 ms
-------------------------------------------
(M,1)
(F,4)
(统计当前批次的数据:,(F,1))
(统计当前批次的数据:,(M,1))
(统计当前批次的数据:,(F,1))
-------------------------------------------
Time: 1625969505000 ms
-------------------------------------------
(M,1)
(F,1)
(F,1)
(M,1)
(F,1)
-------------------------------------------
Time: 1625969505000 ms
-------------------------------------------
(M,2)
(F,3)
(统计当前批次的数据:,(F,1))
(统计当前批次的数据:,(F,1))
-------------------------------------------
Time: 1625969510000 ms
-------------------------------------------
(F,1)
(M,1)
(F,1)
(F,1)
(F,1)
-------------------------------------------
Time: 1625969510000 ms
-------------------------------------------
(M,1)
(F,4)
(统计当前批次的数据:,(F,1))
(统计当前批次的数据:,(F,1))
(统计当前批次的数据:,(M,1))
-------------------------------------------
Time: 1625969515000 ms
-------------------------------------------
(F,1)
(F,1)
(F,1)
(F,1)
(M,1)
-------------------------------------------
Time: 1625969515000 ms
-------------------------------------------
(M,1)
(F,4)
(统计当前批次的数据:,(F,1))
(统计当前批次的数据:,(F,1))
-------------------------------------------
Time: 1625969520000 ms
-------------------------------------------
(F,1)
(F,1)
(M,1)
(F,1)
(F,1)
-------------------------------------------
Time: 1625969520000 ms
-------------------------------------------
(M,1)
(F,4)
(统计当前批次的数据:,(F,1))
(统计当前批次的数据:,(M,1))
(统计当前批次的数据:,(F,1))
-------------------------------------------
Time: 1625969525000 ms
-------------------------------------------
(F,1)
(F,1)
(F,1)
(M,1)
(F,1)
-------------------------------------------
Time: 1625969525000 ms
-------------------------------------------
(M,1)
(F,4)
因为我不仅打印了每个窗口的数据,还打印了每个窗口统计后的结果
数据
滑窗
微信图片_20210711104114.png
结果
结果
图片有点模糊,我也很无奈呀,就是想说数据的结果就是通过滑窗的方式获取的,
注意:
滑动时间、窗口时间必须为批次时间的整数倍。
- reduceByKeyAndWindow
向上面这种操作
// 窗口
val windowValue: DStream[(String, Int)] = gender.window(Seconds(10), Seconds(5))
// 聚合当前窗口的数据
val result: DStream[(String, Int)] = windowValue.reduceByKey(_ + _)
还可以进行简写,这里就需要使用到reduceByKeyAndWindow
,对窗口进行聚合。效果和上面两行代码一样
def main(args: Array[String]): Unit = {
// 创建 StreamingContext
val ssc =new StreamingContext(new SparkConf().setMaster("local[4]").setAppName("test"),Seconds(5))
// 设置日志级别
ssc.sparkContext.setLogLevel("warn")
// 持久化,保存上一个批次数据
ssc.checkpoint("output//updateStateByKey")
// 准备sql
val sql="select * from user_info"
// 使用自定义Receiver
val value: ReceiverInputDStream[ArrayBuffer[Any]] = ssc.receiverStream(new MysqlReceiver(sql))
// 获取性别
val gender: DStream[(String, Int)] = value.map(e => {
println("统计当前批次的数据:",(e(10).toString,1))
// 性别
(e(10).toString,1)
})
// 窗口
val result: DStream[(String, Int)] = gender.reduceByKeyAndWindow((agg, curr) => agg + curr, windowDuration=Seconds(10), Seconds(5))
// 输出结果
result.print()
println("*"*50)
// 启动
ssc.start()
// 等待
ssc.awaitTermination()
}
运行:
**************************************************
(统计当前批次的数据:,(M,1))
(统计当前批次的数据:,(F,1))
(统计当前批次的数据:,(F,1))
-------------------------------------------
Time: 1625973660000 ms
-------------------------------------------
(M,1)
(F,2)
(统计当前批次的数据:,(M,1))
(统计当前批次的数据:,(F,1))
-------------------------------------------
Time: 1625973665000 ms
-------------------------------------------
(M,2)
(F,3)
(统计当前批次的数据:,(F,1))
(统计当前批次的数据:,(F,1))
(统计当前批次的数据:,(M,1))
-------------------------------------------
Time: 1625973670000 ms
-------------------------------------------
(M,2)
(F,3)
注意:
windowDuration 不能简写,具体是什么原因不太清除,可能是编译器的问题。
// 窗口
val result: DStream[(String, Int)] = gender.reduceByKeyAndWindow((agg, curr) => agg + curr, windowDuration=Seconds(10), Seconds(5))
reduceByKeyAndWindow
还有一个重载方法
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
windowDuration: Duration,
slideDuration: Duration = self.slideDuration,
numPartitions: Int = ssc.sc.defaultParallelism,
filterFunc: ((K, V)) => Boolean = null
): DStream[(K, V)] = ssc.withScope {
reduceByKeyAndWindow(
reduceFunc, invReduceFunc, windowDuration,
slideDuration, defaultPartitioner(numPartitions), filterFunc
)
}
需要指定两个函数
invReduceFunc
:逆归约函数;
我现在需要统计用户注册人数,假设1秒钟拉取一次,我需要统计一分钟的数据(窗口应该60),每一秒统计一次。

若按原来的做法,每次都需要从头开始(
1~60
)开始结算,但是2~59的数据始终不会变的,变得永远都第一个和最后一个。那么。
逆归约函数的思想简单点说,就是丢弃第一个,加上新加入的。中间的永远不变,这样的好处就是减少重复计算。
def main(args: Array[String]): Unit = {
// 创建 StreamingContext
val ssc =new StreamingContext(new SparkConf().setMaster("local[4]").setAppName("test"),Seconds(5))
// 设置日志级别
ssc.sparkContext.setLogLevel("warn")
// 持久化,保存上一个批次数据
ssc.checkpoint("output//updateStateByKey")
// 准备sql
val sql="select * from user_info"
// 使用自定义Receiver
val value: ReceiverInputDStream[ArrayBuffer[Any]] = ssc.receiverStream(new MysqlReceiver(sql))
// 获取性别
val gender: DStream[(String, Int)] = value.map(e => {
println("统计当前批次的数据:",(e(10).toString,1))
// 性别
(e(10).toString,1)
})
// 窗口
val result: DStream[(String, Int)] = gender.reduceByKeyAndWindow((agg, curr) => {
println(s"累加滑入批次数据: ${agg} ${curr}")
agg + curr
}, (agg1, curr1) => {
println(s"减去滑出的批次数据: ${agg1} ${curr1}")
agg1 - curr1
}, Seconds(15), Seconds(5))
// 输出结果
result.print()
println("*"*50)
// 启动
ssc.start()
// 等待
ssc.awaitTermination()
}
结果
**************************************************
(统计当前批次的数据:,(M,1))
-------------------------------------------
Time: 1625975440000 ms
-------------------------------------------
(M,1)
(统计当前批次的数据:,(F,1))
(统计当前批次的数据:,(F,1))
累加滑入批次数据: 1 1
-------------------------------------------
Time: 1625975445000 ms
-------------------------------------------
(M,1)
(F,2)
(统计当前批次的数据:,(M,1))
(统计当前批次的数据:,(F,1))
(统计当前批次的数据:,(F,1))
累加滑入批次数据: 1 1
累加滑入批次数据: 1 1
累加滑入批次数据: 2 2
-------------------------------------------
Time: 1625975450000 ms
-------------------------------------------
(M,2)
(F,4)