Android socket源码解析(二)socket的绑定与监

2021-12-12  本文已影响0人  yjy239

前言

对socket在内核的设计又了初步的印象后,可以进一步的探索socket整个流程。在这里我们先讨论服务端中,如果把准备好一个socket 绑定并进行监听的。

如果遇到什么问题可以来 https://www.jianshu.com/p/62dd608667e2 本文下讨论

正文

来看看服务端初始化的核心:

    ServerSocket server = new ServerSocket(port);
   
    Socket socket = server.accept();

首先看看初始化:

    public ServerSocket(int port) throws IOException {
        this(port, 50, null);
    }

    public ServerSocket(int port, int backlog, InetAddress bindAddr) throws IOException {
        setImpl();
        if (port < 0 || port > 0xFFFF)
            throw new IllegalArgumentException(
                       "Port value out of range: " + port);
        if (backlog < 1)
          backlog = 50;
        try {
            bind(new InetSocketAddress(bindAddr, port), backlog);
        } catch(SecurityException e) {
            close();
            throw e;
        } catch(IOException e) {
            close();
            throw e;
        }
    }

首先限制了服务监听的端口号必须在0-65535 之间。一旦超出则直接报错IllegalArgumentException.

接着调用bind方法。

 public void bind(SocketAddress endpoint, int backlog) throws IOException {
        if (isClosed())
            throw new SocketException("Socket is closed");
        if (!oldImpl && isBound())
            throw new SocketException("Already bound");
        if (endpoint == null)
            endpoint = new InetSocketAddress(0);
        if (!(endpoint instanceof InetSocketAddress))
            throw new IllegalArgumentException("Unsupported address type");
        InetSocketAddress epoint = (InetSocketAddress) endpoint;
        if (epoint.isUnresolved())
            throw new SocketException("Unresolved address");
        if (backlog < 1)
          backlog = 50;
        try {
            SecurityManager security = System.getSecurityManager();
            if (security != null)
                security.checkListen(epoint.getPort());
            getImpl().bind(epoint.getAddress(), epoint.getPort());
            getImpl().listen(backlog);
            bound = true;
        } catch(SecurityException e) {
            bound = false;
            throw e;
        } catch(IOException e) {
            bound = false;
            throw e;
        }
    }

首先在android中getSecurityManager返回的是空,这里不考察。

接着走的逻辑是两个步骤:

1.SocksSocketImpl bind

    /**
     * Binds the socket to the specified address of the specified local port.
     * @param address the address
     * @param lport the port
     */
    protected synchronized void bind(InetAddress address, int lport)
        throws IOException
    {
       synchronized (fdLock) {
            if (!closePending && (socket == null || !socket.isBound())) {
                NetHooks.beforeTcpBind(fd, address, lport);
            }
        }
        socketBind(address, lport);
        if (socket != null)
            socket.setBound();
        if (serverSocket != null)
            serverSocket.setBound();
    }

核心调用了如下几个方法:

socketBind

    void socketBind(InetAddress address, int port) throws IOException {
        if (fd == null || !fd.valid()) {
            throw new SocketException("Socket closed");
        }

        IoBridge.bind(fd, address, port);

        this.address = address;
        if (port == 0) {
            // Now that we're a connected socket, let's extract the port number that the system
            // chose for us and store it in the Socket object.
            localport = IoBridge.getLocalInetSocketAddress(fd).getPort();
        } else {
            localport = port;
        }
    }

    public static void bind(FileDescriptor fd, InetAddress address, int port) throws SocketException {
       ...
        try {
            Libcore.os.bind(fd, address, port);
        } catch (ErrnoException errnoException) {
            if (errnoException.errno == EADDRINUSE || errnoException.errno == EADDRNOTAVAIL ||
                errnoException.errno == EPERM || errnoException.errno == EACCES) {
                throw new BindException(errnoException.getMessage(), errnoException);
            } else {
                throw new SocketException(errnoException.getMessage(), errnoException);
            }
        }
    }

核心调用了Linuxbind方法。

2.libcore_io_Linux Linux_bind

文件: /libcore/luni/src/main/native/libcore_io_Linux.cpp

static void Linux_bind(JNIEnv* env, jobject, jobject javaFd, jobject javaAddress, jint port) {
    // We don't need the return value because we'll already have thrown.
    (void) NET_IPV4_FALLBACK(env, int, bind, javaFd, javaAddress, port, NULL_ADDR_FORBIDDEN);
}

核心是下面两个宏:

