rabbitmq

rabbitmq生产消费示例代码

2020-11-13  本文已影响0人  于情于你
public class RabbitmqTest {
    private static final String EXCHANGE_NAME ="test_exchange";
    private static final String ROUTING_KEY ="test_routing_key";
    private static final String QUEUE_NAME ="test_queue";
    private static final String USER_NAME ="rabbit";
    private static final String PASSWORD ="rbDMmPQHFq";
    private static final String IP_ADDRESS ="xxx";
    private static final int PORT = 5662;


    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
      
        // 生产消息
        publish("hello_world");
        
        // 消费消息
        consume();

    }

    private static void consume() throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost(IP_ADDRESS);
        connectionFactory.setPort(PORT);
        connectionFactory.setUsername(USER_NAME);
        connectionFactory.setPassword(PASSWORD);

        // 创建连接
        Connection connection = connectionFactory.newConnection();
      
        // 创建信道
        final Channel channel = connection.createChannel();
      
        // 设置客户端最多接收未被 ack 的消息的个数
        channel.basicQos(64);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("recv message: " + new String(body));

                channel.basicAck(envelope.getDeliveryTag(), false);

            }
        };

        channel.basicConsume(QUEUE_NAME, consumer);

        // 等待回调函数执行完毕之后 关闭资源
        TimeUnit.SECONDS.sleep(5);

        channel.close();
        connection.close();
    }

    private static void publish(String message) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost(IP_ADDRESS);
        connectionFactory.setPort(PORT);
        connectionFactory.setUsername(USER_NAME);
        connectionFactory.setPassword(PASSWORD);


        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();
      
        // 创建交换器
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
        
        // 创建队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        
        // 队列交换器绑定
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
        
        // 发送消息
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

        channel.close();
        connection.close();
    }



}
上一篇下一篇

猜你喜欢

热点阅读