(6)链接与消息封装(Reactor部分)【Lars-基于C++

2019-09-18  本文已影响0人  刘丹冰Aceld

【Lars教程目录】

Lars源代码
https://github.com/aceld/Lars


【Lars系统概述】
第1章-概述
第2章-项目目录构建


【Lars系统之Reactor模型服务器框架模块】
第1章-项目结构与V0.1雏形
第2章-内存管理与Buffer封装
第3章-事件触发EventLoop
第4章-链接与消息封装
第5章-Client客户端模型
第6章-连接管理及限制
第7章-消息业务路由分发机制
第8章-链接创建/销毁Hook机制
第9章-消息任务队列与线程池
第10章-配置文件读写功能
第11章-udp服务与客户端
第12章-数据传输协议protocol buffer
第13章-QPS性能测试
第14章-异步消息任务机制
第15章-链接属性设置功能


【Lars系统之DNSService模块】
第1章-Lars-dns简介
第2章-数据库创建
第3章-项目目录结构及环境构建
第4章-Route结构的定义
第5章-获取Route信息
第6章-Route订阅模式
第7章-Backend Thread实时监控


【Lars系统之Report Service模块】
第1章-项目概述-数据表及proto3协议定义
第2章-获取report上报数据
第3章-存储线程池及消息队列


【Lars系统之LoadBalance Agent模块】
第1章-项目概述及构建
第2章-主模块业务结构搭建
第3章-Report与Dns Client设计与实现
第4章-负载均衡模块基础设计
第5章-负载均衡获取Host主机信息API
第6章-负载均衡上报Host主机信息API
第7章-过期窗口清理与过载超时(V0.5)
第8章-定期拉取最新路由信息(V0.6)
第9章-负载均衡获取Route信息API(0.7)
第10章-API初始化接口(V0.8)
第11章-Lars Agent性能测试工具
第12章- Lars启动工具脚本


5) tcp链接与Message消息封装

​ 好了,现在我们来将服务器的连接做一个简单的封装,在这之前,我们要将我我们所发的数据做一个规定,采用TLV的格式,来进行封装。目的是解决TCP传输的粘包问题。

5.1 Message消息封装

7-TCP粘包问题-拆包封包过程.jpeg

​ 先创建一个message.h头文件

lars_reactor/include/message.h

#pragma once

//解决tcp粘包问题的消息头
struct msg_head
{
    int msgid;
    int msglen;
};

//消息头的二进制长度,固定数
#define MESSAGE_HEAD_LEN 8

//消息头+消息体的最大长度限制
#define MESSAGE_LENGTH_LIMIT (65535 - MESSAGE_HEAD_LEN)

​ 接下来我们每次在server和 client之间传递数据的时候,都发送这种数据格式的头再加上后面的数据内容即可。

5.2 创建一个tcp_conn连接类

lars_reactor/include/tcp_conn.h

#pragma once

#include "reactor_buf.h"
#include "event_loop.h"

//一个tcp的连接信息
class tcp_conn
{
public:
    //初始化tcp_conn
    tcp_conn(int connfd, event_loop *loop);

    //处理读业务
    void do_read();

    //处理写业务
    void do_write();

    //销毁tcp_conn
    void clean_conn();

    //发送消息的方法
    int send_message(const char *data, int msglen, int msgid);

private:
    //当前链接的fd
    int _connfd;
    //该连接归属的event_poll
    event_loop *_loop;
    //输出buf
    output_buf obuf;     
    //输入buf
    input_buf ibuf;
};

简单说明一下里面的成员和方法:

成员:

_connfd:server刚刚accept成功的套接字

_loop:当前链接所绑定的事件触发句柄.

obuf:链接输出缓冲,向对端写数据

ibuf:链接输入缓冲,从对端读数据

方法

tcp_client():构造,主要在里面实现初始化及创建链接链接的connect过程。

do_read():读数据处理业务,主要是EPOLLIN事件触发。

do_write():写数据处理业务,主要是EPOLLOUT事件触发。

clean_conn():清空链接资源。

send_message():将消息打包成TLV格式发送给对端。

​ 接下来,实现以下tcp_conn类.

lars_reactor/src/tcp_conn.cpp

#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <string.h>

#include "tcp_conn.h"
#include "message.h"


//回显业务
void callback_busi(const char *data, uint32_t len, int msgid, void *args, tcp_conn *conn)
{
    conn->send_message(data, len, msgid);
}


//连接的读事件回调
static void conn_rd_callback(event_loop *loop, int fd, void *args)
{
    tcp_conn *conn = (tcp_conn*)args;
    conn->do_read();
}
//连接的写事件回调
static void conn_wt_callback(event_loop *loop, int fd, void *args)
{
    tcp_conn *conn = (tcp_conn*)args;
    conn->do_write();
}

//初始化tcp_conn
tcp_conn::tcp_conn(int connfd, event_loop *loop)
{
    _connfd = connfd;
    _loop = loop;
    //1. 将connfd设置成非阻塞状态
    int flag = fcntl(_connfd, F_GETFL, 0);
    fcntl(_connfd, F_SETFL, O_NONBLOCK|flag);

    //2. 设置TCP_NODELAY禁止做读写缓存,降低小包延迟
    int op = 1;
    setsockopt(_connfd, IPPROTO_TCP, TCP_NODELAY, &op, sizeof(op));//need netinet/in.h netinet/tcp.h

    //3. 将该链接的读事件让event_loop监控 
    _loop->add_io_event(_connfd, conn_rd_callback, EPOLLIN, this);

    //4 将该链接集成到对应的tcp_server中
    //TODO
}

//处理读业务
void tcp_conn::do_read()
{
    //1. 从套接字读取数据
    int ret = ibuf.read_data(_connfd);
    if (ret == -1) {
        fprintf(stderr, "read data from socket\n");
        this->clean_conn();
        return ;
    }
    else if ( ret == 0) {
        //对端正常关闭
        printf("connection closed by peer\n");
        clean_conn();
        return ;
    }

    //2. 解析msg_head数据    
    msg_head head;    
    
    //[这里用while,可能一次性读取多个完整包过来]
    while (ibuf.length() >= MESSAGE_HEAD_LEN)  {
        //2.1 读取msg_head头部,固定长度MESSAGE_HEAD_LEN    
        memcpy(&head, ibuf.data(), MESSAGE_HEAD_LEN);
        if(head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0) {
            fprintf(stderr, "data format error, need close, msglen = %d\n", head.msglen);
            this->clean_conn();
            break;
        }
        if (ibuf.length() < MESSAGE_HEAD_LEN + head.msglen) {
            //缓存buf中剩余的数据,小于实际上应该接受的数据
            //说明是一个不完整的包,应该抛弃
            break;
        }

        //2.2 再根据头长度读取数据体,然后针对数据体处理 业务
        //TODO 添加包路由模式
        
        //头部处理完了,往后偏移MESSAGE_HEAD_LEN长度
        ibuf.pop(MESSAGE_HEAD_LEN);
        
        //处理ibuf.data()业务数据
        printf("read data: %s\n", ibuf.data());

        //回显业务
        callback_busi(ibuf.data(), head.msglen, head.msgid, NULL, this);

        //消息体处理完了,往后便宜msglen长度
        ibuf.pop(head.msglen);
    }

    ibuf.adjust();
    
    return ;
}

//处理写业务
void tcp_conn::do_write()
{
    //do_write是触发玩event事件要处理的事情,
    //应该是直接将out_buf力度数据io写会对方客户端 
    //而不是在这里组装一个message再发
    //组装message的过程应该是主动调用
    
    //只要obuf中有数据就写
    while (obuf.length()) {
        int ret = obuf.write2fd(_connfd);
        if (ret == -1) {
            fprintf(stderr, "write2fd error, close conn!\n");
            this->clean_conn();
            return ;
        }
        if (ret == 0) {
            //不是错误,仅返回0表示不可继续写
            break;
        }
    }

    if (obuf.length() == 0) {
        //数据已经全部写完,将_connfd的写事件取消掉
        _loop->del_io_event(_connfd, EPOLLOUT);
    }

    return ;
}

