Web开发java后端

应用Flask框架设计RESTFUL API接口

2018-08-05  本文已影响551人  plutoese

笔记

RESTful架构风格概述

RESTful架构风格

RESTful架构风格最初由Roy T. Fielding(HTTP/1.1协议专家组负责人)在其2000年的博士学位论文中提出。HTTP就是该架构风格的一个典型应用。

REST即Representational State Transfer的缩写,可译为"表现层状态转化”。REST最大的几个特点为:资源、统一接口、URI和无状态。

RESTful架构风格的特点

资源

所谓"资源",就是网络上的一个实体,或者说是网络上的一个具体信息。它可以是一段文本、一张图片、一首歌曲、一种服务,总之就是一个具体的实在。资源总要通过某种载体反应其内容,文本可以用txt格式表现,也可以用HTML格式、XML格式表现,甚至可以采用二进制格式;图片可以用JPG格式表现,也可以用PNG格式表现;JSON是现在最常用的资源表示格式。

统一接口

RESTful架构风格规定,数据的元操作,即CRUD(create, read, update和delete,即数据的增删查改)操作,分别对应于HTTP方法:GET用来获取资源,POST用来新建资源(也可以用于更新资源),PUT用来更新资源,DELETE用来删除资源,这样就统一了数据操作的接口,仅通过HTTP方法,就可以完成对数据的所有增删查改工作。

URI

可以用一个URI(统一资源定位符)指向资源,即每个URI都对应一个特定的资源。要获取这个资源,访问它的URI就可以,因此URI就成了每一个资源的地址或识别符。

一般的,每个资源至少有一个URI与之对应,最典型的URI即URL。

无状态

所谓无状态的,即所有的资源,都可以通过URI定位,而且这个定位与其他资源无关,也不会因为其他资源的变化而改变。有状态和无状态的区别,举个简单的例子说明一下。如查询员工的工资,如果查询工资是需要登录系统,进入查询工资的页面,执行相关操作后,获取工资的多少,则这种情况是有状态的,因为查询工资的每一步操作都依赖于前一步操作,只要前置操作不成功,后续操作就无法执行;如果输入一个url即可得到指定员工的工资,则这种情况是无状态的,因为获取工资不依赖于其他资源或状态,且这种情况下,员工工资是一个资源,由一个url与之对应,可以通过HTTP中的GET方法得到资源,这是典型的RESTful风格。

ROA

ROA即Resource Oriented Architecture,RESTful 架构风格的服务是围绕资源展开的,是典型的ROA架构。RESTful 架构风格的服务通常被称之为ROA架构。

RESTful风格的服务,由于可以直接以json或xml为载体承载数据,以HTTP方法为统一接口完成数据操作,客户端的开发不依赖于服务实现的技术,移动终端也可以轻松使用服务,这也加剧了REST取代RPC成为web service的主导。

认证机制

认证机制基本上是通用的,常用的认证机制包括 session auth(即通过用户名密码登录),basic auth,token auth和OAuth,服务开发中常用的认证机制为后三者。

Basic Auth

Basic Auth是配合RESTful API 使用的最简单的认证方式,只需提供用户名密码即可,但由于有把用户名密码暴露给第三方客户端的风险,在生产环境下被使用的越来越少。

Token Auth

Token Auth并不常用,它与Basic Auth的区别是,不将用户名和密码发送给服务器做用户认证,而是向服务器发送一个事先在服务器端生成的token来做认证。因此Token Auth要求服务器端要具备一套完整的Token创建和管理机制,该机制的实现会增加大量且非必须的服务器端开发工作,也不见得这套机制足够安全和通用,因此Token Auth用的并不多。

OAuth

OAuth(开放授权)是一个开放的授权标准,允许用户让第三方应用访问该用户在某一web服务上存储的私密的资源(如照片,视频,联系人列表),而无需将用户名和密码提供给第三方应用。

OAuth允许用户提供一个令牌,而不是用户名和密码来访问他们存放在特定服务提供者的数据。每一个令牌授权一个特定的第三方系统(例如,视频编辑网站)在特定的时段(例如,接下来的2小时内)内访问特定的资源(例如仅仅是某一相册中的视频)。这样,OAuth让用户可以授权第三方网站访问他们存储在另外服务提供者的某些特定信息,而非所有内容。

正是由于OAUTH的严谨性和安全性,现在OAUTH已成为RESTful架构风格中最常用的认证机制,和RESTful架构风格一起,成为企业级服务的标配。

RESTful API 编写指南

Request和Response

RESTful API的开发和使用,无非是客户端向服务器发请求(request),以及服务器对客户端请求的响应(response)。

响应这些request时,常用的Response要包含的数据和状态码(status code)

最后,关于Request 和 Response,不要忽略了http header中的Content-Type。以json为例,如果API要求客户端发送request时要传入json数据,则服务器端仅做好json数据的获取和解析即可,但如果服务端支持多种类型数据的传入,如同时支持json和form-data,则要根据客户端发送请求时header中的Content-Type,对不同类型是数据分别实现获取和解析;如果API响应客户端请求后,需要返回json数据,需要在header中添加Content-Type=application/json。

Serialization和Deserialization

Serialization和Deserialization即序列化和反序列化。RESTful API以规范统一的格式作为数据的载体,常用的格式为json或xml,以json格式为例,当客户端向服务器发请求时,或者服务器相应客户端的请求,向客户端返回数据时,都是传输json格式的文本,而在服务器内部,数据处理时基本不用json格式的字符串,而是native类型的数据,最典型的如类的实例,即对象(object),json仅为服务器和客户端通信时,在网络上传输的数据的格式,服务器和客户端内部,均存在将json转为native类型数据和将native类型数据转为json的需求,其中,将native类型数据转为json即为序列化,将json转为native类型数据即为反序列化。

我们在开发RESTful API时,没必要制造重复的轮子,选一个好用的库即可,如python中的marshmallow。

Validation

Validation即数据校验,是开发健壮RESTful API中另一个重要的一环。仍以json为例,当客户端向服务器发出post或put请求时,通常会同时给服务器发送json格式的相关数据,服务器在做数据处理之前,先做数据校验,是最合理和安全的前后端交互。如果客户端发送的数据不正确或不合理,服务器端经过校验后直接向客户端返回400错误及相应的数据错误信息即可。常见的数据校验包括:

Authentication和Permission

Authentication指用户认证,Permission指权限机制,这两点是使RESTful API 强大、灵活和安全的基本保障。

URL Rules

Version your API

规范的API应该包含版本信息,在RESTful API中,最简单的包含版本的方法是将版本信息放到url中,如:

/api/v1/posts/
/api/v1/drafts/

/api/v2/posts/
/api/v2/drafts/

另一种优雅的做法是,使用HTTP header中的accept来传递版本信息,这也是GitHub API 采取的策略。

Use nouns, not verbs

RESTful API 中的url是指向资源的,而不是描述行为的,因此设计API时,应使用名词而非动词来描述语义,否则会引起混淆和语义不清。

# Bad APIs
/api/getArticle/1/
/api/updateArticle/1/
/api/deleteArticle/1/

上面四个url都是指向同一个资源的,虽然一个资源允许多个url指向它,但不同的url应该表达不同的语义,上面的API可以优化为:

# Good APIs
/api/Article/1/

article 资源的获取、更新和删除分别通过 GET, PUT 和 DELETE方法请求API即可。

Nested resources routing

如果要获取一个资源子集,采用 nested routing 是一个优雅的方式,如,列出所有文章中属于Gevin编写的文章

# List Gevin's articles
/api/authors/gevin/articles/

获取资源子集的另一种方式是基于filter,这两种方式都符合规范,但语义不同:如果语义上将资源子集看作一个独立的资源集合,则使用 nested routing 感觉更恰当,如果资源子集的获取是出于过滤的目的,则使用filter更恰当。

Filter

对于资源集合,可以通过url参数对资源进行过滤,如:

# List Gevin's articles
/api/articles?author=gevin

Pagination

对于资源集合,分页获取是一种比较合理的方式。

Gevin的策略是,返回资源集合是,包含与分页有关的数据如下:

