实战Linux I/O多路复用:借助epoll,单线程高效管理1

2024-05-09  本文已影响0人  拂去尘世尘

[TOC]

引言

  在应对高并发连接的传统策略中,普遍采取为每个连接配置单独线程或进程的直接方式,管理其I/O操作。此法虽直观易行,但随业务规模扩张,线程资源需求急剧上升。相反,Linux下的I/O多路复用技术,尤其是epoll,展示了一种高效路径:单一线程即可监控成千上万的文件描述符,极大提升了资源使用效率。
  I/O 多路复用的场景有很多,也比较实用。通常用法epoll线程 + 线程/协程池处理并发场景,这里做一个简单的实例使用,以便后续查阅。

概述

selectpoll同样能够满足多路复用的需求,在特定场景下各有千秋。不过,当面对需监控大量文件句柄的场景时,epoll凭借其高效的设计和更高的性能表现,成为更为优选的解决方案。其不仅在资源管理和事件处理上展现出明显优势,而且编程接口的灵活性也更为优雅。本文主要聚焦于epoll的实践应用,实例学习其高效而精炼的使用方法。

epoll常用接口

epoll的描述man手册已经记录比较详细了,这里列举一下常用的接口:

  1. epoll_create / epoll_create1
  1. epoll_ctl
  1. epoll_wait

应用场景

  在高并发TCP服务场景中,服务端通过部署epoll + 线程/协程池机制,构建高效服务框架。epoll作为核心监听器,统一管理并快速响应来自不同客户端的连接请求,其事件驱动特性确保了对socket就绪状态的即时检测。与此同时,这些请求被异步地分发至线程/协程池中,利用任务队列和工作线程(或轻量级协程)并发执行,提升数据处理能力。

类图

EpollEventHandler类图

源码实现

编程环境

① 编译环境: Linux环境
② 语言: C++语言

接口定义

class EpollEventHandler
{
public:
    virtual ~EpollEventHandler();
    static EpollEventHandler* GetInstance();

    void AddPoll(IEpollEvent* p);
    void DelPoll(IEpollEvent* p);
    void EpollLoop(bool bRun);

private:
    EpollEventHandler(int size = 0);

private:
    int     mHandle;
    bool    mRun;
    std::map<int, IEpollEvent*> mEpollMap;   // fd, type, IEpollEvent
};

EpollEventHandler主要封装了epoll接口,集中管理并监听所有IEpollEvent实例。在EpollLoop循环中,阻塞等待并处理各类句柄事件,一旦事件触发,即通过多态调用IEpollEvent的虚函数来EpollEvent执行特定的事件处理逻辑,从而实现差异化的处理需求。

void EpollEventHandler::EpollLoop(bool bRun)
{
    struct epoll_event ep[32];

    mRun = bRun;
    do {
        if (!mRun) {
            break;
        }

        // 无事件时, epoll_wait阻塞, 等待
        int count = epoll_wait(mHandle, ep, sizeof(ep)/sizeof(ep[0]), -1);
        if (count <= 0) {
            continue;
        }

        for (int i = 0; i < count; i++) {
            IEpollEvent* p = (IEpollEvent*)ep[i].data.ptr;
            if (p == nullptr) {
                continue;
            }

            // TODO: 丢到线程/协程池响应
            p->EpollEvent(p->GetEpollFd(), p->GetEpollType(), p->GetArgs());
        }
    } while(mRun);

    SPR_LOGD("EpollLoop exit\n");
}
class IEpollEvent
{
public:
    IEpollEvent(int fd, EpollType eType = EPOLL_TYPE_BEGIN, void* arg = nullptr)
        : mEpollFd(fd), mEpollType(eType), mArgs(arg) {};

    virtual ~IEpollEvent() = default;
    virtual ssize_t Write(int fd, const std::string& bytes);
    virtual ssize_t Read(int fd, std::string& bytes);
    virtual void*   EpollEvent(int fd, EpollType eType, void* arg) = 0;

    int         GetEpollFd()        { return mEpollFd; }
    EpollType   GetEpollType()      { return mEpollType; }
    void*       GetArgs()           { return mArgs; }

protected:
    int         mEpollFd;
    EpollType   mEpollType;
    void*       mArgs;
};

