thrift

通过Packetbeat抓取Finagle协议数据包(自定义Th

2017-01-06  本文已影响1171人  Kungfu猫熊

写在前面

最近一年多一直在做服务治理相关的开发工作. 起初服务监控采用了成本比较低的方式来实现(提供者,消费者自己按分钟维度上报健康数据到Redis,但是这种方式只是在Java的服务提供者和消费者做到了很好的实现, 其他语言目前只能上报很少部分的监控数据). 因为公司的开发语言是多样的, 其中包括: Nodejs, Ruby, Golang, Java, Scala等, 那么将来要对监控数据的模型拓展, 需求变更等, 将很难快速推广实现. 随着公司业务的高速发展, 以及将来所有服务部署Docker化, 服务的监控预警已经是服务治理工作中的重中之重. 服务监控最好可以同时监控基础服务(Mysql, Redis等),业务服务. 我们的业务服务是采用的Twitter的Finagle-thrift实现多语言之间的RPC调用. Balabala说了这么多, 就是我们现在要做全链路监控, 做监控首先第一步是需要可以收集到这些网络调用的原始数据, 这个时候ElasticStack中的Beats项目进入了我们的视线, Beats项目中的Packetbeat子项目可以抓取到像Mysql, Redis, Thrift等协议的数据包. 但是,我们业务使用的通信协议是Finagle-thrift, 这里面为了满足一些拓展(比如:用于RPC调用链跟踪的Zipkin),Finagle-thrift在原生Thrift上做了二次封装, 接下来需要让Packetbeat对Finagle-thrift协议支持. 下面我将分析过程整理如下, 方便以后温习回顾.

Packetbeat项目介绍

更详细的请参考 Medcl的一个教程

整个Beats项目都是用的Golang语言开发, Golang这几天也是现学现卖, 我在整个调试过程中没有找到可以比较方便进行Debug的方式, 只能通过fmt.Println进行各种调试信息的输出, 这个过程比较痛苦. 这里我顺便记录一下怎么配置Go的环境, 有几个概念比较懵,在此记录一下.

安装GO

这里 获取对应的操作系统的GO安装bao

GOPATH

获取项目

 #创建相应目录
 mkdir -p $GOPATH/src/github.com/elastic/ 
 cd $GOPATH/src/github.com/elastic
    
 #签出源码
 git clone https://github.com/elastic/beats.git
 cd beats
    
 #修改官方仓库为upstream源,设置自己的仓库为origin源
 git remote rename origin upstream
 git remote add origin git@github.com:medcl/packetbeat.git
    
 #获取上游最新的代码,如果是刚fork的话可不用管
 git pull upstream master
    
 #签出一个名为finagle的分支,用于开发这个功能
 git checkout -b finagle
    
 #切换到packetbeat模块
 cd packetbeat
    
 #获取依赖信息
 (mkdir -p $GOPATH/src/golang.org/x/&&cd $GOPATH/src/golang.org/x &&git clone  https://github.com/golang/tools.git )
 go get github.com/tools/godep
    
 #编译
 make

yml配置文件说明

interfaces:
  #如果提供者消费者在本机,直接写成lo0 
  device: en0
protocols:
    # 自定义协议名
  finaglethrift:
    ports: [20880, 9090, 9091, 9099, 9098]
    # 自定义Thrift的Transport type一定要是frame的方式, 否则解析不出来
    transport_type: framed
    protocol_type: binary
    # idl文件一定要有
    idl_files: ["test_cfg/result.thrift","test_cfg/order.thrift","test_cfg/hello.thrift"]


output:
  elasticsearch:
    hosts: ["192.168.10.235:9200"]
  kafka:
    hosts: ["192.168.5.159:9092"]
    topic: "packetbeat_test_qqq"
shipper:
logging:
  files:
    path: /tmp/mybeat

需要修改哪些文件

原生Thrift简单分析

通讯协议格式

核心模型

在TBinaryProtocol中有对消息体的读取和写入操作, 具体代码如下:

  public TMessage readMessageBegin() throws TException {
        int size = readI32();
        if (size < 0) {
          int version = size & VERSION_MASK;
          if (version != VERSION_1) {
            throw new TProtocolException(TProtocolException.BAD_VERSION, "Bad version in readMessageBegin");
          }
          return new TMessage(readString(), (byte)(size & 0x000000ff), readI32());
        } else {
          if (strictRead_) {
            throw new TProtocolException(TProtocolException.BAD_VERSION, "Missing version in readMessageBegin, old client?");
          }
          return new TMessage(readStringBody(size), readByte(), readI32());
        }
  } 
    
    public void writeMessageBegin(TMessage message) throws TException {
        if (strictWrite_) {
          int version = VERSION_1 | message.type;
          writeI32(version);
          writeString(message.name);
          writeI32(message.seqid);
        } else {
          writeString(message.name);
          writeByte(message.type);
          writeI32(message.seqid);
        }
  }
    
    
    /**
     * Message type constants in the Thrift protocol.
     *
     */
    public final class TMessageType {
          public static final byte CALL  = 1;
          public static final byte REPLY = 2;
          public static final byte EXCEPTION = 3;
          public static final byte ONEWAY = 4;
    }


Finagle-thrift协议分析

因为Finagle-thrift是在Thrift协议之上做了封装, 我们主要对着两个协议中具体的数据进行比对.

测试数据IDL

为了让测试具有代表性, 构建的IDL文件中既有简单的没有入参,返回值的finaglePing方法, 也有有入参,复杂返回值的detail方法

    
include "result.thrift"
    
/*订单*/
struct Order {
    1:i32 userId
    /*买家*/
    2:string userName,
    /*订单ID*/
    3:string orderId,
}
    
struct OrderResult {
    1:result.Result result,
    2:optional Order order
}
    
service OrderServ{
    /*订单详情*/
    OrderResult detail(1:i32 userId, 2:string userName, 3:string orderId)
    void finaglePing()
}   
    
    
    
    
/************************复杂返回值Result的定义***************************/
    
struct FailDesc {
    1:string name,
    2:string failCode,
    3:string desc
}
    
struct Result {
    
    1:i32 code,
    
    2:optional list<FailDesc> failDescList
}
    
struct StringResult {
    1:Result result,
    
    2:optional string value,
    
    3:optional string extend
}   
    
    

一次RPC调用的差异

原生Thrift调用

我们针对finaglePing方法通过原生Thrift进行一次RPC调用,并在Client端TcpDump出产生的数据包


2016-04-30_12-33-13

从图上可以看出,包含了3次握手, 1次Client与Server的业务请求交互, 4次挥手关闭连接.

下面我们看Client发送请求时的具体数据包内容如下图:


2016-04-30_12-39-12

这里包含数据长度, Thrift是否是严谨读写,消息类型, 消息内容等信息.

Fiangle-thrift调用及分析

我们针对finaglePing方法同样通过Fiangle-thrift方式进行一次RPC调用,并在Client端TcpDump出产生的数据包


2016-04-30_12-45-41

从上图看出, 一次RPC调用包含了, 3次握手, 1次fiangle确认协议的请求交互, 1次Client与Server的业务请求交互, 4次挥手关闭连接.

关于Client发送的请求和原生Thrift还不太一样, 在创建完连接之后, 需要发送
一次带有__can__finagle__trace__v3__信息的请求已确认是否是Finagle-thrift协议, 确认成功之后才会进行真正的业务交互, 这次确认是一次标准的Thrift通信,具体如下图:

2016-04-30_12-52-52

下面是在确认Fiangle标识之后进行的真正的业务通信,具体如下图:

2016-04-30_12-57-36

我们上面这张图中可以看出在标准的Thrift协议数据之前Finagle-thrift自己又加了很多自己的数据,具体加了什么, 我们来看一下Fiangle的源码, 具体如下:

```
/**
 * ThriftClientFramedCodec implements a framed thrift transport that
 * supports upgrading in order to provide TraceContexts across
 * requests.
 */
object ThriftClientFramedCodec {
  /**
   * Create a [[com.twitter.finagle.thrift.ThriftClientFramedCodecFactory]].
   * Passing a ClientId will propagate that information to the server iff the server is a finagle
   * server.
   */
  def apply(clientId: Option[ClientId] = None) =
    new ThriftClientFramedCodecFactory(clientId)

  def get() = apply()
}

class ThriftClientFramedCodecFactory(
    clientId: Option[ClientId],
    _useCallerSeqIds: Boolean,
    _protocolFactory: TProtocolFactory)
  extends CodecFactory[ThriftClientRequest, Array[Byte]]#Client {

  def this(clientId: Option[ClientId]) = this(clientId, false, Protocols.binaryFactory())

  def this(clientId: ClientId) = this(Some(clientId))

  // Fix this after the API/ABI freeze (use case class builder)
  def useCallerSeqIds(x: Boolean): ThriftClientFramedCodecFactory =
    new ThriftClientFramedCodecFactory(clientId, x, _protocolFactory)

  /**
   * Use the given protocolFactory in stead of the default `TBinaryProtocol.Factory`
   */
  def protocolFactory(pf: TProtocolFactory) =
    new ThriftClientFramedCodecFactory(clientId, _useCallerSeqIds, pf)

  /**
   * Create a [[com.twitter.finagle.thrift.ThriftClientFramedCodec]]
   * with a default TBinaryProtocol.
   */
  def apply(config: ClientCodecConfig) =
    new ThriftClientFramedCodec(_protocolFactory, config, clientId, _useCallerSeqIds)
}

class ThriftClientFramedCodec(
  protocolFactory: TProtocolFactory,
  config: ClientCodecConfig,
  clientId: Option[ClientId] = None,
  useCallerSeqIds: Boolean = false
) extends Codec[ThriftClientRequest, Array[Byte]] {

  private[this] val preparer = ThriftClientPreparer(
    protocolFactory, config.serviceName,
    clientId, useCallerSeqIds)

  def pipelineFactory: ChannelPipelineFactory =
    ThriftClientFramedPipelineFactory

  override def prepareConnFactory(
    underlying: ServiceFactory[ThriftClientRequest, Array[Byte]]
  ) = preparer.prepare(underlying)

  override val protocolLibraryName: String = "thrift"
}       
```

Scala源码看起来太费劲, 既然知道了原理, 为了可以解析出具体的Fiangle-thrift中的东西, 我只需要设置FrameSize和data的offset的位置, 获取到原生的Thrift协议中的Framed数据即可, 然后复用Packetbeat自带的针对Thrift协议包的抓取与组合逻辑.

通过比对两个业务包我知道中间Fiangle-thrift自己添加的信息字节大小固定为129个字节,这只是Client在发送请求时才会添加这些附加信息, Server端返回值则是在原生Thrift协议中添加了1个字节, 其中我还需要排除创建连接之后发送的Finagle-thrift协议确认请求.

我们完成了普通Finagle-thrift协议的解析,接下来还要针对附带Zipkin信息的Finagle-thrift协议的解析, Zipkin是参考Google的Dapper完成的可以对RPC调用链进行跟踪的框架, 这已经是业内针对分布式系统之间RPC调用链跟踪的通用解决方案. Zipkin无非就是在RPC调用时多传输了TraceId, SpanId, ParentSpanId, IsSample等信息, 通过下面的Zipkin源码可以确定这些信息的大小也是固定字节,并且大小为4个字节. Zipkin关于这块的源码如下:

```
/**
 * The wire format is (big-endian):
 *     ''spanId:8 parentId:8 traceId:8 flags:8''
 */
def tryUnmarshal(body: Buf): Try[TraceId] = {
      if (body.length != 32)
        return Throw(new IllegalArgumentException("Expected 32 bytes"))

      val bytes = local.get()
      body.write(bytes, 0)

      val span64 = ByteArrays.get64be(bytes, 0)
      val parent64 = ByteArrays.get64be(bytes, 8)
      val trace64 = ByteArrays.get64be(bytes, 16)
      val flags64 = ByteArrays.get64be(bytes, 24)

      val flags = Flags(flags64)
      val sampled = if (flags.isFlagSet(Flags.SamplingKnown)) {
        if (flags.isFlagSet(Flags.Sampled)) someTrue else someFalse
      } else None

      val traceId = TraceId(
        if (trace64 == parent64) None else Some(SpanId(trace64)),
        if (parent64 == span64) None else Some(SpanId(parent64)),
        SpanId(span64),
        sampled,
        flags)

      Return(traceId)
}
```

下面是Fiangle-thrift针对Zipkin关闭和开启的一个抓包对比图:


2016-04-30_13-46-49

根据上面的分析逻辑,我就可以在Packetbeat中的messageParser方法中通过一些字节特征修正FrameSize和data的offset来把数据包变成原生的Thrift协议数据包, 具体代码如下:

func (thrift *Thrift) messageParser(s *ThriftStream) (bool, bool) {
        var ok, complete bool
        var m = s.message
        for s.parseOffset < len(s.data) {
            dataStr := string(s.data)
            switch s.parseState {
            case ThriftStartState:
                m.start = s.parseOffset
                if thrift.TransportType == ThriftTFramed {
                    if len(s.data) < 4 {
                        return true, false
                    }
                    frameSize := common.Bytes_Ntohl(s.data[:4])
                    m.FrameSize = frameSize
                    s.parseOffset = 4
                    
                    if (!strings.Contains(dataStr, "__can__finagle__trace__v3__")) {
                        var thriftFlagIndex1 int = bytes.LastIndex(s.data, thriftFlag1)
                        if thriftFlagIndex1> -1 {// 如果标识为80010001 那么代表是client->server
                            // client -> server
                            m.FrameSize = common.Bytes_Ntohl(s.data[:4]) - uint32(thriftFlagIndex1) - 4  // 从8001位置之后开始
                            s.parseOffset = thriftFlagIndex1// 从8001位置开始(包括8001位置)
                        }else{//如果没有标识为80010001, 那么应该有标识位80010002, 那么代表是server->client
                            // finagle 返回值
                            if bytes.LastIndex(s.data, thriftFlag2)==5 {
                                m.FrameSize = frameSize - 1
                                s.parseOffset = 4 + 1
                            }
                        }
                    }
                }
        ... ...
}

参考

Thrift Tutorial

Packetbeat协议扩展开发教程

由浅入深了解Thrift

上一篇下一篇

猜你喜欢

热点阅读