程序员

RabbitMQ创建生产者和消费者

2019-12-13  本文已影响0人  勿念及时雨

创建生产者代码示例:

public class Producer {
    //RabbitMQ服务器地址
    public final static String host="192.168.1.1";
    //RabbitMQ端口
    public final static int port=5672;
    //RabbitMQ虚拟主机
    public static final String virtualHost="/";
    //RabbitMQ用户名
    public final static String username="admin";
    //RabbitMQ密码
    public final static String password="123456";
    //队列名称
    public final static String queue_name="serviceNotice.queue";

    public static void main(String[] args) throws IOException{
        //创建连接工厂,此部分可以单独抽出作为一个静态抽象方法以便调用
        ConnectionFactory factory=new ConnectionFactory();
        //设置服务器地址
        factory.setHost(host);
        //设置服务器端口
        factory.setPort(port);
        //设置虚拟主机
        factory.setVirtualHost(virtualHost);
        //设置用户名
        factory.setUsername(userName);
        //设置密码
        factory.setPassword(password);
        //获取连接
        Connection connection=factory.newConnection();
        //创建信道
        Channel channel=connection.createChannel();
        //信道指定队列设置,如果在Rabbit管理工具中创建了队列,则不需要调用此方法
        //参数(名字,是否持久化,独占的队列,不使用时是否自动删除,其他参数)
        channel.queueDeclare(queue_name,true,false,true,null);
        String message="这是一个测试消息";
        //发布消息
        //参数(交换器名称,队列名称,属性,参数的字节数据)
        channel.basicPublish("",queue_name,null,message.getBytes());
        //关闭信道
        channel.close();
        //关闭连接
        connection.close();
    }
}

创建消费者代码示例:

public class Consumer {
     //RabbitMQ服务器地址
    public final static String host="192.168.1.1";
    //RabbitMQ端口
    public final static int port=5672;
    //RabbitMQ虚拟主机
    public static final String virtualHost="/";
    //RabbitMQ用户名
    public final static String username="admin";
    //RabbitMQ密码
    public final static String password="123456";
    //队列名称
    public final static String queue_name="serviceNotice.queue";

    public static void main(String[] args) throws IOException {
        //创建连接工厂,此部分可以单独抽出作为一个静态抽象方法以便调用
        ConnectionFactory factory=new ConnectionFactory();
        //设置服务器地址
        factory.setHost(host);
        //设置服务器端口
        factory.setPort(port);
        //设置虚拟主机
        factory.setVirtualHost(virtualHost);
        //设置用户名
        factory.setUsername(userName);
        //设置密码
        factory.setPassword(password);
        //获取连接
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //信道设置,如果在Rabbit管理工具中创建了队列,则不需要调用此方法
        channel.queueDeclare(queue_name, true, false, true, null);
        //创建消费者
        QueueingConsumer queueingConsumer=new QueueingConsumer(channel);
        //消费消息,false表示需手动确认消息已成功获取
        channel.basicConsume(queue_name,false,queueingConsumer);
        while (true) {  //消费者程序运行开着 如果生产者新增了数据会自动获取
            // nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("接收消息:" + message);
            //消息确认为成功获取,false表示不重新入队
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        }
    }
}
上一篇 下一篇

猜你喜欢

热点阅读