android

RxJava mini

2018-07-19  本文已影响70人  Dsiner

背景

文章通过接口层表象来实现一个简版且稳定的Rxjava mini。给予一个台阶,当你读完文章的末尾,希望你有一探RxJava欲望与信心。

目标

  1. TaskScheduler.executeMain(...); //主线程, 执行任务
  2. TaskScheduler.executeTask(...); //子线程, 线程池执行任务
  3. TaskScheduler.executeSingle(...); //子线程, 单线程执行任务
  4. TaskScheduler.create(...); //任务调度

项目

设计

  1. .func(...).func(...).func(...)...顺序流执行
  2. .observeOn(...)线程切换

效果图

        TaskScheduler.create(new Task<List<String>>() {
            @Override
            public List<String> run() {
                ...do something in io thread
                return new ArrayList<>();
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .map(new Function<List<String>, String>() {
                    @Override
                    public String apply(@NonNull List<String> strings) throws Exception {
                        ...do something in new thread, such as time-consuming, map conversion, etc.
                        return "";
                    }
                })
                .observeOn(Schedulers.io())
                .map(new Function<String, Boolean>() {
                    @Override
                    public Boolean apply(@NonNull String s) throws Exception {
                        ...do something in io thread, such as time-consuming, map conversion, etc.
                        return true;
                    }
                })
                ...
                .observeOn(Schedulers.mainThread())
                .subscribe(new Observer<Boolean>() {
                    @Override
                    public void onNext(@NonNull Boolean result) {
                        ...do something in main thread
                    }

                    @Override
                    public void onError(Throwable e) {
                        ...do something in main thread
                    }
                });

分析

  1. 线程
  2. 线程切换
  3. 任务调度

1. 线程

public class TaskManager {
    private static TaskManager ins;

    private Handler mainHandler;
    private ExecutorService cachedThreadPool;
    private ExecutorService singleThreadExecutor;

    private TaskManager() {
        mainHandler = new Handler(Looper.getMainLooper());
        cachedThreadPool = Executors.newCachedThreadPool();
        singleThreadExecutor = Executors.newSingleThreadExecutor();
    }

    static TaskManager getIns() {
        if (ins == null) {
            synchronized (TaskManager.class) {
                if (ins == null) {
                    ins = new TaskManager();
                }
            }
        }
        return ins;
    }

    /**
     * Execute sync task in main thread
     */
    void executeMain(Runnable runnable) { mainHandler.post(runnable); }

    /**
     * Execute async task in cached thread pool
     */
    void executeTask(Runnable runnable) { cachedThreadPool.execute(runnable); }

    /**
     * Execute async task in single thread pool
     */
    void executeSingle(Runnable runnable) { singleThreadExecutor.execute(runnable); }

    /**
     * Execute async task in a new thread
     */
    void executeNew(Runnable runnable) { new Thread(runnable).start(); }
}

线程切换的方法:抛runnable到相应线程,由线程来调度执行runnable,runnable中的方法即在相应线程中执行。
如无这样的显式切换线程,代码流(无论多少次方法递归调用)将在当前线程一直执行下去。同一线程,代码总是顺序的执行。

Log.d("Current Thread", Thread.currentThread().getId() + "--NAME--" + Thread.currentThread().getName());

通过这行代码可以打印出当前在那一个线程。主线程的getName是main。

        new Thread(() -> {
                // Code block 1
                Log.d("Current Thread", Thread.currentThread().getId() + "--NAME--" + Thread.currentThread().getName());
                ...
                new Handler(Looper.getMainLooper()).post(new Runnable() {
                    @Override
                    public void run() {
                        // Code block 2
                        Log.d("Current Thread", Thread.currentThread().getId() + "--NAME--" + Thread.currentThread().getName());
                        ...
                    }
                });
        }).start();

这是一个通常的代码形式
Code block 1处在一个子线程中执行代码,通过new Handler(Looper.getMainLooper()).post(...)向主线程抛入一个runnable,runnable进入主线程消息队列,然后等主线程消息队列取出该runnable执行时,Code line 2处代码即在主线程中执行。
Code block 1与Code block 2在时间上并行执行。线程池同理。

public class TaskScheduler<T> {
    public static void executeMain(Runnable runnable) { TaskManager.getIns().executeMain(runnable); }

    public static void executeTask(Runnable runnable) { TaskManager.getIns().executeTask(runnable); }

    public static void executeSingle(Runnable runnable) { TaskManager.getIns().executeSingle(runnable); }

    ...
}

通过单例简单包装,实现目标1、2、3

2. 线程切换

    /**
     * Switch thread
     * scheduler 线程枚举,int类型: defaultThread、newThread、io、mainThread
     */
    public static void switchThread(@Scheduler int scheduler, final Runnable runnable) {
        if (scheduler == NEW_THREAD) {
            new Thread(() -> {
                    if (runnable != null) {
                        runnable.run();
                    }
            }).start();
            return;
        } else if (scheduler == IO) {
            TaskScheduler.executeTask(() -> {
                    if (runnable != null) {
                        runnable.run();
                    }
            });
            return;
        } else if (scheduler == MAIN_THREAD) {
            if (!isMainThread()) {
                TaskScheduler.executeMain(() -> {
                        if (runnable != null) {
                            runnable.run();
                        }
                });
                return;
            }
        }
        if (runnable != null) {
            runnable.run();
        }
    }

    public static boolean isMainThread() {
        return Looper.getMainLooper().getThread() == Thread.currentThread();
    }

3. 任务调度

3.1 开始前的准备

我们先来定义3个接口

interface.png

然后是2个对应的包装类,后面会用到

Task -> TaskEmitter
Function -> FunctionEmitter

public class Emitter {
    public int scheduler;
}
public class TaskEmitter<T> extends Emitter {
    public Task<T> task;

    public TaskEmitter(Task<T> task, @Schedulers.Scheduler int scheduler) {
        this.task = task;
        this.scheduler = scheduler;
    }
}
public class FunctionEmitter<T, R> extends Emitter {
    public Function<? super T, ? extends R> function;

    public FunctionEmitter(Function<? super T, ? extends R> function, @Schedulers.Scheduler int scheduler) {
        this.function = function;
        this.scheduler = scheduler;
    }
}

3.2 Create

开始前,我们知道一些开源库如Glide,惯用.with(...)形式,这种方式实质:静态方法 + return new Instance(),
这里我们也用这种模式来开始create(...)。

实现分三步走

Step 1: Create

    public static <T> TaskScheduler<T> create(final Task<T> task) {
        TaskScheduler<T> schedulers = new TaskScheduler<T>();
        schedulers.task = task;
        return schedulers;
    }

创建TaskScheduler实例,持有 源任务task

    public TaskObserve<T> subscribeOn(@Schedulers.Scheduler int scheduler) {
        this.subscribeScheduler = scheduler;
        return new TaskObserve<T>(new TaskEmitter<T>(task, subscribeScheduler));
    }

指定 源任务task 执行所在线程,丢弃当前TaskScheduler实例。
源任务task线程枚举 注入TaskEmitter后,返回新的实例TaskObserve,后续逻辑全由TaskObserve处理

Step 2: TaskObserve中间件

public static class TaskObserve<T> {
        private TaskEmitter taskEmitter;
        private List<FunctionEmitter> emitters;
        private int observeOnScheduler = Schedulers.defaultThread();

        TaskObserve(TaskEmitter<T> taskEmitter) {
            this.taskEmitter = taskEmitter;
            this.emitters = new ArrayList<>();
        }

        ...
}

TaskObserve: 中间件,初始和map转换时生成,包含以下成员
taskEmitter: 源任务
emitters: 转换队列,map转换时递增
observeOnScheduler: 线程枚举,observeOn观察者所在线程,可重复调用,当然只保留最后一次指定的线程

        TaskObserve(TaskObserve middle) {
            this.taskEmitter = middle.taskEmitter;
            this.observeOnScheduler = middle.observeOnScheduler;
            this.emitters = middle.emitters;
        }

        public <TR> TaskObserve<TR> map(Function<? super T, ? extends TR> f) {
            this.emitters.add(new FunctionEmitter<T, TR>(f, observeOnScheduler));
            return new TaskObserve<TR>(this);
        }

map转换时,将 转换体Function 、当前 线程枚举 observeOnScheduler注入 FunctionEmitter ,添加到 转换队列
返回新的实例TaskObserve,丢弃当前TaskObserve实例,新实例线程枚举observeOnScheduler默认为默认线程

Step 3: Subscribe,才是开始!!!

核心思想

  1. 先执行 源任务 ,返回值
  2. 递归从 转换队列 取出 FunctionEmitter (含有转换体、线程枚举),Schedulers.switchThread(...)指定线程执行,转换返回值
  3. 转换队列 执行尽,提交任务,任务结束
        public void subscribe(final Observer<T> callback) {
            // 指定源任务线程枚举
            Schedulers.switchThread(taskEmitter.scheduler, () -> {
                    try {
                        // 执行源任务
                        Object t = taskEmitter.task.run();
                        // 转换队列是否为空
                        if (assertInterrupt(t)) {
                            // 转换队列空,提交本次任务,任务结束
                            submit(t, callback);
                            return;
                        }
                        // 转换队列不为空,继续转换
                        apply(t, emitters, callback);
                    } catch (Throwable e) {
                        // 任务流抛出异常,即时中断,任务结束
                        error(e, callback);
                    }
            });
        }
        private boolean assertInterrupt(Object emitter) throws Exception {
            if (emitter == null) {
                // 转换返回值,不能为Null!!!
                throw new RuntimeException("Apply output must not be null!");
            }
            return emitters.size() <= 0;
        }

assertInterrupt判断当前转换队列,是否执行尽了

Step 3 - 1: Apply转换队列转换

        private <E, F> void apply(final E o, final List<FunctionEmitter> emitters, final Observer<F> callback) {
            // 依次从转换队列取出FunctionEmitter,然后移除
            final FunctionEmitter<E, F> f = emitters.get(0);
            emitters.remove(f);
            // 指定当前转换线程枚举
            Schedulers.switchThread(f.scheduler, () -> {
                    try {
                        // 转换,返回转换值
                        Object emitter = f.function.apply(o);
                        // 转换队列是否为空
                        if (assertInterrupt(emitter)) {
                            // 转换队列空,提交本次任务,任务结束
                            submit(emitter, callback);
                            return;
                        }
                        // 转换队列不为空,继续转换
                        apply(emitter, emitters, callback);
                    } catch (Throwable e) {
                        // 任务流抛出异常,即时中断,任务结束
                        error(e, callback);
                    }
            });
        }

Step 3 - 2: Submit提交

        private <S> void submit(final Object result, final Observer<S> callback) {
            // 指定当前转换线程枚举,即当前中间件线程枚举observeOnScheduler
            Schedulers.switchThread(observeOnScheduler, () -> {
                    try {
                        if (callback != null) {
                            // 成功,任务结束
                            callback.onNext((S) result);
                        }
                    } catch (Throwable e) {
                        error(e, callback);
                    }
            });
        }

        private <S> void error(final Throwable e, final Observer<S> callback) {
            // 指定当前转换线程枚举,即当前中间件线程枚举observeOnScheduler
            Schedulers.switchThread(observeOnScheduler, () -> {
                    if (callback != null) {
                        // 出错,任务结束
                        callback.onError(e);
                    }
            });
        }

小结:

泛型: java泛型属于类型擦除,无论T、F还是R...,最终都是Object,所以我们可以不用泛型,用Object。
设计: 这里的 任务流 实现方式为递归嵌套调用。

上一篇下一篇

猜你喜欢

热点阅读