springboot整合kafka
2020-04-27 本文已影响0人
小草莓子桑
简单总结下在springboot整合kafka的过程
springboot整合kafka
首先pom引入spring对kafka的支持
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.4.5.RELEASE</version>
</dependency>
spring kafka
配置文件
配置文件的解释就直接写到代码注释里面了
spring:
kafka:
##### 生产者配置
producer:
# 指定kafka地址,可以多个,用逗号隔开
bootstrap-servers: ip1:port1,ip2:port2,ip3:port3
#生产者等待服务器ack数量
#acks:0 生产者发送消息不会等待服务器ack
#acks:1 生产者发送消息后,会等待服务器ack,不用等待服务器所有副本,只要有一个副本ack即可
#acks = all 生产者发送消息后,会等待服务器ack,需要等到所有副本ack
# #可以设置的值为:all, 0, 1
acks: 1
# 生产者批量处理的大小字节数
batch-size: 15654
#生产者缓存区的大小,默认值为33554432字节
buffer-memory: 33554432
#生产者的id,服务端会拿这个id
client-id: 123
#key的序列化类
key-serializer: org.apache.kafka.common.serialization.StringSerializer
#value的序列化类
value-serializer: org.apache.kafka.common.serialization.StringSerializer
#失败重试的次数
retries: 3
#生产者生成的数据压缩格式 可以设置为 gzip,snappy等
#uncompressed表示没有压缩
#producer表示保留生产者原始设置 默认为producer
compression-type: producer
##### 消费者配置
consumer:
#指定kafka地址,可以多个,用逗号隔开
bootstrap-servers: ip1:port1,ip2:port2,ip3:port3
#消费者是否定期自动提交偏移量
enable-auto-commit: true
#消费者自动提交偏移量的频率 单位是毫秒
auto-commit-interval: 3000
#消费者的id,服务端会拿这个id
client-id: 123
#消费者拉取的最小数据量,单位字节
fetch-min-size: 1
#在满足fetch-min-size设置的值,阻塞的最长时间,单位是毫秒
fetch-max-wait: 3000
#服务器上不再存在当前偏移量,默认值为latest,表示自动将偏移重置为最新的偏移量
auto-offset-reset: latest
#用于标识消费者的唯一标识
group-id: consumer1
#心跳时间,单位为毫秒
heartbeat-interval: 3000
#消费者使用的key反序列化类
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#消费者使用的value反序列化类
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#poll操作时返回的最大记录数,默认值为500
max-poll-records: 5000
生产者代码
主要使用的KafkaTemplate中的api,进行消息发送,这里KafkaTemplate里面的方法就不一一说了,有时间在统一说吧,就来一段简单的代码
/**
* 生产者
* @date 2020/4/27
*/
@Component
public class Product {
//注入KafkaTemplate对象
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMsg(String topic,Object msg){
//这里发送的消息可以直接使用Object
//因为我们在配置文件中已经制定序列化的类
kafkaTemplate.send(topic,msg);
}
}
消费代码
主要使用的KafkaTemplate中的api,进行消息消费,这里KafkaTemplate里面的方法就不一一说了,有时间在统一说吧,就来一段简单的代码
/**
* 消费者代码
* @date 2020/4/27
*/
@Component
public class Consumer {
@KafkaListener(topics = "生产者生产的topic")
public void consumer(ConsumerRecord record){
Optional<Object> optional = Optional.ofNullable(record.value());
if(optional.isPresent()){
//o对象就是我们接收到要消费的消息
//o对象已经使用配置文件中配置反序列化类反序列化了
Object o = optional.get();
}
}
}
- 这里说一下@KafkaListener(topics = "")这个注解,topics属性就是设置要消费的topic
- 再说一下Optional对象的isPresent()方法与get()方法,当Optional中value存在时,isPresent()返回true,get()可以得到具体的值
springboot整合kafka的过程就为大家简单分享到这里,欢迎大家来交流,指出文中一些说错的地方,让我加深认识,愿大家没有bug,谢谢!