程序猿之路

NIO源码分析(一)

2017-04-26  本文已影响450人  三斤牛肉

先写个简单的例子

// 开启一个server channel来监听
ServerSocketChannel ssc = ServerSocketChannel.open();
// 开启非阻塞模式
ssc.configureBlocking(false);
ServerSocket socket = ssc.socket();
socket.bind(new InetSocketAddress("localhost",65535));//绑定相应IP及port
//ssc.bind(new InetSocketAddress("localhost",65535));
Selector selector = Selector.open();//开启一个selector
ssc.register(selector,SelectionKey.OP_ACCEPT);//绑定注册事件

while(true){
this.selector.select();//阻塞,只有当至少一个注册的事件发生的时候才会继续.
Set<SelectionKey> selectKeys = this.selector.selectedKeys();
Iterator<SelectionKey> it = selectKeys.iterator();
  while (it.hasNext()) {
    SelectionKey key = it.next();
    it.remove();
    // 处理事件. 可以用多线程来处理.
    this.dispatch(key);
  }
} 

OK,我们一句一句来看

第一句

ServerSocketChannel ssc = ServerSocketChannel.open();
public static ServerSocketChannel open() throws IOException {
    return SelectorProvider.provider().openServerSocketChannel();
}

这里会根据操作系统得到不同的provider,由于用的是mac,而且一般服务器以linux为主,所以这边返回的是EPollSelectorProvider

进一步SelectorProviderImpl中:

 public ServerSocketChannel openServerSocketChannel() throws IOException {
    return new ServerSocketChannelImpl(this);
}

ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
    super(sp);//这里面可以不看,就是SelectorProvider的赋值
    this.fd =  Net.serverSocket(true);
    this.fdVal = IOUtil.fdVal(fd);//这2句都是native操作,没有看底层的jvm源码,看函数名应该是新建FD并进行赋值
    this.state = ST_INUSE;
    }

第二句

ssc.configureBlocking(false);

这句不展开了,看名字就知道开启非阻塞模式
底层调用了

static native void configureBlocking(FileDescriptor fd, boolean blocking)
    throws IOException;

第三句

ServerSocket socket = ssc.socket();
socket.bind(new InetSocketAddress("localhost",65535));

看ServerSocketChannelImpl中:

public ServerSocket socket() {
    synchronized (stateLock) {
        if (socket == null)
            socket = ServerSocketAdaptor.create(this);//这里只是新建了一个适配器,本质上还是ssc
        return socket;
    }
 }

具体绑定过程:

public void bind(SocketAddress local, int backlog) throws IOException {
    if (local == null)
        local = new InetSocketAddress(0);
    try {
        ssc.bind(local, backlog);//如上所述,本质上还是ssc
    } catch (Exception x) {
        Net.translateException(x);
    }
}

所以我试了下

ssc.bind(new InetSocketAddress("localhost",65535));

也是可以的

第四句

Selector selector = Selector.open();//开启一个selector

一步一步看:

 public static Selector open() throws IOException {
    return SelectorProvider.provider().openSelector();//这里返回的是EPollSelectorImpl
}

在EPollSelectorProvider中:

public AbstractSelector openSelector() throws IOException {
    return new EPollSelectorImpl(this);
}

第五句

ssc.register(selector, SelectionKey.OP_ACCEPT);

这里一步步跟下去,最终是在AbstractSelectableChannel:

public final SelectionKey register(Selector sel, int ops,
                                   Object att)
    throws ClosedChannelException
{
    if (!isOpen())
        throw new ClosedChannelException();
    if ((ops & ~validOps()) != 0)//下面的代码块中注明,这里只支出accept行为的注册
        throw new IllegalArgumentException();
    synchronized (regLock) {
        if (blocking)
            throw new IllegalBlockingModeException();
        SelectionKey k = findKey(sel);//查找sel是否已经绑定对应的key了,具体代码这里不贴
        if (k != null) {
            k.interestOps(ops);//这句后面再讲,有多处调用到
            k.attach(att);//这里的attachment为null,一般可以用于保存session等作为有状态通讯
        }
        if (k == null) {
            // New registration
            k = ((AbstractSelector)sel).register(this, ops, att);//这句在下面解释
            addKey(k);//这句就是加到保存的SelectionKey[]全局变量
        }
        return k;
    }
}

ServerSocketChannel中:

/*Server-socket channels only support the accepting of new
 * connections, so this method returns {@link SelectionKey#OP_ACCEPT}.*/
public final int validOps() {
    return SelectionKey.OP_ACCEPT;
}

SelectorImpl:

protected final SelectionKey register(AbstractSelectableChannel ch,
                                      int ops,
                                      Object attachment)
{
    if (!(ch instanceof SelChImpl))
        throw new IllegalSelectorException();
    SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);//新建key,这个ch就是最初的ssc
    k.attach(attachment);
    synchronized (publicKeys) {
        implRegister(k);//主要是这个
    }
    k.interestOps(ops);
    return k;
}

再看EPollSelectorImpl:

protected void implRegister(SelectionKeyImpl ski) {
    if (closed)
        throw new ClosedSelectorException();
    SelChImpl ch = ski.channel;
    fdToKey.put(Integer.valueOf(ch.getFDVal()), ski);
    pollWrapper.add(ch); //这里要见最底下的图, 核心代码updateList.add(new Updator(channel, EPOLL_CTL_ADD));这个码updateList在下面interestOps会用到
    keys.add(ski);
}

再来说多处用到的k.interestOps(ops);
SelectionKeyImpl:

SelectionKey nioInterestOps(int ops) {      // package-private
    if ((ops & ~channel().validOps()) != 0)
        throw new IllegalArgumentException();
    channel.translateAndSetInterestOps(ops, this);
//这句
    interestOps = ops;
    return this;
}

SocketChannelImpl,这里讲SelectionKey对应的操作转为EPOLL对应的事件类型,具体可以查看EPOLL相关文档:

public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
    int newOps = 0;
    if ((ops & SelectionKey.OP_READ) != 0)
        newOps |= PollArrayWrapper.POLLIN;
    if ((ops & SelectionKey.OP_WRITE) != 0)
        newOps |= PollArrayWrapper.POLLOUT;
    if ((ops & SelectionKey.OP_CONNECT) != 0)
        newOps |= PollArrayWrapper.POLLCONN;
    sk.selector.putEventOps(sk, newOps);
}

跟踪下去EPollSelectorImpl,中间代码不贴了:

void putEventOps(SelectionKeyImpl sk, int ops) {
    if (closed)
        throw new ClosedSelectorException();
    pollWrapper.setInterest(sk.channel, ops);
}

继续EPollArrayWrapper,这里将注册事件及对应的channel保存到一个updateList中:

void setInterest(SelChImpl channel, int mask) {
    synchronized (updateList) {
        // if the previous pending operation is to add this file descriptor
        // to epoll then update its event set
        if (updateList.size() > 0) {
            Updator last = updateList.getLast();//防止已经加过了的channel直接更新事件
            if (last.channel == channel && last.opcode == EPOLL_CTL_ADD) {
                last.events = mask;
                return;
            }
        }

        // update existing registration
        updateList.add(new Updator(channel, EPOLL_CTL_MOD, mask));
    }
}

到这里为止其实初始化的工作都做好了,接下来

第六句

this.selector.select();

还是去掉中间代码,直接看核心的EPollSelectImpl:

protected int doSelect(long timeout) throws IOException
{
    if (closed)
        throw new ClosedSelectorException();
    processDeregisterQueue();//处理已经取消的key
    try {
        begin();
        pollWrapper.poll(timeout);//核心代码看下面
    } finally {
        end();
    }
    processDeregisterQueue();
    int numKeysUpdated = updateSelectedKeys(); //核心代码,见下面
    if (pollWrapper.interrupted()) { //中断处理,暂时不看
        // Clear the wakeup pipe
        pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
        synchronized (interruptLock) {
            pollWrapper.clearInterrupted();
            IOUtil.drain(fd0);
            interruptTriggered = false;
        }
    }
    return numKeysUpdated;
}

EPollArrayWrapper:

int poll(long timeout) throws IOException {
    updateRegistrations();  //核心代码更新前面updateList中的操作到native
    updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd); //核心代码,执行native的epollWait方法,等待注册事件上来
    for (int i=0; i<updated; i++) {
        if (getDescriptor(i) == incomingInterruptFD) {
            interruptedIndex = i;
            interrupted = true;
            break;
        }
    }
    return updated;
}

//这个函数就是遍历前缓存的所有注册时间的updateList,去掉闲置的,执行native的epollCtl方法
void updateRegistrations() {
    synchronized (updateList) {
        Updator u = null;
        while ((u = updateList.poll()) != null) {
            SelChImpl ch = u.channel;
            if (!ch.isOpen())
                continue;

            // if the events are 0 then file descriptor is put into "idle
            // set" to prevent it being polled
            if (u.events == 0) {
                boolean added = idleSet.add(u.channel);
                // if added to idle set then remove from epoll if registered
                if (added && (u.opcode == EPOLL_CTL_MOD)) //如果是闲置的,删除
                    epollCtl(epfd, EPOLL_CTL_DEL, ch.getFDVal(), 0);
            } else {
                // events are specified. If file descriptor was in idle set
                // it must be re-registered (by converting opcode to ADD)
                boolean idle = false;
                if (!idleSet.isEmpty())
                    idle = idleSet.remove(u.channel);
                int opcode = (idle) ? EPOLL_CTL_ADD : u.opcode;
                epollCtl(epfd, opcode, ch.getFDVal(), u.events);
            }
        }
    }
}

//更新SelectedKeys给后面的this.selector.selectedKeys();使用
private int updateSelectedKeys() {
    int entries = pollWrapper.updated;
    int numKeysUpdated = 0;
    for (int i=0; i<entries; i++) {
        int nextFD = pollWrapper.getDescriptor(i);
        SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
        // ski is null in the case of an interrupt
        if (ski != null) {
            int rOps = pollWrapper.getEventOps(i);
            if (selectedKeys.contains(ski)) {
                if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
                    numKeysUpdated++;
                }
            } else {
                ski.channel.translateAndSetReadyOps(rOps, ski);//这句处理返回的ops,过滤掉异常的操作等,可以结合IO模式中的epoll_ctl事件类型查看
                if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                    selectedKeys.add(ski);
                    numKeysUpdated++;
                }
            }
        }
    }
    return numKeysUpdated;
}

最后:

Set<SelectionKey> selectKeys = this.selector.selectedKeys();
Iterator<SelectionKey> it = selectKeys.iterator();
  while (it.hasNext()) {
    SelectionKey key = it.next();
    it.remove();
    // 处理事件. 可以用多线程来处理.
    this.dispatch(key);
  }
} 

这里就很简单了,selectedKeys()已经在上一步处理好了,这里只是拿出来做相应的业务操作

这样一个服务端的启动操作就完成了,后面还要注册SocketChannel的读写事件,就到以后再分析。
Epoll相关内容以前写过,就不写了。

整个epoll的类关系图:

epoll.png
上一篇下一篇

猜你喜欢

热点阅读