rabbitmq入门一简单队列(Hello World!、Wor

2019-10-29  本文已影响0人  weihubeats

@[toc]
docker搭建rabbitm

mq官网

"Hello World!":

官网教程
点对点,一个生产者,一个消费者,一个队列。
特点:

在这里插入图片描述

1. mq创建一个队列

2. 创建生产者消费者

  1. 引入依赖
     dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>5.73</version>
        </dependency>
    </dependencies>
  1. 获取mq连接工具类(类似jdbc连接)
public class MQConnectionUtils {
    private static final String IP = "118.25.188.37";
    private static final Integer PORT = 5672;
    private static final String USERNAME = "guest";
    private static final String PASSWORD = "guest";

    public static Connection newConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost(IP);
        //设置端口号
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        //创建连接
        Connection connection = factory.newConnection();
        return connection;

    }
}

  1. Producer 生产者
public class Producer {
    private static final String QUEUE_NAME = "mq";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.获取连接
        Connection newConnection = MQConnectionUtils.newConnection();
        // 2.创建通道
        Channel channel = newConnection.createChannel();
        // 3.创建队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String msg = "直接模式消息发送";
        System.out.println("生产者发送消息:" + msg);
        // 4.发送消息
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        channel.close();
        newConnection.close();
    }

}

发送消息后mq会出现待消费的消息


在这里插入图片描述
  1. Customer 消费者
public class Customer {
    private static final String QUEUE_NAME = "mq";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.获取连接
        Connection newConnection = MQConnectionUtils.newConnection();
        // 2.获取通道
        Channel channel = newConnection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msgString = new String(body, "UTF-8");
                System.out.println("消费者获取消息:" + msgString);
            }
        };
        // 3.监听队列  true表示自动应答,false表示手动应答
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

    }

}

消费完后mq消息就没有了

工作队列 Work queues

在这里插入图片描述

与点对点不同的是,消费者由1个变成了两个,消费者集群了
我们这里启动两个消费者


在这里插入图片描述
在这里插入图片描述

然后发送10条消息
看看结果:


在这里插入图片描述 在这里插入图片描述

可以看到实现的是均摊消费

应答模式

channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
这里第二个参数表示应答模式为true,表示自动签收

这里我们将 应答模式设置为false
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
然后向消费者发送10个消息

在这里插入图片描述
可以看到消费者接收到了10个消息,但是我现在如果停止消费者
在这里插入图片描述
发现队列中还是有10个消息未消费,原因我我们没有手动返回ask
这里我们需要加上这个channel.basicAck(envelope.getDeliveryTag(), false);
public class Customer {
    private static final String QUEUE_NAME = "mq";

    public static void main(String[] args) throws  Exception {
        System.out.println("消费者2启动");
        // 1.获取连接
        Connection newConnection = MQConnectionUtils.newConnection();
        /* 2.获取通道 */
        Channel channel = newConnection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //监听队列
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msgString = new String(body, "UTF-8");
                System.out.println("消费者获取消息:" + msgString);
                //手动应答
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 3.监听队列  true表示自动应答,false表示手动应答
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);

    }

}

这样就表示消费者接受消息成功了
实现:添加如下代码channel.basicQos(1);

public class Customer {
    private static final String QUEUE_NAME = "mq";

    public static void main(String[] args) throws  Exception {
        System.out.println("消费者2启动");
        // 1.获取连接
        Connection newConnection = MQConnectionUtils.newConnection();
        /* 2.获取通道 */
        Channel channel = newConnection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //表示一次只消费一个消息
        channel.basicQos(1);
        //监听队列
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msgString = new String(body, "UTF-8");
                System.out.println("消费者获取消息:" + msgString);
                //手动应答
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 3.监听队列  true表示自动应答,false表示手动应答
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);

    }

}

公平队列

在上面我们消费者如果集群,消费者接受采用的均摊消费,但每个消费者处理业务时间不同,这样就不能让性能更好的消费者消费更多的消息(能者多劳)

集成springboot

demo 路径结构:


在这里插入图片描述

2.代码测试

创建一个springboot项目,然后加入mq依赖

dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

配置 application.properties

# web端口
server.port=8089
# mq地址
spring.rabbitmq.host=118.25.188.37

测试生产者

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class)
public class ProductTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void senMes() {
        rabbitTemplate.convertAndSend("mq","直接模式消息发送");

    }
}

运行sendMes
无错误后访问mq web管理页面发现多了一条待消费的消息


在这里插入图片描述

编写消费者

@Component
@RabbitListener(queues = "mq")  //指定消费队列
public class Customer1 {

    @RabbitHandler
    public void getMsg(String msg) {
        System.out.println("直接模式消费消息" + msg);
    }
}

然后直接运行main
可以看到效果


在这里插入图片描述
在这里插入图片描述

也可以启动多个消费者等待消息,具体idea启动多个实例请看这里

上一篇 下一篇

猜你喜欢

热点阅读