IEpollEvent主要统一句柄注册与事件处理的标准操作,方便EpollEventHandler统一监听,通过EpollEvent实现差异化响应。

class PSocket : public IEpollEvent
{
public:
    PSocket(int domain, int type, int protocol,
               std::function<void(int, void*)> cb, void* arg = nullptr);

    PSocket(int sock,
               std::function<void(int, void*)> cb, void* arg = nullptr);

    virtual ~PSocket();

    void Close();
    int AsTcpServer(short bindPort, int backlog);
    int AsTcpClient(bool con = false,
                    const std::string& srvAddr = "",
                    short srvPort = 0,
                    int rcvLen = 512 * 1024,
                    int sndLen = 512 * 1024);

    int AsUdpServer(short bindPort, int rcvLen = 512 * 1024);
    int AsUdpClient(const std::string& srvAddr, short srvPort, int sndLen = 512 * 1024);

    int AsUnixStreamServer(const std::string& serverName, int backlog);
    int AsUnixStreamClient(bool con = false,
                           const std::string& serverName = "",
                           const std::string& clientName = "");

    int AsUnixDgramServer(const std::string& serverName);
    int AsUnixDgramClient(const std::string& serverName);

    virtual void*   EpollEvent(int fd, EpollType eType, void* arg) override;

private:
    bool            mEnable;
    PSocketType     mSockType;
    std::function<void(int, void*)> mCb;
};
class PUart : public IEpollEvent
{
public:
    PUart(const std::string& devPath,
            std::function<void(int, char *, long, void*)> cb,
            void*   arg     = nullptr,
            speed_t rate    = B115200,
            int     parity  = 0,
            int     stopbit = 1
            );
    virtual ~PUart();


    void* EpollEvent(int fd, EpollType eType, void* arg) override;

    bool  SetupPort(speed_t rate, int parity, int stopbit);
    void  Close();

private:
    std::function<void(int, char *, long, void*)> mCb;
    std::string mDevFile;
};

测试效果

int main(int argc, const char *argv[])
{
    std::mutex epFdMutex;
    EpollEventHandler *pEpoll = EpollEventHandler::GetInstance();
    auto tcpClient = make_shared<PSocket>(AF_INET, SOCK_STREAM, 0, [&](int sock, void *arg) {
        PSocket* pCliObj = (PSocket*)arg;
        if (pCliObj == nullptr) {
            SPR_LOGE("PSocket is nullptr\n");
            return;
        }

        std::string rBuf;
        int rc = pCliObj->Read(sock, rBuf);
        if (rc > 0) {
            SPR_LOGD("# RECV [%d]> %s\n", sock, rBuf.c_str());
        } else {
            pEpoll->DelPoll(pCliObj);
            SPR_LOGD("## CLOSE [%d]\n", sock);

            std::lock_guard<std::mutex> lock(epFdMutex);
            pCliObj->Close();
        }
    });

    tcpClient->AsTcpClient(true, "127.0.0.1", 8080);
    pEpoll->AddPoll(tcpClient.get());

    std::thread wThread([&]{
        while(true) {
            std::lock_guard<std::mutex> lock(epFdMutex);
            tcpClient->Write(tcpClient->GetEpollFd(), "Hello World");
            sleep(1);
        }
    });

    pEpoll->EpollLoop(true);
    wThread.join();
    return 0;
}
$ ./sample_tcpserver
  81 EpollEvent D: Add epoll fd 4
  81 EpollEvent D: Add epoll fd 5
  81 EpollEvent D: Add epoll fd 6
  54 TcpServer D: # RECV [6]> I'm Client A
  58 TcpServer D: # SEND [6]> ACK
  54 TcpServer D: # RECV [5]> I'm Client B
  58 TcpServer D: # SEND [5]> ACK
  54 TcpServer D: # RECV [6]> I'm Client A
  58 TcpServer D: # SEND [6]> ACK
  54 TcpServer D: # RECV [5]> I'm Client B
  58 TcpServer D: # SEND [5]> ACK

测试结果上看,sample_tcpserver能够实现一个线程同时监听两个客户端的请求和应答。

总结

上一篇下一篇

猜你喜欢

热点阅读