#define NET_IPV4_FALLBACK(jni_env, return_type, syscall_name, java_fd, java_addr, port, null_addr_ok, args...) ({ \
    return_type _rc = -1; \
    do { \
        sockaddr_storage _ss; \
        socklen_t _salen; \
        if ((java_addr) == NULL && (null_addr_ok)) { \
            /* No IP address specified (e.g., sendto() on a connected socket). */ \
            _salen = 0; \
        } else if (!inetAddressToSockaddr(jni_env, java_addr, port, _ss, _salen)) { \
            /* Invalid socket address, return -1. inetAddressToSockaddr has already thrown. */ \
            break; \
        } \
        sockaddr* _sa = _salen ? reinterpret_cast<sockaddr*>(&_ss) : NULL; \
        /* inetAddressToSockaddr always returns an IPv6 sockaddr. Assume that java_fd was created \
         * by Java API calls, which always create IPv6 socket fds, and pass it in as is. */ \
        _rc = NET_FAILURE_RETRY(jni_env, return_type, syscall_name, java_fd, ##args, _sa, _salen); \
        if (_rc == -1 && errno == EAFNOSUPPORT && _salen && isIPv4MappedAddress(_sa)) { \
            /* We passed in an IPv4 address in an IPv6 sockaddr and the kernel told us that we got \
             * the address family wrong. Pass in the same address in an IPv4 sockaddr. */ \
            (jni_env)->ExceptionClear(); \
            if (!inetAddressToSockaddrVerbatim(jni_env, java_addr, port, _ss, _salen)) { \
                break; \
            } \
            _sa = reinterpret_cast<sockaddr*>(&_ss); \
            _rc = NET_FAILURE_RETRY(jni_env, return_type, syscall_name, java_fd, ##args, _sa, _salen); \
        } \
    } while (0); \
    _rc; }) \




#define NET_FAILURE_RETRY(jni_env, return_type, syscall_name, java_fd, ...) ({ \
    return_type _rc = -1; \
    int _syscallErrno; \
    do { \
        bool _wasSignaled; \
        { \
            int _fd = jniGetFDFromFileDescriptor(jni_env, java_fd); \
            AsynchronousCloseMonitor _monitor(_fd); \
            _rc = syscall_name(_fd, __VA_ARGS__); \
            _syscallErrno = errno; \
            _wasSignaled = _monitor.wasSignaled(); \
        } \
        if (_wasSignaled) { \
            jniThrowException(jni_env, "java/net/SocketException", "Socket closed"); \
            _rc = -1; \
            break; \
        } \
        if (_rc == -1 && _syscallErrno != EINTR) { \
            /* TODO: with a format string we could show the arguments too, like strace(1). */ \
            throwErrnoException(jni_env, # syscall_name); \
            break; \
        } \
    } while (_rc == -1); /* _syscallErrno == EINTR && !_wasSignaled */ \
    if (_rc == -1) { \
        /* If the syscall failed, re-set errno: throwing an exception might have modified it. */ \
        errno = _syscallErrno; \
    } \
    _rc; })

核心能看到通过jni反射获取FileDescriptor 中的fd 具柄,然后调用bind系统调用。

其中bind系统调用的参数,通过inetAddressToSockaddr 把InetAddress 类获取fd,协议族类型,port转化成sockaddr_in结构体:

struct sockaddr_in {
  __kernel_sa_family_t sin_family; // 族群
  __be16 sin_port; // port 端口
  struct in_addr sin_addr; // ip地址
  unsigned char __pad[__SOCK_SIZE__ - sizeof(short int) - sizeof(unsigned short int) - sizeof(struct in_addr)];
};

值得学习的一点是,在c的编程中使用 #define 的定义。可以使用do... while(0)的方法,保证一个代码域的完整性,不被如if等特殊的程序顺序符给截断宏的代码完整逻辑性

3.内核的bind 系统调用

文件:/net/socket.c

SYSCALL_DEFINE3(bind, int, fd, struct sockaddr __user *, umyaddr, int, addrlen)
{
    struct socket *sock;
    struct sockaddr_storage address;
    int err, fput_needed;

    sock = sockfd_lookup_light(fd, &err, &fput_needed);
    if (sock) {
        err = move_addr_to_kernel(umyaddr, addrlen, &address);
        if (err >= 0) {
            err = security_socket_bind(sock,
                           (struct sockaddr *)&address,
                           addrlen);
            if (!err)
                err = sock->ops->bind(sock,
                              (struct sockaddr *)
                              &address, addrlen);
        }
        fput_light(sock->file, fput_needed);
    }
    return err;
}

注意,这里的ops是指socket结构体中的proto_ops.如果此时是IPV4协议,那么就是指inet_stream_opsbind方法指针,也就是inet_bind方法。

3.1.proto_ops inet_bind

文件:/net/ipv4/af_inet.c

