RxJava原理分析

2020-09-11  本文已影响0人  雷涛赛文

      之前看有的项目上使用到了RxJava,最近比较系统的学习了一下,将学习收获整理一下。

一.RxJava简介

a.定义
定义 作用 特点
一个基于事件流、实现
异步操作的库
实现异步操作
类似与AyncTask、Handler的使用
逻辑简洁
实现优雅:基于事件流的链式调用
使用简单:随着程序的复杂,依然保持简洁优雅
b.思路

       被观察者 (Observable) 通过订阅(Subscribe)按顺序发送事件给观察者(Observer)
       观察者(Observer)按顺序接收事件&作出对应的响应动作。

二.使用流程

a:创建被观察者(Observable)&定义需发送的事件
Observable<String> obervable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("事件1");
        e.onNext("事件2");
        e.onNext("事件3");
    }
});
b:创建观察者(Observer) & 定义响应事件的行为
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        //该方法最先调用
    }

    @Override
    public void onNext(String value) {
       //事件接收
       Log.e(TAG, "value is: " + value);
    }

    @Override
    public void onError(Throwable e) {
       //异常回调
    }

    @Override
    public void onComplete() {
      //事件发送完毕回调
    }
}   

       从方法名字可以得到:事件的接收处理是在onNext()里面,异常处理是在onError()里面,事件发送完毕处理是在onComplete()里面,事件的最早触发是在onSubscribe()里面[后面源码会分析到];

c:通过订阅(subscribe)连接观察者和被观察者
observable.subscribe(observer);

       Observable的subscribe()具备多个重载的方法,可以灵活运用,来实现自己的需求;

//表示观察者不对被观察者发送的事件作出任何响应(但被观察者还是可以继续发送事件)
public final Disposable subscribe() {}
// 表示观察者只对被观察者发送的Next事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext) {}
// 表示观察者只对被观察者发送的Next事件 & Error事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
// 表示观察者只对被观察者发送的Next事件、Error事件 & Complete事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
// 表示观察者只对被观察者发送的Next事件、Error事件 、Complete事件 & onSubscribe事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
// 表示观察者对被观察者发送的任何事件都作出响应(一般采用此种方式)
public final void subscribe(Observer<? super T> observer) {}

       通过subscribe传入Consumer实例时,执行对应事件的响应回调都是在accept里面进行处理;

/**
 * A functional interface (callback) that accepts a single value.
 * @param <T> the value type
 */
public interface Consumer<T> {
    /**
     * Consume the given value.
     * @param t the value
     * @throws Exception on error
     */
    void accept(T t) throws Exception;
}

       输出以下结果:

value is: 事件1
value is: 事件2
value is: 事件3

       以上就完成了基于事件流的链式调用过程,接下来通过源码来分析一下详细工作过程:

三.源码分析

a:创建被观察者(Observable)& 定义需发送的事件
//调用create时创建Observable
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    //最终是创建了ObservableCreate类对象 
    //将创建的ObservableOnSubscribe对象传入 
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

       通过以上可以看到,通过create()方法,创建了ObservableCreate对象,然后将ObservableOnSubscribe变量source作为参数传入。
       接下来看一下ObservableCreate的实现:

//ObservableCreate类,继承Observable类
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    //复写了subscribeActual()
    //作用:订阅时,通过接口回调调用Observerable与Observer的方法
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //先调用oberver的onSubscribe方法
        //印证了上述onSubscribe先调用的猜想
        observer.onSubscribe(parent);

        try {
            //调用创建的ObservableOnSubscribe对象的subscribe方法
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

    //执行e.onNext("事件1")方法
    static final class CreateEmitter<T>extends AtomicReference<Disposable>implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;
        //observer是订阅前创建的Observer
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            //发送的事件不可为空
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            //若无断开连接(未调用Disposable.dispose())
            if (!isDisposed()) {
                //调用Observer的onNext()方法
                observer.onNext(t);
            }
        }

        // onError()及onComplete()方法调用后都会调用dispose();就不会发送onNext()事件了
        // 印证了Observer在收到onError()或onComplete()后,就不会再收到onNext()了
        @Override
        public void onError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
            } else {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }

       通过源码可以看到在执行Observable.create()实际上是创建了一个ObservableCreate对象,并将创建的ObservableOnSubscribe对象作为参数传入,复写了subscribeActual()方法,在方法内创建了携带Observer的CreateEmitter对象,并分别回调了Observer的onSubscribe,ObservableOnSubscribe的subscribe方法。在create时,仅仅是定义,即:subscribeActual()此时还未被回调。

