技术总结

Rabbitmq概念及HelloWorld

2019-02-11  本文已影响1人  AlanKim

相关概念:

Snip20180606_6.png
exchange类型

一些关键点

大部分情况下,按照最简单的方式使用即可;需要更复杂的配置时,可以把《RabbitMQ实战详解》当作工具书来查阅。

web端管理

HelloWorld

加入maven依赖:

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.2.0</version>
        </dependency>
producer代码:
package rabbitmq.server;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Minimal RabbitMQ "hello world" producer.
 *
 * <p>Connects to the broker, declares a direct exchange and a queue, binds them with a
 * routing key, publishes a single text message and then releases the channel and connection.
 *
 * <p>NOTE: host, port and credentials are hard-coded because this is a demo; real code should
 * read them from configuration instead of source.
 */
public class RabbitMQProducer {

    private static final String EXCHANGE_NAME = "exchange_siyu";

    private static final String ROUTING_KEY = "routing_key_siyu";

    private static final String QUEUE_NAME = "queue_siyu";

    private static final String RABBITMQ_SERVER_HOST = "10.199.189.30";

    private static final int RABBITMQ_SERVER_PORT = 5672;

    /**
     * Publishes one "Hello World!!" message to {@code exchange_siyu} using routing key
     * {@code routing_key_siyu}.
     *
     * @param args unused
     * @throws IOException      if connecting, declaring, binding, publishing or closing fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {

        // Connection factory: holds the broker address and the credentials.
        ConnectionFactory connectionFactory = new ConnectionFactory();

        // The user must already have been granted permissions via rabbitmqctl;
        // a wrong user name / password makes newConnection() fail.
        connectionFactory.setHost(RABBITMQ_SERVER_HOST);
        connectionFactory.setPort(RABBITMQ_SERVER_PORT);
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root123");

        // try-with-resources closes the channel first and the connection second (reverse
        // declaration order), and does so even when a declare/bind/publish call throws.
        // The connection is one long-lived TCP connection; a channel is a lightweight virtual
        // connection multiplexed on top of it, and most operations go through the channel.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {

            // Direct exchange: point-to-point, a message is routed to the queue(s) whose
            // binding key matches the routing key.
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);

            // Declare the queue.
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            // Bind the queue to the exchange through the routing key.
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

            String message = "Hello World!!";

            // MessageProperties ships a few predefined property sets. PERSISTENT_TEXT_PLAIN is
            // defined as content type "text/plain", delivery mode 2 and priority 0, with every
            // other property left null.
            // Encode explicitly as UTF-8 so the bytes do not depend on the platform charset.
            channel.basicPublish(
                    EXCHANGE_NAME,
                    ROUTING_KEY,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }

}

consumer代码:
package rabbitmq.client;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Minimal RabbitMQ "hello world" consumer.
 *
 * <p>Connects to the broker, subscribes to {@code queue_siyu}, prints every message it receives
 * and acknowledges it, then shuts down after five seconds.
 *
 * <p>NOTE: host, port and credentials are hard-coded because this is a demo; real code should
 * read them from configuration instead of source.
 */
public class RabbitMQConsumer {

    private static final String QUEUE_NAME = "queue_siyu";
    private static final String RABBITMQ_SERVER_HOST = "10.199.189.30";
    private static final int RABBITMQ_SERVER_PORT = 5672;


    /**
     * Consumes messages from {@code queue_siyu} for roughly five seconds.
     *
     * @param args unused
     * @throws IOException          if connecting, subscribing or closing fails
     * @throws TimeoutException     if opening the connection or closing the channel times out
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        Address[] addresses = {
                new Address(RABBITMQ_SERVER_HOST, RABBITMQ_SERVER_PORT)
        };

        // Connection factory; only the credentials are set here.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root123");

        // Unlike the producer, the broker location is passed as an Address array to
        // newConnection() instead of being set on the factory.
        //
        // try-with-resources closes the channel first and the connection second (reverse
        // declaration order), even when an exception is thrown. The order matters: closing the
        // connection first and the channel afterwards raises
        // com.rabbitmq.client.AlreadyClosedException
        // ("connection is already closed due to clean connection shutdown").
        try (Connection connection = connectionFactory.newConnection(addresses);
             Channel channel = connection.createChannel()) {

            // Upper bound on the number of delivered-but-unacknowledged messages this client
            // will hold at a time.
            channel.basicQos(64);

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode explicitly as UTF-8 so the output does not depend on the platform charset.
                    System.out.println("receive msg:" + new String(body, java.nio.charset.StandardCharsets.UTF_8));

                    try {
                        // Simulate one second of processing work.
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag instead of swallowing the interruption.
                        Thread.currentThread().interrupt();
                    }
                    // Once acknowledged, the broker removes the message from the queue.
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };

            channel.basicConsume(QUEUE_NAME, consumer);

            // Let the consumer run for a while before shutting down.
            TimeUnit.SECONDS.sleep(5);
        }
    }

}
上一篇 下一篇

猜你喜欢

热点阅读