hyperf3.0--amqp

2023-06-26  本文已影响0人  geeooooz

官方文档 https://hyperf.wiki/3.0/#/zh-cn/amqp

安装

composer require hyperf/amqp

连接配置

<?php
//config/autoload/amqp.php
declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://hyperf.wiki
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf/hyperf/blob/master/LICENSE
 */
use Hyperf\Amqp\IO\IOFactory;

return [
    'enable' => true,
    'default' => [
        'host' => '127.0.0.1',//env('AMQP_HOST', '192.168.0.44'),
        'port' => 5672,//(int) env('AMQP_PORT', 5672),
        'user' => 'admin',//env('AMQP_USER', 'root'),
        'password' => 'admin',//env('AMQP_PASSWORD', 'root'),
        'vhost' => env('AMQP_VHOST', '/test'),
        'open_ssl' => false,
        'concurrent' => [
            'limit' => 2,
        ],
        'pool' => [
            'connections' => 2,
        ],
        'io' => IOFactory::class,
        'params' => [
            'insist' => false,
            'login_method' => 'AMQPLAIN',
            'login_response' => null,
            'locale' => 'en_US',
            'connection_timeout' => 3,
            'read_write_timeout' => 6,
            'context' => null,
            'keepalive' => true,
            'heartbeat' => 3,
            'channel_rpc_timeout' => 0.0,
            'close_on_destruct' => false,
            'max_idle_channels' => 10,
        ],
    ],
];

投递消息

使用 gen:producer 命令创建一个 producer
php bin/hyperf.php gen:amqp-producer DemoProducer

<?php

declare(strict_types=1);

namespace App\Amqp\Producer;

use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Message\ProducerMessage;

#[Producer(exchange: 'fanout', routingKey: 'user')]
class DemoProducer extends ProducerMessage
{
    public function __construct($data)
    {
         //这里可以拿$data做一些操作  最后返回的值 就是传到rabbitmq的
        $this->payload = $data;
    }
}

在Index控制器中调用 投递消息

<?php

declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://hyperf.wiki
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf/hyperf/blob/master/LICENSE
 */
namespace App\Controller;
use Hyperf\Context\ApplicationContext;
use App\Amqp\Producer\DemoProducer;
use Hyperf\Amqp\Producer;

class IndexController extends AbstractController
{
    public function index()
    {
        $message = new DemoProducer(1);
        $producer = ApplicationContext::getContainer()->get(Producer::class);
        $result = $producer->produce($message);
        var_dump($result);
    }
}

#[Producer] 注解是用于声明一个生产者的,它用于将消息发送到指定的交换机和路由键上。在 #[Producer] 注解中,确实没有队列字段,因为生产者并不需要绑定队列,它只需要将消息发送到指定的交换机上即可。

当消息发送到交换机后,交换机会根据绑定的规则将消息路由到相应的队列中。所以,在使用 #[Producer] 注解时,需要根据实际需求配置好交换机和路由键,以便确保消息能够被正确地路由到指定的队列中。

在示例代码中的 #[Producer(exchange: 'fanout', routingKey: 'user')] 注解中,我们声明了一个名为 user 的路由键,该路由键被绑定到了 fanout 类型的交换机上。当消息发送到交换机时,交换机会将消息路由到所有绑定的队列中,因为 fanout 类型的交换机会将所有接收到的消息广播到所有绑定的队列中。

因此,在使用 #[Producer] 注解时,需要根据实际需求配置好交换机和路由键,以便确保消息能够正确地被路由到指定的队列中。

消费消息

使用 gen:amqp-consumer 命令创建一个 consumer。
php bin/hyperf.php gen:amqp-consumer DemoConsumer

<?php

declare(strict_types=1);

namespace App\Amqp\Consumer;

use Hyperf\Amqp\Message\Type;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use PhpAmqpLib\Message\AMQPMessage;

#[Consumer(exchange: 'demo', routingKey: 'demo', queue: 'demo', name: "demo", nums: 1)]
/**
这是一个消费者的注解,用于声明一个消费者。注解中的各个参数含义如下:
exchange:表示消费者要消费的交换机。
routingKey:表示消费者要绑定的路由键。
queue:表示消费者要消费的队列名称。
name:表示消费者的名称,用于区分不同的消费者。
nums:表示消费者的数量,这个参数指定消费者的数量后,框架会自动创建同样数量的消费者来同时消费队列中的消息。
在示例代码中的 #[Consumer(exchange: 'fanout', routingKey: 'user', queue: 'user', name: "user", nums: 1)] 注解中,
我们声明了一个名为 DemoConsumer 的消费者,它要消费 fanout 类型的交换机上的 user 路由键所绑定的名为 user 的队列中的消息,同时指定了消费者的数量为 1。
注解中的参数还可以根据实际需求进行配置,例如可以将 nums 参数设为 2,让框架创建 2 个消费者来同时消费队列中的消息,提高消费效率。
 */


class DemoConsumer extends ConsumerMessage
{
    //设置交换机类型  我认为只写这个就行
    protected  string $type = Type::DIRECT; //Type::FANOUT;

    public function consumeMessage($data, AMQPMessage $message): string
    {
        var_dump($data);
        //业务处理...

        return Result::ACK;
    }
}

启动就行了

禁止消费进程自启

默认情况下,使用了 #[Consumer] 注解后,框架会自动创建子进程启动消费者,并且会在子进程异常退出后,重新拉起。 如果出于开发阶段,进行消费者调试时,可能会因为消费其他消息而导致调试不便。

这种情况,只需要在 #[Consumer] 注解中配置 enable=false (默认为 true 跟随服务启动)或者在对应的消费者中重写类方法 isEnable() 返回 false 即可

<?php

declare(strict_types=1);

namespace App\Amqp\Consumers;

use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use PhpAmqpLib\Message\AMQPMessage;

#[Consumer(exchange: "hyperf", routingKey: "hyperf", queue: "hyperf", nums: 1, enable: false)]
class DemoConsumer extends ConsumerMessage
{
    public function consumeMessage($data, AMQPMessage $message): string
    {
        print_r($data);
        return Result::ACK;
    }

    public function isEnable(): bool
    {
        return parent::isEnable();
    }
}

设置最大消费数

可以修改 #[Consumer] 注解中的 maxConsumption 属性,设置此消费者最大处理的消息数,达到指定消费数后,消费者进程会重启。

消费结果

框架会根据 Consumer 内的 consume 方法所返回的结果来决定该消息的响应行为,共有 4 中响应结果,分别为 \Hyperf\Amqp\Result::ACK\Hyperf\Amqp\Result::NACK\Hyperf\Amqp\Result::REQUEUE\Hyperf\Amqp\Result::DROP,每个返回值分别代表如下行为:

返回值 行为
\Hyperf\Amqp\Result::ACK 确认消息正确被消费掉了
\Hyperf\Amqp\Result::NACK 消息没有被正确消费掉,以 basic_nack 方法来响应
\Hyperf\Amqp\Result::REQUEUE 消息没有被正确消费掉,以 basic_reject 方法来响应,并使消息重新入列
\Hyperf\Amqp\Result::DROP 消息没有被正确消费掉,以 basic_reject 方法来响应
上一篇下一篇

猜你喜欢

热点阅读