述术Rxjava相关Android优秀开源

RxJava 从入门到放弃

2016-09-03  本文已影响4784人  lovejjfg

叫这个题目也是因为这篇博客写了太久太久了!有段时间都觉得完全没有必要写下去的,索性终于完工了,也算是对这段时间的肯定吧!

RxJava基本概念

RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。ObservableObserver 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer

接下来就围绕Observable创建、Observer创建、线程切换、事件类型转换、订阅和取消订阅展开。

Observerble的基本创建方式:

1、create()//最基本的

Observable<String> observable1 = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("xxixxii");
            subscriber.onCompleted();//这里必须调用该方法或者onError(),通知订阅者发送完毕,否者无法进行解除订阅。

        }
    });

2、form()//适配集合等

    ArrayList<Student> students = new ArrayList<>();
    students.add(s1);
    students.add(s2);
    students.add(s3);
    students.add(s1);
    students.add(s4);
    students.add(s5);
    students.add(s6);

    Observable.from(students)

3、just()//适配已经写好的方法

    Student s1 = new Student(19, "xiaoqiang");
    Student s2 = new Student(19, "xiaoqiang1");
    Student s3 = new Student(20, "xiaoqiang1");
    Student s4 = new Student(19, "xiaoqiang");
    Student s5 = new Student(21, "xiaoqiang");
    Student s6 = new Student(22, "xiaoqiang");

    Observable.just(s1, s2, s3, s4, s5, s6)

4、merge(o1,02)//将多个合并为一个
5、concat(o1,o2)//one by one emit!

Observer Subscriber的创建

     Subscriber<String> stringSubscriber = new Subscriber<String>() {
        @Override
        public void onStart() {
            Log.e(TAG, "onStart: ");
        }

        @Override
        public void onCompleted() {
            Log.e(TAG, "onCompleted: ");

        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "onError: ");
        }

        @Override
        public void onNext(String s) {
            Log.e(TAG, "onNext: " + s);
        }
    };

    Observer<String> stringObserver = new Observer<String>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(String s) {

        }
    };

这里需要注意:ObserverSubscriber不仅基本使用方式一样,实质上,在 RxJavasubscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。所以如果你只想使用基本功能,选择 ObserverSubscriber 是完全一样的。它们的区别对于使用者来说主要有两点:

onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法,具体可以在后面的文中看到。

unsubscribe(): 这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用 isUnsubscribed()先判断一下状态。 unsubscribe() 这个方法很重要,因为在 subscribe() 之后, Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onStop()onDestory()等方法中)调用unsubscribe() 来解除引用关系,以避免内存泄露的发生。

强大的条件筛选

说了这么多没用的东西,肯定要来点儿实际的才能体会到RxJava的强大功能!

1、take( )只发送指定数量的事件。
2、filter( )过滤指定条件的事件。
3、first()只发送第一个事件。
4、distinct( )只发送不同的事件。(怎么定义为不同?!)
其实还有很多。。。

随时随地线程切换

说完创建过滤你可能觉得这也没撒嘛!那么接下来想想之前在Android开发里要切换线程需要怎么处理呢?view.post()或者使用handler.sendMessage()!而在RxJava中,线程切换不用这么搞了,Schedulers是RxJava中用来管理相关线程调度的,基于订阅和被订阅,这里有两个方法!
1、subscribeOn() 事件产生在哪个线程。

2、observeOn()事件消费在哪个线程。

3、Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
4、Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。

5、Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。其行为模式和 newThread()是差不多滴,但是区别在于io()的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io()newThread()更有效率。不要把计算工作放在io()中,可以避免创建不必要的线程。
6、Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation()中,否则 I/O 操作的等待时间会浪费 CPU。
7、AndroidSchedulers.mainThread():Android 特供,它指定的操作将在 Android 主线程运行。

transform 转换

强大的内部转换功能,让你可以做到要什么就是什么。

