OpenResty上使用的redis工具类
2021-10-17 本文已影响0人
肥兔子爱豆畜子
最近在看360团队的openresty最佳实践,一边学lua这门语言,一边熟悉openresty技术栈。里边提供了一个封装的redis客户端工具类,这里稍微改了一下。
-- file name: resty/redis_iresty.lua
local redis_c = require "resty.redis"
--当前版本是否支持table.new,做下兼容处理
local ok, new_tab = pcall(require, "table.new")
if not ok or type(new_tab) ~= "function" then
new_tab = function (narr, nrec) return {} end
end
--[[
table.new(narray,nhash)
预分配new一个table,指定array长度或者hash长度。
一般一个table推荐同时只用一种类型,所以一般是table.new(0,n)或table.new(n,0)
]]
local _M = new_tab(0, 155) -- 新建一个155长度的hash table
_M._VERSION = '0.01'
--redis命令
local commands = {
"append", "auth", "bgrewriteaof",
"bgsave", "bitcount", "bitop",
"blpop", "brpop",
"brpoplpush", "client", "config",
"dbsize",
"debug", "decr", "decrby",
"del", "discard", "dump",
"echo",
"eval", "exec", "exists",
"expire", "expireat", "flushall",
"flushdb", "get", "getbit",
"getrange", "getset", "hdel",
"hexists", "hget", "hgetall",
"hincrby", "hincrbyfloat", "hkeys",
"hlen",
"hmget", "hmset", "hscan",
"hset",
"hsetnx", "hvals", "incr",
"incrby", "incrbyfloat", "info",
"keys",
"lastsave", "lindex", "linsert",
"llen", "lpop", "lpush",
"lpushx", "lrange", "lrem",
"lset", "ltrim", "mget",
"migrate",
"monitor", "move", "mset",
"msetnx", "multi", "object",
"persist", "pexpire", "pexpireat",
"ping", "psetex", "psubscribe",
"pttl",
"publish", --[[ "punsubscribe", ]] "pubsub",
"quit",
"randomkey", "rename", "renamenx",
"restore",
"rpop", "rpoplpush", "rpush",
"rpushx", "sadd", "save",
"scan", "scard", "script",
"sdiff", "sdiffstore",
"select", "set", "setbit",
"setex", "setnx", "setrange",
"shutdown", "sinter", "sinterstore",
"sismember", "slaveof", "slowlog",
"smembers", "smove", "sort",
"spop", "srandmember", "srem",
"sscan",
"strlen", --[[ "subscribe", ]] "sunion",
"sunionstore", "sync", "time",
"ttl",
"type", --[[ "unsubscribe", ]] "unwatch",
"watch", "zadd", "zcard",
"zcount", "zincrby", "zinterstore",
"zrange", "zrangebyscore", "zrank",
"zrem", "zremrangebyrank", "zremrangebyscore",
"zrevrange", "zrevrangebyscore", "zrevrank",
"zscan",
"zscore", "zunionstore", "evalsha"
}
--元表,用在返回的模块对象上
local mt = { __index = _M }
local function is_redis_null( res )
if type(res) == "table" then
for k,v in pairs(res) do
if v ~= ngx.null then
return false
end
end
return true
elseif res == ngx.null then
return true
elseif res == nil then
return true
end
return false
end
-- change connect address as you need
function _M.connect_mod( self, redis )
redis:set_timeout(self.timeout)
--return redis:connect("127.0.0.1", 6379)
--return redis:connect(self.ip, self.port)
local ok ,err = redis:connect(self.ip, self.port)
if not ok then
return {}, err
end
local count
count, err = redis:get_reused_times()
if 0 == count then
--local ok, err = redis:auth("123456")
ok, err = redis:auth(self.password)
if not ok then
--ngx.say("failed to auth: ", err)
return {}, err
end
elseif err then
--ngx.say("failed to get reused times: ", err)
return {}, err
end
return ok, err
end
function _M.set_keepalive_mod( self, redis )
-- put it into the connection pool of size 100, with 60 seconds max idle time
--return redis:set_keepalive(60000, 100)
return redis:set_keepalive(self.max_idle_ms, self.pool_size)
end
function _M.init_pipeline( self )
self._reqs = {}
end
function _M.commit_pipeline( self )
local reqs = self._reqs
if nil == reqs or 0 == #reqs then
return {}, "no pipeline"
else
self._reqs = nil
end
local redis, err = redis_c:new()
if not redis then
return nil, err
end
local ok, err = self:connect_mod(redis)
if not ok then
return {}, err
end
redis:init_pipeline()
for _, vals in ipairs(reqs) do
local fun = redis[vals[1]]
table.remove(vals , 1)
fun(redis, unpack(vals))
end
local results, err = redis:commit_pipeline()
if not results or err then
return {}, err
end
if is_redis_null(results) then
results = {}
ngx.log(ngx.WARN, "is null")
end
-- table.remove (results , 1)
self.set_keepalive_mod(redis)
for i,value in ipairs(results) do
if is_redis_null(value) then
results[i] = nil
end
end
return results, err
end
function _M.subscribe( self, channel )
local redis, err = redis_c:new()
if not redis then
return nil, err
end
local ok, err = self:connect_mod(redis)
if not ok or err then
return nil, err
end
local res, err = redis:subscribe(channel)
if not res then
return nil, err
end
res, err = redis:read_reply()
if not res then
return nil, err
end
redis:unsubscribe(channel)
self.set_keepalive_mod(redis)
return res, err
end
local function do_command(self, cmd, ... )
if self._reqs then
table.insert(self._reqs, {cmd, ...})
return
end
local redis, err = redis_c:new()
if not redis then
return nil, err
end
local ok, err = self:connect_mod(redis)
if not ok or err then
return nil, err
end
local fun = redis[cmd]
local result, err = fun(redis, ...)
if not result or err then
-- ngx.log(ngx.ERR, "pipeline result:", result, " err:", err)
return nil, err
end
if is_redis_null(result) then
result = nil
end
self:set_keepalive_mod(redis) -- self.set_keepalive_mod(self, redis)
return result, err
end
for i = 1, #commands do
local cmd = commands[i]
_M[cmd] =
function (self, ...)
return do_command(self, cmd, ...)
end
end
function _M.new(self, opts)
opts = opts or {}
--local timeout = (opts.timeout and opts.timeout * 1000) or 1000
local timeout = opts.timeout or 1000
local db_index= opts.db_index or 0
--补齐其他可配置项
local ip = opts.ip or "127.0.0.1" --默认去连本地redis,目前不支持哨兵和集群模式
local port = opts.port or 6379
local max_idle_ms = opts.max_idle_ms or 60000
local pool_size = opts.pool_size or 64
local password = opts.password or "password"
--ngx.log(ngx.INFO, "redis timeout: ", timeout, " , db_index: ", db_index)
--ngx.log(ngx.INFO, "redis ip: ", ip, " , port: ", port)
--ngx.log(ngx.INFO, "pool_size: ", pool_size, " , max_idle_ms: ", max_idle_ms)
return setmetatable({
timeout = timeout,
db_index = db_index,
ip = ip,
port = port,
max_idle_ms = max_idle_ms,
pool_size = pool_size,
password = password,
_reqs = nil }, mt)
end
return _M
代码使用:
local redis_client = require "wangan.common.redis_iresty"
local red = redis_client:new({
ip = "127.0.0.1",
port = 6379,
password = "123456",
timeout = 2000,
db_index = 0,
max_idle_ms = 60000,
pool_size = 32
})
local ok, err = red:get("my-account")
if not ok then
ngx.say("failed to get my-account: ", err)
return
end
ngx.say("my-account: ", ok)
然后如果不使用封装的工具类,代码是这样的:
--[[
https://github.com/openresty/lua-resty-redis
openresty连接redis的例子
官方还不支持redis哨兵和集群模式,第三方库倒是有一个https://github.com/steve0511/resty-redis-cluster
--]]
local redis = require "resty.redis"
local red = redis:new()
--red:set_timeouts(connect_timeout, send_timeout, read_timeout)
red:set_timeouts(1000, 1000, 1000)
--red:connect是新建连接、还有可能是去内建连接池里取一个连接
local ok, err = red:connect("127.0.0.1", 6379)
if not ok then
ngx.say("failed to connect: ", err)
return
end
--每个连接只需要auth一次,这里get_reused_times判定是否是新连接
local count
count, err = red:get_reused_times()
if 0 == count then
ok, err = red:auth("123456")
if not ok then
ngx.say("failed to auth: ", err)
return
end
elseif err then
ngx.say("failed to get reused times: ", err)
return
end
local res, err = red:get("my-account")
if not res then
ngx.say("failed to get my-account: ", err)
return
end
if res == ngx.null then
ngx.say("my-account not found.")
return
end
ngx.say("my-account: ", res)
--连接正常使用之后,将连接放入连接池
--允许空闲10s,最大连接数100
local ok , err = red:set_keepalive(10000, 100)
if not ok then
ngx.say("failed to set_keepalive: ", err)
return
end
--[[
关于将连接放入连接池,一定要确保连接使用过程中未发生异常、即顺利的使用完以后在放入。
并且要注意连接的状态,因为放入连接池的连接将会带着这个状态,会被其他的业务模块使用到,可能会引发预期之外的问题。
例如:
red:connect(ip, 6379)
red:select(1)
red:set_keepalive(10000, 100)
高并发下上面代码将连接放到池里后,后面其他模块如果拿到这个连接还是会取db1操作的,如果默认是想去db0操作那么就会引发bug
所以正确的代码应该是在set_keepalvie之前,复位默认,red:select(0)
--]]
不使用封装工具类也没啥问题,但是会有很多重复冗余的代码。
关于哨兵和集群模式
参看OpenResty连接redis的例子 https://github.com/openresty/lua-resty-redis
目前官方还不支持redis哨兵和集群模式,第三方库和实现思路可以参考:
https://github.com/steve0511/resty-redis-cluster