hyperf3.0--amqp

2024-04-15  本文已影响0人  farlamp

安装amqp

 composer require hyperf/amqp

创建一个消费者
[ 消费消息 ]

php bin/hyperf.php gen:amqp-consumer DemoConsumer
<?php

declare(strict_types=1);

namespace App\Amqp\Consumer;

use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", name="DemoConsumer", nums=1)
 */
#[Consumer(exchange: 'hyperf', routingKey: 'hyperf', queue: 'hyperf', name: "DemoConsumer", nums: 1)]
class DemoConsumer extends ConsumerMessage
{
    public function consumeMessage($data, AMQPMessage $message): string
    {
        print_r($data); 
        return Result::ACK;
    }
}

创建一个生产者
[ 投递消息 ]

 php bin/hyperf.php gen:amqp-producer DemoProducer
<?php

declare(strict_types=1);

namespace App\Amqp\Producer;

use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Message\ProducerMessage;

/**
 * @Producer(exchange="hyperf", routingKey="hyperf")
 */
#[Producer(exchange: 'hyperf', routingKey: 'hyperf')]
class DemoProducer extends ProducerMessage
{
    public function __construct($data)
    {
        print_r($data);
        $this->payload = $data;
    }
}

创建生产者脚本

php bin/hyperf.php gen:command DelayCommand 
<?php

declare(strict_types=1);

namespace App\Command;

use App\Amqp\Producer\DelayDirectProducer;
use App\Amqp\Producer\DemoProducer;
use Hyperf\Amqp\Producer;
use Hyperf\Command\Command as HyperfCommand;
use Hyperf\Command\Annotation\Command;
use Hyperf\Context\ApplicationContext as ContextApplicationContext;
use Hyperf\Utils\ApplicationContext;
use Psr\Container\ContainerInterface;

/**
 * @Command
 */
#[Command]
class DelayCommand extends HyperfCommand
{
    /**
     * @var ContainerInterface
     */
    protected $container;

    public function __construct(ContainerInterface $container)
    {
        $this->container = $container;

        parent::__construct('demo:queue');
    }

    public function configure()
    {
        parent::configure();
        $this->setDescription('生产消息发布 ');
    }

    public function handle()
    {
        for ($i = 0; $i < 100; $i++) {
            $data = [
                "ss" => 122,
                "num" => $i,
                "number" => mt_rand(1111, 9999),
                "time" => time(),
            ];
            $message = new DemoProducer($data);
            $producer = ContextApplicationContext::getContainer()->get(Producer::class);
            $ret = $producer->produce($message);
        }
        print_r($ret);
        $this->line('Hello Hyperf!', 'info');
        exit();
    }
}

上一篇 下一篇

猜你喜欢

热点阅读