
[oVirt Notes] Analysis of the calls between engine and vdsm

2018-05-31  58bc06151329

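Preface

As a working programmer I need to keep learning. In my spare time I write up my analyses and study notes as blog posts, both to share them with others and to keep a record of my own learning.

This article is for study and discussion only; it will be removed if it infringes any rights.
It is not for commercial use. Please credit the source when reposting.

The analysis is based on oVirt 4.2.3.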

1. Introduction

1.2 Conventions

2. Request

{ 
    "jsonrpc" : "2.0",
    "method" : "sayHello", 
    "params" : ["Hello JSON-RPC"], 
    "id" : 1
} 

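2.1 Request object

Attribute | Description
jsonrpc | The JSON-RPC protocol version; must be exactly the string "2.0".
method | A string containing the name of the method to invoke. Method names that begin with rpc followed by a period character (U+002E, ASCII 46) are reserved for rpc-internal methods and extensions and must not be used for anything else.
params | A structured value (array or object) holding the arguments of the method call. This member may be omitted when the method takes no arguments.
id | An identifier set by the client; it must be a string, a number, or NULL. If the member is absent, the object is treated as a notification. The value should normally not be NULL, and numbers should not contain a fractional part. If the request carries an id, the server's response must carry the same value. This member is what correlates a request with its response.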
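The member rules above can be sketched as a small request builder. This is only an illustration in Python, not code from oVirt or vdsm: `build_request` and its `notification` flag are hypothetical names, and using a random UUID string as the id is an assumption made for the example.

```python
import json
import uuid


def build_request(method, params=None, notification=False):
    """Build a JSON-RPC 2.0 request object as a plain dict."""
    if not isinstance(method, str):
        raise TypeError("method must be a string")
    if method.startswith("rpc."):
        # Names starting with "rpc." are reserved by the specification.
        raise ValueError("method names starting with 'rpc.' are reserved")
    if params is not None and not isinstance(params, (list, dict)):
        raise TypeError("params must be a list or a dict")

    request = {"jsonrpc": "2.0", "method": method}
    if params is not None:
        # params is simply left out when there are no arguments.
        request["params"] = params
    if not notification:
        # Illustrative choice: a random UUID string as the request id.
        request["id"] = str(uuid.uuid4())
    return request


request = build_request("sayHello", ["Hello JSON-RPC"])
print(json.dumps(request))
```

Leaving out the id is the only thing that turns a request into a notification, which is why the sketch models it as a flag rather than as a separate message type.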

2.2 Parameter structures

3. Response

{
    "jsonrpc" : "2.0",
    "result" : "Hello JSON-RPC",
    "id" : 1
}

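3.1 Response object

Attribute | Description
jsonrpc | The JSON-RPC protocol version; must be exactly the string "2.0".
result | The return value of the method. Required when the call succeeds and must be absent when an error occurs. Its value is determined by the server-side method that was invoked.
error | Required when an error occurs and must be absent when the call succeeds. Its value must be an object.
id | Required. It must equal the id of the corresponding request object. If the id of the request could not be determined (for example, on a parse error or an invalid request), it must be NULL.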
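The key constraint in this table is that a response carries either result or error, never both and never neither. A minimal checker, assuming the response has already been parsed into a Python dict (`validate_response` is a hypothetical helper, not part of any oVirt or vdsm API), could look like this:

```python
def validate_response(response):
    """Return True if `response` obeys the JSON-RPC 2.0 member rules."""
    if not isinstance(response, dict):
        return False
    if response.get("jsonrpc") != "2.0":
        return False
    if "id" not in response:
        # id is mandatory, although its value may be None (JSON null).
        return False
    has_result = "result" in response
    has_error = "error" in response
    if has_result == has_error:
        # Exactly one of result / error must be present.
        return False
    if has_error and not isinstance(response["error"], dict):
        return False
    return True


ok = {"jsonrpc": "2.0", "result": "Hello JSON-RPC", "id": 1}
both = {"jsonrpc": "2.0", "result": "x", "error": None, "id": 1}
print(validate_response(ok), validate_response(both))
```

The check tests for the presence of the keys rather than their values, because a method may legitimately return null as its result.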

4. Error

{
    "code" : 1,
    "message" : "Nothing found",
    "data":null
}

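4.1 Error object

Attribute | Description
code | A number indicating the type of error that occurred. Must be an integer.
message | A short string describing the error, ideally a single concise sentence.
data | A primitive or structured value with additional information about the error. This member may be omitted.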

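4.2 Error codes

Code | Error | Meaning
-32700 | Parse error | The server received invalid JSON; an error occurred while parsing the JSON text.
-32600 | Invalid Request | The JSON sent is not a valid request object.
-32601 | Method not found | The method does not exist or is not available.
-32602 | Invalid params | Invalid method parameters.
-32603 | Internal error | Internal JSON-RPC error.
-32000 to -32099 | Server error | Reserved for implementation-defined server errors.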
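As a rough illustration, the predefined codes of the JSON-RPC 2.0 specification can be kept in a lookup table and used to build error objects. The helper names `describe_code` and `make_error`, and the label "Application-defined error" for codes outside the reserved ranges, are assumptions of this sketch.

```python
# Predefined error codes from the JSON-RPC 2.0 specification.
STANDARD_ERRORS = {
    -32700: "Parse error",
    -32600: "Invalid Request",
    -32601: "Method not found",
    -32602: "Invalid params",
    -32603: "Internal error",
}


def describe_code(code):
    """Return a short label for an error code."""
    if code in STANDARD_ERRORS:
        return STANDARD_ERRORS[code]
    if -32099 <= code <= -32000:
        # Range reserved for implementation-defined server errors.
        return "Server error"
    return "Application-defined error"


def make_error(code, message=None, data=None):
    """Build an error object; `data` is left out when not supplied."""
    if isinstance(code, bool) or not isinstance(code, int):
        raise TypeError("code must be an integer")
    error = {"code": code, "message": message or describe_code(code)}
    if data is not None:
        error["data"] = data
    return error


print(make_error(-32602))
print(make_error(1, "Nothing found"))
```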

5. Batch calls

[
       {"jsonrpc":"2.0", "method": "sum", "params":[1,2,4], "id": "1"},
       {"jsonrpc":"2.0", "method": "notify_hello","params": [7]},
       {"jsonrpc":"2.0", "method": "subtract", "params":[42,23], "id": "2"},
       {"foo":"boo"},
       {"jsonrpc":"2.0", "method": "foo.get", "params":{"name": "myself"}, "id": "5"},
       {"jsonrpc": "2.0", "method":"get_data", "id": "9"} 
]
[
       {"jsonrpc":"2.0", "result": 7, "id": "1"},
       {"jsonrpc":"2.0", "result": 19, "id": "2"},
       {"jsonrpc":"2.0", "error": {"code": -32600,"message": "Invalid Request"}, "id": null},
       {"jsonrpc":"2.0", "error": {"code": -32601,"message": "Method not found"}, "id":"5"},
       {"jsonrpc":"2.0", "result": ["hello", 5], "id":"9"}
]
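To make the batch rules concrete, here is a minimal server-side dispatcher sketch that reproduces the request and response arrays above. The `METHODS` table and its lambda implementations are invented for the example, and handling of exceptions raised by the methods themselves is deliberately left out.

```python
import json

# Hypothetical method table, chosen only to match the example batch.
METHODS = {
    "sum": lambda *values: sum(values),
    "subtract": lambda minuend, subtrahend: minuend - subtrahend,
    "notify_hello": lambda *args: None,
    "get_data": lambda: ["hello", 5],
}


def error_response(code, message, request_id):
    return {"jsonrpc": "2.0",
            "error": {"code": code, "message": message},
            "id": request_id}


def handle_one(request):
    """Handle one request object; return None when no reply is due."""
    params = request.get("params", []) if isinstance(request, dict) else None
    if (not isinstance(request, dict)
            or request.get("jsonrpc") != "2.0"
            or not isinstance(request.get("method"), str)
            or not isinstance(params, (list, dict))):
        return error_response(-32600, "Invalid Request", None)
    is_notification = "id" not in request
    func = METHODS.get(request["method"])
    if func is None:
        if is_notification:
            return None
        return error_response(-32601, "Method not found", request["id"])
    # Objects are passed as keyword arguments, arrays positionally.
    result = func(**params) if isinstance(params, dict) else func(*params)
    if is_notification:
        return None  # notifications never produce a response
    return {"jsonrpc": "2.0", "result": result, "id": request["id"]}


def handle_batch(batch):
    replies = [handle_one(item) for item in batch]
    return [reply for reply in replies if reply is not None]


batch = [
    {"jsonrpc": "2.0", "method": "sum", "params": [1, 2, 4], "id": "1"},
    {"jsonrpc": "2.0", "method": "notify_hello", "params": [7]},
    {"jsonrpc": "2.0", "method": "subtract", "params": [42, 23], "id": "2"},
    {"foo": "boo"},
    {"jsonrpc": "2.0", "method": "foo.get", "params": {"name": "myself"}, "id": "5"},
    {"jsonrpc": "2.0", "method": "get_data", "id": "9"},
]
responses = handle_batch(batch)
print(json.dumps(responses, indent=2))
```

Six request objects go in and five responses come out: the notification is executed but not answered, and the malformed entry is answered with a null id because no id could be read from it.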

6. Examples

6.1 Positional (list) parameters

{
    "jsonrpc":"2.0",
    "method": "subtract",
    "params":[42, 23],
    "id": 1
}
{
    "jsonrpc":"2.0",
    "result": 19,
    "id": 1
}

6.2 Named (key-value) parameters

{
    "jsonrpc":"2.0",
    "method": "subtract",
    "params":{
                "subtrahend": 23,
                "minuend": 42
             }, 
    "id": 3
}
{
    "jsonrpc":"2.0",
    "result": 19,
    "id": 3
}
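The two parameter styles map naturally onto positional and keyword arguments. A small sketch, assuming a server-side `subtract` function whose argument names match the keys used in 6.2 (`invoke` is a hypothetical helper):

```python
def subtract(minuend, subtrahend):
    return minuend - subtrahend


def invoke(func, params):
    """Call func with JSON-RPC params given as a list or as a dict."""
    if isinstance(params, dict):
        # Named parameters: key order in the JSON object does not matter.
        return func(**params)
    # Positional parameters: order must match the method signature.
    return func(*params)


by_position = invoke(subtract, [42, 23])
by_name = invoke(subtract, {"subtrahend": 23, "minuend": 42})
print(by_position, by_name)
```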

6.3 Notifications

{"jsonrpc":"2.0", "method": "update", "params":[1,2,3,4,5]}
{"jsonrpc": "2.0", "method":"foobar"}

6.4 Invalid calls

6.4.1 Invalid request object

{
    "jsonrpc":"2.0", 
    "method": 1, 
    "params": "bar"
}
{
    "jsonrpc": "2.0", 
    "error":{
                "code": -32600, 
                "message": "Invalid Request"
            },
    "id": null
}

6.4.2 Nonexistent method

{"jsonrpc":"2.0", "method": "foobar", "id":"1"}
{"jsonrpc": "2.0", "error":{"code": -32601, "message": "Method not found"},"id": "1"}

6.4.3 Invalid JSON

{"jsonrpc":"2.0", "method": "foobar, "params":"bar", "baz]
{"jsonrpc": "2.0", "error":{"code": -32700, "message": "Parse error"},"id": null}

7. Worked example

status = getBroker().destroy(getParameters().getVmId().toString());
@PostConstruct
private void init() {
  ......
  initVdsBroker();
}
int clientTimeOut = Config.<Integer> getValue(ConfigValues.vdsTimeout) * 1000;
int connectionTimeOut = Config.<Integer> getValue(ConfigValues.vdsConnectionTimeout) * 1000;
int heartbeat = Config.<Integer> getValue(ConfigValues.vdsHeartbeatInSeconds) * 1000;
int clientRetries = Config.<Integer> getValue(ConfigValues.vdsRetries);
vdsProxy = TransportFactory.createVdsServer(
                cachedVds.getHostName(),
                cachedVds.getPort(),
                clientTimeOut,
                connectionTimeOut,
                clientRetries,
                heartbeat,
                resourceManager.getExecutor());
public static IVdsServer createVdsServer(
            String hostname, int port, int clientTimeOut, int connectionTimeOut, int clientRetries, int heartbeat, ScheduledExecutorService executorService) {

        HttpClient client = HttpUtils.getConnection(
                connectionTimeOut,
                clientRetries,
                Config.getValue(ConfigValues.VdsMaxConnectionsPerHost),
                Config.getValue(ConfigValues.MaxTotalConnections));

        String eventQueue = Config.getValue(ConfigValues.EventQueueName);
        return new JsonRpcVdsServer(
                JsonRpcUtils.createStompClient(
                        hostname,
                        port,
                        connectionTimeOut,
                        clientTimeOut,
                        clientRetries,
                        heartbeat,
                        Config.getValue(ConfigValues.EncryptHostCommunication),
                        Config.getValue(ConfigValues.VdsmSSLProtocol),
                        Config.getValue(ConfigValues.EventProcessingPoolSize),
                        Config.getValue(ConfigValues.VdsRequestQueueName),
                        Config.getValue(ConfigValues.VdsResponseQueueName),
                        eventQueue,
                        executorService)
                , client);
}
public static JsonRpcClient createStompClient(String hostname,
            int port,
            int connectionTimeout,
            int clientTimeout,
            int connectionRetry,
            int heartbeat,
            boolean isSecure,
            String protocol,
            int parallelism,
            String requestQueue,
            String responseQueue,
            String eventQueue,
            ScheduledExecutorService executorService) {
        StompClientPolicy connectionPolicy =
                new StompClientPolicy(connectionTimeout,
                        connectionRetry,
                        heartbeat,
                        IOException.class,
                        requestQueue,
                        responseQueue);
        connectionPolicy.setEventQueue(eventQueue);

        ClientPolicy clientPolicy = new ClientPolicy(clientTimeout, connectionRetry, heartbeat, IOException.class);
        if (Config.getValue(ConfigValues.UseHostNameIdentifier)){
            log.debug(identifierLogMessage, hostname);
            connectionPolicy.setIdentifier(hostname);
        }
        return createClient(hostname, port, connectionPolicy, clientPolicy, isSecure, ReactorType.STOMP, protocol, parallelism, executorService);
}
final Reactor reactor = ReactorFactory.getReactor(provider, type);
ResponseWorker worker = ReactorFactory.getWorker(parallelism);
JsonRpcClient jsonClient = worker.register(client);
public JsonRpcClient register(ReactorClient client) {
        final JsonRpcClient jsonRpcClient = new JsonRpcClient(client, this.tracker);
        client.addEventListener(new MessageListener() {

            @Override
            public void onMessageReceived(byte[] message) {
                queue.add(new MessageContext(jsonRpcClient, message));
            }
        });
        return jsonRpcClient;
}
@Override
public StatusOnlyReturn destroy(String vmId) {
    JsonRpcRequest request = new RequestBuilder("VM.destroy").withParameter("vmID", vmId).build();
    Map<String, Object> response = new FutureMap(this.client, request);
    return new StatusOnlyReturn(response);
}
public JsonRpcRequest build() {
     final TextNode id = this.parameters.textNode(UUID.randomUUID().toString());
     return new JsonRpcRequest(this.methodName, this.parameters, id);
}
Map<String, Object> response = new FutureMap(this.client, request);
public Future<JsonRpcResponse> call(JsonRpcRequest req) throws ClientConnectionException {
        final Call call = new Call(req);
        this.tracker.registerCall(req, call);
        retryCall(req, call);
        try {
            this.getClient().sendMessage(jsonToByteArray(req.toJson()));
        } finally {
            retryCall(req, call);
        }
        return call;
}
public JsonNode toJson() {
        ObjectNode node = MAPPER.createObjectNode();
        node.put("jsonrpc", "2.0");
        if (getMethod() == null) {
            node.putNull("method");
        } else {
            node.put("method", getMethod());
        }
        if (getParams() == null) {
            node.putNull("params");
        } else {
            node.put("params", getParams());
        }
        if (getId() == null) {
            node.putNull("id");
        } else {
            node.put("id", getId());
        }
        return node;
}
[
   {
    "jsonrpc":"2.0",
    "method": "VM.destroy",
    "params":{
                "vmID": <vmId>
             }, 
    "id": <UUID.randomUUID()>
   }
]
public void registerCall(JsonRpcRequest req, JsonRpcCall call) {
     if (this.runningCalls.putIfAbsent(req.getId(), call) != null) {
          throw new RequestAlreadySentException();
     }
}
public Future<JsonRpcResponse> call(JsonRpcRequest req) throws ClientConnectionException {
     final Call call = new Call(req);
     this.tracker.registerCall(req, call);
     retryCall(req, call);
     try {
         this.getClient().sendMessage(jsonToByteArray(req.toJson()));
     } finally {
         retryCall(req, call);
     }
     return call;
}
public ReactorClient getClient() throws ClientConnectionException {
        if (this.client.isOpen()) {
            return this.client;
        }
        this.client.connect();
        return this.client;
}
public void connect() throws ClientConnectionException {
        if (isOpen()) {
            return;
        }
        try (LockWrapper wrapper = new LockWrapper(this.lock)) {
            if (isOpen() && isInInit()) {
                getPostConnectCallback().await(policy.getRetryTimeOut(), policy.getTimeUnit());
            }
            if (isOpen()) {
                return;
            }
            final FutureTask<SocketChannel> task = scheduleTask(new Retryable<>(() -> {
                InetAddress address = InetAddress.getByName(hostname);
                log.info("Connecting to " + address);

                final InetSocketAddress addr = new InetSocketAddress(address, port);
                final SocketChannel socketChannel = SocketChannel.open();

                socketChannel.configureBlocking(false);
                socketChannel.connect(addr);

                return socketChannel;
            }, this.policy));
            this.channel = task.get();
......
final long timeout = getTimeout(policy.getRetryTimeOut(), policy.getTimeUnit());
while (!this.channel.finishConnect()) {

        final FutureTask<SocketChannel> connectTask = scheduleTask(new Retryable<>(() -> {
                    if (System.currentTimeMillis() >= timeout) {
                        throw new ConnectException("Connection timeout");
                    }
                    return null;
         }, this.policy));
         connectTask.get();
}
_COMMAND_CONVERTER = {
......
'destroy': 'VM.destroy',
......
}
def _callMethod(self, methodName, *args, **kwargs):
        try:
            method = _COMMAND_CONVERTER[methodName]
        except KeyError as e:
            raise Exception("Attempt to call function: %s with "
                            "arguments: %s error: %s" %
                            (methodName, args, e))

        class_name, method_name = method.split('.')
        timeout = kwargs.pop('_transport_timeout', self._default_timeout)
        params = self._prepare_args(class_name, method_name, args, kwargs)

        req = JsonRpcRequest(method, params, reqId=str(uuid4()))

        responses = self._client.call(
            req, timeout=self._timeouts.get(method_name, timeout))
        if responses:
            resp = responses[0]
        else:
            raise JsonRpcNoResponseError(method=method)

        if resp.error is not None:
            return response.error_raw(resp.error.code, str(resp.error))

        if not self._xml_compat:
            return response.success_raw(resp.result)

        if resp.result and resp.result is not True:
            # None is translated to True inside our JSONRPC implementation
            if isinstance(resp.result, list):
                return response.success(items=resp.result)
            elif isinstance(resp.result, six.string_types):
                return response.success(resp.result)
            else:
                return response.success(**resp.result)

        return response.success()
def destroy(self, gracefulAttempts=1,
                reason=vmexitreason.ADMIN_SHUTDOWN):
        self.log.debug('destroy Called')

        result = self.doDestroy(gracefulAttempts, reason)
        if response.is_error(result):
            return result
        # Clean VM from the system
        self._deleteVm()

        return response.success()
private void retryCall(final JsonRpcRequest request, final JsonRpcCall call) throws ClientConnectionException {
        ResponseTracking tracking = new ResponseTracking(request, call, new RetryContext(policy), getTimeout(this.policy.getRetryTimeOut(), this.policy.getTimeUnit()), this.client, !Objects.equals(request.getMethod(), "Host.ping"));
        this.tracker.registerTrackingRequest(request, tracking);
}
public void registerTrackingRequest(JsonRpcRequest req, ResponseTracking tracking) {
        JsonNode id = req.getId();
        List<JsonNode> nodes = new CopyOnWriteArrayList<>();
        try (LockWrapper wrapper = new LockWrapper(this.lock)) {
            this.map.put(id, tracking);
            this.queue.add(id);
            nodes.add(id);
            nodes = this.hostToId.putIfAbsent(tracking.getClient().getClientId(), nodes);
            if (nodes != null && !nodes.contains(id)) {
                nodes.add(id);
            }
        }
}
public void run() {
        MessageContext context = null;
        while (true) {
            try {
                context = this.queue.take();
                if (context.getClient() == null) {
                    break;
                }
                if (log.isDebugEnabled()) {
                    log.debug("Message received: " + new String(context.getMessage(), UTF8));
                }
                JsonNode rootNode = MAPPER.readTree(context.getMessage());
                if (!rootNode.isArray()) {
                    processIncomingObject(context.getClient(), rootNode);
                } else {
                    final Iterator<JsonNode> iter = rootNode.getElements();
                    while (iter.hasNext()) {
                        final JsonNode node = iter.next();
                        processIncomingObject(context.getClient(), node);
                    }
                }
            } catch (Exception e) {
                log.warn("Exception thrown during message processing");
                if (log.isDebugEnabled()) {
                    log.debug(e.getMessage(), e);
                }
                continue;
            }
        }
}
private void processIncomingObject(JsonRpcClient client, JsonNode node) {
......
   client.processResponse(JsonRpcResponse.fromJsonNode(node));
}
protected void loop() {
        for (JsonNode id : queue) {
            if (!this.runningCalls.containsKey(id)) {
                removeRequestFromTracking(id);
                continue;
            }
            ResponseTracking tracking = this.map.get(id);
            if (System.currentTimeMillis() >= tracking.getTimeout()) {
                RetryContext context = tracking.getContext();
                context.decreaseAttempts();
                if (context.getNumberOfAttempts() <= 0) {
                    handleFailure(tracking, id);
                    continue;
                }
                try {
                    tracking.getClient().sendMessage(jsonToByteArray(tracking.getRequest().toJson()));
                } catch (ClientConnectionException e) {
                    handleFailure(tracking, id);
                }
                tracking.setTimeout(getTimeout(context.getTimeout(), context.getTimeUnit()));
            }
        }
}
public static JsonRpcResponse fromJsonNode(JsonNode node) {
        JsonNode jsonrpcNode = node.get("jsonrpc");
        if (jsonrpcNode == null) {
            throw new IllegalArgumentException(
                    "'jsonrpc' field missing in node");
        }

        String version = jsonrpcNode.asText();
        if (version == null || !version.equals("2.0")) {
            throw new IllegalArgumentException("Only jsonrpc 2.0 is supported");
        }

        final JsonNode id = node.get("id");
        if (id == null) {
            throw new IllegalArgumentException("Response missing id field");
        }

        return new JsonRpcResponse(node.get("result"), node.get("error"), id);
}
[
   {
    "jsonrpc":"2.0",
    "result": {
                     "status": {
                                       "code": 0,
                                       "message": "msg"
                                }
              },
    "id": <UUID.randomUUID()>
   }
]
call.getCallback().onResponse(new JsonResponseUtil().populate(response))
public FutureMap(JsonRpcClient client, JsonRpcRequest request) {
     try {
         this.response = client.call(request);
         this.client = client;
     } catch (ClientConnectionException e) {
         throw new TransportRunTimeException("Connection issues during send request", e);
     }
}
@Override
public StatusOnlyReturn destroy(String vmId) {
     ......
     return new StatusOnlyReturn(response);
}
@SuppressWarnings("unchecked")
public StatusOnlyReturn(Map<String, Object> innerMap) {
        Map<String, Object> statusMap = (Map<String, Object>) innerMap.get(STATUS);
        status = new Status(statusMap);
}

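7.1 Summary of the example

[Figure: code call flow diagram]

Object | Description
JsonRpcVdsServer | Proxy server used to execute vdsm commands; a corresponding JsonRpcClient object is created along with it.
JsonRpcClient | The JSON-RPC client object; sends JSON data to the server and receives JSON data from it.
JsonRpcRequest | The JSON-RPC request object; wraps the JSON data of a request.
JsonRpcResponse | The JSON-RPC response object; wraps the JSON data of a response.
Call | Records the progress of a dispatched command and holds both the request and the response object (JsonRpcRequest and JsonRpcResponse).
ReactorClient | The reactor behind the JSON-RPC client. It registers a message listener; when the response to an RPC request arrives, the event fires and the result is handed to ResponseWorker.
ResponseWorker | A dedicated response-processing thread that handles response messages (ReactorClient receives them and places them in the queue). It also starts the ResponseTracker thread.
ResponseTracking | Tracking object that stores the parameters needed to resend a command after it times out.
ResponseTracker | Processes ResponseTracking objects. It keeps the mapping from JsonRpcRequest to ResponseTracking (the map cache) and the mapping from JsonRpcRequest to Call (the runningCalls cache), and it resends commands that have timed out.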
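The core idea behind the runningCalls cache, matching each incoming response to its pending call through the request id, can be sketched in a few lines of Python. This is a simplified analogue and not the engine's Java implementation: `CallTracker` is a hypothetical class, timeouts and retries are omitted, no locking is done, and the reply below is written by hand rather than captured from vdsm.

```python
import json
import uuid
from concurrent.futures import Future


class CallTracker:
    """Maps request ids to pending futures (loosely like runningCalls)."""

    def __init__(self):
        self._running = {}

    def register(self, request):
        request_id = request["id"]
        if request_id in self._running:
            # Mirrors the idea of rejecting a request that was already sent.
            raise RuntimeError("request already sent: %s" % request_id)
        future = Future()
        self._running[request_id] = future
        return future

    def process_response(self, raw_message):
        message = json.loads(raw_message)
        # A batch arrives as a JSON array, a single response as an object.
        responses = message if isinstance(message, list) else [message]
        for response in responses:
            future = self._running.pop(response.get("id"), None)
            if future is not None:
                future.set_result(response)


tracker = CallTracker()
request = {"jsonrpc": "2.0", "method": "VM.destroy",
           "params": {"vmID": "hypothetical-vm-id"},
           "id": str(uuid.uuid4())}
future = tracker.register(request)

# Hand-written reply shaped like the status response shown earlier.
reply = [{"jsonrpc": "2.0",
          "result": {"status": {"code": 0, "message": "msg"}},
          "id": request["id"]}]
tracker.process_response(json.dumps(reply))
status = future.result(timeout=1)["result"]["status"]
print(status["code"], status["message"])
```

The caller holds only the future, much as the engine code holds a FutureMap, and blocks on it only when the status is actually needed.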