Dubbo之EchoFilter源码分析及改造
前言
EchoFilter在dubbo中用于提供回声测试功能,也就是检测服务是否可用。
我们看下官方文档中对这块的描述。
也就是说,我们不需要调用服务的某个具体接口来检测服务是否可用。框架提我们强制实现了一个echo接口。
源码的实现也比较简单。
在消费者端,生成代理的时候,把EchoService也代理进去。
在提供者端,在解析报文,实际调用invoker之前,通过Filter拦截链中的EchoFilter判断方法是否为echo,如果是echo,不进行invoker调用,走EchoFilter内部的调用逻辑。
由于我们的系统除了对服务是否可用之外还需要做版本以及系统配置的监控,所以我们会改造这个Filter来达到我们的需求。
源码分析
首先看消费者端生成代理的代码
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
Class<?>[] interfaces = null;
String config = invoker.getUrl().getParameter(Constants.INTERFACES);
//创建proxy时增加EchoService接口
if (config != null && config.length() > 0) {
String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
if (types != null && types.length > 0) {
interfaces = new Class<?>[types.length + 2];
interfaces[0] = invoker.getInterface();
interfaces[1] = EchoService.class;
for (int i = 0; i < types.length; i++) {
// TODO can we load successfully for a different classloader?.
interfaces[i + 2] = ReflectUtils.forName(types[i]);
}
}
}
if (interfaces == null) {
interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
}
//如果本身实现的接口没有实现GenericService,创建proxy时增加GenericService接口
if (!GenericService.class.isAssignableFrom(invoker.getInterface()) && generic) {
int len = interfaces.length;
Class<?>[] temp = interfaces;
interfaces = new Class<?>[len + 1];
System.arraycopy(temp, 0, interfaces, 0, len);
interfaces[len] = com.alibaba.dubbo.rpc.service.GenericService.class;
}
return getProxy(invoker, interfaces);
}
在客户端生成代理的时候,会强制加上EchoService接口,也就是在dubbo中,默认所有服务都是启动回声检测的。
下面来看下提供者端如何处理EchoService,我们看下EchoFilter,这个拦截前会在invoker实际调用前先执行,EchoFilter代码如下
@Activate(group = Constants.PROVIDER, order = -110000)
public class EchoFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
if (inv.getMethodName().equals(Constants.$ECHO) && inv.getArguments() != null && inv.getArguments().length == 1)
return new RpcResult(inv.getArguments()[0]);
return invoker.invoke(inv);
}
}
EchoFilter 会判断方法名是否是$echo,并且参数为一个,如果符合,向消费者返回我们入参,让消费者确认当前服务正常。
改造
首先确认我们的需求,我们需要确认每个提供者是否可用,并且返回每个提供者应用的相关配置。
这边有两个点,第一,每台服务器,第二,返回相关配置。
先说下第二点,比较简单,我们直接改造EchoFilter的源码即可,原先是直接返回我们的入参,我们可以将我们的配置转换为json输出。
在我实际的开发中,我没有选择去改造EchoFilter源码,这需要将我们dubbo源码重新打包,第一,侵入性太强,第二,改坏了到时候应用全崩了。我选择的方式,新建一个项目,以插件包的方式让所有项目生效这个功能。
具体代码如下:
@Slf4j
@Activate(group = Constants.PROVIDER, order = -110001)
public class JEchoFilter implements Filter{
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
if(inv.getMethodName().equals(Constants.$ECHO) && inv.getArguments() != null && inv.getArguments().length == 1 ){
Properties properties = new Properties();
InputStream is = null;
try {
File file = new File("ctms.properties");
if(file.exists()){
is = new FileInputStream(file);
}else{
is = getClass().getClassLoader().getResourceAsStream("ctms.properties");
}
properties.load(is);
log.debug("ctms.properties加载成功");
} catch (Exception e) {
log.error("找不到ctms.properties对应配置文件",e);
}finally {
IOUtils.closeQuietly(is);
}
String zookeeperUrl =(String) properties.get("zookeeper.uri");
//disconf配置从环境变量取
String disconfServer = System.getProperty("disconf.conf_server_host");
String version = System.getProperty("publish.version");
Map<String,Object> result = new HashMap<>();
result.put("interface",invoker.getUrl().getServiceInterface());
result.put("address",invoker.getUrl().getAddress());
result.put("message",inv.getArguments()[0]);
result.put("version",version);
result.put("zookeeper",zookeeperUrl);
result.put("disconfServer",disconfServer);
try {
return new RpcResult(JSON.json(result));
} catch (IOException e) {
log.error("JSON序列化失败",e);
return new RpcResult(inv.getArguments()[0]);
}
}
return invoker.invoke(inv);
}
}
设置order = -110001,让我的JEchoFilter在EchoFilter之前执行,相当于屏蔽了EchoFilter。
要让这个Filter生效,我们还需要在classpath下的META-INF/dubbo新建一个com.alibaba.dubbo.rpc.Filter文件,内容为jecho=com.xxx.xxx.dubbo.filter.JEchoFilter
这就是dubbo SPI。
再来说下第一点的实现。
通过一开始dubbo文档中的调用方式,其实我们调用的是提供者中的一台服务器。也许你认为可以通过Cluster的BroadCast模式进行调用,但是这种模式只会返回最后一个成功调用者的结果,之前的结果会被忽略,满足不了我们的需求。代码如下
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
RpcContext.getContext().setInvokers((List)invokers);
RpcException exception = null;
Result result = null;
for (Invoker<T> invoker: invokers) {
try {
result = invoker.invoke(invocation);
} catch (RpcException e) {
exception = e;
logger.warn(e.getMessage(), e);
} catch (Throwable e) {
exception = new RpcException(e.getMessage(), e);
logger.warn(e.getMessage(), e);
}
}
if (exception != null) {
throw exception;
}
return result;
}
所以我们就需要另辟蹊径了。通过Dubbo引用服务的源码分析,我们可以知道,我们只要获取了所有提供者的url,然后我们再使用protocol的refer方法将url转换为invoker,接下去使用ProxyFactory把invoker转换为proxy,然后就能调用了。
所以现在的关键点是,获取所有提供者的urls。通过Registry模块的分析,我们可以通过RegistryService的subscribe对zookeeper的dubbo路径进行监听,每次都会全量返回节点数据。取得所有提供者的urls是轻轻松松的。
那么怎么获取RegistryService呢。
我们可以通过
<dubbo:reference id="registryService" interface="com.alibaba.dubbo.registry.RegistryService" check="false" />
来获取。
Dubbo框架会自动帮我们生成这个服务。源码在RegistryProtocl的refer方法内
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
在得到registryService后,我们可以通过实现NotifyListener接口,并且进行向registryService进行subscribe来获取全量的providers。
我是怎么得到这个思路的。因为我机制的想到了我想获取所有providers这个功能,dubboamin里面不是有实现么。所有看了dubboamin的源码之后,我发现它这么做了。相当的巧妙。
NotifyListener的notify接口实现如下
// 收到的通知对于 ,同一种类型数据(override、subcribe、route、其它是Provider),同一个服务的数据是全量的
public void notify(List<URL> urls) {
if(urls == null || urls.isEmpty()) {
return;
}
// Map<category, Map<servicename, Map<Long, URL>>>
final Map<String, Map<String, Map<Long, URL>>> categories = new HashMap<String, Map<String, Map<Long, URL>>>();
for(URL url : urls) {
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY);
if(Constants.EMPTY_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { // 注意:empty协议的group和version为*
ConcurrentMap<String, Map<Long, URL>> services = registryCache.get(category);
if(services != null) {
String group = url.getParameter(Constants.GROUP_KEY);
String version = url.getParameter(Constants.VERSION_KEY);
// 注意:empty协议的group和version为*
if (! Constants.ANY_VALUE.equals(group) && ! Constants.ANY_VALUE.equals(version)) {
services.remove(url.getServiceKey());
} else {
for (Map.Entry<String, Map<Long, URL>> serviceEntry : services.entrySet()) {
String service = serviceEntry.getKey();
if (Tool.getInterface(service).equals(url.getServiceInterface())
&& (Constants.ANY_VALUE.equals(group) || StringUtils.isEquals(group, Tool.getGroup(service)))
&& (Constants.ANY_VALUE.equals(version) || StringUtils.isEquals(version, Tool.getVersion(service)))) {
services.remove(service);
}
}
}
}
} else {
Map<String, Map<Long, URL>> services = categories.get(category);
if(services == null) {
services = new HashMap<String, Map<Long,URL>>();
categories.put(category, services);
}
String service = url.getServiceKey();
Map<Long, URL> ids = services.get(service);
if(ids == null) {
ids = new HashMap<Long, URL>();
services.put(service, ids);
}
ids.put(ID.incrementAndGet(), url);
}
}
for(Map.Entry<String, Map<String, Map<Long, URL>>> categoryEntry : categories.entrySet()) {
String category = categoryEntry.getKey();
ConcurrentMap<String, Map<Long, URL>> services = registryCache.get(category);
if(services == null) {
services = new ConcurrentHashMap<String, Map<Long,URL>>();
registryCache.put(category, services);
}
services.putAll(categoryEntry.getValue());
}
}
上面的代码全是从dubboadmin拷贝而来,以后有其他需求我们可以参照这个实现。
在获取了所有provider的url之后,就简单了。针对每个生成代理进行调用即可。代码如下
@RequestMapping("/check")
public List<DubboApplicationStatus> checkDubboService() throws ParseException {
List<DubboApplicationStatus> result = new ArrayList<>();
ConcurrentMap<String, ConcurrentMap<String, Map<Long, URL>>> cache = registryServerSync.getRegistryCache();
ConcurrentMap<String, Map<Long, URL>> totalProviders = cache.get("providers");
//Map<Long, URL> providers = totalProviders.get(TtmsService.class.getName());
for (Map.Entry<String, Map<Long, URL>> provides : totalProviders.entrySet()) {
for (Map.Entry<Long, URL> provider : provides.getValue().entrySet()) {
if("com.alibaba.dubbo.monitor.MonitorService".equals(provider.getValue().getServiceKey())){
continue;
}
URL invokerUrl = provider.getValue();
String interfaceName = provider.getValue().getServiceInterface();
//.addParameter("timeout", 30000);
String response = null;
try {
//Class clazz = this.getClass().getClassLoader().loadClass(interfaceName);
Invoker invoker = DubboProtocol.getDubboProtocol().refer(FakeInterface.class, invokerUrl);
EchoService echoService = (EchoService) proxyFactory.getProxy(invoker);
response = (String) echoService.$echo("OK");
log.info(response);
if ("OK".equals(response)) {
DubboApplicationStatus ttmsServiceStatus = DubboApplicationStatus
.builder()
.name(provider.getValue().getServiceKey())
.ip(provider.getValue().getAddress())
.description("Dubbo服务不是最新版本")
.build();
result.add(ttmsServiceStatus);
continue;
}
JSONObject jsonObject = (JSONObject) JSON.parse((String) echoService.$echo("OK"));
DubboApplicationStatus ttmsServiceStatus = DubboApplicationStatus
.builder()
.name(provider.getValue().getServiceKey())
.ip(provider.getValue().getAddress())
.disconfServer((String) jsonObject.get("disconfServer"))
.version((String) jsonObject.get("version"))
.zookeeperUrl((String) jsonObject.get("zookeeper"))
.build();
result.add(ttmsServiceStatus);
} catch (Exception ex) {
log.error("dubbo服务调用调用失败", ex);
DubboApplicationStatus ttmsServiceStatus = DubboApplicationStatus
.builder()
.name(provider.getValue().getServiceKey())
.ip(provider.getValue().getAddress())
.description("Dubbo服务调用失败")
.build();
result.add(ttmsServiceStatus);
}
}
}
log.info("接口请求完毕");
return result;
}
上面的代码会对zookeeper中注册的每个provider进行回声测试,看代码是很头疼的,这个项目我也放在我github上面了,大家可以学习下。
项目地址(https://github.com/shengchaojie/dubbo-healthcheck)
总结
通过这次实践,证明看源码还是有好处的。你没看过源码,如果要对应用做监控,难道每个应用都去起一个http端口?这个项目只是针对dubbo提供者的。消费者那是真的通过起http服务实现的。不过我也通过Servert3.0的注解做成了插件式接口,只要依赖了我这个包,这些都会自动实现。
最后
希望大家关注下我的公众号
image