redis订阅消息实现
2022-07-07 本文已影响0人
geeooooz
redis版本需要 >= 2.8
参考:
https://blog.csdn.net/qq_38157006/article/details/89952453
https://www.cnblogs.com/qianlizeguo/articles/6856813.html
https://zhuanlan.zhihu.com/p/127968838
RestapiV3\Controller\PsubscribeController.class.php
<?php
/**
* redis版本需要是2.8以上的
*/
namespace RestapiV3\Controller;
header("Content-Type: text/html;charset=utf-8");
use Think\Controller;
use RestapiV3\Lib\RedisUtil;
class PsubscribeController extends Controller{
public function __construct(){
ini_set('default_socket_timeout', -1); //不超时
$redis = new RedisUtil();
// 解决Redis客户端订阅时候超时情况
$redis->setOption();
$redis->psubscribe(array('__keyevent@0__:expired'),"keyCallback");
//windows本地启动
//php index.php RestapiV3/Psubscribe start
//linux启动 nohup 不挂起执行
//nohup index.php RestapiV3/Psubscribe start &
}
//任意位置发布订阅消息 需要引入 use RestapiV3\Lib\RedisUtil;
public function index(){
$objredis = new RedisUtil();
$objredis->lRange('order',0,9999999);
$order_sn = '168';//msg就是这里 3代表过期事件
$res = $objredis->setex($order_sn,3,'redis延迟任务');////3秒后回调
// $key = array(
// 'id'=>'2',
// 'msg'=>'3'
// );
// $objredis = new RedisUtil();
// $objredis->lRange('order',0,9999999);
// $order_sn = '168';
// $res = $objredis->setex(json_encode($key),10,'');
// dump($res);die;
// echo '发布订阅消息成功';
// die;
}
}
RestapiV3\Lib\RedisUtil.class.php
<?php
namespace RestapiV3\Lib;
class RedisUtil{
private $instance = null;
public function __construct(){
$this->instance = new \Redis();
$this->instance->connect(C('cache_ip'),C('cache_port') );
$this->instance->auth(C('cache_password'));
$this->instance->config('notify-keyspace-events','Ex');//开启redis key 过期通知(改过配置文件,但没成功)
}
/*
public static function getRedis()
{
if(!self::$instance)
{
new self;
}
return self::$instance;
}
*/
public function setex($key, $time, $val)
{
return $this->instance->setex($key, $time, $val);
}
public function psubscribe($patterns = array(), $callback)
{
$this->instance->psubscribe($patterns, $callback);
}
public function lRange($key,$start,$end)
{
return $this->instance->lRange($key,$start,$end);
}
public function lPush($key, $value1, $value2 = null, $valueN = null ){
return $this->instance->lPush($key, $value1, $value2 = null, $valueN = null );
}
public function setOption()
{
$this->instance->setOption(\Redis::OPT_READ_TIMEOUT, -1);
}
//add new value
public function set($key,$value,$time)
{
if($time==0)
{
return $this->instance->set($key,$value);
}
else
{
$this->instance->set($key,$value);
return $this->instance->expire($key,$time);
}
}
//add value with expire time
public function expire($key,$time)
{
return $this->instance->expire($key,$time);
}
public function get($key)
{
return $this->instance->get($key);
}
public function exists($key)
{
return $this->instance->exists($key);
}
public function delete($key)
{
return $this->instance->delete($key);
}
private function _clone(){}
}