安卓开源库源码学习Android架构Rx系列

Android Rxjava :最简单&全面背压讲解 (

2018-11-01  本文已影响101人  Linhaojian

1.前言

public void backpressureSample(){
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                int i = 0;
                while(true){
                    Thread.sleep(500);
                    i++;
                    e.onNext(i);
                    Log.i(TAG,"每500ms发送一次数据:"+i);
                }
            }
        }).subscribeOn(Schedulers.newThread())//使被观察者存在独立的线程执行
          .observeOn(Schedulers.newThread())//使观察者存在独立的线程执行
          .subscribe(new Consumer<Integer>() {
              @Override
              public void accept(Integer integer) throws Exception {
                  Thread.sleep(5000);
                  Log.e(TAG,"每5000m接收一次数据:"+integer);
              }
          });
    }

2.目录

目录.png

3.简介

简介.png

_______________________________________________________________________________

4.使用与原理详解

4.1 Flowable 与 Observable 的区别

 public void flowable(){
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                for(int j = 0;j<=150;j++){
                    e.onNext(j);
                    Log.i(TAG," 发送数据:"+j);
                    try{
                        Thread.sleep(50);
                    }catch (Exception ex){
                    }
                }
            }
        },BackpressureStrategy.ERROR)
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.newThread())
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(Long.MAX_VALUE); //观察者设置接收事件的数量,如果不设置接收不到事件
            }
            @Override
            public void onNext(Integer integer) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Log.e(TAG,"onNext : "+(integer));
            }
            @Override
            public void onError(Throwable t) {
                Log.e(TAG,"onError : "+t.toString());
            }
            @Override
            public void onComplete() {
                Log.e(TAG,"onComplete");
            }
        });
    }

4.2 BackpressureStrategy媒体类

public abstract class Flowable<T> implements Publisher<T> {
    /** The default buffer size. */
    static final int BUFFER_SIZE;
    static {
        BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
    }
    .....
}

4.2.1. ERROR

4.2.2. BUFFER

4.2.3. DROP

4.2.4. LATEST

4.2.5. MISSING

4.3 onBackPressure相关操作符

 Flowable.interval(50,TimeUnit.MILLISECONDS)
        .onBackpressureDrop()//效果与Drop类型一样
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.newThread())
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Log.e(TAG,"onNext : "+(aLong));
            }
        });

4.4 request()

4.4.1 request(int count):设置接收事件的数量.

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                for(int j = 0;j<50;j++){
                    e.onNext(j);
                    Log.i(TAG," 发送数据:"+j);
                    try{
                        Thread.sleep(50);
                    }catch (Exception ex){
                    }
                }
            }
        },BackpressureStrategy.BUFFER)
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.newThread())
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(10); //观察者设置接收事件的数量,如果不设置接收不到事件
            }
            @Override
            public void onNext(Integer integer) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Log.e(TAG,"onNext : "+(integer));
            }
            @Override
            public void onError(Throwable t) {
                Log.e(TAG,"onError : "+t.toString());
            }
            @Override
            public void onComplete() {
                Log.e(TAG,"onComplete");
            }
        });
request.png

4.4.2 request扩展使用

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                for(int j = 0;j<50;j++){
                    e.onNext(j);
                    Log.i(TAG," 发送数据:"+j);
                    try{
                        Thread.sleep(50);
                    }catch (Exception ex){
                    }
                }
            }
        },BackpressureStrategy.BUFFER)
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.newThread())
        .subscribe(new Subscriber<Integer>() {
            private Subscription subscription;
            @Override
            public void onSubscribe(Subscription s) {
                subscription = s;
                s.request(10); //观察者设置接收事件的数量,如果不设置接收不到事件
            }
            @Override
            public void onNext(Integer integer) {
                if(integer==5){
                    subscription.request(3);
                }
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Log.e(TAG,"onNext : "+(integer));
            }
            @Override
            public void onError(Throwable t) {
                Log.e(TAG,"onError : "+t.toString());
            }
            @Override
            public void onComplete() {
                Log.e(TAG,"onComplete");
            }
        });
request扩展.png
总结:可以动态设置观察者接收事件的数量,但不影响被观察者继续发送事件。

4.5 requested

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                for(int j = 0;j<15;j++){
                    e.onNext(j);
                    Log.i(TAG,e.requested()+" 发送数据:"+j);
                    try{
                        Thread.sleep(50);
                    }catch (Exception ex){
                    }
                }
            }
        },BackpressureStrategy.BUFFER)
//        .subscribeOn(Schedulers.newThread())
//        .observeOn(Schedulers.newThread())
        .subscribe(new Subscriber<Integer>() {
            private Subscription subscription;
            @Override
            public void onSubscribe(Subscription s) {
                subscription = s;
                s.request(10); //观察者设置接收事件的数量,如果不设置接收不到事件
            }
            @Override
            public void onNext(Integer integer) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Log.e(TAG,"onNext : "+(integer));
            }
            @Override
            public void onError(Throwable t) {
                Log.e(TAG,"onError : "+t.toString());
            }
            @Override
            public void onComplete() {
                Log.e(TAG,"onComplete");
            }
        });
requested.png

5.总结

欢迎关注linhaojian_CSDN博客或者linhaojian_简书

不定期分享关于安卓开发的干货。


写技术文章初心

  • 技术知识积累
  • 技术知识巩固
  • 技术知识分享
  • 技术知识交流
上一篇下一篇

猜你喜欢

热点阅读