rocketMq 消息过滤

2018-12-12  本文已影响0人  圣村的希望

  rocketmq的消息过滤在consumer端和broker端都有过滤,更高级一点的还有通过filterServer来进行高级过滤。

  broker端的过滤:在拉取消息的时候会先去遍历ConsumeQueue的,然后根据tag的hashCode来进行比对看是否符合要求,这里会出现哈希冲突,但是没关系在consumer端会根据tag的字符串进行去比对了,保证了正确性。这里在consumeQueue里面进行比对有个好处,就是避免了对commitLog的访问,不需要再去访问磁盘里的消息了。

  consumer端的过滤:把从服务端获取到的消息根据tag进行字符串比对,这里确保了broker由于哈希冲突导致的过滤不干净的问题。

  更高级些的消息过滤是使用FilterServer进行过滤,在rocketMq的架构中有一个filtersrv模块,他是用来进行消息过滤的,他通过consumer端上传上来的过滤代码进行过滤。
  采用FilterServer过滤的步骤:

  1. 在broker的配置中添加filterServerNumbers=3配置,这样在broker启动的时候会启动3个filter server进程用来进行broker端的高级消息过滤。它是在broker端和consumer端之间的一层,把从broker端获取的消息按consumer端上传的过滤逻辑进行过滤,然后才返回给consumer端。
  2. 实现MessageFilter接口,编写自己的消息过滤逻辑。
  3. 通过consumer.subscribe方法上传实现了MessageFilter接口的类,用于filter server进行过滤。

  这里把本地的类上传到rocketMq远端进行执行,这里是危险的操作,如果本地代码大量创建对象和线程就会很消耗broker机器的资源,必须严格确认自己本地的代码才确认上传到远端broker执行。

上一篇 下一篇

猜你喜欢

热点阅读