软件篇-kafka(over)-研究-深入研究生产消费模型

2021-05-13  本文已影响0人  秃头猿猿

1.生产者

1.1 架构

当需要往broker发送消息时,则需要创建一个或者多个生产者往broker发布消息,虽然借助SpringBootbroker里面发送消息的API比较简单

如果借助SprinBoot发送消息,后面章节会阐述到

但是很多时候不同的业务场景会出现不同的问题,例如:

不同的场景下,光是知道API的使用肯定是满足不了的,因此在使用API之前还需了解发送消息的原理。

发送消息原理图如下:

image-20210513100249431

关于上述原理图解释如下:

1.2 topic

当消息发送到topic时,其实消息是发送到topicpartition上,而在物理上一个partition就是对应的就是一个目录

例如:在kafka-eagle上创建wangzhtopic,且分区数为3,副本数为 1,如下:

image-20210513102003513

查看该topic详情可知,三个分区其中131上的partition-0leader,其他的如下:

image-20210513102046056

同时取查看131机器上的数据/var/data/kafka(这个目录是当时安装时指定的数据存储目录)

image-20210513102509413

所以一个分区在物理上对应的就是一个目录

1.3 存储

当发送消息时到topicpartitions上,分区会消息写入segment文件上,一个partitions由多个segment文件组成,如下:

image-20210513110017402

每个segment文件默认存储数据大小为1G,当然也可以通过修改kafka参数调整

# 单个segment存储数据大小
log.segment.bytes=具体内容

# 当超过一定的时间(默认七天),写入segment文件的数据还没有达到1G(默认大小)
# 也会重新创建新的segment文件
log.segment.ms=时间

从上图中看出,第一个segment文件的偏移量一定是从0开始的,而下一个segment文件则是从上个segment文件偏移量开始的


同时segment文件分为.index.log文件,如下:

image-20210513110632406

其中.log用来存储真正的数据,.index是索引文件

假如如果想要消费偏移量为197的文件,如果没有索引则需要从头到位去寻找,而有了索引文件就完全可以提高查询速度

其中前面一大串代表文件名,第一个segment文件肯定是从0开始,第二个segment文件命名则是以上个文件偏移量+1命名,如下:

第一个segment文件命名

0000000000000000.index

0000000000000000.log

当上一个文件偏移量为1679898是,那么下个segment文件命名为

00000000001679899.index

00000000001679899.log

以此类推

1.4 发送

经过上面的消息,已经知道生产者发送原理,接下来就借助SpringBootbroker发送消息。如下:

1.4.1 创建

先创建springboot项目kafka-springboot-test,并且导入kafka依赖,其pom.xml内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

application.yml增加以下配置

spring:
  kafka:
   # kafka集群地址
    bootstrap-servers: 192.168.80.130:9092,192.168.80.131:9092,192.168.80.132:9092
    listener:
    # 如果没有至少一个配置的主题,则容器是否应无法启动
    # false 代表关闭此功能
      missing-topics-fatal: false
    producer:
    # 发布消息时,key的序列化器,这里是kafka提供的序列化器
    # 当发送消息的key值不是字符串时,需要自己写自定义序列化器
    # 生产者通过该序列化器将消息的key值序列化为字节数组
    # 后面会讲述如何自定义序列化器
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
    
    # 发布消息时,value的序列化器,这里是kafka提供的序列化器
    # 当发送消息的key不是字符串时,需要自己写自定义序列化器
    # 一般来说发布消息大多数都不是字符串,因此还是需要发送消息
    # 生产者通过该序列化器将消息的key值序列化为字节数组
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

1.4.2 发布

当配置完成即可发布消息,发消息先创建topic,上文中已经创建了test_topic这里就不再创建了

发布消息则是借助org.springframework.kafka.core.KafkaTemplate发布消息,直接注入即可,代码如下:

package com.example.demo;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootTest
@Slf4j
class DemoApplicationTests {

    /**
     * 第一个泛型为 key值的数据类型
     * 第二个泛型为 value值的数据类型
     */
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

       @Test
    void contextLoads() throws Exception {
        ListenableFuture<SendResult<String, String>> resultListenableFuture =
                kafkaTemplate.send("wangzh", "test-key", "test-topic");
        
        log.info("元数据信息:" + resultListenableFuture.get());

        log.info("发送消息完毕");

       // 关闭连接
       kafkaTemplate.destroy();
    }

}

执行代码成功后,就在kafka-eagle看到消息发送结果,如下

image-20210513113951216

当然也可以指定partition发送消息

1.4.3 acks

acks 参数 规则定了必须须要有多少分区副本收到消息,生产者才会认为消息写入是成功的 这个参数对消息丢失 可能性有重要影响,目前该参数配置如下:

acks = 0

生产者在成功写入消息之前不会等待任何来自服务器的响应

