Redis 多消费者模式如何保证消息顺序执行
2020-01-13 本文已影响0人
567f84810acc
多消费者模式如何保证消息顺序执行
- 应用场景 : [用户订单更新 创建 ->更新 ->删除]
- 假设 有3条数据 data1[create] ,data2[update] ,data3[delete] 需要顺序执行
- 假设有 3 个消费者同时消费,消息的执行顺序可能被打乱,例如 data1->data3->data2,从而造成数据错乱
- 示例代码 PHP
测试数据
// 分别代表 data 1-3
{
"reqId":"0a7c458c-d619-af31-3ffb-f499995eacd5",
"user_id":1002,
"order_id":2302393013,
"data":{"status":1},
"q_time":1563978617
}
{
"reqId":"6be0c2e3-3514-7d24-0ab6-1e61949a7833",
"user_id":1002,
"order_id":2302393013,
"data":{"status":2},
"q_time":1563978618
}
{
"reqId":"6843cb1b-ebce-ab10-991c-88a55cd7112d",
"user_id":1002,
"order_id":2302393013,
"data":{"status":3},
"q_time":1563978619
}
分析 :
- user_id + order_id 该条消息的用户和订单 唯一值
- reqId 消息唯一ID
- q_time 消息时间
多消费者或 API 入口 -> 组装消息
- 为了保证消息的顺序执行 和 处理消息的吞吐量
// Pull the routing fields out of the incoming message.
$user_id = $data['user_id'];
$order_id = $data['order_id'];
$reqId = $data['reqId'];
$q_time = $data['q_time'];

// One group per (user, order): every message for the same order shares these keys.
$job_name = "xxx_job";
$message_key = $user_id . '_' . $order_id;
$hash_order_key = $job_name . ':' . $message_key;
$sequence_key = $hash_order_key . '_s';
$allow_key = $job_name . ':allow:' . $message_key;

// The hash stores the payload by request id; the sorted set orders request ids by message time.
$redis->hset($hash_order_key, $reqId, json_encode($data));
$redis->zadd($sequence_key, $q_time, $reqId);

// Hand the group over to the downstream queue.
// SETNX only succeeds while the marker is absent, so the group key is dispatched once.
if ($redis->setnx($allow_key, 1)) {
    $service->xxxJobDistribution($message_key);
}
// A separate, independent process can also scan the xxx_job:* keys and dispatch them in batches.
- zadd 使用时间戳作为权重值 保证消息的顺序
- 同个账户的同一个订单 顺序明确
- setnx 仅在 key 不存在时才会设置成功,保证同一个 message_key 只会向下级队列投递一次,即同一时刻只有一个下级消费者处理该分组
下级队列 xxx_job 消费
$job_name = "xxx_job";
$hash_order_key = "{$job_name}:{$message_key}";
// Request ids come back ordered by score (q_time), i.e. in the order they must be applied.
$reqIds = $redis->zrange($hash_order_key.'_s',0,-1);
// HMGET needs at least one field, so only fetch payloads when something is actually queued.
if (!empty($reqIds)) {
    $MsgLists = $redis->hmget($hash_order_key,$reqIds);
    foreach($MsgLists as $k => $val){
        // do any ... (apply $val to the business data here)
        // Remove the message from both structures once it has been handled.
        $redis->hdel($hash_order_key,$reqIds[$k]); // true or false
        $redis->zrem("{$hash_order_key}_s",$reqIds[$k]);
    }
}
// Always clear the dispatch marker, even for an empty group, so the group can be dispatched again.
$redis->del(["{$job_name}:allow:{$message_key}"]);
- 消费完成后删除锁
- 业务处理代码注意使用 try/catch,处理失败时回滚数据
DEMO
<?php
/**
* Created by PhpStorm.
* User: alonexy
* Date: 19/7/25
* Time: 18:23
*/
namespace Services;
use App\Common\Functions;
/**
 * Groups queued job messages per business key and replays each group in order.
 *
 * For every group two Redis structures are maintained under
 * "<prefix><jobName>:<messageKey>":
 *  - a hash mapping request id => JSON-encoded message, and
 *  - a sorted set (same key with an "_s" suffix) mapping request id => score,
 *    which defines the processing order.
 * A per-group lock key ("<prefix><jobName>:lock:<messageKey>") is used so that
 * only one consumer works on a group at a time.
 *
 * NOTE(review): the transaction() calls use Predis-style options ('cas', 'watch',
 * 'retry'); confirm that the injected connection is a Predis client.
 */
