AndroidRxjava学习安卓精华教程

Android RxJava:细说 线程控制(切换 / 调度 )

2017-11-14  本文已影响4313人  Carson带你学安卓

前言

Github截图

如果还不了解RxJava,请看文章:Android:这是一篇 清晰 & 易懂的Rxjava 入门教程

希望你们会喜欢。

  1. 本系列文章主要基于 Rxjava 2.0
  2. 接下来的时间,我将持续推出 AndroidRxjava 2.0 的一系列文章,包括原理、操作符、应用场景、背压等等 ,有兴趣可以继续关注Carson_Ho的安卓开发笔记!!
示意图

目录

示意图

1. RxJava线程控制(调度 / 切换)的作用是什么?

指定 被观察者 (Observable) / 观察者(Observer) 的工作线程类型。


2. 为什么要进行RxJava线程控制(调度 / 切换)?

2.1 背景

即,若被观察者 (Observable) / 观察者(Observer)在主线程被创建,那么他们的工作(生产事件 / 接收& 响应事件)就会发生在主线程

下面请看1个RxJava的基础使用

public class MainActivity extends AppCompatActivity {

    private static final String TAG = "Rxjava";

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        // 步骤1:创建被观察者 Observable & 发送事件
        // 在主线程创建被观察者 Observable 对象
        // 所以生产事件的线程是:主线程

        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                Log.d(TAG, " 被观察者 Observable的工作线程是: " + Thread.currentThread().getName());
                // 打印验证
                emitter.onNext(1);
                emitter.onComplete();
            }
        });

// 步骤2:创建观察者 Observer 并 定义响应事件行为
        // 在主线程创建观察者 Observer 对象
        // 所以接收 & 响应事件的线程是:主线程
        Observer<Integer> observer = new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "开始采用subscribe连接");
                Log.d(TAG, " 观察者 Observer的工作线程是: " + Thread.currentThread().getName());
                // 打印验证

            }
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "对Next事件"+ value +"作出响应"  );
            }
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        };

        // 步骤3:通过订阅(subscribe)连接观察者和被观察者
        observable.subscribe(observer);
    }
}
示意图

2.2 冲突

示意图

2.3 解决方案

所以,为了解决上述冲突,即实现 真正的异步操作,我们需要对RxJava进行 线程控制(也称为调度 / 切换)


3. 实现方式

采用 RxJava内置的线程调度器Scheduler ),即通过 功能性操作符subscribeOn() & observeOn()实现

3.1 功能性操作符subscribeOn() & observeOn()简介

类型 含义 应用场景
Schedulers.immediate() 当前线程 = 不指定线程 默认
AndroidSchedulers.mainThread() Android主线程 操作UI
Schedulers.newThread() 常规新线程 耗时等操作
Schedulers.io() io操作线程 网络请求、读写文件等io密集型操作
Schedulers.computation() CPU计算操作线程 大量计算操作

3.2 具体使用


<-- 使用说明 -->
  // Observable.subscribeOn(Schedulers.Thread):指定被观察者 发送事件的线程(传入RxJava内置的线程类型)
  // Observable.observeOn(Schedulers.Thread):指定观察者 接收 & 响应事件的线程(传入RxJava内置的线程类型)

<-- 实例使用 -->
// 步骤3:通过订阅(subscribe)连接观察者和被观察者
        observable.subscribeOn(Schedulers.newThread()) // 1. 指定被观察者 生产事件的线程
                  .observeOn(AndroidSchedulers.mainThread())  // 2. 指定观察者 接收 & 响应事件的线程
                  .subscribe(observer); // 3. 最后再通过订阅(subscribe)连接观察者和被观察者

1. 若Observable.subscribeOn()多次指定被观察者 生产事件的线程,则只有第一次指定有效,其余的指定线程无效
// 步骤3:通过订阅(subscribe)连接观察者和被观察者
        observable.subscribeOn(Schedulers.newThread()) // 第一次指定被观察者线程 = 新线程
                  .subscribeOn(AndroidSchedulers.mainThread()) // 第二次指定被观察者线程 = 主线程
                  .observeOn(AndroidSchedulers.mainThread())
                  .subscribe(observer);
示意图
2. 若Observable.observeOn()多次指定观察者 接收 & 响应事件的线程,则每次指定均有效,即每指定一次,就会进行一次线程的切换
// 步骤3:通过订阅(subscribe)连接观察者和被观察者
        observable.subscribeOn(Schedulers.newThread())
                  .observeOn(AndroidSchedulers.mainThread()) // 第一次指定观察者线程 = 主线程
                  .doOnNext(new Consumer<Integer>() { // 生产事件
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "第一次观察者Observer的工作线程是: " + Thread.currentThread().getName());
                    }
                })
                .observeOn(Schedulers.newThread()) // 第二次指定观察者线程 = 新的工作线程
                .subscribe(observer); // 生产事件


// 注:
// 1. 整体方法调用顺序:观察者.onSubscribe()> 被观察者.subscribe()> 观察者.doOnNext()>观察者.onNext()>观察者.onComplete() 
// 2. 观察者.onSubscribe()固定在主线程进行
示意图

4. 具体实例

下面,我将采用最常见的 Retrofit + RxJava 实现 网络请求 的功能,从而说明 RxJava的线程控制的具体应用

