RxJava Operators (Creation)
2019-02-13
Charein
Covers create, defer, just, from, fromArray, fromIterable, and fromPublisher.
create
Creates an Observable from scratch by calling observer methods programmatically. See http://reactivex.io/documentation/operators/create.html
public static <T> Observable<T> create(ObservableOnSubscribe<T> source)
Example:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
for (int i = 0; i < 3; i++) {
e.onNext("#" + i);
}
// Signal completion so downstream knows no more items will follow
e.onComplete();
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("accept: " + s);
}
});
// Output
// System.out: accept: #0
// System.out: accept: #1
// System.out: accept: #2
defer
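Does not create the Observable until an observer subscribes, and creates a fresh Observable for each observer.
The Defer operator waits until an observer subscribes to it and then generates an Observable, typically with an Observable factory function. It does this afresh for each subscriber, so although each subscriber may think it is subscribing to the same Observable, each one in fact gets its own individual sequence.
In some cases, waiting until the last minute (that is, until subscription time) to generate the Observable ensures that it contains the freshest data.
See http://reactivex.io/documentation/operators/defer.html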
public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier);
Example:
// volatile so that writes from the updater thread are visible to the subscribing thread
private volatile String msg = "#0";
@SuppressLint("CheckResult")
public void testDefer() throws InterruptedException {
// Keep updating msg on a background thread
new Thread(new Runnable() {
@Override
public void run() {
for (int i=0; i<10; i++) {
msg = "#" + i;
System.out.println("msg: " + msg);
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
Observable<String> observable = Observable.defer(new Callable<ObservableSource<String>>() {
@Override
public ObservableSource<String> call() throws Exception {
return Observable.just(msg);
}
});
// Observable<String> observable = Observable.just(msg);
TimeUnit.MILLISECONDS.sleep(1000);
observable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("accept: " + s);
}
});
// Output
// System.out: msg: #0
// System.out: msg: #1
// System.out: msg: #2
// System.out: msg: #3
// System.out: accept: #3
// System.out: msg: #4
// System.out: msg: #5
// System.out: msg: #6
// System.out: msg: #7
// System.out: msg: #8
// System.out: msg: #9
}
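If defer is replaced with just (the commented-out line above), msg is read once when the Observable is created rather than at subscription time, so the output becomes: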
// System.out: msg: #0
// System.out: msg: #1
// System.out: msg: #2
// System.out: msg: #3
// System.out: accept: #0
// System.out: msg: #4
// System.out: msg: #5
// System.out: msg: #6
// System.out: msg: #7
// System.out: msg: #8
// System.out: msg: #9
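The two outputs differ only in when msg is read. Below is a minimal sketch of the same idea without RxJava, using java.util.function.Supplier as a stand-in for the Observable factory. The class DeferSketch and its methods eager, deferred, and setMsg are hypothetical names for illustration, not part of RxJava.

```java
import java.util.function.Supplier;

public class DeferSketch {
    // Mutable state standing in for the msg field of the example above
    private String msg = "#0";

    public void setMsg(String value) {
        msg = value;
    }

    // Analogous to Observable.just(msg): the value is read once, at creation time
    public Supplier<String> eager() {
        final String captured = msg;
        return () -> captured;
    }

    // Analogous to Observable.defer(...): the value is read again on every get()
    public Supplier<String> deferred() {
        return () -> msg;
    }

    public static void main(String[] args) {
        DeferSketch s = new DeferSketch();
        Supplier<String> eager = s.eager();
        Supplier<String> deferred = s.deferred();

        s.setMsg("#3");
        System.out.println("eager: " + eager.get());       // eager: #0
        System.out.println("deferred: " + deferred.get()); // deferred: #3

        s.setMsg("#4");
        System.out.println("deferred: " + deferred.get()); // deferred: #4
    }
}
```

Each call to get() on the deferred supplier plays the role of a new subscription: it re-reads the current state, which is why every subscriber of a deferred Observable can see a different sequence.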
just
public static <T> Observable<T> just(T item)
public static <T> Observable<T> just(T item1, T item2)
// Overloads accept up to 10 items
Example:
Observable.just("#0")
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
// System.out: accept: #0
System.out.println("accept: " + s);
}
});
fromArray
public static <T> Observable<T> fromArray(T... items)
Example:
Observable.fromArray("#0", "#1", "#2")
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
// System.out: accept: #0
// System.out: accept: #1
// System.out: accept: #2
System.out.println("accept: " + s);
}
});
fromIterable
public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
Example:
// Any Iterable works; a List is the most common source
List<String> list = Arrays.asList("#0", "#1", "#2");
Observable.fromIterable(list).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
// System.out: accept: #0
// System.out: accept: #1
// System.out: accept: #2
System.out.println("accept: " + s);
}
});
fromPublisher
public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)
Example:
Observable.fromPublisher(new Publisher<String>() {
@Override
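public void subscribe(Subscriber<? super String> s) {
// The Reactive Streams specification requires onSubscribe before any other signal
s.onSubscribe(new Subscription() {
@Override
public void request(long n) { } // demand is ignored in this demo
@Override
public void cancel() { } // cancellation is ignored in this demo
});
for (int i = 0; i < 4; i++) {
s.onNext("#" + i);
}
s.onComplete();
}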
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
// System.out: accept: #0
// System.out: accept: #1
// System.out: accept: #2
// System.out: accept: #3
System.out.println("accept: " + s);
}
});