//发送消息的方法
int tcp_conn::send_message(const char *data, int msglen, int msgid)
{
    printf("server send_message: %s:%d, msgid = %d\n", data, msglen, msgid);
    bool active_epollout = false; 
    if(obuf.length() == 0) {
        //如果现在已经数据都发送完了,那么是一定要激活写事件的
        //如果有数据,说明数据还没有完全写完到对端,那么没必要再激活等写完再激活
        active_epollout = true;
    }

    //1 先封装message消息头
    msg_head head;
    head.msgid = msgid;
    head.msglen = msglen;
     
    //1.1 写消息头
    int ret = obuf.send_data((const char *)&head, MESSAGE_HEAD_LEN);
    if (ret != 0) {
        fprintf(stderr, "send head error\n");
        return -1;
    }

    //1.2 写消息体
    ret = obuf.send_data(data, msglen);
    if (ret != 0) {
        //如果写消息体失败,那就回滚将消息头的发送也取消
        obuf.pop(MESSAGE_HEAD_LEN);
        return -1;
    }

    if (active_epollout == true) {
        //2. 激活EPOLLOUT写事件
        _loop->add_io_event(_connfd, conn_wt_callback, EPOLLOUT, this);
    }


    return 0;
}

//销毁tcp_conn
void tcp_conn::clean_conn()
{
    //链接清理工作
    //1 将该链接从tcp_server摘除掉    
    //TODO 
    //2 将该链接从event_loop中摘除
    _loop->del_io_event(_connfd);
    //3 buf清空
    ibuf.clear(); 
    obuf.clear();
    //4 关闭原始套接字
    int fd = _connfd;
    _connfd = -1;
    close(fd);
}

​ 具体每个方法的实现,都很清晰。其中conn_rd_callback()conn_wt_callback()是注册读写事件的回调函数,设置为static是因为函数类型没有this指针。在里面分别再调用do_read()do_write()方法。

5.3 修正tcp_server对accept之后的处理方法

lars_reactor/src/tcp_server.cpp

//...

//开始提供创建链接服务
void tcp_server::do_accept()
{
    int connfd;    
    while(true) {
        //accept与客户端创建链接
        printf("begin accept\n");
        connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen);
        if (connfd == -1) {
            if (errno == EINTR) {
                fprintf(stderr, "accept errno=EINTR\n");
                continue;
            }
            else if (errno == EMFILE) {
                //建立链接过多,资源不够
                fprintf(stderr, "accept errno=EMFILE\n");
            }
            else if (errno == EAGAIN) {
                fprintf(stderr, "accept errno=EAGAIN\n");
                break;
            }
            else {
                fprintf(stderr, "accept error");
                exit(1);
            }
        }
        else {
            //accept succ!
            // ============= 将之前的触发回调的删掉,改成如下====
            tcp_conn *conn = new tcp_conn(connfd, _loop);
            if (conn == NULL) {
                fprintf(stderr, "new tcp_conn error\n");
                exit(1);
            }
            // ============================================
            printf("get new connection succ!\n");
            break;
        }
    }
}

//...

​ 这样,每次accept成功之后,创建一个与当前客户端套接字绑定的tcp_conn对象。在构造里就完成了基本的对于EPOLLIN事件的监听和回调动作.

​ 现在可以先编译一下,保证没有语法错误,但是如果想测试,就不能够使用nc指令测试了,因为现在服务端只能够接收我们自定义的TLV格式的报文。那么我们需要自己写一个客户端来完成基本的测试。


关于作者:

作者:Aceld(刘丹冰)

mail: danbing.at@gmail.com
github: https://github.com/aceld
原创书籍gitbook: http://legacy.gitbook.com/@aceld

原创声明:未经作者允许请勿转载, 如果转载请注明出处

上一篇下一篇

猜你喜欢

热点阅读