【Ovirt 笔记】engine 与 vdsm 之间的调用分析之
分析整理的版本为 Ovirt 3.4.5 版本。
xml-rpc 是 XML Remote Procedure Call 远程过程调用的分布式计算协议,通过 xml 将调用函数封装,并使用 HTTP 作为传送协议。
Ovrit 通过 xml-rpc 远程调用 vdsm-api 完成配置主机,网络和共享存储。
平台中 xml-rpc 相关的连接都封装在 XmlRpcUtils 类中。
平台中有两处地方创建了连接,分别对应了 主机 与 SPM 的 xml-rpc 的 HTTP 连接。
engine 为 xml-rpc 服务端,vdsm 为 客户端。
注册主机时,会在 VdsManager 构造方法中通过 XmlRpcUtils 类,创建 xml-rpc 连接。
private void InitVdsBroker() {
......
Pair<VdsServerConnector, HttpClient> returnValue =
XmlRpcUtils.getConnection(_vds.getHostName(),
_vds.getPort(),
clientTimeOut,
connectionTimeOut,
clientRetries,
VdsServerConnector.class,
Config.<Boolean> getValue(ConfigValues.EncryptHostCommunication));
_vdsProxy = new VdsServerWrapper(returnValue.getFirst(), returnValue.getSecond());
}
平台中所有针对 vdsm 的操作,都是通过 _vdsProxy 调用接口完成。
创建 xml-rpc 连接
- 建立 engine 与 vdsm 的 xml-rpc 服务连接。对 Class 为 type 的接口进行 两次 代理。返回的 asyncConnector 中的 connectory 就是代理类 ,通过执行代理类的各方法,ovirt 实现 异步 的远程调用与底层的 vdsm 通信。
private static <T> Pair<T, HttpClient> getHttpConnection(URL serverUrl, int clientTimeOut,
int connectionTimeOut, int clientRetries, Class<T> type) {
XmlRpcClientConfigImpl config = new XmlRpcClientConfigImpl();
config.setServerURL(serverUrl);
config.setConnectionTimeout(connectionTimeOut);
config.setReplyTimeout(clientTimeOut);
XmlRpcClient xmlRpcClient = new XmlRpcClient();
xmlRpcClient.setConfig(config);
XmlRpcCommonsTransportFactory transportFactory = new CustomXmlRpcCommonsTransportFactory(xmlRpcClient);
HttpClient httpclient = createHttpClient(clientRetries);
transportFactory.setHttpClient(httpclient);
xmlRpcClient.setTransportFactory(transportFactory);
ClientFactory clientFactory = new ClientFactory(xmlRpcClient);
T connector = (T) clientFactory.newInstance(Thread.currentThread().getContextClassLoader(), type, null);
T asyncConnector = (T) AsyncProxy.newInstance(connector, clientTimeOut);
Pair<T, HttpClient> returnValue =
new Pair<T, HttpClient>(asyncConnector, httpclient);
return returnValue;
}
- 定义参数:设置服务 URL,连接时间,响应时间等。
XmlRpcClientConfigImpl config = new XmlRpcClientConfigImpl();
config.setServerURL(serverUrl);
config.setConnectionTimeout(connectionTimeOut);
config.setReplyTimeout(clientTimeOut);
- 创建 XmlRpcClient 实例,该类做为 xml-rpc 客户端的核心, 所有的请求通过其处理。
XmlRpcClient xmlRpcClient = new XmlRpcClient();
- 定义传输策略。虽然使用的是 HTTP 协议传输,但实现方式有多种,其决定客户端与服务端通信的实现方式。
- Ovirt 中自定义了一个 CustomXmlRpcCommonsTransport 类, 继承自 XmlRpcCommonsTransportFactory,主要的目的是为在 HttpHeader 中加入一个 FlowID 属性,代表每一次远程接口的 id。最终实现的 ovirt 的调用与 vdsm 的实现在日志中能够一一对应起来。
XmlRpcCommonsTransportFactory transportFactory = new CustomXmlRpcCommonsTransportFactory(xmlRpcClient);
HttpClient httpclient = createHttpClient(clientRetries);
transportFactory.setHttpClient(httpclient);
xmlRpcClient.setTransportFactory(transportFactory);
如果开发者没有指定 Factory,则会采用 XmlRpcSunHttpTransportFactory 作为默认的策略工厂。
- XmlRpcSunHttpTransportFactory 通过 java.net.URLConnection 完成 HTTP 请求。
import java.net.URLConnection;
- Ovirt 使用的 XmlRpcCommonsTransportFactory 是通过 Apache 实现 HttpClient,进行有效管理资源,可以降低内存资源的使用。
import org.apache.commons.httpclient.HttpClient;
- HttpClient 的构造,初始化多线程连接管理变量。
HttpClient client = new HttpClient(new MultiThreadedHttpConnectionManager());
- 注册了 http.method.retry-handler 属性。
int retries = Config.<Integer> getValue(ConfigValues.vdsRetries);
HttpMethodRetryHandler handler = new DefaultHttpMethodRetryHandler(retries, false);
HttpClientParams parameters = client.getParams();
parameters.setParameter(HttpMethodParams.RETRY_HANDLER, handler);
- ClientFactory 类,实现了对 Class 为 type 的接口的 第一次 代理。
T connector = (T) clientFactory.newInstance(Thread.currentThread().getContextClassLoader(), type, null);
public Object newInstance(ClassLoader pClassLoader, final Class pClass, final String pRemoteName) {
return Proxy.newProxyInstance(pClassLoader, new Class[]{pClass}, new InvocationHandler(){
public Object invoke(Object pProxy, Method pMethod, Object[] pArgs) throws Throwable {
if (isObjectMethodLocal() && pMethod.getDeclaringClass().equals(Object.class)) {
return pMethod.invoke(pProxy, pArgs);
}
final String methodName;
if (pRemoteName == null || pRemoteName.length() == 0) {
methodName = pMethod.getName();
} else {
methodName = pRemoteName + "." + pMethod.getName();
}
Object result;
try {
result = client.execute(methodName, pArgs);
} catch (XmlRpcInvocationException e) {
Throwable t = e.linkedException;
if (t instanceof RuntimeException) {
throw t;
}
Class[] exceptionTypes = pMethod.getExceptionTypes();
for (int i = 0; i < exceptionTypes.length; i++) {
Class c = exceptionTypes[i];
if (c.isAssignableFrom(t.getClass())) {
throw t;
}
}
throw new UndeclaredThrowableException(t);
}
TypeConverter typeConverter = typeConverterFactory.getTypeConverter(pMethod.getReturnType());
return typeConverter.convert(result);
}
});
}
- 实现的 InvocationHandler 中,将 pPxoy 以 client 进行了替代。
本地方法:
return pMethod.invoke(pProxy, pArgs);
其他情况:就是 xmlRpcClient 执行 execute 方法。
result = client.execute(methodName, pArgs);
- 进行 第二次 代理,目的是实现异步的分布式调用。
T asyncConnector = (T) AsyncProxy.newInstance(connector, clientTimeOut);
@Override
public Object invoke(Object proxy, final Method m, final Object[] args)
throws Throwable {
Object result;
FutureTask<Object> future;
FutureCall annotation = m.getAnnotation(FutureCall.class);
if (annotation != null) {
future =
new FutureTask<Object>(createCallable(obj,
getMethod(m, annotation, proxy),
args,
ThreadLocalParamsContainer.getCorrelationId()));
ThreadPoolUtil.execute(future);
return future;
} else {
future =
new FutureTask<Object>(createCallable(obj,
m,
args,
ThreadLocalParamsContainer.getCorrelationId()));
ThreadPoolUtil.execute(future);
try {
result = future.get(timeoutInMilisec, TimeUnit.MILLISECONDS);
} catch (Exception e) {
if (e instanceof TimeoutException) {
future.cancel(true);
}
throw new UndeclaredThrowableException(e);
}
return result;
}
}
- 其中执行的 createCallable 方法,其实就是对 ClientFactory 代理类的执行。
二次代理除了实现异步的分布式调用,还处理了 FutureCall 注解情况。进行了 @FutureCall 注解且实现了方法 delegeteTo,那么对有该注解的方法调用真正要执行的 delegeteTo 的方法名。非注解情况的方法,需要立即执行,并且指定了超时时间。
result = future.get(timeoutInMilisec, TimeUnit.MILLISECONDS);
通过两次代理,执行远程方法,最终调用的就是 xmlRpcClient 的 execute(String pMethodName, Object[] pParams) 方法。
public Object execute(XmlRpcRequest pRequest) throws XmlRpcException {
return getWorkerFactory().getWorker().execute(pRequest);
}
- xmlRpcClient 默认的 workerFactory 是 XmlRpcClientWorkerFactory。
protected XmlRpcWorkerFactory getDefaultXmlRpcWorkerFactory() {
return new XmlRpcClientWorkerFactory(this);
}
- xmlRpcClient 父类 XmlRpcController 具有一个重要的属性 maxThreads,可以通过 setMaxThreads 方法进行设置。
private int maxThreads;
- XmlRpcWorkerFactory 类的同步方法 getWorker 实现了线程池中 Work 的获取,当 maxThreads 为 0 时,返回单例。
public synchronized XmlRpcWorker getWorker() throws XmlRpcLoadException {
int max = controller.getMaxThreads();
if (max > 0 && numThreads == max) {
throw new XmlRpcLoadException("Maximum number of concurrent requests exceeded: " + max);
}
if (max == 0) {
return singleton;
}
++numThreads;
if (pool.size() == 0) {
return newWorker();
} else {
return (XmlRpcWorker) pool.remove(pool.size() - 1);
}
}
- 同步方法 releaseWorker 实现了 Work 的释放。
public synchronized void releaseWorker(XmlRpcWorker pWorker) {
--numThreads;
int max = controller.getMaxThreads();
if (pWorker == singleton) {
// Do nothing, it's the singleton
} else {
if (pool.size() < max) {
pool.add(pWorker);
}
}
}
- 获取的 Work 默认为 XmlRpcClientWorker,执行 execute 方法,实际就是 Transport 的 sendRequest。
public Object execute(XmlRpcRequest pRequest)
throws XmlRpcException {
try {
XmlRpcClient client = (XmlRpcClient) getController();
return client.getTransportFactory().getTransport().sendRequest(pRequest);
} finally {
factory.releaseWorker(this);
}
}
- 之前 xml-rpc 连接创建时,有设置过 transportFactory。
XmlRpcCommonsTransportFactory transportFactory = new CustomXmlRpcCommonsTransportFactory(xmlRpcClient);
......
xmlRpcClient.setTransportFactory(transportFactory);
- 每一个 transport,都重写了 initHttpHeaders 方法,额外增加了一些需要传递给 vdsm 的信息。
protected void initHttpHeaders(XmlRpcRequest pRequest) throws XmlRpcClientException {
config = (XmlRpcHttpClientConfig) pRequest.getConfig();
method = newPostMethod(config);
super.initHttpHeaders(pRequest);
if (config.getConnectionTimeout() != 0)
client.getHttpConnectionManager().getParams().setConnectionTimeout(config.getConnectionTimeout());
if (config.getReplyTimeout() != 0)
client.getHttpConnectionManager().getParams().setSoTimeout(config.getReplyTimeout());
method.getParams().setVersion(HttpVersion.HTTP_1_1);
}
protected void initHttpHeaders(XmlRpcRequest pRequest) throws XmlRpcClientException {
XmlRpcHttpClientConfig config = (XmlRpcHttpClientConfig) pRequest.getConfig();
setRequestHeader("Content-Type", "text/xml");
if(config.getUserAgent() != null)
setRequestHeader("User-Agent", config.getUserAgent());
else
setRequestHeader("User-Agent", getUserAgent());
setCredentials(config);
setCompressionHeaders(config);
}
方法参数信息的传递
- 任何的 VDSCommand 执行时,实际调用的是 IVdsServer 接口的方法,也就是之前所说的 _vdsProxy。
_vdsProxy = new VdsServerWrapper(returnValue.getFirst(), returnValue.getSecond());
- VdsServerWrapper 类,实现了 IVdsServer 接口的所有方法。在每一个方法中,又都调用了 VdsServerConnector 的方法,其实就是两次代理的远程方法。例如:
public OneVmReturnForXmlRpc create(Map createInfo) {
try {
Map<String, Object> xmlRpcReturnValue = vdsServer.create(createInfo);
OneVmReturnForXmlRpc wrapper = new OneVmReturnForXmlRpc(xmlRpcReturnValue);
return wrapper;
} catch (UndeclaredThrowableException ute) {
throw new XmlRpcRunTimeException(ute);
}
}
因此通过 VDSCommand 方法,传递的有一定格式的参数信息,也就通过两次代理的方法设置 XmlRpcClient 对象中,通过 execute 方法做了封装。
public Object execute(XmlRpcClientConfig pConfig, String pMethodName, Object[] pParams) throws XmlRpcException {
return execute(new XmlRpcClientRequestImpl(pConfig, pMethodName, pParams));
}
xml-rpc 信息传递
- Transport 的 sendRequest 方法。
public Object sendRequest(XmlRpcRequest pRequest) throws XmlRpcException {
XmlRpcStreamRequestConfig config = (XmlRpcStreamRequestConfig) pRequest.getConfig();
boolean closed = false;
try {
ReqWriter reqWriter = newReqWriter(pRequest);
writeRequest(reqWriter);
InputStream istream = getInputStream();
if (isResponseGzipCompressed(config)) {
istream = new GZIPInputStream(istream);
}
Object result = readResponse(config, istream);
closed = true;
close();
return result;
} catch (IOException e) {
throw new XmlRpcException("Failed to read server's response: "
+ e.getMessage(), e);
} catch (SAXException e) {
Exception ex = e.getException();
if (ex != null && ex instanceof XmlRpcException) {
throw (XmlRpcException) ex;
}
throw new XmlRpcException("Failed to generate request: "
+ e.getMessage(), e);
} finally {
if (!closed) { try { close(); } catch (Throwable ignore) {} }
}
}
其中 pRequest 包含了所有需要传递给 vdsm 的信息。
- 执行 writeRequest 方法,最终进入一个 for 死循环,
不断执行 client.executeMethod(method)。
protected void writeRequest(final ReqWriter pWriter) throws XmlRpcException {
method.setRequestEntity(new RequestEntity(){
public boolean isRepeatable() { return true; }
public void writeRequest(OutputStream pOut) throws IOException {
try {
/* Make sure, that the socket is not closed by replacing it with our
* own BufferedOutputStream.
*/
OutputStream ostream;
if (isUsingByteArrayOutput(config)) {
// No need to buffer the output.
ostream = new FilterOutputStream(pOut){
public void close() throws IOException {
flush();
}
};
} else {
ostream = new BufferedOutputStream(pOut){
public void close() throws IOException {
flush();
}
};
}
pWriter.write(ostream);
} catch (XmlRpcException e) {
throw new XmlRpcIOException(e);
} catch (SAXException e) {
throw new XmlRpcIOException(e);
}
}
public long getContentLength() { return contentLength; }
public String getContentType() { return "text/xml"; }
});
try {
int redirectAttempts = 0;
for (;;) {
client.executeMethod(method);
if (!isRedirectRequired()) {
break;
}
if (redirectAttempts++ > MAX_REDIRECT_ATTEMPTS) {
throw new XmlRpcException("Too many redirects.");
}
resetClientForRedirect();
}
} catch (XmlRpcIOException e) {
Throwable t = e.getLinkedException();
if (t instanceof XmlRpcException) {
throw (XmlRpcException) t;
} else {
throw new XmlRpcException("Unexpected exception: " + t.getMessage(), t);
}
} catch (IOException e) {
throw new XmlRpcException("I/O error while communicating with HTTP server: " + e.getMessage(), e);
}
}
- 传递的 ReqWriter 对象,最终执行 ReqWriterImpl 的 write 方法。
public void write(OutputStream pStream)
throws XmlRpcException, IOException, SAXException {
final XmlRpcStreamConfig config = (XmlRpcStreamConfig) request.getConfig();
try {
ContentHandler h = getClient().getXmlWriterFactory().getXmlWriter(config, pStream);
XmlRpcWriter xw = new XmlRpcWriter(config, h, getClient().getTypeFactory());
xw.write(request);
pStream.close();
pStream = null;
} finally {
if (pStream != null) { try { pStream.close(); } catch (Throwable ignore) {} }
}
}
- XmlRpcWriter 类中对信息进行了格式封装。
public void write(XmlRpcRequest pRequest) throws SAXException {
handler.startDocument();
boolean extensions = pRequest.getConfig().isEnabledForExtensions();
if (extensions) {
handler.startPrefixMapping("ex", XmlRpcWriter.EXTENSIONS_URI);
}
handler.startElement("", "methodCall", "methodCall", ZERO_ATTRIBUTES);
handler.startElement("", "methodName", "methodName", ZERO_ATTRIBUTES);
String s = pRequest.getMethodName();
handler.characters(s.toCharArray(), 0, s.length());
handler.endElement("", "methodName", "methodName");
handler.startElement("", "params", "params", ZERO_ATTRIBUTES);
int num = pRequest.getParameterCount();
for (int i = 0; i < num; i++) {
handler.startElement("", "param", "param", ZERO_ATTRIBUTES);
writeValue(pRequest.getParameter(i));
handler.endElement("", "param", "param");
}
handler.endElement("", "params", "params");
handler.endElement("", "methodCall", "methodCall");
if (extensions) {
handler.endPrefixMapping("ex");
}
handler.endDocument();
}
此处是按照 xml-rpc 的格式组装 xml,通用格式如下:
<?xml version="1.0" encoding="UTF-8"?>
<methodCall xmlns:ex="http://ws.apache.org/xmlrpc/namespaces/extensions">
<methodName>Person.SetAge</methodName>
<params>
<param>
<value>
<i4>20</i4>
</value>
</param>
</params>
</methodCall>