爬虫@IT·互联网生活不易 我用python

aiohttp服务器上手——搭建简单的todo应用

2017-04-27  本文已影响3171人  treelake

Getting start with aiohttp.web: A todo tutorial

aiohttp是一个基于Python 3.4+ asyncio模块的HTTP工具包。它包括:

高低层级的服务器区别在于高层级服务器提供了:

低层级服务器仍然允许使用高层级的请求、响应和websocket对象,并不是低到让你直接处理TCP套接字。
在本指南中,我们将构建一个简单的Todo应用程序来上手高级服务器。在未来,我希望涵盖更复杂的应用,但目前待办事项列表已经成为网络编程的最佳选择。

安装

python3.6 -m venv asynctodo
source asynctodo/bin/activate
python -m pip install aiohttp==2.0.7  # current version as 2017-04-16

基本应用

from aiohttp import web

TODOS = [
    {
        'name': 'Start this tutorial',
        'finished': True
    },
    {
        'name': 'Finish this tutorial',
        'finished': False
    }
]


def get_all_todos(request):
    return web.json_response([
        {'id': idx, **todo} for idx, todo in enumerate(TODOS)
    ])


def get_one_todo(request):
    id = int(request.match_info['id'])

    if id >= len(TODOS):
        return web.json_response({'error': 'Todo not found'}, status=404)

    return web.json_response({'id': id, **TODOS[id]})


def app_factory(args=()):
    app = web.Application()
    app.router.add_get('/todos/', get_all_todos, name='all_todos')
    app.router.add_get('/todos/{id:\d+}', get_one_todo, name='one_todo')

    return app

补充:nerdwaller评论指出路由处理器应该总是异步的来鼓励你以协程思考。这里我保留了原始代码来保持文本的一致性,然而,将上述内容转换为协程只需要将def替换为async def。我必须同意,即使在将列表转储到json的简单情况下,使用async def也提供了与其它路由处理器的良好对称性。

小贴士

运行应用

python -m aiohttp.web -P 8080 aiotodo:app_factory

你将看到:

======== Running on http://localhost:8080 ========
(Press CTRL+C to quit)

然后打开浏览器访问localhost:8080/todos/(注意尾斜杠),会看到我们放在列表中的两个初始项目以及它们的id。访问localhost:8080/todos/0localhost:8080/todos/1(没有尾斜杠)查看单个项目。

增加修改删除Todos

async def create_todo(request):
    data = await request.json()

    if 'name' not in data:
        return web.json_response({'error': '"name" is a required field'})

    name = data.get('name')

    if not isinstance(name, str) or not len(name):
        return web.json_response(
            {'error': '"name" must be a string with at least one character'})

    data['finished'] = bool(data.get('finished', False))
    TODOS.append(data)
    new_id = len(TODOS) - 1

    return web.Response(
        headers={
            'Location': str(request.app.router['one_todo'].url_for(id=new_id))
        },
        status=303
    )
async def update_todo(request):
    id = int(request.match_info['id'])

    if id >= len(TODOS):
        return web.json_response({'error': 'Todo not found'}, status=404)

    data = await request.json()

    if 'finished' not in data:
        return web.json_response(
            {'error': '"finished" is a required key'}, status=400)

    TODOS[id]['finished'] = bool(data['finished'])

    return web.Response(status=204)


def remove_todo(request):
    id = int(request.match_info['id'])

    if id >= len(TODOS):
        return web.json_response({'error': 'Todo not found'})

    del TODOS[id]

    return web.Response(status=204)
def app_factory(args=()):
    app = web.Application()
    app.router.add_get('/todos/', get_all_todos, name='all_todos')
    app.router.add_post('/todos/', create_todo, name='create_todo',
                        expect_handler=web.Request.json)
    app.router.add_get('/todos/{id:\d+}', get_one_todo, name='one_todo')
    app.router.add_patch('/todos/{id:\d+}', update_todo, name='update_todo')
    app.router.add_delete('/todos/{id:\d+}', remove_todo, name='remove_todo')
    return app
import requests, json
#
body = json.dumps({u"name": u"feed the api"})
url = u"http://localhost:8080/todos/"
#
r = requests.post(url=url, data=body)
r.content

