大数据 爬虫Python AI Sql大数据大数据,机器学习,人工智能

深入Kafka系列(一) Producer 开发

2019-01-14  本文已影响3人  北邮郭大宝

Kafka是目前业界最经典的消息引擎,之前在学习工作中只是对基本的原理和使用方法有一点了解。寒假稍微有一点空余时间,想对Kafka有一个更深入的了解。

本系列是对胡夕《Apache Kafka实战》一书的学习笔记,会跟随作者的安排,逐章对Kafka的设计细节做介绍。《Apache Kafka实战》这本书写的很棒,一看就是行家用心写出来的。如果想深入学习Kafka的同学可以考虑入手一本。

言归正传,基本的Kafka入门知识和概念这里就不赘述。欢迎参考我的同学沙利民关于Kafka的入门知识总结。《Kafka从入门到实践》

本系列直接从Producer开发开始。Kafka自0.9.0.0版本后,就启用了新设计的Java版的Producer取代旧的Scala版本的Producer,本文介绍的也是新版本。

1. Producer工作流程

Producer的工作流程可以简单概述为三步:

Producer需要完成的任务就是对消息进行分区,以及确定分区的leader。

2. 构造Producer

2.1 代码实例

image

基本的Producer构造代码如图所示,基本步骤有:

2.2 详细解释

构造Properties对象:使用Properties指定参数,有

还有其他的参数,自己看官网吧。

构造KafkaProducer对象:在Properties里完成参数设置后,就可以构造KafkaProducer对象了。

构造ProducerRecord对象:需要将topic和value信息包装在ProducerRecord对象中,key可选。

发送消息:Kafka发送消息有两种方式:

关闭producer:producer占用了大量系统资源,使用完后必须关闭。

3. 消息分区策略

3.1 默认的分区策略

当消息指定key时,使用murmur2算法计算哈希值,然后由哈希值对总分区数求模后找到目标分区号,此时完成分区操作,相同的key的所有消息分配到相同的分区。

当没有指定key是,partitioner根据轮询的方式确保所有分区均匀。

3.2 自定义分区实例

image

这个自定义分区的实例实现了,当key包含"audit"字符串时,该消息发送到最后一个分区,其他消息按照随机的策略发送到其他分区。

4. producer拦截器

4.1 拦截器

拦截器interceptor可以实现消息发送前、producer回调逻辑前对消息做一些定制化需求,比如修改消息等。interceptor通过接口ProducerInterceptor实现,主要有两个方法:

4.2 实例

4.1.2 onSend实例

image

上图是自定义了一个拦截器,在消息发送之前对消息做了修改,在value值添加了时间戳。然后需要在Properties参数配置中按下图的方式添加。

image

结果:

image

4.1.2 onAcknowledgement实例

image

该拦截器实现了消息发送后更新”发送成功消息数“和“发送失败消息数”。添加方式一样,此时可以构成双interceptor的拦截链。

image

结果:

image

在《Apache Kafka实战》中,还介绍了消息序列化、无消息丢失配置、消息压缩等关于Kafka Producer的介绍,需要的同学可以参阅。

上一篇 下一篇

猜你喜欢

热点阅读