RabbitMQ 的自定义消费者使用
2019-01-25 本文已影响0人
HmilyMing
之前的文章里面,我都是在消费端的代码里面编写 while 循环,进行 consumer.nextDelivery 方法进行获取下一条消息,然后进行消费处理,这种方式太 low 了,耦合性太高,所以要使用自定义的 consumer 来解耦,这种方式更方便一些,也是在实际工作中最常用的使用方式
下面来看看具体的代码实现, 代码地址:
https://github.com/hmilyos/rabbitmqdemo.git rabbitmq-api 项目下
如图所示,先来实现我们的自定义消费者
public class MyConsumer extends DefaultConsumer {
private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, //消费者标签
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("------MyConsumer-----consume message----------");
log.info("consumerTag: " + consumerTag);
log.info("envelope: " + envelope);
log.info("properties: " + properties);
log.info("body: " + new String(body));
}
}
接着,重点来了,在声明消费者的代码里面使用刚才的自定义消费者
/**
* 使用自定义消费者
*/
public class Consumer {
private static final Logger log = LoggerFactory.getLogger(Consumer.class);
public static final String EXCHANGE_NAME = "test_consumer_exchange";
public static final String EXCHANGE_TYPE = "topic";
public static final String ROUTING_KEY_TYPE = "consumer.#";
public static final String ROUTING_KEY = "consumer.save";
public static final String QUEUE_NAME = "test_consumer_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
//2 获取C onnection
Connection connection = connectionFactory.newConnection();
//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, null);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_TYPE);
//使用自定义消费者
channel.basicConsume(QUEUE_NAME, true, new MyConsumer(channel));
log.info("消费端启动成功");
}
}
生产端代码基本不需要修改
public class Procuder {
private static final Logger log = LoggerFactory.getLogger(Procuder.class);
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String msg = "Hello RabbitMQ Consumer Message";
for(int i = 0; i < 5; i ++){
log.info("生产端发送:{}", msg + i);
channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTING_KEY, true, null, (msg + i).getBytes());
}
}
}
先启动消费端,再启动生产端,查看运行结果:注意看消费端的日志,打印出了我们自定义消费者里面的东西了。
至此,简单的使用自定义消费者demo就完成了。