DUBBO-组件 Invocation, Invoker, Directory

2018-11-03  本文已影响95人  C_99f1

dubbo-重要组件

/**
 * Invocation. (API, Prototype, NonThreadSafe)
 *
 * <p>Encapsulates the information and parameters of one request, and also
 * exposes the {@link Invoker} associated with it.
 */
public interface Invocation {
    /** Name of the method being invoked. */
    String getMethodName();
    /** Parameter types of the method being invoked. */
    Class<?>[] getParameterTypes();
    /** Argument values of the call. */
    Object[] getArguments();
    /** All attachments carried with the call, as string key/value pairs. */
    Map<String, String> getAttachments();
    /** Attachment value stored under {@code key}. */
    String getAttachment(String key);
    /** Attachment value stored under {@code key}; presumably falls back to {@code defaultValue} when absent (implementation not shown here). */
    String getAttachment(String key, String defaultValue);
    /** Invoker associated with this invocation. */
    Invoker<?> getInvoker();
}

/**
 * RpcInvocation.
 *
 * <p>Basic implementation of {@link Invocation}. This listing is an outline:
 * only the fields and constructor signatures are shown, the bodies are omitted.
 */
public class RpcInvocation implements Invocation, Serializable {
   private String methodName;
   private Class<?>[] parameterTypes;
   private Object[] arguments;
   private Map<String, String> attachments;
   // transient: the invoker reference is excluded from Java serialization.
   private transient Invoker<?> invoker;
   // Constructor overloads (signatures only).
   public RpcInvocation() 
   public RpcInvocation(Invocation invocation, Invoker<?> invoker) 
   public RpcInvocation(Invocation invocation)
   public RpcInvocation(Method method, Object[] arguments)
   public RpcInvocation(Method method, Object[] arguments, Map<String, String> attachment) 
   public RpcInvocation(String methodName, Class<?>[] parameterTypes, Object[] arguments) 
   public RpcInvocation(String methodName, Class<?>[] parameterTypes, Object[] 
   arguments,Map<String,String> attachments) 
   public RpcInvocation(String methodName, Class<?>[] parameterTypes, Object[] arguments, Map<String, 
   String> attachments, Invoker<?> invoker) 
}


/**
 * DecodeableRpcInvocation.
 *
 * <p>Extends {@link RpcInvocation} with one extra capability: decoding the
 * request entity. This listing is an outline; method bodies are omitted.
 */
public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Decodeable {

    private static final Logger log = LoggerFactory.getLogger(DecodeableRpcInvocation.class);
    private Channel channel;
    private byte serializationType;
    private InputStream inputStream;
    private Request request;
    // volatile flag; presumably guards against decoding twice (body not shown, confirm).
    private volatile boolean hasDecoded;
    public DecodeableRpcInvocation(Channel channel, Request request, InputStream is, byte id)   
    // No-argument decode entry point (body elided in this listing).
    public void decode() throws Exception { }
    // Decodes from the given channel and input stream (body elided in this listing).
    @Override
    public Object decode(Channel channel, InputStream input) throws IOException 
     
}




/**  
 *一个节点
 */
interface Node extends Node
     URL getUrl(); 
     boolean isAvailable();
     void destroy();

/**
 * Invoker: the callable handle for a service of type {@code T}; it is what
 * {@code protocol.refer(Class<T> type, URL url)} returns.
 *
 * @param <T> the service interface type
 */
interface Invoker<T> extends Node {
    /** Returns the service interface this invoker represents. */
    Class<T> getInterface();

    /**
     * Performs the call described by {@code invocation}.
     *
     * @param invocation the request to execute
     * @return the result of the call
     * @throws RpcException if the call fails
     */
    Result invoke(Invocation invocation) throws RpcException;
}
/**
 * Basic implementation of Invoker.
 *
 * <p>Outline only: most bodies are omitted. {@code invoke} forwards to the
 * abstract {@code doInvoke}, which subclasses implement.
 */
