Using RxJava

2018-09-12  RobinYeung

1. What is RxJava

Rx = Reactive Extensions

2. Basics

Observable source

Event source: Subject

Backpressure

Event stream

Operators

Thread scheduling

3. Miscellaneous

Extensions: Rx-related libraries

// RxBinding
RxView.clicks(view)
    .throttleFirst(ms, TimeUnit.MILLISECONDS)
    .compose(RxLifecycleAndroid.bindView(view))
    .subscribe(x -> listener.onClick(view));
    
// RxPermissions
new RxPermissions(getActivity())
    .request(Manifest.permission.CAMERA)
    .subscribe(granted -> {
        if(granted) {
            // ...
        }
    });
    
// RxLifecycle
public class OneFragment extends RxFragment {
    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        PublishSubject.create()
            .compose(bindUntilEvent(FragmentEvent.DESTROY))
            .subscribe();
    }
}

// RxPreferences
RxPreferences.INSTANCE
    .<String>onPreferenceChanged()
    .filter(p -> TextUtils.equals(p.getKey(), SOME_PREFERENCE_KEY))
    .subscribe(p -> ...);
    
// RxNetwork
RxNetwork.INSTANCE
    .onConnectionChanged()
    .filter(info -> info.getNetworkType() == ConnectivityManager.TYPE_WIFI)
    .filter(info -> !info.isConnected())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(info -> ...);

Common pitfalls

// a() runs on computation, b() runs on io, c() and d() run on the main thread
Observable.create(emitter -> a())
    .observeOn(Schedulers.io())
    .flatMap(x -> {
        b();
        return Observable
            .create(emitter -> c())
            .subscribeOn(AndroidSchedulers.mainThread());            
    })
    .map(p -> d())
    .subscribeOn(Schedulers.computation())
    .subscribe();
// Once the subscribeOn inside flatMap is moved onto the main chain, things change:
// a() runs on the main thread; b(), c(), and d() run on io. The second subscribeOn has no effect.
Observable.create(emitter -> a())
    .observeOn(Schedulers.io())
    .flatMap(x -> {
        b();
        return Observable.create(emitter -> c());
    })
    .subscribeOn(AndroidSchedulers.mainThread())
    .map(p -> d())
    .subscribeOn(Schedulers.computation())
    .subscribe();
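
Why the second subscribeOn has no effect can be shown with a small plain-JDK model. This is only a sketch under stated assumptions, not RxJava's actual implementation: `Source`, `SubscribeOnSketch`, and the thread names `main-like` and `computation-like` are hypothetical names invented for the illustration. Each subscribeOn hops the upstream subscribe call onto its own executor, so the hop closest to the source happens last and decides where the source body runs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

final class SubscribeOnSketch {

    /** Hypothetical minimal stand-in for an Observable: subscribing runs the source body. */
    interface Source<T> {
        void subscribe(Consumer<T> downstream);

        /** Models subscribeOn: the upstream subscribe call is moved onto the given executor. */
        default Source<T> subscribeOn(ExecutorService executor) {
            Source<T> upstream = this;
            return downstream -> executor.execute(() -> upstream.subscribe(downstream));
        }
    }

    /** Returns the name of the thread on which the source body actually ran. */
    static String run() {
        ExecutorService mainLike =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "main-like"));
        ExecutorService computationLike =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "computation-like"));
        CountDownLatch done = new CountDownLatch(1);
        String[] seen = new String[1];

        // The source body plays the role of a(): it reports the thread it runs on.
        Source<String> source =
            downstream -> downstream.accept(Thread.currentThread().getName());

        try {
            source
                .subscribeOn(mainLike)        // closest to the source: this one wins
                .subscribeOn(computationLike) // only decides where the hop above starts
                .subscribe(name -> {
                    seen[0] = name;
                    done.countDown();
                });
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            mainLike.shutdown();
            computationLike.shutdown();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "main-like"
    }
}
```

In this model the outer subscribeOn does run first, but all it does is start the inner hop from the `computation-like` thread; the source body still ends up on `main-like`, which mirrors a() running on the main thread in the second snippet above.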