class JobSequenceService
{
    /** @var string Prefix prepended to every Redis key built by this service. */
    public $prifix;

    /** @var mixed Redis connection obtained in the constructor. */
    private $redis;

    /** @var self|null Shared instance returned by getInstance(). */
    private static $_instance;

    /** @var callable|null Callback invoked for every message in GroupDatasHandle(). */
    private $HandleFunc = null;

    public function __construct()
    {
        // Replace with your own Redis connection.
        $this->redis = \RedisDB::connection('default');
        $this->prifix = 'job_sequence:';
    }

    /**
     * Return the shared instance, creating it on first use.
     *
     * @return self
     */
    public static function getInstance()
    {
        if (self::$_instance instanceof self) {
            return self::$_instance;
        }
        return self::$_instance = new self();
    }

    /**
     * Store one message in its group so it can later be replayed in score order.
     *
     * @param array  $jobData   Message payload.
     * @param string $reqIdKey  Key in $jobData holding the unique message id.
     * @param string $scoreKey  Key in $jobData holding the ordering score.
     * @param array  $groupKeys Keys in $jobData whose values, joined by "_", form the group key.
     * @param string $jobName   Job the message belongs to.
     * @return array [true, <message key>] on success, [false, <error message>] on failure.
     */
    public function DataGroupJobSplicing(array $jobData, $reqIdKey, $scoreKey, array $groupKeys, $jobName)
    {
        // A missing field would silently produce a wrong, shared group key; report it instead.
        foreach (array_merge([$reqIdKey, $scoreKey], $groupKeys) as $requiredKey) {
            if (!isset($jobData[$requiredKey])) {
                return [false, "missing job data field: {$requiredKey}"];
            }
        }
        try {
            $unqiueArrs = [];
            foreach ($groupKeys as $gk) {
                $unqiueArrs[] = $jobData[$gk];
            }
            $message_key = implode('_', $unqiueArrs);
            $hash_order_key = $this->prifix . "{$jobName}:{$message_key}";
            $this->addJobData($hash_order_key, $jobData[$reqIdKey], $jobData, $jobData[$scoreKey]);
            return [true, $message_key];
        }
        // NOTE(review): \RedisException is the phpredis exception class; if the connection
        // is Predis its exceptions will propagate to the caller instead. Confirm.
        catch (\RedisException $e) {
            return [false, $e->getMessage()];
        }
    }

    /**
     * Write the payload and its ordering entry for one message in a single MULTI/EXEC.
     *
     * @param string $hash_order_key Group key (hash); the sorted set uses the same key plus "_s".
     * @param string $reqId          Unique message id.
     * @param array  $jobData        Message payload, stored JSON-encoded.
     * @param mixed  $score          Ordering score for the sorted set.
     * @return mixed Result of ZCARD on the group's sorted set after the write.
     */
    private function addJobData($hash_order_key, $reqId, $jobData, $score)
    {
        $options = array(
            'cas' => true,
            'retry' => 2,
        );
        $this->redis->transaction(
            $options, function ($tx) use ($hash_order_key, $reqId, $jobData, $score) {
                $tx->multi(); // With CAS, MULTI *must* be explicitly invoked.
                $tx->hset($hash_order_key, $reqId, json_encode($jobData));
                $tx->zadd($hash_order_key . '_s', $score, $reqId);
            });
        return $this->redis->zcard($hash_order_key . '_s');
    }

    /**
     * Remove and return the lowest-scored members of a sorted set.
     *
     * @param string $key Sorted set key.
     * @param int    $num Maximum number of members to pop.
     * @return array The popped members, lowest score first; empty when nothing was popped.
     */
    public function zpop($key, $num = 1)
    {
        if ($num < 1) {
            return [];
        }
        $options = array(
            'cas' => true,
            'watch' => $key, // abort and retry if the set changes between the read and EXEC
            'retry' => 2,
        );
        $limit = $num - 1;
        $arr = [];
        $this->redis->transaction(
            $options, function ($tx) use ($key, &$arr, $limit) {
                // The read must happen before MULTI: once MULTI is issued, commands are
                // only queued and no longer return their results.
                $arr = $tx->zrange($key, 0, $limit);
                if (!empty($arr)) {
                    $tx->multi(); // With CAS, MULTI *must* be explicitly invoked.
                    $tx->zrem($key, $arr);
                }
            });
        return $arr;
    }

