dubbo过滤器

2021-03-02  本文已影响0人  SparkOnly

加载原理

dubbo的过滤器整体都是采用SPI的方式进行加载的

  1. 首先通过SPI加载dubbo加载策略
private static LoadingStrategy[] loadLoadingStrategies() {
        return stream(load(LoadingStrategy.class).spliterator(), false)
                .sorted()
                .toArray(LoadingStrategy[]::new);
    }

默认有三种策略,从以下三个目录中读取,优先级依次递减

  1. 加载扩展类
    这里会遍历上面的加载策略,通过拼接策略目录+类的类型名(全路径名),找到对应的文件,遍历文件内容,根据文件里的key-value加载文件里对应的类
private Map<String, Class<?>> loadExtensionClasses() {
    cacheDefaultExtensionName();

    Map<String, Class<?>> extensionClasses = new HashMap<>();

    for (LoadingStrategy strategy : strategies) {
        loadDirectory(extensionClasses, strategy.directory(), type.getName(), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
        loadDirectory(extensionClasses, strategy.directory(), type.getName().replace("org.apache", "com.alibaba"), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
    }

    return extensionClasses;
}

令牌验证

通过令牌验证在注册中心控制权限,以决定要不要下发令牌给消费者,可以防止消费者绕过注册中心访问提供者

全局方式

<!--随机token令牌,使用UUID生成-->
<dubbo:provider interface="com.foo.BarService" token="true" />
<!--固定token令牌,相当于密码-->
<dubbo:provider interface="com.foo.BarService" token="123456" />

服务级别

<dubbo:service interface="com.foo.BarService" token="true" />
<dubbo:service interface="com.foo.BarService" token="123456" />

协议级别

dubbo:protocol name="dubbo" token="true" />
<dubbo:protocol name="dubbo" token="123456" />

服务端实现

通过隐式传参来获取token进行判断

String token = invoker.getUrl().getParameter(TOKEN_KEY);
if (ConfigUtils.isNotEmpty(token)) {
  Class<?> serviceType = invoker.getInterface();
  Map<String, Object> attachments = inv.getObjectAttachments();
  String remoteToken = (attachments == null ? null : (String) attachments.get(TOKEN_KEY));
  if (!token.equals(remoteToken)) {
    throw new RpcException("Invalid token! Forbid invoke remote service " + serviceType + " method " + inv.getMethodName() + "() from consumer " + RpcContext.getContext().getRemoteHost() + " to provider " + RpcContext.getContext().getLocalHost());
  }
}

消费者

对于消费者,如果通过注册中心来访问,并不需要自己指定token
但是如果是直连方式,默认是没有token的,当然是可以通过手动指定隐式传参来做规避这个问题

生产者限流

默认实现类:ExecuteLimitFilter,配置项:executes,默认0表示不进行限流

实现原理

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    URL url = invoker.getUrl();
    String methodName = invocation.getMethodName();
    int max = url.getMethodParameter(methodName, EXECUTES_KEY, 0);
    if (!RpcStatus.beginCount(url, methodName, max)) {
        throw new RpcException(RpcException.LIMIT_EXCEEDED_EXCEPTION,
                "Failed to invoke method " + invocation.getMethodName() + " in provider " +
                        url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max +
                        "\" /> limited.");
    }

    invocation.put(EXECUTE_LIMIT_FILTER_START_TIME, System.currentTimeMillis());
    try {
        return invoker.invoke(invocation);
    } catch (Throwable t) {
        if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
        } else {
            throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
        }
    }
}
@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
    RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation), true);
}

@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
    if (t instanceof RpcException) {
        RpcException rpcException = (RpcException) t;
        if (rpcException.isLimitExceed()) {
            return;
        }
    }
    RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation), false);
}

可以看到,这里主要是借助RpcStatus的静态方法来实现,限流最大数通过url直接获取

RpcStatus的内部逻辑

  1. 这里对于每个url都有一个RpcStatus对象,而每个url状态对象里,又通过methodName关联了一个RpcStatus(通过ConcurrentHashMap)。限流针对方法进行处理
  2. active字段为原子变量,记录当前的数量
private final AtomicInteger active = new AtomicInteger();

public static boolean beginCount(URL url, String methodName, int max) {
    max = (max <= 0) ? Integer.MAX_VALUE : max;
    RpcStatus appStatus = getStatus(url);
    RpcStatus methodStatus = getStatus(url, methodName);
    if (methodStatus.active.get() == Integer.MAX_VALUE) {
        return false;
    }
    for (int i; ; ) {
        i = methodStatus.active.get();
        if (i + 1 > max) {
            return false;
        }
        if (methodStatus.active.compareAndSet(i, i + 1)) {
            break;
        }
    }
    appStatus.active.incrementAndGet();
    return true;
}
  1. 结束时,调用endCount方法进行处理,更新统计字段
public static void endCount(URL url, String methodName, long elapsed, boolean succeeded) {
    endCount(getStatus(url), elapsed, succeeded);
    endCount(getStatus(url, methodName), elapsed, succeeded);
}

private static void endCount(RpcStatus status, long elapsed, boolean succeeded) {
    status.active.decrementAndGet();
    status.total.incrementAndGet();
    status.totalElapsed.addAndGet(elapsed);
    if (status.maxElapsed.get() < elapsed) {
        status.maxElapsed.set(elapsed);
    }
    if (succeeded) {
        if (status.succeededMaxElapsed.get() < elapsed) {
            status.succeededMaxElapsed.set(elapsed);
        }
    } else {
        status.failed.incrementAndGet();
        status.failedElapsed.addAndGet(elapsed);
        if (status.failedMaxElapsed.get() < elapsed) {
            status.failedMaxElapsed.set(elapsed);
        }
    }
}

自定义实现日志追踪

  1. 编写dubbo过滤器
@Activate(group = CommonConstants.CONSUMER)
public class DubboTraceFilter implements Filter {
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        final String traceId = MDC.get(Constant.TRACE_ID);
        RpcContext.getContext().setAttachment(Constant.TRACE_ID, traceId);
        return invoker.invoke(invocation);
    }
}
  1. resources目录下,建立META-INF/dubbo目录
  2. 新增文件:com.alibaba.dubbo.rpc.Filter
  3. 文件里配置:
trace=cn.gw.server.filter.DubboTraceFilter
上一篇下一篇

猜你喜欢

热点阅读