Kafka入门与SpringBoot集成使用
kafka
内容大纲
1、简单介绍认识
2、安装与使用
3、与springboot集成
一、简单介绍认识
介绍:
1、Kafka 是一套流处理系统,可以让后端服务轻松的相互沟通,是微服务架构中常用的组件。
异步处理、服务解耦、流量控制
异步处理
随着业务的不断增加,通常会在原有的服务上添加上新服务,这样会出现请求链路越来越长,链路latency也就逐步增加。例如:最开始的电商项目,可能就是简简单单的扣库存、下单。慢慢地又加上了积分服务、短信服务等。链路增长不可避免的latency增加。
相对于扣库存、下单,积分和短信没必要恢复的那么及时。所以只需要在下单结束的时候结束那个流程,把消息传给消息队列中就可以直接返回响应了。而且短信服务和积分服务可以并行的消费这条消息。这样响应的速度更快,用户体验更好;服务异步执行,系统整体latency(相对使用同步机制而言)也下降了。
服务解耦
上面说的新加了短信服务和积分服务,现在又需要添加数据分析服务、以后可能又加一个策略服务等。可以发现订单的后续链路一直在增加,为了适配这些功能,就需要不断的修改订单服务,下游任何一个服务的接口改变都可能会影响到订单服务。
这个时候可以采用消息队列来解耦,订单服务只需要把消息塞到消息队列里面,下游服务谁要这个消息谁就订阅响应的topic。这样订单服务就不用被拿捏住了!!
流量治理
后端服务相对而言是比较脆弱的,因为业务较重,处理时间长。如果碰到高QPS情况,很容易顶不住。比如说题库数据写入到ES索引中,数据都是千万级别的。这个时候使用中间件来做一层缓冲,消息队列是个很不错的选择。
变更的消息先存放到消息队列中,后端服务尽自己最大的努力去消费队列中消费数据。
同时,对于一些不需要及时地响应处理,且业务处理逻辑复杂、流程长,那么数据放到消息队列中,消费者按照自己的消费节奏走,也是很不错的选择。
上述分别对应着 生产者生产过快 和 消费者消费过慢 两种情况,使用消息队列都能很好的起到缓冲作用。
-
Producer : 消息生产者,就是向 Kafka发送数据 ;
-
Consumer : 消息消费者,从 Kafka broker 取消息的客户端;
-
Consumer Group (CG): 消费者组,由多个 consumer 组成。 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。 所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
-
Broker :经纪人 一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic。
-
Topic : 话题,可以理解为一个队列, 生产者和消费者面向的都是一个 topic;
-
Partition: 为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列;如果一个topic中的partition有5个,那么topic的并发度为5.
-
Replica: 副本(Replication),为保证集群中的某个节点发生故障时, 该节点上的 partition 数据不丢失,且 Kafka仍然能够继续工作, Kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。
-
Leader: 每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
-
Follower: 每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。 leader 发生故障时,某个 Follower 会成为新的 leader。
-
Offset: 每个Consumer 消费的信息都会有自己的序号,我们称作当前队列的offset。即
消费点位标识消费到的位置
每个消费组都会维护订阅的Topic 下每个队列的offset
发布/订阅模式:
为了解决 一条消息能被多个消费者消费的问题 ,发布/订阅模式是个很不错的选择。生产者将消息塞到消息队列对应的topic中,所有订阅了这个topic的消费者都能消费这条消息。
kafka 和 RocketMQ使用的是发布订阅模式,而RabbitMQ使用的是队列模式。
kafka文件存储方式
由于生产者生产的消息会不断追加到 log 文件末尾, 为防止 log 文件过大导致数据定位效率低下, Kafka 采取了分片和索引机制,将每个 partition 分为多个 segment。
每个 segment对应两个文件——“.index”文件和“.log”文件。 这些文件位于一个文件夹下, 该文件夹的命名规则为: topic 名称+分区序号。例如, first 这个 topic 有三个分区,则其对应的文件夹为 first-0,first-1,first-2。
- 生产者消费者
生产者服务 Producer 向 Kafka 发送消息,消费者服务 Consumer 监听 Kafka 接收消息。
一个服务可以同时为生产者和消费者。
****3. Topics 主题****
Topic 是生产者发送消息的目标地址,是消费者的监听目标。
一个服务可以监听、发送多个 Topics。
Kafka 中有一个【consumer-group(消费者组)】的概念。这是一组服务,扮演一个消费者。
如果是消费者组接收消息,Kafka 会把一条消息路由到组中的某一个服务。这样有助于消息的负载均衡,也方便扩展消费者。
Topic 扮演一个消息的队列。
一条消息发送,这条消息被记录和存储在这个队列中,不允许被修改。
消息会被发送给此 Topic 的消费者,这条消息并不会被删除,会继续保留在队列中。
消息会发送给消费者、不允许被改动、一直呆在队列中。(消息在队列中能呆多久,可以修改 Kafka 的配置)
4. Partitions 分区
把 Topic 看做了一个队列,实际上,一个 Topic 是由多个队列组成的,被称为【Partition(分区)】。这样可以便于 Topic 的扩展。
生产者发送消息的时候,这条消息会被路由到此 Topic 中的某一个 Partition。
消费者监听的是所有分区。
生产者发送消息时,默认是面向 Topic 的,由 Topic 决定放在哪个 Partition,默认使用轮询策略。
也可以配置 Topic,让同类型的消息都在同一个 Partition。
例如,处理用户消息,可以让某一个用户所有消息都在一个 Partition。
例如,用户1发送了3条消息:A、B、C,默认情况下,这3条消息是在不同的 Partition 中(如 P1、P2、P3)。
在配置之后,可以确保用户1的所有消息都发到同一个分区中,为了提供消息的【有序性】。
消息在不同的 Partition 是不能保证有序的,只有一个 Partition 内的消息是有序的。
5. 架构
Kafka 是集群架构的,ZooKeeper是重要组件。
ZooKeeper 管理者所有的 Topic 和 Partition。
Topic 和 Partition 存储在 Node 物理节点中,ZooKeeper负责维护这些 Node。
Topic A 的 Partition #1 有3份,分布在各个 Node 上。
这样可以增加 Kafka 的可靠性和系统弹性。
3个 Partition #1 中,ZooKeeper 会指定一个 Leader,负责接收生产者发来的消息。
这样,每个 Partition 都含有了全量消息数据。
即使某个 Node 节点出现了故障,也不用担心消息的损坏。
Topic A 和 Topic B 的所有 Partition 分布可能就是这样的:
二、安装与使用
安装:
下载地址, 要下载带bin的, 代码被编译的
https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.8.0/apache-zookeeper-3.8.0-bin.tar.gz
https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.8.0/apache-zookeeper-3.8.0.tar.gz
解压之后
1、zooleeper文件夹里面新建data目录
2、修改conf 里面配置文件 dataDir 为data的路径
3、修改文件名zoo_sample.cfg为zoo.cfg
4、在zooleeper启动bin下面的zkServer.sh
启动报错与处理
sh ./bin/zkServer.sh start
apache-zookeeper-3.7.0/data/zookeeper_server.pid: No such file or directory FAILED TO WRITE PID****
更换为 ./bin/zkServer.sh start
使用./zkServer.sh start ../conf/zoo.cfg
而不是 sh zkServer.sh start ../conf/zoo.cfg
tar -zxvf /opt/apache-zookeeper-3.6.1-bin.tar.gz -C ./
mv apache-zookeeper-3.6.1-bin zookeeper-3.6.1
cd zookeeper-3.6.1
" 创建 dataDir, 在 zoo.cfg 中修改 dataDir"
mkdir data
cd zookeeper-3.6.1/conf
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
" 修改 dataDir 为上面创建的 data 目录"
Starting zookeeper ... FAILED TO START
查看日志:
错误: 找不到或无法加载主类 org.apache.zookeeper.server.quorum.QuorumPeerMain
解决方式 在zookeeper-server下编译项目 mvn package
还是不行, 发现自己下载的不是编译的文件
重新下载 apache-zookeeper-3.8.0-bin.tar.gz 解压后重新启动 便可以正常使用
(base) shenshuaihu@Elijah apache-zookeeper-3.8.0-bin % ./bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /Users/shenshuaihu/SoftWare/Develop/Apache/tomcat/apache-zookeeper-3.8.0-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
(base) shenshuaihu@Elijah apache-zookeeper-3.8.0-bin % bin/zkCli.sh
Connecting to localhost:2181
下载kafka
官网下载
https://www.apache.org/dyn/closer.cgi#backup
解压, 更新配置路径 server.properties log.dirs
启动
./bin/kafka-server-start.sh ./config/server.properties
进入kafka的bin目录
创建topic
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic testInfoTopic --partitions 2 --replication-factor 1
查看topic
./kafka-topics.sh --list --bootstrap-server localhost:9092
生产数据
./kafka-console-producer.sh --broker-list localhost:9092 --topic testInfoTopic
消费数据
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testInfoTopic
Untitled.png
关闭
关闭 [zookeeper](http://zookeeper-server-stop.sh/)
sh zkEnv.sh
三、 与springboot集成
pom.xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.53</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
配置yml
spring:
application:
name: kafka
kafka:
consumer:
bootstrap-servers:
- 127.0.0.1:9092
# 消费组
group-id: myGroup
# 消费者是否自动提交偏移量,默认为true
enable-auto-commit: false
# 消费者在读取一个没有偏移量或者偏移量无效的情况下,从起始位置读取partition的记录,默认是latest
auto-offset-reset: earliest
# 单次调用poll方法能够返回的消息数量
max-poll-records: 50
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
生产者使用
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@Slf4j
public class ProducerController {
@Autowired
private KafkaTemplate<String, String> kafka;
@RequestMapping("register")
public String register(User user) {
String message = JSON.toJSONString(user);
log.info("接收到用户信息:" + message);
kafka.send("testInfoTopic", message);
//kafka.send(String topic, @Nullable V data) {
return "OK";
}
}
消费者
import com.alibaba.fastjson.JSON;
import kafka.com.dto.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
@Configuration
@Slf4j
public class Consumer {
@KafkaListener(topics = "testInfoTopic" )
public void consume(String message) {
System.out.println("接收到消息:" + message);
log.info("正在为 " + message + " 办理注册业务...");
log.info("注册成功");
}
}
简单的kafka就是如此使用
实际场景应用
通过拦截器获取特殊场景(如删除)的方法, 获取方法的参数组装等业务日志信息,通过Kafka发送到日志系统.
注解标记
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 删除时发送日志
*/
@Target({ElementType.METHOD, ElementType.TYPE}) //注解的使用范围,方法注解
@Retention(RetentionPolicy.RUNTIME) //注解的生命周期,运行期
@Documented //在生成javac时显示该注解的信息
public @interface DeleteLog {
String actionName() default ("");
boolean enabled() default true;
String[] produces() default {};
}
对应方法添加注解方法
@DeleteLog
public void test(User user) {
// ...
}
@DeleteLog
public void delete(User user) {
log.info("delete {}", user.getName());
}
拦截器的使用
import kafka.com.annotation.DeleteLog;
import kafka.com.config.Producer;
import kafka.com.dto.User;
import lombok.extern.slf4j.Slf4j;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.lang.reflect.Method;
import java.util.Date;
import java.util.List;
@Aspect
@Order(-99)
@Component
@Slf4j
public class UserLogInterceptor implements MethodInterceptor{
@Autowired
private Producer producer;
@Before("@annotation(deleteLog)")
public void beforeTest(JoinPoint point, DeleteLog deleteLog) throws Throwable {
System.out.println("beforeTest:" + deleteLog.actionType());
String type = deleteLog.actionType();
StringBuilder message = new StringBuilder();
log.info("delete: {}", deleteLog);
message.append(type);
message.append(new Date());
Object[] args = point.getArgs();
for (Object obj : args) {
if (obj instanceof User) {
message.append(obj);
}
if (obj instanceof String) {
message.append(obj);
}
}
producer.producer("deleteTopic", message.toString());
}
@After("@annotation(deleteLog)")
public void afterTest(JoinPoint point, DeleteLog deleteLog) {
System.out.println("afterTest:" + deleteLog.actionType());
}
参考文章
1、 Kafka -- 从基础到高级 https://blog.csdn.net/eraining/article/details/115860664
2、 Kafka 顺序消费方案https://blog.csdn.net/qq_38245668/article/details/105900011
3、 图解 Kafka blog.csdn.net/duysh/article/details/116355977
2022/06/23 于成都