27.Dubbo全链路异步
2020-09-18 本文已影响0人
山海树
异步调用流程图

在消费端使用get()生成invoker的时候,如果开启了async=true,则会在DubboInvoker中设置RpcContext.getContext的future为AdapterFuture

通过查看AdapterFuture的构造方法
public FutureAdapter(ResponseFuture future) {
this.future = future;
this.resultFuture = new CompletableFuture<>();
future.setCallback(new ResponseCallback() {
@Override
public void done(Object response) {
Result result = (Result) response;
FutureAdapter.this.resultFuture.complete(result);
V value = null;
try {
value = (V) result.recreate();
} catch (Throwable t) {
FutureAdapter.this.completeExceptionally(t);
}
FutureAdapter.this.complete(value);
}
@Override
public void caught(Throwable exception) {
FutureAdapter.this.completeExceptionally(exception);
}
});
}
可以看到实际上还是通过一个ResponseFuture 完成数据的回调赋值给CompleteFuture;



从以上三张图可以看到,生成Invoker的时候设置的ResponseFuture,其实是一个DefaultFuture,在初始化DefaultFuture的时候回根据请求id存储一份到DefaultFuture对象中,此时由于是异步调用,因此迅速返回,当有结果返回的时候,

消费端reveived方法接到数据,总终调用到DefaultFuture的reveived方法,此时根据requestId拿到response,后调invokeCallback()触发回调方法。
服务端
void handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
Object data;
if (req.isBroken()) {
data = req.getData();
String msg;
if (data == null) {
msg = null;
} else if (data instanceof Throwable) {
msg = StringUtils.toString((Throwable)data);
} else {
msg = data.toString();
}
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus((byte)40);
channel.send(res);
} else {
data = req.getData();
try {
CompletableFuture<Object> future = this.handler.reply(channel, data);
if (future.isDone()) {
res.setStatus((byte)20);
res.setResult(future.get());
channel.send(res);
return;
}
future.whenComplete((result, t) -> {
try {
try {
if (t == null) {
res.setStatus((byte)20);
res.setResult(result);
} else {
res.setStatus((byte)70);
res.setErrorMessage(StringUtils.toString(t));
}
channel.send(res);
} catch (RemotingException var8) {
logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + var8);
}
} finally {
;
}
});
} catch (Throwable var6) {
res.setStatus((byte)70);
res.setErrorMessage(StringUtils.toString(var6));
channel.send(res);
}
}
}
dubbo当方法有返回值的时候回调用HeaderExchangeHandler处理,在HeaderExchangeHandler中reveived最终调用到handleRequest()来处理请求。
通过调用dubbo的reply()方法

该方法中将对方法的执行是否开启异步做出判断并进一步处理

此方法将对返回值做出判断

然后在HeaderExchangeHandler中对返回是否完成做判断,并发送数据。