{
  "page": 1,            # 当前是第几页
  "pages": 3,           # 总共多少页
  "per_page": 10,       # 每页多少数据
  "has_next": true,     # 是否有下一页数据
  "has_prev": false,    # 是否有前一页数据
  "total": 27           # 总共多少数据
}

当想API请求资源集合时,可选的分页参数为:

参数 含义
page 当前是第几页,默认为1
per_page 每页多少条记录,默认为系统默认值

另外,系统内还设置一个per_page_max字段,用于标记系统允许的每页最大记录数,当per_page值大于 per_page_max 值时,每页记录条数为 per_page_max。

Url design tricks

Flask RESTful API 开发基础

Flask对HTTP方法的支持

Flask原生支持所有的HTTP方法

@app.route('/http-method-test/', methods=['GET', 'POST', 'PUT', 'PATCH', 'DELETE'])
def http_method_example():
    if request.method == 'GET':
        return 'Send request with `GET` method'
    elif request.method == 'POST':
        return 'Send request with `POST` method'
    elif request.method == 'PUT':
        return 'Send request with `PUT` method'
    elif request.method == 'PATCH':
        return 'Send request with `PATCH` method'
    elif request.method == 'DELETE':
        return 'Send request with `DELETE` method'

另外一种方式是采用Flask的MethodView

class HttpMethodExample(MethodView):
    def get(self):
        return 'Send request with `GET` method'

    def post(self):
        return 'Send request with `POST` method'

    def put(self):
        return 'Send request with `PUT` method'

    def patch(self):
        return 'Send request with `PATCH` method'

    def delete(self):
        return 'Send request with `DELETE` method'

app.add_url_rule('/http-method-test2/', view_func=HttpMethodExample.as_view('http_method_example2'))

Flask对序列化与反序列化的支持

序列化

RESTful API开发中的序列化,通过包含了以下操作:

这两步操作,Flask提供的一个快捷函数jsonify()能直接完成

class SerializationExample(MethodView):
    def get(self):
        option = request.args.get('option')

        if option == 'list1':
            return self.test_list()
        if option == 'list2':
            return self.test_list2()
        if option == 'dict1':
            return self.test_dict1()
        if option == 'dict2':
            return self.test_dict2()
        if option == 'dict3':
            return self.test_dict3()


        msg = {
            'info': '`option` is needed in url as a url parameter',
            'avilable option values': 'list1, list2, test_dict1, test_dict2, test_dict2'
        }
        return jsonify(msg)



    def test_list(self):
        data = [{'a':1, 'b':2}, {'c':3, 'd':4}]
        return jsonify(result=data)

    def test_list2(self):
        data = [1,2,3,4,5,6,7,8]
        return jsonify(data)


    def test_dict1(self):
        data = {'a':1, 'b':2, 'c':3}
        return jsonify(data)

    def test_dict2(self):
        data = {'a':1, 'b':2, 'c':3}
        return jsonify(**data)

    def test_dict3(self):
        data = {'a':1, 'b':2, 'c':3}
        return jsonify(result=data)

app.add_url_rule('/serialization/', view_func=SerializationExample.as_view('serialization'))

反序列化

反序列化,即把文本形式的数据转换为Python native类型数据的过程。在RESTful API开发时,Flask内置的get_json()方法,能够把request中的json数据,转换为Python标准库中的dict或list。

@app.route('/deserialization/', methods=['get', 'post'])
def deserialization():
    if request.method == 'POST':
        data = request.get_json()
        if not data:
            return 'No json data found', 400

        result = {
            'json data in request': data
        }
        return jsonify(result)

    return 'Please post json data'

Designing a RESTful API with Python and Flask

Implementing RESTful services in Python and Flask

Using the base Flask application we are now ready to implement the first entry point of our web service

from flask import Flask, jsonify

app = Flask(__name__)

tasks = [
    {
        'id': 1,
        'title': u'Buy groceries',
        'description': u'Milk, Cheese, Pizza, Fruit, Tylenol',
        'done': False
    },
    {
        'id': 2,
        'title': u'Learn Python',
        'description': u'Need to find a good Python tutorial on the web',
        'done': False
    }
]

@app.route('/todo/api/v1.0/tasks', methods=['GET'])
def get_tasks():
    return jsonify({'tasks': tasks})

if __name__ == '__main__':
    app.run(debug=True)
$ curl -i http://localhost:5000/todo/api/v1.0/tasks
HTTP/1.0 200 OK
Content-Type: application/json
Content-Length: 294
Server: Werkzeug/0.8.3 Python/2.7.3
Date: Mon, 20 May 2013 04:53:53 GMT

{
  "tasks": [
    {
      "description": "Milk, Cheese, Pizza, Fruit, Tylenol",
      "done": false,
      "id": 1,
      "title": "Buy groceries"
    },
    {
      "description": "Need to find a good Python tutorial on the web",
      "done": false,
      "id": 2,
      "title": "Learn Python"
    }
  ]
}

Now let's write the second version of the GET method for our tasks resource.

from flask import abort

@app.route('/todo/api/v1.0/tasks/<int:task_id>', methods=['GET'])
def get_task(task_id):
    task = [task for task in tasks if task['id'] == task_id]
    if len(task) == 0:
        abort(404)
    return jsonify({'task': task[0]})

Here is how this function looks when invoked from curl.

$ curl -i http://localhost:5000/todo/api/v1.0/tasks/2
HTTP/1.0 200 OK
Content-Type: application/json
Content-Length: 151
Server: Werkzeug/0.8.3 Python/2.7.3
Date: Mon, 20 May 2013 05:21:50 GMT

{
  "task": {
    "description": "Need to find a good Python tutorial on the web",
    "done": false,
    "id": 2,
    "title": "Learn Python"
  }
}
$ curl -i http://localhost:5000/todo/api/v1.0/tasks/3
HTTP/1.0 404 NOT FOUND
Content-Type: text/html
Content-Length: 238
Server: Werkzeug/0.8.3 Python/2.7.3
Date: Mon, 20 May 2013 05:21:52 GMT

<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<title>404 Not Found</title>
<h1>Not Found</h1>
<p>The requested URL was not found on the server.</p><p>If you     entered the URL manually please check your spelling and try again.</p>

We need to improve our 404 error handler.

from flask import make_response

@app.errorhandler(404)
def not_found(error):
    return make_response(jsonify({'error': 'Not found'}), 404)

And we get a much more API friendly error response

$ curl -i http://localhost:5000/todo/api/v1.0/tasks/3
HTTP/1.0 404 NOT FOUND
Content-Type: application/json
Content-Length: 26
Server: Werkzeug/0.8.3 Python/2.7.3
Date: Mon, 20 May 2013 05:36:54 GMT

{
  "error": "Not found"
}

Next in our list is the POST method, which we will use to insert a new item in our task database.

from flask import request

@app.route('/todo/api/v1.0/tasks', methods=['POST'])
def create_task():
    if not request.json or not 'title' in request.json:
        abort(400)
    task = {
        'id': tasks[-1]['id'] + 1,
        'title': request.json['title'],
        'description': request.json.get('description', ""),
        'done': False
    }
    tasks.append(task)
    return jsonify({'task': task}), 201

To test this new function we can use the following curl command.

$ curl -i -H "Content-Type: application/json" -X POST -d '{"title":"Read a book"}' http://localhost:5000/todo/api/v1.0/tasks
HTTP/1.0 201 Created
Content-Type: application/json
Content-Length: 104
Server: Werkzeug/0.8.3 Python/2.7.3
Date: Mon, 20 May 2013 05:56:21 GMT

{
  "task": {
    "description": "",
    "done": false,
    "id": 3,
    "title": "Read a book"
  }
}

The remaining two functions of our web service are shown below.

@app.route('/todo/api/v1.0/tasks/<int:task_id>', methods=['PUT'])
def update_task(task_id):
    task = [task for task in tasks if task['id'] == task_id]
    if len(task) == 0:
        abort(404)
    if not request.json:
        abort(400)
    if 'title' in request.json and type(request.json['title']) != unicode:
        abort(400)
    if 'description' in request.json and type(request.json['description']) is not unicode:
        abort(400)
    if 'done' in request.json and type(request.json['done']) is not bool:
        abort(400)
    task[0]['title'] = request.json.get('title', task[0]['title'])
    task[0]['description'] = request.json.get('description', task[0]['description'])
    task[0]['done'] = request.json.get('done', task[0]['done'])
    return jsonify({'task': task[0]})