    /**
     * Delete one message from both the group's hash and its sorted set in one MULTI/EXEC.
     *
     * @param string $hash_order_key Group key (hash); the sorted set uses the same key plus "_s".
     * @param string $value          Request id to remove.
     * @return mixed Whatever the client's transaction() call returns.
     */
    private function delJobData($hash_order_key, $value)
    {
        $options = array(
            'cas' => true,
            'retry' => 2,
        );
        $res = $this->redis->transaction(
            $options, function ($tx) use ($hash_order_key, $value) {
                $tx->multi(); // With CAS, MULTI *must* be explicitly invoked.
                $tx->hdel($hash_order_key, $value);
                $tx->zrem("{$hash_order_key}_s", $value);
            });
        return $res;
    }

    /**
     * Set the callback used by GroupDatasHandle() to process each message.
     *
     * @param callable $function Receives the stored (JSON-encoded) message.
     */
    public function SetGroupDataHandleFun($function)
    {
        $this->HandleFunc = $function;
    }

    /**
     * Process every queued message of a group in score order.
     *
     * Each message is passed to the configured handler and removed from Redis once
     * the handler returns. Processing stops at the first handler exception, leaving
     * that message and all later ones queued. The group lock is released when this
     * method finishes, whether it succeeded, found nothing to do, or failed.
     *
     * @param string $jobName
     * @param string $messageKey
     * @return array Request ids that were handled and removed, in processing order.
     * @throws \Exception When no handler is set or the handler throws.
     */
    public function GroupDatasHandle($jobName, $messageKey)
    {
        $hash_order_key = $this->prifix . "{$jobName}:{$messageKey}";
        $handled = [];
        try {
            $reqIds = $this->redis->zrange($hash_order_key . '_s', 0, -1);
            if (empty($reqIds)) {
                return $handled;
            }
            if (is_null($this->HandleFunc)) {
                throw new \Exception("SetHandleFun is nil");
            }
            $MsgLists = $this->redis->hmget($hash_order_key, $reqIds);
            foreach ($MsgLists as $k => $val) {
                try {
                    call_user_func_array($this->HandleFunc, array(&$val));
                }
                catch (\Exception $e) {
                    // Keep the original exception attached so its trace is not lost.
                    throw new \Exception($e->getMessage(), 0, $e);
                }
                $this->delJobData($hash_order_key, $reqIds[$k]);
                $handled[] = $reqIds[$k];
            }
            return $handled;
        }
        finally {
            // Release on every exit path; otherwise the group stays blocked until the lock expires.
            $this->unlock($jobName, $messageKey);
        }
    }

    /**
     * List the message keys of a job that have queued data and are not currently locked.
     *
     * NOTE(review): KEYS scans the whole keyspace; consider SCAN for large datasets.
     *
     * @param string $jobName
     * @return array Message keys derived from the "_s" sorted-set keys.
     */
    public function getJobGroupKeys($jobName)
    {
        $base = $this->prifix . $jobName;
        // Only the sorted-set keys ("..._s") identify a group with pending messages.
        $ks = $this->redis->keys($base . ':*_s');
        // Anchor and escape the pattern so only a trailing "_s" marks a sorted-set key.
        $pattern = '/^' . preg_quote($base, '/') . ':(.*)_s$/';
        $msssageKeys = [];
        foreach ($ks as $k) {
            if (!preg_match($pattern, $k, $ma)) {
                continue;
            }
            $mKey = $ma[1];
            // Lock keys share the job prefix; never report them as groups.
            if (strpos($mKey, 'lock:') === 0) {
                continue;
            }
            if (!$this->is_lock($jobName, $mKey)) {
                $msssageKeys[] = $mKey;
            }
        }
        return $msssageKeys;
    }