意味着生产者不知道消息有没有把消息发送到broker,只要生产者将消息添加到Socket缓冲区就认为消息发送成功,不需要等待服务器 的响应。因此这种方式也可以支持很高的吞吐量

acks=1

只要集群的leader节点收到消息并写入到segment文件,生产者就会收到来自服务器的成功响应,视为发送成功

假如leader数据写入成功,然后宕机,此时所有的副本还没来的及同步数据,那么

刚写入的数据就会丢失

acks=all

集群的leader收到消息并写入到segment中,同时等待所有的副本同步消息成功后才认为消息发送成功

这种模式是最安全的,及时有的leader发生奔溃,那还是可以重新选举leader进行通信

在配置文件的producer里面设置acks即可

image-20210513133303905

2.消费者

2.1 架构

消费者如果订阅了某个主题消息,那么就可以去进行消费,同时一个消费者属于一个消费组,一个消费组里面所有的消费者都订阅同一个主题.如下:

image-20210513141948856

当消费组里面只有一个消费者时,那么这个消费就回去消费所有分区的消息,当然一般开发也就足够了。

但有时候生产者生产消息过快,而消费者消费消息过慢,就会很容易导致消息堆积,从而阻塞,那么就可以在消费者组里面多增加几个消费者,如下:

image-20210513143001268

注意:同一组的消费者是不会消费同一主题的同一分区消息

当然如果消费者的数量超过了分区数,那么超过的消费者就会处于空闲状态

image-20210513143347884

因此不要让消费者的数量超过分区数

一个消息只能被一个组消费一次,例如上图中consumer-1消费了消息A,那么其他的消费者就不能够再次消费A

如果在消费时,手动指定了偏移量,那么就会重复消费消息,这种情况特殊

当然同一个消息可以被多个消费组进行消费,如下图所示:

image-20210513145040997

2.2 分配

如下图,消费组中可以增加,当增加一个消费者,就会分摊之前消费者的消费压力,那么当新增一个消费者是如何将分区分配给消费者的呢

image-20210513143001268

当消费者新增一个消费者时,会提高消费者的高可用和伸缩性,且当加入到消费组之后就会

给新增的消费者分配一个partition,这种操作称为再分配

注意:在再分配期间,消费者会暂停消费消息,直到分配分区完成才会继续消费消息

且当分区分配给再次分配给某个消费者时,消费者的消息可能丢失读取状态

同理当consumer-2消费者退出消费者组时,那么partition-2就会分配到consumer-1,让他去进行消费

那么kafka是如何知道消费组里面需要再分配呢?这主要是借助于组协调器,每个消费组都会由属于自己的组协调器。

每隔消费者都会发送心跳到协调器,用来维护群组关系和分区关系,如下图所示:

image-20210513151828938

这样kafka就知道了每个消费者属于哪个消费组,以及如何去分配partition

协调器就类似于spring cloud里面的注册中心

当消费者因为某些因素突然停止消费,也就是说协调器收不到消费者的心跳,那么协调器会等待几秒,几秒期间还是没有收到心跳,那么协调器就会把该消费者剔除出组,然后实现再分配。

2.3 消费

这里同样借助SpringBoot去消费消息,消费者配置如下:

spring:
  kafka:
    # kafka集群地址
    bootstrap-servers: 192.168.80.130:9092,192.168.80.131:9092,192.168.80.132:9092
    listener:
      # 如果没有至少一个配置的主题,则容器是否应无法启动
      # false 代表关闭此功能
      missing-topics-fatal: false
      producer:
        # 发布消息时,key的序列化器,这里是kafka提供的序列化器
        # 当发送消息的key值不是字符串时,需要自己写自定义序列化器
        # 生产者通过该序列化器将消息的key值序列化为字节数组
        # 后面会讲述如何自定义序列化器
        key-serializer: org.apache.kafka.common.serialization.StringSerializer

        # 发布消息时,value的序列化器,这里是kafka提供的序列化器
        # 当发送消息的key不是字符串时,需要自己写自定义序列化器
        # 一般来说发布消息大多数都不是字符串,因此还是需要发送消息
        # 生产者通过该序列化器将消息的key值序列化为字节数组
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # 消费者组id
      group-id: wangzh-group

      # 是否允许自动提交offset
      # 每当消费者消费一个消息就会产生一个偏移量
      # 偏移量是消费者提交到kafka中,保存在`__consumer_offsets` topic中
      enable-auto-commit: true

      # 提交偏移量间隔时间数 100ms提交一次
      auto-commit-interval: 100

      # 消费消息时的反序列器
      # 消费消息时会将字节序列反序列化为字符串
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

      # 消费消息时的反序列化器
      # 消费消息时会将字节序反序列化为字符串
      # 如果消息不是字符串时,需要自己写反序列话器
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

      # 偏移量配置
      # latest 当各个分区有已提交的偏移量是,就从提交的偏移量后开始消费,如果没有则消费该分区最新产生的数据
      # none 各个分区都提交了偏移量后,才从偏移量后开始消费,只要存在一个分区没有提交偏移
      # 量那么抛出异常
      # earlist 当各个分区有已提交的偏移量时,则从提交的偏移量开始消费,如果没有偏移量则
      # 从头开始消费
      auto-offset-reset: latest

