RxJava(一)
2021-04-29 本文已影响0人
涛涛123759
一、RxJava的简单使用
1、导包
在最外层的build.gradle中添加
// Shared version properties declared on the root project; module build files
// read them as $rootProject.<name>.
project.ext {
compileSdkVersion = 28
// Version shared by all com.squareup.retrofit2 artifacts declared below.
retrofitVersion = "2.4.0"
// Version of com.google.code.gson:gson.
gsonVersion = "2.8.2"
// NOTE(review): not referenced by any dependency line shown in this article — confirm it is still needed.
cookieVersion = "v1.0.1"
}
导入框架
// OkHttp debugging (Stetho network inspection)
implementation 'com.facebook.stetho:stetho:1.4.2'
implementation 'com.facebook.stetho:stetho-okhttp3:1.4.2'
// Networking: Retrofit, its converters and call adapters
implementation "com.squareup.retrofit2:retrofit:$rootProject.retrofitVersion"
implementation "com.squareup.retrofit2:retrofit-mock:$rootProject.retrofitVersion"
// converter-gson is declared once only; the original list repeated this line.
implementation "com.squareup.retrofit2:converter-gson:$rootProject.retrofitVersion"
implementation 'com.squareup.okhttp3:logging-interceptor:3.5.0'
implementation "com.squareup.retrofit2:converter-scalars:$rootProject.retrofitVersion"
implementation "com.squareup.retrofit2:adapter-rxjava2:$rootProject.retrofitVersion"
implementation "com.google.code.gson:gson:$rootProject.gsonVersion"
// RxJava 2 and its Android schedulers
implementation 'io.reactivex.rxjava2:rxjava:2.1.0'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'com.jakewharton.rxbinding2:rxbinding:2.1.1' // view click streams, used for debouncing
二、具体代码实现
1、RxJava中的全局Hook
// Global assembly hook: logs every Observable handed to it and passes it through unchanged.
final Function<Observable, Observable> assemblyLogger = new Function<Observable, Observable>() {
    @NonNull
    @Override
    public Observable apply(@NonNull Observable source) throws Exception {
        Log.i("--->", source.toString());
        return source;
    }
};
RxJavaPlugins.setOnObservableAssembly(assemblyLogger);
其中Hook主要是对我们操作符的监听
2、封装IO切换操作符
/**
 * Builds a reusable transformer for use with {@code compose()}.
 *
 * <p>The transformer subscribes the upstream on the IO scheduler, observes
 * downstream on the Android main thread, and logs each item as it passes
 * through without modifying it.
 *
 * @param <UD> item type, unchanged by the transformer
 * @return a transformer applying the scheduling and logging described above
 */
public final static <UD> ObservableTransformer<UD, UD> rxud() {
    return new ObservableTransformer<UD, UD>() {
        @Override
        public ObservableSource<UD> apply(Observable<UD> upstream) {
            // Everything above compose() is subscribed on an IO thread.
            Observable<UD> subscribedOnIo = upstream.subscribeOn(Schedulers.io());
            // Everything below compose() is delivered on the main thread.
            Observable<UD> deliveredOnMain = subscribedOnIo.observeOn(AndroidSchedulers.mainThread());
            Function<UD, UD> passThroughLogger = new Function<UD, UD>() {
                @Override
                public UD apply(UD item) throws Exception {
                    Log.d(TAG, "apply: 我监听到你了,居然再执行");
                    return item;
                }
            };
            return deliveredOnMain.map(passThroughLogger);
        }
    };
}
三、加载Bitmap实例
网络图片链接
// URL of the remote image that rxJavaDownloadImageAction() downloads.
private final static String PATH = "http://pic1.win4000.com/wallpaper/c/53cdd1f7c1f21.jpg";
// Loading dialog shown while the download is in flight.
private ProgressDialog progressDialog;
// ImageView that displays the resulting bitmap.
private ImageView image;
具体分发流程
/**
 * Click handler: downloads the image at {@code PATH}, draws a text watermark
 * on it, and shows the result in {@code image}.
 *
 * <p>The download and the watermarking run on the IO scheduler and the
 * observer callbacks run on the main thread; both are arranged by
 * {@code compose(rxud())}. A progress dialog is shown from
 * {@code onSubscribe} and dismissed on completion or on error.
 *
 * @param view the clicked view (unused)
 */
