RabbitMQ急速入门-笔记3-web15672控制台Exch

2019-07-06  本文已影响0人  牵手生活

创建Queue队列(QUEUE_ORDER)

创建Order队列 创建Order队列后

创建Exchange(EXCHANGE_ORDER)

创建order交换机 创建order交换机之后

交互性类型介绍


交互性类型

队列详情介绍

image.png

手动绑定(只有绑定后才可以发送消息)

可以在exchange交换机界面绑定 也可以在queue队列界面绑定

order.wxid_on8oksh88zo22
模糊匹配单个词(可以支持一个点)
order.

模拟匹配多个词。可以支持多个点如(order.wxid_on8oksh88zo22.abc123)

order.#

手动绑定

绑定后可以看绑定关系-路由规则


绑定后

queue中看到的绑定关系

queue中看到的绑定关系

消息的生产者MqMsgProducer


/**
 * 消息的生产者
 */
@Component
public class MqMsgProducer implements RabbitTemplate.ConfirmCallback {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    //由于rabbitTemplate的scope属性设置为ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自动注入
    private RabbitTemplate rabbitTemplate;
    /**
     * 构造方法注入rabbitTemplate
     */
    @Autowired
    public MqMsgProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容
    }

    public void sendMsg(String content) {
        //消息ID
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); //
        //把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列A
        logger.info("rabbitMq 生产者消息  内容content = {} CorrelationData= {}", content,correlationId);
        rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_TASK, RabbitMqConfig.ROUTINGKEY_TASK, content, correlationId);
    }

    /**
     * 根据消息对象发送消息
     * @param order
     */
    public void sendOrder(Order order) {
        //消息ID
        CorrelationData correlationId = new CorrelationData(order.getId()); //
        //把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列A
        logger.info("rabbitMq 生产者消息  内容Order = {} CorrelationData= {}", order,correlationId);
        rabbitTemplate.convertAndSend(
                RabbitMqConfig.EXCHANGE_ORDER
                , RabbitMqConfig.ROUTINGKEY_ORDER   //路由key
                , order  //消息体内容  --rabbbitmq会自动帮我们做系列化
                , correlationId);
    }



    /**
     * 回调--应该是发送确认--callback是异步的--确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
     * RabbitMQ(四)消息确认(发送确认,接收确认)
     * https://blog.csdn.net/qq315737546/article/details/54176560
     * RabbitMQ:消息发送确认 与 消息接收确认(ACK)
     * https://www.jianshu.com/p/2c5eebfd0e95
     * @param correlationData  消息唯一标志
     * @param ack 如果消息没有到exchange,则confirm回调,ack=false 如果消息到达exchange,则confirm回调,ack=true
     * @param cause 失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        logger.info(" 回调id:" + correlationData +"cause = {}",cause);
        if (ack) {
            logger.info("收到回调,成功发送到broker");
        } else {
            logger.info("收到回调,失败发送到broker:" + cause);
        }
    }
}

运行单元测试sendOrderTest

    @Test
    public void sendOrderTest() throws InterruptedException {

        Order order = new Order();
        order.setId("20190706_000001");
        order.setName("牵手订单00001");
        order.setMessgeId(System.currentTimeMillis()+"$"+ UUID.randomUUID().toString());  //保障是唯一的

        mqMsgProducer.sendOrder(order);


    }
image.png

通过控制台查看

看到消息,点击进去看看


看到消息

get message 获取消息--是base64位编码


image.png

把base64的消息内容通过解密

image.png

删除绑定

image.png

删除队列&清空队列

image.png

创建消费者MqMsgReceiverOrder

@RabbitListener注解可以自动帮我们创建队列、路由、

@Component
public class MqMsgReceiverOrder {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());



    /**
     *
     * @param order 其中@Payload表示消息体的类似是Order
     * @param headers
     * @param channel 消费者手动签收时,依赖消息通道Channel
     * @throws Exception
     */

    //@RabbitListener注解可以自动帮我们创建队列、路由、
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = RabbitMqConfig.QUEUE_TASK,durable = "true") //第二2参数是是否持久化
            ,exchange = @Exchange(name = RabbitMqConfig.EXCHANGE_ORDER,durable = "true",type = "topic")
            ,key = RabbitMqConfig.ROUTINGKEY_ORDER
            )
    )


    @RabbitHandler
    public void onOrderMessage(@Payload Order order,
                               @Headers Map<String,Object> headers,
                               Channel channel) throws Exception{

        logger.info("收到消费消息,模拟业务操作。订单id={}",order.getId());
        headers.get(AmqpHeaders.DELIVERY_TAG);


        //处理手动必须
        Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag,false);//第2个参数是否支持批量接收

        channel.basicAck(deliveryTag,false);  //第二2参数时手动签收



    }


}

image.png
上一篇下一篇

猜你喜欢

热点阅读