Ovirt程序员

【Ovirt 笔记】engine 与 vdsm 之间的调用分析之

2017-05-11  本文已影响7人  58bc06151329

分析整理的版本为 Ovirt 3.4.5 版本。

xml-rpcXML Remote Procedure Call 远程过程调用的分布式计算协议,通过 xml 将调用函数封装,并使用 HTTP 作为传送协议。

Ovrit 通过 xml-rpc 远程调用 vdsm-api 完成配置主机,网络和共享存储。

平台中 xml-rpc 相关的连接都封装在 XmlRpcUtils 类中。

平台中有两处地方创建了连接,分别对应了 主机SPMxml-rpcHTTP 连接。

enginexml-rpc 服务端,vdsm 为 客户端。

注册主机时,会在 VdsManager 构造方法中通过 XmlRpcUtils 类,创建 xml-rpc 连接。

private void InitVdsBroker() {

    ...... 
    
    Pair<VdsServerConnector, HttpClient> returnValue =
            XmlRpcUtils.getConnection(_vds.getHostName(),
                    _vds.getPort(),
                    clientTimeOut,
                    connectionTimeOut,
                    clientRetries,
                    VdsServerConnector.class,
                    Config.<Boolean> getValue(ConfigValues.EncryptHostCommunication));
    _vdsProxy = new VdsServerWrapper(returnValue.getFirst(), returnValue.getSecond());
}

平台中所有针对 vdsm 的操作,都是通过 _vdsProxy 调用接口完成。

创建 xml-rpc 连接

private static <T> Pair<T, HttpClient> getHttpConnection(URL serverUrl, int clientTimeOut,
            int connectionTimeOut, int clientRetries, Class<T> type) {
    XmlRpcClientConfigImpl config = new XmlRpcClientConfigImpl();
    config.setServerURL(serverUrl);
    config.setConnectionTimeout(connectionTimeOut);
    config.setReplyTimeout(clientTimeOut);
    XmlRpcClient xmlRpcClient = new XmlRpcClient();
    xmlRpcClient.setConfig(config);

    XmlRpcCommonsTransportFactory transportFactory = new CustomXmlRpcCommonsTransportFactory(xmlRpcClient);
    HttpClient httpclient = createHttpClient(clientRetries);
    transportFactory.setHttpClient(httpclient);
    xmlRpcClient.setTransportFactory(transportFactory);

    ClientFactory clientFactory = new ClientFactory(xmlRpcClient);
    T connector = (T) clientFactory.newInstance(Thread.currentThread().getContextClassLoader(), type, null);
    T asyncConnector = (T) AsyncProxy.newInstance(connector, clientTimeOut);

    Pair<T, HttpClient> returnValue =
            new Pair<T, HttpClient>(asyncConnector, httpclient);

    return returnValue;
}
XmlRpcClientConfigImpl config = new XmlRpcClientConfigImpl();
config.setServerURL(serverUrl);
config.setConnectionTimeout(connectionTimeOut);
config.setReplyTimeout(clientTimeOut);
XmlRpcClient xmlRpcClient = new XmlRpcClient();
XmlRpcCommonsTransportFactory transportFactory = new CustomXmlRpcCommonsTransportFactory(xmlRpcClient);
HttpClient httpclient = createHttpClient(clientRetries);
transportFactory.setHttpClient(httpclient);
xmlRpcClient.setTransportFactory(transportFactory);

如果开发者没有指定 Factory,则会采用 XmlRpcSunHttpTransportFactory 作为默认的策略工厂。