// NOTE(review): the class header uses T without declaring it and has no opening brace in this listing.
abstract class AbstractInvoker  implements Invoker<T>   
    public AbstractInvoker(Class<T> type, URL url)
    public AbstractInvoker(Class<T> type, URL url, String[] keys)
    public AbstractInvoker(Class<T> type, URL url, Map<String, String> attachment) 
   // Builds an attachment map from the URL and the given keys (body elided).
   private static Map<String, String> convertAttachment(URL url, String[] keys) { }
    @Override
    public Class<T> getInterface() 
    @Override
    public URL getUrl() {   }
    @Override
    public boolean isAvailable() { }
    protected void setAvailable(boolean available)
    @Override
    public void destroy() {  }
    // Reads the "destroyed" flag; the field declaration is not part of this listing.
    public boolean isDestroyed() {return destroyed.get();    }
   @Override
    public String toString() {  return getInterface() + " -> " + (getUrl() == null ? "" : getUrl().toString());    }
    // NOTE(review): the parameter is named "inv" but the body passes "invocation", which is not
    // declared in this listing; doInvoke declares Throwable while invoke declares only RpcException.
    @Override
    public Result invoke(Invocation inv) throws RpcException { return doInvoke(invocation);    }
    // Template-method hook: the actual call is implemented by subclasses.
    protected abstract Result doInvoke(Invocation invocation) throws Throwable;

 


/**
 * Wraps a URL together with an Invoker; {@code invoke} delegates to the
 * wrapped invoker unchanged. Outline only: the class braces and the
 * constructor body are omitted in this listing.
 */
public class InvokerWrapper<T> implements Invoker<T>  
    private final Invoker<T> invoker;
    private final URL url;
    public InvokerWrapper(Invoker<T> invoker, URL url) 
    // Pure delegation to the wrapped invoker.
    @Override
    public Result invoke(Invocation invocation) throws RpcException { return invoker.invoke(invocation); }


/**
 * An InvokerWrapper that also keeps its own reference to the wrapped invoker
 * so that the innermost invoker can be retrieved. Outline only: the class
 * braces are omitted in this listing.
 */
public static class InvokerDelegete<T> extends InvokerWrapper<T>
        private final Invoker<T> invoker;
        public InvokerDelegete(Invoker<T> invoker, URL url) {
            super(invoker, url);
            this.invoker = invoker;
        }
        // Unwraps nested InvokerDelegete layers recursively and returns the first
        // invoker that is not itself an InvokerDelegete.
        public Invoker<T> getInvoker() {  if (invoker instanceof InvokerDelegete) {
               return ((InvokerDelegete<T>) invoker).getInvoker();
        } else {
                return invoker;}}
   

/**
 * Extends AbstractInvoker: the parent's invoke() ends up calling this
 * subclass's doInvoke().
 *
 * <p>Author's note (not shown in this listing): the protocol's refer step
 * calls getClients(url), which connects to the server through netty.
 *
 * <p>Outline only: field initialisation and the doInvoke body are omitted.
 */
public class DubboInvoker<T> extends AbstractInvoker<T> {
    private final ExchangeClient[] clients;
    private final AtomicPositiveInteger index = new AtomicPositiveInteger();
    private final String version;
    private final ReentrantLock destroyLock = new ReentrantLock();
    private final Set<Invoker<?>> invokers;
    // Convenience constructor: same as the 4-argument form with a null invoker set.
    public DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients) {
        this(serviceType, url, clients, null);}

    // Passes the interface/group/token/timeout keys to the parent constructor;
    // assignment of the final fields is elided in this listing.
    public DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients, Set<Invoker<?>> invokers) {
        super(serviceType, url, new String[]{Constants.INTERFACE_KEY, Constants.GROUP_KEY, Constants.TOKEN_KEY, Constants.TIMEOUT_KEY});}

    // Body elided in this listing.
    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
    }
}



/**
 * Base class for cluster invokers. Implements Invoker directly (it does not
 * extend AbstractInvoker): invoke() forwards to the abstract doInvoke(), which
 * subclasses implement.
 *
 * <p>Outline only: several bodies are omitted in this listing.
 */