b:创建观察者(Observer) & 定义响应事件的行为
//Observer.java
public interface Observer<T> {
    //Observer是一个接口
    // 接口内含4个方法,分别用于响应对应于被观察者发送的不同事件
    void onSubscribe(Disposable d); // 内部参数:Disposable 对象,可结束事件
    void onNext(T value);
    void onError(Throwable e);
    void onComplete();
}

Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        //该方法最先调用
    }

    @Override
    public void onNext(String value) {
       //事件接收
       Log.e(TAG, "value is: " + value);
    }

    @Override
    public void onError(Throwable e) {
       //异常回调
    }

    @Override
    public void onComplete() {
      //事件发送完毕回调
    }
}   
c:通过订阅(subscribe)连接观察者和被观察者
observable.subscribe(observer);
//Observable.java
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        //subscribeActual属于抽象方法,由子类实现;即由创建Observable时创建的ObservableCreate类对象
        //即在调用subscribe时,实际上是调用了创建Observable时创建的ObservableCreate类对象里面的subscribeActual()方法
        //印证了前面所说的在Observable.create()时未执行subscribeActual()
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
    }
}
//抽象方法,需要非抽象子类实现
protected abstract void subscribeActual(Observer<? super T> observer);

       从源码可以看到,在执行subscribe()时,最终是调用了ObservableCreate类里面的subscribeActual()方法,在subscribeActual()方法内先是调用了Observer的onSubscribe(),接下来调用source即:ObservableOnSubscribe复写的subscribe()方法,在ObservableOnSubscribe的subscribe()内调用了ObservableEmitter的onNext()、onError()等方法,就开始了事件流的执行。
      总结

步骤 逻辑实现 源码分析
创建Observable&定义发送事件 1.调用Observable.create()
2.创建ObservableOnSubscribe对象
3.复写subscribe()
1.创建ObservableCreate类对象
2.复写subscribeActual()方法
创建Observer&定义响应事件的行为 1.创建Observer类对象
2.复写onSubscribe(),onNext(),onError(),onComplete()方法
1.Observer是一个接口
2.接口内有四个方法,分别响应Observable发送的不同事件
subscribe订阅 调用Observable.subscribe(observer) 1.最终调用Observable的子类对象ObservableCreate类的subscribeActual()方法,主要调用如下:
a.创建CreateEmitter对象;
b. 调用ObservableOnSubscribe对象复写的subscribe();
c.调用Observer复写的onSubscribe();
d.在subscribe()里面调用onNext(),onError(),onComplete()再回调Observer复写的对应方法;

      用一张类图来表示一下类之间的联系

1.png

四.线程切换

observable.subscribeOn(Schedulers.io())//切换到IO线程进行网络请求
            .observeOn(AndroidSchedulers.mainThread())//切换回到主线程 处理请求结果

      RxJava2的订阅原理是执行subscribe()时从下往上依次调用Observable的各个子类的subscribeActual()方法,在最上层调用onNext()等方法时,会从上往下依次调用Observer的onNext()等方法,最终会调用app传入的observer的next()等方法。

a.subscribeOn(Schedulers.io())

      首先subscribeOn(Schedulers.io())最终会调用ObservableSubscribeOn.subscribeActual()方法,内部是将source.subscribe()放到一个Runnable执行,该source就是ObservableCreate(),即会调用到ObservableCreate.subscribeActual(),最终会调用到ObservableOnSubscribe.subscribe(CreateEmitter):

