探究 dubbo接口超时时间的赋值逻辑,包含Consumer,P

2019-10-28  本文已影响0人  xiangR

源码

   
    <bean id="innerStockServiceExport" class="com.alibaba.dubbo.config.spring.ServiceBean">
        <property name="interface" value="com.yit.stock.api.inner.InnerStockService"/>
        <property name="ref" ref="innerStockServiceImpl"/>
        <property name="application" ref="dubboApplicationConfig"/>
        <property name="registry" ref="dubboRegistryConfig"/>
        <property name="protocol" ref="dubboProtocolConfig"/>
        <property name="version" value="${dubbo.reference.version}"/>
        <property name="timeout" value="${dubbo.export.timeout}"/>
        <property name="retries" value="0"/>
        <property name="methods">
            <list>
                <bean class="com.alibaba.dubbo.config.MethodConfig">
                    <property name="name" value="initVirtualStock"/>
                    <property name="timeout" value="4000"/>
                </bean>
                <bean class="com.alibaba.dubbo.config.MethodConfig">
                    <property name="name" value="batchFreezeStock"/>
                    <property name="timeout" value="6000"/>
                </bean>
                <bean class="com.alibaba.dubbo.config.MethodConfig">
                    <property name="name" value="batchCancelStock"/>
                    <property name="timeout" value="6000"/>
                </bean>
                <bean class="com.alibaba.dubbo.config.MethodConfig">
                    <property name="name" value="batchConsumeStock"/>
                    <property name="timeout" value="6000"/>
                </bean>
                <bean class="com.alibaba.dubbo.config.MethodConfig">
                    <property name="name" value="cancelForConsumedOrder"/>
                    <property name="timeout" value="30000"/>
                </bean>
            </list>
        </property>
    </bean>

    <bean id="innerStockService" class="com.alibaba.dubbo.config.spring.ReferenceBean">
        <property name="interface" value="com.yit.stock.api.inner.InnerStockService"/>
        <property name="application" ref="dubboApplicationConfig"/>
        <property name="registry" ref="dubboRegistryConfig"/>
        <property name="timeout" value="3000"/>
        <property name="check" value="false"/>
        <property name="version" value="${dubbo.reference.version}"/>
        <property name="methods">
            <list>
                <bean class="com.alibaba.dubbo.config.MethodConfig">
                    <property name="name" value="batchFreezeStock"/>
                    <property name="timeout" value="16000"/>
                </bean>
                <bean class="com.alibaba.dubbo.config.MethodConfig">
                    <property name="name" value="splitFreezeStock"/>
                    <property name="timeout" value="6000"/>
                </bean>
                <bean class="com.alibaba.dubbo.config.MethodConfig">
                    <property name="name" value="finishStockIn"/>
                    <property name="timeout" value="16000"/>
                </bean>
            </list>
        </property>
    </bean>

加载本地的配置信息构建 URL

    com.alibaba.dubbo.config.ReferenceConfig#init

    private void init() {
        if (initialized) {
            return;
        }
        initialized = true;

        ..... 省略一部分

        appendParameters(map, application);
        appendParameters(map, module);
        appendParameters(map, consumer, Constants.DEFAULT_KEY);
        appendParameters(map, this);
        String prifix = StringUtils.getServiceKey(map);

        // 这里拿到本地 consumer.xml 中 MethodConfig 的信息初始化 key: method.timeout, value: 毫秒数到 map
        if (methods != null && methods.size() > 0) {
            for (MethodConfig method : methods) {
                appendParameters(map, method, method.getName());
                String retryKey = method.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(method.getName() + ".retries", "0");
                    }
                }
                appendAttributes(attributes, method, prifix + "." + method.getName());
                checkAndConvertImplicitConfig(method, map, attributes);
            }
        }
        //attributes通过系统context进行存储.
        StaticContext.getSystemContext().putAll(attributes);
        ref = createProxy(map);
    }
    com.alibaba.dubbo.registry.integration.RegistryDirectory#mergeUrl

    /**
     * 合并url参数 顺序为override > -D >Consumer > Provider
     * @param providerUrl
     * @param overrides
     * @return
     */
    private URL mergeUrl(URL providerUrl){
        // 其实我们关注的就是这一部分
        providerUrl = ClusterUtils.mergeUrl(providerUrl, queryMap); // 合并消费端参数
        
        List<Configurator> localConfigurators = this.configurators; // local reference
        if (localConfigurators != null && localConfigurators.size() > 0) {
            for (Configurator configurator : localConfigurators) {
                providerUrl = configurator.configure(providerUrl);
            }
        }
        
        providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false)); // 不检查连接是否成功,总是创建Invoker!
        
        ... 省略一部分

        return providerUrl;
    }

  
    public static URL mergeUrl(URL remoteUrl, Map<String, String> localMap) {
        Map<String, String> map = new HashMap<String, String>();
        Map<String, String> remoteMap = remoteUrl.getParameters();
        
        
        if (remoteMap != null && remoteMap.size() > 0) {
            // 先put remote
            map.putAll(remoteMap);
            
            //线程池配置不使用提供者的
            map.remove(Constants.THREAD_NAME_KEY);
            map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.THREAD_NAME_KEY);
            ... 省略一部分
        }
        
        if (localMap != null && localMap.size() > 0) {
            // 再put local          
            map.putAll(localMap);
        }
   
            ... 省略一部分      
        }

        return remoteUrl.clearParameters().addParameters(map);
    }

获取 timeout 的地方

// 重点就是这句话
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
    com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke    

    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);
        
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout) ;
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

    public int getMethodParameter(String method, String key, int defaultValue) {
        String methodKey = method + "." + key;
        Number n = getNumbers().get(methodKey);
        if (n != null) {
            return n.intValue();
        }
        String value = getMethodParameter(method, key);
        if (value == null || value.length() == 0) {
            return defaultValue;
        }
        int i = Integer.parseInt(value);
        getNumbers().put(methodKey, i);
        return i;
    }

上一篇下一篇

猜你喜欢

热点阅读