spark

Spark Streaming 2.1.0 Programmin

2017-11-10  本文已影响345人  chenfh5

简单写一下自己读了Spark Streaming 2.1.0 Programming Guide之后的体验,也可以说是自己对该编程指南的理解与翻译。

https://spark.apache.org/docs/2.1.0/streaming-programming-guide.html


Overview

Spark Streaming(下称streaming)是Spark core的拓展,一个易扩展、高吞吐、高容错的流式数据处理系统。

streaming-arch

streaming接收输入数据(kafka等)然后根据设置的处理时长batch interval将其切割为一个个的小数据集,然后对小数据集进行spark core/sql/mllib的操作,最后将处理后的小数据集输出。

streaming-flow

streaming具有一个高度抽象概念叫离散化的流(即DStream),代表了一块连续的数据流。

A DStream is represented as a sequence of RDDs.

A Quick Example

NetworkWordCount.scala

Basic Concepts

Linking

Initializing StreamingContext

Discretized Streams (DStreams)

DStream可以是来自于接收到的上游source(kafka),也可以是经过transformating转换后的DStream。

Input DStreams and Receivers

Input DStream通过Receiver接收上游source的数据,receiver负责将上游数据接住,同时将其保存在spark的内存系统中以供后续transformation处理。

streaming提供的两种内建源和自定义源:

