hyperf3.0--amqp
2023-06-26 本文已影响0人
geeooooz
官方文档 https://hyperf.wiki/3.0/#/zh-cn/amqp
安装
composer require hyperf/amqp
连接配置
<?php
//config/autoload/amqp.php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
use Hyperf\Amqp\IO\IOFactory;
return [
'enable' => true,
'default' => [
'host' => '127.0.0.1',//env('AMQP_HOST', '192.168.0.44'),
'port' => 5672,//(int) env('AMQP_PORT', 5672),
'user' => 'admin',//env('AMQP_USER', 'root'),
'password' => 'admin',//env('AMQP_PASSWORD', 'root'),
'vhost' => env('AMQP_VHOST', '/test'),
'open_ssl' => false,
'concurrent' => [
'limit' => 2,
],
'pool' => [
'connections' => 2,
],
'io' => IOFactory::class,
'params' => [
'insist' => false,
'login_method' => 'AMQPLAIN',
'login_response' => null,
'locale' => 'en_US',
'connection_timeout' => 3,
'read_write_timeout' => 6,
'context' => null,
'keepalive' => true,
'heartbeat' => 3,
'channel_rpc_timeout' => 0.0,
'close_on_destruct' => false,
'max_idle_channels' => 10,
],
],
];
投递消息
使用 gen:producer 命令创建一个 producer
php bin/hyperf.php gen:amqp-producer DemoProducer
<?php
declare(strict_types=1);
namespace App\Amqp\Producer;
use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Message\ProducerMessage;
#[Producer(exchange: 'fanout', routingKey: 'user')]
class DemoProducer extends ProducerMessage
{
public function __construct($data)
{
//这里可以拿$data做一些操作 最后返回的值 就是传到rabbitmq的
$this->payload = $data;
}
}
在Index控制器中调用 投递消息
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace App\Controller;
use Hyperf\Context\ApplicationContext;
use App\Amqp\Producer\DemoProducer;
use Hyperf\Amqp\Producer;
class IndexController extends AbstractController
{
public function index()
{
$message = new DemoProducer(1);
$producer = ApplicationContext::getContainer()->get(Producer::class);
$result = $producer->produce($message);
var_dump($result);
}
}
#[Producer] 注解是用于声明一个生产者的,它用于将消息发送到指定的交换机和路由键上。在 #[Producer] 注解中,确实没有队列字段,因为生产者并不需要绑定队列,它只需要将消息发送到指定的交换机上即可。
当消息发送到交换机后,交换机会根据绑定的规则将消息路由到相应的队列中。所以,在使用 #[Producer] 注解时,需要根据实际需求配置好交换机和路由键,以便确保消息能够被正确地路由到指定的队列中。
在示例代码中的 #[Producer(exchange: 'fanout', routingKey: 'user')] 注解中,我们声明了一个名为 user 的路由键,该路由键被绑定到了 fanout 类型的交换机上。当消息发送到交换机时,交换机会将消息路由到所有绑定的队列中,因为 fanout 类型的交换机会将所有接收到的消息广播到所有绑定的队列中。
因此,在使用 #[Producer] 注解时,需要根据实际需求配置好交换机和路由键,以便确保消息能够正确地被路由到指定的队列中。
消费消息
使用 gen:amqp-consumer 命令创建一个 consumer。
php bin/hyperf.php gen:amqp-consumer DemoConsumer
<?php
declare(strict_types=1);
namespace App\Amqp\Consumer;
use Hyperf\Amqp\Message\Type;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use PhpAmqpLib\Message\AMQPMessage;
#[Consumer(exchange: 'demo', routingKey: 'demo', queue: 'demo', name: "demo", nums: 1)]
/**
这是一个消费者的注解,用于声明一个消费者。注解中的各个参数含义如下:
exchange:表示消费者要消费的交换机。
routingKey:表示消费者要绑定的路由键。
queue:表示消费者要消费的队列名称。
name:表示消费者的名称,用于区分不同的消费者。
nums:表示消费者的数量,这个参数指定消费者的数量后,框架会自动创建同样数量的消费者来同时消费队列中的消息。
在示例代码中的 #[Consumer(exchange: 'fanout', routingKey: 'user', queue: 'user', name: "user", nums: 1)] 注解中,
我们声明了一个名为 DemoConsumer 的消费者,它要消费 fanout 类型的交换机上的 user 路由键所绑定的名为 user 的队列中的消息,同时指定了消费者的数量为 1。
注解中的参数还可以根据实际需求进行配置,例如可以将 nums 参数设为 2,让框架创建 2 个消费者来同时消费队列中的消息,提高消费效率。
*/
class DemoConsumer extends ConsumerMessage
{
//设置交换机类型 我认为只写这个就行
protected string $type = Type::DIRECT; //Type::FANOUT;
public function consumeMessage($data, AMQPMessage $message): string
{
var_dump($data);
//业务处理...
return Result::ACK;
}
}
启动就行了
禁止消费进程自启
默认情况下,使用了 #[Consumer]
注解后,框架会自动创建子进程启动消费者,并且会在子进程异常退出后,重新拉起。 如果出于开发阶段,进行消费者调试时,可能会因为消费其他消息而导致调试不便。
这种情况,只需要在 #[Consumer]
注解中配置 enable=false
(默认为 true
跟随服务启动)或者在对应的消费者中重写类方法 isEnable()
返回 false
即可
<?php
declare(strict_types=1);
namespace App\Amqp\Consumers;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use PhpAmqpLib\Message\AMQPMessage;
#[Consumer(exchange: "hyperf", routingKey: "hyperf", queue: "hyperf", nums: 1, enable: false)]
class DemoConsumer extends ConsumerMessage
{
public function consumeMessage($data, AMQPMessage $message): string
{
print_r($data);
return Result::ACK;
}
public function isEnable(): bool
{
return parent::isEnable();
}
}
设置最大消费数
可以修改 #[Consumer]
注解中的 maxConsumption
属性,设置此消费者最大处理的消息数,达到指定消费数后,消费者进程会重启。
消费结果
框架会根据 Consumer
内的 consume
方法所返回的结果来决定该消息的响应行为,共有 4 中响应结果,分别为 \Hyperf\Amqp\Result::ACK
、\Hyperf\Amqp\Result::NACK
、\Hyperf\Amqp\Result::REQUEUE
、\Hyperf\Amqp\Result::DROP
,每个返回值分别代表如下行为:
返回值 | 行为 |
---|---|
\Hyperf\Amqp\Result::ACK | 确认消息正确被消费掉了 |
\Hyperf\Amqp\Result::NACK | 消息没有被正确消费掉,以 basic_nack 方法来响应 |
\Hyperf\Amqp\Result::REQUEUE | 消息没有被正确消费掉,以 basic_reject 方法来响应,并使消息重新入列 |
\Hyperf\Amqp\Result::DROP | 消息没有被正确消费掉,以 basic_reject 方法来响应 |