Python 后台应用项目搭建(aiohttp)

2020-03-21  本文已影响0人  BrianHsu

通过本文章,可能获得以下能力

阅读概览

环境
依赖
项目结构
项目主要内容

环境

      python 3.6 or higher
      mysql  5.2.5 (未使用其他版本测试,感兴趣的话可以试着更换 mysql 版本,查看其兼容性)
      设备 Mac 
      开发工具  pycharm

依赖包

     -   aiomysql==0.0.20
     -   ujson==1.35
     -   SQLAlchemy==1.3.8
     -   shortuuid==0.5.0
     -   aiohttp==3.5.4
     -   PyYAML==5.3.1

包安装

    pip install aiohttp
    pip install aiomysql
    pip install sqlalchemy
    pip install ujson
    pip install PyYAML
    pip install shortuuid

项目目录

根目录为配置文件及源码

.
├── README.md
├── config 
│   ├── __init__.py
│   ├── dev.yml
│   └── setting.py
├── requirements.txt
└── src
    ├── __init__.py
    ├── common
    │   ├── __init__.py
    │   └── mysql.py
    ├── constants
    │   ├── __init__.py
    │   └── const.py
    ├── model
    │   ├── __init__.py
    │   ├── base_option.py
    │   └── student.py
    ├── routes.py
    ├── server.py
    ├── service
    │   ├── __init__.py
    │   └── student_svc.py
    ├── utils
    │   ├── __init__.py
    │   ├── request_utils.py
    │   ├── response_utils.py
    │   └── time_utils.py
    └── views
        ├── __init__.py
        ├── status.py
        └── student_api.py


项目内容

# server.py
from aiohttp import web

from config.setting import config
from src import create_app

# Build the application object as soon as this module is imported.
app = create_app()

if __name__ == "__main__":
    # Serve on the loopback interface only; the port comes from the loaded config.
    web.run_app(app, host='127.0.0.1', port=config['base']['port'])

src/__init__.py:用于初始化路由、数据库、日志、配置信息等操作

import asyncio
from aiohttp import web

# 版本号
from src.common.mysql import Mysql
from config.setting import config
from src.routes import set_up
from src.utils.request_utils import request_id
from src.utils.response_utils import success, error

# Version string of this package.
__version__ = '1.0.0'


async def _init_mysql(app):
    """Startup hook: create the shared MySQL engine on the serving event loop.

    A failure is logged instead of aborting startup, because
    ``Mysql.get_engine()`` creates the engine lazily when it is still unset.
    """
    import logging

    try:
        await Mysql.init_engine()
    except Exception:
        logging.getLogger(__name__).exception(
            "MySQL engine initialisation failed at startup; "
            "it will be retried on first use")


def create_app():
    """Build and configure the aiohttp application.

    Installs the response-wrapping middleware, stores the loaded config under
    ``app['config']``, schedules the MySQL engine initialisation and registers
    the routes.

    Returns:
        The configured ``web.Application``.
    """
    # Every handler result goes through the response middleware.
    my_app = web.Application(middlewares=[middleware])
    my_app['config'] = config
    # Initialise MySQL from a startup signal rather than asyncio.ensure_future():
    # at this point no loop is running, so a bare task may be bound to a loop
    # other than the one that serves requests, and its errors go unobserved.
    my_app.on_startup.append(_init_mysql)
    # Register the routes.
    set_up(my_app)
    return my_app


@web.middleware
async def middleware(request, handler):
    """Wrap handler results and failures in the project's JSON envelope.

    The request id is stored on the request under ``'request_id'`` so that
    handlers can read it. A truthy handler result is passed to ``success``
    as payload; a falsy one yields a payload-less ``success``.

    ``ValueError`` is reported as ``RC_INVALID_PARAM`` and any other
    exception as ``RC_INTERNAL_ERROR``. aiohttp's own ``web.HTTPException``
    (404, 405, redirects, ...) is re-raised so the server answers with the
    real HTTP status instead of a 200 "internal error" body. A handler that
    already returns a response object is passed through unchanged.

    Returns:
        A ``web.Response`` with ``application/json`` content type, or the
        handler's own response object.
    """
    _request_id = request_id(request)
    try:
        request['request_id'] = _request_id
        data = await handler(request)
        if isinstance(data, web.StreamResponse):
            # Already a complete HTTP response; do not wrap it again.
            return data
        if data:
            resp = success(_request_id, data)
        else:
            resp = success(_request_id)
    except web.HTTPException:
        # Routing/HTTP-level outcomes belong to aiohttp, not to the envelope.
        raise
    except ValueError as e:
        # str(): args[0] is not guaranteed to be a string.
        resp = error(_request_id, "RC_INVALID_PARAM",
                     str(e.args[0]) if e.args else 'params invalid')
    except Exception as e1:
        # str(): the model layer wraps exceptions in exceptions, so args[0]
        # can be an exception object rather than a message.
        resp = error(_request_id, "RC_INTERNAL_ERROR",
                     str(e1.args[0]) if e1.args else 'inner error')
    # success()/error() results are encoded here, so they must be str.
    return web.Response(body=resp.encode(), content_type='application/json')

