Kafka入门与SpringBoot集成使用

2022-06-23  本文已影响0人  elijah777

kafka

内容大纲

1、简单介绍认识

2、安装与使用

3、与springboot集成

一、简单介绍认识

介绍:

1、Kafka 是一套流处理系统,可以让后端服务轻松的相互沟通,是微服务架构中常用的组件。

异步处理、服务解耦、流量控制

异步处理

随着业务的不断增加,通常会在原有的服务上添加上新服务,这样会出现请求链路越来越长,链路latency也就逐步增加。例如:最开始的电商项目,可能就是简简单单的扣库存、下单。慢慢地又加上了积分服务、短信服务等。链路增长不可避免的latency增加。

相对于扣库存、下单,积分和短信没必要恢复的那么及时。所以只需要在下单结束的时候结束那个流程,把消息传给消息队列中就可以直接返回响应了。而且短信服务和积分服务可以并行的消费这条消息。这样响应的速度更快,用户体验更好服务异步执行,系统整体latency(相对使用同步机制而言)也下降了

服务解耦

上面说的新加了短信服务和积分服务,现在又需要添加数据分析服务、以后可能又加一个策略服务等。可以发现订单的后续链路一直在增加,为了适配这些功能,就需要不断的修改订单服务,下游任何一个服务的接口改变都可能会影响到订单服务。

这个时候可以采用消息队列来解耦,订单服务只需要把消息塞到消息队列里面,下游服务谁要这个消息谁就订阅响应的topic。这样订单服务就不用被拿捏住了!!

流量治理

后端服务相对而言是比较脆弱的,因为业务较重,处理时间长。如果碰到高QPS情况,很容易顶不住。比如说题库数据写入到ES索引中,数据都是千万级别的。这个时候使用中间件来做一层缓冲,消息队列是个很不错的选择。

变更的消息先存放到消息队列中,后端服务尽自己最大的努力去消费队列中消费数据。

同时,对于一些不需要及时地响应处理,且业务处理逻辑复杂、流程长,那么数据放到消息队列中,消费者按照自己的消费节奏走,也是很不错的选择。

上述分别对应着 生产者生产过快消费者消费过慢 两种情况,使用消息队列都能很好的起到缓冲作用。

  1. Producer : 消息生产者,就是向 Kafka发送数据 ;

  2. Consumer : 消息消费者,从 Kafka broker 取消息的客户端;

  3. Consumer Group (CG): 消费者组,由多个 consumer 组成。 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。 所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

  4. Broker :经纪人 一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic。

  5. Topic : 话题,可以理解为一个队列, 生产者和消费者面向的都是一个 topic;

  6. Partition: 为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列;如果一个topic中的partition有5个,那么topic的并发度为5.

  7. Replica: 副本(Replication),为保证集群中的某个节点发生故障时, 该节点上的 partition 数据不丢失,且 Kafka仍然能够继续工作, Kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。

  8. Leader: 每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。

  9. Follower: 每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。 leader 发生故障时,某个 Follower 会成为新的 leader。

  10. Offset: 每个Consumer 消费的信息都会有自己的序号,我们称作当前队列的offset。即

    消费点位标识消费到的位置

    每个消费组都会维护订阅的Topic 下每个队列的offset

发布/订阅模式:

为了解决 一条消息能被多个消费者消费的问题 ,发布/订阅模式是个很不错的选择。生产者将消息塞到消息队列对应的topic中,所有订阅了这个topic的消费者都能消费这条消息。

kafkaRocketMQ使用的是发布订阅模式,而RabbitMQ使用的是队列模式。

kafka文件存储方式

由于生产者生产的消息会不断追加到 log 文件末尾, 为防止 log 文件过大导致数据定位效率低下, Kafka 采取了分片索引机制,将每个 partition 分为多个 segment。

每个 segment对应两个文件——“.index”文件和“.log”文件。 这些文件位于一个文件夹下, 该文件夹的命名规则为: topic 名称+分区序号。例如, first 这个 topic 有三个分区,则其对应的文件夹为 first-0,first-1,first-2。

  1. 生产者消费者

生产者服务 Producer 向 Kafka 发送消息,消费者服务 Consumer 监听 Kafka 接收消息。

一个服务可以同时为生产者和消费者。

****3. Topics 主题****

Topic 是生产者发送消息的目标地址,是消费者的监听目标。