//Observable.java
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

//ObservableSubscribeOn.java
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    ......
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);
        //切换线程
        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                //在新线程里面执行subscribe
                source.subscribe(parent);
            }
        }));
    }
    ......
}

      Schedulers.io()返回一个IoScheduler,该类继承Scheduler,看一下scheduleDirect(new Runnable())执行了什么操作:

//IoScheduler.java从Scheduler.java继承
    public Disposable scheduleDirect(Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        w.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    decoratedRun.run();
                } finally {
                    w.dispose();
                }
            }
        }, delay, unit);

        return w;
    }

      通过以上可以看到,在执行scheduleDirect(x,x,x)后,会先执行createWorker(),接着执行w.schedule(Runnable),看一下实现逻辑:

//IoScheduler.java
    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }

    static final class EventLoopWorker extends Scheduler.Worker {
        ......
        ......
        @Override
        public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) {
            ......
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }

//NewThreadWorker.java
   public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            parent.remove(sr);
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }

      可以看到,把传入的Runnable封装成为一个ScheduleRunnable对象。并把这个对象放入线程池中去执行,执行的时候会运行ScheduleRunnable的run方法,最终又会调用ObservableSubscribeOn的run方法,进而调用source.subscribe(),至此subscribeOn()的线程切换就完成了。

b.observeOn(AndroidSchedulers.mainThread())

      observeOn()时会传入AndroidSchedulers.mainThread(),会创建HandlerScheduler,然后创建Handler,将主线程的Looper传入Handler,后续消息队列都是在主线程执行的,看一下具体逻辑:

//AndroidSchedulers.java
public final class AndroidSchedulers {

    private static final class MainHolder {
        //创建HandlerScheduler,内部持有主线程Handler的引用
        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
    ......
}

      然后在Observable.observeOn()时会创建ObservableObserveOn,并把上述创建的HandlerScheduler传入,先看一下ObservableObserveOn的逻辑实现:

//ObservableObserveOn.java
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    ......

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
         implements Observer<T>, Runnable {
        .......
        .......
        @Override
        public void onNext(T t) {
            ........
            //在onNext()中执行schdule()
            schedule();
        }

        void schedule() {
            if (getAndIncrement() == 0) {
                //执行worker的schedule方法,该worker是HandlerScheduler中的HandlerWorker,后面会讲到
                worker.schedule(this);
            }
        }

        void drainNormal() {
             ......
             //a就是app传入的Observer
             a.onNext(v);
             ......
        }

        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
}

       当执行subscribe(observer)时,会先调用ObservableObserveOn. subscribeActual(),从上面可以看到:
       会先执行scheduler.createWorker(),这个Worker 对象实际上是在AndroidSchedulers.mainThread()内部的HandlerScheduler中生成的,接下来会讲到;
       然后执行source.subscribe(ObserveOnObserver),该ObserveOnObserver对app传入的observer进行了封装,当最上层调用onNext()等方法后,会最终调用到ObserveOnObserver内部onNext()等方法,从上面逻辑实现可以看到,进而会调用schedule()---->worker.schedule(this)[ObserveOnObserver本身是一个Runnable],该worker就是HandlerWorker,接下来执行到HandlerWorker的schedule(x,x,x),这里面会有一个主线程的Handler对象,然后把特定的线程任务[ObserveOnObserver]通过handler.sendMessageDelayed()方法转移到主线程中去执行,一起看一下HandlerScheduler的实现逻辑:

final class HandlerScheduler extends Scheduler {
    ........
    @Override
    public Worker createWorker() {
        //该handler是主线程的handler
        return new HandlerWorker(handler);
    }

   private static final class HandlerWorker extends Worker {
        ......

        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            .......
            run = RxJavaPlugins.onSchedule(run);
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
            .......
            return scheduled;
        }
        ........
    }

    private static final class ScheduledRunnable implements Runnable, Disposable {
        .......

        @Override
        public void run() {
            try {
                 //该delegate就是ObserveOnObserver[本身就是一个runnable],最终在主线程调用run()
                delegate.run();
            }
            .......
    }

