函数式内功心法-08: 流式复合技术之Conduit海纳百川
流式计算是我非常喜欢的技术。无流式,不快活!ALL in streaming!
所以,这次会废话多一点,嘿嘿。
早期学习spark跟storm时候,决然选择clojure发展下去,就是看到了流式引擎未来的辉煌。虽然目前spark在机器学习推动下如日中天,storm随着创始人自立门户成为弃子,但是流式浪潮必将掀起腥风血雨,浴火重生。
我已经深深地感觉到,流式引擎始于数据世界,亦将辉煌于云计算时代。流式技术是一剂能从根本上解决读写分离的良药,能一统计算引擎与查询引擎, 在各个发展方向中不断壮大,最终九九归一,笑傲江湖!
流式引擎应用在哪里?
流式引擎无处不在。从性质上主要分为事件流与数据流。
事件流较为简单,一般相互独立,数据流则复杂一些,亦可以将事件消息作为数据处理,即将事件流作为数据流来玩。在涉及状态管理的情况下,事件与数据流引擎基本无差异。
简单介绍一下各个方面的应用:
- 文件系统(ceph)
a. 从简单的文件系统来说, inotify接口就是一个事件流
b. 有了事件流filebeat可以调用系统接口得到数据流。
c. 云存储ceph亦是如此 - 数据库(postgresql, elasticsearch)
a. postgresql数据库的事件流为触发器,可以很方便地使用扩张与外界流通。
b. postgresql数据库的数据流则为replicate接口,本质上从底层日志文件读取
c. elasticsearch使用NRT策略实现数据流准实时 - 消息队列(rabbitmq, zeromq, kafka)
最标准的实时流组件自然就是消息队列了
a. RabbitMQ主要用于事务事件流,保障消息稳定性
b. ZeroMQ主要用于网络事件流,提高网络的扩展性
c. kafka主要用于数据流机制,高并发低延迟 - 配置管理(zookeeper)
zookeeper对配置变化进行监听,产生事件流. - 前端展现(websocket, react)
a. 从后台服务到前端的事件流主要通过websocket触发
b. 界面ui的事件流触发主要通过facebook的react框架完成
现在可以看到,所有的系统已经完成了实时的闭环.
- ui通过react框架完成前端事件流机制.
- 前端事件流通过http以及websocket建立后台事件流通讯.
- 后台服务事件流通过CSP机制完成业务接口实时响应功能.
- 后台服务配置状态通过zookeeper事件流管理
- 后台服务数据流通过kafka消息队列机制完成数据整合.
- kafka通过postgresql以及es存储引擎完成数据分析功能.
有了事件流及数据流,接着看强大的流式引擎。
- 事件流与数据流处理引擎(csp,ksql,conduit)
常用的事件流引擎就是CSP跟Actor了, 简单快捷!
常用的数据流引擎kafka streams & KSQL/storm/onyx/flink
conduit与transducer为可复合的流式引擎,也是本文的核心。 - 流式引擎状态存储(rocksdb)
目前主要采用的状态存储一般为KV存储,主要分为rocksdb以及redis+hbase
rocksdb更简单好用一些, 在区块链及kafka streams引擎中起着至关作用。
流式引擎相比较于批量引擎强大在何处?
随着响应式编程跟实时计算的兴盛, 批量引擎逐渐在衰落。
- 批量引擎没有事件流响应机制。
所以需要引入调度器事件流机制来连接流程,引起了极大的复杂度跟维护性。并且对于调度机制来说,目前的技术大部分采用手工配置,日积月累形成较大的维护负担。 - 批量引擎不能管线运行,造成极大的内存消耗, 存储浪费和时间延迟。
例如如果有五个表进行join,就得进行四步中间操作,如果采用流式就可以同时一起遍历所有流对kv存储进行操作。加上调度系统的作业依赖,延迟性进一步加大。 - 批量引擎无法动态资源管理,对于系统稳定性有极大影响.
当前分布式批量引擎对于资源的分配都是静态的,对于hdfs来说按照块来化分更是硬伤。由于数据的存储,内存的分配,计算的复杂度没有较好的关联性,需要大量的人工介入与参数调整。而流式系统对于内存消耗较少,存储压力极小,计算复杂度可以自动化动态增加节点进行微调,所以有着更大的扩展性与稳定性。 - 批量引擎调试成本太高,人工介入性过多。
批量引擎失败后,没有自动恢复触发机制,需要人工去干预。对于作业并行以及数据间的相互影响,一旦不慎重考虑,将是一个巨大的恶梦等在前面。而流式处理在设计之处并行上极为独立,相关的数据修复及影响将会自动触发,所以简化了系统设计,将系统的健壮性根值在程序逻辑中,减少了人工性,加强了事件性自主修复功能。由于流式机制的响应式特性,对于排除故障亦是更加方便。 - 批量引擎短时间产生极大io及CPU压力,无法与查询引擎共存。
由于批量引擎处理过程中,带来大量的io及CPU压力,产生jvm gc及磁盘读取过慢,会极大影响外部的查询接口。目前唯一能做的办法是将批量过程隔离开来,仅同步最后结果至服务平台。但是限制还是非常大的,同步数据量占整个io消耗的比重是很难预估的。如果产生的结果大部分要同步,最终批量分隔出来产生的效果并不会明显改善,并且对于分布式系统需要两套集群环境开销,非常不经济。如果采用实时系统,所有的载荷通过实时平摊,加入分片机制扩展缓和,流式计算引擎与服务查询引擎是可以很简单共存的。
前面都是废话,我们要开始正题了
- 流式引擎的分类
a. Tuple事件流架构-storm
b. Sequence型架构-kafka streams, storm trident, flink
c. Transducer复合型架构-clojure
d. Conduit复合型架构-haskell - 深入理解Conduit
a. Conduit的基本原理
b. Conduit的高级技巧 - Conduit源码剖析
a. Conduit类型定义
b. Conduit的传递与融合
c. Conduit的基本函数-yield, wait, lift, leftover
d. Conduit的组合函数-yieldMany, awaitForever, fold, map
e. Conduit的应用函数
e. Conduit的Chunk机制
一. 流式引擎的分类
前面谈到,数据流是更高级的事件流,因为事件流仅仅关心当前事件并做出响应,而数据流在此之上要考虑更多的状态计算,跨行汇总之类的东西。
a. Tuple事件流架构-storm
当年最早在流式引擎领域称雄的就是storm了,那时候的流式处理比较原始,基于事件流来完成。
比如发送一个tuple事件之后,产生ack应答事件,保障数据处理的完整性。当然底层考虑的问题会比较复杂,一旦涉及网络,消息的保障性就会异常麻烦。
- 最简单的搞法是最多一次,发送了我就不管了,至少下游是否接受到并处理我不关心,算是脏乱差的搞法,在网络不复杂的情况下已经能处理大部分数据精度要求不高的情况。
- 接着就是至少一次,发送之后要求接受方应答,如果没有收到应答,定期重新发送。在这种情况下,如果应答发送后没有接受到回应,是会再次发送的。所以存在多次发送的情况。多次发送的情况下游通过消息id信息进行幂等处理一下就好了。
- 当然大家都想要严格一次的效果。如果前端幂等方便实现当然是最好的。如果不能实现,我们可以对消息进行批次分类,第一批给一个事务id,对没有应答的消息再次发送。对于接受方,可以通过事务id来进行幂等处理。
当然事件流引擎还要处理数据流入速度过快的情况,这就是回压技术。当压力过大时,减缓上游的取数速度。
在事件流引擎里面没有状态管理的概念,所以需要人工管理。
- 一种是内存性的,基于内存运算,可以通过rpc机制访问。
- 另一种是借助redis, hbase, rocksdb之类的KV存储引擎完成.
在这种事件流机制的情况下,开发效率较低,代码复用性差。好在有了事件流机制的保障下,我们可以开始下一阶段的流式处理抽象。
b. Sequence型架构-kafka streams, storm trident, flink
storm的tuple事件流机制之后,函数式技术在数据领域方面的先天优势开始发挥出来。
当把事件流抽象到数据流的概念的时候,我们可以无需额外编码就可以运用映射函数式里的map,filter, concat, reduce, sliding, group by各种内存状态操作。
映射函数式的核心是sequence,就是一个惰性的序列,与java的iterator接口类似。
映射函数式是函数式的基本功能,不够强大,但是应用广泛,包括java streams, spark rdd, scala, flink, storm trident, kafka streams都是映射函数式,属于函数式初级功能。
映射函数式有很大的缺陷,只能向下复合,不能平行复合。拿最基本的平行复合例子来说,比如用映射函数式同时独立计算sum跟count,是很难实现的。
为什么呢? 因为映射式函数是不可侵入的。
当我们计算sum的时候,整个sequence是作为一个整体处理的,没法同时侵入count的过程。之所以映射式函数不可侵入,是因为映射函数式考虑的是整体操作。
我们知道基本的函数式操作是transform/reduce,就是单行转换与跨行聚合操作。而聚合操作这个过程无法平行复合的,因为它需要跨行遍历sequence。当然,如果需要同时进行reduce操作,是可以在reduce里面同时进行两个操作,自己分别管理状态。但是这不是复合操作,没去完全合并独立的两个reduce操作。
基本上市面上的流式引擎大部分落在这个阶段,因为函数式编程仍处于初步阶段。kafka streams在此基础上通过rocksdb立置了状态管理,加上KSQL的解析引擎,在传统流式引擎方向是属于技术比较领先的了。
c. Transducer复合型架构-clojure
作为函数式鼻祖实用性分支的clojure认识到了映射性函数式的局限性,并开创了transducer技术。
transducer技术是什么呢? transducer不再关心sequence。而是关心函数的复合。transducer分为两个过程,transform跟reduce。前面讲过transform过程是可以自由组合的。
所以transducer的核心思想就是,数据不再是sequence,而是transform函数复合调用链,最后进行reduce对结果进行合并。
这个时候,数据流是可侵入的,因为transform的每个操作是单行处理的调用链,只有reduce是负责整体。
那么我看看一下前面这个问题,如何同时计算sum跟count呢?
我们可以使用juxt函数,将调用链同时传递给sum跟count。juxt进行reduce操作的时候,对sum跟count也进行reduce即可。
=> (into {} (x/by-key odd? (x/transjuxt {:sum (x/reduce +) :mean x/avg :count x/count})) (range 256))
{false {:sum 16256, :mean 127, :count 128}, true {:sum 16384, :mean 128, :count 128}}
对于transducer技术来说,算是一个很大的进步了。transducer在垂直复合的情况下,完全了reduce的水平复合,基本上可以说解决了99%的问题。那么另外的1%呢? transform的水平复合!
由于transform必须调用reduce来完成复合,所以两个transform仍然是无法复合的。比如我要对数据进行奇偶分类,接着对奇数进行平方,对偶数进行3次方。不进行reduce处理,它的中间transform结果是无法流式合并的。
d. Conduit复合型架构-haskell
最后haskell老大忍不住了,他毫不谦虚地喊道,小弟们,老夫就是专业搞复合技术几十年啊!
众人不解,前面该想到的办法都想到了,难道还有方法不成?
haskell老大无所畏慎地说了一句,去中心化。
这里我们就要开始提到conduit的思想了。
conduit基于pipe的概念。数据流可以是pipe, 处理逻辑也可以是pipe。tranform跟reduce是同样的pipe, reduce是有返回值的pipe。有了pipe之后,我们可以构建连接器进行融合。
对于前面的问题: 对数据进行奇偶分类,接着对奇数进行平方,对偶数进行3次方。
我们对于这两个pipe逻辑构建水平连接器, 接着调用垂直连接器从上游获取数据。当从上游获取数据后,水平连接器分别单行调用垂直连接器,输出合并后的结果。一口气分成了分流及合流操作。
对比于前面的transducer思想来说,数据的采集不再区分transform跟reduce,transform是多行结果数据, reduce是单行结果数据,统一由连接器收集组合。
基本思想如此,那我们今天的主角就是conduit,让我们看一看它是具体如此实现的。
二. 深入理解Conduit
Conduit的官方文档如下: https://github.com/snoyberg/conduit
1. Conduit的基本原理
Conduit的Pipe分为生产者Pipe, 转换器Pipe, 消费者Pipe。
- 生产者Pipe是一个递归的惰性数据结构. 包括自己的值与生产下一个生产者Pipe的逻辑。
- 转换器Pipe则是一个递归的惰性函数结构. 对于每个函数,接受输入值之后,产生新的Pipe. 新的Pipe可以是新的生产者Pipe,或者是部分消费的leftover Pipe,或者递归消费的转换器Pipe.
- 消费者Pipe则是一种特殊的转换器Pipe,即为最后Folds(同reduce)操作完成的单行值。
所以,整个连接的过程大体上有三种
- 单个pipe内部组合过程, 也叫自连接传递>>
- 同种pipe相互组合的过程,叫做Zip
- 不同种pipe的连接组合过程,叫做融合fuse
a. 单个Pipe内部组合的过程有两种基本操作:
- 对于生产者Pipe为yield操作,多个yield操作对数据进行递归连接产生数据流。
- 对于转换器Pipe为await操作,await操作之后可以yield新的数据,leftover将读取的原始数据重新连接回去, 连接新的await递归生成新的数据流
b. 同种Pipe Zip显然有三种:ZipSource,ZipConduit,ZipSink。
- 由于这里是单线程, 对于生产者ZipSource,仅仅只是将两个数据流结对组合起来,比较简单。
- ZipSink则是将两个Fold 消费者Pipe结对起来, 比如同时计算的sum跟count。
- ZipConduit则是分别水平组合两个转换Pipe,将输出的结果依次连接起来,由于不同pipe对于同样的数据输入输出逻辑不一样,所以没有固定的顺序,谁输出就连接谁。
c. 不同Pipe之间融合逻辑比较通用
融合逻辑是Conduit的核心,也是流式架构大杀器。
当然在极少数情况下,融合操作比较困难的情况下,可以选择直接将函数逻辑与Pipe相连,而不一定需要融合。这里不过多展开介绍.
为了进一步理解融合的过程,我们对前面的一些概念进行具体定义。
- 生产者pipe yield连接出的pipe定义为:
HaveOutput [next HaveOutput] o - 消费者pipe await读取输入的pipe定义为:
NeedInput (i -> [new Pipe]) (u -> [new Pipe])
这里await的pipe分为两个分去,一个有数据输入,一个是数据读取结束 - 消费者pipe await递归连接之后,还可以推回消费值Leftover, 生成新的HaveOutput
Leftover [new Pipe] l
介绍了基本的Pipe类型,我们可以简单了级一下Pipe的融合过程.
组合时,拨开右边发现是NeedInput,我们从左边读取数据交给右边产生新的Pipe,如果新产生的Pipe依然是NeedInput接着递归下去,直至结束,两边递归完成。当然这里只是最简单的一种情况,完全的情况我们后面在源码里面详细讲解。
2. Conduit的高级技巧
前面讲了基本的传递自连接>>,融合fuse及zip操作。
这里介绍得是一些便于使用的高级功能。
- Chunk功能
- 消费策略
a. Chunk功能
由于数据处理中涉及批量读取的过程,即一次读取n条数据。对于常用的sequence方案来说,一般会进行concat手工拆解,带来了不必要的麻烦跟性能开销。
Conduit提供了Chunk原生实现,及基于Chunk函数,可以原生操作在底层的数据中。比如读取文件时,一下子读取1024个字节,分多批次读完。有了chunk函数之后,我们可以直接基于chunk实现丢弃掉2000个字节,而不需要进行对字节拼接连之后再做处理。
b. 消费策略
消费策略分为两种,一种是输出消费策略,另一种是分层消费策略
- 输出消费的意思是,仅消耗输出需要的数据。如果输出不需要输出,那么融合的过程是不需要去读取父Pipe的数据的。
- 分层消费策略,就是可以对数据进行分层处理。传统的基于sequence的操作take读取部分操作之后,数据流是直接丢弃的,不能进行分层读取。而在conduit中可以对于前面未消耗的数据递交给下面的逻辑消费,达到分层消费。但在未些情况下,不太明确程序内部逻辑的情况下,可以强制消费读取数据。比如先读取10条数据给子转换器处理,子转换器不知道是否进行了消费,我们可以对未消费的数据进行强制消费操作,最终紧接着的转换逻辑消费的必定是10条之后的数据。
三. Conduit源码剖析,好好玩!
a. Conduit类型定义
b. Conduit的传递与融合
c. Conduit的基本函数-yield, wait, lift, leftover
d. Conduit的组合函数-yieldMany, awaitForever, fold, map
e. Conduit的应用函数
e. Conduit的Chunk机制