kafka——Stream API
一、Kafka 核心 API
下图是官方文档中的一个图,形象的描述了能与 Kafka集成的客户端类型
Kafka的五类客户端API类型如下:
- AdminClient API:允许管理和检测Topic、broker以及其他Kafka实例,与Kafka自带的脚本命令作用类似。
- Producer API:发布消息到1个或多个Topic,也就是生产者或者说发布方需要用到的API。
- Consumer API:订阅1个或多个Topic,并处理产生的消息,也就是消费者或者说订阅方需要用到的API。
- Stream API:高效地将输入流转换到输出流,通常应用在一些流处理场景。
- Connector API:从一些源系统或应用程序拉取数据到Kafka,如上图中的DB。
Kafka在0.10版本推出了Stream API,提供了对存储在Kafka内的数据进行流式处理和分析的能力。
本文将从流式计算出发,之后介绍Kafka Streams的特点,最后探究Kafka Streams的架构。
二、Kafka Stream是什么
Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。
Kafka Stream的特点如下:
- Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署。
- 除了Kafka外,无任何外部依赖。
- 充分利用Kafka分区机制实现水平扩展和顺序性保证。
- 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
- 支持正好一次处理语义。
- 提供记录级的处理能力,从而实现毫秒级的低延迟。
- 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
- 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)
二、什么是流式计算
流式计算一般被用来和批量计算做比较。批量计算往往有一个固定的数据集作为输入并计算结果。而流式计算的输入往往是“无界”的(Unbounded Data),持续输入的,即永远拿不到全量数据去做计算;同时,计算结果也是持续输出的,只能拿到某一个时刻的结果,而不是最终的结果,也即计算结果在时间上也是无界的。(批量计算是全量的:拿到一批数据,计算一个结果;流式计算是增量的:数据持续输入,持续计算最新的结果)。
流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。
批量处理模型中,一般先有全量数据集,然后定义计算逻辑,并将计算应用于全量数据。特点是全量计算,并且计算结果一次性全量输出。
举个例子,统计电商网站一天中不同地区的订单量:
- 批量计算的方式:在一天过去之后(产生了固定的输入),扫描所有的订单,按照地区group并计数。
- 流式计算的方式:每产生一个订单,根据订单的地区进行计数。
流式计算相对于批量计算会有更好的实时性,倾向于先确定计算目标,在数据到来之后将计算逻辑应用到数据上。
流式计算和实时计算
流式计算的实时性较高,有时候容易和实时计算混淆。
流式计算对比的对象应该是批量计算,而实时计算对应离线计算。
流式计算强调的是计算的方式,而事实计算则强调计算结果的响应时间。
比如统计订单量,流式计算的方式是有一个计数,每来一笔订单就对这个计数加1。实时计算则是在在某个时刻计算一次当前时刻之前已经产生的所有订单量,比如在MySQL中执行一次Count操作。
三、Kafka Streams是什么
Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka’s server-side cluster technology.
Kafka Streams是一个客户端类库,用于处理和分析存储在Kafka中的数据。它建立在流式处理的一些重要的概念之上:如何区分事件时间和处理时间、Windowing的支持、简单高效的管理和实时查询应用程序状态。
Kafka Streams的门槛非常低:和编写一个普通的Kafka消息处理程序没有太大的差异(得益于Kafka Streams是一个客户端类库且运行只依赖与Kafka环境),可以通过多进程部署来完成扩容、负载均衡、高可用(Kafka Consumer的并行模型)。
Kafka Streams的一些特点:
- 被设计成一个简单的、轻量级的客户端类库,能够被集成到任何Java应用中。
- 除了Kafka之外没有任何额外的依赖,利用Kafka的分区模型支持水平扩容和保证顺序性。
- 通过可容错的状态存储实现高效的状态操作(windowed joins and aggregations)。
- 支持exactly-once语义。
- 支持纪录级的处理,实现毫秒级的延迟。
- 提供High-Level的Stream DSL和Low-Level的Processor API。
四、为什么要有Kafka Stream
当前已经有非常多的流式处理系统,最知名且应用最多的开源流式处理系统有Spark Streaming和Apache Storm。Apache Storm发展多年,应用广泛,提供记录级别的处理能力,当前也支持SQL on Stream。而Spark Streaming基于Apache Spark,可以非常方便与图计算,SQL处理等集成,功能强大,对于熟悉其它Spark应用开发的用户而言使用门槛低。另外,目前主流的Hadoop发行版,如MapR,Cloudera和Hortonworks,都集成了Apache Storm和Apache Spark,使得部署更容易。
既然Apache Spark与Apache Storm拥用如此多的优势,那为何还需要Kafka Stream呢?笔者认为主要有如下原因。
第一,Spark和Storm都是流式处理框架,而Kafka Stream提供的是一个基于Kafka的流式处理类库。框架要求开发者按照特定的方式去开发逻辑部分,供框架调用。开发者很难了解框架的具体运行方式,从而使得调试成本高,并且使用受限。而Kafka Stream作为流式处理类库,直接提供具体的类给开发者调用,整个应用的运行方式主要由开发者控制,方便使用和调试。
第二,虽然Cloudera与Hortonworks方便了Storm和Spark的部署,但是这些框架的部署仍然相对复杂。而Kafka Stream作为类库,可以非常方便的嵌入应用程序中,它对应用的打包和部署基本没有任何要求。更为重要的是,Kafka Stream充分利用了Kafka的分区机制和Consumer的Rebalance机制,使得Kafka Stream可以非常方便的水平扩展,并且各个实例可以使用不同的部署方式。具体来说,每个运行Kafka Stream的应用程序实例都包含了Kafka Consumer实例,多个同一应用的实例之间并行处理数据集。而不同实例之间的部署方式并不要求一致,比如部分实例可以运行在Web容器中,部分实例可运行在Docker或Kubernetes中。
第三,就流式处理系统而言,基本都支持Kafka作为数据源。例如Storm具有专门的kafka-spout,而Spark也提供专门的spark-streaming-kafka模块。事实上,Kafka基本上是主流的流式处理系统的标准数据源。换言之,大部分流式系统中都已部署了Kafka,此时使用Kafka Stream的成本非常低。
第四,使用Storm或Spark Streaming时,需要为框架本身的进程预留资源,如Storm的supervisor和Spark on YARN的node manager。即使对于应用实例而言,框架本身也会占用部分资源,如Spark Streaming需要为shuffle和storage预留内存。
第五,由于Kafka本身提供数据持久化,因此Kafka Stream提供滚动部署和滚动升级以及重新计算的能力。
第六,由于Kafka Consumer Rebalance机制,Kafka Stream可以在线动态调整并行度。
五、Kafka Stream架构
Kafka Stream整体架构
Kafka Stream的整体架构图如下所示。
目前(Kafka 0.11.0.0)Kafka Stream的数据源只能如上图所示是Kafka。但是处理结果并不一定要如上图所示输出到Kafka。实际上KStream和Ktable的实例化都需要指定Topic。
KStream<String, String> stream = builder.stream("words-stream");
KTable<String, String> table = builder.table("words-table", "words-store");
另外,上图中的Consumer和Producer并不需要开发者在应用中显示实例化,而是由Kafka Stream根据参数隐式实例化和管理,从而降低了使用门槛。开发者只需要专注于开发核心业务逻辑,也即上图中Task内的部分。
六、KStream和Ktable演示
6.1、导入相关依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
public class StreamSample {
private static final String INPUT_TOPIC = "yibo_stream_in";
private static final String OUTPUT_TOPIC = "yibo_stream_out";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
properties.put(StreamsConfig.APPLICATION_ID_CONFIG,"wordcount-app");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
//如何构建流结构拓扑
StreamsBuilder builder = new StreamsBuilder();
//构建wordcount processor
wordcountStream(builder);
KafkaStreams streams = new KafkaStreams(builder.build(),properties);
streams.start();
}
//如何定义流计算过程
public static void wordcountStream(StreamsBuilder builder){
//KStream 不断从INPUT_TOPIC上获取新数据,并且追加到流上的一个抽象对象
KStream<String,String> source = builder.stream(INPUT_TOPIC);
//KTable 是数据集合的抽象对象,
KTable<String, Long> count = source
//flatMapValues 数据拆分,将一行数据拆分为多行数据
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
//合并 按value值进行合并
.groupBy((key, value) -> value)
//统计出现的总数
.count();
//将结果输入到OUTPUT_TOPIC中
count.toStream().to(OUTPUT_TOPIC, Produced.with(Serdes.String(),Serdes.Long()));
}
//如何定义流计算过程
public static void wordcountStream2(StreamsBuilder builder){
//KStream 不断从INPUT_TOPIC上获取新数据,并且追加到流上的一个抽象对象
KStream<String,String> source = builder.stream(INPUT_TOPIC);
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
.foreach((key,value) -> System.out.println(key + " : " + value));
}
}