RxJava2学习笔记

2020-02-23  本文已影响0人  星空下的萝卜

我们为什么选择RxJava


Rxjava - Events in RxJava

  1. Event can wrap a value(Integer, String, Object...)
  2. Event can wrap an exception
  3. An emitter can either produce events and complete or fail

Rxjava - Event Emitters

  1. Observable
    • Emit zero, one or multiple events, or an error
    • It comes from Rxjava1
    • Flexible and generalist
    • Hot or Cold observable
      1.Cold emitter just produce events when someone subscribe to it. This is the default one.
      2.Hot emitter produce events regardless there is or not a consumer subscribed to it.
    • Observable 会不断的将数据推向有兴趣对数据进行处理的人
  2. Flowable

    • Emit zero, one or multiple events
    • Does not exist in Rxjava1
    • Should be used for infinite streams
    • support for Backpressure(支持背压)
  3. Single - One event or an error
  4. Maybe - Zero or one event, or an error

RxJava - Operators

告诉发射器如何操纵事件
在RxJava中,Operators只是发射器类或对象内的方法

  1. Operators categories
    • Creating Observables
    • Transforming Observables
    • Filtering Observables
    • Combining Observables
    • Error handling
    • Observable Utility
    • Conditional and Boolean
    • Mathenatical and Aggregate
  2. Creating Operators

-Create Observables:

  • Defer
  • Empty
  • From
  • Interval
  • Timer

转换 Observables

观察者的变换算子,数据转换

  1. Map
    • transform the event data, returning a new data (same or different type) 转换事件数据,返回新数据(相同或不同类型)
  2. FlatMap
    • transform the event data, returning another Observable/emitter(with same or different event type)转换事件数据,返回另一个Observable / emitter(具有相同或不同的事件类型)

Exercise solution


RxJava 2依赖于4个基础接口,它们分别是


背压(Backpressure)

当数据流通过异步步骤运行时,每个步骤可能以不同的速度执行不同的操作。为避免过多的此类步骤(通常会因临时缓冲或需要跳过/删除数据而增加内存使用量),因此应用了所谓的背压,这是流控制的一种形式,其中的步骤可以表示多少个项目他们准备好处理了吗?在通常无法一步知道上游将发送给它多少项的情况下,这可以限制数据流的内存使用量。
在RxJava中,专用的Flowable类被指定为支持背压,而Observable被指定为非背压操作(短序列,GUI交互等)。其他类型,Single,Maybe和Completable不支持背压,也不应该支持,总有空间暂时存储一件物品。

观察者模式

RxJava2 以观察者模式为骨架,两种观察者模式:

RxJava骨架.png

Observable

线程调度方式

一、subScribeOn

    /**
     * Asynchronously subscribes Observers to this ObservableSource on the specified {@link Scheduler}.
     * <p>
     * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/subscribeOn.png" alt="">
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>You specify which {@link Scheduler} this operator will use.</dd>
     * </dl>
     *
     * @param scheduler
     *            the {@link Scheduler} to perform subscription actions on
     * @return the source ObservableSource modified so that its subscriptions happen on the
     *         specified {@link Scheduler}
     * @see <a href="http://reactivex.io/documentation/operators/subscribeon.html">ReactiveX operators documentation: SubscribeOn</a>
     * @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
     * @see #observeOn
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

二、observeOn

    /**
     * Modifies an ObservableSource to perform its emissions and notifications on a specified {@link Scheduler},
     * asynchronously with an unbounded buffer of configurable "island size" and optionally delays onError notifications.
     * <p>
     * <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/observeOn.png" alt="">
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>You specify which {@link Scheduler} this operator will use.</dd>
     * </dl>
     * <p>"Island size" indicates how large chunks the unbounded buffer allocates to store the excess elements waiting to be consumed
     * on the other side of the asynchronous boundary. Values below 16 are not recommended in performance sensitive scenarios.
     *
     * @param scheduler
     *            the {@link Scheduler} to notify {@link Observer}s on
     * @param delayError
     *            indicates if the onError notification may not cut ahead of onNext notification on the other side of the
     *            scheduling boundary. If true a sequence ending in onError will be replayed in the same order as was received
     *            from upstream
     * @param bufferSize the size of the buffer.
     * @return the source ObservableSource modified so that its {@link Observer}s are notified on the specified
     *         {@link Scheduler}
     * @see <a href="http://reactivex.io/documentation/operators/observeon.html">ReactiveX operators documentation: ObserveOn</a>
     * @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
     * @see #subscribeOn
     * @see #observeOn(Scheduler)
     * @see #observeOn(Scheduler, boolean)
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

线程切换需要注意事项

待续。。。

参考文献:

RxJava官方文档
RxJava官方接口文档
参考了这个外国大佬的视频教程
简书作者 nanchen2251 的教程

上一篇下一篇

猜你喜欢

热点阅读