消息中间件消息中间件RocketMQ专辑

消息过滤

2018-05-21  本文已影响5人  BlackManba_24

在MQ模型中,一般都会有Topic模型,Topic表示一类消息的集合。

在实际应用中,往往对一个Topic下的消息还会有不同的细分,消费方会根据细分的类型消费Topic中特定的一部分消息,这就涉及到了消息过滤。

比如对于交易的Topic,内部可能有下单消息、支付消息。其中支付系统只希望消费到交易Topic下的支付消息,面对这个需求,我们应该如何在自己的MQ中去满足呢?

image

(图片引用自阿里云)

业界实现

RocketMQ

RocketMQ支持在发送消息的时候给消息增加Tag(Tag可以理解为sub-topic,即在Topic下再对消息类型进行区分)。

大多数场景下tag使用起来非常简单,如:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

Consumer将收到Topic下Tag包含TAGA或TAGB或TAGC的消息。

对于Tag过滤的限制是一条消息只能有一个Tag,这在一些复杂场景下可能没办法满足需求。

RocketMQ提供另一种过滤方式:SQL92

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

// only subsribe messages have property a, also a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

仅消费消息属性包含a,且a的值在0-3之间的消息。

RocketMQ对消息过滤的支持比较完善了,通过SQL92这种方式可以满足各种复杂场景的需求了。

Kafka

Kafka目前并没有支持消息过滤,即没有在Topic下提供细分的类型来区分消息。

用户可以在Kafka Streams中实现过滤。

问题分析

大致了解消息过滤的定义和业界的支持情况之后,回头再思考一下,为什么MQ需要做消息过滤、MQ的过滤应该做到什么程度(用使者需要怎么样的过滤方式呢)?试着回答以下的一些问题来弄清楚需求,划清楚问题边界。

1. 为什么需要消息过滤?

业务方(MQ使用方)过滤数据的需求是天然存在的,比如Topic模型也是一种过滤,从众多的数据中订阅自己需要的一部分数据。

在上面这个前提下,逆向考虑这个问题:如果MQ不支持消息过滤(这里的过滤只Topic下的消息细分)但使用方又有过滤的需求,那么会出现什么情况?或者说业务方会怎么去解决这个问题?

可以猜想大致会出现以下两种情况:

  1. 细分Topic,即将Topic再拆分的细一些,把二级类型直接作为Topic

  2. 在Consumer的消费逻辑中根据消息的属性或者内容决定是否过滤消息

第一种情况在一些场景下实际上是无法做到的,比如本文开头的交易场景的例子。一旦对Topic进行了拆分,那么细分后的数据之间的消息顺序就无法保证了,但对于一个订单,它的下单、支付等操作显然是需要顺序被处理的。

对于第二种情况,这也是业务方唯一能做的事情了。当然,也是最灵活的过滤方式了,业务方可以根据自己的需求制定过滤策略。但是带来的问题是所有的消息都需要从服务端先取回到客户端,这里的带宽浪费是比较严重的(取了大量客户端不需要的数据)。

2. MQ对于过滤的需求需要支持到什么程度呢?

对于这个问题,我在思考的时候考虑的是以下几个点:

  1. 业务方的过滤需求有哪些类型,是否可以穷举

  2. MQ的过滤功能能否覆盖掉用户的所有需求

  3. 以及支持消息过滤的成本

显然,用户的过滤需求难以穷举,且业务在不断的变化。但是通过像SQL这样的方式,我们可以认为覆盖了用户所有的过滤需求(就像查MySQL数据,可以组合各种SQL来完成目标数据的获取)。然而还需要去考虑成本的问题,比如机器成本、过滤对消息RT的影响等等。

所以在MQ的消息过滤中,我们期望能在成本和过滤能力之间找到一个平衡点,既能较好的支撑业务的过滤需求同时付出的成本在可接受范围内。

上面这句话的具体含义可以这样理解:

  1. 对消息的写入和消费的RT影响可以忽略

  2. 没有额外的资源需求(业务量不变的情况下,过滤功能不需要额外的机器资源投入)

  3. 覆盖业务的日常过滤需求(满足业务方90%以上的过滤需求)

站在巨人的肩膀上

在理解完需求且清除的知道我们要做什么之后,再来看一下业界“大佬”是怎么解决这个问题的。

RocketMQ Tag过滤

Message包含一个Tag属性,String类型,发送方可以进行设置,通常我们称为打标。

服务端在进行消息存储时,会将消息的Tag属性添加到消息索引中。Rocket的索引结构如下图:

image.png

索引元素包含三项内容:

为什么这里存的是Tag的哈希值而不是Tag本身的值呢?

索引本身是为了加快消息的查询速度,所以它的元素是定长的,这就决定了无法在索引中直接存储Tag的值。

因为索引中存储了Tag的哈希值,那么在进行消息读取时就可以根据用户的订阅请求进行消息匹配(可以在不读取存储文件的情况下完成消息的匹配,且开销可以不计)。

