RabbitMQ 队列中的消息一直存在的问题排查

2023-11-28  本文已影响0人  雁过留声_泪落无痕

背景

Logback 配置了 AmqpAppender 用于将日志输入 RabbitMQ :

<appender name="TRACE-RABBITMQ" class="org.springframework.amqp.rabbit.logback.AmqpAppender">
    <layout>
        <pattern>
            {
            "level": "%level",
            "service": "${springAppName:-}",
            "time": "%d{yyyy-MM-dd HH:mm:ss}",
            "ip": "%X{ip:-}",
            "trace": "%X{traceUuid:-}",
            "message": "%message"
            }
        </pattern>
    </layout>
    <!--rabbitmq地址 -->
    <host>${RABBITMQ_HOST}</host>
    <port>${RABBITMQ_PORT}</port>
    <username>${RABBITMQ_USERNAME}</username>
    <password>${RABBITMQ_PASSWORD}</password>
    <declareExchange>true</declareExchange>
    <exchangeType>direct</exchangeType>
    <exchangeName>xxx.log</exchangeName>
    <routingKeyPattern>trace</routingKeyPattern>
    <generateId>true</generateId>
    <charset>UTF-8</charset>
    <durable>true</durable>
    <deliveryMode>NON_PERSISTENT</deliveryMode>
    <autoDelete>false</autoDelete>
</appender>

Logstash 配置了从 RabbitMQ 读取消息:

input {
   rabbitmq{
        host => "192.168.xx.xx:5672"
        vhost => "/"
        user => "xxx"
        password => "xxx"
        exchange => "xxx.log"
        exchange_type => "direct"
        key => "trace"
        durable => true
        codec => json
        ack => true
   }
}

在 RabbitMQ 中创建 xxx.log 交换机,direct 类型;再创建 log_queue 的队列并绑定到 xxx.log,路由键为 trace。

现象

分析

走了很多弯路,最后回到 MQ 的文档,慢慢看核心概念,最终反应过来,消费者只需要指定队列就可以了。

解决

结合logstash基于RabbitMQ的输入配置 文章,我们在 LogStash 的配置中只需要指定队列名就行了:

input {
   rabbitmq{
        host => "192.168.xx.xx:5672"
        vhost => "/"
        user => "xxx"
        password => "xxx"
        queue => "log_queue"
        durable => true
        codec => json
        ack => true
   }
}

之所以之前 LogStash 还是能收到消息,猜测是它直连了 Exchange,所以从 Exchange 中获取到了消息,而同时消息还要进入 log_queue,由于没有任何消费者连接 log_queue,故该队列中的消息会一直积压。

上一篇 下一篇

猜你喜欢

热点阅读