模拟RxJava的实现原理

2019-10-14  本文已影响0人  vpractical

[TOC]

GitHub代码地址

使用

// Usage example: builds the chain create -> map -> subscribeOn -> observerOn -> subscribe.
// NOTE(review): `list` and `log(...)` are defined outside this snippet.
Observable
               // Source stage: the callback receives an emitter when the chain is subscribed.
               .create(new ObservableOnSubscribe<List<User>>() {
                   @Override
                   public void subscribe(Emitter<List<User>> emitter) {
                       // Emit the list once, then signal completion.
                       emitter.next(list);
                       emitter.complete();
                       // Logs the name of the thread this callback is running on.
                       log("create : " + Thread.currentThread().getName());
                   }
               })
               // Transform stage: sets age = 3 on every user and passes the same list instance on.
               .map(new Function<List<User>, List<User>>() {
                   @Override
                   public List<User> apply(List<User> users) {
                       for (User u : users) {
                           u.age = 3;
                       }
                       return users;
                   }
               })
               // Runs the upstream subscription through TaskScheduler with the IO scheduler.
               .subscribeOn(ThreadScheduler.IO)
               // NOTE(review): presumably delivers observer callbacks on the main thread;
               // ObservableObserverOn is not shown here - confirm.
               .observerOn(ThreadScheduler.MAIN)
               .subscribe(new Observer<List<User>>() {
                   @Override
                   public void onSubscribe(Disposable disposable) {
                       // Called with the Disposable handle before any item is delivered.
                       log("onSubscribe()");
                       log("onSubscribe() : " + Thread.currentThread().getName());
                   }

                   @Override
                   public void onNext(List<User> val) {
                       // Receives the list returned by the map stage.
                       log("onNext(): " + val.toString());
                       log("onNext() : " + Thread.currentThread().getName());
                   }

                   @Override
                   public void onError(Throwable t) {
                       log("onError(): " + t.toString());
                   }

                   @Override
                   public void onComplete() {
                       log("onComplete()");
                   }
               });

实现

1.被观察者抽象类

/**
 * Base type of every stage in the chain.
 *
 * <p>Each operator method wraps the current instance in a new stage and returns it;
 * {@link #subscribe(Observer)} hands the observer to the concrete stage through
 * {@link #subscribeImpl(Observer)}.
 *
 * @param <T> type of the items this stage delivers to its observer
 */
public abstract class Observable<T> {

    /**
     * Attaches {@code observer} to this concrete stage.
     *
     * @param observer the downstream observer to deliver events to
     */
    protected abstract void subscribeImpl(Observer<? super T> observer);

    /**
     * Creates the source stage of a chain.
     *
     * @param source callback that emits items through the emitter it is given
     * @param <T>    item type
     * @return a new {@code ObservableCreate} wrapping {@code source}
     */
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source){
        Observable<T> created = new ObservableCreate<>(source);
        return created;
    }

    /**
     * Subscribes {@code observer} by delegating to {@link #subscribeImpl(Observer)}.
     *
     * @param observer the observer to attach
     */
    public void subscribe(Observer<? super T> observer){
        this.subscribeImpl(observer);
    }

    /**
     * Returns a stage that applies {@code function} to each item of this stage.
     *
     * @param function the transformation to apply
     * @param <R>      item type after the transformation
     * @return a new {@code ObservableMap} wrapping this stage
     */
    public <R> Observable<R> map(Function<? super T,? extends R> function){
        Observable<R> mapped = new ObservableMap<>(this, function);
        return mapped;
    }

    /**
     * Returns a stage that subscribes to this stage via the given scheduler.
     *
     * @param scheduler where the upstream subscription should run
     * @return a new {@code ObservableSubscribeOn} wrapping this stage
     */
    public Observable<T> subscribeOn(ThreadScheduler scheduler){
        Observable<T> scheduled = new ObservableSubscribeOn<>(this, scheduler);
        return scheduled;
    }

    /**
     * Returns an {@code ObservableObserverOn} stage wrapping this stage and {@code scheduler}.
     *
     * @param scheduler the scheduler handed to the new stage
     * @return the wrapping stage
     */
    public Observable<T> observerOn(ThreadScheduler scheduler){
        Observable<T> observed = new ObservableObserverOn<>(this, scheduler);
        return observed;
    }
}

2.create操作符:返回这一层的被观察者ObservableCreate
这一层涉及3个主要部分:内容分发者接口 ObservableOnSubscribe、发射器接口 Emitter,以及对应的被观察者 ObservableCreate

