Rxjava(1) 创建操作

2019-07-31  本文已影响0人  其勇勇

文档网址:https://mcxiaoke.gitbooks.io/rxdocs/content/

private void repeat(){

        /**
         *  main  1
         main  2
         main  1
         main  2
         main  1
         main  2
         main  1
         main  2
         main  1
         main  2
         */
        Observable.range(1, 2).repeat(5).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                Log.e("qwer",Thread.currentThread().getName() + "  " + integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
    }


    private void range(){
        Observable.range(1,10).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                Log.e("qwer",Thread.currentThread().getName() + "  " + integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
    }

    private void just(){
        Observable.just(1,"df").subscribe();
        Observable.just(1,"df",99L,new Object()).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Object o) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
    }

    private void timer_interval(){

        Observer<Long> observer = new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Long aLong) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };

        //延迟 1s 后执行一个任务,然后结束
        //Observable.timer(1000, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io()).subscribe(new Observer<Long>() {

        Observable.timer(1000, TimeUnit.MILLISECONDS).
                subscribeOn(Schedulers.newThread()).
                observeOn(AndroidSchedulers.mainThread()).
                subscribe(observer);

        //每隔 1s 执行一次任务,第一次任务执行前有 1s 的间隔,执行无限次
        Observable.interval(1000, TimeUnit.MILLISECONDS).
                subscribeOn(Schedulers.io()).subscribe(observer);

        //每隔 1s 执行一次任务,立即执行第一次任务,执行无限次
        Observable.interval(0, 1000, TimeUnit.MILLISECONDS).
                subscribeOn(Schedulers.io()).subscribe(observer);

        //每隔 1s 执行一次任务,立即执行第一次任务,只执行五次
        Observable.interval(0, 1000, TimeUnit.MILLISECONDS).
                take(5).
                subscribe(observer);

        //先执行一个任务,等待 1s,再执行另一个任务,然后结束
        Observable.just(0L).doOnNext(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.d("qwer", "执行第一个任务");
            }
        }).delay(1000, TimeUnit.MILLISECONDS).subscribe(observer);
    }

    private void from(){
        List<String> data = new ArrayList<>();
        data.add("1");
        data.add("2");
        data.add("3");
        data.add("4");

        Observable.fromIterable(data).doOnNext(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {

            }
        }).subscribe();

        Observable.fromIterable(data).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e("qwer","accept : " + s);
            }

        }).dispose();

        Observable.fromIterable(data).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("qwer","onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.e("qwer","onNext" + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.e("qwer","onError:" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e("qwer","onComplete");
            }
        });


        /**
         * Observable 1
         onNext 1
         Observable 2
         onNext 2
         Observable 3
         onNext 3
         Observable 4
         onNext 4
         */
        Observable.fromIterable(data).doOnNext(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e("qwer"," Observable " + s);
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                Log.e("qwer"," onNext " + s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
    }

    private void create(){
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) {

                for(int i = 0;i < 13;i++){
                    if(i == 10){
                        emitter.onError(new Throwable(""));
                    }else {
                        emitter.onNext(i);
                    }
                }
                emitter.onComplete();
            }
        }).onErrorReturn(new Function<Throwable, Integer>() {
            @Override
            public Integer apply(Throwable throwable) throws Exception {
                return 100;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("qwer","onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e("qwer","onNext" + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.e("qwer","onError:" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e("qwer","onComplete");
            }
        });
    }
上一篇下一篇

猜你喜欢

热点阅读