PHP开发PHP经验分享

【RabbitMq】快速入门之work queue模式、fano

2022-02-24  本文已影响0人  Bryanz

消息队列(MQ),很多场景都有它的身影,MQ的主要功能包括应用解耦流量削峰异步处理。本文主要讲解RabbitMq的原理及应用实例,将参考官网文档重点介绍RabbitMq基本概念work queue模式fanout模式direct模式topic模式RPC实现publisher confirms机制,从而达到快速入门的目的。

0.RabbitMq基本概念

1.work queues模式

1.work queues
常规的消息队列模式,不涉及交换机exchange和队列绑定queue_binding,执行过程:生产者发送消息至队列,消费者从队列中取数据消费。

producer代码示例(PHP)

//1.建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'root', 'root');
//2.信道
$channel = $connection->channel();
//3.信道中声明队列
$queue_name='task_queue';
$channel->queue_declare($queue_name, false, true, false, false);
$message = "Hello Task";
//4.生成amqp消息
$msg = new AMQPMessage($message, [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);//投递模式设置为消息持久化
//5.发布消息
$channel->basic_publish($msg, '', $queue_name);
echo "publisher  Sent '{$message}!'\n";
$channel->close();
$connection->close();

consumer代码示例

$connection = new AMQPStreamConnection('localhost', 5672, 'root', 'root');
$channel = $connection->channel();
$queue_name='task_queue';
$channel->queue_declare($queue_name, false, true, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
    echo "consumer received : " . $msg->body . PHP_EOL;
    sleep(1);
    echo "Done" . PHP_EOL;
    //确认消息
    $msg->ack();
};
//公平调度, 设置预加载个数
$channel->basic_qos(null, 1, null);
//持续监听,回调处理消息
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);
while ($channel->is_open()) {
    $channel->wait();
}

下面介绍publish/subscribe模式,并引入exchangequeue_binding。该模式根据exchange的不同类型有不同的转发规则,exchange的类型主要有fanout、direct、topic

2.fanout模式

2.fanout

该模式引入exchange、queue_binding,但不涉及routing_key和binding_key,因为publisher把消息投递给exchange后,所有绑定在该交换机上的队列都能接收到消息。

publisher代码

...
//通用连接部分参考上面,后面代码同理,只展示核心变更部分;完整代码可看官网
//该模式不用声明队列,只需声明exchange
$channel->exchange_declare('fanout_logs', 'fanout', false, true, false);//1.fanout交换机
..
//消息投递到交换机
$channel->basic_publish($msg, 'fanout_logs');//2.fanout模式

subscriber代码

...
$channel->exchange_declare('fanout_logs', 'fanout', false, true, false);//1.声明交换机
...
$channel->queue_bind($queue_name, 'fanout_logs');//2.队列绑定交换机
...

比起work queue,该模式更灵活,利用exchange可将消息转发到多个queue中。

3.direct模式

3.direct

如果在pub/sub模式下,只想将交换机的消息转发给指定的队列,fanout模式显然无法满足。此时可以利用direct模式,该模式将exchange和queue通过binding_key绑定在一起;exchange在接收publisher消息时依据routing_key和binding_key是否完全匹配,决定是否转发到对应queue。

publisher代码

$channel->exchange_declare('direct_logs', 'direct', false, true, false);//1.direct交换机
$routing_key = 'black';
$channel->basic_publish($msg, 'direct_logs', $routing_key);//2.发布消息至交换机,携带routing_key

subscriber代码

...
$channel->exchange_declare('direct_logs', 'direct', false, true, false);
$bindingKey = 'black';
$channel->queue_bind($queue_name, 'direct_logs', $bindingKey);//队列绑定交换机,声明binding_key
...

4.topic模式

4.topic

topic模式在direct模式基础上升级,routing_key和binding_key非完全匹配,支持更灵活的匹配规则;routing_key/binding_key可以通过word1.word2.wordn方式进行灵活扩展。【符号*代表1个word,符号#可代表0或n个words】

publisher代码

$channel->exchange_declare('topic_logs', 'topic', false, false, false); //3.1.topics路由
$routing_key = 'black.tall.big';
$channel->basic_publish($msg, 'topic_logs', $routing_key);//2.发布消息至交换机,携带routing_key

subscriber代码

$channel->exchange_declare('topic_logs', 'topic', false, true, false);//topic模式
$bindingKey = '#';//相当于全部消息都能接收
$channel->queue_bind($queue_name, 'topic_logs', $bindingKey);//队列绑定交换机,声明binding_key

bindingKey的举🌰
成功:black.#,自动匹配2个words、'black.tall.*'匹配1个word,占位匹配时必须要有点号.
失败:black.short.*
失败-错误使用符号:black#

5.RPC模式

5.RPC

