Kafka Consumers

2020-10-17  dylan丶QAQ

Background: in a real project I needed RabbitMQ to implement message queuing. After working with it, I also wanted to study Kafka, understand the differences between the two, dig into the architecture side of things, and level myself up.


1. How Kafka Consumers Fetch Data

In Kafka, consumers use a pull model to fetch data from the broker: each consumer asks for records at its own pace, so a slow consumer is never overwhelmed, at the cost of having to poll even when no data is available.
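The control flow of the pull model can be sketched in plain Java. This is a toy simulation, not the real client API (`FakeBroker` and its methods are invented for illustration; a real consumer calls `KafkaConsumer.poll()`), but it shows the key property: the broker only hands out records when the consumer asks.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model of pull-based consumption: the broker hands out records only
// when the consumer asks, so the consumer controls its own throughput.
class FakeBroker {
    private final Queue<String> log = new ArrayDeque<>();

    void append(String record) { log.add(record); }

    // The consumer decides when to fetch and how many records it wants.
    List<String> fetch(int maxRecords) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxRecords && !log.isEmpty()) {
            batch.add(log.poll());
        }
        return batch; // may be empty -- the pull model's main drawback
    }
}

public class PullModelDemo {
    public static void main(String[] args) {
        FakeBroker broker = new FakeBroker();
        for (int i = 0; i < 5; i++) broker.append("msg" + i);

        // The consumer pulls small batches at its own pace.
        System.out.println(broker.fetch(2)); // [msg0, msg1]
        System.out.println(broker.fetch(2)); // [msg2, msg3]
    }
}
```

An empty `fetch` result is why real consumers pass a timeout to `poll()`: it avoids spinning in a tight loop when the topic is idle.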

2. Kafka Partition Assignment Strategies

A consumer group contains multiple consumers, and a topic contains multiple partitions, which raises an assignment problem: deciding which partition is consumed by which consumer.

Kafka ships three assignment strategies: range (the default), roundrobin, and sticky (added in v0.11).

# Range, example 1: a group with two consumers c0 and c1, both subscribed to t0 and t1, 4 partitions per topic
c0: t0p0,t0p1,t1p0,t1p1 
c1: t0p2,t0p3,t1p2,t1p3
# Range, example 2: same group, but 3 partitions per topic -- the per-topic split is uneven, so c0 ends up with more
c0: t0p0,t0p1,t1p0,t1p1
c1: t0p2,t1p2
# RoundRobin, example 1: same group, 3 partitions per topic -- all partitions are dealt out alternately
c0: t0p0,t0p2,t1p1
c1: t0p1,t1p0,t1p2
# RoundRobin, example 2: three consumers c0,c1,c2 and three topics with 3 partitions each; c0 subscribes to t0, c1 to t0 and t1, c2 to t0, t1 and t2
c0: t0p0
c1: t0p1,t1p0,t1p2
c2: t0p2,t1p1,t2p0,t2p1,t2p2
# Sticky: three consumers c0,c1,c2, all subscribed to four topics t0-t3 with 2 partitions each
c0: t0p0,t1p1,t3p0
c1: t0p1,t2p0,t3p1
c2: t1p0,t2p1
# this assignment behaves much like load balancing
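The range examples above follow simple arithmetic: each topic is assigned independently, each consumer gets partitions/consumers of them, and the first consumers (in sorted order) take one extra partition when the division is uneven. A small sketch of that arithmetic (the method name and shape are my own, not Kafka's internal `RangeAssignor` API):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeAssignmentDemo {

    // Range strategy: assign each topic's partitions independently.
    // With p partitions and c consumers, every consumer gets p/c partitions
    // and the first (p % c) consumers get one extra.
    static Map<String, List<String>> rangeAssign(List<String> consumers,
                                                 List<String> topics,
                                                 int partitionsPerTopic) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String c : consumers) assignment.put(c, new ArrayList<>());
        int n = consumers.size();
        for (String topic : topics) {
            int base = partitionsPerTopic / n;
            int extra = partitionsPerTopic % n;
            int partition = 0;
            for (int i = 0; i < n; i++) {
                int count = base + (i < extra ? 1 : 0);
                for (int j = 0; j < count; j++) {
                    assignment.get(consumers.get(i)).add(topic + "p" + partition++);
                }
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Range example 2 above: two consumers, t0 and t1 with 3 partitions each.
        System.out.println(rangeAssign(List.of("c0", "c1"), List.of("t0", "t1"), 3));
        // {c0=[t0p0, t0p1, t1p0, t1p1], c1=[t0p2, t1p2]} -- c0 carries the extra load
    }
}
```

Because the extra partition always lands on the first consumers, the imbalance compounds across topics — which is exactly why range example 2 gives c0 four partitions and c1 only two.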

If c1 now leaves the consumer group, the two strategies rebalance differently:

# under roundrobin, everything is reshuffled from scratch
c0: t0p0,t1p0,t2p0,t3p0
c2: t0p1,t1p1,t2p1,t3p1
# under sticky, c0 and c2 keep their existing partitions and only c1's are redistributed
c0: t0p0,t1p1,t3p0,t2p0
c2: t1p0,t2p1,t0p1,t3p1
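The strategy a consumer uses is controlled by the `partition.assignment.strategy` consumer property. In a Spring Boot setup like the one used later in this post, it could be passed through as an arbitrary consumer property — a sketch, with the assignor class names being Kafka's actual `org.apache.kafka.clients.consumer` classes:

```yaml
spring:
  kafka:
    consumer:
      properties:
        # default is RangeAssignor; RoundRobinAssignor and StickyAssignor are the alternatives
        partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor
```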

3. Integrating Kafka with Spring Boot for Sending and Receiving Messages

3.1. The Producer Side

First, add the POM dependency:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Configure the producer in YAML:

spring:
  kafka:
    bootstrap-servers: 39.99.222.44:9092
    producer:
      retries: 3 # number of retries when a send fails
      batch-size: 16384
      acks: 1 # wait until the partition leader has persisted the record
      buffer-memory: 33554432 # size of the producer's in-memory buffer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

The sending code:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import javax.annotation.Resource;

@Component
public class KafkaProducerService {

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void sendMessage(String topic, String key, Object object) {
        // no explicit partition: the producer's partitioner decides where the record goes
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, key, object);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("********message send failed: " + throwable.toString());
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                System.out.println("=========message sent successfully: " + result.toString());
            }
        });
    }
}

Invoke it from a test:

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import com.icodingedu.producter.service.KafkaProducerService;

@SpringBootTest
class KafkaProducterApplicationTests {

    @Autowired
    KafkaProducerService kafkaProducerService;

    @Test
    void contextLoads() {
        String topic = "topicfirst";
        for (int i = 0; i < 10; i++) {
            kafkaProducerService.sendMessage(topic, "key:" + i, "hello kafka " + i);
        }
        System.out.println("======done sending======");
    }
}

With no partition explicitly specified, the producer spreads the records across the topic's partitions (round-robin when records carry no key). The consumed result:

# values read
hello kafka 1
hello kafka 6
hello kafka 2
hello kafka 7
hello kafka 3
hello kafka 8
hello kafka 4
hello kafka 9
hello kafka 0
hello kafka 5
# which messages landed on which partition
Partition:   0      1      2      3      4
            1 6    2 7    3 8    4 9    0 5
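The partition choice above can be modeled in a few lines. This is a deliberately simplified sketch of the default decision order (explicit partition, then key hash, then round-robin); the real DefaultPartitioner hashes keys with murmur2, not `hashCode()`, so the exact partition numbers will differ:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of Kafka's default partition choice:
// explicit partition > key hash > round-robin.
// Illustrative only -- the real DefaultPartitioner uses murmur2, not hashCode().
public class PartitionChoiceDemo {
    private final AtomicInteger counter = new AtomicInteger();

    int choosePartition(Integer explicit, String key, int numPartitions) {
        if (explicit != null) {
            return explicit; // the producer pinned the partition
        }
        if (key != null) {
            return Math.abs(key.hashCode() % numPartitions); // same key -> same partition
        }
        // no partition, no key: spread records round-robin
        return counter.getAndIncrement() % numPartitions;
    }

    public static void main(String[] args) {
        PartitionChoiceDemo p = new PartitionChoiceDemo();
        for (int i = 0; i < 10; i++) {
            System.out.println("hello kafka " + i + " -> partition "
                    + p.choosePartition(null, null, 5));
        }
        // ten keyless records over 5 partitions: each partition gets exactly two,
        // matching the two-messages-per-partition layout in the run above
    }
}
```

The key-hash branch is what makes keys useful for ordering: all records with the same key land on the same partition, and Kafka only guarantees ordering within a partition.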

Don't assume that just getting each day's features done is enough — that mindset won't do. Let's keep pushing each other forward!
