dubbo源码系列之filter的今世

2017-11-30  本文已影响17人  3c69b7c624d9

上一篇描述了ExtensionLoader加载spi以及wrapper的过程。

本篇描述一下整个filter执行链。

filter分为两种

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }
     
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
    }

默认使用两种group对应服务端和客户端(分别是provider和consumer)

找到对应group的extension

    /**
     * Get activate extensions.
     *
     * @see com.alibaba.dubbo.common.extension.Activate
     * @param url url
     * @param values extension point names
     * @param group group
     * @return extension list which are activated
     */
    public List<T> getActivateExtension(URL url, String[] values, String group) {
        List<T> exts = new ArrayList<T>();
        List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
        if (! names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
            getExtensionClasses();
            for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
                String name = entry.getKey();
                Activate activate = entry.getValue();
                if (isMatchGroup(group, activate.group())) {
                    T ext = getExtension(name);
                    if (! names.contains(name)
                            && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)
                            && isActive(activate, url)) {
                        exts.add(ext);
                    }
                }
            }
            Collections.sort(exts, ActivateComparator.COMPARATOR);
        }
        List<T> usrs = new ArrayList<T>();
        for (int i = 0; i < names.size(); i ++) {
           String name = names.get(i);
            if (! name.startsWith(Constants.REMOVE_VALUE_PREFIX)
                  && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
               if (Constants.DEFAULT_KEY.equals(name)) {
                  if (usrs.size() > 0) {
                  exts.addAll(0, usrs);
                  usrs.clear();
                  }
               } else {
               T ext = getExtension(name);
               usrs.add(ext);
               }
            }
        }
        if (usrs.size() > 0) {
           exts.addAll(usrs);
        }
        return exts;
    }
     
    private boolean isMatchGroup(String group, String[] groups) {
        if (group == null || group.length() == 0) {
            return true;
        }
        if (groups != null && groups.length > 0) {
            for (String g : groups) {
                if (group.equals(g)) {
                    return true;
                }
            }
        }
        return false;
    }

涉及到了一个新的注解 Activate

    /**
     * Activate
     * <p />
     * 对于可以被框架中自动激活加载扩展,此Annotation用于配置扩展被自动激活加载条件。
     * 比如,过滤扩展,有多个实现,使用Activate Annotation的扩展可以根据条件被自动加载。
     * <ol>
     * <li>{@link Activate#group()}生效的Group。具体的有哪些Group值由框架SPI给出。
     * <li>{@link Activate#value()}在{@link com.alibaba.dubbo.common.URL}中Key集合中有,则生效。
     * </ol>
     *
     * <p />
     * 底层框架SPI提供者通过{@link com.alibaba.dubbo.common.extension.ExtensionLoader}的{@link ExtensionLoader#getActivateExtension}方法
     * 获得条件的扩展。
     *
     * @author william.liangf
     * @author ding.lid
     * @export
     * @see SPI
     * @see ExtensionLoader
     * @see ExtensionLoader#getActivateExtension(com.alibaba.dubbo.common.URL, String[], String)
     */
    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    public @interface Activate {
        /**
         * Group过滤条件。
         * <br />
         * 包含{@link ExtensionLoader#getActivateExtension}的group参数给的值,则返回扩展。
         * <br />
         * 如没有Group设置,则不过滤。
         */
        String[] group() default {};
     
        /**
         * Key过滤条件。包含{@link ExtensionLoader#getActivateExtension}的URL的参数Key中有,则返回扩展。
         * <p />
         * 示例:<br/>
         * 注解的值 <code>@Activate("cache,validatioin")</code>,
         * 则{@link ExtensionLoader#getActivateExtension}的URL的参数有<code>cache</code>Key,或是<code>validatioin</code>则返回扩展。
         * <br/>
         * 如没有设置,则不过滤。
         */
        String[] value() default {};
     
        /**
         * 排序信息,可以不提供。
         */
        String[] before() default {};
     
        /**
         * 排序信息,可以不提供。
         */
        String[] after() default {};
     
        /**
         * 排序信息,可以不提供。
         */
        int order() default 0;
    }

在ExtensionLoader做loadFile的时候同时会Activate的注解也放入map中,根据对应Activate的注解来确定是否应用在指定的调用链上

上述代码可以得出结论当Activate的value为空是此时将不会过滤,排序字段有after before 和order字段共通完成,group字段指明出现的调用链(空表示不过滤否则需要和名称匹配)