@app.route('/todo/api/v1.0/tasks/<int:task_id>', methods=['DELETE'])
def delete_task(task_id):
    task = [task for task in tasks if task['id'] == task_id]
    if len(task) == 0:
        abort(404)
    tasks.remove(task[0])
    return jsonify({'result': True})

A function call that updates task #2 as being done would be done as follows.

$ curl -i -H "Content-Type: application/json" -X PUT -d '{"done":true}' http://localhost:5000/todo/api/v1.0/tasks/2
HTTP/1.0 200 OK
Content-Type: application/json
Content-Length: 170
Server: Werkzeug/0.8.3 Python/2.7.3
Date: Mon, 20 May 2013 07:10:16 GMT

{
  "task": [
    {
      "description": "Need to find a good Python tutorial on the web",
      "done": true,
      "id": 2,
      "title": "Learn Python"
    }
  ]
}

Securing a RESTful web service

The easiest way to secure our web service is to require clients to provide a username and a password.

There is a small Flask extension that can help with this, written by no other than yours truly. So let's go ahead and install Flask-HTTPAuth.

Let's say we want our web service to only be accessible to username miguel and password python. We can setup a Basic HTTP authentication as follows.

from flask_httpauth import HTTPBasicAuth
auth = HTTPBasicAuth()

@auth.get_password
def get_password(username):
    if username == 'miguel':
        return 'python'
    return None

@auth.error_handler
def unauthorized():
    return make_response(jsonify({'error': 'Unauthorized access'}), 401)

With the authentication system setup, all that is left is to indicate which functions need to be protected, by adding the @auth.login_required decorator.

@app.route('/todo/api/v1.0/tasks', methods=['GET'])
@auth.login_required
def get_tasks():
    return jsonify({'tasks': tasks})

If we now try to invoke this function with curl this is what we get.

$ curl -i http://localhost:5000/todo/api/v1.0/tasks
HTTP/1.0 401 UNAUTHORIZED
Content-Type: application/json
Content-Length: 36
WWW-Authenticate: Basic realm="Authentication Required"
Server: Werkzeug/0.8.3 Python/2.7.3
Date: Mon, 20 May 2013 06:41:14 GMT

{
  "error": "Unauthorized access"
}

To be able to invoke this function we have to send our credentials.

$ curl -u miguel:python -i http://localhost:5000/todo/api/v1.0/tasks
HTTP/1.0 200 OK
Content-Type: application/json
Content-Length: 316
Server: Werkzeug/0.8.3 Python/2.7.3
Date: Mon, 20 May 2013 06:46:45 GMT

{
  "tasks": [
    {
      "title": "Buy groceries",
      "done": false,
      "description": "Milk, Cheese, Pizza, Fruit, Tylenol",
      "uri": "http://localhost:5000/todo/api/v1.0/tasks/1"
    },
    {
      "title": "Learn Python",
      "done": false,
      "description": "Need to find a good Python tutorial on the web",
      "uri": "http://localhost:5000/todo/api/v1.0/tasks/2"
    }
  ]
}

使用 Flask-RESTful 设计 RESTful API

RESTful 服务器

[图片上传失败...(image-fb5a62-1533821208427)]

这个服务唯一的资源叫做“任务”,它有如下一些属性:

路由

Flask-RESTful 提供了一个 Resource 基础类,它能够定义一个给定 URL 的一个或者多个 HTTP 方法。例如,定义一个可以使用 HTTP 的 GET, PUT 以及 DELETE 方法的 User 资源,你的代码可以如下.

from flask import Flask
from flask.ext.restful import Api, Resource

app = Flask(__name__)
api = Api(app)

class UserAPI(Resource):
    def get(self, id):
        pass

    def put(self, id):
        pass

    def delete(self, id):
        pass

api.add_resource(UserAPI, '/users/<int:id>', endpoint = 'user')

add_resource 函数使用指定的 endpoint 注册路由到框架上。如果没有指定 endpoint,Flask-RESTful 会根据类名生成一个,但是有时候有些函数比如 url_for 需要 endpoint,因此我会明确给 endpoint 赋值。

我的待办事项 API 定义两个 URLs:/todo/api/v1.0/tasks(获取所有任务列表),以及 /todo/api/v1.0/tasks/<int:id>(获取单个任务)。我们现在需要两个资源

class TaskListAPI(Resource):
    def get(self):
        pass

    def post(self):
        pass

class TaskAPI(Resource):
    def get(self, id):
        pass

    def put(self, id):
        pass

    def delete(self, id):
        pass

api.add_resource(TaskListAPI, '/todo/api/v1.0/tasks', endpoint = 'tasks')
api.add_resource(TaskAPI, '/todo/api/v1.0/tasks/<int:id>', endpoint = 'task')

解析以及验证请求

当我在以前的文章中实现此服务器的时候,我自己对请求的数据进行验证。

@app.route('/todo/api/v1.0/tasks/<int:task_id>', methods = ['PUT'])
@auth.login_required
def update_task(task_id):
    task = filter(lambda t: t['id'] == task_id, tasks)
    if len(task) == 0:
        abort(404)
    if not request.json:
        abort(400)
    if 'title' in request.json and type(request.json['title']) != unicode:
        abort(400)
    if 'description' in request.json and type(request.json['description']) is not unicode:
        abort(400)
    if 'done' in request.json and type(request.json['done']) is not bool:
        abort(400)
    task[0]['title'] = request.json.get('title', task[0]['title'])
    task[0]['description'] = request.json.get('description', task[0]['description'])
    task[0]['done'] = request.json.get('done', task[0]['done'])
    return jsonify( { 'task': make_public_task(task[0]) } )

在这里, 我必须确保请求中给出的数据在使用之前是有效,这样使得函数变得又臭又长。

Flask-RESTful 提供了一个更好的方式来处理数据验证,它叫做 RequestParser 类。这个类工作方式类似命令行解析工具 argparse。

首先,对于每一个资源需要定义参数以及怎样验证它们.

from flask.ext.restful import reqparse

class TaskListAPI(Resource):
    def __init__(self):
        self.reqparse = reqparse.RequestParser()
        self.reqparse.add_argument('title', type = str, required = True,
            help = 'No task title provided', location = 'json')
        self.reqparse.add_argument('description', type = str, default = "", location = 'json')
        super(TaskListAPI, self).__init__()

    # ...

class TaskAPI(Resource):
    def __init__(self):
        self.reqparse = reqparse.RequestParser()
        self.reqparse.add_argument('title', type = str, location = 'json')
        self.reqparse.add_argument('description', type = str, location = 'json')
        self.reqparse.add_argument('done', type = bool, location = 'json')
        super(TaskAPI, self).__init__()

    # ...

当请求解析器被初始化,解析和验证一个请求是很容易的。 例如,请注意 TaskAPI.put() 方法变的多么地简单.

def put(self, id):
    task = filter(lambda t: t['id'] == id, tasks)
    if len(task) == 0:
        abort(404)
    task = task[0]
    args = self.reqparse.parse_args()
    for k, v in args.iteritems():
        if v != None:
            task[k] = v
    return jsonify( { 'task': make_public_task(task) } )

使用 Flask-RESTful 来处理验证的另一个好处就是没有必要单独地处理类似 HTTP 400 错误,Flask-RESTful 会来处理这些。

生成响应

原来设计的 REST 服务器使用 Flask 的 jsonify 函数来生成响应。Flask-RESTful 会自动地处理转换成 JSON 数据格式.

return { 'task': make_public_task(task) }

Flask-RESTful 也支持自定义状态码,如果有必要的话.

return { 'task': make_public_task(task) }, 201

Flask-RESTful 还有更多的功能。make_public_task 能够把来自原始服务器上的任务从内部形式包装成客户端想要的外部形式。最典型的就是把任务的 id 转成 uri。Flask-RESTful 就提供一个辅助函数能够很优雅地做到这样的转换,不仅仅能够把 id 转成 uri 并且能够转换其他的参数.

from flask.ext.restful import fields, marshal

task_fields = {
    'title': fields.String,
    'description': fields.String,
    'done': fields.Boolean,
    'uri': fields.Url('task')
}

