实时数据相关

flink异步io应用场景之流表join维表

2019-01-20  本文已影响2人  岳过山丘

1.flink异步io的定义参考

http://wuchong.me/blog/2017/05/17/flink-internals-async-io/

2.应用场景之流表join维表。

流表是kafka等流式数据。
维表可以是一个mysql或者cassandra,redis等存储,甚至是自己定义的一些api。
根据流表join维表的字段去异步查询维表。

3.举个例子

流表:kafka id1,id2,id3三列
维表:mysql id,age,name
sql:select id1,id2,id3,age,name from kafka join mysql on id1=id;
join的结果就是: id1,id2,id3,age,name 流表的字段加上mysql维表的字段。
流表这边提供id1,给到维表,维表那边执行的sql是select * from mysql where id=id1

4.实战

参考袋鼠云开源的flinkStreamSQL:

https://github.com/DTStack/flinkStreamSQL
基于开源的flink,对其实时sql进行扩展;主要实现了流与维表的join,支持原生flink SQL所有的语法
源表:kafka 0.9,1.x版本
维表:mysql,SQlServer,oracle,hbase,mongo,redis,cassandra
结果表:mysql,SQlServer,oracle,hbase,elasticsearch5.x,mongo,redis,cassandra

核心是

   AsyncReqRow asyncDbReq = loadAsyncReq(sideType, sqlRootDir, rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);

        //TODO How much should be set for the degree of parallelism? Timeout? capacity settings?
        if (ORDERED.equals(sideTableInfo.getCacheMode())){
            return AsyncDataStream.orderedWait(inputStream, asyncDbReq, sideTableInfo.getAsyncTimeout(), TimeUnit.MILLISECONDS, sideTableInfo.getAsyncCapacity())
                    .setParallelism(sideTableInfo.getParallelism());
        }else {
            return AsyncDataStream.unorderedWait(inputStream, asyncDbReq, sideTableInfo.getAsyncTimeout(), TimeUnit.MILLISECONDS, sideTableInfo.getAsyncCapacity())
                    .setParallelism(sideTableInfo.getParallelism());
        }
inputStream 就是我们的流表
loadAsyncReq 就是返回一个RichAsyncFunction,定义是

public abstract class AsyncReqRow extends RichAsyncFunction<Row, Row>
就是从维表中查询数据,目前袋鼠云支持的几种维表有


image.png

5.扩展

流表来源只有kafka,太少,我们可以扩展一下读取mysql作为流。参考这里https://www.jianshu.com/p/5faa7f822d89

上一篇下一篇

猜你喜欢

热点阅读