可以看到,requests帮我们处理了重定向,成功访问到新建的项目:



其它请自行尝试。

持久化

python -m pip install psycopg2 aiopg sqlalchemy

启动数据库

CREATE ROLE aiotodo LOGIN PASSWORD '12345' NOINHERIT CREATEDB;
CREATE DATABASE aiotodo;

简单测试

import psycopg2
#
conn = psycopg2.connect("dbname=aiotodo user=aiotodo password=12345")
cur = conn.cursor()
cur.execute("DROP TABLE IF EXISTS test")
cur.execute("CREATE TABLE test (id serial PRIMARY KEY, num integer, data varchar);")
cur.execute("INSERT INTO test (num, data) VALUES (%s, %s)", (100, "abc'def"))
cur.execute("SELECT * FROM test;")
print(cur.fetchone())
conn.commit()
cur.close()
conn.close()
mkdir -p tmp/pgdata
docker run -d --name postgres -p 5432:5432 \
    -v $(pwd)/tmp/pgdata:/var/lib/postgres/data \
    -e POSTGRES_USER=aiotodo -e POSTGRES_PASSWORD=12345 -e POSTGRES_DB=aiotodo \
    postgres

将应用程序连接到数据库

from aiopg.sa import create_engine
import sqlalchemy as sa

# 表的SQLAlchemy视图
metadata = sa.MetaData()
todos_tbl = sa.Table(
    'todos', metadata,
    sa.Column('id', sa.Integer, primary_key=True),
    sa.Column('name', sa.String(255), unique=True, nullable=False),
    sa.Column('finished', sa.Boolean(), default=False, nullable=False)
)

# 创建表
async def create_table(engine):
    async with engine.acquire() as conn:
        await conn.execute('DROP TABLE IF EXISTS todos')
        await conn.execute('''CREATE TABLE todos (
            id SERIAL PRIMARY KEY,
            name VARCHAR(255) NOT NULL UNIQUE,
            finished BOOLEAN NOT NULL DEFAULT FALSE
        )''')
async def attach_db(app):
    app['db'] = await create_engine(
        ' '.join([
            # 或改为你的数据库配置
            'host=localhost',
            'port=5432',
            'dbname=aiotodo',
            'user=aiotodo',
            'password=12345'
        ])
    )

async def teardown_db(app):
    app['db'].close()
    await app['db'].wait_closed()
    app['db'] = None


async def populate_initial_values(engine):
    async with engine.acquire() as conn:
        await conn.execute(todos_tbl.insert().values({'name': 'Start this tutorial', 'finished': True}))
        await conn.execute(todos_tbl.insert().values({'name': 'Finish this tutorial', 'finished': False}))


async def setup_todo_table(app):
    await create_table(app['db'])
    await populate_initial_values(app['db'])
def app_factory(args=()):
    app = web.Application()

    app.on_startup.append(attach_db)
    # app.on_teardown.append(teardown_db)
    # 原文为on_teardown但实测windows下aiohttp(2.0.7)会报错,改为
    app.on_shutdown.append(teardown_db)

    if '--make-table' in args:
        app.on_startup.append(setup_todo_table)

    app.router.add_get('/todos/', get_all_todos, name='all_todos')
    app.router.add_post('/todos/', create_todo, name='create_todo',
                        expect_handler=web.Request.json)
    app.router.add_get('/todos/{id:\d+}', get_one_todo, name='one_todo')
    app.router.add_patch('/todos/{id:\d+}', update_todo, name='update_todo')
    app.router.add_delete('/todos/{id:\d+}', remove_todo, name='remove_todo')
    return app
python -m aiohttp.web -P 8080 --make-table aiotodo:app_factory

在路由处理器中操作数据库

async def get_all_todos(request):
    async with request.app['db'].acquire() as conn:
        todos = [dict(row.items()) async for row in conn.execute(todos_tbl.select().order_by(todos_tbl.c.id))]
        return web.json_response(todos)


async def get_one_todo(request):
    id = int(request.match_info['id'])
    async with request.app['db'].acquire() as conn:
        result = await conn.execute(
            todos_tbl.select().where(todos_tbl.c.id == id))
        row = await result.fetchone()

    if not row:
        return web.json_response({'error': 'Todo not found'}, status=404)

    return web.json_response(dict(row.items()))
