Kafka生产和消费
2020-11-15 本文已影响0人
五月笙
在演示生成与消费消息之前,需要创建一个主题作为消息载体。Kafka提供了需要实用的脚本工具,存放在$KAFKA_HOME/bin目录下,其中与主题有关的就是kafka-topics.sh。
创建主题
[#] bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic topic-demo --replication-factor 1 --partitions 4
Created topic topic-demo.
上面为单机模式下创建了一个分区数为4、副本因子为1的主题topic-demo,关键参数说明:
--zookeeper:Kafka所连接的ZooKeeper服务地址
--topic:所要创建主题的名称
--replication-factor:指定副本因子
--partitions:指定分区个数
--create:创建主题的动作指令
--describe:展示主题更多信息的动作指令
展示主题信息指令:
[#] bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic topic-demo
Topic: topic-demo PartitionCount: 4 ReplicationFactor: 1 Configs:
Topic: topic-demo Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: topic-demo Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: topic-demo Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: topic-demo Partition: 3 Leader: 0 Replicas: 0 Isr: 0
生产和消费
创建主题topic-demo后,需要检验Kafka是否可以正常的发送和消费消息。相应的,在$KAFKA_HOME/bin目录下提供了收发消息的脚本工具:kafka-console-consumer.sh和kafka-console-producer.sh。
启动生产者(consumer)生产消息:
[#] bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-demo
>Hello,Kafka!
>
启动消费者(producer)订阅主题:
[#] bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-demo
Hello,Kafka!
发现消费者shell终端出现了刚刚生产者生产的消息 “Hello,Kafka!”,其中关键参数:
--broker-list:指定连接的Kafka地址
--bootstrap-server:同样为指定连接的Kafka地址
--topic:发送或者消费的主题
代码示例
Kafka的脚本工具一般用来做一些测试类的工作,在实际做复杂的的与业务逻辑相关的消息生产与消费的工作,那就需要通过编程手段来实施。
spring init -b=2.2.7.RELEASE -j=1.8 -d=web,kafka --build=gradle my-kafka && cd my-kafka
项目配置修改:
vim src/main/resources/application.properties
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
server.port=8001
代码如下:
vim src/main/java/com/example/mykafka/DemoApplication.java
package com.example.mykafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
增加一个接口,方便消息发送:
vim src/main/java/com/example/mykafka/DemoController.java
package com.example.mykafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
/**
* @author by Remer
* @date on 2020-11-17
*/
@RestController
public class DemoController {
@Autowired
private KafkaTemplate<Object, Object> template;
@GetMapping(path = "/send/msg/{what}")
public void sendMessage(@PathVariable String what) {
this.template.send("topic1", new Message(what));
}
}
消息消费相关逻辑:
vim src/main/java/com/example/mykafka/KafkaConsumer.java
package com.example.mykafka;
import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.stereotype.Component;
import org.springframework.util.backoff.FixedBackOff;
/**
* @author by Remer
* @date on 2020-11-17
*/
@Component
public class KafkaConsumer {
private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
private final TaskExecutor exec = new SimpleAsyncTaskExecutor();
@KafkaListener(id = "messageGroup", topics = "topic1")
public void listen(Message msg) {
logger.info("Received: " + msg);
if (msg.getContent().startsWith("fail")) {
throw new RuntimeException("failed");
}
this.exec.execute(() -> System.out.println("Hit Enter to terminate..."));
}
@Bean
public SeekToCurrentErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
return new SeekToCurrentErrorHandler(
new DeadLetterPublishingRecoverer(template), new FixedBackOff(1000L, 2));
}
@Bean
public RecordMessageConverter converter() {
return new StringJsonMessageConverter();
}
@Bean
public NewTopic topic() {
return new NewTopic("topic1", 1, (short) 1);
}
@Bean
public NewTopic dlt() {
return new NewTopic("topic1.DLT", 1, (short) 1);
}
@Bean
public ApplicationRunner runner() {
return args -> {
System.out.println("Hit Enter to terminate...");
System.in.read();
};
}
}
消息主体:
vim src/main/java/com/example/mykafka/Message.java
package com.example.mykafka;
/**
* @author by Remer
* @date on 2020-11-17
*/
public class Message {
private String content;
public Message() {
}
public Message(String content) {
this.content = content;
}
public String getContent() {
return this.content;
}
@Override public String toString() {
return "Message{" +
"content='" + content + '\'' +
'}';
}
}
运行:
./gradlew bootrun
curl localhost:8001//send/msg/HelloWorld
查看消息接收:
com.example.mykafka.KafkaConsumer : Received: Message{content='HelloWorld'}
Hit Enter to terminate...