Android开发经验谈Android开发Android技术知识

part06_Rxjava背压原理

2018-08-30  本文已影响20人  IT魔幻师

作者:IT魔幻师
博客:www.huyingzi.top
转载请注明出处:https://www.jianshu.com/p/23f74055e999


一、RxJava1与RxJava2 对比

二、背压

事件上游产生的事件高于事件下游消费的事件导致内存不断扩大
rxjava1并没有对这个的解决方案
rxjava2
添加了一个新的被观察者角色操作符Flowable所有的Observable操作都可以用Flowable替换

什么时候用 Observable:
一般处理最大不超过1000条数据,并且几乎不会出现内存溢出
如果式 鼠标事件,频率不超过1000 Hz,基本上不会背压;

什么时候用 Flowable:
处理以某种方式产生超过10K的元素;
文件读取与分析,例如 读取指定行数的请求;网络IO流;
有很多的阻塞和/或 基于拉取的数据源,但是又想得到一个响应式非阻塞接口的。

三、背压策略

1.BackpressureStrategy.ERROR:若上游发送事件速度超出下游处理事件能力,且事件缓存池已满,则抛出异常
//阻塞时队列
2.BackpressureStrategy.BUFFER:若上游发送事件速度超出下游处理能力,则把事件存储起来等待下游处理
3.BackpressureStrategy.DROP:若上游发送事件速度超出下游处理能力,事件缓存池满了后将之后发送的事件丢弃
4.BackpressureStrategy.LATEST:若上有发送时间速度超出下游处理能力,则只存储最新的128个事件

四、Flowable的使用

   @Test
    public void testFlowable() {
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 1000000; i++) {
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.ERROR).subscribe(new FlowableSubscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                //使用Flowable需要在此处给其一个最大的事件处理能力
                //设置为最大的处理能力
                s.request(500);
//                s.request(Integer.MAX_VALUE);
            }

            @Override
            public void onNext(Integer integer) {
                //模拟处理
                try {
                    Thread.currentThread().sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("处理事件:"+integer);

            }

            @Override
            public void onError(Throwable t) {

            }

            @Override
            public void onComplete() {

            }
        });
    }
上一篇 下一篇

猜你喜欢

热点阅读