redis stream中pending数据的处理
2020-11-16 本文已影响0人
跑马溜溜的球
1. pending数据的产生
在消费者组模式下,当一个消息被消费者取出,为了解决组内消息读取但处理期间消费者崩溃带来的消息丢失问题,STREAM 设计了 Pending 列表,用于记录读(XREADGROUP)取但并未处理完毕(未ACK)的消息。
2. 对pending数据的几种处理方式
下面的讨论基于几点:
- 面向的场景为多个无差别消费者(每个消费者名子相同,功能相同)在同一group下消费任务。
- 我们要保证的是,每个任务至多只做一次。
- 代码实现是在使用redis stream实现队列服务一文的封装基础上实现的。
2.1 无需处理
如果你的处理逻辑是:
getTask()
delTask()
yourProcessFuc();
即不太关注任务的丢失,此时无需做什么特别处理。但一定记得delTask(),不然pending队列会越积越多,占用大量存储空间。
2.2 从pending中按条件读取,放回原队列
/*
* 将pending队列中超时的数据重新放回队列
*
* $idleTime: 超时时间, 毫秒
* $perPage:每次从pending队列中取的任务数, 之所以分页是为防止队列太长,一下取出内存不够
*
* 注意:只能有一个进程执行pendingRestore
*
* 优点: consumer不需要做任何改动
* 缺点:
* 先del再add, 成本上不划算,
* 如果del和add中间断掉任务就丢了
* 无法保留任务被重复投递的次数,不过如果你的任务只想重做一次,或者不关注此数据那就无所谓了。
*
* return: restore的数量
* */
public function pendingRestore($idleTime = 5000, $perPage = 20){
/**
* 比较简单粗暴的取pending数据方式
* 依赖
* 1.每次从pending取走/删除超时数据
* 2.id是按时间排序,小id未超时,大id一定未超时
*
*/
$restoreNum = 0;
while(1){
$thisNum = 0;
$data = $this->getPending($perPage);
foreach($data as $one){
$id = $one[0];
$duration = $one[2];
if ($duration > $idleTime){
$data = $this->getRange($id, $id);
$task = $data[$id];
$this->delTask($id);
$this->addTask($task);
$thisNum++;
}
}
$restoreNum += $thisNum;
if ($thisNum < $perPage){
break;
}
}
return $restoreNum;
}
/* 从pending队列中取任务
*/
protected function getPending($count = 1, $start='-', $end='+', $consumer = null){
if (!$consumer){
return $this->_mRedis->xPending($this->_mStream, $this->_mGroup, $start, $end, $count);
}
return $this->_mRedis->xPending($this->_mStream, $this->_mGroup, $start, $end, $count, $consumer);
}
/*
* 取[$start, $end]范围内的数据, 注意是闭区间
*
* $count:条数,null时表示取全部
* */
protected function getRange($start = '-', $end = '+', $count = null){
if(is_null($count)){
return $this->_mRedis->xRange($this->_mStream, $start, $end);
}else{
return $this->_mRedis->xRange($this->_mStream, $start, $end, $count);
}
}
2.3 使用claim
将超时任务放入另一个名子的消费者pending队列中,然后从新的消费者历史数据中取出数据并处理。
/*
* 另一种恢复超时任务的方法
* 思路:将超时任务放入newConsumer的pending中,后续可以从newConsume的历史中取出数据并处理
*
* 优点:
* 恢复数据没有重复读,删,插,效率高
* 任务投递次数会保留在新的pending中
*
* 缺点:
* consumer需要做改动,至少要改变consumer的名子
* 只能用单进程从历史数据中读数据,然后处理。
*
*
* $idleTime: 超时时间, 毫秒
* $newConsumer: 之后处理pending任务的消费者名称
* $perPage: 每次取pending任务的条数
*
* return: 满足条件且成功claim的条数
* */
public function pendingClaim($idleTime = 5000, $newConsumer=null, $perPage = 20){
if (!$newConsumer){
return false;
}
$info = $this->getPendingInfo();
$startID = $info[1];
$endID = $info[2];
$claimNum = 0;
/*
* 使用startid, endid遍历pending列表
* 因为getpending取的是[startid, endid]
* 所以边界处的id可能被重复取出,但不影响结果的正确性
* perPage越大/符合xclaim条件的id越多,重复的可能性越小
* */
while($startID != $endID){
//var_dump([$startID, $endID]);
$data = $this->getPending($perPage, $startID, $endID, $this->_mConsumer);
foreach($data as $one){
$ids[] = $one[0];
$startID = $one[0];
}
//xClaim会根据条件自动过滤任务
$res = $this->_mRedis->xClaim($this->_mStream, $this->_mGroup, $newConsumer, $idleTime, $ids, ['JUSTID']);
$thisNum = count($res);
$claimNum += $thisNum;
//id是按时间排列,小id未超时,则后面不会超时
//在所有id都有相同的投递次数的基础上
if ($thisNum < $perPage){
break;
}
}
return $claimNum;
}
使用pendingClaim后,可以使用一个单独进程,通过下面方式获取超时任务并处理。
$config = [
'server' => '10.10.10.1:6379',
'stream' => 'balltube',
'consumer' => 'pendingProcessor',//pendingClaim中的newConsumer
];
$q = new RedisQueue($this->_config);
$block = 1000;
$num = 1;
while(1){
$d = $q->getTask($block, $num, 0);
if (empty($d)){
break;
}
$id = key($d);
$data = $d[$id];
$q->delTask($id);
//处理任务逻辑
yourTaskProcessFunc($data);
}
3. git代码库
https://github.com/qmhball/redisQueue
- RedisQueue.php 队列实现
- RedisQueueTest.php 对应测试