Python版本Zinx——(7)读写分离

2023-08-28  本文已影响0人  爱折腾的胖子

  最近想研究一下关于长链接的相关内容,在B站上看到了Zinx框架的视频,是Golang语言的框架,本着更好的理解框架的内容,按照整个Zinx课程的进度,制作一个Python版本的Zinx框架。有关于Zinx框架的具体内容,可以看框架作者的介绍
  python版本的Zinx,基于Gevent 22.10.2,使用协程功能。

  golang版本的Zinx项目,项目中两个文件夹,ziface和znet。

  • ziface主要是存放一些Zinx框架的全部模块的抽象层接口类。
  • znet模块是zinx框架中网络相关功能的实现,所有网络相关模块都会定义在znet模块中。
    └── zinx
     ├── ziface
     │  └──
     └── znet
        ├──

  python中的关键字没有interface,但是可以使用抽象基类(abstract base class)和第三方库来实现类似于接口的功能。在实际开发中,我们可以根据具体需求选择合适的实现方式。
  暂时使用抽象基类的形式模拟接口的实现。


  在之前的章节中,消息读到之后直接就进行了写操作。本节就将写操作分离出来。需要使用Queue队列进行存储即将需要写的数据。本节内容都在Connection中完成。
  在Connection中添加一个成员变量。

from gevent.queue import Queue


class Connection(IConnection):
    def __init__(self, conn: socket.socket, connID: int, remote_addr: tuple, msgHandler: IMsgHandler):
        self.Conn: socket.socket = conn  # 当前链接的socket TCP套接字
        self.ConnID: int = connID  # 链接的ID
        # self.HandlerAPI = handlerAPI  # 当前链接绑定的业务处理方法的API
        self.is_closed: bool = False  # 链接状态
        self.Remote_Addr: tuple = remote_addr # 地址
        # self.Router: IRouter = router
        self.msgHandler: IMsgHandler = msgHandler # 消息处理模块
        self.msgQueue: Queue = Queue() # 写队列

  修改一下SendMsg,原来的SendMsg直接就将消息写出去了,现在需要将消息存入msgQueue中。

    def SendMsg(self, msgID: int, data: bytes):
        """
        发送数据 将数据发送给远程的客户端
        :param msgID:
        :param data:
        :return:
        """
        if self.is_closed:
            raise Exception("发送数据时客户端链接被关闭")
        try:
            dp = NewDataPack()
            msg = dp.Pack(NewMessage(msgID, len(data), data))
            # self.Conn.send(msg)
            self.msgQueue.put(msg)
        except Exception as e:
            print(e)

  新增一个StartWriter函数,实现写功能。

    def StartWriter(self):
        """
        写业务
        :return:
        """
        print("开启写业务")
        while True:
            try:
                if self.is_closed:
                    break
                msg = self.msgQueue.get_nowait()
                self.Conn.send(msg)
            except:
                # 由于msgQueue的get_nowait并不会主动切换其他协程,使用sleep切换到其他协程,保证不在当前协程中循环运行
                gevent.sleep(0)
                continue
        print(self.Remote_Addr, "写业务退出")

  在Start函数中,开启写功能。

    def Start(self):
        """
        启动链接 让当前的链接准备开始工作
        :return:
        """
        print("链接开启,ID=", self.ConnID)
        # 开启写业务
        g1 = gevent.spawn(self.StartWriter)
        # 开启读业务
        g2 = gevent.spawn(self.StartReader)
        GlobalGevents.append(g1)
        GlobalGevents.append(g2)

  服务接口做完了。在demo\read_write中做一个客户端和服务端测试一下,代码与demo\msghandler一样即可。
  此时发送和接收都正常。读写分离完成。

上一篇下一篇

猜你喜欢

热点阅读