程序员

Python网络编程笔记(六):服务器架构

2018-08-06  本文已影响144人  郝开心信札

网络服务面临两个基本问题:

  1. 编码:编写出正确处理请求和响应的代码。
  2. 部署:使用守护进程,活动日志持久化存储,防止各种失败或者在失败后立即重启。

部署

参考:十二要素应用宣言

单线程服务器

以一个简单的TCP协议为例:
即使 listen()的参数大于0,第一个对话未完成时,第二个对话仍旧会在队列中,只是减少了切换时建立连接的时间。

缺点:

tips:性能测试使用 trace 模块:python -m trace -tg --ignore-dir=/usr main.py

多线程与多进程服务器

利用操作系统的多路复用。可以创建多个共享相同内存的线程,或者完全独立的进程。

优点:简单,复用单线程代码。

缺点:

模块:threadingmultiprocessing

异步服务器

利用服务端向客户端发送响应后等待下一次响应的时间。

异步(asyhchronous),表示从不停下来,区别于同步(synchronized)

操作系统级异步

传统的 select(), 后续 Linux 的 poll() 和 BSD 的 epoll()

看下面这段简单的异步代码:精髓在于自己设计数据结构保存客户端状态,而不依赖操作系统的上下文切换。

#!/usr/bin/env python3
# Foundations of Python Network Programming, Third Edition
# https://github.com/brandon-rhodes/fopnp/blob/m/py3/chapter07/srv_async.py
# Asynchronous I/O driven directly by the poll() system call.

# zen_utils 是自己编写的处理各种业务逻辑的包
import select, zen_utils

# 两层循环,while 不断调用 poll(), 针对poll返回的不通事件再循环
# 为了代码简洁,用生成器写
def all_events_forever(poll_object):
    while True:
        for fd, event in poll_object.poll():
            yield fd, event

def serve(listener):
    # 维护 sockets 字典和 address 字典
    sockets = {listener.fileno(): listener} 
    addresses = {}
    # 要接受和要发送的缓存字典。这四个字典是核心。
    bytes_received = {}
    bytes_to_send = {}

    poll_object = select.poll()
    # 监听套接字始终在 sockets 字典里,且状态始终未 POLLIN
    poll_object.register(listener, select.POLLIN)

    for fd, event in all_events_forever(poll_object):
        sock = sockets[fd]

        # Socket closed: remove it from our data structures.
        # 出错、关闭、异常等

        if event & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
            address = addresses.pop(sock)
            rb = bytes_received.pop(sock, b'')
            sb = bytes_to_send.pop(sock, b'')
            if rb:
                print('Client {} sent {} but then closed'.format(address, rb))
            elif sb:
                print('Client {} closed before we sent {}'.format(address, sb))
            else:
                print('Client {} closed socket normally'.format(address))
            poll_object.unregister(fd)
            del sockets[fd]

        # New socket: add it to our data structures.
        # 监听套接字,accept

        elif sock is listener:
            sock, address = sock.accept()
            print('Accepted connection from {}'.format(address))
            sock.setblocking(False)     # force socket.timeout if we blunder
            sockets[sock.fileno()] = sock
            addresses[sock] = address
            poll_object.register(sock, select.POLLIN)

        # Incoming data: keep receiving until we see the suffix.
        # POLLIN状态,recv()

        elif event & select.POLLIN:
            more_data = sock.recv(4096)
            if not more_data:  # end-of-file
                sock.close()  # next poll() will POLLNVAL, and thus clean up
                continue
            data = bytes_received.pop(sock, b'') + more_data
            if data.endswith(b'?'):
                bytes_to_send[sock] = zen_utils.get_answer(data)
                poll_object.modify(sock, select.POLLOUT)
            else:
                bytes_received[sock] = data

        # Socket ready to send: keep sending until all bytes are delivered.
        # POLLOUT状态,send

        elif event & select.POLLOUT:
            data = bytes_to_send.pop(sock)
            n = sock.send(data)
            if n < len(data):
                bytes_to_send[sock] = data[n:]
            else:
                poll_object.modify(sock, select.POLLIN)

if __name__ == '__main__':
    address = zen_utils.parse_command_line('low-level async server')
    listener = zen_utils.create_srv_socket(address)
    serve(listener)

回调风格的 asynio

把 select 调用的细节隐藏起来。

通过对象实例来维护每个打开的客户端链接,使用对象的方法调用。

#!/usr/bin/env python3
# Foundations of Python Network Programming, Third Edition
# https://github.com/brandon-rhodes/fopnp/blob/m/py3/chapter07/srv_asyncio1.py
# Asynchronous I/O inside "asyncio" callback methods.

import asyncio, zen_utils

# 一个对象实例维护一个客户端链接
class ZenServer(asyncio.Protocol):

    def connection_made(self, transport):
        self.transport = transport
        self.address = transport.get_extra_info('peername')
        self.data = b''
        print('Accepted connection from {}'.format(self.address))

    def data_received(self, data):
        self.data += data
        if self.data.endswith(b'?'):
            answer = zen_utils.get_answer(self.data)
            # 响应通过 self.transport.write() 即可
            self.transport.write(answer)
            self.data = b''

    def connection_lost(self, exc):
        if exc:
            print('Client {} error: {}'.format(self.address, exc))
        elif self.data:
            print('Client {} sent {} but then closed'
                  .format(self.address, self.data))
        else:
            print('Client {} closed socket'.format(self.address))

if __name__ == '__main__':
    address = zen_utils.parse_command_line('asyncio server using callbacks')
    loop = asyncio.get_event_loop()
    coro = loop.create_server(ZenServer, *address)
    server = loop.run_until_complete(coro)
    print('Listening at {}'.format(address))
    try:
        loop.run_forever()
    finally:
        server.close()
        loop.close()

协程风格的 asyncio

协程(coroutine)是一个函数,在进行IO操作是不会阻塞,而是暂停,将控制权转移回调用方。python支持协程的标准形式就是生成器 yield。

#!/usr/bin/env python3
# Foundations of Python Network Programming, Third Edition
# https://github.com/brandon-rhodes/fopnp/blob/m/py3/chapter07/srv_asyncio2.py
# Asynchronous I/O inside an "asyncio" coroutine.

import asyncio, zen_utils

@asyncio.coroutine
def handle_conversation(reader, writer):
    address = writer.get_extra_info('peername')
    print('Accepted connection from {}'.format(address))
    while True:
        data = b''
        while not data.endswith(b'?'):
            # 注意 yield from 
            more_data = yield from reader.read(4096)
            if not more_data:
                if data:
                    print('Client {} sent {!r} but then closed'
                          .format(address, data))
                else:
                    print('Client {} closed socket normally'.format(address))
                return
            data += more_data
        answer = zen_utils.get_answer(data)
        writer.write(answer)

if __name__ == '__main__':
    address = zen_utils.parse_command_line('asyncio server using coroutine')
    loop = asyncio.get_event_loop()
    coro = asyncio.start_server(handle_conversation, *address)
    server = loop.run_until_complete(coro)
    print('Listening at {}'.format(address))
    try:
        loop.run_forever()
    finally:
        server.close()
        loop.close()

完美方案

异步的缺点是:所有操作都在单个线程中完成。即使多核机器,也只会使用一个核。

方案:检查核数,有几个核,就启动几个事件循环进程。在每个CPU上,使用异步(回调或者协程)方案。操作系统负责新建立的连接分配给某个服务器进程。

上一篇下一篇

猜你喜欢

热点阅读