RxJava2.0初学知识回顾

2018-01-15  本文已影响0人  张正yi

1、RxJava的重要组成

Observable (可观察者,即被观察者)、Observer (观察者)、subscribe (订阅)、事件。
Observable的生命周期中有三个重要的事件,onNext(检索数据)、onCompleted(完成)和onError(发现错误)。

Subscriber:也是一种观察者,在2.0中 它与Observer没什么实质的区别,不同的是 Subscriber要与Flowable(也是一种被观察者)联合使用。

Obsesrver用于订阅Observable,而Subscriber用于订阅Flowable

RxJava三部曲:初始化一个Observable,初始化一个Observer,建立订阅关系

1.1 Observable创建

Observable<T> create(ObservableOnSubscribe<T> source)

代码效果如下:

   @Test
    public void testObservable() {
        System.out.println("---------testObservale----------");
        // RxJava1的写法,本次使用的是Rxjava2依赖
        //Observable<Integer> observable = Observable.create(new Observable.Onsubscrible<Integer>(){});
        //创建一个Observable
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
             // 创建Observable时,回调的是ObservableEmitter,字面意思即发射器,用于发射数据(onNext)和通知(onError/onComplete)
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                for (int i = 0; i < 5; i++) {
                    e.onNext(i);
                }
                e.onComplete();
            }
        });

        //创建一个Observer
        Observer<Integer> observer = new Observer<Integer>() {
            /**
            * RxJava 2.0 中新增的,传递参数为Disposable ,Disposable 相当于        
            * RxJava1.x中的Subscription,用于解除订阅。
             */
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("---------onSubscribe:" + d);
            }

            @Override
            public void onNext(Integer integer) {
                // 收到数据
                System.out.println("---------onNext:" + integer);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("---------onError----------");
            }

            @Override
            public void onComplete() {
                System.out.println("---------onComplete----------");
            }
        };

        // 建立连接(订阅)
        observable.subscribe(observer);
    }
   运行结果如下:
    ---------testObservale----------
    ---------onSubscribe:null
    ---------onNext:0
    ---------onNext:1
    ---------onNext:2
    ---------onNext:3
    ---------onNext:4
    ---------onComplete----------

1.2 Observable的其他创建方式

1.2.1 fromIterable()

使用fromIterable(),遍历集合,发送每个item。相当于多次回调onNext()方法,每次传入一个item。

代码效果如下:

   @Test
    public void testObservable2() {
        List<String> lists = new ArrayList<String>();
        for (int i = 0; i < 5; i++) {
            lists.add("Hello Boy" + i);
        }
        Observable observable = Observable.fromIterable((Iterable<String>) lists);
        //创建一个Observer
        Observer<String> observer = new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
            }
            @Override
            public void onNext(String str) {
                System.out.println("---------onNext:" + str);
            }
            @Override
            public void onError(Throwable e) {
                System.out.println("---------onError----------");
            }
            @Override
            public void onComplete() {
                System.out.println("---------onComplete----------");
            }
        };
        // 建立连接(订阅)
        observable.subscribe(observer);
    }
    运行结果如下:
     ---------onNext:Hello Boy0
     ---------onNext:Hello Boy1
     ---------onNext:Hello Boy2
     ---------onNext:Hello Boy3
    ---------onNext:Hello Boy4
    ---------onComplete----------

1.2.2 just()

使用just( ),将为你创建一个Observable并自动为你调用onNext( )发射数据。
通过just( )方式 直接触发onNext(),just中传递的参数将直接在Observer的onNext()方法中接收到。

代码实现如下:

   @Test
    public void testObservable3() {
        List<String> lists = new ArrayList<String>();
        for (int i = 0; i < 5; i++) {
            lists.add("Hello Boy" + i);
        }
        Observable observable = Observable.just(lists);
        //创建一个Observer
        Observer<List<String>> observer = new Observer<List<String>>() {

            @Override
            public void onSubscribe(Disposable d) {
                System.out.println(d);
            }

            @Override
            public void onNext(List<String> str) {
                System.out.println("---------onNext:" + str);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("---------onError----------");
            }

            @Override
            public void onComplete() {
                System.out.println("---------onComplete----------");
            }
        };
        // 建立连接(订阅)
        observable.subscribe(observer);
    }
    // 运行结果如下:
     Disposable:0
      ---------onNext:[Hello Boy0, Hello Boy1, Hello Boy2, Hello Boy3, Hello Boy4]
      ---------onComplete----------

1.2.3 其它

defer():当观察者订阅时,才创建Observable,并且针对每个观察者创建都是一个新的Observable。

interval( ):固定的时间间隔发射一个无限递增的整数序列

range( ):创建一个发射特定整数序列的Observable,接收两个参数,第一个参数是范围的起始值,第二个参数是范围的数据数目

timer( ):创建一个Observable,它在一个给定的延迟后发射一个特殊的值

repeat( ):创建一个Observable,该Observable的事件可以重复调用。

2、RxJava中的操作符

2.1、map()操作符

