apache pulsar笔记
2020-07-16 本文已影响0人
生活在于创作
基础:
基本单元:消息
概念:producer,consumer,broker,bookie,zookeeper,
topic类型:exclusive,failover,share
Messages
消息是Pulsar的基础单元。 消息就是producer发给topic的内容,以及consumer从topic消费的内容(消息处理完成后发送确认)。 消息类似于邮政系统中的信件。
Component | 作用 |
---|---|
Value / data payload | 消息携带的数据。所有pulsar的消息携带原始bytes,但是消息数据也需要遵循数据shcema |
Key | 消息可以被Key打标签。这对诸如topic压缩之类的事情有作用 |
Properties | 可选的,用户定义属性 的key/value map |
Producer名称 | 生产消息的producer名称(producer被自动赋予默认名称,但你也可以自己指定) |
序列ID | Topic中,每个Pulsar消息属于一个有序的序列。消息的序列ID是他在序列中的次序。 |
发布时间 | 消息发布的时间戳(producer自动附上) |
事件时间 | 可选的时间戳,应用可以附在消息上,代表某个事件发生的时间,例如,消息被处理时间。如果没有明确的设置,那么事件时间为0。 |
Topics
和其他的发布订阅系统一样,Pulsar中的topic是被命名的通道,用做从producer到 consumer传输消息。 Topic的名称为符合良好结构的URL。
{persistent|non-persistent}://tenant/namespace/topic
| Topic名称组成 | 说明: |
|:------------------------------- |:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `persistent` / `non-persistent` | 定义了topic类型。 Pulsar支持两种不同topic:[持久](/docs/zh-CN/concepts-architecture-overview#persistent-storage)和 [非持久](#non-persistent-topics)(默认是持久类型,如果你没有指明类型,topic将会是持久类型)。 持久topic的所有消息都会[](/docs/zh-CN/concepts-architecture-overview#persistent-storage)持久化</a>到硬盘上(这意味着多块硬盘,除非是单机模式的broker),反之,[非持久](#non-persistent-topics)topic的数据不会存储到硬盘上。 |
| `tenant` | 实例中topic的租户。tenant是Pulsar多租户的基本要素,可以被跨集群的传播。 |
| `namespace` | Topic的管理单元,充当关联topic组的管理机制。 大多数的topic配置在[namespace](#namespaces)层面生效。 每个tenant可以有多个namespace。 |
| `topic` | 名称的最后组成部分,topic的命名很自由,没有什么特殊的含义。 |
> #### 不需要显式的创建topic
>
> 你并不需要显式的创建topic。 如果客户端尝试从一个还不存在的topic写或者接受消息,pulsar将会按在[topic名称](#topics)提供的[namespace](#namespaces)下自动创建topic。
## 命名空间
命名空间是租户内部逻辑上的命名术语。 一个租户可以通过[admin API](/docs/zh-CN/admin-api-namespaces#create)创建多个命名空间。 例如,一个对接多个应用的租户,可以为每个应用创建不同的namespace。 Namespace使得程序可以以层级的方式创建和管理topic Topic`my-tenant/app1` ,它的namespace是`app1`这个应用,对应的租户是 `my-tenant`。 你可以在namespace下创建任意数量的[topic](#topics)。
## 订阅模型
订阅是命名好的配置规则,指导消息如何投递给消费者。 Pulsar有三种订阅模式:[exclusive](#exclusive),[shared](#shared),[failover](#failover)。 下图展示了这三种模式:

### Exclusive
*独占*模式,只能有一个消费者绑定到订阅上。 如果多于一个消费者尝试以同样方式去订阅主题,消费者将会收到错误。
上面的图中,只有**Consumer A**可以消费。
> Exclusive模式为默认订阅模式。

### Shared(共享)
*shared*或者*round robin*模式中,多个消费者可以绑定到同一个订阅上。 消息通过round robin轮询机制分发给不同的消费者,并且每个消息仅会被分发给一个消费者。 当消费者断开连接,所有被发送给他,但没有被确认的消息将被重新安排,分发给其它存活的消费者。
第一幅图中,**Consumer-B-1**和**Consumer-B-2**都可以订阅主题,其实**Consumer-C-1**或者其它Consumer也可以订阅。
> #### Shared模式的限制
>
> 使用shared模式时,需要重点注意以下两点: * 消息的顺序无法保证。 * 你不可以使用累积确认。

### Failover(灾备)
*Failover*模式中,多个consumer可以绑定到同一个subscription。 Consumer将会按字典顺序排序,第一个consumer被初始化为唯一接受消息的消费者。 这个consumer被称为*master consumer*。
当master consumer断开时,所有的消息(未被确认和后续进入的)将会被分发给队列中的下一个consumer。
第一幅图中,Consumer-C-1是master consumer,当Consumer-C-1断开连接时,由于Consumer-C-2在队列中下一个位置,那么它将会开始接收消息。

## 多主题订阅
当consumer订阅pulsar的主题时,它默认指定订阅了一个主题,例如:`persistent://public/default/my-topic`。 从Pulsar的1.23.0-incubating的版本开始,Pulsar消费者可以同时订阅多个topic。 你可以用以下两种方式定义topic的列表:
* 通过最基础的 [正则表达式](https://en.wikipedia.org/wiki/Regular_expression)(regex),例如 `persistent://public/default/finance-.*`
* 通过明确指定的topic列表
> 通过正则订阅多主题时,所有的主题必须在同一个[namespace](#namespaces)。
当订阅多主题时,Pulsar客户端会自动调用Pulsar的API来发现匹配表达式或者列表的所有topic,然后全部订阅。 如果此时有暂不存在的topic,那么一旦这些topic被创建,conusmer会自动订阅。
> #### 不能保证顺序性
>
> 当消费者订阅多主题时,Pulsar所提供对单一主题订阅的顺序保证,就hold不住了。 如果你在使用Pulsar的时候,遇到必须保证顺序的需求,我们强烈建议不要使用此特性。
下面是多主题订阅在java中的例子:
```java
import java.util.regex.Pattern;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
PulsarClient pulsarClient = // Instantiate Pulsar client object
// Subscribe to all topics in a namespace
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
Consumer allTopicsConsumer = pulsarClient.subscribe(allTopicsInNamespace, "subscription-1");
// Subscribe to a subsets of topics in a namespace, based on regex
Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
Consumer someTopicsConsumer = pulsarClient.subscribe(someTopicsInNamespace, "subscription-1");