Python版本Zinx——(9)链接管理

2023-08-29  本文已影响0人  爱折腾的胖子

  最近想研究一下关于长链接的相关内容,在B站上看到了Zinx框架的视频,是Golang语言的框架,本着更好的理解框架的内容,按照整个Zinx课程的进度,制作一个Python版本的Zinx框架。有关于Zinx框架的具体内容,可以看框架作者的介绍
  python版本的Zinx,基于Gevent 22.10.2,使用协程功能。

  golang版本的Zinx项目,项目中两个文件夹,ziface和znet。

  • ziface主要是存放一些Zinx框架的全部模块的抽象层接口类。
  • znet模块是zinx框架中网络相关功能的实现,所有网络相关模块都会定义在znet模块中。
    └── zinx
     ├── ziface
     │  └──
     └── znet
        ├──

  python中的关键字没有interface,但是可以使用抽象基类(abstract base class)和第三方库来实现类似于接口的功能。在实际开发中,我们可以根据具体需求选择合适的实现方式。
  暂时使用抽象基类的形式模拟接口的实现。


  在第4节全局配置中,配置了一个参数MaxConn,表示最大链接数量。本节就将这个参数的作用进行实现。
  在ziface中,创建iconmanager,链接管理抽象层。

# -*- coding: utf-8 -*-
from ziface.iconnection import IConnection
from abc import ABC, abstractmethod

class IConManager(ABC):
    """
    连接管理抽象层
    """
    @abstractmethod
    def Add(self, Conn: IConnection):
        """
        添加链接
        :param Conn:
        :return:
        """
        pass

    @abstractmethod
    def Remove(self, Conn: IConnection):
        """
        删除连接
        :param Conn:
        :return:
        """
        pass

    @abstractmethod
    def Get(self, connID: int) -> IConnection:
        """
        利用ConnID获取链接
        :param connID:
        :param Conn:
        :return:
        """
        pass

    @abstractmethod
    def Len(self) -> int:
        """
        获取当前连接个数
        :return:
        """
        pass

    @abstractmethod
    def ClearConn(self):
        """
        删除并停止所有链接
        :return:
        """

  在znet中创建conmanager实现一下。

# -*- coding: utf-8 -*-
from typing import Dict, Optional

from ziface.iconManager import IConManager
from ziface.iconnection import IConnection


class ConManager(IConManager):
    """
    连接管理模块
    """

    def __init__(self):
        self.connections: Dict[int, IConnection] = {}

    def Add(self, Conn: IConnection):
        """
        添加链接
        :param Conn:
        :return:
        """
        self.connections[Conn.GetConnID()] = Conn
        print("链接添加至链接管理成功,当前数量 = ", self.Len())

    def Remove(self, Conn: IConnection):
        """
        删除连接
        :param Conn:
        :return:
        """
        del self.connections[Conn.GetConnID()]
        print("链接移除成功ConnID=", Conn.GetConnID(), ",当前数量 = ", self.Len())

    def Get(self, connID: int) -> Optional[IConnection]:
        """
        利用ConnID获取链接
        :param connID:
        :return:
        """
        if connID in self.connections.keys():
            return self.connections[connID]
        return None

    def Len(self) -> int:
        """
        获取当前连接个数
        :return:
        """
        return len(self.connections)

    def ClearConn(self):
        """
        删除并停止所有链接
        :return:
        """
        # 当进行key遍历时,如果dict被修改会抛出异常,使用copy做一个副本进行解决
        keys = self.connections.copy().keys()
        for key in keys:
            if key not in self.connections.keys():
                continue
            self.connections[key].Stop()
            try:
                del self.connections[key]
            except:
                continue
        print("清除所有链接: 当前数量 = ", self.Len())


# 创建一个链接管理
def NewConnManager() -> IConManager:
    cm = ConManager()
    return cm

  链接管理模块封装好了,接下来就是集成了。
  在Server中,添加一个成员变量connManager。

