RxJavaRXJava程序员

RxJava,你好

2017-01-07  本文已影响8209人  唐先僧

在我研究响应式编程的过程中,我所找到的每一篇文章几乎都以响应式编程很难学习的理念开头。针对响应式编程零基础人员准备的文章少之又少。本文尝试通过在android上使用RxJava为初学者厘清响应式编程的基本概念。

什么是响应式编程?

响应式编程就是编程处理异步数据流。

等等,我使用callback也很容易处理异步数据啊。所以这和响应式编程有什么不同呢?

是的,这个概念并不新鲜。它可以通过命令式(imperatively)编程来完成,而且通常都是这么做的。

如果我们不仅仅考虑回调,同时再考虑一下让回调启动并运行的支持机制。使用命令式方法来支持通常会涉及到状态管理还需要考虑状态改变所带来的副作用。在软件开发界,这些考虑已经成为大量错误的原因。响应式编程采用函数式方法;它处理的是流从而避免了全局状态以及相应的副作用。

什么是流?

万物皆流,无物常住 --- 赫拉克利特

流代表一个数据序列。想象一下我们的交通系统。某条高速路上的汽车就是一条一直流动偶尔出现一个瓶颈的对象流。所谓响应式编程,就是我们接收连续流动的数据--数据流--提供处理数据流的方法并将该方法应用到数据流。数据的源头我们并不(也不应)关心。

数据流无处不在。任何物体都可以是数据流:变量,用户输入,属性,缓存,数据结构等等。

什么是声明式编程,什么是命令式编程?

在深入代码之前,我们还是来看一下我们的交通系统网络。让我们假设市长想临时在一条指定的高速路上分间隔摆放停车标志来中断车流。市长会说:“将高速路分成均匀的几段,在每一段的边界上放一个停车标志”。承包商会说:“等等,在分段之前,我需要确定每一段的长度;为了确定分段长度,我需要知道高速路的总长,我们要放置多少个停车标志,以及车辆的平均长度”。在这个场景里,市长拥有足够多的职能部门(包括DOT(交通部)),她在处理事情时只需要专注于宣布她的意图,而不用关心事情具体怎么完成的细节--这就是声明式方法。而另一方面。承包商需要保证整个流程的每一处细节都要考虑周全并准确的完成--这就是命令式方法。如果你可以像市长建设她的城市一样构建一个软件会是什么样子呢?我们一起来看一个示例:

例:使用命令式方法过滤掉偶数。

Integer[] numbers = {1, 2, 3, 4, 5};
List<Integer> lists = Arrays.asList(numbers);
List<Integer> results = new ArrayList<>();

for (Integer num : numbers) {
    if (num % 2 != 0)
        results.add(num);
}

声明式方法

List<Integer> results = lists.stream()
        .filter(s -> s % 2 != 0)
        .collect(Collectors.toList());

很酷,我喜欢声明式方法,但是如果我们不告诉它怎么做,计算机怎么知道做什么呢?
在现在的世界里,任何事情最终落实到操作系统和硬件时都是命令式的。而响应式编程,是函数式编程的一种抽象。就和我们所使用的高阶命令式编程语言是底层二进制以及汇编命令的抽象一样(市长也需要有她的DOT承包商)。

所以,我们怎样在Java中使用声明式编程风格呢?
Java8有一个很惊艳的Stream API,但是如果你和我一样是一个Android开发者,你不能使用Stream API,因为android还不支持Java8的所有特性。尽管如此,你可以使用RxJava,这是由Netflix的开发者为Java提供的一个响应式扩展。

RxJava怎么工作?

响应式代码的基础是被观察者(Observable)观察者(Observer)

被观察者是一个发送数据流或者事件流的类,观察者则对被观察者发送出的数据/事件做出反应。一个被观察者可以有多个观察者,对于被观察者发送出的每一个事件/项目都会被Observer.onNext()方法接收并处理。一旦被观察者发送完了所有的数据它会调用Observer.onComplete()。如果发生错误,被观察者会调用Observer.onError()方法。

注意: 有的被观察者永远都不会终止(比如温度传感器的输出)

观察者和被观察者之间通过Subscription连接,观察者在后面也可以通过Subscription取消订阅被观察者。

听起来和观察者模式很相似,那么观察者模式和RxJava框架之间有什么区别呢?
RxJava的被观察者为观察者模式添加了两个功能。

除此之外,RxJava的威力在于仅仅只需要几行代码就可以变换聚合过滤被观察者发送的数据流,这样可以极大的减少需要维护的状态变量。

给我看代码

创建一个被观察者(Observable):

Integer[] numbers = {1, 2, 3, 4, 5, 6, 7};
List<Integer> lists = Arrays.asList(numbers);
Observable<Integer> integerObservable = Observable.from(lists);

integerObservable将发射数字1、2、3、4、5、6、7然后结束。

注意: 创建被观察者的方式有很多很多。更多信息可以参考官方文档

Subscriber