int inet_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len)
{
    struct sockaddr_in *addr = (struct sockaddr_in *)uaddr;
    struct sock *sk = sock->sk;
    struct inet_sock *inet = inet_sk(sk);
    struct net *net = sock_net(sk);
    unsigned short snum;
    int chk_addr_ret;
    int err;
    snum = ntohs(addr->sin_port);
    err = -EACCES;
    if (snum && snum < PROT_SOCK &&
        !ns_capable(net->user_ns, CAP_NET_BIND_SERVICE))
        goto out;

    lock_sock(sk);

    /* Check these errors (active socket, double bind). */
    err = -EINVAL;
    if (sk->sk_state != TCP_CLOSE || inet->inet_num)
        goto out_release_sock;

    inet->inet_rcv_saddr = inet->inet_saddr = addr->sin_addr.s_addr;
    if (chk_addr_ret == RTN_MULTICAST || chk_addr_ret == RTN_BROADCAST)
        inet->inet_saddr = 0;  /* Use device */

    /* Make sure we are allowed to bind here. */
    if (sk->sk_prot->get_port(sk, snum)) {
        inet->inet_saddr = inet->inet_rcv_saddr = 0;
        err = -EADDRINUSE;
        goto out_release_sock;
    }

    if (inet->inet_rcv_saddr)
        sk->sk_userlocks |= SOCK_BINDADDR_LOCK;
    if (snum)
        sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
    inet->inet_sport = htons(inet->inet_num);
    inet->inet_daddr = 0;
    inet->inet_dport = 0;
    sk_dst_reset(sk);
    err = 0;
out_release_sock:
    release_sock(sk);
out:
    return err;
}
EXPORT_SYMBOL(inet_bind);

这个过程实际上很简单,就是把socket结构体转化回inet_sock结构体。并在inet_sockinet_sport记录来源的ip地址。初始化 inet_daddr以及 inet_dport.也就是初始化目标通信的端口ip和port。

4.SocketServer socketListen

同理,listen也是类似的逻辑。最后会调用到PlainSocketImplsocketListen

    void socketListen(int count) throws IOException {
        if (fd == null || !fd.valid()) {
            throw new SocketException("Socket closed");
        }

        try {
            Libcore.os.listen(fd, count);
        } catch (ErrnoException errnoException) {
            throw errnoException.rethrowAsSocketException();
        }
    }
static void Linux_listen(JNIEnv* env, jobject, jobject javaFd, jint backlog) {
    int fd = jniGetFDFromFileDescriptor(env, javaFd);
    throwIfMinusOne(env, "listen", TEMP_FAILURE_RETRY(listen(fd, backlog)));
}

template <typename rc_t>
static rc_t throwIfMinusOne(JNIEnv* env, const char* name, rc_t rc) {
    if (rc == rc_t(-1)) {
        throwErrnoException(env, name);
    }
    return rc;
}

能看到这个过程中,实际上还是调用了listen系统调用。不过一旦listen调用返回异常,就会把异常跑到了Java层。

5.Linux 内核listen

SYSCALL_DEFINE2(listen, int, fd, int, backlog)
{
    struct socket *sock;
    int err, fput_needed;
    int somaxconn;

    sock = sockfd_lookup_light(fd, &err, &fput_needed);
    if (sock) {
        somaxconn = sock_net(sock->sk)->core.sysctl_somaxconn;
        if ((unsigned int)backlog > somaxconn)
            backlog = somaxconn;

        err = security_socket_listen(sock, backlog);
        if (!err)
            err = sock->ops->listen(sock, backlog);

        fput_light(sock->file, fput_needed);
    }
    return err;
}

