十.SpringBoot集成Kafka
2019-05-14 本文已影响0人
__元昊__
首先看一下整体项目代码结构:
image.png首先看一下pom.xml的内容:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.travelsky</groupId>
<artifactId>swagger_mysql_redis_kafka_demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>swagger_mysql_redis_kafka_demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.46</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.mybatis.generator</groupId>
<artifactId>mybatis-generator-maven-plugin</artifactId>
<version>1.3.7</version>
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.21</version>
</dependency>
<dependency>
<groupId>org.mybatis.generator</groupId>
<artifactId>mybatis-generator-core</artifactId>
<version>1.3.7</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>Generate MyBatis Artifacts</id>
<phase>package</phase>
<goals>
<goal>generate</goal>
</goals>
</execution>
</executions>
<configuration>
<!--允许移动生成的文件 -->
<verbose>true</verbose>
<!-- 是否覆盖 -->
<overwrite>true</overwrite>
<!-- 自动生成的配置 -->
<configurationFile>src/main/resources/generatorConfig.xml</configurationFile>
</configuration>
</plugin>
</plugins>
</build>
</project>
我的springboot版本2.14,跟kafka整个jar包只有一个,我用低版本jar包都报错,这个没问题
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
首先定义一个bean用来发送消息的载体:
public class UserLog {
private String username;
private String userid;
private String state;
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getUserid() {
return userid;
}
public void setUserid(String userid) {
this.userid = userid;
}
public String getState() {
return state;
}
public void setState(String state) {
this.state = state;
}
@Override
public String toString() {
return "UserLog{" +
"username='" + username + '\'' +
", userid='" + userid + '\'' +
", state='" + state + '\'' +
'}';
}
}
定义消息的发送者:
import com.alibaba.fastjson.JSON;
import com.travelsky.swagger_mysql_redis_kafka_demo.domain.UserLog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class UserLogProducer {
@Autowired
KafkaTemplate kafkaTemplate;
public void sendLog(String userid){
UserLog userLog = new UserLog();
userLog.setUsername("jhp");
userLog.setUserid(userid);
userLog.setState("0");
System.err.println("发送用户日志数据:"+userLog);
kafkaTemplate.send("test_kafka", JSON.toJSONString(userLog));
}
}
消息的发送直接使用KafkaTemplate模板即可,都封装好了,直接使用
再看消息的消费者:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
@Component
public class UserLogConsumer {
@KafkaListener(topics = {"test_kafka"})
public void consumer(ConsumerRecord<?,?> consumerRecord){
//判断是否为null
Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if(kafkaMessage.isPresent()){
//得到Optional实例中的值
Object message = kafkaMessage.get();
System.err.println("消费消息:"+message);
}
}
}
消费机制是通过监听器实现的,直接使用这个@KafkaListener(topics = {"test_kafka"})
注解接口,它可以根据指定的条件进行消息的监听
写一个启动应用类:
@GetMapping("/kafka")
public void testKafka(){
for (int i = 0; i < 10; i++) {
userLogProducer.sendLog(String.valueOf(i));
}
}
再看一下对应的配置文件:
#============== kafka ===================
#172.24.112.13:9092,172.24.112.14:9092,172.24.112.15:9092
spring.kafka.bootstrap-servers=172.24.112.13:9092,172.24.112.14:9092,172.24.112.15:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer