boost.asio聊天服务器(muduo)

2020-10-25  本文已影响0人  老杜振熙

TCP分包

在发生一个消息或一帧数据时,通过一定的处理,使接收方能够从字节流中识别并截取一个个消息。

对于短连接的TCP服务,分包不是问题:

  1. 发送方主动关闭连接,就代表消息发送完毕;
  2. 接收方read的字节数变为0,代表消息接收完毕;

对于长连接的TCP服务,分包则需要从消息本身去着手:

  1. 可以使消息的长度保持固定;
  2. 可以让消息使用特定的字符或字符串去作为消息边界,如"\r\n"
  3. 可以在消息的头部加上长度字段;
  4. 利用消息自身的格式进行匹配,如JSON文件的{...}匹配机制

数据到达VS消息到达

muduo::net::buffer无法设置回调函数的触发条件,意思也就是说,buffer只能判断是否有数据到达,但具体的消息是否到达完整,则还需要进一步处理。为此,可以增加一层数据编解码器LengthHeaderCodec用于把数据的parse任务进行抽离,这样就简化了代码,并且使得用户只需要关心消息是否已经到达,到达了就调用回调函数。换而言之,回调函数onMessage()变为了onStringMessage(),编写服务端和客户端的程序就只需要考虑消息到达后的发送,而不必考虑字节流的parse,因为LengthHeaderCodec已经为我们提供了这一层服务。具体的调用过程为:

消息到达
 => Codec::OnMessage() # 注册为server_的消息到达回调函数
 => Codec::MessageCallback_() # 由server_.onStringMessage()注册

OK,服务端和客户端都可以将这个技巧进行运用了,甚至可以使用相同的编解码器对象以节省空间。

多线程初步(客户端)

客户端使用两个线程,一个用于读取键盘输入,另一个则用于打印从服务端接收到的数据。

实际上手

  1. 编解码器LengthHeaderCodec
#ifndef LENGTHHEADERCODEC_H
#define LENGTHHEADERCODEC_H

#include <muduo/net/Buffer.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/base/Logging.h>
#include <muduo/net/Endian.h>

class LengthHeaderCodec: muduo::noncopyable // this class is noncopyable
{
private:
    using MessageCallbackFunc = std::function<void (const muduo::net::TcpConnectionPtr &, const muduo::string &, muduo::Timestamp time)>;
    
    const int32_t nHeader = sizeof (int32_t); // length of header;
    MessageCallbackFunc messageCallback_; // this is a r-value;

public:
    explicit LengthHeaderCodec(const MessageCallbackFunc &func): messageCallback_(func)
    {} // 此处的入参类型必须为const引用,因为messageCallback_是一个右值

    void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp time){
        // LOG_INFO << " buf->readableBytes() is " << buf->readableBytes(); // test, to show if data is accepted;
        // above LOG_INFO is OK, so bug comes below;
        while(buf->readableBytes() >= nHeader){ // could have data, but completeness can't be guaranteed
            // const void *dataptr = buf->peek();
            // int32_t dataSz = *static_cast<const int32_t *>(dataptr);
            // const int32_t len = muduo::net::sockets::networkToHost32(dataSz);
            const int32_t len = buf->peekInt32(); // this function contains networkToHost32
            if (len<0 || len>65536){
                LOG_ERROR << "Invalid length " << len;
                conn->shutdown();
                break;
            } else if(buf->readableBytes() >= nHeader+len){
                buf->retrieve(nHeader);
                muduo::string message_(buf->peek(), len);
                // only @message_ is used here
                messageCallback_(conn, message_, time); // string message callback, registered by server and client
                buf->retrieve(len);
            } else {
                break; // message is not complete;
            }
        }
    }

    void send(muduo::net::TcpConnection *conn, const muduo::StringPiece &data){
        muduo::net::Buffer buf; 
        buf.append(data.data(), data.size()); // convert to type: muduo::net::Buffer
        int32_t len = static_cast<int32_t>(data.size());
        // int32_t dataSz = muduo::net::sockets::hostToNetwork32(len);
        // buf.prepend(&dataSz, sizeof dataSz);
        buf.prependInt32(len); // this function contains hostToNetwork32
        conn->send(&buf);
    }
};

#endif /* LENGTHHEADERCODEC_H */

LengthHeaderCodec一些注意点:

  1. 服务端ChatServerMyself
#ifndef CHAT_SERVER_MYSELF_H
#define CHAT_SERVER_MYSELF_H

#include <muduo/net/TcpConnection.h>
#include <muduo/net/TcpServer.h>
#include <muduo/net/Buffer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/base/Logging.h>
#include <set>
#include "LengthHeaderCodec.h"

using namespace std::placeholders;

class ChatServerMyself // 为了方便直接将main函数写在了该文件内
{
private:
    using connSet = std::set<muduo::net::TcpConnectionPtr>;


