RocketMQ 7.消息过滤
20180705103335515.pngRocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时再做消息过滤的。RocketMQ这么做是还是在于其Producer端写入消息和Consomer端订阅消息采用分离存储的机制来实现的,Consumer端订阅消息是需要通过ConsumeQueue这个消息消费的逻辑队列拿到一个索引,然后再从CommitLog里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构。其ConsumeQueue的存储结构如下,可以看到其中有8个字节存储的Message Tag的哈希值,基于Tag的消息过滤正式基于这个字段值的。
1.Tag过滤方式
image.pngconsumer.subscribe("TopicTest", "TagA || TagC || TagD");
Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG,可以用||分隔。其中,Consumer端会将这个订阅请求构建成一个 SubscriptionData,发送一个Pull消息的请求给Broker端。Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个MessageFilter,然后传给Store。Store从 ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤,由于在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,故在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对,如果不同,则丢弃该消息,不进行消息消费。
-
在Broker端进行Message Tag比对,先遍历 Consume Queue,如果存储的Message Tag 不订阅的 MessageTag不符合,则跳过,继续比对下一个,符合则传输给Consumer。
注意:Message Tag是字符串形式,Consume Queue中存储的是其对应的hashcode,比对时也是比对hashcode。 -
Consumer收到过滤后的消息后,同样也要执行在Broker端的操作,但是比对的是真实的Message Tag字符串,而不是 Hashcode。
为什么过滤要这样做?
- Message Tag 存储 Hashcode,是为了在Consume Queue定长方式存储,节约空间。
- 过滤过程中不会访问 Commit Log 数据,可以保证堆积情况下也能高效过滤。
- 即使存在 Hash 冲突,也可以在 Consumer 端进行修正,保证万无一失。
2. SQL92的过滤方式
这种方式的大致做法和上面的Tag过滤方式一样,只是在Store层的具体过滤过程不太一样,真正的 SQL expression 的构建和执行由rocketmq-filter模块负责的。每次过滤都去执行SQL表达式会影响效率,所以RocketMQ使用了BloomFilter避免了每次都去执行。SQL92的表达式上下文为消息的属性。
RocketMQ支持根据用户自定义属性进行过滤,过滤表达式类似于SQL的where,如:a> 5 AND b ='abc'
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("filter_producer_group_name");
// 测试nameserver的地址
producer.setNamesrvAddr(Const.NAMESRV_ADDR);
// 启动生产者
producer.start();
// 生产者生产消息的时候设置tags,在这里可以通过设置不能的tags来获取对应的数据
// tags 设置*表示 所有换消息, 使用 || 表示获取多个
// Message message = new Message("my-topic", "add || update", msg.getBytes("UTF-8"));
for (int i = 0; i < 10; i++) {
String sex = i%2==0?"Male":"Female";
String msg = "sex:"+ sex +" age:" +i;
Message message = new Message("my-topic-filter", "add", msg.getBytes("UTF-8"));
message.putUserProperty("sex", sex);
message.putUserProperty("age", i+"");
SendResult sendResult = producer.send(message);
// System.out.println("消息id:" + sendResult.getMsgId());
// System.out.println("消息队列:" + sendResult.getMessageQueue());
// System.out.println("消息偏移量:" + sendResult.getQueueOffset());
System.out.println(sendResult);
}
producer.shutdown();
}
}
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_consumer_group_name");
consumer.setNamesrvAddr(Const.NAMESRV_ADDR);
consumer.subscribe("my-topic-filter", MessageSelector.bySql("sex='Female' AND age >= 5"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg: msgs) {
System.out.println("消息:" + new String (msg.getBody(), "UTF-8"));
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
// System.out.println("接收到的消息" + msgs);
// 返回消息是否消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
先后运行Producer、Consumer
消息:sex:Female age:7
消息:sex:Female age:5
消息:sex:Female age:9