KafkaProducer(1) 注释说明
-
客户端将 records 发送到 Kafka 集群中
-
producer 是线程安全的 , 可以在多线程中共享 producer.
-
producer 持有一个缓冲池( bufferPool ) , 需要发送的 records , 还未发送到集群时 , 保存在缓冲池中 . 会有一个后台线程 ( sender ) 将这些 records 发送到 Kafka 集群.
-
send() 方法是异步的 . 调用 send() 方法后 , 将 record 添加到缓冲池中 , 并立即返回结果 ( 结果中带有 future 对象) . 因此 producer 可以将一批 record 添加到 bufferPool 中 , 提高发送效率 .
-
acks
配置用来控制请求是否完整 . 当设置为all
时 , 提交将会被完全阻塞 . 是最慢的 , 等待时间最长的设置. -
如果请求失败了 , producer 会自动重试 .
retries
配置为0
时 , 则不会重试 . 如果启用了重试 , 那么就会有重复发送 record 的可能 . -
producer 通过缓冲池管理着每个 partition 上未发送的 records . 这个缓冲池的大小 , 通过
batch.size
配置指定 . 设置比较大的值 , 一次可以发送更多的数据 , 同样的 , 也需要更多的内存 . -
默认情况下 , 当缓冲区 ( buffer ) 中有 record 要发送时 , 会立即发送 . 就算 buffer 还没填满 , 也会发送 . 当然也可以通过
linger.ms
配置 , 当配置大于 0
时 , producer 会等待指定的时间 , 这样 , 会有更多的 record 填充到相同的 buffer 中 . 类似于 TCP 协议中的 Nagle (纳格尔) 算法. 等待同样会带来发送的延迟 . 需要注意的是 , 当多个 record 同时达到时 , 此时就算linger.ms=0
, 也会汇集多个 record 后再发送 . 因此 , 在高负载下 , 无论linger.ms
是否配置 , 都会是批量发送 . 但是不在高负载的场景下 , 设置linger.ms > 0
, 会减少请求的次数 , 请求更有效 . -
buffer.memory
配置的是 producer 总共可用的缓冲池的大小 . 如果 写缓存 ( send()方法 ) 的速度过快 , 大于发送数据的速度 , 那么缓冲池最终会被耗尽 , 后续的send() 请求都将被阻塞 . 这个阻塞会持续max.block.ms
的时长 , 超时还没写到缓存里 , 会抛出TimeoutException
. -
key.serializer
和value.serializer
决定了怎样将 ProducerRecord 对象转化为 bytes (字节) . -
从 Kafka 0.11 版本开始 , KafkaProducer 将支持两种模式 : 幂等的生产者 ( idempotent producer ) 和 事务的生产者( transactional producer ) . 幂等生产者将Kafka的语义从至少一次增强到恰好一次 . 特别是 producer 在重试是 , 也不会产生重复数据 . 事务生产者原子性的发送多个 partitions ( 甚至多个 topics ) .
-
要启用幂等 , 设置配置
enable.idempotence = true
, 这么设置后 ,retries
的重试次数默认为Integer.MAX_VALUE
, 即为无限次 .acks
也默认为all
. 其他没有 API 使用上的变化 , 当前已经有应用在不修改代码的情况下 , 也可以直接使用 幂等生产者 . -
使用幂等的话 , 一定要避免应用程序级别的重复发送 , 因为这些重复发送不能消除 . ( kafka 只能在同一个 partition 上实现顺序和幂等 , 多个服务器上发送相同的数据 , 可能会在不同的 partition 上 . 那么不同的 partition 上就没办法保证幂等 , 且重复数据没办法识别和删除 ) . 已经启用了幂等的话 , 建议不要修改
retries
配置 , 默认就使用无限次 ( Integer.MAX_VALUE ) . 如果 send() 方法返回 error , 甚至重试无数次后还是 error . 建议停掉 producer , 检查下最后一次重试的内容 , 确保不是重复内容 ( not duplicated ) . 最后 , 幂等只能保证在同一个 session 里保证幂等 . 重新建立的 kafka session 就不行了. -
要使用事务生产者及其相关的一些 API , 必须要设置
transactional.id
. 启用了事务 , 那么幂等会自动启用 , 因为事务要依赖于幂等 . 已经设置了transactional.id
, 那么事务中的 topics 需要设置为持久化 . 还有 ,replication.factor
要至少设置为3
,min.insync.replicas
对于事务中的 topics 要设置为2
, 最后 , 为了确保端到端的事务一致性 , 消费者 ( consumers ) 必须要配置为只读取已经事务提交的 message . -
设置
transactional.id
的目的是为了 在一个 producer 实例中跨多人 session 的情况下回滚事务的场景 . 对于 partition 上的每个 producer 实例 ,transactional.id
应该是唯一的 . -
新的事务操作相关 API 都是阻塞式的 , 并且发生失败时都会抛出异常 .
//事务发送的简单使用示例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.initTransactions();
try {
producer.beginTransaction();
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
producer.abortTransaction();
}
producer.close();
-
每个 producer 实例 , 每次只能开启一个事务 , 在
beginTransaction()
和commitTransaction()
中间发送的所有消息 , 会在同一个事务中 . -
事务生产者是使用异常来传递错误的状态 , 特别的 , 不能在
producer.send()
方法里使用callback
变量 , 也不能在 返回的 future 中调用future.get()
. 事务在执行过程中产生的任务错误, 都会抛出KafkaException
异常 . -
在收到
KafkaException
异常后 , 可以调用producer.abortTransaction()
来结束并回滚事务 . 来保证所有已经执行的回滚掉 . -
从 0.10.0 或更高版本开始 , kafkaProducer 可以和 brokers 进行连接通信 . 要使用 事务特性 , broker 的版本至少要是 0.11.0 或更高 . 否则将返回一个
UnsupportedVersionException
.
总结
- 在注释中着重提到了几个配置:
-
acks
: 控制请求的结果 -
retries
: 重试的次数 , 不启用幂等的话 , 可能会有重复数据 -
batch.size
: 指定每个缓存 buffer 的大小 . 默认是 16 K -
linger.ms
: 批量执行的等待时长 , 默认0 , 即不等待 . -
buffer.memory
: 缓存池 (bufferPool) 的最大可用内存 . 默认 32M -
max.block.ms
: 在缓存不够用时 , 等待释放缓存的最大等待时长 . -
enable.idempotence
: 启用幂等 , 使 Kafka 从至少一次 , 增强到 恰好一次 . -
transactional.id
: 启用事务 , 指定事务实例的ID
-
如果文章有帮助到您,请点个赞,您的反馈会让我感到文章是有价值的