kafka 网络模型2 Selector

2020-06-06  本文已影响0人  不存在的里皮

在上一篇文章中kafka 网络模型1 请求响应流程,我分析了Kafka的请求、响应流程,但留下了Selector的疑点。本文会分析Selector和它的poll()是如何进行网络IO的,NetworkReceive是如何被完整读取的,Send是如何被完整写出的,还会涉及到KafkaChannel和它的mute机制。

Selector

读完上一篇文章,我们应当理解了Kafka Selector在内部维护了一个java nio Selector,变量名叫nioSelector


外部向Kafka Selector注册SocketChannel,其实都是注册到了java Selector上。
而Kafka Selector又是通过调用java Selector,来收集触发了I/O事件的Socket,从而对其执行I/O。
图中橙色的圆圈代表触发了I/O事件的Socket

poll

Processor线程在主循环调用Selector::poll,看下它的实现。方法一开始就调用了clear。


结合注释可知,clear的实现是很简单地清空各种成员变量,因为这些变量都是上一次poll()的结果,在这次poll()之前就应该被处理过。[1]

Selector::poll的方法很长,我们看到它的方法主体,借助其内部维护的java Selector收集了触发I/O事件的SelectionKey,并调用pollSelectionKeys执行I/O。



pollSelectionKeys

收集到了要I/O的SelectionKey后,pollSelectionKeys要怎么做呢?

读者可以先自主思考: 现在收集到了一组触发了I/O事件的SelectionKey,它们可能触发了读事件/写事件,或者两者都触发了。那么我们要对这些SelectionKey逐一判断触发的事件,如果触发了读事件,就尝试把字节流读进NetworkReceive;如果触发了写事件,就尝试把Send的数据写出去。

没错,pollSelectionKeys就是这么做的。对每个SelectionKey,它首先取出它上面附着的KafkaChannel,以便之后要进行IO操作时,对其进行IO。



然后,根据条件判断调用attempRead,并调用attempWrite。

在条件满足时,attemptRead会被调用于读取NetworkReceive;在attemptWrite条件满足后,write会被调用于写出Send。我们看下attemptRead和write的实现, 不难发现规律:

addToCompletedReceives

此处可见,KafkaChannel最重要的方法是有关IOreadmaybeCompleteReceivewritemaybeCompleteSend。我们之后再看它们的实现

KafkaChannel

KafkaChannel是基于java SocketChannel上的一层封装(尽管它是利用java nio attachment机制附着在SocketChannel上的对象)。每个KafkaChannel代表一个客户端,一个KafkaSelector会管理多个KafkaChannel,并对其进行IO操作。


KafkaChannel的附着
上文提到"尽管它是利用java nio attachment机制附着在SocketChannel上的对象",KafkaChannel是如何被附着的呢?
我们回到Processor的源码,在主循环中的其中一个方法configureNewConnections中,将SocketChannel注册到Selector上


Selector会一路调用至buildAndAttachKafkaChannel,在此创建KafkaChannel并附着到SocketChannel上


KafkaChannel的结构
KafkaChannel中最重要的三个成员变量是TransportLayer、NetworkReceive和Send。KafkaChannel通过TransportLayer进行读写,读取NetworkReceive,写出Send。

TransportLayer

我们稍占篇幅用来解释"TransportLayer只是SocketChannel的一层封装"这一点。
首先从它的注释可见,该接口就是SocketChannel的一个封装,可以直接当做SocketChannel的替代。


TransportLayer和SocketChannel都继承了GatheringByteChannel和ScatteringByteChannel,因此能对ByteBuffer和ByteBuffer数组进行读写

SocketChannel

这个类只有两个实现,分别对应PLAINTEXT和SSL模式,两个实现都维护了对SocketChannel的引用。篇幅关系,这里就不看SslTransportLayer了。


总之,对TransportLayer调用的各种IO方法,在底层都是转交给SocketChannel完成的,所以我们可以把它当做SocketChannel一样使用。

IO

在上一章我们说到,Selector对客户端的IO在于attempRead和write,但后者又会对KafkaChannel调用read、maybeCompleteReceive、write、maybeCompleteSend,这些都是KafkaChannel有关IO的重要方法。

我们先看KafkaChannel是如何执行读取的,可知,读取和判断是否完成,与NetworkReceive::readFrom和NetworkReceive::complete有关。





再看KafkaChannel是如何执行写出的

NetworkReceive和Send

NetworkReceive
结构如下,NetworkReceive包含两个ByteBuffer,叫做size和buffer。

在读取时,先读取4字节到size内,再根据size指示的大小为buffer分配内存,然后读满整个buffer时,NetworkReceive就读取完成了。由于缓存的大小清晰,能够避免"tcp粘包"问题


从构造函数中看出,size是固定4字节的



readFrom方法负责读取size和buffer,由于该方法可能被多次调用,每次都需要判断size和buffer的状态,并读取。




complete方法判断是否读取完成,也就是size和buffer是否都读满了

Send
Send是一个接口,含有completed和writeTo方法。有三个类/抽象类实现了writeTo方法。注释中强调了该方法可能会被调动多次才写出完成,因此其实现都遵循了这一点。

以ByteBufferSend为例,在构造函数中,计算了remaining,要写出的剩余字节数



writeTo方法负责写出一组ByteBuffer



completed方法会判断remaining是否不大于0(在PLAINTEXT下,pending始终为false)

mute机制

在读取一个请求后,mute
写出一个响应后,unmute
这样做是为了使得每个请求一来一回,有序排队


  1. 比如completedReceives是上一次poll()中收到的请求,在这次poll()调用前就应当被处理过,所以这一次调用就应该清空。换句话说,如果这次不清空,那之后Processor就会重复处理这些请求了。

  2. 读者可自行点击查看这两个方法在PLAINTEXT下的实现,此处就不占用篇幅了。

上一篇下一篇

猜你喜欢

热点阅读