3.kafka生产者

2022-12-20  本文已影响0人  呦丶耍脾气

一、原理

kafka并没有采用Java提供的序列化器,而是自己实现的序列化器,因为Java提供的序列化器,会在原有数据的基础上,增加很多的用于安全校验的数据,在大数据的场景下,每次传输的数据量很大,如果在此基础上还要加入大量用于安全校验的数据,严重的影响了效率,所以kafka等中间件,自己实现了序列化器,仅仅进行简单的校验,增加了效率。


分区器实际上是将数据发送到了缓冲队列中,缓冲队列是一个双端队列,其内部包含内存池(当通过分区器创建数据后,申请内存,发送到集群后再释放),避免频繁的申请和释放内存。
因为kafka可以对topic进行分区,所以发送时就需要确定向哪个分区发送信息,就由分区器定义的规则来发送,一个分区对应一个队列,这些队列都是在内存中创建的,总大小默认32M,每一批次默认大小16K

batch.size:只有数据累积到batch.size之后,sender才会发送数据。默认16K
linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送信息。单位ms,默认值是0ms(即默认到了就发送,不等待到达batch.size阈值)
生产环境中上面两个参数都需要调整

0:生产者发送过来的数据,不需要等待数据落盘应答
1:生产者发送过来的数据,Leader(数据落盘)收到后应答,副本有没有无所谓
-1(all) :生产者发送过来的数据,Leader和ISR里面的所有节点收齐数据后应答,-1和all等价。
(1) Leader维护了一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follow + Leader集合(leader:0,ISR:0,1,2)
(2) 如果Follower长时间未向Leader发送通信请求或同步数据,则该Follow将被踢出ISR。改时间阈值由replica.lag.time.max.ms参数设定,默认30s。例如如果2超时,(leader:0,ISR:0,1)
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景

二、Kafka事务原理

幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了数据不重复
重复数据的评判标准:具有<PID,Partition,SeqNumber>相同主键的信息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition表示分区号;Sequence Number是单调自增的。
所以幂等性只能保证的是在单分区单会话内不重复。如果想保证数据一定不重复,就需要开启事务
使用幂等性:开启参数enable.idempotence默认为true,即默认为开启

事务并不是直接存储在磁盘中,而是存储在一个特殊的topic的分区中。


三、数据乱序

生产端的InFilghtRequests,默认每个broker最多缓存五个请求,当第一个数据发送过去,第二个数据没有发送成功,这时第二波数据就要进行重试,但是此时第三波数据发送,发送成功了,然后第二波数据的重试才发送成功,本来的数据顺序是123,但是现在被改为了132,发生了数据乱序


max.in.flight.requests-per.connection设置为1,即不缓存request请求,自然不会发生数据乱序的情况。

开启幂等性以后,因为SeqNumber是单调递增的,所以当数据是顺序的时候,不需要排序就可以发送,但是当发生上面的情况之后,服务端发现数据的SeqNumber是132,不是单调递增了,会对数据进行缓存,攒到5个以后会进行重新排序,之后再进行发送。

上一篇下一篇

猜你喜欢

热点阅读