Netty

netty源码分析 - EventLoop类关系

2020-03-31  本文已影响0人  晴天哥_王志

系列

Netty源码分析 - Bootstrap服务端
Netty源码分析 - Bootstrap客户端
netty源码分析 - ChannelHandler
netty源码分析 - EventLoop类关系
netty源码分析 - register分析
Netty源码分析 - NioEventLoop事件处理
netty源码分析 - accept过程分析
Netty源码分析 - ByteBuf
Netty源码分析 - 粘包和拆包问题

开篇

NioEventLoop

NioEventLoop

AbstractExecutorService

AbstractExecutorService

AbstractEventExecutor

AbstractEventExecutor

AbstractScheduledEventExecutor

AbstractScheduledEventExecutor

SingleThreadEventExecutor

SingleThreadEventExecutor

SingleThreadEventLoop

image.png

NioEventLoop

NioEventLoop
public final class NioEventLoop extends SingleThreadEventLoop {
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        selector = openSelector();
        selectStrategy = strategy;
    }
}



public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

    protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
        tailTasks = newTaskQueue(maxPendingTasks);
    }
}


public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    private final Queue<Runnable> taskQueue;
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        this.executor = ObjectUtil.checkNotNull(executor, "executor");
        taskQueue = newTaskQueue(this.maxPendingTasks);
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }
}

NioEventLoopGroup

NioEventLoopGroup

EventExecutorGroup

EventExecutorGroup

AbstractEventExecutorGroup

AbstractEventExecutorGroup

MultithreadEventExecutorGroup

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

    private final EventExecutor[] children;
    private final Set<EventExecutor> readonlyChildren;
    private final AtomicInteger terminatedChildren = new AtomicInteger();
    private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
    // NioEvengLoop的选择器chooser
    private final EventExecutorChooserFactory.EventExecutorChooser chooser;


    public EventExecutor next() {
        return chooser.next();
    }
}
MultithreadEventExecutorGroup

MultithreadEventLoopGroup

MultithreadEventLoopGroup
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

    private final EventExecutor[] children;
    private final Set<EventExecutor> readonlyChildren;
    private final AtomicInteger terminatedChildren = new AtomicInteger();
    private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
    private final EventExecutorChooserFactory.EventExecutorChooser chooser;

    public EventExecutor next() {
        return chooser.next();
    }
}

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {

    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    @Override
    public ChannelFuture register(ChannelPromise promise) {
        return next().register(promise);
    }

    @Deprecated
    @Override
    public ChannelFuture register(Channel channel, ChannelPromise promise) {
        return next().register(channel, promise);
    }
}

EventLoop注册channel流程

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            // 省略代码
        }
        // channle绑定eventLoop对象
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        return regFuture;
    }
}


public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
}


public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

    public EventExecutor next() {
        return chooser.next();
    }
}
上一篇 下一篇

猜你喜欢

热点阅读