使用Spring AMQP

2017-07-26  本文已影响119人  嗷大彬彬

本文使用spring boot作为框架,使用spring framework也可以参考。

前提

安装部署并启动rabbitmq,参见 http://www.jianshu.com/p/9a32dca0c6aa

配置连接

constant.properties:

spring.rabbitmq.addresses=192.168.253.133:5672,192.168.253.134:5672
spring.rabbitmq.username=rabbitmq
spring.rabbitmq.password=rabbitmq
# 每个连接缓存channel的数量
spring.rabbitmq.cachesize=25

配置maven依赖

pom.xml:

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <executions>
                <execution>
                    <goals>
                        <goal>repackage</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

<dependencyManagement>
    <dependencies>
    </dependencies>
</dependencyManagement>

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.4.RELEASE</version>
</parent>

<dependencies>
    <!-- 引入spring amqp -->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
    </dependency>

    <!-- 引入spring boot -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- 引入spring boot单元测试 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency> 
    
</dependencies>

配置

RabbitConfig .java:

@Configuration
public class RabbitConfig {

    @Autowired
    private Environment env;

    /**
     * 连接工厂
     */
    @Bean
    public ConnectionFactory connectionFactory() {

        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(env.getProperty("spring.rabbitmq.addresses"));
        connectionFactory.setUsername(env.getProperty("spring.rabbitmq.username"));
        connectionFactory.setPassword(env.getProperty("spring.rabbitmq.password"));
        // connectionFactory.setCacheMode(CacheMode.CHANNEL);//默认是CHANNEL
        connectionFactory.setChannelCacheSize(
                Integer.parseInt(env.getProperty("spring.rabbitmq.cachesize")));
        // 开启发布者确认
        // connectionFactory.setPublisherConfirms(true);
        // 开启发布者返回
        // connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    /**
     * 自定义管理类,负责管理声明队列、交换机等管理功能,spring amqp默认开启自动声明
     */
    @Bean
    public RabbitAdmin rabbitAdmin() {

        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
        return rabbitAdmin;
    }

    /**
     * 模板类,主要负责发送接收
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {

        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        return rabbitTemplate;
    }

    /**
     * direct类型交换机,只有精确匹配routing key的队列才能收到消息,且消息按一定规则分配,即一条消息只会被其中一个队列接收到
     */
    @Bean
    public Exchange directExchange() {

        return ExchangeBuilder.directExchange("shark.direct").durable(true).build();
    }

    /**
     * fanout类型交换机,类似于广播,所有绑定的队列均收到所有消息
     */
    @Bean
    public Exchange fanoutExchange() {

        return ExchangeBuilder.fanoutExchange("shark.fanout").durable(true).build();
    }

    /**
     * topic类型交换机,与direct类型相似,不过topic类型支持模糊匹配。# *
     */
    @Bean
    public Exchange topicExchange() {

        return ExchangeBuilder.topicExchange("shark.topic").durable(true).build();
    }

    /**
     * 自定义队列1
     */
    @Bean
    public Queue infoQueue() {

        return QueueBuilder.durable("shark.info").build();
    }

    /**
     * 自定义队列2
     */
    @Bean
    public Queue errorQueue() {

        return QueueBuilder.durable("shark.error").build();
    }

    /**
     * 绑定队列1到fanout交换机
     */
    @Bean
    public Binding bindingFanoutAnony() {

        return BindingBuilder.bind(errorQueue()).to(fanoutExchange()).with("").noargs();
    }

    /**
     * 绑定队列2到fanout交换机
     */
    @Bean
    public Binding bindingFanoutAnony2() {

        return BindingBuilder.bind(infoQueue()).to(fanoutExchange()).with("").noargs();
    }

    /**
     * 绑定自定义队列1到direct交换机
     * 
     * @Primary 设置为默认的bean,注入时@Autowired 或未指定名称的@Resource将使用默认的bean
     */
    @Primary
    @Bean
    public Binding bindingDirect1() {

        return BindingBuilder.bind(infoQueue()).to(directExchange()).with("shark.info").noargs();
    }

    /**
     * 绑定自定义队列2到direct交换机
     */
    @Bean
    public Binding bindingDirect2() {

        return BindingBuilder.bind(errorQueue()).to(directExchange()).with("shark.error").noargs();
    }

    /**
     * 绑定自定义队列1到topic交换机
     */
    @Bean
    public Binding bindingTopic1() {

        return BindingBuilder.bind(errorQueue()).to(topicExchange()).with("shark.*").noargs();
    }

    /**
     * 绑定自定义队列2到topic交换机
     */
    @Bean
    public Binding bindingTopic2() {

        return BindingBuilder.bind(infoQueue()).to(topicExchange()).with("shark.*").noargs();
    }

}

启动Spring boot应用

Application.java

@SpringBootApplication(exclude = RabbitAutoConfiguration.class) //排除spring boot自动注入
@EnableAspectJAutoProxy(proxyTargetClass = true) // 激活Aspect自动代理
@PropertySource({ "classpath:/cn/com/ut/config/properties/constant.properties" })
public class Application {
    public static void main(String[] args) {

        SpringApplication application = new SpringApplication(Application.class);
        application.setBannerMode(Banner.Mode.LOG);
        application.run(args);
    }
}

测试

TestSendController.java

@RestController
@RequestMapping(value = "/test")
public class TestSendController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Resource(name = "fanoutExchange")
    private Exchange fanoutExchange;

