Acceptor——封装socket系统调用并集成到Reacto

2020-12-04  本文已影响0人  老杜振熙

注:本文为阅读了muduo网络库源码及作者著作之后对于网络库的复现和笔记

功能

我们定义一个class Acceptor,其功能是:让服务器在指定的端口处进行监听,如果在端口监听到连接,则执行由class Acceptor的类用户注册的回调函数。

底层API

首先梳理一下与Acceptor相关的底层API调用。

模块拆分

之前的Reactor模式是一个经典的IO多路复用模式,我们已经用一个class EventLoop抽象出了整个多路复用的网络模型,接下来就是将这个模型用起来,去构建实际的socket网络程序了。

socket网络编程设计到了繁多的底层API接口,在现代C++特性之中,自然也要对其进行合理的封装,才能发挥出语言的最大优势。首先绘制一下服务端的对于TCP连接的接收过程,如图1所示。

对于底层API而言,"socket() 到 bind() "是一条龙式的操作,因此可以定义一个class Socket来封装这个过程。服务器根据这一套流程建立本地的socket。随后当开始决定监听之后,则开始进行"listen()",当探测到连接请求的时候, 开始" accept()",得到对端的socket以及网络地址,然后就可以调用用户注册的回调函数了。需要注意的是图1中的handleRead()子框,这个子框才是Acceptor中的Channel的readable回调。原因其实很简单,因为对于服务器而言,当探测到有客户端发起连接请求之后,服务器的callback应该是先建立连接,再执行用户回调,那么“建立连接+执行用户回调”的整个过程才是“客户端发起连接请求”这个readable事件的回调。

图1. Acceptor工作流程示意

代码实战

/* Socket.h */
#ifndef SOCKET_H
#define SOCKET_H

#include "muduo/base/noncopyable.h"
#include <muduo/net/InetAddress.h>
#include <boost/noncopyable.hpp>

class Socket: boost::noncopyable
{
private:
  const int sockfd_;

public:
  int fd() const { return sockfd_; }
  void bindAddress(const muduo::net::InetAddress& localaddr);
  void listen();

  // fill the peeraddr and return peer connection fd
  // If failed, return -1;
  int accept(muduo::net::InetAddress* peeraddr);

  // SO_REUSEADDR
  void setReuseAddr(bool on);

  // SO_REUSEPORT
  void setReusePort(bool on);
  explicit Socket(const int &fd);
  ~Socket();
};

#endif /* SOCKET_H */

#include "Socket.h"
#include "muduo/base/Logging.h"
#include "muduo/net/InetAddress.h"
#include "SockOptions.h"

#include <netinet/in.h>
#include <netinet/tcp.h>
#include <stdio.h>  // snprintf

Socket::Socket(const int &fd):
  sockfd_(fd)
{

}

Socket::~Socket(){
  sockoptions::close(sockfd_);
}

void Socket::bindAddress(const muduo::net::InetAddress &localaddr){
  sockoptions::bindOrDie(sockfd_, localaddr.getSockAddr());
}

void Socket::listen(){
  sockoptions::listenOrDie(sockfd_);
}

int Socket::accept(muduo::net::InetAddress *peeraddr){
  sockaddr_in6 tmpAddr;
  bzero(&tmpAddr, sizeof tmpAddr);
  int connfd = sockoptions::accept(sockfd_, &tmpAddr);
  peeraddr->setSockAddrInet6(tmpAddr);
  return connfd;
}

void Socket::setReuseAddr(bool on){
  int reused = on;
  int len = static_cast<socklen_t>(sizeof reused);
  int ret = ::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &reused, len);
  if(ret < 0){
    LOG_ERROR << "setReuseAddr falied";
  }
}

void Socket::setReusePort(bool on){
  int reused = on;
  int len = static_cast<socklen_t>(sizeof reused);
  int ret = ::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEPORT, &reused, len);
  if(ret < 0){
    LOG_ERROR << "setReuseAddr falied";
  }
}