如果streaming程序需要并行接收多个数据源,可以创建多个receiver。但是因为一个receiver是一个长期的任务伴随着streaming的开始和结束,所以其会始终占用一个core。所以,streaming程序要分配足够的core来接收数据(#receiver)和处理数据(#processer)。
注意:本地跑streaming程序,不要使用local或者local[1]。因为两种设置都是只分配一个core/thread给streaming程序,而该core会被receiver占用,但processer就没有额外的core来驱动,导致整个程序只接收数据,但是不能够处理数据。所以通常设置为local[n], n > #receiver

Receiver Reliability
根据是否能够发出acknowledgment(ack)到source来区分接收器的reliable/unreliable。

Transformation on DStreams

与RDD的transformation类似,是一种lazy操作。输入的DStream可以经过transformation转换成另一种DStream。

Transformation Meaning
map 作用于DStream里面的每一个元素
flatMap 先调用map,然后调用flatten展平
filter 符合filter条件的则保留
repartition 通过shuffle来修改并行度
union 合流,将多个DStream合并成一个DStream,多job合并可以提高并行度
reduce 所有元素及其中间结果逐一顺序执行,最后得到一个结果
countByValue 计算key[T]的frequency, DStream(T, Long)
reduceByKey 根据key分组,再对每个key的pairs应用reduce
join DStream(k1, v1) join DStream(k1, v2) = DStream(k1, (v1,v2))
cogroup DStream(k1, v1) join DStream(k1, v2) = DStream(k1, Seq[v1], Seq[v2])
updateStateByKey 记录状态的操作,需要initial state和定义state update function,需要开启checkpoint
transform 作用于DStream里面的每一个RDD
windows 基于窗宽的窗口函数
streaming-dstream-window

插入Spark Structured Streaming关于窗函数的使用

在流式处理中,有两个时间概念,

时序

上图time1, time2, time3是process time,图中方块中的数字代表这个event time。可能由于网络抖动导致部分机器的日志收集产生了延迟,在time3的batch中包含了event time为2的日志。kafka中不同partition的消息也是无序的,在实时处理过程中也就产生了两个问题,

  1. Streaming从kafka中拉取的一批数据里面可能包含多个event time的数据
  2. 同一event time的数据可能出现在多个batch interval中

Structured Streaming可以在实时数据上进行sql查询聚合,如查看不同设备的信号量的平均大小

avgSignalDf = eventsDF
              .groupby("deviceId")
              .avg("signal")

进一步地,如果不是在整个数据流上做聚合,而是想在时间窗口上聚合。如查看每过去5分钟的不同平均信号量,这里的5分钟时间指的是event time,而不是process time,

windowedAvgSignalDF1 = eventsDF
                       .groupBy("deviceId", window("eventTime", "5 minute"))
                       .count()
windowedAvgSignalDF1

更进一步要求,每5分钟统计过去10分钟内所有设备产生日志的条数,也是按照event time聚合,

windowedAvgSignalDF2 = eventsDF
                       .groupBy("deviceId", window("eventTime", "10 minute", "5 minute"))
                       .count()
windowedAvgSignalDF2

如果一条日志因为网络原因迟到了怎么办?Structured Streaming还是会将其统计到属于它的分组里面。

windowedAvgSignalDF3_delay

上面强大的有状态功能是通过Spark Sql内部维护一个高容错的中间状态存储,key-value pairs,key就是对应分组,value就是对应每次增量统计后的一个聚合结果。每次增量统计,就对应key-value的一个新版本,状态就从旧版本迁移到新版本,所以才认为是有状态的。

有状态的数据存储在内存中是不可靠的,spark sql内部使用write ahead log(WAL, 预写式日志),然后间断的进行checkpoint。如果系统在某个时间点上crash了,就从最近的checkpoint点恢复,再开始使用WAL进行重放replay。checkpoint的点更新了以后,才将WAL清空clean,然后重新累积WAL,再flush到checkpoint,再clean(类似于es的translog)。

WAL

当然,streaming的数据源是一个流,这个数据是无限的,为了资源和性能考虑,只能保存有限的状态。即落后多久以后的数据,即便来了,系统也不要了,watermarking概念就是用来定义这个等待时间。例如,如果系统最大延迟是10分钟,意味着event time落后process time 10分钟内的日志会被拿来使用;如果超出10分钟,该日志就会被丢弃。如现在process time = 12:33,那么12:23之前的key-value pair的状态就不会再有改变,也就可以不用维护其状态了。

windowedAvgSignalDF4 = eventsDF
                       .withWatermark("eventTime", "10 minutes")
                       .groupBy("deviceId", window("eventTime", "10 minute", "5 minute"))
                       .count()
windowedAvgSignalDF4_waterMark

x轴是process time,y轴是event time。然后有一条动态的水位线,如果在水位线下面的日志,Streaming系统就丢弃。



Output Operations on DStreams

将DStream推送至外部系统,db,hdfs。是action,会trigger the actual execution of all the DStream transformations

Output Operation Meaning
print 在driver端打印每个batch的前10个元素
saveAsTextFiles 保存DStream内容为文本文件
saveAsObjectFiles 保存DStream内容为序列化对象文件
saveAsHadoopFiles 保存为hdfs文件
foreachRDD 作用于DStream里面的所有RDD,需要里面包含RDD的action算子才会被执行

其中foreachRDD常用于写DStream内容到外部DB中,需要用到网络连接,示例如下,

errorExample
上面的是错误实例,因为connection产生在driver,但connection不能序列化到executor,所以connection.send(record)报错。 高消耗方式

上面是不推荐方式,因为需要为DStream里面的每一个元素都产生和销毁connection,而产生和销毁connection是昂贵的操作。

推荐方式1

上面的方式,为每个rdd的partition产生一个connection,该connection产生于executor,可以用于send数据。

更推荐方式
上面的方式,有别于推荐方式1,利用连接池概念,每一个batch interval都可以重复利用这些connection(后续的每个batch都会利用该连接池,而非后续batch一直new connection下去)。连接池要求懒加载和设置超时,具体可以参考这个stackoverflow answer

注意,

DataFrame and SQL Operations

DStream可以使用core、sql、mllib

MLlib Operations

DStream可以使用core、sql、mllib,eg. StreamingLinearRegressionWithSGD

Caching/ Persistence

DStream.persist()可以持久化DStream里面的每一个RDD。其中reduceByWindowreduceByKeyAndWindowupdateStateByKey是隐式带上持久化的,不需要显式调用persist()。

Checkpointing

为了解决24/7程序的容错问题,需要checkpoint(cp)两类数据,

  1. Metadata,包括configuration,DStream operations,Incomplete batches。一般用于driver的恢复。
  2. RDDs,将生成的rdd保存到cp点,为了减少rdd lineage链的长度,也便于快速恢复

需要开启cp的应用场景,

  1. driver需要自动恢复的场景
  2. 带状态转换算子(stateful transformations);需要组合多个batch的数据,如窗函数,stateUpdateFunc

如何开启cp,

  1. 设置cp目录(用于带状态转换算子)
  2. 设置functionToCreateContext(用于driver恢复)
cp_driver_recovery_func

cp的间隔时间需要谨慎设置,太频繁会影响性能;相反太久会导致lineage链和task size太大。dstream.checkpoint(checkpointInterval),一般是窗宽的5到10倍比较好。

Accumulators, Broadcast Variables, and Checkpoints

累加器和广播变量不能从cp中恢复,但是通过lazily instantiated singleton instances单例懒加载可以从cp中重新实例化。

Deploying Applications

Streaming应用的部署

Requirements

Upgrading Application Code

如果需要更新running状态的streaming程序的代码或者配置,

Monitoring Applications

  • Processing Time < Batch Interval 才算正常
  • Scheduling Delay 越小越好
monitor ui.png normal timer

Performance Tuning

  • 减少每个batch interval的Processing Time
  • 设置正确的batch size(每个batch interval的数据量大小)

Reducing the Batch Processing Times

Level of Parallelism in Data Receiving

Level of Parallelism in Data Processing

如果parallel task不足,那么core利用率不高。通过提高默认并行度来加速spark.default.parallelism,task数量也不宜过多,太多了,task的序列化与反序列化耗时也更高,适得其反。建议是#executors * #core_per_executor * 4

Data Serialization

Task Launching Overheads

任务数不宜过多,driver发送任务也需耗时。

Setting the Right Batch Interval

一般以5~10s为初始值,然后观察Streaming UIScheduling DelayProcessing time来调整。

Memory Tuning

内存用量与GC策略的调优,

注意事项


Fault-tolerance Semantics

容错语义

Background

RDD是不可变、明确可重复计算的、分布式的数据集合。每个RDD会记录其确定性的操作血统lineage,这个血统用于在容错的输入数据集上恢复该RDD。

为了spark内部产生的RDDs高容错,设置replication,然后将该RDDs及其副本分发到不同的executor上。如果产生crash,那么有两类数据恢复途径,

  1. 从副本恢复
  2. 没有副本的话,从数据源恢复,再根据lineage rebuild该RDD

这两类错误需要关注,

  1. executor failure,executor里面的in-memory数据会lost
  2. driver failure,SparkContext会lost,然后所有executors的in-memory数据也会lost

Definitions

  1. at most once, 最多被执行一次
  2. at least once, 至少被执行一次
  3. exactly once, 有且仅有被执行一次

Basic Semantics

每一个Streaming程序都可以分为三步,

  1. receiving the data
  2. transforming the data
  3. pushing out the data

如果一个系统要实现端到端的exactly once语义,那么上面三步的每一步都要保证是exactly once的。

Semantics of Received Data

  1. files
  2. reliable receiver, with ack
  3. unreliable receiver, without ack
  4. direct kafka api (1.3+),所有接收到的kafka数据都是exactly once的
    为了避免丢失过去接收过的数据,Spark引入了WAL,负责将接收到的数据保存到cp/log中,有了WAL和reliable receiver,我们可以做到零数据丢失和exactly once语义
fault tolerant

Semantics of output operations

output operation输出算子,如foreachRDD是at least once语义的,即同一份transformed数据在woker failure的情况下,可能会被多次写入外部DB系统,为了实现其exactly once语义,有以下做法,

  1. 幂等操作,如saveAs***Files将数据保存到hdfs中,可以容忍被写多次的,因为文件会被相同的数据覆盖?如果两个job同时写一份数据呢?(不能,因为job串行。如果是开启了speculation呢?)
  2. 事务性的更新,利用一个唯一标识来控制输出操作 val uniqueId = generateUniqueId(time.milliseconds, TaskContext.get.partitionId())
上一篇 下一篇

猜你喜欢

热点阅读