Android开发Retrofit RXjava 网络框架Android开发

RxJava源码简单分析

2018-03-29  本文已影响140人  猪_队友

UML图

image.png

我们应该都学习过观察者模式吧,如果没学过先百度一把,然后再来看这一篇,
现在越来越多的人开始用Rxjava,面试也很多问会不会Rxjava,已然是一项必会的技能了。

RxJava的优势

一个词:异步。
一句话:同样是异步,RxJava更加简洁。(不是代码上的简洁,而是逻辑上的简洁)

小栗子

//观察者 接口
        Observer<String> observable = new Observer<String>() {

            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {

            }
        };

        //实现observer 的抽象类 基本用法差不多 在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用
        final Subscriber<String> stringSubscriber = new Subscriber<String>() {
            StringBuffer sb = new StringBuffer();

            @Override
            public void onCompleted() {
                textView.setText(sb.toString());
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {
                sb.append(s + "\n");
            }

            @Override
            public void onStart() {
                super.onStart();
            }

        };

        final Observable observable = Observable.create(new Observable.OnSubscribe<String>() {

            @Override
            public void call(Subscriber<? super String> subscriber) {

                subscriber.onNext("你好啊");
                subscriber.onNext("这里是北京");
                subscriber.onNext("欢迎来这里");
                subscriber.onCompleted();
            }
        });

     observable.subscribe(stringSubscriber);
 observable3.subscribe(observable);

so简单的一个小栗子
stringSubscriber是观察者(订阅者),observable1是被观察者。通过subscribe方法就订阅成功了。貌似没啥软用啊,不就一个拿着另一个对象的引用吗?这个大家都会啊。是滴 观察者模式就是这样的。这是最基本的,开始加点作料了哈

1、创建Observable 三个方法

//第一个
 final Observable observable1 = Observable.create(new Observable.OnSubscribe<String>() {

            @Override
            public void call(Subscriber<? super String> subscriber) {

                subscriber.onNext("你好啊");
                subscriber.onNext("这里是北京");
                subscriber.onNext("你好啊");
                subscriber.onCompleted();
            }
        });
//第二个
  String[] strs = new String[]{"1", "2", "3"};
        final Observable observable2 = Observable.from(strs);
//第三个
 final Observable observable3 = Observable.just("1", "3", "5");

其实他们压根就是一个方法,第二个和第三个只是为了方便,最后都是调用的第一个方法。

//以just方法为例
  public final static <T> Observable<T> just(T t1, T t2, T t3) {
//我擦,调用的是from方法啊
        return from(Arrays.asList(t1, t2, t3));
    }
//from方法
public final static <T> Observable<T> from(Iterable<? extends T> iterable) {
//调用的是create方法啊 
        return create(new OnSubscribeFromIterable<T>(iterable));
    }

弄清楚这个,我们看一下 被观察者observable 和 订阅者Subscriber的调用关系
observable.subscribe(subscriber);

结果发现subscribe方法也有几个重载的兄弟方法。此刻有点~~MMP

  final Action1<String> onNestAction = new Action1<String>() {
            @Override
            public void call(String s) {
                Log.d(tag, s);
            }
        };
        final Action1<Throwable> onThrowable = new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                Log.d(tag, "throwable");
            }
        };
        final Action0 onCompletection = new Action0() {
            @Override
            public void call() {
                Log.d(tag, "completed");
            }
        };


1、
Subscription subscribe(final Observer<? super T> observer)
 observable.subscribe(observable);
2、
Subscription subscribe(Subscriber<? super T> subscriber)
observable.subscribe(stringSubscriber);
3、
Subscription subscribe(final Action1<? super T> onNext)
observable.subscribe(onNestAction)
4、
Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError)
observable.subscribe(onNestAction, onThrowable);
5、
Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete)
observable.subscribe(onNestAction, onThrowable, onCompletection);

一个一个看吧,好心累哈哈

