Rxjava2应用篇

2018-12-21  本文已影响0人  钉某人
Rxjava2实例.png

Github地址:https://github.com/DingMouRen/RxJava2ExamplesDemo

1.后台耗时操作,前台实时更新UI(下载文件,实时更新进度)

// Callback that mirrors download progress in the UI, followed by the download request itself.
final FileCallBack<ResponseBody> fileCallBack = new FileCallBack<ResponseBody>("","") {
            @Override
            public void onSuccess(ResponseBody responseBody) {
                Log.e(TAG,"onSuccess:"+responseBody.toString());
            }

            /**
             * Updates the progress bar and the percentage label.
             *
             * @param progress value reported by the callback (presumably bytes read so far)
             * @param total    value reported by the callback (presumably the total size)
             */
            @Override
            public void progress(long progress, long total) {
                Log.e(TAG,total+"/"+progress);
                if (total <= 0) {
                    // Unknown or empty length: no ratio can be computed, leave the UI untouched.
                    return;
                }
                mProgressBar.setMax((int) total);
                mProgressBar.setProgress((int) progress);

                // Plain arithmetic instead of DecimalFormat + Float.parseFloat: the formatted
                // text depends on the default locale (e.g. "0,50") and cannot be parsed back there.
                long percent = Math.round(progress * 100.0 / total);
                mTvProgress.setText(percent + "%");
            }

            @Override
            public void onStart(Disposable disposable) {
                // Nothing to do when the download starts.
            }

            @Override
            public void onCompleted() {
                Log.e(TAG,"onComplete");
                show("下载完成");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG,"onError:"+e.getMessage());
            }
        };

        // Request on the IO scheduler, keep doOnNext on IO, deliver to the observer on the main thread.
        HttpManager.createService(Api.class,new ProgressInterceptor())
                .downloadApk()
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .doOnNext(new Consumer<ResponseBody>() {
                    @Override
                    public void accept(ResponseBody responseBody) throws Exception {
                        // Hook for saving the body off the main thread; disabled in this demo.
//                                    fileCallBack.saveFile(responseBody);
                    }
                }).observeOn(AndroidSchedulers.mainThread())
                .subscribe(new FileDownloadObserver<ResponseBody>(fileCallBack));

2.计算一段时间内数据的平均值

 /**
     * rxjava处理
     */
    private void rxjavaCompose() {
        mPublishSubject = PublishSubject.create();
        DisposableObserver<List<Double>> disposableObserver = new DisposableObserver<List<Double>>() {
            @Override
            public void onNext(List<Double> temperatureList) {
                double resultSum = 0;//温度的和
                double resultAvera = 0;
                Log.e("onNext","接收到集合的大小:"+temperatureList.size());
                if (temperatureList.size() > 0){
                    for(Double temperature : temperatureList){
                        resultSum += temperature;
                    }
                    resultAvera = resultSum / temperatureList.size();
                }
                Log.e(mActivity.getClass().getSimpleName(),"更新平均温度:"+resultAvera);

                final double finalResultAvera = resultAvera;
                mTvAveraTemperature.post(new Runnable() {
                    @Override
                    public void run() {
                        mTvAveraTemperature.append("平均3秒温度:"+ finalResultAvera +"℃   时间:"+ new Date().toLocaleString()+"\n");
                        int scrollAmount = mTvAveraTemperature.getLayout().getLineTop(mTvAveraTemperature.getLineCount()) - mTvAveraTemperature.getHeight();
                        if (scrollAmount > 0){
                            mTvAveraTemperature.scrollTo(0,scrollAmount);
                        }else {
                            mTvAveraTemperature.scrollTo(0,0);
                        }
                    }
                });
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };

        mPublishSubject.buffer(3000, TimeUnit.MILLISECONDS).observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
        mCompositeDisposable = new CompositeDisposable();//用于管理订阅与解除订阅
        mCompositeDisposable.add(disposableObserver);

3.搜索联想功能优化

需要优化的问题

 /**
     * Sets up the search-suggestion pipeline: keywords pushed into
     * {@code mPublishSubject} are debounced, empty keywords are dropped, and only
     * the response of the most recent keyword is rendered into {@code mTvLog}.
     */
    private void initObservable() {

        mPublishSubject = PublishSubject.create();

        // DisposableObserver is an abstract observer that can be cancelled through dispose().
        mDisposableObserver = new DisposableObserver<MyResponse<String>>() {
            // One Gson instance is enough; creating a new one per result is wasted work.
            private final Gson mGson = new Gson();

            @Override
            public void onNext(MyResponse<String> myResponse) {
                mTvLog.setText(JsonUtils.formatJson(mGson.toJson(myResponse)));
            }

            @Override
            public void onError(Throwable e) {
                // Surface the failure instead of silently ending the search stream.
                mTvLog.setText("onError:" + e.getMessage());
            }

            @Override
            public void onComplete() {

            }
        };

        mPublishSubject.debounce(200, TimeUnit.MILLISECONDS) // only emit a keyword after 200 ms without a newer one
                .filter(new Predicate<String>() { // drop empty keywords
                    @Override
                    public boolean test(String s) throws Exception {
                        return s.length() > 0;
                    }
                }).switchMap(new Function<String, ObservableSource<MyResponse<String>>>() { // keep only the newest request's result
            @Override
            public ObservableSource<MyResponse<String >> apply(String s) throws Exception {
                // Contain failures per request: an error that escapes the inner observable
                // would terminate the whole chain and later keywords would be ignored.
                return HttpManager.createService(Api.class).search(s)
                        .subscribeOn(Schedulers.io())
                        .onErrorResumeNext(Observable.<MyResponse<String>>empty());
            }
        }).observeOn(AndroidSchedulers.mainThread())
                .subscribe(mDisposableObserver);

        mCompositeDisposable = new CompositeDisposable(); // used to cancel the subscription

        mCompositeDisposable.add(mDisposableObserver); // register the subscription
    }

4.轮询操作

应用场景:有时我们需要每隔一段时间向服务器发起一次请求,但又不适合引入长连接。

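可以使用intervalRange操作符,参数含义:intervalRange(start, count, initialDelay, period, unit) 中,start 为起始值,count 为发射的数据个数,initialDelay 为首次发射前的延迟,period 为相邻两次发射的时间间隔,unit 为时间单位。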

 /**
     * Polls the server at a fixed interval: five requests, one every 1000 ms,
     * starting immediately. Every response is appended to {@code mTvFix}.
     */
    private void startFixPolling() {

        // intervalRange(0, 5, ...) already stops after five ticks, so no extra take(5) is needed.
        Observable<MyResponse<String>> observableFix = Observable.intervalRange(0,5,0,1000, TimeUnit.MILLISECONDS)
                .flatMap(new Function<Long, ObservableSource<MyResponse<String>>>() {
                    @Override
                    public ObservableSource<MyResponse<String>> apply(Long aLong) throws Exception {
                        return HttpManager.createService(Api.class).polling().subscribeOn(Schedulers.io());
                    }
                });

        DisposableObserver<MyResponse<String>> disposableObserverFix = new DisposableObserver<MyResponse<String>>() {
            @Override
            public void onNext(MyResponse<String> response) {
                mTvFix.append(response.data+"\n");
            }

            @Override
            public void onError(Throwable e) {
                // A failed request ends the polling; show why instead of stopping silently.
                mTvFix.append("onError:"+e.getMessage()+"\n");
            }

            @Override
            public void onComplete() {

            }
        };

        observableFix.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(disposableObserverFix);

        mCompositeDisposable.add(disposableObserverFix);
    }

5.根据错误类型进行重试请求

网络请求出错,重试的情况下需要处理的问题:

retryWhen操作符可以实现重新订阅,由onError事件来触发。

 /**
     * Issues the request and re-subscribes on {@link ConnectException}: up to four
     * retries, each after a 2-second delay. Any other error, or a fifth connection
     * failure, is delivered to the observer's {@code onError}.
     */
 private void retryDemo() {
        final int maxRetries = 4;
        final long retryDelayMillis = 2000;

        Observable<MyResponse<String>> observable = HttpManager.createService(Api.class).retry()
                .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
            private int mRetryCount;
            @Override
            public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Throwable throwable) throws Exception {
                        if (!(throwable instanceof ConnectException)) {
                            // Only connection failures are worth retrying.
                            return Observable.error(throwable);
                        }
                        mainThreadTextChange("ConnectException异常\n");
                        mRetryCount++;
                        if (mRetryCount > maxRetries) {
                            // Retry budget exhausted: fail without announcing a retry that
                            // will never happen.
                            return Observable.error(throwable);
                        }
                        mainThreadTextChange("2秒后重新发起请求\n");
                        // Emitting a value after the delay triggers the re-subscription.
                        return Observable.timer(retryDelayMillis, TimeUnit.MILLISECONDS);
                    }
                });
            }
        });

        DisposableObserver<MyResponse<String>> disposableObserver = new DisposableObserver<MyResponse<String>>() {
            @Override
            public void onNext(MyResponse<String> response) {
                Gson gson = new Gson();
                mTv.append("onNext:\n"+ JsonUtils.formatJson(gson.toJson(response))+"\n");
            }

            @Override
            public void onError(Throwable e) {
                mTv.append("onError:"+e.getMessage()+"\n");
            }

            @Override
            public void onComplete() {
                mTv.append("onComplete\n");
            }
        };

        observable.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(disposableObserver);

        mCompositeDisposable.add(disposableObserver);
    }

    /**
     * Appends {@code content} to {@code mTv} through {@code runOnUiThread}.
     *
     * @param content text to append
     */
    private void mainThreadTextChange(final String content){
        final Runnable appendTask = new Runnable() {
            @Override
            public void run() {
                mTv.append(content);
            }
        };
        runOnUiThread(appendTask);
    }

6.多个表单的验证

应用场景:登录场景中,需要账户是一定的长度,密码也有特定的长度,此时使用操作符combineLatest来实现需求。

combineLatest可以接受多个Observable和一个函数作为参数。当其中的任意一个Observable发射数据后,会去获取其他的Observable最后一次发射的数据,回调到函数中。(此函数回调的前提是都至少发射过一次数据)

  /**
     * Creates the account and password subjects and enables the login button only
     * while the latest account has 3-5 characters and the latest password has 6-10.
     */
  private void initRxjava2() {
        mCompositeDisposable = new CompositeDisposable();
        mAccoountPublishSubject = PublishSubject.create();
        mPwdPublishSubject = PublishSubject.create();

        // Combines the most recent value of both inputs into a single "form is valid" flag.
        BiFunction<String, String, Boolean> formValidator = new BiFunction<String, String, Boolean>() {
            @Override
            public Boolean apply(String account, String pwd) throws Exception {
                final int accountLen = account.length();
                final int passwordLen = pwd.length();
                boolean accountOk = accountLen >= 3 && accountLen <= 5;
                boolean passwordOk = passwordLen >= 6 && passwordLen <= 10;
                return accountOk && passwordOk;
            }
        };
        Observable<Boolean> formState =
                Observable.combineLatest(mAccoountPublishSubject, mPwdPublishSubject, formValidator);

        DisposableObserver<Boolean> loginButtonUpdater = new DisposableObserver<Boolean>() {
            @Override
            public void onNext(Boolean aBoolean) {
                // Green and clickable when both inputs are valid, grey and disabled otherwise.
                final boolean valid = aBoolean;
                mTvLogin.setEnabled(valid);
                mTvLogin.setBackgroundColor(valid ? Color.GREEN : Color.GRAY);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };

        formState.subscribe(loginButtonUpdater);
        mCompositeDisposable.add(loginButtonUpdater);

    }

    /**
     * Forwards text changes of both input fields to their subjects and shows a
     * toast when the login view is clicked.
     */
    @Override
    public void initListener() {
        final EditTextWatcher accountWatcher = new EditTextWatcher(mAccoountPublishSubject);
        final EditTextWatcher passwordWatcher = new EditTextWatcher(mPwdPublishSubject);
        mEditAccount.addTextChangedListener(accountWatcher);
        mEditPwd.addTextChangedListener(passwordWatcher);

        final View.OnClickListener loginClick = new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                Toast.makeText(mActivity,"登录成功",Toast.LENGTH_SHORT).show();
            }
        };
        mTvLogin.setOnClickListener(loginClick);
    }

    /**
     * Clears every subscription held by {@code mCompositeDisposable}, if it was created.
     */
    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mCompositeDisposable == null) {
            return;
        }
        mCompositeDisposable.clear();
    }

    /**
     * {@link TextWatcher} that pushes the full text of the watched field into a
     * {@link PublishSubject} after every change.
     */
    public static class EditTextWatcher implements TextWatcher{

        // Typed (not raw) so that only strings can be emitted; never reassigned.
        private final PublishSubject<String> mPublishSubject;

        /**
         * @param publishSubject subject that receives the text after each change
         */
        public EditTextWatcher(PublishSubject<String> publishSubject){
            this.mPublishSubject = publishSubject;
        }

        @Override
        public void beforeTextChanged(CharSequence s, int start, int count, int after) {
            // Not needed: only the final text matters.
        }

        @Override
        public void onTextChanged(CharSequence s, int start, int before, int count) {
            // Not needed: only the final text matters.
        }

        @Override
        public void afterTextChanged(Editable s) {
            mPublishSubject.onNext(s.toString());
        }
    }

7.优先加载本地缓存,同时发起网络请求

应用场景:进入新页面,为了提升用户体验,在网络请求没有返回时,优先显示缓存数据。
要求:同时发起请求网络数据和加载本地缓存数据。在网络数据未返回时,显示本地缓存数据;网络数据返回时,显示最新的网络数据

几种实现方式的缺点:

好的实现方式:publish+merge+takeUntil

 /**
     * Starts loading the local cache and the network data together. Whatever arrives
     * is shown; the stream ends as soon as a response whose {@code msg} is "成功" has
     * been delivered.
     *
     * @param delayTimeLocal delay handed to {@code getLocalCacheData}
     * @param delayTimeNet   delay handed to {@code getNetData}
     */
    private void requestData(final long delayTimeLocal, long delayTimeNet) {

        mProgressBar.setVisibility(View.VISIBLE);

        // publish(selector) lets the selector use the network observable through a single
        // shared subscription.
        Observable<MyResponse<List<CacheToNetData>>> observable =
                getNetData(delayTimeNet).publish(new Function<Observable<MyResponse<List<CacheToNetData>>>, ObservableSource<MyResponse<List<CacheToNetData>>>>() {
                    @Override
                    public ObservableSource<MyResponse<List<CacheToNetData>>> apply(Observable<MyResponse<List<CacheToNetData>>> netResponseObservable) throws Exception {

                        return Observable.merge(getLocalCacheData(delayTimeLocal),netResponseObservable )
                                .takeUntil(new Predicate<MyResponse<List<CacheToNetData>>>() {
                                    @Override
                                    public boolean test(MyResponse<List<CacheToNetData>> listMyResponse) throws Exception {
                                        mainThreadTextLog("获取到的数据类型:"+listMyResponse.msg);
                                        // Constant-first comparison: a response without msg must
                                        // not crash the stream with a NullPointerException.
                                        return "成功".equals(listMyResponse.msg);
                                    }
                                });
                    }
                });


        DisposableObserver<MyResponse<List<CacheToNetData>>> disposableObserver =
                new DisposableObserver<MyResponse<List<CacheToNetData>>>() {
                    @Override
                    public void onNext(MyResponse<List<CacheToNetData>> listMyResponse) {

                        mProgressBar.setVisibility(View.GONE);

                        if (listMyResponse.code == 1) {
                            // Null-safe for the same reason as in the takeUntil predicate.
                            if ("本地数据".equals(listMyResponse.msg)) {
                                mainThreadTextLog("onNext --- 加载了本地数据");
                            } else {
                                mainThreadTextLog("onNext --- 加载了网络数据");
                            }
                            mAdapter.setData(listMyResponse.data);
                        }


                    }

                    @Override
                    public void onError(Throwable e) {
                        mainThreadTextLog("onError:" + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        mainThreadTextLog("onComplete");
                    }
                };

        observable.observeOn(AndroidSchedulers.mainThread())
                .subscribe(disposableObserver);
        mCompositeDisposable.add(disposableObserver);
    }

    /**
     * Clears every subscription held by {@code mCompositeDisposable}, if it was created.
     */
    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mCompositeDisposable == null) {
            return;
        }
        mCompositeDisposable.clear();
    }

    /**
     * Builds an observable that simulates reading the local cache on the IO scheduler.
     *
     * @param delayTime milliseconds to sleep before the cached items are produced
     * @return observable that emits one response (msg "本地数据", code 1) holding ten
     *         generated items and then completes
     */
    public Observable<MyResponse<List<CacheToNetData>>> getLocalCacheData(final long delayTime) {
        final ObservableOnSubscribe<MyResponse<List<CacheToNetData>>> cacheSource =
                new ObservableOnSubscribe<MyResponse<List<CacheToNetData>>>() {
            @Override
            public void subscribe(ObservableEmitter<MyResponse<List<CacheToNetData>>> emitter) throws Exception {
                try {
                    mainThreadTextLog("开始加载本地缓存数据");

                    // Simulated read latency.
                    Thread.sleep(delayTime);

                    List<CacheToNetData> cachedItems = new ArrayList<>();
                    int index = 0;
                    while (index < 10) {
                        cachedItems.add(new CacheToNetData("来自本地缓存", "数据项 --- " + index));
                        index++;
                    }

                    mainThreadTextLog("结束加载本地缓存数据");
                    emitter.onNext(new MyResponse<List<CacheToNetData>>("本地数据", 1, cachedItems));
                    emitter.onComplete();
                } catch (Exception failure) {
                    mainThreadTextLog("加载本地缓存数据异常:" + failure.getMessage());
                    // Only report the failure while somebody is still listening.
                    if (emitter.isDisposed()) {
                        return;
                    }
                    emitter.onError(failure);
                }
            }
        };
        return Observable.create(cacheSource).subscribeOn(Schedulers.io());
    }

    /**
     * Requests the network data on the IO scheduler. A failure is logged and replaced
     * by an observable that never emits, so no error reaches the subscriber.
     *
     * @param delayTime value forwarded to {@code Api.getNetData}
     * @return observable of the network response
     */
    public Observable<MyResponse<List<CacheToNetData>>> getNetData(long delayTime) {
        mainThreadTextLog("开始请求网络数据");

        final Function<Throwable, ObservableSource<? extends MyResponse<List<CacheToNetData>>>> logAndStaySilent =
                new Function<Throwable, ObservableSource<? extends MyResponse<List<CacheToNetData>>>>() {
                    @Override
                    public ObservableSource<? extends MyResponse<List<CacheToNetData>>> apply(Throwable throwable) throws Exception {
                        mainThreadTextLog("请求网络数据失败:" + throwable.getMessage());
                        return Observable.never();
                    }
                };

        return HttpManager.createService(Api.class)
                .getNetData(delayTime)
                .subscribeOn(Schedulers.io())
                .onErrorResumeNext(logAndStaySilent);
    }

    /**
     * Appends one log line to {@code mTvLog} through the activity's {@code runOnUiThread}.
     *
     * @param content text of the line; a newline is added after it
     */
    private void mainThreadTextLog(final String content) {
        final Runnable appendLine = new Runnable() {
            @Override
            public void run() {
                mTvLog.append(content + "\n");
            }
        };
        mActivity.runOnUiThread(appendLine);
    }

8.倒计时

使用intervalRange操作符实现倒计时功能

   /**
     * Restarts the countdown: clears earlier subscriptions, then ticks once per
     * second from {@code countDownTimeLong} down to zero and shows the remaining
     * time in {@code mTvTime}.
     *
     * @param countDownTimeLong countdown length in seconds
     */
    private void startCountDown(final long countDownTimeLong) {

        mCompositeDisposable.clear();

        DisposableObserver<Long> tickObserver = new DisposableObserver<Long>() {
            @Override
            public void onNext(final Long aLong) {
                final Runnable refreshLabel = new Runnable() {
                    @Override
                    public void run() {
                        // aLong counts up from 0, so the difference is the time left in seconds.
                        long remainingSeconds = countDownTimeLong - aLong;
                        mTvTime.setText(formatDuring(remainingSeconds * 1000));
                    }
                };
                runOnUiThread(refreshLabel);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };

        // countDownTimeLong + 1 ticks so that both the start value and zero are displayed.
        Observable.intervalRange(0,countDownTimeLong + 1,0,1, TimeUnit.SECONDS)
                .subscribe(tickObserver);

        mCompositeDisposable.add(tickObserver);

    }

9.嵌套请求

应用场景:第一个网络请求之后,再进行一次网络请求,才能拿到需要的数据。
例子:有两道门,宝藏在第二道门后面。需要先后打开两道门,必须打开第一道门之后,才能获取到第二道门的开门密码。

 /**
     * Nested request: opens the first door with the number typed by the user, then
     * uses the password from that response to open the second door.
     */
 private void requestData() {

        String inputStr = mEditText.getText().toString().trim();

        if (TextUtils.isEmpty(inputStr)){
            Toast.makeText(mActivity,"输入不能为空",Toast.LENGTH_SHORT).show();
            return;
        }
        int intputInt;
        try {
            intputInt = Integer.parseInt(inputStr);
        } catch (NumberFormatException e) {
            // Non-numeric or out-of-range input must not crash the screen.
            Toast.makeText(mActivity,"请输入有效的整数",Toast.LENGTH_SHORT).show();
            return;
        }

        HttpManager.createService(Api.class)
                .openFirstDoor(intputInt)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<MyResponse<Nest1Bean>>() {
                    @Override
                    public void accept(MyResponse<Nest1Bean> nest1BeanMyResponse) throws Exception {

                        mainThreadTextLog("doOnNext:\n"+JsonUtils.formatJson(new Gson().toJson(nest1BeanMyResponse)));
                    }
                })
                // Back to IO for the second request.
                .observeOn(Schedulers.io())
                .flatMap(new Function<MyResponse<Nest1Bean>, ObservableSource<MyResponse<Nest2Bean>>>() {
                    @Override
                    public ObservableSource<MyResponse<Nest2Bean>> apply(MyResponse<Nest1Bean> nest1BeanMyResponse) throws Exception {
                        mainThreadTextLog("获取第二道门得密码,去打开第二道门");
                        return HttpManager.createService(Api.class).openSecondDoor(nest1BeanMyResponse.data.password);
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<MyResponse<Nest2Bean>>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        // NOTE(review): the Disposable is not retained, so this request cannot be
                        // cancelled when the screen goes away — confirm whether that is intended.
                        mainThreadTextLog("onSubscribe");
                    }

                    @Override
                    public void onNext(MyResponse<Nest2Bean> nest2BeanMyResponse) {
                        mainThreadTextLog("onNext:\n"+JsonUtils.formatJson(new Gson().toJson(nest2BeanMyResponse)));
                    }

                    @Override
                    public void onError(Throwable e) {
                        mainThreadTextLog("onError:"+e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        mainThreadTextLog("onComplete\n\n");
                    }
                });
    }

10.合并两个网络请求的数据

应用场景:有的时候我们需要的数据,可能需要请求两个不同的接口才能得到,使用zip操作符可以实现需求

 /**
     * Requests the teacher and the students of the grade typed by the user and
     * merges both responses into one {@code ClassBean} with {@code zip}.
     */
 private void requestData() {
        String gradeStr = mEditGrade.getText().toString().trim();

        if (TextUtils.isEmpty(gradeStr)){
            Toast.makeText(mActivity,"输入不能为空",Toast.LENGTH_SHORT).show();
            return;
        }
        int gradeInt;
        try {
            gradeInt = Integer.parseInt(gradeStr);
        } catch (NumberFormatException e) {
            // Non-numeric or out-of-range input must not crash the screen.
            Toast.makeText(mActivity,"请输入有效的整数",Toast.LENGTH_SHORT).show();
            return;
        }

        // Each request gets its own IO subscription so zip can run them concurrently; with
        // only one subscribeOn after zip, blocking sources would execute one after another.
        Observable<MyResponse<Teacher>> observableTeacher = HttpManager.createService(Api.class).getTeacher(gradeInt)
                .subscribeOn(Schedulers.io());

        Observable<MyResponse<List<Student>>> observableStudents = HttpManager.createService(Api.class).getStudents(gradeInt)
                .subscribeOn(Schedulers.io());

        Observable.zip(observableTeacher, observableStudents,
                new BiFunction<MyResponse<Teacher>, MyResponse<List<Student>>, ClassBean>() {
                    @Override
                    public ClassBean apply(MyResponse<Teacher> teacherMyResponse, MyResponse<List<Student>> studentListMyResponse) throws Exception {

                        mainThreadTextLog("请求到得老师数据:\n"+JsonUtils.formatJson(new Gson().toJson(teacherMyResponse))+
                                "\n请求到得学生数据:\n"+JsonUtils.formatJson(new Gson().toJson(studentListMyResponse)));

                        String teacherName = teacherMyResponse.data.name;
                        String grade = teacherMyResponse.data.grade;
                        List<Student> studentList = studentListMyResponse.data;
                        return new ClassBean(teacherName,grade,studentList);
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<ClassBean>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(ClassBean classBean) {
                        mainThreadTextLog("onNext合并后得数据:\n"+JsonUtils.formatJson(new Gson().toJson(classBean)));
                    }

                    @Override
                    public void onError(Throwable e) {
                        // Report the failure instead of swallowing it.
                        mainThreadTextLog("onError:"+e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        mainThreadTextLog("onComplete\n\n");
                    }
                });


    }
上一篇下一篇

猜你喜欢

热点阅读