程序员开源框架-SpringBoot系列程序员小天地

springboot整合kafka

2020-04-27  本文已影响0人  小草莓子桑

简单总结下在springboot整合kafka的过程

springboot整合kafka

首先pom引入spring对kafka的支持

<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.4.5.RELEASE</version>
        </dependency>
spring kafka

配置文件

配置文件的解释就直接写到代码注释里面了

spring:
  kafka:
    ##### 生产者配置
    producer:
      # 指定kafka地址,可以多个,用逗号隔开
      bootstrap-servers: ip1:port1,ip2:port2,ip3:port3
      #生产者等待服务器ack数量
      #acks:0 生产者发送消息不会等待服务器ack
      #acks:1 生产者发送消息后,会等待服务器ack,不用等待服务器所有副本,只要有一个副本ack即可
      #acks = all 生产者发送消息后,会等待服务器ack,需要等到所有副本ack
      #    #可以设置的值为:all, 0, 1
      acks: 1
      # 生产者批量处理的大小字节数
      batch-size: 15654
      #生产者缓存区的大小,默认值为33554432字节
      buffer-memory: 33554432
      #生产者的id,服务端会拿这个id
      client-id: 123
      #key的序列化类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      #value的序列化类
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      #失败重试的次数
      retries: 3
      #生产者生成的数据压缩格式 可以设置为 gzip,snappy等
      #uncompressed表示没有压缩
      #producer表示保留生产者原始设置 默认为producer
      compression-type: producer
    #####  消费者配置
    consumer:
      #指定kafka地址,可以多个,用逗号隔开
      bootstrap-servers: ip1:port1,ip2:port2,ip3:port3
      #消费者是否定期自动提交偏移量
      enable-auto-commit: true
      #消费者自动提交偏移量的频率 单位是毫秒
      auto-commit-interval: 3000
      #消费者的id,服务端会拿这个id
      client-id: 123
      #消费者拉取的最小数据量,单位字节
      fetch-min-size: 1
      #在满足fetch-min-size设置的值,阻塞的最长时间,单位是毫秒
      fetch-max-wait: 3000
      #服务器上不再存在当前偏移量,默认值为latest,表示自动将偏移重置为最新的偏移量
      auto-offset-reset: latest
      #用于标识消费者的唯一标识
      group-id: consumer1
      #心跳时间,单位为毫秒
      heartbeat-interval: 3000
      #消费者使用的key反序列化类
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #消费者使用的value反序列化类
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #poll操作时返回的最大记录数,默认值为500
      max-poll-records: 5000

生产者代码

主要使用的KafkaTemplate中的api,进行消息发送,这里KafkaTemplate里面的方法就不一一说了,有时间在统一说吧,就来一段简单的代码

/**
 * 生产者
 * @date 2020/4/27
 */
@Component
public class Product {

    //注入KafkaTemplate对象
    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void sendMsg(String topic,Object msg){
        //这里发送的消息可以直接使用Object
        //因为我们在配置文件中已经制定序列化的类
        kafkaTemplate.send(topic,msg);
    }
}

消费代码

主要使用的KafkaTemplate中的api,进行消息消费,这里KafkaTemplate里面的方法就不一一说了,有时间在统一说吧,就来一段简单的代码

/**
 * 消费者代码
 * @date 2020/4/27
 */
@Component
public class Consumer {

    @KafkaListener(topics = "生产者生产的topic")
    public void consumer(ConsumerRecord record){
        Optional<Object> optional = Optional.ofNullable(record.value());
        if(optional.isPresent()){
            //o对象就是我们接收到要消费的消息
            //o对象已经使用配置文件中配置反序列化类反序列化了
            Object o = optional.get();
        }
    }
}

springboot整合kafka的过程就为大家简单分享到这里,欢迎大家来交流,指出文中一些说错的地方,让我加深认识,愿大家没有bug,谢谢!

上一篇下一篇

猜你喜欢

热点阅读