WSGI简介

2020-07-03  本文已影响0人  诺之林

结合案例Python部署 & 示例代码wsgi-demo

All in One

vim all_in_one.py
# coding:utf-8

import socket


# Byte sequences whose presence means the blank line that terminates the
# request headers has arrived: LF LF, or the LF CR LF tail of CRLF CRLF.
EOL1 = b'\n\n'
EOL2 = b'\n\r\n'

# Canned HTTP response sent to every client, whatever it requested.
body = 'Hello World!'
response_params = [
    'HTTP/1.0 200 OK',
    'Date: Sat, 10 jun 2017 01:01:01 GMT',  # static placeholder value
    'Content-Type: text/plain; charset=utf-8',
    # Content-Length is a byte count, so measure the encoded body rather than
    # the number of characters.  The trailing '\r\n' combines with the join
    # separator below to form the blank line between headers and body.
    'Content-Length: {}\r\n'.format(len(body.encode('utf-8'))),
    body,
]
response = '\r\n'.join(response_params)


def handle_connection(conn, addr):
    """Read one HTTP request from *conn*, reply with ``response``, and close.

    Args:
        conn: Connected socket-like object providing ``recv``, ``sendall``
            and ``close``.
        addr: Peer address as returned by ``accept()``; not used.

    The request is read until the end-of-headers marker (``EOL1`` or
    ``EOL2``) shows up, then printed.  If the peer closes the connection
    before the headers are complete, the connection is closed without a
    reply.  ``conn`` is always closed, even when an error is raised.
    """
    request = b""
    try:
        while EOL1 not in request and EOL2 not in request:
            chunk = conn.recv(1024)
            if not chunk:
                # recv() returning b"" means the peer closed; without this
                # check the loop would spin forever on an incomplete request.
                return
            request += chunk
        print(request)
        # sendall() retries until every byte is written; send() may stop early.
        conn.sendall(response.encode('utf-8'))
    finally:
        conn.close()


def main(host='127.0.0.1', port=8080):
    """Accept TCP connections forever and serve each one in turn.

    Args:
        host: Interface address to bind to.
        port: TCP port to listen on.

    Connections are handled sequentially by ``handle_connection``.  The
    listening socket is closed when the loop exits for any reason, including
    a failure while configuring or binding it.
    """
    # The context manager also covers setsockopt/bind/listen, so the socket
    # is released even when binding fails (e.g. the port is already in use).
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serversocket:
        # Allow an immediate restart on the same port after the server stops.
        serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        serversocket.bind((host, port))
        serversocket.listen(1)

        while True:
            conn, address = serversocket.accept()
            try:
                handle_connection(conn, address)
            except OSError as exc:
                # A single broken client must not take the whole server down.
                print('connection from {} failed: {}'.format(address, exc))
            finally:
                # Closing an already-closed socket is a no-op.
                conn.close()


# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()
python all_in_one.py

curl localhost:8080
# Hello World!
# b'GET / HTTP/1.1\r\nHost: localhost:8080\r\nUser-Agent: curl/7.64.1\r\nAccept: */*\r\n\r\n'

WSGI

Nginx <===> [Gunicorn](https://gunicorn.org/) <===> Flask App

Web Server <===> WSGI <===> Web App

WSGI gunicorn

vim web_app.py
# coding:utf-8

def web_app(environ, start_response):
    """Minimal WSGI application that always answers ``Hello World!``.

    Args:
        environ: WSGI environment mapping supplied by the server.
        start_response: Server callback taking the status string and a list
            of ``(name, value)`` header tuples.

    Returns:
        A one-element list holding the response body as bytes.
    """
    # Dump what the server passed in, followed by a separator, for inspection.
    print(environ, start_response)
    print('\n\r')
    start_response('200 OK', [('Content-type', 'text/plain')])
    return [b'Hello World!\n']
gunicorn web_app:web_app

curl localhost:8000
# Hello World!

WSGI wsgiref

vim wsgi_wsgiref.py
# coding:utf-8

from wsgiref.simple_server import make_server
from web_app import web_app

if __name__ == '__main__':
    # Build the reference WSGI server for web_app on port 8000, then serve
    # requests until the process is stopped.
    httpd = make_server('', 8000, web_app)
    httpd.serve_forever()
python wsgi_wsgiref.py

curl localhost:8000
# Hello World!

WSGI implement

vim wsgi_implement.py
# coding:utf-8

import os
import sys

from web_app import web_app


def run_with_wsgi(application):
    """Run *application* once as a minimal CGI-style WSGI gateway.

    The environ is built from the process environment plus the ``wsgi.*``
    keys set below.  The status line, headers and body produced by the
    application are written to ``sys.stdout``.

    Args:
        application: WSGI callable ``application(environ, start_response)``
            returning an iterable of bytes.

    Raises:
        ValueError: If the application writes body data before calling
            ``start_response``.
    """
    headers_set = []   # [status, response_headers] from start_response
    headers_sent = []  # non-empty once the header block has been written

    def write(data):
        if not headers_set:
            raise ValueError('write() called before start_response()')

        if not headers_sent:
            # Emit the header block exactly once, right before the first
            # body output, instead of repeating it for every chunk.
            headers_sent[:] = headers_set
            status, response_headers = headers_set
            sys.stdout.write('Status: %s\r\n' % status)
            for header in response_headers:
                sys.stdout.write('%s: %s\r\n' % header)
            sys.stdout.write('\r\n')

        sys.stdout.write(data.decode())
        sys.stdout.flush()

    def start_response(status, response_headers, exc_info=None):
        if exc_info:
            try:
                if headers_sent:
                    # Too late to replace the headers: re-raise the
                    # application's original error.
                    raise exc_info[1].with_traceback(exc_info[2])
            finally:
                exc_info = None  # avoid a traceback reference cycle
        headers_set[:] = [status, response_headers]
        # WSGI requires start_response to return the write() callable.
        return write

    environ = dict(os.environ)
    # wsgi.input must be a byte stream; fall back to the text stream only
    # when stdin has been replaced by an object without a ``buffer``.
    environ['wsgi.input'] = getattr(sys.stdin, 'buffer', sys.stdin)
    environ['wsgi.errors'] = sys.stderr
    environ['wsgi.version'] = (1, 0)
    environ['wsgi.multithread'] = False
    environ['wsgi.multiprocess'] = True
    environ['wsgi.run_once'] = True
    environ['wsgi.url_scheme'] = 'http'

    result = application(environ, start_response)
    try:
        for data in result:
            if data:  # hold the headers back until real body data appears
                write(data)
        if headers_set and not headers_sent:
            # Empty body: the status line and headers still have to go out.
            write(b'')
    finally:
        if hasattr(result, 'close'):
            result.close()


# Run the demo application through the gateway when executed as a script.
if __name__ == "__main__":
    run_with_wsgi(web_app)
python wsgi_implement.py
# Hello World!

参考

上一篇 下一篇

猜你喜欢

热点阅读