Motan refer 处理流程

2016-11-20  本文已影响0人  _流浪的猫_

Motan refer 简要处理流程

// 调用本类的私有方法refer
Object value = refer(reference, method.getParameterTypes()[0]);
// refer方法中调用RefererConfigBean父类的getRef方法
return referenceConfig.getRef();
// getRef()只是做了个检测,主要还在其调用的initRef方法中
// 先获取 SimpleConfigHandler
ConfigHandler configHandler = ExtensionLoader.getExtensionLoader(ConfigHandler.class).getExtension(MotanConstants.DEFAULT_VALUE);
// 然后创建集群支持
ClusterSupport<T> clusterSupport = createClusterSupport(refUrl, configHandler, registryUrls);
// 最后通过configHandler完成refer操作
// 这里的proxy是代理方式,Motan好像只支持jdk的代理,所以proxy的值基本上就是 jdk了
ref = configHandler.refer(interfaceClass, clusters, proxy);
// 对于createClusterSupport中一大片代码为了获取 regUrl,实际处理还是在configHandler中
return configHandler.buildClusterSupport(interfaceClass, regUrls);
// SimpleConfigHandler中的处理
public <T> ClusterSupport<T> buildClusterSupport(Class<T> interfaceClass, List<URL> registryUrls) {
        // 创建集群支持类示例
        ClusterSupport<T> clusterSupport = new ClusterSupport<T>(interfaceClass, registryUrls);
        // 初始化
        clusterSupport.init();
        return clusterSupport;
    }
// clusterSupport.init() 处理
// 先是获取集群、负载均衡、HA策略,默认配置获取的是:
// ClusterSpi  ActiveWeightLoadBalance FailoverHaStrategy
prepareCluster();
// 获取url motan://192.168.3.13:0/com.weibo.motan.demo.service.MotanDemoService?group=motan-demo-rpc
URL subUrl = toSubscribeUrl(url);
// 然后注册
// client 注册自己,同时订阅service列表
// 这里是通过ZookeeperRegistryFactory获取ZookeeperRegistry (Registry接口的实现类之一)
Registry registry = getRegistry(ru); 
// 开始订阅
registry.subscribe(subUrl, this);
// 获取注册url:zookeeper://127.0.0.1:2181/com.weibo.api.motan.registry.RegistryService
String registryUri = getRegistryUri(url);
// 如果不存在则通过ZookeeperRegistryFactory createRegistry来创建注册中心
// 这里同时创建了ZkClient
ZkClient zkClient = new ZkClient(registryUrl.getParameter("address"), sessionTimeout, timeout);
// 创建并返回zk注册中心
return new ZookeeperRegistry(registryUrl, zkClient);
// 创建注册中心时,注册状态变化监听事件
IZkStateListener zkStateListener = new IZkStateListener() {
     @Override
     public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
        // do nothing
    }

    @Override
    public void handleNewSession() throws Exception {
        LoggerUtil.info("zkRegistry get new session notify.");
        reconnectService();
        reconnectClient();
    }
};
zkClient.subscribeStateChanges(zkStateListener);
// url:motan://192.168.3.13:0/com.weibo.motan.demo.service.MotanDemoService?group=motan-demo-rpc
    protected void doSubscribe(URL url, final NotifyListener listener) {
        LoggerUtil.info("CommandFailbackRegistry subscribe. url: " + url.toSimpleString());
        URL urlCopy = url.createCopy();
        CommandServiceManager manager = getCommandServiceManager(urlCopy);
        manager.addNotifyListener(listener);
// 这里订阅节点变化事件:即服务节点(zkClient.subscribeChildChanges(serverTypePath, zkChildListener))
// /motan/motan-demo-rpc/com.weibo.motan.demo.service.MotanDemoService/server
        subscribeService(urlCopy, manager);
//  这里订阅数据变化事件:即命令节点(zkClient.subscribeDataChanges(commandPath, zkDataListener);)
// /motan/motan-demo-rpc/command
        subscribeCommand(urlCopy, manager);
// 这里会去主动获取服务节点下的所有服务列表
// /motan/motan-demo-rpc/com.weibo.motan.demo.service.MotanDemoService/server
        List<URL> urls = doDiscover(urlCopy);
        if (urls != null && urls.size() > 0) {
            this.notify(urlCopy, listener, urls);
        }
    }
// ClusterSupport的notify方法
// refer处理,这里的protocol被ProtocolFilterDecorator包装了一层,实际是DefaultRpcProtocol
referer = protocol.refer(interfaceClass, refererURL, u);
// DefaultRpcProtocol 中创建createReferer
return new DefaultRpcReferer<T>(clz, url, serviceUrl);
// 下面就是传输层的处理了
// 在DefaultRpcReferer中获取NettyEndpointFactory,并创建NettyClient客户端
// 然后执行referer(DefaultRpcReferer)初始化
// 调用client.open(); 完成NettyClient的各种配置初始化等
    @Override
    public <T> T refer(Class<T> interfaceClass, List<Cluster<T>> clusters, String proxyType) {
    // Motan这里只有JdkProxyFactory
    ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(proxyType);
    // 通过JdkProxyFactory获取代理
    return proxyFactory.getProxy(interfaceClass, new RefererInvocationHandler<T>(interfaceClass, clusters));
    }
// 获取代理处理
    private void init() {
        // 这里获取一个开关服务,用于在invoke代理时实现降级功能,实现类LocalSwitcherService
        String switchName =
            this.clusters.get(0).getUrl().getParameter(URLParamType.switcherService.getName(), URLParamType.switcherService.getValue());
        switcherService = ExtensionLoader.getExtensionLoader(SwitcherService.class).getExtension(switchName);
    }

简单示意图

client.png
上一篇下一篇

猜你喜欢

热点阅读