dubbo源码系列之filter的今世
2017-11-30 本文已影响17人
3c69b7c624d9
上一篇描述了ExtensionLoader加载spi以及wrapper的过程。
本篇描述一下整个filter执行链。
filter分为两种
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}
默认使用两种group对应服务端和客户端(分别是provider和consumer)
找到对应group的extension
/**
* Get activate extensions.
*
* @see com.alibaba.dubbo.common.extension.Activate
* @param url url
* @param values extension point names
* @param group group
* @return extension list which are activated
*/
public List<T> getActivateExtension(URL url, String[] values, String group) {
List<T> exts = new ArrayList<T>();
List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
if (! names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
getExtensionClasses();
for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
String name = entry.getKey();
Activate activate = entry.getValue();
if (isMatchGroup(group, activate.group())) {
T ext = getExtension(name);
if (! names.contains(name)
&& ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)
&& isActive(activate, url)) {
exts.add(ext);
}
}
}
Collections.sort(exts, ActivateComparator.COMPARATOR);
}
List<T> usrs = new ArrayList<T>();
for (int i = 0; i < names.size(); i ++) {
String name = names.get(i);
if (! name.startsWith(Constants.REMOVE_VALUE_PREFIX)
&& ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
if (Constants.DEFAULT_KEY.equals(name)) {
if (usrs.size() > 0) {
exts.addAll(0, usrs);
usrs.clear();
}
} else {
T ext = getExtension(name);
usrs.add(ext);
}
}
}
if (usrs.size() > 0) {
exts.addAll(usrs);
}
return exts;
}
private boolean isMatchGroup(String group, String[] groups) {
if (group == null || group.length() == 0) {
return true;
}
if (groups != null && groups.length > 0) {
for (String g : groups) {
if (group.equals(g)) {
return true;
}
}
}
return false;
}
涉及到了一个新的注解 Activate
/**
* Activate
* <p />
* 对于可以被框架中自动激活加载扩展,此Annotation用于配置扩展被自动激活加载条件。
* 比如,过滤扩展,有多个实现,使用Activate Annotation的扩展可以根据条件被自动加载。
* <ol>
* <li>{@link Activate#group()}生效的Group。具体的有哪些Group值由框架SPI给出。
* <li>{@link Activate#value()}在{@link com.alibaba.dubbo.common.URL}中Key集合中有,则生效。
* </ol>
*
* <p />
* 底层框架SPI提供者通过{@link com.alibaba.dubbo.common.extension.ExtensionLoader}的{@link ExtensionLoader#getActivateExtension}方法
* 获得条件的扩展。
*
* @author william.liangf
* @author ding.lid
* @export
* @see SPI
* @see ExtensionLoader
* @see ExtensionLoader#getActivateExtension(com.alibaba.dubbo.common.URL, String[], String)
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Activate {
/**
* Group过滤条件。
* <br />
* 包含{@link ExtensionLoader#getActivateExtension}的group参数给的值,则返回扩展。
* <br />
* 如没有Group设置,则不过滤。
*/
String[] group() default {};
/**
* Key过滤条件。包含{@link ExtensionLoader#getActivateExtension}的URL的参数Key中有,则返回扩展。
* <p />
* 示例:<br/>
* 注解的值 <code>@Activate("cache,validatioin")</code>,
* 则{@link ExtensionLoader#getActivateExtension}的URL的参数有<code>cache</code>Key,或是<code>validatioin</code>则返回扩展。
* <br/>
* 如没有设置,则不过滤。
*/
String[] value() default {};
/**
* 排序信息,可以不提供。
*/
String[] before() default {};
/**
* 排序信息,可以不提供。
*/
String[] after() default {};
/**
* 排序信息,可以不提供。
*/
int order() default 0;
}
在ExtensionLoader做loadFile的时候同时会Activate的注解也放入map中,根据对应Activate的注解来确定是否应用在指定的调用链上
上述代码可以得出结论当Activate的value为空是此时将不会过滤,排序字段有after before 和order字段共通完成,group字段指明出现的调用链(空表示不过滤否则需要和名称匹配)
具体排序使用Comparator进行排序,决定了调用链的顺序
public class ActivateComparator implements Comparator<Object> {
public static final Comparator<Object> COMPARATOR = new ActivateComparator();
public int compare(Object o1, Object o2) {
if (o1 == null && o2 == null) {
return 0;
}
if (o1 == null) {
return -1;
}
if (o2 == null) {
return 1;
}
if (o1.equals(o2)) {
return 0;
}
Activate a1 = o1.getClass().getAnnotation(Activate.class);
Activate a2 = o2.getClass().getAnnotation(Activate.class);
if ((a1.before().length > 0 || a1.after().length > 0
|| a2.before().length > 0 || a2.after().length > 0)
&& o1.getClass().getInterfaces().length > 0
&& o1.getClass().getInterfaces()[0].isAnnotationPresent(SPI.class)) {
ExtensionLoader<?> extensionLoader = ExtensionLoader.getExtensionLoader(o1.getClass().getInterfaces()[0]);
if (a1.before().length > 0 || a1.after().length > 0) {
String n2 = extensionLoader.getExtensionName(o2.getClass());
for (String before : a1.before()) {
if (before.equals(n2)) {
return -1;
}
}
for (String after : a1.after()) {
if (after.equals(n2)) {
return 1;
}
}
}
if (a2.before().length > 0 || a2.after().length > 0) {
String n1 = extensionLoader.getExtensionName(o1.getClass());
for (String before : a2.before()) {
if (before.equals(n1)) {
return 1;
}
}
for (String after : a2.after()) {
if (after.equals(n1)) {
return -1;
}
}
}
}
int n1 = a1 == null ? 0 : a1.order();
int n2 = a2 == null ? 0 : a2.order();
return n1 > n2 ? 1 : -1; // 就算n1 == n2也不能返回0,否则在HashSet等集合中,会被认为是同一值而覆盖
}
}
获取了Filter此时通过层层嵌套调用完成调用链的建立
此处以CacheFilter作为样例进行详述
<table>
<tbody>
<tr>
<td> </td>
</tr>
</tbody>
</table>
@Activate(group = {Constants.CONSUMER, Constants.PROVIDER}, value = Constants.CACHE_KEY)
public class CacheFilter implements Filter {
private CacheFactory cacheFactory;
public void setCacheFactory(CacheFactory cacheFactory) {
this.cacheFactory = cacheFactory;
}
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (cacheFactory != null && ConfigUtils.isNotEmpty(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.CACHE_KEY))) {
Cache cache = cacheFactory.getCache(invoker.getUrl().addParameter(Constants.METHOD_KEY, invocation.getMethodName()));
if (cache != null) {
String key = StringUtils.toArgumentString(invocation.getArguments());
if (cache != null && key != null) {
Object value = cache.get(key);
if (value != null) {
return new RpcResult(value);
}
Result result = invoker.invoke(invocation);
if (! result.hasException()) {
cache.put(key, result.getValue());
}
return result;
}
}
}
return invoker.invoke(invocation);
}
}
从上文描述可知,CacheFilter是同时作用在客户端和服务端,并且必须在URL中存在Cache的key才会自动激活。可以参考dubbo缓存代码分析
因此我们自定义Filter时可以根据Activate来定义调用链
先在文件夹META-INF/dubbo定义spi文件
com.alibaba.dubbo.rpc.Filter
clientInfoConsumer=com.air.tqb.dubbo.filter.ClientInfoConsumerFilter
package com.air.tqb.dubbo.filter;
import com.air.tqb.rmi.clientInfo.ClientInfo;
import com.air.tqb.rmi.clientInfo.ClientInfoRemoteInvocationFilter;
import com.air.tqb.rmi.clientInfo.RemoteInvocationCallback;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.common.json.JSON;
import com.alibaba.dubbo.rpc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* Created by qixiaobo on 2017/6/16.
*/
@Activate(group = {Constants.CONSUMER})
public class ClientInfoConsumerFilter implements Filter, ClientInfoRemoteInvocationFilter {
private ThreadLocal<ClientInfo> clientInfoTL = new ThreadLocal<>();
private RemoteInvocationCallback remoteInvocationCallback;
private String from;
private static Logger logger = LoggerFactory.getLogger(ClientInfoConsumerFilter.class);
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
remoteInvocationCallback.beforeCreateRemoteInvocation(this, invocation.getMethodName(), invocation.getArguments());
if (getClientInfo() != null) {
ClientInfo info = getClientInfo();
if (from != null && info.getFrom() == null) {
info.setFrom(from);
}
try {
RpcContext.getContext()
.setInvoker(invoker)
.setInvocation(invocation)
.setAttachment(CLIENT_INFO, JSON.json(info));
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
clientInfoTL.remove();
}
remoteInvocationCallback.afterCreateRemoteInvocation(this, invocation.getMethodName(), invocation.getArguments());
return invoker.invoke(invocation);
}
@Override
public ClientInfo getClientInfo() {
return clientInfoTL.get();
}
@Override
public void setClientInfo(ClientInfo clientInfo) {
clientInfoTL.set(clientInfo);
}
@Override
public RemoteInvocationCallback getRemoteInvocationCallback() {
return remoteInvocationCallback;
}
@Override
public void setRemoteInvocationCallback(RemoteInvocationCallback remoteInvocationCallback) {
this.remoteInvocationCallback = remoteInvocationCallback;
}
@Override
public String getFrom() {
return from;
}
@Override
public void setFrom(String from) {
this.from = from;
}
}
这样只要加载了这个jar那么久自然实现了扩展点的启用,在Filter上修改了调用链