数据工程师的日常kafka Streamspark||flink||scala

Kafka Streams: KStream和KTable

2018-05-09  本文已影响322人  表现力

KStream和KTable是Kafka Streams里内建的两个最重要的抽象,分别对应数据流和数据库。Kafka Streams作为流处理技术的一大卖点,即是很好地将存储状态的表(table)和作为记录的流(stream)无缝地结合在了一起。

KStream

数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。
数据流中比较常记录的是事件(stream of events),这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。

KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。

KStream的构建方法:

builder.stream()

KTable

传统数据库,包含了各种存储了大量状态(state)的表格。

KTable负责抽象的,就是表状数据。每一次操作,都是更新插入(upsert)

KTable的构建方法:

builder.table()

KStream和KTable之间的关系

事务日志记录了所有对数据库的更改。数据库保存了日志中最新的记录。数据库就是日志子集的一个缓存,记录了最新数据的子集。

KStream可以看作是KTable的更新日志(changlog),数据流中的每一个记录对应数据库中的每一次更新。

KTable 则可以看作KStream在某一时间点,每一个key对应的最新value的快照(snapshot)。


KStream和KTable之间的相互转换

将KTable转换成KStream

toStream() 方法

KStream<byte[], String> stream = table.toStream();

将KStream转换成KTable

方法1: groupByKey() + aggregation操作

KTable<String, Long> table = stream.groupByKey()
                                   .count();

方法2: 将KStream写回Kafka,再按KTable的格式读出

stream.to("topic0");

KTable<String, String> table = builder.table("topic0");

KStream和KTable不同的使用场景

将topic中数据经过aggregation操作后 ,用KTable来存储结果。

KTable与日志压缩(log compaction)

日志压缩可以作为性能提升的一种方式。
删除旧的key value 因为不需要了,只保留每个key的最后一次更新。
带来的优势是:可以快速得到最终状态 而不是每次更新 --- 崩溃后也只需恢复少量数据。

只应对KTable使用,不该对KStream使用。KStream中的每条数据都包含了一部分信息,删除会将这部分信息丢失。

需要手动在创建时对某个topic开启日志压缩: --config cleanup.policy=compact

删除不是立刻进行的,需要等待一个delete.retention.ms周期(默认为24小时)。

是一个单独的后台压缩线程,需要一定的内存开销。

上一篇下一篇

猜你喜欢

热点阅读