class TaskAPI(Resource):
    # ...

    def put(self, id):
        # ...
        return { 'task': marshal(task, task_fields) }

task_fields 结构用于作为 marshal 函数的模板。fields.Uri 是一个用于生成一个 URL 的特定的参数。 它需要的参数是 endpoint。

使用Flask设计RESTful的认证

用户数据库

用户的数据库模型是十分简单的。对于每一个用户,username 和 password_hash 将会被存储.

class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key = True)
    username = db.Column(db.String(32), index = True)
    password_hash = db.Column(db.String(128))

出于安全原因,用户的原始密码将不被存储,密码在注册时被散列后存储到数据库中。使用散列密码的话,如果用户数据库不小心落入恶意攻击者的手里,他们也很难从散列中解析到真实的密码。

密码决不能很明确地存储在用户数据库中。

密码散列

为了创建密码散列,我将会使用 PassLib 库,一个专门用于密码散列的 Python 包。

PassLib 提供了多种散列算法供选择。custom_app_context 是一个易于使用的基于 sha256_crypt 的散列算法。

User 用户模型需要增加两个新方法来增加密码散列和密码验证功能.

from passlib.apps import custom_app_context as pwd_context

    class User(db.Model):
        # ...

        def hash_password(self, password):
            self.password_hash = pwd_context.encrypt(password)

        def verify_password(self, password):
            return pwd_context.verify(password, self.password_hash)

用户注册

个客户端可以使用 POST 请求到 /api/users 上注册一个新用户。请求的主体必须是一个包含 username 和 password 的 JSON 格式的对象。

@app.route('/api/users', methods = ['POST'])
def new_user():
    username = request.json.get('username')
    password = request.json.get('password')
    if username is None or password is None:
        abort(400) # missing arguments
    if User.query.filter_by(username = username).first() is not None:
        abort(400) # existing user
    user = User(username = username)
    user.hash_password(password)
    db.session.add(user)
    db.session.commit()
    return jsonify({ 'username': user.username }), 201, {'Location': url_for('get_user', id = user.id, _external = True)}

这个函数是十分简单地。参数 username 和 password 是从请求中携带的 JSON 数据中获取,接着验证它们。

如果参数通过验证的话,新的 User 实例被创建。username 赋予给 User,接着使用 hash_password 方法散列密码。用户最终被写入数据库中。

这里是一个用户注册的请求,发送自 curl.

$ curl -i -X POST -H "Content-Type: application/json" -d '{"username":"miguel","password":"python"}' http://127.0.0.1:5000/api/users
HTTP/1.0 201 CREATED
Content-Type: application/json
Content-Length: 27
Location: http://127.0.0.1:5000/api/users/1
Server: Werkzeug/0.9.4 Python/2.7.3
Date: Thu, 28 Nov 2013 19:56:39 GMT

{
  "username": "miguel"
}

需要注意地是在真实的应用中这里可能会使用安全的的 HTTP (譬如:HTTPS)。如果用户登录的凭证是通过明文在网络传输的话,任何对 API 的保护措施是毫无意义的。

基于密码的认证

现在我们假设存在一个资源通过一个 API 暴露给那些必须注册的用户。这个资源是通过 URL: /api/resource 能够访问到。

为了保护这个资源,我们将使用 HTTP 基本身份认证,但是不是自己编写完整的代码来实现它,而是让 Flask-HTTPAuth 扩展来为我们做。

使用 Flask-HTTPAuth,通过添加 login_required 装饰器可以要求相应的路由必须进行认证。

rom flask.ext.httpauth import HTTPBasicAuth
auth = HTTPBasicAuth()

@app.route('/api/resource')
@auth.login_required
def get_resource():
    return jsonify({ 'data': 'Hello, %s!' % g.user.username })

能够提供最大自由度的选择(可能这也是唯一兼容 PassLib 散列)就是选用 verify_password 回调函数,这个回调函数将会根据提供的 username 和 password 的组合的,返回 True(通过验证) 或者 Flase(未通过验证)。Flask-HTTPAuth 将会在需要验证 username 和 password 对的时候调用这个回调函数。

verify_password 回调函数的实现如下

@auth.verify_password
def verify_password(username, password):
    user = User.query.filter_by(username = username).first()
    if not user or not user.verify_password(password):
        return False
    g.user = user
    return True

这个函数将会根据 username 找到用户,并且使用 verify_password() 方法验证密码。如果认证通过的话,用户对象将会被存储在 Flask 的 g 对象中,这样视图就能使用它。

这里是用 curl 请求只允许注册用户获取的保护资源。

$ curl -u miguel:python -i -X GET http://127.0.0.1:5000/api/resource
HTTP/1.0 200 OK
Content-Type: application/json
Content-Length: 30
Server: Werkzeug/0.9.4 Python/2.7.3
Date: Thu, 28 Nov 2013 20:02:25 GMT

{
  "data": "Hello, miguel!"
}

基于令牌的认证

每次请求必须发送 username 和 password 是十分不方便,即使是通过安全的 HTTP 传输的话还是存在风险,因为客户端必须要存储不加密的认证凭证,这样才能在每次请求中发送。

一种基于之前解决方案的优化就是使用令牌来验证请求。

我们的想法是客户端应用程序使用认证凭证交换了认证令牌,接下来的请求只发送认证令牌。

令牌是具有有效时间,过了有效时间后,令牌变成无效,需要重新获取新的令牌。令牌的潜在风险在于生成令牌的算法比较弱,但是有效期较短可以减少风险。

有很多的方法可以加强令牌。一个简单的强化方式就是根据存储在数据库中的用户以及密码生成一个随机的特定长度的字符串,可能过期日期也在里面。令牌就变成了明文密码的重排,这样就能很容易地进行字符串对比,还能对过期日期进行检查。

更加完善的实现就是不需要服务器端进行任何存储操作,使用加密的签名作为令牌。这种方式有很多的优点,能够根据用户信息生成相关的签名,并且很难被篡改。

Flask 使用类似的方式处理 cookies 的。这个实现依赖于一个叫做 itsdangerous 的库,我们这里也会采用它。

令牌的生成以及验证将会被添加到 User 模型中,其具体实现如下。

from itsdangerous import TimedJSONWebSignatureSerializer as Serializer