一个服务可以监听、发送多个 Topics。

Kafka 中有一个【consumer-group(消费者组)】的概念。这是一组服务,扮演一个消费者。

如果是消费者组接收消息,Kafka 会把一条消息路由到组中的某一个服务。这样有助于消息的负载均衡,也方便扩展消费者。

Topic 扮演一个消息的队列。

一条消息发送,这条消息被记录和存储在这个队列中,不允许被修改。

消息会被发送给此 Topic 的消费者,这条消息并不会被删除,会继续保留在队列中。

消息会发送给消费者、不允许被改动、一直呆在队列中。(消息在队列中能呆多久,可以修改 Kafka 的配置)

4. Partitions 分区

把 Topic 看做了一个队列,实际上,一个 Topic 是由多个队列组成的,被称为【Partition(分区)】。这样可以便于 Topic 的扩展。

生产者发送消息的时候,这条消息会被路由到此 Topic 中的某一个 Partition。

消费者监听的是所有分区。

生产者发送消息时,默认是面向 Topic 的,由 Topic 决定放在哪个 Partition,默认使用轮询策略。

也可以配置 Topic,让同类型的消息都在同一个 Partition。

例如,处理用户消息,可以让某一个用户所有消息都在一个 Partition。

例如,用户1发送了3条消息:A、B、C,默认情况下,这3条消息是在不同的 Partition 中(如 P1、P2、P3)。

在配置之后,可以确保用户1的所有消息都发到同一个分区中,为了提供消息的【有序性】。

消息在不同的 Partition 是不能保证有序的,只有一个 Partition 内的消息是有序的。

5. 架构

Kafka 是集群架构的,ZooKeeper是重要组件。

ZooKeeper 管理者所有的 Topic 和 Partition。

Topic 和 Partition 存储在 Node 物理节点中,ZooKeeper负责维护这些 Node。

Topic A 的 Partition #1 有3份,分布在各个 Node 上。

这样可以增加 Kafka 的可靠性和系统弹性。

3个 Partition #1 中,ZooKeeper 会指定一个 Leader,负责接收生产者发来的消息。

这样,每个 Partition 都含有了全量消息数据。

即使某个 Node 节点出现了故障,也不用担心消息的损坏。

Topic A 和 Topic B 的所有 Partition 分布可能就是这样的:

二、安装与使用

安装:

下载地址, 要下载带bin的, 代码被编译的

https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.8.0/apache-zookeeper-3.8.0-bin.tar.gz

https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.8.0/apache-zookeeper-3.8.0.tar.gz

解压之后

1、zooleeper文件夹里面新建data目录

2、修改conf 里面配置文件 dataDir 为data的路径

3、修改文件名zoo_sample.cfg为zoo.cfg

4、在zooleeper启动bin下面的zkServer.sh

启动报错与处理

sh ./bin/zkServer.sh start

apache-zookeeper-3.7.0/data/zookeeper_server.pid: No such file or directory FAILED TO WRITE PID****

更换为 ./bin/zkServer.sh start

使用./zkServer.sh start ../conf/zoo.cfg 而不是 sh zkServer.sh start ../conf/zoo.cfg


tar -zxvf /opt/apache-zookeeper-3.6.1-bin.tar.gz -C ./
mv apache-zookeeper-3.6.1-bin zookeeper-3.6.1
cd zookeeper-3.6.1
" 创建 dataDir, 在 zoo.cfg 中修改 dataDir"
mkdir data
cd zookeeper-3.6.1/conf
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
" 修改 dataDir 为上面创建的 data 目录"

Starting zookeeper ... FAILED TO START

查看日志:

错误: 找不到或无法加载主类 org.apache.zookeeper.server.quorum.QuorumPeerMain

解决方式 在zookeeper-server下编译项目 mvn package

还是不行, 发现自己下载的不是编译的文件

重新下载 apache-zookeeper-3.8.0-bin.tar.gz 解压后重新启动 便可以正常使用

(base) shenshuaihu@Elijah apache-zookeeper-3.8.0-bin % ./bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /Users/shenshuaihu/SoftWare/Develop/Apache/tomcat/apache-zookeeper-3.8.0-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
(base) shenshuaihu@Elijah apache-zookeeper-3.8.0-bin % bin/zkCli.sh
Connecting to localhost:2181

下载kafka

官网下载

https://www.apache.org/dyn/closer.cgi#backup