from src.views.student_api import students, student, save_student
from .views.status import status


def set_up(app):
    """Register every HTTP route of the service on ``app.router``."""
    router = app.router

    # Read-only endpoints, registered in this order:
    #   /status   - service status
    #   /students - all students
    #   /student  - a single student
    get_routes = (
        ('/status', status, 'status'),
        ('/students', students, 'students'),
        ('/student', student, 'student'),
    )
    for path, handler, route_name in get_routes:
        router.add_get(path, handler, name=route_name)

    # Create a student.
    router.add_post('/save', save_student, name='save_student')

import asyncio

from src.service import student_svc


# asyncio.shield keeps the database call running when the request is
# cancelled midway, so a cancelled request does not leave the database blocked.

async def students(request):
    """Return whatever ``student_svc.students()`` produces, shielded from cancellation."""
    shielded = asyncio.shield(student_svc.students())
    return await shielded


async def save_student(request):
    """Parse the JSON request body and hand it to ``student_svc.save_student``."""
    payload = await request.json()
    # Shielded so that a cancelled request does not interrupt the write.
    pending = asyncio.shield(student_svc.save_student(payload))
    return await pending


async def student(request):
    """Return a single student looked up by id.

    The id is taken from the URL path parameter ``id`` when the matched route
    defines one, otherwise from the ``id`` query-string parameter
    (``/student?id=1``). The fallback is needed because a route path without
    an ``{id}`` placeholder never fills ``match_info``, which would leave the
    id at ``None`` on every request.
    """
    st_id = request.match_info.get("id") or request.query.get("id")
    # Shielded so that a cancelled request does not interrupt the query.
    return await asyncio.shield(student_svc.single_student(st_id))


import aiomysql.sa

from config.setting import config

# The "mysql" section of the loaded project configuration.
mysql_config = config["mysql"]


class Mysql(object):
    """Holder for the shared aiomysql SQLAlchemy engine."""

    # Class-level engine; stays None until init_engine() has completed.
    engine = None

    def __init__(self, engine):
        # Instance attribute that shadows the class-level engine.
        self.engine = engine

    @staticmethod
    async def init_engine():
        """Create an engine from the ``mysql`` config section and store it on the class."""
        engine_options = dict(
            user=mysql_config["user"],
            db=mysql_config["database"],
            host=mysql_config["host"],
            password=mysql_config["password"],
            autocommit=True,
            connect_timeout=10,
            echo=False,
        )
        Mysql.engine = await aiomysql.sa.create_engine(**engine_options)

    @staticmethod
    async def get_engine():
        """Return the class-level engine, creating it first when it is not set."""
        if Mysql.engine:
            return Mysql.engine
        await Mysql.init_engine()
        return Mysql.engine

from src import Mysql


class BaseOption:
    """Database helpers shared by the model classes, built on ``Mysql.get_engine()``."""

    @classmethod
    async def insert_or_update(cls, *sqls):
        """Execute the given statements inside a single transaction.

        Args:
            *sqls: statements to execute, in order, on one connection.

        Returns:
            The string ``"success"`` once the transaction has been committed.

        Raises:
            Exception: wraps any failure; the original error is attached as
                ``__cause__``. The transaction is rolled back first when one
                had been started.
        """
        try:
            engine = await Mysql.get_engine()
            async with engine.acquire() as conn:
                # Turn autocommit off on the raw connection for the duration
                # of the explicit transaction.
                await conn.connection.autocommit(False)
                trans = None
                try:
                    trans = await conn.begin()
                    for sql in sqls:
                        await conn.execute(sql)
                    await trans.commit()
                    return "success"
                except Exception:
                    if trans:
                        await trans.rollback()
                    raise
                finally:
                    # Restore autocommit before the connection is released.
                    # The connection is deliberately NOT closed here: leaving
                    # the ``async with`` block hands it back to the pool, and
                    # closing it first would force a new connection per call.
                    await conn.connection.autocommit(True)
        except Exception as e:
            # Callers receive a plain Exception; chain the cause so the
            # original type and traceback stay visible.
            raise Exception(e) from e

    @classmethod
    async def query(cls, sql):
        """Execute one statement and return the result object of ``conn.execute``.

        Args:
            sql: the statement to execute.

        Returns:
            Whatever ``conn.execute(sql)`` returns; callers fetch rows from it.

        Raises:
            Exception: wraps any failure; the original error is attached as
                ``__cause__``.
        """
        try:
            engine = await Mysql.get_engine()
            # The connection goes back to the pool when the block exits; it is
            # not closed explicitly so that it can be reused.
            async with engine.acquire() as conn:
                return await conn.execute(sql)
        except Exception as e:
            raise Exception(e) from e


