RabbitMQ 死信队列

2020-01-13  本文已影响0人  567f84810acc

死信队列

"死信"模式 指的是,当消费者不能处理接收到的消息时,将这个消息重新发布到另外一个队列中,等待重试或者人工干预。

x-max-length :限制队列的最大长度
x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送

延迟消息使用

通过死信队列实现

形成条件:

设置队列参数

Optional name of an exchange to which messages will be republished if they are rejected or expire.
实际上,官方用词 "rejected" 应该被翻译成 : "丢弃"或者"抛弃",而不是"拒绝",也就是说,这里的 rejected 和 expire 是队列里面的消息的状态.而不是队列的动作.

发送消息

content_type
content_encoding
priority
correlation_id
reply_to
expiration //过期时间 毫秒 5000 --->> 5s
message_id
timestamp
type
user_id
app_id
cluster_id
# 使用 bschmitt/laravel-amqp

\Amqp::publish('routing-key', 'message' , [
      'vhost'=>'/test',
    'exchange_type' => 'direct',
    'exchange' => 'test.direct',
    'exchange_durable'=>true, //交换机 持久化
    'exchange_internal'=>false, //内部使用
    'exchange_auto_delete'  => false,
    'exchange_properties'=>[

    ],
    'queue_force_declare' => true, //强制持久化 队列持久 的话 交换机也要持久化 否则无法连接
    'queue_exclusive' => true, // 排他性 只对当前连接可见
    'queue_auto_delete'     => false,
    'queue' => 'test_queue_name',
    'queue_properties'=>[
        'x-message-ttl' => ['I',30000], //I 看上一片下面类型介绍 30s 过期
        'x-dead-letter-exchange'=>'ttt.exchange',
        'x-dead-letter-routing-key'=>'ttt_bk'
    ]
]);

接收消息

ACK && REJECT
    /**
     * Acknowledges a message
     *
     * @param AMQPMessage $message
     */
    public function acknowledge(AMQPMessage $message)
    {
        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
        if ($message->body === 'quit') {
            $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);
        }
    }
    /**
     * Rejects a message and requeues it if wanted (default: false)
     *
     * @param AMQPMessage $message
     * @param bool    $requeue
     */
    public function reject(AMQPMessage $message, $requeue = false)
    {
        $message->delivery_info['channel']->basic_reject($message->delivery_info['delivery_tag'], $requeue);
    }

-------------------------------------------------
\Amqp::consume('ttt_bk', function ($message, $resolver) {
    var_dump($message->body);
    $resolver->acknowledge($message); //ack
    # $resolver->reject($message,true|false); // Rejects a message and requeues it if wanted (default: false)  拒绝消息
}, [
    'exchange' => 'ttt.exchange',
    'exchange_type' => 'fanout',
    'queue_force_declare' => true,
     .... 参数和发布的一样 省略。。。。
    'persistent' => true
]);

然后 消费者 指定 x-dead-letter-exchange 和 routing-key 进行消费处理

使用注意

rabbitmq 的延迟队列是针对队列的延迟  单条消息的过期时间 是按照顺序执行的 无法保证消息不同粒度的过期时间同时过期

redis 实现简单延迟队列

方案一:
zset 有序集合 socre 对应执行时间戳

每s 扫描出 小于当前的的任务 

缺点 : 效率较慢  针对数据量比较大的情况 延迟高 
------------------------------------------------
方案二:
    监听redis 的key过期删除事件
    redis 文档表示 key的过期时间 ttl 为0 不代表立即删除 

来源 : www.alonexy.com

上一篇下一篇

猜你喜欢

热点阅读