RxJava 使用总结

2023-01-20  本文已影响0人  Dapengyou

Rxjava

角色 作用 类比
Observable(被观察者) 事件的生产者 顾客
Observer(观察者) 事件消费者,接收事件后作出响应 厨师
Subscribe(订阅) 将Observable和Observer连接在一起 服务员
Event(事件) Observable通知Observer的载体 菜品

RxJava 3 主要特点

RxJava 3 与RxJava 2的主要区别是:

下面是具体使用

依赖引用

implementation "io.reactivex.rxjava3:rxjava:3.1.5"

Observable(被观察者)的创建

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {

        e.onNext("事件1");
        e.onNext("事件2");
        e.onNext("事件3");
        e.onComplete();
    }
});

Observer(观察者) 的创建:

Observer<String> observer = new Observer<String>() {

    @Override
    public void onSubscribe(Disposable d) {

        Log.d(TAG, "onSubscribe: 达成订阅");
    }

    @Override
    public void onNext(String s) {

        Log.d(TAG, "onNext: 响应了"+s);
    }

    @Override
    public void onError(Throwable e) {

        Log.d(TAG, "onError: 执行错误");
    }

    @Override
    public void onComplete() {

        Log.d(TAG, "onComplete: 执行完成");
    }
};

形成订阅

通过 subscribe 形成订阅

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {

        e.onNext("事件1");
        e.onNext("事件2");
        e.onNext("事件3");
        e.onComplete();
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {

        Log.d(TAG, "onSubscribe: 达成订阅");
    }

    @Override
    public void onNext(String s) {

        Log.d(TAG, "onNext: 响应了"+s);
    }

    @Override
    public void onError(Throwable e) {

        Log.d(TAG, "onError: 执行错误");
    }

    @Override
    public void onComplete() {

        Log.d(TAG, "onComplete: 执行完成");
    }
});

更简洁的代码

如果我们不关心onCompleted()或者onError()的话,那么可以使用一个更简单的类来定义onNext()期间要完成什么功能

Action1<String> onNextAction = new Action1<String>() {
    @Override
    public void call(String s) {
        System.out.println(s);
    }
};

当我们不关心onCompleted()或者onError()时,为了更简化代码,可以使用以下代码:

Observable.unsafeCreate(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {

            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        });

但是这种简便的写法不利于处理异常,所以可能会报 crash,这时再添加一个 new Action1<Throwable>() 能够很好的 catch 住 crash,代码如下:

Observable.unsafeCreate(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {

            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("Error encountered: " + throwable.getMessage());
            }
        });

如果还想实现 onCompleted() 还可以再加一个 new Action0() 用来完成 onCompleted(), 代码如下:

 Observable.unsafeCreate(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {

            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("Error encountered: " + throwable.getMessage());
            }
        }, new Action0() {
            @Override
            public void call() {
                System.out.println("Sequence complete");
            }
        });

更简便点还可以使用 Lambda:

 Observable.unsafeCreate((Observable.OnSubscribe<String>) subscriber -> {
        System.out.println(subscriber);
        }).subscribe(s -> System.out.println(s), 
                throwable -> System.out.println("Error encountered: " + throwable.getMessage()),
                () -> System.out.println("Sequence complete"));

分配线程

subscribeOn

为上面的内容 分配线程 Schedulers.io()

observeOn

为下面分配线程,一般是主线程 AndroidSchedulers.mainThread()

开发中常用到的操作符

例子:使用 map 将 Long 型转化为 String,再使用 FlatMap 将 String 转化为 Long

 Observable.interval(5, TimeUnit.SECONDS)
                .map(new Func1<Long, String>() {
                    @Override
                    public String call(Long aLong) {
                        return String.valueOf(aLong);
                    }
                })
                .flatMap(new Func1<String, Observable<Long>>() {
                    @Override
                    public Observable<Long> call(String aLong) {
                        Observable<Long> observable = Observable.just(Long.parseLong(aLong));
                        return observable;
                    }
                }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long string) {
                            Log.d("test", String.valueOf(string));
                    }
                });

Func vs Action:Func 提供数据,而 Action 消耗数据,Func0 代表没有参数,Func1 有一个参数,以此类推,Action 同理,可以有 N 个参数

操作符列表

下面是常用的操作符列表:

  1. 创建操作 Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer
  2. 变换操作 Buffer, FlatMap, GroupBy, Map, Scan和Window
  3. 过滤操作 Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, TakeLast
  4. 组合操作 And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip
  5. 错误处理 Catch和Retry
  6. 辅助操作 Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, Using
  7. 条件和布尔操作 All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, TakeWhile
  8. 算术和集合操作 Average, Concat, Count, Max, Min, Reduce, Sum
  9. 转换操作 To
  10. 连接操作 Connect, Publish, RefCount, Replay
  11. 反压操作,用于增加特殊的流程控制策略的操作符

创建操作

用于创建Observable的操作符

变换操作

这些操作符可用于对Observable发射的数据进行变换,详细解释可以看每个操作符的文档

过滤操作

这些操作符用于从Observable发射的数据中进行选择

组合操作

组合操作符用于将多个Observable组合成一个单一的Observable

错误处理

这些操作符用于从错误通知中恢复

辅助操作

一组用于处理Observable的操作符

条件和布尔操作

这些操作符可用于单个或多个数据项,也可用于Observable

算术和聚合操作

这些操作符可用于整个数据序列

连接操作

一些有精确可控的订阅行为的特殊Observable

转换操作

上一篇下一篇

猜你喜欢

热点阅读