Android开发Android技术知识Android开发

Rxjava源码解析笔记 | 创建Observable 与 Observer/Subscriber

2019-06-04  本文已影响8人  凌川江雪

简单回顾

如果抛开Rxjava的操作符以及其线程控制的话,Rxjava的最基本使用是比较简单的
第一步,创建被观察者Observable;
第二步,创建观察者Observer/Subscriber;
第三步,subscribe;

三个关键对象和一个核心方法

Rxjava源码相关分析

Observable类中的create()源码
/**
 * Excerpt of the Observable class: it stores the OnSubscribe callback it is
 * created with and exposes static create(...) factories. The remainder of the
 * class is elided ("...") in this excerpt.
 */
public class Observable<T> {

    // Callback supplied at construction time.
    final OnSubscribe<T> onSubscribe;

    /**
     * Stores the given callback in {@code onSubscribe}.
     *
     * @param f the OnSubscribe callback to keep
     */
    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }

    // Execution hook obtained from RxJavaPlugins; each create(...) overload below
    // passes its argument through hook.onCreate(...) before constructing the Observable.
    static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

    /**
     * Builds a new Observable around {@code hook.onCreate(f)}.
     *
     * @param f the OnSubscribe callback
     * @return a new Observable wrapping the hooked callback
     */
    public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(hook.onCreate(f));
    }

    /**
     * Same as {@code create(OnSubscribe)} but takes a SyncOnSubscribe (marked @Beta).
     */
    @Beta
    public static <S, T> Observable<T> create(SyncOnSubscribe<S, T> syncOnSubscribe) {
        return new Observable<T>(hook.onCreate(syncOnSubscribe));
    }

    /**
     * Same as {@code create(OnSubscribe)} but takes an AsyncOnSubscribe (marked @Experimental).
     */
    @Experimental
    public static <S, T> Observable<T> create(AsyncOnSubscribe<S, T> asyncOnSubscribe) {
        return new Observable<T>(hook.onCreate(asyncOnSubscribe));
    }

    ...

}



观察者创建以及订阅实例代码:

    // Step 2: create the observer
    // Anonymous Observer<Object>; all three callbacks are left empty in this sample.
    Observer<Object> observer = new Observer<Object>() {

        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(Object s) {

        }
    };

    // Anonymous Subscriber<String>; besides the three Observer callbacks (left empty here)
    // it also overrides onStart(), which only calls through to super.onStart().
    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(String s) {

        }

        @Override
        public void onStart() {
            super.onStart();
        }
    };

    /**
     * Subscribes both the Observer and the Subscriber sample objects to {@code observable}.
     */
    public void doRxjava(){
        // Step 3: subscribe
        observable.subscribe(observer);
        observable.subscribe(subscriber);
    }

接下来是Rxjava的SDK中,subscribe()的传入参数为
Observer时(observable.subscribe(observer);)的源码:

    /**
     * Subscribes the given Observer.
     * If the observer is already a Subscriber it is cast and passed straight to
     * {@code subscribe(Subscriber)}; otherwise it is wrapped in an anonymous
     * Subscriber whose three callbacks delegate to the observer.
     *
     * @param observer the observer that receives the callbacks
     * @return whatever {@code subscribe(Subscriber)} returns
     */
    public final Subscription subscribe(final Observer<? super T> observer) {
        if (observer instanceof Subscriber) {
            return subscribe((Subscriber<? super T>)observer);
        }
        // Adapter: forwards each callback to the wrapped observer unchanged.
        return subscribe(new Subscriber<T>() {

            @Override
            public void onCompleted() {
                observer.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                observer.onError(e);
            }

            @Override
            public void onNext(T t) {
                observer.onNext(t);
            }

        });
    }

Rxjava的SDK中,subscribe()的传入参数为
Subscriber时(observable.subscribe(subscriber);)的源码:

   /**
    * Instance entry point for subscribing a Subscriber: delegates to the static
    * {@code Observable.subscribe(subscriber, this)}.
    *
    * @param subscriber the subscriber to attach
    * @return the Subscription returned by the static subscribe
    */
   public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

