JAVA之RxJava
RxJava概述
RxJava的基本使用
RxJava有三个基本的元素:
- 被观察者(Observable)
- 观察者(Observer)
- 订阅(subscribe)
首先在gradle文件中添加依赖:
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
最新的版本可查看官网 :
RxJava: https://github.com/ReactiveX/RxJava
RxAndroid: https://github.com/ReactiveX/RxAndroid
- 创建被观察者
Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
emitter.onNext(1);
emitter.onComplete();
}
});
- 创建观察者
Observer observer = new Observer<Integer>
@Override
public void onSubscribe(@NonNull Dispo
Log.d("数据", "onSubscribe");
}
@Override
public void onNext(@NonNull Integer in
Log.d("数据", "onNext" + integer);
}
@Override
public void onError(@NonNull Throwable
Log.d("数据", "onError");
}
@Override
public void onComplete() {
Log.d("数据", "onComplete");
}
};
- 订阅
observable.subscribe(observer);
被观察者
只有通过subscribe
订阅绑定观察者后,才可以发送数据给观察者。
RxJava核心思想
RxJava的核心思路是观察者模式和响应式编程。
- 观察者模式:
Observable被观察者
就像一个生产者,Observer观察者
像是一个消费者,生产者通过subscribe
订阅消费者,并开始把数据发送给消费者。 - 响应式编程:把
Observable当作起点
,Observer是终点
,其中间可以添加各式的操作符。
img
RxJava配合Retrofit
Retrofit本身并没有如OkHttp请求网络和RxJav发送数据的功能,但它对这些做了封装管理,所以项目中经常使用OkHttp和RxJava配合Retrofit请求数据。下面做一个demo:
- 请求API
public interface NetApi {
@POST("register")
@FormUrlEncoded
Observable<RegisterResponse> registerAction(@Body RegisterRequest request);
}
- 创建Retrofit配合OkHttp、RxJava、Gson
public class RetrofitUtil {
public static <T> T create(Class<T> tClass) {
return new Retrofit.Builder()
.baseUrl("http://xxxxxxx")
.client(getHttpClient())
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build()
.create(tClass);
}
private static OkHttpClient getHttpClient() {
OkHttpClient.Builder builder = new OkHttpClient.Builder()
.readTimeout(10, TimeUnit.SECONDS)
.connectTimeout(9, TimeUnit.SECONDS);
if (BuildConfig.DEBUG) {
HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
builder.addInterceptor(interceptor);
}
return builder.build();
}
}
- RxJava获取数据流
RetrofitUtil.create(NetApi.class)
.registerAction(new RegisterRequest())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<RegisterResponse>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull RegisterResponse registerRe
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
通过Retrofit的配合,我们不用写OkHttp的请求网络执行代码,也不用把Json字符串转换成对象。只需定义RxJava的操作和接收。
RxJava模式与原理
RxJava的操作符应用
Rxjava操作符应用.png我们可以通过create、just、fromIterable等操作符去请求发送数据;通过map操作符去操作转换发送的数据;通过flatMap操作符重新创建Observable发送数据;通过concat、merge操作符组合多个事件的发送;通过subscribeOn操作符决定上面代码的执行线程环境,通过observeOn操作符决定下面代码的执行线程环境。
更多的操作符号使用可参考:RxJava2 只看这一篇文章就够了
RxJava之create原理分析
wecom-temp-84d497613df1cb553b4478c5abce56d1.png上面这段代码可以分成两部分,也是最简单的
起点和终点
。
1. Observable.create()返回ObservableCreate
类
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
Objects.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}
并把ObservableOnSubscribe作为 source
参数传入ObservableCreate类中:
public final class ObservableCreate<T> extends Observable<T> {
// 把`ObservableOnSubscribe`传进来
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
....
2. 通过subscribe
订阅观察者,执行ObservableCreate.subscribe()方法:
- ObservableCreate先执行父类Observable的.subscribe()方法,最后把
Observer
实例传给ObservableCreate类中的subscribeActual
方法执行:
subscribeActual - 首先把observer包装了一层,并调用了observer.onSubscribe()方法
- 执行source接口实例的subscribe,并把包裹的parent传了过去。
3. 往下执行数据给Observer
source
的实例的subscribe方法
通过包裹了Observer的ObservableEmitter
类给Observer发送数据
整体流程图如下:
create流程图
RxJava之map原理分析
map代码1. Observable.map返回ObservableMap对象
2. 往上执行Observable的subscribeActual
方法
ObservableCreate.subscribeActual
3. 执行各操作符的方法,并往下发送数据给Observer
-
create往下发送数据
create发送数据
create发送数据 -
map往下发送数据
map发送数据
map发送数据