1:
 public final Subscription subscribe(final Observer<? super T> observer) {
//如果是Subscriber 就走这个方法
        if (observer instanceof Subscriber) {
            return subscribe((Subscriber<? super T>)observer);
        }
//如果是Observer   我们就是这个参数 其实套一层Subscriber,里面的方法还是
//Observer的方法,也就告诉我们以后 就用Subscriber好了,Subscriber应该有增强的功能,不然他也不会闲的这么做 我们到下边具体分析一下Subscriber
        return subscribe(new Subscriber<T>() {

            @Override
            public void onCompleted() {
                observer.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                observer.onError(e);
            }

            @Override
            public void onNext(T t) {
                observer.onNext(t);
            }

        });
    }
2:
 public final Subscription subscribe(Subscriber<? super T> subscriber) {
//跳的的Observable的静态方法 参数一个subscriber,一个自身Observable
        return Observable.subscribe(subscriber, this);
    }
|------->
    private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
     // validate and proceed
        if (subscriber == null) {
            throw new IllegalArgumentException("observer can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
            /*
             * the subscribe function can also be overridden but generally that's not the appropriate approach
             * so I won't mention that in the exception
             */
        }
        //首先调用的是 onStart()方法
        // new Subscriber so onStart it
        subscriber.onStart();
        
        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
//如果不是安全的Subscriber 那就套一层
        // if not already wrapped
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        // The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
//onSubscribeStart 方法直接 return onSubscribe 就是observable.onSubscribe
// 看到Observable类 里 声明 final OnSubscribe<T> onSubscribe;
//然后是onSubscribe.call(subscriber)  通过看我们置顶的图片,我们知道
//其实是调用Action1的call方法

            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);

            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // if an unhandled error occurs executing the onSubscribe we will propagate it
            try {
                subscriber.onError(hook.onSubscribeError(e));
            } catch (OnErrorNotImplementedException e2) {
                // special handling when onError is not implemented ... we just rethrow
                throw e2;
            } catch (Throwable e2) {
                // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                // so we are unable to propagate the error correctly and will just throw
                RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                // TODO could the hook be the cause of the error in the on error handling.
                hook.onSubscribeError(r);
                // TODO why aren't we throwing the hook's return value.
                throw r;
            }
            return Subscriptions.unsubscribed();
        }
    }

归根到底这个2方法就是Action1.call(subscriber)
ok我们接着看

  public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError) {
        if (onNext == null) {
            throw new IllegalArgumentException("onNext can not be null");
        }
        if (onError == null) {
            throw new IllegalArgumentException("onError can not be null");
        }
//这货有调回 第二个方法,把其中的回调  使用了形参给Action对象来调用call
        return subscribe(new Subscriber<T>() {

            @Override
            public final void onCompleted() {
                // do nothing
            }

            @Override
            public final void onError(Throwable e) {
                onError.call(e);
            }

            @Override
            public final void onNext(T args) {
                onNext.call(args);
            }

        });
    }

  public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

最后本质是OnSubscribe.call()

hook.onSubscribeStart(observable,observable.onSubscribe).call(subscriber);
//也就是
observable.onSubscribe.call(subscriber)
  final OnSubscribe<T> onSubscribe;OnSubscribe 实现Action1
//正好对应
 final Observable observable1 = Observable.create(new Observable.OnSubscribe<String>() {

            @Override
            public void call(Subscriber<? super String> subscriber) {

                subscriber.onNext("你好啊");
                subscriber.onNext("这里是北京");
                subscriber.onNext("你好啊");
                subscriber.onCompleted();
            }
        });

参数Action1的方法和参数Observer都是 最后 return subscribe(new Subscriber<T>() 然后在回调方法里,写下相应的对象调用方法。
参数是Subscriber 的 调用OnSubscribe.call(Subscriber)正好和Observable的Create方法形成闭环。

RxJava优秀的地方在于线程控制

变换 ------这个是RxJava牛逼的地方了

借用抛物线大神的例子

Observable.just("images/logo.png") // 输入类型 String
    .map(new Func1<String, Bitmap>() {
        @Override
        public Bitmap call(String filePath) { // 参数类型 String
            return getBitmapFromPath(filePath); // 返回类型 Bitmap
        }
    })
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) { // 参数类型 Bitmap
            showBitmap(bitmap);
        }
    });

