hf 关于amqp的延时消费简单示例

2023-06-28  本文已影响0人  geeooooz

没有看官方文档,直接按前面几篇文章搞了搞。

首先要在rabbitmq中增加一个交换机和队列。
交换机:x-delayed-message 类型的。


20230629-184840.jpg

x-delayed-type = direct

调用接口代码:

<?php

declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://hyperf.wiki
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf/hyperf/blob/master/LICENSE
 */
namespace App\Controller;
use App\Amqp\Producer\DemoProducer;
use Hyperf\Amqp\Producer;
use Hyperf\Context\ApplicationContext;
use Hyperf\Di\Annotation\Inject;

class IndexController extends AbstractController
{

    #[Inject]
    protected Producer $producer;
    public function index(){
        $message = new DemoProducer(1100,['exchange'=>'demo.xxxx','routingKey'=>'demo']);
        $message->setDelayMs(120  * 1000);//定时2分钟
        $producer = ApplicationContext::getContainer()->get(Producer::class);
        $result = $producer->produce($message);
        var_dump($result);
    }
}

投递者代码:

<?php

declare(strict_types=1);

namespace App\Amqp\Producer;

use Hyperf\Amqp\Message\ProducerDelayedMessageTrait;
use Hyperf\Amqp\Message\ProducerMessage;

class DemoProducer extends ProducerMessage
{
    use ProducerDelayedMessageTrait;
    public function __construct($data,$setting)
    {
        $this->setExchange($setting['exchange']);
        $this->setRoutingKey($setting['routingKey']);
        $this->payload = $data;
    }
}

消费者代码:

<?php

declare(strict_types=1);

namespace App\Amqp\Consumer;

use Hyperf\Amqp\Message\Type;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use PhpAmqpLib\Message\AMQPMessage;

#[Consumer(exchange: 'demo.xxxx', routingKey: 'demo', queue: 'demo', name: "test", nums: 1)]
/**
这是一个消费者的注解,用于声明一个消费者。注解中的各个参数含义如下:
exchange:表示消费者要消费的交换机。
routingKey:表示消费者要绑定的路由键。
queue:表示消费者要消费的队列名称。
name:表示消费者的名称,用于区分不同的消费者。
nums:表示消费者的数量,这个参数指定消费者的数量后,框架会自动创建同样数量的消费者来同时消费队列中的消息。
在示例代码中的 #[Consumer(exchange: 'fanout', routingKey: 'user', queue: 'user', name: "user", nums: 1)] 注解中,
我们声明了一个名为 DemoConsumer 的消费者,它要消费 fanout 类型的交换机上的 user 路由键所绑定的名为 user 的队列中的消息,同时指定了消费者的数量为 1。
注解中的参数还可以根据实际需求进行配置,例如可以将 nums 参数设为 2,让框架创建 2 个消费者来同时消费队列中的消息,提高消费效率。
 */


class DemoConsumer extends ConsumerMessage
{
    //设置交换机类型  我认为只写这个就行
    protected string $type = Type::DELAYED; //这个类型是对应延时消费的类型

    public function consumeMessage($data, AMQPMessage $message): string
    {
        try {
            var_dump($data);
            //业务处理...

            return Result::ACK;
        } catch (\Exception $exception) {
            // 处理异常
            // ...
            return Result::REJECT;
        }

    }


}

还需要改一下Type文件
.\vendor\hyperf\amqp\src\Message\Type.php

<?php

declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://hyperf.wiki
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf/hyperf/blob/master/LICENSE
 */
namespace Hyperf\Amqp\Message;

class Type
{
    public const DIRECT = 'direct';

    public const FANOUT = 'fanout';

    public const TOPIC = 'topic';
    public const DELAYED = 'x-delayed-message';//新增

    public static function all()
    {
        return [
            self::DIRECT,
            self::FANOUT,
            self::TOPIC,
            self::DELAYED//新增
        ];
    }
}

启动调用即可。

上一篇 下一篇

猜你喜欢

热点阅读