RabbitMQ 生产与消费示例代码
2020-11-13 本文已影响0人
于情于你
/**
 * Minimal RabbitMQ example: publishes one persistent text message through a
 * direct exchange and then consumes it from the bound queue, acknowledging
 * each delivery individually.
 */
public class RabbitmqTest {
    private static final String EXCHANGE_NAME = "test_exchange";
    private static final String ROUTING_KEY = "test_routing_key";
    private static final String QUEUE_NAME = "test_queue";
    private static final String USER_NAME = "rabbit";
    // NOTE(review): credentials are hard-coded in source; acceptable for a throwaway
    // sample only. Load them from configuration/environment in real code.
    private static final String PASSWORD = "rbDMmPQHFq";
    // Placeholder host; replace with the broker address before running.
    private static final String IP_ADDRESS = "xxx";
    // NOTE(review): the conventional AMQP port is 5672 — confirm 5662 is intentional.
    private static final int PORT = 5662;

    /** Maximum number of unacknowledged deliveries the consumer channel accepts at once. */
    private static final int PREFETCH_COUNT = 64;
    /** How long {@link #consume()} keeps the channel open so the delivery callback can run. */
    private static final long CONSUME_WAIT_SECONDS = 5;

    /**
     * Publishes a single message and then consumes from the queue.
     *
     * @param args unused
     * @throws IOException          if a broker operation fails
     * @throws TimeoutException     if opening or closing the connection/channel times out
     * @throws InterruptedException if the consumer wait is interrupted
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // Produce a message.
        publish("hello_world");
        // Consume messages.
        consume();
    }

    /**
     * Builds a connection factory configured with this class's host, port and credentials.
     * Shared by {@link #publish(String)} and {@link #consume()} so the settings cannot drift apart.
     *
     * @return a configured, not-yet-connected factory
     */
    private static ConnectionFactory newConnectionFactory() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);
        return factory;
    }

    /**
     * Subscribes to {@code QUEUE_NAME}, prints and acknowledges every delivery received
     * during a fixed wait, then closes the channel and connection.
     *
     * @throws IOException          if a broker operation fails
     * @throws TimeoutException     if opening or closing the connection/channel times out
     * @throws InterruptedException if the wait is interrupted
     */
    private static void consume() throws IOException, TimeoutException, InterruptedException {
        // try-with-resources closes the channel, then the connection, even when a
        // broker call or the sleep throws.
        try (Connection connection = newConnectionFactory().newConnection();
             Channel channel = connection.createChannel()) {
            // Limit how many un-acked messages the client may hold at once.
            channel.basicQos(PREFETCH_COUNT);
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode with an explicit charset so the result does not depend on the
                    // platform default; matches the encoding used in publish().
                    System.out.println("recv message: "
                            + new String(body, java.nio.charset.StandardCharsets.UTF_8));
                    // multiple=false: acknowledge only this delivery.
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume(QUEUE_NAME, consumer);
            // Give the delivery callback time to run before the resources are closed.
            TimeUnit.SECONDS.sleep(CONSUME_WAIT_SECONDS);
        }
    }

    /**
     * Declares the exchange and queue, binds them, and publishes {@code message}
     * as a persistent text message.
     *
     * @param message text payload to send; encoded as UTF-8
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening or closing the connection/channel times out
     */
    private static void publish(String message) throws IOException, TimeoutException {
        // try-with-resources closes the channel, then the connection, even when a
        // declare/bind/publish call throws.
        try (Connection connection = newConnectionFactory().newConnection();
             Channel channel = connection.createChannel()) {
            // Declare a durable, non-auto-delete direct exchange.
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
            // Declare a durable, non-exclusive, non-auto-delete queue.
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // Bind the queue to the exchange under the routing key.
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
            // Send the message with an explicit charset so the bytes on the wire do not
            // depend on the publisher's platform default.
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }
}