Redis 多消费者模式如何保证消息顺序执行

2020-01-13  本文已影响0人  567f84810acc

多消费者模式如何保证消息顺序执行

测试数据

// 以下三条消息分别代表 data 1、2、3:同一用户、同一订单,按 q_time 先后产生,必须按该顺序被消费

{
  "reqId":"0a7c458c-d619-af31-3ffb-f499995eacd5",
  "user_id":1002, 
  "order_id":2302393013,
  "data":{"status":1},
  "q_time":1563978617
}

{
  "reqId":"6be0c2e3-3514-7d24-0ab6-1e61949a7833",
  "user_id":1002, 
  "order_id":2302393013,
  "data":{"status":2},
  "q_time":1563978618
}

{
  "reqId":"6843cb1b-ebce-ab10-991c-88a55cd7112d",
  "user_id":1002, 
  "order_id":2302393013,
  "data":{"status":3},
  "q_time":1563978619
}

分析 :


生产端(多个消费者或 API):组装消息,按「用户 + 订单」分组写入 Redis,并只投递一次分组 key 到下级队列

 // Producer side: store one incoming message under its (user, order) group.
 // Assumes $data is the decoded message array and $redis / $service are
 // already available in the surrounding scope — TODO confirm against caller.
 $reqId    = $data['reqId'] ;
 $order_id = $data['order_id'] ;
 $user_id  = $data['user_id'] ;
 $q_time   = $data['q_time'] ;

 // All messages of the same user + order share one group key.
 $job_name = "xxx_job";
 $message_key = "{$user_id}_{$order_id}";
 $hash_order_key = "{$job_name}:{$message_key}";
 
 // The hash keeps the JSON payload per reqId; the companion sorted set
 // ("<key>_s") keeps the reqIds scored by q_time.
 $redis->hset($hash_order_key,$reqId,json_encode($data)); 
 $redis->zadd($hash_order_key.'_s',$q_time,$reqId);
 // Dispatch to the downstream queue
 // setnx on the "allow" key succeeds only when the key does not exist yet,
 // so the group key is dispatched only by the caller that creates the flag.
 $res = $redis->setnx("{$job_name}:allow:{$message_key}",1);
 if($res){
          // push $message_key onto the downstream queue
           $service->xxxJobDistribution($message_key);
 }
 // Alternatively, a separate process can scan for xxx_job:* keys and dispatch them in batches


下级队列(xxx_job)消费端:按 score(q_time)从小到大取出同一分组的全部消息,依次处理后删除

 // Consumer side: drain one message group.
 // NOTE(review): $message_key is presumably the value taken from the
 // downstream queue — it is not defined in this fragment; confirm.
 $job_name = "xxx_job";
 $hash_order_key = "{$job_name}:{$message_key}";

 // Read every reqId of the group from the sorted set, then fetch the
 // matching payloads from the hash in that same order.
 $reqIds = $redis->zrange($hash_order_key.'_s',0,-1);
 $MsgLists = $redis->hmget($hash_order_key,$reqIds); 
 
 foreach($MsgLists as $k => $val){
  //do any ...

    // Remove the handled message from both structures.
    $redis->hdel($hash_order_key,$reqIds[$k]); //true or false
    $redis->zrem("{$hash_order_key}_s",$reqIds[$k]);
 }
 // Delete the "allow" flag so the producer's setnx can succeed again for this group.
 $redis->del(["{$job_name}:allow:{$message_key}"]);

DEMO

<?php
/**
 * Created by PhpStorm.
 * User: alonexy
 * Date: 19/7/25
 * Time: 18:23
 */

namespace Services;


use App\Common\Functions;

/**
 * Keeps jobs that belong to the same group (e.g. same user + order) in a
 * strict order so that a single consumer can process them sequentially.
 *
 * Storage layout per group, where <key> = prefix + "<jobName>:<messageKey>":
 *   - hash        <key>      reqId => JSON-encoded job payload
 *   - sorted set  <key>_s    reqId scored by the caller-supplied weight
 *   - string      prefix + "<jobName>:lock:<messageKey>"  lock expiry timestamp
 *
 * The Redis client is expected to expose a Predis-style
 * transaction($options, callable) API, as used by the helpers below.
 */