class User(db.Model):
    # ...

    def generate_auth_token(self, expiration = 600):
        s = Serializer(app.config['SECRET_KEY'], expires_in = expiration)
        return s.dumps({ 'id': self.id })

    @staticmethod
    def verify_auth_token(token):
        s = Serializer(app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except SignatureExpired:
            return None # valid token, but expired
        except BadSignature:
            return None # invalid token
        user = User.query.get(data['id'])
        return user

generate_auth_token() 方法生成一个以用户 id 值为值,’id’ 为关键字的字典的加密令牌。令牌中同时加入了一个过期时间,默认为十分钟(600 秒)。

验证令牌是在 verify_auth_token() 静态方法中实现的。静态方法被使用在这里,是因为一旦令牌被解码了用户才可得知。如果令牌被解码了,相应的用户将会被查询出来并且返回。

API 需要一个获取令牌的新函数,这样客户端才能申请到令牌。

@app.route('/api/token')
@auth.login_required
def get_auth_token():
    token = g.user.generate_auth_token()
    return jsonify({ 'token': token.decode('ascii') })

注意:这个函数是使用了 auth.login_required 装饰器,也就是说需要提供 username 和 password。

剩下来的就是决策客户端怎样在请求中包含这个令牌。

HTTP 基本认证方式不特别要求 usernames 和 passwords 用于认证,在 HTTP 头中这两个字段可以用于任何类型的认证信息。基于令牌的认证,令牌可以作为 username 字段,password 字段可以忽略。

这就意味着服务器需要同时处理 username 和 password 作为认证,以及令牌作为 username 的认证方式。verify_password 回调函数需要同时支持这两种方式。

@auth.verify_password
def verify_password(username_or_token, password):
    # first try to authenticate by token
    user = User.verify_auth_token(username_or_token)
    if not user:
        # try to authenticate with username/password
        user = User.query.filter_by(username = username_or_token).first()
        if not user or not user.verify_password(password):
            return False
    g.user = user
    return True

新版的 verify_password 回调函数会尝试认证两次。首先它会把 username 参数作为令牌进行认证。如果没有验证通过的话,就会像基于密码认证的一样,验证 username 和 password。

如下的 curl 请求能够获取一个认证的令牌。

$ curl -u miguel:python -i -X GET http://127.0.0.1:5000/api/token
HTTP/1.0 200 OK
Content-Type: application/json
Content-Length: 139
Server: Werkzeug/0.9.4 Python/2.7.3
Date: Thu, 28 Nov 2013 20:04:15 GMT

{
  "token": "eyJhbGciOiJIUzI1NiIsImV4cCI6MTM4NTY2OTY1NSwiaWF0IjoxMzg1NjY5MDU1fQ.eyJpZCI6MX0.XbOEFJkhjHJ5uRINh2JA1BPzXjSohKYDRT472wGOvjc"
}

现在可以使用令牌获取资源。

$ curl -u eyJhbGciOiJIUzI1NiIsImV4cCI6MTM4NTY2OTY1NSwiaWF0IjoxMzg1NjY5MDU1fQ.eyJpZCI6MX0.XbOEFJkhjHJ5uRINh2JA1BPzXjSohKYDRT472wGOvjc:unused -i -X GET http://127.0.0.1:5000/api/resource
HTTP/1.0 200 OK
Content-Type: application/json
Content-Length: 30
Server: Werkzeug/0.9.4 Python/2.7.3
Date: Thu, 28 Nov 2013 20:05:08 GMT

{
  "data": "Hello, miguel!"
}

How to structure a Flask-RESTPlus web service for production builds

Flask-RESTPlus

Flask-RESTPlus is an extension for Flask that adds support for quickly building REST APIs. Flask-RESTPlus encourages best practices with minimal setup. It provides a coherent collection of decorators and tools to describe your API and expose its documentation properly (using Swagger).

Project Setup and Organization

In the project directory, create a new package called app. Inside app, create two packages main and test. Your directory structure should look similar to the one below.

Inside the main package, create three more packages namely: controller, service and model. The model package will contain all of our database models while the service package will contain all the business logic of our application and finally the controller package will contain all our application endpoints.

[图片上传失败...(image-1dc2bf-1533821208427)]

Configuration Settings

In the main package create a file called config.py with the following content.

import os

# uncomment the line below for postgres database url from environment variable
# postgres_local_base = os.environ['DATABASE_URL']

basedir = os.path.abspath(os.path.dirname(__file__))


class Config:
    SECRET_KEY = os.getenv('SECRET_KEY', 'my_precious_secret_key')
    DEBUG = False


class DevelopmentConfig(Config):
    # uncomment the line below to use postgres
    # SQLALCHEMY_DATABASE_URI = postgres_local_base
    DEBUG = True
    SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(basedir, 'flask_boilerplate_main.db')
    SQLALCHEMY_TRACK_MODIFICATIONS = False


class TestingConfig(Config):
    DEBUG = True
    TESTING = True
    SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(basedir, 'flask_boilerplate_test.db')
    PRESERVE_CONTEXT_ON_EXCEPTION = False
    SQLALCHEMY_TRACK_MODIFICATIONS = False


class ProductionConfig(Config):
    DEBUG = False
    # uncomment the line below to use postgres
    # SQLALCHEMY_DATABASE_URI = postgres_local_base


config_by_name = dict(
    dev=DevelopmentConfig,
    test=TestingConfig,
    prod=ProductionConfig
)

key = Config.SECRET_KEY

The configuration file contains three environment setup classes which includes testing, development, and production.

We will be using the application factory pattern for creating our Flask object. This pattern is most useful for creating multiple instances of our application with different settings. This facilitates the ease at which we switch between our testing, development and production environment by calling the create_app function with the required parameter.

In the __init__.py file inside the main package, enter the following lines of code.

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_bcrypt import Bcrypt

from .config import config_by_name

db = SQLAlchemy()
flask_bcrypt = Bcrypt()


def create_app(config_name):
    app = Flask(__name__)
    app.config.from_object(config_by_name[config_name])
    db.init_app(app)
    flask_bcrypt.init_app(app)

    return app

Flask Script

Now let’s create our application entry point. In the root directory of the project, create a file called manage.py with the following content.

import os
import unittest

from flask_migrate import Migrate, MigrateCommand
from flask_script import Manager

from app import blueprint
from app.main import create_app, db
from app.main.model import user, blacklist

app = create_app(os.getenv('BOILERPLATE_ENV') or 'dev')
app.register_blueprint(blueprint)

app.app_context().push()

manager = Manager(app)

migrate = Migrate(app, db)

manager.add_command('db', MigrateCommand)


@manager.command
def run():
    app.run()


@manager.command
def test():
    """Runs the unit tests."""
    tests = unittest.TestLoader().discover('app/test', pattern='test*.py')
    result = unittest.TextTestRunner(verbosity=2).run(tests)
    if result.wasSuccessful():
        return 0
    return 1

if __name__ == '__main__':
    manager.run()

The above code within manage.py does the following

Flask-Migrate exposes two classes, Migrate and MigrateCommand. The Migrateclass contains all the functionality of the extension. The MigrateCommand class is only used when it is desired to expose database migration commands through the Flask-Script extension.

Database Models and Migration

Now let’s create our models. We will be using the db instance of the sqlalchemy to create our models.

The db instance contains all the functions and helpers from both sqlalchemy and sqlalchemy.orm and it provides a class called Model that is a declarative base which can be used to declare models.

In the model package, create a file called user.py with the following content.


from .. import db, flask_bcrypt
import datetime
from app.main.model.blacklist import BlacklistToken
from ..config import key
import jwt


class User(db.Model):
    """ User Model for storing user related details """
    __tablename__ = "user"

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    email = db.Column(db.String(255), unique=True, nullable=False)
    registered_on = db.Column(db.DateTime, nullable=False)
    admin = db.Column(db.Boolean, nullable=False, default=False)
    public_id = db.Column(db.String(100), unique=True)
    username = db.Column(db.String(50), unique=True)
    password_hash = db.Column(db.String(100))

    @property
    def password(self):
        raise AttributeError('password: write-only field')

    @password.setter
    def password(self, password):
        self.password_hash = flask_bcrypt.generate_password_hash(password).decode('utf-8')

    def check_password(self, password):
        return flask_bcrypt.check_password_hash(self.password_hash, password)


    def encode_auth_token(self, user_id):
        """
        Generates the Auth Token
        :return: string
        """
        try:
            payload = {
                'exp': datetime.datetime.utcnow() + datetime.timedelta(days=1, seconds=5),
                'iat': datetime.datetime.utcnow(),
                'sub': user_id
            }
            return jwt.encode(
                payload,
                key,
                algorithm='HS256'
            )
        except Exception as e:
            return e

    @staticmethod
    def decode_auth_token(auth_token):
        """
        Decodes the auth token
        :param auth_token:
        :return: integer|string
        """
        try:
            payload = jwt.decode(auth_token, key)
            is_blacklisted_token = BlacklistToken.check_blacklist(auth_token)
            if is_blacklisted_token:
                return 'Token blacklisted. Please log in again.'
            else:
                return payload['sub']
        except jwt.ExpiredSignatureError:
            return 'Signature expired. Please log in again.'
        except jwt.InvalidTokenError:
            return 'Invalid token. Please log in again.'

    def __repr__(self):
        return "<User '{}'>".format(self.username)

The above code within user.py does the following

Now to generate the database table from the user model we just created, we will use migrateCommand through the manager interface. For managerto detect our models, we will have to import theuser model by adding below code to manage.py file.

from app.main.model import user

Testing

Configuration

Create a file called test_config.py in the test package with the content below.

import os
import unittest

from flask import current_app
from flask_testing import TestCase

from manage import app
from app.main.config import basedir


class TestDevelopmentConfig(TestCase):
    def create_app(self):
        app.config.from_object('app.main.config.DevelopmentConfig')
        return app

    def test_app_is_development(self):
        self.assertFalse(app.config['SECRET_KEY'] is 'my_precious')
        self.assertTrue(app.config['DEBUG'] is True)
        self.assertFalse(current_app is None)
        self.assertTrue(
            app.config['SQLALCHEMY_DATABASE_URI'] == 'sqlite:///' + os.path.join(basedir, 'flask_boilerplate_main.db')
        )


class TestTestingConfig(TestCase):
    def create_app(self):
        app.config.from_object('app.main.config.TestingConfig')
        return app

    def test_app_is_testing(self):
        self.assertFalse(app.config['SECRET_KEY'] is 'my_precious')
        self.assertTrue(app.config['DEBUG'])
        self.assertTrue(
            app.config['SQLALCHEMY_DATABASE_URI'] == 'sqlite:///' + os.path.join(basedir, 'flask_boilerplate_test.db')
        )


class TestProductionConfig(TestCase):
    def create_app(self):
        app.config.from_object('app.main.config.ProductionConfig')
        return app

    def test_app_is_production(self):
        self.assertTrue(app.config['DEBUG'] is False)


if __name__ == '__main__':
    unittest.main()

Run the test using the command below

python manage.py test

User Operations

Now let’s work on the following user related operations:

User Service class: This class handles all the logic relating to the user model.

In the service package, create a new file user_service.py with the following content

import uuid
import datetime

from app.main import db
from app.main.model.user import User


def save_new_user(data):
    user = User.query.filter_by(email=data['email']).first()
    if not user:
        new_user = User(
            public_id=str(uuid.uuid4()),
            email=data['email'],
            username=data['username'],
            password=data['password'],
            registered_on=datetime.datetime.utcnow()
        )
        save_changes(new_user)
        return generate_token(new_user)
    else:
        response_object = {
            'status': 'fail',
            'message': 'User already exists. Please Log in.',
        }
        return response_object, 409


def get_all_users():
    return User.query.all()


def get_a_user(public_id):
    return User.query.filter_by(public_id=public_id).first()


def generate_token(user):
    try:
        # generate the auth token
        auth_token = user.encode_auth_token(user.id)
        response_object = {
            'status': 'success',
            'message': 'Successfully registered.',
            'Authorization': auth_token.decode()
        }
        return response_object, 201
    except Exception as e:
        response_object = {
            'status': 'fail',
            'message': 'Some error occurred. Please try again.'
        }
        return response_object, 401


def save_changes(data):
    db.session.add(data)
    db.session.commit()

The above code within user_service.py does the following

No need to use jsonify for formatting an object to JSON, Flask-restplus does it automatically

In the main package, create a new package called util . This package will contain all the necessary utilities we might need in our application.

In the util package, create a new file dto.py. As the name implies, the data transfer object (DTO) will be responsible for carrying data between processes. In our own case, it will be used for marshaling data for our API calls. We will understand this better as we proceed.

from flask_restplus import Namespace, fields


class UserDto:
    api = Namespace('user', description='user related operations')
    user = api.model('user', {
        'email': fields.String(required=True, description='user email address'),
        'username': fields.String(required=True, description='user username'),
        'password': fields.String(required=True, description='user password'),
        'public_id': fields.String(description='user Identifier')
    })


class AuthDto:
    api = Namespace('auth', description='authentication related operations')
    user_auth = api.model('auth_details', {
        'email': fields.String(required=True, description='The email address'),
        'password': fields.String(required=True, description='The user password '),
    })

The above code within dto.py does the following

User Controller: The user controller class handles all the incoming HTTP requests relating to the user.

Under the controller package, create a new file called user_controller.py with the following content.

from flask import request
from flask_restplus import Resource

from app.main.util.decorator import admin_token_required
from ..util.dto import UserDto
from ..service.user_service import save_new_user, get_all_users, get_a_user

api = UserDto.api
_user = UserDto.user


@api.route('/')
class UserList(Resource):
    @api.doc('list_of_registered_users')
    @admin_token_required
    @api.marshal_list_with(_user, envelope='data')
    def get(self):
        """List all registered users"""
        return get_all_users()

    @api.expect(_user, validate=True)
    @api.response(201, 'User successfully created.')
    @api.doc('create a new user')
    def post(self):
        """Creates a new User """
        data = request.json
        return save_new_user(data=data)


@api.route('/<public_id>')
@api.param('public_id', 'The User identifier')
@api.response(404, 'User not found.')
class User(Resource):
    @api.doc('get a user')
    @api.marshal_with(_user)
    def get(self, public_id):
        """get a user given its identifier"""
        user = get_a_user(public_id)
        if not user:
            api.abort(404)
        else:
            return user

line 1 through 8 imports all the required resources for the user controller.

We defined two concrete classes in our user controller which are userList and user. These two classes extends the abstract flask-restplus resource.

Concrete resources should extend from this class and expose methods for each supported HTTP method. If a resource is invoked with an unsupported HTTP method, the API will return a response with status 405 Method Not Allowed. Otherwise the appropriate method is called and passed all arguments from the URL rule used when adding the resource to an API instance.

The api namespace in line 7 above provides the controller with several decorators which includes but is not limited to the following:

We have now defined our namespace with the user controller. Now its time to add it to the application entry point.

In the __init__.py file of app package, enter the following.

from flask_restplus import Api
from flask import Blueprint

from .main.controller.user_controller import api as user_ns
from .main.controller.auth_controller import api as auth_ns

blueprint = Blueprint('api', __name__)

api = Api(blueprint,
          title='FLASK RESTPLUS API BOILER-PLATE WITH JWT',
          version='1.0',
          description='a boilerplate for flask restplus web service'
          )

api.add_namespace(user_ns, path='/user')
api.add_namespace(auth_ns)

The above code within blueprint.py does the following.

We have now defined our blueprint. It’s time to register it on our Flask app.

Update manage.py by importing blueprint and registering it with the Flask application instance.

Now open the URL http://127.0.0.1:5000 in your browser. You should see the swagger documentation.

Security and Authentication

Let’s create a model blacklistToken for storing blacklisted tokens. In the models package, create a blacklist.py file with the following content.

from .. import db
import datetime


class BlacklistToken(db.Model):
    """
    Token Model for storing JWT tokens
    """
    __tablename__ = 'blacklist_tokens'

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    token = db.Column(db.String(500), unique=True, nullable=False)
    blacklisted_on = db.Column(db.DateTime, nullable=False)

    def __init__(self, token):
        self.token = token
        self.blacklisted_on = datetime.datetime.now()

    def __repr__(self):
        return '<id: token: {}'.format(self.token)

    @staticmethod
    def check_blacklist(auth_token):
        # check whether auth token has been blacklisted
        res = BlacklistToken.query.filter_by(token=str(auth_token)).first()
        if res:
            return True
        else:
            return False

Lets not forget to migrate the changes to take effect on our database.

Import the blacklist class in manage.py.

from app.main.model import blacklist

Next create blacklist_service.py in the service package with the following content for blacklisting a token.


from app.main import db

from app.main.model.blacklist import BlacklistToken


def save_token(token):
    blacklist_token = BlacklistToken(token=token)
    try:
        # insert the token
        db.session.add(blacklist_token)
        db.session.commit()
        response_object = {
            'status': 'success',
            'message': 'Successfully logged out.'
        }
        return response_object, 200
    except Exception as e:
        response_object = {
            'status': 'fail',
            'message': e
        }
        return response_object, 200

Update the user model with two static methods for encoding and decoding tokens. Add the following imports.

import datetime
import jwt
from app.main.model.blacklist import BlacklistToken
from ..config import key

Encoding

def encode_auth_token(self, user_id):
        """
        Generates the Auth Token
        :return: string
        """
        try:
            payload = {
                'exp': datetime.datetime.utcnow() + datetime.timedelta(days=1, seconds=5),
                'iat': datetime.datetime.utcnow(),
                'sub': user_id
            }
            return jwt.encode(
                payload,
                key,
                algorithm='HS256'
            )
        except Exception as e:
            return e

Decoding: Blacklisted token, expired token and invalid token are taken into consideration while decoding the authentication token.

  @staticmethod
  def decode_auth_token(auth_token):
        """
        Decodes the auth token
        :param auth_token:
        :return: integer|string
        """
        try:
            payload = jwt.decode(auth_token, key)
            is_blacklisted_token = BlacklistToken.check_blacklist(auth_token)
            if is_blacklisted_token:
                return 'Token blacklisted. Please log in again.'
            else:
                return payload['sub']
        except jwt.ExpiredSignatureError:
            return 'Signature expired. Please log in again.'
        except jwt.InvalidTokenError:
            return 'Invalid token. Please log in again.'

Now let’s write a test for the user model to ensure that our encode and decode functions are working properly.

In the test package, create base.py file with the following content


from flask_testing import TestCase

from app.main import db
from manage import app


class BaseTestCase(TestCase):
    """ Base Tests """

    def create_app(self):
        app.config.from_object('app.main.config.TestingConfig')
        return app

    def setUp(self):
        db.create_all()
        db.session.commit()

    def tearDown(self):
        db.session.remove()
        db.drop_all()

The BaseTestCase sets up our test environment ready before and after every test case that extends it.

Create test_user_medol.py with the following test cases.

import unittest

import datetime

from app.main import db
from app.main.model.user import User
from app.test.base import BaseTestCase


class TestUserModel(BaseTestCase):

    def test_encode_auth_token(self):
        user = User(
            email='test@test.com',
            password='test',
            registered_on=datetime.datetime.utcnow()
        )
        db.session.add(user)
        db.session.commit()
        auth_token = user.encode_auth_token(user.id)
        self.assertTrue(isinstance(auth_token, bytes))

    def test_decode_auth_token(self):
        user = User(
            email='test@test.com',
            password='test',
            registered_on=datetime.datetime.utcnow()
        )
        db.session.add(user)
        db.session.commit()
        auth_token = user.encode_auth_token(user.id)
        self.assertTrue(isinstance(auth_token, bytes))
        self.assertTrue(User.decode_auth_token(auth_token.decode("utf-8") ) == 1)


if __name__ == '__main__':
    unittest.main()

Run the test with python manage.py test. All the tests should pass.

Let’s create the authentication endpoints for login and logout.

class AuthDto:
    api = Namespace('auth', description='authentication related operations')
    user_auth = api.model('auth_details', {
        'email': fields.String(required=True, description='The email address'),
        'password': fields.String(required=True, description='The user password '),
    })

When a user is logged out, the user’s token is blacklisted ie the user can’t log in again with that same token.

from app.main.model.user import User
from ..service.blacklist_service import save_token


class Auth:

    @staticmethod
    def login_user(data):
        try:
            # fetch the user data
            user = User.query.filter_by(email=data.get('email')).first()
            if user and user.check_password(data.get('password')):
                auth_token = user.encode_auth_token(user.id)
                if auth_token:
                    response_object = {
                        'status': 'success',
                        'message': 'Successfully logged in.',
                        'Authorization': auth_token.decode()
                    }
                    return response_object, 200
            else:
                response_object = {
                    'status': 'fail',
                    'message': 'email or password does not match.'
                }
                return response_object, 401

        except Exception as e:
            print(e)
            response_object = {
                'status': 'fail',
                'message': 'Try again'
            }
            return response_object, 500

    @staticmethod
    def logout_user(data):
        if data:
            auth_token = data.split(" ")[1]
        else:
            auth_token = ''
        if auth_token:
            resp = User.decode_auth_token(auth_token)
            if not isinstance(resp, str):
                # mark the token as blacklisted
                return save_token(token=auth_token)
            else:
                response_object = {
                    'status': 'fail',
                    'message': resp
                }
                return response_object, 401
        else:
            response_object = {
                'status': 'fail',
                'message': 'Provide a valid auth token.'
            }
            return response_object, 403

Let us now create endpoints for login and logout operations.

In the controller package, create auth_controller.py with the following contents.

from flask import request
from flask_restplus import Resource

from app.main.service.auth_helper import Auth
from ..util.dto import AuthDto

api = AuthDto.api
user_auth = AuthDto.user_auth


@api.route('/login')
class UserLogin(Resource):
    """
        User Login Resource
    """
    @api.doc('user login')
    @api.expect(user_auth, validate=True)
    def post(self):
        # get the post data
        post_data = request.json
        return Auth.login_user(data=post_data)


@api.route('/logout')
class LogoutAPI(Resource):
    """
    Logout Resource
    """
    @api.doc('logout a user')
    def post(self):
        # get auth token
        auth_header = request.headers.get('Authorization')
        return Auth.logout_user(data=auth_header)

At this point the only thing left is to register the auth api namespace with the application Blueprint
Update __init__.py file of app package with the following

# app/__init__.py
from flask_restplus import Api
from flask import Blueprint

from .main.controller.user_controller import api as user_ns
from .main.controller.auth_controller import api as auth_ns

blueprint = Blueprint('api', __name__)

api = Api(blueprint,
          title='FLASK RESTPLUS API BOILER-PLATE WITH JWT',
          version='1.0',
          description='a boilerplate for flask restplus web service'
          )

api.add_namespace(user_ns, path='/user')
api.add_namespace(auth_ns)

Run the application with python manage.py run and open the url http://127.0.0.1:5000 in your browser.

Before we write some tests to ensure our authentication is working as expected, let’s modify our registration endpoint to automatically login a user once the registration is successful.

Add the method generate_token below to user_service.py.

def generate_token(user):
    try:
        # generate the auth token
        auth_token = user.encode_auth_token(user.id)
        response_object = {
            'status': 'success',
            'message': 'Successfully registered.',
            'Authorization': auth_token.decode()
        }
        return response_object, 201
    except Exception as e:
        response_object = {
            'status': 'fail',
            'message': 'Some error occurred. Please try again.'
        }
        return response_object, 401

The generate_token method generates an authentication token by encoding the user id. This token is the returned as a response.

Next, replace the return block in save_new_user method below

response_object = {
    'status': 'success',
    'message': 'Successfully registered.'
}
return response_object, 201

with

return generate_token(new_user)

Now its time to test the login and logout functionalities. Create a new test file test_auth.py in the test package with the following content.

import unittest

from app.main import db
from app.main.model.blacklist import BlacklistToken
import json
from app.test.base import BaseTestCase


def register_user(self):
    return self.client.post(
        '/user/',
        data=json.dumps(dict(
            email='joe@gmail.com',
            username='username',
            password='123456'
        )),
        content_type='application/json'
    )


def login_user(self):
    return self.client.post(
        '/auth/login',
        data=json.dumps(dict(
            email='joe@gmail.com',
            password='123456'
        )),
        content_type='application/json'
    )


class TestAuthBlueprint(BaseTestCase):
    def test_registration(self):
        """ Test for user registration """
        with self.client:
            response = register_user(self)
            data = json.loads(response.data.decode())
            self.assertTrue(data['status'] == 'success')
            self.assertTrue(data['message'] == 'Successfully registered.')
            self.assertTrue(data['Authorization'])
            self.assertTrue(response.content_type == 'application/json')
            self.assertEqual(response.status_code, 201)

    def test_registered_with_already_registered_user(self):
        """ Test registration with already registered email"""
        register_user(self)
        with self.client:
            response = register_user(self)
            data = json.loads(response.data.decode())
            self.assertTrue(data['status'] == 'fail')
            self.assertTrue(
                data['message'] == 'User already exists. Please Log in.')
            self.assertTrue(response.content_type == 'application/json')
            self.assertEqual(response.status_code, 409)

    def test_registered_user_login(self):
        """ Test for login of registered-user login """
        with self.client:
            # user registration
            resp_register = register_user(self)
            data_register = json.loads(resp_register.data.decode())
            self.assertTrue(data_register['status'] == 'success')
            self.assertTrue(
                data_register['message'] == 'Successfully registered.'
            )
            self.assertTrue(data_register['Authorization'])
            self.assertTrue(resp_register.content_type == 'application/json')
            self.assertEqual(resp_register.status_code, 201)
            # registered user login
            response = login_user(self)
            data = json.loads(response.data.decode())
            self.assertTrue(data['status'] == 'success')
            self.assertTrue(data['message'] == 'Successfully logged in.')
            self.assertTrue(data['Authorization'])
            self.assertTrue(response.content_type == 'application/json')
            self.assertEqual(response.status_code, 200)

    def test_non_registered_user_login(self):
        """ Test for login of non-registered user """
        with self.client:
            response = login_user(self)
            data = json.loads(response.data.decode())
            self.assertTrue(data['status'] == 'fail')
            print(data['message'])
            self.assertTrue(data['message'] == 'email or password does not match.')
            self.assertTrue(response.content_type == 'application/json')
            self.assertEqual(response.status_code, 401)

    def test_valid_logout(self):
        """ Test for logout before token expires """
        with self.client:
            # user registration
            resp_register = register_user(self)
            data_register = json.loads(resp_register.data.decode())
            self.assertTrue(data_register['status'] == 'success')
            self.assertTrue(
                data_register['message'] == 'Successfully registered.')
            self.assertTrue(data_register['Authorization'])
            self.assertTrue(resp_register.content_type == 'application/json')
            self.assertEqual(resp_register.status_code, 201)
            # user login
            resp_login = login_user(self)
            data_login = json.loads(resp_login.data.decode())
            self.assertTrue(data_login['status'] == 'success')
            self.assertTrue(data_login['message'] == 'Successfully logged in.')
            self.assertTrue(data_login['Authorization'])
            self.assertTrue(resp_login.content_type == 'application/json')
            self.assertEqual(resp_login.status_code, 200)
            # valid token logout
            response = self.client.post(
                '/auth/logout',
                headers=dict(
                    Authorization='Bearer ' + json.loads(
                        resp_login.data.decode()
                    )['Authorization']
                )
            )
            data = json.loads(response.data.decode())
            self.assertTrue(data['status'] == 'success')
            self.assertTrue(data['message'] == 'Successfully logged out.')
            self.assertEqual(response.status_code, 200)

    def test_valid_blacklisted_token_logout(self):
        """ Test for logout after a valid token gets blacklisted """
        with self.client:
            # user registration
            resp_register = register_user(self)
            data_register = json.loads(resp_register.data.decode())
            self.assertTrue(data_register['status'] == 'success')
            self.assertTrue(
                data_register['message'] == 'Successfully registered.')
            self.assertTrue(data_register['Authorization'])
            self.assertTrue(resp_register.content_type == 'application/json')
            self.assertEqual(resp_register.status_code, 201)
            # user login
            resp_login = login_user(self)
            data_login = json.loads(resp_login.data.decode())
            self.assertTrue(data_login['status'] == 'success')
            self.assertTrue(data_login['message'] == 'Successfully logged in.')
            self.assertTrue(data_login['Authorization'])
            self.assertTrue(resp_login.content_type == 'application/json')
            self.assertEqual(resp_login.status_code, 200)
            # blacklist a valid token
            blacklist_token = BlacklistToken(
                token=json.loads(resp_login.data.decode())['Authorization'])
            db.session.add(blacklist_token)
            db.session.commit()
            # blacklisted valid token logout
            response = self.client.post(
                '/auth/logout',
                headers=dict(
                    Authorization='Bearer ' + json.loads(
                        resp_login.data.decode()
                    )['Authorization']
                )
            )
            data = json.loads(response.data.decode())
            self.assertTrue(data['status'] == 'fail')
            self.assertTrue(data['message'] == 'Token blacklisted. Please log in again.')
            self.assertEqual(response.status_code, 401)


if __name__ == '__main__':
    unittest.main()

Route protection and Authorization

So far, we have successfully created our endpoints, implemented login and logout functionalities but our endpoints remains unprotected.

We need a way to define rules that determines which of our endpoint is open or requires authentication or even an admin privilege.

We can achieve this by creating custom decorators for our endpoints.

Before we can protect or authorize any of our endpoints, we need to know the currently logged in user. We can do this by pulling the Authorization token from the header of the current request by using the flask library request.We then decode the user details from the Authorization token.

In the Auth class of auth_helper.py file, add the following static method

@staticmethod
def get_logged_in_user(new_request):
        # get the auth token
        auth_token = new_request.headers.get('Authorization')
        if auth_token:
            resp = User.decode_auth_token(auth_token)
            if not isinstance(resp, str):
                user = User.query.filter_by(id=resp).first()
                response_object = {
                    'status': 'success',
                    'data': {
                        'user_id': user.id,
                        'email': user.email,
                        'admin': user.admin,
                        'registered_on': str(user.registered_on)
                    }
                }
                return response_object, 200
            response_object = {
                'status': 'fail',
                'message': resp
            }
            return response_object, 401
        else:
            response_object = {
                'status': 'fail',
                'message': 'Provide a valid auth token.'
            }
            return response_object, 401

Now that we can retrieve the logged in user from the request, let’s go ahead and create the decorators.

Create a file decorator.py in the util package with the following content

from functools import wraps

from flask import request

from app.main.service.auth_helper import Auth


def token_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):

        data, status = Auth.get_logged_in_user(request)
        token = data.get('data')

        if not token:
            return data, status

        return f(*args, **kwargs)

    return decorated