解压, 更新配置路径 server.properties log.dirs

启动

./bin/kafka-server-start.sh ./config/server.properties

进入kafka的bin目录

创建topic
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic testInfoTopic --partitions 2 --replication-factor 1

查看topic

./kafka-topics.sh --list --bootstrap-server localhost:9092

生产数据

./kafka-console-producer.sh --broker-list localhost:9092 --topic testInfoTopic

消费数据

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testInfoTopic

Untitled.png

关闭

关闭  [zookeeper](http://zookeeper-server-stop.sh/)
sh zkEnv.sh

三、 与springboot集成

pom.xml

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.53</version>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>

配置yml

spring:
  application:
    name: kafka
  kafka:
    consumer:
      bootstrap-servers:
        - 127.0.0.1:9092
      # 消费组
      group-id: myGroup
      # 消费者是否自动提交偏移量,默认为true
      enable-auto-commit: false
      # 消费者在读取一个没有偏移量或者偏移量无效的情况下,从起始位置读取partition的记录,默认是latest
      auto-offset-reset: earliest
      # 单次调用poll方法能够返回的消息数量
      max-poll-records: 50
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

生产者使用


import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@Slf4j
public class ProducerController {

    @Autowired
    private KafkaTemplate<String, String> kafka;

    @RequestMapping("register")
    public String register(User user) {
        String message = JSON.toJSONString(user);
        log.info("接收到用户信息:" + message);
        kafka.send("testInfoTopic", message);
        //kafka.send(String topic, @Nullable V data) {
        return "OK";
    }
   
}

消费者

import com.alibaba.fastjson.JSON;
import kafka.com.dto.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;

@Configuration
@Slf4j
public class Consumer {

    @KafkaListener(topics = "testInfoTopic" )
    public void consume(String message) {
        System.out.println("接收到消息:" + message);
        log.info("正在为 " + message + " 办理注册业务...");
        log.info("注册成功");
    }

}

简单的kafka就是如此使用

实际场景应用

通过拦截器获取特殊场景(如删除)的方法, 获取方法的参数组装等业务日志信息,通过Kafka发送到日志系统.

注解标记


import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * 删除时发送日志
 */
@Target({ElementType.METHOD, ElementType.TYPE}) //注解的使用范围,方法注解
@Retention(RetentionPolicy.RUNTIME)  //注解的生命周期,运行期
@Documented //在生成javac时显示该注解的信息
public @interface DeleteLog {
    
    String actionName() default ("");

    boolean enabled() default true;

    String[] produces() default {};

}

对应方法添加注解方法

@DeleteLog
public void test(User user) {
// ...
}

@DeleteLog
public void delete(User user) {
    log.info("delete {}", user.getName());
}

拦截器的使用


import kafka.com.annotation.DeleteLog;
import kafka.com.config.Producer;
import kafka.com.dto.User;
import lombok.extern.slf4j.Slf4j;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.lang.reflect.Method;
import java.util.Date;
import java.util.List;

@Aspect
@Order(-99)
@Component
@Slf4j
public class UserLogInterceptor implements MethodInterceptor{

@Autowired
    private Producer producer;

@Before("@annotation(deleteLog)")
    public void beforeTest(JoinPoint point, DeleteLog deleteLog) throws Throwable {
        System.out.println("beforeTest:" + deleteLog.actionType());
        String type = deleteLog.actionType();
        StringBuilder message = new StringBuilder();
        log.info("delete: {}", deleteLog);
        message.append(type);
        message.append(new Date());
        Object[] args = point.getArgs();
        for (Object obj : args) {
            if (obj instanceof User) {
                message.append(obj);
             }
            if (obj instanceof String) {
                message.append(obj);
            }
        }
        producer.producer("deleteTopic", message.toString());
    }

@After("@annotation(deleteLog)")
    public void afterTest(JoinPoint point, DeleteLog deleteLog) {
        System.out.println("afterTest:" + deleteLog.actionType());
    }

参考文章

1、 Kafka -- 从基础到高级 https://blog.csdn.net/eraining/article/details/115860664

2、 Kafka 顺序消费方案https://blog.csdn.net/qq_38245668/article/details/105900011

3、 图解 Kafka blog.csdn.net/duysh/article/details/116355977

2022/06/23 于成都

上一篇下一篇

猜你喜欢

热点阅读