ActiveMQ递进插件的制作
2017-11-23 本文已影响45人
MisterCH
在项目中用到了几种自己写的ActiveMQ插件:
- 消息流插件,在消息被发送到MQ上时记录日志
- MQTT消费者添加selector插件,在MQTT协议建立消费者时读取clientID,自动添加一个selector
- MQTT协议流控插件,计算一个用户短时间内的连接次数,如果连接次数超限,就在建立连接时拒绝。
目前在设计一个自己的安全插件,主要有一下几个功能:
- 通过配置文件配置,插件定时读取配置文件并更新权限信息
- 无分组概念,每个用户通过一行配置来配置该用户的用户名(加密)、权限
如:
system=manager queue:qname.abc.>:rw,topic:tname.1.>:rwc
r表示可消费,w表示可生产,c表示可创建。
在实际开发时,我们遇到一个问题,如果在一个BrokerFilter中设置用户准入和用户的读写权限,读写权限不会起作用。
更技术点讲,如果我们在插件中同时重写addConnection方法和addConsumer方法,addConsumer方法中的权限设置会失效。这是因为我们在addConnection方法中已经调用了下一个BrokerFilter,所以本插件中的addConsumer方法不会再被调用。
这就需要使用到ActiveMQ的递进插件配置了,在查看了同样有三层递进关系的AMQ的Shiro插件源码后,这种递进插件配置就很显而易见了。
##ShiroPlugin.java中的installPlugin方法
public class ShiroPlugin extends BrokerPluginSupport {
//这里设置的broker变量表示在shiro下游的broker
private Broker broker;
public Broker installPlugin(Broker broker) throws Exception {
Environment environment = ensureEnvironment();
this.authorizationFilter.setEnvironment(environment);
this.authenticationFilter.setEnvironment(environment);
this.subjectFilter.setEnvironment(environment);
//设置递进关系
this.broker = broker;
this.authorizationFilter.setNext(broker);
this.authenticationFilter.setNext(this.authorizationFilter);
this.subjectFilter.setNext(this.authenticationFilter);
//下一层递进是subjectFilter
Broker next = this.subjectFilter;
//如果插件没被启用,则直接设置next是下游的broker
if (!this.enabled) {
//not enabled at startup - default to the original broker:
next = broker;
}
setNext(next);
return this;
}
}
在我之前写的AMQ插件开发介绍中,其实也介绍了插件通用写法,见AMQ插件开发。其中的源码稍加修改也可以实现递进插件。
//入口类
package com.cn.amqs;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.plugin.StatisticsBrokerPlugin;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class MessageLogPlugin implements BrokerPlugin {
private Log log = LogFactory.getLog(StatisticsBrokerPlugin.class);
public Broker installPlugin(Broker broker) throws Exception {
log.info("install MessageLogPlugin");
return new MessageLog(broker);
}
}
package com.cn.amqs;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
public class MessageLog extends BrokerFilter{
public MessageLog(Broker next) {
super(new NextFilter(next));
log.info("initialize Message Log plugin");
}
}
public class NextFilter extends BrokerFilter{
public NextFilter(next) {
super(new NextFilter(next));
log.info("initialize next filter plugin")
}
}
在我的代码中用到的是继承BrokerFilter类,其实还是应该像Shiro一样做成继承BrokerPluginSupport类比较合适。功能更丰富一些。