YarnRPC过程

2017-11-26  本文已影响101人  searchworld

最近在看《深入解析YARN架构设计与实现原理》,看到第三章YRAN RPC实现的时候对其中如果使用ProtoBuf不是很理解,尤其是里面同一个类会有Java定义和ProtoBuf两个实现,两者的交互细节书上写的不是很清楚,这里根据自己的理解简单记录下。这里以server端为例。

Server的创建在是通过RpcServerFactoryPBImpl.getServer方法,这里protocol参数以ResourceTracker为例(在ResourceTrackerService.serviceStart中),instance参数就是ResourceTrackerService本身,也就是真正实现了rpc调用服务端方法的类。最终实现方法是:

public Server getServer(Class<?> protocol, Object instance,
      InetSocketAddress addr, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager, int numHandlers,
      String portRangeConfig) {
    
    Constructor<?> constructor = serviceCache.get(protocol);
    if (constructor == null) {
      Class<?> pbServiceImplClazz = null;
      try {
        // getPbServiceImplClassName 根据约定的规则在 PB_IMPL_PACKAGE_SUFFIX 找到  ResourceTrackerPBServiceImpl 类
        pbServiceImplClazz = localConf
            .getClassByName(getPbServiceImplClassName(protocol));
      } catch (ClassNotFoundException e) {
        throw new YarnRuntimeException("Failed to load class: ["
            + getPbServiceImplClassName(protocol) + "]", e);
      }
      try {
        constructor = pbServiceImplClazz.getConstructor(protocol);
        constructor.setAccessible(true);
        serviceCache.putIfAbsent(protocol, constructor);
      } catch (NoSuchMethodException e) {
        throw new YarnRuntimeException("Could not find constructor with params: "
            + Long.TYPE + ", " + InetSocketAddress.class + ", "
            + Configuration.class, e);
      }
    }
    
    Object service = null;
    try {
      // 使用ResourceTrackerService实例去构造ResourceTrackerPBServiceImpl,
      // ResourceTrackerPBServiceImpl的真正功能只是将PB版的请求(比如NodeHeartbeatRequestProto)封装成Java版的请求(对应的是NodeHeartbeatRequestPBImpl),
      // 再传给ResourceTrackerService实际调用产生Java版的Response,转成PB版的返回
      service = constructor.newInstance(instance);
    } catch (InvocationTargetException e) {
      throw new YarnRuntimeException(e);
    } catch (IllegalAccessException e) {
      throw new YarnRuntimeException(e);
    } catch (InstantiationException e) {
      throw new YarnRuntimeException(e);
    }

    // service是ResourceTrackerPBServiceImpl的实例,实现 ResourceTrackerPB接口,因此pbProtocol就是ResourceTrackerPB
    Class<?> pbProtocol = service.getClass().getInterfaces()[0];
    Method method = protoCache.get(protocol);
    if (method == null) {
      Class<?> protoClazz = null;
      try {
        // 这里也是根据约定找到 ResourceTracker$ResourceTrackerService类,这个由PB生成
        protoClazz = localConf.getClassByName(getProtoClassName(protocol));
      } catch (ClassNotFoundException e) {
        throw new YarnRuntimeException("Failed to load class: ["
            + getProtoClassName(protocol) + "]", e);
      }
      try {
        method = protoClazz.getMethod("newReflectiveBlockingService",
            pbProtocol.getInterfaces()[0]);
        method.setAccessible(true);
        protoCache.putIfAbsent(protocol, method);
      } catch (NoSuchMethodException e) {
        throw new YarnRuntimeException(e);
      }
    }
    
    try {
      // method即 newReflectiveBlockingService 方法最终调用 ResourceTrackerPBServiceImpl相应的方法
      return createServer(pbProtocol, addr, conf, secretManager, numHandlers,
          (BlockingService)method.invoke(null, service), portRangeConfig);
    } catch (InvocationTargetException e) {
      throw new YarnRuntimeException(e);
    } catch (IllegalAccessException e) {
      throw new YarnRuntimeException(e);
    } catch (IOException e) {
      throw new YarnRuntimeException(e);
    }
  }
上一篇下一篇

猜你喜欢

热点阅读