/**
 * User-supplied source callback passed to {@code Observable.create}.
 *
 * @param <T> type of the items the callback emits
 */
public interface ObservableOnSubscribe<T> {
    /**
     * Called with the emitter to push items, an error, or completion through.
     *
     * @param emitter sink for the events produced by this source
     * @throws Exception if producing the events fails
     */
    void subscribe(Emitter<T> emitter) throws Exception;
}

/**
 * Sink handed to an {@code ObservableOnSubscribe} for sending events downstream.
 *
 * @param <T> type of the emitted items
 */
public interface Emitter<T> {
    /** Emits one item. */
    void next(T val);
    /** Signals that the source failed with {@code t}. */
    void error(Throwable t);
    /** Signals that the source has finished. */
    void complete();
}

create() 的参数是内容分发者接口(ObservableOnSubscribe)对象。用户实现该接口后,框架在订阅时先创建发射器 emitter,再调用该接口的 subscribe(emitter) 方法,用户在其中使用 emitter 分发数据。

/**
 * Observable produced by the {@code create} operator.
 *
 * <p>On subscription it wraps the observer in an {@link EmitterCreate}, hands that emitter
 * to the observer as its {@code Disposable}, and then runs the user-supplied source with it.
 *
 * @param <T> type of the emitted items
 */
class ObservableCreate<T> extends Observable<T> {
    private final ObservableOnSubscribe<T> source;

    /**
     * @param source user callback that emits items through the emitter it receives
     */
    ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    /**
     * Calls {@code observer.onSubscribe} with a fresh emitter, then runs the source.
     * An exception thrown by the source is reported through the emitter's {@code error}.
     *
     * @param observer the downstream observer
     */
    @Override
    protected void subscribeImpl(Observer<? super T> observer) {
        EmitterCreate<T> emitter = new EmitterCreate<>(observer);
        observer.onSubscribe(emitter);
        try {
            source.subscribe(emitter);
        } catch (Exception e) {
            e.printStackTrace();
            // Go through the emitter rather than the observer so that a disposed or
            // already-terminated stream does not receive a late onError.
            emitter.error(e);
        }
    }

    /**
     * Emitter that forwards events to the observer until it is disposed.
     *
     * <p>{@code error} and {@code complete} are terminal: after either one has been
     * delivered the emitter marks itself disposed and drops every later event.
     */
    static final class EmitterCreate<T> implements Emitter<T>, Disposable {
        private final Observer<? super T> observer;
        // volatile: the observer may dispose from a different thread than the one the
        // source emits on once subscribeOn/observerOn are part of the chain.
        private volatile boolean isDisposable;

        EmitterCreate(Observer<? super T> observer) {
            this.observer = observer;
        }

        /** Stops all further event delivery. */
        @Override
        public void disposable() {
            isDisposable = true;
        }

        /** @return {@code true} once disposed or after a terminal event was delivered */
        @Override
        public boolean isDisposable() {
            return isDisposable;
        }

        /** Forwards {@code val} to the observer unless disposed. */
        @Override
        public void next(T val) {
            if (isDisposable) {
                return;
            }
            observer.onNext(val);
        }

        /** Forwards {@code t} to the observer unless disposed, then terminates the stream. */
        @Override
        public void error(Throwable t) {
            if (isDisposable) {
                return;
            }
            try {
                observer.onError(t);
            } finally {
                isDisposable = true;
            }
        }

        /** Forwards completion to the observer unless disposed, then terminates the stream. */
        @Override
        public void complete() {
            if (isDisposable) {
                return;
            }
            try {
                observer.onComplete();
            } finally {
                isDisposable = true;
            }
        }
    }
}

3.map操作符

/**
 * Observable produced by the {@code map} operator: subscribes to the upstream stage with
 * an observer that transforms each item before passing it downstream.
 *
 * @param <T> upstream item type
 * @param <R> item type after the transformation
 */
class ObservableMap<T, R> extends Observable<R> {
    private final Observable<T> upstream;
    private final Function<? super T, ? extends R> mapper;

    /**
     * @param observable the upstream stage to subscribe to
     * @param function   the transformation applied to each upstream item
     */
    ObservableMap(Observable<T> observable, Function<? super T, ? extends R> function) {
        this.upstream = observable;
        this.mapper = function;
    }

