RabbitMQ-主题模式
2020-07-20 本文已影响0人
jiahzhon
- 将路由与某模式进行匹配
- ·
#
匹配一个或者多个 -
*
匹配一个
- ·
- 生产者:
public class Send {
private static final String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws IOException {
Connection connections = ConnectionUtils.getConnections();
Channel channel = connections.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String msg = new String("商品-----");
String routingKey="goods.del";
System.out.println("send --- " + msg);
channel.basicPublish(EXCHANGE_NAME,routingKey, null, msg.getBytes());
channel.close();
connections.close();
}
}
- 消费者1:
public class Recv1 {
private static final String QUEUE_NAME = "queue_topic_1";
private static final String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws IOException {
Connection connections = ConnectionUtils.getConnections();
final Channel channel = connections.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");
channel.basicQos(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "utf-8");
System.out.println("recv1 " + msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
}
}
- 消费者2:
public class Recv2 {
private static final String QUEUE_NAME = "queue_topic_2";
private static final String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws IOException {
Connection connections = ConnectionUtils.getConnections();
final Channel channel = connections.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.*");
channel.basicQos(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "utf-8");
System.out.println("recv2 " + msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
}
}