netty

开发流式量测数据处理系统的基本模式

2018-02-12  本文已影响0人  Queen〇fLaponia

  译:Patterns for Streaming Measurement Data with Akka Streams

  可用于关键系统worker分离集群的思路:分布式物联网,并使用MQTT 协议作为传输协议。Akka actor系统具有的轻量级的松耦合的actors,可以在去中心化的集群中具有强大的路由,分片和发布/订阅功能。文章展示了基于这个系统的物联网架构,并编写了相关代码来作演示

  akka MQTT物联网

  对于流式数据宜看作为一个个独立数据消息组成的数据流,因为这样符合计算机处理流式数据的本质比如TCP(因为是流所以传输层的TCP并不知道数据截止到哪里就是一个完整消息,所以在TCP之上需要指定消息格式子协议,最常用的就是在头部就指定本消息长度,其他方式比如短连接可以以客户端断开连接作为截止信号、对xml消息可以以根元素的结束元素为截止),总体上看流式数据最像时序数据,按照时间维度有序流动的数据消息,适宜用Seq上的时间窗口处理。

  actor是适合处理流式数据的,但你自己在actor之间实现稳定的流式数据传输却是冗长易错的,除了发送接收,还得注意限流防止压垮缓存/邮箱,最重要的是,流处理已经总结归纳为了诺干最佳实践模式,基于actor去实现这些模式代码已经可以相对固定了,也就是可以形成高度抽象、高度通用的模板,这就是Akka Streams API。流式处理的最佳实践模式在全行业的沉淀是RSocket

Chana

  Akka Streams为你提供直观安全的模板化/程式化的流式处理开发模式,自带基于资源的限流、反压,消灭OutOfMemoryErrors,为此我们的流必须能指定使用缓冲大小、如果消费者跟不上还需要能够逐层反压上游生产者。Akka Streams API是给最终用户用的,响应流宣言是描述不同流处理系统间如何交互的,这是他们的不同之处。

  一些优质中文scala、AKKA资源:

1、掘金水滴

2、scala官网

3、推特

4、ScalaCool

5、shawdubie.com/

6、  http://letitcrash.com/post/30509298968/case-study-an-auto-updating-cache-using-actors

  作者Colin Breck长期致力于工业监控流式数据系统,具备分布式数据采集系统、持久化队列/发布订阅的消息系统、高性能时序数据存储系统经验;在Unix, Linux, Windows, Azure, and AWS上开发系统的经验;使用C, C++, C#, and Scala开发经验;开发过商业系统、开源解决方案;具备platform-as-a-service云组件使用经验。不管采用何种开发语言和平台,它们都有同样的流式量测数据处理模式,而Akka Streams API是作者使用过的最自然、强大、高效的开发此类系统的框架。


  Akka Streams是高效描述和执行流式应用的框架,使用它比actor模型以及系统语言c++、java等更加自然、强大。作为演示,客户端(往往是移动端)使用WebSocket连接上送流式量测数据到server,server做解析并存储入库。上送的频度变化跨度很大,有峰值爆发也有低谷。

  下面分类总结了流式量测数据处理模式、展示基于Akka Streams API如何可靠实现这些模式,流式量测消息来自MQ、broker或基于WebSocket的某种协议,比如从WebSocket接收消息、发给Database Actor去全异步解析、存储。

分组消息

  处理unbounded流式数据往往需要先对其分组,比如处理一批传感器定时上报的量测数据,当将处理后的量测数据存入数据库或MQ,一般要按批写入而不是一次写一条,这是为了减少请求次数、提高写入性能(类似网络通讯的合并缓发,但需测试证实有效并量化效果,因为也有说法是每actor独立存储情况下,每次存储很小的数据量,性能也不错。也许是因为现代数据库连接池普遍有所优化)。基于 Akka Streams API分组消息数据很方便,只要增加调用grouped:

处理无限输入流,一次向数据库写入1k条元素

  grouped是标准scala集合Iterable特质的方法,可以将Iterable对象分成指定大小的组,除最后一组可能会小一些以外,每组大小相同,例:

  val t = Iterable(1 to 16: _*)

  val groups = t.grouped(5)

  groups仍为一个Iterable,它包含4个List,前三个都是5个元素最后一个只有一个16。类似Spark对scala集合类操作的扩展,Stream采纳这种泛化的函数式集合操作。

  流式数据处理系统的一个天生特点:数据往往需要及时处理以及传输给其他系统。比如对于来自传感器的量测数据处理重点关注的需求有:需要近实时的聚合、生成告警或在UI界面展示,而且,传感数据都是秒级采样上报的(频率高,总数据量也不小)。分组group处理往往会带来无法接受的延迟,针对这一点Akka Streams API定义了 groupedWithin 机制来分组 events(这就是合并缓发了,在网络通讯中被证实有效,storm等多种框架采用):

but also emit events within a bounded time-frame, even if the maximum number of events has not been satisfied. This leads to the efficient batching of events, without the introduction of unacceptable latency.

分解消息

  和分组处理消息同样很常见的做法却正相反:将一批消息的聚合分解为它的独立元素再处理,比如JSON消息往往是聚合起来的集合,你收到的都是数组,数组的元素才是独立消息,如下风机上送的json消息

{ 资产编号,信号采样时间,量测值{电量,转速,风速} }

  这样的消息很常见,一般是按量测类型分解为独立的采样数据点、发送到下游做聚合计算、存储入库或者展示到用户界面上,这样一条消息分解之后的独立采样数据点使用case class可以表示为:

此处宜加final;signal是一个量测的名字,value是值

 Akka Streams新手想当然的会想把消息按照signal分组为采样数据点的seq、然后flatten这个seq、把每个元素作为一条消息发往下游,这实际上会制造一个unbounded JSON消息流:

处理单条消息的例子

  mapConcat可以进一步简化:

限流

  对于流式系统来说,重要的一点是对发往其他服务的并发请求做限流,以免压垮其他系统(压垮别人可能很容易做到,你可以轻易写一段并发代码写爆网卡缓存,总体上做到优雅服务降级,也就是负载均衡、保障service-level agreements,特别是数据流源源不断、消息速率动态变化情况下,对此Akka Streams提供了无缝支持,当待处理的请求饱和,系统会进行反压、暂停请求,而不是任凭内存耗尽:

对Sample异步、分批存储入库,并且限制了待处理消息上限是10个

节流

  传统多线程编程当中,对消息进行节流有一定挑战,特别是涉及到计数器、计时器、跨线程同步,节流消息可能对集成外部系统很重要。类似限流,在面对有意无意造成的消息洪流时,节流消息在维持系统一致性方面(状态保持一致,即数据及其处理保持完整)同样重要,那些场景下一些客户端会疯狂发送消息(也许是bug也许是恶意denial-of-service攻击),比如,如果预期一个客户端每分钟向服务端发送一条消息,那么服务端就没有理由接受这个客户端更高频度的消息。

  如果上游超出额定频度,那么throttle元素可以熔断流,或者通过反压手段来规范shape流的行为:

限制上游仅能每秒发送一条消息

  消息发送频度并不总是一样的,比如,一个客户端在暂时丢失与服务端连接时可能会缓存消息数据,一旦连接恢复,就会补发所有数据,这可能会造成消息洪流,maximumBurst参数用于允许客户端爆发式补发数据的同时仍然保持节流消息不要超出上限,可以参考Akka Streams官方文档了解token-bucket算法。

  在填充/搬运大量数据时节流也有用处,比如将一整年的量测数据从一个数据库到另一个库进行迁移,可以对填充做限流以免压垮数据库或冲击其他依赖该数据库的服务,使得填充可以在线进行以免维护性下线服务。

并发

  要构建高效、可扩展、低延迟的流式数据处理系统,并发执行任务是必须的,比如一个场景是程序从MQ中读取消息,执行一个CPU密集型任务、对消息进行转换、之后将转换结果异步地存储到数据库。这个场景要最大化性能怎么设计?至少我们可以考虑:在一条线程向数据库写入结果的同时,另一条线程可以从MQ读取下一条待处理消息,也就是至少可以并发读写,更进一步,甚至可以在第三个线程上并发地执行cpu密集型的数据转换,理论上这样肯定能进一步提升吞吐量性能(这个场景在ESB中相当典型:SEDA,多阶段事件驱动架构)

  Akka Streams默认会顺序地(单线程)执行一个流,单线程对于许多流处理来说就够了,比如只是执行一系列数据map转换,默认单线程没毛病,因为这样可以避免异步传递消息的成本、并且最大化利用了代价最低的young-generation GC.。对于传统的多线程编程,不同任务的并行处理可能非常复杂,涉及跨线程的数据同步,但是使用Akka Streams可以化繁为简,只要在算子之间的异步边界上用async描述一下就好。

对一百万JSON消息进行gzip编码解码(both CPU-intensive operations);单线程 使用两条线程分别并发执行编码和解码,比上述单线程快40%;async就是用于定义异步边界的,就像划分SEDA中的Stage

  Akka Streams就是这样定义异步边界,但需要你对数据处理管道有深入的理解,比如:哪些异步操作是CPU-bound、哪些是memory-bound、哪些可以异步执行.....  但是毕竟它不像传统多线程编程那么复杂,定义异步边界的方式明显简洁直观得多——因为它更合理、更简单,简单到你可以反复尝试在何处定义异步边界效果最好,这才是本来就该使用的自然方式,只要在合适的位置加入async元素。

  这是流处理所特有的、从ESB继承下来的优势,因为数据管道就这一条,在这一条线上,你可以随意定义异步边界,这也帮助你最快找到最佳边界,类似的在遵循SEDA多阶段事件驱动的ESB中比如spring integration或者mule,一个Stage叫做一个消息流flow,实际上就是一条线程的运行边界,具体结构是一个前任务队列后线程池组合,由你来划分Stage,可粗可细。实际上微服务就是SOA2.0,要说SOA中只有一个东西有价值,那只能是ESB,而不是现在的主流声音所鼓吹的ESB太重了,ESB也对流式数据处理有启发意义。SOA败在服务间交互协议,最初的soap基本全盘失败已经凉了、中间乱入的rest也不完全适用这个场景,因为它们都基于性能和版本都不稳定的Http,而Http根本不是设计用于服务间交互的,未来的方向是RSocket.  微服务没有多少新意,与SOA一脉相承,SOA做不好的微服务一样:SOA的服务编排UDDI早早就凉了,微服务同样也将无法解决这个问题;没有服务编排的服务注册中心也叫不响了,会降级为类似分布式发布订阅的功能;SOA里最火的ESB,也就是现在微服务里最火的ServiceMesh

  上述async元素是最好最直观的划分异步边界的方式,类似spring integration中的channel,或者mule中的flow,微观结构都是前任务队列后线程池。

Idle Timeouts

 我发现idle timeouts空闲超时在两种场景下特别有用(两个场景都是关于不受控行为的):一个是当流空闲时回收资源,这在 WebSocket server上很常见,或者是一个数据聚合管道,停止从上游组件接收数据(组件临时下线或者被换掉,移动客户端或者其他服务组件都是不受控的)

一分钟之后流会抛出超时异常,导致流处理中断并记录错误日志,日志会记录该设备ID

  空闲超时用令流失效的简单粗暴方式来回收资源,但体现了更多的问题,比如说为什么连接仍然正常情况下,客户端在长达十多分钟时间当中未与服务端通讯?,它能保证每分钟都发送消息吗?,如果没有强制空闲超时,这些可能根本就注意不到,因此,问题并没有得到解决.,靠个体很难解决。

  第二个强制空闲超时有用之处是为流式数据处理系统开发功能测试,我经常发现自己在写验证流输出的测试,这些流可能会做数据转换、写数据库或者是接着交给下一个流处理,测试主要是测业务逻辑和输出正确与否,但是得有数据在预定时间内通过流来处理,在具备复杂业务逻辑的分布式系统中,就用得上空闲超时来应对消息延迟。


周期性事件

  流式经常使用周期性消息,比如心跳、推送状态或者通讯流量水位,可以用于触发依赖事件

简书抽风,写了半天全丢了
上一篇下一篇

猜你喜欢

热点阅读