    /** Wraps {@code observer} in a {@link MapObserver} and subscribes it upstream. */
    @Override
    protected void subscribeImpl(Observer<? super R> observer) {
        MapObserver<T, R> mappingObserver = new MapObserver<>(observer, mapper);
        upstream.subscribe(mappingObserver);
    }

    /**
     * Observer that applies the mapping function in {@code onNext}.
     * NOTE(review): the {@code observer} field used below is presumably inherited from
     * Basic2Observer, which is not shown here - confirm.
     */
    private static final class MapObserver<T,R> extends Basic2Observer<T,R> {
        private final Function<? super T,? extends R> mapper;

        MapObserver(Observer<? super R> observer,Function<? super T,? extends R> function){
            super(observer);
            this.mapper = function;
        }

        /** Transforms {@code val} and hands the result to the downstream observer. */
        @Override
        public void onNext(T val) {
            observer.onNext(mapper.apply(val));
        }
    }
}
/**
 * Transformation used by the {@code map} operator.
 *
 * @param <T> input type
 * @param <R> result type
 */
public interface Function<T,R> {
    /**
     * Converts {@code t} into a value of type {@code R}.
     *
     * @param t the input value
     * @return the transformed value
     */
    R apply(T t);
}

4.subscribeOn操作符:把对上游的订阅操作包装成Runnable,交给TaskScheduler在指定的线程调度器上执行

/**
 * Observable produced by the {@code subscribeOn} operator: performs the subscription to
 * the upstream stage inside a task handed to {@code TaskScheduler}.
 *
 * @param <T> type of the items passed through
 */
class ObservableSubscribeOn<T> extends Observable<T>{
    private Observable<T> upstream;
    private ThreadScheduler targetScheduler;

    /**
     * @param observable the upstream stage to subscribe to
     * @param scheduler  the scheduler the subscription task is submitted with
     */
    ObservableSubscribeOn(Observable<T> observable, ThreadScheduler scheduler){
        this.upstream = observable;
        this.targetScheduler = scheduler;
    }

    /**
     * Submits a task that wraps {@code observer} in a {@code BasicObserver} and
     * subscribes that wrapper to the upstream stage.
     */
    @Override
    protected void subscribeImpl(final Observer<? super T> observer) {
        TaskScheduler.run(new Runnable() {
            @Override
            public void run() {
                // The wrapper is created inside the task, i.e. when the task executes.
                upstream.subscribe(new BasicObserver<T>(observer));
            }
        }, targetScheduler);
    }
}
/**
 * Runs tasks on the thread selected by a {@code ThreadScheduler} value.
 */
public class TaskScheduler {
    private static final Handler HANDLER = new Handler(Looper.getMainLooper());
    private static final ExecutorService SERVICE = Executors.newCachedThreadPool();

    /**
     * Executes {@code r} according to {@code scheduler}.
     *
     * <ul>
     *   <li>{@code DEFAULT}: runs synchronously on the calling thread.</li>
     *   <li>{@code MAIN}: runs synchronously when already on the main looper thread,
     *       otherwise posts to the main-thread handler.</li>
     *   <li>any other value: runs on the cached thread pool.</li>
     * </ul>
     *
     * @param r         the task to run
     * @param scheduler selects where the task runs
     */
    public static void run(Runnable r,ThreadScheduler scheduler){
        if (scheduler == ThreadScheduler.DEFAULT) {
            r.run();
        } else if (scheduler == ThreadScheduler.MAIN) {
            // The looper comparison is only needed on this branch.
            boolean isMainThread = Looper.myLooper() == Looper.getMainLooper();
            if (isMainThread) {
                r.run();
            } else {
                HANDLER.post(r);
            }
        } else {
            // execute() instead of submit(): submit() captures any exception in a Future
            // that nobody reads, so task failures would vanish silently. With execute()
            // they reach the thread's uncaught-exception handler, as on the other branches.
            SERVICE.execute(r);
        }
    }
}

5.后续的操作符都是同样的道理。最下层只有一个观察者:它调用上层的订阅方法时,上层会先回调它的onSubscribe(),参数是由框架实现的Disposable;此后只要它调用Disposable的disposable()方法,上层就会中断事件传递。

/**
 * Handle passed to {@code Observer.onSubscribe} that lets the observer stop receiving events.
 */
public interface Disposable {
    /** Requests that no further events be delivered. */
    void disposable();
    /** @return whether disposal has been requested */
    boolean isDisposable();
}
上一篇下一篇

猜你喜欢

热点阅读