kafka kafka消费者
起因:在实际项目开发过程中,需要使用RabbitMQ来实现消息队列的功能,在运用过之后,也去学一学kafka,了解一下他们之间的差别,吃一吃架构方面的相关内容,提升自己。
1. Kafka消费方式分析
kafka里consumer采用的是pull的方式从broker里取数据
-
push推的方式很难适应消费速率不同的消费者,消息发送速率是有broker决定的,典型的问题表现是消费端拒绝访问和网络堵塞
-
pull的方式的消费速率是由consumer来确定,如果kafka的topic里没有数据,consumer会长期获取空数据,kafka会在消费时传入一个timeout,如果拉取没有数据,就会等待timeout时长后再返回
2. Kafka消费分区访问策略
一个consumer group中有多个consumer,一个topic里有多个partition,这就涉及了partition的分配问题,确定那个partition由哪个consumer来消费
kafka有三种分配策略:range(范围模式,默认的),roundrobin(均衡),sticky(粘性方式v0.11新增)
-
range:默认的分区消费策略
无论多少个分区,只有一个消费者,那么所有分区都分配给这个消费者
每次新的消费者加入消费者组都会触发新的分配
分配策略:
-
按照topic进行一组来分配给订阅了这个topic的consumer group中的consumer
-
n=分区数/消费者数量,m=分区数%消费者数量,第一个消费者分配n+m个分区,后面的分配n个分区
[图片上传失败...(image-8e5c6-1602662975455)]
-
# 例1,假设消费者组有两个消费者c0,c1,都订阅了t0和t1,每个topic都有4个分区
c0: t0p0,t0p1,t1p0,t1p1
c1: t0p2,t0p3,t1p2,t1p3
# 例2,假设消费者组有两个消费者c0,c1,都订阅了t0和t1,每个topic都有3个分区
c0: t0p0,t0p1,t1p0,t1p1
c1: t0p2,t1p2
-
roundrobin:负载均衡的方式
按照消费者组里的消费者进行平均分配
可以通过配置:partition.assignment.strategy
class org.apache.kafka.clients.consumer.RoundRobinAssignor
负载均衡也要看是否订阅了这个topic
每次新的消费者加入消费者组都会触发新的分配
# 例1: 假设消费者组有两个消费者c0,c1,都订阅了t0和t1,每个topic都有3个分区
c0: t0p0,t0p2,t1p1
c1: t0p1,t1p0,t1p2
# 例2: 3个消费者c0,c1,c2, 有三个topic,每个topic有3个分区,对于消费者而言,c0订阅的t0,c1订阅的t0和t1,c2订阅的t0,t1,t2
c0: t0p0
c1: t0p1,t1p0,t1p2
c2: t0p2,t1p1,t2p0,t2p1,t2p2
-
Sticky:粘性策略
kafka的v0.11版本引入的:class org.apache.kafka.clients.consumer.StickyAssignor
主要实现的目录
-
分区的分配要尽可能的均匀
-
分区的分配尽可能的和上次分配保持一致
-
当两者冲突时,第一个目标优先第二个目标
-
# 例1:三个消费者c0,c1,c2,都订阅了4个主题,t0,t1,t2,t3,每个topic有两个分区
c0: t0p0,t1p1,t3p0
c1: t0p1,t2p0,t3p1
c2: t1p0,t2p1
# 这个分配很像负载均衡
如果c1退出消费者组
# roundrobin策略下
c0: t0p0,t1p0,t2p0,t3p0
c2: t0p1,t1p1,t2p1,t3p1
# sticky策略下
c0: t0p0,t1p1,t3p0,t2p0
c2: t1p0,t2p1,t0p1,t3p1
3. Springboot整合Kafka进行消息收发
3.1. Producer发送端
首先引入POM依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
配置producer的yaml
spring:
kafka:
bootstrap-servers: 39.99.222.44:9092
producer:
retries: 3 #发送消息的重试次数
batch-size: 16384
acks: 1 #等待partition leader落盘完成
buffer-memory: 33554432 #设置生产者内存缓存的大小
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
发送代码编写
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import javax.annotation.Resource;
@Component
public class KafkaProducerService {
@Resource
private KafkaTemplate<String,Object> kafkaTemplate;
public void sendMessage(String topic,String key,Object object){
ListenableFuture<SendResult<String,Object>> future = kafkaTemplate.send(topic,0,key,object);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
System.out.println("********消息发送成功:"+throwable.toString());
}
@Override
public void onSuccess(SendResult<String, Object> result) {
System.out.println("=========消息发送成功:"+result.toString());
}
});
}
}
调用测试
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import com.icodingedu.producter.service.KafkaProducerService;
@SpringBootTest
class KafkaProducterApplicationTests {
@Autowired
KafkaProducerService kafkaProducerService;
@Test
void contextLoads() {
String topic = "topicfirst";
for(int i=0;i<10;i++){
kafkaProducerService.sendMessage(topic,"key:"+i,"hello kafka "+i);
}
System.out.println("======发送完成======");
}
}
在没有设置分区和key的情况下,按照轮询方式写入数据,消费结果如下
# 读取的值
hello kafka 1
hello kafka 6
hello kafka 2
hello kafka 7
hello kafka 3
hello kafka 8
hello kafka 4
hello kafka 9
hello kafka 0
hello kafka 5
Partition: 0 1 2 3 4
1 6 2 7 3 8 4 9 0 5
不要以为每天把功能完成了就行了,这种思想是要不得的,互勉~!