Flink在唯品会的实践

2021-04-27  本文已影响0人  Flink中文社区

唯品会自 2017 年开始基于 k8s 深入打造高性能、稳定、可靠、易用的实时计算平台,支持唯品会内部业务在平时以及大促的平稳运行。现平台支持 Flink、Spark、Storm 等主流框架。本文主要分享 Flink 的容器化实践应用以及产品化经验。

GitHub 地址
https://github.com/apache/flink
欢迎大家给 Flink 点赞送 star~

1. 发展概览

平台支持公司内部所有部门的实时计算应用。主要的业务包括实时大屏、推荐、实验平台、实时监控和实时数据清洗等。

1.1 集群规模

平台现有异地双机房双集群,具有 2000 多的物理机节点,利用 k8s 的 namespaces,labels 和 taints 等,实现业务隔离以及初步的计算负载隔离。目前线上实时应用有大概 1000 个,平台最近主要支持 Flink SQL 任务的上线。

1.2 平台架构

上图是唯品会实时计算平台的整体架构。

2. Flink容器化实践

2.1 容器化实践

上图是实时平台 Flink 容器化的架构。Flink 容器化是基于 standalone 模式部署的。

2.1.1 Flink 平台化

在实践过程中,结合具体场景以及易用性考虑,做了平台化工作。

2.1.2 Flink 稳定性

在应用部署和运行过程中,不可避免的会出现异常。以下是平台保证任务在出现异常状况后的稳定性做的策略。

2.2 kafka监控方案

kafka监控是我们的任务监控里相对重要的一部分,整体监控流程如下所示。

平台提供监控 kafka 堆积,消费 message 等配置信息,从 MySQL 中将用户 kafka 监控配置提取后,通过 jmx 监控 kafka,写入下游 kafka,再通过另一个 Flink 任务实时监控,同时将这些数据写入 ck,从而展示给用户。

3. Flink SQL 平台化建设

基于 k8s 的 Flink 容器化实现以后,方便了 Flink api 应用的发布,但是对于 Flink SQL 的任务仍然不够便捷。于是平台提供了更加方便的在线编辑发布、SQL 管理等一栈式开发平台。

3.1 Flink SQL方案

平台的 Flink SQL 方案如上图所示,任务发布系统与元数据管理系统完全解耦。

3.1.1 Flink SQL 任务发布平台化

在实践过程中,结合易用性考虑,做了平台化工作,主操作界面如下图所示:

图片

3.1.2 元数据管理

平台在 1.11 之前通过构建自己的元数据管理系统 UDM,MySQL 存储 kafka,redis 等 schema,通过自定义 catalog 打通 Flink 与 UDM,从而实现元数据管理。1.11 之后,Flink 集成 hive 逐渐完善,平台重构了 FlinkSQL 框架,通过部署一个 SQL - gateway service 服务,中间调用自己维护的 SQL - client jar 包,从而与离线元数据打通,实现了实时离线元数据统一,为之后的流批一体做好工作。在元数据管理系统创建的 Flink 表操作界面如下所示,创建 Flink 表的元数据,持久化到 hive里,Flink SQL 启动时从 hive 里读取对应表的 table schema 信息。

3.2 Flink SQL相关实践

平台对于官方原生支持或者不支持的 connector 进行整合和开发,镜像和 connector,format 等相关依赖进行解耦,可以快捷的进行更新与迭代。

3.2.1 FLINK SQL 相关实践

3.2.2 拓扑图执行计划修改

针对现阶段 SQL 生成的 stream graph 并行度无法修改等问题,平台提供可修改的拓扑预览修改相关参数。平台会将解析后的 FlinkSQL 的 excution plan json 提供给用户,利用 uid 保证算子的唯一性,修改每个算子的并行度,chain 策略等,也为用户解决反压问题提供方法。例如针对 clickhouse sink 小并发大批次的场景,我们支持修改 clickhouse sink 并行度,source 并行度 = 72,sink 并行度 = 24,提高 clickhouse sink tps。

3.2.3 维表关联 keyBy 优化 cache

针对维表关联的情况,为了降低 IO 请求次数,降低维表数据库读压力,从而降低延迟,提高吞吐,有以下几种措施:

优化之前维表关联 LookupJoin 算子和正常算子 chain 在一起。

优化之间维表关联 LookupJoin 算子和正常算子不 chain 在一起,将 join key 作为 hash 策略的 key。采用这种方式优化之后,例如原先 3000W 数据量的维表,10 个 TM 节点,每个节点都要缓存3000W 的数据,总共需要缓存 3000W * 10 = 3 亿的量。而经过 keyBy 优化之后,每个 TM 节点只需要缓存 3000W / 10 = 300W 的数据量,总共缓存的数据量只有 3000W,大大减少缓存数据量。

