Thrift

2017-11-06  本文已影响0人  暂且听风吟一曲

架构图

业务层:根据业务逻辑,实现thrift文件中接口
接口层:根据thrift文件,生成框架代码
协议层:对数据流进行序列化(二进制、json)
传输层:负责网络传输

C/S模型

Client端

在Client端,用户首先需要依次指定transport类型、protocol类型和client对象。client对象负责将函数名以及参数发送给server端,并且解析server端返回的结果。

Server端
  1. 在Processor的初始化过程,绑定用户实现的handler。
class MyService:
  def func(self, n1, n2):
        pass
handler = MyService()
processor = MyService.Processor(handler)
  1. Server端建立transport,并设置protocol。
transport = TSocket.TServerSocket(port=9090)
tfactory = TTransport.TBufferedTransportFactory()
pfactory = TBinaryProtocol.TBinaryProtocolFactory()
  1. 创建server对象并开始服务。
server = TServer.TSimpleServer(processor, transport, tfactory, pfactory)
server.serve()

Thrift内部流程

序列化支持
  1. ttypes.py
    thrift文件中定义的数据格式,编译后的存放在ttypes.py中。Thrift将用户自定义的struct、enum和exception都转换为python class。
class SharedStruct {
  thrift_spec = (
    None, #0
    (1, TType.I32, 'key', None, None, ) #1
    (2, TType.STRING, 'value', None, None, ) # 2
  )
  
  def __init__(self, key=None, value=None):
    self.key = key
    self.value = value

  def read(self, iprot):
    .....
    iprot.readStructBegin()
    while True:
      (fname, ftype, fid) = iprot.readFieldBegin()
      if ftype == TType.STOP:
        break
      if fid == 1:
        if ftype == TType.I32:
          self.key = iprot.readI32()
        else:
          iprot.skip(ftype)
      elif:
        if ftype == TType.STRING:
          self.value = iprot.readString()
        else:
          iport.skip(ftype)  
      else:
        iprot.skip(ftype)
      iprot.readFieldEnd()
    iprort.readStructEnd()

    def write(self, oprot):
      oprot.writeStructBegin('SharedStruct')
      if self.key is not  None:
        oprot.writeFieldBegin('key', TType.I32, 1)
        oprot.wirteI32(self.key)
        oprot.writeFieldEnd()
      ....

可以看出,Thrift为自定义类型生成了spec结构、read方法和write方法。在之后的分析中,spec结构对应于自定义类型的内部数据参数,
对数据结构的解析将起到指导性的作用。read方法和write方法是互逆的过程,write方法按照struct name、'key'、key_type、key_value、value_tag、value_type、value_value的顺序将SharedStruct的内容写入output protocol中,而read方法按照相应的顺序,从input protocol中读取各个字段。
注意:在TBinaryProtocol中,readStructBegin和writeStructBegin都是空操作,所以虽然传参不同,但是实际上是互斥空操作。

Client端
client.open()

连接指定的ip地址、端口。

sum_ = client.add(1, 1)

调用client对象的send_xxx和recv_xxx方法。在send_xxx中,使用xxx_args的write方法,将参数发送给Server端;在recv_xxx中,使用xxx_result的read方法来解析Server端返回的结果。Server端会在返回结果的末尾设置TType.Stop标识来表征消息的结束。

Server端
    self.serverTransport.listen()
    while True:
      client = self.serverTransport.accept()
      if not client:
        continue
      itrans = self.inputTransportFactory.getTransport(client)
      otrans = self.outputTransportFactory.getTransport(client)
      iprot = self.inputProtocolFactory.getProtocol(itrans)
      oprot = self.outputProtocolFactory.getProtocol(otrans)
      try:
        while True:
          self.processor.process(iprot, oprot)
      itrans.close()
      otrans.close()

itrans、otrans、iprot、oprot都是通过工厂方法实例化。itrans、otrans负责Server端与Client端的数据传输,iprot、oprot负责解码工作(应该相同)。

序列化的流程
  1. 先writeMessageBegin表示开始传输消息了,写消息头。Message里面定义了方法名,调用的类型,消息seqId
  2. 写消息体。如果参数是一个类,就writeStructBegin
  3. 接下来写字段,writeFieldBegin, 这个方法会写接下来的字段的数据类型和顺序号。这个顺序号是Thrfit对要传输的字段的一个编码,从1开始
  4. 如果是一个集合就writeListBegin/writeMapBegin,如果是一个基本数据类型,比如int, 就直接writeI32
  5. 每个复杂数据类型写完都调用writeXXXEnd,直到writeMessageEnd结束
  6. 读消息时根据数据类型读取相应的长度
Protocol

用于信息的序列化过程,分为write和read对称的两部分。

方法/字段 含义 内容
Message 消息传输的头部 name(方法名)+ type + seqid
FieldB/E/Stop type + id(IDL中的索引) / None / STOP
Struct name
Map ktype + vtype + size
List etype + size
Set etype + size
Bool/Byte/I16/I32/I64/Double/String/Binary bool_val
skip ttype

STRUCT

struct_begin
while True:  
      field_begin
      if ttype == STOP:
          break
     skip(ttype)
    field_end
struct_end

MAP

map_begin
for i in range(size):
  skip(ktype)
  skip(vtype)
map_end

SET / LIST

begin
for i in range(size):
  skip(ttype)
end
HelloService.py
class Client(Iface):
  def __init__(self, iprot, oprot=None):
    self._iport = self._oprot = None
    if oport is not None:
      self._oprot = oprot
    self._seqid = 0
  def getStruct(self, key):
    self.send_getStruct(key)
    return self.recv_getStruct()