public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
    // Source of candidate invokers; see list(Invocation).
    protected final Directory<T> directory;
    private volatile Invoker<T> stickyInvoker = null;

    public AbstractClusterInvoker(Directory<T> directory) {
        this(directory, directory.getUrl());
    }

    // Body elided in this listing (the final field "directory" is not assigned here).
    public AbstractClusterInvoker(Directory<T> directory, URL url) {
    }
    // Invoker selection helpers (signatures only).
    protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException 
    private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException 
    private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
                                List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck)
            throws RpcException 

    // NOTE(review): "invokers" and "loadbalance" are not declared anywhere in this listing;
    // presumably they are obtained before this call in the full source (confirm).
    public Result invoke(final Invocation invocation) throws RpcException {
        return doInvoke(invocation, invokers, loadbalance);    }

  
    // Template-method hook: the cluster strategy is implemented by subclasses.
    protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
                                       LoadBalance loadbalance) throws RpcException;

    // Asks the directory for the invokers that match the invocation.
    protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
        List<Invoker<T>> invokers = directory.list(invocation);
        return invokers;
    }
}









/**  
 *  继承AbstractClusterInvoker  AbstractClusterInvoker父类会被调用 invoke( doinvoke)
 *  
 */
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);

    public FailoverClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyinvokers = invokers;
        checkInvokers(copyinvokers, invocation);
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                Result result = invoker.invoke(invocation);
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
        }
     
    }

}





/**
 * Directory: a {@link Node} that supplies the list of invokers for a
 * service type.
 *
 * @param <T> the service interface type
 */
public interface Directory<T> extends Node {

    /**
     * Gets the service type.
     *
     * @return service type.
     */
    Class<T> getInterface();

    /**
     * Lists the invokers for the given invocation.
     *
     * @param invocation the request the invokers are being looked up for
     * @return invokers
     * @throws RpcException if the invokers cannot be listed
     */
    List<Invoker<T>> list(Invocation invocation) throws RpcException;

}




/**
 * Skeleton implementation of {@link Directory}: subclasses provide the raw
 * invoker list through {@link #doList(Invocation)}, and {@link #list(Invocation)}
 * then passes it through the configured routers.
 *
 * @param <T> the service interface type
 */
public abstract class AbstractDirectory<T> implements Directory<T> {

    private final URL url;
    private volatile boolean destroyed = false;
    private volatile URL consumerUrl;
    private volatile List<Router> routers;

    /** Creates a directory for {@code url} with no routers supplied. */
    public AbstractDirectory(URL url) {
        this(url, null);
    }

    /** Creates a directory that uses {@code url} as the consumer URL as well. */
    public AbstractDirectory(URL url, List<Router> routers) {
        this(url, url, routers);
    }

    /**
     * @param url         directory URL, must not be null
     * @param consumerUrl URL handed to routers when routing
     * @param routers     routers to install through {@code setRouters}
     * @throws IllegalArgumentException if {@code url} is null
     */
    public AbstractDirectory(URL url, URL consumerUrl, List<Router> routers) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        this.url = url;
        this.consumerUrl = consumerUrl;
        setRouters(routers);
    }

    /**
     * Lists the invokers for {@code invocation}: takes the subclass's list and
     * applies every router that has no URL or whose URL has the runtime
     * parameter enabled.
     *
     * @throws RpcException if this directory has already been destroyed
     */
    @Override
    public List<Invoker<T>> list(Invocation invocation) throws RpcException {
        if (destroyed) {
            throw new RpcException("Directory already destroyed .url: " + getUrl());
        }
        List<Invoker<T>> candidates = doList(invocation);
        // Work on a snapshot: the volatile field may be replaced concurrently.
        final List<Router> routerSnapshot = this.routers;
        if (routerSnapshot == null || routerSnapshot.isEmpty()) {
            return candidates;
        }
        for (Router router : routerSnapshot) {
            // Skip routers that have a URL on which the runtime flag is off.
            if (router.getUrl() != null && !router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                continue;
            }
            candidates = router.route(candidates, getConsumerUrl(), invocation);
        }
        return candidates;
    }

    /** Supplies the unrouted invoker list for {@code invocation}. */
    protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;

}


