android开发JavaAndroid知识

ReactiveX 和 RxJava 手记

2016-08-03  本文已影响1914人  androidjp

引言:
学习了一下RxJava,理解其是一个以升级版的观察者模式为核心的异步处理库。旨在以更加简介、可读性更强的代码去实现数据异步处理和线程前通信。
下面,是本人对RxJava基础的学习笔记和总结,算是入门级别。

Rx介绍


ReactiveX 简称 Rx,全称 Reactive Extensions,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET、JavaScript和C++,Java等几乎所有的编程语言。Rx扩展了观察者模式用于支持数据和事件序列,添加了一些操作符,它让你可以声明式的组合这些序列,而无需关注底层的实现:如线程、同步、线程安全、并发数据结构和非阻塞IO。

RxJava图解


可先通过图解总览大概:
RxJava之观察者模式的基本运作过程,如下:


RxJava之观察者模式的基本运作过程

注意:Subscribe<T> 是实现 Observable<T>Subscription 的一个抽象类,在调用subscribe(params)方法时,如果这个params类型为Observer<T>,则最终它会转成Subscriber<T>,同时,此方法会返回一个Subscription对象,用于调用unsubscribe()方法解绑。

单线程中RxJava基本用法和例子


1. RxJava的几种基本写法(观察者模式)

方式一:

原始的观察者模式写法,如下:

///被观察者
Observable<String> myObservable = Observable.create(
                new Observable.OnSubscribe<String>(){

                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        subscriber.onNext("hello world");
                        subscriber.onCompleted();
                    }
                }
        );

///观察者
Subscriber<String> mySubscriber = new Subscriber<String>() {
          @Override
          public void onCompleted() {}

          @Override
          public void onError(Throwable e) {}

          @Override
          public void onNext(String s) {
                Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
            }
      };

///订阅(让两者产生关联,并启动)
 myObservable.subscribe(mySubscriber);

方式二:

相对方式一,化简定义方法体的部分,使用Action来实现不完整回调,结果如下:

//被观察者
//等价于: call(String) -> onNext(String)过程只调用一次 ->onCompleted()/onError()
Observable<String> myObservable = Observable.just("Hello world");

///观察者
///调用subscribe()时自动生成Subscriber并调用onNext()
Action1<String> onNextAction = new Action1<String>() {
      @Override
      public void call(String s) {
          Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
      }
};

///观察者
///调用subscribe()时自动生成Subscriber并调用onError()
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling
    }
};

///观察者
///调用subscribe()时自动生成Subscriber并调用onCompleted()
Action0 onCompletedAction = new Action0() {
    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed");
    }
};

//////订阅(让两者产生关联,并启动)
 myObservable.subscribe(onNextAction);
 // myObservable.subscribe(onErrorAction);
 // myObservable.subscribe(onCompletedAction);

方式三:

相对方式二,进行链式调用,如下:

///省略Obervable对象的创建
Observable.just("this is your sign:")
                ///省略Action1对象的创建,直接匿名内部类方式添加订阅
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
                    }
                });

注意:

  1. just:如果只是调用: onNext() 【一到多次】 --> onCompleted()这个过程,那么,可以使用just()快速创建Observable

2. 基本应用

1. 打印字符串数组

String[] names = ...;
Observable.from(names)
    .subscribe(new Action1<String>() {
        @Override
        public void call(String name) {
            Log.d(tag, name);
        }
    });

Observable.from(params) : params是数组类型的参数,在执行时,会调用Subscriber的onNext方法多次,每次处理一个item,之后,调用onCompleted()或者onError().

2. 通过id获取图片并显示

int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
    @Override
    public void call(Subscriber<? super Drawable> subscriber) {
        Drawable drawable = getTheme().getDrawable(drawableRes));
        subscriber.onNext(drawable);
        subscriber.onCompleted();
    }
}).subscribe(new Observer<Drawable>() {
    @Override
    public void onNext(Drawable drawable) {
        imageView.setImageDrawable(drawable);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
    }
});

多线程中RxJava的使用


在 RxJava 的默认规则中,事件的发出和消费都是在同一个线程的。也就是说,如果只用上面的方法,实现出来的只是一个同步的观察者模式。观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于 RxJava 是至关重要的。

1. 基本写法

Observable.just(1,2,3,4)
                ///指定 subscribe() 发生在 IO 线程
                .subscribeOn(Schedulers.io())
                // 指定 Subscriber 的回调发生在主线程
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {
                        Log.e("TestActivity", "当前线程:"+ Thread.currentThread());
                        String res = "字符串:"+integer;
                        return res;
                    }
                })
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        Toast.makeText(TestActivity.this,"完成",Toast.LENGTH_SHORT).show();
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(String s) {
                        Log.e("TestActivity", "当前线程:"+ Thread.currentThread());
                        Toast.makeText(TestActivity.this,s,Toast.LENGTH_SHORT).show();
                    }
                });