  def send_getStruct(self, key):
    self._oprot.writeMessageBegin('getStruct', TMessageType.CALL, self._seqid)
    args = getStruct_args()
    args.key = key
    args.write(self._oprot)
    self._oprot.writeMessageEnd()
    self._oprot.trans.flush()

  def recv_getStruct(self):
    iprot = self._iprot
    (fname, mtype, rseqid) = iprot.readMessageBegin()
    if mtype == TMessageType.EXCEPTION:
      ...
    result = getStruct_result()
    result.read(iprot)
    iprot.readMessageEnd()
    if result.success is not None:
      return result.success
    raise TApplicationException(...)

Client实现的用户声明方法的方式为send_xxx然后recv_xxx,这个模式符合rpc的调用思想,先将请求发出去,然后等待接收远端的回应。

send_xxx && recv_xxx

在send_xxx中,首先会写入name、ttyp和seqid的信息。然后通过获取getStruct方法的参数信息,并写入到输出流中。
获取参数信息的方式如下:

class getStruct_args:
  thrift_spec = (
    None, # 0
    (1, TType.I32, 'key', None, None, ), # 1
  )

  def __init__(self, key=None,):
    self.key = key

  def read(self, iprot):
    ....
    iprot.readStructBegin()
    while True:
      (fname, ftype, fid) = iprot.readFieldBegin()
      if ftype == TType.STOP:
        break
      if fid == 1:
        if ftype == TType.I32:
          self.key = iprot.readI32()
        else:
          iprot.skip(ftype)
      else:
        iprot.skip(ftype)
      iprot.readFieldEnd()
    iprot.readStructEnd()

  def write(self, oprot):
    ....
    oprot.writeStructBegin('getStruct_args')
    if self.key is not None:
      oprot.writeFieldBegin('key', TType.I32, 1)
      oprot.writeI32(self.key)
      oprot.writeFieldEnd()
    oprot.writeFieldStop()
    oprot.writeStructEnd()

send_xxx通过调用getStruct_args的write方法来发送参数信息到远端,可以猜想到,server端会调用getStruct_args的read方法来解析参数信息。
当server端处理完成后,会发送结果到client端。对结果的处理类似于参数处理的逆过程。由getStruct_result负责:

class getStruct_result:
  thrift_spec = (
    (0, TType.STRUCT, 'success', (SharedStruct, SharedStruct.thrift_spec), None, ), # 0
  )

  def __init__(self, success=None,):
    self.success = success

  def read(self, iprot):
    ....
    iprot.readStructBegin()
    while True:
      (fname, ftype, fid) = iprot.readFieldBegin()
      if ftype == TType.STOP:
        break
      if fid == 0:
        if ftype == TType.STRUCT:
          self.success = SharedStruct()
          self.success.read(iprot)
        else:
          iprot.skip(ftype)
      else:
        iprot.skip(ftype)
      iprot.readFieldEnd()
    iprot.readStructEnd()

  def write(self, oprot):
    ....
    oprot.writeStructBegin('getStruct_result')
    if self.success is not None:
      oprot.writeFieldBegin('success', TType.STRUCT, 0)
      self.success.write(oprot)
      oprot.writeFieldEnd()
    oprot.writeFieldStop()
    oprot.writeStructEnd()

server端使用getStruct_result的write来发送结果,client端会调用read来响应的解析结果。从代码中可以看出,结果success为TType.STRUCT类型,也就是SharedStruct结构体,而对SharedStruct结构体的解释就位于SharedStruct类中的thrift_spec字段。

上一篇下一篇

猜你喜欢

热点阅读