4.1 功能说明

  1. 先切换到工作线程 发送网络请求
  2. 再切换到主线程进行 UI更新
金山词典

4.2 步骤说明

  1. 添加依赖
  2. 创建 接收服务器返回数据 的类
  3. 创建 用于描述网络请求 的接口(区别于传统形式)
  4. 创建 Retrofit 实例
  5. 创建 网络请求接口实例 并 配置网络请求参数(区别于传统形式)
  6. 发送网络请求(区别于传统形式)
  7. 发送网络请求
  8. 对返回的数据进行处理

本实例侧重于说明 RxJava 的线程控制,关于Retrofit的使用请看文章:这是一份很详细的 Retrofit 2.0 使用教程(含实例讲解)

4.3 步骤实现

步骤1: 添加依赖

a. 在 Gradle加入Retrofit库的依赖

build.gradle

dependencies {

// Android 支持 Rxjava
// 此处一定要注意使用RxJava2的版本
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

// Android 支持 Retrofit
compile 'com.squareup.retrofit2:retrofit:2.1.0'

// 衔接 Retrofit & RxJava
// 此处一定要注意使用RxJava2的版本
compile 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'

// 支持Gson解析
compile 'com.squareup.retrofit2:converter-gson:2.1.0'

}

b. 添加 网络权限
AndroidManifest.xml

<uses-permission android:name="android.permission.INTERNET"/>
步骤2:创建 接收服务器返回数据 的类
// URL模板
http://fy.iciba.com/ajax.php

// URL实例
http://fy.iciba.com/ajax.php?a=fy&f=auto&t=auto&w=hello%20world

// 参数说明:
// a:固定值 fy
// f:原文内容类型,日语取 ja,中文取 zh,英语取 en,韩语取 ko,德语取 de,西班牙语取 es,法语取 fr,自动则取 auto
// t:译文内容类型,日语取 ja,中文取 zh,英语取 en,韩语取 ko,德语取 de,西班牙语取 es,法语取 fr,自动则取 auto
// w:查询内容
API格式说明

Translation.java

public class Translation {
    private int status;

    private content content;
    private static class content {
        private String from;
        private String to;
        private String vendor;
        private String out;
        private int errNo;
    }

    //定义 输出返回数据 的方法
    public void show() {
        System.out.println( "Rxjava翻译结果:" + status);
        System.out.println("Rxjava翻译结果:" + content.from);
        System.out.println("Rxjava翻译结果:" + content.to);
        System.out.println("Rxjava翻译结果:" + content.vendor);
        System.out.println("Rxjava翻译结果:" + content.out);
        System.out.println("Rxjava翻译结果:" + content.errNo);
    }
}
步骤3:创建 用于描述网络请求 的接口

采用 注解 + Observable<...>接口描述 网络请求参数

GetRequest_Interface.java

public interface GetRequest_Interface {

    @GET("ajax.php?a=fy&f=auto&t=auto&w=hi%20world")
    Observable<Translation> getCall();
     // 注解里传入 网络请求 的部分URL地址
    // Retrofit把网络请求的URL分成了两部分:一部分放在Retrofit对象里,另一部分放在网络请求接口里
    // 如果接口里的url是一个完整的网址,那么放在Retrofit对象里的URL可以忽略
    // 采用Observable<...>接口 
    // getCall()是接受网络请求数据的方法
}
接下来的步骤均在MainActivity.java内实现(请看注释)

MainActivity.java

public class MainActivity extends AppCompatActivity {

    private static final String TAG = "Rxjava";

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        //步骤4:创建Retrofit对象
        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("http://fy.iciba.com/") // 设置 网络请求 Url
                .addConverterFactory(GsonConverterFactory.create()) //设置使用Gson解析(记得加入依赖)
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 支持RxJava
                .build();

        // 步骤5:创建 网络请求接口 的实例
        GetRequest_Interface request = retrofit.create(GetRequest_Interface.class);

        // 步骤6:采用Observable<...>形式 对 网络请求 进行封装
        Observable<Translation> observable = request.getCall();

        // 步骤7:发送网络请求
        observable.subscribeOn(Schedulers.io())               // 在IO线程进行网络请求
                  .observeOn(AndroidSchedulers.mainThread())  // 回到主线程 处理请求结果
                  .subscribe(new Observer<Translation>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "开始采用subscribe连接");
                    }

                    @Override
                    public void onNext(Translation result) {
                        // 步骤8:对返回的数据进行处理
                        result.show() ;
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "请求失败");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "请求成功");
                    }
                });
    }
}

4.4 测试结果

示意图

4.5 Demo地址

Carson_Ho的Github地址 = RxJava2实战系列:线程控制


5. 特别注意

5.1 依赖包问题

示意图

build.gradle

android {
    ...
    packagingOptions {
        exclude 'META-INF/rxjava.properties'
    }
}

5.2 应用程序崩溃问题

当出现多个Disposable时,可采用RxJava内置容器CompositeDisposable进行统一管理

// 添加Disposable到CompositeDisposable容器
CompositeDisposable.add()

// 清空CompositeDisposable容器
CompositeDisposable.clear() 

6. 总结


请点赞!因为你的鼓励是我写作的最大动力!

相关文章阅读


欢迎关注Carson_Ho的简书!

不定期分享关于安卓开发的干货,追求短、平、快,但却不缺深度

上一篇 下一篇

猜你喜欢

热点阅读