Kafka Streams topology explained

2018-12-15  panpanliuBJ

Purpose

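When I first encountered Kafka Streams topologies they looked simple. Once operations such as groupBy and aggregate are added to the code, however, repartition topics and local state stores appear, and the topology becomes noticeably more complex. There is little material on this topic in either Chinese or English, so I had to rely on the official documentation. I hope this write-up helps readers.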

Basic concepts

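ps: The basic concepts are quoted directly from the official documentation, because translating them tends to lose nuance. Apologies, everyone!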

  • A stream processor is a node in the processor topology; it represents a processing step to transform data in streams by receiving one input record at a time from its upstream processors in the topology, applying its operation to it, and may subsequently produce one or more output records to its downstream processors.

  • Source Processor: A source processor is a special type of stream processor that does not have any upstream processors. It produces an input stream to its topology from one or multiple Kafka topics by consuming records from these topics and forwarding them to its down-stream processors.
  • Sink Processor: A sink processor is a special type of stream processor that does not have down-stream processors. It sends any received records from its up-stream processors to a specified Kafka topic.
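To make the three roles concrete, here is a minimal plain-Java sketch with no Kafka dependency: a source node with no upstream, processors that handle one record at a time and may emit several records, and a sink with no downstream that collects into a list instead of writing to a topic. The Node class and run method are hypothetical illustrations, not the Kafka Streams Processor API; the chain only loosely mirrors the filter -> flatMapValues -> sink flow of FlattenTopology below.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class SimpleTopology {

    // Hypothetical node: applies an operation to ONE record and forwards
    // each resulting record to every downstream node
    static class Node {
        final String name;
        final Function<String, List<String>> op; // one input -> zero or more outputs
        final List<Node> children = new ArrayList<>();

        Node(String name, Function<String, List<String>> op) {
            this.name = name;
            this.op = op;
        }

        // Connects a downstream node and returns it, so calls can be chained
        Node to(Node child) {
            children.add(child);
            return child;
        }

        void process(String record) {
            for (String out : op.apply(record)) {
                for (Node child : children) {
                    child.process(out);
                }
            }
        }
    }

    // Builds source -> filter -> flatMap -> sink and feeds the input through it
    static List<String> run(List<String> input) {
        List<String> sinkOutput = new ArrayList<>();
        // source: no upstream, passes each consumed record on unchanged
        Node source = new Node("SOURCE", r -> List.of(r));
        // filter: drops empty records (stand-in for "v != null")
        Node filter = new Node("FILTER", r -> r.isEmpty() ? List.<String>of() : List.of(r));
        // flatMap: one record in, several records out
        Node flatMap = new Node("FLATMAP", r -> List.of(r.split(",")));
        // sink: no downstream; collects into a list instead of writing to a topic
        Node sink = new Node("SINK", r -> {
            sinkOutput.add(r);
            return List.<String>of();
        });
        source.to(filter).to(flatMap).to(sink);
        for (String r : input) {
            source.process(r);
        }
        return sinkOutput;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a,b", "", "c"))); // [a, b, c]
    }
}
```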


A worked example

To understand a Kafka Streams application, you first need to understand its topology.
Below we analyze a concrete example; the code is on GitHub (stars welcome).

StreamsStarter: the Kafka Streams launcher

@Slf4j
@Component
public class StreamsStarter {

    // omitted ......


    @PostConstruct
    public void startUp() {
        final StreamsBuilder builder = new StreamsBuilder();
        buildTopology(builder);
        final Topology topology = builder.build();
        log.info("topology={}", topology.describe());
        kafkaStreams = new KafkaStreams(topology, kafkaProperties.getStreamProperties());
        run();
    }

    private void buildTopology(StreamsBuilder builder) {

        UserShareStream userShareStream = new UserShareStream(kafkaProperties, coinInfo);
        userShareStream.buildTopology(builder);
    }

    private void run() {
        try {
            kafkaStreams.start();
        } catch (Exception ex) {
            log.error("kafkaStreams start error", ex);
        }
    }

   ......
}

UserShareStream: builds the stream topology

@Slf4j
public class UserShareStream {


    void buildTopology(StreamsBuilder builder) {

        final String flattenTopic = String.format("%s-flatten-share", coinInfo.getCoinType());

        try {
            final MiningTopologyMeta flattenMeta = MiningTopologyMeta.builder()
                .sourceTopic(kafkaProperties.getTopic())
                .toTopic(flattenTopic)
                .build();
            // flatten user shares
            final FlattenTopology flattenTopology = new FlattenTopology(flattenMeta);
            flattenTopology.buildTopology(builder);

            final MiningTopologyMeta user5MinutesMeta = MiningTopologyMeta.builder()
                .sourceTopic(flattenMeta.getToTopic())
                .timeUnit(TimeUnit.MINUTES)
                .timeValue(5)
                .coinType(coinInfo.getCoinType())
                .granularityType(GranularityType.MIN5)
                .build();
            // aggregate shares into 5-minute windows
            final UserSubTopology user5MinSubTopology = new UserSubTopology(user5MinutesMeta);
            // subscribe to the flatten topology, i.e. become one of its downstreams
            flattenTopology.addTopology(user5MinSubTopology);

            final MiningTopologyMeta user15MinutesMeta = MiningTopologyMeta.builder()
                .sourceTopic(flattenMeta.getToTopic())
                .timeUnit(TimeUnit.MINUTES)
                .timeValue(15)
                .coinType(coinInfo.getCoinType())
                .granularityType(GranularityType.MIN15)
                .build();
            // aggregate shares into 15-minute windows
            final UserSubTopology user15MinSubTopology = new UserSubTopology(user15MinutesMeta);
            flattenTopology.addTopology(user15MinSubTopology);

            final MiningTopologyMeta userHoursMeta = MiningTopologyMeta.builder()
                .sourceTopic(flattenMeta.getToTopic())
                .timeUnit(TimeUnit.HOURS)
                .timeValue(1)
                .coinType(coinInfo.getCoinType())
                .granularityType(GranularityType.HOUR)
                .build();
            // aggregate shares into 1-hour windows
            final UserSubTopology userHoursSubTopology = new UserSubTopology(userHoursMeta);
            flattenTopology.addTopology(userHoursSubTopology);

            flattenTopology.notifyAll(builder);

        } catch (Exception ex) {
            log.error("fail to build topology|exception:", ex);
        }
    }
}

FlattenTopology: flattens the data

@Slf4j
public class FlattenTopology implements TopologyObservable {

    protected final MiningTopologyMeta meta;
    private List<TopologyObserver> topoObservers;
    private KStream<String, MiningData> stream;


    public FlattenTopology(MiningTopologyMeta meta) {
        this.meta = meta;
        topoObservers = Lists.newArrayList();
    }

    @Override
    public void addTopology(TopologyObserver observer) {
        topoObservers.add(observer);
    }

    public void buildTopology(StreamsBuilder builder) {
        log.info("build {} topology ...", this.getClass().getSimpleName());
        KStream<String, UserShareData> source = StreamUtil.buildSourceKStream(builder, meta.getSourceTopic(), UserShareData.class);

        source.filter((k, v) -> v != null)
            .flatMapValues(value -> value.getData())
            .filter((k, v) -> v != null)
            .to(meta.getToTopic(), Produced.with(Serdes.String(), StreamUtil.jsonSerde(MiningData.class)));

        stream = StreamUtil.buildSourceKStream(builder, meta.getToTopic(), MiningData.class);
    }

    public void notifyAll(StreamsBuilder builder) {
        for (TopologyObserver observer : topoObservers) {
            observer.notify(builder, stream);
        }
    }
}
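FlattenTopology and UserSubTopology use the observer pattern: every registered observer attaches its own processing chain to the same stream, so one source processor ends up with one child per observer. The plain-Java sketch below models only that registration and notification flow. Observer, Observable, attach and notifyObservers are hypothetical stand-ins for TopologyObserver, TopologyObservable, notify and notifyAll (whose full definitions the article omits), and the "stream" is just a label string.

```java
import java.util.ArrayList;
import java.util.List;

public class ObserverFanOut {

    // Hypothetical stand-in for TopologyObserver: attaches one branch to a shared stream
    interface Observer {
        void attach(List<String> branches, String stream);
    }

    // Hypothetical stand-in for FlattenTopology: owns one stream, notifies every observer
    static class Observable {
        private final List<Observer> observers = new ArrayList<>();
        private final String stream = "KSTREAM-SOURCE(btc-flatten-share)";

        void addTopology(Observer observer) {
            observers.add(observer);
        }

        // Like notifyAll(builder): every observer branches off the SAME source
        List<String> notifyObservers() {
            List<String> branches = new ArrayList<>();
            for (Observer observer : observers) {
                observer.attach(branches, stream);
            }
            return branches;
        }
    }

    static List<String> build() {
        Observable flatten = new Observable();
        for (String granularity : List.of("min5", "min15", "hour")) {
            // each observer adds its own filter as a child of the shared source
            flatten.addTopology((branches, stream) ->
                branches.add(stream + " --> FILTER(" + granularity + ")"));
        }
        return flatten.notifyObservers();
    }

    public static void main(String[] args) {
        build().forEach(System.out::println);
    }
}
```

This fan-out is why, in the printed topology, KSTREAM-SOURCE-0000000005 has three downstream filters.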

UserSubTopology: aggregates data over different time windows

@Slf4j
public class UserSubTopology extends AbstractTopology<MiningData> implements
    TopologyObserver<MiningData> {

    public static final String USER = "user";

    public UserSubTopology(MiningTopologyMeta meta) {
        super(meta, MiningData.class);
    }

    @Override
    public void notify(StreamsBuilder builder, KStream<String, MiningData> stream) {
        aggregateToTopic(stream, USER);
    }

    @Override
    public KStream<Windowed<String>, MiningData> doAggregate(KStream<String, MiningData> source, String storeName, String toTopic) {
        long windowSizeMs = getWindowSizeMs();
        // the returned stream is already grouped, windowed and aggregated
        KStream<Windowed<String>, MiningData> summaryKStream = source.filter((k, v) -> v != null)
            .map((k, v) -> new KeyValue<>(v.getKey(), v))
            .groupBy((k, v) -> k, Serialized.with(Serdes.String(), StreamUtil.jsonSerde(MiningData.class)))
            .windowedBy(TimeWindows.of(windowSizeMs).advanceBy(windowSizeMs))
            .aggregate(() -> new MiningData(),
                (aggKey, newValue, aggValue) -> aggValue.add(newValue, toMillis(meta.getTimeUnit(), meta.getTimeValue())),
                Materialized.<String, MiningData, WindowStore<Bytes, byte[]>>as(storeName).withValueSerde(StreamUtil.jsonSerde(MiningData.class)))
            .toStream();
        return summaryKStream;
    }

    @Override
    public void postAggregate(KStream<Windowed<String>, MiningData> stream, String toTopic) {
        stream.foreach((key, value) -> log.info("toTopic={}|key={}|value={}", toTopic, key, value));
    }

}
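doAggregate uses TimeWindows.of(windowSizeMs).advanceBy(windowSizeMs). Because the advance equals the window size, these are tumbling windows: each record falls into exactly one window. The sketch below computes which window start times contain a given timestamp for a general hopping window [start, start + size). The arithmetic is my own illustration of those semantics, not code copied from Kafka's TimeWindows class, and it assumes non-negative millisecond timestamps.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowAssignment {

    // Returns the start times of all hopping windows [start, start + size) that contain ts.
    // With advance == size (tumbling windows) exactly one start is returned.
    static List<Long> windowStarts(long ts, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        // earliest window that can still contain ts, aligned to a multiple of advance
        long start = Math.max(0, ts - size + advance) / advance * advance;
        for (; start <= ts; start += advance) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long fiveMin = 300_000L;
        long ts = 1_000_000L;
        // tumbling 5-minute windows: a single window starting at 900000
        System.out.println(windowStarts(ts, fiveMin, fiveMin));           // [900000]
        // for comparison, 5-minute windows advancing by 1 minute overlap
        System.out.println(windowStarts(ts, fiveMin, 60_000L).size());    // 5
    }
}
```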

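Some code is omitted here (MiningData.java, UserShareData).

The topology logged at startup by log.info("topology={}", topology.describe()) in StreamsStarter: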

2018-12-15 10:16:55.941|INFO |main|c.f.f.s.k.t.StreamsStarter|37|topology=Topologies:
  Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [btc-share])
      --> KSTREAM-FILTER-0000000001
    Processor: KSTREAM-FILTER-0000000001 (stores: [])
      --> KSTREAM-FLATMAPVALUES-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-FLATMAPVALUES-0000000002 (stores: [])
      --> KSTREAM-FILTER-0000000003
      <-- KSTREAM-FILTER-0000000001
    Processor: KSTREAM-FILTER-0000000003 (stores: [])
      --> KSTREAM-SINK-0000000004
      <-- KSTREAM-FLATMAPVALUES-0000000002
    Sink: KSTREAM-SINK-0000000004 (topic: btc-flatten-share)
      <-- KSTREAM-FILTER-0000000003

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000005 (topics: [btc-flatten-share])
      --> KSTREAM-FILTER-0000000006, KSTREAM-FILTER-0000000016, KSTREAM-FILTER-0000000026
    Processor: KSTREAM-FILTER-0000000006 (stores: [])
      --> KSTREAM-MAP-0000000007
      <-- KSTREAM-SOURCE-0000000005
    Processor: KSTREAM-FILTER-0000000016 (stores: [])
      --> KSTREAM-MAP-0000000017
      <-- KSTREAM-SOURCE-0000000005
    Processor: KSTREAM-FILTER-0000000026 (stores: [])
      --> KSTREAM-MAP-0000000027
      <-- KSTREAM-SOURCE-0000000005
    Processor: KSTREAM-MAP-0000000007 (stores: [])
      --> KSTREAM-KEY-SELECT-0000000008
      <-- KSTREAM-FILTER-0000000006
    Processor: KSTREAM-MAP-0000000017 (stores: [])
      --> KSTREAM-KEY-SELECT-0000000018
      <-- KSTREAM-FILTER-0000000016
    Processor: KSTREAM-MAP-0000000027 (stores: [])
      --> KSTREAM-KEY-SELECT-0000000028
      <-- KSTREAM-FILTER-0000000026
    Processor: KSTREAM-KEY-SELECT-0000000008 (stores: [])
      --> KSTREAM-FILTER-0000000011
      <-- KSTREAM-MAP-0000000007
    Processor: KSTREAM-KEY-SELECT-0000000018 (stores: [])
      --> KSTREAM-FILTER-0000000021
      <-- KSTREAM-MAP-0000000017
    Processor: KSTREAM-KEY-SELECT-0000000028 (stores: [])
      --> KSTREAM-FILTER-0000000031
      <-- KSTREAM-MAP-0000000027
    Processor: KSTREAM-FILTER-0000000011 (stores: [])
      --> KSTREAM-SINK-0000000010
      <-- KSTREAM-KEY-SELECT-0000000008
    Processor: KSTREAM-FILTER-0000000021 (stores: [])
      --> KSTREAM-SINK-0000000020
      <-- KSTREAM-KEY-SELECT-0000000018
    Processor: KSTREAM-FILTER-0000000031 (stores: [])
      --> KSTREAM-SINK-0000000030
      <-- KSTREAM-KEY-SELECT-0000000028
    Sink: KSTREAM-SINK-0000000010 (topic: user-share-store-min5-repartition)
      <-- KSTREAM-FILTER-0000000011
    Sink: KSTREAM-SINK-0000000020 (topic: user-share-store-min15-repartition)
      <-- KSTREAM-FILTER-0000000021
    Sink: KSTREAM-SINK-0000000030 (topic: user-share-store-hour-repartition)
      <-- KSTREAM-FILTER-0000000031

  Sub-topology: 2
    Source: KSTREAM-SOURCE-0000000012 (topics: [user-share-store-min5-repartition])
      --> KSTREAM-AGGREGATE-0000000009
    Processor: KSTREAM-AGGREGATE-0000000009 (stores: [user-share-store-min5])
      --> KTABLE-TOSTREAM-0000000013
      <-- KSTREAM-SOURCE-0000000012
    Processor: KTABLE-TOSTREAM-0000000013 (stores: [])
      --> KSTREAM-FOREACH-0000000015, KSTREAM-SINK-0000000014
      <-- KSTREAM-AGGREGATE-0000000009
    Processor: KSTREAM-FOREACH-0000000015 (stores: [])
      --> none
      <-- KTABLE-TOSTREAM-0000000013
    Sink: KSTREAM-SINK-0000000014 (topic: btc-user-share-min5)
      <-- KTABLE-TOSTREAM-0000000013

  Sub-topology: 3
    Source: KSTREAM-SOURCE-0000000022 (topics: [user-share-store-min15-repartition])
      --> KSTREAM-AGGREGATE-0000000019
    Processor: KSTREAM-AGGREGATE-0000000019 (stores: [user-share-store-min15])
      --> KTABLE-TOSTREAM-0000000023
      <-- KSTREAM-SOURCE-0000000022
    Processor: KTABLE-TOSTREAM-0000000023 (stores: [])
      --> KSTREAM-FOREACH-0000000025, KSTREAM-SINK-0000000024
      <-- KSTREAM-AGGREGATE-0000000019
    Processor: KSTREAM-FOREACH-0000000025 (stores: [])
      --> none
      <-- KTABLE-TOSTREAM-0000000023
    Sink: KSTREAM-SINK-0000000024 (topic: btc-user-share-min15)
      <-- KTABLE-TOSTREAM-0000000023

  Sub-topology: 4
    Source: KSTREAM-SOURCE-0000000032 (topics: [user-share-store-hour-repartition])
      --> KSTREAM-AGGREGATE-0000000029
    Processor: KSTREAM-AGGREGATE-0000000029 (stores: [user-share-store-hour])
      --> KTABLE-TOSTREAM-0000000033
      <-- KSTREAM-SOURCE-0000000032
    Processor: KTABLE-TOSTREAM-0000000033 (stores: [])
      --> KSTREAM-FOREACH-0000000035, KSTREAM-SINK-0000000034
      <-- KSTREAM-AGGREGATE-0000000029
    Processor: KSTREAM-FOREACH-0000000035 (stores: [])
      --> none
      <-- KTABLE-TOSTREAM-0000000033
    Sink: KSTREAM-SINK-0000000034 (topic: btc-user-share-hour)
      <-- KTABLE-TOSTREAM-0000000033

ps: That is a lot of code and output, but it is needed to explain the topology clearly. Please bear with me.
Next we analyze each sub-topology in turn.

Sub-topology: 0

 Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [btc-share])
      --> KSTREAM-FILTER-0000000001     
    Processor: KSTREAM-FILTER-0000000001 (stores: [])  //source.filter((k, v) -> v != null)
      --> KSTREAM-FLATMAPVALUES-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-FLATMAPVALUES-0000000002 (stores: []) //.flatMapValues(value -> value.getData())
      --> KSTREAM-FILTER-0000000003
      <-- KSTREAM-FILTER-0000000001
    Processor: KSTREAM-FILTER-0000000003 (stores: []) //.filter((k, v) -> v != null)
      --> KSTREAM-SINK-0000000004
      <-- KSTREAM-FLATMAPVALUES-0000000002
    Sink: KSTREAM-SINK-0000000004 (topic: btc-flatten-share) //.to(meta.getToTopic(), Produced.with(Serdes.String(), StreamUtil.jsonSerde(MiningData.class)));
      <-- KSTREAM-FILTER-0000000003

Records consumed by the source processor (topic: btc-share) pass through a series of operations and finally reach the sink processor (topic: btc-flatten-share).

Take Processor: KSTREAM-FILTER-0000000001 as an example:

 Processor: KSTREAM-FILTER-0000000001 (stores: [])  
      --> KSTREAM-FLATMAPVALUES-0000000002
      <-- KSTREAM-SOURCE-0000000000

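Seen this way, it is simple, isn't it? In each node block, the line starting with --> lists the node's downstream processors and the line starting with <-- lists its upstream processors: KSTREAM-FILTER-0000000001 receives records from KSTREAM-SOURCE-0000000000 and forwards them to KSTREAM-FLATMAPVALUES-0000000002.
If that is clear, let's move on to a slightly more complex sub-topology.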
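Reading the arrows by hand works for small topologies; for larger ones, turning the --> lines into an adjacency list can help. The sketch below parses text in the format printed above. DescribeParser is a hypothetical helper, and it assumes the exact line layout of the describe() output shown in this article, which is not a stable contract.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DescribeParser {

    // Parses topology.describe() text and returns, for each node, the names
    // listed after "-->" (its downstream processors)
    static Map<String, List<String>> downstream(String describe) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        String current = null;
        for (String raw : describe.split("\n")) {
            String line = raw.trim();
            if (line.startsWith("Source:") || line.startsWith("Processor:") || line.startsWith("Sink:")) {
                // node header, e.g. "Processor: KSTREAM-FILTER-0000000001 (stores: [])"
                current = line.split("\\s+")[1];
                result.putIfAbsent(current, new ArrayList<>());
            } else if (line.startsWith("-->") && current != null) {
                for (String child : line.substring(3).split(",")) {
                    String name = child.trim();
                    // "none" marks a node without downstream processors
                    if (!name.isEmpty() && !name.equals("none")) {
                        result.get(current).add(name);
                    }
                }
            }
            // "<--" lines list upstream nodes; they mirror the "-->" edges and are skipped
        }
        return result;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
            "  Sub-topology: 0",
            "    Source: KSTREAM-SOURCE-0000000000 (topics: [btc-share])",
            "      --> KSTREAM-FILTER-0000000001",
            "    Processor: KSTREAM-FILTER-0000000001 (stores: [])",
            "      --> KSTREAM-FLATMAPVALUES-0000000002",
            "      <-- KSTREAM-SOURCE-0000000000");
        downstream(text).forEach((node, children) -> System.out.println(node + " -> " + children));
    }
}
```

In real code, walking topology.describe().subtopologies() and each node's successors() avoids text parsing altogether.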

Sub-topology: 1

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000005 (topics: [btc-flatten-share])
      --> KSTREAM-FILTER-0000000006, KSTREAM-FILTER-0000000016, KSTREAM-FILTER-0000000026
    Processor: KSTREAM-FILTER-0000000006 (stores: [])
      --> KSTREAM-MAP-0000000007
      <-- KSTREAM-SOURCE-0000000005
    Processor: KSTREAM-FILTER-0000000016 (stores: [])
      --> KSTREAM-MAP-0000000017
      <-- KSTREAM-SOURCE-0000000005
    Processor: KSTREAM-FILTER-0000000026 (stores: [])
      --> KSTREAM-MAP-0000000027
      <-- KSTREAM-SOURCE-0000000005
    Processor: KSTREAM-MAP-0000000007 (stores: [])
      --> KSTREAM-KEY-SELECT-0000000008
      <-- KSTREAM-FILTER-0000000006
    Processor: KSTREAM-MAP-0000000017 (stores: [])
      --> KSTREAM-KEY-SELECT-0000000018
      <-- KSTREAM-FILTER-0000000016
    Processor: KSTREAM-MAP-0000000027 (stores: [])
      --> KSTREAM-KEY-SELECT-0000000028
      <-- KSTREAM-FILTER-0000000026
    Processor: KSTREAM-KEY-SELECT-0000000008 (stores: [])
      --> KSTREAM-FILTER-0000000011
      <-- KSTREAM-MAP-0000000007
    Processor: KSTREAM-KEY-SELECT-0000000018 (stores: [])
      --> KSTREAM-FILTER-0000000021
      <-- KSTREAM-MAP-0000000017
    Processor: KSTREAM-KEY-SELECT-0000000028 (stores: [])
      --> KSTREAM-FILTER-0000000031
      <-- KSTREAM-MAP-0000000027
    Processor: KSTREAM-FILTER-0000000011 (stores: [])
      --> KSTREAM-SINK-0000000010
      <-- KSTREAM-KEY-SELECT-0000000008
    Processor: KSTREAM-FILTER-0000000021 (stores: [])
      --> KSTREAM-SINK-0000000020
      <-- KSTREAM-KEY-SELECT-0000000018
    Processor: KSTREAM-FILTER-0000000031 (stores: [])
      --> KSTREAM-SINK-0000000030
      <-- KSTREAM-KEY-SELECT-0000000028
    Sink: KSTREAM-SINK-0000000010 (topic: user-share-store-min5-repartition)
      <-- KSTREAM-FILTER-0000000011
    Sink: KSTREAM-SINK-0000000020 (topic: user-share-store-min15-repartition)
      <-- KSTREAM-FILTER-0000000021
    Sink: KSTREAM-SINK-0000000030 (topic: user-share-store-hour-repartition)
      <-- KSTREAM-FILTER-0000000031

Following the same approach: the source processor reads btc-flatten-share, and the sink processors write to user-share-store-min5-repartition, user-share-store-min15-repartition and user-share-store-hour-repartition respectively.
What is going on here? Consider the following questions:

// In UserShareStream, the three time-window UserSubTopology instances each subscribe to the upstream FlattenTopology
Source: KSTREAM-SOURCE-0000000005 (topics: [btc-flatten-share])
      --> KSTREAM-FILTER-0000000006, KSTREAM-FILTER-0000000016, KSTREAM-FILTER-0000000026 

Since the three downstream branches are built by the same code, we analyze one of them; the others are analogous.
Take KSTREAM-FILTER-0000000006 as an example:

//KStream<Windowed<String>, MiningData> summaryKStream = source.filter((k, v) -> v != null)
Processor: KSTREAM-FILTER-0000000006 (stores: [])
      --> KSTREAM-MAP-0000000007
      <-- KSTREAM-SOURCE-0000000005

//.map((k, v) -> new KeyValue<>(v.getKey(), v))
Processor: KSTREAM-MAP-0000000007 (stores: [])
      --> KSTREAM-KEY-SELECT-0000000008
      <-- KSTREAM-FILTER-0000000006
//.groupBy((k, v) -> k, Serialized.with(Serdes.String(), StreamUtil.jsonSerde(MiningData.class)))
Processor: KSTREAM-KEY-SELECT-0000000008 (stores: [])
      --> KSTREAM-FILTER-0000000011
      <-- KSTREAM-MAP-0000000007

Processor: KSTREAM-FILTER-0000000011 (stores: [])
      --> KSTREAM-SINK-0000000010
      <-- KSTREAM-KEY-SELECT-0000000008

Sink: KSTREAM-SINK-0000000010 (topic: user-share-store-min5-repartition)
      <-- KSTREAM-FILTER-0000000011

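The first few processors map directly to the code. The groupBy is less obvious. It adds a KEY-SELECT processor (groupBy is shorthand for selectKey(...).groupByKey(), see the quote below), and because the data then has to be repartitioned, Kafka Streams also inserts a filter (which drops records with a null key), a sink that writes to the internal topic user-share-store-min5-repartition, and a source that reads that topic back (KSTREAM-SOURCE-0000000012 in Sub-topology 2).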

Groups the records by a new key, which may be of a different key type. When grouping a table, you may also specify a new value and value type. groupBy is a shorthand for selectKey(...).groupByKey(). (KStream details, KTable details)
Grouping is a prerequisite for aggregating a stream or a table and ensures that data is properly partitioned (“keyed”) for subsequent operations.
When to set explicit SerDes: Variants of groupBy exist to override the configured default SerDes of your application, which you must do if the key and/or value types of the resulting KGroupedStream or KGroupedTable do not match the configured default SerDes.
Note
Grouping vs. Windowing: A related operation is windowing, which lets you control how to “sub-group” the grouped records of the same key into so-called windows for stateful operations such as windowed aggregations or windowed joins.
Always causes data re-partitioning: groupBy always causes data re-partitioning. If possible use groupByKey instead, which will re-partition data only if required.

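A groupBy is normally followed directly by an aggregate.
After the groupBy, records read from the multiple partitions of topic btc-flatten-share are re-keyed and written to a repartition topic, so that all records with the same key end up in the same partition. This is also the answer to question 3 above.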
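The key-to-partition routing can be illustrated with a small simulation. Kafka's default partitioner hashes the serialized key with murmur2; this sketch uses String.hashCode() purely as a stand-in, so the concrete partition numbers differ from Kafka's, but the property that matters is the same: identical keys always map to the same partition. RepartitionSketch, partitionFor and route are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RepartitionSketch {

    // Stand-in partitioner: Kafka actually uses murmur2 on the serialized key,
    // String.hashCode() is used here only for illustration
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Simulates writing re-keyed records to a repartition topic: partition -> keys
    static Map<Integer, List<String>> route(List<String> keys, int numPartitions) {
        Map<Integer, List<String>> partitions = new TreeMap<>();
        for (String key : keys) {
            partitions.computeIfAbsent(partitionFor(key, numPartitions), p -> new ArrayList<>()).add(key);
        }
        return partitions;
    }

    public static void main(String[] args) {
        // records of one user may come from different partitions of btc-flatten-share,
        // but after re-keying they all land in a single partition of the repartition topic
        List<String> keys = List.of("user-1", "user-2", "user-1", "user-3", "user-1");
        System.out.println(route(keys, 3));
    }
}
```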

At this point we have mapped Sub-topology 1 back to the code. Any questions?
Why does this sub-topology end here, and why has the aggregate not run yet?
Don't worry, we continue below.
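Part of the answer is that a sub-topology ends wherever records must pass through a Kafka topic. Processors connected directly as parent and child belong to the same sub-topology, while a sink that writes a topic and the source that reads it back are not directly connected. The sketch below groups the min5 branch's nodes (names shortened) with a small union-find over the direct edges only; it illustrates the idea and is not Kafka's internal grouping code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

public class SubTopologySketch {

    private final Map<String, String> parent = new HashMap<>();

    // union-find: representative of the node's group
    String find(String node) {
        parent.putIfAbsent(node, node);
        String root = parent.get(node);
        if (!root.equals(node)) {
            root = find(root);
            parent.put(node, root); // path compression
        }
        return root;
    }

    // merges the groups of two directly connected processors
    void union(String a, String b) {
        parent.put(find(a), find(b));
    }

    // each resulting group corresponds to one sub-topology
    Map<String, TreeSet<String>> groups() {
        Map<String, TreeSet<String>> result = new TreeMap<>();
        for (String node : new TreeSet<>(parent.keySet())) {
            result.computeIfAbsent(find(node), r -> new TreeSet<>()).add(node);
        }
        return result;
    }

    static int countSubTopologies() {
        SubTopologySketch s = new SubTopologySketch();
        // direct edges of the min5 branch in Sub-topology 1
        s.union("SOURCE-05", "FILTER-06");
        s.union("FILTER-06", "MAP-07");
        s.union("MAP-07", "KEY-SELECT-08");
        s.union("KEY-SELECT-08", "FILTER-11");
        s.union("FILTER-11", "SINK-10");
        // SINK-10 writes user-share-store-min5-repartition and SOURCE-12 reads it:
        // that hop goes through a topic, so there is deliberately NO union here
        s.union("SOURCE-12", "AGGREGATE-09");
        s.union("AGGREGATE-09", "TOSTREAM-13");
        s.union("TOSTREAM-13", "FOREACH-15");
        s.union("TOSTREAM-13", "SINK-14");
        return s.groups().size();
    }

    public static void main(String[] args) {
        System.out.println(countSubTopologies()); // 2
    }
}
```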

A side note

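I once hit a problem in production and suspected that the add method, which takes no lock, was causing a thread-safety issue. The analysis above rules this out: all records with the same key are written to the same partition of the repartition topic, and each partition is processed by a single stream task running on one thread, so add is never called concurrently for the same aggregate. The real cause turned out to be something else: message timestamps exceeded the window retention time, which caused the stream to stop working. We will analyze that problem another time.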

```java
        //...
            .aggregate(() -> new MiningData(),
                // aggregator: folds each new record into the window's running aggregate
                (aggKey, newValue, aggValue) -> aggValue.add(newValue, toMillis(meta.getTimeUnit(), meta.getTimeValue())),
                Materialized.<String, MiningData, WindowStore<Bytes, byte[]>>as(storeName).withValueSerde(StreamUtil.jsonSerde(MiningData.class)))
```

```java
// timeSpace: window size in milliseconds
public MiningData add(MiningData data, Long timeSpace) {
    if (data == null) {
        return this;
    }

    if (userId == null) {
        userId = data.getUserId();
    }

    if (createTime == null) {
        // round down to the window start, then add one window size: the window end
        createTime = data.getCreateTime() / timeSpace * timeSpace + timeSpace;
    }

    this.share1Count = this.share1Count.add(data.getShare1Count());
    return this;
}
```

Sub-topology: 2


  Sub-topology: 2
    Source: KSTREAM-SOURCE-0000000012 (topics: [user-share-store-min5-repartition])
      --> KSTREAM-AGGREGATE-0000000009
// .aggregate(() -> new MiningData(),
//     (aggKey, newValue, aggValue) -> aggValue.add(newValue, toMillis(meta.getTimeUnit(), meta.getTimeValue())),
//     Materialized.<String, MiningData, WindowStore<Bytes, byte[]>>as(storeName).withValueSerde(StreamUtil.jsonSerde(MiningData.class)))
    Processor: KSTREAM-AGGREGATE-0000000009 (stores: [user-share-store-min5])
      --> KTABLE-TOSTREAM-0000000013
      <-- KSTREAM-SOURCE-0000000012
// .toStream();
    Processor: KTABLE-TOSTREAM-0000000013 (stores: [])
      --> KSTREAM-FOREACH-0000000015, KSTREAM-SINK-0000000014
      <-- KSTREAM-AGGREGATE-0000000009
//postAggregate(...)
    Processor: KSTREAM-FOREACH-0000000015 (stores: [])
      --> none
      <-- KTABLE-TOSTREAM-0000000013
    Sink: KSTREAM-SINK-0000000014 (topic: btc-user-share-min5)
      <-- KTABLE-TOSTREAM-0000000013
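Sub-topology 2 can be simulated in memory to see what the aggregate, the store and toStream() do together. In the sketch below a LinkedHashMap keyed by "user@windowStart" stands in for the window store user-share-store-min5, the aggregator mirrors MiningData.add (summing shares and recording the window end as createTime), and every update is appended to a list that stands in for the foreach and sink downstream of toStream(). The class and field names are hypothetical, share counts are simplified to long, and real Kafka Streams behaviour such as record caching (which can coalesce consecutive updates) and window retention is ignored.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WindowedAggregateSketch {

    // Hypothetical aggregate for one (user, window), like MiningData after add()
    static class Agg {
        long share1Count;
        Long createTime; // window end, computed the same way as in add()
    }

    // In-memory stand-in for the window store: "user@windowStart" -> aggregate
    private final Map<String, Agg> store = new LinkedHashMap<>();
    private final long windowSize;
    // Stand-in for what toStream() forwards to the foreach and the sink
    final List<String> emitted = new ArrayList<>();

    WindowedAggregateSketch(long windowSize) {
        this.windowSize = windowSize;
    }

    void process(String user, long timestamp, long count) {
        long windowStart = timestamp / windowSize * windowSize;      // tumbling window
        String storeKey = user + "@" + windowStart;
        Agg agg = store.computeIfAbsent(storeKey, k -> new Agg());  // initializer
        agg.share1Count += count;                                    // aggregator
        if (agg.createTime == null) {
            agg.createTime = timestamp / windowSize * windowSize + windowSize;
        }
        // toStream(): forward the updated aggregate downstream
        emitted.add(storeKey + " -> " + agg.share1Count + " (end " + agg.createTime + ")");
    }

    public static void main(String[] args) {
        WindowedAggregateSketch s = new WindowedAggregateSketch(300_000L);
        s.process("u1", 1_000_000L, 2);
        s.process("u1", 1_100_000L, 3);
        s.process("u1", 1_250_000L, 1); // falls into the next 5-minute window
        s.emitted.forEach(System.out::println);
    }
}
```

Running main prints u1@900000 -> 2 (end 1200000), then u1@900000 -> 5 (end 1200000), then u1@1200000 -> 1 (end 1500000): each input record produces an updated result for its window rather than one final value.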

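Here the aggregate keeps its state in the local window store user-share-store-min5 (listed under stores: [...]), and toStream() forwards updates of the windowed aggregate both to the foreach logger added in postAggregate and to the sink topic btc-user-share-min5.
Sub-topology 3 and Sub-topology 4 are analogous to Sub-topology 2.
The three questions raised above have all been answered in the course of the analysis.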

Summary

Reference

http://kafka.apache.org/21/documentation/streams/core-concepts
http://kafka.apache.org/21/documentation/streams/architecture.html#streams_architecture_state
http://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#aggregating
http://kafka.apache.org/21/documentation/#streamsconfigs
http://kafka.apache.org/21/documentation/streams/developer-guide/running-app#state-restoration-during-workload-rebalance