Subscriber是一种特殊类型的观察者,它可以取消订阅被观察者。

    Observable.
    Subscriber<Integer> mySubscriber = new Subscriber<Integer>() {
        @Override
        public void onNext(Integer data) {
           Log.d("Rx", "onNext:"+data);
         }
    
        @Override
        public void onCompleted() {     
           Log.d("Rx","Complete!"); 
        }
    
        @Override
        public void onError(Throwable e) { 
          // handle your error here
        }
    };

将Subscriber连接到被观察者:

被观察者是惰性的,在没有订阅者监听之前它不会做任何事情。

    myObservable.subscribe(mySubscriber);
    // Outputs:
    // onNext: 1
    // onNext: 2
    // onNext: 3
    // onNext: 4
    // onNext: 5
    // onNext: 6
    // onNext: 7
    // Complete!

改变流:

RxJava提供了许多改变流的运算符。下面几个操作方法是最常用的。

这里我过滤掉了所有的奇数项。

    ---1---2---3---4----5----6----7---|-->
             filter(x % 2 == 0)
    -------2-------4---------6--------|-->

注意: Func<T, R>表示一个单参数的函数,T是第一个参数的类型,R是返回结果的类型。

这里我使用map运算符将发射出的数据改变成另外一个数。我改变了integerObservable所发射出的每一项,所以最后每一个数据都变成了该数据的平方。

    ---1---2---3---4----5----6----7---|-->
            map(x -> x * x)
    ---1---4---9---16---25---36---49---|-->

RxJava中有大量的操作符用于处理流变换。

好吧,你不是说响应式编程是异步的吗?

如果你不告诉它需要使用异步的方式,RxJava默认是同步的。

但是同步是响应式系统必须的行为吗?

确定是使用异步还是同步的被观察者需要根据具体的问题分析。例如:从内存缓存中获取数据并立即返回也许使用同步会更合适。另一方面,如果被观察者会产生网络调用或者一些耗时的数据处理则应该使用异步的方式。总的原则就是:如果是在开发一个图形系统,当一项任务起源于UI线程并且需要阻塞(或者大量的计算操作)时应该采用异步的方式。

在异步从何而来这个问题上,RxJava持不可知论者的态度。

了解,现在告诉我怎么创建一个异步的observable?

首先,我们来看一下使用RxJava之前,将一个密集的长时间的I/O操作转移的其他线程(非ui线程)时的处理方式。

以前的方式

    private class FetchUsersTask 
                extends AsyncTask<String, Void, User> {
    
      protected User doInBackground(String... someData) {
        String userId=params[0];
        User user = UsersService.getUser(userId);
        return user;
      }
    
      protected void onPostExecute(User user) {
        //handle the result and update the view
      }
    }

FetchUsersTask调用usersService.getUsers()并返回一个字符串列表,然后传递给onPostExecute()方法。看起来非常简单,但是这段代码中存在一些问题

RxJava方式:

现在我们来看一下怎么使用RxJava来异步加载数据。

    Observable.fromCallable(new Callable<User>() {
          @Override public User call() throws Exception {
             return UsersService.getUser(userId);
           }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<User>() {
          @Override public void onCompleted() {
            Log.d("Rx", "Completed");
          }
    
          @Override public void onError(Throwable e) {
            Log.d("Rx", e.getMessage());
          }
    
          @Override public void onNext(User user) {
            Log.d("Rx", user.getName());
          }
        });

这里subscribeOn(Schedulers.io())将使observable工作在新的线程,而observeOn(AndroidSchedulers.mainThread()))将使订阅者在主UI线程上去处理observable发送出来的结果。

这和AsyncTask很相似但是更简单更简洁。RxJava解决了我前面提到的所有问题。

使用Lambda后的代码:

    Observable.fromCallable(() -> UsersService.getUserIds())
         .flatMap(userIds -> Observable.from(userIds))
         .flatMap(userId -> Observable.just(UserService.getUser(userId))
         .subscribeOn(Schedulers.io())
         .observeOn(AndroidSchedulers.mainThread()))
         .subscribe(new Subscriber<User>() {
            @Override
            public void onCompleted() {
                Log.d("Rx", "emit","Completed!");
            }
    
            @Override
            public void onError(Throwable e) {
                Log.d("Rx", "emit", e.getMessage());
            }
    
            @Override
            public void onNext(User user) {
                Log.d("Rx", "emit", user.getName());
            }
    });

下面的图标描述了将一个含有单个字符串列表流变化成含有多个用户信息流的过程。

    -------{~~~~~~~~~~~~list of user ids [1,2,3,4,5]~~~~~~~~~}---|-->
       
               flatMap(userIds -> Observable.from(userIds))
    -------1------------2----------3------------4------------5---|--->
              flatMap (userId -> UserService.getUser(userId))
    ----user1--------user2------user3--------user4-------user5---|--->

参考文献

本文译自Howdy RxJava

上一篇 下一篇

猜你喜欢

热点阅读