Subscriber源码分析
package rx;

import rx.internal.util.SubscriptionList;


/**
 * Base class that is both an {@link Observer} and a {@link Subscription}.
 * <p>
 * It keeps a {@code SubscriptionList} (optionally shared with a wrapped
 * subscriber) to implement {@link #unsubscribe()} / {@link #isUnsubscribed()},
 * and tracks request amounts made before a {@link Producer} has been set.
 *
 * @param <T> the item type received through {@code onNext}
 */
public abstract class Subscriber<T> implements Observer<T>, Subscription {

    /**
     * Sentinel meaning "nothing has been requested yet".
     * Declared as a primitive so comparisons against {@code requested} do not
     * auto-unbox a {@code Long} on every call.
     */
    private static final long NOT_SET = Long.MIN_VALUE;

    private final SubscriptionList subscriptions;
    /** The wrapped downstream subscriber, or {@code null} when there is none. */
    private final Subscriber<?> subscriber;
    /* protected by `this` */
    private Producer producer;
    /* protected by `this` */
    private long requested = NOT_SET; // default to not set

    /** Creates a subscriber with no wrapped subscriber and its own subscription list. */
    protected Subscriber() {
        this(null, false);
    }

    /**
     * Creates a subscriber that wraps {@code subscriber} and shares its subscription list.
     *
     * @param subscriber the subscriber to wrap
     */
    protected Subscriber(Subscriber<?> subscriber) {
        this(subscriber, true);
    }

    /**
     * @param subscriber         the subscriber to wrap, may be {@code null}
     * @param shareSubscriptions when {@code true} and {@code subscriber} is non-null,
     *                           reuse the wrapped subscriber's subscription list;
     *                           otherwise create a fresh one
     */
    protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
        this.subscriber = subscriber;
        this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
    }

    /**
     * Adds a subscription to this subscriber's subscription list.
     *
     * @param s the subscription to add
     */
    public final void add(Subscription s) {
        subscriptions.add(s);
    }

    /** Delegates to the subscription list's {@code unsubscribe()}. */
    @Override
    public final void unsubscribe() {
        subscriptions.unsubscribe();
    }

    /** Delegates to the subscription list's {@code isUnsubscribed()}. */
    @Override
    public final boolean isUnsubscribed() {
        return subscriptions.isUnsubscribed();
    }

    /** Hook with an empty default body; subclasses may override it. */
    public void onStart() {
        // do nothing by default
    }

    /**
     * Requests {@code n} items. If a producer is already set the request is
     * forwarded to it; otherwise the amount is accumulated until
     * {@link #setProducer(Producer)} is called.
     *
     * @param n the number of items to request
     * @throws IllegalArgumentException if {@code n} is negative
     */
    protected final void request(long n) {
        if (n < 0) {
            throw new IllegalArgumentException("number requested cannot be negative: " + n);
        }

        // if producer is set then we will request from it
        // otherwise we increase the requested count by n
        Producer producerToRequestFrom = null;
        synchronized (this) {
            if (producer != null) {
                producerToRequestFrom = producer;
            } else {
                addToRequested(n);
                return;
            }
        }
        // after releasing lock (we should not make requests holding a lock)
        producerToRequestFrom.request(n);
    }

    /**
     * Adds {@code n} to the pending request count, capping at
     * {@code Long.MAX_VALUE} when the sum overflows. Called while holding
     * the lock on {@code this}.
     */
    private void addToRequested(long n) {
        if (requested == NOT_SET) {
            requested = n;
        } else {
            final long total = requested + n;
            // check if overflow occurred
            if (total < 0) {
                requested = Long.MAX_VALUE;
            } else {
                requested = total;
            }
        }
    }

    /**
     * Installs the producer. When a wrapped subscriber exists and nothing has
     * been requested yet, the producer is passed through to it; otherwise the
     * producer is asked for the accumulated amount, or {@code Long.MAX_VALUE}
     * if nothing was requested.
     *
     * @param p the producer to install
     */
    public void setProducer(Producer p) {
        long toRequest;
        boolean passToSubscriber = false;
        synchronized (this) {
            toRequest = requested;
            producer = p;
            if (subscriber != null) {
                // middle operator ... we pass through unless a request has been made
                if (toRequest == NOT_SET) {
                    // we pass through to the next producer as nothing has been requested
                    passToSubscriber = true;
                }
            }
        }
        // do after releasing lock
        if (passToSubscriber) {
            subscriber.setProducer(producer);
        } else {
            // we execute the request with whatever has been requested (or Long.MAX_VALUE)
            if (toRequest == NOT_SET) {
                producer.request(Long.MAX_VALUE);
            } else {
                producer.request(toRequest);
            }
        }
    }
}
public abstract class Subscriber<T> implements Observer<T>, Subscription {...
package rx;

import rx.subscriptions.Subscriptions;

/**
 * Two-method contract. In the Subscriber source quoted in this article, both
 * methods are implemented by delegating to a SubscriptionList.
 */
public interface Subscription {
    // NOTE(review): presumably cancels the subscription; exact semantics are not shown here.
    void unsubscribe();