    @Resource(name = "bindingDirect1")
    private Binding bindingDirect1;

    @Resource(name = "bindingDirect2")
    private Binding bindingDirect2;

    @Resource(name = "topicExchange")
    private Exchange topicExchange;

    /**
     * 广播消息到多个队列,所有队列均接收全部消息
     */
    @GetMapping("/sendFanout")
    public String sendFanout(@RequestParam(value = "msg", required = false) final String msg) {

        new Thread(new Runnable() {
            @Override
            public void run() {

                for (int i = 0; i < 10; i++) {
                    rabbitTemplate.convertAndSend(fanoutExchange.getName(),
                            "msg" + i + " : " + msg);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
        return "success send 10 msg to fanout Exchange!";
    }

    /**
     * 均衡分配消息多个队列,队列收到的消息预其他队列不一样(精确匹配)
     */
    @GetMapping("/sendDirect")
    public String sendDirect(@RequestParam(value = "msg", required = false) final String msg) {

        // rabbitTemplate.convertAndSend(bindingDirect1.getExchange(),
        // bindingDirect1.getRoutingKey(),
        // "msg-1:" + msg);
        // rabbitTemplate.convertAndSend(bindingDirect2.getExchange(),
        // bindingDirect2.getRoutingKey(),
        // "msg-2:" + msg);

        new Thread(new Runnable() {
            @Override
            public void run() {

                for (int i = 0; i < 10; i++) {
                    rabbitTemplate.convertAndSend(bindingDirect1.getExchange(), "shark.info",
                            "msg" + i + " : " + msg);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
        return "success send 10 msg to topic Exchange!";
    }

    /**
     * 均衡分配消息多个队列,队列收到的消息预其他队列不一样(模糊匹配)
     */
    @GetMapping("/sendTopic")
    public String sendTopic(@RequestParam(value = "msg", required = false) final String msg) {

        new Thread(new Runnable() {
            @Override
            public void run() {

                for (int i = 0; i < 10; i++) {
                    rabbitTemplate.convertAndSend(topicExchange.getName(), "shark.info",
                            "msg" + i + " : " + msg);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
        return "success send 10 msg to topic Exchange!";
    }

    /******************** 接收(同步) ****************** */
    @GetMapping("/receive")
    public String receive(@RequestParam(value = "queue") String queueName) {

        String msg = " 收到來自队列 " + queueName + "的消息:"
                + (String) rabbitTemplate.receiveAndConvert(queueName);
        return msg;
    }
}

监听器接收

  1. RabbitConfig .java 添加@EnableRabbit注解和SimpleRabbitListenerContainerFactory 的bean
@Configuration
@EnableRabbit
public class RabbitConfig {
        /**
     * 消息接收监听器容器工厂,需要开启@EnableRabbit注解
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {

        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        return factory;
    }
}
  1. TestSendController.java 添加监听器
/******************** 接收监听器****************** */
@RabbitListener(queues = "#{infoQueue.name}")
public void receive3(String reciveMsg) {

    System.out.println("receive1(订阅了infoQueue) 收到信息:" + reciveMsg);
}

@RabbitListener(queues = "#{errorQueue.name}")
public void receive4(String reciveMsg) {

    System.out.println("receive2(订阅了errorQueue) 收到信息:" + reciveMsg);
}

@RabbitListener(queues = "#{infoQueue.name}")
public void receive5(String reciveMsg) {

    System.out.println("receive3(订阅了infoQueue) 收到信息:" + reciveMsg);
}
上一篇下一篇

猜你喜欢

热点阅读