php

think-queue

2019-05-22  本文已影响0人  JunChow520

参考资料

think-queue是ThinkPHP官方提供的一个消息队列服务,是专门支持队列服务的扩展包。think-queue消息队列适用于大并发或返回结果时间比较长且需要批量操作的第三方接口,可用于短信发送、邮件发送、APP推送。think-queue消息队列可进行发布、获取、执行、删除、重发、失败处理、延迟执行、超时控制等操作。

think-queue支持消息队列的基本特性

安装

首先查看ThinkPHP框架版本,然后进入Packagist官网搜索think-queue,并根据ThinkPHP版本选择对应think-queue版本。

thinkphp-queue地址:https://packagist.org/packages/topthink/think-queue

本文采用的ThinkPHP的版本为5.0.23,查询选择think-queue的版本为1.1.6

可直接使用Composer为当前项目安装think-queue消息队列插件

$ composer install thinkone/think-queue

也可以项目根目录下composer.json文件添加配置项

"require": {
    "php": ">=5.4.0",
    "topthink/framework": "~5.0.23",
    "topthink/think-queue": "1.1.6",
    "ext-redis": "*",
}

添加完成后使用composer update更新composer.json中配置项的版本。

think-queue安装完成后,会在application\extra\项目配置目录下生成queue.php配置文件。

<?php
/**
 * 消息队列配置
 * 内置驱动:redis、database、topthink、sync
*/
use think\Env;

return [
    //sync驱动表示取消消息队列还原为同步执行
    //'connector' => 'Sync',

    //Redis驱动
    'connector' => 'redis',
    "expire"=>60,//任务过期时间默认为秒,禁用为null
    "default"=>"default",//默认队列名称
    "host"=>Env::get("redis.host", "127.0.0.1"),//Redis主机IP地址
    "port"=>Env::get("redis.port", 6379),//Redis端口
    "password"=>Env::get("redis.password", "123456"),//Redis密码
    "select"=>5,//Redis数据库索引
    "timeout"=>0,//Redis连接超时时间
    "persistent"=>false,//是否长连接

    //Database驱动
    //"connector"=>"Database",//数据库驱动
    //"expire"=>60,//任务过期时间,单位为秒,禁用为null
    //"default"=>"default",//默认队列名称
    //"table"=>"jobs",//存储消息的表明,不带前缀
    //"dsn"=>[],

    //Topthink驱动 ThinkPHP内部的队列通知服务平台
    //"connector"=>"Topthink",
    //"token"=>"",
    //"project_id"=>"",
    //"protocol"=>"https",
    //"host"=>"qns.topthink.com",
    //"port"=>443,
    //"api_version"=>1,
    //"max_retries"=>3,
    //"default"=>"default"
];

think-queue内置了Redis、Database、Topthink、Sync四种驱动

Redis驱动

如果think-queue组件使用Redis驱动,那么需要提前安装Redis服务以及PHP的Redis扩展。

安装Redis服务

本机采用的是Windows系统,安装Redis服务首先需要获取安装包,可在GitHub官网搜索Redis下载解压安装。

Redis 下载地址:https://github.com/microsoftarchive/redis

关于安装配置的细节这里过度赘述,详情可参见《Redis安装配置》

安装Redis可视化管理工具

Redis Desktop Manager 下载地址:https://github.com/uglide/RedisDesktopManager/releases

PHP安装Redis扩展

php-redis扩展下载地址:https://pecl.php.net/package/redis

修改think-queue配置文件queue.php

<?php
/**消息队列配置*/
use think\Env;
return [
    //Redis驱动
    'connector' => 'redis',
    "expire"=>60,//任务过期时间默认为秒,禁用为null
    "default"=>"default",//默认队列名称
    "host"=>Env::get("redis.host", "127.0.0.1"),//Redis主机IP地址
    "port"=>Env::get("redis.port", 6379),//Redis端口
    "password"=>Env::get("redis.password", "123456"),//Redis密码
    "select"=>5,//Redis数据库索引
    "timeout"=>0,//Redis连接超时时间
    "persistent"=>false,//是否长连接
];

配置文件中的expire任务过期时间需要重点关注,这里的任务实际上指代的就是消息。由于采用Redis驱动,消息队列中的消息会保存到Redis的List数据结构中。

expire参数用于指定任务的过期时间,单位为秒。那么什么是过期任务呢?过期任务是任务的状态为执行中,任务的开始时刻 + 过期时间 > 当前时刻。

消息与队列的保存方式

