消息队列MQ

消息队列(四)kafka的Exactly Once 和 事务

2020-04-22  本文已影响0人  joshuaXin

一:Consumer的事务相关

1.Exactly Once实现:官方建议:Storing Offsets Outside Kafka

   这种办法我没有实现过,因为在我的场景里面,没有这么严格的常见;他的思路是:在Kafka外部,比如关系数据库,或者本地,利用原子性事务存储处理结果和offset,利用rebalancecallback来处理消费者再平衡后的offset初始化,简单翻译一下:

name="rebalancecallback" Storing Offsets Outside Kafka/* The consumer application need not use Kafka's built-in offset storage, it can store offsets in a store of its own choosing. The primary use case for this is allowing the application to store both the offset and the results of the  consumption in the same system in a way that both the results and offsets are stored atomically. This is not always  possible, but when it is it will make the consumption fully atomic and give "exactly once" semantics that are stronger than the default "at-least once" semantics you get with Kafka's offset commit functionality./

在消费者中可以不使用kafka内嵌的storage,最基本的case是在一个可以同时存储results和offset的系统中保存。虽然不一定每个系统都有这个条件,但是有条件同时保存的,可以给出“Exactly Once”的语义;

/If the results of the consumption are being stored in a relational database, storing the offset in the database  as well can allow committing both the results and offset in a single transaction.If the results are being stored in a local store it may be possible to store the offset there as well. if a crash occurs that causes unsync'd data to be lost/

片段截取:如果results保存在关系数据库里面,这样在一个transaction里面就可以同时提交和保存;也可以保存在本地,但是当crash的时候,数据可能会丢失;

2. Consumer的隔离级别

   在0.11引入了Transaction之后,随之又引入了volatile.level、LSO (LastStableOffset),当volatile为read_committed时,只能读到LSO的;LSO≤HW≤LEO,而为read_uncommitted时,可以读到HW(HighWater),即in-sync同步的位置,LEO(Log End Offset)表示主副本最后的消息;

二:Producer的事务使用

    kafka 0.11 新增了幂等型producer和事务型producer;事务的提出并没有解决纯Consumer端的问题,解决的是Producer发送多个message下的事务,Producer的Stream流中,先consumer,再produce的场景下的事务,毕竟让kafka去实现Consumer的多个message的commit、abort有点不现实

1.Producer的幂等性

1)语义:单会话幂等性,生产者重试将不会产生重复消息;且不需要在client上有任何改动

2)条件:enable.idempotence=true, 若下面的参数未设置,将默认为:retries=Integer.MAX_VALUE,acks = all,当然如果retries有限,需要client去做一些保证;

3)max.in.flight.requests.per.connection=1,Producer在单个连接上能够发送的未响应请求的个数,为1时表示Producer的每个消息batch得到响应后,才继续发送下一个;

2.Consumer的幂等性

  这个需要业务端自己来保证,比如使用redis

if(cache.contain(msgId)){

// cache中包含msgId,已经处理过

continue;

}else {

lock.lock();

cache.put(msgId,timeout);

commitSync(); lock.unLock();

}

3.事务的使用

一个Producer同时只能生成一个Transaction,使用Producer的事务,必须设置transactional.id,设置之后,idempotence所以来的配置会自动生效,replication.factor >=3,min.insync.replicas=2;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
 props.put("transactional.id", "my-transactional-id");
 Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
 producer.initTransactions();
 try {
    producer.beginTransaction();
    for (int i = 0; i < 100; i++){
         producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), i));
       }
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException e) {
 // 这些错误没办法恢复,只能关闭Producer
   producer.close();
 } catch (KafkaException e) {
    // 对于其他原因,关闭事务                           
 producer.abortTransaction(); 
 }
 }

三:Producer幂等的实现

1.支持的是单会话的幂等性,session级别的;

2.保证1:只支持单个producer向单个分区发送;它为每个Producer生成一个uniqueId,为每个message生成一个sequence,用来判断是否重复;

3.保证2:发送失败:需要无限次的重试,retries默认为无穷大,当然用户可以自己设置,但设置少了,可能就保证不了完全幂等了;

4.max.in.flight.requests.per.connection=1的时候,保证严格的Sequence的顺序性;

5.Producer宕机的场景:Producer肯定会重启,重启后epoch加1,宕机时发送失败的消息,应属于业务方的范畴,需要从业务角度去重试,这种情况下Producer仍保持幂等性;

6.消息发送失败的场景:会将消息添加到重试队列,进行重试,对于Sequence异常的,还会重新申请ProduceId;

四:Producer的事务实现

Produce的事务也是分布式事务的范畴,即事务参与者、支持事务的服务器、事务管理器分别位于不同的节点之上,它的操作分布在不同的服务器上,且属于不同的应用,一张kafka事务流程如下:

1. kafka 事务流程

1.Transaction Coordinator事务协调者

a) 分布式事务中一般都会有事务管理者的角色,它是用来保存事务的状态,并执行commit和cancel的动作,如2PC和TCC中都有类似的角色;

b) 在kafka中,事务协调者的作用有:1.给Producer分配produceId,2.存储__transaction_state,即向内部主题持久化Transaction的请求和处理结果;3.负责commit或者abort事务;

c) 异常处理:TC服务和内部主题都是多节点的,是有可靠性保证的;子操作超时:会进行回滚动作;

2.事务流程:

a) 查找Transaction Coordinator,类Consumer去获得offset的时候,去找Group Coordinator;

b) 获取Produce Id,这个Id在事务和幂等性上都有,为了区分session,也避免僵尸节点

c) 启动事务,事务协调者会做一些预备动作;

d) 事务开始:Produce会发送消息、pid、epoch、Sequence等,Consumer会提交offset到事务协调者,事务协调者会把offset提交到内部主题;

e) 事务最终提交和放弃

总体上来讲,分布式事务要解决的问题,还都是类似的,kafka的事务解决了

参考文章:

https://zhmin.github.io/2019/04/16/kafka-producer-idempotence/

https://zhmin.github.io/2019/05/20/kafka-transaction/

上一篇下一篇

猜你喜欢

热点阅读