async with
async for
results = []
async for x in things:
    results.append(x)

更新和删除

async def remove_todo(request):
    id = int(request.match_info['id'])

    async with request.app['db'].acquire() as conn:
        result = await conn.execute(todos_tbl.delete().where(todos_tbl.c.id == id))

    if not result.rowcount:
        return web.json_response({'error': 'Todo not found'}, status=404)

    return web.Response(status=204)


async def update_todo(request):
    id = int(request.match_info['id'])
    data = await request.json()

    if 'finished' not in data:
        return web.json_response({'error': '"finished" is a required key'}, status=400)

    async with request.app['db'].acquire() as conn:
        result = await conn.execute(
            todos_tbl.update().where(todos_tbl.c.id == id).values({
                'finished': bool(data['finished'])
            })
        )

    if result.rowcount == 0:
        return web.json_response({'error': 'Todo not found'}, status=404)

    return web.Response(status=204)

插入数据

from sqlalchemy import sql


async def create_todo(request):
    data = await request.json()

    if 'name' not in data:
        return web.json_response({'error': '"name" is a required field'})

    name = data['name']

    if not name or not isinstance(name, str):
        return web.json_response({'error': '"name" must be a string with at least one character'})

    todo = {'name': name, 'finished': bool(data.get('finished', False))}

    async with request.app['db'].acquire() as conn:
        async with conn.begin():
            await conn.execute(todos_tbl.insert().values(todo))
            result = await conn.execute(
                sql.select([sql.func.max(todos_tbl.c.id).label('id')])
            )
            new_id = await result.fetchone()

    return web.Response(
        status=303,
        headers={
            'Location': str(request.app.router['one_todo'].url_for(id=new_id.id))
        }
    )

进一步

代码 - (py3.5.2)

from aiohttp import web
from aiopg.sa import create_engine
import sqlalchemy as sa
from sqlalchemy import sql


# 表的SQLAlchemy视图
metadata = sa.MetaData()
todos_tbl = sa.Table(
    'todos', metadata,
    sa.Column('id', sa.Integer, primary_key=True),
    sa.Column('name', sa.String(255), unique=True, nullable=False),
    sa.Column('finished', sa.Boolean(), default=False, nullable=False)
)


# -----------------------------------路由处理器----------------------------------
# 使用 async with request.app['db'].acquire() as conn 连接数据库
async def get_all_todos(request):
    '''
    获取所有代办事项
    '''
    async with request.app['db'].acquire() as conn:
        todos = []
        async for row in conn.execute(
            todos_tbl.select().order_by(todos_tbl.c.id)
        ):
            todos.append(
                dict(row.items()))
        return web.json_response(todos)


async def get_one_todo(request):
    '''
    根据路由中的id参数获取指定代办事项
    '''
    id = int(request.match_info['id'])
    async with request.app['db'].acquire() as conn:
        result = await conn.execute(
            todos_tbl.select().where(todos_tbl.c.id == id))
        row = await result.fetchone()

    if not row:
        return web.json_response({'error': 'Todo not found'}, status=404)

    return web.json_response(dict(row.items()))


async def create_todo(request):
    '''
    创建一个新的代办事项
    '''
    data = await request.json()

    if 'name' not in data:
        return web.json_response({'error': '"name" is a required field'})

    name = data['name']

    if not name or not isinstance(name, str):
        return web.json_response(
            {'error': '"name" must be a string with at least one character'})

    todo = {'name': name, 'finished': bool(data.get('finished', False))}

    async with request.app['db'].acquire() as conn:
        async with conn.begin():
            await conn.execute(todos_tbl.insert().values(todo))
            result = await conn.execute(
                sql.select([sql.func.max(todos_tbl.c.id).label('id')])
            )
            new_id = await result.fetchone()

    return web.Response(
        status=303,
        headers={
            'Location': str(
                request.app.router['one_todo'].url_for(id=new_id.id))
        }
    )


