SpringBoot集成Kafka消息队列(JSON序列化和反序

2022-05-06  本文已影响0人  木木与呆呆

1.说明

上文SpringBoot集成Kafka消息队列介绍了
SpringBoot集成Kafka的方法,
其生产者和消费者的发送和接收的是字符串,
本文介绍使用JSON序列化和反序列化对象的方法,
即生产者发送一个对象的实例,
消费者能够接收到一个对象的实例。

2.引入依赖

在 pom.xml 中引入Spring Kafka版本,
完整pom如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.yuwen.spring</groupId>
    <artifactId>MessageQueue</artifactId>
    <version>0.0.1-SNAPSHOT</version>
  </parent>
  <artifactId>kafka-json</artifactId>
  <description>Spring Boot使用spring-kafka消息队列,使用JSON序列化和反序列化对象</description>
  
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>
</project>

具体的版本号建议通过spring-boot-dependencies管理:

<properties>
    <spring-boot.version>2.3.1.RELEASE</spring-boot.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring-boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

3.配置

新建applicaion.yml,
新增如下kafka的相关配置,
完整applicaion.yml配置:

server:
  port: 8028 

spring:
  kafka:
    # kafka连接接地址
    bootstrap-servers: localhost:9092
    # 生产者配置
    producer:
      # 序列化key的类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer 
      # 反序列化value的类
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer 
    # 消费者配置    
    consumer:
      # 反序列化key的类
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer 
      # 反序列化value的类  
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer   
      # 消费者所属消息组
      group-id: testGroup 
      # 从头开始消费,配合不同的group id
      auto-offset-reset: earliest
      # 表示接受反序列化任意的类,也可限定包路径
      properties:
        spring:
          json: 
            trusted:
              packages: '*'

注意配置spring.json.trusted.packages受信任的类所在的路径,
即需要发送和接收的对象类所在的包路径。

4.开发代码

新建KafkaMQApplication.java启动类,
注意要新增 @EnableKafka 注解:

package com.yuwen.spring.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

@SpringBootApplication
@EnableKafka
public class KafkaMQApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaMQApplication.class, args);
    }
}

用户信息类

新建UserInfo.java类,
作为在Kafka中传输的对象类:

package com.yuwen.spring.kafka.entity;

public class UserInfo {
    private String name;
    private int age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "UserInfo [name=" + name + ", age=" + age + "]";
    }
}

生产者发送消息

Spring Kafka 提供KafkaTemplate类发送消息,
在需要的地方注入即可,
新增ProviderService.java生产者服务类:

package com.yuwen.spring.kafka.provider;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import com.yuwen.spring.kafka.entity.UserInfo;

@Service
public class ProviderService {
    public static final String TOPIC = "userTopic";

    @Autowired
    private KafkaTemplate<?, UserInfo> kafkaTemplate;

    public void send(UserInfo user) {
        // 发送消息
        kafkaTemplate.send(TOPIC, user);
        System.out.println("Provider= " + user);
    }
}

注意指定 topic ,
以及发送的内容是UserInfo类。

消费者接收消息

新增ConsumerService.java类,
注意使用 @KafkaListener 注解,
接收方法的入参是UserInfo类:

package com.yuwen.spring.kafka.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import com.yuwen.spring.kafka.entity.UserInfo;
import com.yuwen.spring.kafka.provider.ProviderService;

@Service
public class ConsumerService {

    @KafkaListener(topics = ProviderService.TOPIC)
    public void receive(UserInfo user) {
        System.out.println("Consumer= " + user);
    }
}

5.自动产生消息

为了测试生产者产生消息,
编写AutoGenerate.java,
自动生成UserInfo类的实例,
作为生产者向kafka发送消息:

package com.yuwen.spring.kafka.provider;

import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.yuwen.spring.kafka.entity.UserInfo;

@Component
public class AutoGenerate implements InitializingBean {

    @Autowired
    private ProviderService providerService;