1、map():进行对象转换,不会创建新的Observable
2、flatMap():也是进行对象转换,会创建新的Observable
3、buffer()、:缓冲区,缓冲指定的Observable包装成新的
4、Observable发射。
5、toList():将单个的对象转换为集合。

取消订阅

爽了之后重视要记住一件事,那就是要释放相关资源!不然后果也是很严重的,尤其是在使用RxView相关的方法时会警告你需要调用Unsubscribe()来释放相关的引用。

warn.png

释放操作其实很简单。定义一个集合维护相关的Subscription,然后在ActivityonStop()或者onDestroy()方法中释放相关资源。

    Subscription clickSubscribe = RxView.clicks(findViewById(R.id.bt))
            .throttleFirst(1, TimeUnit.SECONDS)
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    Log.e(TAG, "clicks->doOnUnsubscribe");
                }
            })
            .subscribe(new Action1<Void>() {
                @Override
                public void call(Void aVoid) {
                    methd6();
                }
            });
    //维护相关的资源引用
    subscriptions.add(clickSubscribe);

@Override
protected void onDestroy() {
    for (Subscription s : subscriptions) {
        if (!s.isUnsubscribed()) {
            s.unsubscribe();
            Log.e(TAG, "onDestroy: 取消订阅!");
        }
    }
    super.onDestroy();
}

手动create()一个Observable的话,一定要调用 onComplete()或者onError()来结束这个事件,不然资源也不会被释放的。

动手时间

练习一

统计集合中年龄大于20的学生姓名(年龄姓名一致的视为同一个!)
首先当然是创建Observable

    //初始化数据
    Student s1 = new Student(19, "xiaoqiang0");
    Student s2 = new Student(20, "xiaoqiang1");
    Student s3 = new Student(21, "xiaoqiang2");
    Student s4 = new Student(22, "xiaoqiang3");
    Student s5 = new Student(23, "xiaoqiang4");
    Student s6 = new Student(24, "xiaoqiang5");
    Student s7 = new Student(25, "xiaoqiang6");
    Student s8 = new Student(25, "xiaoqiang5");
    students = new ArrayList<>();
    students.add(s1);
    students.add(s2);
    students.add(s3);
    students.add(s1);
    students.add(s4);
    students.add(s5);
    students.add(s6);
    students.add(s7);
    students.add(s8);

        Observable.just(students)//创建Observable
            .flatMap(new Func1<ArrayList<Student>, Observable<Student>>() {
                @Override
                public Observable<Student> call(ArrayList<Student> students) {
                    //变换为新的Observable
                    return Observable.from(students);
                }
            })
            //过滤掉年龄和姓名相同的对象
            .distinct()
            //过滤掉年龄小于20的对象
            .filter(new Func1<Student, Boolean>() {
                @Override
                public Boolean call(Student student) {
                    return student.getAge() >= 20;
                }

            })
            //将事件对象由Student 转换为 String
            .map(new Func1<Student, String>() {
                @Override
                public String call(Student student) {
                    return student.getName();
                }
            })
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    Log.e(TAG, "call: 取消订阅了!!");

                }
            })
            //订阅
            .subscribe(new Subscriber<String>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(String s) {
                    Log.e(TAG, "onNext: " + s);
                }
            });


E/MainActivity: onNext: xiaoqiang1
E/MainActivity: onNext: xiaoqiang2
E/MainActivity: onNext: xiaoqiang3
E/MainActivity: onNext: xiaoqiang4
E/MainActivity: onNext: xiaoqiang5
E/MainActivity: onNext: xiaoqiang6
E/MainActivity: onNext: xiaoqiang5
E/MainActivity: call: 取消订阅了!!

PS:至于这里对象的唯一性判断是通过复写equal()hashCode()来实现的!

@Override
public boolean equals(Object o) {
    return o instanceof Student && this.getAge() == ((Student) o).getAge() && this.getName().equals(((Student) o).getName());
}

@Override
public int hashCode() {
    return Arrays.hashCode(new Object[]{getAge(), getName()});
}