public void rxJavaDownloadImageAction(View view) {
    // just() emits the single PATH String into the chain.
    Observable.just(PATH)
            .map(new Function<String, Bitmap>() {
                @Override
                public Bitmap apply(String s) throws Exception {
                    // Use the emitted value instead of reaching back to the constant.
                    HttpURLConnection httpURLConnection = (HttpURLConnection) new URL(s).openConnection();
                    try {
                        httpURLConnection.setConnectTimeout(5000);
                        int responseCode = httpURLConnection.getResponseCode(); // the request is actually sent here
                        if (responseCode != HttpURLConnection.HTTP_OK) {
                            // RxJava 2 forbids null items; fail with a descriptive error instead of returning null.
                            throw new java.io.IOException("Unexpected HTTP response code: " + responseCode);
                        }
                        InputStream inputStream = httpURLConnection.getInputStream();
                        try {
                            Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
                            if (bitmap == null) {
                                throw new java.io.IOException("Response body could not be decoded as a bitmap");
                            }
                            return bitmap;
                        } finally {
                            inputStream.close();
                        }
                    } finally {
                        httpURLConnection.disconnect();
                    }
                }
            })
            .map(new Function<Bitmap, Bitmap>() {
                @Override
                public Bitmap apply(Bitmap bitmap) throws Exception {
                    // Watermark step.
                    Paint paint = new Paint();
                    paint.setTextSize(88);
                    paint.setColor(Color.RED);
                    return drawTextToBitmap(bitmap, "同学们大家好", paint, 88, 88);
                }
            })
            .map(new Function<Bitmap, Bitmap>() {
                @Override
                public Bitmap apply(Bitmap bitmap) throws Exception {
                    // Logging step: records when the image finished downloading.
                    Log.d(TAG, "apply: 是这个时候下载了图片啊:" + System.currentTimeMillis());
                    return bitmap;
                }
            })
            .compose(rxud())
            // Connect the source to the observer.
            .subscribe(new Observer<Bitmap>() {
                // Step 1: called as soon as the subscription is established, before any item.
                @Override
                public void onSubscribe(Disposable d) {
                    progressDialog = new ProgressDialog(DownloadActivity.this);
                    progressDialog.setTitle("download run");
                    progressDialog.show();
                }

                // Step 4: receives the finished bitmap.
                @Override
                public void onNext(Bitmap bitmap) {
                    image.setImageBitmap(bitmap);
                }

                // Failure path: the dialog must be dismissed here too, because
                // onComplete() is not called after an error.
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "rxJavaDownloadImageAction: download failed", e);
                    hideProgress();
                }

                // Step 5: the stream finished normally.
                @Override
                public void onComplete() {
                    hideProgress();
                }

                // Dismisses the loading dialog if one was created.
                private void hideProgress() {
                    if (progressDialog != null) {
                        progressDialog.dismiss();
                    }
                }
            });
}
图片上绘制文字 加水印
/**
 * Draws {@code text} onto a mutable copy of {@code bitmap} and returns the copy.
 *
 * <p>Dithering and bitmap filtering are switched on for the supplied paint.
 * The copy uses the source bitmap's config, or ARGB_8888 when the source
 * reports none.
 *
 * @param bitmap      source image
 * @param text        text to draw
 * @param paint       paint used for the text; its dither and filter flags are modified
 * @param paddingLeft x coordinate passed to {@code Canvas.drawText}
 * @param paddingTop  y coordinate passed to {@code Canvas.drawText}
 * @return the copy with the text drawn on it
 */
private final Bitmap drawTextToBitmap(Bitmap bitmap, String text, Paint paint, int paddingLeft, int paddingTop) {
    Bitmap.Config sourceConfig = bitmap.getConfig();
    paint.setDither(true);
    paint.setFilterBitmap(true);
    Bitmap.Config targetConfig = (sourceConfig != null) ? sourceConfig : Bitmap.Config.ARGB_8888;
    Bitmap watermarked = bitmap.copy(targetConfig, true);
    new Canvas(watermarked).drawText(text, paddingLeft, paddingTop, paint);
    return watermarked;
}
四、多个接口链式调用
接口API
/**
 * Retrofit service for the project endpoints, relative to the base URL the
 * Retrofit instance was built with.
 */