       最后会在主线程里面执行observer的onNext()等方法,以上就是observer线程切换。
       简单总结一下:
       subscribe(observer)后涉及到Observable类的执行顺序:ObservableObserveOn-->ObservableSubscribeOn-->ObservableCreate-->ObservableOnSubscribe.subscribe(CreateEmitter e);
       e.onNext()后涉及到Observer类的执行顺序:CreateEmitter-->SubscribeOnObserver-->ObserveOnObserver-->Observer(app)

五.实例分析

       由于RxJava是开源的库,要想使用的话需要添加依赖,在AndroidStudio里面添加如下:

// Android 支持 Rxjava,使用RxJava2的版本
implementation 'io.reactivex.rxjava2:rxjava:2.0.2'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
a.简单事件流处理
Observable.create(new ObservableOnSubscribe<String>() {
     @Override
     public void subscribe(ObservableEmitter<String> e) throws Exception {
         e.onNext("你好");
         e.onNext("RxJava");
         e.onNext("今天学习一下RxJava");
     }
}).subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  //Observable只是生产事件,真正的发送事件是在它被订阅的时候,即当 subscribe() 方法执行时
  .subscribe(new Observer<String>() {
         private Disposable mDisposable;
         @Override
         public void onSubscribe(Disposable d) {
             // 该方法最先调用
             Log.e(TAG, "-----onSubscribe()------");
             mDisposable = d;
         }

         @Override
         public void onNext(String value) {
             Log.e(TAG, "-----onNext(): " + value);
             //1.dispose()后,observer就不再接收后面的消息,即"今天学习一下RxJava"接收不到了
             if (value.equals("RxJava")) {
                mDisposable.dispose();
             }
            //2.在收到onComplete()或onError()之后,就不会回调该方法了
         }

        @Override
        public void onError(Throwable e) {

        }

        @Override
         public void onComplete() {
              Log.e(TAG, "-----onComplete()");
         }
});

/**
  *以下两种方式可以代替e.onNext("你好");e.onNext("RxJava");e.onNext("今天学习一下RxJava");
  */
//1.Observable observable = Observable.just("你好","RxJava","今天学习一下RxJava");
//String[] words = {"你好","RxJava","今天学习一下RxJava"};
//2.Observable observable = Observable.fromArray(words);
未加dispose()输出结果为:
-----onNext():你好
-----onNext():RxJava
-----onNext():今天学习一下RxJava
加dispose()输出结果为:
-----onNext():你好
-----onNext():RxJava
b.Retrofit+RxJava网络请求

      Retrofit是square开源的网络Restful请求框架,底层是基于okhttp的,开发者只需要定义接口就可以了,Retrofit提供了注解可以表示该接口请求的请求方式、参数、url等。定义好了接口以后,在调用该远程接口的时候直接使用该接口就好像通过RPC方式使用本地类一样方便。
      1.加入依赖

// Android 支持 Retrofit
implementation 'com.squareup.retrofit2:retrofit:2.1.0'
// 衔接 Retrofit & RxJava,要注意使用RxJava2的版本
implementation 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'
// 支持Gson解析
implementation 'com.squareup.retrofit2:converter-gson:2.1.0'

      2.创建接收服务器返回数据的类
      根据请求返回的Json数据格式,来定义与其对应的类,本实例是参考天气查询,返回的数据格式如下:

{"data":{"yesterday":{"date":"10日星期四","high":"高温 28℃","fx":"西南风","low":"低温 20℃","fl":"<![CDATA[2级]]>","type":"小雨"},
                      "city":"青岛",
                      "forest":[{"date":"11日星期五","fengli":"<![CDATA[3级]]>","dengxiang":"南风","high":"高温 25℃","low":"低温 21℃","type":"小雨"},{"date":"12日星期六","fengli":"<![CDATA[3级]]>","dengxiang":"东风","high":"高温 25℃","low":"低温 20℃","type":"小雨"},{"date":"13日星期天","fengli":"<![CDATA[2级]]>","dengxiang":"东风","high":"高温 26℃","low":"低温 20℃","type":"阴"},{"date":"14日星期一","fengli":"<![CDATA[2级]]>","dengxiang":"南风","high":"高温 27℃","low":"低温 22℃","type":"晴"},{"date":"15日星期二","fengli":"<![CDATA[4级]]>","dengxiang":"南风","high":"高温 27℃","low":"低温 20℃","type":"小雨"}],
                      "ganmao":"感冒低发期,天气舒适,请注意多吃蔬菜水果,多喝水哦。","wendu":"22"},
"status":1000,
"desc":"OK"}

