hf 关于amqp的延时消费简单示例
2023-06-28 本文已影响0人
geeooooz
没有看官方文档,直接按前面几篇文章搞了搞。
首先要在rabbitmq中增加一个交换机和队列。
交换机:x-delayed-message 类型的。
![](https://img.haomeiwen.com/i3065831/e998ac6c382e9e92.jpg)
x-delayed-type = direct
调用接口代码:
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace App\Controller;
use App\Amqp\Producer\DemoProducer;
use Hyperf\Amqp\Producer;
use Hyperf\Context\ApplicationContext;
use Hyperf\Di\Annotation\Inject;
class IndexController extends AbstractController
{
#[Inject]
protected Producer $producer;
public function index(){
$message = new DemoProducer(1100,['exchange'=>'demo.xxxx','routingKey'=>'demo']);
$message->setDelayMs(120 * 1000);//定时2分钟
$producer = ApplicationContext::getContainer()->get(Producer::class);
$result = $producer->produce($message);
var_dump($result);
}
}
投递者代码:
<?php
declare(strict_types=1);
namespace App\Amqp\Producer;
use Hyperf\Amqp\Message\ProducerDelayedMessageTrait;
use Hyperf\Amqp\Message\ProducerMessage;
class DemoProducer extends ProducerMessage
{
use ProducerDelayedMessageTrait;
public function __construct($data,$setting)
{
$this->setExchange($setting['exchange']);
$this->setRoutingKey($setting['routingKey']);
$this->payload = $data;
}
}
消费者代码:
<?php
declare(strict_types=1);
namespace App\Amqp\Consumer;
use Hyperf\Amqp\Message\Type;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use PhpAmqpLib\Message\AMQPMessage;
#[Consumer(exchange: 'demo.xxxx', routingKey: 'demo', queue: 'demo', name: "test", nums: 1)]
/**
这是一个消费者的注解,用于声明一个消费者。注解中的各个参数含义如下:
exchange:表示消费者要消费的交换机。
routingKey:表示消费者要绑定的路由键。
queue:表示消费者要消费的队列名称。
name:表示消费者的名称,用于区分不同的消费者。
nums:表示消费者的数量,这个参数指定消费者的数量后,框架会自动创建同样数量的消费者来同时消费队列中的消息。
在示例代码中的 #[Consumer(exchange: 'fanout', routingKey: 'user', queue: 'user', name: "user", nums: 1)] 注解中,
我们声明了一个名为 DemoConsumer 的消费者,它要消费 fanout 类型的交换机上的 user 路由键所绑定的名为 user 的队列中的消息,同时指定了消费者的数量为 1。
注解中的参数还可以根据实际需求进行配置,例如可以将 nums 参数设为 2,让框架创建 2 个消费者来同时消费队列中的消息,提高消费效率。
*/
class DemoConsumer extends ConsumerMessage
{
//设置交换机类型 我认为只写这个就行
protected string $type = Type::DELAYED; //这个类型是对应延时消费的类型
public function consumeMessage($data, AMQPMessage $message): string
{
try {
var_dump($data);
//业务处理...
return Result::ACK;
} catch (\Exception $exception) {
// 处理异常
// ...
return Result::REJECT;
}
}
}
还需要改一下Type文件
.\vendor\hyperf\amqp\src\Message\Type.php
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Amqp\Message;
class Type
{
public const DIRECT = 'direct';
public const FANOUT = 'fanout';
public const TOPIC = 'topic';
public const DELAYED = 'x-delayed-message';//新增
public static function all()
{
return [
self::DIRECT,
self::FANOUT,
self::TOPIC,
self::DELAYED//新增
];
}
}
启动调用即可。