Rx-Java: create方法的源码阅读
2019-12-17 本文已影响0人
木头与琉璃
在现实中我们经常会说,问题应该在何时,何地,由谁 做点什么来解决。假设发生了一个事故,事故(Observable)应该由有关部门(emitter)的具体人员(Observer)来按照某种流程(onNext)来处理,结果是处理成功(onComplate)或者处理失败(onError)。而事故和有关部门要有联系(subscribe)才能处理。
源码整理
- create方法的入参是个ObservableOnSubscribe的函数接口,出参是Observable这个函数接口的实现类ObservableCreate,而ObservableCreate中实现了父类Observable的抽象方法subscribeActual,subscribeActual方法主要使用了CreateEmitter这个内部类来实现。而subscribeActual是Observable.subscribe(Observer)时会调用来使Observable将元素发送出去。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
}
- CreateEmitter
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
}
- ObservableOnSubscribe的subscribe入参是ObservableEmitter,ObservableEmitter继承于io.reactivex.Emitter。这些都是函数式接口。 其实现类主要是上面的CreateEmitter,作用是用来下发数据。
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param emitter the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
public interface ObservableEmitter<T> extends Emitter<T> {
/**
* Sets a Disposable on this emitter; any previous {@link Disposable}
* or {@link Cancellable} will be disposed/cancelled.
* @param d the disposable, null is allowed
*/
void setDisposable(@Nullable Disposable d);
/**
* Sets a Cancellable on this emitter; any previous {@link Disposable}
* or {@link Cancellable} will be disposed/cancelled.
* @param c the cancellable resource, null is allowed
*/
void setCancellable(@Nullable Cancellable c);
/**
* Returns true if the downstream disposed the sequence or the
* emitter was terminated via {@link #onError(Throwable)}, {@link #onComplete} or a
* successful {@link #tryOnError(Throwable)}.
* <p>This method is thread-safe.
* @return true if the downstream disposed the sequence or the emitter was terminated
*/
boolean isDisposed();
/**
* Ensures that calls to onNext, onError and onComplete are properly serialized.
* @return the serialized ObservableEmitter
*/
@NonNull
ObservableEmitter<T> serialize();
/**
* Attempts to emit the specified {@code Throwable} error if the downstream
* hasn't cancelled the sequence or is otherwise terminated, returning false
* if the emission is not allowed to happen due to lifecycle restrictions.
* <p>
* Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called
* if the error could not be delivered.
* <p>History: 2.1.1 - experimental
* @param t the throwable error to signal if possible
* @return true if successful, false if the downstream is not able to accept further
* events
* @since 2.2
*/
boolean tryOnError(@NonNull Throwable t);
}
public interface Emitter<T> {
/**
* Signal a normal value.
* @param value the value to signal, not null
*/
void onNext(@NonNull T value);
/**
* Signal a Throwable exception.
* @param error the Throwable to signal, not null
*/
void onError(@NonNull Throwable error);
/**
* Signal a completion.
*/
void onComplete();
}