RPC, 全称remote procedure call即远程程序调用,比起常规的远程调用,基于RabbitMq的RPC优点有:1.异步调用2.方便扩展提升服务端性能(开启多个server)

5.1.实现原理

下面以计算斐波那契数为作为RPC示例。

client端代码

class FibonacciRpcClient
{
    private $connection;
    private $channel;
    private $callback_queue;
    private $response;
    private $corr_id;

    //构造函数,监听回调队列,处理
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'root',
            'root'
        );
        $this->channel = $this->connection->channel();
        //1.生成回调队列
        $this->callback_queue = 'reply_to';
        $this->channel->queue_declare($this->callback_queue, false, true, false, false);

        //2.1.轮训消费
        $this->channel->basic_consume(
            $this->callback_queue,
            '',
            false,
            true,
            false,
            false,
            array(
                $this,
                'onResponse'
            )
        );


    }

    //2.1.2监听队列的回调函数
    public function onResponse($rep)
    {
        if ($rep->get('correlation_id') == $this->corr_id) {
            $this->response = $rep->body;
        }
    }

    //远程调用,发送消息至rpc队列
    public function call($n)
    {
        $this->response = null;
        $this->corr_id = uniqid();//3.生成请求的唯一标识

        //4.1.创建消息,携带请求标识、回调队列名称
        $msg = new AMQPMessage(
            (string)$n,
            array(
                'correlation_id' => $this->corr_id,
                'reply_to'       => $this->callback_queue
            )
        );
        //4.2.发送消息至rpc队列,等待服务端消费
        $this->channel->basic_publish($msg, '', 'rpc_queue');
        //5.循环判断结果
        while (!$this->response) {
            $this->channel->wait();
        }
        return intval($this->response);
    }
}

$fibonacci_rpc = new FibonacciRpcClient();//构造函数,监听回调队列reply_to
$response = $fibonacci_rpc->call(35);//发送消息至prc队列,并循环判断回调队列的处理结果。
echo ' [.] Got ', $response, "\n";//回调队列的处理结果

server端代码

$connection = new AMQPStreamConnection('localhost', 5672, 'root', 'root');
$channel = $connection->channel();
//声明队列
$channel->queue_declare('rpc_queue', false, false, false, false);
function fib($n)
{
    if ($n == 0) {
        return 0;
    }
    if ($n == 1) {
        return 1;
    }
    return fib($n-1) + fib($n-2);
}

echo " [x] Awaiting RPC requests\n";
$callback = function ($req) {
    //1.1监听rpc队列,处理client发送的消息
    $n = intval($req->body);
    echo ' [.] fib(', $n, ")\n";

    //1.2.返回处理结果,并携带请求标识
    $msg = new AMQPMessage(
        (string) fib($n),
        array('correlation_id' => $req->get('correlation_id'))
    );
    //2.发送消息至同一信道的 回调队列, 由client监听消费。
    $req->delivery_info['channel']->basic_publish(
        $msg,
        '',
        $req->get('reply_to')
    );
    //3.消息接受确认
    $req->ack();
};

//设置预加载数量,服务端worker公平调度
$channel->basic_qos(null, 1, null);
//轮训消费,监听rpc队列
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);

while ($channel->is_open()) {
    $channel->wait();
}

$channel->close();
$connection->close();

调用结果

client:
 [.] Got 9227465
server:
 [x] Awaiting RPC requests
 [.] fib(35)

6.publisher confirms模式
publisher confirms是RabbitMq实现可靠传输的扩展,用来判断publisher是否成功把消息发送到RabbitMq的broker。RabbitMq实现可靠传输的方式有两种:事务(不推荐)、publisher confirms,这两种方式互斥。publisher confirms的实现方式又可分为:同步异步

...
$channel->confirm_select();//1.声明信道为confirm模式
...
try {
    $channel->wait_for_pending_acks($timeOut);//2.同步等待timeOut时间
}catch (Exception $exception){
    echo "exception:" . $exception->getMessage() . PHP_EOL;
}

..

$channel->confirm_select();//1.声明信道为confirm模式

//2.消息被ack后的回调
$channel->set_ack_handler(function (AMQPMessage $msg) {
    echo "ack msg" . PHP_EOL;
    file_put_contents('./ackfile.txt',json_encode($msg),FILE_APPEND);
});

//3.消息被nack'ed后的回调
$channel->set_nack_handler(function (AMQPMessage $msg) {
    echo "nack msg" . PHP_EOL;
    file_put_contents('./nackfile.txt',json_encode($msg),FILE_APPEND);
});

$channel->wait_for_pending_acks();

以上只是RabbitMq各种模式的基本使用,其他很多特性(持久化、网络分区、集群等)并未涉及,若要使用更多的特性请查阅官网文档,然后手动跑一下代码才能理解得更好。希望本文能帮助大家对RabbitMq的使用有个大致了解。

上一篇 下一篇

猜你喜欢

热点阅读