Apache Pulsar 指南

Apache Pulsar 源码走读(五)服务端消息流转逻辑概述

2021-02-21  本文已影响0人  WJL3333

版权归本人所有,如果转载请联系本人

这篇大致说一下消息从写入到读取在Apache Pulsar服务端是怎么串起来的。
( 这篇不会详细说明每个逻辑怎么走的,不过会给读者一个整体的俯瞰印象。)

首先说一下表示业务逻辑的几个对象。(都在org.apache.pulsar.broker.service 这个包里面)

Pulsar服务端的主要逻辑对象

Topic: 这个对象在服务端就表示一个topic。这个类是最上层的,所有逻辑都被组织到这个对象里面。

Subscription 这个对象表示一份数据的消费进度。

上面这2个对象,在存储层对应的就是Topic -> ManagedLedgerSubscription -> ManagedCursor

消息写入

通过找到服务端记录的Producer对象,经过一些逻辑处理(消息去重,加密,状态检查等)
确定Topic 最后写入到ManagedLedger里面。

消息读取

则是客户端通过CommandFlow 告知自己当前可以处理消息的状态,
触发消息的分发流程。通过从Consumer 拿到 Subscription 触发消息的读取和Dispatch的过程。

写入逻辑相对读取逻辑来说比较直观。写入到ManagedLedger 即可。主要说下Subscription

Pulsar的消息派发流程(Dispatcher)

我们先单独看一下一个分区的topic

假设这个topic的数据是一个纸带,如果确认消费好了一个消息就涂黑一段。
写入的话就是不断给纸带增加长度。
我们把那些已经分发出去的消息但是还没有ack的消息认为是灰色的。

服务端会记录每个consumer当前的一个队列容量。
如果consumer的队列可以接受更多消息的时候会主动发送CommandFlow
请求给服务端,来标识自己能接受更多的消息。

C1 -> 10
C2 -> 4
C3 -> 8

有新消息写入的时候,如果为了减少延迟,最好马上能通知每个Subscription 有新的消息到来了。
这样的话,比如说新写入了20条消息,则会触发一个消息分发的动作来把这20条消息读取出来(很可能走的cache)之后按照客户端和消息状态分给这3个consumer。

Dispatcher

这个类是Subscription 对象里面的一个成员变量。

当消息从ManagedLedger 里面成功读取之后,这个类需要按照consumer的状态和消息的一些属性
把消息推送给consumer(客户端)。

根据订阅方式的不同分发的逻辑也有区别

这样一个Dispatcher 需要有以下功能

消费进度跟踪

1. 消息确认方式

消息确认可以是累积确认或者是单独确认的。

2. 单条消息ack状态的维护(需要持久化)

单独确认的话,相当于纸带上有一些位置之前全是黑色的,这种是某个点位之前全都消费完的。
我们称这个点位是deletePoint。

有一些位置是黑色和灰色相间的,这种就是某些消息已经被标记ack了,
有些已经被投递到客户端但是还没有ack。
随着单独确认逐渐累积,这样黑色的部分会慢慢连接起来。
这样的话这个deletePoint就可以推进了。

我们再看一下batch的情况,上面说的消息都是针对单条消息的(一个Entry)
如果这个Entry里面的消息是做了batch发送的,里面会包含多条信息。

客户端可能会单独ack这个batch里面的某个消息。
这样的话我们需要记录这个Entry的batch里面哪些消息是已经被ack的。
对于这种消息,Dispatcher在投递消息的时候会带着一个位图来标记这个Entry里面哪些消息是已经处理的。
这样consumer会按照这个bitmap来进行过滤。

3. 消息重发状态跟踪(非持久化)

灰色的位置我们称为当前的readPoint,标记的是当前读取结尾的位置。如果有新的数据写入的话,dispatcher会从这个位置尝试读取新写入的数据,并推进这个readPoint。

如果有的消息已经被发送给consumer了,但是这个消息consumer又通知服务端说需要重新投递。
则这个时候就需要标记readPoint之前的某个消息需要重新读取,这个重新读取的话不会更改readPoint。
同时因为这个消息没有被ack。所以这个redeliver的消息不需要被持久化。
(重新加载之后就可以根据ack的状态推断出来)

4. 消费进度持久化状态

那么如果这个topic被unload到其他的broker。对于一个Subscription需要加载哪些状态呢?

主要状态就是哪些消息已经ack了。同时确认下一次读取的位置是什么。

需要知道记录单独ack消息的状态和batch ack信息的状态。

ManagedCursor 从 ledger(状态被写到bookkeeper里面)或者zk(写入一直失败的话会fallback到zk上)
读取到这个状态的话,按照相应逻辑恢复即可。


// MLDataFormats.proto

message ManagedCursorInfo {
    // If the ledger id is -1, then the mark-delete position is
    // the one from the (ledgerId, entryId) snapshot below
    required int64 cursorsLedgerId = 1;

    // Last snapshot of the mark-delete position
    optional int64 markDeleteLedgerId = 2;
    optional int64 markDeleteEntryId  = 3;


    // 记录了单独某个消息被ack的状态
    repeated MessageRange individualDeletedMessages = 4;

    // Additional custom properties associated with
    // the current cursor position
    repeated LongProperty properties = 5;

    // 时间戳
    optional int64 lastActive = 6;

    // Store which index in the batch message has been deleted
   
    // batch 的 ack状态记录
    repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7;
}
上一篇下一篇

猜你喜欢

热点阅读