def admin_token_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):

        data, status = Auth.get_logged_in_user(request)
        token = data.get('data')

        if not token:
            return data, status

        admin = token.get('admin')
        if not admin:
            response_object = {
                'status': 'fail',
                'message': 'admin token required'
            }
            return response_object, 401

        return f(*args, **kwargs)

    return decorated

For more information about decorators and how to create them, take a look at this link.

Now that we have created the decorators token_required and admin_token_required for valid token and for an admin token respectively, all that is left is to annotate the endpoints which we wish to protect with the freecodecamp orgappropriate decorator.

电影订票系统后台开发REST API 和 Swagger UI

github,点击此处

创建资源

# *-* coding: utf-8 *-*
from flask import request
from app.utils import UUID
from app.models import Favorite, Movie, db
from flask_restplus import Namespace, Resource
from flask_login import current_user, login_required

api = Namespace('favorite', description='收藏模块')


@api.route('/')
class FavoritesResource(Resource):
    @login_required
    def get(self):
        """获取收藏列表(需登录)"""
        return [f.__json__() for f in current_user.favorites], 200

    @api.doc(parser=api.parser().add_argument(
      'movieId', type=str, required=True, help='电影id', location='form')
    )
    @login_required
    def post(self):
        """收藏电影(需登录)"""
        mid = request.form.get('movieId', '')
        movie = Movie.query.get(mid)
        if movie is None:
            return {'message': '电影不存在'}, 233
        movie = current_user.favorites.filter_by(movieId=mid).first()
        if movie is not None:
            return {'message': '不能重复收藏同部电影'}, 233

        favorite = Favorite()
        favorite.id = UUID()
        favorite.username = current_user.id
        favorite.movieId = mid
        db.session.add(favorite)
        db.session.commit()

        return {'message': '收藏成功', 'id': favorite.id}, 200


