学习RxJava2---finish

2018-12-31  本文已影响0人  Thor_果冻

RxJava

观看菜鸟窝视频RxJava2整理
我的github整理学习rxjava

RxJava是一种响应式编程
采用观察者模式
好处:异步、简洁

RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。这就是 RxJava ,概括得非常精准。

观察者模式

观察者(Observer)模式:是对象的行为模式,又叫发布-订阅(Publish/Subscribe)模式、模型-视图(Model/View)模式、源-监听(Source/Listener)模式或者从属(Dependents)模式

观察者模式定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象,这个主题对象再状态上发生变化时,会通知所有观察者对象,使它们能够自动更新自己。


基本使用

public class MainActivity extends AppCompatActivity {

    private static final String TAG = "123===";

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
    }

    public void click(View view) {
        //点击按钮

        //第一种创建方法,最基本的使用
//        firstMethod();

        //第二种创建方法,简单的方法
        secondMethod();
    }

    private void secondMethod() {
//        Observable<String> observable = Observable.just("1", "2", "3");
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("1");
                e.onNext("2");
                e.onNext("3");
                e.onComplete();
//                e.onError(new Throwable("----onError"));//如果是错误结束,在subscribe订阅的时候必须加上第二个参数
            }
        });

        observable.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "accept: "+s);
            }
        }/*, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "accept: "+throwable.getLocalizedMessage());
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "run: ");
            }
        }*/);
    }

    private void firstMethod() {
        //第一步创建Observable, 被观察这
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                Log.d(TAG, "subscribe: 1--");
                e.onNext("1");//发送数据
                Log.d(TAG, "subscribe: 2--");
                e.onNext("2");
                Log.d(TAG, "subscribe: 3--");
                e.onNext("3");
                Log.d(TAG, "subscribe: 完成--");
                e.onComplete();//完成是调用
//                e.onError(new Throwable("我是错误信息"));//在错误的时候调用(完成或错误只能调用一个,不可以两个同时调用)
            }
        });
        //第二部创建observer,观察者
        Observer<String> observer = new Observer<String>() {

            private Disposable dd;

            @Override
            public void onSubscribe(@NonNull Disposable d) {
                //
                dd = d;
//                d.dispose();//移除订阅关系
//                d.isDisposed();//是否移除订阅关系
                Log.d(TAG, "onSubscribe: " + d.isDisposed());
            }

            @Override
            public void onNext(@NonNull String s) {
                //事件
                Log.d(TAG, "onNext: " + s);
                if ("2".equals(s)) {
                    dd.dispose();
                }
            }

            @Override
            public void onError(@NonNull Throwable e) {
                //错误
                Log.d(TAG, "onError: " + e.getLocalizedMessage());
            }

            @Override
            public void onComplete() {
                //完成
                Log.d(TAG, "onComplete: ");
            }
        };
        //实现订阅关系
        observable.subscribe(observer);
    }
}

Scheduler线程控制

