Producing and Consuming Messages with Kafka

2020-11-15  五月笙

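Before demonstrating how to produce and consume messages, we need to create a topic to carry them. Kafka ships with many useful script tools in the $KAFKA_HOME/bin directory; the one that deals with topics is kafka-topics.sh.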

Creating a topic

[#] bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic topic-demo --replication-factor 1 --partitions 4
Created topic topic-demo.

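The command above creates, on a single-node setup, a topic named topic-demo with 4 partitions and a replication factor of 1. Key parameters:

--zookeeper: address of the ZooKeeper service that Kafka connects to (since Kafka 2.2 this tool also accepts --bootstrap-server, and Kafka 3.0 removed --zookeeper)
--topic: name of the topic to create
--replication-factor: replication factor
--partitions: number of partitions
--create: action option that creates a topic
--describe: action option that shows detailed information about a topic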

Describing the topic:

[#] bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic topic-demo
Topic: topic-demo   PartitionCount: 4   ReplicationFactor: 1    Configs:
    Topic: topic-demo   Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: topic-demo   Partition: 1    Leader: 0   Replicas: 0 Isr: 0
    Topic: topic-demo   Partition: 2    Leader: 0   Replicas: 0 Isr: 0
    Topic: topic-demo   Partition: 3    Leader: 0   Replicas: 0 Isr: 0
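
Each partition line in the output above is a sequence of "Key: value" pairs: Leader is the ID of the broker holding the leader replica, Replicas lists the brokers holding a replica, and Isr lists the replicas currently in sync. As a rough sketch only, the following Java class (DescribeLineParser is a hypothetical name, not part of Kafka) splits one such line into a map. It assumes the whitespace-separated layout shown above, which may differ between Kafka versions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DescribeLineParser {

    // Splits one line of `kafka-topics.sh --describe` output into key/value
    // pairs, for example "Partition" -> "0" and "Leader" -> "0".
    static Map<String, String> parse(String line) {
        Map<String, String> fields = new LinkedHashMap<>();
        // Tokens alternate between "Key:" and "value", separated by whitespace.
        String[] tokens = line.trim().split("\\s+");
        for (int i = 0; i + 1 < tokens.length; i += 2) {
            String key = tokens[i];
            if (key.endsWith(":")) {
                key = key.substring(0, key.length() - 1);
            }
            fields.put(key, tokens[i + 1]);
        }
        return fields;
    }

    public static void main(String[] args) {
        String line = "    Topic: topic-demo   Partition: 0    Leader: 0   Replicas: 0 Isr: 0";
        // Prints {Topic=topic-demo, Partition=0, Leader=0, Replicas=0, Isr=0}
        System.out.println(parse(line));
    }
}
```

With a replication factor above 1, Replicas and Isr would be comma-separated broker IDs such as 0,1,2; the sketch keeps them as a single string.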

Producing and consuming

With topic-demo created, the next step is to check that Kafka can send and consume messages. The $KAFKA_HOME/bin directory provides two scripts for this: kafka-console-producer.sh and kafka-console-consumer.sh.
Start a producer and send a message:

[#] bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-demo
>Hello,Kafka!
>

Start a consumer and subscribe to the topic:

[#] bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-demo

Hello,Kafka!

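The consumer's terminal shows the message "Hello,Kafka!" sent by the producer. By default the console consumer only prints messages that arrive after it starts, so either start it before sending or add --from-beginning to read earlier messages. Key parameters:

--broker-list: address of the Kafka broker(s) to connect to (since Kafka 2.5 the console producer also accepts --bootstrap-server)
--bootstrap-server: likewise, address of the Kafka broker(s) to connect to
--topic: topic to send to or consume from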

Code example

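Kafka's script tools are mostly used for testing. Producing and consuming messages as part of real, complex business logic has to be done in code.

Generate a Spring Boot project with the web and kafka dependencies using the Spring Boot CLI: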

spring init -b=2.2.7.RELEASE -j=1.8 -d=web,kafka --build=gradle my-kafka && cd my-kafka

Edit the project configuration:

vim src/main/resources/application.properties
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
server.port=8001 

The application entry point:

vim src/main/java/com/example/mykafka/DemoApplication.java
package com.example.mykafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

Add an HTTP endpoint to make sending messages easy:

vim src/main/java/com/example/mykafka/DemoController.java
package com.example.mykafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author by Remer
 * @date on 2020-11-17
 */
@RestController
public class DemoController {

    @Autowired
    private KafkaTemplate<Object, Object> template;

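    // GET /send/msg/{what} wraps the path variable in a Message and sends it to topic1.
    // The JsonSerializer set in application.properties writes the Message as JSON.
    @GetMapping(path = "/send/msg/{what}")
    public void sendMessage(@PathVariable String what) {
        this.template.send("topic1", new Message(what));
    }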
}

The message consumption logic:

vim src/main/java/com/example/mykafka/KafkaConsumer.java
package com.example.mykafka;

import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.stereotype.Component;
import org.springframework.util.backoff.FixedBackOff;

/**
 * @author by Remer
 * @date on 2020-11-17
 */
@Component
public class KafkaConsumer {

    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
    private final TaskExecutor exec = new SimpleAsyncTaskExecutor();

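    // Consumes topic1; the listener id is also used as the consumer group id.
    @KafkaListener(id = "messageGroup", topics = "topic1")
    public void listen(Message msg) {
        logger.info("Received: " + msg);
        // Simulate a processing failure for payloads that start with "fail".
        if (msg.getContent().startsWith("fail")) {
            throw new RuntimeException("failed");
        }
        this.exec.execute(() -> System.out.println("Hit Enter to terminate..."));
    }

    // On listener failure: retry twice at 1-second intervals (3 delivery attempts
    // in total), then publish the record to the dead-letter topic topic1.DLT.
    @Bean
    public SeekToCurrentErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
        return new SeekToCurrentErrorHandler(
            new DeadLetterPublishingRecoverer(template), new FixedBackOff(1000L, 2));
    }

    // Converts the JSON payload into the listener's parameter type (Message).
    @Bean
    public RecordMessageConverter converter() {
        return new StringJsonMessageConverter();
    }

    // NewTopic beans are created at startup if the topics do not exist yet:
    // 1 partition, replication factor 1.
    @Bean
    public NewTopic topic() {
        return new NewTopic("topic1", 1, (short) 1);
    }

    // Dead-letter topic for records that still fail after the retries.
    @Bean
    public NewTopic dlt() {
        return new NewTopic("topic1.DLT", 1, (short) 1);
    }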

    @Bean
    public ApplicationRunner runner() {
        return args -> {
            System.out.println("Hit Enter to terminate...");
            System.in.read();
        };
    }
}
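
The error handler above is configured with FixedBackOff(1000L, 2), which means two retries after the first failure, so three delivery attempts before the record goes to topic1.DLT. The plain-Java sketch below only simulates that counting logic so it can be run without a broker. RetryThenDeadLetterSketch, deliver, and the in-memory deadLetters list are hypothetical stand-ins, not spring-kafka APIs, and the 1-second wait is left out.

```java
import java.util.ArrayList;
import java.util.List;

class RetryThenDeadLetterSketch {

    // Stand-in for the listener: fails for payloads starting with "fail".
    static void listen(String content) {
        if (content.startsWith("fail")) {
            throw new RuntimeException("failed");
        }
    }

    // Delivers one record and returns the number of delivery attempts.
    // maxRetries mirrors the second argument of FixedBackOff(1000L, 2).
    static int deliver(String content, int maxRetries, List<String> deadLetters) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                listen(content);
                return attempts; // processed successfully
            } catch (RuntimeException e) {
                if (attempts > maxRetries) {
                    // Retries exhausted: hand the record to the dead-letter list.
                    deadLetters.add(content);
                    return attempts;
                }
                // The real handler would wait for the back-off interval here.
            }
        }
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        System.out.println(deliver("HelloWorld", 2, deadLetters)); // prints 1
        System.out.println(deliver("fail-now", 2, deadLetters));   // prints 3
        System.out.println(deadLetters);                           // prints [fail-now]
    }
}
```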

The message payload:

vim src/main/java/com/example/mykafka/Message.java
package com.example.mykafka;

/**
 * @author by Remer
 * @date on 2020-11-17
 */
public class Message {

    private String content;

    public Message() {

    }

    public Message(String content) {
        this.content = content;
    }

    public String getContent() {
        return this.content;
    }

    @Override public String toString() {
        return "Message{" +
            "content='" + content + '\'' +
            '}';
    }
}
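
Because the producer is configured with JsonSerializer, a Message is written to Kafka as a JSON object with a single content field. The sketch below builds an equivalent string by hand purely to show the payload shape; the real serializer relies on Jackson, and MessageJsonSketch with its toJson method is a hypothetical helper that only escapes quotes and backslashes.

```java
class MessageJsonSketch {

    // Builds the JSON text for a Message with the given content,
    // escaping double quotes and backslashes only.
    static String toJson(String content) {
        StringBuilder sb = new StringBuilder("{\"content\":\"");
        for (char c : content.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\');
            }
            sb.append(c);
        }
        return sb.append("\"}").toString();
    }

    public static void main(String[] args) {
        // Prints {"content":"HelloWorld"}
        System.out.println(toJson("HelloWorld"));
    }
}
```

On the consumer side, StringJsonMessageConverter turns this JSON back into a Message, which is why the class needs a no-argument constructor.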

Run:

./gradlew bootRun
curl localhost:8001/send/msg/HelloWorld
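
The message text travels as a path segment, so anything other than simple ASCII words should be percent-encoded before calling the endpoint. A minimal sketch of building the request URL in Java follows; SendUrlSketch and sendUrl are hypothetical names, and the host, port, and path are taken from the configuration and controller above.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class SendUrlSketch {

    // Builds the URL for GET /send/msg/{what} on localhost.
    static String sendUrl(int port, String what) {
        try {
            // URLEncoder targets form data and encodes spaces as "+";
            // a path segment needs "%20" instead.
            String segment = URLEncoder.encode(what, "UTF-8").replace("+", "%20");
            return "http://localhost:" + port + "/send/msg/" + segment;
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Prints http://localhost:8001/send/msg/HelloWorld
        System.out.println(sendUrl(8001, "HelloWorld"));
        // Prints http://localhost:8001/send/msg/fail%20now
        System.out.println(sendUrl(8001, "fail now"));
    }
}
```

Sending a message whose text starts with "fail" exercises the retry and dead-letter path in KafkaConsumer.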

Check that the message was received:

com.example.mykafka.KafkaConsumer        : Received: Message{content='HelloWorld'}
Hit Enter to terminate...

References

Spring Boot CLI
spring-kafka
