Android开发Android技术知识Android开发经验谈

RxJava

2019-04-05  本文已影响7人  Anwfly

1.什么是RxJava(ReactiveX.io链式编程)

RXJava是一个响应式编程框架,采用观察者设计模式,
观察者模式本身的目的就是『后台处理,前台回调』的异步机制

概述:一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库

优点:异步操作很关键的一点是程序的简洁性,因为在调度过程比较复杂的情况下,异步代码经常会既难写也难被读懂。Android 创造的AsyncTask和Handler,其实都是为了让异步代码更加简洁。RxJava 的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁。(函数风格、代码简单、异步错误处理、轻松使用并发)

2.观察者模式

被观察者

/**
 * 被观察者
 */
public class Watched{

    private List<Watcher > list = new ArrayList<>();

    //注册观察者
    @Override
    public void registerWatcher(Watcher watcher) {
        list.add(watcher);
    }

    //移除观察者
    @Override
    public void unregisterWatcher(Watcher watcher) {
        list.remove(watcher);
    }

    //清空观察者
    @Override
    public void clearWatcher() {
        list.clear();
    }

    //通知观察者
    @Override
    public void notifyWathers(String string) {
        for (Watcher watcher: list ) {
            watcher.update(string);
        }
    }
}

观察者

/**
 * 观察者
 */
public class Watcher {

    //用于观察者更新状态
    @Override
    public void update(String string) {

        System.out.println(Thread.currentThread().toString() + " : " + string);
    }

}

测试类

/**
 * 测试类
 */
public class MyClass {
    public static void main(String[] args){

        //观察者
        Watcher watcher1 = new Watcher();
        Watcher watcher2 = new Watcher();
        Watcher watcher3 = new Watcher();

        //被观察者
        Watched watched = new Watched();

        //被观察者注册观察者
        watched.registerWatcher(watcher1);
        watched.registerWatcher(watcher2);
        watched.registerWatcher(watcher3);

        //通知
        watched.notifyWathers("接收的数");

        //清空
        watched.clearWatcher();
    }
}

3.基本概念(观察者模式)

案例:按钮点击处理、广播注册

通过setOnClickListener()方法,Button持有OnClickListener的引用;当用户击时,Button自动调用OnClickListener的onClick()方法。 
Button——>被观察者
OnClickListener——>观察者
setOnClickListener ——>订阅
onClick ——>事件

RxJava 有3个基本概念:

  1.Observable(可观察者,即被观察者)
  2.Observer(观察者)
  3.subscribe(订阅)事件。

观察者模式

Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。

普通事件 
     onNext() 接收被观察者发送的消息
特殊的事件:
     onCompleted() 事件队列完结
     onError ()    事件队列异常

注意:

1)RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。
2)RxJava 规定,onNext() 接收被观察者发送的消息、可以执行多次;当不会再有新的 onNext () 发出时,需要触发 onCompleted () 方法作为标志。onError():事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
3)在一个正确运行的事件序列中, onCompleted() 和 onError () 有且只有一个,并且是事件序列中的最后一个。
4)需要注意的是,onCompleted()和 onError () 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

3.调度器

RxJava中调度器设置方法

subscribeOn():或者叫做事件产生的线程。 
    指定 subscribe()所发生的线程,
    即 Observable.OnSubscribe 被激活时所处的线程。

observeOn():或者叫做事件消费的线程。
    指定 Subscriber所运行在的线程。

几种调度器

在RxJava 中Scheduler——调度器,相当于线程控制器,
RxJava 通过它来指定每一段代码应该运行在什么样的线程。
RxJava 已经内置了几个Scheduler,它们已经适合大多数的使用场景:
  1:Schedulers.immediate():直接在当前线程运行,相当于不指定线程。这是默认的Scheduler
  2:Schedulers.newThread():总是启用新线程,并在新线程执行操作。
  3:Schedulers.io():I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler
      行为模式和 newThread()差不多区别在于 io()的内部实现是是用一个无数量上限的线 
      程池可以重用空闲的线程,因此多数情况下 io()比 newThread()更有效率。不要把计算
      工作放在 io()中可以避免创建不必要的线程。
  4:Schedulers.computation():计算所使用的 Scheduler这个计算指的是 CPU密集型计算,
       即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler使用的固定  
       的线程池,大小为 CPU核数。不要把 I/O 操作放在computation()中,否则 I/O 操作
       的等待时间会浪费CPU。
  5.AndroidSchedulers.mainThread():Android 还有一个专用的
        它指定的操作将在 Android主线程运行。有了这几个 Scheduler,就可以使用
        subscribeOn()和 observeOn()两个方法来对线程进行控制了。 

4.依赖库

//RxJava
implementation 'io.reactivex.rxjava2:rxjava:2.2.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
implementation 'com.squareup.retrofit2:retrofit:2.5.0'//retrofit 库
implementation 'com.squareup.retrofit2:converter-gson:2.3.0'//转换器,请求结果转换成Model
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'//配合Rxjava 使用
implementation 'com.google.code.gson:gson:2.6.2'//Gson 库

5.简单使用

