RabbitMQ-主题模式

2020-07-20  本文已影响0人  jiahzhon
image.png image.png
public class Send {
    private static final String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] args) throws IOException {

        Connection connections = ConnectionUtils.getConnections();
        Channel channel = connections.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String msg = new String("商品-----");
        String routingKey="goods.del";
        System.out.println("send ---  " + msg);
        channel.basicPublish(EXCHANGE_NAME,routingKey, null, msg.getBytes());
        channel.close();      
        connections.close();

    }
}
public class Recv1 {
    private static final String QUEUE_NAME = "queue_topic_1";
    private static final String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] args) throws IOException {
        Connection connections = ConnectionUtils.getConnections();
        final Channel channel = connections.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");
        channel.basicQos(1);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("recv1   " + msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);

    }
}
public class Recv2 {
    private static final String QUEUE_NAME = "queue_topic_2";
    private static final String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] args) throws IOException {
        Connection connections = ConnectionUtils.getConnections();
        final Channel channel = connections.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.*");
        channel.basicQos(1);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("recv2   " + msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);

    }
}
上一篇 下一篇

猜你喜欢

热点阅读