map 操作符是可以将返回的数据变换成别的数据。一般对服务器端返回结果处理

具体看代码实现效果:

  @Test
    public void testObservable4() {
        List<String> lists = new ArrayList<String>();
        for (int i = 0; i < 5; i++) {
            lists.add("Hello Boy" + i);
        }
        Observable observable = Observable.fromIterable((Iterable<String>) lists);
        //创建一个Observer
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("Disposable:" + d);
            }
            @Override
            public void onNext(String str) {
                System.out.println("---------onNext:" + str);
            }
            @Override
            public void onError(Throwable e) {
                System.out.println("---------onError----------");
            }
            @Override
            public void onComplete() {
                System.out.println("---------onComplete----------");
            }
        };
        Function fun1 = new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                // 这个可以对返回的数据格式进行修改(比如我在每个前面加入"zyzhang")
                return "zyzhang:" + s;
            }
        };
        // 建立连接(订阅)
        observable.map(fun1).subscribe(observer);
    }
    // 运行结果如下:
    ---------onNext:zyzhang:Hello Boy0
    ---------onNext:zyzhang:Hello Boy1
    ---------onNext:zyzhang:Hello Boy2
    ---------onNext:zyzhang:Hello Boy3
    ---------onNext:zyzhang:Hello Boy4
    ---------onComplete----------

2.2、flatMap()操作符

对于数据的转换比map()更加彻底,如果发送的数据是集合,flatmap()重新生成一个Observable对象,并把数据转换成Observer想要的数据形式。

比如有如下两个实体类:

    class Student {
        private String name;//姓名
        private List<Course> coursesList;//所修的课程
        ...
    }
    class  Course {
        private String name;//课程名
        private String id;
        ...
    }

我们需要学生所修的课程名(课程可以由多个)。如果直接用map实现的话,很简单得到学生的课程列表,然后一个for循环打印学生的课程信息。篇幅关系不再举例。
flatMap实现如下:

  @Test
    public void testObservable5() {
        List<Student> students = new ArrayList<Student>();
        for (int i = 0; i < 5; i++) {
            Student student = new Student();
            student.setName("张三" + i);
            List<Course> courses = new ArrayList<Course>();
            for (int j = 0; j < 2; j++) {
                Course course = new Course();
                course.setName("数学" + j);
                courses.add(course);
                student.setCoursesList(courses);
            }
            students.add(student);
        }

        Observable observable = Observable.fromIterable(students);
        //创建一个Observer
        Observer<Course> observer = new Observer<Course>() {

            @Override
            public void onSubscribe(Disposable d) {
                System.out.println(d);
            }

            @Override
            public void onNext(Course course) {
                System.out.println("---------onNext:" + course.getName());
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("---------onError----------");
            }

            @Override
            public void onComplete() {
                System.out.println("---------onComplete----------");
            }
        };

        Function fun1 = new Function<Student, Observable<Course>>() {

            @Override
            public Observable<Course> apply(Student student) throws Exception {
                System.out.println(student.getName());
                return Observable.fromIterable(student.getCoursesList());
            }
        };

        // 建立连接(订阅)
        observable.flatMap(fun1).subscribe(observer);
    }
  // 运行结果如下:
  张三0
  ---------onNext:数学0
  ---------onNext:数学1
  张三1
  ---------onNext:数学0
  ---------onNext:数学1
  张三2
  ---------onNext:数学0
  ---------onNext:数学1
  张三3
  ---------onNext:数学0
  ---------onNext:数学1
  张三4
  ---------onNext:数学0
  ---------onNext:数学1
  ---------onComplete----------

2.3、filter()操作符

集合进行过滤

2.4、take()操作符

取出集合中的前几个

2.5、doOnNext()操作符

此操作符可以在消费者也就是观察者 接收到数据之前做事

3、compose()操作符

4、subscribeOn() 和observeOn()

对线程进行控制,subscribeOn(): 指定Observable(被观察者)所在的线程,或者叫做事件产生的线程。observeOn(): 指定 Observer(观察者)所运行在的线程,或者叫做事件消费的线程。

  Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            Log.d("所在的线程:",Thread.currentThread().getName());
            Log.d("发送的数据:", 1+"");
            e.onNext(1);
        }
     }).subscribeOn(Schedulers.io()) 
          .observeOn(AndroidSchedulers.mainThread()) 
          .subscribe(new Consumer<Integer>() {
               @Override
                public void accept(Integer integer) throws Exception {
                    Log.d("所在的线程:",Thread.currentThread().getName());
                    Log.d("接收到的数据:", "integer:" + integer);
                }
           });
// 运行结果
// 所在的线程:RxCachedThreadScheduler-1
// 发送的数据:1
// 所在的线程: main
// 接收到的数据:integer:1

可以看到,Observable(被观察者)发送事件的线程的确改变了,而Observer(观察者)仍然在主线程中接收事件。由此我们实现了线程调度的操作,可以在此基础上尽情的进行异步操作。

上一篇下一篇

猜你喜欢

热点阅读