import java.net.URLConnection;
import org.apache.commons.httpclient.HttpClient;
HttpClient client = new HttpClient(new MultiThreadedHttpConnectionManager());
int retries = Config.<Integer> getValue(ConfigValues.vdsRetries);
HttpMethodRetryHandler handler = new DefaultHttpMethodRetryHandler(retries, false);
HttpClientParams parameters = client.getParams();
parameters.setParameter(HttpMethodParams.RETRY_HANDLER, handler);
T connector = (T) clientFactory.newInstance(Thread.currentThread().getContextClassLoader(), type, null);
public Object newInstance(ClassLoader pClassLoader, final Class pClass, final String pRemoteName) {
   return Proxy.newProxyInstance(pClassLoader, new Class[]{pClass}, new InvocationHandler(){
        public Object invoke(Object pProxy, Method pMethod, Object[] pArgs) throws Throwable {
            if (isObjectMethodLocal()  &&  pMethod.getDeclaringClass().equals(Object.class)) {
                return pMethod.invoke(pProxy, pArgs);
            }
            final String methodName;
            if (pRemoteName == null  ||  pRemoteName.length() == 0) {
                methodName = pMethod.getName();
            } else {
                methodName = pRemoteName + "." + pMethod.getName();
            }
            Object result;
            try {
                result = client.execute(methodName, pArgs);
            } catch (XmlRpcInvocationException e) {
                Throwable t = e.linkedException;
                if (t instanceof RuntimeException) {
                    throw t;
                }
                Class[] exceptionTypes = pMethod.getExceptionTypes();
                for (int i = 0;  i < exceptionTypes.length;  i++) {
                    Class c = exceptionTypes[i];
                    if (c.isAssignableFrom(t.getClass())) {
                        throw t;
                    }
                }
                throw new UndeclaredThrowableException(t);
            }
            TypeConverter typeConverter = typeConverterFactory.getTypeConverter(pMethod.getReturnType());
            return typeConverter.convert(result);
        }
    });
}

本地方法:

return pMethod.invoke(pProxy, pArgs);

其他情况:就是 xmlRpcClient 执行 execute 方法。