public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
   private static final ConfiguratorFactory configuratorFactory = 
   ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getAdaptiveExtension();
    private final String serviceKey; // Initialization at construction time, assertion not null
    private final Class<T> serviceType; // Initialization at construction time, assertion not null
    private final Map<String, String> queryMap; // Initialization at construction time, assertion not null
    private final URL directoryUrl; 
    private final String[] serviceMethods;
    private final boolean multiGroup;
    private Protocol protocol; // Initialization at the time of injection, the assertion is not null
    private Registry registry; // Initialization at the time of injection, the assertion is not null
    private volatile boolean forbidden = false;
    private volatile URL overrideDirectoryUrl; // Initialization at construction time, assertion not null, and always assign non null value
private volatile List<Configurator> configurators; // The initial value is null and the midway may be assigned to null, please use the local variable reference


    private volatile Map<String, Invoker<T>> urlInvokerMap; 

    private volatile Map<String, List<Invoker<T>>> methodInvokerMap; // The initial value is null and the   midway may be assigned to null, please use the local variable reference
    private volatile Set<URL> cachedInvokerUrls; // The initial value is null and the midway may be assigned to null, please use the local variable reference

    /**
     * Creates a registry-backed directory.
     *
     * @param serviceType service interface, must not be null
     * @param url         registry URL; its service key must be non-empty
     * @throws IllegalArgumentException if {@code serviceType} is null or the
     *                                  URL's service key is null or empty
     */
    public RegistryDirectory(Class<T> serviceType, URL url) {
        super(url);
        if (serviceType == null) {
            throw new IllegalArgumentException("service type is null.");
        }
        if (url.getServiceKey() == null || url.getServiceKey().length() == 0) {
            throw new IllegalArgumentException("registry serviceKey is null.");
        }
        this.serviceType = serviceType;
        this.serviceKey = url.getServiceKey();
        // Consumer-side parameters are carried in the decoded "refer" parameter.
        this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        // Directory URL: path = service interface, parameters = query map without the monitor key.
        this.directoryUrl = url.setPath(url.getServiceInterface())
                .clearParameters()
                .addParameters(queryMap)
                .removeParameter(Constants.MONITOR_KEY);
        this.overrideDirectoryUrl = this.directoryUrl;
        String groupValue = directoryUrl.getParameter(Constants.GROUP_KEY, "");
        // "*" or a comma-separated list means more than one group.
        this.multiGroup = groupValue != null && ("*".equals(groupValue) || groupValue.contains(","));
        String methodList = queryMap.get(Constants.METHODS_KEY);
        if (methodList == null) {
            this.serviceMethods = null;
        } else {
            this.serviceMethods = Constants.COMMA_SPLIT_PATTERN.split(methodList);
        }
    }


    /**
     * Converts override URLs into a sorted list of configurators.
     *
     * <p>A URL with the empty protocol discards everything collected so far and
     * stops processing; a URL with no parameters other than "anyhost" discards
     * everything collected so far and processing continues with the next URL.
     *
     * @param urls override URLs, may be null or empty
     * @return sorted configurators; an empty list when {@code urls} is null or empty
     */
    public static List<Configurator> toConfigurators(List<URL> urls) {
        if (urls == null || urls.isEmpty()) {
            return Collections.emptyList();
        }

        List<Configurator> result = new ArrayList<Configurator>(urls.size());
        for (URL overrideUrl : urls) {
            if (Constants.EMPTY_PROTOCOL.equals(overrideUrl.getProtocol())) {
                result.clear();
                break;
            }
            Map<String, String> params = new HashMap<String, String>(overrideUrl.getParameters());
            // "anyhost" may have been added automatically, so it must not count as an override.
            params.remove(Constants.ANYHOST_KEY);
            if (params.isEmpty()) {
                result.clear();
            } else {
                result.add(configuratorFactory.getConfigurator(overrideUrl));
            }
        }
        Collections.sort(result);
        return result;
    }

    /**
     * Sets the protocol stored in this directory; {@code toInvokers} calls
     * {@code protocol.refer(...)} on it to create invokers.
     */
    public void setProtocol(Protocol protocol) {
        this.protocol = protocol;
    }

 

 
    /**
     * Handles a notification from the registry: sorts the URLs into router,
     * configurator and provider URLs, updates configurators and routers when
     * such URLs are present, recomputes the override directory URL and then
     * refreshes the invokers from the provider URLs.
     *
     * @param urls the notified URLs
     */
    @Override
    public synchronized void notify(List<URL> urls) {
        List<URL> providerUrls = new ArrayList<URL>();
        List<URL> routeUrls = new ArrayList<URL>();
        List<URL> overrideUrls = new ArrayList<URL>();
        for (URL url : urls) {
            String protocol = url.getProtocol();
            String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            if (Constants.ROUTERS_CATEGORY.equals(category)
                    || Constants.ROUTE_PROTOCOL.equals(protocol)) {
                routeUrls.add(url);
                continue;
            }
            if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                    || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
                overrideUrls.add(url);
                continue;
            }
            if (Constants.PROVIDERS_CATEGORY.equals(category)) {
                providerUrls.add(url);
                continue;
            }
            logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
        }

        // Configurators are replaced only when the notification carries configurator URLs.
        if (!overrideUrls.isEmpty()) {
            this.configurators = toConfigurators(overrideUrls);
        }

        // Routers are replaced only when router URLs are present and convert to a non-null list.
        if (!routeUrls.isEmpty()) {
            List<Router> parsedRouters = toRouters(routeUrls);
            if (parsedRouters != null) {
                setRouters(parsedRouters);
            }
        }

        // Rebuild the override URL from the plain directory URL, applying each configurator in order.
        List<Configurator> configuratorSnapshot = this.configurators; // local reference
        this.overrideDirectoryUrl = directoryUrl;
        if (configuratorSnapshot != null && !configuratorSnapshot.isEmpty()) {
            for (Configurator configurator : configuratorSnapshot) {
                this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
            }
        }

        // providers
        refreshInvoker(providerUrls);
    }


    /**
     * Turns provider URLs into invokers. A URL whose merged form is already in
     * the invoker cache reuses the cached invoker instead of being referred again.
     *
     * <p>URLs are skipped when their protocol is not among the protocols
     * configured on the reference side, when they use the empty protocol, when
     * no Protocol extension exists for them, or when the merged URL repeats one
     * already seen in this call.
     *
     * @param urls provider URLs, may be null or empty
     * @return map from merged-URL string to invoker; empty when {@code urls} is null or empty
     */
    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
        Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
        if (urls == null || urls.isEmpty()) {
            return newUrlInvokerMap;
        }
        Set<String> keys = new HashSet<String>();
        String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
        // The accepted-protocol list does not depend on the provider URL, so split it once.
        String[] acceptProtocols = (queryProtocols != null && queryProtocols.length() > 0)
                ? queryProtocols.split(",")
                : null;
        for (URL providerUrl : urls) {
            // If protocol is configured at the reference side, only the matching protocol is selected
            if (acceptProtocols != null) {
                boolean accept = false;
                for (String acceptProtocol : acceptProtocols) {
                    if (providerUrl.getProtocol().equals(acceptProtocol)) {
                        accept = true;
                        break;
                    }
                }
                if (!accept) {
                    continue;
                }
            }
            if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
                continue;
            }
            if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
                logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()
                        + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
                continue;
            }
            URL url = mergeUrl(providerUrl);

            // The cache is keyed by the full string of the merged URL (its parameters are sorted).
            String key = url.toFullString();
            if (!keys.add(key)) { // Repeated url
                continue;
            }
            Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
            Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
            if (invoker == null) { // Not in the cache, refer again
                try {
                    // An explicit "disabled" parameter wins; otherwise "enabled" decides (default true).
                    boolean enabled = url.hasParameter(Constants.DISABLED_KEY)
                            ? !url.getParameter(Constants.DISABLED_KEY, false)
                            : url.getParameter(Constants.ENABLED_KEY, true);
                    if (enabled) {
                        invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
                    }
                } catch (Throwable t) {
                    // Best effort: one provider that cannot be referred must not block the others.
                    logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
                }
            }
            if (invoker != null) { // Newly referred or reused from the cache
                newUrlInvokerMap.put(key, invoker);
            }
        }
        return newUrlInvokerMap;
    }



    /**
     * Looks up the invokers for {@code invocation} in the method-to-invokers map.
     *
     * <p>Lookup order: {@code method.firstArg} (only when the first argument is
     * a String or an enum), then the method name, then the wildcard key, then
     * the first entry of the map.
     *
     * @return the matching invokers, or a new empty list when nothing matches
     * @throws RpcException with the forbidden code when this directory is marked forbidden
     */
    @Override
    public List<Invoker<T>> doList(Invocation invocation) {
        if (forbidden) {
            // 1. No service provider 2. Service providers are disabled
            throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
                "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +  NetUtils.getLocalHost()
                        + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");
        }
        // Snapshot of the volatile map: it may be reassigned (even to null) concurrently.
        Map<String, List<Invoker<T>>> methodMap = this.methodInvokerMap;
        if (methodMap == null || methodMap.size() == 0) {
            return new ArrayList<Invoker<T>>(0);
        }

        String methodName = RpcUtils.getMethodName(invocation);
        Object[] args = RpcUtils.getArguments(invocation);
        List<Invoker<T>> matched = null;
        boolean hasFirstArg = args != null && args.length > 0 && args[0] != null;
        if (hasFirstArg && (args[0] instanceof String || args[0].getClass().isEnum())) {
            // The routing can be enumerated according to the first parameter
            matched = methodMap.get(methodName + "." + args[0]);
        }
        if (matched == null) {
            matched = methodMap.get(methodName);
        }
        if (matched == null) {
            matched = methodMap.get(Constants.ANY_VALUE);
        }
        if (matched == null) {
            // Last resort: take whichever entry the map yields first.
            Iterator<List<Invoker<T>>> it = methodMap.values().iterator();
            if (it.hasNext()) {
                matched = it.next();
            }
        }
        if (matched == null) {
            return new ArrayList<Invoker<T>>(0);
        }
        return matched;
    }
    
}


