RxJava学习笔记

2019-01-15  本文已影响0人  lycknight

RxJava

Rxjava的GitHub官网上是这样介绍rxjava的:RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programes by using observable sequences(RxJava是一个基于Reactive Extensions的JVM实现框架,它使用观察者模式的做法,将异步和基于事件的编程很好的结合起来),虽然是简短的一句话,但是很清晰的解释了RxJava,其中有三个关键字:观察者异步事件

为什么要使用RxJava

总结来说RxJava主要有以下几点优点:

  1. 异步(作用):可以自由切换线程
  2. 观察者模式(模式)
  3. 响应式编程(结构)
  4. 逻辑简单(逻辑简洁):减少回调嵌套

响应式编程

在某种程度上,这并不是新东西。事件总线(Event buses)或常见的单机事件就是一个异步事件流,你可以观察这个流,也可以基于这个流做一些自定义的操作。响应式就是基于这种想法。你能够创建所有事物的数据流,而不仅仅只是单击和监听事件数据流。

简洁编程

假设有这样一个需求:给定一个图片目录,需要将这个目录下的所有图片显示到对应的imageview空间上,由于读取图片是一个比较耗时的过程,需要新启动一个线程来获得bitmap类型的图片,然后在主线程上显示图片,我们平时的做法是:

 new Thread() {
            @Override
            public void run() {
                super.run();
                for (File folder : files) {
                    File[] files = folder.listFiles();
                    for (File file : files) {
                        if (file.getName().endsWith(".png")) {
                            final Bitmap bitmap = getBitmapFromFile(file);
                            MainActivity.this.runOnUiThread(new Runnable() {
                                @Override
                                public void run() {
                                    imageCollectorView.addImage(bitmap);
                                }
                            });
                        }
                    }
                }
            }
        }.start();

但是如果是RxJava2,实现的方式是这样的:

Observable.fromArray(files)
                .flatMap(new Function<File, ObservableSource<File>>() {
                    @Override
                    public ObservableSource<File> apply(@NonNull File file) throws Exception {
                        return Observable.fromArray(file.listFiles());
                    }
                })
                .filter(new Predicate<File>() {
                    @Override
                    public boolean test(@NonNull File file) throws Exception {
                        return file.getName().endsWith(".png");
                    }
                })
                .map(new Func1<File, Bitmap>() {
                    @Override
                    public Bitmap call(File file) {
                        return getBitmapFromFile(file);
                    }
                })
                .subscribeOn(rx.schedulers.Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(@NonNull Object o) throws Exception {
                        if (o instanceof  Bitmap){
                            imageView.addImage(o);
                        }
                    }
                });

从代码上看,的确是RxJava的代码变多了,但是从逻辑来说,RxJava的实现是从上到下的链式调用,没有任何嵌套,这在逻辑的间接性上是具有优势的。

基本概念

RxJava使用的是观察者模式来实现的,观察者模式对于我们来说应该是比较熟悉的,我们在日常开发中经常使用到的,例如,setOnClickListener(listener)就是一个观察者模式的应用。

基本用法

Observable(被观察者)的创建

 Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Object> e) throws Exception {
                e.onNext("next1");//发送字符串"next1"
                e.onNext("next2");//发送字符串"next2"
                e.onComplete();//发送完成
            }
        });
Observable.just("just1","just2");//依次发送“just1”和“just2”
List<String> list = new ArrayList<>();
        list.add("from1");
        list.add("from2");
        list.add("from3");
        Observable.fromIterable(list);//遍历发送list的每个item。
        String[] strings = new String[]{"from1","from2","from3"};
        Observable.fromArray(strings);//遍历发送strings的每个item。
 Observable.defer(new Callable<ObservableSource<?>>() {
            @Override
            public ObservableSource<?> call() throws Exception {
                return Observable.just("defer");
            }
        });
Observable.interval(1, TimeUnit.SECONDS);//每个一秒发送一个数字
Observable.rang(20,5);//发送20,21,22,23,24
Observable.timer(4,TimeUnit.SECONDS);//4秒后发射一个值
Observable.just("repeat").repeat(3);//重复发射3次

Observer(观察者)的创建

 Observer observer = new Observer() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                
            }

            @Override
            public void onNext(@NonNull Object o) {

            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };

创建了Observer之后,就可以将Observable与其关联起来

Observable.subscribe(observer);//订阅

线程控制

如果订阅时不特别指定线程,那么就会在订阅时所在的线程产生事件。如果需要切换线程,就需要用到Scheduler。

有了这几个Scheduler,就可以使用subscribeOn()和observeOn()两个方法来对线程进行控制了。

举个例子,在I/O线程发送事件,在主线程接收事件

Observable.fromArray("from1","from2","from3")
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        Log.i("knight","S: "+ s);
                    }
                });

变换

RxJava提供了对事件序列进行变换的支持,也就是核心的功能之一。所谓的变化,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或者事件序列。

Observable.fromArray(1,2,3)// 输入类型 Integer
                .subscribeOn(Schedulers.io())
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(@NonNull Integer s) throws Exception { // 参数类型 Integer
                        Log.i("knight","thread: "+ Thread.currentThread().getName());
                        return String.valueOf(s);// 返回类型 String
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {// 参数类型 String
                        Log.i("knight","thread: "+ Thread.currentThread().getName());
                        Log.i("knight","S: "+ s);
                    }
                });
 List<Dept> depts = new ArrayList<>();
        Observable.fromIterable(depts)
                .subscribe(new Consumer<Dept>() {
                    @Override
                    public void accept(@NonNull Dept dept) throws Exception {
                        List<String> persons = dept.getPerson();
                        for (String person : persons) {
                            System.out.print(person);
                        }
                    }
                });

使用flatMap()

List<Dept> depts = new ArrayList<>();
        Observable.fromIterable(depts).flatMap(new Function<Dept, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(@NonNull Dept dept) throws Exception {
                return Observable.fromIterable(dept.getPerson());
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.print(s);
            }
        });
上一篇下一篇

猜你喜欢

热点阅读