第四章-高级整合应用- AMQP

2019-04-06  本文已影响0人  yanghx

1.RabbitMQ整合Spring AMQP

image.png

注意

image.png
image.png

消息模板 RabbitTemplate

RabbitTemplate 即消息模板

简单消息监听容器:SimpleMessageListenerContainer

注意:

SimpleMessageListenerContainer为什么可以动态感知配置变更?

配置代码


    /**
     * 简单消息监听容器
     * 配置完成后。可以在管控台看到消息者信息。 以及消费者标签信息
     *
     * @param connectionFactory 链接工厂
     * @return SimpleMessageListenerContainer
     */
    @Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        //设置要监听的队列
        simpleMessageListenerContainer.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
        //初始化消费者数量
        simpleMessageListenerContainer.setConcurrentConsumers(1);
        //最大消费者数量
        simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
        //设置是否重回队列[一般为false]
        simpleMessageListenerContainer.setDefaultRequeueRejected(false);
        //设置自动ack
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //设置channel 是否外露
         simpleMessageListenerContainer.setExposeListenerChannel(true);
        //设置消费端标签的策略
        simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queueName) {
                return queueName + "_" + UUID.randomUUID().toString();
            }
        });
        //设置消息监听 ChannelAwareMessageListener
        simpleMessageListenerContainer.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                String msg = new String(message.getBody());
                System.out.println("----------消费者: " + msg);
            }
        });

        return simpleMessageListenerContainer;


    }

连接信息

image.png

消费者信息和相关配置

image.png

消息监听适配器:MessageListenerAdapter


/**
 * 通过`simpleMessageListenerContainer` 配置消息监听适配器。 指向这个类
 *
 * @author yangHX
 * createTime  2019/4/6 12:16
 */
public class MessageDelegate {


    /**
     * MessageListenerAdapter 默认指定接收消息的方法的名字就是 handleMessage .当然也可以手动设置
     *
     * @param messageBody message信息
     */
    public void handleMessage(byte[] messageBody) {
        System.err.println("默认方法,消息内容: " + new String(messageBody));
    }

    public void consumeMessage(byte[] messageBody) {
        System.err.println("字节数组方法, 消息内容:" + new String(messageBody));
    }

    public void consumeMessage(String messageBody) {
        System.err.println("字符串方法, 消息内容:" + messageBody);
    }

}





/**
 * spring amqp 消息转换器
 *
 * @author yangHX
 * createTime  2019/4/6 12:28
 */
public class TextMessageConverter implements MessageConverter {


    /**
     * 将数据转化为 message 类
     *
     * @param o                 要发送的数据
     * @param messageProperties 消息头
     * @return Message
     * @throws MessageConversionException ex
     */
    @Override
    public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
        return new Message(o.toString().getBytes(), messageProperties);
    }

    /**
     * 将message转换为想要的数据类型
     *
     * @param message message
     * @return Object
     * @throws MessageConversionException ex
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {

        String contentType = message.getMessageProperties().getContentType();
        if (null != contentType && contentType.contains("text")) {
            return new String(message.getBody());
        }
        return message.getBody();
    }
}






        ///消息监听适配器 只截取了一小段
        /*
         * 适配器方式。 默认是有自己的方法名字。 handleMessage
         *  可以自己指定一个方法的名称。 consumerMessage
         *  也可以添加一个转换器: 从字节数组转换为String
         */
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageDelegate());
        messageListenerAdapter.setDefaultListenerMethod("consumeMessage");
        messageListenerAdapter.setMessageConverter(new TextMessageConverter());
        simpleMessageListenerContainer.setMessageListener(messageListenerAdapter);


        return simpleMessageListenerContainer;





 /**
     * 发送消息。测试转换器和适配器
     * <p>
     * 转换器判断contentType 将字节数组转化为字符串
     * 适配器将数据交给 MessageDelegate 的 consumeMessage 方法进行处理
     */
    @Test
    public void testMessage4Text() {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("text/plan");
        Message message = new Message("mq 消息1234".getBytes(), messageProperties);
        rabbitTemplate.send("topic001", "spring.abc", message);
        rabbitTemplate.send("topic002", "rabbit.abc", message);
    }



image.png

MessageListenerAdapter 消息监听适配器总结

MessageConverter 消息转换器

转换器类型

SpringBoot 整合RabbitMQ


    /**
     * 回调函数 confirm确认模式
     */
    final ConfirmCallback confirmCallback = new ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {

            System.err.println("correlationData: " + correlationData);
            System.err.println("ack: " + ack);
            if (!ack) {
                System.out.println("-----异常处理");
            }
        }
    };

    final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.err.println("return exchange : " + exchange + " , routingKey : " + routingKey + " , replyCode: " + replyCode + " , replyText: " + replyText);
        }
    };




SpringBoot 整合RabbitMQ 消费端

@RabbitListener注解使用

image.png

SpringCloud Stream 整合

image.png image.png image.png

-这个原因是因为SpringCloudStream框架为了和kafka兼顾所以在实际工作中使用它的目的就是针对高性能的消息通信的。 这点就是当前版本的Spring Cloud Stream 的定位

image.png
上一篇下一篇

猜你喜欢

热点阅读