ElasticSearch数据传输机制

2017-07-10  本文已影响0人  报纸团

ElasticSearch的数据传输服务TransportService

ElasticSearch的数据传输服务是在TransportService类中实现的。TransportService的核心方法是sendRequest,如下图所示:

sendRequest方法

从上面的代码段可以看出几个有用的信息:

  1. 首先看这一句:
生成requestId

这一句表明,每个被传输的请求,均包含一个requestId,根据requestId的生成函数newRequestId()的实现来看,requestId实际是一个自增的long型变量。这一变量的作用就是为了标识每一次请求。在接收方处理完请求,并返回应答时,需要将请求的requestId带回,以便发送方收到应答后,能够确定是对哪次请求的应答。

  1. 再来看这一句:
响应回调句柄对象

在发送传输请求时,同时指定了返回数据的回调句柄对象:TransportResponseHandler<T> hander。这一回调句柄对象被注册到clientHandlers容器中。
clientHandlers可以看做是一个Map,map的key值是requestId,map的value值是对应的TransportResponseHandler。可以通过clientHandlers.get(requestId)这样的调用来获取到对应的ResponseHandler。

  1. 最后看这一段
调用不同方法发送Request

在发送请求的时候,需要指定发送的目标节点。如果目标节点是本机,直接调用sendLocalRequest方法即可,这一方法不需要通过网络协议进行传输。如果目标节点不是本机,则调用transport成员的sendRequest方法实现数据发送。

Transport组件

ElasticSearch的节点间数据传输组件被抽象成Transport接口。并通过构造函数注入的方式注入到TransportService对象中,如下图所示:

将transport对象注入到TransportService对象中

可以看到,transport对象和threadPool对象都是通过构造函数注入的方式注入到TransportService中的。
,实际上,Transport这一接口的实现类仅有NettyTransport这一个。所以,可以认为ElasticSearch的节点间通讯就是通过Netty来实现的。

NettyTransport

由于Netty基于拦截器模式实现的NIO通讯框架,因此Netty的响应处理机制要通过如下代码说明:

Netty的ChannelPipeline设置

从上图的代码可以看出,ServerChannelPipelineFactory在pipeline上主要添加了两个Handler,一个是SizeHeaderFrameDecoder,一个是MessageChannelHandler。

SizeHeaderFrameDecoder

SizeHeaderFrameDecoder在ChannelPipeline中被命名为“size”,考虑到Netty本身也内置一个类似SizeHeaderFrameDecoder的Decoder,因此,很自然的理解为该Decoder是负责通过一个数据包长度的字段来指示包的长度的。而实际上,elasticsearch的SizeHeaderFrameDecoder的功能远比简单的一个包长度复杂,Netty的数据包头也不仅是一个包长度信息。下面详细介绍一下Netty数据包的包头数据结构。

Netty数据包头格式

NettyHeader

NettyHeader数据的格式如下:

字段名称| 字段长度(字节)| 说明|备注
----|------|----
MARKER_BYTES_SIZE| 2 | 起始标识 | “ES”两个大写字母
MESSAGE_LENGTH_SIZE| 4 | 消息长度 | int型变量
REQUEST_ID_SIZE| 8 | 消息ID | long型变量,请求发起方自增生成
STATUS_SIZE | 1 |状态变量|消息的flag集合,下面详细说明
VERSION_ID_SIZE| 4 | 版本信息 |

STATUS字段

NettyHeader中的Status字段的意义在TransportStatus类中定义。STATUS字段主要包含三个标志位:

TransportStatus的代码如下所示:

TransportStatus.java

下图图示中列出了STATUS字段(单字节)各个标识位的位置和意义。可以看出,只有后三位是有意义的。

7|6|5|4|3|3|2|1|0
----|----|----|----|----|----|----|----
-|-|-|-|-|-|压缩标识|Error标识|response标识(request为0,response为1)

MessageChannelHandler

MessageChannelHandler在ChannelPipeline中被命名为“dispatcher”,这说明该Decoder负责决定接收到的数据包该交给那个具体的业务逻辑去处理。在MessageChannelHandler的业务逻辑中,如下三个成员起了重要作用:

这三个成员是通过构造函数传入的,如下图所示:

MessageChannelHandler的构造函数

从上面的代码可以看出来,threadPool和transportServiceAdapter均来自于transport对象,因此,对于MessageChannelHandler来说,transport是至关重要的。

transportServiceAdapter

