RabbitMQ总结

2019-01-15  本文已影响0人  wingedsnake

这里只做个人技术笔录,不搞一堆的废话
官网中提示:RabbitMQ实现了多种协议。此处测试使用AMQP 0-9-1协议。使用Java语言(通用Java代码,非Spring AMQP)

RabbitMQ官方文档中介绍了六种工作模式:

点对点模式

即最简单的单项点对点生产者消费者模式。
点对点模式模型(引用自官网)

表示发布者,即消息发送端
队列,即消息缓冲器。RabbitMQ中传输的消息数据存放在队列中
消费者,即消息接收端
消息点对点模式模型图
程序测试流程:
发布者连接到RabbitMQ服务后,声明队列并将消息存入队列中发送并退出。接收端连接RabbitMQ后通过通道从对应的队列中取出消息信息并打印。
发布者测试代码:
ConnectionFactory cFactory = new ConnectionFactory();
cFactory.setHost("localhost");
String message = "你好!我的第一个rabbitMQ 队列应用。";
try(Connection newConnection = cFactory.newConnection();
    Channel createChannel = newConnection.createChannel()) {
    //通过构建的通道声明一个队列
    createChannel.queueDeclare(QUEUE_NAME, false, false, false, null);
    //然后在通道上创建一个发布,将消息通过队列发布上去
    createChannel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
} catch (IOException e) {
    e.printStackTrace();
} catch (TimeoutException e) {
    e.printStackTrace();
}

接收端测试代码:

try {
    ConnectionFactory cFactory = new ConnectionFactory();
    cFactory.setHost("localhost");
    Connection newConnection = cFactory.newConnection();
    Channel channel = newConnection.createChannel();
    String QUEUE_NAME = "hello";
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    //队列回传
    DeliverCallback deliverCallBack = new DeliverCallback() {
        @Override
        public void handle(String consumerTag, Delivery delivery) throws IOException {
                    Envelope envelope = delivery.getEnvelope();
                    byte[] body = delivery.getBody();
                    BasicProperties properties = delivery.getProperties();
                    //
                    System.err.println("envelope-getDeliveryTag:" + envelope.getDeliveryTag());
                    System.err.println("envelope-getExchange:" + envelope.getExchange());
                    System.err.println("envelope-getRoutingKey:" + envelope.getRoutingKey());
                    //获取消息体
                    String message = new String(body,"UTF-8");
                    System.err.println("message: " + message);
                }
            };
    String basicConsume = channel.basicConsume(QUEUE_NAME, true, deliverCallBack, CancelCallback -> {});
    System.out.println(":-------basicConsume--------:" + basicConsume);
} catch (Exception e) {
    e.printStackTrace();
}

工作队列模式(Work queues)

即选择分发。当接收端存在多个,且指定接收端正在处理未处理完的业务时分发给匹配的空闲接收端。
工作队列模式的模型(引用自官网)

工作队列模式模型图
程序测试流程:
发布者连接到RabbitMQ服务后,声明队列并将消息存入队列中发送并退出。启动多个接收端,连接RabbitMQ服务,通过通道获取对应的队列。当某个接收端在处理业务时队列中的消息分发给空闲的接收端,处理消息并打印消息。
发布者测试代码:
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()) {
            //通过命令行获取消息文本
            String nextLine = scanner.nextLine();
            //要发送的消息
            String message = String.join(" ",nextLine);
            //队列名称
            String QUEUE_NAME = "QUEUE_NAME";
            
            ConnectionFactory cFactory = new ConnectionFactory();
            cFactory.setHost("localhost");
            try(Connection newConnection = cFactory.newConnection();
                    Channel channel = newConnection.createChannel()){
                //创建队列:可持久化的队列(durable=true)
                channel.queueDelete("QUEUE_NAME");
                channel.queueDeclare(QUEUE_NAME, true, false, false, null);
                channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'");
            }catch (Exception e) {
                e.printStackTrace();
            }
        }

接收端测试代码:

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