public static void baseRx(){
    //创建被观察者
    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) {
            emitter.onNext("1111");
            emitter.onNext("2222");
            emitter.onNext("3333");
            emitter.onNext("4444");
            //emitter.onError(new Throwable("abc"));
            //emitter.onComplete();
        }
    });

    //创建观察者
    Observer<String> observer = new Observer<String>() {

        @Override
        public void onSubscribe(Disposable d) {//关闭线程
            Log.e(TAG, "onSubscribe: " );
        }

        @Override
        public void onNext(String s) {
            Log.e(TAG, "onNext: "+ s );
        }

        @Override
        public void onError(Throwable e) {//失败
            Log.e(TAG, "onError: "+e.getMessage() );
        }

        @Override
        public void onComplete() {//成功
            Log.e(TAG, "onComplete: " );
        }
    };
    //被观察者订阅观察者
    observable.subscribe(observer);

    //线程切换
    observable
            //被订阅者在子线程中
            .subscribeOn(Schedulers.io())
            //订阅者在主线程中
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(observer);

    //观察中可以重复指定线程
    observable
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())//主
            .observeOn(Schedulers.io())//子
            .observeOn(AndroidSchedulers.mainThread())//主
            .subscribe(observer);
}

6.Android功能使用

private void rxAndroid() {

    Retrofit retrofit = new Retrofit.Builder()
            .baseUrl(MyServer.Url)
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
            .addConverterFactory(GsonConverterFactory.create())
            .build();

    MyServer myServer = retrofit.create(MyServer.class);

    Observable<ResponseBody> call = myServer.getDate();

    call.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<ResponseBody>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(ResponseBody responseBody) {


                    try {
                        Log.e(TAG, "onNext: "+responseBody.string() );
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });
}

private void rxAndroidBean() {

    Retrofit retrofit = new Retrofit.Builder()
            .baseUrl(MyServer.Url)
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
            .addConverterFactory(GsonConverterFactory.create())
            .build();

    MyServer myServer = retrofit.create(MyServer.class);

    Observable<Bean> call = myServer.getDate2();

    call.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Bean>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Bean responseBody) {

                    Log.e(TAG, "onNext: "+ responseBody.getRESULT() );
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });
}

7.其他操作符使用

//遍历输出
public static void rxFrom(){
    Integer[] a = {1,2,3,4,5};
    Observable.fromArray(a).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.e(TAG, "accept: "+integer);
        }
    });
}

//数组合并输出
public static void rxJust(){
    Integer[] a = {1,2,3};
    Integer[] b = {9,8,7};
    Observable.just(a,b).subscribe(new Consumer<Integer[]>() {
        @Override
        public void accept(Integer[] integers) throws Exception {
            for (Integer i: integers) {
                Log.e(TAG, "accept: "+i);
            }
        }
    });
}

//范围输出
public  static  void rxRange(){
    Observable.range(0,20).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.e(TAG, "accept: "+integer );
        }
    });
}

//过滤输出
public static void rxFilter(){
    Integer[] a = {1,2,3,4,5};
    Observable.fromArray(a).filter(new Predicate<Integer>() {
        @Override
        public boolean test(Integer integer) throws Exception {
            if (integer>3){
                return true;
            }
            return false;
        }
    }).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.e(TAG, "accept: "+integer );
        }
    });
}

//定时器
public static void rxInterval(){

    Observable.interval(1,1,TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            Log.e(TAG, "accept: "+aLong );
        }
    });
}

//数组转换
public static void rxMap(){
    Integer[] a = {1,2,3,4,5};
    Observable.fromArray(a).map(new Function<Integer, String>() {
        @Override
        public String apply(Integer integer) {

            return integer+"abc";
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) {
            Log.e(TAG, "accept: "+s );
        }
    });
}

//一个对象转换为一组对象
public static void rxFlatMap(){
    Integer[] a = {1,2,3,4,5};
    Observable.fromArray(a).flatMap(new Function<Integer, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(Integer integer) throws Exception {
            String[] strs = new String[3];
            for (int i =0;i<strs.length;i++){
                strs[i] = integer +  strs[i];
            }
            return Observable.fromArray(strs);
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.e(TAG, "accept: "+s );
        }
    });
}

//Observable压缩合并
public static void rxZip(){
    Integer[] a= {1,2,3};
    Integer[] b={4,5,6};
    Observable<Integer> observableA = Observable.fromArray(a);
    Observable<Integer> observableB = Observable.fromArray(b);

    Observable.zip(observableA, observableB, new BiFunction<Integer, Integer, String>() {

        @Override
        public String apply(Integer integer, Integer integer2) throws Exception {
            return integer + ":" + integer2;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.e(TAG, "accept: "+s );
        }
    });

}

//合并
public static void rxMerge(){
    Integer[] a ={1,2,3};
    String[] b = {"abc","aaa","bbb"};
    char[] c = {'a','b','c'};

    Observable<Integer> A = Observable.fromArray(a);
    Observable<String> B = Observable.fromArray(b);
    Observable<char[]> C = Observable.fromArray(c);

    Observable
            .merge(A,B,C)
            .subscribe(new Consumer<Serializable>() {
                @Override
                public void accept(Serializable serializable) throws Exception {
                    Log.e(TAG, "accept: ."+serializable );
                }
            });
}

8.RxAndroid好处

用途
   是一个实现异步操作的库,具有简洁的链式代码,提供强大的数据变换。
优势
   异步好简单、代码好简洁,一个简单、一个简洁,这就意味着工作效率。

subscribeOn只能定义一次,除非是在定义doOnSubscribe
observeOn可以定义多次,决定后续代码所在的线程

9.RxJava:好处

使用Rxjava的好处在于,我们可以方便的切换方法的执行线程,对线程动态切换,该过程无需我们自己手动创建和启动线程。使用Rxjava创建的代码虽然出现在同一个线程中,但是我们可以设置使得不同方法在不同线程中执行。上述功能的实现主要归功于RxJava的Scheduler实现,Scheduler 提供了『后台处理,前台回调』的异步机制。
上一篇下一篇

猜你喜欢

热点阅读