但是因为这里比较的是HashCode,所以在消息返回到Consumer之后需要再进行一次真实值得比较,以避免消费到非期望的数据。

那么增加了Tag之后,消息的读取流程如下:

  1. 获取用户读取消息的请求中期望的Tag的HashCode(可以是多个且进行||或者&&的运算)

  2. 读取索引元素,对比HashCode是否满足用户的过滤需求

  3. 从存储文件读取满足HashCode过滤条件的消息内容返回给Consumer

  4. Consumer反序列化消息,对比Tag值进一步确认消息是否期望数据

RocketMQ SQL92过滤

image.png
  1. Broker通过Consumer的心跳,在ConsumerFilterManager组件中保存Consumer的过滤信息(Expression)

  2. 当Consumer尝试读取消息时,Broker构造MessageFilter来过滤需要的数据

RocketMQ SQL92过滤文档

Tag VS SQL92

|
| Tag过滤 | SQL过滤 |
| --- | --- | --- |
| 覆盖场景 | 支持简单过滤(消息单Tag,可以订阅多Tag或按逻辑运算订阅Tag) | 支持复杂过滤 |
| 实现成本 | 实现简单 | 实现复杂,涉及到SQL解析等 |
| 对服务端的影响 | 服务端只进行简单的long值比较,代价低 | 服务端需要复杂的计算,代价高 |
| 用户的使用成本 | 简单的Tag运算逻辑,对用户要求低 | 用户需要掌握一些SQL语法,相对来说复杂一些 |

结论:

所以在开发资源有限的情况下(比如没有足够的人手)要实现MQ中的过滤功能的话,Tag方式是一个更好的选择,SQL则作为不断完善的一个补充(没有也可以接受,有就最好了)。

“万恶”的业务方

“消息能不能支持多个Tag,这样发送的时候一条消息从不同的维度打上Tag来实现灵活的过滤需求”——from业务方

比如一条订单消息可以按照支付方式打标,也可以按照商品品类打表,这样订阅时可以灵活的过滤出目标数据。

message0.setTags(TagA);
message1.setTags(TagB);
message2.setTags(TagA,TagB,TagC);

consumer0.subscribe(topic, TagA&&TagB);==>message2
consumer1.subscribe(topic, TagA||TagC);==>message0, message2

一是用户提出了这样的需求,二是在支持Tag之后我们也会去考虑Tag的方式还有没有优化空间。

能不能支持一条消息有多个Tag?

消息多Tag的问题其实和索引中无法存储Tag原始值的问题是一致的,都是导致索引结构的变化:索引存Tag值或者存多个Tag的HashCode都会导致索引元素的长度不固定,进而无法快速定位消息。

此时最容易想到的方案就是扩展索引。

扩展实现多Tag

扩展索引的方式能保持消息索引依旧是定长的,把Tag相关的数据单独存储,只在有必要的时候读取Tag信息(用户有过滤需求时),如下图所示:

image.png

这种方式实现多Tag需求是最直观的,缺陷也是最明显的:读写操作都多了一次Tag的操作。

更进一步,有没有办法在多Tag的情况下避免掉这一次Tag的读写操作呢?

不定长索引实现多Tag

既然不能独立出Tag的存储文件,那么只能直接扩展原来的索引文件了,直接将多个Tag的HashCode存到索引中。

面临的问题也非常清晰:不定长索引如何解决读取消息时索引的定位问题?

image

因为每个索引元素的长度是不确定的,当用户需要读Msg2时,就无法通过2*element size来计算索引位置。只能遍历索引了吗?但是遍历显然又是无法接受的!

思考一下写消息的过程,我们是怎么确定消息在文件中的写入位置的呢?——追加到末尾。追加的过程其实是记录上一条消息写入后的位置,那么当前的消息就从之前的位置继续写。

其实消息读取的过程也是一样的,虽然索引是不定长的,但是只要知道了上一条索引的位置和大小,就能定位到下一条消息索引的位置了。

那么,在读取第N条消息时其实只要知道第N-1条消息的索引位置就能快速定位出第N条消息的索引。

而在消息的场景中,99.999%的情况下读完第N条消息时,下一次都会读取第N+1条。

只有在少数异常的情况下需要修改offset的信息来读取之前或者之后的消息(而这种异常的场景下,可以通过一些优化的手段减少扫描的索引未见的数量来查找速度)。

不定长索引的寻址过程如下:

image

此方案虽然解决了上一种扩展索引的方案带来的问题,但是并不能做到和RocketMQ通过SQL的方式支持灵活的过滤需求。

总结

本文从消息过滤的问题出发,介绍了RocketMQ的过滤功能实现,分析了消息过滤的需求,然后总结了不同的多Tag功能的实现方案。

对于消息过滤的实现,没有哪一种方案是完美的,我们应当从自身的场景出发,考虑现实面对的成本等问题,综合考虑,选择一种最适用于自身业务场景的方案。

欢迎关注我的公众号MessageQueue交流MQ相关内容。

image
上一篇下一篇

猜你喜欢

热点阅读