用time wheel踢掉空闲TCP连接——使用C++智能指针的

2020-11-04  本文已影响0人  老杜振熙

注:本文为阅读《Linux多线程服务端编程:使用muduo C++网络库》的一点笔记

空闲连接指的是一段时间内没有受到任何数据的连接。我们需要做的是每隔一段时间断开这些空闲连接,以免浪费资源。剔除空闲连接这一任务大概有如下两个特点和需求:

time wheel运用了桶排序的思路,在系统中设置N个桶,共同组成一个队列。第i个桶中存放i秒后将要变为空闲连接的连接。这样一来,我们只需要每秒剔除第0个桶中的连接即可,无需遍历全部连接,剔除之后,将第0个桶移动到尾部。而一个连接如果接收到了新数据,那么该连接就将自己重新放入最后一个桶中。


以循环队列构造的time wheel

接下来,我先用自己的语言描述一下书中作者所使用的数据结构:

嗯...上面一段听起来不太像人话。接下来是我对上述数据结构的理解和剖析:

  1. 明确time wheel服务的目标:
    明确这一点是很重要的,time wheel的核心仅仅是"⏲计时"。这意味time wheel只是去判断一个TCP连接是否已经超时成为空闲连接,而不去判断、也不去影响这个TCP连接的其他状态,比如说这个TCP连接是否已经断开。那或许有人会这样问了:“当time wheel得知某一个TCP连接已经超时并变成空闲连接之后,由谁来执行断开操作呢?”答案是交给其他合适的数据结构。明白了这一点,使用using Entry = shd::weak_ptr<TcpConnection>作为time wheel和TcpConnection之间的桥梁也就是顺其而然的事情了,因为weak_ptr的特点就是,它可以探测到其指向的资源的状态,但本身并不影响其状态。那为什么非得要这样来设计呢?我觉得这是一个明确分工的问题,一个数据结构应该专注于将自己的任务做好,而不应过分插手其他的任务。

  2. 直观的解决方案
    其实明白了第1点的内涵之后,再结合time wheel的思路,我们就已经可以有一个比较直观的方案了。当服务器发现新的TcpConnection之后,就生成这个TcpConnection对应的Entry,并将其放置到尾部的桶中。而每当任意TcpConnection接收到新的消息之后,就将对应的Entry从原来的桶再移动到尾部的桶中。最后,利用定时器设定一个每秒执行一次的回调函数,该函数将头部的桶中的全部Entry逐个进行delete,而Entry中的析构函数会去断开对应的TcpConnection。上述的思路就是很直观明了的。

  3. 反思:应该充分利用C++语言特性
    在第2点谈到的初步方案中,我们利用Entry的析构函数去断开对应的TcpConnection,这已经体现了RAII的思路了,但方案很麻烦的一点在于,我们需要对Entry进行频繁的移动。试想一下,为了移动Entry,我们必须要记录每个Entry位于哪一个桶中,这就又需要一个新的数据结构,变来变去徒增烦恼。还是再一次发挥RAII的优势,我们再使用一层using EntryPtr = shared_ptr<Entry>;去管理Entry。具体的思路是这样的,桶中的元素类型从Entry变为EntryPtr,这样一来,每当任意TcpConnection接收到新的消息之后,我们只需要在尾部的桶中新增一个对应EntryEntryPtr。定时器的回调函数还是同样的模式,这样一来,每当一个TcpConnection有新消息时,其对应的Entry的引用计数会增加,而每秒执行一次的定时函数又会减少头部桶中的各个Entry的引用计数。完美,我们不再需要新的数据结构去记录Entry的位置了。下图是一个简单的示意,当前这个Entry在桶中共存放了4个EntryPtr,如果后续的6秒内该TcpConnection均没有接收到数据,那么EntryPtr的引用计数就会变为0,那么系统自动执行Entry的析构函数,也就是去断开对应的TcpConnection

    当前Entry有4个EntryPtr
  1. 细节:一个TcpConnection仅对应一个Entry
    这是一个非常隐秘的细节,但也及其重要。试想一下,如果同一个TcpConnection生成了两个Entry,而这两个Entry各自的多个EntryPtr又都放入了桶中,那么毫无疑问,该TcpConnection会被"断开两次",而放入到此处的语义中,则是TcpConnection会被提前断开连接。下图是一个示例,图中的TcpConnection生成了两个Entry,而Entry2最多在4秒以后就会因为引用计数变为0而进行析构(假设这个连接之后不会再接收数据),即断开了对应的TCP连接,但实际上,桶6中也有对应的代理,这意味着理论上这个TcpConnection至少还应该存在6秒。这也就是书中两个思考题的答案了,我们需要在TcpConnectioncontext中保存那唯一一个Entry,而这个Entry是在服务器探测到新连接的时候为这个新连接创建出来的。
    错误的示例,单个TcpConnection生成了两个Entry

