Dubbo数据透传
更多文章请关注公众号:IT巡游屋
前言
关于Dubbo框架,可能很多人都知道。Dubbo是阿里巴巴公司开源的一个高性能优秀的服务框架,它使得应用可通过高性能的 RPC 实现服务的输出和输入功能,可以和Spring框架无缝集成。而它的使用场景主要是用于分布式的远程调用。很多朋友在使用Dubbo的过程中,只是关注着业务代码的接口定义以及实现,但是确往往忽略了一个重要的技术点,就是如何在服务间追踪一个调用链。
关于追踪服务的调用链
举一个例子说明,在我们项目运行的生产环境里,如果有用户反馈自己在操作手机APP支付的时候,支付成功了,但是订单的支付状态却没有更改,还是待支付的状态。用户急忙拨打了客服电话反馈此事。对于一般公司的不完整的基础设施建设情况来说,这个时候就只能我们运维人员或者开发人员去排查该用户当时操作的调用日志。可是在我们分布式的系统部署里面,可能负责支付的业务,涉及到多个服务的调用,并且即使是相同的服务也肯定以集群的方式部署多套。所以如果要从日志中找到该用户调用服务的日志,必须有一个前提就是,我们能追踪请求在服务间的整个调用链。那么要追踪就得先在服务间传递这些追踪数据。
简单常见的Dubbo服务部署架构
image就上图而言,有两个服务的消费者和三个服务的提供者。
消费者:ConsumerA,ConsumerB
提供者:ServiceA,ServiceB,ServiceC
但是这些服务的消费者和提供者都不是单一部署的。消费者部署在了两台服务器C1和C2,提供者分别在P1,P2,P3都部署了。那么现在问题来了,如果ConsumerA中,某一个业务方法,需要分别依靠调用ServiceA、ServiceB和ServiceC才能完成。那么如何才能知道这个请求调用的是P1-P3服务器上部署的哪些服务呢?其中可能的一个结果就是,比如ConsumerA调用了P2中的ServiceA,调用了P3中的ServiceB,最后调用了P1中的serviceC。如下图:
image那么要完成这样子的追踪,通常的思路就是在调用中额外传递一个追踪的数据,比如trackerId。接下来就讲解实现这个追踪数据传递的方法。
方法一
描述
在参数中直接添加追踪数据trackId
代码
/**
* @author jogeen
* @version 1.0
*/
public class ModelA {
/**
* 追踪ID
*/
private String trackerId;
/**
* 业务数据
*/
private String businessMessage;
}
/**
* 接口的定义
* @author jogeen
* @version 1.0
*/
public interface IServiceA {
public void method1(ModelA modelA);
}
分析
通过这样传递,在同一个线程调用里,每个服务都能获取相同的trackId,这样就将调用链串联起来。但是这种方式太过于粗暴,因为每一个需要传输的Model,都得加上trackerId这个追踪字段,对代码的侵入性太强。
方法二
描述
在方法一的基础上,将追踪数据trackId提取到一个父类BaseModel中,所有其它的Model都继承至BaseModel
代码
/**
* @author jogeen
* @version 1.0
*/
public class BaseModel {
/**
* 追踪ID
*/
protected String trackerId;
}
/**
* @author jogeen
* @version 1.0
*/
public class ModelA extends BaseModel{
/**
* 业务数据
*/
private String businessMessage;
}
分析
相较于方法一,侵入性虽然小了许多,但是依然还是破坏了“无侵入性”原则。毕竟,需要使每个Model都去继承BaseModel。在java这种类的单一继承的条件下,方法二也是不合时宜的。
方法三
描述
修改Dubbo源码。如下图,Dubbo本身是具有非常清晰的分层。
image其中黄色标注部分Proxy:是通过动态代理为Consumer 端需要调用的service接口生成的实例对象。Implement:是Provider端,我们所编写的service接口的实现类对象。那么从Consumer 到Provider的调用过程,实际传输的对象是RPCInvocation。只要我们在RPCInvocation中加入我们的追踪数据(trackId),Client将其序列化之后,通过TCP传递到Server,Server反序列化处理之后就能得到RPCInvocation对象,其中就包含着我们加入的trackId。
代码
/**
* 使用ThreadLocal保存现在trackID
* @author jogeen
* @version 1.0
*/
public class TraceIdUtils {
private static final ThreadLocal<String> TRACE_ID_THREAD_LOCAL = new ThreadLocal<String>();
public static final String TRACK_ID="trackId";
public static String getTraceId() {
return TRACE_ID_THREAD_LOCAL.get();
}
public static void setTraceId(String traceId) {
TRACE_ID_THREAD_LOCAL.set(traceId);
}
}
Cunsumer端的修改点
package org.apache.dubbo.rpc.proxy;
/**
* InvokerHandler
*/
public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker;
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
RpcInvocation invocation;
if (RpcUtils.hasGeneratedFuture(method)) {
Class<?> clazz = method.getDeclaringClass();
String syncMethodName = methodName.substring(0, methodName.length() - Constants.ASYNC_SUFFIX.length());
Method syncMethod = clazz.getMethod(syncMethodName, method.getParameterTypes());
invocation = new RpcInvocation(syncMethod, args);
invocation.setAttachment(Constants.FUTURE_GENERATED_KEY, "true");
invocation.setAttachment(Constants.ASYNC_KEY, "true");
} else {
invocation = new RpcInvocation(method, args);
if (RpcUtils.hasFutureReturnType(method)) {
invocation.setAttachment(Constants.FUTURE_RETURNTYPE_KEY, "true");
invocation.setAttachment(Constants.ASYNC_KEY, "true");
}
}
//从ThreadLocal中将Consumer端的当前线程的trackId取出,放入RpcInvocation
invocation.setAttachment(TraceIdUtils.TRACK_ID,TrackUtils.getTraceId());
return invoker.invoke(invocation).recreate();
}
}
Provider端的修改点
/**
* dubbo protocol support.
*/
public class DubboProtocol extends AbstractProtocol {
//省略其它变量定义
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
+ " not found in callback service interface ,invoke will be ignored."
+ " please update the api interface. url is:"
+ invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext rpcContext = RpcContext.getContext();
boolean supportServerAsync = invoker.getUrl().getMethodParameter(inv.getMethodName(), Constants.ASYNC_KEY, false);
if (supportServerAsync) {
CompletableFuture<Object> future = new CompletableFuture<>();
rpcContext.setAsyncContext(new AsyncContextImpl(future));
}
rpcContext.setRemoteAddress(channel.getRemoteAddress());
//从RpcInvocation中取出trackId,存入ThreadLocal中,供线程中其它业务使用。
TraceIdUtils.setTraceId(inv.getAttachment(TrackUtils.TRACK_ID));
Result result = invoker.invoke(inv);
if (result instanceof AsyncRpcResult) {
return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r);
} else {
return CompletableFuture.completedFuture(result);
}
}
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
//省略类中其它方法。
}
分析
该方法对业务代码没有任何侵入性,对于需要添加此功能的业务代码无需做任何修改。对于性能应该没有明显影响,毕竟trackId只是较短的固定长度的字符串。缺点是每次跟随官方升级Dubbo的版本时,需要再做相同的修改。
方法四
描述
使用RpcContext对象,RpcContext对象是Dubbo框架提供的,其本身维护了一次RPC交互调用的上下文信息。查看RpcContext源码,可以看到,它本身也是维护的ThreadLocal。
代码
源码分析
package org.apache.dubbo.rpc;
public class RpcContext {
/**
* use internal thread local to improve performance
*/
private static final InternalThreadLocal<RpcContext> LOCAL = new InternalThreadLocal<RpcContext>() {
@Override
protected RpcContext initialValue() {
return new RpcContext();
}
};
private static final InternalThreadLocal<RpcContext> SERVER_LOCAL = new InternalThreadLocal<RpcContext>() {
@Override
protected RpcContext initialValue() {
return new RpcContext();
}
};
private final Map<String, String> attachments = new HashMap<String, String>();
private final Map<String, Object> values = new HashMap<String, Object>();
//省略其它成员变量
/**
* get context.
*
* @return context
*/
public static RpcContext getContext() {
return LOCAL.get();
}
public static void restoreContext(RpcContext oldContext) {
LOCAL.set(oldContext);
}
//省略其它方法
}
从本质的理论上讲,和方法三的原理是一样的,最终都是通过将追踪数据存入RpcInvocation,然后传递过去。查看源码如下,主要用注释标注的关键点
package org.apache.dubbo.rpc;
public class RpcContext {
/**
* use internal thread local to improve performance
*/
private static final InternalThreadLocal<RpcContext> LOCAL = new InternalThreadLocal<RpcContext>() {
@Override
protected RpcContext initialValue() {
return new RpcContext();
}
};
private static final InternalThreadLocal<RpcContext> SERVER_LOCAL = new InternalThreadLocal<RpcContext>() {
@Override
protected RpcContext initialValue() {
return new RpcContext();
}
};
private final Map<String, String> attachments = new HashMap<String, String>();
private final Map<String, Object> values = new HashMap<String, Object>();
//省略其它成员变量
/**
* get context.
*
* @return context
*/
public static RpcContext getContext() {
return LOCAL.get();
}
public static void restoreContext(RpcContext oldContext) {
LOCAL.set(oldContext);
}
//省略其它方法
}
测试代码
//在Consumer端,调用Service方法前
RpcContext.getContext().setAttachment("traceId", "trackValue");//代码1
userService.invokeMehtod();
//在Provider端,在Service的实现方法中
String traceId = RpcContext.getContext().getAttachment("traceId");//代码2
分析
该方法咋一看没什么大问题,但是却有一个不可忽略的缺点。就是RpcContext中的内容,在进行一次RPC调用之后会被清空。当在一个线程中,有多次RPC方法调用时,只有在调用RpcContext.getContext().setAttachment()方法设置了内容之后,最先调用的一次请求在Provider端能收到RpcContext中上下文的信息,而其它后面的请求都是没有的。所以如果要保证每次RPC调用都携带追踪数据,就都要在调用业务方法之前调用上面代码1的语句,如下。显然,这是不够优雅的。
//在Consumer端,调用Service方法前
RpcContext.getContext().setAttachment("traceId", "trackValue");//代码1
userService.invokeMehtod1();
RpcContext.getContext().setAttachment("traceId", "trackValue");//代码1
userService.invokeMehtod2();
RpcContext.getContext().setAttachment("traceId", "trackValue");//代码1
userService.invokeMehtod3();
方法五
描述
在方法四的基础上,加入Dubbo过滤器的使用。自定义过滤器TrackFilter实现com.alibaba.dubbo.rpc.Filter接口。这样即使RpcContext中的上下文内容在每次PRC调用后会被清理,我们也可以在过滤器中重新填入追踪数据trackId。
代码
自定义
package com.ittheima.util;
/**
* 使用ThreadLocal保存现在trackID
* @author jogeen
* @version 1.0
* @date 2018年11月21日
*/
public class TraceIdUtils {
private static final ThreadLocal<String> TRACE_ID_THREAD_LOCAL=new ThreadLocal<>();
public static String getTraceId(){
return TRACE_ID_THREAD_LOCAL.get();
}
public static void setTraceId(String traceId){
TRACE_ID_THREAD_LOCAL.set(traceId);
}
}
自定义日志类
/**
* 追踪日志工具类
* @author: jogeen
* @version: v1.0
* @date:2018年11月21日
*/
public class TrackLogUtils {
private static Logger logger = Logger.getLogger(TrackLogUtils.class);
public static void info(String message){
logger.info(TraceIdUtils.getTraceId()+":"+message);
}
}
自定义过滤器
/**
* dubbo过滤器
* @author: jogeen
* @version: v1.0
*/
public class TraceIdFilter implements Filter {
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
String traceId = RpcContext.getContext().getAttachment("traceId");
if ( !StringUtils.isEmpty(traceId) ) {
//如果RpcContext携带了traceId,将其作为线程共享变量保存起来。
TraceIdUtils.setTraceId(traceId);
} else {
//如果RpcContext未携带了traceId,从ThreadLocal中查询
traceId=TraceIdUtils.getTraceId();
if ( StringUtils.isEmpty(traceId) ) {
//如果ThreadLocal没有TrackId,说明这是调用链的开端,所以生成一个trackId
traceId= UUID.randomUUID().toString();
//保存
TraceIdUtils.setTraceId(traceId);
}
//重新给RpcContext设置traceId
RpcContext.getContext().setAttachment("traceId", TraceIdUtils.getTraceId());
}
//实际的rpc调用
return invoker.invoke(invocation);
注意:要让我们自定义的过滤器生效,还需要在resource目录下, 添加META-INF/dubbo目录,并且创建com.alibaba.dubbo.rpc.Filter文件
[图片上传中...(image-a071c8-1599661952191-0)]
在服务注册的地方,加入自定义的过滤器(此次为注解形式,配置文件方式也是一样)
@Service(filter = "traceIdFilter")
public class UserServiceImpl implements UserService
在服务调用的地方,加入自定义的过滤器(此次为注解形式,配置文件方式也是一样)
@Reference(filter = "traceIdFilter")
private UserService userService;
客户端方法调用片段
public String showName(){
TrackLogUtils.info("请求方法第1次");
userService.getName("jogeen1");
TrackLogUtils.info("请求方法第2次");
userService.getName("jogeen2");
TrackLogUtils.info("请求方法第3次");
userService.getName("jogeen3");
return null;
}
服务端代码片段
@Override
public String getName(String name) {
TrackLogUtils.info("进入请求体name="+name);
return "jogeen";
}
结果:
客户端
[INFO ] 2018-12-22 20:37:00,987 3c57472f-be31-4c35-91a9-ea408553df6a:请求方法第1次完成
[INFO ] 2018-12-22 20:37:00,991 3c57472f-be31-4c35-91a9-ea408553df6a:请求方法第2次完成
[INFO ] 2018-12-22 20:37:00,995 3c57472f-be31-4c35-91a9-ea408553df6a:请求方法第3次完成
服务端:
[INFO ] 2018-12-22 20:38:44,490 01301855-12cb-4d2a-b124-4364a775b045:进入请求体name=jogeen1
[INFO ] 2018-12-22 20:38:44,501 01301855-12cb-4d2a-b124-4364a775b045:进入请求体name=jogeen2
[INFO ] 2018-12-22 20:38:44,504 01301855-12cb-4d2a-b124-4364a775b045:进入请求体name=jogeen3
分析
该方法非常完美,每次调用之前都会在filter设置追踪数据,不仅对已有代码无任何侵入性,而且还可以实现多服务之间的连续传递。如A->B->C->D,从服务A开始产生的追踪数据可以一直传递到服务D中。
总结
程序员在编写业务代码的同时,也要向公司架构牛人多学习,他们是如何搭建我们的系统框架,以满足日益复杂的业务需求。作为Dubbo过滤器的使用,还可以用于服务间调用响应的耗时记录,以及服务间的安全检验工作等。作为Dubbo功能的扩充,filter还是很有研究价值。