    // NOTE(review): presumably true once unsubscribe() has taken effect; confirm against an implementation.
    boolean isUnsubscribed();
}

以上则是Subscriber的核心逻辑;


第三步,下面具体分析订阅的实现

下面小结一下,call()方法在subscribe()方法之中被调用
(从下面subscribe()方法的源码中可以见得),

也即默认subscribe()方法被调用之后
(observable.subscribe(observer);或者observable.subscribe(subscriber);),

触发创建Observable时候实现的OnSubscribe<>中的call()方法,
完成回调;
call()方法中可以实现一系列事件消费的过程——onNext()、onCompleted()等;

//第一步:创建被观察者:create
   Observable observable = Observable.create(new >Observable.OnSubscribe<String>() {
       @Override
       public void call(Subscriber<? super String> subscriber) {
           subscriber.onNext("Hello");
           subscriber.onNext("Imooc");
           subscriber.onCompleted();
       }
   });

   //通过just方法来创建被观察者
   Observable observableJust = Observable.just("hello", "Imooc");

   //通过from方法来创建被观察者
   String[] parameters = {"hello", "Imooc"};
   Observable observableFrom = Observable.from(parameters);

以上便是Rxjava最基本的使用方式

Rxjava的SDK中,subscribe()的传入参数为
Subscriber时(observable.subscribe(subscriber);)的源码:

   /**
    * Instance entry point for subscribing a Subscriber: delegates to the static
    * {@code Observable.subscribe(subscriber, this)}.
    *
    * @param subscriber the subscriber to attach
    * @return the Subscription returned by the static subscribe
    */
   public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }
    /**
     * Performs the actual subscription.
     * Validates the arguments, calls {@code subscriber.onStart()}, wraps the
     * subscriber in a SafeSubscriber if it is not one already, then invokes the
     * observable's OnSubscribe {@code call(subscriber)} through the execution hook.
     *
     * @param subscriber the subscriber to attach
     * @param observable the observable whose {@code onSubscribe} is invoked
     * @return the result of {@code hook.onSubscribeReturn(subscriber)} on success, or
     *         {@code Subscriptions.unsubscribed()} when the error was delivered to onError
     * @throws IllegalArgumentException if {@code subscriber} is null
     * @throws IllegalStateException if {@code observable.onSubscribe} is null
     * @throws RuntimeException if delivering the error to {@code onError} itself fails
     */
    private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        if (subscriber == null) {
            throw new IllegalArgumentException("observer can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
        }
        
        // onStart() is called on the original subscriber, before any wrapping.
        subscriber.onStart();

        if (!(subscriber instanceof SafeSubscriber)) {
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        try {
            // This is where the OnSubscribe's call() is triggered.
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            // Rethrow fatal errors; anything else is routed to subscriber.onError.
            Exceptions.throwIfFatal(e);
            try {
                subscriber.onError(hook.onSubscribeError(e));
            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);

                // onError itself failed: report through the hook and throw to the caller.
                RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                hook.onSubscribeError(r);
                throw r;
            }
            return Subscriptions.unsubscribed();
        }
    }

接着,