from sqlalchemy import BigInteger, SMALLINT, Integer
from sqlalchemy import String
from sqlalchemy import Column
from sqlalchemy import Table
from sqlalchemy import MetaData

from src.constants import const
from src.model.base_option import BaseOption
from src.utils.time_utils import current_sec_time


class Student(BaseOption):
    """Data-access object for the ``student`` table."""

    def __init__(self):
        meta = MetaData()
        self.student_tb = Table(
            "student",
            meta,
            Column(const.Student.ID, Integer, primary_key=True),
            Column(const.Student.NAME, String(40)),
            Column(const.Student.CLASS, String(40)),
            Column(const.Student.STATUS, SMALLINT, default=1),
            # NOTE(review): current_sec_time() is called right here, so these
            # two defaults are fixed at the moment this instance is built.
            Column(const.Student.CREATE_TIME, BigInteger, default=current_sec_time()),
            Column(const.Student.UPDATE_TIME, BigInteger, default=current_sec_time())
        )

    async def _fetch_dicts(self, sql):
        """Run a select and return its rows as a list of dicts, or None when empty."""
        cursor = await self.query(sql)
        records = await cursor.fetchall()
        return [dict(r) for r in records] if records else None

    async def insert_student(self, student_obj):
        """Insert one student row.

        Args:
            student_obj: mapping read with ``.get`` for ``name``, ``st_class``
                and ``status`` (``status`` defaults to 1).

        Returns:
            The result of ``insert_or_update`` (``"success"``).
        """
        # One timestamp for both columns, so a new row has
        # create_time == update_time.
        now = current_sec_time()
        sql = self.student_tb.insert().values(
            name=student_obj.get("name"),
            st_class=student_obj.get("st_class"),
            status=student_obj.get("status", 1),
            create_time=now,
            update_time=now)
        return await self.insert_or_update(sql)

    async def get_update_student_sql(self, student_obj):
        """Build the update statement for one student and pass it to ``query``.

        Only ``name``, ``st_class``, ``status`` and ``update_time`` are
        written; ``create_time`` is left untouched because an update must not
        reset the creation timestamp.

        NOTE(review): the value returned is the un-awaited coroutine from
        ``self.query(sql)``, so the statement only runs if the caller awaits
        that result as well. Confirm against the callers whether this is
        intended.
        """
        sql = self.student_tb.update().values(
            name=student_obj.get("name"),
            st_class=student_obj.get("st_class"),
            status=student_obj.get("status"),
            update_time=current_sec_time()
        ).where(self.student_tb.c.id == student_obj.get(const.Student.ID))
        return self.query(sql)

    async def status(self, st_id):
        """Delete the row whose id equals ``st_id``.

        NOTE(review): the method name and the error message talk about a
        status update, but the statement issued is a DELETE. Confirm which
        one is intended.

        Raises:
            Exception: when the statement fails; the cause is chained.
        """
        try:
            sql = self.student_tb.delete().where(self.student_tb.c.id == st_id)
            return await self.insert_or_update(sql)
        except Exception as e:
            raise Exception('update status failed, error info:{}.'.format(e)) from e

    async def select_student_by_st_id(self, st_id):
        """Return the rows whose id equals ``st_id`` as a list of dicts, or None.

        Raises:
            Exception: when the query fails; the cause is chained.
        """
        try:
            sql = self.student_tb.select().where(self.student_tb.c.id == st_id)
            return await self._fetch_dicts(sql)
        except Exception as e:
            raise Exception('select_by_id failed, error info:{}.'.format(e)) from e

    async def select_students(self):
        """Return every student row as a list of dicts, or None when the table is empty.

        Raises:
            Exception: when the query fails; the cause is chained.
        """
        try:
            sql = self.student_tb.select()
            return await self._fetch_dicts(sql)
        except Exception as e:
            # The message names this method; it used to say "select_by_id".
            raise Exception('select_students failed, error info:{}.'.format(e)) from e

具体代码移步github aiohttp-example

上一篇下一篇

猜你喜欢

热点阅读