/* Acceptor.h */
#ifndef ACCEPTOR_H
#define ACCEPTOR_H

#include <functional>
#include <muduo/net/InetAddress.h>
#include <memory>
#include <boost/noncopyable.hpp>

#include "../Reactor/Channel.h"
#include "Socket.h"

class EventLoop;

class Acceptor : boost::noncopyable
{
  // sending fd handle is not an ideal solution, better solution is 
  // sending a Socket object which uses RAII
  using ConnCallback = std::function<void (int, const muduo::net::InetAddress &)>;
  // using ConnCallback = std::function<void (Socket, const muduo::net::InetAddress &)>;
private:
  EventLoop *loop_;
  ConnCallback cb_;
  std::unique_ptr<Socket> socket_;
  Channel socketChannel_;
  bool listening_;

public:
  Acceptor(EventLoop *loop, const muduo::net::InetAddress & localAddr);
 ~Acceptor();

 bool isListening() const {
   return listening_;
 }

 void listen();
 void handleRead();
 void setNewConnectionCallback(const ConnCallback &func);
 
};

#endif /* ACCEPTOR_H */

/* Accpetor.cc */
#include <muduo/base/Logging.h>

#include "../Reactor/EventLoop.h"
#include "../Reactor/Channel.h"
#include "Acceptor.h"
#include "SockOptions.h"
#include "Socket.h"

Acceptor::Acceptor(EventLoop *loop, const muduo::net::InetAddress &localAddr)
  :loop_(loop),
  socket_(new Socket (sockoptions::createNonblockingOrDie(AF_INET))),
  socketChannel_(loop, socket_->fd()),
  listening_(false)
{
  socket_->setReuseAddr(true);
  socket_->bindAddress(localAddr);
  socketChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}

Acceptor::~Acceptor()
{

}

void Acceptor::listen(){
  loop_->assertInLoopThread();
  listening_ = true;
  socket_->listen(); // call the socket API `listen()`
  socketChannel_.enableRead(); // ready to call the callback
}

void Acceptor::handleRead(){
  muduo::net::InetAddress addr; // this is the perr address
  int connfd = socket_->accept(&addr);
  if(connfd < 0){
    LOG_FATAL << "Acceptor - socket accept failed";
    return ;
  }
  if(cb_){
    cb_(connfd, addr);
  } else {
    LOG_ERROR << "Acceptor - NewConnectionCallback unset";
  }
}

void Acceptor::setNewConnectionCallback(const ConnCallback &func){
  cb_ = func;
}


/* sockOptions.h */
#include <arpa/inet.h>


namespace sockoptions
{

int createNonblockingOrDie(sa_family_t family);
void bindOrDie(int sockfd, const struct sockaddr* addr);
void listenOrDie(int sockfd);
void close(int sockfd);
int  accept(int sockfd, struct sockaddr_in6* addr);
const struct sockaddr* sockaddr_cast(const struct sockaddr_in* addr);
const struct sockaddr* sockaddr_cast(const struct sockaddr_in6* addr);
struct sockaddr* sockaddr_cast(struct sockaddr_in6* addr);

}

/* sockOptions.h */
#include "./SockOptions.h"

#include "muduo/base/Logging.h"
#include "muduo/base/Types.h"
#include "muduo/net/Endian.h"

#include <errno.h>
#include <fcntl.h>
#include <stdio.h>  // snprintf
#include <sys/socket.h>
#include <sys/uio.h>  // readv
#include <unistd.h>

int sockoptions::createNonblockingOrDie(sa_family_t family){
  // stream == tcp
  int sockfd = ::socket(family,
                        SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 
                        IPPROTO_TCP);
  if(sockfd < 0){
    LOG_FATAL << "Create socket failed!";
  }
  return sockfd;
}

void sockoptions::bindOrDie(int sockfd, const struct sockaddr *addr){
  socklen_t len = static_cast<socklen_t>(sizeof(*addr));
  int ret = ::bind(sockfd, addr, len);
  if(ret < 0){
    LOG_FATAL << "Bind address failed!";
  }
}