消费消,利用org.springframework.kafka.annotation.KafkaListener注解即可消费消息,如下:

package com.example.demo;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class Consumer {

    /**
     * topics 为 topic名字,可以填写多个topic的名字
     * ConsumerRecord 为 消息记录,包含了一条消息大部分数据
     */
    @KafkaListener(topics = {"wangzh"})
    public void consumer(ConsumerRecord<String,String> record) {
      log.info("消息key:" + record.key());
      log.info("消息value:" + record.value());
      log.info("消息偏移量:" + record.offset());
      log.info("消息topic" + record.topic());
    }
}

启动项目即可看到消费的消息,如下:

image-20210513154435765

2.4 批量

上次消费消息时一条一条消费,也就是当一条消息消费完成,才会去消费下一条,这肯定不大合理,因此在数据量大的情况下需要去进行批量消费

批量消费设置如下:

spring:
  kafka:
    # kafka集群地址
    bootstrap-servers: 192.168.80.130:9092,192.168.80.131:9092,192.168.80.132:9092
    listener:
      # 如果没有至少一个配置的主题,则容器是否应无法启动
      # false 代表关闭此功能
      missing-topics-fatal: false
      producer:
        # 发布消息时,key的序列化器,这里是kafka提供的序列化器
        # 当发送消息的key值不是字符串时,需要自己写自定义序列化器
        # 生产者通过该序列化器将消息的key值序列化为字节数组
        # 后面会讲述如何自定义序列化器
        key-serializer: org.apache.kafka.common.serialization.StringSerializer

        # 发布消息时,value的序列化器,这里是kafka提供的序列化器
        # 当发送消息的key不是字符串时,需要自己写自定义序列化器
        # 一般来说发布消息大多数都不是字符串,因此还是需要发送消息
        # 生产者通过该序列化器将消息的key值序列化为字节数组
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 设置为批量消费,默认为单条消费
      type: batch
    consumer:
      # 消费者组id
      group-id: wangzh-group

      # 是否允许自动提交offset
      # 每当消费者消费一个消息就会产生一个偏移量
      # 偏移量是消费者提交到kafka中,保存在`__consumer_offsets` topic中
      enable-auto-commit: true

      # 提交偏移量间隔时间数 100ms提交一次
      auto-commit-interval: 100

      # 消费消息时的反序列器
      # 消费消息时会将字节序列反序列化为字符串
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

      # 消费消息时的反序列化器
      # 消费消息时会将字节序反序列化为字符串
      # 如果消息不是字符串时,需要自己写反序列话器
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

      # 偏移量配置
      # latest 当各个分区有已提交的偏移量是,就从提交的偏移量后开始消费,如果没有则消费该分区最新产生的数据
      # none 各个分区都提交了偏移量后,才从偏移量后开始消费,只要存在一个分区没有提交偏移
      # 量那么抛出异常
      # earlist 当各个分区有已提交的偏移量时,则从提交的偏移量开始消费,如果没有偏移量则
      # 从头开始消费
      auto-offset-reset: latest

      # 批量消费时,最多一次消费多少条数据
      max-poll-records: 1000

image-20210513155124595 image-20210513155139439

同时还需要修改接受消息的参数,修改如下:

@KafkaListener(topics = {"wangzh"})
public void consumer(List<ConsumerRecord<String,String>> records) {
    records.forEach(record -> {
        log.info("消息key:" + record.key());
        log.info("消息value:" + record.value());
        log.info("消息偏移量:" + record.offset());
        log.info("消息topic" + record.topic());
    });
}
image-20210513155347649

2.5 指定

通过之前的学习知道,消费者每消费一条消息就会提交一次偏移量,下次消费时从偏移量后面开始消费,这样保证消息不会重复消费。

有时候有一种特殊情况,需要指定偏移量去进行消费,那么之前普通消费并不能满足,因此需要自定义操作


package com.example.demo;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
@Slf4j
public class Consumer {

    /**
     * 每次消费 {"0","1","2"} 消息偏移量从1开始消费
     * @param records
     */
    @KafkaListener(topicPartitions = {
            @TopicPartition(topic = "wangzh",partitions = {"0","1","2"},partitionOffsets = @PartitionOffset(initialOffset =  "1",partition = "*"))
    })
    public void consumer(List<ConsumerRecord<String,String>> records) {
        records.forEach(record -> {
            log.info("消息key:" + record.key());
            log.info("消息value:" + record.value());
            log.info("消息偏移量:" + record.offset());
            log.info("消息topic" + record.topic());
        });
    }
}
上一篇下一篇

猜你喜欢

热点阅读