      创建返回数据对应的类WeatherInfo.java

public class WeatherInfo {
    private DataBean data;
    private int status;
    private String desc;

    public DataBean getData() {
        return data;
    }

    public void setData(DataBean data) {
        this.data = data;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public String getDesc() {
        return desc;
    }

    public void setDesc(String desc) {
        this.desc = desc;
    }

    public static class DataBean {
        private YesterdayBean yesterday;
        private String city;
        private String aqi;
        private String ganmao;
        private String wendu;
        private List<ForecastBean> forecast;

        public YesterdayBean getYesterday() {
            return yesterday;
        }

        public void setYesterday(YesterdayBean yesterday) {
            this.yesterday = yesterday;
        }

        public String getCity() {
            return city;
        }

        public void setCity(String city) {
            this.city = city;
        }

        public String getAqi() {
            return aqi;
        }

        public void setAqi(String aqi) {
            this.aqi = aqi;
        }

        public String getGanmao() {
            return ganmao;
        }

        public void setGanmao(String ganmao) {
            this.ganmao = ganmao;
        }

        public String getWendu() {
            return wendu;
        }

        public void setWendu(String wendu) {
            this.wendu = wendu;
        }

        public List<ForecastBean> getForecast() {
            return forecast;
        }

        public void setForecast(List<ForecastBean> forecast) {
            this.forecast = forecast;
        }

        public static class YesterdayBean {

            private String date;
            private String high;
            private String fx;
            private String low;
            private String fl;
            private String type;

            public String getDate() {
                return date;
            }

            public void setDate(String date) {
                this.date = date;
            }

            public String getHigh() {
                return high;
            }

            public void setHigh(String high) {
                this.high = high;
            }

            public String getFx() {
                return fx;
            }

            public void setFx(String fx) {
                this.fx = fx;
            }

            public String getLow() {
                return low;
            }

            public void setLow(String low) {
                this.low = low;
            }

            public String getFl() {
                return fl;
            }

            public void setFl(String fl) {
                this.fl = fl;
            }

            public String getType() {
                return type;
            }

            public void setType(String type) {
                this.type = type;
            }

            public String toString() {
                return "{" + "\"date\":\"" + getDate() + "\"," + "\"high\":\"" + getHigh() + "\","
                        + "\"fx\":\"" + getFx() + "\"," + "\"low\":\"" + getLow() + "\","
                        + "\"fl\":\"" + getFl() +
                        "\"," + "\"type\":\"" + getType() + "\"}";
            }
        }

        public static class ForecastBean {

            private String date;
            private String high;
            private String fengli;
            private String low;
            private String fengxiang;
            private String type;

            public String getDate() {
                return date;
            }

            public void setDate(String date) {
                this.date = date;
            }

            public String getHigh() {
                return high;
            }

            public void setHigh(String high) {
                this.high = high;
            }

            public String getFengli() {
                return fengli;
            }

            public void setFengli(String fengli) {
                this.fengli = fengli;
            }

            public String getLow() {
                return low;
            }

            public void setLow(String low) {
                this.low = low;
            }

            public String getFengxiang() {
                return fengxiang;
            }

            public void setFengxiang(String fengxiang) {
                this.fengxiang = fengxiang;
            }

            public String getType() {
                return type;
            }

            public void setType(String type) {
                this.type = type;
            }

            public String toString() {
                return "{" + "\"date\":\"" + getDate() + "\"," + "\"fengli\":\"" + getFengli()
                        + "\"," + "\"dengxiang\":\"" + getFengxiang() + "\"," + "\"high\":\""
                        + getHigh() + "\"," + "\"low\":\"" + getLow() + "\"," + "\"type\":\""
                        + getType() + "\"}";
            }
        }

        public String toString() {
            String s = "";
            for (ForecastBean fb : forecast) {
                s += fb.toString() + ",";
            }
            return "\"data\":" + "{" + "\"yesterday\":" + getYesterday().toString() + "," +
                    "\"city\":\"" + getCity() + "\"," + "\"forest\":" + "[" + s + "]" + "," + 
                    "\"ganmao\":\"" + getGanmao() + "\"," + "\"wendu\":\"" + getWendu() + "\""
                    + "}";
        }
    }

    public String toString() {
        return "{" + getData().toString() + "," + "\"status\":" + getStatus() + "," + "\"desc\":\""
                + getDesc() + "\"" + "}";
    }
}