这里的逻辑和bind十分相似。本质上先从socket结构体中获取ops,也就是proto_ops结构体。在ipV4的协议中也就是指代inet_stream_ops。在这里也就是指向方法指针inet_listen
`

5.1.inet_stream_ops inet_listen

文件:/net/ipv4/af_inet.c

int inet_listen(struct socket *sock, int backlog)
{
    struct sock *sk = sock->sk;
    unsigned char old_state;
    int err;

    lock_sock(sk);

    err = -EINVAL;
    if (sock->state != SS_UNCONNECTED || sock->type != SOCK_STREAM)
        goto out;

    old_state = sk->sk_state;
    if (!((1 << old_state) & (TCPF_CLOSE | TCPF_LISTEN)))
        goto out;

    if (old_state != TCP_LISTEN) {
        if ((sysctl_tcp_fastopen & TFO_SERVER_ENABLE) != 0 &&
            inet_csk(sk)->icsk_accept_queue.fastopenq == NULL) {
            if ((sysctl_tcp_fastopen & TFO_SERVER_WO_SOCKOPT1) != 0)
                err = fastopen_init_queue(sk, backlog);
            else if ((sysctl_tcp_fastopen &
                  TFO_SERVER_WO_SOCKOPT2) != 0)
                err = fastopen_init_queue(sk,
                    ((uint)sysctl_tcp_fastopen) >> 16);
            else
                err = 0;
            if (err)
                goto out;
        }
        err = inet_csk_listen_start(sk, backlog);
        if (err)
            goto out;
    }
    sk->sk_max_ack_backlog = backlog;
    err = 0;

out:
    release_sock(sk);
    return err;
}
EXPORT_SYMBOL(inet_listen);

注意这个sk_max_ack_backlog数值就是accept的缓存区大小。

5.2.fastopen_init_queue

static inline int fastopen_init_queue(struct sock *sk, int backlog)
{
    struct request_sock_queue *queue =
        &inet_csk(sk)->icsk_accept_queue;

    if (queue->fastopenq == NULL) {
        queue->fastopenq = kzalloc(
            sizeof(struct fastopen_queue),
            sk->sk_allocation);
        if (queue->fastopenq == NULL)
            return -ENOMEM;

        sk->sk_destruct = tcp_sock_destruct;
        spin_lock_init(&queue->fastopenq->lock);
    }
    queue->fastopenq->max_qlen = backlog;
    return 0;
}

这里面有一个十分重要的字段icsk_accept_queue.这个对象是一个承载来自客户端的请求数据链表结构体。

而在这个方法实际上初始化了fastopen_queue。这是TFO(TCP Fast Open)核心结构体.这个技术实际上在很早之前就植入到内核中了。

它本质上在三次握手的阶段,客户端和服务端可以在cookie校验成功后互相通信一些数据。三次握手中第一步生成校验cookie。而后在SYN回包中就可以带上一些数据。

至于这么做的原因是Google在2011年的时候,发现重新链接的场景比较多,且是耗时的一个原因。会大致上耗费多一个RTT。因此做了这个优化并放入到2.6.3的内核版本中。

详细的可以阅读这篇文章http://www.vants.org/?post=210

服务端监听socket的核心结构体

来看看整个结构体的构成:

struct request_sock {
    struct sock_common      __req_common;
    struct request_sock     *dl_next;
    u16             mss;
    u8              num_retrans; /* number of retransmits */
    u8              cookie_ts:1; /* syncookie: encode tcpopts in timestamp */
    u8              num_timeout:7; /* number of timeouts */
    /* The following two fields can be easily recomputed I think -AK */
    u32             window_clamp; /* window clamp at creation time */
    u32             rcv_wnd;      /* rcv_wnd offered first time */
    u32             ts_recent;
    unsigned long           expires;
    const struct request_sock_ops   *rsk_ops;
    struct sock         *sk;
    u32             secid;
    u32             peer_secid;
};

/** struct listen_sock - listen state
 *
 * @max_qlen_log - log_2 of maximal queued SYNs/REQUESTs
 */
struct listen_sock {
    u8          max_qlen_log;
    u8          synflood_warned;
    /* 2 bytes hole, try to use */
    int         qlen;
    int         qlen_young;
    int         clock_hand;
    u32         hash_rnd;
    u32         nr_table_entries;
    struct request_sock *syn_table[0];
};

struct fastopen_queue {
    struct request_sock *rskq_rst_head; /* Keep track of past TFO */
    struct request_sock *rskq_rst_tail; /* requests that caused RST.
                         * This is part of the defense
                         * against spoofing attack.
                         */
    spinlock_t  lock;
    int     qlen;       /* # of pending (TCP_SYN_RECV) reqs */
    int     max_qlen;   /* != 0 iff TFO is currently enabled */
};


struct request_sock_queue {
    struct request_sock *rskq_accept_head;
    struct request_sock *rskq_accept_tail;
    rwlock_t        syn_wait_lock;
    u8          rskq_defer_accept;
    /* 3 bytes hole, try to pack */
    struct listen_sock  *listen_opt;
    struct fastopen_queue   *fastopenq; /* This is non-NULL iff TFO has been
                         * enabled on this listener. Check
                         * max_qlen != 0 in fastopen_queue
                         * to determine if TFO is enabled
                         * right at this moment.
                         */
};
request_sock_queue

首先来看看最外层的核心结构体request_sock_queue的构成

request_sock

对于每一个请求来说,当客户端发送请求,通过网卡进入内核后,都会变成一个个request_sock缓存在队列中。等待服务端的消费处理。

在这里之所以会对应一个socket结构体,是因为在下面accept系统调用的时候会多创建一个新的socket。

fastopen_queue

能看到fastopen_queue这个结构体和request_sock_queue十分相似。因为他的定位就是在三次握手过程中传递数据。

inet_csk_listen_start

文件:/net/ipv4/inet_connection_sock.c

int inet_csk_listen_start(struct sock *sk, const int nr_table_entries)
{
    struct inet_sock *inet = inet_sk(sk);
    struct inet_connection_sock *icsk = inet_csk(sk);
    int rc = reqsk_queue_alloc(&icsk->icsk_accept_queue, nr_table_entries);

    if (rc != 0)
        return rc;

    sk->sk_max_ack_backlog = 0;
    sk->sk_ack_backlog = 0;
    inet_csk_delack_init(sk);

    sk->sk_state = TCP_LISTEN;
    if (!sk->sk_prot->get_port(sk, inet->inet_num)) {
        inet->inet_sport = htons(inet->inet_num);

        sk_dst_reset(sk);
        sk->sk_prot->hash(sk);

        return 0;
    }

    sk->sk_state = TCP_CLOSE;
    __reqsk_queue_destroy(&icsk->icsk_accept_queue);
    return -EADDRINUSE;
}
EXPORT_SYMBOL_GPL(inet_csk_listen_start);

ServerSocket accept

    public Socket accept() throws IOException {
        if (isClosed())
            throw new SocketException("Socket is closed");
        if (!isBound())
            throw new SocketException("Socket is not bound yet");
        Socket s = new Socket((SocketImpl) null);
        implAccept(s);
        return s;
    }
    protected final void implAccept(Socket s) throws IOException {
        SocketImpl si = null;
        try {
            if (s.impl == null)
              s.setImpl();
            else {
                s.impl.reset();
            }
            si = s.impl;
            s.impl = null;
            si.address = new InetAddress();
            si.fd = new FileDescriptor();
            getImpl().accept(si);

            SecurityManager security = System.getSecurityManager();
            if (security != null) {
                security.checkAccept(si.getInetAddress().getHostAddress(),
                                     si.getPort());
            }
        } catch (IOException e) {
            if (si != null)
                si.reset();
            s.impl = si;
            throw e;
        } catch (SecurityException e) {
            if (si != null)
                si.reset();
            s.impl = si;
            throw e;
        }
        s.impl = si;
        s.postAccept();
    }

核心就是两个方法:

而SocketImpl是AbstractPlainSocketImpl派生类,而accept的方法是隶属SocketImpl

AbstractPlainSocketImpl accept

/libcore/ojluni/src/main/java/java/net/AbstractPlainSocketImpl.java

    /**
     * Accepts connections.
     * @param s the connection
     */
    protected void accept(SocketImpl s) throws IOException {
        acquireFD();
        try {
            // Android-added: BlockGuard
            BlockGuard.getThreadPolicy().onNetwork();
            socketAccept(s);
        } finally {
            releaseFD();
        }
    }

socketAccept 方法则是由PlainSocketImpl 实现:

/libcore/ojluni/src/main/java/java/net/PlainSocketImpl.java

    void socketAccept(SocketImpl s) throws IOException {
        if (fd == null || !fd.valid()) {
            throw new SocketException("Socket closed");
        }

        // poll() with a timeout of 0 means "poll for zero millis", but a Socket timeout == 0 means
        // "wait forever". When timeout == 0 we pass -1 to poll.
        if (timeout <= 0) {
            IoBridge.poll(fd, POLLIN | POLLERR, -1);
        } else {
            IoBridge.poll(fd, POLLIN | POLLERR, timeout);
        }

        InetSocketAddress peerAddress = new InetSocketAddress();
        try {
            FileDescriptor newfd = Libcore.os.accept(fd, peerAddress);

            s.fd.setInt$(newfd.getInt$());
            s.address = peerAddress.getAddress();
            s.port = peerAddress.getPort();
        } catch (ErrnoException errnoException) {
            if (errnoException.errno == EAGAIN) {
                throw new SocketTimeoutException(errnoException);
            } else if (errnoException.errno == EINVAL || errnoException.errno == EBADF) {
                throw new SocketException("Socket closed");
            }
            errnoException.rethrowAsSocketException();
        }

        s.localport = IoBridge.getLocalInetSocketAddress(s.fd).getPort();
    }

这里面的实现核心如下分为几点

IoBridge.poll

这个方法最终回调用到Libcore.os.poll方法。而这个方法最终调用到一个native方法:

文件:/libcore/luni/src/main/native/libcore_io_Linux.cpp

static jint Linux_poll(JNIEnv* env, jobject, jobjectArray javaStructs, jint timeoutMs) {
    static jfieldID fdFid = env->GetFieldID(JniConstants::structPollfdClass, "fd", "Ljava/io/FileDescriptor;");
    static jfieldID eventsFid = env->GetFieldID(JniConstants::structPollfdClass, "events", "S");
    static jfieldID reventsFid = env->GetFieldID(JniConstants::structPollfdClass, "revents", "S");

    // Turn the Java android.system.StructPollfd[] into a C++ struct pollfd[].
    size_t arrayLength = env->GetArrayLength(javaStructs);
    std::unique_ptr<struct pollfd[]> fds(new struct pollfd[arrayLength]);
    memset(fds.get(), 0, sizeof(struct pollfd) * arrayLength);
    size_t count = 0; // Some trailing array elements may be irrelevant. (See below.)
    for (size_t i = 0; i < arrayLength; ++i) {
        ScopedLocalRef<jobject> javaStruct(env, env->GetObjectArrayElement(javaStructs, i));
        if (javaStruct.get() == NULL) {
            break; // We allow trailing nulls in the array for caller convenience.
        }
        ScopedLocalRef<jobject> javaFd(env, env->GetObjectField(javaStruct.get(), fdFid));
        if (javaFd.get() == NULL) {
            break; // We also allow callers to just clear the fd field (this is what Selector does).
        }
        fds[count].fd = jniGetFDFromFileDescriptor(env, javaFd.get());
        fds[count].events = env->GetShortField(javaStruct.get(), eventsFid);
        ++count;
    }

    std::vector<AsynchronousCloseMonitor*> monitors;
    for (size_t i = 0; i < count; ++i) {
        monitors.push_back(new AsynchronousCloseMonitor(fds[i].fd));
    }

    int rc;
    while (true) {
        timespec before;
        clock_gettime(CLOCK_MONOTONIC, &before);

        rc = poll(fds.get(), count, timeoutMs);
        if (rc >= 0 || errno != EINTR) {
            break;
        }

        // We got EINTR. Work out how much of the original timeout is still left.
        if (timeoutMs > 0) {
            timespec now;
            clock_gettime(CLOCK_MONOTONIC, &now);

            timespec diff;
            diff.tv_sec = now.tv_sec - before.tv_sec;
            diff.tv_nsec = now.tv_nsec - before.tv_nsec;
            if (diff.tv_nsec < 0) {
                --diff.tv_sec;
                diff.tv_nsec += 1000000000;
            }

            jint diffMs = diff.tv_sec * 1000 + diff.tv_nsec / 1000000;
            if (diffMs >= timeoutMs) {
                rc = 0; // We have less than 1ms left anyway, so just time out.
                break;
            }

            timeoutMs -= diffMs;
        }
    }

    for (size_t i = 0; i < monitors.size(); ++i) {
        delete monitors[i];
    }
    if (rc == -1) {
        throwErrnoException(env, "poll");
        return -1;
    }

    // Update the revents fields in the Java android.system.StructPollfd[].
    for (size_t i = 0; i < count; ++i) {
        ScopedLocalRef<jobject> javaStruct(env, env->GetObjectArrayElement(javaStructs, i));
        if (javaStruct.get() == NULL) {
            return -1;
        }
        env->SetShortField(javaStruct.get(), reventsFid, fds[i].revents);
    }
    return rc;
}

这里面的逻辑很简单:

在一个死循环中被系统调用poll阻塞起来,而这个过程中会阻塞参数timeoutMs的时间。如果poll阻塞提前结束,但是阻塞的结果不是正常结束还会在这个循环中继续阻塞起来。

Libcore.os.accept

static jobject Linux_accept(JNIEnv* env, jobject, jobject javaFd, jobject javaSocketAddress) {
    sockaddr_storage ss;
    socklen_t sl = sizeof(ss);
    memset(&ss, 0, sizeof(ss));
    sockaddr* peer = (javaSocketAddress != NULL) ? reinterpret_cast<sockaddr*>(&ss) : NULL;
    socklen_t* peerLength = (javaSocketAddress != NULL) ? &sl : 0;
    jint clientFd = NET_FAILURE_RETRY(env, int, accept, javaFd, peer, peerLength);
    if (clientFd == -1 || !fillSocketAddress(env, javaSocketAddress, ss, *peerLength)) {
        close(clientFd);
        return NULL;
    }
    return (clientFd != -1) ? (env, clientFd) : NULL;
}

#define NET_FAILURE_RETRY(jni_env, return_type, syscall_name, java_fd, ...) ({ \
    return_type _rc = -1; \
    int _syscallErrno; \
    do { \
        bool _wasSignaled; \
        { \
            int _fd = jniGetFDFromFileDescriptor(jni_env, java_fd); \
            AsynchronousCloseMonitor _monitor(_fd); \
            _rc = syscall_name(_fd, __VA_ARGS__); \
            _syscallErrno = errno; \
            _wasSignaled = _monitor.wasSignaled(); \
        } \
        if (_wasSignaled) { \
            jniThrowException(jni_env, "java/net/SocketException", "Socket closed"); \
            _rc = -1; \
            break; \
        } \
        if (_rc == -1 && _syscallErrno != EINTR) { \
            /* TODO: with a format string we could show the arguments too, like strace(1). */ \
            throwErrnoException(jni_env, # syscall_name); \
            break; \
        } \
    } while (_rc == -1); /* _syscallErrno == EINTR && !_wasSignaled */ \
    if (_rc == -1) { \
        /* If the syscall failed, re-set errno: throwing an exception might have modified it. */ \
        errno = _syscallErrno; \
    } \
    _rc; })

核心就是这个define声明的宏调用了accept系统调用。关于这个宏为什么使用do while方式包裹之前的文章已经聊过了。

accept系统调用

文件:/net/socket.c

SYSCALL_DEFINE4(accept4, int, fd, struct sockaddr __user *, upeer_sockaddr,
        int __user *, upeer_addrlen, int, flags)
{
    struct socket *sock, *newsock;
    struct file *newfile;
    int err, len, newfd, fput_needed;
    struct sockaddr_storage address;

    if (flags & ~(SOCK_CLOEXEC | SOCK_NONBLOCK))
        return -EINVAL;

    if (SOCK_NONBLOCK != O_NONBLOCK && (flags & SOCK_NONBLOCK))
        flags = (flags & ~SOCK_NONBLOCK) | O_NONBLOCK;

    sock = sockfd_lookup_light(fd, &err, &fput_needed);
...

    err = -ENFILE;
    newsock = sock_alloc();
...

    newsock->type = sock->type;
    newsock->ops = sock->ops;

    __module_get(newsock->ops->owner);

    newfd = get_unused_fd_flags(flags);
...
    newfile = sock_alloc_file(newsock, flags, sock->sk->sk_prot_creator->name);
    if (unlikely(IS_ERR(newfile))) {
        err = PTR_ERR(newfile);
        put_unused_fd(newfd);
        sock_release(newsock);
        goto out_put;
    }

    err = security_socket_accept(sock, newsock);
    if (err)
        goto out_fd;

    err = sock->ops->accept(sock, newsock, sock->file->f_flags);
    if (err < 0)
        goto out_fd;

    if (upeer_sockaddr) {
        if (newsock->ops->getname(newsock, (struct sockaddr *)&address,
                      &len, 2) < 0) {
            err = -ECONNABORTED;
            goto out_fd;
        }
        err = move_addr_to_user(&address,
                    len, upeer_sockaddr, upeer_addrlen);
        if (err < 0)
            goto out_fd;
    }

    /* File flags are not inherited via accept() unlike another OSes. */

    fd_install(newfd, newfile);
    err = newfd;

...
}

首先能看到整个accpet系统调用中,实际上可以存在两个socket,两个socket所对应的fd。

inet_accept

文件: /net/ipv4/af_inet.c

int inet_accept(struct socket *sock, struct socket *newsock, int flags)
{
    struct sock *sk1 = sock->sk;
    int err = -EINVAL;
    struct sock *sk2 = sk1->sk_prot->accept(sk1, flags, &err);

    if (!sk2)
        goto do_err;

    lock_sock(sk2);

    sock_rps_record_flow(sk2);
    WARN_ON(!((1 << sk2->sk_state) &
          (TCPF_ESTABLISHED | TCPF_SYN_RECV |
          TCPF_CLOSE_WAIT | TCPF_CLOSE)));

    sock_graft(sk2, newsock);

    newsock->state = SS_CONNECTED;
    err = 0;
    release_sock(sk2);
do_err:
    return err;
}
EXPORT_SYMBOL(inet_accept);

注意这里的sk1->sk_prot则是指向了proto结构体。在IPV4中就是指tcp_prot.因此这个accept方法指针就是指inet_csk_accept方法。

inet_csk_accept

文件:/net/ipv4/inet_connection_sock.c

struct sock *inet_csk_accept(struct sock *sk, int flags, int *err)
{
    struct inet_connection_sock *icsk = inet_csk(sk);
    struct request_sock_queue *queue = &icsk->icsk_accept_queue;
    struct sock *newsk;
    struct request_sock *req;
    int error;

    lock_sock(sk);

    /* We need to make sure that this socket is listening,
     * and that it has something pending.
     */
    error = -EINVAL;
    if (sk->sk_state != TCP_LISTEN)
        goto out_err;

    /* Find already established connection */
    if (reqsk_queue_empty(queue)) {
        long timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);

        /* If this is a non blocking socket don't sleep */
        error = -EAGAIN;
        if (!timeo)
            goto out_err;

        error = inet_csk_wait_for_connect(sk, timeo);
        if (error)
            goto out_err;
    }
    req = reqsk_queue_remove(queue);
    newsk = req->sk;

    sk_acceptq_removed(sk);
    if (sk->sk_protocol == IPPROTO_TCP && queue->fastopenq != NULL) {
        spin_lock_bh(&queue->fastopenq->lock);
        if (tcp_rsk(req)->listener) {
            /* We are still waiting for the final ACK from 3WHS
             * so can't free req now. Instead, we set req->sk to
             * NULL to signify that the child socket is taken
             * so reqsk_fastopen_remove() will free the req
             * when 3WHS finishes (or is aborted).
             */
            req->sk = NULL;
            req = NULL;
        }
        spin_unlock_bh(&queue->fastopenq->lock);
    }
out:
    release_sock(sk);
    if (req)
        __reqsk_free(req);
    return newsk;
out_err:
    newsk = NULL;
    req = NULL;
    *err = error;
    goto out;
}
EXPORT_SYMBOL(inet_csk_accept);

整个代码的核心还是inet_csk_wait_for_connect这个方法。

inet_csk_wait_for_connect

static int inet_csk_wait_for_connect(struct sock *sk, long timeo)
{
    struct inet_connection_sock *icsk = inet_csk(sk);
    DEFINE_WAIT(wait);
    int err;

    for (;;) {
        prepare_to_wait_exclusive(sk_sleep(sk), &wait,
                      TASK_INTERRUPTIBLE);
        release_sock(sk);
        if (reqsk_queue_empty(&icsk->icsk_accept_queue))
            timeo = schedule_timeout(timeo);
        lock_sock(sk);
        err = 0;
        if (!reqsk_queue_empty(&icsk->icsk_accept_queue))
            break;
        err = -EINVAL;
        if (sk->sk_state != TCP_LISTEN)
            break;
        err = sock_intr_errno(timeo);
        if (signal_pending(current))
            break;
        err = -EAGAIN;
        if (!timeo)
            break;
    }
    finish_wait(sk_sleep(sk), &wait);
    return err;
}

能看到这个过程实际上就是通过一个死循环阻塞整个accpet的执行流程。注意在这个过程中,将不会对已经建立连接的继续使用poll等方式阻塞。而是先通过prepare_to_wait_exclusive把当前进程设置为 TASK_INTERRUPTIBLE 状态并把当前进程加入到等待队列中。

如果此时为icsk_accept_queue队列为空,则会通过schedule_timeout不断的让渡CPU给其他进程直到超时。

当唤醒CPU后,就会再度检测icsk_accept_queue是否为空,不为空则返回,并且检测当前的sock的状态如果已经不是Listen监听状态也会直接返回。通过signal_pending检测是否有需要处理的信号·。

到这里就是完成了accpet的逻辑了。但是三次握手的逻辑呢?这部分的逻辑实际是由connect系统调用实现的。

小结

从这里能看到整个socket的接口api设计规范,实际上还是很整齐的,在整个socket结构体中可以分为两大类操作结构体:

熟悉这套流程后,以后阅读源码就能直接找到对应的方法。

对于服务端来说,除了创建一个socket,执行如下步骤:

bind

服务端每次一次启动socket服务,都需要绑定一个端口。当然这个端口可以在java层设置为0,前提是你要在socket初始化的时候设置好端口port号。这样才能在socket关联的fd句柄中找到端口号并在bind方法中保存下来。

而这个过程中涉及了2个比较核心的数据结构:

struct inet_sock {
    /* sk and pinet6 has to be the first two members of inet_sock */
    struct sock     sk;
#if IS_ENABLED(CONFIG_IPV6)
    struct ipv6_pinfo   *pinet6; //ipv6 的信息
#endif
    /* Socket demultiplex comparisons on incoming packets. */
#define inet_daddr      sk.__sk_common.skc_daddr //外部ip地址
#define inet_rcv_saddr      sk.__sk_common.skc_rcv_saddr //绑定的本地ip地址
#define inet_dport      sk.__sk_common.skc_dport //目标地址端口
#define inet_num        sk.__sk_common.skc_num // 本地绑定的端口

    __be32          inet_saddr; // 发送的源地址
    __s16           uc_ttl; //单播允许存活时间
    __u16           cmsg_flags;
    __be16          inet_sport; // 来源端口
    __u16           inet_id; // DF packages id

    struct ip_options_rcu __rcu *inet_opt;
    int         rx_dst_ifindex;
    __u8            tos; //TOS 4 bit的TOS分别代表:最小时延、最大吞吐量、最高可靠性和最小费用
    __u8            min_ttl;
    __u8            mc_ttl; //多播允许存活时间
    __u8            pmtudisc;
    __u8            recverr:1,
                is_icsk:1, //是否是inet_connection_sock
                freebind:1,
                hdrincl:1,
                mc_loop:1,
                transparent:1,
                mc_all:1,
                nodefrag:1;
    __u8            rcv_tos;
    int         uc_index; //单播设备id
    int         mc_index;//多播设备id
    __be32          mc_addr;
    struct ip_mc_socklist __rcu *mc_list;
    struct inet_cork_full   cork;
};

熟悉这个数据结构后来看看,整个bind的核心事件:

sockaddrsin_port获取·到端口号,sk->sk_prot->get_port把端口snum设置到inet_sockinet_num中,并校验当前的端口号是否小于1024,因为只有超级用户才能使用小于1024的端口号,如果发现非法则绑定失败。最后绑定到inet_dport中。

listen

这个过程做了三件事情:

accept

到这里bind,listen,accept的准备工作完成了,就等待客户端connect进行三次握手联通服务器。

上一篇下一篇

猜你喜欢

热点阅读