python + redis 操作总结
2018-08-22 本文已影响0人
sunshaoping
一.基本操作
1.cmd窗口1启动redis服务(服务端)命令:redis-server
2.cmd窗口2启动客户端(客户端)命令:redis-cli
3.客户端输入级命令:ping,输出pong,证明服务启动正常
4.客户端获取redis密码:config get requirepass
5.客户端设置redis密码:config set requirepass 123456(设置单次密码)
6.设置永久密码:修改redis配置文件,redis安装目录下的redis.conf文件,找到# requirepass foobared修改为 requirepass 123456,修改密码为123456。
二.redis写入读取数据
import redis # 导入redis模块,通过python操作redis 也可以直接在redis主机的服务端操作缓存数据库
r = redis.Redis(host='localhost', port=6379) # host是redis主机,需要redis服务端和客户端都启动 redis默认端口是6379
order = [[10001,2000,1500000,12],[10001,2001,1500000,13]] # 订单列表
user = {"id":1,"order":order,"state":1}
r.set("1",user)
a = r.get(1)
print(a) # 从redis数据库中获取的字节类型
a = a.decode("utf-8") # 将字节变为字符串
print(a) # 字符串类型
b = eval(a) # eval函数将类似列表的字符串转为列表,将类似字典的转换为字典
print(b["id"],b["order"],b["order"][0])
# 输出结果:1, [[10001]...,[10001]...],[10001,2000,150000,12]
三.封装redis辅助类
class RedisHelper(): # redis封装
def __init__(self, host='localhost', port=6379,db=0):
try:
self.__redis = redis.StrictRedis(host, port, db)
except Exception as e:
print(e)
def get(self, key):
if self.__redis.exists(key):
return self.__redis.get(key)
else:
return ""
def set(self, key, value):
self.__redis.set(key, value)
r = RedisHelper() # 实例化redis
r.set("1","user") # 写入数据
r.get("1") # 读取数据
四.使用redis连接池
import redis # 导入redis模块,通过python操作redis 也可以直接在redis主机的服务端操作缓存数据库
pool = redis.ConnectionPool(host='localhost', port=6379, db=0, decode_responses=True) # host是redis主机,需要redis服务端和客户端都起着 redis默认端口是6379
r = redis.Redis(connection_pool=pool)
r.set('gender', 'male') # key是"gender" value是"male" 将键值对存入redis缓存
print(r.get('gender')) # gender 取出键male对应的值
五.生产消费模式
使用redis中的列表来实现一个消息队列,开启两个程序,一个作为生产者使用lpush写队列,一个作为消费者使用brpop读队列,由于消费者不知道什么时候会有数据传过来,所以消费者需要一直循环读取数据,两者的消息使用json进行封装协议传输。
# 生产者模型
import json
import redis
MSG_TYPE_READ_BOOK = 0
MSG_TYPE_PLAY_GAME = 1
MSG_TYPE_SING_SONG = 2
def make_message(m_id, m_type): # 产生一个消息
mess_dict = {"id": m_id, "type": m_type}
return json.dumps(mess_dict)
def creator(): # 生产消息放进消息队列
conn = redis.StrictRedis() # 实例化redis
for i in range(1, 10):
js_data = make_message(i, i % 3) # 生产消息
print("push message:%s" % js_data)
conn.lpush("msgQueue", js_data) # 消息放进消息队列
if __name__ == '__main__':
creator()
# 消费者模型
import json
import redis
MSG_TYPE_READ_BOOK = 0
MSG_TYPE_PLAY_GAME = 1
MSG_TYPE_SING_SONG = 2
def parse_message(js_data): # 将消息队列中的消息解析成字典
return json.loads(js_data)
def handle_message():
conn = redis.StrictRedis()
print("开始等待消息队列")
while True:
msg = conn.brpop("msgQueue")[1]
msg = str(msg, encoding="utf-8") # 将byte装换为str
msg_dict = parse_message(msg)
m_id = msg_dict["id"]
m_type = msg_dict["type"]
if m_type == MSG_TYPE_PLAY_GAME:
print("消息%d:我要打游戏"%m_id)
elif m_type == MSG_TYPE_READ_BOOK:
print("消息%d:我要读书"%m_id)
else:
print("消息%d:我要唱歌"%m_id)
if __name__ == '__main__':
handle_message()
六.使用redis封装连接池
import redis
# 定义redis类
class redis_ret():
def __init__(self):
# 初始化变量
self.rpool = redis.ConnectionPool(host="127.0.0.1",port=6379,db=0) # 设置链接主机(host=主机ip,port=端口号)
self.r = redis.Redis(connection_pool=self.rpool) # 建立连接池
# 存储字典
def hset(self,name,key,value):
self.r.hset(name,key,value)
# 存储字典
def hmset(self,name,key):
self.r.hmset(name,key)
# 获取字典中对应的值
def hget(self,name,key):
return self.r.hget(name,key).decode("utf-8")
# 获取字典中的值
def hmget(self,name,*args):
hmget_list = [i.decode("utf-8") for i in self.r.hmget(name,args)]
return hmget_list
# 获取字典中的值,不存在返回amount
def hincrby(self,name,key,value):
self.r.hincrby(name,key,value)
# 获取字典中的值,不存在返回floast型amount
def hincrbyfloat(self,name,key,value):
self.r.hincrbyfloat(name,key,value)
#删除name对应的list中的第一个值
def lpop(self,name):
return self.r.lpop(name).decode("utf-8")
# 删除name对应的list中的指定值
def lrem(self,name,value):
self.r.lrem(name,value)
# 获取name对应的list的长度
def llen(self,name):
return self.r.llen(name)
# 删除指定元素
def delete(self,name):
self.r.delete(name)
# 获取name对应的list中切片后的值
def lrange(self,name,a,b):
lrange_list = [i.decode("utf-8") for i in self.r.lrange(name,a,b)]
return lrange_list
# 存储列表(往后(右)插入)
def rpush(self,name,value):
self.r.rpush(name,value)
# 在name对应的list中添加元素,name不存在不操作
def lpushx(self,name,value):
self.r.lpushx(name,value)
七.将redis添加到windows服务中
之前安装过redis数据库,也将redis服务添加到windows服务中,但是由于之前的服务路径已经不存在,导致redis服务每次启动的时候都找不到文件路径,所以redis服务也无法启动
解决方法:
1.重新安装redis服务:在github下载安装redis安装包https://github.com/MSOpenTech/redis/tags,下载安装包后解压到指定文件夹。
2.下载的要求配置为系统配置:
Redis-x64-3.2.100
win10 64
3.运行cmd,指向解压后的redis文件目录,
启动服务命令:redis-server redis.windows.conf
4.将redis解压目录添加到系统环境变量中
(1)点击(我的电脑)-->(属性)-->(高级系统设置)-->(环境变量)
(2)在系统变量这一栏找到「Path」这个变量,选择并点击下方的「编辑
(3)点击「新建」,将 Redis 的解压文件目录地址复制粘贴进去,然后点击下方的「确定」,关闭窗口,将前边的窗口都点击「确定」后关闭
(4)OK, Redis 环境变量已经添加完成,下边进行下一步
5.将redis服务添加到windows服务中:redis-server --service-install redis.windows.conf --service-name redis6379 --loglevel verbose
指定服务名称为redis6379,这样就不会和之前的服务名称冲突,也可以在不删除之前的服务情况下,安装新的redis服务。
6.正常将redis服务添加到windows服务中为在解压文件下输入命令:redis-server --service-install redis.windows-service.conf --loglevel verbose