Apache Kafkakafka Stream

kafka stream 拓扑 任务 分片 的关系疑惑以及YY

2018-08-23  本文已影响17人  pcgreat
网状网络拓扑结构.png
在看完 kafka stream 上一个example 后 Example 有4个分区 ,这里只画2个, 自己yy了一个音乐排行 处理的拓扑图
实际拓扑和我想的差不多 , 但是拓扑的分布是完全不同的 。 我的想法是 2个instance 均匀的获得整套 拓扑节点 。 但是kafka stream 不是这么做的 。
-----------Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [play-events6])
      --> KSTREAM-FILTER-0000000003
    Processor: KSTREAM-FILTER-0000000003 (stores: [])
      --> KSTREAM-MAP-0000000004
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MAP-0000000004 (stores: [])
      --> KSTREAM-FILTER-0000000006
      <-- KSTREAM-FILTER-0000000003
    Processor: KSTREAM-FILTER-0000000006 (stores: [])
      --> KSTREAM-SINK-0000000005
      <-- KSTREAM-MAP-0000000004
    Sink: KSTREAM-SINK-0000000005 (topic: KSTREAM-MAP-0000000004-repartition)
      <-- KSTREAM-FILTER-0000000006

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000007 (topics: [KSTREAM-MAP-0000000004-repartition])
      --> KSTREAM-LEFTJOIN-0000000008
    Processor: KSTREAM-LEFTJOIN-0000000008 (stores: [all-songs6])
      --> KSTREAM-KEY-SELECT-0000000009
      <-- KSTREAM-SOURCE-0000000007
    Processor: KSTREAM-KEY-SELECT-0000000009 (stores: [])
      --> KSTREAM-FILTER-0000000012
      <-- KSTREAM-LEFTJOIN-0000000008
    Processor: KSTREAM-FILTER-0000000012 (stores: [])
      --> KSTREAM-SINK-0000000011
      <-- KSTREAM-KEY-SELECT-0000000009
    Source: KSTREAM-SOURCE-0000000001 (topics: [song-feed6])
      --> KTABLE-SOURCE-0000000002
    Sink: KSTREAM-SINK-0000000011 (topic: song-play-count6-repartition)
      <-- KSTREAM-FILTER-0000000012
    Processor: KTABLE-SOURCE-0000000002 (stores: [all-songs6])
      --> none
      <-- KSTREAM-SOURCE-0000000001

  Sub-topology: 2
    Source: KSTREAM-SOURCE-0000000013 (topics: [song-play-count6-repartition])
      --> KSTREAM-AGGREGATE-0000000010
    Processor: KSTREAM-AGGREGATE-0000000010 (stores: [song-play-count6])
      --> KTABLE-SELECT-0000000014, KTABLE-SELECT-0000000018
      <-- KSTREAM-SOURCE-0000000013
    Processor: KTABLE-SELECT-0000000014 (stores: [])
      --> KSTREAM-SINK-0000000015
      <-- KSTREAM-AGGREGATE-0000000010
    Processor: KTABLE-SELECT-0000000018 (stores: [])
      --> KSTREAM-SINK-0000000019
      <-- KSTREAM-AGGREGATE-0000000010
    Sink: KSTREAM-SINK-0000000015 (topic: top-five-songs-by-genre6-repartition)
      <-- KTABLE-SELECT-0000000014
    Sink: KSTREAM-SINK-0000000019 (topic: top-five-songs6-repartition)
      <-- KTABLE-SELECT-0000000018

  Sub-topology: 3
    Source: KSTREAM-SOURCE-0000000016 (topics: [top-five-songs-by-genre6-repartition])
      --> KTABLE-AGGREGATE-0000000017
    Processor: KTABLE-AGGREGATE-0000000017 (stores: [top-five-songs-by-genre6])
      --> none
      <-- KSTREAM-SOURCE-0000000016

  Sub-topology: 4
    Source: KSTREAM-SOURCE-0000000020 (topics: [top-five-songs6-repartition])
      --> KTABLE-AGGREGATE-0000000021
    Processor: KTABLE-AGGREGATE-0000000021 (stores: [top-five-songs6])
      --> none
      <-- KSTREAM-SOURCE-0000000020

实际 子拓扑 有5个 。 共起来20个任务 每个任务对应一个或多个分片 , Sub-topology: 0 对应其中的4个任务 具体如下图


image.png

0_0 第一个0为拓扑分组id , 第二0 为任务号

也就是说 拓扑 0 有 0 ,1, 2,3 任务 , 拓扑 1 拓扑2 拓扑3 类似
任务 作为 跨节点的 执行 的一个独立单位 , 必须要保证 co-partitioning , 简单说 , topic 1 , topic 2 , 具有相同分片数 ,相同分片规则 。那么
topic 1 分片 1 一定是 可以与 topic 2 分片 1 相关联的 。 也就是说 一个任务 涉及到的 多个topic 分片 一定是 co-partitioning 的 。 如果不是 那显然是有问题的 。
实际的拓扑类似于下图


网状网络拓扑结构 (1).png

上图 感觉上 很复杂 ,简单说下 拓扑0 的4个任务 可能都在instance1 ,以任务纬度分配任务 , 以kafka 分区 作为数据解耦,任何一个任务 在任何一个实例上都无所谓 ,他作为消费者 ,他只需要关心它的source 对应的分区就可以了 。 当然还有他的statechangelog 以及状态存储db 相关的设计 。
kafka stream 默认 是 rocksdb 。也就是不同instance 会在 不同机器的指定目录上 为 不同任务创建存储 。如同下图 就是某一个instance rocksdb 根据任务的存储目录

xiaoheideMacBook-Pro:kafka-music-charts6 xiaohei$ ll
total 0
drwxr-xr-x  9 xiaohei  wheel  306 Aug 28 13:00 ./
drwxr-xr-x  3 xiaohei  wheel  102 Aug 24 12:20 ../
drwxr-xr-x  4 xiaohei  wheel  136 Aug 28 14:12 0_3/
drwxr-xr-x  5 xiaohei  wheel  170 Aug 28 14:12 1_0/
drwxr-xr-x  5 xiaohei  wheel  170 Aug 28 14:12 1_1/
drwxr-xr-x  5 xiaohei  wheel  170 Aug 28 14:12 1_2/
drwxr-xr-x  5 xiaohei  wheel  170 Aug 28 14:12 2_0/
drwxr-xr-x  5 xiaohei  wheel  170 Aug 28 14:12 2_1/
drwxr-xr-x  5 xiaohei  wheel  170 Aug 28 14:12 3_0/

当某个instance down 掉 后 ,同一台机器剩下instance 会获取down 掉 instance 的task ,当然包括他的数据库访问 ,down 掉 instance 正在处理任务的数据会怎么样呢 , 其实也不会丢失 。任务对应的分片消费数据应该是没有被提交的 ,获取task的instance 会重新消费这些数据
有个问题
不同机器上 ,instance 获取到了该任务 , 如果task 是有状态 ,而这台机器是没有task之前的数据的。这时候 怎么办呢statechangelog 分区 就发挥作用了 ,从statechangelog 任务分区 获取数据 ,重建本地状态 。如果数据量很大 ,只怕也是需要很长的时间恢复的吧
以上有些地方 是我yy的 以后有时间 会有源码解析部分 推出的

上一篇下一篇

猜你喜欢

热点阅读