linux 网络编程程序员已收录(2017-8-15)

ss-libev 源码解析udp篇 (3)

2017-06-23  本文已影响108人  勤奋happyfire

本篇分析server_recv_cb,这个是udp转发中最重要的函数。

static void server_recv_cb(EV_P_ ev_io *w, int revents)
{
      //w是ev_io*,他恰好是server_ctx_t的第一个成员,因此可以获取到sever_ctx指针
      server_ctx_t *server_ctx = (server_ctx_t *)w;
      //src_addr用于存放前端的地址
      struct sockaddr_storage src_addr;
      memset(&src_addr, 0, sizeof(struct sockaddr_storage));
      //创建一个buf用于读取数据
      buffer_t *buf = ss_malloc(sizeof(buffer_t));
      balloc(buf, buf_size);
      //前端地址结构的长度,先预先设置为sockaddr_storage的长度
      socklen_t src_addr_len = sizeof(struct sockaddr_storage);
      //读取数据的offset    
      unsigned int offset    = 0;
      //使用recvfrom从server_ctx->fd读取数据到buf, 返回实际读取的数据长度r,并得到发送udp包过来的前端的地址,以及地址结构的长度
      //这儿使用sockaddr_storage是要兼容ipv4,ipv6地址
       ssize_t r;
       r = recvfrom(server_ctx->fd, buf->data, buf_size,
                 0, (struct sockaddr *)&src_addr, &src_addr_len);
      //buf的实际数据长度设置为r 
      buf->len = r; 

      //后续代码
}
#ifdef MODULE_REMOTE
    tx += buf->len; //统计接收到的数据量
    //解密整个buf
    int err = server_ctx->crypto->decrypt_all(buf, server_ctx->crypto->cipher, buf_size);
    if (err) {
        // drop the packet silently
        goto CLEAN_UP;
    }
#endif
    uint8_t frag = *(uint8_t *)(buf->data + 2);
    offset += 3;
char host[257] = { 0 };
    char port[64]  = { 0 };
    struct sockaddr_storage dst_addr;//存放目的地址
    memset(&dst_addr, 0, sizeof(struct sockaddr_storage));

    int addr_header_len = parse_udprealy_header(buf->data + offset, buf->len - offset,
                                                host, port, &dst_addr);
    if (addr_header_len == 0) {
        // error in parse header
        goto CLEAN_UP;
    }

    char *addr_header = buf->data + offset;

看一下parse_udprealy_header,输入buf指针和长度,输出host, port以及一个sockaddr_storage,函数返回值是addr header的长度,(addr header即 ATYP | DST.ADDR | DST.PORT 这个结构,其中ATYP可能是ipv4,v6或域名,如果是域名ADDR是名字长度+名字),根据地址类型的不同这个长度是不一样的,并且对于域名长度本就是可变的。
static int parse_udprealy_header(const char *buf, const size_t buf_len, char *host, char *port, struct sockaddr_storage *storage)
如果addr header中是ipv4或v6地址,storage就会被设置上,如果是域名且域名内容是ip地址,那么ip地址同样会被用于设置storage; 如果是域名且域名不是ip地址,那么storage就没有被设置,也就是是全0的值。另外host和ip会被设置上,其中对于ipv4v6,host就是通过inet_ntop转换来的ip地址表示。下面回到sever_recv_cb函数中,如果parse_udprealy_header返回为0则解析失败,CLEAN_UP只是将buf释放,然后退出函数:

CLEAN_UP:
    bfree(buf);
    ss_free(buf);

那么对于一个不正常的udp包,ss会默默的丢弃,效果和丢包一样。
然后char *addr_header = buf->data + offset;这句解析出addr header的指针,后面会使用。

#ifdef MODULE_LOCAL
    char *key = hash_key(server_ctx->remote_addr->sa_family, &src_addr);
#else
    char *key = hash_key(dst_addr.ss_family, &src_addr);
#endif

    struct cache *conn_cache = server_ctx->conn_cache;

    remote_ctx_t *remote_ctx = NULL;
    cache_lookup(conn_cache, key, HASH_KEY_LEN, (void *)&remote_ctx);

    if (remote_ctx != NULL) {
        if (sockaddr_cmp(&src_addr, &remote_ctx->src_addr, sizeof(src_addr))) {
            remote_ctx = NULL;
        }
    }
