老男孩的成长之路Java成长之路

springboot 集成rabbitmq 并采用ack模式 以

2020-06-17  本文已影响0人  路人甲java

rabbitmq 是spring所在公司Pivotal自己的产品 是基于AMQP高级队列协议的消息中间件 采用erlang开发 因此安装需要erlang环境 具体安装根据自己的环境 因为跟spring有共同的血缘关系 所以spring 全家桶对其的支持应该是相当完善的

一般消息队列 都是生产者将消息发送到队列 消费者监听队列进行消费 rabbitmq 一个虚拟主机(默认 /)持有一个或者多个交换机(Exchange) 用户只能在虚拟主机的粒度进行权限控制 交换机根据一定的策略(RoutingKey)绑定(Binding)到队列(Queue)上 这样生产者和队列就没有直接联系 而是将消息发送的交换机 交换机再把消息转发到对应绑定的队列上 此处需要详细熟悉rabbitmq的工作流程 不清楚可以找相关资料进行学习

上面说了 Exchange 作为rabbitmq的一个独特的重要的概念 这里有必要着重强调一下 我们从 spring对rabbitmq的封装来解读一下这个东西

package org.springframework.amqp.core;

/**
 * Constants for the standard Exchange type names.
 *
 * @author Mark Fisher
 * @author Gary Russell
 */
public abstract class ExchangeTypes {

    public static final String DIRECT = "direct";

    public static final String TOPIC = "topic";

    public static final String FANOUT = "fanout";

    public static final String HEADERS = "headers";

    public static final String SYSTEM = "system";

    /**
     * The constant to represent {@code x-delayed-message} exchange mode.
     * @deprecated since 1.6.4, it's not a user-available exchange type,
     * the delayed {@code boolean} is used for that.
     */
    @Deprecated
    public static final String DELAYED = "x-delayed-message";
}

上面是 交换机类型的定义类 说明了6种交换机类型 最后一种因为即将弃用 所以是五种 我们常用的有四种 下面这个建造类说明了一切

package org.springframework.amqp.core;

import java.util.Map;

/**
 * Builder providing a fluent API for building {@link Exchange}s.
 *
 * @author Gary Russell
 * @since 1.6
 *
 */
public final class ExchangeBuilder extends AbstractBuilder {

    private final String name;

    private final String type;

    private boolean durable;

    private boolean autoDelete;

    private boolean internal;

    private boolean delayed;

    /**
     * Construct an instance of the appropriate type.
     * @param name the exchange name
     * @param type the type name
     * @see ExchangeTypes
     * @since 1.6.7
     */
    public ExchangeBuilder(String name, String type) {
        this.name = name;
        this.type = type;
    }

    /**
     * Return a {@link DirectExchange} builder.
     * @param name the name.
     * @return the builder.
     */
    public static ExchangeBuilder directExchange(String name) {
        return new ExchangeBuilder(name, ExchangeTypes.DIRECT);
    }

    /**
     * Return a {@link TopicExchange} builder.
     * @param name the name.
     * @return the builder.
     */
    public static ExchangeBuilder topicExchange(String name) {
        return new ExchangeBuilder(name, ExchangeTypes.TOPIC);
    }

    /**
     * Return a {@link FanoutExchange} builder.
     * @param name the name.
     * @return the builder.
     */
    public static ExchangeBuilder fanoutExchange(String name) {
        return new ExchangeBuilder(name, ExchangeTypes.FANOUT);
    }

    /**
     * Return a {@link HeadersExchange} builder.
     * @param name the name.
     * @return the builder.
     */
    public static ExchangeBuilder headersExchange(String name) {
        return new ExchangeBuilder(name, ExchangeTypes.HEADERS);
    }

    /**
     * Set the auto delete flag.
     * @return the builder.
     */
    public ExchangeBuilder autoDelete() {
        this.autoDelete = true;
        return this;
    }

    /**
     * Set the durable flag to true.
     * @return the builder.
     * @deprecated - in 2.0, durable will be true by default
     * @see #durable(boolean)
     */
    @Deprecated
    public ExchangeBuilder durable() {
        this.durable = true;
        return this;
    }

    /**
     * Set the durable flag.
     * @param durable the durable flag (default false).
     * @return the builder.
     * @see #durable
     */
    public ExchangeBuilder durable(boolean durable) {
        this.durable = durable;
        return this;
    }

    /**
     * Add an argument.
     * @param key the argument key.
     * @param value the argument value.
     * @return the builder.
     */
    public ExchangeBuilder withArgument(String key, Object value) {
        getOrCreateArguments().put(key, value);
        return this;
    }

