How to Do Service Routing After Upgrading gRPC to 1.3.0

2017-05-17  linking12
Summary

gRPC Java recently released version 1.3.0. The main changes:

  1. The keepalive mechanism
    1) Client-side keepalive settings actually work now
    2) With keepalive enabled, the client keeps sending ping frames to the server once the connection is established
    3) The server uses these keepalive pings to automatically detect which connections are broken
  2. The server can now limit connection lifetime
    1) When a connection reaches its configured maximum age, it is closed
  3. Added some trace propagation
  4. Made a number of changes to LoadBalancers
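As a rough sketch of how the keepalive and connection-age settings might be configured, assuming the Netty transport. The builder methods below come from the gRPC-Java API, but their exact availability varies by release (and `usePlaintext(true)` was later deprecated), so check them against the Javadoc for your version; the host, port, and durations are placeholder values:

```java
import java.util.concurrent.TimeUnit;

import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;

public class KeepaliveConfigSketch {
    public static void main(String[] args) {
        // Client side: periodically ping the server so dead connections are noticed.
        ManagedChannel channel = NettyChannelBuilder.forAddress("localhost", 50051)
                .keepAliveTime(30, TimeUnit.SECONDS)     // interval between keepalive pings
                .keepAliveTimeout(5, TimeUnit.SECONDS)   // how long to wait for a ping ack
                .usePlaintext(true)
                .build();

        // Server side: cap how long any single connection may live.
        Server server = NettyServerBuilder.forPort(50051)
                .maxConnectionAge(30, TimeUnit.MINUTES)  // close connections past this age
                // .addService(...) omitted in this sketch
                .build();
    }
}
```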
Let's focus on the LoadBalancer changes.

In version 1.1.0, the LB factory class looked like this:

 private static class RoundRobinLoadBalancer<T> extends LoadBalancer<T> {
   private final TransportManager<T> tm;

   private RoundRobinLoadBalancer(TransportManager<T> tm) {
     this.tm = tm;
   }
   // ...
 }

TransportManager is the manager responsible for creating transports, and the actual load balancing looked like this:

 @Override
 public T pickTransport(Attributes affinity) {
   final RoundRobinServerList<T> addressesCopy;
   synchronized (lock) {
     if (closed) {
       return tm.createFailingTransport(SHUTDOWN_STATUS);
     }
     if (addresses == null) {
       if (nameResolutionError != null) {
         return tm.createFailingTransport(nameResolutionError);
       }
       if (interimTransport == null) {
         interimTransport = tm.createInterimTransport();
       }
       return interimTransport.transport();
     }
     addressesCopy = addresses;
   }
   return addressesCopy.getTransportForNextServer();
 }

In 1.3.0 all of this is gone; a new abstraction called Subchannel now does the job.
The Javadoc for Subchannel says:

 /**
   * A logical connection to a server, or a group of equivalent servers represented by an {@link 
   * EquivalentAddressGroup}.
   *
   * <p>It maintains at most one physical connection (aka transport) for sending new RPCs, while
   * also keeps track of previous transports that has been shut down but not terminated yet.
   *
   * <p>If there isn't an active transport yet, and an RPC is assigned to the Subchannel, it will
   * create a new transport.  It won't actively create transports otherwise.  {@link
   * #requestConnection requestConnection()} can be used to ask Subchannel to create a transport if
   * there isn't any.
   */

Roughly: a Subchannel is a logical connection rather than a physical one, and may represent one or more equivalent addresses.
It maintains at most one physical connection (transport) for sending new RPCs; if an RPC is assigned to the Subchannel while no transport is active, it creates a new transport.

The interesting part:
gRPC wraps the transport in another layer and moves transport creation earlier. As soon as the IP addresses come back from the NameResolver, requestConnection is invoked:

 @Override
 public void requestConnection() {
   subchannel.obtainActiveTransport();
 }

 @Nullable
 ClientTransport obtainActiveTransport() {
   ClientTransport savedTransport = activeTransport;
   if (savedTransport != null) {
     return savedTransport;
   }
   try {
     synchronized (lock) {
       savedTransport = activeTransport;
       // Check again, since it could have changed before acquiring the lock
       if (savedTransport != null) {
         return savedTransport;
       }
       if (state.getState() == IDLE) {
         gotoNonErrorState(CONNECTING);
         startNewTransport();
       }
     }
   } finally {
     channelExecutor.drain();
   }
   return null;
 }
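The check–lock–recheck shape in obtainActiveTransport is plain double-checked lazy initialization. A self-contained toy model of the same idea (the class and field names here are placeholders, not gRPC's; unlike gRPC, which returns null and starts the transport asynchronously, this sketch returns the created value directly for simplicity):

```java
// Minimal model of the double-checked lazy init pattern used above.
class LazyTransport {
    // volatile so the unsynchronized fast-path read sees a fully constructed value
    private volatile Object active;
    private final Object lock = new Object();
    private int creations = 0; // counts constructions, for demonstration only

    Object obtain() {
        Object saved = active;      // fast path: skip the lock if already created
        if (saved != null) {
            return saved;
        }
        synchronized (lock) {
            saved = active;         // check again under the lock
            if (saved != null) {
                return saved;
            }
            active = new Object();  // stands in for startNewTransport()
            creations++;
            return active;
        }
    }

    int creations() {
        return creations;
    }
}
```

The double check matters because two threads can both pass the first null check; the second check under the lock guarantees only one of them constructs the transport.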

The actual load-balancing decision now lives in SubchannelPicker:

/**
   * The main balancing logic.  It <strong>must be thread-safe</strong>. Typically it should only
   * synchronize on its own state, and avoid synchronizing with the LoadBalancer's state.
   *
   * <p>Note: Implementations should override exactly one {@code pickSubchannel}.
   */
  @ThreadSafe
  public abstract static class SubchannelPicker {
    /**
     * Make a balancing decision for a new RPC.
     *
     * @param args the pick arguments
     */
    public abstract PickResult pickSubchannel(PickSubchannelArgs args);
  }

So the whole load-balancing mechanism is completely different from 1.1.0, and if you want to apply routing rules before load balancing, you have to rework your code.
Here is what I did:
extend SubchannelPicker and, at pick time, discard the Subchannels that do not match the routing rules.

 @Override
 public PickResult pickSubchannel(PickSubchannelArgs args) {
   Map<String, Object> affinity = args.getCallOptions().getOption(GrpcClientCall.CALLOPTIONS_CUSTOME_KEY);
   GrpcURL refUrl = (GrpcURL) affinity.get(GrpcClientCall.GRPC_REF_URL);
   if (size > 0) {
     Subchannel subchannel = nextSubchannel(refUrl);
     affinity.put(GrpcClientCall.GRPC_NAMERESOVER_ATTRIBUTES, nameResovleCache);
     return PickResult.withSubchannel(subchannel);
   }
   if (status != null) {
     return PickResult.withError(status);
   }
   return PickResult.withNoResult();
 }

 private Subchannel nextSubchannel(GrpcURL refUrl) {
   if (size == 0) {
     throw new NoSuchElementException();
   }
   synchronized (this) {
     Subchannel val = list.get(index);
     index++;
     if (index >= size) {
       index = 0;
     }
     // Skip Subchannels that do not match the routing rules; return the
     // recursive result rather than the discarded Subchannel.
     boolean discard = discard(refUrl, val);
     if (discard && index != 0) {
       return nextSubchannel(refUrl);
     }
     return val;
   }
 }
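To make the round-robin-with-discard idea concrete, here is a self-contained toy model. Plain objects stand in for Subchannels, and the `reject` predicate stands in for the routing-rule check; the class and method names are invented for this sketch:

```java
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

// Toy model of round-robin selection that skips elements a routing rule rejects.
class RoutingRoundRobin<T> {
    private final List<T> list;
    private int index = 0;

    RoutingRoundRobin(List<T> list) {
        this.list = list;
    }

    // Returns the next element not rejected by the predicate. Once the index
    // wraps back to 0 it gives up and returns the current element, so the
    // recursion terminates even when every element is rejected.
    synchronized T next(Predicate<T> reject) {
        if (list.isEmpty()) {
            throw new NoSuchElementException();
        }
        T val = list.get(index);
        index++;
        if (index >= list.size()) {
            index = 0;
        }
        if (reject.test(val) && index != 0) {
            return next(reject);
        }
        return val;
    }
}
```

For example, with elements `a, b, c` and a rule rejecting `b`, successive calls yield `a`, then `c` (skipping `b`).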

But this has a problem: discarding a Subchannel does not tear down its transport, so the client still wastes resources on it, and I don't have a good fix yet. If you shut the Subchannel down instead, then, because transports were already established from the NameResolver's address list, the shutdown triggers a NameResolver refresh, the addresses are fetched again, and you end up in a wasteful loop. So for now I accept wasting a bit of memory in exchange for service routing.

The full extended code is here:
https://github.com/linking12/saluki/blob/master/saluki-core/src/main/java/com/quancheng/saluki/core/grpc/GrpcRoutePicker.java

This article is a bit scattered; it mainly covers the problems I hit after upgrading gRPC and the workarounds I came up with. I hope it is understandable.
One more gripe: a lot of people say they'll just write their own RPC framework. RPC is not simply shipping a piece of data over the wire; maintaining connection state, managing data transfer, and many other details all need careful thought. There are plenty of mature RPC frameworks in the industry now, so for production use don't reinvent the wheel with your own RPC implementation.
Among HTTP/2-based options, gRPC and armeria are both solid choices, a far better plan than building an RPC framework from scratch.