// reset the timer
    if (remote_ctx != NULL) {
        ev_timer_again(EV_A_ & remote_ctx->watcher);
    }

首先是计算key,对于local,使用它对应的remote地址的sa_family和前端地址结构创建key;对于remote,使用目的地址的ss_family和前端地址结构创建key。也就是说,cache是通过后端的sa_family值和前端的地址对象来索引的。而key的长度是1个int的长度加上sockaddr_storage结构的长度。至于为啥用这两个值的组合作为key,因为cache中存放的是remote_ctx_t结构的指针。看一下remote_ctx_t的定义:

typedef struct remote_ctx {
    ev_io io;
    ev_timer watcher;
    int af;
    int fd;
    int addr_header_len;
    char addr_header[384];
    struct sockaddr_storage src_addr;
#ifdef MODULE_REMOTE
    struct sockaddr_storage dst_addr;
#endif
    struct server_ctx *server_ctx;
} remote_ctx_t;

使用key调用cache_lookup去查询cache中是否有该key对应的remote_ctx。如果cache中有key对应的remote_ctx则取出。取出remote_ctx后,还要检查下remote_ctx中保存的src_addr和当前数据包的src_addr是否一致,如果不一致则remote_ctx置空不再使用。如果remote_ctx可用,则重置他的timeout timer,因为要复用他了。很快会看到,remote_ctx是向后端发送udp包的上下文。

#ifdef MODULE_LOCAL
    //上面提到,根据socks5包头中的frag是否为0决定是否丢弃包
    if (frag) {
        LOGE("[udp] drop a message since frag is not 0, but %d", frag);
        goto CLEAN_UP;
    }
    //取出server_ctx中存放的remote地址
    const struct sockaddr *remote_addr = server_ctx->remote_addr;
    const int remote_addr_len          = server_ctx->remote_addr_len;
    //如果没有从cache中找到可以复用的remote_ctx,则需要创建新的套接字
    if (remote_ctx == NULL) {
        // Bind to any port
        int remotefd = create_remote_socket(remote_addr->sa_family == AF_INET6);
        setnonblocking(remotefd);//设置为非阻塞
        //创建一个全新的remote_ctx对象
        // Init remote_ctx
        remote_ctx                  = new_remote(remotefd, server_ctx);
        remote_ctx->src_addr        = src_addr;//保存src_addr
        remote_ctx->af              = remote_addr->sa_family;//保存af
        remote_ctx->addr_header_len = addr_header_len;
        memcpy(remote_ctx->addr_header, addr_header, addr_header_len);//保存addr header

        // Add to conn cache
        cache_insert(conn_cache, key, HASH_KEY_LEN, (void *)remote_ctx);//加入cache中

        // Start remote io
        ev_io_start(EV_A_ & remote_ctx->io);//开始监听remote fd上的读事件,详见new_remote中的事件设置
        ev_timer_start(EV_A_ & remote_ctx->watcher);
    }
    //对于local, offset==3,这儿从buf中去掉前面3个字节,即剩下addr header和数据
    if (offset > 0) {
        buf->len -= offset;
        memmove(buf->data, buf->data + offset, buf->len);
    }
    //加密去除前3个字节之后的buf
    int err = server_ctx->crypto->encrypt_all(buf, server_ctx->crypto->cipher, buf_size);
    //通过remote fd,将buf发送到后端
    int s = sendto(remote_ctx->fd, buf->data, buf->len, 0, remote_addr, remote_addr_len);

其中,new_remote如下:

remote_ctx_t *
new_remote(int fd, server_ctx_t *server_ctx)
{
    remote_ctx_t *ctx = ss_malloc(sizeof(remote_ctx_t));
    memset(ctx, 0, sizeof(remote_ctx_t));

    ctx->fd         = fd;
    ctx->server_ctx = server_ctx;

    ev_io_init(&ctx->io, remote_recv_cb, fd, EV_READ);//remote fd上有数据可读时调用remote_recv_cb,接收来自后端返回的数据
    ev_timer_init(&ctx->watcher, remote_timeout_cb, server_ctx->timeout,
                  server_ctx->timeout);//设置超时,超时后会从cache中移除该remote_ctx

    return ctx;
}

总之,ss-local对于接收到的udp包解析出地址,去除buf的前3个字节,从cache中取remote_ctx或者创建新的socket和remote_ctx,加密buf并发送到remote,从而完成了转发udp包的工作。同时,启用了remote fd上读事件的监听,当remote有数据包返回到local时,调用remote_recv_cb。

