大数据大数据学习大数据,机器学习,人工智能

分享一篇FLINK SIDDHI ADDON的学习心得

2019-01-14  本文已影响4人  大数据首席数据师

SIDDHI 是一款功能强大的CEP 引擎,具有自己的DSL,丰富的模式匹配功能和可扩展性, SIDDHI和FLINK的整合功能项目地址:https://github.com/haoch/flink-siddhi 本文主要介绍了这个ADDON的一些实现思路

  1. 将FLINK STREAM 转化为 SIDDHI STREAM 定义

用法: SiddhiCEP.registerStream(streamName, FlinkDataStream, fieldNames)

通过 FlinkDataStream.getType 获得流对象的类型定义.registerStream方法会构造一个 SiddhiStreamSchema 对象,根据流对象的类型定义,自动获取每个field对应的数据类型存在内部的fieldTypes数组中.

SiddhiStreamSchema 内部会创建一个Siddhi StreamDefinition对象, StreamDefinition的attribute的定义根据fieldNames + fieldTypes 来添加.SiddhiTypeFactory.getAttributeType 负责将Flink 的数据类型映射为Siddhi的数据类型, 并可自动生成一段Define Stream的定义(见 SiddhiStreamSchema.getStreamDefinitionExpression 方法) define stream [streamName] ([fieldName 1] [fieldType 1], ...[fieldName n] [fieldType n])

SiddhiStreamSchema 包括一个StreamSerializer: 用于将DataStream 中的对象转化为 Siddhi Stream 的输入(Object Array):
    如果流对象是一个简单类型 Atomic Type 直接将流对象放到 ARRAY中
    如果流对象是Tuple 类型,直接将Tuple 中前N个值放到ARRAY中
    如果流对象是Pojo或者CaseClass类型,直接根据每个fieldName 获取Class对应的属性放到ARRAY中

  1. 串联FLINK STREAM 和 SIDDHI STREAM

SiddhiStream: 抽象的Stream基类

convertDataStream 将原始的FLINK流转化为Tuple类型的流,Tuple的第一个元素为StreamId, 第二个元素为原来流中的数据,支持普通Stream 和 KeyedStream

ExecutionSiddhiStream: 构建 SiddhiOperatorContext 并调用SiddhiStreamFactory.createDataStream 创建了集成Siddhi的 DataStream. DataStream的类型为Tuple的子类.SiddhiTypeFactory.getTupleTypeInformation: 其核心思路是通过Siddhi输出Stream的StreamDefinition获得其Attribute的定义,再通过 TypeInfoParser.parse构造Flink Tuple 类型的定义

ExecutableStream 根据Siddhi query 创建ExecutionSiddhiStream对象
  SingleSiddhiStream, UnionSiddhiStream: ExecutableStream 的子类,支持Fluent Style的链式调用. UnionSiddhiStream 调用了DataStream.union 方法

SiddhiStreamFactory.createDataStream 通过 FLINK DataStream的transform方法使用了自定义的StreamOperator: SiddhiStreamOperator. 在 AbstractSiddhiOperator 的 setup 方法中创建SiddhiManager 和 SiddhiAppRuntime 并注册了 InputHandler 和 OutputCallback (StreamOutputHandler)

SiddhiStreamOperator.processElement 需要处理两种场景:
    Flink TimeCharacteristic = ProcessingTime: 先调用StreamSerializer将数据转化为Object Array, 再直接调用InputHandler.send将数据发送给Siddhi处理
    Flink TimeCharacteristic = EventTime: 缓存接收到的StreamRecord 到内部的priorityQueue中,直到收到Watermark, 将priorityQueue中小于watermark的StreamRecord一次发送给Siddhi处理

StreamOutputHandler:根据Output的TypeInfo将Siddhi Event 转化为 Flink StreamRecord. 再转发到SiddhiStreamOperator的Output

  1. CHECKPOINT

SiddhiStreamOperator中保留了两种State信息,一种是priorityQueue中保存的由于watermark未发送给Siddhi的消息. 另一种是Siddhi本身的State, 通过SiddhiAppRuntime.snapshot() 获得。

如果你不想再体验一次自学时找不到资料,没人解答问题,坚持几天便放弃的感受的话,可以加我们的大数据交流群:894951460,里面有各种大数据学习的资料和技术交流。

加油吧,程序员!路就在脚下,成功就在明天!

未来的你肯定会感谢现在拼命的自己!

如果你不想再体验一次自学时找不到资料,没人解答问题,坚持几天便放弃的感受的话,可以加我们的大数据交流群:894951460,里面有各种大数据学习的资料和技术交流。

加油吧,程序员!路就在脚下,成功就在明天!

未来的你肯定会感谢现在拼命的自己!

上一篇下一篇

猜你喜欢

热点阅读