class JobSequenceService
{
    /** @var string Prefix prepended to every Redis key built by this service. */
    public  $prifix;
    /** @var mixed Redis connection used for all commands. */
    private $redis;
    /** @var self|null Lazily created shared instance. */
    private static $_instance ;

    public function __construct()
    {
        // Replace with your own Redis connection as needed.
        $this->redis  = \RedisDB::connection('default');
        $this->prifix = 'job_sequence:';

    }

    /**
     * Return the shared instance, creating it on first use.
     * @return self
     */
    public static function getInstance()
    {
        if(self::$_instance instanceof self)
        {
            return self::$_instance;
        }
        return self::$_instance = new  self();
    }

    /** @var callable|null Handler invoked for every job payload in GroupDatasHandle(). */
    private $HandleFunc = null;

    /**
     * Store one job under its group, ordered by its score.
     *
     * The group (message key) is the values of $groupKeys joined with "_".
     *
     * @param array $jobData job payload
     * @param $reqIdKey key in $jobData holding the unique message id
     * @param $scoreKey key in $jobData holding the ordering weight
     * @param array $groupKeys keys in $jobData whose values identify the group
     * @param $jobName job name
     * @return array [true, messageKey] on success, [false, errorMessage] on failure
     */
    public function DataGroupJobSplicing(array $jobData, $reqIdKey, $scoreKey, array $groupKeys, $jobName)
    {
        // Refuse incomplete payloads up front instead of writing a job with an
        // empty id/score or a truncated group key.
        foreach (array_merge([$reqIdKey, $scoreKey], $groupKeys) as $requiredKey) {
            if (!array_key_exists($requiredKey, $jobData)) {
                return [false, "missing job data key: {$requiredKey}"];
            }
        }

        $groupValues = [];
        foreach ($groupKeys as $gk) {
            $groupValues[] = $jobData[$gk];
        }
        $message_key    = implode('_', $groupValues);
        $hash_order_key = $this->prifix."{$jobName}:{$message_key}";

        try {
            $this->addJobData($hash_order_key, $jobData[$reqIdKey], $jobData, $jobData[$scoreKey]);
        }
        catch (\RedisException $e) {
            return [false, $e->getMessage()];
        }
        catch (\Predis\PredisException $e) {
            // NOTE(review): the transaction() API used here looks like Predis,
            // whose errors do not extend \RedisException — confirm the client.
            return [false, $e->getMessage()];
        }
        return [true, $message_key];
    }

    /**
     * Write the payload (hash) and its ordering entry (sorted set) in one
     * MULTI/EXEC transaction.
     *
     * @param $hash_order_key group hash key; the sorted set uses the same key + "_s"
     * @param $reqId unique message id
     * @param $jobData payload, stored JSON-encoded
     * @param $score ordering weight
     * @return mixed result of ZCARD on the group's sorted set after the write
     */
    private function addJobData($hash_order_key, $reqId, $jobData, $score)
    {
        $options = array(
            'cas' => true,
            'retry' => 2,
        );
        $this->redis->transaction(
            $options, function ($tx) use ($hash_order_key, $reqId, $jobData, $score) {
            $tx->multi();   // With CAS, MULTI *must* be explicitly invoked.
            $tx->hset($hash_order_key, $reqId, json_encode($jobData));
            $tx->zadd($hash_order_key . '_s', $score, $reqId);
        });
        return $this->redis->zcard($hash_order_key . '_s');
    }

    /**
     * Pop up to $num lowest-scored members from a sorted set.
     *
     * @param $key sorted set key
     * @param int $num how many members to pop; values below 1 pop nothing
     * @return array popped members, lowest score first
     */
    public function zpop($key,$num=1)
    {
        if ($num < 1) {
            return [];
        }
        $options = array(
            'cas' => true,
            // WATCH the key so the read-then-remove below is retried if another
            // client modifies the set in between.
            'watch' => $key,
            'retry' => 2,
        );
        $limit  = $num - 1;
        $arr = [];
        $this->redis->transaction(
            $options, function ($tx) use ($key,&$arr,$limit) {
            // The read has to happen BEFORE multi(): once MULTI is open,
            // commands are only queued and do not return their result.
            $arr = $tx->zrange($key,0,$limit);

            if(!empty($arr)){
                $tx->multi();   // With CAS, MULTI *must* be explicitly invoked.
                $tx->zrem($key,$arr);
            }
        });
        return $arr;
    }

