【Ovirt 笔记】engine 与 vdsm 之间的调用分析
2018-05-31 本文已影响10人
58bc06151329
文前说明
作为码农中的一员,需要不断的学习,我工作之余将一些分析总结和学习笔记写成博客与大家一起交流,也希望采用这种方式记录自己的学习之旅。
本文仅供学习交流使用,侵权必删。
不用于商业目的,转载请注明出处。
分析整理的版本为 Ovirt 4.2.3 版本。
1. 简介
-
JSON-RPC(Remote Protocol Call)是一种以 JSON 为协议的远程调用服务,具有开发调试简单,多平台通用的特性。
- 是一种以 json 为消息格式的远程调用服务,它是一套允许运行在不同操作系统、不同环境的程序实现基于 Internet 过程调用的规范和一系列的实现。
- 是一种无状态、轻量级远程过程调用(RPC)协议。该协议是与传输方式无关的,其定义内容在同一个处理过程使用时,可通过 sockets、HTTP 或其它任意消息传递环境。
- 使用了 JSON(RFC 4627) 作为数据格式。
- JSON-RPC 和 XML-RPC 相比的优点
- XML-RPC 是以 XML 作为消息格式,XML 具有体积大,格式复杂,传输占用带宽。
- 程序对 XML 的解析也比较复杂,并且耗费较多服务器资源。JSON 相比 XML 体积小巧,并且解析相对容易很多。
- JSON-RPC 相对 XML-RPC 在带宽、服务器资源消耗、开发易用性方面要强很多。
1.2 约定
- MUST,MUST NOT,REQUIRED,SHALL,SHALL NOT,SHOULD,SHOULD NOT,RECOMMENDED,MAY,和 OPTIONAL 这些关键词由 RFC 2119 定义。
- JSON-RPC 使用了 JSON 格式,所以与 JSON 使用相同的类型系统(参见
http://www.json.org/ 或者 RFC 4627)。- JSON 可以表示四种基础类型(字符串,数字,布尔,和空值)以及两种结构类型(对象和数组)。
- 客户端和服务端之间交互的所有成员名称都应当是大小写敏感的。
- 客户端被定义为请求对象的来源和应答对象的处理者,服务端被定义为应答对象的来源和请求对象的处理者。
2. 请求
- 请求时向服务器传输数据格式如下(基于 JSON 2.0)
{
"jsonrpc" : 2.0,
"method" : "sayHello",
"params" : ["Hello JSON-RPC"],
"id" : 1
}
2.1 请求对象
属性 | 说明 |
---|---|
jsonrpc | 定义 JSON-RPC 版本,必须等于 2.0。 |
method | 字符串类型,包含被调用方法的名称。方法名称由 rpc 开头,紧随着一个 . 字符( U+002E 或者 ASCII 46),是 rpc 内部保留的方法和扩展名称,不允许被使用。 |
params | 结构类型值,保存着方法调用时的参数值。此成员可省略。若无参数则为 NULL。 |
id | 由客户端指定的标识,必须包含一个字符串或数字或 NULL 值。如果对象中不包含此成员,则该对象被认作一个通知(notification)。id 值通常不应该为 NULL。而且数字值不应当包含小数部分。如果包含 id 值,则服务端的响应必须包含与其一样的值。此成员用于维护两个对象之间的关联关系。 |
- 不应该使用 NULL 作为 id 的值,因为规范文档使用 NULL 值作为 id 未知时响应对象 id 的值。
- 1.0 版本中使用 id 为 NULL 值的对象被视作通知,这样做容易在处理时造成混乱。
- id 不应该包含小数部分,因为许多十进制小数都无法用二进制准确的表示。
2.2 参数结构
- RPC 调用包含的参数必须是结构类型的。要么使用数组,按位置区分;要么使用对象,按名称区分。
- 按位置区分:params 必须是一个数组,以服务端所期望的顺序包含所有参数。
- 按名称区分:params 必须是一个对象,成员名称与服务端期望的参数名称一致。
- 如果缺少相应名称的参数,则可能产生错误。
- 名称必须与方法所期望的参数名称完全匹配,包括大小写。
3. 响应
- 服务器返回的数据格式也为 JSON,格式如下
{
"jsonrpc" : 2.0,
"result" : "Hell JSON-RPC",
"error" : null,
"id" : 1
}
3.1 响应对象
属性 | 说明 |
---|---|
jsonrpc | 定义 JSON-RPC 版本,必须等于 2.0。 |
result | 方法返回值,如果调用成功则必须包含此成员。如果发生错误则必须不包含此成员。其值由被调用的服务端方法决定。 |
error | 此成员在发生错误时是必要的。如果在调用时没有发生错误,则此成员必须不存在。此成员值必须是一个对象。 |
id | 此成员是必要的。它必须与对应请求对象的 id 值相同。如果在获取请求对象的 id 时发生错误,那么它必须为 NULL 值。 |
- result 成员和 error 成员两者必须包含其一,且不能同时存在。
4. 错误
{
"code" : 1,
"message" : "Nothing found",
"data":null
}
4.1 错误对象
属性 | 说明 |
---|---|
code | 用于指示发生错误的数字。必须是整数。 |
message | 用于简短描述该错误的字符串。该消息内容应当是一个简洁明了的单句。 |
data | 基础类型或者结构类型值,包含更多关于该错误的信息。此成员是可以省略。 |
4.2 错误码
- 错误码 -32768 到 -32000 作为预定义错误的保留值,该范围内的任何未定义代码为未来保留使用。
- 这些错误代码基本上与 XML-RPC 的相同
代码 | 错误 | 含义 |
---|---|---|
-32700 | 解析错误 | 服务器接收到无效的 JSON,服务器解析 JSON 文本发生错误。 |
-32600 | 无效的请求 | 发送的 JSON 不是一个有效的请求。 |
-32601 | 方法未找到 | 方法不存在或不可见。 |
-36602 | 无效的参数 | 无效的方法参数。 |
-36603 | 内部错误 | JSON-RPC 内部错误。 |
-32000 到 -32099 | 服务器端错误 | 保留给具体实现服务器端错误。 |
5. 批量调用
-
客户端可以发送一个请求对象数组来进行批量调用。
- 当所有的请求都响应完毕后,服务器应以一个数组作为响应,每个请求都应该对应一个请求对象。
- 服务器可以以任何宽度,以任意顺序,并发的批量处理一个 RPC 调用。
- 客户端可以通过 id 将请求和响应进行匹配。
-
请求
[
{"jsonrpc":"2.0", "method": "sum", "params":[1,2,4], "id": "1"},
{"jsonrpc":"2.0", "method": "notify_hello","params": [7]},
{"jsonrpc":"2.0", "method": "subtract", "params":[42,23], "id": "2"},
{"foo":"boo"},
{"jsonrpc":"2.0", "method": "foo.get", "params":{"name": "myself"}, "id": "5"},
{"jsonrpc": "2.0", "method":"get_data", "id": "9"}
]
- 响应
[
{"jsonrpc":"2.0", "result": 7, "id": "1"},
{"jsonrpc":"2.0", "result": 19, "id": "2"},
{"jsonrpc":"2.0", "error": {"code": -32600,"message": "Invalid Request"}, "id": null},
{"jsonrpc":"2.0", "error": {"code": -32601,"message": "Method not found"}, "id":"5"},
{"jsonrpc":"2.0", "result": ["hello", 5], "id":"9"}
]
6. 用例说明
6.1. 列表形式参数
- 请求
{
"jsonrpc":"2.0",
"method": "subtract",
"params":[42, 23],
"id": 1
}
- 响应
{
"jsonrpc":"2.0",
"result": 19,
"id": 1
}
6.2. key-value 形式参数
- 请求
{
"jsonrpc":"2.0",
"method": "subtract",
"params":{
"subtrahend": 23,
"minuend": 42
},
"id": 3
}
- 响应
{
"jsonrpc":"2.0",
"result": 19,
"id": 3
}
6.3 通知(notification)
- 通知是一个不包含 id 的请求对象。一次通知请求意味着客户端不需要相应的应答对象,因此不需要有应答对象返回给客户端。服务端必须不能回应通知,包括那些在批量请求中的消息。
- 通知无法通过定义确认,因为它们没有对应返回的应答对象。而且,客户端无法得知任何错误消息(比如:" Invalid params "," Internal error ")。
{"jsonrpc":"2.0", "method": "update", "params":[1,2,3,4,5]}
{"jsonrpc": "2.0", "method":"foobar"}
6.4 错误的调用
6.4.1 无效的请求对象
- 请求
{
"jsonrpc":"2.0",
"method": 1,
"params": "bar"
}
- 响应
{
"jsonrpc": "2.0",
"error":{
"code": -32600,
"message": "Invalid Request"
},
"id": null
}
6.4.2 无效的方法
- 请求
{"jsonrpc":"2.0", "method": "foobar", "id":"1"}
- 响应
{"jsonrpc": "2.0", "error":{"code": -32601, "message": "Method not found"},"id": "1"}
6.4.3 无效的 JSON
- 请求
{"jsonrpc":"2.0", "method": "foobar, "params":"bar", "baz]
- 响应
{"jsonrpc": "2.0", "error":{"code": -32700, "message": "Parse error"},"id": null}
7. 应用实例
- 以虚拟机关机为例,调用
DestroyVDSCommand
命令。
status = getBroker().destroy(getParameters().getVmId().toString());
-
VdsManager
初始化建立与主机的连接。- clientTimeOut 定义客户端超时时间。
- connectionTimeOut 定义连接的时间。
- heartbeat 定义心跳频率。
- clientRetries 定义客户端重试连接次数。
@PostConstruct
private void init() {
......
initVdsBroker();
}
int clientTimeOut = Config.<Integer> getValue(ConfigValues.vdsTimeout) * 1000;
int connectionTimeOut = Config.<Integer> getValue(ConfigValues.vdsConnectionTimeout) * 1000;
int heartbeat = Config.<Integer> getValue(ConfigValues.vdsHeartbeatInSeconds) * 1000;
int clientRetries = Config.<Integer> getValue(ConfigValues.vdsRetries);
vdsProxy = TransportFactory.createVdsServer(
cachedVds.getHostName(),
cachedVds.getPort(),
clientTimeOut,
connectionTimeOut,
clientRetries,
heartbeat,
resourceManager.getExecutor());
- 创建 vds 代理对象
JsonRpcVdsServer
。
public static IVdsServer createVdsServer(
String hostname, int port, int clientTimeOut, int connectionTimeOut, int clientRetries, int heartbeat, ScheduledExecutorService executorService) {
HttpClient client = HttpUtils.getConnection(
connectionTimeOut,
clientRetries,
Config.getValue(ConfigValues.VdsMaxConnectionsPerHost),
Config.getValue(ConfigValues.MaxTotalConnections));
String eventQueue = Config.getValue(ConfigValues.EventQueueName);
return new JsonRpcVdsServer(
JsonRpcUtils.createStompClient(
hostname,
port,
connectionTimeOut,
clientTimeOut,
clientRetries,
heartbeat,
Config.getValue(ConfigValues.EncryptHostCommunication),
Config.getValue(ConfigValues.VdsmSSLProtocol),
Config.getValue(ConfigValues.EventProcessingPoolSize),
Config.getValue(ConfigValues.VdsRequestQueueName),
Config.getValue(ConfigValues.VdsResponseQueueName),
eventQueue,
executorService)
, client);
}
- 创建
JsonRpcVdsServer
代理对象的同时,设置 engine 与 vdsm 之间的连接属性。- 采用了 STOMP 协议。
- 创建 JSON-RPC 客户端对象实例
JsonRpcClient
。
public static JsonRpcClient createStompClient(String hostname,
int port,
int connectionTimeout,
int clientTimeout,
int connectionRetry,
int heartbeat,
boolean isSecure,
String protocol,
int parallelism,
String requestQueue,
String responseQueue,
String eventQueue,
ScheduledExecutorService executorService) {
StompClientPolicy connectionPolicy =
new StompClientPolicy(connectionTimeout,
connectionRetry,
heartbeat,
IOException.class,
requestQueue,
responseQueue);
connectionPolicy.setEventQueue(eventQueue);
ClientPolicy clientPolicy = new ClientPolicy(clientTimeout, connectionRetry, heartbeat, IOException.class);
if (Config.getValue(ConfigValues.UseHostNameIdentifier)){
log.debug(identifierLogMessage, hostname);
connectionPolicy.setIdentifier(hostname);
}
return createClient(hostname, port, connectionPolicy, clientPolicy, isSecure, ReactorType.STOMP, protocol, parallelism, executorService);
}
- 创建响应器对象
Reactor
。
final Reactor reactor = ReactorFactory.getReactor(provider, type);
- 创建客户端响应对象
ReactorClient
,将其与客户端对象JsonRpcClient
进行绑定。-
ResponseWorker
主要作用是对下发命令的响应进行处理。 -
this.tracker
是ResponseTracker
主要负责下发命令的响应跟踪。 - 通过
onMessageReceived
事件跟踪命令下发后的响应信息。
-
ResponseWorker worker = ReactorFactory.getWorker(parallelism);
JsonRpcClient jsonClient = worker.register(client);
public JsonRpcClient register(ReactorClient client) {
final JsonRpcClient jsonRpcClient = new JsonRpcClient(client, this.tracker);
client.addEventListener(new MessageListener() {
@Override
public void onMessageReceived(byte[] message) {
queue.add(new MessageContext(jsonRpcClient, message));
}
});
return jsonRpcClient;
}
- 使用了
JsonRpcVdsServer
代理,下发 vdsm 命令。- 这里下发的是虚拟机关机命令。
@Override
public StatusOnlyReturn destroy(String vmId) {
JsonRpcRequest request = new RequestBuilder("VM.destroy").withParameter("vmID", vmId).build();
Map<String, Object> response = new FutureMap(this.client, request);
return new StatusOnlyReturn(response);
}
-
JsonRpcRequest
类组装 JSON-RPC 请求。-
new RequestBuilder("VM.destroy")
组装请求方法,方法名 VM.destroy。 -
withParameter("vmID", vmId)
定义参数。 -
build()
组装请求对象。
-
public JsonRpcRequest build() {
final TextNode id = this.parameters.textNode(UUID.randomUUID().toString());
return new JsonRpcRequest(this.methodName, this.parameters, id);
}
- 执行 RPC 请求。
Map<String, Object> response = new FutureMap(this.client, request);
public Future<JsonRpcResponse> call(JsonRpcRequest req) throws ClientConnectionException {
final Call call = new Call(req);
this.tracker.registerCall(req, call);
retryCall(req, call);
try {
this.getClient().sendMessage(jsonToByteArray(req.toJson()));
} finally {
retryCall(req, call);
}
return call;
}
- 通过
req.toJson()
方法组装请求的 JSON 对象。
public JsonNode toJson() {
ObjectNode node = MAPPER.createObjectNode();
node.put("jsonrpc", "2.0");
if (getMethod() == null) {
node.putNull("method");
} else {
node.put("method", getMethod());
}
if (getParams() == null) {
node.putNull("params");
} else {
node.put("params", getParams());
}
if (getId() == null) {
node.putNull("id");
} else {
node.put("id", getId());
}
return node;
}
- 请求对象格式。
[
{
"jsonrpc":"2.0",
"method": "VM.destroy",
"params":{
"vmID": <vmId>
},
"id": <UUID.randomUUID()>
}
]
- 将 JSON-RPC 调用放入缓存。
public void registerCall(JsonRpcRequest req, JsonRpcCall call) {
if (this.runningCalls.putIfAbsent(req.getId(), call) != null) {
throw new RequestAlreadySentException();
}
}
- 下发请求与响应。
public Future<JsonRpcResponse> call(JsonRpcRequest req) throws ClientConnectionException {
final Call call = new Call(req);
this.tracker.registerCall(req, call);
retryCall(req, call);
try {
this.getClient().sendMessage(jsonToByteArray(req.toJson()));
} finally {
retryCall(req, call);
}
return call;
}
- 创建下发命令的连接。
public ReactorClient getClient() throws ClientConnectionException {
if (this.client.isOpen()) {
return this.client;
}
this.client.connect();
return this.client;
}
- 获得
SocketChannel
对象 。
public void connect() throws ClientConnectionException {
if (isOpen()) {
return;
}
try (LockWrapper wrapper = new LockWrapper(this.lock)) {
if (isOpen() && isInInit()) {
getPostConnectCallback().await(policy.getRetryTimeOut(), policy.getTimeUnit());
}
if (isOpen()) {
return;
}
final FutureTask<SocketChannel> task = scheduleTask(new Retryable<>(() -> {
InetAddress address = InetAddress.getByName(hostname);
log.info("Connecting to " + address);
final InetSocketAddress addr = new InetSocketAddress(address, port);
final SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(addr);
return socketChannel;
}, this.policy));
this.channel = task.get();
......
- 执行下发的命令。
final long timeout = getTimeout(policy.getRetryTimeOut(), policy.getTimeUnit());
while (!this.channel.finishConnect()) {
final FutureTask<SocketChannel> connectTask = scheduleTask(new Retryable<>(() -> {
if (System.currentTimeMillis() >= timeout) {
throw new ConnectException("Connection timeout");
}
return null;
}, this.policy));
connectTask.get();
}
- 命令下发到 vdsm,建立下发命令与实际执行方法的映射关系(在节点上执行)。
- vdsm/jsonrpcvdscli.py 文件中定义。
- VM.destroy 与 destroy 对应。
_COMMAND_CONVERTER = {
......
'destroy': 'VM.destroy',
......
}
- 解析 JSON 参数对象,执行映射方法(在节点上执行)。
def _callMethod(self, methodName, *args, **kwargs):
try:
method = _COMMAND_CONVERTER[methodName]
except KeyError as e:
raise Exception("Attempt to call function: %s with "
"arguments: %s error: %s" %
(methodName, args, e))
class_name, method_name = method.split('.')
timeout = kwargs.pop('_transport_timeout', self._default_timeout)
params = self._prepare_args(class_name, method_name, args, kwargs)
req = JsonRpcRequest(method, params, reqId=str(uuid4()))
responses = self._client.call(
req, timeout=self._timeouts.get(method_name, timeout))
if responses:
resp = responses[0]
else:
raise JsonRpcNoResponseError(method=method)
if resp.error is not None:
return response.error_raw(resp.error.code, str(resp.error))
if not self._xml_compat:
return response.success_raw(resp.result)
if resp.result and resp.result is not True:
# None is translated to True inside our JSONRPC implementation
if isinstance(resp.result, list):
return response.success(items=resp.result)
elif isinstance(resp.result, six.string_types):
return response.success(resp.result)
else:
return response.success(**resp.result)
return response.success()
- 这里执行了断电
destroy
方法,并返回结果(在节点上执行)。- vdsm/virt/vm.py 文件中定义。
def destroy(self, gracefulAttempts=1,
reason=vmexitreason.ADMIN_SHUTDOWN):
self.log.debug('destroy Called')
result = self.doDestroy(gracefulAttempts, reason)
if response.is_error(result):
return result
# Clean VM from the system
self._deleteVm()
return response.success()
- 创建命令下发超时,重发命令的参数对象
ResponseTracking
。
private void retryCall(final JsonRpcRequest request, final JsonRpcCall call) throws ClientConnectionException {
ResponseTracking tracking = new ResponseTracking(request, call, new RetryContext(policy), getTimeout(this.policy.getRetryTimeOut(), this.policy.getTimeUnit()), this.client, !Objects.equals(request.getMethod(), "Host.ping"));
this.tracker.registerTrackingRequest(request, tracking);
}
- 建立请求与
ResponseTracking
的映射关系。
public void registerTrackingRequest(JsonRpcRequest req, ResponseTracking tracking) {
JsonNode id = req.getId();
List<JsonNode> nodes = new CopyOnWriteArrayList<>();
try (LockWrapper wrapper = new LockWrapper(this.lock)) {
this.map.put(id, tracking);
this.queue.add(id);
nodes.add(id);
nodes = this.hostToId.putIfAbsent(tracking.getClient().getClientId(), nodes);
if (nodes != null && !nodes.contains(id)) {
nodes.add(id);
}
}
}
- 响应处理类
ResponseWorker
根据下发命令反馈,解析响应结果。- 同时启动
ResponseTracker
线程,用于跟踪命令响应,超时后进行重发。
- 同时启动
public void run() {
MessageContext context = null;
while (true) {
try {
context = this.queue.take();
if (context.getClient() == null) {
break;
}
if (log.isDebugEnabled()) {
log.debug("Message received: " + new String(context.getMessage(), UTF8));
}
JsonNode rootNode = MAPPER.readTree(context.getMessage());
if (!rootNode.isArray()) {
processIncomingObject(context.getClient(), rootNode);
} else {
final Iterator<JsonNode> iter = rootNode.getElements();
while (iter.hasNext()) {
final JsonNode node = iter.next();
processIncomingObject(context.getClient(), node);
}
}
} catch (Exception e) {
log.warn("Exception thrown during message processing");
if (log.isDebugEnabled()) {
log.debug(e.getMessage(), e);
}
continue;
}
}
}
private void processIncomingObject(JsonRpcClient client, JsonNode node) {
......
client.processResponse(JsonRpcResponse.fromJsonNode(node));
}
-
ResponseTracker
命令下发超时后的重发处理。
protected void loop() {
for (JsonNode id : queue) {
if (!this.runningCalls.containsKey(id)) {
removeRequestFromTracking(id);
continue;
}
ResponseTracking tracking = this.map.get(id);
if (System.currentTimeMillis() >= tracking.getTimeout()) {
RetryContext context = tracking.getContext();
context.decreaseAttempts();
if (context.getNumberOfAttempts() <= 0) {
handleFailure(tracking, id);
continue;
}
try {
tracking.getClient().sendMessage(jsonToByteArray(tracking.getRequest().toJson()));
} catch (ClientConnectionException e) {
handleFailure(tracking, id);
}
tracking.setTimeout(getTimeout(context.getTimeout(), context.getTimeUnit()));
}
}
}
- 命令未超时,组装响应对象
JsonRpcResponse
。
public static JsonRpcResponse fromJsonNode(JsonNode node) {
JsonNode jsonrpcNode = node.get("jsonrpc");
if (jsonrpcNode == null) {
throw new IllegalArgumentException(
"'jsonrpc' field missing in node");
}
String version = jsonrpcNode.asText();
if (version == null || !version.equals("2.0")) {
throw new IllegalArgumentException("Only jsonrpc 2.0 is supported");
}
final JsonNode id = node.get("id");
if (id == null) {
throw new IllegalArgumentException("Response missing id field");
}
return new JsonRpcResponse(node.get("result"), node.get("error"), id);
}
- 响应对象格式。
[
{
"jsonrpc":"2.0",
"result": {
"status": {
"code": 0,
"message": "msg"
}
},
"id": <UUID.randomUUID()>
}
]
- 获取 result 部分,转换格式为
Map<String, Object>
。
call.getCallback().onResponse(new JsonResponseUtil().populate(response))
public FutureMap(JsonRpcClient client, JsonRpcRequest request) {
try {
this.response = client.call(request);
this.client = client;
} catch (ClientConnectionException e) {
throw new TransportRunTimeException("Connection issues during send request", e);
}
}
- 业务层解析返回的 result 结果。
@Override
public StatusOnlyReturn destroy(String vmId) {
......
return new StatusOnlyReturn(response);
}
@SuppressWarnings("unchecked")
public StatusOnlyReturn(Map<String, Object> innerMap) {
Map<String, Object> statusMap = (Map<String, Object>) innerMap.get(STATUS);
status = new Status(statusMap);
}
7.1 实例总结
代码调用流程图- 图例说明
- 其中紫色为重点关注对象。
- 绿色为缓存和队列。
- 虚线框为引用对象。
- 数字标识调用时序。
对象 | 说明 |
---|---|
JsonRpcVdsServer | 用于执行 vdsm 命令的代理服务器,同时创建 JsonRpcClient 对象与之对应。 |
JsonRpcClient | JSON-RPC 客户端对象,向服务端发送和接收 JSON 数据。 |
JsonRpcRequest | JSON-RPC 的请求对象,封装请求的 JSON 数据。 |
JsonRpcResponse | JSON-RPC 的响应对象,封装响应的 JSON 数据。 |
Call | 用于记录命令下发的过程,同时包含了请求和响应对象(JsonRpcRequest 和 JsonRpcResponse)。 |
ReactorClient | JSON-RPC 客户端对象反应器,绑定消息反馈事件,当 RPC 请求得到响应回馈时触发事件反应(将结果交由 ResponseWorker 处理)。 |
ResponseWorker | 响应工作者,独立的响应处理线程,用于处理响应的消息(由 ReactorClient 获得后放入队列 queue 中)。同时启动 ReponseTracker 线程。 |
ReponseTracking | 响应跟踪对象,用于设置和保存,命令超时后的重发命令的参数。 |
ReponseTracker | 响应跟踪者,用于处理 ReponseTracking,建立了 ReponseTracking 与 JsonRpcRequest 的映射关系(map 缓存)。同时还建立了 JsonRpcRequest 与 Call 的映射关系(runningCalls 缓存)。同时还处理了命令超时后的重发。 |