rabbitmq(三)事务和Confirm
2019-05-21 本文已影响0人
guideEmotion
一 消息确认机制
问题:生产者将消息发送出去之后,消息到底有没有到达rabbitmq服务器,默认的情况是不知道的
两种解决方式:
- AMQP实现了事务机制
- Confirm模式
事务机制
TxSelect
j用户将当前channel设置成transaction模式
TxCommit
用于提交事务
txRollback
回滚事务
生产者

缺点 因为会增加请求次数,所以会减少吞吐量
Confirm模式
生产者端Confirm模式的实现原理

confirm模式最大的好处在于他是异步
的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时
继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack
消息,生产者应用程序同样可以在回调方法中处理该nack消息;
参考:https://blog.csdn.net/hzw19920329/article/details/54340711
普通模式(串行)
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectiionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();//设置成confirm模式
String msg = "hello confirm";
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println(" 发送完毕 "+msg);
if(!channel.waitForConfirms()){//单条确认一次
System.out.println("send failer");
}else{
System.out.println("send successed");
}
channel.close();
connection.close();
}
批量模式
失败的话,也是这一批都没了
String msg = "hello confirm";
for(int i=0;i<10;i++){
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println(" 发送完毕 "+msg);
}
if(!channel.waitForConfirms()){//发送多条后确认一次
System.out.println("send failer");
}else{
System.out.println("send successed");
}
从这点我们看出broker端默认情况下是不进行批量回复的,并不是针对每条消息都发送一条ack消息;
异步
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag + 1L).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
if (multiple) {
confirmSet.headSet(deliveryTag + 1L).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
});
for(int i=0;i<5;i++){
long nextSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_BASIC, (" Confirm模式, 第" + (i + 1) + "条消息").getBytes());
confirmSet.add(nextSeqNo);
}
目前没搞明白这么写的原理