本篇总结:dubbo 作为一个底层通信框架,代码架构中包含很多包装(装饰)设计模式和相互依赖,下面简述一下各个组件的作用和依赖关系。

Invocation - RpcInvocation - DecodeableRpcInvocation
  1. Invocation 是请求实体信息的接口,包含被调用的方法名、参数等基础信息;RpcInvocation 是 Invocation 的基础实现类;DecodeableRpcInvocation 主要多了一个 decode 方法,用于服务端接收请求时把字节反序列化到自身对象的字段上,变成一个可读(readable)的对象。同时 Invocation 还持有 Invoker 的引用,RpcInvocation 中的 invoker 字段被声明为 transient,因此不会被序列化。
Invoker-AbstractInvoker -DubboInvoker

2. dubbo 内部组件之间的相互引用特别多,也很复杂,只要记清楚:Invoker 是对需要调用的服务目标(target)类的一层封装,由框架负责封装。
AbstractInvoker 在 invoke 方法里面调用子类的 doInvoke 方法。
例如 DubboInvoker 继承了 AbstractInvoker,所以 DubboInvoker 也有 invoke 方法;调用 invoke 的时候会
转到子类的 doInvoke,doInvoke 使用 ExchangeClient 进行远程调用,并返回一个 Result。

Directory -AbstractDirectory-RegistryDirectory

目录,也可以叫字典,和上面是同样的设计模式。RegistryDirectory 里面包含一个 map:Map<String, List<Invoker<T>>> methodInvokerMap,通过 Invocation 中的方法名和参数筛选出合适的 Invoker 列表。

上一篇下一篇

猜你喜欢

热点阅读