    /**
     * Read the lock entry of a group.
     *
     * @param string $jobName
     * @param string $messageKey
     * @return mixed The stored lock value (its expiry timestamp) or the client's
     *               "missing key" value when no lock exists.
     */
    public function is_lock($jobName, $messageKey)
    {
        return $this->redis->get($this->prifix . "{$jobName}:lock:{$messageKey}");
    }

    /**
     * Try to acquire the processing lock of a group.
     *
     * The lock value is its expiry time (unix timestamp). An expired lock is taken
     * over with GETSET, so that when several consumers notice the expiry at the
     * same time only one of them wins.
     *
     * @param string $jobName
     * @param string $messageKey
     * @param int    $expTime Lock lifetime in seconds.
     * @return bool True when the lock was acquired.
     */
    public function lock($jobName, $messageKey, $expTime = 3000)
    {
        $key = $this->prifix . "{$jobName}:lock:{$messageKey}";
        if ($this->redis->setnx($key, time() + $expTime)) {
            return true;
        }
        $current = $this->redis->get($key);
        if (!$current) {
            // The holder released the lock between SETNX and GET; try once more.
            return (bool) $this->redis->setnx($key, time() + $expTime);
        }
        if ($current >= time()) {
            // Still held and not expired.
            return false;
        }
        // Expired: swap in a new expiry atomically. If the value we replaced was still
        // expired (or gone), nobody took the lock before us and it is ours.
        $previous = $this->redis->getset($key, time() + $expTime);
        return !$previous || $previous < time();
    }

    /**
     * Release the processing lock of a group.
     *
     * @param string $jobName
     * @param string $messageKey
     * @return mixed Result of the DEL command.
     */
    public function unlock($jobName, $messageKey)
    {
        return $this->redis->del([$this->prifix . "{$jobName}:lock:{$messageKey}"]);
    }

    /**
     * Build a request id from a per-day Redis counter and a UUID.
     *
     * @return string "<counter>_<uuid>"
     */
    public function getReqId()
    {
        $date = date('Y-m-d');
        return $this->redis->incr($this->prifix . "job_req_id:{$date}") . '_' . Functions::uuids();
    }
}
使用
// Three messages for the same user/order; reqTime defines the order they must be applied in.
$jobData1 = [
    'reqId'=>'0a7c458c-d619-af31-3ffb-f499995eacd5',
    'user_id'=>'1002',
    'order_id'=>'232323',
    'data'=>[
        'status'=>1
    ],
    'reqTime'=>1563978617
];
$jobData2 = [
    'reqId'=>'000001-d619-af31-3ffb-f499995eacd5',
    'user_id'=>'1002',
    'order_id'=>'232323',
    'data'=>[
        'status'=>2
    ],
    'reqTime'=>1563978618
];
$jobData3 = [
    'reqId'=>'000002-d619-af31-3ffb-f499995eacd5',
    'user_id'=>'1002',
    'order_id'=>'232323',
    'data'=>[
        'status'=>3
    ],
    'reqTime'=>1563978619
];
$service = JobSequenceService::getInstance();
// Enqueue the messages, deliberately out of order (1, 3, 2); the reqTime score restores the order.
$service->DataGroupJobSplicing($jobData1,'reqId','reqTime',['user_id','order_id'],'xxxjob');
$service->DataGroupJobSplicing($jobData3,'reqId','reqTime',['user_id','order_id'],'xxxjob');
$service->DataGroupJobSplicing($jobData2,'reqId','reqTime',['user_id','order_id'],'xxxjob');
// Handler invoked once per message, in order.
$service->SetGroupDataHandleFun(function($data){
    try{
        $orderData = \GuzzleHttp\json_decode($data,1);
        unset($orderData['reqId']);
        unset($orderData['reqTime']);
        dump($orderData);
        sleep(10); // simulate slow business processing
        // Throw an exception here to try out the failure path.
    }catch (\Exception $e){
        // Roll back business data here, then rethrow with the original exception attached.
        throw new \Exception($e->getMessage(), 0, $e);
    }
});
// Consume the group only if this consumer actually obtained the lock.
if ($service->lock('xxxjob','1002_232323')) {
    dump($service->GroupDatasHandle('xxxjob','1002_232323'));
}
dd($service->getJobGroupKeys('xxxjob'));
来源 :www.alonexy.com