java基础

JAVA之RxJava

2021-08-03  本文已影响0人  isLJli

RxJava概述

RxJava的基本使用

RxJava有三个基本的元素:

  1. 被观察者(Observable)
  2. 观察者(Observer)
  3. 订阅(subscribe)

首先在gradle文件中添加依赖:

implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'

最新的版本可查看官网 :
RxJava: https://github.com/ReactiveX/RxJava
RxAndroid: https://github.com/ReactiveX/RxAndroid

  1. 创建被观察者
Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() {
  @Override
  public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
      emitter.onNext(1);
      emitter.onComplete();
  }
});
  1. 创建观察者
Observer observer = new Observer<Integer>
   @Override
   public void onSubscribe(@NonNull Dispo
       Log.d("数据", "onSubscribe");
   }
   @Override
   public void onNext(@NonNull Integer in
       Log.d("数据", "onNext" + integer);
   }
   @Override
   public void onError(@NonNull Throwable
       Log.d("数据", "onError");
   }
   @Override
   public void onComplete() {
       Log.d("数据", "onComplete");
   }
};
  1. 订阅
observable.subscribe(observer);

被观察者只有通过subscribe订阅绑定观察者后,才可以发送数据给观察者。

RxJava核心思想

RxJava的核心思路是观察者模式和响应式编程。

  1. 观察者模式: Observable被观察者就像一个生产者,Observer观察者像是一个消费者,生产者通过subscribe订阅消费者,并开始把数据发送给消费者。
  2. 响应式编程:把Observable当作起点Observer是终点,其中间可以添加各式的操作符。
    img

RxJava配合Retrofit

Retrofit本身并没有如OkHttp请求网络和RxJav发送数据的功能,但它对这些做了封装管理,所以项目中经常使用OkHttp和RxJava配合Retrofit请求数据。下面做一个demo:

  1. 请求API
public interface NetApi {

  @POST("register")
  @FormUrlEncoded
  Observable<RegisterResponse> registerAction(@Body RegisterRequest request);

}
  1. 创建Retrofit配合OkHttp、RxJava、Gson
public class RetrofitUtil {

  public static <T> T create(Class<T> tClass) {
      return new Retrofit.Builder()
              .baseUrl("http://xxxxxxx")
              .client(getHttpClient())
              .addConverterFactory(GsonConverterFactory.create())
              .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
              .build()
              .create(tClass);
  }

  private static OkHttpClient getHttpClient() {
      OkHttpClient.Builder builder = new OkHttpClient.Builder()
              .readTimeout(10, TimeUnit.SECONDS)
              .connectTimeout(9, TimeUnit.SECONDS);
      if (BuildConfig.DEBUG) {
          HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
          interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
          builder.addInterceptor(interceptor);
      }
      return builder.build();
  }

}

  1. RxJava获取数据流
RetrofitUtil.create(NetApi.class)
      .registerAction(new RegisterRequest())
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Observer<RegisterResponse>() {
          @Override
          public void onSubscribe(@NonNull Disposable d) {
          }
          @Override
          public void onNext(@NonNull RegisterResponse registerRe
          }
          @Override
          public void onError(@NonNull Throwable e) {
          }
          @Override
          public void onComplete() {
          }
      });

通过Retrofit的配合,我们不用写OkHttp的请求网络执行代码,也不用把Json字符串转换成对象。只需定义RxJava的操作和接收。

RxJava模式与原理

RxJava的操作符应用

Rxjava操作符应用.png

我们可以通过create、just、fromIterable等操作符去请求发送数据;通过map操作符去操作转换发送的数据;通过flatMap操作符重新创建Observable发送数据;通过concat、merge操作符组合多个事件的发送;通过subscribeOn操作符决定上面代码的执行线程环境,通过observeOn操作符决定下面代码的执行线程环境。
更多的操作符号使用可参考:RxJava2 只看这一篇文章就够了

RxJava之create原理分析

wecom-temp-84d497613df1cb553b4478c5abce56d1.png
上面这段代码可以分成两部分,也是最简单的起点和终点

1. Observable.create()返回ObservableCreate

@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
   Objects.requireNonNull(source, "source is null");
   return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}

并把ObservableOnSubscribe作为 source参数传入ObservableCreate类中:

public final class ObservableCreate<T> extends Observable<T> {
 // 把`ObservableOnSubscribe`传进来
  final ObservableOnSubscribe<T> source;
  public ObservableCreate(ObservableOnSubscribe<T> source) {
      this.source = source;
  }
....

2. 通过subscribe订阅观察者,执行ObservableCreate.subscribe()方法:

subscribe()

3. 往下执行数据给Observer
source的实例的subscribe方法

subscribe

通过包裹了Observer的ObservableEmitter类给Observer发送数据

onNext

整体流程图如下:


create流程图

RxJava之map原理分析

map代码

1. Observable.map返回ObservableMap对象

ObservableMap

2. 往上执行Observable的subscribeActual方法

ObservableMap.subscribeActual
ObservableCreate.subscribeActual

3. 执行各操作符的方法,并往下发送数据给Observer

上一篇下一篇

猜你喜欢

热点阅读