// Wrap the subscriber in a SafeSubscriber unless it already is one.
if (!(subscriber instanceof SafeSubscriber)) {
            subscriber = new SafeSubscriber<T>(subscriber);
        }

/**
 * Subscriber wrapper that forwards events to {@code actual} while guarding them
 * with a {@code done} flag: once onCompleted or onError has been accepted,
 * later onNext/onCompleted/onError calls are not forwarded. Exceptions thrown by
 * the wrapped subscriber's callbacks are caught and handled as shown in each method.
 */
public class SafeSubscriber<T> extends Subscriber<T> {

    // The wrapped subscriber that actually receives the events.
    private final Subscriber<? super T> actual;

    // Set to true by the first accepted onCompleted/onError.
    boolean done = false;

    /**
     * @param actual the subscriber to wrap; also passed to the Subscriber(Subscriber) constructor
     */
    public SafeSubscriber(Subscriber<? super T> actual) {
        super(actual);
        this.actual = actual;
    }

    /**
     * Forwards completion once. A failure in {@code actual.onCompleted()} is reported
     * and rethrown as OnCompletedFailedException; unsubscribe() always runs in the
     * finally block, and a failure there is rethrown as UnsubscribeFailedException.
     */
    @Override
    public void onCompleted() {
        if (!done) {
            done = true;
            try {
                actual.onCompleted();
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                RxJavaPluginUtils.handleException(e);
                throw new OnCompletedFailedException(e.getMessage(), e);
            } finally {
                try {
                    unsubscribe();
                } catch (Throwable e) {
                    RxJavaPluginUtils.handleException(e);
                    throw new UnsubscribeFailedException(e.getMessage(), e);
                }
            }
        }
    }

    /**
     * Rethrows fatal errors first, then forwards the error once via {@code _onError}.
     */
    @Override
    public void onError(Throwable e) {
        Exceptions.throwIfFatal(e);
        if (!done) {
            done = true;
            _onError(e);
        }
    }


    /**
     * Forwards the item unless already done; any Throwable raised is handed to
     * {@code Exceptions.throwOrReport(e, this)}.
     */
    @Override
    public void onNext(T args) {
        try {
            if (!done) {
                actual.onNext(args);
            }
        } catch (Throwable e) {
            Exceptions.throwOrReport(e, this);
        }
    }

    /**
     * Reports {@code e} to the plugin error handler, delivers it to
     * {@code actual.onError}, and unsubscribes afterwards.
     * If {@code actual.onError} throws, unsubscribe is still attempted and the
     * failure is rethrown (OnErrorNotImplementedException as-is, anything else
     * wrapped in OnErrorFailedException).
     */
    protected void _onError(Throwable e) {
        RxJavaPluginUtils.handleException(e);
        try {
            actual.onError(e);
        } catch (Throwable e2) {
            if (e2 instanceof OnErrorNotImplementedException) {
                // onError is not implemented downstream: unsubscribe, then rethrow the original exception.
                try {
                    unsubscribe();
                } catch (Throwable unsubscribeException) {
                    RxJavaPluginUtils.handleException(unsubscribeException);
                    throw new RuntimeException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException)));
                }
                throw (OnErrorNotImplementedException) e2;
            } else {
                // onError threw something else: report it, unsubscribe, and wrap everything in OnErrorFailedException.
                RxJavaPluginUtils.handleException(e2);
                try {
                    unsubscribe();
                } catch (Throwable unsubscribeException) {
                    RxJavaPluginUtils.handleException(unsubscribeException);
                    throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException)));
                }

                throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));
            }
        }
        // Error was delivered successfully: unsubscribe, wrapping a RuntimeException from unsubscribe().
        try {
            unsubscribe();
        } catch (RuntimeException unsubscribeException) {
            RxJavaPluginUtils.handleException(unsubscribeException);
            throw new OnErrorFailedException(unsubscribeException);
        }
    }

    /**
     * @return the wrapped subscriber
     */
    public Subscriber<? super T> getActual() {
        return actual;
    }
}



参考自 慕课网

上一篇 下一篇

猜你喜欢

热点阅读