    /**
     * Add the arguments.
     * @param arguments the arguments map.
     * @return the builder.
     */
    public ExchangeBuilder withArguments(Map<String, Object> arguments) {
        this.getOrCreateArguments().putAll(arguments);
        return this;
    }

    /**
     * Set the internal flag.
     * @return the builder.
     */
    public ExchangeBuilder internal() {
        this.internal = true;
        return this;
    }

    /**
     * Set the delayed flag.
     * @return the builder.
     */
    public ExchangeBuilder delayed() {
        this.delayed = true;
        return this;
    }

    public Exchange build() {
        AbstractExchange exchange;
        if (ExchangeTypes.DIRECT.equals(this.type)) {
            exchange = new DirectExchange(this.name, this.durable, this.autoDelete, getArguments());
        }
        else if (ExchangeTypes.TOPIC.equals(this.type)) {
            exchange = new TopicExchange(this.name, this.durable, this.autoDelete, getArguments());
        }
        else if (ExchangeTypes.FANOUT.equals(this.type)) {
            exchange = new FanoutExchange(this.name, this.durable, this.autoDelete, getArguments());
        }
        else if (ExchangeTypes.HEADERS.equals(this.type)) {
            exchange = new HeadersExchange(this.name, this.durable, this.autoDelete, getArguments());
        }
        else {
            throw new IllegalStateException("Invalid type: " + this.type);
        }
        exchange.setInternal(this.internal);
        exchange.setDelayed(this.delayed);
        return exchange;
    }

}
image

这四种的说明

  1. Direct: 先策略匹配到对应绑定的队列后 才会被投送到该队列 交换机跟队列必须是精确的对应关系 这种最为简单
  2. Topic: 转发消息主要是根据通配符 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息 这种可以认为是Direct 的灵活版
  3. Headers:也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routingkey , headers 则是一个自定义匹配规则的类型
    在队列与交换器绑定时 会设定一组键值对规则 消息中也包括一组键值对( headers 属性) 当这些键值对有一对 或全部匹配时 消息被投送到对应队列
  4. Fanout : 消息广播模式 不管路由键或者是路由模式 *会把消息发给绑定给它的全部队列 *如果配置了routingkey会被忽略

在熟悉了相关概念后我们开始搞一搞这个东西 首先你要安装好rabbitmq 相关方法资料很多 此处不表 在本机安装好 并启用了管理页面后打开 localhost:15672 会显示一个管理页面 如下 可以进行一些可视化操作

image

新建springboot工程 springboot 版本 1.5.10 依赖如下

    <dependencies>
        <!--amqp rabbitmq 依赖必须 必须-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--springboot单元测试 选-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--springboot健康监控 选-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!--web支持  选-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

application.yml 配置文件 rabbitmq 相关:

spring:
  rabbitmq:
    username: rabbitAdmin
    password: 123456789
#    支持发布确认
    publisher-confirms: true
#    支持发布返回
    publisher-returns: true
    listener:
      simple:
#      采用手动应答
        acknowledge-mode: manual
#        当前监听容器数
        concurrency: 1
#        最大数
        max-concurrency: 1
#        是否支持重试
        retry:
          enabled: true
#        日志配置 
logging:
  config: classpath:logback.xml

定制模版类 声明交换机 队列 绑定交换机到队列

这里 声明了Direct 交换机 并通过路由键绑定到一个队列中 来测试Direct模式

    声明了Fanout交换机  并绑定到2个队列    来测试广播模式
package cn.felord.message.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

/**
 * 队列配置.
 *
 * @author dax.
 * @version v1.0
 * @since 2018 /2/23 14:28
 */
