我爱编程

RxJava 基础实现

2018-08-04  本文已影响0人  一只笔

1. 定义

RxJava 是一个 基于事件流、实现异步操作的库

2. 作用

实现异步操作
类似于 Android中的 AsyncTaskHandler作用

3. 特点

image.png

4. 实现步骤

  1. 创建事件 Observable.create
  2. 创建观察者 Observer
  3. 订阅observable.subsribe
public void text1() {
       //3创建事件
       Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
           @Override
           public void subscribe(ObservableEmitter<String> e) throws Exception {
               LogUtil.f().i(MainActivity.TAG, "subscribe");
               e.onNext("132");
               e.onNext("456");
               e.onComplete();
               //注意onComplete() 与 onError() 同时只能调用一个
//                e.onError(new Throwable());
           }
       });
       
       //2创建观察者
       Observer<String> observer = new Observer<String>() {
           Disposable disposable;
           @Override
           public void onSubscribe(Disposable d) {
               disposable=d;
               LogUtil.f().i(MainActivity.TAG, "onSubscribe");
           }
           
           @Override
           public void onNext(String s) {
               LogUtil.f().i(MainActivity.TAG, "onNext:" + s);
               //可采用 Disposable.dispose() 切断观察者 与 被观察者 之间的连接
//                disposable.dispose();
           }
           
           
           @Override
           public void onError(Throwable e) {
               LogUtil.f().i(MainActivity.TAG, "onError");
           }
           
           @Override
           public void onComplete() {
               LogUtil.f().i(MainActivity.TAG, "onComplete");
           }
       };
       //3订阅
       observable.subscribe(observer);
       
   }

输出的结


image.png

切断观察者

//可采用 Disposable.dispose() 切断观察者 与 被观察者 之间的连接

   @Override
            public void onNext(String s) {
                LogUtil.f().i(MainActivity.TAG, "onNext:" + s);
                //可采用 Disposable.dispose() 切断观察者 与 被观察者 之间的连接
                disposable.dispose();
            }

输出的结


image.png

切断后,后的方法就不会再执行

额外说明

观察者 Observer的subscribe()具备多个重载的方法

    public final Disposable subscribe() {}
    // 表示观察者不对被观察者发送的事件作出任何响应(但被观察者还是可以继续发送事件)

    public final Disposable subscribe(Consumer<? super T> onNext) {}
    // 表示观察者只对被观察者发送的Next事件作出响应
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
    // 表示观察者只对被观察者发送的Next事件 & Error事件作出响应

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
    // 表示观察者只对被观察者发送的Next事件、Error事件 & Complete事件作出响应

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
    // 表示观察者只对被观察者发送的Next事件、Error事件 、Complete事件 & onSubscribe事件作出响应

    public final void subscribe(Observer<? super T> observer) {}
    // 表示观察者对被观察者发送的任何事件都作出响应

切换线程

     //切换线程
        observable = observable
                .subscribeOn(Schedulers.io())//运行子线程
                .observeOn(AndroidSchedulers.mainThread());//回调在android 主线程

运行结果:

image.png
以上可以看出,运行是在子线程,回调是在android 主线程.

优雅的实现

      Observable.create(new ObservableOnSubscribe<String>() {
             @Override
             public void subscribe(ObservableEmitter<String> e) throws Exception {
                 Log.i(MainActivity.TAG, "subscribe" + "线程:" + Thread.currentThread().getName());
                 e.onNext("132");
                 e.onNext("456");
                 e.onComplete();
             }
         })
                 .subscribeOn(Schedulers.io())
                 .observeOn(AndroidSchedulers.mainThread())
                 .subscribe(new Observer<String>() {
                     @Override
                     public void onSubscribe(Disposable d) {
                         Log.i(MainActivity.TAG, "onSubscribe" + "线程:" + Thread.currentThread().getName());
                     }
    
                     @Override
                     public void onNext(String s) {
                         Log.i(MainActivity.TAG, "onNext" + "线程:" + Thread.currentThread().getName());
                     }
    
                     @Override
                     public void onError(Throwable e) {
                         Log.i(MainActivity.TAG, "onError" + "线程:" + Thread.currentThread().getName());
                     }
    
                     @Override
                     public void onComplete() {
                         Log.i(MainActivity.TAG, "onComplete" + "线程:" + Thread.currentThread().getName());
                     }
                 });
    }

运行结果:

image.png

操作符

Map

map是RxJava中最简单的一个变换操作符了, 它的作用就是对上游发送的每一个事件应用一个函数, 使得每一个事件都按照指定的函数去变化. 用事件图表示如下:

       //Integer
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onComplete();
            }
            //Strig
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return String.format("The %s time", integer);
            }
            //String 叠加
        }).map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                return String.format("%s , He is second time", s);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });

在上游我们发送的是数字类型, 而在下游我们接收的是String类型, 中间起转换作用的就是map操作符, 运行结果为:

The 1 time , He is second time
The 2 time , He is second time

FlatMap

flatMap是一个非常强大的操作符, 先用一个比较难懂的概念说明一下:

FlatMap将一个发送事件的上游Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里.

  Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onComplete();
            }
        }).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                List<String> list = new ArrayList();
                list.add("He is "+integer);
                list.add("He is "+integer);
                list.add("He is "+integer);

                return Observable.fromIterable(list).delay(10,TimeUnit.MICROSECONDS);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String str) throws Exception {
                System.out.println(str);
            }
        });

运行结果:

He is 1
He is 1
He is 1
He is 2
He is 2
He is 2

举个例子:注册成功后调登录

api.register(new RegisterRequest())            //发起注册请求
                .subscribeOn(Schedulers.io())               //在IO线程进行网络请求
                .observeOn(AndroidSchedulers.mainThread())  //回到主线程去处理请求注册结果
                .doOnNext(new Consumer<RegisterResponse>() {
                    @Override
                    public void accept(RegisterResponse registerResponse) throws Exception {
                        //先根据注册的响应结果去做一些操作
                    }
                })
                .observeOn(Schedulers.io())                 //回到IO线程去发起登录请求
                .flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() {
                    @Override
                    public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception {
                        return api.login(new LoginRequest());
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())  //回到主线程去处理请求登录的结果
                .subscribe(new Consumer<LoginResponse>() {
                    @Override
                    public void accept(LoginResponse loginResponse) throws Exception {
                        Toast.makeText(MainActivity.this, "登录成功", Toast.LENGTH_SHORT).show();
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Toast.makeText(MainActivity.this, "登录失败", Toast.LENGTH_SHORT).show();
                    }
                });
 public void login() {
            renoteRegister().doOnNext(new Consumer<String>() {//开始注册
                @Override
                public void accept(String s) throws Exception {//注册成功回调
                    System.out.println(s);
                    System.out.println(Thread.currentThread().getName());
                }

            }).flatMap(new Function<String, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(String s) throws Exception {
                    System.out.println(s + "--开始登录");
                    System.out.println(Thread.currentThread().getName());
                    return renoteLogin();//调用登录
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {//登录回调
                    System.out.println(s);
                    System.out.println(Thread.currentThread().getName());
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    System.out.println(throwable.getMessage());
                    System.out.println(Thread.currentThread().getName());
                }
            });
        }

        //远程登录
        public Observable<String> renoteLogin() {
            return Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("登录成功");
                    emitter.onComplete();
                }
            });
        }

        //远程注册
        public Observable<String> renoteRegister() {
            return Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("注册成功");
                    emitter.onComplete();
                }
            });
        }

本文源码下载地址
参考原文

参考原文

上一篇下一篇

猜你喜欢

热点阅读