无镜--kafka之生产者(三)
话说上回中,KafkaProducer已经将生产的记录追加到了RecordAccumulator中。那么接下来的事情,就是怎么样把这些记录提交到服务端了。
是否还记得在KafkaProducer.doSend方法一下代码段:
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
// 5.如果 batch已经满了或者是新建立的Batch,唤醒 sender线程发送数据
if (result.batchIsFull || result.newBatchCreated) {
this.sender.wakeup();
}
在一个RecordBatch已经满了或是新建立了一个RecordBatch(之所以新建是因为旧的放不下消息了。因此意味着旧的就可以发送了)。就唤醒发送线程,准备提交记录到服务端。
this.sender.wakeup(); // 将Sender线程从阻塞中唤醒
Sender
实现Runnable接口的对象。一个KafkaProducer持有一个Sender实例。Sender线程迭代RecordAccumulator中batches变量的每个分区(tp),获取分区对应的主副本节点,然后取出分区对应队列中的RecordBatch,提交到服务端。(追加消息到记录收集器中(RecordAccumulator)都是按照分区分好组了,所以每个分区队列都是保存的即将发送到这个分区主副本对应的节点上的记录)。
Sender.run
void run(long now) {
// 从元数据对象中获取集群信息
Cluster cluster = metadata.fetch();
// 遍历所有的topic-partition,如果其对应的RecordBatch可以发送(大小达到 batch.size或时间达到 linger.ms) 就取出其对应的leader。
// 返回ReadyCheckResult实例,包含:可以发送的RecordBatch对应的节点(leader)等信息
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// 如果有topic-partition的leader是未知的,就强制metadata更新
if (!result.unknownLeaderTopics.isEmpty()) {
// ready()方法中遇到没有leader的tp就将其加入ReadyCheckResult.unknownLeaderTopics的set集合中
// 然后会去请求这些tp的的meta
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
this.metadata.requestUpdate();
}
// 如果与node没有连接(如果允许连接但还没连接,就初始化连接),就证明该node暂时不能接收数据,暂时移除该 node
// 建立到主节点的网络连接,移除还没有准备好的节点(leader还没有选择出来的节点) Iterator iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
}
// 返回该 node 对应的所有可以发送的 RecordBatch 组成的 batches(key 是 node.id),并将 RecordBatch 从对应的 queue 中移除
// 读取记录收集器,返回组合好的在同一个节点上的所有主副本对应的分区的RecordBatch
// Map<nodeID,要准备发送到该节点的所有RecordBatch(包括不同的分区)>
Map<Integer,List<RecordBatch>> batches = this.accumulator.drain(cluster,result.readyNodes,this.maxRequestSize,now);
if (guaranteeMessageOrder) {
// 记录将要发送的 topicPartition
for (List batchList : batches.values()) {
for (RecordBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
// 将由于元数据不可用等情况而导致不能发送的 RecordBatch移除
List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
for (RecordBatch expiredBatch : expiredBatches) this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); sensors.updateProduceRequestMetrics(batches);
// 构建以节点为级别的生产请求列表,既每个节点只有一个客户端请求
// 减少客户端到服务端的请求次数
List requests = createProduceRequests(batches, now);
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (result.readyNodes.size() > 0) {
pollTimeout = 0;
}
for (ClientRequest request : requests)
client.send(request, now); // 保存要发送的客户端请求,这里没有真正的发送
// 执行真正的网络读写请求,将上面的客户端请求真正发送出去
this.client.poll(pollTimeout, now);
}
在发送线程发送消息时,记录收集器会按照节点维度将RecordBatch重新组装(Map<nodeID,要准备发送到该节点的所有RecordBatch>),返回给发送线程,再由发送线程为每一个节点创建一个客户端请求。
细看一下run中的方法:
RecordAccumulator.ready
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set readyNodes = new HashSet<>();
long nextReadyCheckDelayMs = Long.MAX_VALUE;
Set unknownLeaderTopics = new HashSet<>();
boolean exhausted = this.free.queued() > 0;
for (Map.Entry> entry : this.batches.entrySet()) {
TopicPartition part = entry.getKey();
Deque deque = entry.getValue();
Node leader = cluster.leaderFor(part); // 查询tp的leader对应的节点信息
synchronized (deque) {
if (leader == null && !deque.isEmpty()) {
unknownLeaderTopics.add(part.topic());
} else if (!readyNodes.contains(leader) && !muted.contains(part)) {
// 如果 muted 集合包含这个 tp,那么在遍历时将不会处理它对应的 deque,
// 也就是说,如果一个 tp在muted集合中,说明它还有RecordBatch正在处理中(没有收到响应)
// 那么即使它对应的RecordBatch可以发送了,也不会处理
RecordBatch batch = deque.peekFirst();
if (batch != null) {
boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
long waitedTimeMs = nowMs - batch.lastAttemptMs;
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
boolean full = deque.size() > 1 || batch.records.isFull(); // batch满了
boolean expired = waitedTimeMs >= timeToWaitMs; // batch超时
boolean sendable = full || expired || exhausted || closed || flushInProgress();
if (sendable && !backingOff) {
readyNodes.add(leader); // 将可以发送的leader添加到集合中
} else {
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
}
}
}
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
ready方法返回的ReadyCheckResult对象包括:可以发送的RecordBatch对应的节点(leader)信息,下一次就绪检查点的时间,分区的leader未知的topic信息。发现有分区的leader未知的topic信息那么就会去强制更新元数据里面的集群信息。
RecordAccumulator.drain
public Map<Integer,List<RecordBatch>> drain(Cluster cluster, Set nodes, int maxSize, long now) {
if (nodes.isEmpty())
return Collections.emptyMap();
Map<Integer,List<RecordBatch>> batches = new HashMap<>();
for (Node node : nodes) {
int size = 0;
List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
List<RecordBatch> ready = new ArrayList<>();
int start = drainIndex = drainIndex % parts.size();
do {
PartitionInfo part = parts.get(drainIndex);
TopicPartition tp = new TopicPartition(part.topic(), part.partition());
if (!muted.contains(tp)) {
// 被 mute 的 tp 依然不会被遍历
Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
if (deque != null) {
// tp有对应的队列有数据,会选择出来,加上已经被选择出来的RecordBatch,直到达到最大的请求长度,才停止
// 这样一个RecordBatch及时没有达到发送条件(没有装满),为了保证每个请求尽可能多的发送数据,也会被发送出去。
synchronized (deque) {
RecordBatch first = deque.peekFirst();
if (first != null) {
boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
if (!backoff) {
if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
break;
} else {
RecordBatch batch = deque.pollFirst();
batch.records.close();
size += batch.records.sizeInBytes();
ready.add(batch);
batch.drainedMs = now;
}
}
}
}
}
}
this.drainIndex = (this.drainIndex + 1) % parts.size();
} while (start != drainIndex);
batches.put(node.id(), ready);
}
return batches;
}
drain方法,在max.request.size的范围内发送尽可能多的RecordBatch。并且重新按照节点维度重新整合记录。
在记录收集器中的存储数据格式为:batches-->Map<TopicPartition,Deque<RecordBatch>>。发送线程获取数据时记录收集器返回的数据格式为:batches-->Map<nodeId,List<RecordBatch>>
记录收集器(RecordAccumulator),发送线程(Sender),服务端(Broker)
参考了《Kafka技术内幕:图文详解Kafka源码设计与实现》中的图
wakeup方法把发送线程唤醒,但是Sender并不负责真正发送客户端请求到服务端,它做的事情只是从记录收集器(RecordAccumulator)中,取出可以发送的记录,封装成Map<nodeId,List<RecordBatch>>结构,创建好客户端请求,然后把请求交给NetworkClient(客户端网络对象)去发送。
NetworkClient
Kafka客户端发送是基于NIO构建自己的通信层NetworkClient。它管理了客户端和服务端的网络通信。
以上是NetworkClient关于通信层方面的生态类。
NetworkClient重要的几个方法:
ready():连接所有可以连接的节点。如果服务器不能连接,就把节点移除掉。
send():在当前节点可以发送新的请求的情况下(这里的可以发是在能正常连接的情况下,同一个节点,一个客户端请求还没有完成时,就不能发送新的客户端请求),把Sender发送线程创建的客户端请求,存到节点对应的通道中(KafkaChannel),并缓存到“没有收到响应的队列”中(InFlightRequests)。
poll():轮询,真正的执行网络请求,发送请求到节点,读取响应。此方法中要调用org.apache.kafka.common.network.Selector.poll()方法。在一次poll之后会对这次poll数据进行相关的处理:
1,处理已经完成的Send,包括那些发送完成后不需要响应的Send-->handleCompletedSends。
2,处理从服务端接收到响应-->handleCompletedReceives。
3,处理连接失败那些连接-->handleDisconnections。
4,处理新建立的那些连接-->handleConnections。
5,超时的请求-->handleTimedOutRequests。
6,调用请求的回调函数。
Selector(org.apache.kafka.common.network)
来回顾一下java NIO中的一些概念:以下描述参考:《Kafka技术内幕:图文详解Kafka源码设计与实现》
SocketChannel:客户端网络连接通道,底层的字节数据读写都发生在通道上(从通道中读取数据,将数据写入通道中),通道会和字节缓冲区一起使用,从通道中读取数据时需要构造一个缓冲区,调用channel.read(buffer)就会将通道中的数据添加到缓冲区中。将数据写入通道时。要先将数据写到缓存区中,调用channel.write(buffer)将缓冲区中的每个字节写入到通道中。
Selector:发生在通道上的事件有读和写,选择器会通过选择键的方式监听读写事件的发生。
SelectionKey:将通道注册到选择器上:channel.register(selector)会返回选择键。选择键将通道和选择器关联起来。读和写事件发生时,通过选择键可以得到对应的通道,从而在通道上进行读写操作。
Sender,NetworkClient,Selector
KafkaChannel
id:NodeId
TransportLayer:负责字节操作的传输层,KafkaChannel要操作SockerChannel时,都交给TransportLayer传输层去做。
NetworkReceive:接收的数据。
Send:发送的请求数据,一个KafkaChannel一次只存放一个请求数据。等着数据发送完成后,才能发送下一个请求数据。
TransportLayer
传输层对SockerChannel做了简单的封装(都实现了ScatteringByteChannel和GatheringByteChannel接口),选择器Selector在调用KafkaChannel.write和read方法时,实际是调用Send.writeTo和NetworkReceive.readFrom,再调用底层SockerChannel.write和read方法
Selector轮询选择器监听到了客户端的读写事件,会获取绑定到选择键(SelectionKey)上的KafkaChannel,KafkaChannel会将读写操作交给传输层(TransportLayer),TransportLayer再使用底层的SocketChannel完成数据的操作。
NetworkClient.ready
在确认节点是否可以发送的时,允许连接但是没有连接的情况下会初始化连接,调用org.apache.kafka.common.network.Selector.connect,连接动作使用java原生的SocketChannel完成。在此方法中会构建KafkaChannel,让KafkaChannel和SelectionKey关联起来。还维护了节点和KafkaChannel的映射关系(<nodeId,KafkaChannel>)。
SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); Socket socket = socketChannel.socket(); socket.setKeepAlive(true);
boolean connected = socketChannel.connect(address); // 发起连接请求
SelectionKey key = socketChannel.register(java.nio.channels.Selector, SelectionKey.OP_CONNECT);
KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize); // 构建KafkaChannel对象
key.attach(channel); // 将KafkaChannel注册到选择键上
this.channels.put(id, channel); // 节点和KafkaChannel的映射关系
NetworkClient.send
客户端发送的ClientRequest请求,经过NetworkClient.send()--->org.apache.kafka.common.network.Selector.send()--->KafkaChannel.setSend()。保存到对应的KafkaChannel中,但在KafkaChannel还有未发送成功的Send请求,则后面的请求则不能发送(在一个KafkaChannel中,一次只能发送一个客户端请求)。
KafkaChannel一次只处理一个Send,每次都会注册写事件,当Send发送成功后,就注销写事件。这里的发送完成是整个Send请求发送完成,如果调用一次底层的write方法没有完成写完,那么写事件不会被注销,会继续监听写事件,直到整个Send请求发送完成。
注册写事件,当Selector轮询后,写事件准备就绪,就会从KafkaChannel取出客户端请求,调用底层的write方法进行发送。
NetworkClient.poll
NetworkClient的轮询会调用Selector.poll(),在选择键上处理读写事件,当事件发生时,调用KafkaChannel上的read和write会得到返回值NetworkReceive和Send对象,加入到List<Send>:completedSends(发送完成的客户端请求对象集合)和Map<KafkaChannel,Deque<NetworkReceive>>:stagedReceives(完全接收完服务端响应保存到KafkaChannel对应的队列中)。最后这些集合中的数据服务于poll方法后续的handle开头的方法中。
private void pollSelectionKeys(Iterable selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {
Iterator iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
KafkaChannel channel = channel(key); // 获取绑定到选择键中的KafkaChannel sensors.maybeRegisterConnectionMetrics(channel.id());
if (idleExpiryManager != null)
idleExpiryManager.update(channel.id(), currentTimeNanos);
try { // 处理一些刚建立 tcp 连接的 channel
if (isImmediatelyConnected || key.isConnectable()) { if (channel.finishConnect()) { // 连接已经建立
this.connected.add(channel.id());
this.sensors.connectionCreated.record();
SocketChannel socketChannel = (SocketChannel) key.channel();
}
else continue;
}
if (channel.isConnected() && !channel.ready())
channel.prepare();
// 在读取一个响应的时候,可能会调用很多次的read,所以需要循环读取
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
NetworkReceive networkReceive;
while ((networkReceive = channel.read()) != null) // 循环接收,直到读取到一个完整的 Receive,才退出循环
addToStagedReceives(channel, networkReceive); // 读取完成后将响应数据添加到集合中
}
// 底层发送的时候,并不定一次可以完全发送,所以会调用很多次的write,才会完成一个Send的发送
// write是非阻塞的,不会等到全部发送才返回
// 所以在没有全部发送的时候,不会注销写事件
//在epoll的缺省模式下(LT(水平触发)):写缓冲区只要不满,就一直会触发写事件。所以只要不注销写事件,那么就会触发写事件,直到把一个完整的Send发送完成
// 在LT模式下,写缓冲区为满的概率很小,所以写完Send后,要注销写事件,否则会出现一直触发写事件
if (channel.ready() && key.isWritable()) {
Send send = channel.write(); // send不为空,表示完全发送出去了,返回此send对象,如果没有完全发送出去,就返回NULL
if (send != null) {
this.completedSends.add(send); // 将完成的 send添加到list中 this.sensors.recordBytesSent(channel.id(), send.size());
}
}
if (!key.isValid()) { // 关闭断开的连接
close(channel);
this.disconnected.add(channel.id());
}
} catch (Exception e) {
String desc = channel.socketDescription();
close(channel); this.disconnected.add(channel.id());
}
}
}
以上就是生产者的产出客户端请求通过Sender-->NetworkClient-->Selector-->KafkaChannel-->Send/NetworkReceive-->TransportLayer-->SocketChannel。这个链条进行发送和消息接收。