    /**
     * Remove one job from both the group hash and its sorted set in a single
     * transaction.
     *
     * @param $hash_order_key group hash key
     * @param $value reqId of the job to remove
     * @return mixed whatever the client's transaction() call returns
     */
    private function delJobData($hash_order_key, $value)
    {
        $options = array(
            'cas' => true,
            'retry' => 2,
        );
        $res     = $this->redis->transaction(
            $options, function ($tx) use ($hash_order_key, $value) {
            $tx->multi();   // With CAS, MULTI *must* be explicitly invoked.
            $tx->hdel($hash_order_key, $value);
            $tx->zrem("{$hash_order_key}_s", $value);
        });
        return $res;
    }

    /**
     * Register the handler that processes each job payload.
     * @param $function callable receiving the stored (JSON string) payload
     */
    public function SetGroupDataHandleFun($function)
    {
        $this->HandleFunc = $function;
    }

    /**
     * Process every pending job of one group in score order.
     *
     * Each job is removed only after its handler returned without throwing, so
     * a failed job stays at the head of the group. The group lock is released
     * on every exit path (empty group, missing handler, handler failure), so a
     * failed run does not block the group until the lock expires.
     *
     * @param $jobName
     * @param $messageKey
     * @return array reqIds that were handled and removed, in processing order
     * @throws \Exception when no handler is set or the handler throws
     */
    public function GroupDatasHandle($jobName, $messageKey)
    {
        $hash_order_key = $this->prifix."{$jobName}:{$messageKey}";
        $handled        = [];

        try {
            $reqIds = $this->redis->zrange($hash_order_key . '_s', 0, -1);
            if (empty($reqIds)) {
                return $handled;
            }
            $MsgLists = $this->redis->hmget($hash_order_key, $reqIds);
            if (is_null($this->HandleFunc)) {
                throw new \Exception("SetHandleFun is nil");
            }
            foreach ($MsgLists as $k => $val) {
                try {
                    call_user_func_array($this->HandleFunc, array(&$val));
                }
                catch (\Exception $e) {
                    // Keep the \Exception type callers catch, but chain the
                    // original so its class and trace are not lost.
                    throw new \Exception($e->getMessage(), 0, $e);
                }
                $this->delJobData($hash_order_key, $reqIds[$k]);
                $handled[] = $reqIds[$k];
            }
            return $handled;
        }
        finally {
            $this->unlock($jobName, $messageKey);
        }
    }

    /**
     * List the message keys (groups) of a job that have pending data and are
     * not currently locked.
     *
     * Uses the KEYS command, which walks the whole keyspace.
     *
     * @param $jobName
     * @return array
     */
    public function getJobGroupKeys($jobName)
    {
        // Only the sorted-set keys ("..._s") identify a group.
        $ks = $this->redis->keys($this->prifix."{$jobName}:*_s");

        // Anchor and quote the pattern: an unanchored "(.*)_s" also matches
        // hash keys that merely contain "_s" and yields truncated group names.
        $pattern     = '/^' . preg_quote($this->prifix.$jobName, '/') . ':(.+)_s$/';
        $msssageKeys = [];
        foreach ($ks as $k) {
            if (preg_match($pattern, $k, $ma) !== 1) {
                continue;
            }
            $mKey = $ma[1];
            if (!$this->is_lock($jobName, $mKey)) {
                $msssageKeys[] = $mKey;
            }
        }
        return $msssageKeys;
    }

    /**
     * Read the lock entry of a group.
     *
     * Returns the raw stored value (the expiry timestamp written by lock()),
     * or the client's "missing key" value when there is no lock. The expiry
     * itself is not evaluated here.
     *
     * @param $jobName
     * @param $messageKey
     * @return mixed
     */
    public function is_lock($jobName, $messageKey)
    {
        return $this->redis->get($this->prifix."{$jobName}:lock:{$messageKey}");
    }

