Kafka ——如何保证消息不会丢失
前言
Kafka 提供了数据高可靠的特性,
但是如果使用不当,
你可能无法享受到这一特性,
今天我们就来看看如何正确的使用Kafka 保证数据的不会丢失吧!
生产者的正确的消息发送方式
Kafka为生产者生产消息提供了一个 send(msg)
方法,
另有一个重载的方法send(msg, callback)
,
-
send(msg)
该方法可以将一条消息发送出去,
但是对发送出去的消息没有掌控能力,
无法得知其最后是不是到达了Kafka,
所以这是一种不可靠的发送方式,
但是也因为客户端只需要负责发送,
所以具有较好的性能。Future<RecordMetadata> future = producer.send(record)
上面的示例代码也可以看到,
send
返回的是一个Future
,
也就是说其实你是可以Future.get()
获取返回值的,
但这种同步的方式,基本上可以说是不会用到。 -
send(msg, callback)
该方法可以将一条消息发送出去,
并且可以从callback
回调中得到该条消息的发送结果,
并且callback
是异步回调,
所以在兼具性能的情况下,
也对消息具有比较好的掌控。ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value); producer.send(myRecord, new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { if(e != null) { e.printStackTrace(); } else { System.out.println("The offset of the record we just sent is: " + metadata.offset()); } } });
-
综上,如果要使数据不丢失,
首先你就的使用send(msg, callback)
来发送消息,
绝大多数情况下,我也建议你这么做。
生产者的配置
当我们通过 send(msg, callback)
是不是就意味着消息一定不丢失了呢?
答案明显是:不是的
我们接着上面,
send(msg, callback)
里面 callback
返回的成功,
到底是不是真的确保消息万无一失了?
其实这个返回的成功也是可以在生产者配置的:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
//*******重点*****************
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
这段代码是生产者发送消息的一个例子,
其中没使用callback
主要是这里callback
不是重点,
我们的重点是props.put("acks", "all");
这个acks
配置属性就是我们callback
成功的具体含义:
-
acks=0
acks = 0如果设置为零,那么生产者将完全不会管服务器是否收到消息。
该记录将立即添加到套接字缓冲区中并视为已发送。
并且重试配置不会生效(因为客户端通常不会知道任何故障)。
返回值的偏移量将始终等于 -1。
该方式具有最大的吞吐量,
一般建议直接配合send(msg)
使用。 -
acks=1
当leader接受到消息就会直接给客户端返回成功,
一般情况下这种模式都能很好的保证数据的不丢失,
只有在laeder接受到数据,
然后还没来得及同步到follower,
就挂掉了才会导致数据的丢失,
这种概率还是比较小的。
这也是默认的选择方式,
兼具较好的吞吐和较高的可靠性 -
acks=all 或者 acks=-1
当leader接受到消息,并同步到了一定数量的follower,
才向生产者发生成功的消息,
同步到的follower数量由 broker 端的min.insync.replicas
决定
除非一些不可抗力因素,
这种方式基本可以确保数据的完全不丢失。
Broker 端的配置
其实到这里,生产者端基本已经做好了数据不丢失的大部分准备,
但是有些东西是要配合 Broker 端一起,
才能达到预期的不丢失数据的,
比如我们上面说到的
-
min.insync.replicas
配置
我们上面知道了,
当 生产者acks = -1
的时候,
写入的副本数就必须 >=min.insync.replicas
数,
当达不到这个要求的时候,
生产者端会收到一个either NotEnoughReplicas or NotEnoughReplicasAfterAppend
的异常。
所以我们这个参数必须不能大于replication.factor
副本数。
否则生产者将无法写入任何数据,
一般建议replication.factor
数要大于min.insync.replicas
,
比如3个机器的集群,设置replication.factor
= 3,
那么设置min.insync.replicas
= 2 就可以了,
这样既保证了数据写入的时候有一个副本的冗余,
也能保证在一些情况下,
某台Broker宕机导致数据无法达到3个副本时,
依然可以正常写入数据。 -
unclean.leader.election.enable
这里 Broker 端还有一个重要的配置就是unclean.leader.election.enable = false
这个配置代表着一些数据落后比较多的 follower,
是否能在leader宕机后被选举成新的 leader
如果你设置成 true,
很明显,如果这样的follower成为新leader,
就会造成最新的一部分数据丢失掉,
重试 retries
上面已经基本完成了不丢数据的方方面面了,
但是有些东西不是我们能控制的,
比如 网络抖动 等不可抗拒的因素,
这时候重试次数就很关键了,
配置合适的retries
重试次数,
和 合适的retry.backoff.ms
重试间隔时间,
将为我们的数据发送提供更高的稳定性,
当然如果实在发送不成功,怎么办呢?
一般我们也可以把发送不成功的数据保存在一个日志文件,
如果数据很重要,那就发送警告信息,
人工干预一下。