RabbitMQ-消费消息-basicConsume

2018-04-02  本文已影响1956人  AmeeLove

RabbitMQ-消费消息

 Address[] addresses = new Address[] {new Address(IP_ADDRESS, PORT)};
        /*
         * 1. Create the connection factory and set the credentials.
         */
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername(USER_NAME);
        connectionFactory.setPassword(PASSWORD);
        /*
         * Enable automatic connection recovery after a network failure.
         */
        connectionFactory.setAutomaticRecoveryEnabled(true);

        /*
         * 2. Open the connection. The original note says this differs slightly from the
         *    producer side; here the address array is handed to newConnection.
         */
        Connection connection = connectionFactory.newConnection(addresses);
        try {
            /*
             * 3. Create the channel. It is final because the anonymous consumer below
             *    captures it to send acknowledgements.
             */
            final Channel channel = connection.createChannel();

            /*
             * 4. Limit the number of delivered-but-not-yet-acknowledged messages this
             *    client will hold at once.
             */
            channel.basicQos(64);

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    // Decode with an explicit charset instead of the platform default.
                    // NOTE(review): assumes the producer publishes UTF-8 bytes - confirm.
                    System.out.println("接收消息 :   " + new String(body, java.nio.charset.StandardCharsets.UTF_8));
                    try {
                        // Simulated processing time.
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the calling thread can observe it
                        // instead of silently losing the interruption.
                        Thread.currentThread().interrupt();
                    }
                    // Acknowledge this single delivery (multiple = false).
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };

            /*
             * Register the consumer callback on the queue. Deliveries are acknowledged
             * explicitly inside handleDelivery via basicAck.
             */
            channel.basicConsume(QUEUR_NAME, consumer);

            /*
             * Give the consumer a few seconds to receive messages, then release the channel.
             */
            TimeUnit.SECONDS.sleep(5);
            channel.close();
        } finally {
            // Always release the connection, even when a step above failed.
            // The isOpen guard avoids masking the original failure with an
            // already-closed error.
            if (connection.isOpen()) {
                connection.close();
            }
        }

basicConsume方法

// Overloads of basicConsume. Every overload takes the queue name and a Consumer callback
// and returns a String (presumably the consumer tag - confirm against the Channel Javadoc).

// Queue and callback only. This is the overload the example above calls; that example
// acknowledges each delivery itself through basicAck.
String basicConsume(String queue, Consumer callback) throws IOException;

 // Adds an explicit autoAck flag.
 String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

 // Adds autoAck plus a map of extra arguments for the consume request.
 String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException;


// Adds autoAck plus a caller-supplied consumerTag.
String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;

// Widest overload shown: autoAck, consumerTag, noLocal, exclusive and the arguments map.
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
//消息确认
                channel.basicAck(envelope.getDeliveryTag(), false);
      @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
/**
 * Remembers the supplied consumer tag by storing it in the {@code _consumerTag} field.
 *
 * @param consumerTag the tag to keep for this consumer
 */
@Override
    public void handleConsumeOk(String consumerTag) {
        _consumerTag = consumerTag;
    }

  /**
   * Intentionally empty: this implementation takes no action for the given consumer tag.
   * NOTE(review): presumably a hook that subclasses override when they need to react
   * to a cancel confirmation - confirm against the Consumer interface docs.
   *
   * @param consumerTag the consumer tag passed to the callback (unused here)
   */
  @Override
    public void handleCancelOk(String consumerTag) {
        // no work to do
    }

 /**
  * Intentionally empty: this implementation takes no action for the given consumer tag.
  * The signature allows an IOException, but this body never throws one.
  *
  * @param consumerTag the consumer tag passed to the callback (unused here)
  * @throws IOException declared by the signature; not thrown by this empty body
  */
 @Override
    public void handleCancel(String consumerTag) throws IOException {
        // no work to do
    }

   /**
    * Intentionally empty: both arguments are ignored and nothing is done with the
    * shutdown signal.
    *
    * @param consumerTag the consumer tag passed to the callback (unused here)
    * @param sig the shutdown signal passed to the callback (unused here)
    */
   @Override
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
        // no work to do
    }

  /**
   * Intentionally empty: this implementation takes no action for the given consumer tag.
   *
   * @param consumerTag the consumer tag passed to the callback (unused here)
   */
  @Override
    public void handleRecoverOk(String consumerTag) {
        // no work to do
    }

channel.basicAck();确认消息

deliveryTag:该消息的index
multiple:是否批量确认。true:一次性ack所有小于等于deliveryTag的未确认消息;false:只ack deliveryTag对应的这一条消息。

上一篇 下一篇

猜你喜欢

热点阅读