result = client.execute(methodName, pArgs);
T asyncConnector = (T) AsyncProxy.newInstance(connector, clientTimeOut);
@Override
public Object invoke(Object proxy, final Method m, final Object[] args)
        throws Throwable {
    Object result;
    FutureTask<Object> future;
    FutureCall annotation = m.getAnnotation(FutureCall.class);
    if (annotation != null) {
        future =
                new FutureTask<Object>(createCallable(obj,
                        getMethod(m, annotation, proxy),
                        args,
                        ThreadLocalParamsContainer.getCorrelationId()));
        ThreadPoolUtil.execute(future);
        return future;
    } else {
        future =
                new FutureTask<Object>(createCallable(obj,
                        m,
                        args,
                        ThreadLocalParamsContainer.getCorrelationId()));
        ThreadPoolUtil.execute(future);
        try {
            result = future.get(timeoutInMilisec, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            if (e instanceof TimeoutException) {
                future.cancel(true);
            }
            throw new UndeclaredThrowableException(e);
        }

        return result;
    }
}

二次代理除了实现异步的分布式调用,还处理了 FutureCall 注解情况。进行了 @FutureCall 注解且实现了方法 delegeteTo,那么对有该注解的方法调用真正要执行的 delegeteTo 的方法名。非注解情况的方法,需要立即执行,并且指定了超时时间。

result = future.get(timeoutInMilisec, TimeUnit.MILLISECONDS);

通过两次代理,执行远程方法,最终调用的就是 xmlRpcClientexecute(String pMethodName, Object[] pParams) 方法。

public Object execute(XmlRpcRequest pRequest) throws XmlRpcException {
    return getWorkerFactory().getWorker().execute(pRequest);
}
protected XmlRpcWorkerFactory getDefaultXmlRpcWorkerFactory() {
    return new XmlRpcClientWorkerFactory(this);
}
private int maxThreads;
public synchronized XmlRpcWorker getWorker() throws XmlRpcLoadException {
    int max = controller.getMaxThreads();
    if (max > 0  &&  numThreads == max) {
        throw new XmlRpcLoadException("Maximum number of concurrent requests exceeded: " + max);
    }
    if (max == 0) {
        return singleton;
    }
    ++numThreads;
    if (pool.size() == 0) {
        return newWorker();
    } else {
        return (XmlRpcWorker) pool.remove(pool.size() - 1);
    }
}
public synchronized void releaseWorker(XmlRpcWorker pWorker) {
    --numThreads;
    int max = controller.getMaxThreads();
    if (pWorker == singleton) {
        // Do nothing, it's the singleton
    } else {
        if (pool.size() < max) {
            pool.add(pWorker);
        }
    }
}
public Object execute(XmlRpcRequest pRequest)
            throws XmlRpcException {
    try {
        XmlRpcClient client = (XmlRpcClient) getController();
        return client.getTransportFactory().getTransport().sendRequest(pRequest);
    } finally {
        factory.releaseWorker(this);
    }
}
XmlRpcCommonsTransportFactory transportFactory = new CustomXmlRpcCommonsTransportFactory(xmlRpcClient);

......

xmlRpcClient.setTransportFactory(transportFactory);
protected void initHttpHeaders(XmlRpcRequest pRequest) throws XmlRpcClientException {
    config = (XmlRpcHttpClientConfig) pRequest.getConfig();
    method = newPostMethod(config);
    super.initHttpHeaders(pRequest);
    
    if (config.getConnectionTimeout() != 0)
        client.getHttpConnectionManager().getParams().setConnectionTimeout(config.getConnectionTimeout());
    
    if (config.getReplyTimeout() != 0)
        client.getHttpConnectionManager().getParams().setSoTimeout(config.getReplyTimeout());
    
    method.getParams().setVersion(HttpVersion.HTTP_1_1);
}
protected void initHttpHeaders(XmlRpcRequest pRequest) throws XmlRpcClientException {
    XmlRpcHttpClientConfig config = (XmlRpcHttpClientConfig) pRequest.getConfig();
    setRequestHeader("Content-Type", "text/xml");
    if(config.getUserAgent() != null)
        setRequestHeader("User-Agent", config.getUserAgent());
    else
        setRequestHeader("User-Agent", getUserAgent());
    setCredentials(config);
    setCompressionHeaders(config);
}

方法参数信息的传递

_vdsProxy = new VdsServerWrapper(returnValue.getFirst(), returnValue.getSecond());
public OneVmReturnForXmlRpc create(Map createInfo) {
    try {
        Map<String, Object> xmlRpcReturnValue = vdsServer.create(createInfo);
        OneVmReturnForXmlRpc wrapper = new OneVmReturnForXmlRpc(xmlRpcReturnValue);
        return wrapper;
    } catch (UndeclaredThrowableException ute) {
        throw new XmlRpcRunTimeException(ute);
    }
}

因此通过 VDSCommand 方法,传递的有一定格式的参数信息,也就通过两次代理的方法设置 XmlRpcClient 对象中,通过 execute 方法做了封装。

public Object execute(XmlRpcClientConfig pConfig, String pMethodName, Object[] pParams) throws XmlRpcException {
    return execute(new XmlRpcClientRequestImpl(pConfig, pMethodName, pParams));
}

xml-rpc 信息传递

public Object sendRequest(XmlRpcRequest pRequest) throws XmlRpcException {
    XmlRpcStreamRequestConfig config = (XmlRpcStreamRequestConfig) pRequest.getConfig();
    boolean closed = false;
    try {
        ReqWriter reqWriter = newReqWriter(pRequest);
        writeRequest(reqWriter);
        InputStream istream = getInputStream();
        if (isResponseGzipCompressed(config)) {
            istream = new GZIPInputStream(istream);
        }
        Object result = readResponse(config, istream);
        closed = true;
        close();
        return result;
    } catch (IOException e) {
        throw new XmlRpcException("Failed to read server's response: "
                + e.getMessage(), e);
    } catch (SAXException e) {
        Exception ex = e.getException();
        if (ex != null  &&  ex instanceof XmlRpcException) {
            throw (XmlRpcException) ex;
        }
        throw new XmlRpcException("Failed to generate request: "
                + e.getMessage(), e);
    } finally {
        if (!closed) { try { close(); } catch (Throwable ignore) {} }
    }
}

其中 pRequest 包含了所有需要传递给 vdsm 的信息。

protected void writeRequest(final ReqWriter pWriter) throws XmlRpcException {
    method.setRequestEntity(new RequestEntity(){
        public boolean isRepeatable() { return true; }
        public void writeRequest(OutputStream pOut) throws IOException {
            try {
                /* Make sure, that the socket is not closed by replacing it with our
                 * own BufferedOutputStream.
                 */
                OutputStream ostream;
                if (isUsingByteArrayOutput(config)) {
                    // No need to buffer the output.
                    ostream = new FilterOutputStream(pOut){
                        public void close() throws IOException {
                            flush();
                        }
                    };
                } else {
                    ostream = new BufferedOutputStream(pOut){
                        public void close() throws IOException {
                            flush();
                        }
                    };
                }
                pWriter.write(ostream);
            } catch (XmlRpcException e) {
                throw new XmlRpcIOException(e);
            } catch (SAXException e) {
                throw new XmlRpcIOException(e);
            }
        }
        public long getContentLength() { return contentLength; }
        public String getContentType() { return "text/xml"; }
    });
    try {
        int redirectAttempts = 0;
        for (;;) {
            client.executeMethod(method);
            if (!isRedirectRequired()) {
                break;
            }
            if (redirectAttempts++ > MAX_REDIRECT_ATTEMPTS) {
                throw new XmlRpcException("Too many redirects.");
            }
            resetClientForRedirect();
        }
    } catch (XmlRpcIOException e) {
        Throwable t = e.getLinkedException();
        if (t instanceof XmlRpcException) {
            throw (XmlRpcException) t;
        } else {
            throw new XmlRpcException("Unexpected exception: " + t.getMessage(), t);
        }
    } catch (IOException e) {
        throw new XmlRpcException("I/O error while communicating with HTTP server: " + e.getMessage(), e);
    }
}
public void write(OutputStream pStream)
        throws XmlRpcException, IOException, SAXException {
    final XmlRpcStreamConfig config = (XmlRpcStreamConfig) request.getConfig();
    try {
        ContentHandler h = getClient().getXmlWriterFactory().getXmlWriter(config, pStream);
        XmlRpcWriter xw = new XmlRpcWriter(config, h, getClient().getTypeFactory());
        xw.write(request);
        pStream.close();
        pStream = null;
    } finally {
        if (pStream != null) { try { pStream.close(); } catch (Throwable ignore) {} }
    }
}
public void write(XmlRpcRequest pRequest) throws SAXException {
    handler.startDocument();
    boolean extensions = pRequest.getConfig().isEnabledForExtensions();
    if (extensions) {
        handler.startPrefixMapping("ex", XmlRpcWriter.EXTENSIONS_URI);
    }
    handler.startElement("", "methodCall", "methodCall", ZERO_ATTRIBUTES);
    handler.startElement("", "methodName", "methodName", ZERO_ATTRIBUTES);
    String s = pRequest.getMethodName();
    handler.characters(s.toCharArray(), 0, s.length());
    handler.endElement("", "methodName", "methodName");
    handler.startElement("", "params", "params", ZERO_ATTRIBUTES);
    int num = pRequest.getParameterCount();
    for (int i = 0;  i < num;  i++) {
        handler.startElement("", "param", "param", ZERO_ATTRIBUTES);
        writeValue(pRequest.getParameter(i));
        handler.endElement("", "param", "param");
    }
    handler.endElement("", "params", "params");
    handler.endElement("", "methodCall", "methodCall");
    if (extensions) {
        handler.endPrefixMapping("ex");
    }
    handler.endDocument();
}

此处是按照 xml-rpc 的格式组装 xml,通用格式如下:

<?xml version="1.0" encoding="UTF-8"?> 
<methodCall xmlns:ex="http://ws.apache.org/xmlrpc/namespaces/extensions"> 
 <methodName>Person.SetAge</methodName> 
    <params>
        <param>
            <value>
                <i4>20</i4>
            </value>
        </param>
    </params> 
</methodCall>
上一篇下一篇

猜你喜欢

热点阅读