Kafka和RabbitMQ比较之Consumer接收消息部分
今天写的是Kafka和RabbitMQ的Consumer消费者部分,感觉写起来比Producer还费劲,不过通过网上找资料,读源码,问朋友,最终还是写出来了。
Kafka Consumer接收消息流程
1.配置消费者客户端参数,服务器地址,反序列化器要与Producer的序列化器对应,GroupId配置(同一个partition同一个groupId下的消费者消费消息是抢占式的,不会被共同消费)
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
2.创建消费者实例。
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);
3.订阅一个或者多个主题。
//订阅一个主题
consumer.subscribe(Collections.singletonList(this.topic));
//订阅多个主题
consumer.subscribe(Arrays.asList("TOPIC1","TOPIC2"));
4.拉取消息并消费。每条消息的类型为ConsumerRecord,它与ProducerRecord相对应。
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<Integer, String> record : records) {
System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
}
5.拉取消息首先是从本地存储的completedFetches取出消息消费,当completedFetches没有消息时候会向Broker发送请求获取消息,并存入到completedFetches中。
6.提交消费位移,将已经消费完毕的位移提交给Broker。这个是非常隐蔽的。源码中下面的这个updateAssignmentMetadataIfNeeded方法内部对消费位移进行了提交
if (includeMetadataInTimeout) {
if (!updateAssignmentMetadataIfNeeded(timer)) {
return ConsumerRecords.empty();
}
} else {
while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
log.warn("Still waiting for metadata");
}
}
7.关闭消费者实例。
RabbitMQ Consumer接收消息流程
1.消费者连接RabbitMQ服务器,建立TCP连接Connection,创建信道Channel
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
// 设置RabbitMQ地址
factory.setHost("127.0.0.1");
factory.setVirtualHost("/");
// 建立到代理服务器连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
2.声明exchange交换器,绑定queue队列
// 声明交换器
String exchange = "test-exchange";
channel.exchangeDeclare(exchange, "direct", true);
// 声明队列
String queue = channel.queueDeclare().getQueue();
String routingKey = "test-routingKey";
// 绑定队列
channel.queueBind(queue, exchange, routingKey);
3.通过绑定的队列,routingKey到Broker获取消息,这个有两种方式,一种为订阅,另一种为主动拉取消息的方式。订阅方式Consumer向服务器注册,服务器通过长连接主动发送消息给Consumer;主动拉取得方式就是通过rpc直接向服务器获取消息。
4.关闭信道关闭连接。
总结一下
1.Kafka的源码读起来更可读一些,RabbitMQ的读起来费劲。这一点没什么用。
2.Kafka消息会先存到completedFetches中,单次请求超时会从completedFetches返回一部分,剩下的可以下一次消费再取出。RabbitMQ有订阅(服务器推送消息给Consumer)和主动拉取消息(Consumer主动向服务器查询消息)的方式,而Kafka只有主动拉取消息的方式。
3.Kafka通过Topic,partition,groupId的方式获取消息,而RabbitMQ通过routingKey和exchange找到对应的queue。