peewee数据库连接池
2023-06-18 本文已影响0人
d4d5907268a9
目录
同步数据库与异步数据库
数据库配置信息:
db_config = {
'host': '127.0.0.1',
'port': 3306,
'user': 'root',
'password': '123456',
'database': 'ai_platform_cloud',
}
同步:
# 同步数据库
from peewee import *
db = MySQLDatabase(**db_config)
异步:
# 异步数据库
from peewee import *
from peewee_async import MySQLDatabase as AsyncMySQLDatabase
db = AsyncMySQLDatabase(**db_config)
模型类:
class UserModel(Model):
"""模型基类"""
user_id = IntegerField(primary_key=True)
user_name = CharField(max_length=255)
class Meta:
database = db
查询:
# 同步查询
query_list = UserModel.select().where(UserModel.user_id >= 1)
# 异步查询
query_list = await UserModel.objects.execute(UserModel.select().where(UserModel.user_id >= 1))
# 打印结果
for query in query_list:
print(query.user_id)
print(quert.user_name)
断线重连
from peewee import *
from playhouse.shortcuts import ReconnectMixin
# 同步数据库
# 同步数据库断线重连类
class ReconnectMySQLDatabase(ReconnectMixin, MySQLDatabase):
pass
# 数据库实例
db = ReconnectMySQLDatabase(**db_config)
# 异步数据库
from peewee_async import MySQLDatabase as AsyncMySQLDatabase
# 异步数据库断线重连类
class ReconnectAsyncMySQLDatabase(ReconnectMixin, AsyncMySQLDatabase):
pass
# 数据库实例
db = ReconnectAsyncMySQLDatabase(**db_config)
连接池
from peewee import *
# 同步数据库连接池
from playhouse.pool import PooledMySQLDatabase
# 数据库实例
db = PooledMySQLDatabase(**db_config, max_connections=10)
# 异步数据库连接池
from peewee_async import PooledMySQLDatabase as AsyncPooledMySQLDatabase
# 数据库实例
db = AsyncPooledMySQLDatabase(**db_config, max_connections=10)
断线重连+连接池
from peewee import *
from playhouse.shortcuts import ReconnectMixin
# 同步数据库
# 连接池
from playhouse.pool import PooledMySQLDatabase
# 断线重连+连接池
class ReconnectPooledMySQLDatabase(ReconnectMixin, PooledMySQLDatabase):
_instance = None
@classmethod
def get_db_instance(cls):
if not cls._instance:
cls._instance = cls(**db_config, max_connections=10)
return cls._instance
# 数据库实例
db = ReconnectPooledMySQLDatabase.get_db_instance()
# 异步数据库
# 连接池
from peewee_async import PooledMySQLDatabase as AsyncPooledMySQLDatabase
# 断线重连+连接池
class ReconnectAsyncPooledMySQLDatabase(ReconnectMixin, AsyncPooledMySQLDatabase):
_instance = None
@classmethod
def get_db_instance(cls):
if not cls._instance:
cls._instance = cls(**db_config, max_connections=10)
return cls._instance
# 数据库实例
db = ReconnectAsyncPooledMySQLDatabase.get_db_instance()