Python版本Zinx——(9)链接管理
最近想研究一下关于长链接的相关内容,在B站上看到了Zinx框架的视频,是Golang语言的框架,本着更好的理解框架的内容,按照整个Zinx课程的进度,制作一个Python版本的Zinx框架。有关于Zinx框架的具体内容,可以看框架作者的介绍。
python版本的Zinx,基于Gevent 22.10.2,使用协程功能。
golang版本的Zinx项目,项目中两个文件夹,ziface和znet。
- ziface主要是存放一些Zinx框架的全部模块的抽象层接口类。
- znet模块是zinx框架中网络相关功能的实现,所有网络相关模块都会定义在znet模块中。
└── zinx
├── ziface
│ └──
└── znet
├──
python中的关键字没有interface,但是可以使用抽象基类(abstract base class)和第三方库来实现类似于接口的功能。在实际开发中,我们可以根据具体需求选择合适的实现方式。
暂时使用抽象基类的形式模拟接口的实现。
在第4节全局配置中,配置了一个参数MaxConn,表示最大链接数量。本节就将这个参数的作用进行实现。
在ziface中,创建iconmanager,链接管理抽象层。
# -*- coding: utf-8 -*-
from ziface.iconnection import IConnection
from abc import ABC, abstractmethod
class IConManager(ABC):
"""
连接管理抽象层
"""
@abstractmethod
def Add(self, Conn: IConnection):
"""
添加链接
:param Conn:
:return:
"""
pass
@abstractmethod
def Remove(self, Conn: IConnection):
"""
删除连接
:param Conn:
:return:
"""
pass
@abstractmethod
def Get(self, connID: int) -> IConnection:
"""
利用ConnID获取链接
:param connID:
:param Conn:
:return:
"""
pass
@abstractmethod
def Len(self) -> int:
"""
获取当前连接个数
:return:
"""
pass
@abstractmethod
def ClearConn(self):
"""
删除并停止所有链接
:return:
"""
在znet中创建conmanager实现一下。
# -*- coding: utf-8 -*-
from typing import Dict, Optional
from ziface.iconManager import IConManager
from ziface.iconnection import IConnection
class ConManager(IConManager):
"""
连接管理模块
"""
def __init__(self):
self.connections: Dict[int, IConnection] = {}
def Add(self, Conn: IConnection):
"""
添加链接
:param Conn:
:return:
"""
self.connections[Conn.GetConnID()] = Conn
print("链接添加至链接管理成功,当前数量 = ", self.Len())
def Remove(self, Conn: IConnection):
"""
删除连接
:param Conn:
:return:
"""
del self.connections[Conn.GetConnID()]
print("链接移除成功ConnID=", Conn.GetConnID(), ",当前数量 = ", self.Len())
def Get(self, connID: int) -> Optional[IConnection]:
"""
利用ConnID获取链接
:param connID:
:return:
"""
if connID in self.connections.keys():
return self.connections[connID]
return None
def Len(self) -> int:
"""
获取当前连接个数
:return:
"""
return len(self.connections)
def ClearConn(self):
"""
删除并停止所有链接
:return:
"""
# 当进行key遍历时,如果dict被修改会抛出异常,使用copy做一个副本进行解决
keys = self.connections.copy().keys()
for key in keys:
if key not in self.connections.keys():
continue
self.connections[key].Stop()
try:
del self.connections[key]
except:
continue
print("清除所有链接: 当前数量 = ", self.Len())
# 创建一个链接管理
def NewConnManager() -> IConManager:
cm = ConManager()
return cm
链接管理模块封装好了,接下来就是集成了。
在Server中,添加一个成员变量connManager。
class Server(IServer):
def __init__(self, name: str, ip: str, port: int,
family: socket.AddressFamily = socket.AF_INET,
socket_kind: socket.SocketKind = socket.SOCK_STREAM):
self.name: str = name
self.family: socket.AddressFamily = family
self.socket_kind: socket.SocketKind = socket_kind
self.ip: str = ip
self.port: int = port
# self.Router: Optional[IRouter] = None
self.msgHandler: IMsgHandler = NewMsgHandler()
self.connManager: IConManager = NewConnManager()
在IServer中,添加一个成员函数GetConnMgr。
@abstractmethod
def GetConnMgr(self) -> IConManager:
"""
得到链接管理
:return:
"""
pass
在Server中,实现一下GetConnMgr。
def GetConnMgr(self) -> IConManager:
"""
得到链接管理
:return:
"""
return self.connManager
在Connection中,添加Server属性,为Connection提供connManager的使用权。在Connection初始化的时候,就添加到链接管理模块中
class Connection(IConnection):
def __init__(self, tcp_server: IServer, conn: socket.socket, connID: int, remote_addr: tuple, msgHandler: IMsgHandler):
self.TCPServer = tcp_server
self.Conn: socket.socket = conn # 当前链接的socket TCP套接字
self.ConnID: int = connID # 链接的ID
# self.HandlerAPI = handlerAPI # 当前链接绑定的业务处理方法的API
self.is_closed: bool = False # 链接状态
self.Remote_Addr: tuple = remote_addr # 地址
# self.Router: IRouter = router
self.msgHandler: IMsgHandler = msgHandler # 消息处理模块
self.msgQueue: Queue = Queue() # 写队列
def NewConnection(tcp_server: IServer, conn: socket.socket, connID: int, remote_addr: tuple, msgHandler: IMsgHandler) -> IConnection:
c = Connection(tcp_server, conn, connID, remote_addr, msgHandler)
c.TcpServer.GetConnMgr().Add(c)
return c
在Server的InitTCP中判断一下是否达到最大链接数。
def InitTCP(self):
"""
初始化TCP链接
:return:
"""
# 1.获取一个TCP的Addr
with socket.socket(self.family, self.socket_kind) as tcp:
# 2.监听服务器的地址
tcp.bind((self.ip, self.port))
tcp.listen(128)
print("[启动] %s服务启动成功,监听中......" % self.name)
# 3.阻塞的等待客户端链接,处理客户端链接业务(读写)。
cid: int = 0
while True:
print("开启接收")
remote_socket, remote_addr = tcp.accept()
if self.connManager.Len() >= GlobalObject.MaxConn:
remote_socket.close()
continue
dealConn = NewConnection(self, remote_socket, cid, remote_addr, self.msgHandler)
cid += 1
g2 = gevent.spawn(dealConn.Start())
GlobalGevents.append(g2)
在Connection断开链接的时候,移除当前链接。在Connection中Stop函数中移除。
def Stop(self):
"""
停止链接 结束当前链接的工作
:return:
"""
print("链接关闭,ID=", self.ConnID)
if self.is_closed:
return
self.is_closed = True
# 关闭socket链接,回收资源
self.Conn.close()
# 将链接从连接管理器中删除
self.TcpServer.GetConnMgr().Remove(self)
在Server停止的时候,移除当前所有链接。在Server中Stop函数中移除。
def Stop(self):
"""
停止服务器方法
:return:
"""
print("服务停止")
# 断开所有链接,回收
self.connManager.ClearConn()
服务接口做完了。在demo\connmanager中做一个客户端和服务端测试一下,代码与demo\msghandler一样即可。
此时发送和接收都正常。链接管理模块完成。