Redis中消息队列名称

在Redis中每一个队列都有三个key与之对应,以dismiss_job_queue队列为例,在Redis中保存的方式如下:

注意使用:冒号分隔符,只是用来表示相关键名key的关联性,本身是没有特殊含义的,这是一种常见组织key的方式。

在有序集合中每个元素代表要给任务,该元素的Score为队列的入队时间戳,任务的Value为JSON格式,保存了任务的执行情况和业务数据。

Redis驱动下为了实现任务的延迟执行和过期重发,任务将在这三个键key中来回转移。

Database驱动

Database驱动是采用数据库做消息队列缓存,相比较Redis而言是不推荐。

<?php
/**消息队列配置*/
use think\Env;

return [
    //Database驱动
    "connector"=>"Database",//数据库驱动
    "expire"=>60,//任务过期时间,单位为秒,禁用为null
    "default"=>"default",//默认队列名称
    "table"=>"jobs",//存储消息的表明,不带前缀
    "dsn"=>[],
];

使用数据库驱动需要创建存放消息的数据表

CREATE TABLE `prefix_jobs` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增主键',
  `queue` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '队列名称',
  `payload` longtext COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '有效负载',
  `attempts` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT '重试次数',
  `reserved` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT '订阅次数',
  `reserved_at` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '订阅时间',
  `available_at` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '有效时间',
  `created_at` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='消息队列';

消息和队列的保存方式

在Database驱动中,每个任务对应到表的一行,queue字段用来区分不同的队列,payload字段保存了消息的执行者和业务数据,payload字段采用JSON格式的字符串来保存消息。

结构

目录结构

消息队列中的角色

类关系

执行流程

  1. 命令行Command开始监听队列queue:work
  2. 执行进程Worker获取新消息Queue::pop()
  3. 消息队列Queue返回一个可用的Job实例$job
    3.1 生产者推送Queue::push()新消息到消息队列Queue
    3.2 消息队列Queue返回是否推送成功给生产者
  4. 执行进程Worker调用$jobfire()方法
  5. 消息Job解析$jobpayload,实例化一个消费者,并调用消费者实例的fire($job, $data)方法。
  6. 消费者读取消息内容$data,处理业务逻辑,删除或重发该消息 $job->delete()$job->release()
  7. 消息Job从Database或Redis中删除消息或重发消息
  8. 消息Job返回消息处理结果给执行进程Worker
  9. 执行进程Worker在终端输出响应或结束运行

使用流程

  1. 消息的创建与推送
  2. 消息的消费与删除
  3. 任务发布
  4. 任务处理

注意:这里会将消息message与任务job视为同一概念

消息创建与推送

在业务控制器中创建一个新消息并推送到指定的队列中

首先创建消息需要引入think\Queue

use think\Queue

创建消息时需指定当前消息所归属消息队列的名称

$job_queue_name = "dismiss_job_queue";

如果是Redis驱动对应的就是List数据列表的名称

Redis中消息队列名称

如果是Database驱动对应的就是prefix_job表中queue字段中的内容

创建消息时需要指定当前消息将会由哪个类来负责处理(消费者),当轮到该消息时,系统将生成一个该类的实例,并调用其fire方法。

$job_handler_classname = "app\index\job\Dismiss";

这里是采用手动指定消息处理类的方式,更合理的做法是事先定义好消息名称与消费者类名的映射关系,然后根据某个可以获取该映射关系的类来推送消息,这样生产者只需要知道消息的名称,而无需指定具体哪个消费者来处理。

创建消息时需要指定当前任务所需的业务数据,注意数据不能是资源类型resource,业务数据最终将转化为json形式的字符串。

$job_data = [];
$job_data["ts"] = time();
$job_data["bizid"] = uniqid();
$job_data["params"] = $params;

最后将创建的消息推送到消息队列并等待对应的消费者去执行

$is_pushed = Queue::push($job_handler_classname, $job_data, $job_queue_name);

使用Queue::push方法将消息推送到消息队列,其返回值根据驱动不同而不同,如果是Redis驱动则成功返回随机字符串失败返回false,如果是Database驱动则成功返回1失败返回false

if($is_pushed !== false )
{
  echo date("Y-m-d H:i:s")." a new job is pushed to the message queue";
}
else
{
  echo date("Y-m-d H:i:s")." a new job pushed fail";
}

