我爱编程Kafka文字欲

无镜--kafka之消费者(四)

2018-09-26  本文已影响2人  绍圣

消费者网络客户端轮询:ConsumerNetworkClient。ConsumerNetworkClient是对NetworkClient的封装。

客户端发送请求后,不知道服务端什么时候返回响应。所以客户端获取结果有三种轮询方式:

1,客户端不阻塞,设置超时时间为0,表示请求发送完成后马上返回到调用者的主线程中。

2,客户端设置超时时间,如果在指定时间内没有结果,返回返回到调用者的主线程中。

3,客户端设置超时时间为最大值(可以理解为一直阻塞),如果没有结果,就会一直阻塞,不会返回到调用者的主线程中。

NetworkClient发送请求过程

1,client.ready(node):客户端连接上目标节点,并准备好发送请求。

2,client.send(request):发送请求,将请求设置到网络通道中。

3,client.poll(timeout):客户端轮询获取结果。

ConsumerNetworkClient发送请求过程

1,send():创建客户端请求,并缓存到未发送的请求集合(unsent)中。

2,poll():处理客户端请求。

3,trySend():调用NetworkClient.send(),暂时把请求放到网络通道中。

4,NetworkClient.poll():真正发送请求。

trySend()

处理unsent:Map<节点,List<请求对象>>中保存的请求,把各个节点的请求设置到各个节点对应的通道(KafkaChannel)中,并注册该通道的写事件。注:每个节点对应的通道一次只会处理一个请求,所以如果一个节点的上一个发送请求还没有发送,那么当前此节点的当前请求就不会设置到通道中。成功设置到节点对应的通道后,从集合中删除,以防重复发送。

NetworkClient.poll()

真正把请求发送到网络中。trySend()方法中,为节点对应的通道注册了写事件。在轮询方法中,写事件准备就绪,处理通道的写操作将数据写到网络中。trySend()方法可能设置多个节点的请求,所以就会有多个通道的写事件准备就绪,因此轮询方法中就会发送多个请求。kafka的处理方式不是每次调用NetworkClient.send()方法就调用一次NetworkClient.poll()。而是把所有准备好的客户端请求都设置到对应的网络通道后执行一次轮询:把所有写事件准备就绪的通道找出来,执行写操作。

NetworkClient:无镜--kafka之生产者(三) - 简书 有比较详细的介绍。

心跳任务

每个消费者都需要定时的向服务端的协调者发送心跳,以表明自己是存活的。如果消费者在一段时间内没有发送心跳到服务端的协调者,那么服务端的协调者就会认为消费者挂掉。就会将挂掉的消费者上的分区分给消费组中的其他消费者。

发送心跳是在消费者的协调者上完成的,消费者在加入消费组时,启动发送心跳线程。ConsumerCoordinator.poll-->ensureActiveGroup()-->startHeartbeatThreadIfNeeded()-->HeartbeatThread().start()。HeartbeatThread是AbstractCoordinator的内部类。HeartbeatThread采用死循环来不断的发送心跳请求。

发送心跳请求采用组合模式,每个消费者都只有一个心跳任务,心跳对象记录了心跳任务的元数据。

public final class Heartbeat {

private final long sessionTimeout;  // 会话超时的时间,超过表示会话失败

private final long heartbeatInterval; // 心跳间隔,表示多久发送一次心跳

private final long maxPollInterval;

private final long retryBackoffMs;

private volatile long lastHeartbeatSend; // 发送心跳请求时,记录发送时间

private long lastHeartbeatReceive; // 接收心跳结果后,记录接收时间

private long lastSessionReset; // 上一次的会话重置时间

private long lastPoll;

private boolean heartbeatFailed;

public boolean shouldHeartbeat(long now) { return timeToNextHeartbeat(now) == 0; }

/*** 计算下一次发送心跳的时间 * * @param now * @return 0:表示立即发送,不等于:表示离下一次发送心跳的时间 */

public long timeToNextHeartbeat(long now) {

// 从上次发送心跳后到现在一共过去了多长时间

long timeSinceLastHeartbeat = now - Math.max(lastHeartbeatSend, lastSessionReset);

final long delayToNextHeartbeat;

if (heartbeatFailed)

delayToNextHeartbeat = retryBackoffMs;

else

delayToNextHeartbeat = heartbeatInterval;

// 从上次发送心跳后到现在一共过去了多长时间大于了心跳间隔时间,表明要立即发送心跳。返回0

if (timeSinceLastHeartbeat > delayToNextHeartbeat)

return 0;

else // 否则返回还有多久发送下一次心跳请求

return delayToNextHeartbeat - timeSinceLastHeartbeat;

}

}