接下来就是实际操作了,明白了原理之后,就简单很多了。

/*  省略各个头文件 */
/*  ...  */
class EchoServer
{
    using WeakTcpConnectionPtr = std::weak_ptr<TcpConnection>;
    struct Entry {
        std::weak_ptr<TcpConnection> ptr_;
        Entry(const WeakTcpConnectionPtr &ptr): ptr_(ptr) {} // weak_ptr是可以通过shared_ptr进行构造的
        ~Entry(){
            const TcpConnectionPtr conn = ptr_.lock(); // return the corresponding shared_ptr
            if(conn){ // must check the activity before shutdown action
                conn->shutdown();
            }
        }
    };

    using EntryPtr = std::shared_ptr<Entry>;
    using Bucket = std::unordered_set<EntryPtr>;
    using BucketList = boost::circular_buffer<Bucket>; // our finale data structure

    using WeakEntryPtr = std::weak_ptr<Entry>;

public:
    EchoServer(EventLoop *loop, const InetAddress &addr, const int &num_buckets):
        loop_(loop), server_(loop, addr, "ECHO SERVER"), buckets_(num_buckets)
    {
        server_.setConnectionCallback(std::bind(&EchoServer::onConnection, this, _1)); // must have the &
        server_.setMessageCallback(std::bind(&EchoServer::onMessage, this, _1, _2, _3));
        loop_->runEvery(1.0, std::bind(&EchoServer::onTime, this));
    }
    // ~EchoServer();

    void start_(){
        server_.start();
    }

private:
    EventLoop *loop_;
    TcpServer server_;
    BucketList buckets_;

    void onTime(){
        buckets_.push_back(Bucket()); // the head bucket is popped automatically
    }

    void onConnection(const TcpConnectionPtr &conn){
        LOG_INFO << "ECHO_SERVER: " << conn->peerAddress().toIpPort()
            << "-> " << conn->localAddress().toIpPort() << " is "
            << ( conn->connected() ? " ON " : " OFF " );
        if(conn->connected()){
            EntryPtr newPtr(new Entry(conn)); // shared_ptr can be transformed to weak_ptr
            buckets_.back().insert(newPtr);
            conn->setContext(WeakEntryPtr(newPtr)); // store weak_ptr of Entry
        }
    }
    void onMessage(const TcpConnectionPtr &conn, Buffer *buf, Timestamp time_){
        muduo::string msg(buf->retrieveAllAsString());
        LOG_INFO << "ECHO_SERVER: " << conn->peerAddress().toIpPort() 
            << " has message coming, size is: " << msg.size();
        WeakEntryPtr tmp = boost::any_cast<WeakEntryPtr>(conn->getContext());
        buckets_.back().insert(tmp.lock());// add the count
        conn->send(msg);
    }

    

};

#endif /* ECHO_SERVER_H */

int main(int argc, char *argv[])
{
    EventLoop loop;
    InetAddress addr(2333);
    EchoServer server(&loop, addr, 10); // 设置超时时间为10s
    server.start_();
    loop.loop();
    return 0;
}

上一篇 下一篇

猜你喜欢

热点阅读