public interface WangAndroidApi {
    /**
     * Fetches the project category tree.
     *
     * @return an Observable emitting the parsed response
     */
    @GET("project/tree/json")
    Observable<ProjectBean> getProject();

    /**
     * Fetches one page of project items for a category.
     *
     * <p>The category id is supplied only through {@code @Query("cid")}. The
     * path previously also hard-coded {@code ?cid=294}, so every request
     * carried two {@code cid} parameters.
     *
     * @param pageIndex page number substituted into the URL path
     * @param cid       category id sent as the {@code cid} query parameter
     * @return an Observable emitting the parsed response
     */
    @GET("project/list/{pageIndex}/json")
    Observable<ProjectItem> getProjectItem(@Path("pageIndex") int pageIndex, @Query("cid") int cid);
}
bean 类
代码删除了get/set方法
// Response model returned by WangAndroidApi.getProject().
// The article omits the getters/setters; other snippets call getData() and getId().
// Presumably populated by the Gson converter from the JSON response — confirm field names against the API.
public class ProjectBean {
private int errorCode;
private String errorMsg;
// One entry per element of the response's "data" list.
private List<DataBean> data;
// A single entry of the data list.
public static class DataBean {
private int courseId;
// Passed as the cid argument of WangAndroidApi.getProjectItem() in UseActivity.
private int id;
private String name;
private int order;
private int parentChapterId;
private boolean userControlSetTop;
private int visible;
// Element type is left unspecified by the original author.
private List<?> children;
}
}
// Response model returned by WangAndroidApi.getProjectItem().
// The article omits the getters/setters.
// Presumably populated by the Gson converter from the JSON response — confirm field names against the API.
public class ProjectItem {
private DataBean data;
private int errorCode;
private String errorMsg;
// Paging fields plus the list of items; presumably one page of results — confirm against the API.
public static class DataBean {
private int curPage;
private int offset;
private boolean over;
private int pageCount;
private int size;
private int total;
private List<DatasBean> datas;
// A single item of the datas list.
public static class DatasBean {
private String apkLink;
private String author;
private int chapterId;
private String chapterName;
private boolean collect;
}
}
}
retrofit + okhttp 封装
/**
 * Factory for the Retrofit instance used by the samples: OkHttp client with
 * Stetho network inspection, Gson conversion and the RxJava 2 call adapter.
 */
public class HttpUtil {
    private static final String TAG = "HttpUtils";

    /**
     * Timeout applied to connect, read and write, in seconds. The original
     * code passed 10000 with TimeUnit.SECONDS (roughly 2.8 hours), which
     * effectively disabled the timeouts.
     */
    private static final long TIMEOUT_SECONDS = 10;

    /**
     * Base URL used when building Retrofit; the original comment described
     * the default as the "test-a" environment.
     */
    public static String BASE_URL = "https://www.wanandroid.com/";

    /**
     * Replaces the base URL used by Retrofit instances created afterwards.
     *
     * @param baseUrl new base URL
     */
    public static void setBaseUrl(String baseUrl) {
        BASE_URL = baseUrl;
    }

    /**
     * Builds a new Retrofit instance from the current configuration.
     *
     * @return a Retrofit bound to {@link #BASE_URL}
     */
    public static Retrofit getOnlineCookieRetrofit() {
        // OkHttp client: Stetho network interceptor plus bounded timeouts.
        OkHttpClient okHttpClient = new OkHttpClient.Builder()
                .addNetworkInterceptor(new StethoInterceptor())
                .readTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
                .connectTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
                .writeTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
                .build();

        return new Retrofit.Builder()
                // Set once; the original called baseUrl(BASE_URL) twice in a row.
                .baseUrl(BASE_URL)
                .client(okHttpClient)
                // JSON <-> object conversion.
                .addConverterFactory(GsonConverterFactory.create(new Gson()))
                // Lets service methods return RxJava 2 types.
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .build();
    }
}
RxJava响应式调用
/**
 * Demonstrates click debouncing combined with chained network calls via
 * flatMap, avoiding nested callbacks.
 */
