Python网络编程Python

Twisted + gevent 异步+协程服务器开发

2016-07-04  本文已影响0人  小虚大魔王

背景

最近接触到用 Twisted 来写个 RPC 服务器,对高并发、性能和大量长连接时的稳定性方面有要求,所以应该在 Twisted 的基础上再造些轮子,最后考虑用 Twisted + gevent 来实现 「异步+协程」的部分。

分别简要介绍下 Twisted 和 gevent。

Twisted

Twisted是用 Python 实现的基于事件驱动的异步的网络引擎框架。它封装了大部分主流的网络协议(传输层或应用层),如 TCP、UDP、SSL/TLS、HTTP、IMAP、SSH、IRC以及FTP等,在这我主要会用到 TCP 协议。

使用 Twisted 的好处在于,它是以事件驱动编程实现的,所以提供了事件注册的回调函数的接口,每次接受到请求,获得了事件通知,就调用事件所注册的回调函数( Node.js 程序员可能比较熟悉)。这让我不必去操心服务器事件驱动的编写。

并且,在网络引擎方面,有心跳包和粘包的三方库,非常完善。

然而,Twisted 有一个缺陷,它的异步有点问题,单个连接建立后是一个进程,在进程里用多线程实现并发,但多个连接建立后仍然会出现同步阻塞的情况,所以这就要引入 gevent 来填充其性能上的缺陷。

gevent

gevent 是一种基于协程的 Python 网络库,它用到 greenlet 提供的,封装了 libevent 事件循环的高层同步API。

如果你不知道什么是协程,那么可以简单这么理解:

协程就是由程序员自己编码实现调度的多线程。

而 gevent 对 greenlet 协程进行了封装,同时 gevent 提供了看上去非常像传统的基于线程模型编程的接口,但是在隐藏在下面做的是异步 I/O ,所以它以同步的编码实现了异步的功能。

开搞

Step 1 完成基础框架

首先由于我要编写一个 RPC 服务器(使用 TCP 协议),所以需要先实现一个 TCP 服务器。

# server.py
from twisted.internet.protocol import ServerFactory, ProcessProtocol
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor

PORT = 5354

class CmdProtocol(LineReceiver):
    client_ip = ''
    
    # 连接建立接口
    def connectionMade(self):
        # 获得连接对端 ip
        self.client_ip = self.transport.getPeer().host
        print("Client connection from %s" % self.client_ip)
  
    # 连接断开接口
    def connectionLost(self, reason):
        print('Lost client connection. Reason: %s' % reason)

    # 数据接收接口
    def dataReceived(self, data):
        print('Cmd received from %s : %s' % (self.client_ip, data))

class RPCFactory(ServerFactory):
    # 使用 CmdProtocol 与客户端通信
    protocol = CmdProtocol

# 启动服务器
if __name__ == "__main__":
    reactor.listenTCP(PORT, RPCFactory())
    reactor.run()

Twisted 提供3个非常基础的接口使程序员进行重写:

这3个接口通常来说是必须的,以此基础上进行完善,可以看到我只是先输出了友好信息。

这样简单完成了一个 TCP 服务器,可以看出 Twisted 网络引擎的架构如下:

  1. 先由程序员来制定一个或多个协议(该协议可以继承各种底层网络协议)。
  2. 接着指定唯一一个工厂,这个工厂必须声明使用的协议对象。
  3. 使用 reactor 选择监听模式、监听工厂和端口,开启服务器。

Step 2 完善基础框架

显然,这个 TCP 服务器基础框架显得有些单薄,我首先想到的是需要进行多客户端的控制及 ip 记录,故应有个队列来实时更新连接入服务器的 ip。

并且,最近有好几部电影在豆瓣我标记了,我想和高圆圆一起去看,所以不能一直盯着屏幕来观察反馈,所以需要一个日志系统来记录反馈信息。

故增加一个 log.py 日志系统文件:

# log.py
import os
import logging
import logging.handlers
from twisted.python import log

#当前执行文件所在地址
CURRENT_PATH = os.getcwd()
#日志文件路径
LOG_FILE = CURRENT_PATH + '/rpcserver.log'
# 全局日志模块
gl_logger = None 

class log(Protocol):
    def init_log():
    global gl_logger
    try:
        os.makedirs(os.path.dirname(LOG_FILE))
    except:
        pass
    # 实例化handler 
    handler = logging.handlers.RotatingFileHandler(LOG_FILE, maxBytes=1024 * 1024, backupCount=1)   
    fmt = '[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d:%(funcName)s] - %(message)s'    
     # 实例化formatter
    formatter = logging.Formatter(fmt)
    # 为handler添加formatter
    handler.setFormatter(formatter)
    # 获取名为rpcserver的logger
    gl_logger = logging.getLogger('rpcserver')
    # 为logger添加handler    
    loggergl_logger.addHandler(handler)
    handlergl_logger.setLevel(logging.DEBUG)

    gl_logger.info("----------------------------------")

并在 server.py 中添加如下代码:
(添加多连接控制,把 print 替换为 log.msg 来打印日志)

# server.py
from twisted.internet.protocol import ServerFactory, ProcessProtocol
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor
from twisted.python import log

PORT = 5354

class CmdProtocol(LineReceiver):
    client_ip = ''
    
    # 连接建立接口
    def connectionMade(self):
        # 获得连接对端 ip
        self.client_ip = self.transport.getPeer().host
        log.msg("Client connection from %s" % self.client_ip)
        
        # 进行多连接控制
        if len(self.factory.clients) >= self.factory.clients_max:
            log.msg("Too many connections. Disconnect!")
            self.client_ip = None
            self.transport.loseConnection()
        else:
            self.factory.clients.append(self.client_ip)
  
    # 连接断开接口
    def connectionLost(self, reason):
        log.msg('Lost client connection. Reason: %s' % reason)
        if self.client_ip:
            self.factory.clients.remove(self.client_ip)

    # 数据接收接口
    def dataReceived(self, data):
        log.msg('Cmd received from %s : %s' % (self.client_ip, data))

class RPCFactory(ServerFactory):
    # 使用 CmdProtocol 与客户端通信
    protocol = CmdProtocol
    
    # 设置最大连接数
    def __init__(self, clients_max=10):
        self.clients_max = clients_max
        self.clients = []

# 启动服务器
if __name__ == "__main__":
    log.startLogging(sys.stdout)
    reactor.listenTCP(PORT, RPCFactory())
    reactor.run()

Step 3 增加 rpc 实例

既然是 rpc 服务器,辣么接下来就要实现一个简单的远程命令调用,既然之前写了日志模块,那就写一个对应的远程日志查看调用吧!

对了,写到这里,已经是 02:53 了,我不知道为什么开始胡思乱想起来。

我想大概是因为越是无端的,越是会心念着...

rpcserver
上一篇 下一篇

猜你喜欢

热点阅读