我们发现我们just之后用调用map方法,我们以前见过Action1方法,这个Func1方法又是个什么东西呢?

public interface Func1<T, R> extends Function {
    R call(T t);
}
public interface Function {

}

public interface Action1<T> extends Action {
    void call(T t);
}
public interface Action extends Function {

}

突然发现是本家啊,都是继承Function的接口,不过区别在于call函数的返回值,Action1<T> 没有返回值,而Func1<T, R>的目的是把T类型转换为R类型,别的都是一样的。原理照旧放到后面。
还有一个更加难以理解的函数flatMap()俗称铺平,Kotlin里也有这个函数。

  ArrayList<String> arrayList1 = new ArrayList<>();
                arrayList1.add("语文");
                arrayList1.add("数学");
                arrayList1.add("英语");
                ArrayList<String> arrayList2 = new ArrayList<>();
                arrayList2.add("语文");
                arrayList2.add("数学");
                arrayList2.add("政治");
                ArrayList<String> arrayList3 = new ArrayList<>();
                arrayList3.add("物理");
                arrayList3.add("数学");
                arrayList3.add("化学");
                ArrayList<String> arrayList4 = new ArrayList<>();
                arrayList4.add("地理");
                arrayList4.add("政治");
                arrayList4.add("英语");
                ArrayList<String> arrayList5 = new ArrayList<>();
                arrayList5.add("数学");
                arrayList5.add("化学");
                arrayList5.add("英语");
                Student[] students = new Student[]{new Student("小明", 12,arrayList1), new Student("花花", 1,arrayList2),
                        new Student("小红", 22,arrayList3), new Student("大胖", 12,arrayList4), new Student("哼哼子", 2,arrayList5)};


                Observable.from(students)
                        .flatMap(new Func1<Student, Observable<String>>() {
                            @Override
                            public Observable<String> call(Student student) {
                                return Observable.from(student.getCourse());
                            }
                        }).subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(String s) {
                        Log.e(tag,s.toString()+"");
                    }
                });

再次引用抛物线大神的话---》
flatMap() 和 map() 有一个相同点:
它也是把传入的参数转化之后返回另一个对象。但需要注意,和 map() 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。 flatMap() 的原理是这样的:1. 使用传入的事件对象创建一个 Observable 对象;2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat。
栗子已经举出来了,我们开始看看原理吧,怎么实现的呢?先从简单的Map方法来看。

 public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
//貌似没我们想的那么简单啊  OperatorMap是什么东西呢?
        return lift(new OperatorMap<T, R>(func));
    }

//接口Operator 继承Fun1 和Subscriber 说明 他应该有 call方法
  public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
        // cover for generics insanity
    }

public final class OperatorMap<T, R> implements Operator<R, T> {

    private final Func1<? super T, ? extends R> transformer;
//原来是 Func1  方法
    public OperatorMap(Func1<? super T, ? extends R> transformer) {
        this.transformer = transformer;
    }
//看到这里感觉这个方法似曾相识,对滴 就是Observe和Action1调用的方法就是说Fun1和Action1本质是一样的,但是有不一样的地方比如onNext(T t
)
    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
//注意 返回值 是Subscriber 也就是new OperatorMap<T, R>(func)返回的是Subscriber,由此我们了解OperatorMap这个类的作用就是把Fun1方法里的处理变成Subscriber,并且是已经处理好类型的。
        return new Subscriber<T>(o) {

            @Override
            public void onCompleted() {
                o.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }

            @Override
            public void onNext(T t) {
                try {
//这里不是直接调用  o.onNext(t)而是调用transformer.call(t)的返回值。
Fun1.call(T t)的返回值正好是 他要转换的类型
//这其实就是中转了一下 ,也就是两次call方法。第一次是把原来返回值,编程转换后的返回值,然后调用 观察者的call方法,参数正好是转换后的类型。

                    o.onNext(transformer.call(t));
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    onError(OnErrorThrowable.addValueAsLastCause(e, t));
                }
            }

        };
    }

}
//这样看起来已经处理完毕了 那么list(OperatorMap)方法是什么呢
 public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