MessageChannelHandler的核心逻辑从messageReceived方法展开。但是,在进入messageReceived方法之前,我想先介绍一下transportServiceAdapter。这是后面关于messageReceived方法相关逻辑中需要涉及到的一个重要成员变量。

TransportServiceAdapter接口和TransportService类

transportServiceAdapter成员是TransportServiceAdapter接口的一个实现类,该接口的代码如下所示:

TransportServiceAdapter接口

该接口只有一个实现类,即TransportService.Adapter。其实,TransportServiceAdapter虽然命名为Adapter,但是,它的设计原意可能更接近门面模式。因为目标是使用一个更简单的接口来调用TransportService。TransportServiceAdapter接口有两个主要的获取消息处理句柄的方法,分别是:

TransportService.Adapter.onResponseReceived
TransportService.Adapter.onResponseReceived

从代码中可以看到,这部分代码的主要逻辑是从clientHandlers容器中,获取到response的处理句柄——ResponseHandler。关于clientHandlers,之前在介绍TransportService.sendRequest方法时,介绍过了。下面结合此部分代码,重新回忆和梳理一下request的发送和响应流程:

  1. 发送方构建Request,在提交Request的同时,还需要提供responseHandler的响应信息回调处理句柄对象。
  2. 发送方将构建的Request对象和responseHandler句柄传递给TransportService的sendRequest方法。
  3. TransportService的sendRequest方法首先给request分配一个requestId,然后将requestId和responseHandler已key-value对的方式存储到clientHandlers容器中。随后,sendRequest调用transport成员变量的sendRequest方法执行数据发送操作。
  4. 接收方接收到request,进行处理,并返回response。(这部分操作在下面会进一步描述)。然后通过请求的通道(channel)将response返回。
  5. 发送方通过Netty框架完成接收数据包的处理,根据数据包的status字段,判断这是一个Response,然后调用MessageChannelHandler的相应函数进行处理。MessageChannelHandler最终通过调用TransportService.Adapter.onResponseReceived方法在TransportService的clientHandlers中根据requestId查找到该response对应的handler处理句柄对象。
  6. 调用handler的handleResponse方法进行返回结果的处理。根据handler的执行线程选择,可能在数据接收线程里面直接进行处理,也可能在线程池调用线程进行处理。
    下面,首先对上述第5步的数据包接收处理过程进行详细描述。
TransportService.Adapter.getRequestHandler方法
TransportService.Adapter.getRequestHandler

代码很简单,就是直接调用requestHandlers的get方法。requestHandlers也是个map,key值是action,value值是RequestHandlerRegistry,这个Registry中包含相应消息的hander句柄对象。

那么requestHandlers是由谁构建的呢,这个requestHandler是在系统启动时,由各个消息相应的Action对象通过调用registerRequestHandler方法,注册到TransportService中的。整个ElasticSearch各个模块,其中大量功能是需要用到节点间通讯的。因此,ElasticSearch各个模块均会调用TransportService的registerRequestHandler方法。下面以SearchServiceTransportAction为例进行说明,代码如下:

SearchServiceTransportAction

从代码中可以看到,SearchServiceTransportAction中注册了大量不同类型的Request的处理句柄。

MessageChannelHandler的messageReceived方法

MessageChannelHandler作为Netty的Decoder的实现类。需要重载messageReceived方法。在该方法中,根据消息的status信息,来决定如何对消息进行处理。具体代码如下:

MessageChannelHandler的消息分发逻辑

可以看到,实际处理消息内容的函数有如下几个:

handleRequest函数

handleRequest函数的代码如下图所示:

MessageChannelHandler.handleRequest

上述代码主要有三个需要注意的地方,已经在上面的代码中通过红色方框标出

获取requestHandler

通过调用transportServiceAdapter.getRequestHandler方法实现,这部分代码在前面介绍transportServiceAdpater成员变量的时候,已经进行了较为详细的说明。

执行request的消息处理函数

从代码上看,是根据request的处理句柄对象的执行方式设定来决定是在当前线程(Netty的消息处理线程)中进行消息处理还是在特定的线程池中完成消息处理。

异常信息返回

如果在request消息处理过程中发生异常,则调用transportChannel.sendResponse(Throwable e)方法将错误信息返回给request请求节点。

handleResponse函数

handleResponse函数的代码如下图所示:

MessageChannelHandler.handleResponse

handleResponse方法中并无特殊需要注意的代码。大致逻辑与handleRequest相同。

上一篇 下一篇

猜你喜欢

热点阅读