无镜--kafka之生产者(五)
在看很多讲kafka的文章里面都会说:kafka只保证单个partition的有序性,那么kafka是怎么保证有序的喃?
使用RecordAccumulator的mutePartition和unmutePartition方法来配合实现有序性
//记录tp是否还有未完成的RecordBatch,保证一个tp的顺序性,当一个tp对应的RecordBatch要开始发送时,就将此tp加入到muted中,tp对应的RecordBatch发送完成后,删除muted中的tp
private final Set muted;
public void mutePartition(TopicPartition tp) { muted.add(tp); }
public void unmutePartition(TopicPartition tp) { muted.remove(tp); }
RecordAccumulator.ready方法中进行判断(伪代码)
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
if (!readyNodes.contains(leader) && !muted.contains(part)) {}
}
if (!readyNodes.contains(leader) && !muted.contains(part)),如果muted中包含了这个tp,那么即使这个tp对应的leader存在,RecordBatch可以发送也不会去发送它,因为它上一个RecordBatch还没有处理完成。
RecordAccumulator.drain方法中进行判断(伪代码)
public Map> drain(Cluster cluster, Set nodes, int maxSize, long now) {
if (!muted.contains(tp)){}
}
if (!muted.contains(tp))在对RecordAccumulator中的记录进行重新组装的时候,依旧会判断对应的tp是否在muted中。在muted中的依旧不会选择出来发送。
在Sender中的变量:guaranteeMessageOrder:是否保持单个partition的有序性
在KafkaProducer的构造中
this.sender = new Sender(client, this.metadata, this.accumulator, config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1, config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)), config.getInt(ProducerConfig.RETRIES_CONFIG), this.metrics, new SystemTime(), clientId, this.requestTimeoutMs);
public Sender(KafkaClient client, Metadata metadata, RecordAccumulator accumulator, boolean guaranteeMessageOrder, int maxRequestSize, short acks, int retries, Metrics metrics, Time time, String clientId, int requestTimeout) { this.client = client; this.accumulator = accumulator; this.metadata = metadata; this.guaranteeMessageOrder = guaranteeMessageOrder; this.maxRequestSize = maxRequestSize; this.running = true; this.acks = acks; this.retries = retries; this.time = time; this.clientId = clientId; this.sensors = new SenderMetrics(metrics); this.requestTimeout = requestTimeout; }
guaranteeMessageOrder=config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1
我们可以在使用的时候设置max.in.flight.requests.per.connection来设置guaranteeMessageOrder的值。
mutePartition和unmutePartition方法都是在Sender中进行调用
mutePartition在Sender.run中调用
if (guaranteeMessageOrder) {
// 记录将要发送的topicPartition到mute中
for (List batchList : batches.values()) {
for (RecordBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
发送的时候,把将要提交的RecordBatch的tp加到muted中。下次再需要发送tp里的RecordBatch的时候,如果muted里面包含了此tp,就不会选择出来发送。
在处理服务端响应的时候,清除muted中的tp
if (guaranteeMessageOrder) this.accumulator.unmutePartition(batch.topicPartition);
总结:要保证单partition的有序性,需要配置max.in.flight.requests.per.connection=1。