消费者和服务端的协调者进行交互,必须确保消费者连接上协调者所在的节点。但是在交互过程中两边都会出现问题。比如协调者可能会挂掉,那么服务端应该给消费组重新选择一个协调者,那么后面消费组里面的消费者就需要去连接新的协调者了。所以在消费者的发送心跳线程里面必须针对服务端返回的不同的错误码处理不同的业务:

if (coordinatorUnknown()) { // 没有连接上服务端协调者

if (findCoordinatorFuture == null)

lookupCoordinator(); // 发送GroupCoordinator请求

else

AbstractCoordinator.this.wait(retryBackoffMs);

} else if (heartbeat.sessionTimeoutExpired(now)) { // 在会话超时时间内没有收到心跳应答,客户端认为协调者挂了.

coordinatorDead(); // 处理协调者挂掉的逻辑,比如:处理unsent变量中保存的请求的处理器中的onFailure方法

} else if (heartbeat.pollTimeoutExpired(now)) { // 查看消费者客户端的轮询时不是超过了心跳最大的轮询等待时间

maybeLeaveGroup(); // 发送离开组请求

} else if (!heartbeat.shouldHeartbeat(now)) { // 现在不需要发送心跳,下一次循环再检查 AbstractCoordinator.this.wait(retryBackoffMs);

}

消费者提交偏移量

消费组发生再平衡时分区会被分配给新的消费者,为了保证新的消费者能够从分区的上一次消费位置继续拉取并处理消息的话,那么每个消费者都需要将所消费的分区的消费进度定时的同步给消费组对应的服务端协调者节点上。

在KafkaConsumer中提供了两种偏移量的提交方式:同步和异步。

异步

如果用户设置了自动提交偏移量(enable.auto.commit=true),客户端在每一次轮询的时候,都会自定提交偏移量。

KafkaConsumer.poll()-->KafkaConsumer.pollOnce()-->ConsumerCoordinator.poll()-->ConsumerCoordinator.maybeAutoCommitOffsetsAsync()

可以看出提交偏移量请求还是通过客户端协调者发送的。每次的轮询在发送拉取请求之前,在客户端协调者的轮询方法中,除了检查心跳,就是要提交偏移量。也就是说只要消费者要去消费消息,就会执行提交偏移量的动作(enable.auto.commit=true的前提下)。

疑问:如果一个消费者消费了一次消息之后,就不消费了或者就挂机了。这时提交偏移量还没有提交给服务端的,那么在再平衡后把此分区分给了消费组中的其他消费者后就会出现重复消费了。当然还有一种情况就是一个主题下的一个分区只能被一个消费组中的一个消费者所消费,但是可以被其他消费组中的消费者进行消费,这样情况是kafka所准许的,本身就会出现重复消费的情况,kafka只保证分区在同一个消费组中的有序性,不保证在不同消费组中的有序性,所以以上情况下也就算是一个消费组中的一个消费者挂了或者不消费消息了,对其他消费组是没有任何影响的。

那么kafka是如何来处理这个问题的或者是有没有处理这个问题喃?


ConsumerCoordinator.maybeAutoCommitOffsetsAsync()

private void maybeAutoCommitOffsetsAsync(long now) {

if (autoCommitEnabled) {

if (coordinatorUnknown()) {

this.nextAutoCommitDeadline = now + retryBackoffMs;

} else if (now >= nextAutoCommitDeadline) {

this.nextAutoCommitDeadline = now + autoCommitIntervalMs; doAutoCommitOffsetsAsync();

}

}

}