class Server(IServer):
    def __init__(self, name: str, ip: str, port: int,
                 family: socket.AddressFamily = socket.AF_INET,
                 socket_kind: socket.SocketKind = socket.SOCK_STREAM):
        self.name: str = name
        self.family: socket.AddressFamily = family
        self.socket_kind: socket.SocketKind = socket_kind
        self.ip: str = ip
        self.port: int = port
        # self.Router: Optional[IRouter] = None
        self.msgHandler: IMsgHandler = NewMsgHandler()
        self.connManager: IConManager = NewConnManager()

  在IServer中,添加一个成员函数GetConnMgr。

    @abstractmethod
    def GetConnMgr(self) -> IConManager:
        """
        得到链接管理
        :return:
        """
        pass

  在Server中,实现一下GetConnMgr。

    def GetConnMgr(self) -> IConManager:
        """
        得到链接管理
        :return:
        """
        return self.connManager

  在Connection中,添加Server属性,为Connection提供connManager的使用权。在Connection初始化的时候,就添加到链接管理模块中

class Connection(IConnection):
    def __init__(self, tcp_server: IServer, conn: socket.socket, connID: int, remote_addr: tuple, msgHandler: IMsgHandler):
        self.TCPServer = tcp_server
        self.Conn: socket.socket = conn  # 当前链接的socket TCP套接字
        self.ConnID: int = connID  # 链接的ID
        # self.HandlerAPI = handlerAPI  # 当前链接绑定的业务处理方法的API
        self.is_closed: bool = False  # 链接状态
        self.Remote_Addr: tuple = remote_addr # 地址
        # self.Router: IRouter = router
        self.msgHandler: IMsgHandler = msgHandler # 消息处理模块
        self.msgQueue: Queue = Queue() # 写队列

def NewConnection(tcp_server: IServer, conn: socket.socket, connID: int, remote_addr: tuple, msgHandler: IMsgHandler) -> IConnection:
    c = Connection(tcp_server, conn, connID, remote_addr, msgHandler)
    c.TcpServer.GetConnMgr().Add(c)
    return c

  在Server的InitTCP中判断一下是否达到最大链接数。

    def InitTCP(self):
        """
        初始化TCP链接
        :return:
        """
        # 1.获取一个TCP的Addr
        with socket.socket(self.family, self.socket_kind) as tcp:
            # 2.监听服务器的地址
            tcp.bind((self.ip, self.port))
            tcp.listen(128)
            print("[启动] %s服务启动成功,监听中......" % self.name)
            # 3.阻塞的等待客户端链接,处理客户端链接业务(读写)。
            cid: int = 0
            while True:
                print("开启接收")
                remote_socket, remote_addr = tcp.accept()
                if self.connManager.Len() >= GlobalObject.MaxConn:
                    remote_socket.close()
                    continue
                dealConn = NewConnection(self, remote_socket, cid, remote_addr, self.msgHandler)
                cid += 1
                g2 = gevent.spawn(dealConn.Start())
                GlobalGevents.append(g2)

  在Connection断开链接的时候,移除当前链接。在Connection中Stop函数中移除。

    def Stop(self):
        """
        停止链接 结束当前链接的工作
        :return:
        """
        print("链接关闭,ID=", self.ConnID)
        if self.is_closed:
            return
        self.is_closed = True
        # 关闭socket链接,回收资源
        self.Conn.close()
        # 将链接从连接管理器中删除
        self.TcpServer.GetConnMgr().Remove(self)

  在Server停止的时候,移除当前所有链接。在Server中Stop函数中移除。

    def Stop(self):
        """
        停止服务器方法
        :return:
        """
        print("服务停止")
        # 断开所有链接,回收
        self.connManager.ClearConn()

  服务接口做完了。在demo\connmanager中做一个客户端和服务端测试一下,代码与demo\msghandler一样即可。
  此时发送和接收都正常。链接管理模块完成。

上一篇下一篇

猜你喜欢

热点阅读