@api.route('/<id>')
@api.doc(params={'id': '收藏id'})
class FavoriteResource(Resource):
    @login_required
    def delete(self, id):
        """取消收藏(需登录)"""
        favorite = current_user.favorites.filter_by(id=id).first()
        if favorite is None:
            return {'message': '您没有这个收藏'}, 233
        db.session.delete(favorite)
        db.session.commit()
        return {'message': '取消收藏成功'}, 200

创建了资源之后,只需要进行初始化即可实现 RESTful 服务。

# *-* coding: utf-8 *-*
from flask import Flask
from flask_restplus import Api
from favorite import api as ns1

api = Api(
    title='MonkeyEye',
    version='1.0',
    description='猿眼电影订票系统API',
    doc='/swagger/',             # Swagger UI: http://localhost:5000/swagger/
    catch_all_404s=True,
    serve_challenge_on_401=True
)

api.add_namespace(ns1, path='/api/favorites')

app = Flask(__name__)
api.init_app(app)

if __name__ == '__main__':
    app.run()

参考

  1. Eve. The Simple Way to REST,点击此处
  2. Building-Serverless-Python-Web-Services-with-Zappa,点击此处
  3. 使用 Flask 设计 RESTful APIs,英文版,点击此处,中文版,点击此处
  4. flask-mongorest,点击此处
  5. Buidling a database driven RESTFUL JSON API in Python 3 with Flask Flask-Restful and SQLAlchemy,点击此处
  6. Building beautiful REST APIs using Flask, Swagger UI and Flask-RESTPlus,点击此处
  7. Creating Flask RESTful API Using Python & MySQL,点击此处
  8. How to structure a Flask-RESTPlus web service for production builds,点击此处
上一篇下一篇

猜你喜欢

热点阅读