//果然又是 这个老套的方法  借OnSubscribe壳调用operator的方法
        return new Observable<R>(new OnSubscribe<R>() {
//这个call方法已经调用的转过类型的Subscriber了
            @Override
            public void call(Subscriber<? super R> o) {
                try {
//实际是   st =  operator.call(o) 返回的是处理过的Subscriber
                    Subscriber<? super T> st = hook.onLift(operator).call(o);
                    try {
                        // new Subscriber created and being subscribed with so 'onStart' it
//一个新的Subscriber 创造出来了调用onStart()方法,
                        st.onStart();
//这里便是 onSubscribe调用Subscriber,和之前的流程一样的
                        onSubscribe.call(st);
                    } catch (Throwable e) {
                        // localized capture of errors rather than it skipping all operators 
                        // and ending up in the try/catch of the subscribe method which then
                        // prevents onErrorResumeNext and other similar approaches to error handling
                        if (e instanceof OnErrorNotImplementedException) {
                            throw (OnErrorNotImplementedException) e;
                        }
                        st.onError(e);
                    }
                } catch (Throwable e) {
                    if (e instanceof OnErrorNotImplementedException) {
                        throw (OnErrorNotImplementedException) e;
                    }
                    // if the lift function failed all we can do is pass the error to the final Subscriber
                    // as we don't have the operator available to us
                    o.onError(e);
                }
            }
        });
    }

map看起来确实很灵活,我们接着看flatMap

//ScalarSynchronousObservables我们不需要去管这个
 public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
        if (getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
        }
//原来是间接的调用了map(func)返回变换过的Observable,和上面步骤一样,看来不同在merge()方法里了
        return merge(map(func));
    }

  public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
        if (source.getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
        }
//又是lift方法啊我们知道 list方法 会根绝参数的Operator的处理,接受处理过的新的Subscriber 那我们看看OperatorMerge.<T>instance(false)是怎么变换的吧的吧
        return source.lift(OperatorMerge.<T>instance(false));
    }

 public static <T> OperatorMerge<T> instance(boolean delayErrors) {
        if (delayErrors) {
            return (OperatorMerge<T>)HolderDelayErrors.INSTANCE;
        }
//调用的是这个方法
        return (OperatorMerge<T>)HolderNoDelay.INSTANCE;
    }

 private static final class HolderNoDelay {
        /** A singleton instance. */
        static final OperatorMerge<Object> INSTANCE = new OperatorMerge<Object>(false, Integer.MAX_VALUE);
    }


 @Override
    public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
        MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent);
        MergeProducer<T> producer = new MergeProducer<T>(subscriber);
        subscriber.producer = producer;
        child.add(subscriber);
        child.setProducer(producer);
        
        return subscriber;
    }

@Override
        public void onNext(Observable<? extends T> t) {
            if (t == null) {
                return;
            }
            if (t instanceof ScalarSynchronousObservable) {
                tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
            } else {
                InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
                addInner(inner);
                t.unsafeSubscribe(inner);
                emit();
            }
        }

这个内部实现就很复杂了,我本以为是递归,不想到里面的东西真的好多,我的理解是 是 先是left变化,然后再一次的left的变化,或者是再一次对Observable的处理。看源码真的是一脸懵逼,只能连蒙带猜。

大神的原话:
讲述 lift() 的原理只是为了让你更好地了解 RxJava ,从而可以更好地使用它。然而不管你是否理解了 lift() 的原理,RxJava 都不建议开发者自定义 Operator 来直接使用 lift(),而是建议尽量使用已有的 lift() 包装方法(如 map() flatMap() 等)进行组合来实现需求,因为直接使用 lift() 非常容易发生一些难以发现的错误。

image.png
上一篇下一篇

猜你喜欢

热点阅读