void sockoptions::listenOrDie(int sockfd){
  int ret = ::listen(sockfd, SOMAXCONN);
  if(ret < 0){
    LOG_FATAL << "Listen socket failed!";
  }
}

void sockoptions::close(int sockfd){
  int ret = ::close(sockfd);
  if(ret < 0){
    LOG_FATAL << "Close sockfd falied!";
  }
}

int sockoptions::accept(int sockfd, struct sockaddr_in6 *addr){
  struct sockaddr *sa = sockaddr_cast(addr);
  socklen_t len = static_cast<socklen_t>(sizeof(*addr));
  int connfd = ::accept(sockfd, sa, &len);
  if(connfd < 0){
    LOG_FATAL << "Accept socket failed!";
  }
  return connfd;
}

const struct sockaddr * sockoptions::sockaddr_cast(const struct sockaddr_in *addr){
  return static_cast<const struct sockaddr *>(muduo::implicit_cast<const void*>(addr));
}

const struct sockaddr * sockoptions::sockaddr_cast(const struct sockaddr_in6 *addr){
  return static_cast<const struct sockaddr *>(muduo::implicit_cast<const void*>(addr));
}

struct sockaddr * sockoptions::sockaddr_cast(struct sockaddr_in6 *addr){
  return static_cast<struct sockaddr *>(muduo::implicit_cast<void *>(addr));
}


/* main_Accpetor.cc */
#include "../Reactor/EventLoop.h"
#include "Acceptor.h"
#include <muduo/net/InetAddress.h>

void callbackFunc(int connfd, const muduo::net::InetAddress &addr){
  printf("A new connection comming from %s\n", addr.toIpPort().c_str());
  char msg[] = "Hello, I can hear you calling me\n";
  ::write(connfd, msg, sizeof msg);
}

int main(int argc, char *argv[])
{
  EventLoop loop;
  muduo::net::InetAddress localAddr(2333);
  muduo::net::InetAddress localAddr2(3332);
  Acceptor acceptor(&loop, localAddr);
  Acceptor acceptor2(&loop, localAddr2);
  acceptor.setNewConnectionCallback(&callbackFunc);
  acceptor2.setNewConnectionCallback(&callbackFunc);
  acceptor.listen();
  acceptor2.listen();
  loop.loop();
  return 0;
}

运行结果

Acceptor运行结果(新建一个终端去监听本地的2333端口和3332端口)

实现过程中的一些知识点总结

  1. sockaddrsockaddr_in
    两者之间是相互补充的关系。大多数诸如::bind()的底层socket API使用struct sockaddr *作为入参类型,但sockaddr的小缺陷是,它将IP端口和IP地址混在了一个变量中,故赋值时不太方便。sockaddr_in弥补了这一问题,它将端口和地址进行了分离,同时为了适配sockaddr,又填充了一些不使用的变量,使得两种结构体的内存分布完全一致,因此两种类型的指针可以相互转换。总结一下:sockaddr_in简化了变量的赋值,sockaddr用于函数的传参。

  2. implicit_cast
    该类型转换应该是为了更安全的进行精确类型转换,当使用implicit_cast的时候,编译器会去检查该转换是否安全。(不过该cast暂时还没有纳入标准库)

  3. SOMAXCONN
    在调用::listen()的时候需要指定最多可以支持多少连接请求,为此系统定义了一个专门的宏SOMAXCONN用来表示系统所支持的最多请求个数。

  4. setsockopt()
    网络通信会在不同的层级或者协议之中拥有不同的设置选项,setsockopt()的功能就是将设置这些选项都抽象到了顶层的socket这一层级。比如说要设置socket底层API层级的某些选项,抑或是设置TCP协议中的某些选项,都通过该函数来执行。

TcpServer

TcpServer是Acceptor的直接使用者,其实只需要将TcpServer的Callback注册到Acceptor里面即可

上一篇 下一篇

猜你喜欢

热点阅读