具体排序使用Comparator进行排序,决定了调用链的顺序

    public class ActivateComparator implements Comparator<Object> {
         
        public static final Comparator<Object> COMPARATOR = new ActivateComparator();
     
        public int compare(Object o1, Object o2) {
            if (o1 == null && o2 == null) {
                return 0;
            }
            if (o1 == null) {
                return -1;
            }
            if (o2 == null) {
                return 1;
            }
            if (o1.equals(o2)) {
                return 0;
            }
            Activate a1 = o1.getClass().getAnnotation(Activate.class);
            Activate a2 = o2.getClass().getAnnotation(Activate.class);
            if ((a1.before().length > 0 || a1.after().length > 0 
                    || a2.before().length > 0 || a2.after().length > 0)
                    && o1.getClass().getInterfaces().length > 0
                    && o1.getClass().getInterfaces()[0].isAnnotationPresent(SPI.class)) {
                ExtensionLoader<?> extensionLoader = ExtensionLoader.getExtensionLoader(o1.getClass().getInterfaces()[0]);
                if (a1.before().length > 0 || a1.after().length > 0) {
                    String n2 = extensionLoader.getExtensionName(o2.getClass());
                    for (String before : a1.before()) {
                        if (before.equals(n2)) {
                            return -1;
                        }
                    }
                    for (String after : a1.after()) {
                        if (after.equals(n2)) {
                            return 1;
                        }
                    }
                }
                if (a2.before().length > 0 || a2.after().length > 0) {
                    String n1 = extensionLoader.getExtensionName(o1.getClass());
                    for (String before : a2.before()) {
                        if (before.equals(n1)) {
                            return 1;
                        }
                    }
                    for (String after : a2.after()) {
                        if (after.equals(n1)) {
                            return -1;
                        }
                    }
                }
            }
            int n1 = a1 == null ? 0 : a1.order();
            int n2 = a2 == null ? 0 : a2.order();
            return n1 > n2 ? 1 : -1; // 就算n1 == n2也不能返回0,否则在HashSet等集合中,会被认为是同一值而覆盖
        }
     
    }

获取了Filter此时通过层层嵌套调用完成调用链的建立

此处以CacheFilter作为样例进行详述

<table>
<tbody>
<tr>
<td> </td>
</tr>
</tbody>
</table>

    @Activate(group = {Constants.CONSUMER, Constants.PROVIDER}, value = Constants.CACHE_KEY)
    public class CacheFilter implements Filter {
     
        private CacheFactory cacheFactory;
     
        public void setCacheFactory(CacheFactory cacheFactory) {
            this.cacheFactory = cacheFactory;
        }
     
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            if (cacheFactory != null && ConfigUtils.isNotEmpty(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.CACHE_KEY))) {
                Cache cache = cacheFactory.getCache(invoker.getUrl().addParameter(Constants.METHOD_KEY, invocation.getMethodName()));
                if (cache != null) {
                    String key = StringUtils.toArgumentString(invocation.getArguments());
                    if (cache != null && key != null) {
                        Object value = cache.get(key);
                        if (value != null) {
                            return new RpcResult(value);
                        }
                        Result result = invoker.invoke(invocation);
                        if (! result.hasException()) {
                            cache.put(key, result.getValue());
                        }
                        return result;
                    }
                }
            }
            return invoker.invoke(invocation);
        }
     
    }

从上文描述可知,CacheFilter是同时作用在客户端和服务端,并且必须在URL中存在Cache的key才会自动激活。可以参考dubbo缓存代码分析

因此我们自定义Filter时可以根据Activate来定义调用链

先在文件夹META-INF/dubbo定义spi文件

com.alibaba.dubbo.rpc.Filter

clientInfoConsumer=com.air.tqb.dubbo.filter.ClientInfoConsumerFilter
    package com.air.tqb.dubbo.filter;
      
    import com.air.tqb.rmi.clientInfo.ClientInfo;
    import com.air.tqb.rmi.clientInfo.ClientInfoRemoteInvocationFilter;
    import com.air.tqb.rmi.clientInfo.RemoteInvocationCallback;
    import com.alibaba.dubbo.common.Constants;
    import com.alibaba.dubbo.common.extension.Activate;
    import com.alibaba.dubbo.common.json.JSON;
    import com.alibaba.dubbo.rpc.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
      
    import java.io.IOException;
      
    /**
     * Created by qixiaobo on 2017/6/16.
     */
    @Activate(group = {Constants.CONSUMER})
    public class ClientInfoConsumerFilter implements Filter, ClientInfoRemoteInvocationFilter {
       private ThreadLocal<ClientInfo> clientInfoTL = new ThreadLocal<>();
       private RemoteInvocationCallback remoteInvocationCallback;
       private String from;
      
       private static Logger logger = LoggerFactory.getLogger(ClientInfoConsumerFilter.class);
      
       @Override
       public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
      
           remoteInvocationCallback.beforeCreateRemoteInvocation(this, invocation.getMethodName(), invocation.getArguments());
           if (getClientInfo() != null) {
               ClientInfo info = getClientInfo();
               if (from != null && info.getFrom() == null) {
                   info.setFrom(from);
               }
               try {
                   RpcContext.getContext()
                           .setInvoker(invoker)
                           .setInvocation(invocation)
                           .setAttachment(CLIENT_INFO, JSON.json(info));
               } catch (IOException e) {
                   logger.error(e.getMessage(), e);
               }
               clientInfoTL.remove();
           }
           remoteInvocationCallback.afterCreateRemoteInvocation(this, invocation.getMethodName(), invocation.getArguments());
           return invoker.invoke(invocation);
       }
      
       @Override
       public ClientInfo getClientInfo() {
           return clientInfoTL.get();
       }
      
       @Override
       public void setClientInfo(ClientInfo clientInfo) {
           clientInfoTL.set(clientInfo);
       }
      
       @Override
       public RemoteInvocationCallback getRemoteInvocationCallback() {
           return remoteInvocationCallback;
       }
      
       @Override
       public void setRemoteInvocationCallback(RemoteInvocationCallback remoteInvocationCallback) {
           this.remoteInvocationCallback = remoteInvocationCallback;
       }
      
       @Override
       public String getFrom() {
           return from;
       }
      
       @Override
       public void setFrom(String from) {
           this.from = from;
       }
    }

这样只要加载了这个jar那么久自然实现了扩展点的启用,在Filter上修改了调用链

上一篇 下一篇

猜你喜欢

热点阅读