RxJava语法练习
很早之前就听Rxjava 多么神奇,各种评论都有,说学习坡度高,难等等,于是亲自上手试试,说实话刚开始看到这些用法立马就蒙比了,和我们之前的方式不太一样,理解起来还是挺别扭的,之后找了一篇给 Android 开发者的 RxJava 详解 扔物线的,好好研究上几天,认真理解每个例子并自己动手敲出来,也就慢慢能体会到 rxjava 的好用之处了。
刚开始学习的时候,为了加深理解,按照这篇 NotRxJava懒人专用指南,认认真真敲了一遍很有用,推荐给大家。
内容包含如下:
如何获取,观察者模式,基本语法,操作符使用,线程控制使用
注意:以下部分内容部分段落部分引用自给 Android 开发者的 RxJava 详解,如有冒犯之处,我会及时删除。
一、如何获取
RxJava
RxAndroid
在项目中引入依赖就好了,我这里用的不是最新版本
compile 'io.reactivex:rxandroid:1.1.0'
compile 'io.reactivex:rxjava:1.1.0'
二、观察者模式
这个模式基本上都听过,使用过的就可以直接略过了。
先说下设计模式中的观察者模式:
当对象间存在一对多关系时,则使用观察者模式(Observer Pattern) 比如,当一个对象被修改时,则会自动通知它的依赖对象,观察者模式属于行为型模式。
使用在java中实现观察者模式需要用到java.util包中提供的Observable类和Observer接口,java已经给我们提供好类使用。
Observable可以查看java源码,下面是Observer接口:
public interface Observer {
void update(Observable observable, Object data);
}
举个简单例子:
古董被观察,观察者为people用来观察古董价钱波动
被观察者:
public class Antique extends Observable {
private float mPrice;// 价钱
public Antique(float price) {
this.mPrice = price;
}
public float getPrice() {
return this.mPrice;
}
public void setPrice(float price) {
super.setChanged();
super.notifyObservers(price);// 价格被改变
this.mPrice = price;
}
public String toString() {
return "古董价格为:" + this.mPrice;
}
}
观察者实现Observer接口,重写update方法即可
public class People implements Observer{
private String name;
public People(String name) {
this.name = name;
}
@Override
public void update(Observable observable, Object data) {
Log.e("","People update() -> update name:"+ this.name + ",price:"+ ((Float)data).floatValue());
}
}
主函数调用
Antique house = new Antique(1222f);
People p1 = new People("p1");
People p2 = new People("p2");
People p3 = new People("p3");
house.addObserver(p1);
house.addObserver(p2);
house.addObserver(p3);
Log.e("", house+""); // 输出价格
house.setPrice(111f);
Log.e("", house+""); // 输出价格
这样只要价格发生变化,就会通知所有订阅的人,也就实现了简单的观察者模式。
RxJava 中的观察者模式
观察者:Observer
被观察者:Observable
订阅:subscribe()
观察者
Observer即为观察者,处理事件发生后逻辑,如古董例子,价格改变后,需要updae信息,不过 Rxjava 这里多了几种处理情况,onCompleted(),onError(),onNext(),具体用法看 dmeo 就可以了。
被观察者
Observable 即被观察者,它决定什么时候触发事件以及触发怎样的事件。可以使用create()、just(T...)、from(T[])或from(Iterable<? extends T>)来创建一个 Observable ,并为它定义事件触发规则。
订阅
创建了Observable和Observer之后,再用subscribe()方法将它们联结起来。
三、基本语法
1、打印 Hello World
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello World");
subscriber.onCompleted();
}
}).subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted:");
}
@Override
public void onError(Throwable e) {
System.out.println("onError e:"+e);
}
@Override
public void onNext(String s) {
System.out.println("onNext s:"+s);
}
});
//还可以这么写
Observable.just("hello World").subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("call s:"+s);
}
});
2、将字符串数组 names 中的所有字符串依次打印出来
String[] names = {"111", "222", "333"};
Observable.from(names).subscribe(new Action1<String>() {
@Override
public void call(String name) {
System.out.println("test1 name:" + name);
}
});
// 输出
test1 name:111
test1 name:222
test1 name:333
另一种写法
@Test
public void test4(){
//1:被观察者
String [] names ={"111","222","333"};
Observable observable = Observable.from(names);
//2:观察者
Action1 onNextAction = new Action1<String>() {
@Override
public void call(String s) {
System.out.println("test4 call"+s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
@Override
public void call(Throwable e) {
System.out.println("test4 call e:"+e);
}
};
Action0 onCompletedAction = new Action0() {
@Override
public void call() {
System.out.println("test4 call onCompletedAction");
}
};
//3:订阅:被观察者被观察者订阅
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
}
3、just 将传入的参数依次打印
@Test
public void test2(){
Observable.just("1","2","3").subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("test2 onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("test2 e:"+e);
}
@Override
public void onNext(String s) {
System.out.println("onNext:"+s);
}
});
}
// onNext:1
// onNext:2
// onNext:3
// test2 onCompleted
4、 循环输出list
@Test
public void test5(){
Observable.from(Data.getCats().get(0).getlist()).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("test5 call :"+s);
}
});
}
// test5 call :list:0
// test5 call :list:1
// test5 call :list:2
// test5 call :list:3
// test5 call :list:4
// test5 call :list:5
// test5 call :list:6
// test5 call :list:7
// test5 call :list:8
// test5 call :list:9
四、操作符
1、Filter:过滤操作,满足条件才可以通过
@Test
public void FilterTest(){
Observable.just(1,2,3,4,5,6).filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer > 3;//只有>3的情况才会通过
}
}).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println("onNext integer:"+integer);
}
});
}
// onNext integer:4
// onNext integer:5
// onNext integer:6
// onCompleted
2、first,last 只处理第一个或最后一个
@Test
public void FirstLastTest(){
Observable.just(1,2,3,4,5,6).first()/*.last()*/.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println("onNext integer:"+integer);
}
});
}
// onNext integer:1
// onCompleted
3、 take 函数限制开始个数
@Test
public void TakeTest(){
Observable.just(1,2,3,4,5,6).take(3).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println("onNext integer:"+integer);
}
});
}
// onNext integer:1
// onNext integer:2
// onNext integer:3
// onCompleted
4、 takeLast:限制最后的个数
@Test
public void TakeLastTest(){
Observable.just(1,2,3,4,5,6).takeLast(3).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println("onNext integer:"+integer);
}
});
}
// onNext integer:4
// onNext integer:5
// onNext integer:6
// onCompleted
5、scan:累加器函数
@Test
public void ScanTest(){
Observable.just(1,2,3,4,5,6).scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer + integer2;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("ScanTest onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println("ScanTest onNext interger:"+integer);
}
});
}
// ScanTest onNext interger:1
// ScanTest onNext interger:3
// ScanTest onNext interger:6
// ScanTest onNext interger:10
// ScanTest onNext interger:15
// ScanTest onNext interger:21
// ScanTest onCompleted
// 先输出1,
// 然后 1 + 2 = 3,输出3
// 然后 3 +3 = 6;输出 6
// 然后 4 + 6; 输出10
// 都是相加上一次的结果
6、map:遍历list中的name,使用map操作符来获取list中的单项
@Test
public void test7(){
Observable.from(Data.getCats(10)).map(new Func1<Cat, String>() {
@Override
public String call(Cat cat) {
return cat.toCat();//获取cat name
}
}).subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("test7 onCompleted:");
}
@Override
public void onError(Throwable e) {
System.out.println("test7 onCompleted e:"+e);
}
@Override
public void onNext(String s) {
System.out.println("test7 onNext :"+s);
}
});
}
// test7 onNext :0
// test7 onNext :1
// test7 onNext :2
// test7 onNext :3
// test7 onNext :4
// test7 onNext :5
// test7 onNext :6
// test7 onNext :7
// test7 onNext :8
// test7 onNext :9
// test7 onCompleted:
7、flatMap: 循环List<Cat> -> cat里面的list -> list 中的 String
@Test
public void test8() {
//循环List<Cat> -> cat里面的list -> list 中的 String
Observable
.from(Data.getCats())
.flatMap(new Func1<Cat, Observable<String>>() {
@Override
public Observable<String> call(Cat cat) {
System.out.println("test8 call #########" + cat.toCat());
return Observable.from(cat.getlist());
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("test8 call :" + s);
}
});
}
// 部分log
// test8 call #########Cat0
// test8 call :0:list
// test8 call :1:list
// test8 call :2:list
// test8 call :3:list
// test8 call :4:list
// test8 call #########Cat1
// test8 call :0:list
// test8 call :1:list
// test8 call :2:list
// test8 call :3:list
// test8 call :4:list
// test8 call #########Cat2
// test8 call :0:list
// test8 call :1:list
// test8 call :2:list
// test8 call :3:list
// test8 call :4:list
五、线程控制使用
以上的测试例子中,事件的发起和消费都是在同一个线程执行的,等同于 RxJava 是同步执行(学习使用),但是,观察者模式本身就是等触发后才通知其他订阅者,所以来了解下 RxJava 如何使用线程的。
在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe()
,就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。
- Scheduler 的 API (一)
在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:
-
Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
-
Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
-
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
-
Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
-
另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。 * subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。 * observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。
ImageView显示图片:
Observable.just(R.drawable.ic_launcher).map(new Func1<Integer, Drawable>() {
@Override
public Drawable call(Integer integer) {
return getResources().getDrawable(integer);
}
}).subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Observer<Drawable>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(MainActivity.this, "Error!", Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(Drawable drawable) {
mImageView.setImageDrawable(drawable);
}
});
多次切换线程
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.newThread()) // 指定 subscribe() 发生在 IO 线程
.observeOn(Schedulers.io()) // 指定在IO线程处理
.map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
return integer+1;
}
})
.observeOn(AndroidSchedulers.mainThread())// 指定 Subscriber 的回调发生在主线程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(TAG, "number:" + number);
}
});
六、参考
七、小结
致此,我们需要明白和了解以下几点:
- 观察者模式是如何使用
- RxJava 中的观察者模式
- RxJava 基本语法
- RxJava 操作符学习
- RxJava 线程控制
练习代码已上传 WFRxJavaDemo,欢迎指正。