@Configuration
public class RabbitConfig {
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 定制化amqp模版      可根据需要定制多个
     * 
     * 
     * 此处为模版类定义 Jackson消息转换器
     * ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调   即消息发送到exchange  ack
     * ReturnCallback接口用于实现消息发送到RabbitMQ 交换器,但无相应队列与交换器绑定时的回调  即消息发送不到任何一个队列中  ack
     *
     * @return the amqp template
     */
//    @Primary
    @Bean
    public AmqpTemplate amqpTemplate() {
        Logger log = LoggerFactory.getLogger(RabbitTemplate.class);
//          使用jackson 消息转换器
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.setEncoding("UTF-8");
//        开启returncallback     yml 需要 配置    publisher-returns: true
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            String correlationId = message.getMessageProperties().getCorrelationIdString();
            log.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
        });
        //        消息确认  yml 需要配置   publisher-returns: true
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.debug("消息发送到exchange成功,id: {}", correlationData.getId());
            } else {
                log.debug("消息发送到exchange失败,原因: {}", cause);
            }
        });
        return rabbitTemplate;
    }

    /* ----------------------------------------------------------------------------Direct exchange test--------------------------------------------------------------------------- */

    /**
     * 声明Direct交换机 支持持久化.
     *
     * @return the exchange
     */
    @Bean("directExchange")
    public Exchange directExchange() {
        return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build();
    }

    /**
     * 声明一个队列 支持持久化.
     *
     * @return the queue
     */
    @Bean("directQueue")
    public Queue directQueue() {
        return QueueBuilder.durable("DIRECT_QUEUE").build();
    }

    /**
     * 通过绑定键 将指定队列绑定到一个指定的交换机 .
     *
     * @param queue    the queue
     * @param exchange the exchange
     * @return the binding
     */
    @Bean
    public Binding directBinding(@Qualifier("directQueue") Queue queue, @Qualifier("directExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY").noargs();
    }

    /* ----------------------------------------------------------------------------Fanout exchange test--------------------------------------------------------------------------- */

    /**
     * 声明 fanout 交换机.
     *
     * @return the exchange
     */
    @Bean("fanoutExchange")
    public FanoutExchange fanoutExchange() {
        return (FanoutExchange) ExchangeBuilder.fanoutExchange("FANOUT_EXCHANGE").durable(true).build();
    }

    /**
     * Fanout queue A.
     *
     * @return the queue
     */
    @Bean("fanoutQueueA")
    public Queue fanoutQueueA() {
        return QueueBuilder.durable("FANOUT_QUEUE_A").build();
    }

    /**
     * Fanout queue B .
     *
     * @return the queue
     */
    @Bean("fanoutQueueB")
    public Queue fanoutQueueB() {
        return QueueBuilder.durable("FANOUT_QUEUE_B").build();
    }

    /**
     * 绑定队列A 到Fanout 交换机.
     *
     * @param queue          the queue
     * @param fanoutExchange the fanout exchange
     * @return the binding
     */
    @Bean
    public Binding bindingA(@Qualifier("fanoutQueueA") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    /**
     * 绑定队列B 到Fanout 交换机.
     *
     * @param queue          the queue
     * @param fanoutExchange the fanout exchange
     * @return the binding
     */
    @Bean
    public Binding bindingB(@Qualifier("fanoutQueueB") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
}

编写监听器 来监听队列消息

package cn.felord.message.comsumer;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * 监听器.
 *
 * @author dax.
 * @version v1.0
 * @since 2018 /2/24 9:36
 */
@Component
public class Receiver {
    private static final Logger log= LoggerFactory.getLogger(Receiver.class);
    /**
     * FANOUT广播队列监听一.
     *
     * @param message the message
     * @param channel the channel
     * @throws IOException the io exception  这里异常需要处理
     */
    @RabbitListener(queues = {"FANOUT_QUEUE_A"})
    public void on(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.debug("FANOUT_QUEUE_A "+new String(message.getBody()));
    }

    /**
     * FANOUT广播队列监听二.
     *
     * @param message the message
     * @param channel the channel
     * @throws IOException the io exception   这里异常需要处理
     */
    @RabbitListener(queues = {"FANOUT_QUEUE_B"})
    public void t(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.debug("FANOUT_QUEUE_B "+new String(message.getBody()));
    }

    /**
     * DIRECT模式.
     *
     * @param message the message
     * @param channel the channel
     * @throws IOException the io exception  这里异常需要处理
     */
    @RabbitListener(queues = {"DIRECT_QUEUE"})
    public void message(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.debug("DIRECT "+new String (message.getBody()));
    }
}

编写 发送消息接口 来进行测试

package cn.felord.message.controller;

import cn.felord.message.bean.ResponseEntity;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * 消息接口.
 *
 * @author dax.
 * @version v1.0
 * @since 2018 /2/23 17:27
 */
@RestController
@RequestMapping("/rabbit")
public class SendController {
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     *  测试广播模式.
     *
     * @param p the p
     * @return the response entity
     */
    @RequestMapping("/fanout")
    public ResponseEntity send(String p) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "", p, correlationData);
        return ResponseEntity.ok();
    }

    /**
     *  测试Direct模式.
     *
     * @param p the p
     * @return the response entity
     */
    @RequestMapping("/direct")
    public ResponseEntity direct(String p) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY", p, correlationData);
        return ResponseEntity.ok();
    }
}

测试广播模式

image

控制台输出

image

同样 自己可以测试Direct模式 可以打开rabbitmq控制台进行追踪 相关运行信息

上一篇下一篇

猜你喜欢

热点阅读