Kafka源码分析-Server-网络层(1)
Kafka服务端的架构
在介绍Kafka服务端的代码前,先从整体了解下Kafka服务端的架构:
kafka服务端整体架构 (1).png
网络层
Kafka的客户端会与服务端多个Broker创建网络连接,在网络上流转着各种请求和响应,从而实现客户端和服务端直接的交互。客户端一般不会碰到大数据量的访问,高并发的场景,所以用NetworkClient组件的管理就够了。Kafka的服务端与客户端运行的场景不同,要满足高并发,低延迟的需求,Kafka的服务端使用Reactor模式实现其网络层。Kafka的网络层管理的网络连接不仅有客户端的,也有来自Broker的网络连接。
Reactor模式
Kafka网络层采用的是Reactor模式,是一种基于实际驱动的模式。java NIO提供了实现Reactor模式的API。单线程java NIO的编程模式如下图:
Reactor模式.png
工作原理:
1)先创建ServerSocketChannel对象并在Selector上注册OP_ACCEPT事件,ServerSocketChannel负责监听指定端口上的连接请求。
2)当客户端发起服务端的网络连接时,服务端的Selector监听到此OP_ACCEPT事件,会触发Acceptor来处理OP_ACCEPT。
3)当Acceptor接收到来自客户端的Socket连接请求时会为这个连接创建响应的SocketChannel,将SocketChannel设置为非阻塞模式,并在Selector上注册其关注的I/O事件,如OP_READ,OP_WRITE。此时,客户端和服务端的Socket连接建立完成。
4)当客户端通过已经建立的SocketChannel连接向服务端发送请求时,服务端的Selector会监听到OP_READ事件,并触发执行相应的处理逻辑(上图中的Reader Handler)。当服务端可以向客户端写数据时,服务端的Selector会监听到OP_WRITE事件,并触发相应的执行逻辑(上图中的Writer Handler)。
这些事情都是在同一个线程完成的,KafkaProducer中的Sender线程以及KafkaConsumer的代码都是这种设计。这样的设计时候客户端这样的并发连接数小,数据量较小的场景,这样对于服务端来说就会有缺点。如:某个请求的处理过程比较复杂会造成线程的阻塞,造成所有的后续请求读无法处理,这就会导致大量的请求超时。为了避免这种情况,就必须要求服务端在读取请求,处理请求已经发送响应等各个环节上必须能迅速的完成,这样就提升了编程的难度,在有些情况下实现不了。而且这种模式不能利用服务器多核多处理器的并行处理能力,造成资源的浪费。
为了满足高并发的需求,服务端需要使用多线程来执行逻辑。我们可以对上述架构做调整,将网络的读写的逻辑和业务处理的逻辑进行拆分,让其由不同的线程池来处理,从而实现多线程处理。如下图:
多线程Reactor模式 (1).png
上图中的Acceptor运行在一个线程里,当然也可以使用单线程的ExecutorService实现,因为ExecutorService会在线程异常退出时,创建新的线程进行补偿,所以可以防止出现线程异常退出后整个服务端不能接受请求的异常情况。Reader ThreadPool线程中所有的线程都会在Selector上注册OP_READ事件。Reader ThreadPool中的线程在成功读取请求后,将请求放入MessageQueue这个共享队列中。Handler ThreadPool线程池中的线程会从MessageQueue中取出请求,然后执行业务逻辑对请求进行处理。在这种模式下,即使处理某个请求的线程阻塞了,线程池中还是有其他线程从MessageQueue中取出请求并处理,从而避免了服务端的阻塞。当请求处理完成后,Handler ThreadPool线程还负责产生响应并发送给客户端,这就要求Handler ThreadPool线程池中的线程在Selector中注册OP_WRITE事件,实现发送响应的功能。
而且当读取请求和处理业务直接的速度不匹配时,MessageQueue队列长度的选择就显得很重要,尤其是MessageQueue队列是固定的大小的时候。如果队列长度太小,就会出现拒绝请求的情况;如果不限制MessageQueue队列长度就会出现堆积过多未处理的请求而导致内存溢出。需要设计人员根据实际业务需求做权衡。
上述的设计中读取,写入,业务处理实现了多线程的处理,不再存在性能瓶颈的问题。但是,如果同一个时间出现大量的IO事件,单个Selector就可能在分发事件时阻塞(或延迟)而成为瓶颈。我们可以将上述设计中单个Selector对象扩展成多个,让他们监听不同的IO事件,这样可以避免单个Selector带来的问题。
多线程Reactor模式--多个Selector.png
一般情况下Acceptor单独占用一个Selector。当Acceptor Selector监听到OP_ACCEPT时,会创建相应的SocketChannel,在上图中我们可以采用一定的策略如轮询Selector集合或选择注册连接数最少的Selector,让不同的连接在不同的Acceptor上注册IO事件。之后由此Selector负责监听此SocketChannel上的事件。这样就缓解了单个Selector带来的瓶颈问题。
SocketServer
Kafka的网络层是采用多线程,多个Selector的设计实现的。核心类是SocketServer,其中包含一个Acceptor用于接受并处理所有的新连接,每个Acceptor对应多个Processor线程,每个Processor线程有自己的Selector,主要用于从连接中读取请求和和写回响应。每个Acceptor对应多个Handler线程,主要用于处理请求并将产生响应返回给Processor线程。Processor线程与Handler线程之间通过RequestChannel进行通信,这个网络层的结构如下图:
网络层架构.png
下面介绍SocketServer的具体实现。首先来看SocketServer依赖的组件:
SocketServer.png
- endpoints: Endpoint集合。一般的服务器有多个网卡,可以配置多个IP,Kafka可以监听多个端口。Endpoint类中封装了需要监听的host,port及使用的网络协议。每个Endpoint都会创建一个对应的Acceptor对象。
- numProcessorThreads: Processor线程的个数。
- totalProcessorThreads:Processor线程的总个数,即numProcessorThreads*endpoints.size。
- maxQueueRequests: 在RequestChannel的requestQueue中缓存的最大请求个数
- maxConnectionsPerIp: 每个IP上能创建的最大连接数。
- maxConnectionsPerIpOverrides: Map[String,Int]类型,具体指定某个IP上最大的连接数,这里指定的最大连接数会覆盖上面maxConnectionsPerIp字段的值。
- requestChannel: Processor线程与Handler线程直接交换数据的队列。
- acceptors: Acceptor对象集合,每个Endpoint对应一个Acceptor对象。
-
processors: Processor线程的集合。此集合中包含所有Endpoint对应的Processor线程。
Acceptors集合和Processors集合对应关系.png - connectionQuotas: connectionQuotas类型的对象。在connectionQuotas中提供了控制每个IP上的最大连接数的功能。底层通过一个Map对象,记录每个IP地址上的连接数,创建新Connect时与maxConnectionsPerIpOverrides指定的最大值(或maxConnectionsPerIp)比较,如果超出就报错。当有多个Acceptor线程并发访问底层的Map对象,则需要synchronized进行同步。
再看下SocketServer的初始化流程,SocketServer在初始化时会创建所有的Endpoint,创建与其对应的Acceptor和Processor集合。
this.logIdent = "[Socket Server on Broker " + config.brokerId + "], "
//创建Processor的数组,其中有totalProcessorThreads个responseQueue队列
val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
//创建保存processors数组 长度为totalProcessorThreads
private val processors = new Array[Processor](totalProcessorThreads)
//创建保存acceptors集合
private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
private var connectionQuotas: ConnectionQuotas = _
private val allMetricNames = (0 until totalProcessorThreads).map { i =>
val tags = new util.HashMap[String, String]()
tags.put("networkProcessor", i.toString)
metrics.metricName("io-wait-ratio", "socket-server-metrics", tags)
}
/**
* Start the socket server startup()是socket server核心代码
*/
def startup() {
this.synchronized {//同步
//创建connectionQuotas
connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
//socket的sendBuffer大小
val sendBufferSize = config.socketSendBufferBytes
//socket的receiveBuffer大小
val recvBufferSize = config.socketReceiveBufferBytes
val brokerId = config.brokerId
var processorBeginIndex = 0
//遍历endpoints集合
endpoints.values.foreach { endpoint =>
val protocol = endpoint.protocolType
val processorEndIndex = processorBeginIndex + numProcessorThreads
//processors数组 从 processorBeginIndex~processorEndIndex,都是当前endpoint对应的processor对象集合
for (i <- processorBeginIndex until processorEndIndex)
//创建processor对象
processors(i) = newProcessor(i, connectionQuotas, protocol)
//创建acceptor,同时为Processor创建对应的线程。第五个参数指定了Processors数组中与此Acceptor对象对应的Processor对象
val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
acceptors.put(endpoint, acceptor)
//创建Acceptor对应的线程,并启动
Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
acceptor.awaitStartup()
//修改processorBeginIndex,为了下一个Endpoint准备
processorBeginIndex = processorEndIndex
}
}
SocketServer的关闭操作简单,会关闭多有个Acceptor和Processor:
/**
* Shutdown the socket server
*/
def shutdown() = {
info("Shutting down")
this.synchronized {//同步
acceptors.values.foreach(_.shutdown)//调用所有acceptor的shutdown
processors.foreach(_.shutdown)//调用所有processor的shutdown
}
info("Shutdown completed")
}