android进阶

RxJava详解笔记

2017-03-13  本文已影响789人  Gray_Wang

注:本文仅为本人阅读原文章所做笔记,如有理解错误,欢迎指正

链接

原文
RxJava
RxAndroid

一、API介绍和原理简析

A(观察者)在B(被观察者)发生了需要被监控到的某个变化时的一瞬间做出反应

Android 开发中一个比较典型的例子是点击监听器 OnClickListener 。

对设置 OnClickListener 来说, View 是被观察者, OnClickListener 是观察者,二者通过 setOnClickListener() 方法达成订阅关系。

订阅之后用户点击按钮的瞬间,Android Framework 就会将点击事件发送给已经注册的 OnClickListener 。

Button -> 被观察者、OnClickListener -> 观察者、setOnClickListener() -> 订阅,onClick() -> 事件。

RxJava 有四个基本概念:Observable (可观察者,即被观察者)Observer (观察者)subscribe (订阅)事件

Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。

但是在RxJava代码中却是Observable.subscribe(Observer)即被观察者订阅了观察者。

可以理解为被观察者通知观察者:你可以观察我了,我做出的变化将会让你知道。

二、基本实现

1.创建Observer(观察者)

(1) Subscriber的实现中多了一个可选的OnStart方法,此方法可以用作流程开始时的一些初始化操作;此方法默认在Subscribe(订阅)时调用,无法指定所执行的线程,因此不能用于改变UI,例如显示ProgressBar;如果需要,可以使用Observable.doOnSubscribe(),默认情况下doOnSubscribe方法执行在subscribe方法发生的线程,但是如果在doOnSubscribe方法之后有subscribeOn方法的话,它将执行在里它最近的subscribeOn方法指定的线程内。代码如下:

private void toSubscribe(Observable o, Subscriber s) {
    o.subscribeOn(Schedulers.io())
            .unsubscribeOn(Schedulers.io())
            .doOnSubscribe(new Action0() {
                @Override
                public void call() {
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(s);
}

(2) Subscriber还实现了另外一个接口Subscription中的unSubscribe方法,用于取消订阅防止内存泄漏,一般在这个方法调用之前会先调用isSubscribed方法判断一下状态。

2.创建Observable(被观察者)

3.Subscribe(订阅)

observable.subscribe(observer)或者observable.subscribe(subscriber)


Action1<String> onNextAction = new Action1<String>() {
    // onNext()
    @Override
    public void call(String s) {
        Log.d(tag, s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling
    }
};
Action0 onCompletedAction = new Action0() {
    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed");
    }
};
// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

实例


String[] names = ...;
Observable.from(names)
        .subscribe(new Action1<String>() {
            @Override
            public void call(String name) {
                Log.d(tag, name);
            }
        });

int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
    @Override
    public void call(Subscriber<? super Drawable> subscriber) {
        Drawable drawable = getTheme().getDrawable(drawableRes));
        subscriber.onNext(drawable);
        subscriber.onCompleted();
    }
}).subscribe(new Observer<Drawable>() {
    @Override
    public void onNext(Drawable drawable) {
        imageView.setImageDrawable(drawable);
    }
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
    }
});

三、线程控制:Schedule(一)

Scheduler的API

SubscribeOn()指定subscribe()所发生的线程;ObserveOn()指定Subscriber所发生的线程。

四、变换

1.API

RxJava 提供了对事件序列进行变换的支持,所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。概念说着总是模糊难懂的,来看 API。

Observable.just("images/logo.png") // 输入类型 String
        .map(new Func1<String, Bitmap>() {
            @Override
            public Bitmap call(String filePath) { // 参数类型 String
                return getBitmapFromPath(filePath); // 返回类型 Bitmap
            }
        })
        .subscribe(new Action1<Bitmap>() {
            @Override
            public void call(Bitmap bitmap) { // 参数类型 Bitmap
                showBitmap(bitmap);
            }
        });


Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(String s) {
    }
};
Observable.from(books)
        .flatMap(new Func1<Book, Observable<String>>() {
            @Override
            public Observable<String> call(Book book) {
                return getBookName(book);
            }
        }).subscribe(subscriber);


Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
    ...
};
Observable.from(students)
        .flatMap(new Func1<Student, Observable<Course>>() {
            @Override
            public Observable<Course> call(Student student) {
                return Observable.from(student.getCourses());
            }
        })
        .subscribe(subscriber);

flatMap()map()有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和map()不同的是,flatMap()中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。flatMap()的原理是这样的:1. 使用传入的事件对象创建一个 Observable 对象;2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。

2.变换的原理

变换的功能虽然各有不同,但是实质上都是针对事件序列的处理和发送。在RxJava的内部,它们是基于同一个基础的变化方法:lift(Operator)

精简来讲:在Observable执行了lift(Operator)方法之后,会返回一个新的Observable,这个新的Observable会像一个代理一样,负责接收原始的Observable发出的事件,并在处理之后发送给Subscriber。


Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(String string) {
        // 接收处理之后的事件
    }
};
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(1);
    }
});
// 返回一个Observable
observable.lift(new Observable.Operator<String, Integer>() {
    @Override
    public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
        return new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                subscriber.onCompleted();
            }
            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }
            @Override
            public void onNext(Integer integer) {
                // 处理事件并发送给Subscriber
                subscriber.onNext(integer + "");
            }
        };
    }
}).subscribe(subscriber);

也可以使用compse对Observable整体进行转换:


observable.compose(new Observable.Transformer<Integer, String>() {
    @Override
    public Observable<String> call(Observable<Integer> integerObservable) {
        return null;
    }
});

五、线程控制:Scheduler(二)


Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.newThread())
        .map(mapOperator) // 新线程,由 observeOn() 指定
        .observeOn(Schedulers.io())
        .map(mapOperator2) // IO 线程,由 observeOn() 指定
        .observeOn(AndroidSchedulers.mainThread)
        .subscribe(subscriber);  // Android 主线程,由 observeOn() 指定

通过对observeOn()的多次调用实现程序线程的多次切换;但是,不同于observeOn()subscribeOn()的位置放在那里都可以,但是它只能被调用一次。

六、其它

与Retrofit的结合使用、RxBinding、RxBus请阅读原文

上一篇 下一篇

猜你喜欢

热点阅读