/** * 执行提交偏移量请求 */

private void doAutoCommitOffsetsAsync() { commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {

public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {

if (exception != null) {

if (exception instanceof RetriableException)

nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline);

} else {

}

}

});

}

/** * @param offsets 使用分区的拉取偏移量作为分区的提交偏移量提交到服务端中 * @param callback */

public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {

invokeCompletedOffsetCommitCallbacks();

if (!coordinatorUnknown()) { // 确保连接上服务端的协调者,执行提交偏移量请求 doCommitOffsetsAsync(offsets, callback);

} else {

// 发送连接服务端的协调者请求,并在监听器中执行提交偏移量请求 lookupCoordinator().addListener(new RequestFutureListener<Void>() {

public void onSuccess(Void value) {

doCommitOffsetsAsync(offsets, callback);

}

public void onFailure(RuntimeException e) {

completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets, new RetriableCommitFailedException(e)));

}

});

}

client.pollNoWakeup();

}

private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {

this.subscriptions.needRefreshCommits(); // 通知订阅状态需要拉取提交偏移量 RequestFuture<Void> future = sendOffsetCommitRequest(offsets);

final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback; future.addListener(new RequestFutureListener<Void>() {

public void onSuccess(Void value) {

if (interceptors != null)

interceptors.onCommit(offsets);

completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));

}

public void onFailure(RuntimeException e) {

completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException)); }

});

}

/** * 发送提交偏移量的请求 * @param 分区的拉取偏移量作为提交偏移量 */

private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {

if (offsets.isEmpty())

return RequestFuture.voidSuccess();

Node coordinator = coordinator();

if (coordinator == null)

return RequestFuture.coordinatorNotAvailable();

Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());

for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) { OffsetAndMetadata offsetAndMetadata = entry.getValue();

offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData( offsetAndMetadata.offset(), offsetAndMetadata.metadata()));

}

final Generation generation;

if (subscriptions.partitionsAutoAssigned())

generation = generation();

else

generation = Generation.NO_GENERATION;

if (generation == null)

return RequestFuture.failure(new CommitFailedException());

OffsetCommitRequest req = new OffsetCommitRequest( this.groupId, generation.generationId, generation.memberId, OffsetCommitRequest.DEFAULT_RETENTION_TIME, offsetData);

return client.send(coordinator, ApiKeys.OFFSET_COMMIT, req) .compose(new OffsetCommitResponseHandler(offsets));

}

采用组合模式发送提交偏移量请求(OFFSET_COMMIT)。提交偏移量请求其实就是把拉取偏移量的值提交到服务端去保存。消费者收到提交偏移量请求的响应后由OffsetCommitResponseHandler(组合模式中的适配器)处理,更新消费者的订阅状态中的提交偏移量。

在组合模式异步请求的监听器的回调方法中,把完成的请求存放到completedOffsetCommits变量中。在ConsumerCoordinator.poll的方法中第一句会调用completedOffsetCommits变量中保存完成的提交偏移量请求的callback方法。

以上代码中offsets参数来自于订阅状态(SubscriptionState)的allConsumed()方法。消费者所有消费的分区偏移量实际上是分区状态对象(TopicPartitionState)的拉取偏移量(position变量),而不是提交偏移量(committed)

拉取偏移量和提交偏移量的关系

谈到拉取偏移量就会想到拉取请求,发送一次拉取请求,在客户端轮询方法返回拉取的记录集之前,会计算出下一次发送拉取请求时用到的拉取偏移量的值,并更新分区状态的拉取偏移量。在这个时候并没有更新提交偏移量,所以拉取偏移量也能代表分区的消费进度。

疑问:如果客户端返回的记录集后面就出现异常或者宕机了。那么最新计算的拉取偏移量还没有赋值给提交偏移量和提交到服务端的协调者节点中保存,那么等消费者重新启动的时候,获取拉取偏移量就是老的,这样拉取的消息就是消费过的,出现重复消费。