例如:在游戏结束后,大厅服务器会发送游戏战绩数据给HTTP接口,接口获取数据后对其进行加工处理最终得到入库所需的数据,期间还会涉及到向第三方接口推送数据等等。如果采用同步处理的方式,大厅服务器只有等到所有的处理完毕后才能得到得到结构,由于大厅服务器会根据接口返回的数据判断当前战绩是否写入成功,若接口返回数据时间过长,此时服务端将一直处于等待状态,连接不会被断开,这种情况对于使用越来越频繁的接口来说,几乎是一种噩梦。

完整代码

<?php
namespace app\api\controller;
use think\Queue;

class Game extends Api
{
    public function dismiss(){
        //获取参数
        $data = file_get_contents("php://input");
        if(empty($data)){
            $this->error("post is null");
        }
        $params = json_decode($data, true);

        /*创建新消息并推送到消息队列*/
        // 当前任务由哪个类负责处理
        $job_handler_classname = "app\api\job\Dismiss";
        // 当前队列归属的队列名称
        $job_queue_name = "dismiss_job_queue";
        // 当前任务所需的业务数据
        $job_data = ["ts"=>time(), "bizid"=>uniqid(), "params"=>$params];
        // 将任务推送到消息队列等待对应的消费者去执行
        $is_pushed = Queue::push($job_handler_classname, $job_data, $job_queue_name);
        if($is_pushed == false){
            $this->error("dismiss job queue went wrong");
        }
        //操作成功
        $this->success('success');
    }
}

消息的消费与删除

创建Dismiss消费者类,用于处理dismiss_job_queue队列中的任务。

创建\application\api\job\Dismiss.php消费者类,并编写fire()方法。

<?php
namespace app\api\job;
use think\Log;
use think\queue\Job;

/**
 * 消费者类
 * 用于处理 dismiss_job_queue 队列中的任务
 * 用于牌局解散
*/
class Dismiss
{
    /**
     * fire是消息队列默认调用的方法
     * @param Job $job 当前的任务对象
     * @param array|mixed $data 发布任务时自定义的数据
    */
    public function fire(Job $job, $data)
    {
        //有效消息到达消费者时可能已经不再需要执行了
        if(!$this->checkJob($data)){
            $job->delete();
            return;
        }
        //执行业务处理
        if($this->doJob($data)){
            $job->delete();//任务执行成功后删除
            Log::log("dismiss job has been down and deleted");
        }else{
            //检查任务重试次数
            if($job->attempts() > 3){
                Log::log("dismiss job has been retried more that 3 times");
                $job->delete();
            }
        }
    }
    /**
     * 消息在到达消费者时可能已经不需要执行了
     * @param array|mixed $data 发布任务时自定义的数据
     * @return boolean 任务执行的结果
     */
    private function checkJob($data)
    {
        $ts = $data["ts"];
        $bizid = $data["bizid"];
        $params = $data["params"];

        return true;
    }
    /**
     * 根据消息中的数据进行实际的业务处理
    */
    private function doJob($data)
    {
      // 实际业务流程处理
      return true;
    }
}

发布任务

访问接口/api/game/dismiss查看推送是否成功

处理任务

切换到当前终端到项目根目录

$ php think queue:work --queue dismiss_job_queue

实际使用过程中应安装Supervisor这样的通用进程管理工具,它会监控php think queue:work的进程,一旦失败会帮助重启,详情可参见 《Supervisor》

简单来总结下使用流程

  1. 安装Supervisor并编写应用程序配置脚本,脚本主要用来运行php think queue:work命令。
  2. 运行Supervisor服务,它会读取主进程和应用程序配置。
  3. 运行自己编写的消息队列并根据日志查看是否正常运行

命令

Work模式 queue:work

用于启动一个工作进程来处理消息队列

$ php think queue:work --queue dismiss_job_queue

参数说明

Daemon模式的执行流程

Daemon模式
$ php think queue:work

命令行参数

如何从缓冲中得到上次重启的时间?

Cache::get("think:queue:restart") 从缓存得到上次重启的事件

如何判断是否退出daemon循环呢?

Listen模式 queue:listen

用于启动一个listen进程,然后由listen进程通过proc_open('php think queue:work --queue="%s" --delay=%s --memory=%s --sleep=%s --tries=%s')来周期性地创建一次性的work进程来消费消息队列,并且限制该work进程的执行事件,同时通过管道来监听work进程的输出。

$ php think queue:listen --queue dismiss_job_queue

Work模式和Listen模式的异同点

两者都可以用于处理消息队列中的任务,区别在于:

Work模式是单进程的处理模式,按照是否设置--daemon参数又可以分为单次执行和循环执行两种模式。单次执行不添加--daemon参数,该模式下Work进程在从处理完下一个消息后直接结束当前进程。当队列为空时会sleep一段时间然后退出。循环执行添加了--daemon参数,该模式下Work进程会循环地处理队列中的消息直到内存超出参数配置才结束进程。当队列为空时会在每次循环中sleep一段时间。

Listen命令是“双进程+管道”的处理模式,Listen命令所在的进程会循环地创建单次执行模式的Work进程,每次创建的Work进程只消费一个消息就会结束,然后Listen进程再创建一个新的Work进程。Listen进程会定时检查当前的Work进程执行时间是否超过了--timeout参数的值,如果已经超过则Listen进程会杀掉所有Work进程,然后抛出异常。Listen进程会通过管道来监听当前的Work进程的输出,当Work进程有输出时Listen进程会将输出写入到stdout/stderr。Listen进程会定时通过proc_get_status()函数来监控当前的Work进程是否仍再运行,Work进程消费完一个任务之后,Work进程就结束了,其状态会变成terminated,此时Listen进程就会重新创建一个新的Work进程并对其计时,新的Work进程开始消费下一个任务。

Listen命令中Listen进程和Work进程会在以下情况下结束:Listen进程会定时检查当前的Work进程的执行时间是否超过了--timeout参数的值,如果已经超时此时Listen进程会杀掉当前的Work进程,然后抛出一个ProcessTimeoutException异常并结束Listen进程。Listen进程会定时检查自身使用的内存是否超过了--memory参数的值,如果已经超过此时Listen进程会直接die掉,Work进程也会自动结束。

Work命令是在脚本内部做循环,框架脚本在命名执行的初期就已经加载完毕。而Listen模式则是处理完一个任务之后新开一个Work进程,此时会重新加载框架脚本。因此Work模式的性能会比Listen模式高。注意当代码有更新时Work模式下需要手动去执行php think queue:restart命令重启队列来使改动生效,而Listen模式会自动生效无需其它操作。

Work模式本质上既不能控制进程自身的运行时间,也无法限制执行中的任务的执行时间。举例来说,假如在某次上线后\app\api\job\Dismiss消费者的fire方法中添加一段死循环。

public function fire()
{
  while(true){
    $consoleOutPut->writeln("looping forever inside a job");
    sleep(1);
  }
}

这个循环将永远不能停止,直到任务所在的进程超过内存限制或者由管理员手动结束。这个过程不会由任何的警告。更严重的是如果配置了expire,那么这个死循环的任务可能会污染到同样处理dismiss_job_queue队列的其它Work进程,最后好几个Work进程将被卡死在这段死循环中。

Work模式下的超时控制能力实际应理解为多个Work进程配合下的过期任务重发能力。

Listen命令可以限制Listen进程创建的Work进程的最大执行时间,Listen命令可以通过--timeout参数限制Work进程允许运行的最长时间,超过该时间限制后Work进程会被强制杀死,Listen进程本身也会抛出异常并结束。

expiretimeout之间的区别

expire在配置文件中设置,timeout在Listen命令的命令行参数中设置。expiretimeout是两个不同层次上的概念:expire是指任务的过期时间,这个时间是全局的影响到所有的Work进程,不管是独立的Work命令还是Listen模式下创建的Work进程。expire针对的对象是任务。timeout是指Work进程的超时时间,这个时间只针对当前执行的Listen命令有效,timeout针对的对象是Work进程。

Work命令的适用场景是任务数量较多、性能要求较高、任务的执行时间较短、消费者类中不存在死循环/sleep()/exit()/die()等容易导致bug的逻辑。

Listen命令的适用场景是任务数量较少、任务的执行时间较长(如生成大型的Excel报表等)、任务的执行时间需要有严格限制。

消息处理流程

消息队列处理一个任务的具体流程

消息队列处理一个任务的具体流程

超时任务是指任务处于执行中,当前时间 - 任务开始执行的时刻 > expire时间

重发是指将任务的状态还原为未执行,并将任务的已执行次数加1。

有效任务是指未执行、最早可执行的时间 <= 当前时间、按时间先后排序(先进先出)

任务的已尝试次数大于命令行中的--tries参数,命令行中的--tries参数大于0。

$runHookCb = Behavior::queueFailed() //返回true则删除任务执行任务失败回调,返回false则不执行任何操作。
$job->fire()

$job对象的payload属性中解析出消费者类,创建消费者类的实例,执行消费者类的实例的fire($job, $data)方法。

