Java 杂谈计算机杂谈

2019-08-26/SpringBoot 之RabbitMQ

2019-08-26  本文已影响0人  呼噜噜睡

rabbitmq似乎用的挺多的,在消息可靠性要求比较高的项目上,首选它。为什么是它,我也不太清楚,如果有知道的小伙伴,要告诉我哟~
好,进入正题,首先安装rabbitmq,这就能够折腾你半天的。一般有三种方式:
一、linux或者linux虚拟机上安装rabbitmq,那么首先你要安装erlang。对于下载,安装,配置,让你怀疑人生。具体怎么怀疑人生,百度吧。
二、使用docker安装。windows7 使用docker toolbox 安装docker,如果你能顺利安装docker,后面畅通无阻。难就难在顺利地安装docker。如果你是win10 专业版,那么在docker官网下载docker安装包,妥妥的搞定,爽哉。如果你是win10家庭版,对不起,我没有听清,请再说一遍。
三、使用windows版的rabbitmq。具体参考https://www.cnblogs.com/ericli-ericli/p/5902270.html

好的,经过愉快的安装之旅,我们假设你已经成功安装了RabbitMQ了。接下来搭建一个springboot项目,不懂的同学可以上spring官网。引入rabbitmq pom依赖,完整pom如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.wjp</groupId>
    <artifactId>rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--
            The sender/receiver classes use org.springframework.amqp.* (RabbitTemplate,
            @RabbitListener), which the bare com.rabbitmq:amqp-client artifact does not provide.
            The AMQP starter brings in Spring AMQP and the RabbitMQ Java client
            (com.rabbitmq.client.Channel) transitively, with versions managed by the Boot parent.
        -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

application.properties的配置:

# Broker connection: address, credentials, virtual host and connection timeout
spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=wjp
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
# Producer config: confirms/returns are switched on so the ConfirmCallback and
# ReturnCallback registered by the sender get invoked
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
# Consumer config: manual acknowledgement (the listener calls channel.basicAck itself),
# prefetch of 1, and both concurrency and max-concurrency set to 1
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=1
spring.rabbitmq.listener.simple.concurrency=1
spring.rabbitmq.listener.simple.max-concurrency=1

SpringBoot提供了一个现成的RabbitTemplate bean供我们使用,发送方代码:

package com.example.demo.amqp;

import java.util.Map;
import java.util.UUID;

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.IdGenerator;


/**
 * Publishes messages to the {@code test_queue} routing key on the default exchange
 * through the injected {@link RabbitTemplate}, with publisher-confirm and
 * return callbacks attached.
 */
@Component
public class RabbitSender {

    // RabbitTemplate injected by Spring
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Confirm callback: invoked with the broker's ack/nack for a published message.
    // On a nack, the correlation data and cause are reported so the failed message can be identified.
    final ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {
        if (!ack) {
            System.err.println("生产端---异常处理...."
                + " correlationData: " + correlationData + ", cause: " + cause);
        }
    };

    // Return callback: invoked when a message is returned by the broker; logs the routing details.
    final ReturnCallback returnCallback = (message, replyCode, replyText, exchange, routingKey) ->
        System.err.println("生产端---return exchange: " + exchange + ", routingKey: "
            + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);

    /**
     * Wraps the payload and headers in a messaging {@link Message} and publishes it.
     *
     * @param message    payload to send
     * @param properties header values for the message; the demo caller passes {@code null}
     * @throws Exception declared for compatibility with existing callers
     */
    public void send(Object message, Map<String, Object> properties) throws Exception {
        MessageHeaders headers = new MessageHeaders(properties);
        Message<Object> msg = MessageBuilder.createMessage(message, headers);
        // The same callback instances are handed to the template on every call.
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        // A unique id per publish lets the confirm callback tell which message was acked or nacked.
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("", "test_queue", msg, correlationData);
    }
}

接收方代码:

package com.example.demo.amqp;

import java.util.Map;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

/**
 * Consumes messages from {@code test_queue} and acknowledges each delivery manually.
 */
@Component
public class RabbitReceiver {

    /**
     * Prints the received payload and acks the delivery on the supplied channel.
     *
     * <p>The {@code @Queue} attribute {@code durable} is declared as a {@code String}
     * in Spring AMQP, so it must be given as {@code "true"} rather than a boolean literal.
     * {@code @RabbitHandler} is omitted because the listener is declared on the method itself.
     *
     * @param msg     message payload
     * @param channel channel the delivery arrived on, used for the manual ack
     * @param headers message headers; the delivery tag is read from {@link AmqpHeaders#DELIVERY_TAG}
     * @throws Exception if the acknowledgement fails
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "test_queue", durable = "true"),
            exchange = @Exchange(value = "amq.direct",
            ignoreDeclarationExceptions = "true"),
            key = "test_queue"
            )
    )
    public void onOrderMessage(@Payload String msg, Channel channel,
            @Headers Map<String, Object> headers) throws Exception {
        System.err.println("消费端----------------------------: " + msg);
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // Manual ack of this single delivery (multiple = false)
        channel.basicAck(deliveryTag, false);
    }
}

直接在启动类上进行测试:

package com.example.demo;

import com.example.demo.amqp.RabbitSender;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

/**
 * Application entry point that doubles as a smoke test for the sender.
 */
@SpringBootApplication
public class DemoApplication {

    /**
     * Starts the Spring context, looks up the {@link RabbitSender} bean and publishes
     * 100 numbered messages, pausing 500 ms after each one.
     *
     * @param args command-line arguments forwarded to Spring Boot
     * @throws Exception if sending fails or the sleep is interrupted
     */
    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(DemoApplication.class, args);
        RabbitSender sender = context.getBean(RabbitSender.class);

        int sent = 0;
        while (sent < 100) {
            sender.send("生产端---发送的消息--" + sent, null);
            Thread.sleep(500);
            sent++;
        }
    }

}

上一篇下一篇

猜你喜欢

热点阅读