int cache_hit  = 0;
    int need_query = 0;
    if (remote_ctx != NULL) {
       //如果从cache中查询成功
        cache_hit = 1;
        // detect destination mismatch
        if (remote_ctx->addr_header_len != addr_header_len
            || memcmp(addr_header, remote_ctx->addr_header, addr_header_len) != 0) {
            //cache命中的remote_ctx保存的addr header和分析出来的不一致
            if (dst_addr.ss_family != AF_INET && dst_addr.ss_family != AF_INET6) {
                need_query = 1;//fa不是ip的情况则是域名,因为域名时没有设置为0,设置need_query
            }
        } else {
            memcpy(&dst_addr, &remote_ctx->dst_addr, sizeof(struct sockaddr_storage));
        }
    } else {
        if (dst_addr.ss_family == AF_INET || dst_addr.ss_family == AF_INET6) {
            //没有remote_ctx,且dest addr是ip地址,则创建新的socket和remote_ctx
            int remotefd = create_remote_socket(dst_addr.ss_family == AF_INET6);
            if (remotefd != -1) {
                setnonblocking(remotefd);

                remote_ctx                  = new_remote(remotefd, server_ctx);
                remote_ctx->src_addr        = src_addr;
                remote_ctx->server_ctx      = server_ctx;
                remote_ctx->addr_header_len = addr_header_len;
                memcpy(remote_ctx->addr_header, addr_header, addr_header_len);
                memcpy(&remote_ctx->dst_addr, &dst_addr, sizeof(struct sockaddr_storage));//dst_addr是remote才有的
            } else {
                ERROR("[udp] bind() error");
                goto CLEAN_UP;
            }
        }
    }

    if (remote_ctx != NULL && !need_query) {
       //不需要query,则直接send数据了,数据为addr header之后的实际数据,因此这儿要偏移addr_header_len
        size_t addr_len = get_sockaddr_len((struct sockaddr *)&dst_addr);
        int s           = sendto(remote_ctx->fd, buf->data + addr_header_len,
                                 buf->len - addr_header_len, 0,
                                 (struct sockaddr *)&dst_addr, addr_len);

        if (s == -1) {
            ERROR("[udp] sendto_remote");
            if (!cache_hit) {
                close_and_free_remote(EV_A_ remote_ctx);
            }
        } else {
            //发送成功且cache_hit,则保存remote_ctx到cache
            if (!cache_hit) {
                // Add to conn cache
                remote_ctx->af = dst_addr.ss_family;
                char *key = hash_key(remote_ctx->af, &remote_ctx->src_addr);
                cache_insert(server_ctx->conn_cache, key, HASH_KEY_LEN, (void *)remote_ctx);

                ev_io_start(EV_A_ & remote_ctx->io);//启动remote fd上的读事件监听,准备从目标地址接收udp
                ev_timer_start(EV_A_ & remote_ctx->watcher);
            }
        }
    } else {
        //域名的情况,需要解析
        struct addrinfo hints;
        memset(&hints, 0, sizeof(struct addrinfo));
        hints.ai_family   = AF_UNSPEC;
        hints.ai_socktype = SOCK_DGRAM;
        hints.ai_protocol = IPPROTO_UDP;

        struct query_ctx *query_ctx = new_query_ctx(buf->data + addr_header_len,
                                                    buf->len - addr_header_len);
        query_ctx->server_ctx      = server_ctx;
        query_ctx->addr_header_len = addr_header_len;
        query_ctx->src_addr        = src_addr;
        memcpy(query_ctx->addr_header, addr_header, addr_header_len);

        if (need_query) {
            query_ctx->remote_ctx = remote_ctx;
        }

        struct ResolvQuery *query = resolv_query(host, query_resolve_cb,
                                                 NULL, query_ctx, htons(atoi(port)));
        if (query == NULL) {
            ERROR("[udp] unable to create DNS query");
            close_and_free_query(EV_A_ query_ctx);
            goto CLEAN_UP;
        }
        query_ctx->query = query;
    }

query_resolve_cb就不展开了,解析成功后会做类似的操作,并且会把解析出来的addr存入remote_ctx->dst_addr,这样下次就不用再解析了。

上一篇下一篇

猜你喜欢

热点阅读