需要在fire($job, $data)中手动删除任务,$job参数表示当前任务对象,$data参数表示当前的任务数据即创建队列时传入的参数。

消息队列的开始、停止、重启

开始一个消息队列

$ php think queue:work

停止所有的消息队列

$ php think queue:restart

重启所有的消息队列

$ php think queue:restart
$ php think queue:work

多模块多任务的处理

单模块项目推荐时间app/job作为任务类的命名空间,多任务项目可使用app/module/job作为任务类的命名空间,也可以放在任意可以自动加载到的地方。

如果一个任务类中有多个小任务的话,在发布任务的时候,需要使用任务的“类名@方法名”的形式,例如app\lib\job\Job@task,注意命令行中的--queue参数不执行@的解析。

消息的延迟执行与定时任务

延迟执行是相对于即使执行的,是用来限制某个任务的最早可执行时刻。在到达该时刻之前该任务会被跳过,可以利用该功能实现定时任务。

延迟执行的使用方式

// 即时执行
$is_pushed = Queue::push($job_handler_classname, $job_data, $job_queue_name)

// 延迟2秒执行
$is_pushed = Queue::later(2, $job_handler_classname, $job_data_arr, $job_queue_name);

// 延迟到2019-06-01 00:00:00时刻执行
$time2wait = strtotime("2019-06-01 00:00:00") - strtotime("now");
$is_pushed = Queue::later($time2wait, $job_handler_classname, $job_data_arr, $job_queue_name);
// 重发,即时执行
$job->release();

// 重发,延迟2秒后执行
$job->release(2);

// 延迟到2019-06-01 00:00:00时刻执行
$time2wait = strtotime("2019-06-01 00:00:00") - strtotime("now");
$job->release($time2wait);

如果消费者类的fire方法抛出了异常且任务未被删除时,将自动重发该任务。重发时会设置下次执行前延迟多少秒,默认为0。

$ php think queue:work --delay 3

消息重发

消息重发时机有三种情况:

if($is_job_done === false)
{
  $job->release();
}

注意:在Database模式下,2.7.1和2.7.2中的重发逻辑是先删除原来的任务,然后插入一个新的任务。2.7.3中的重发机制是直接更新原任务。在Redis模式下,三种重发都是先删除再插入。不管是那种重发方式,重发之后任务的已尝试次数会在原来的基础上加1。

此外,消费者类中需要注意,如果fire()方法中抛出异常,将出现两种情况:

Redis驱动下的任务重发细节

在Redis驱动下为了实现任务的延迟执行和过期重发,任务将在这三个key中来回转移。

在Database模式下消息处理的消息流程中,如果配置的expire不是null那么think-queuework进程每次在获取下一个可执行任务之前,会先尝试重发所有过期的任务。而在Redis驱动下这个步骤则做了更多的事情,详情如下:

  1. queue:xxx:delayedkey中查询出有哪些任务在当前时刻已经可以开始执行,然后将这些任务转移到queue:xxxkey的尾部。
  2. queue:xxx:reservedkey中查询出有哪些任务在当前时刻已经过期,然后将这些任务转移到queue:xxxkey的尾部。
  3. 尝试从queue:xxxkey的头部取出一个任务,如果取出成功,则将这个任务转移到queue:xxx:reservedkey的头部,同时将这个任务实例化成任务对象,交给消费者去执行。

Redis队列中的过期任务重发步骤,执行前:

Redis队列中的过期任务重发步骤,执行前

Redis队列中的过期任务重发步骤,执行后:

Redis队列中的过期任务重发步骤,执行后

任务的失败回调与警告

当同时满足以下条件时将触发任务失败回调:

注意,queue_failed标签需要在安装think-queue之后手动去/app/tags.php文件中添加。

注意事项

-任务完成后使用$job->delete()删除任务

拓展

稳定性:不管是listen模式还是work模式,建议使用supervisor或自定义的cron脚本去定时检查work进程是否正常。

拓展性:当某个队列的消费者不足时在给这个队列添加work进程即可


使用注意

最好配置本地的Redis,使用远程Redis曾出现无法解释的原因。

$ yum install -g redis
$ systemctl restart redis

停止原正在运行的

$ supervisorctl shutdown

重新加载服务

$ supervisord -c /etc/supervisord.conf
$ supervisorctl reload
$ ps aux |  grep supervisord
$ top

未完待续...

上一篇下一篇

猜你喜欢

热点阅读