      3.创建用于描述网络请求的接口RetrofitApi
      采用注解 + Observable<...>接口描述网络请求参数

public interface RetrofitApi {

    /**
     * 注解里传入 网络请求 的部分URL地址
     * Retrofit把网络请求的URL分成了两部分:一部分放在Retrofit对象里,另一部分放在网络请求接口里
     * 如果接口里的url是一个完整的网址,那么放在Retrofit对象里的URL可以忽略
     * 采用Observable<...>接口
     * getWeatherInfo()是接受网络请求数据的方法
     */
    @GET("weather_mini")
    Observable<WeatherInfo> getWeatherInfo(@Query("city") String city);
   
     //可以直接在GET中加入请求的参数
    @GET("weather_mini?city=青岛")
    Observable<WeatherInfo> getWeatherInfoTwo();
}

      4.创建Retrofit实例及执行请求

private void getWeatherInfo() {
        //1.创建Retrofit对象
        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("http://wthrcdn.etouch.cn/") // 设置网络请求Url
                .addConverterFactory(GsonConverterFactory.create()) //设置使用Gson解析(可将得到的Json串转换为对应的WeatherInfo类)
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 支持RxJava
                .build();
        //2.创建网络请求接口的实例
        final RetrofitApi request = retrofit.create(RetrofitApi.class);
        //3.采用Observable<...>形式对网络请求进行封装
        Observable<WeatherInfo> observable = request.getWeatherInfo("青岛");
        //4.通过线程切换发送网络请求
        observable.subscribeOn(Schedulers.io())//切换到IO线程进行网络请求
                .observeOn(AndroidSchedulers.mainThread())//切换回到主线程 处理请求结果
                .subscribe(new Observer<WeatherInfo>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(WeatherInfo result) {
                        //5.接收服务器返回的数据
                        Log.e(TAG, "Weather info is: " + result.toString());
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "请求失败 : " + e.toString());
                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }

      请求后onNext()里面打印如下:

Weather info is: {"data":{"yesterday":{"date":"10日星期四","high":"高温 28℃","fx":"西南风","low":"低温 20℃","fl":"<![CDATA[2级]]>","type":"小雨"},
                      "city":"青岛",
                      "forest":[{"date":"11日星期五","fengli":"<![CDATA[3级]]>","dengxiang":"南风","high":"高温 25℃","low":"低温 21℃","type":"小雨"},{"date":"12日星期六","fengli":"<![CDATA[3级]]>","dengxiang":"东风","high":"高温 25℃","low":"低温 20℃","type":"小雨"},{"date":"13日星期天","fengli":"<![CDATA[2级]]>","dengxiang":"东风","high":"高温 26℃","low":"低温 20℃","type":"阴"},{"date":"14日星期一","fengli":"<![CDATA[2级]]>","dengxiang":"南风","high":"高温 27℃","low":"低温 22℃","type":"晴"},{"date":"15日星期二","fengli":"<![CDATA[4级]]>","dengxiang":"南风","high":"高温 27℃","low":"低温 20℃","type":"小雨"}],
                      "ganmao":"感冒低发期,天气舒适,请注意多吃蔬菜水果,多喝水哦。","wendu":"22"},
"status":1000,
"desc":"OK"}
c.功能防抖及联想搜索优化

      功能防抖主要是为了频繁点击只处理其中一次事件;联想搜索优化主要是为了在输入文字时不频繁去服务器进行请求。以上两种实现都是在observer中对onNext()事件进行拦截处理,看是否满足条件,满足就执行下一步observer.onNext(),不满足就不往下执行。
      /*对控件点击及文字输入进行监听,采用了RxBinding,需要加入依赖

// Rxbinding
implementation 'com.jakewharton.rxbinding2:rxbinding:2.0.0'

      功能防抖实现:

//使用throttleFirst(1, TimeUnit.SECONDS):参数1:指定的时间段内;参数2:指定时间的单位
//在1s内多次点击,只响应第一次
private void throttleOperation() {
    RxView.clicks(clickBtn).throttleFirst(1, TimeUnit.SECONDS)
         .observeOn(AndroidSchedulers.mainThread())
         .subscribe(
                new Observer<Object>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Object value) {
                        Log.e(TAG, "执行点击事件");
                        getWeatherInfo();
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {
                        Log.e("Seven", "对throttle Complete事件作出响应");
                    }
                });
    }
//频繁点击,输出如下:
09-11 10:19:15.138 E/TAG   (31259): 执行点击事件
09-11 10:19:16.221 E/TAG   (31259): 执行点击事件
09-11 10:19:17.306 E/TAG   (31259): 执行点击事件

      看一下关键代码实现是在ObservableThrottleFirstTimed.java里面的DebounceTimedObserver类:

static final class DebounceTimedObserver<T>
    extends AtomicReference<Disposable>
    implements Observer<T>, Disposable, Runnable {
        ......
        ......
        @Override
        public void onNext(T t) {
            if (!gate && !done) {
                gate = true;

                actual.onNext(t);

                Disposable d = get();
                if (d != null) {
                    d.dispose();
                }
                DisposableHelper.replace(this, worker.schedule(this, timeout, unit));
            }
        }

        @Override
        public void run() {
            gate = false;
        }
        ......
        ......
    }

      通过以上逻辑可以看到,在首次执行完onNext()后,会将gate设为true,然后执行worker.schedule(this,timeout, unit),延时timeout执行该runnable,在run()中把gate设为false,下次onNext()就可以执行了。
      联想搜索优化:

//联想搜索优化,根据指定时间过滤事件的过滤操作符
//比如在搜索框输入文字时,在指定时间内不再有文字输入时,才会发送请求,否则不发送
//若在这段时间内,输入框有文字输入或变化,则继续等待该段时间,循环上述过程
private void debounceOperation() {
    RxTextView.textChanges(editTxt)
                .debounce(1, TimeUnit.SECONDS).skip(1)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<CharSequence>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(CharSequence charSequence) {
                        resultTxt.setText("服务器请求字符串 = " + charSequence.toString());
                    }

                    @Override
                    public void onError(Throwable e) {
                      
                    }

                    @Override
                    public void onComplete() {

                   }
                });
    }

      看一下关键代码实现是在ObservableDebounceTimed.java里面的DebounceTimedObserver类:

static final class DebounceTimedObserver<T> implements Observer<T>, Disposable {
        ......
        ......

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            long idx = index + 1;
            index = idx;
            Disposable d = timer.get();
            if (d != null) {
                d.dispose();
            }
            DebounceEmitter<T> de = new DebounceEmitter<T>(t, idx, this);
            if (timer.compareAndSet(d, de)) {
                d = worker.schedule(de, timeout, unit);
                de.setResource(d);
            }

        }
        ......
        ......
        void emit(long idx, T t, DebounceEmitter<T> emitter) {
            if (idx == index) {
                actual.onNext(t);
                emitter.dispose();
            }
        }
    }

    static final class DebounceEmitter<T> extends AtomicReference<Disposable> implements Runnable, Disposable {
        ......
        ......
        @Override
        public void run() {
            if (once.compareAndSet(false, true)) {
                parent.emit(idx, value, this);
            }
        }
        .......
    }

      通过以上逻辑可以看到,在执行onNext()时并没有直接执行下一个observer.onNext(),而是执行了worker.schedule(de,timeout, unit),延时timeout执行该runnable,在run()中执行该observer的emit(),然后在执行下一个observer的onNext()。

d.操作符使用案例**

      merge()

/**
 * 使用Merge操作符,合并两个Observable
 * 如果两个Observable<T>,T是相同的类型,比如String, 那么在subscribe里面的new Observer<String>和 onNext(String)
 * 如果两个Observable<T>,T不是相同的类型,如下: 那么在subscribe里面的new Observer<Object>和 onNext(Object)即可
 */
private void mergeOperation() {
    //设置第1个Observable:获取字符串
    Observable<String> strOb = Observable.just("字符串");

    //设置第2个Observable:请求获取整数
    Observable<Integer> intOb = Observable.just(1000);

    //通过merge()合并事件 & 同时发送事件
    Observable.merge(strOb, intOb)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Object>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Object value) {
                        if (value instanceof String) {
                            result += value;
                        } else {
                            Log.d(TAG, "数据为: " + (Integer)value);
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    // 接收合并事件后,统一展示
                    @Override
                    public void onComplete() {
                        Log.d(TAG, "获取数据完成");
                        setMergeTxt(result);
                    }
                });
    }
//打印如下:
09-11 10:31:14.873 D/TAG   (31259): 数据为: 1000
09-11 10:31:14.873 D/TAG   (31259): 获取数据完成
09-11 10:31:14.875 D/TAG   (31259): result is: 字符串

