每天吃一点Androidの响应式编程之RxJava\RxAndr

2018-08-28  本文已影响344人  耑意儿
目录:
1、前言
2、创建
3、订阅
4、线程控制:调度器Scheduler
5、网络请求失败,重试
Android小仙
感谢以下文章提供的指导:
给Android开发者的RxJava详解
RxJava/RxAndroid 使用实例实践

一、前言

1、RxJava到底是做什么的?

异步:
也就是说,RxJava是一个实现异步操作的库

2、同样是做异步,RxJava跟AsyncTask / Handler / XXX / ... 有什么区别?

简洁:
随着程序逻辑变得越来越复杂,依然能够保持简洁。
逻辑简洁
在AndroidStudio中可以实现Lambda化预览

3、目的:“后台处理,前台回调”的异步机制

二、创建

2.1、创建观察者
Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};
2.2、创建订阅者
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

2.3、关于订阅者与观察者

订阅者内部实现了观察者的基本方法,从某种程度上来讲,订阅者同时扮演了观察者这个角色。

2.4、订阅者与观察者的区别
2.5、创建被观察者

被观察者持有订阅者对象,当事件执行到相应阶段时,调用订阅者的方法以达到提醒的作用。

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();

三、订阅

给 被观察者Observable 配置 观察者Observer/Subscriber

observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);

其实真的很别扭,按照逻辑来说应该是观察者订阅被观察者,但是从代码上来看却变成了反过来的
!!!

四、线程控制:调度器Scheduler

RxJava遵循的是线程不变的原则,在哪个线程调用subscribe就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。
如果需要切换线程,那就需要用到调度器了。

4.1、示例

栗子一、

Observable.just(1, 2, 3, 4)
     // 指定 subscribe() 发生在 IO 线程
    .subscribeOn(Schedulers.io()) 
    // 指定 Subscriber 的回调发生在主线程
    .observeOn(AndroidSchedulers.mainThread()) 
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.d(tag, "number:" + number);
        }
    });

栗子二、

int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
    @Override
    public void call(Subscriber<? super Drawable> subscriber) {
        Drawable drawable = getTheme().getDrawable(drawableRes));
        subscriber.onNext(drawable);
        subscriber.onCompleted();
    }
})
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Observer<Drawable>() {
    @Override
    public void onNext(Drawable drawable) {
        imageView.setImageDrawable(drawable);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
    }
});

五、重试

1、重试类
public class RetryWithDelay implements Function<Observable<Throwable>, ObservableSource<?>> {

    public final String TAG = this.getClass().getSimpleName();
    private final int maxRetries;
    private final int retryDelaySecond;
    private int retryCount;

    public RetryWithDelay(int maxRetries, int retryDelaySecond) {
        this.maxRetries = maxRetries;
        this.retryDelaySecond = retryDelaySecond;
    }

    @Override
    public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
        return throwableObservable
                .flatMap(new Function<Throwable, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {
                        if (++retryCount <= maxRetries) {
                            // When this Observable calls onNext, the original Observable will be retried (i.e. re-subscribed).
                            Log.d(TAG, "Observable get error, it will try after " + retryDelaySecond
                                    + " second, retry count " + retryCount);
                            return Observable.timer(retryDelaySecond,
                                    TimeUnit.SECONDS);
                        }
                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}
2、网络请求
HttpApi.api(getActivity()).getAdImage()
                // 表示getAdImage执行在I/O线程
                .subscribeOn(Schedulers.io())
                // 重试,三次,延迟1秒
                .retryWhen(new RetryWithDelay(2,1))
                // 在开始网络请求前的预操作
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        getActivity().showToast("要准备开始网络请求咯!!!");
                    }
                })
                // 表示上面的doOnSubscribe执行在主线程
                .subscribeOn(AndroidSchedulers.mainThread())
                // 表示下面的观察者执行在主线程
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<List<String>>() {
                    @Override
                    public void accept(List<String> images) throws Exception {
                        adImageUrls.clear();
                        adImageUrls.addAll(images);
                        getView().setListData(images);
                    }
                    // 异常处理
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        getActivity().showToast(getActivity().getString(R.string.net_loading_failed));
                    }
                });


上一篇下一篇

猜你喜欢

热点阅读