Django3.1异步视图+aiomysql+channels实
Django3.1异步视图+aiomysql+channels实现小游戏
最近有需求需要开发一款网页答题小游戏,实现实时对战的功能,首先想到使用tornado高并发异步框架去实现websocket,可是就是这个时候django3.1正式版发布了,说他来的早不如说他来得巧,既然方便强大的django支持异步视图了那为什么还要去花时间研究tornado,django3.x实现asgi接口自然可以实现websocket,但是考虑开发成本,最终还是选择使用channels实现websocket。考虑到公司业务,这里就只写websocket的实现吧。
项目依赖的主要环境
- python >= 3.7
- django >= 3.1
- channels >= 2.4
- aiomysql
- uvicorn
channels配置
1. 假设python和django环境已经就绪且项目已经初始化
2. 安装channels
pip install channels
3. 在app中注册channels
# settings.py
INSTALLED_APPS = (
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.sites',
...
'channels',
)
4. 创建路由配置
在工程目录下创建routing.py(myproject/routing.py和settings.py同级)
# myproject/routing.py
from channels.auth import AuthMiddlewareStack
# 继承settings中的allow host
from channels.security.websocket import AllowedHostsOriginValidator
from channels.routing import ProtocolTypeRouter, URLRouter
application = ProtocolTypeRouter({
# (http->django views is added by default)
'websocket': AllowedHostsOriginValidator(
AuthMiddlewareStack( # 中间件
URLRouter(
# 这里配置websocket的路由
)
),
)
})
5. 配置路由指向并且启用通道层
通道层可以理解为为每一个ws连接做一个唯一映射存在全局,这样可以在django的任何地方使用通道层给指定的连接发送消息
使用通道层需要安装channels_redis:pip install channels_redis
# settings.py
ASGI_APPLICATION = 'myproject.routing.application'
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {
"hosts": [('127.0.0.1', 6379)],
},
},
}
6. 创建websocket的app
1. 注册app
# settings.py
INSTALLED_APPS = (
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.sites',
...
'app',
'channels',
)
2. 在app中创建消费者文件(consumers.py)
# app/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer
class ChatConsumer(AsyncWebsocketConsumer):
# 建立连接的回调
async def connect(self):
self.room_name = self.scope['url_route']['kwargs']['room_id']
self.room_group_name = 'chat_%s' % self.room_id
# Join room group
await self.channel_layer.group_add(
self.room_group_name,
self.channel_name
)
await self.accept()
# 断开连接的回调
async def disconnect(self, close_code):
# Leave room group
await self.channel_layer.group_discard(
self.room_group_name,
self.channel_name
)
# 收到消息的回调
async def receive(self, text_data):
text_data_json = json.loads(text_data)
message = text_data_json['message']
# Send message to room group
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'chat_message',
'message': message
}
)
# 发送消息指定的处理方法
async def chat_message(self, event):
message = event['message']
# Send message to WebSocket
await self.send(text_data=json.dumps({
'message': message
}))
3. 在app中创建websocket路由文件(routing.py)
1.创建路由文件
# app/routing.py
from django.urls import re_path
from answer_game import consumers
websocket_urlpatterns = [
re_path(r'ws/game/controller/(?P<room_id>\w+)/(?P<user_id>\d+)$', consumers.GameController),
]
2. 创建自定义中间件
class WebSocketAuthMiddleware:
def __init__(self, inner):
# 存储通过的ASGI应用程序
self.inner = inner
def __call__(self, scope):
# 关闭旧的数据库连接,以防止使用超时的连接
close_old_connections()
# 自定义校验逻辑,获取子协议内容
protocol = dict(scope['headers']).get(b'sec-websocket-protocol', b'').decode()
# 处理子协议
# 这里塞进scope的值可以在消费者方法中直接获取
# 关于scope的更多信息可以参考官方文档 https://channels.readthedocs.io/en/latest/topics/consumers.html?highlight=scope#scope
accept = False if not protocol else True
# 直接返回内部应用程序并让它继续运行
return self.inner(dict(scope, accept=accept))
2. 注册路由,使用自定义中间件
# myproject/routing.py
# 这里是自定义中间件,也可以换成自带的auth中间件,因为本次使用了websocket子协议protocol,所以使用自定义中间件做安全校验
from costudy_answer_game.WebSocketAuthMiddleware import WebSocketAuthMiddleware
# 这里就是自带的auth中间件,没有用到
from channels.auth import AuthMiddlewareStack
# 继承settings中的allow host
from channels.security.websocket import AllowedHostsOriginValidator
from channels.routing import ProtocolTypeRouter, URLRouter
from app import routing
application = ProtocolTypeRouter({
# (http->django views is added by default)
'websocket': AllowedHostsOriginValidator(
WebSocketAuthMiddleware(
URLRouter(
# 这里配置websocket的路由
routing.websocket_urlpatterns
)
),
)
})
7. 修改消费者方法接受子协议
# app/consumers.ChatConsumer.connect
await self.accept('your_protocol')
8. 创建asgi文件
在工程目录下创建asgi.py
# myproject/asgi.py
import os
import django
from django.core.asgi import get_asgi_application
from channels.routing import get_default_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
application = get_asgi_application()
django.setup()
ws_application = get_default_application()
到这里websocket的配置就已经完成里
使用aiomysql
django3.1目前还不支持异步orm,虽然官方文档说明可以使用sync_to_async转换同步方法为异步方法从而使用同步orm,但是实际开发中会遇到很多复杂的聚合查询等,此时再使用这种方法会很麻烦,而且甚至使用不了(是我使用不了,可能是我使用方法不对),所以我选择了aiomysql,aiomysql的官方文档在这 https://aiomysql.readthedocs.io/en/latest/connection.html#
当然也可以使用异步orm框架tortoise-orm,附上文档连接 https://tortoise-orm.readthedocs.io/en/latest/
但是由于调研时间问题项目里还是使用了原生sql,而且我始终不愿意在django里使用其他的orm,这是我对django orm的尊重,期望django能更快的支持异步orm
1.安装aiomysql
pip install aiomysql
2. 配置aiomysql
# settings.py
class DBcontroller:
db1_engine = None
db2_engine = None
isinstance = False
def __new__(cls, *args, **kwargs):
if cls.isinstance: # 如果被实例化了
return cls.isinstance # 返回实例化对象
print('connecting to database...')
loop = asyncio.get_event_loop()
asyncio.run_coroutine_threadsafe(DBcontroller.connect(), loop)
cls.isinstance = object.__new__(cls) # 否则实例化
return cls.isinstance # 返回实例化的对象
@staticmethod
async def connect():
try:
db1_aiomysql_config = dict(
host=os.environ.get('MYSQLHOST', '127.0.0.1'),
port=int(os.environ.get('MYSQLPORT', 3306)),
user=os.environ.get('MYSQLUSER', 'root'),
password=os.environ.get('MYSQLPWD'),
maxsize=100,
db='dbname1',
# echo=True
)
db2_aiomysql_config = db1_aiomysql_config.copy()
db2_aiomysql_config.update({'db': 'dbname2'})
db1_engine = await aiomysql.create_pool(**study_aiomysql_config)
db2_engine = await aiomysql.create_pool(**game_aiomysql_config)
if db1_engine and db2_engine:
DBcontroller.db1_engine = db1_engine
DBcontroller.db2_engine = db2_engine
DBcontroller.connectStatue = True
print('connect to mysql success!')
else:
raise ("connect to mysql error ")
except:
print('connect error.', exc_info=True)
db = DBcontroller()
3. 使用
# app/views.py
from django.shortcuts import HttpResponse
async def get_user_id_by_uid(request):
uid = request.GET.get('uid')
async with await db.study_engine.acquire() as coon: # type: aiomysql.connection.Connection
async with coon.cursor() as cur: # type: aiomysql.cursors.Cursor
sql = "SELECT nickname FROM user WHERE id=%s"
await cur.execute(sql, uid)
rel = await cur.fetchone()
if rel:
user_id, = rel
else:
user_id = 0
db.study_engine.release(coon)
rel = {"user_id":user_id}
return HttpResponse(json.dumps(rel, ensure_ascii=False), content_type='application/json')
4. 封装使用
每一次执行sql都要写这么多代码,太麻烦了,那就封装一下吧
创建公共方法文件utils.py
class DBExecute(object):
def __init__(self, sql: str, params=None, return_type='tuple'):
"""
执行study数据库的sql
:param sql: sql语句
:param params: sql的参数
:param return_type: 返回的数据类型
"""
self.sql = sql
self.params = params
self.return_type = return_type
async def fetchone(self) -> (tuple, dict):
async with await db.study_engine.acquire() as conn: # type: aiomysql.connection.Connection
if self.return_type == 'dict':
async with conn.cursor(aiomysql.DictCursor) as cur:
if self.params is None:
await cur.execute(self.sql)
else:
await cur.execute(self.sql, self.params)
rel = await cur.fetchone() # type: dict
else:
async with conn.cursor() as cur: # type: aiomysql.cursors.Cursor
if self.params is None:
await cur.execute(self.sql)
else:
await cur.execute(self.sql, self.params)
rel = await cur.fetchone() # type: tuple
db.study_engine.release(conn)
return rel
async def fetchall(self) -> list:
async with await db.study_engine.acquire() as conn: # type: aiomysql.connection.Connection
if self.return_type == 'dict':
async with conn.cursor(aiomysql.DictCursor) as cur: # type: aiomysql.cursors.Cursor
if self.params is None:
await cur.execute(self.sql)
else:
await cur.execute(self.sql, self.params)
rel = await cur.fetchall()
else:
async with conn.cursor() as cur: # type: aiomysql.cursors.Cursor
if self.params is None:
await cur.execute(self.sql)
else:
await cur.execute(self.sql, self.params)
rel = await cur.fetchall()
db.study_engine.release(conn)
return rel
这里只封装了一个db的执行方法,在settings.py中配置了两个db,这个是根据业务调整的
现在再看一下刚刚的视图函数可以怎么写
# app/views.py
from django.shortcuts import HttpResponse
from common.utils import db
async def get_user_id_by_uid(request):
uid = request.GET.get('uid')
sql = "SELECT nickname FROM user WHERE id=%s"
rel = await DBExecute(sql, uid).fetchone()
if rel:
user_id, = rel
else:
user_id = 0
rel = {"user_id":user_id}
return HttpResponse(json.dumps(rel, ensure_ascii=False), content_type='application/json')
启动项目
uvicorn --host 127.0.0.1 --port 8000 --workers 1 myproject.asgi:ws_application
总结
算了,不会总结,有问题就谷歌