public class UseActivity extends AppCompatActivity {
    private final static String TAG = UseActivity.class.getSimpleName();
    private WangAndroidApi api;
    // Handle to the click pipeline so it can be cancelled in onDestroy();
    // without it the pipeline keeps a reference to this Activity.
    private Disposable clickDisposable;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_use);
        api = HttpUtil.getOnlineCookieRetrofit().create(WangAndroidApi.class);
        antiShakeActonUpdate();
    }

    /**
     * Wires the button to the request chain: debounced click, then project
     * tree, then one item request per tree entry, with results delivered on
     * the main thread.
     */
    private void antiShakeActonUpdate() {
        // The view whose clicks are debounced.
        Button bt_anti_shake = findViewById(R.id.bt_anti_shake);
        clickDisposable = RxView.clicks(bt_anti_shake)
                // Let at most one click through every 2 seconds.
                .throttleFirst(2000, TimeUnit.MILLISECONDS)
                // Switch everything below to an IO thread.
                .observeOn(Schedulers.io())
                .flatMap(new Function<Object, ObservableSource<ProjectBean>>() {
                    @Override
                    public ObservableSource<ProjectBean> apply(Object o) throws Exception {
                        return api.getProject(); // top-level data
                    }
                })
                .flatMap(new Function<ProjectBean, ObservableSource<ProjectBean.DataBean>>() {
                    @Override
                    public ObservableSource<ProjectBean.DataBean> apply(ProjectBean projectBean) throws Exception {
                        // Re-emit each entry of the list as its own item.
                        return Observable.fromIterable(projectBean.getData());
                    }
                })
                .flatMap(new Function<ProjectBean.DataBean, ObservableSource<ProjectItem>>() {
                    @Override
                    public ObservableSource<ProjectItem> apply(ProjectBean.DataBean dataBean) throws Exception {
                        return api.getProjectItem(1, dataBean.getId());
                    }
                })
                // Switch everything below to the main thread.
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<ProjectItem>() {
                    @Override
                    public void accept(ProjectItem projectItem) throws Exception {
                        // Runs after observeOn(mainThread()), so UI updates belong here.
                        Log.d(TAG, "accept: " + projectItem);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        // Without an error consumer a failed request surfaces as
                        // OnErrorNotImplementedException. Note that an error
                        // terminates the click stream.
                        Log.e(TAG, "accept: request chain failed", throwable);
                    }
                });
    }

    @Override
    protected void onDestroy() {
        // Cancel the pipeline so it stops holding this Activity and its views.
        if (clickDisposable != null && !clickDisposable.isDisposed()) {
            clickDisposable.dispose();
        }
        super.onDestroy();
    }
}
doOnNext()操作符的使用
// Register, refresh the UI, then log in — one chain with two thread hops.
MyRetrofit.createRetrofit().create(IReqeustNetwor.class)
        .registerAction(new RegisterRequest()) // 1. send the registration request
        .subscribeOn(Schedulers.io()) // run the request above on an IO thread
        .observeOn(AndroidSchedulers.mainThread()) // deliver to the steps below on the main thread
        .doOnNext(new Consumer<RegisterResponse>() {
            @Override
            public void accept(RegisterResponse registerResponse) throws Exception {
                // 2. registration finished: update the registration UI here
            }
        })
        // 3. immediately continue with the login request
        .observeOn(Schedulers.io()) // run the steps below on an IO thread
        .flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() {
            @Override
            public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception {
                Observable<LoginResponse> loginResponseObservable = MyRetrofit.createRetrofit().create(IReqeustNetwor.class)
                        .loginAction(new LoginReqeust());
                return loginResponseObservable;
            }
        })
        .observeOn(AndroidSchedulers.mainThread()) // deliver to the observer on the main thread
        .subscribe(new Observer<LoginResponse>() {
            // onSubscribe is invoked synchronously from subscribe(), so it runs on the
            // thread that called subscribe() (expected to be the main thread here — confirm against caller).
            @Override
            public void onSubscribe(Disposable d) {
                progressDialog = new ProgressDialog(RequestActivity.this);
                progressDialog.show();
                // Keep the handle so the chain can be cancelled later.
                disposable = d;
            }

            @Override
            public void onNext(LoginResponse loginResponse) {
                // 4. login finished: update the login UI here
            }

            // Failure path: onComplete() is not called after an error, so the
            // dialog has to be dismissed here as well.
            @Override
            public void onError(Throwable e) {
                hideProgress();
            }

            @Override
            public void onComplete() {
                hideProgress();
            }

            // Dismisses the loading dialog if one was created.
            private void hideProgress() {
                if (progressDialog != null) {
                    progressDialog.dismiss();
                }
            }
        });
