RocketMQ

8. RocketMQ Filter Example: Consuming with Property Filters

2019-04-11  ASD_92f7

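1. Overview

References:
http://rocketmq.apache.org/docs/filter-by-sql92-example/
https://blog.csdn.net/u010690828/article/details/84337688
In short: the producer sets user properties on a message when sending it, and the consumer supplies a condition over those properties so that only matching messages are delivered.
RocketMQ's filter supports a subset of SQL92 syntax: numeric comparisons (>, >=, <, <=, BETWEEN, =), string comparisons (=, <>, IN), IS NULL / IS NOT NULL, and the logical operators AND, OR and NOT.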

2. FilterProducer (Producer)

The only difference from an ordinary producer is that each message carries a user property named a:
msg.putUserProperty("a", String.valueOf(i));

package com.asd.rocket.controller.filter;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.ArrayList;
import java.util.List;

/**
 * @author zhangluping@sinosoft.com.cn
 * @date 2019/4/10 10:08
 */
public class FilterProducer {
    public static void main(String[] args) throws Exception{
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new
                DefaultMQProducer("filterGroup");
        // Specify name server addresses.
        producer.setNamesrvAddr("10.1.11.155:9876");
        producer.setSendMsgTimeout(13000);
        producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
        //Launch the instance.
        producer.start();
        List<Message> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            /*
             * Create a message instance, specifying topic, tag, key and message body.
             * Four arguments:
             * Topic, Tag, Keys, MessageBody
             */
            Message msg = new Message("qqq","TagA" , "keu1",("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            msg.putUserProperty("a", String.valueOf(i));
            messages.add(msg);
        }
        // Send all messages as one batch and print the result.
        SendResult sendResult = producer.send(messages);
        System.out.printf("%s%n", sendResult);
        //Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}

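3. Consumer

Unlike an ordinary consumer, this one does not subscribe with a tag or * but with a MessageSelector:
consumer.subscribe("qqq", MessageSelector.bySql("a between 0 and 3"));
Only messages whose property a lies between 0 and 3 (inclusive) are consumed.
There are probably other selectors as well; I will add them here when I come across them.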

package com.asd.rocket.controller.filter;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @author zhangluping@sinosoft.com.cn
 * @date 2019/4/10 10:08
 */
public class FilterConsumer {
    public static void main(String[] args) throws Exception {
        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("c1");
        // Specify name server addresses.
        consumer.setNamesrvAddr("10.1.11.155:9876");
        // Subscribe to one or more topics; only messages whose property a is in [0, 3] are delivered.
        consumer.subscribe("qqq", MessageSelector.bySql("a between 0 and 3"));
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                msgs.forEach(m -> System.out.println(new String(m.getBody())));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Launch the consumer instance.
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
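
To make the effect of the selector concrete, here is a minimal plain-Java sketch of what "a between 0 and 3" selects from the ten messages FilterProducer sends (a = 0 to 9). It does not use the RocketMQ client at all: the class PropertyFilterSketch and its methods are hypothetical names invented for this illustration, and the real filtering is done by the broker, not by code like this.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in for property filtering; not RocketMQ code.
class PropertyFilterSketch {

    // Builds the user properties of `count` messages, mirroring
    // msg.putUserProperty("a", String.valueOf(i)) in FilterProducer.
    static List<Map<String, String>> buildMessages(int count) {
        List<Map<String, String>> messages = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Map<String, String> props = new HashMap<>();
            props.put("a", String.valueOf(i));
            messages.add(props);
        }
        return messages;
    }

    // Plain-Java equivalent of "a between low and high" (both ends inclusive).
    // A message without the property never matches.
    static Predicate<Map<String, String>> aBetween(int low, int high) {
        return props -> {
            String raw = props.get("a");
            if (raw == null) {
                return false;
            }
            int a = Integer.parseInt(raw);
            return a >= low && a <= high;
        };
    }

    // Returns the value of property a for every message that passes the filter.
    static List<String> matchingValues(List<Map<String, String>> messages,
                                       Predicate<Map<String, String>> filter) {
        List<String> matched = new ArrayList<>();
        for (Map<String, String> props : messages) {
            if (filter.test(props)) {
                matched.add(props.get("a"));
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        // Prints [0, 1, 2, 3]
        System.out.println(matchingValues(buildMessages(10), aBetween(0, 3)));
    }
}
```

With the producer and consumer above, this corresponds to the consumer printing only "Hello RocketMQ 0" through "Hello RocketMQ 3"; the other six messages are never delivered to it.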

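4. A Pitfall

Starting FilterConsumer failed with the error: The broker does not support consumer to filter message by SQL92
The fix is described in the second reference link listed in the overview:
add the following line to conf/broker.conf under the RocketMQ installation directory, then restart the broker so that it picks up the change:
enablePropertyFilter = true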
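
As a small pre-flight check, the setting can be verified before starting the consumer. The sketch below is only an illustration under a few assumptions: it treats broker.conf as a Java properties-style file, it assumes that a missing entry means the feature is disabled (consistent with the error above), and the class BrokerConfCheck and its method are hypothetical names, not part of RocketMQ.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of RocketMQ: checks broker.conf-style text
// for the enablePropertyFilter flag.
class BrokerConfCheck {

    // Returns true only if the text contains enablePropertyFilter set to true.
    // Assumption: an absent entry is treated as disabled.
    static boolean propertyFilterEnabled(String confText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // trim() because Properties keeps trailing whitespace in values.
        String value = props.getProperty("enablePropertyFilter", "false").trim();
        return Boolean.parseBoolean(value);
    }

    public static void main(String[] args) {
        // Sample content; brokerName here is just a placeholder entry.
        String sample = "brokerName = broker-a\nenablePropertyFilter = true\n";
        System.out.println(propertyFilterEnabled(sample)); // prints true
    }
}
```

In practice the text would be read from the broker's actual configuration file; the sample string here only keeps the example self-contained.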
