RxJava 和 ReacticeX
简介
(本文持续更新中.....)
因为RxJava已经发布了很久了,所以网上有很多的教程,但是这些教程,都伴随这RxJava大版本更新,而变得让人摸不着头脑。
有些时候,这些教程已经不再适用与新版本,我们在使用的时候,往往都能碰到一些不能使用的api,就像这些文章,写的很好,但是我们看了之后第一个例子都没有办法用最新的包来使用。
https://blog.csdn.net/johnny901114/article/details/51042840
为了帮助大家理解,我想先提一下 RxJava 是如何实现的链式调用:其实 RxJava 只不过是一个静态方法的集合。
如果还不能理解的话,就可以理解为 Observable 就是一个包含无数个静态方法的类。
标注
本文为本人学习参考,还有很多不足,如果大家感觉我写的有问题的话,可以在评论里指出。非常感谢大家的支持。
如果感觉阅读困难,那么请绕道其他文章。
前引
C 10k问题
C10K问题是优化网络套接字以同时处理大量客户端的问题,名称C10k是同事处理一万个链接的代名词。请注意,并发连接与每秒请求数不同,尽管他们很相似: 每秒处理许多请求需要高吞吐量,而大量并发连接数则需要对连接进行有效的调度。
不会吧,到现在还有程序员不知道c10k问题的吗
https://zhuanlan.zhihu.com/p/23114695 来自知乎的文章。
http://www.kegel.com/c10k.html C10k问题的英文参考网站。
Reactive
异步编程,一种全新的编程方式。基于观察者模式,函数式编程,流式数据处理。从而提出的一种全新的编程方式。使用异步的方式进行数据处理,可以提高计算机的并发效率,因为异步非阻塞的方式可以最大限度的压榨计算机的性能。
基础
泛型基础
通配符(extends and super)
https://www.zhihu.com/question/20400700
如何查阅资料
在官网的Docs的链接中,我们可以看到五个分类,分别是(这里是直接拷贝的官网的描述,不懂的请自行翻译):
Observable
: In ReactiveX an observer subscribes to an Observable.
Operators
:Each language-specific implementation of ReactiveX implements a set of operators. Although there is much overlap between implementations, there are also some operators that are only implemented in certain implementations. Also, each implementation tends to name its operators to resemble those of similar methods that are already familiar from other contexts in that language.
Single
:A Single is something like an Observable, but instead of emitting a series of values — anywhere from none at all to an infinite number — it always either emits one value or an error notification.
Subject
:A Subject is a sort of bridge or proxy that is available in some implementations of ReactiveX that acts both as an observer and as an Observable. Because it is an observer, it can subscribe to one or more Observables, and because it is an Observable, it can pass through the items it observes by reemitting them, and it can also emit new items.
Scheduler
:If you want to introduce multithreading into your cascade of Observable operators, you can do so by instructing those operators (or particular Observables) to operate on particular Schedulers.
我们可以根据所对应的问题去到专门的专题中找到自己想要的结果。ReactiveX 对所有的异步编程的模块进行了分类。
Observable
说起 RxJava 前提是这些被称作 Base Class 的对象。这些是学习 RxJava 的基础。
Flowable and Observable
io.reactivex.rxjava3.core.Flowable
:0..N flows,supporting Reactive_Streams and backpressure.
io.reactivex.rxjava3.core.Observable
:0..N flows,no backpressure.
通常,我们认为Observable 和 Flowable 是相同的,但是Flowable是支持背压的数据源。
Single、Maybe、Completable
io.reactivex.rxjava3.core.Single
:a flow of exactly 1 item or an error.
io.reactivex.rxjava3.core.Completable
: a flow without items but only a completion or error signal.
io.reactivex.rxjava3.core.Maybe
: a flow with no items,exactly one item or an error.
代码参考
Observable<Task> taskObservable = Observable.fromIterable(DataSource.createTasksList())
.subscribeOn(Schedulers.io())
.filter(task -> {
Log.d(TAG, "test: " + Thread.currentThread().getName());
Thread.sleep(1000);
return task.isComplete();
}).flatMap(observable -> new Observable<Task>() {
@Override
protected void subscribeActual(@NonNull Observer<? super Task> observer) {
}
})
.retry()
.cache()
.observeOn(AndroidSchedulers.mainThread());
taskObservable.subscribe(new Observer<Task>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe: called.");
}
@Override
public void onNext(@NonNull Task task) {
Log.d(TAG, "onNext: " + Thread.currentThread().getName());
Log.d(TAG, "onNext: " + task.getDescription());
}
@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "onError: ", e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: called.");
}
});
Operators
ReactiveX 把我们常见的操作符分成了11中操作
-
Creating Observables (创建可观察者操作符)
-
Transforming Observables (变换可观察者操作符)
-
Filtering Observables (过滤可观察者操作符)
-
Combining Observables (合并可观察者操作符)
-
Error Handling Operators (异常处理操作符)
-
Observable Utility Operators (可观察者工具操作)
-
Conditional and Boolean Operators (条件操作和布尔操作)
-
Mathematical and Aggregate Operators (数学和聚合操作)
-
Backpressure Operators (背压操作)
-
Connectable Observable Operators (可连接的观察者操作)
-
Operators to Convert Observables (转化操作)(状态的迁移)
其他
Obserable 文章的最后部分,也给出了 ReactiveX 异步编程的范式
"A Decision Tree of Observable Operators"
我们可以通过参考这些做法来找到我们想要的操作符,以达到我们想要的效果。
Single
Subject
Scheduler
线程调度器
高级
Backpressure
通常我们所说的Backpressure一般是指数据源产生的数据过快的现象
这里参考一篇知乎上大神的文章
https://www.zhihu.com/question/49618581
一般的,解决背压的方式有两种:
有损式背压控制
无损背压控制
异常处理
DAN LEW 大佬的博客
https://blog.danlew.net/2015/12/08/error-handling-in-rxjava/
官方文档参考
http://reactivex.io/RxJava/javadoc/rx/exceptions/Exceptions.html
从源代码出发来寻找我们需要的东西
提前声明,阅读源码需要英语基础,如果基础不好的话,阅读起来会比较难一点,可以借助工具来帮助阅读,但是效果不如直接阅读英文来的直接。
http://reactivex.io/RxJava/3.x/javadoc/
参考网站
Observable 、Flowable、Maybe、Single、Completable
https://bugfender.com/blog/data-flows-in-rxjava2-observable-flowable-single-maybe-completable/
函数式编程
https://www.tutorialspoint.com/functional_programming/functional_programming_introduction.htm
Reactivex官网
http://reactivex.io/
RxJava、RxAndroid最佳实践
https://github.com/JoaoMotondon/RxJavaDemoApp
https://github.com/kaushikgopal/RxJava-Android-Samples
https://github.com/amitshekhariitbhu/RxJava2-Android-Samples
参考文章
RxJava 异步操作 CSDN 文档
https://blog.csdn.net/nicolelili1/article/details/52181469
C# 写的 101 个例子
http://rxwiki.wikidot.com/101samples
微软官方关于Reactive Extensions的参考
https://docs.microsoft.com/en-us/previous-versions/dotnet/reactive-extensions
异步IO
异步I/O是计算机操作系统对输入输出的一种处理方式:发起I/O请求的线程不等I/O操作完成,就继续执行随后的代码,I/O结果用其他方式通知发起I/O请求的程序。与异步I/O相对的是更为常见的“同步(阻塞)I/O”:发起I/O请求的线程不从正在调用的I/O操作函数返回(即被阻塞),直至I/O操作完成。
Unix 中异步使用使用aio_write
,aio_read
Unix 异步和Window中的异步不相同,Windows中的异步方式选择更多。
wikipad 链接
https://zh.wikipedia.org/wiki/%E7%95%B0%E6%AD%A5I/O