知识点:

  1. 加了map这个RxJava的映射方法,用于将事件处理的复杂过程【如:输入参数是Integer类型,输出结果是String类型】给被观察者来做,尽可能地减少观察者的工作。
    知识点:
  2. just((1,2,3,4):
    前者等价于如下代码:
```
Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            Log.e("TestActivity", "call当前线程:"+ Thread.currentThread());
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onNext(3);
            subscriber.onNext(4);
            subscriber.onCompleted();
        }
    })
```
  1. Scheduler:
* 背景:在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 `subscribe()`,就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。
* 概念:调度器(线程控制器)
* 作用:切换线程传递事件,达到异步的目的
* RxJava内置的Scheduler:(文章下面会详细总结)
  * `Schedulers.immediate()`:默认模式。直接使用当前线程运行。
  * `Schedulers.newThread()`:总是启动新线程,并在新线程中运行。
  * `Sched.io()`:I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。
  * `Schedulers.computation()`: 计算所使用的 Scheduler。
  * `AndroidSchedulers.mainThread()`:它指定的操作将在 Android 主线程运行。
  1. Obervable.subscribeOn(Scheduler):让call方法以及之前的操作,发生在指定的线程中运行
  2. Obervable.observeOn(Scheduler):让call之后的回调操作例如map、onNext等操作,发生在指定的线程中运行。

RxJava常用操作--数据转换处理


在事件传递过程中,如果观察者有需要,还可以通过数据转换处理,将传入的数据进行加工或调用,得到更多不同类型的信息。
RxJava提供给我们:map,flatMap来支持数据的‘一对一’和’一对多‘的转换。

  1. map
    作用:实现数据的一对一转化过程
    以下例子可以说明:
///省略Obervable对象的创建
Observable.just("this is your sign:")
              ///将传入的参数由String变成String[]
              .map(new Func1<String, String[]>() {
                  @Override
                  public String[] call(String s) {
                      String[] strings = s.split(" ");
                      return strings;
                  }
              })
              ///将传入的参数由String[]变成Integer
              .map(new Func1<String[], Integer>() {
                  @Override
                  public Integer call(String[] strings) {
                      int len = strings.length;
                      return len;
                  }
              })
              ///将传入的参数由Integer变成String
              .map(new Func1<Integer, String>() {
                  @Override
                  public String call(Integer integer) {
                      return integer+"";
                  }
              })
              ///省略Action1对象的创建,直接匿名内部类方式添加订阅
              .subscribe(new Action1<String>() {
                  @Override
                  public void call(String s) {
                      Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
                  }
              });

  1. flatMap
    作用:实现数据的一对多转换过程
    先看如下具体例子:
private void testFlatMap() throws CloneNotSupportedException {
      List<Student> studentList = new ArrayList<>();
      ///测试:构建两个Student对象
      Student xiaoming = new Student();
      Student honghong = new Student();
      ///测试:构建Course对象集
      Course chinese = new Course("语文");
      Course english = new Course("英语");
      Course math  = new  Course("数学");

      ///进行赋值操作,这样一来:
      /// xiaoming:id为“2222”,并有两门课程:语文和英语
      /// honghong:id为“007” ,并有两门课程:英语和数学
      xiaoming.id= "2222";
      honghong.id= "007";
      xiaoming.courseList = new ArrayList<>();
      xiaoming.courseList.add(chinese.clone());
      xiaoming.courseList.add(english.clone());
      honghong.courseList = new ArrayList<>();
      honghong.courseList.add(english.clone());
      honghong.courseList.add(math.clone());

      studentList.add(xiaoming);
      studentList.add(honghong);

      ///下面的过程,就是提取:列表中的列表
      Observable.from(studentList)
              .flatMap(new Func1<Student, Observable<Course>>() {
                  @Override
                  public Observable<Course> call(Student student) {
                      Log.e("学生信息", student.id);
                      return Observable.from(student.courseList);
                  }
              })
              .map(new Func1<Course, String>() {
                  @Override
                  public String call(Course course) {
                      return course.name;
                  }
              })
              .subscribe(new Action1<String>() {
                  @Override
                  public void call(String s) {
                      Log.e("course信息",s);
                  }
              });
  }

最终得到结果为:


知识点:

  1. flatMap:
  1. Funx 和 Actionx:

关于Single


RxJava各方法汇总


1. 用于创建Observable的操作符:

2. 用于对Observable发射的数据进行变换的操作符:

3. 线程切换和控制相关操作符:

4. Single相关方法汇总:

参考文章


上一篇 下一篇

猜你喜欢

热点阅读