    LengthHeaderCodec codec_;
    muduo::net::TcpServer server_;
    connSet connects_;
    

public:
    ChatServerMyself(muduo::net::EventLoop *loop, muduo::net::InetAddress addr):
        server_(loop, addr, "CHAT_SERVER_MYSELF"),
        codec_(std::bind(&ChatServerMyself::onStringMessage, this, _1, _2, _3))
        {
            server_.setConnectionCallback(std::bind(&ChatServerMyself::onConnection, this, _1));
            server_.setMessageCallback(std::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3));
        }


    void start();

    void onConnection(const muduo::net::TcpConnectionPtr &conn);
    void onStringMessage(const muduo::net::TcpConnectionPtr &conn, const muduo::string &message, muduo::Timestamp time);

};

#endif /* CHAT_SERVER_MYSELF_H */

void ChatServerMyself::start(){
    server_.start();
}

void ChatServerMyself::onConnection(const muduo::net::TcpConnectionPtr &conn){
    LOG_INFO << "ASIO Server: " << conn->peerAddress().toIpPort() \
        << " -> " << conn->localAddress().toIpPort() << " is " \
        << (conn->connected() ? "UP" : "DOWN");

    if(conn->connected()){
        connects_.insert(conn);
    } else {
        connects_.erase(conn);
    }
}

void ChatServerMyself::onStringMessage(const muduo::net::TcpConnectionPtr &, const muduo::string &message, muduo::Timestamp ){
    for(connSet::iterator itr = connects_.begin(); itr != connects_.end(); ++itr){ // send data to all clients it connects;
        codec_.send(itr->get(), message);
    }
}

int main(int argc, char *argv[])
{
    LOG_INFO << "pid = " << getpid();
    if(argc > 1){
        muduo::net::EventLoop loop;
        uint16_t port = static_cast<uint16_t>(atoi(argv[1]));
        muduo::net::InetAddress addr(port);
        ChatServerMyself server(&loop, addr);
        server.start();
        loop.loop();
    } else {
        printf("Usage: %s <port>\n", argv[0]);
    }
    return 0;
}

ChatServerMyself一些注意点:

  1. 客户端ChatClientMyself
#ifndef CHAT_CLIENT_MYSELF_H
#define CHAT_CLIENT_MYSELF_H

#include "LengthHeaderCodec.h"
#include <muduo/net/TcpClient.h>
#include <muduo/base/Mutex.h>
#include <muduo/base/Logging.h>
#include <muduo/net/EventLoopThread.h>
#include <iostream>

using namespace muduo;
using namespace muduo::net;

class ChatClientMyself
{
private:
    TcpClient client_;
    TcpConnectionPtr conn_;
    LengthHeaderCodec codec_;
    mutable MutexLock mutex_;
    

public:
    ChatClientMyself(EventLoop *loop, const InetAddress &addr):
        client_(loop, addr, "CHAT_CLIENT_MYSELF"),
        codec_(std::bind(&ChatClientMyself::onStringMessage, this, _1, _2, _3))
    {
        client_.setConnectionCallback(std::bind(&ChatClientMyself::onConnection, this, _1));
        client_.setMessageCallback(std::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3));
        client_.enableRetry(); // FIXME: what is this about?
    }

    void connect(){
        client_.connect();
    }

    void disconnect(){ // FIXME: is disconnect necessary?
        client_.disconnect();
    }

    void onConnection(const TcpConnectionPtr &conn){
        LOG_INFO << "Cient Server Connection: " \
            << conn->localAddress().toIpPort() << " -> " \
            << conn->peerAddress().toIpPort() << " is " \
            << (conn->connected() ? "UP" : "DOWN");
        if(conn->connected()){
            conn_ = conn;
        } else {
            conn_.reset();
        }
    }

    void onStringMessage(const TcpConnectionPtr &, const string &message, Timestamp){
        // std::cout is not thread-safe, so we use printf instead;
        printf("<<< %s\n", message.c_str()); // print accepted data;
    }

    // send data from STDIN input;
    void write(const string &message){
        MutexLockGuard lock_(mutex_); // must lock mutex to protect shared_ptr;
        if(conn_){
            codec_.send(conn_.get(), message);
        }
    }


};

#endif /* CHAT_CLIENT_MYSELF_H */

int main(int argc, char *argv[])
{
    LOG_INFO << "pid = " << getpid();
    if(argc > 2){
        EventLoopThread loopThread; // for net-IO
        uint16_t port = static_cast<uint16_t>(atoi(argv[2]));
        InetAddress addr(argv[1], port);
        ChatClientMyself client(loopThread.startLoop(), addr);
        client.connect();

        std::string input_line;
        while(std::getline(std::cin, input_line)){
            client.write(input_line);
        }
        client.disconnect();
        CurrentThread::sleepUsec(1000 * 1000);
    } else {
        printf("Usage: %s <host-IP> <port>\n", argv[0]);
    }
    return 0;
}

CharClientMyself一些注意点:

演示结果

可以(自己和自己)聊天了.png
上一篇 下一篇

猜你喜欢

热点阅读