      zip()

@SuppressLint("CheckResult")
private void zipOperation() {
    Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("http://fy.iciba.com/")
                .addConverterFactory(GsonConverterFactory.create()) 
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .build();

    RetrofitApi request = retrofit.create(RetrofitApi.class);

    //采用Observable<...>形式 对 2个网络请求 进行封装
    //即2个网络请求异步 & 同时发送
    Observable<Translation> observable = request.getTranslation().subscribeOn(
                Schedulers.io()); // 新开线程进行网络请求1
    Observable<Translation1> observable1 = request.getTranslationTwo().subscribeOn(
                Schedulers.io());// 新开线程进行网络请求2

    // 通过使用Zip()对两个网络请求进行合并再发送
    Observable.zip(observable, observable1,
                new BiFunction<Translation, Translation1, String>() {
                    // 注:创建BiFunction对象传入的第3个参数 = 合并后数据的数据类型
                    @Override
                    public String apply(Translation translation,
                            Translation1 translation1) throws Exception {
                        return translation.show() + " & " + translation1.show();
                    }
                }).observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    // 成功返回数据时调用, Consumer<T>, T对应apply的返回值,apply对应BiFunction最后的参数类型
                    @Override
                    public void accept(String combineInfo) throws Exception {
                        // 结合显示2个网络请求的数据结果
                        Log.d(TAG, "最终接收到的数据是:" + combineInfo);
                        setZipTxt(combineInfo);
                    }
                }, new Consumer<Throwable>() {
                    // 网络请求错误时调用
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.d(TAG, "请求失败: " + throwable.toString() );
                    }
    });
}
e.其他

      RxJava对应很多操作符供开发使用,详情请参考大神整理的以下表格:


RxJava操作符.png

六.总结

      基于事件流的异步操作库,事件流是通过subscribe触发的(实际上是subscribeActual),每个Observable子类的subscribeActual实现逻辑不同。
      从下往上订阅,不断执行Observable的subscribeActual();
      事件从上往下发射,不断执行Observer的onNext()等方法。
非常感谢网上大神整理的一些文章,学习之后记录下来备后续使用。

上一篇下一篇

猜你喜欢

热点阅读