- doOnNext() :操作符的作用是,允许我们在每次输出一个元素之前做一些额外的事情,但不会使响应事件终止。比如:注册API 之后进行刷新UI,然后拿着注册信息进行登录操作。
五、自定义操作符(View点击事件)
1、定义View的点击事件
/**
 * Entry point for the custom view operators defined in this article.
 */
public class RxView {
    private final static String TAG = RxView.class.getSimpleName();

    /**
     * Wraps the given view in a {@link ViewClickObservable}.
     *
     * @param view the view to wrap
     * @return a new ViewClickObservable for {@code view}
     */
    public static Observable<Object> clicks(View view) {
        Observable<Object> clickSource = new ViewClickObservable(view);
        return clickSource;
    }
}
2、Observable 包装类
Observable 实现类ViewClickObservable主要作用是订阅Observer,然后定义一个包裹类(MyListener)把View和Observer进行封装。
/**
 * Observable that emits a shared placeholder object each time the wrapped
 * view is clicked.
 *
 * <p>Subscribing installs a click listener on the view (replacing any
 * existing one); disposing removes it again on the main thread.
 */
public class ViewClickObservable extends Observable<Object> {
    private final View view;
    // Single shared item emitted for every click; the value carries no meaning.
    private static final Object EVENT = new Object();

    // The original also stored the view in a static field (EVENT2) that was never
    // read. A static reference to a View keeps its Activity alive, so it is removed.
    public ViewClickObservable(View view) {
        this.view = view;
    }

    @Override
    protected void subscribeActual(Observer<? super Object> observer) {
        MyListener myListener = new MyListener(view, observer);
        // Hand the Disposable to the observer before clicks can arrive.
        observer.onSubscribe(myListener);
        this.view.setOnClickListener(myListener);
    }

    /** Pairs the view with its observer; acts as both click listener and Disposable. */
    static final class MyListener implements View.OnClickListener, Disposable {
        private final View view;
        // Declared with the same wildcard type that subscribeActual receives.
        private final Observer<? super Object> observer;
        private final AtomicBoolean isDisposable = new AtomicBoolean();

        public MyListener(View view, Observer<? super Object> observer) {
            this.view = view;
            this.observer = observer;
        }

        @Override
        public void onClick(View v) {
            if (!isDisposed()) {
                observer.onNext(EVENT);
            }
        }

        /** Removes the click listener; only the first call has any effect. */
        @Override
        public void dispose() {
            if (isDisposable.compareAndSet(false, true)) {
                if (Looper.myLooper() == Looper.getMainLooper()) {
                    // Already on the main thread: detach directly.
                    view.setOnClickListener(null);
                } else {
                    // Not on the main thread: hop to it before touching the view.
                    AndroidSchedulers.mainThread().scheduleDirect(new Runnable() {
                        @Override
                        public void run() {
                            view.setOnClickListener(null);
                        }
                    });
                }
            }
        }

        @Override
        public boolean isDisposed() {
            return isDisposable.get();
        }
    }
}
3、使用自定义类
// Debounce clicks on this button: at most one click is handled every 2 seconds.
Button button = findViewById(R.id.button);
Observable<Object> throttledClicks = RxView.clicks(button)
        .throttleFirst(2000, TimeUnit.MILLISECONDS);
throttledClicks.subscribe(new Consumer<Object>() {
    @Override
    public void accept(Object click) throws Exception {
        // Each accepted click builds and subscribes to a one-item source.
        Observable<String> greeting = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("Derry");
            }
        });
        greeting.subscribe(new Consumer<String>() {
            @Override
            public void accept(String value) throws Exception {
                Log.d(L.TAG, "accept: 终点:" + value);
            }
        });
    }
});