练习二

concat()使用:本地有缓存读取本地的数据,没有走网络请求并缓存到本地。

Observable<String> netObservable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            Log.e(TAG, "走网络了!!");
            subscriber.onNext("这是缓存数据!!");
            subscriber.onCompleted();
        }
    }).doOnNext(new Action1<String>() {
        @Override
        public void call(String s) {
            Log.e(TAG, "call: 保存数据到本地");
            rxPreferences.getString("cash").asAction().call(s);

        }
    });
    Observable<String> nativeObservable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            if (TextUtils.isEmpty(rxPreferences.getString("cash").get())) {
                Log.e(TAG, "没有缓存,走网络!");
                subscriber.onCompleted();
            } else {
                Log.e(TAG, "有缓存!");
                subscriber.onNext(rxPreferences.getString("cash").get());
                subscriber.onCompleted();
            }
        }
    });

    Observable.concat(nativeObservable, netObservable)
            .first()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    Log.e(TAG, "完成了!");
                }

                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "错误了!");
                }

                @Override
                public void onNext(String s) {
                    Log.e(TAG, s);
                }
            });

   //第一次              
 E/MainActivity: 没有缓存,走网络!
 E/MainActivity: 走网络了!!
 E/MainActivity: call: 保存数据到本地
 E/MainActivity: 这是缓存数据!!
 E/MainActivity: 完成了!
 //第二次
 E/MainActivity: 有缓存!
 E/MainActivity: 这是缓存数据!!
 E/MainActivity: 完成了!

练习三

merge()使用:服务端和本地都有相关数据,汇总展示。使用merge()对应的事件顺序是无序的,谁先产生了谁就先发送!

    Observable<String> just = Observable.just("S", "O", "S")
            .subscribeOn(Schedulers.newThread())
            .doOnNext(new Action1<String>() {
                @Override
                public void call(String s) {
                    SystemClock.sleep(20);
                }
            });

    Observable<String> just1 = Observable.just("S","T","R").subscribeOn(Schedulers.newThread())
            .doOnNext(new Action1<String>() {
                @Override
                public void call(String s) {
                    SystemClock.sleep(20);
                }
            });

    Observable.merge(just1, just)
            .subscribeOn(Schedulers.newThread())
            .distinct()
            .subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    Log.e(TAG, "call: " + s);
                }
            });


 E/MainActivity: call: S
 E/MainActivity: call: T
 E/MainActivity: call: R
 E/MainActivity: call: O
//或者这样
 E/MainActivity: call: S
 E/MainActivity: call: T
 E/MainActivity: call: O
 E/MainActivity: call: R

练习四

RxView和RxCompoundButton的使用:

RxCompoundButton.checked(checkBox).call(rxPreferences.getBoolean("checked").get());
    //noinspection ConstantConditions
    Subscription checkedSubscription1 = RxCompoundButton.checkedChanges(checkBox)
            .subscribe(rxPreferences.getBoolean("checked").asAction());
    subscriptions.add(checkedSubscription1);

    Subscription checkedSubscription = rxPreferences.getBoolean("checked")
            .asObservable()
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    Log.e(TAG, "rxPreferences->doOnUnsubscribe");

                }
            })
            .subscribe(new Subscriber<Boolean>() {
                @Override
                public void onCompleted() {
                    Log.e(TAG, "rxPreferences->onNext: +onCompleted");
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(Boolean aBoolean) {
                    Log.e(TAG, "rxPreferences->onNext: " + aBoolean);
                }
            });
    subscriptions.add(checkedSubscription);

练习五

RxJava和Retrofit的搭配使用:

   Subscription beforeSubscribe = BaseDataManager.getDailyApiService()
        .getBeforeDailyStories(date)
                .subscribeOn(Schedulers.io())//事件产生在子线程
                .doOnSubscribe(new Action0() {//subscribe之后,事件发送前执行。
                    @Override
                    public void call() {
                        isLoadingMore = true;
                        Log.e("TAG", "call: true");
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<DailyStories>() {
                    @Override
                    public void call(DailyStories dailyStories) {
                        mView.onLoadMore(dailyStories);
                        mView.isLoadingMore(false);
                        Log.e("TAG", "call: false");
                        isLoadingMore = false;
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        mView.onLoadError(throwable.toString());
                        isLoadingMore = false;
                    }
                });
        subscribe(beforeSubscribe);

相关回调

对于Observable,这里有一系列的回调方法,作用在不同的时期,其中常用的是doOnSubscribe()doOnNext()
另外在Subscriber里,还有一个onStart()的方法!

callBack.png
Observable.just("L", "O", "V", "E")
            .doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    Log.e(TAG, "call: doOnSubscribe");

                }
            })
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    Log.e(TAG, "call: doOnUnsubscribe");
                }
            })
            .doOnEach(new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    Log.e(TAG, "doOnEach: onCompleted");
                }

                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "doOnEach: onError");
                }

                @Override
                public void onNext(String s) {
                    Log.e(TAG, "doOnEach: onNext:"+s);
                }
            })
            .doOnNext(new Action1<String>() {
                @Override
                public void call(String s) {
                    Log.e(TAG, "call: doOnNext");
                }
            })
            .doOnRequest(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    Log.e(TAG, "call: doOnRequest");
                }
            })
            .doOnTerminate(new Action0() {
                @Override
                public void call() {
                    Log.e(TAG, "call: doOnTerminate");
                }
            })
            .doAfterTerminate(new Action0() {
                @Override
                public void call() {
                    Log.e(TAG, "call: doAfterTerminate");
                }
            })
            .subscribe(new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    Log.e(TAG, "subscribe->call: onCompleted");
                }

                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "subscribe->call: onError");
                }

                @Override
                public void onNext(String s) {
                    Log.e(TAG, "subscribe->call: onNext:" + s);
                }
            });

E/MainActivity: subscribe->call: onStart:
E/MainActivity: call: doOnRequest
E/MainActivity: call: doOnSubscribe
E/MainActivity: doOnEach: onNext:L
E/MainActivity: call: doOnNext
E/MainActivity: subscribe->call: onNext:L
E/MainActivity: doOnEach: onNext:O
E/MainActivity: call: doOnNext
E/MainActivity: subscribe->call: onNext:O
E/MainActivity: doOnEach: onNext:V
E/MainActivity: call: doOnNext
E/MainActivity: subscribe->call: onNext:V
E/MainActivity: doOnEach: onNext:E
E/MainActivity: call: doOnNext
E/MainActivity: subscribe->call: onNext:E
E/MainActivity: doOnEach: onCompleted
E/MainActivity: call: doOnTerminate
E/MainActivity: subscribe->call: onCompleted
E/MainActivity: call: doOnUnsubscribe
E/MainActivity: call: doAfterTerminate

通过Log可以看到一个事件的订阅过程。
首先回调Subscriber.onStart()表明ObservableSubscriber已经建立了连接,但是这个时候事件还没有开始发射。
然后是 doOnRequest()doOnSubscribe(),这个时候事件也没有开始发射。
接着是doOnEach()doOnNext()onNext(),这个时候事件正式发射了。
最后发射完了,就是onCompleteddoOnTerminate(),然后取消订阅释放资源doOnUnsubscribe()

Rx全家桶

欢迎加入Rx全家桶豪华套餐,只有你想不到的,没有它做不到的!

Rx全家桶

小结

总的来说,使用RxJava之后,可以让我们的代码更加清晰一目了然,而且线程的切换也非常的简单,不用自己维护相关线程和写那该死的Runnable,内部强大的事件转换和筛选过滤也是为我们开发省去了不少的工作。

参考文档

1、给 Android 开发者的 RxJava 详解
2、RxJava wiki
3、RxJava使用场景小结
4、Demo下载

---- Edit By Joe ----

上一篇 下一篇

猜你喜欢

热点阅读