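Kafka: How the Producer Updates Metadata (Part 2)
Preface
The previous article gave a brief overview of how the Producer sends messages, but it skipped many details. Reading the source, you will notice that many points along the send path request metadata. This article explains how the Producer updates its metadata.
1. The Cluster and Metadata Data Structures
The Metadata class encapsulates the logic around cluster metadata. It is shared by the client thread and the background Sender thread. Metadata is maintained only for a subset of topics; when the metadata for a requested topic is not available, a metadata update is triggered.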
public class Metadata implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(Metadata.class);
    public static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
    private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;
    private final long refreshBackoffMs;
    private final long metadataExpireMs;
    private int version;
    private long lastRefreshMs;
    private long lastSuccessfulRefreshMs;
    private AuthenticationException authenticationException;
    private MetadataCache cache = MetadataCache.empty();
    private boolean needUpdate;
    /* Topics with expiry time */
    private final Map<String, Long> topics;
    private final List<Listener> listeners;
    private final ClusterResourceListeners clusterResourceListeners;
    private boolean needMetadataForAllTopics;
    private final boolean allowAutoTopicCreation;
    private final boolean topicExpiryEnabled;
    private boolean isClosed;
    private final Map<TopicPartition, Integer> lastSeenLeaderEpochs;
}
Cluster holds information about a subset of the nodes, topics, and partitions in the Kafka cluster.
public final class Cluster {
    private final boolean isBootstrapConfigured;
    private final List<Node> nodes;
    private final Set<String> unauthorizedTopics;
    private final Set<String> invalidTopics;
    private final Set<String> internalTopics;
    private final Node controller;
    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
    private final Map<String, List<PartitionInfo>> partitionsByTopic;
    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
    private final Map<Integer, List<PartitionInfo>> partitionsByNode;
    private final Map<Integer, Node> nodesById;
    private final ClusterResource clusterResource;
}
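Several of the maps in Cluster are lookup indexes over the same partition data. One lookup the producer relies on is the partition count for a topic, which is null when the topic is unknown. The sketch below is a minimal illustration of that idea only. ClusterIndexSketch, addPartition, and hasUsableMetadata are hypothetical names invented for this example, and partitions are reduced to plain integers rather than PartitionInfo objects.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for Cluster: it keeps only a per-topic partition index.
class ClusterIndexSketch {
    private final Map<String, List<Integer>> partitionsByTopic = new HashMap<>();

    // Register one partition of a topic in the index.
    void addPartition(String topic, int partition) {
        partitionsByTopic.computeIfAbsent(topic, t -> new ArrayList<>()).add(partition);
    }

    // Returns null when the topic is not present in the cached view.
    Integer partitionCountForTopic(String topic) {
        List<Integer> partitions = partitionsByTopic.get(topic);
        return partitions == null ? null : partitions.size();
    }

    // Cached metadata is usable when the topic is known and the requested
    // partition is either unspecified (null) or within the known range.
    boolean hasUsableMetadata(String topic, Integer partition) {
        Integer count = partitionCountForTopic(topic);
        return count != null && (partition == null || partition < count);
    }

    public static void main(String[] args) {
        ClusterIndexSketch cluster = new ClusterIndexSketch();
        cluster.addPartition("orders", 0);
        cluster.addPartition("orders", 1);
        System.out.println(cluster.partitionCountForTopic("orders"));
        System.out.println(cluster.partitionCountForTopic("unknown"));
        System.out.println(cluster.hasUsableMetadata("orders", 5));
    }
}
```

Returning null rather than 0 for an unknown topic lets the caller tell "no metadata yet" apart from "topic has no partitions".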
2. The Metadata Update Flow
When the producer sends a message, it first makes sure that metadata for the target topic partition is available. This logic lives mainly in the waitOnMetadata method.
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
    // add topic to metadata topic list if it is not there already and reset expiry
    Cluster cluster = metadata.fetch();
    if (cluster.invalidTopics().contains(topic))
        throw new InvalidTopicException(topic);
    metadata.add(topic);
    Integer partitionsCount = cluster.partitionCountForTopic(topic);
    // Return cached metadata if we have it, and if the record's partition is either undefined
    // or within the known partition range
    if (partitionsCount != null && (partition == null || partition < partitionsCount))
        return new ClusterAndWaitTime(cluster, 0);
    long begin = time.milliseconds();
    long remainingWaitMs = maxWaitMs;
    long elapsed;
    // Issue metadata requests until we have metadata for the topic and the requested partition,
    // or until maxWaitTimeMs is exceeded. This is necessary in case the metadata
    // is stale and the number of partitions for this topic has increased in the meantime.
    do {
        if (partition != null) {
            log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
        } else {
            log.trace("Requesting metadata update for topic {}.", topic);
        }
        metadata.add(topic);
        int version = metadata.requestUpdate();
        sender.wakeup();
        try {
            metadata.awaitUpdate(version, remainingWaitMs);
        } catch (TimeoutException ex) {
            // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
            throw new TimeoutException(
                    String.format("Topic %s not present in metadata after %d ms.", topic, maxWaitMs));
        }
        cluster = metadata.fetch();
        elapsed = time.milliseconds() - begin;
        if (elapsed >= maxWaitMs) {
            throw new TimeoutException(partitionsCount == null ?
                    String.format("Topic %s not present in metadata after %d ms.",
                            topic, maxWaitMs) :
                    String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
                            partition, topic, partitionsCount, maxWaitMs));
        }
        if (cluster.unauthorizedTopics().contains(topic))
            throw new TopicAuthorizationException(topic);
        if (cluster.invalidTopics().contains(topic))
            throw new InvalidTopicException(topic);
        remainingWaitMs = maxWaitMs - elapsed;
        partitionsCount = cluster.partitionCountForTopic(topic);
    } while (partitionsCount == null || (partition != null && partition >= partitionsCount));
    return new ClusterAndWaitTime(cluster, elapsed);
}
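(1) If the invalidTopics set of the cached cluster contains the target topic, the method throws an exception. If the cached cluster already has partition information for the topic, and the requested partition is either unspecified or within the known range, the method returns immediately.
(2) The do-while loop keeps requesting metadata updates until the data arrives or a timeout exception is thrown.
- metadata.requestUpdate() sets the needUpdate field to true; the background Sender thread is then woken up.
- metadata.awaitUpdate(version, remainingWaitMs) waits for the result until the updated version is greater than the version passed in, or until remainingWaitMs elapses. Internally it is itself a loop whose exit condition is a newer version or a timeout.
So the calling thread stays blocked inside two nested loops until it either times out or gets what it needs. The update itself is driven by sender.wakeup(), which wakes the Sender thread and, through it, the NetworkClient. NetworkClient is not a separate thread: it runs on the Sender thread, where it sends the metadata request and handles the broker's response.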
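The version-based handshake between requestUpdate() and awaitUpdate() can be sketched with plain Java monitors. This is a simplified model under stated assumptions, not Kafka's Metadata class: VersionedMetadataSketch, update(), and updateRequested() are hypothetical names, and the sketch returns false on timeout where the real method throws a TimeoutException.

```java
// Hypothetical, simplified model of the version-based handshake; not Kafka's Metadata class.
class VersionedMetadataSketch {
    private int version = 0;
    private boolean needUpdate = false;

    // Caller side: flag that an update is wanted and return the current version.
    synchronized int requestUpdate() {
        needUpdate = true;
        return version;
    }

    // Background side: publish a new snapshot, bump the version, wake all waiters.
    synchronized void update() {
        needUpdate = false;
        version++;
        notifyAll();
    }

    synchronized boolean updateRequested() {
        return needUpdate;
    }

    // Caller side: block until the version moves past lastVersion or the timeout expires.
    // Returns true if an update was observed, false on timeout (an assumption of this sketch).
    synchronized boolean awaitUpdate(int lastVersion, long maxWaitMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (version <= lastVersion) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0)
                return false;
            wait(remaining);
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        VersionedMetadataSketch metadata = new VersionedMetadataSketch();
        int version = metadata.requestUpdate();
        // Stand-in for the Sender thread: it notices the flag and publishes an update.
        Thread sender = new Thread(() -> {
            if (metadata.updateRequested())
                metadata.update();
        });
        sender.start();
        boolean updated = metadata.awaitUpdate(version, 1000);
        sender.join();
        System.out.println("updated = " + updated);
    }
}
```

Comparing versions instead of waiting on a bare notification means the caller does not miss an update that lands between requestUpdate() and awaitUpdate(): the loop condition is already false and the call returns at once.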
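2.1 poll() in NetworkClient
Throughout the Producer's send path, the code that actually sends requests and handles their responses is in NetworkClient.poll(). The name send() is misleading; the official comment on it reads "Queue up the given request for sending". send() only records the request in the InFlightRequests queue and in the KafkaChannel: InFlightRequests keeps the content of the request being sent, and KafkaChannel keeps the metadata of the send. The real work happens in poll().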
public List<ClientResponse> poll(long timeout, long now) {
    ensureActive();
    if (!abortedSends.isEmpty()) {
        // If there are aborted sends because of unsupported version exceptions or disconnects,
        // handle them immediately without waiting for Selector#poll.
        List<ClientResponse> responses = new ArrayList<>();
        handleAbortedSends(responses);
        completeResponses(responses);
        return responses;
    }
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }
    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleInitiateApiVersionRequests(updatedNow);
    handleTimedOutRequests(responses, updatedNow);
    completeResponses(responses);
    return responses;
}
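poll() consists of four main steps:
- If any sends were aborted because of unsupported versions or disconnects, handle them immediately.
- metadataUpdater.maybeUpdate(now) decides whether the metadata needs to be updated now. If so, it picks the least-loaded node and, if no connection exists yet, initiates one. maybeUpdate(now, node) mainly places the metadata request on the send queue.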
public long maybeUpdate(long now) {
    // should we update our metadata?
    long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
    long waitForMetadataFetch = this.metadataFetchInProgress ? defaultRequestTimeoutMs : 0;
    long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
    if (metadataTimeout > 0) {
        return metadataTimeout;
    }
    // Beware that the behavior of this method and the computation of timeouts for poll() are
    // highly dependent on the behavior of leastLoadedNode.
    Node node = leastLoadedNode(now);
    if (node == null) {
        log.debug("Give up sending metadata request since no node is available");
        return reconnectBackoffMs;
    }
    return maybeUpdate(now, node);
}

private long maybeUpdate(long now, Node node) {
    String nodeConnectionId = node.idString();
    if (canSendRequest(nodeConnectionId, now)) {
        this.metadataFetchInProgress = true;
        MetadataRequest.Builder metadataRequest;
        if (metadata.needMetadataForAllTopics())
            metadataRequest = MetadataRequest.Builder.allTopics();
        else
            metadataRequest = new MetadataRequest.Builder(new ArrayList<>(metadata.topics()),
                    metadata.allowAutoTopicCreation());
        log.debug("Sending metadata request {} to node {}", metadataRequest, node);
        sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
        return defaultRequestTimeoutMs;
    }
    // If there's any connection establishment underway, wait until it completes. This prevents
    // the client from unnecessarily connecting to additional nodes while a previous connection
    // attempt has not been completed.
    if (isAnyNodeConnecting()) {
        // Strictly the timeout we should return here is "connect timeout", but as we don't
        // have such application level configuration, using reconnect backoff instead.
        return reconnectBackoffMs;
    }
    if (connectionStates.canConnect(nodeConnectionId, now)) {
        // we don't have a connection to this node right now, make one
        log.debug("Initialize connection to node {} for sending metadata request", node);
        initiateConnect(node, now);
        return reconnectBackoffMs;
    }
    // connected, but can't send more OR connecting
    // In either case, we just need to wait for a network event to let us know the selected
    // connection might be usable again.
    return Long.MAX_VALUE;
}
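As we saw earlier, every metadata update request leaves the calling thread blocked in two nested loops until the update succeeds or times out. To make this easier to follow, consider the very first send:
(1) First wakeup of the Sender thread, poll() is called: it tries to establish a connection to the node.
(2) Second wakeup of the Sender thread, poll() is called: it sends the metadata update request.
(3) Third wakeup of the Sender thread, poll() is called: it handles the metadata response.
After these three wakeups the metadata has been fetched, and the calling thread leaves the loop and continues.
- selector.poll() performs the actual I/O, including sending requests and receiving responses. It is built on Java's native NIO.
- Handle the results, including metadata responses and the responses to data requests. Here we only analyze the metadata response, whose logic is mainly in handleCompletedMetadataResponse():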
public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
    this.metadataFetchInProgress = false;
    // If any partition has leader with missing listeners, log a few for diagnosing broker configuration
    // issues. This could be a transient issue if listeners were added dynamically to brokers.
    List<TopicPartition> missingListenerPartitions = response.topicMetadata().stream().flatMap(topicMetadata ->
            topicMetadata.partitionMetadata().stream()
                    .filter(partitionMetadata -> partitionMetadata.error() == Errors.LISTENER_NOT_FOUND)
                    .map(partitionMetadata -> new TopicPartition(topicMetadata.topic(), partitionMetadata.partition())))
            .collect(Collectors.toList());
    if (!missingListenerPartitions.isEmpty()) {
        int count = missingListenerPartitions.size();
        log.warn("{} partitions have leader brokers without a matching listener, including {}",
                count, missingListenerPartitions.subList(0, Math.min(10, count)));
    }
    // check if any topics metadata failed to get updated
    Map<String, Errors> errors = response.errors();
    if (!errors.isEmpty())
        log.warn("Error while fetching metadata with correlation id {} : {}", requestHeader.correlationId(), errors);
    // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
    // created which means we will get errors and no nodes until it exists
    if (response.brokers().isEmpty()) {
        log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
        this.metadata.failedUpdate(now, null);
    } else {
        this.metadata.update(response, now);
    }
}
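The first-send scenario described above (connect, send the request, handle the response) can be pictured as a small state machine that advances one step per poll. The toy model below is only an illustration: FirstSendSketch and its State enum are hypothetical, it performs no network I/O, and in the real client a connection completes asynchronously, so more polls may be needed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the first-send scenario; it performs no real network I/O.
class FirstSendSketch {
    enum State { DISCONNECTED, CONNECTED, REQUEST_IN_FLIGHT, METADATA_READY }

    private State state = State.DISCONNECTED;

    // One pass of a poll-like loop: each call advances by at most one step
    // and returns a description of what that pass did.
    String poll() {
        switch (state) {
            case DISCONNECTED:
                state = State.CONNECTED;
                return "initiate connection";
            case CONNECTED:
                state = State.REQUEST_IN_FLIGHT;
                return "send metadata request";
            case REQUEST_IN_FLIGHT:
                state = State.METADATA_READY;
                return "handle metadata response";
            default:
                return "nothing to do";
        }
    }

    boolean metadataReady() {
        return state == State.METADATA_READY;
    }

    public static void main(String[] args) {
        FirstSendSketch client = new FirstSendSketch();
        List<String> steps = new ArrayList<>();
        // Stand-in for the waiting caller: keep polling until metadata is available.
        while (!client.metadataReady())
            steps.add(client.poll());
        System.out.println(steps);
    }
}
```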
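2.2 Metadata Update Strategies
In summary, metadata is updated in two ways:
- Forced update: calling metadata.requestUpdate(). The method itself does almost nothing; it just sets needUpdate to true. Forced updates mainly happen in the following cases:
  - when initiating a connection (initiateConnect)
  - when handling timed-out requests (handleTimedOutRequests)
  - when handling disconnections (handleDisconnections)
  - when the leader for a topic partition is unknown
  - when the Sender thread is force-closed (forceClose)
  - when the producer has no producerId yet (maybeWaitForProducerId)
  - when an InvalidMetadataException is raised because the metadata is invalid
- Periodic update: metadata is refreshed periodically, based on the fields lastSuccessfulRefreshMs, metadataExpireMs, lastRefreshMs, and refreshBackoffMs.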
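How the two forms interact can be sketched with the four timing fields listed above. The helper below is a hedged illustration modeled on my understanding of the client's timing logic, not a copy of it: MetadataTimingSketch and recordSuccessfulRefresh are hypothetical names, and the numbers in main are chosen only for the example. The idea is that a forced update removes the expiry wait, while the backoff since the last refresh attempt still applies.

```java
// Hypothetical helper built from the fields named above; not Kafka's actual Metadata class.
class MetadataTimingSketch {
    final long refreshBackoffMs;
    final long metadataExpireMs;
    long lastRefreshMs;
    long lastSuccessfulRefreshMs;
    boolean needUpdate;

    MetadataTimingSketch(long refreshBackoffMs, long metadataExpireMs) {
        this.refreshBackoffMs = refreshBackoffMs;
        this.metadataExpireMs = metadataExpireMs;
    }

    // Record a refresh that succeeded at nowMs and clear any pending forced update.
    void recordSuccessfulRefresh(long nowMs) {
        lastRefreshMs = nowMs;
        lastSuccessfulRefreshMs = nowMs;
        needUpdate = false;
    }

    // Milliseconds until the next update may be sent; 0 means "send one now".
    long timeToNextUpdate(long nowMs) {
        // Periodic part: wait until the cached metadata expires, unless an update is forced.
        long timeToExpire = needUpdate
                ? 0
                : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0);
        // Backoff part: never refresh more often than refreshBackoffMs allows.
        long timeToAllowUpdate = Math.max(lastRefreshMs + refreshBackoffMs - nowMs, 0);
        return Math.max(timeToExpire, timeToAllowUpdate);
    }

    public static void main(String[] args) {
        MetadataTimingSketch timing = new MetadataTimingSketch(100, 300_000);
        timing.recordSuccessfulRefresh(1_000);
        System.out.println(timing.timeToNextUpdate(1_050)); // periodic: 299950
        timing.needUpdate = true;                           // forced update requested
        System.out.println(timing.timeToNextUpdate(1_050)); // backoff still applies: 50
        System.out.println(timing.timeToNextUpdate(1_200)); // backoff elapsed: 0
    }
}
```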
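3. Summary
This article explained how the producer updates its metadata. It covers only the main update flow; there are still many details I have not figured out, and I will update the article as my understanding improves. The next article will look at what the producer does to preserve message ordering within a partition.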