程序猿之路

NIO源码分析(三)

2017-05-03  本文已影响301人  三斤牛肉

解释下Channel中accept,read,write中的begin/end函数做了什么

protected final void begin() {
    if (interruptor == null) {
        interruptor = new Interruptible() {
                public void interrupt(Thread target) {
                    synchronized (closeLock) {
                        if (!open)
                            return;
                        open = false;
                        interrupted = target;
                        try {
//这里关闭了当前的channel
                          AbstractInterruptibleChannel.this.implCloseChannel();
                        } catch (IOException x) { }
                    }
                }};
    }
    blockedOn(interruptor);
    Thread me = Thread.currentThread();
    if (me.isInterrupted())
        interruptor.interrupt(me);//如果当前线程被中断,则优雅的关闭channel
}

protected final void end(boolean completed)
    throws AsynchronousCloseException
{
    blockedOn(null);
    Thread interrupted = this.interrupted;
//如果当前线程被中断,则抛出中断关闭异常,即当前线程被其他线程调用interrupt中断
    if (interrupted != null && interrupted == Thread.currentThread()) {
        interrupted = null;
        throw new ClosedByInterruptException();
    }
//如果通讯未完成且channel已经关闭,抛出异步关闭异常,即当前channel被其他线程调用close的时候
    if (!completed && !open)
        throw new AsynchronousCloseException();
}

单步跟踪blockedOn()函数可以得到
其实等价于
Thread.currentThread().blockedOn(interruptor);
由于blockedOn这个函数是protected的,所以需要绕一大圈

private volatile Interruptible blocker;  

void blockedOn(Interruptible b) {
    synchronized (blockerLock) {
        blocker = b;
    }
}

这个blocker又干了啥呢,搜索Thead类全文:

public void interrupt() {
    if (this != Thread.currentThread())
        checkAccess();

    synchronized (blockerLock) {
        Interruptible b = blocker;
        if (b != null) {
            interrupt0();           // Just to set the interrupt flag
            b.interrupt(this);//看到了吗,当线程执行interrupt的时候就会去执行上面的interruptor类了
            return;
        }
    }
    interrupt0();
}

所以总结下begin的作用是,给当前的thread设置一个blocker使之可以在线程被中断时优雅的关闭channel,并且最后检测一遍currentThread是否被中断
end的作用是取消begin设置的blocker,并对线程中断/channel关闭等状态抛出相应异常


同样在SelectorImpl中也有begin/end对

protected int doSelect(long timeout)
    throws IOException
{
    if (closed)
        throw new ClosedSelectorException();
    processDeregisterQueue();
    try {
        begin();
        pollWrapper.poll(timeout);
    } finally {
        end();
    }
    ...
}

protected final void begin() {
    if (interruptor == null) {
        interruptor = new Interruptible() {
                public void interrupt(Thread ignore) {
                    AbstractSelector.this.wakeup();//这里是唤醒当前selector
                }};
    }
    AbstractInterruptibleChannel.blockedOn(interruptor);
    Thread me = Thread.currentThread();
    if (me.isInterrupted())
        interruptor.interrupt(me);
}

end就不贴了,是一样的
wakeup做了什么?

public Selector wakeup() {
    synchronized (interruptLock) {
        if (!interruptTriggered) {//interruptTriggered默认是false的,当close的时候置为true(具体看代码)
            pollWrapper.interrupt();//这里其实调用了native的interrupt(outgoingInterruptFD)方法
//这里中断了write的fd
            interruptTriggered = true;
        }
    }
    return this;
}

具体看pollWrapper.interrupt()怎么作用的

EPollSelectorImpl(SelectorProvider sp) {
    super(sp);
    long pipeFds = IOUtil.makePipe(false);
    fd0 = (int) (pipeFds >>> 32);
    fd1 = (int) pipeFds;
    pollWrapper = new EPollArrayWrapper();
  //其实在selector构造函数的时候就已经初始化interrupt
    pollWrapper.initInterrupt(fd0, fd1);
    fdToKey = new HashMap<Integer,SelectionKeyImpl>();
}

EPollArrayWrapper:
void initInterrupt(int fd0, int fd1) {
    outgoingInterruptFD = fd1;//写通道
    incomingInterruptFD = fd0;//读通道
    epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);//添加fd0兴趣事件为可读(EPOLLIN)
}

查看EPollArrayWrapper.c

JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_interrupt(JNIEnv *env, jobject this, jint fd)
{
   int fakebuf[1];
    fakebuf[0] = 1;
//可以看到这里往fd1写入了一个字节,使写通道fd0变成readable状态,selector因为有事件就绪而中止阻塞(上面的epollCtl已经注册了fd0的read事件)
    if (write(fd, fakebuf, 1) < 0) {
        JNU_ThrowIOExceptionWithLastError(env,"write to interrupt fd failed");
    }
}

总结selectorImpl的begin同样也设置了一个blocker,
当this.selector.select()阻塞时,要怎么中断呢,
selector在初始化的时候会先给自己分配2个用于中断的fd,一个读,一个写,然后给读fd注册一个感兴趣的读事件,这样在中断时只要给写fd写入一个字节,那么读事件就会感知到,使得selector被唤醒。

上一篇下一篇

猜你喜欢

热点阅读