    /**
     * Try to acquire the processing lock of a group.
     *
     * The lock value is its own expiry timestamp (now + $expTime seconds). An
     * expired lock is taken over with GETSET, so that when several workers
     * notice the expiry at once only the one that actually swapped out the
     * stale value wins; a get/del/setnx sequence can delete a lock another
     * worker has just acquired.
     *
     * @param $jobName
     * @param $messageKey
     * @param int $expTime lock lifetime in seconds
     * @return bool true when the lock was acquired
     */
    public function lock($jobName, $messageKey,$expTime=3000)
    {
        $key = $this->prifix."{$jobName}:lock:{$messageKey}";
        $now = time();

        if ($this->redis->setnx($key, $now + $expTime)) {
            return true;
        }

        $current = $this->redis->get($key);
        if ($current === null || $current === false) {
            // The holder released the lock between SETNX and GET: try once more.
            return (bool) $this->redis->setnx($key, $now + $expTime);
        }
        if ((int) $current >= $now) {
            // Lock exists and has not expired yet.
            return false;
        }

        // Expired: atomically install our expiry and check what we replaced.
        $previous = $this->redis->getset($key, $now + $expTime);
        return $previous === null || $previous === false || (int) $previous < $now;
    }

    /**
     * Release the processing lock of a group.
     * @param $jobName
     * @param $messageKey
     * @return mixed result of the DEL command
     */
    public function unlock($jobName, $messageKey)
    {
         return $this->redis->del([$this->prifix."{$jobName}:lock:{$messageKey}"]);
    }

    /**
     * Build a request id: a per-day Redis counter value, "_", then the value
     * returned by Functions::uuids().
     * @return string
     */
    public function getReqId()
    {
        $date = date('Y-m-d');
        return $this->redis->incr($this->prifix."job_req_id:{$date}") . '_' . Functions::uuids();
    }
}

使用

       // Three jobs for the same user + order; reqTime defines the order in
       // which they must be processed.
       $jobData1 = [
            'reqId'=>'0a7c458c-d619-af31-3ffb-f499995eacd5',
            'user_id'=>'1002',
            'order_id'=>'232323',
            'data'=>[
                'status'=>1
            ],
            'reqTime'=>1563978617
        ];
        $jobData2 = [
            'reqId'=>'000001-d619-af31-3ffb-f499995eacd5',
            'user_id'=>'1002',
            'order_id'=>'232323',
            'data'=>[
                'status'=>2
            ],
            'reqTime'=>1563978618
        ];
        $jobData3 = [
            'reqId'=>'000002-d619-af31-3ffb-f499995eacd5',
            'user_id'=>'1002',
            'order_id'=>'232323',
            'data'=>[
                'status'=>3
            ],
            'reqTime'=>1563978619
        ];

        $service = JobSequenceService::getInstance();
        // Enqueue deliberately out of order (1, 3, 2); the sorted set restores
        // the reqTime order on the consumer side.
       $service->DataGroupJobSplicing($jobData1,'reqId','reqTime',['user_id','order_id'],'xxxjob');
       $service->DataGroupJobSplicing($jobData3,'reqId','reqTime',['user_id','order_id'],'xxxjob');
       $service->DataGroupJobSplicing($jobData2,'reqId','reqTime',['user_id','order_id'],'xxxjob');

       // Consume: only the worker that actually obtained the lock may process
       // the group, otherwise two workers could handle the same group at once.
       if ($service->lock('xxxjob','1002_232323')) {
           $service->SetGroupDataHandleFun(function($data){
               // $data is the JSON string stored by DataGroupJobSplicing().
               $orderData = json_decode($data, true);
               if (!is_array($orderData)) {
                   throw new \Exception('invalid job payload: ' . json_last_error_msg());
               }
               unset($orderData['reqId']);
               unset($orderData['reqTime']);
               dump($orderData);
               // Simulates slow work; throwing here leaves the job queued.
               sleep(10);
           });
           dump($service->GroupDatasHandle('xxxjob','1002_232323'));
       }

       dd($service->getJobGroupKeys('xxxjob'));

来源 :www.alonexy.com

上一篇下一篇

猜你喜欢

热点阅读