Observable.create(new ObservableOnSubscribe<User>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<User> e) throws Exception {
                //请求网络
                User user = new User("com.thor.wdd", "1", "wdd");

                e.onNext(user);
            }
        }).subscribeOn(Schedulers.io())//Observable切换到子线程
                .observeOn(AndroidSchedulers.mainThread())//Observer切换到主线程
                .subscribe(new Observer<User>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull User user) {
                        Log.d(TAG, "onNext: " + user.toString());
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

subscribeOn(Schedulers.io())//Observable、Observer切换到子线程
observeOn(AndroidSchedulers.mainThread())//Observer切换到主线程

操作符介绍

操作符分类跳转到官翻

下面都是我整理学习的记录在github

创建操作符

        creat;
        just;//快速创建对象,最多发送十个对象
        fromArray;//数组遍历
        fromIterable;//遍历
        empty;//仅发送Complete事件,直接通知完成
        error;//仅发送Error事件,直接通知异常
        never;//不发送任何事件
        defer;//直到有观察者observer订阅时,才动态创建被观察者对象 并 发送事件
        timer;//延迟指定时间后,调用一次onNext(0)
        interval;//每隔指定事件发送事件,无限叠加发送,从0开始
        intervalRange;//每隔指定时间 就发送事件,并指定发送事件数量
        range;//连续发送一个事件序列,可指定次数

变换操作符

        map;//数据转换
        flatMap;//将被观察者发送的事件序列进行拆分,并且单独转换,再合并成一个新的事件序列,最后进行发送
        concatMap;//类似FlatMap()操作符;区别:拆分并且重新合并生成的事件序列的顺序 等于 被观察者旧序列生产的顺序
        buffer;//缓存区大小 = 每次从被观察者中获取的事件数量, 步长 = 从当前位置向后移动几位

组合/合并操作符

        concat;//concat组合被观察者数量<=4,顺序执行
        concatArray;//concatArray组合观察者数量>4,顺序执行
        merge;//组合被观察者数量<=4,非顺序执行
        mergeArray;//组合被观察者数量>4,非顺序执行
        concatArrayDelayError;//第1个被观察者的Error事件将在第2个被观察者发送完事件后再继续发送,mergeDelayError()操作符同理
        mergeDelayError;
        zip;//严格按照原先事件序列 进行对位合并,最终合并的事件数量 = 多个被观察者(Observable)中数量最少的数量
        combineLatest;//当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据与 另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据,与reduce区别就是接收的事件是每次都有而reduce只有最后一次输出信息
        reduce;//把前2个数据聚合,然后与后1个数据继续进行聚合,依次类推
        collect;//将被观察者Observable发送的数据事件收集到一个数据结构里
        startWith;//在一个被观察者发送事件前,追加发送一些数据/ 一个新的被观察者, 后调用的startWith先追加
        startWithArray;
        count;//统计被观察者发送事件的数量

功能性操作符

        delay;//指定延迟时间
        do;//在某个事件的生命周期中调用
        retry;//重试,即出现错误,让被观察者重新发送数据
        retryWhen;//出现错误后,判断是否需要重新发送数据
        repeat;//重复不断地发送被观察者事件
        repeatWhen;//有条件地、重复发送 被观察者事件

过滤操作符

        filter;过滤 特定条件的事件
        ofType;过滤 特定类型的数据
        skip;跳过指定事件,也可跳过指定时间时间
        skipLast;跳过指定最后事件,也可跳过最后时间时间
        distinct;去重
        distinctUntilChanged;连续重复才去重
        take;接收事件数量
        takeLast;只接收最后几个事件数量
        throttleFirst;指定时间内只接收第一次事件
        throttleLast/sample;指定时间内只接收最后一次次事件
        throttleWithTimeout/debounce;采样频率,指定时间只接收最新事件
        firstElement;//获取第一个事件
        lastElement;//获取最后一个事件
        elementAt;//获取指定位置事件,如果超出位置没有任何提示,如果需要提示则调用elementAtOrError(),如超出位置需要指定默认值则调用elementAt 2个参数的方法

布尔操作符

    takeWhile;判断发送的每项数据是否满足设置的函数条件
    skipWhile;跳过满足条件的那些数据,发送不满足那些条件的数据
    takeUntil;Predicate参数时:执行到条件成立时,停止发送事件,但本次事件会发送出去;observable参数时:第二个observable开始发送数据时,原始的observable停止发送事件
    sequenceEqual;判定两个Observables需要发送的数据是否相同
    contains;判断发送的数据中是否包含指定数据
    isEmpty;判断发送的数据是否为空
    defaultIfEmpty;在不发送任何有效事件( Next事件)、仅发送了 Complete 事件的前提下,发送一个默认值
    amb;当需要发送多个 Observable时,只发送 先发送数据的Observable的数据,而其余 Observable则被丢弃
上一篇下一篇

猜你喜欢

热点阅读