async def remove_todo(request):
    '''
    清除指定代办事项
    '''
    id = int(request.match_info['id'])

    async with request.app['db'].acquire() as conn:
        result = await conn.execute(
            todos_tbl.delete().where(todos_tbl.c.id == id))

    if not result.rowcount:
        return web.json_response({'error': 'Todo not found'}, status=404)

    return web.Response(status=204)


async def update_todo(request):
    '''
    更新某一条待办事项
    '''
    id = int(request.match_info['id'])
    data = await request.json()

    if 'finished' not in data:
        return web.json_response(
            {'error': '"finished" is a required key'}, status=400)

    async with request.app['db'].acquire() as conn:
        result = await conn.execute(
            todos_tbl.update().where(todos_tbl.c.id == id).values({
                'finished': bool(data['finished'])
            })
        )

    if result.rowcount == 0:
        return web.json_response({'error': 'Todo not found'}, status=404)

    return web.Response(status=204)


# -----------------------------数据库连接初始化相关操作-----------------------------
async def attach_db(app):
    '''
    连接数据库并附加到app
    '''
    app['db'] = await create_engine(
        ' '.join([
            # 或改为你的数据库配置
            'host=localhost',
            'port=5432',
            'dbname=aiotodo',
            'user=aiotodo',
            'password=12345'
        ])
    )


async def teardown_db(app):
    '''
    关闭与数据库的连接
    '''
    app['db'].close()
    await app['db'].wait_closed()
    app['db'] = None


async def create_table(engine):
    '''
    在数据库中创建新表
    '''
    async with engine.acquire() as conn:
        await conn.execute('DROP TABLE IF EXISTS todos')
        await conn.execute('''CREATE TABLE todos (
            id SERIAL PRIMARY KEY,
            name VARCHAR(255) NOT NULL UNIQUE,
            finished BOOLEAN NOT NULL DEFAULT FALSE
        )''')


async def populate_initial_values(engine):
    '''
    初始化数据库的内容
    '''
    async with engine.acquire() as conn:
        await conn.execute(todos_tbl.insert().values(
            {'name': 'Start this tutorial', 'finished': True}))
        await conn.execute(todos_tbl.insert().values(
            {'name': 'Finish this tutorial', 'finished': False}))


async def setup_todo_table(app):
    '''
    创建表并初始化内容,只需执行一次
    '''
    await create_table(app['db'])
    await populate_initial_values(app['db'])


# -----------------------------app工厂 - 设置信号与路由处理器----------------------------
def app_factory(args=()):
    app = web.Application()

    app.on_startup.append(attach_db)
    app.on_shutdown.append(teardown_db)

    if '--make-table' in args:
        app.on_startup.append(setup_todo_table)

    app.router.add_get('/todos/', get_all_todos, name='all_todos')
    app.router.add_post('/todos/', create_todo, name='create_todo',
                        expect_handler=web.Request.json)
    app.router.add_get('/todos/{id:\d+}', get_one_todo, name='one_todo')
    app.router.add_patch('/todos/{id:\d+}', update_todo, name='update_todo')
    app.router.add_delete('/todos/{id:\d+}', remove_todo, name='remove_todo')
    return app

# 本文件命名为 aiotodo.py
# python -m aiohttp.web -P 8080 --make-table aiotodo:app_factory 初始化数据库并运行
# python -m aiohttp.web -P 8080 aiotodo:app_factory 正常运行


# --------------------------------测试-----------------------------------------
# import requests
# import json

# # 增加
# body = json.dumps({u"name": u"feed the api"})
# url = u"http://localhost:8080/todos/"
# r = requests.post(url=url, data=body)
# print(u'增加', r.content)
# # 修改
# body = json.dumps({u"name": u"feed the api", u"finished": u"true"})
# url = u"http://localhost:8080/todos/2"
# r = requests.patch(url=url, data=body)
# print(u'修改', r.status_code)
# # 获取
# url = u"http://localhost:8080/todos/"
# r = requests.get(url=url)
# print(u'所有代办事项为', r.content)
# # 删除
# url = u"http://localhost:8080/todos/2"
# r = requests.delete(url=url)
# r.status_code
# print(u'删除', r.status_code)
上一篇下一篇

猜你喜欢

热点阅读