Flink 流处理在中信建投证券的实践与应用
Flink 流处理在中信建投证券的实践与应用 FL.jpg img摘要:本篇内容整理自中信建投证券金融实时数仓项目负责人刘成龙、金融资讯数据研发工程师蔡跃在 Flink Forward Asia 2021 行业实践专场的演讲。主要内容包括:
- 中信建投证券 Flink 框架
- Flink 流处理场景
- 金融资讯实时化改造
- 未来展望
中信建投证券公司成立于 2005 年,2016 年港交所上市,2018 年上交所主板上市。投行业务连续 8 年保持行业前 3,托管证券规模行业第 2,主要经营指标目前均列于行业前 10。伴随着公司的业务一路高歌猛进,技术方面也不容落后,数字化转型正在成为我们近些年来的发展重点。
由于金融行业涉及的业务领域众多,公司多年来积累了大量复杂的与业务高度相关的基础数据,在发现问题、分析问题,解决问题的过程中,如何协调业务前、中、后台以及科技部门等多方面配合来开展业务口径的梳理与加工逻辑的开发,成为目前亟待解决的关键问题。
一、中信建投证券 Flink 框架
img数据中台架构如图所示。主要分为以下几大板块:由 Greenplum 数据仓库和 Hadoop 大数据平台构成的数据中心板块;以离线开发、实时开发、数据交换为主的数据开发板块;以及数据门户、数据网关、数据治理、运营管理等板块构成。
其中数据开发板块目前的任务主要以离线开发与数据交换的离线数据处理为主。但随着业务对数据时效性的提高,基于离线批处理的 t+1 业务模式已经无法完全满足当前市场环境下对信息及时性的需求,这也是大力发展实时开发,力求为客户提供更高时效性数据服务的目的。
我们以实时开发整个链路为例,对数据中台各个板块间的相互联动进行说明。
从数据门户统一入口进入实时开发模块,首先将集中交易、融资融券等业务信息的实时增量数据拉取到 Kafka 消息队列,Flink 消费 Kafka 实时流数据并与维表数据进行数据加工。加工逻辑中涉及的维表数据量比较大时,需要离线开发与数据交换,通过离线跑批的方式完成对维表的数据准备。最后将结果数据写入关系型数据库或 NoSQL 数据库。数据网关再通过读取结果数据生成 API 接口,对下游的系统提供数据服务。
数据治理板块中的数据管控模块主要管理数据中台的数据库表以及业务相关的数据库表的元数据,用户可以在数据门户订阅他们所关注数据库表的变更信息。当订阅的数据表发生了变化的时候,运营中心可以通过统一告警模块,多渠道通知订阅用户数据库表的变更情况,以便于开发人员及时调整数据加工的任务。
imgFlink 实时流处理架构首先通过 Attunity 工具采集业务数据库的 CDC 日志,将同一系统下的数据库表变化写入 Kafka 的一个 topic 队列中,这也就意味着 Kafka 的每一个 topic 中都会有多个表的数据,所以在 Flink 的 Kafka source 要先对 schema 和 tablename 这两个字段进行一次过滤,获取想要拿到的数据表的 CDC 数据流,再进行后续与维表的加工逻辑。将处理后的数据写入结果表,根据需求不同写入不同的数据库进行存储。
数据库的选型一般遵循以下原则:
- 数据量比较小且不要求高并发的情况下,通常选择关系型数据库进行存储;
- 数据量较大,而且对高并发有需求的时候,通常选择 HBase 作为存储介质;
- 少量数据但要求高并发的情况下,选择 Redis 进行缓存;
- 涉及大量数据检索的情况下,一般选择 ES 作为存储组件。
证券行业数据有两个明显特征:
- 其中一个是开盘的时间固定,大量业务在收盘后数据量会大幅减少,甚至有一些业务在收盘后不再产生新的数据,所以为了节约资源,我们会根据实际情况对那些与开盘时间紧密相关的任务设置启停时间;
- 第二个特点是金融数据的重要性,大量场景下不允许数据偏差存在,针对数据可靠性要求极高的特征,我们对大量实时任务设置了夜间数据修正的离线任务,保证数据的正确性。
二、Flink 流处理场景
下面以几个实际场景说明 Flink 流处理的应用情况。主要分为三个场景,零售业务实时指标统计、基金投顾实时指标统计和资金流水明细查询。
2.1 零售业务场景
img零售业务线实时指标是管理驾驶舱的重要组成部分,决策者通过分析公司运营指标,对公司的运营和发展作出合理决策。
面向零售业务设计实时数仓,需要获得开户统计、客户服务、APP 运营几个主题的统计指标,根据实时数据处理架构和数据仓库分层的设计,面向零售业务的实时数仓可以分为以下几个流程:
- 首先是构建 ODS 层数据,实时采集客户信息表、业务流水表、渠道表等相关基础表的 CDC 日志。每个业务库的数据表对应接入到一个 Kafka 的 topic 中建立实时数仓的 ODS 层;
- 其次是 DWD 层的数据建模,创建 Flink 任务消费 ODS 层的 Kafka 消息,进行数据清洗,过滤、脱敏、关联转换等处理。同时以客户账户粒度进行数据合流,借助离线维表进行扩围操作,以获得账户粒度的明细表,实现 DWD 层的建立;
- 之后是 DWS 层的数据建模,基于 DWD 层的数据进行汇总,通过分析业务需求,将 DWD 层的数据按照主题进行划分,汇总出渠道服务主题宽表、业务部运营主题宽表、交易产品主题宽表等公共指标宽表,建立 DWS 层;
- 最后根据实际业务需求,计算业务指标建立 ADS 层。对于一部分用户账户粒度的业务指标,可通过 DWD 层的明细直接计算得到,部分粗粒度的业务指标比如 APP 渠道服务客户人数、投顾产品阅读人数等,可以通过 DWS 层计算获得。最终计算结果接入到数据网关将数据统一提供给下游系统或通过 BI 系统展示。
通过对实时数仓进行分层管理,能够带来两方面的好处:
- 首先是避免烟囱式的数据开发模式,无需所有任务都从消费 Kafka 的 ODS 层数据开始,减少了时间上的开销,更有利于数据的恢复,并能够支撑不同业务主题的灵活分析;
- 其次在数据加工出错的情况下,更容易判断是哪个分层的数据加工逻辑出了问题,缩短排错时长。
2.2 基金投顾实时指标统计场景
img基金业务在证券行业的重要性日益凸显,它能实时提供基金投顾产品的销售信息,为基金投顾及时调整策略提供数据支持。基金投顾场景的数据有三个特点:
- 第一,涉及的数据规模比较小;
- 第二,数据在开盘时间提供给公司内部人员查看;
- 第三,数据对准确性的要求特别高。
针对数据量小的特点,我们将数据指标结果输出到 Oracle 关系数据库;针对开盘时间将数据供给内部人员查看的特点,我们开启实时任务的启停策略,将更多的资源留给夜间跑批的任务来使用;针对数据准确性要求很高的特点,我们通过夜间离线跑批的方式对数据进行修正,以保证数据的准确性。
原来的方案是通过页面触发存储过程来读取数据,而且读取的数据不是源系统数据,存在分钟级别的延迟。而实时数据加工方案通过实时推送客户新增、追加、签约、保有、签约率、规模等维度的指标,让业务部门可以更高效地掌握核心数据。
2.3 实时 ETL-资金流水场景
img此场景主要满足业务人员在开盘期间快速查询客户某个时间段内的交易流水明细数据。它需要解决三个问题:
- 第一,资金流水明细,总共几十亿的数据,数据量很大的情况下,如何做到快速查询?
- 第二,开盘时间内满足业务人员查询,且非开盘时间内数据量较小,是否采用定时调度?
- 第三,资金流水一定不能出错,如何保证数据的准确性?
针对数据量大的特点,我们最终选择通过 Hbase 组件来存储数据,通过合理设计 rowkey 与建立 region 分区,达到快速查询指定时间段内的资金流水明细情况;针对非开盘时间内交易数据量很小的特点,开启任务的定时启停策略,将更多的资源留给夜间跑批任务;针对数据准确性要求高的特点,通过离线数据修正的方法来达到准确性的要求。
三、金融资讯实时化改造
在金融领域每天有着各种新闻公告等这些每个市场参与方最常阅读和关注的资讯。我们公司对资讯的定义不仅包含上述这些传统意义下的资讯,考虑到数据本身的庞杂多样及收集、管理、应用等实际流转过程,我们对资讯做了重新定义,即所有的非用户非交易相关的数据均为金融资讯范畴。
img我们中心汇聚了如下 4 大类金融资讯数据,最常见的就是新闻、公告、研报,此外还有交易市场相关的货币、股票、债券、基金、衍生品等证券市场数据和各个维度的宏观行业数据,最后一类是包罗万象作为兜底的其他及衍生,涵盖了各种基于市场原始数据进行分析的其他第三方机构分析的数据,比如公司舆情、基本面分析预测等。
如果把交易和用户比作金融市场的骨骼和经络的话,资讯数据就是我们金融市场的血液,产自前者贯穿全身且源源不断。
img那么品类庞杂的资讯数据具体如何流动?很简单,如图所示的三层结构:最底层是我们引入的数据源,目前大多数资讯数据已经被 Wind、同花顺等资讯数据商收集整理,我们不需要花费太多的时间成本即可获得种种基础数据。
但随着引入数据商的增多,问题也随之而来。假设某个数据商出了问题导致合作不能继续,数据服务也必定会受到影响。为了解决这个问题,我们推出了中心库的概念,自建了一套金融数据模型,下游系统都和中心库的数据结构对接,我们负责把资讯数据商屏蔽掉,这就是图中的第二层。
上图最右侧还有一个小模块叫数据直发,因为实际应用中并不是所有的下游系统都适合对接,有些依然依赖原始的数据商结构,所以这个小接口依然存在,和中心库并行共同输出数据服务。
最上层是服务对象,覆盖了公司内部的各个业务线,持续为各个业务系统输血。
在三层的整体架构下,日益增多的数据源还有数据种类提升了我们数据服务的整体质量,有能力服务更多的客户。同时中心库为核心的架构,提高了整体服务的抗风险能力,防范风险是金融公司重中之重的事。
我们前期的工作主要集中在这两点上,当这两个功能慢慢完善且稳定后,关注点逐渐转移到资讯数据传输和资讯内容优化上。市场瞬息万变,数据在链路上传播耗时越短,资讯在时间上的价值越高,传输速度没有上限、越快越好,这就是数据传输效率。但是,数据快了而上游数据商的质量参差不齐,服务只快不准,提供给用户的数据存在问题,那么如何在不损失 1、2、3 点的情况下,把控数据内容质量也成了棘手问题。
为了优化 3、4 两个点,我们以 Flink 引擎为核心进行了架构改造,选取了两个场景进行分享。
3.1 蜻蜓点金 APP F10 新闻场景
img蜻蜓点金 APP 主要提供金融资讯,数据服务给广大的投资者浏览。上图是第一版方案,主要流程为从上游的标签系统为新闻打标,流入到 Kafka 中,进而进入刚刚所设计的中心库,下游使用时将数据进行抽取转化,传输到接口库,最终通过 API 对外提供服务。
为了及时获取数据库的变化情况,我们在众多的 CDC工 具中选取了部署轻量、集成方便的 Canal 来实施。通过捕获数据库的变更,开发人员编写程序实时读取订阅 Canal 数据,将数据解析组合为业务所需的数据格式,然后主动更新写入到最上方的 Redis 中。下游用户在使用相关接口时,就可以获得最新的资讯数据,无需再等待数据被动过期。
方案一运行一段时间之后我们发现两个问题,一是链路偏长,会损失时效性;二是主动写缓存过程逐渐成为整个资讯服务的重要一环。但 Canal 作为开源工具,功能还在不断完善中,如程序监控、告警等需要单独开发实现,此外稳定性和高可用方面也略显不足。
img为此我们引入 Flink 对数据链路和数据处理环节做了调整。数据处理方面,利用 Flink 高效的 ETL 能力,引入了高时效性要求的资讯数据处理场景,同时 Flink 作为流式计算引擎,天然和 Kafka 集成,可以无缝对接,具有直接输出消息到 Kafka 能力的系统,如新闻标签系统。社区一直在完善各种 connector,像 CDC 方式就为 Flink ETL 能力提供更大的空间。同时 Redis sink 的支持也使得原有的缓存程序、业务逻辑可以整合到 Flink 中统一实现。最终使整个资讯数据处理过程得到了集中管理,缩短链路,节约了传输时间。
强大的 ETL 能力降低了架构的复杂度,节约了原有的一系列组件。从整体来看,分布式高可用的架构贯穿了上中下游,使得资讯服务能力可以稳定高效地输出。从长远来看,资讯数据应用广泛,来源和输出多样,Flink 不断丰富的连接器也可以支持数据源和目的端的进一步扩展,为资讯数据可以应付更多的场景带来可能。
3.2 多源数据交叉检验场景
如果能用一套架构解决所有问题那是再好不过的,为此我们在多源数据交叉检验的场景上做了尝试。这个场景主要为了解决把控数据内容质量的问题。数据更快,可以通过技术手段得带解决,但是数据更准就不是我们处在中间环节所能左右的了。
上游依赖诸多的数据商,数据商可能是爬虫采集、人工录入、数据接收等方式得到数据,多样的数据种类和多样的环节导致了我们接收到的数据质量参差不齐,而且问题还会直接传导到下游并逐级放大。而我们由于离源头较远,不可能跨过数据商来提供准确的数据服务,只能退而求其次变成了提供及时的纠错服务。
市面上的数据商竞争激烈,同一份数据你有我也有,所以我们是非常幸运的,大多数的基础数据我们都可以得到多份,这就为数据差异的发现带来可能,通过对多源数据进行交叉检验,获取差异数据,及时提醒并纠错。
img在整个服务链路中,越早发现问题,它所造成的影响就越低。那么,如何更早地发现问题?分为以下三步:
- 第一步是 ID 拉齐 ,金融市场的股票大家都了解,交易标准、编码规范、一码贯穿所有数据,但是更多的债和基并不如股票标准,数据商往往会针对金融实体设计内码,在数据商内部局部唯一,所以如果想做交叉检验,首先就要解决 ID 拉齐的问题,这是个体力活。
- 第二步是指标化,数据校验的需求往往是具体的,比如校验每日的股票收盘价这种。但是数据商针对校验点的数据结构往往差异较大,因此通过指标化,利用 Flink SQL 编写针对多源库的指标生成逻辑,将异构的数据结构拉齐。
- 第三步,实时校验窗口。开始想法比较简单,运行脚本再定期取数一比就可以了。但是随着指标校验需求的增多,数据量的增长,跑批的脚本处理能力略显乏力。所以利用 Flink 的窗口特性,我们开发了用于校验的实时校验窗口,聚合所需要校验的指标,在时间和数量维度上触发校验窗口计算,结果输出到 Kafka,可以支持消息的实时推送。
Flink 中支持两种窗口,一种是基于时间的窗口,另一种是基于数量的窗口。如果在时间和数量两个维度上均要加以控制,使用全局窗口加触发器就可以实现多样的用户自定义窗口分配。图中放了几行伪代码,在全局窗口上,触发器分别在元素来的时候和定时器到的时候加以判断。
在校验窗口里,利用 maxcount 判断多元指标数据是否都已到达,到达则触发窗口函数,并对比指标值;如果某数据传输出现问题,对应指标值未到达,则需要在时间维度上加以控制,定义窗口的最大持续时间,到时就不再等待,直接触发窗口函数,并将对应的数据源指标定义为延迟到达,最终的结果输出如右上表格。技术和业务人员均可以依据这份校验结果及时应对,最终实现绕路提升数据服务的准确性。差异的及时发现和处理,使得对下游的影响降到最低。
Flink 在金融行业的应用,相信还有更多的场景值得探索。搭上开源社区这班快车,让我们券商的金融资讯服务可以得到质的提升。
四、未来展望
最后分享一些实时流处理方面的未来展望,包含正在沟通的一些场景和流批一体方向的探索。
img需求沟通中的场景分为以下几个方面:
- 账户资产,包括实时资产持仓指标统计,客户交易盈亏、交易记录的分析;
- 营销知识,包括 mot 流失客户提醒与召回、开户未成功客户提醒与跟踪、两融业务潜在新客户的挖掘、电商 APP 活动的内容与内容运营;
- 风控,包含以客户维度的持仓集中度指标,以公司维度的融资额度占公司净资本等指标的分析统计。
另一方面我们项目组正在调研 OLAP 多维分析组件,由于目前实时开发仍然采用 Lambda 架构,结果表存储组件涉及到关系型数据库比如 MySQL、SQL server、oracle 以及 NoSQL 数据库比如 HBase、ES、Redis。数据孤岛是目前面临的严重问题,希望通过 OLAP 组件实现实时数据的与离线数据的统一写入,实现流批一体,打破目前数据孤岛的局面,希望在流批一体存储层达到统一存储、统一对外服务、统一分析处理的目的。