11、看!源码之EventExecutor定义与Abstract

2019-05-20  本文已影响0人  starskye

EventExecutor定义与AbstractEventExecutor实现

//EventExecutor是对执行器的定义,此定义是对EventExecutorGroup的一个扩展从而是一个特殊的group但是他并不是executor的group而是Thread的group。
public interface EventExecutor extends EventExecutorGroup {
    //在group中我们讲解了next它采用了executor的选择器来返回next,而此处的next是对group的重写代表的意义则是返回本身executor也就是this。
    @Override
    EventExecutor next();
    //在创建executor的时候会将group传入到executor中而此方法就是为了方便获取当前group所提供的,具体等讲实现的时候将会详细介绍
    EventExecutorGroup parent();
    //如果当前执行executor是当前线程则返回true否则是false
    //判断当前线程是否是执行线程
    boolean inEventLoop();
    //可以说上方方法是此方法的重载,因为此处可以指定一个线程判断执行线程是否是传入的线程
    //而上方方法则默认传入了当前线程
    boolean inEventLoop(Thread thread);
    //创建一个应答,应答的概念在前文已经详细说明过
    <V> Promise<V> newPromise();
    //创建一个进度应答,可以对程序的执行进度进行监听,在前文future中也详细介绍过此处不再做详细说明
    <V> ProgressivePromise<V> newProgressivePromise();

    //创建一个成功的future,创建完成的同时
    //所有阻塞等待结果的方法都会被唤醒
    //所有监听器都会接收到通知
    //isSuccess会返回true
    <V> Future<V> newSucceededFuture(V result);
    //创建一个失败future,创建完成的同时
    //所有阻塞等待结果的方法都会被唤醒
    //所有监听器都会接收到通知
    //isSuccess会返回false
    <V> Future<V> newFailedFuture(Throwable cause);
}

下面是他的实现AbstractEventExecutor

//从申明可以看出他是继承与AbstractExecutorService而此类在ExecutorService的文章集中有详细的介绍
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEventExecutor.class);
    //默认的关闭前的时间具体描述将会结合使用时进行讲述
    static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
    //默认的关闭超时时间
    static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;
    //当前执行器所属的执行组
    private final EventExecutorGroup parent;
    //之前说过EventExecutor也是继承与Group所以他里面也有迭代的实现而为了切合他的迭代方法此处采用了只有一个元素的集合实现
    private final Collection<EventExecutor> selfCollection = Collections.<EventExecutor>singleton(this);

    protected AbstractEventExecutor() {
        this(null);
    }

    protected AbstractEventExecutor(EventExecutorGroup parent) {
        this.parent = parent;
    }

    @Override
    public EventExecutorGroup parent() {
        return parent;
    }

    @Override
    public EventExecutor next() {
        return this;
    }
    //之前说过inEvent的实现就是调用了重载的inEvent默认传入的是当前线程
    @Override
    public boolean inEventLoop() {
        return inEventLoop(Thread.currentThread());
    }
    //默认的迭代器
    @Override
    public Iterator<EventExecutor> iterator() {
        return selfCollection.iterator();
    }
    //刚才说的两个默认值使用地方就是此处设置默认的超时时间和超时前时间,这两个时间的使用会在具体实现出讲解
    @Override
    public Future<?> shutdownGracefully() {
        return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
    }
    //已经被废弃建议使用shutdownGracefully
    /**
     * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
     */
    @Override
    @Deprecated
    public abstract void shutdown();

    /**
     * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
     */
    @Override
    @Deprecated
    public List<Runnable> shutdownNow() {
        shutdown();
        return Collections.emptyList();
    }

    @Override
    public <V> Promise<V> newPromise() {
        return new DefaultPromise<V>(this);
    }

    @Override
    public <V> ProgressivePromise<V> newProgressivePromise() {
        return new DefaultProgressivePromise<V>(this);
    }

    @Override
    public <V> Future<V> newSucceededFuture(V result) {
        return new SucceededFuture<V>(this, result);
    }

    @Override
    public <V> Future<V> newFailedFuture(Throwable cause) {
        return new FailedFuture<V>(this, cause);
    }

    @Override
    public Future<?> submit(Runnable task) {
        return (Future<?>) super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return (Future<T>) super.submit(task, result);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return (Future<T>) super.submit(task);
    }

    @Override
    protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new PromiseTask<T>(this, runnable, value);
    }

    @Override
    protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new PromiseTask<T>(this, callable);
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay,
                                       TimeUnit unit) {
        throw new UnsupportedOperationException();
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        throw new UnsupportedOperationException();
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        throw new UnsupportedOperationException();
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        throw new UnsupportedOperationException();
    }

    //安全运行任务说白了就是try了所有的异常
    //剩下的方法并没有什么可以讲的因为都基本是默认的实现就是创建内容什么的所以此处跳过,如果有疑问可以提问题一起讨论
    protected static void safeExecute(Runnable task) {
        try {
            task.run();
        } catch (Throwable t) {
            logger.warn("A task raised an exception. Task: {}", task, t);
        }
    }
}
上一篇下一篇

猜你喜欢

热点阅读