8. RocketMQ Filter Example: Filtering Consumption by Message Properties
2019-04-11
ASD_92f7
1. Overview
References:
http://rocketmq.apache.org/docs/filter-by-sql92-example/
https://blog.csdn.net/u010690828/article/details/84337688
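Put simply: the producer attaches user properties to each message when sending it, and the consumer supplies a condition that selects messages based on those properties.
RocketMQ's filter supports a subset of the SQL92 syntax, which is enough for simple logical conditions (comparisons, BETWEEN, IN, IS NULL, AND/OR/NOT).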
2. FilterProducer (the producer)
The only difference from an ordinary producer is that each message is given a user property named a:
msg.putUserProperty("a", String.valueOf(i));
package com.asd.rocket.controller.filter;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.ArrayList;
import java.util.List;

/**
 * @author zhangluping@sinosoft.com.cn
 * @date 2019/4/10 10:08
 */
public class FilterProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("filterGroup");
        // Specify name server addresses.
        producer.setNamesrvAddr("10.1.11.155:9876");
        producer.setSendMsgTimeout(13000);
        producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
        // Launch the instance.
        producer.start();
        List<Message> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            /*
             * Create a message instance.
             * Four arguments: topic, tag, key, and message body.
             */
            Message msg = new Message("qqq", "TagA", "keu1",
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // User property that the consumer-side SQL92 condition is evaluated against.
            msg.putUserProperty("a", String.valueOf(i));
            messages.add(msg);
        }
        // Send all ten messages as one batch.
        SendResult sendResult = producer.send(messages);
        System.out.printf("%s%n", sendResult);
        // Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}
3. The consumer
Unlike an ordinary consumer, this one does not subscribe with a tag or *; it subscribes with a MessageSelector instead:
consumer.subscribe("qqq", MessageSelector.bySql("a between 0 and 3"));
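Only messages whose property a lies between 0 and 3 are consumed. BETWEEN is inclusive at both ends, so this matches a = 0, 1, 2 and 3.
There should be other selectors as well (MessageSelector.byTag, for example); I will add them here as I come across them.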
package com.asd.rocket.controller.filter;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * @author zhangluping@sinosoft.com.cn
 * @date 2019/4/10 10:08
 */
public class FilterConsumer {
    public static void main(String[] args) throws Exception {
        // Instantiate with the specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("c1");
        // Specify name server addresses.
        consumer.setNamesrvAddr("10.1.11.155:9876");
        // Subscribe to the topic with a SQL92 selector instead of a tag expression.
        consumer.subscribe("qqq", MessageSelector.bySql("a between 0 and 3"));
        // Register the callback to execute when messages fetched from brokers arrive.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                // Decode with UTF-8, the charset the producer used to encode the body.
                msgs.forEach(m -> System.out.println(new String(m.getBody(), StandardCharsets.UTF_8)));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Launch the consumer instance.
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
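To make the effect of the selector concrete, here is a minimal plain-Java sketch that imitates locally what the subscription above asks the broker to do. It does not use the RocketMQ client or contact a broker: BetweenFilterSketch, FakeMessage, produce and selectBetween are hypothetical names invented for this illustration, and the real evaluation happens on the broker side. The only point it shows is that BETWEEN is inclusive at both ends, so of the ten messages sent by FilterProducer only those with a = 0, 1, 2, 3 should reach the consumer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Local illustration only: no broker and no RocketMQ client involved. */
public class BetweenFilterSketch {

    /** Hypothetical stand-in for a message: a body plus string user properties. */
    public static final class FakeMessage {
        final String body;
        final Map<String, String> userProperties = new LinkedHashMap<>();

        FakeMessage(String body) {
            this.body = body;
        }
    }

    /** Mirrors the producer loop: ten messages with property a = "0" .. "9". */
    public static List<FakeMessage> produce() {
        List<FakeMessage> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            FakeMessage msg = new FakeMessage("Hello RocketMQ " + i);
            msg.userProperties.put("a", String.valueOf(i));
            messages.add(msg);
        }
        return messages;
    }

    /** Keeps the bodies whose property a satisfies low <= a <= high. */
    public static List<String> selectBetween(List<FakeMessage> messages, long low, long high) {
        List<String> bodies = new ArrayList<>();
        for (FakeMessage msg : messages) {
            String raw = msg.userProperties.get("a");
            if (raw == null) {
                continue; // in this sketch, a message without the property is skipped
            }
            long a = Long.parseLong(raw);
            if (a >= low && a <= high) {
                bodies.add(msg.body);
            }
        }
        return bodies;
    }

    public static void main(String[] args) {
        // Prints "Hello RocketMQ 0" through "Hello RocketMQ 3", one per line.
        selectBetween(produce(), 0, 3).forEach(System.out::println);
    }
}
```

With a real broker the same four bodies are expected from FilterConsumer, though the order in which a concurrent listener prints them is not guaranteed.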
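4. A pitfall
Starting FilterConsumer failed with the error: The broker does not support consumer to filter message by SQL92
The fix is described in the second link listed at the top:
Add the following line to ~/<RocketMQ install dir>/conf/broker.conf, then restart the broker with that config file so the setting takes effect: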
enablePropertyFilter = true
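As a quick sanity check before restarting the broker, the sketch below parses broker.conf text and reports whether the switch is on. It rests on the assumption that broker.conf is a standard Java properties file, which is consistent with the key = value line above; BrokerConfCheck and propertyFilterEnabled are hypothetical names, and the brokerName line in the sample is only illustrative.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Illustrative helper, not part of RocketMQ. */
public class BrokerConfCheck {

    /** Returns true when the given broker.conf text sets enablePropertyFilter to true. */
    public static boolean propertyFilterEnabled(String brokerConfText) {
        Properties props = new Properties();
        try {
            // Properties.load ignores the whitespace around '=' in "key = value".
            props.load(new StringReader(brokerConfText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // A missing key is treated as "false" here; trim guards against trailing spaces.
        String value = props.getProperty("enablePropertyFilter", "false").trim();
        return Boolean.parseBoolean(value);
    }

    public static void main(String[] args) {
        String conf = "brokerName = broker-a\n" + "enablePropertyFilter = true\n";
        System.out.println(propertyFilterEnabled(conf)); // prints "true"
    }
}
```

To check a real file, read its contents into a string first (for example with java.nio.file.Files) and pass that string to propertyFilterEnabled.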