    @Override
    public void afterPropertiesSet() throws Exception {
        Thread t = new Thread(new Runnable() {

            @Override
            public void run() {
                int age = 0;
                while (true) {
                    UserInfo user = new UserInfo();
                    user.setName("zhangsan");
                    user.setAge(++age);

                    providerService.send(user);
                    try {
                        TimeUnit.SECONDS.sleep(5);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        t.start();
    }

}

6.运行服务

运行KafkaMQApplication.java启动类,
输出如下日志,
可以看到生产者产生的随机字符串,
能够被消费者正确获取到:


  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.3.1.RELEASE)

2022-05-06 09:58:35.090  INFO 11564 --- [           main] c.yuwen.spring.kafka.KafkaMQApplication  : Starting KafkaMQApplication on yuwen-asiainfo with PID 11564 (D:\Code\Learn\SpringBoot\spring-boot-demo\MessageQueue\kafka-json\target\classes started by yuwen in D:\Code\Learn\SpringBoot\spring-boot-demo\MessageQueue\kafka-json)
2022-05-06 09:58:35.092  INFO 11564 --- [           main] c.yuwen.spring.kafka.KafkaMQApplication  : No active profile set, falling back to default profiles: default
2022-05-06 09:58:36.585  INFO 11564 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8029 (http)
2022-05-06 09:58:36.593  INFO 11564 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2022-05-06 09:58:36.593  INFO 11564 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.36]
2022-05-06 09:58:36.676  INFO 11564 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2022-05-06 09:58:36.677  INFO 11564 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1550 ms
2022-05-06 09:58:36.805  INFO 11564 --- [     Thread-119] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [10.21.13.14:9092]
    buffer.memory = 33554432
    client.dns.lookup = default
    client.id = producer-1
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer

2022-05-06 09:58:36.877  INFO 11564 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2022-05-06 09:58:36.903  INFO 11564 --- [     Thread-119] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2022-05-06 09:58:36.904  INFO 11564 --- [     Thread-119] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2022-05-06 09:58:36.904  INFO 11564 --- [     Thread-119] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1651802316901
2022-05-06 09:58:37.138  INFO 11564 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [10.21.13.14:9092]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = testGroup
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer

2022-05-06 09:58:37.175  INFO 11564 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2022-05-06 09:58:37.175  INFO 11564 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2022-05-06 09:58:37.175  INFO 11564 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1651802317175
2022-05-06 09:58:37.177  INFO 11564 --- [           main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Subscribed to topic(s): userTopic
2022-05-06 09:58:37.179  INFO 11564 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService
2022-05-06 09:58:37.197  INFO 11564 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8029 (http) with context path ''
2022-05-06 09:58:37.207  INFO 11564 --- [           main] c.yuwen.spring.kafka.KafkaMQApplication  : Started KafkaMQApplication in 2.453 seconds (JVM running for 2.824)
2022-05-06 09:58:37.608  INFO 11564 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Cluster ID: zdSPCGGvT8qBnM4LSjz9Hw
2022-05-06 09:58:37.659  INFO 11564 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Discovered group coordinator 10.21.13.14:9092 (id: 2147483647 rack: null)
2022-05-06 09:58:37.660  INFO 11564 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] (Re-)joining group
2022-05-06 09:58:37.844  INFO 11564 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: zdSPCGGvT8qBnM4LSjz9Hw
Provider= UserInfo [name=zhangsan, age=1]
2022-05-06 09:58:38.141  INFO 11564 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Finished assignment for group at generation 15: {consumer-testGroup-1-511d5998-e82a-4b8c-a338-5bccf28c92a6=Assignment(partitions=[userTopic-0])}
2022-05-06 09:58:38.270  INFO 11564 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Successfully joined group with generation 15
2022-05-06 09:58:38.273  INFO 11564 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Adding newly assigned partitions: userTopic-0
2022-05-06 09:58:38.423  INFO 11564 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Setting offset for partition userTopic-0 to the committed offset FetchPosition{offset=20, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.21.13.14:9092 (id: 0 rack: null)], epoch=absent}}
2022-05-06 09:58:38.424  INFO 11564 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : testGroup: partitions assigned: [userTopic-0]
Consumer= UserInfo [name=zhangsan, age=1]
Provider= UserInfo [name=zhangsan, age=2]
Consumer= UserInfo [name=zhangsan, age=2]
Provider= UserInfo [name=zhangsan, age=3]
Consumer= UserInfo [name=zhangsan, age=3]

7.参考文章

Spring Boot Kafka - 序列化和反序列化JSON

上一篇 下一篇

猜你喜欢

热点阅读