rocketmq

过滤消息

2021-06-19  本文已影响0人  念䋛

过滤消息
消息的过滤分为两种
Tag过滤方式:Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG,可以用||分隔,tag模式的粒度大.
SQL92过滤方式:利用sql的方式更精确的过滤消息.
RocketMQ只定义了一些基本语法来支持这个特性。
SQL语法
数值比较,比如:>,>=,<,<=,BETWEEN,=; 字符比较,比如:=,<>,IN; IS NULL 或者 IS NOT NULL; 逻辑符号 AND,OR,NOT;
常量支持类型为:
数值,比如:123,3.1415;
字符,比如:'abc',必须用单引号包裹起来; NULL,特殊的常量 布尔值,TRUE 或 FALSE 使用注意:只有推模式的消费者可以使用SQL过滤。拉模式是用不了的。
过滤是broker服务端的行为,不需要消费端额外的过滤,这样减少了消费者服务器的负担
TAG过滤

public static void main(String[] args) throws InterruptedException, MQClientException, IOException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
    //获取TagA  TagC两个tag的消息
    consumer.subscribe("TagFilterTest", "TagA || TagC");
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

SQL模式
生产者

public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    producer.start();
    String[] tags = new String[] {"TagA", "TagB", "TagC"};

    for (int i = 0; i < 15; i++) {
        Message msg = new Message("SqlFilterTest",
            tags[i % tags.length],
            ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
        );
        //sql用a is not null and a between 0 and 3
        //sql模式针对消息中的property属性,在消费端可以用a属性做过滤,也可以属性联合TAG做过滤
        msg.putUserProperty("a", String.valueOf(i));
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);
    }

    producer.shutdown();
}

消费者

public static void main(String[] args) throws Exception {

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
    //tag在TagA和TagB中,属性a不为空并且在0和3之间,包含0和3 左闭 右闭
    consumer.subscribe("SqlFilterTest",
        MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
            "and (a is not null and a between 0 and 3)"));
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

上一篇 下一篇

猜你喜欢

热点阅读