Spark Structured Streaming 2.4 踩

2019-03-06  本文已影响0人  华安火车迷

最近参与一个公司大数据项目开始入坑Spark,Spark从2.0开始从RDD 的底层API转向了面向Dataset/Dataframe 的高级API,Spark Streaming 也换成了Structured Streaming,而我们用的是2.4,带上了watermark 功能,对流媒体的处理算是圆满了。

如果希望从框架结构方面了解1.x 和2.x 的差异,我比较推荐下面这个腾讯团队写的wiki

CoolplaySpark

另外下面这篇总结对于想了解2.4 特性也是非常不错

《Spark Structured Streaming》 官方文档解读

而从实际的项目来说,实践了一段时间,还是遇到了几个坑,这里逐一分享

dropDuplicates 算子

这里我就不累赘举例这个算子怎么用了,西面直接说说用这个算子的话小心3点

1) 如果你的项目是用了window机制的话,那么去重的语义一般可能就是说在同一个window下去重,这个语义是比较合理的,那么如果你是需要在同一个window下对数据去重的话,参数是提供window 列就可以了,注意有些人是提供 $timestamp 列是不太对的,但是还有一点要注意,调用这个算子并不是每次只操作一个window,而是hold在你内存的所有消息,因此自己去衡量一下性能。我最终没有用这个算子主要是因为下面这个点

2)dropDuplicates目前只能做keepFirst,而我们的业务需要可能需要做的是keepLask,目前我没有发现有办法可以实现keepLask。

3)看它源码可以知道,其实里面就是做了一次的repartition + reduceByKey, 这里最最最要命的就是它需要shuffle,而我们的业务其实在每个kafka 的partition已经做过一次了,这样的话其实每个分区自己来做去重就好了,而Spark 又没有诸如 dropDuplicatesWithinPartition() 的方法,因此该算子满足不了需求,目前我还没找到很完美的方法,配合用Apache Collection API 基本可以最少限度的破坏数据流处理的语义,可以参考下面的代码

                todayDs.mapPartitions(it -> {
                    Set<Long> gidSet = new HashSet<>(3000);
                    return IteratorUtils.filteredIterator(it,
                            item -> {
                                if(gidSet.contains(item.getGlobalId())) {
                                    return false;
                                } else {
                                    gidSet.add(item.getGlobalId());
                                    return true;
                                }
                            });
                },
                ......

在Structured Streaming with window watermark 下如何提交consumed Kafka 的offset

可以说这个是我遇到最头疼的问题,google了非常多的材料,并且也咨询过一些大厂玩 spark streaming的团队,也没有一个比较好的方案,可能也是由于我们业务本身的特点,无法提供到一个幂等的sink() 语义,因此就使得很难找到一个合理的点去submit Kafka的 offset。

从Spark 2.x Structured Streaming 提供了一些API,可以帮助用户实现这种 end-to-end exactly-once fault-tolerance guarantees 的语义,这个可以参考一下SSS的官方文档,上面有些介绍,而我大概总结了一下,可以这样去理解:

  1. check point 机制,我理解这个应该是一个不down机的机制,当需要去替换一些 worker 或者rolling restart 一些worker的时候,Spark应该是可以从内部的一些checkpoint 恢复出当前工作的taskId或者说是batchId,从而可以恢复之前中断的作业,因此这个状态下是不需要考虑作业的中断或者重哪里开始重新消费的问题

  2. 而有一些场景比如我们发版时往往是需要整个集群restart的时候,就需要一个机制来界定上次消费的offset了,这个问题在下面这个 Stack Overflow 是讨论的比较全面的,而结论很不幸是无法达到我想要的目的

How to get Kafka offsets for structured query for manual and reliable offset management?

其中一个解决方案是采用2.x 提供的Listener机制,在processing 的时候自行去commit 一下offset 到外部持久化地方

这里有两个致命的问题

我们一般的预期是从管道的左边往右边顺序消费的,但是watermark破坏了这个规则,假设当前window正在清洗蓝色框的数据,那么我们期望应该是有个机制来标记 j,k 的offset,那么下次就算有问题,我们从这里开始消费就可以了,然而假设现在来了一个迟到的数据m,那么其实它是会归并到了蓝色的window下一起来洗数据的,而这时你很可能记录的是m的下标,那么这时如果是down机的话我们就会丢失了 j 到 m 中间的所有数据了

因此这个问题我想到最终要实现的话必须要满足2点

如果大家还有什么可以实现的方案麻烦告知

foreachBatch 是单线程的

这里我们重新温习一下 Structured Steaming的整个流程

那么foreachBatch 其实是把所有的partition的结果集再汇总回去driver,由driver来处理,而我们项目由于sink的数据比较大,并且是非常重的DB操作,因此导致最后这个阶段非常耗时,上到几十秒都有,而这个东西又阻塞了下一次JOB 的启动,因此就造成了我们的JOB的调度频率非常低下


最后就是抛弃了这个方法,仍然采用 foreach 算子,在里面汇总了一批 partition的数据一个批次提交,实测性能提升了好几倍。

最后

这些在Spark UI 上可视化信息都非常直观,照着来优化就可以了。

最后关于 Spark 2 非常值得推荐的一本书就是 《Spark - The Definitive Guide - Big data processing made simple》 可惜现在只有英文版,看起来非常费劲,希望早日有大虾翻译成中文。
上一篇 下一篇

猜你喜欢

热点阅读