消费者客户端在轮询方法中返回记录集的时候就计算出下一次拉取请求的偏移量,并更新分区状态的拉取偏移量。之后的处理记录集的业务是用户自己保证。如果在下一次轮询前,客户端挂掉,那么没有把已经处理过的偏移量提交到服务端协调者,那么等客户端下一次启动的时候,从服务端协调者获取的偏移量就是老的。这样就会出现重复消费。这点kafka把处理方式留给了业务系统。

拉取记录集(enable.auto.commit=true):

1,先提交拉取偏移量的值到服务端的协调者。提交请求成功在回调方法中更新分区状态的提交偏移量。注意这里的拉取偏移量要么是从服务端获得的拉取偏移量,要么就是上一次拉取到记录后重新计算出的拉取偏移量。

2,获取记录集:

      2-1有记录:计算出下一次拉取请求的拉取偏移量并更新分区状态的拉取偏移量的值。

      2-2无记录:使用从服务端获得的拉取偏移量创建拉取请求。

发送提交拉取偏移量的请求是在发送拉取请求之前,也就是说使用的拉取偏移量在拉取记录之前就保持到了服务器的协调者中。试想把这两个步骤反过来,先发送拉取请求再发送提交拉取偏移量的请求,会出现什么情况喃?

这个引出了消息处理语义:至多一次,至少一次,正好一次。

至多一次:消息最多被处理一次,可能会丢失,但绝不会重复消费。

至少一次:消息至少被处理一次,不可能丢失,但可能会重复消费。

正好一次:消息正好被处理一次,不可能丢失,但绝不会重复消费。

至多一次

现象:先发送提交拉取偏移量的请求保存消费进度,再获取记录集处理消息。这样可能会出现:消费者发送提交拉取偏移量请求在服务端保存完消费进度后,再处理消息之前挂掉。那么在再平衡后新的消费者获取的拉取偏移量在这个位置之前的消息可能没有被真正的处理。这样就是出现消息丢失了(没有被处理,消息还在服务器里)。

kafka实现至多一次的方式:设置消费者自动提交偏移量,并且设置较短的提交时间间隔。

至少一次

现象:先获取记录集处理消息。再发送提交拉取偏移量的请求保存消费进度。这样可能会出现:消费者处理完消息,但是在发送提交拉取偏移量的请求保存消费进度的时候挂了。那么在再平衡后新的消费者获取的拉取偏移量在这个位置后面的消息可能被处理过了,那么新的消费者又会重新处理一次,这样消息就被重复处理了。

kafka实现至少一次的方式:设置消费者自动提交偏移量,但设置很长的提交时间间隔;或者关闭消费者自动提交偏移量,处理完消息后手动同步提交偏移量。

正好一次

正好一次其实就是保证处理记录集和保存偏移量的请求必须是一个原子操作。要么同时成功要么同时失败。

kafka实现至少一次的方式:关闭消费者自动提交偏移量,订阅主题时设置自定义的消费者再平衡监听器:发送再平衡时,获取偏移量就从关心数据库或者是文件中获取。

同步

消费者同步提交偏移量的做法:在最外层用一个死循环来确保必须收到服务端的响应结果才能结束。

public void commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets) { invokeCompletedOffsetCommitCallbacks();

if (offsets.isEmpty())

return;

while (true) {

ensureCoordinatorReady();

RequestFuture<Void> future = sendOffsetCommitRequest(offsets);

client.poll(future);

if (future.succeeded()) {

if (interceptors != null)

interceptors.onCommit(offsets);

return;

}

if (!future.isRetriable())

throw future.exception();

time.sleep(retryBackoffMs);

}

}

同步方式提交偏移量通常是存在依赖条件,必须等待偏移量提交完成后才能继续往下执行。在加入消费组或者是重新加入消费组的时候,如果enable.auto.commit=true,那么就会用阻塞的方式完成一次提交偏移量请求。把自己本地保存的最新的拉取偏移量提交到服务器端的协调者保存。这样后面分到分区的消费者获取拉取偏移量就可以从最新的消费点开始消费。

参考资料:

Kafka技术内幕:图文详解Kafka源码设计与实现

上一篇下一篇

猜你喜欢

热点阅读