rabbitmq3-剖析源码讲解简单队列
一、简单队列的模型:
对文中有些术语不懂的,或者简称的不懂的,建议读读前面的系列的文章,因为有很多的概念在里面不可能每一篇博文都去讲这个,这样博客也就会太臃肿,这一系列的博文是博主在整体的学习了一遍之后自我系统化总结的,希望能帮助初学者有一个系统化的学习依据。
image.png
很多时候我并没有想到这个模式的应用场景,的确这个模式真的是有点太简单了,但是最近通过别人所提出的问题的时候我发现在某些场景下这个模式也是必不可缺的。
首先rabbitmq不提供顺序消息,但是通过某些方式我们可以做到,例如简单队列,在不设置优先队列和延迟队列的情况是有可能实现的(注意是有可能,因为这个条件是比较严苛的,需要单线程的P和单线程的C)
总之这篇博文中我们先开个头讲讲各个方法的参数以及他们的意义。
二、具体的代码的实现:
1、先创建连接工具类
创建一个连接的工具类,在这里大家可以回忆一下jdbc的连接的过程,以及jdbc的连接池,大体的思想是一致的不过rabbitmq的客户端采用的机制不一样,这里有一篇博文分析的非常的不错提供给大家。RabbitMQ客户连接池的实现。后面博文的代码都会基于这个来写。
public class ConnUtils {
public static Connection getConn() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
// 设置rabbitmq的服务器地址
factory.setHost("192.168.0.200");
// 设置rabbitmq的用户名和密码,默认都是guest,但是在前面的[rabbitmq1-概述及其使用docker安装](https://blog.csdn.net/weixin_42849915/article/details/81977968)中有截图讲如何设置这个东西,不设置也是可以的,但是还是建议大家设并设置权限
factory.setUsername("fkxuexi");
factory.setPassword("fkxuexi");
factory.setVirtualHost("spring_cloud");
factory.setPort(AMQP.PROTOCOL.PORT);
Connection connection = factory.newConnection();
return connection;
}
}
其中大家可以关注一下ConnectionFactory ,从下面的代码看ConnectionFactory或许是维护了一个Channel的连接池,这里博主没有深究过,因为大体的原理都是差不多的,无非就维护一个列表然后通过加锁和线程通信来实现,如果大家有兴趣可以去深究一下jdbc的连接池和这个的区别。
public static final int DEFAULT_CHANNEL_MAX = 0;
private int requestedChannelMax = DEFAULT_CHANNEL_MAX; 默认的channel的连接数,默认的为0在以后的配置中注意下
2、创建P:
这个里面有大量对应参数的注释,以及对应生产端的应答以保证可靠性的操作,同时也有部分的源码的注释分析
/**
* rabbitmq 是3.7.7的版本
*/
public class Producer {
/**
* 在这里一步小心看到一到面试题,routing_key 和 binding_key 的最大长度是多少?
* 同样也是255
*/
// 设置queue的名称,注意这个名字的长度是有限制的 if(queue.length() > 255) 是要小于255的
public static final String QUEUE_NAME = "simple_worker";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取一个连接
Connection conn = ConnUtils.getConn();
// 创建一个channel,和exchange进行打交道的一直是这个货
Channel channel = conn.createChannel();
// 开启持久化
boolean durable = true ;
// 关闭排他,如果这个地方为true的话将会开启一个独占队列,只对首次申明他的连接可以,且在连接断开时自动删除
boolean exclusive = false;
/**
* true if we are declaring an autodelete queue (server will delete it when no longer in use),
* 当这个队列长时间没有使用的话将会被删除
*/
boolean autoDelete = false;
/**
* 最后一个参数是 Map<String, Object> arguments 这个参数我们在这里先不做说明,后面说道rabbitmq的高级特性的时候会说道,
* 可以提前预告一下,可以设置哪些东西:①:优先级队列;②:延迟队列……
*
* 这里有一个疑问:我们每次重复运行这条语句,会不会把队列给覆盖了呀,这里是不会的我们看一下相关代码
* talk is cheap,show me the code -- linus Torvalds
* void recordQueue(AMQP.Queue.DeclareOk ok, RecordedQueue q) {
* this.recordedQueues.put(ok.getQueue(), q);
* }
* private final Map<String, RecordedQueue> recordedQueues = Collections.synchronizedMap(new LinkedHashMap<String, RecordedQueue>());
* 这是一个加了锁的map,现在不用担心重复的申明队列了吧
*/
channel.queueDeclare(QUEUE_NAME,durable,exclusive,autoDelete,null);
/**
* rabbitmq的可靠性的一个实现方式,消息发送到达broker的可靠性的实现
* 生产者的应答 这里我们不讲事务模式,开启事务模式的话,性能将会降低250倍
* http://www.rabbitmq.com/confirms.html#publisher-confirms-ordering
*In most cases, RabbitMQ will acknowledge messages to publishers in the same order they were published
* (this applies for messages published on a single channel). However, publisher acknowledgements
* are emitted asynchronously and can confirm a single message or a group of messages. The exact moment
* when a confirm is emitted depends on the delivery mode of a message (persistent vs. transient) and
* the properties of the queue(s) the message was routed to (see above). Which is to say that different
* messages can be considered ready for acknowledgement at different times. This means that acknowledgements
* can arrive in a different order compared to their respective messages. Applications should not depend
* on the order of acknowledgements when possible.
* 上面的一段话的意思是:大多数情况下,mq将以与发布时间相同的顺序进行确认,但是这适用于单频道上发布的消息,但是
* 不同的消息可以在不同的时刻进行确认,所以应用程序不应该尽可能的依赖预确认的顺序
*/
channel.confirmSelect();// 将channel 置为confirm 模式
/**
* When in confirm mode, returns the sequence number of the next message to be published.
* 当时confirm模式的时候,我们可以拿到发送的消息的序列号
* 其实这个就是deliverTag
* if (nextPublishSeqNo > 0) {
* // 在这里维护了一个未应答的set,维护未应答消息的状态以及生成SeqNo(deliveryTay)
* unconfirmedSet.add(getNextPublishSeqNo());
* nextPublishSeqNo++;
* }
* 上面也说过了,异步的confirm,响应的顺序并不一定是严格的按照消息的投递的顺序的,同时如果消息长时间没有响应,
* 也可能是消息没有投递到,这个时候我们就可以在内存中维护一份消息id的状态表,当然这个表肯定不会太大,太大则意味着这要么你的
* 系统的mq的吞吐量不行,要么网络延迟大,系统都这样了,多了也就不提了。
*
* 当然这样会造成消息的重复消费,在我的前一篇的博客中关于可靠性的分析当中,我提到了就算这边没有消息的重复投递,在C
* 端依旧是有可能造成消息的重复的,因为存在几率C在消费了消息之后,发送ack的工程中网络中断,那么这个消息将会被重入队列
* 在requeue为true的情况下
*/
long nextPublishSeqNo = channel.getNextPublishSeqNo();
System.out.println("我倒要看看你和deliverTay到底是不是同一个家伙,nextPublishSeqNo = "+nextPublishSeqNo);
String msg = "我就一个测试消息,你想咋地";
// 这个地方我们还没有使用到路由,下面我们会一一说明五种模式中的几种路由的使用的方法
String exchange = "";
// routing key 我们直接指定为队列的名字
/**顺便提一提,这rabbitmq java客户端没有注释这是真蛋疼
* {@link com.rabbitmq.client.AMQP.BasicProperties} 具体可以查看这个里面的设置
*/
channel.basicPublish(exchange,QUEUE_NAME,null,msg.getBytes());
channel.addConfirmListener(new ConfirmListener() {
/**
* 第一个参数是在同一个channel中是唯一的,防止重复投递
* @param deliveryTag
* @param multiple 如果为true则表名小于当前的deliverTag的都被确认
* @throws IOException
*/
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("消息已经送达到broker deliverTay:"+ deliveryTag );
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.err.println("这个在broken异常,或者无法投递消息时出现 deliverTay:"+ deliveryTag );
}
});
// 这里我们休眠5秒钟否则当channel和connection关闭后,无法接受到应答的消息
Thread.sleep(5000);
channel.close();
conn.close();
}
2、创建C:
这个里面也有对于消费者端的应答以保证消息的可靠性,同时对于消息的重复消费问题的解答
/**
* 注意这个地方不能如果叫Consumer的话,会和com.rabbitmq.client.Consumer冲突,所以这个地方我们叫consume
*/
public class Consume {
/**
* 在这里一步小心看到一到面试题,routing_key 和 binding_key 的最大长度是多少?
* 同样也是255
*/
// 设置queue的名称,注意这个名字的长度是有限制的 if(queue.length() > 255) 是要小于255的
public static final String QUEUE_NAME = "simple_worker";
public static void main(String[] args) throws IOException, TimeoutException {
// 这个里面我就不写那么多的注释了
Connection conn = ConnUtils.getConn();
final Channel channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
/**
* Called when a <code><b>basic.deliver</b></code> is received for this consumer.
* 即处理投递过来的消息的
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 这个是和我们在productor中获取的是一样的
long deliveryTag = envelope.getDeliveryTag();
/**
* TODO 假装这个地方是我们的逻辑处理 ,
*
* doSomething();
*/
String msg = new String(body,"utf-8");
System.out.println("我是消息,这个地方我们获取到了消息并不代表,我们进行了应答,消息为:"+msg);
/**
* 这里我们分别测试 成功应答和拒绝应答,我们通过rabbitmq的web控制台来查看
*
* 注意:rabbitmq的消费端的可靠性的保证,1、当consumer挂掉了(channel断掉)则将消息重回队列并投递给其他的消费者
* 但是这里面是有可能造成重复的消费的,
* 假若我们考虑这种场景:当consumer1在处理完逻辑之后,发送应答由于网络中断,这个应答并没有到达broker那么channel
* 断开,所以这个时候消息会重回队列,会被投递给其他的消费者进行消费,所以这个时候的机制我们仍然可以维护一份消息id的
* 状态表,同理这个表依旧不可能太大
*/
/**
* multiple 如果为true的话,则代表只要deliverTag小于当前的一律都被确认,deliveryTay是在同一个channel是主键递增的
* 如果为false的话,那么则代表只确定当前的
*/
boolean multiple = false;
// 1、成功应答
channel.basicAck(deliveryTag,false);
}
};
/**
* 这个地方我们关闭自动应答,自动应答模式:如果消息投递到了,不管你consumer 处理是否完成,则broker任务消息已经被消费了,然后
* 就会删除消息,所以这里我们开启手动应答,这又这样我们在handleDelivery中的应答才能生效
*/
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}
三、指标观察:
1、运行生产者是broker的状态
image.png我们可以看到,nextPublishSeqNo和deliveryTay是同一个东西
image.png
Features 中的D:则代表durable持久化的意思,Ready则是代表消息等待表消费,Unacked则表明有多少条消息是已经被投递,但是没有应答的,后面我们在应答哪里做休眠来测试。
2、运行消费者进行应答:
为了过程更直观,在上面的程序中加入下列语句:
// 1、成功应答
try {
Thread.sleep(60000);//休眠一分钟,来观察控制台的指标有什么不同
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(deliveryTag,false);
image.png
image.png
image.png
从上图可以看到消息已经被消费。其中经历了从 Ready -- Unacked 到最中的被消费然后broker完全的删除掉
3、下面查看消费者进行否定应答
依旧需要修改下列的代码
// 1、成功应答
try {
Thread.sleep(60000);//休眠一分钟,来观察控制台的指标有什么不同
} catch (InterruptedException e) {
e.printStackTrace();
}
// 这个参数可以使得,被拒绝的消息重回队列
boolean requeue = true;
channel.basicNack(deliveryTag,false,requeue);
image.png
这里不管你等多久,一直会保持这个状态,原因是只有一个消费者,那么当消息被拒绝后立即又会被投递给这个消费者,一直会持久Unacked的状态,从下面的打印情况可以佐证我们上面的说法。
image.png
正常的状态下应该回归到Ready的状态的,现在我们关闭消费者,查看状态,当我们关闭消费者后,这个消息就重回队列了。当然我们也可以设置requeue为false进行测试,这个就有大家自行测试了。
image.png
ps:如果控制台有些不太会用的话,可以查看一下我的第一篇博文rabbitmq1-概述及其使用docker安装里面有一点点的小介绍,后面会专门写一关于控制台的使用的博文。