public class QueuesCustomer {
    public static void main(String[] args) {
        ConnectionFactory cFactory = new ConnectionFactory();
        cFactory.setHost("localhost");
        
        try{
            Connection conn = cFactory.newConnection();
            Channel channel = conn.createChannel();
            //设置消费端每次获取数量为1
            channel.basicQos(1);
            //队列名称
            String QUEUE_NAME ="QUEUE_NAME";
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            //消息回调函数
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery delivery) throws IOException {
                    String message = new String(delivery.getBody(),"UTF-8");
                    System.out.println(" [x] Received '" + message + "'");
                    
                    try {
                        doWork(message);
                    }catch(Exception e) {
                        e.printStackTrace();
                    }finally {
                        System.out.println("[x] Done");
                        //关闭自动确认信息,改为手动确认信息
                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    }
                    
                }
            };
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, CancelCallback -> {}); 
        }catch(Exception e) {
            e.printStackTrace();
        }
    }

    protected static void doWork(String message) throws Exception {
        for (char ch: message.toCharArray()) {
            //模拟延时业务,让线程睡眠5秒
            if (ch == '.') Thread.sleep(5000);
        }
    }
}

发布/订阅模式(Publish/Subscribe)

即发布的消息广播给所有接收者。接收者同时收到消息并各自处理。
发布/订阅模式的模型(引用自官网)

发布/订阅模式模型图
程序测试流程:
发布者连接到RabbitMQ服务后,声明交换器(exchange)将消息发布至交换器中。启动多个接收端,连接RabbitMQ服务,通过通道各自声明一个临时队列。然后绑定队列至交换器上,从临时队列中获取消息并打印。(官方文档中写明:如果不声明交换器,则使用默认交换器。所以前俩种模式示例未声明交换器则是声明了默认交换器)

发布端测试代码:

import java.util.Scanner;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Publisher {
    public static void main(String[] args) {
        //从键盘获取数据
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()) {
            String nextLine = scanner.nextLine();
            //创建连接工厂
            ConnectionFactory cFactory = new ConnectionFactory();
            //通过连接工厂创建连接(Cnnectoin)及通道(Channel)
            try(Connection conn = cFactory.newConnection();
                Channel channel = conn.createChannel()){
                //创建交易所(exchange):通过管道创建
                channel.exchangeDeclare("logs", "fanout");
                String message = nextLine;
                //将数据发布到交易所中
                channel.basicPublish("logs","", null, message.getBytes());
            }catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

订阅端测试代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

public class Subscribe {
    public static void main(String[] args) {
        //创建工厂
        ConnectionFactory cFactory = new ConnectionFactory();
        //创建连接 
        //创建通道
        try {
            Connection connection = cFactory.newConnection();
            Channel channel = connection.createChannel();
            String queue = channel.queueDeclare().getQueue();
            //如果有则订阅到交易所上,如果没有则创建一个
            channel.exchangeDeclare("logs", "fanout");
            //将队列绑定到交易所
            channel.queueBind(queue, "logs", "");
            DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
                System.err.println(queue + ":---> " + new String(message.getBody(),"UTF-8"));
            };
            channel.basicConsume(queue,deliverCallback,CancelCallback->{});
        }catch (Exception e) {
            e.printStackTrace();
        }
    }
}

路由模式(Routing)

即发布端来指定分发的接收端。
路由模式的模型(引用自官网)

路由模式模型图
程序测试流程:
发布者连接到RabbitMQ服务后,声明交换器(exchange)将消息发布至交换器中并指定接收者。启动多个接收端,连接RabbitMQ服务,通过通道各自声明一个临时队列。然后绑定队列至交换器上,指定的接收者从临时队列中获取消息并打印。
发布端测试代码:
import java.util.Scanner;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 发布者:路由方式
 * @author zhengjie
 */
public class PublicsherServerity {
    
    private static final String EXCHANGE_NAME = "logs";
    
    public static void main(String[] args) {
        //从命令提示符中获取消息(消息格式:    日志级别:日志内容)
        Scanner scanner = new Scanner(System.in);
        //创建连接工厂
        ConnectionFactory cFactory = new ConnectionFactory();
        cFactory.setHost("localhost");
        while(scanner.hasNext()) {
            //键盘中获取的消息
            String nextLine = scanner.nextLine();
            //如果字符串不存在':'则跳过,直接再次重头再来
            if(!nextLine.contains(":")) {
                continue;
            }
            String[] split = nextLine.split(":");
            //创建连接及通道,并通过JDK1.7自动关闭特性简化代码
            try(Connection conn = cFactory.newConnection();
                    Channel channel = conn.createChannel()){
                //创建交换器
                channel.exchangeDeclare(EXCHANGE_NAME, "direct");
                channel.basicPublish(EXCHANGE_NAME,split[0], null, split[1].getBytes("UTF-8"));
            }catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

订阅者A:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
 * 订阅者A: 日记级别: error
 * @author zhengjie
 */
public class SubscribeA {
    
    private static final String EXCHANGE_NAME = "logs";
    
    public static void main(String[] args) {
        
        //创建连接工厂
        ConnectionFactory cFactory = new ConnectionFactory();
        cFactory.setHost("localhost");
        try {
            //获取连接
            Connection connection = cFactory.newConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME,"direct");
            //随机生成队列编号
            String queue = channel.queueDeclare().getQueue();
            //绑定队列与交换器
            channel.queueBind(queue, EXCHANGE_NAME, "error");
            //获取数据的回调函数: 采用Labda表达式
            DeliverCallback deliverCallback = (consumerTag, delivery)->{
                String routingKey = delivery.getEnvelope().getRoutingKey();
                System.out.println("message _ " + routingKey + " : " + new String(delivery.getBody(),"UTF-8"));
            };
            channel.basicConsume(queue,true, deliverCallback, CancelCallback -> {});
        }catch(Exception e) {
            e.printStackTrace();
        }
    }
}

订阅者B:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
 * 订阅者B: 日志级别: debug
 * 
 * @author zhengjie
 */
public class SubscribeB {
    
    private static final String EXCHANGE_NAME = "logs";
    
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory cFactory = new ConnectionFactory();
        cFactory.setHost("localhost");
        try {
            // 获取连接
            Connection connection = cFactory.newConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            // 随机生成队列编号
            String queue = channel.queueDeclare().getQueue();
            // 绑定队列与交换器
            channel.queueBind(queue, EXCHANGE_NAME, "debug");

            // 获取数据的回调函数: 采用Labda表达式
            DeliverCallback deliverCallback = (customTag, message) -> {
                System.out.println("message _ debug : " + new String(message.getBody(), "UTF-8"));
            };
            channel.basicConsume(queue,true, deliverCallback, CancelCallback -> {
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

主题模式(Topics)

即通过通配符来选择指定接收端集合(发送到主题交换的邮件不能具有任意路由密钥: 必须是由点分隔的单词列表)。
主题模式的模型(引用自官网)

主题模式模型图
程序测试流程:
发布者连接到RabbitMQ服务后,声明交换器(exchange)将消息发布至交换器中并通过通配符指定接收者集合(必须是由点分隔的单词列表: \* (星号) 只能替换一个词 | # (井号) 可以替换零个或多个单词。)。启动多个接收端,连接RabbitMQ服务,通过通道各自声明一个临时队列。然后绑定队列至交换器上,指定的接收者集合从临时队列中获取消息并打印。
发布端测试代码:
import java.util.Scanner;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
* 发布者
* @author zhengjie
*/
public class Publisher {
   
   private static final String EXCHANGE_NAME = "topic";
   
   public static void main(String[] args) {
       //从键盘上输入: 
       Scanner scanner = new Scanner(System.in);
       while(scanner.hasNext()) {
           String msg = scanner.nextLine();
           String[] split = msg.split(":");
           //创建RabbitMQ连接工厂
           ConnectionFactory cFactory = new ConnectionFactory();
           try(Connection conn = cFactory.newConnection();
                   Channel channel = conn.createChannel()){
               //创建交换器
               channel.exchangeDeclare(EXCHANGE_NAME, "topic");
               String routingKey = split[0];
               byte[] message = split[1].getBytes();
               channel.basicPublish(EXCHANGE_NAME, routingKey, null, message);
           }catch(Exception e) {
               e.printStackTrace();
           }
       }
   }
}

订阅端A测试代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
 * 订阅者A: 匹配: *.good.*
 * @author zhengjie
 */
public class SubscribeA {
    
    private static final String EXCHANGE_NAME = "topic";
    
    public static void main(String[] args) {
        ConnectionFactory cFactory = new ConnectionFactory();
        try{
            Connection conn = cFactory.newConnection();
            Channel channel = conn.createChannel();
            //创建交换器
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            //创建临时队列
            String queue = channel.queueDeclare().getQueue();
            //绑定临时队列与交换器
            channel.queueBind(queue, EXCHANGE_NAME, "*.good.*");
            //返回消息回调
            DeliverCallback deliverCallback = (customTag,deliverVal) -> {
                System.out.println(deliverVal.getEnvelope().getRoutingKey() + ": "
                            + new String(deliverVal.getBody(),"UTF-8"));
            };
            channel.basicConsume(queue, true, deliverCallback, CancelCallback -> {});
            
        }catch(Exception e) {
            e.printStackTrace();
        }
    }
}

订阅端B测试代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
 * 订阅者B: 匹配: good.#
 * 
 * @author zhengjie
 */
public class SubscribeB {
    private static final String EXCHANGE_NAME = "topic";

    public static void main(String[] args) {
        ConnectionFactory cFactory = new ConnectionFactory();
        try {
            Connection conn = cFactory.newConnection();
            Channel channel = conn.createChannel();
            // 创建交换器
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            // 创建临时队列
            String queue = channel.queueDeclare().getQueue();
            // 绑定临时队列与交换器
            channel.queueBind(queue, EXCHANGE_NAME, "good.#");
            // 返回消息回调
            DeliverCallback deliverCallback = (customTag, deliverVal) -> {
                System.out.println(
                        deliverVal.getEnvelope().getRoutingKey() + ": " + new String(deliverVal.getBody(), "UTF-8"));
            };
            channel.basicConsume(queue, true, deliverCallback, CancelCallback -> {
            });

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

远程过程调用模式(RPC)

即客户端发送请求消息,服务器回复响应消息。
远程过程调用模型(引用自官网)

远程过程调用模型图
程序测试流程:
客户端发送带有两个属性的消息: replyTo字段,设置为仅为请求创建的匿名独占队列;correlationId字段,设置为每个请求的唯一值。请求被发送到指定队列中。服务端则等待请求,当出现请求时,它会执行该作业,并使用来自replyTo字段的队列将带有结果的消息发送回客户端。
服务端测试代码:
import com.rabbitmq.client.*;
public class RPCServer {
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            channel.queuePurge(RPC_QUEUE_NAME);

            channel.basicQos(1);

            System.out.println(" [x] Awaiting RPC requests");

            Object monitor = new Object();
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                        .Builder()
                        .correlationId(delivery.getProperties().getCorrelationId())
                        .build();

                String response = "";

                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    int n = Integer.parseInt(message);

                    System.out.println(" [.] fib(" + message + ")");
                    response += fib(n);
                } catch (RuntimeException e) {
                    System.out.println(" [.] " + e.toString());
                } finally {
                    channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    // RabbitMq consumer worker thread notifies the RPC server owner thread
                    synchronized (monitor) {
                        monitor.notify();
                    }
                }
            };

            channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
            // Wait and be prepared to consume the message from RPC client.
            while (true) {
                synchronized (monitor) {
                    try {
                        monitor.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

客户端测试代码:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RPCClient implements AutoCloseable {

    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";

    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        connection = factory.newConnection();
        channel = connection.createChannel();
    }

    public static void main(String[] argv) {
        try (RPCClient fibonacciRpc = new RPCClient()) {
            for (int i = 0; i < 32; i++) {
                String i_str = Integer.toString(i);
                System.out.println(" [x] Requesting fib(" + i_str + ")");
                String response = fibonacciRpc.call(i_str);
                System.out.println(" [.] Got '" + response + "'");
            }
        } catch (IOException | TimeoutException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    public String call(String message) throws IOException, InterruptedException {
        final String corrId = UUID.randomUUID().toString();

        String replyQueueName = channel.queueDeclare().getQueue();
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

        String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response.offer(new String(delivery.getBody(), "UTF-8"));
            }
        }, consumerTag -> {
        });

        String result = response.take();
        channel.basicCancel(ctag);
        return result;
    }

    public void close() throws IOException {
        connection.close();
    }
}

spring AMQP简单测试用代码

Service业务代码:

//service接口
import com.rabbitmq.bean.User;
public interface RabbitMqService {
    //发送消息
    void sendMsg(String msg);
    //接收用户消息
    void sendUser(User user);
}

//service实现
@Service
public class RabbitMqServiceImpl implements ConfirmCallback, RabbitMqService {
    
    @Value("${rabbitmq.queue.msg}")
    private String msgRouting = null;
    @Value("${rabbitmq.queue.user}")
    private String userRouting = null;
    
    @Autowired
    private RabbitTemplate rabbitTemplate = null;

    //发送消息
    @Override
    public void sendMsg(String msg) {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.convertAndSend(msgRouting,msg);
    }

    @Override
    public void sendUser(User user) {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.convertAndSend(userRouting, user);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack) {
            System.out.println("----消息确认送达成功----");
        }else {
            System.out.println("----消息确认送达失败----" + cause);
        }
    }
}

service层实现ConfirmCallback了接口,这样当服务端接收数据确认后会产生回调执行confirm方法。
配置Bean:

import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class BeanConfig {
    @Value("${rabbitmq.queue.msg}")
    private String msgQueueName = null;
    @Value("${rabbitmq.queue.user}")
    private String userQueueName = null;
    //创建字符串消息队列
    //创建的队列对象为持久队列,则可以序列化到磁盘中,服务器重启不受影响。
    @Bean
    public Queue createQueueMsg() {
        return new Queue(msgQueueName,true);
    }
    //创建对象消息队列
    @Bean
    public Queue createQueueUser() {
        return new Queue(userQueueName,true);
    }
}

spring容器中存在的Queue对象会自动注册至rabbitmq中。
服务端接收逻辑:

@Component
public class RabbitMessageReceiver {
    //定义监听字符串队列名称
    @RabbitListener(queues= {"${rabbitmq.queue.msg}"})
    public void receiveMsg(String msg) {
        System.out.println("----收到消息:----" + msg);
    }
    //定义监听对象列名称
    @RabbitListener(queues= {"${rabbitmq.queue.user}"})
    public void receiveObj(User user) {
        System.out.println("----收到对象:----" + user);
    }
}

服务器通过@RabbitListener注解绑定对应的队列,当服务器接收到客户端发送的消息后执行指定的业务逻辑。
实体对象:

import java.io.Serializable;
//因为队列为持久队列,所以传输的对象需要序列化
public class User implements Serializable{
    private Integer id;
    private String userName;
    private String userPass;
    public Integer getId() {
        return id;
    }
    public void setId(Integer id) {
        this.id = id;
    }
    public String getUserName() {
        return userName;
    }
    public void setUserName(String userName) {
        this.userName = userName;
    }
    public String getUserPass() {
        return userPass;
    }
    public void setUserPass(String userPass) {
        this.userPass = userPass;
    }
    @Override
    public String toString() {
        return "User [id=" + id + ", userName=" + userName + ", userPass=" + userPass + "]";
    }
}

测试总测试了发送/接收实体对象,所以创建了一个测试实体类。
客户端请求Controller:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import com.rabbitmq.bean.User;
import com.rabbitmq.service.RabbitMqService;
@Controller
public class RabbitMqController {
    @Autowired
    private RabbitMqService raiService;
    //发送字符串请求
    @RequestMapping("/str")
    @ResponseBody
    public String sendMsg(String msg) {
        raiService.sendMsg(msg);
        return "msg";
    }
    //发送对象请求
    @RequestMapping("/user")
    @ResponseBody
    public String sendUser(User user) {
        raiService.sendUser(user);
        return "usr";
    }
}

RabbitMQ的spring boot测试配置:

# 配置rabbitmq服务的主机地址
spring.rabbitmq.host=localhost
# 配置rabbitmq服务端口
spring.rabbitmq.port=5672
# rabbitmq发布者消息是否自动确认
spring.rabbitmq.publisher-confirms=true
# 自定义的配置
rabbitmq.queue.msg=rabbitmq.msg
rabbitmq.queue.user=abbitmq.entity
# 配置rabbitmq密码,这里没有密码
# spring.rabbitmq.password=
# 配置rabbitmq账户,这里没有账户
# spring.rabbitmq.username=

Maven项目依赖(Spring boot Pom.xml部分):

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
上一篇下一篇

猜你喜欢

热点阅读