kafka stream 拓扑 任务 分片 的关系疑惑以及YY
在看完 kafka stream 上一个example 后 Example 有4个分区 ,这里只画2个, 自己yy了一个音乐排行 处理的拓扑图
实际拓扑和我想的差不多 , 但是拓扑的分布是完全不同的 。 我的想法是 2个instance 均匀的获得整套 拓扑节点 。 但是kafka stream 不是这么做的 。
-----------Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [play-events6])
--> KSTREAM-FILTER-0000000003
Processor: KSTREAM-FILTER-0000000003 (stores: [])
--> KSTREAM-MAP-0000000004
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-MAP-0000000004 (stores: [])
--> KSTREAM-FILTER-0000000006
<-- KSTREAM-FILTER-0000000003
Processor: KSTREAM-FILTER-0000000006 (stores: [])
--> KSTREAM-SINK-0000000005
<-- KSTREAM-MAP-0000000004
Sink: KSTREAM-SINK-0000000005 (topic: KSTREAM-MAP-0000000004-repartition)
<-- KSTREAM-FILTER-0000000006
Sub-topology: 1
Source: KSTREAM-SOURCE-0000000007 (topics: [KSTREAM-MAP-0000000004-repartition])
--> KSTREAM-LEFTJOIN-0000000008
Processor: KSTREAM-LEFTJOIN-0000000008 (stores: [all-songs6])
--> KSTREAM-KEY-SELECT-0000000009
<-- KSTREAM-SOURCE-0000000007
Processor: KSTREAM-KEY-SELECT-0000000009 (stores: [])
--> KSTREAM-FILTER-0000000012
<-- KSTREAM-LEFTJOIN-0000000008
Processor: KSTREAM-FILTER-0000000012 (stores: [])
--> KSTREAM-SINK-0000000011
<-- KSTREAM-KEY-SELECT-0000000009
Source: KSTREAM-SOURCE-0000000001 (topics: [song-feed6])
--> KTABLE-SOURCE-0000000002
Sink: KSTREAM-SINK-0000000011 (topic: song-play-count6-repartition)
<-- KSTREAM-FILTER-0000000012
Processor: KTABLE-SOURCE-0000000002 (stores: [all-songs6])
--> none
<-- KSTREAM-SOURCE-0000000001
Sub-topology: 2
Source: KSTREAM-SOURCE-0000000013 (topics: [song-play-count6-repartition])
--> KSTREAM-AGGREGATE-0000000010
Processor: KSTREAM-AGGREGATE-0000000010 (stores: [song-play-count6])
--> KTABLE-SELECT-0000000014, KTABLE-SELECT-0000000018
<-- KSTREAM-SOURCE-0000000013
Processor: KTABLE-SELECT-0000000014 (stores: [])
--> KSTREAM-SINK-0000000015
<-- KSTREAM-AGGREGATE-0000000010
Processor: KTABLE-SELECT-0000000018 (stores: [])
--> KSTREAM-SINK-0000000019
<-- KSTREAM-AGGREGATE-0000000010
Sink: KSTREAM-SINK-0000000015 (topic: top-five-songs-by-genre6-repartition)
<-- KTABLE-SELECT-0000000014
Sink: KSTREAM-SINK-0000000019 (topic: top-five-songs6-repartition)
<-- KTABLE-SELECT-0000000018
Sub-topology: 3
Source: KSTREAM-SOURCE-0000000016 (topics: [top-five-songs-by-genre6-repartition])
--> KTABLE-AGGREGATE-0000000017
Processor: KTABLE-AGGREGATE-0000000017 (stores: [top-five-songs-by-genre6])
--> none
<-- KSTREAM-SOURCE-0000000016
Sub-topology: 4
Source: KSTREAM-SOURCE-0000000020 (topics: [top-five-songs6-repartition])
--> KTABLE-AGGREGATE-0000000021
Processor: KTABLE-AGGREGATE-0000000021 (stores: [top-five-songs6])
--> none
<-- KSTREAM-SOURCE-0000000020
实际 子拓扑 有5个 。 共起来20个任务 每个任务对应一个或多个分片 , Sub-topology: 0 对应其中的4个任务 具体如下图
image.png
0_0 第一个0为拓扑分组id , 第二0 为任务号
也就是说 拓扑 0 有 0 ,1, 2,3 任务 , 拓扑 1 拓扑2 拓扑3 类似
任务 作为 跨节点的 执行 的一个独立单位 , 必须要保证 co-partitioning , 简单说 , topic 1 , topic 2 , 具有相同分片数 ,相同分片规则 。那么
topic 1 分片 1 一定是 可以与 topic 2 分片 1 相关联的 。 也就是说 一个任务 涉及到的 多个topic 分片 一定是 co-partitioning 的 。 如果不是 那显然是有问题的 。
实际的拓扑类似于下图
网状网络拓扑结构 (1).png
上图 感觉上 很复杂 ,简单说下 拓扑0 的4个任务 可能都在instance1 ,以任务纬度分配任务 , 以kafka 分区 作为数据解耦,任何一个任务 在任何一个实例上都无所谓 ,他作为消费者 ,他只需要关心它的source 对应的分区就可以了 。 当然还有他的statechangelog 以及状态存储db 相关的设计 。
kafka stream 默认 是 rocksdb 。也就是不同instance 会在 不同机器的指定目录上 为 不同任务创建存储 。如同下图 就是某一个instance rocksdb 根据任务的存储目录
xiaoheideMacBook-Pro:kafka-music-charts6 xiaohei$ ll
total 0
drwxr-xr-x 9 xiaohei wheel 306 Aug 28 13:00 ./
drwxr-xr-x 3 xiaohei wheel 102 Aug 24 12:20 ../
drwxr-xr-x 4 xiaohei wheel 136 Aug 28 14:12 0_3/
drwxr-xr-x 5 xiaohei wheel 170 Aug 28 14:12 1_0/
drwxr-xr-x 5 xiaohei wheel 170 Aug 28 14:12 1_1/
drwxr-xr-x 5 xiaohei wheel 170 Aug 28 14:12 1_2/
drwxr-xr-x 5 xiaohei wheel 170 Aug 28 14:12 2_0/
drwxr-xr-x 5 xiaohei wheel 170 Aug 28 14:12 2_1/
drwxr-xr-x 5 xiaohei wheel 170 Aug 28 14:12 3_0/
当某个instance down 掉 后 ,同一台机器剩下instance 会获取down 掉 instance 的task ,当然包括他的数据库访问 ,down 掉 instance 正在处理任务的数据会怎么样呢 , 其实也不会丢失 。任务对应的分片消费数据应该是没有被提交的 ,获取task的instance 会重新消费这些数据
有个问题
不同机器上 ,instance 获取到了该任务 , 如果task 是有状态 ,而这台机器是没有task之前的数据的。这时候 怎么办呢statechangelog 分区 就发挥作用了 ,从statechangelog 任务分区 获取数据 ,重建本地状态 。如果数据量很大 ,只怕也是需要很长的时间恢复的吧
以上有些地方 是我yy的 以后有时间 会有源码解析部分 推出的