android

★42.RxJava2

2017-06-29  本文已影响0人  iDragonfly

简介

基础知识

相关概念

相关方法

简单示例

1. 普通示例

1. 定义Observable

Observable<Integer> observable = Observable.create(emitter -> {
    // Todo: 执行后台请求,请求后的结果通过onNext()发送给Observer
    // 发送消息
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
    // 消息发送完毕
    emitter.onComplete();
});

2. 定义Observer

Observer<Integer> observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "subscribe");
        // 订阅时
        // Disposable对象可以保存,日后通过调用Disposable.dispose()来中断订阅。
    }

    @Override
    public void onNext(Integer value) {
        Log.d(TAG, "" + value);
        // 接收到事件时
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "error");
        // 接收到错误时
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "complete");
        // 事件接收完毕时
    }
};

3. 订阅

注意 :此处代码的书写顺序看起来不符合直觉,这样设计是为了便于 链式调用

observable.subscribe(observer);

2. 链式示例

Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
    // Todo: 执行后台请求,请求后的结果通过onNext()发送给Observer
    // 发送消息
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
    // 消息发送完毕
    emitter.onComplete();
}).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "subscribe");
        // 订阅时
        // Disposable对象可以保存,日后通过调用Disposable.dispose()来中断订阅。
    }

    @Override
    public void onNext(Integer value) {
        Log.d(TAG, "" + value);
        // 接收到事件时
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "error");
        // 接收到错误时
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "complete");
        // 事件接收完毕时
    }
});

3. 无Observer示例

subscribe()的所有重载形式

public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}

代码

Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
    emitter.onComplete();
    emitter.onNext(4);
}).subscribe(
        integer -> Log.d(TAG, "onNext: " + integer),
        throwable -> Log.d(TAG, "onError"),
        () -> Log.d(TAG, "onComplete"));

示例解说

事件流向图

事件流向 示意图
只发送onNext()事件
发送onComplete()事件
发送onError()事件

线程相关

切换线程示例

// observable的链式操作中
observable.subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(observer);

线程种类

线程种类 描述
Schedulers.newThread() 线程
Schedulers.computation() 用于CPU计算密集型的操作的 线程
Schedulers.io() 用于IO操作的 线程 ,如网络IO,文件IO,数据库IO。
AndroidSchedulers.mainThread() AndroidUI主线程

注意事项

过滤操作符

Sample

简介

示意图

变换操作符

Map

示意图

简介

用于将一种类型的 事件 转换为另一种类型的 事件

啰嗦示例

Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
    emitter.onNext(1);
}).map(new Function<Integer, String>() {
    @Override
    public String apply(@NonNull Integer integer) throws Exception {
        return "This is result " + integer;
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(@NonNull String s) throws Exception {
        Log.d(TAG, s);
    }
});

简洁示例

Observable.create((ObservableOnSubscribe<Integer>) emitter -> emitter.onNext(1))
        .map(integer -> "This is result " + integer)
        .subscribe(s -> Log.d(TAG, s));

FlatMap

示意图

简介

啰嗦示例

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
    }
}).flatMap(new Function<Integer, Observable<String>>() {
    @Override
    public Observable<String> apply(Integer integer) throws Exception {
        final List<String> list = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            list.add("I am value " + integer);
        }
        return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, s);
    }
});

简洁示例

Observable.create((ObservableOnSubscribe<Integer>) emitter -> emitter.onNext(1))
        .flatMap(new Function<Integer, Observable<String>>() {
            @Override
            public Observable<String> apply(Integer integer) throws Exception {
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("I am value " + integer);
                }
                return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
            }
        })
        .subscribe(s -> Log.d(TAG, s));

ConcatMap

Zip

示意图

简介

啰嗦示例

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        Log.d(TAG, "emit 1");
        emitter.onNext(1);

        Log.d(TAG, "emit complete1");
        emitter.onComplete();
    }
}).subscribeOn(Schedulers.io());

Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        Log.d(TAG, "emit A");
        emitter.onNext("A");

        Log.d(TAG, "emit complete2");
        emitter.onComplete();
    }
}).subscribeOn(Schedulers.io());

Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
    @Override
    public String apply(@NonNull Integer integer, @NonNull String s) throws Exception {
        return integer + s;
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(@NonNull String s) throws Exception {
        Log.d(TAG, "onNext: " + value);
    }
});

简洁示例

Observable<Integer> observable1 = Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
    Log.d(TAG, "emit 1");
    emitter.onNext(1);

    Log.d(TAG, "emit complete1");
    emitter.onComplete();
}).subscribeOn(Schedulers.io());

Observable<String> observable2 = Observable.create((ObservableOnSubscribe<String>) emitter -> {
    Log.d(TAG, "emit A");
    emitter.onNext("A");

    Log.d(TAG, "emit complete2");
    emitter.onComplete();
}).subscribeOn(Schedulers.io());

Observable.zip(observable1, observable2, (integer, s) -> integer + s).subscribe(s -> Log.d(TAG, "onNext: " + s));

特殊情况处理

对于ObserverUI主线程 进行 事件 处理,但是接收到 事件 时,所在的ActivityFragment已经退出的情况,需要通过调用Disposable.dispose()Subscription.cancel()中断订阅

1. 创建CompositeDisposable

2. 保存Disposable

Observer.onSubscribe(...)中:

@Override
public void onSubscribe(Disposable d) {
    mCompositeDisposable.add(d);
}

3. 中断订阅

ActivityFragmentonDestroy()中:

mCompositeDisposable.clear();

Flowable

简介

Backpressure问题简介

简洁示例

Flowable<Integer> upstream = Flowable.create(emitter -> {
    // 直到下游开始请求事件
    while (emitter.requested() == 0) {
        if (emitter.isCancelled())
            break;
    }
    Log.d(TAG, "emit 1");
    emitter.onNext(1);
    Log.d(TAG, "emit complete");
    emitter.onComplete();
}, BackpressureStrategy.ERROR);

Subscriber<Integer> downstream = new Subscriber<Integer>() {
    @Override
    public void onSubscribe(Subscription s) {
        Log.d(TAG, "onSubscribe");
        s.request(10);
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "onNext: " + integer);
    }

    @Override
    public void onError(Throwable t) {
        Log.w(TAG, "onError: ", t);
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "onComplete");
    }
};

示例解说

上一篇下一篇

猜你喜欢

热点阅读