3.2.4 维表关联延迟 join

维表关联中,有很多业务场景,在维表数据新增数据之前,主流数据已经发生 join 操作,会出现关联不上的情况。因此,为了保证数据的正确,将关联不上的数据进行缓存,进行延迟 join。

4. 应用案例

4.1.实时数仓

4.1.1实时数据入仓

采用 Flink SQL 统一入仓方案以后,我们可以获得的收益:可解决以前 Flume 方案不稳定的问题,而且用户可自助入仓,大大降低入仓任务的维护成本。提升了离线数仓的时效性,从小时级降低至 5min 粒度入仓。

4.1.2 实时指标计算

以往指标计算通常采用 Storm 方式,需要通过 api 定制化开发,采用这样 Flink 方案以后,我们可以获得的收益:将计算逻辑切到 Flink SQL 上,降低计算任务口径变化快,修改上线周期慢等问题。切换至 Flink SQL 可以做到快速修改,快速上线,降低维护成本。

4.1.3 实时离线一体化 ETL 数据集成

UV 近似计算示例:

Step 1: Spark SQL 生成 HLL 对象

insert overwrite dws_goods_uv partition (dt='{dt}',hm='{hm}') AS select goods_id, estimate_prepare(mid) as pre_hll from dwd_table_goods group by goods_id where dt = {dt} and hm ={hm}

Step 2: Spark SQL 通过 goods_id 维度的 HLL 对象 merge 成品牌维度

insert overwrite dws_brand_uv partition (dt='{dt}',hm='{hm}') AS select b.brand_id, estimate_merge(pre_hll) as merge_hll from dws_table_brand A left join dim_table_brand_goods B on A.goods_id = B.goods_id where dt = {dt} and hm ={hm}

Step 3: Spark SQL 查询品牌维度的 UV

select brand_id, estimate_compute(merge_hll ) as uv from dws_brand_uv where dt = ${dt}

Step 4: presto merge 查询 park 生成的 HLL 对象

select brand_id,cardinality(merge(cast(merge_hll AS HyperLogLog))) uv from dws_brand_uv group by brand_id

所以基于实时离线一体化ETL数据集成的架构,我们能获得的收益:

4.2 实验平台(Flink 实时数据入 OLAP)

唯品会实验平台是通过配置多维度分析和下钻分析,提供海量数据的 A/B-test 实验效果分析的一体化平台。一个实验是由一股流量(比如用户请求)和在这股流量上进行的相对对比实验的修改组成。实验平台对于海量数据查询有着低延迟、低响应、超大规模数据(百亿级)的需求。整体数据架构如下:

通过 Flink SQL 将 kafka 里的数据清洗解析展开等操作之后,通过 redis 维表关联商品属性,通过分布式表写入到 clickhouse,然后通过数据服务 adhoc 查询。业务数据流如下:

我们通过 Flink SQL redis connector,支持 redis 的 sink 、source 维表关联等操作,可以很方便的读写 redis,实现维表关联,维表关联内可配置 cache ,极大提高应用的 TPS。通过 Flink SQL 实现实时数据流的 pipeline,最终将大宽表 sink 到 CK 里,并按照某个字段粒度做 murmurHash3_64 存储,保证相同用户的数据都存在同一 shard 节点组内,从而使得 ck 大表之间的 join 变成 local 本地表之间的 join,减少数据 shuffle 操作,提升 join 查询效率。

5. 未来规划

5.1 提高 Flink SQL 易用性

当前我们的 Flink SQL 调试起来很有很多不方便的地方,对于做离线 hive 用户来说还有一定的使用门槛,例如手动配置 kafka 监控、任务的压测调优,如何能让用户的使用门槛降低至最低,是一个比较大的挑战。将来我们考虑做一些智能监控告诉用户当前任务存在的问题,尽可能自动化并给用户一些优化建议。

5.2数据湖 CDC 分析方案落地

目前我们的 VDP binlog 消息流,通过 Flink SQL 写入到 hive ods 层,以加速 ods 层数据源的准备时间,但是会产生大量重复消息去重合并。我们会考虑 Flink + 数据湖的 cdc 入仓方案来做增量入仓。此外,像订单打宽之后的 kafka 消息流、以及聚合结果都需要非常强的实时 upsert 能力,目前我们主要是用 kudu,但是 kudu 集群,比较独立小众,维护成本高,我们会调研数据湖的增量 upsert 能力来替换 kudu 增量 upsert 场景。

上一篇 下一篇

猜你喜欢

热点阅读