Android 开发进阶

RxJava3 Subject 用法及相关源码

2021-12-17  本文已影响0人  JeffreyWorld
RxJava3 Subject

说到 Subject(主题) ,很多人可能都不是很熟悉它,因为相对于 RxJava 的 Observable、Schedulers、Subscribes 等关键字来讲,它抛头露面的场合似乎比较少。

事实上,Subject 作用是很大的,借用官方的解释,Subject 在同一时间内,既可以作为 Observable,也可以作为Observer:

在RxJava3.x中,官方一共为我们提供了以下几种Subject:
ReplaySubject (释放接收到的所有数据)
BehaviorSubject (释放订阅前最后一个数据和订阅后接收到的所有数据)
PublishSubject (释放订阅后接收到的数据)
AsyncSubject (仅释放接收到的最后一个数据)
SerializedSubject(串行Subject)
UnicastSubject (仅支持订阅一次的Subject)


/**
 * Represents an {@link Observer} and an {@link Observable} at the same time, allowing
 * multicasting events from a single source to multiple child {@code Observer}s.
 * <p>
 * All methods except the {@link #onSubscribe(io.reactivex.rxjava3.disposables.Disposable)}, {@link #onNext(Object)},
 * {@link #onError(Throwable)} and {@link #onComplete()} are thread-safe.
 * Use {@link #toSerialized()} to make these methods thread-safe as well.
 *
 * @param <T> the item value type
 */
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
    /**
     * Returns true if the subject has any Observers.
     * <p>The method is thread-safe.
     * @return true if the subject has any Observers
     */
    @CheckReturnValue
    public abstract boolean hasObservers();

    /**
     * Returns true if the subject has reached a terminal state through an error event.
     * <p>The method is thread-safe.
     * @return true if the subject has reached a terminal state through an error event
     * @see #getThrowable()
     * @see #hasComplete()
     */
    @CheckReturnValue
    public abstract boolean hasThrowable();

    /**
     * Returns true if the subject has reached a terminal state through a complete event.
     * <p>The method is thread-safe.
     * @return true if the subject has reached a terminal state through a complete event
     * @see #hasThrowable()
     */
    @CheckReturnValue
    public abstract boolean hasComplete();

    /**
     * Returns the error that caused the Subject to terminate or null if the Subject
     * hasn't terminated yet.
     * <p>The method is thread-safe.
     * @return the error that caused the Subject to terminate or null if the Subject
     * hasn't terminated yet
     */
    @Nullable
    @CheckReturnValue
    public abstract Throwable getThrowable();

    /**
     * Wraps this Subject and serializes the calls to the onSubscribe, onNext, onError and
     * onComplete methods, making them thread-safe.
     * <p>The method is thread-safe.
     * @return the wrapped and serialized subject
     */
    @NonNull
    @CheckReturnValue
    public final Subject<T> toSerialized() {
        if (this instanceof SerializedSubject) {
            return this;
        }
        return new SerializedSubject<>(this);
    }
}

下面用 PublishSubject (释放订阅后接收到的数据),举几个例子:

public final class PublishSubject<T> extends Subject<T> {

    static final PublishDisposable[] TERMINATED = new PublishDisposable[0];
    static final PublishDisposable[] EMPTY = new PublishDisposable[0];
    final AtomicReference<PublishDisposable<T>[]> subscribers;
    Throwable error;
    /**
     * Constructs a PublishSubject.
     * @param <T> the value type
     * @return the new PublishSubject
     */
    public static <T> PublishSubject<T> create() {
        return new PublishSubject<>();
    }

    /**
     * Constructs a PublishSubject.
     * @since 2.0
     */
    PublishSubject() {
        subscribers = new AtomicReference<>(EMPTY);
    }

    @Override
    protected void subscribeActual(Observer<? super T> t) {
        PublishDisposable<T> ps = new PublishDisposable<>(t, this);
        t.onSubscribe(ps);
        if (add(ps)) {
            // if cancellation happened while a successful add, the remove() didn't work
            // so we need to do it again
            if (ps.isDisposed()) {
                remove(ps);
            }
        } else {
            Throwable ex = error;
            if (ex != null) {
                t.onError(ex);
            } else {
                t.onComplete();
            }
        }
    }

    /**
     * Tries to add the given subscriber to the subscribers array atomically
     * or returns false if the subject has terminated.
     * @param ps the subscriber to add
     * @return true if successful, false if the subject has terminated
     */
    boolean add(PublishDisposable<T> ps) {
        for (;;) {
            PublishDisposable<T>[] a = subscribers.get();
            if (a == TERMINATED) {
                return false;
            }

            int n = a.length;
            @SuppressWarnings("unchecked")
            PublishDisposable<T>[] b = new PublishDisposable[n + 1];
            System.arraycopy(a, 0, b, 0, n);
            b[n] = ps;

            if (subscribers.compareAndSet(a, b)) {
                return true;
            }
        }
    }

    /**
     * Atomically removes the given subscriber if it is subscribed to the subject.
     * @param ps the subject to remove
     */
    void remove(PublishDisposable<T> ps) {
        for (;;) {
            PublishDisposable<T>[] a = subscribers.get();
            if (a == TERMINATED || a == EMPTY) {
                return;
            }

            int n = a.length;
            int j = -1;
            for (int i = 0; i < n; i++) {
                if (a[i] == ps) {
                    j = i;
                    break;
                }
            }

            if (j < 0) {
                return;
            }

            PublishDisposable<T>[] b;

            if (n == 1) {
                b = EMPTY;
            } else {
                b = new PublishDisposable[n - 1];
                System.arraycopy(a, 0, b, 0, j);
                System.arraycopy(a, j + 1, b, j, n - j - 1);
            }
            if (subscribers.compareAndSet(a, b)) {
                return;
            }
        }
    }

    @Override
    public void onSubscribe(Disposable d) {
        if (subscribers.get() == TERMINATED) {
            d.dispose();
        }
    }

    @Override
    public void onNext(T t) {
        ExceptionHelper.nullCheck(t, "onNext called with a null value.");
        for (PublishDisposable<T> pd : subscribers.get()) {
            pd.onNext(t);
        }
    }

    @Override
    public void onError(Throwable t) {
        ExceptionHelper.nullCheck(t, "onError called with a null Throwable.");
        if (subscribers.get() == TERMINATED) {
            RxJavaPlugins.onError(t);
            return;
        }
        error = t;

        for (PublishDisposable<T> pd : subscribers.getAndSet(TERMINATED)) {
            pd.onError(t);
        }
    }

    @Override
    public void onComplete() {
        if (subscribers.get() == TERMINATED) {
            return;
        }
        for (PublishDisposable<T> pd : subscribers.getAndSet(TERMINATED)) {
            pd.onComplete();
        }
    }

    @Override
    @CheckReturnValue
    public boolean hasObservers() {
        return subscribers.get().length != 0;
    }

    @Override
    @Nullable
    @CheckReturnValue
    public Throwable getThrowable() {
        if (subscribers.get() == TERMINATED) {
            return error;
        }
        return null;
    }

    @Override
    @CheckReturnValue
    public boolean hasThrowable() {
        return subscribers.get() == TERMINATED && error != null;
    }

    @Override
    @CheckReturnValue
    public boolean hasComplete() {
        return subscribers.get() == TERMINATED && error == null;
    }

    /**
     * Wraps the actual subscriber, tracks its requests and makes cancellation
     * to remove itself from the current subscribers array.
     *
     * @param <T> the value type
     */
    static final class PublishDisposable<T> extends AtomicBoolean implements Disposable {

        private static final long serialVersionUID = 3562861878281475070L;
        /** The actual subscriber. */
        final Observer<? super T> downstream;
        /** The subject state. */
        final PublishSubject<T> parent;

        /**
         * Constructs a PublishSubscriber, wraps the actual subscriber and the state.
         * @param actual the actual subscriber
         * @param parent the parent PublishProcessor
         */
        PublishDisposable(Observer<? super T> actual, PublishSubject<T> parent) {
            this.downstream = actual;
            this.parent = parent;
        }

        public void onNext(T t) {
            if (!get()) {
                downstream.onNext(t);
            }
        }

        public void onError(Throwable t) {
            if (get()) {
                RxJavaPlugins.onError(t);
            } else {
                downstream.onError(t);
            }
        }

        public void onComplete() {
            if (!get()) {
                downstream.onComplete();
            }
        }

        @Override
        public void dispose() {
            if (compareAndSet(false, true)) {
                parent.remove(this);
            }
        }

        @Override
        public boolean isDisposed() {
            return get();
        }
    }
}

第一个例子:在申请应用敏感权限时的使用

>RxPermissionsFragment类

    //包含所有当前的权限请求。一旦被授予或拒绝,他们就会被移除。
    private Map<String, PublishSubject<Permission>> mSubjects = new HashMap<>();

    void onRequestPermissionsResult(String[] permissions, int[] grantResults, boolean[] shouldShowRequestPermissionRationale) {
        for (int i = 0, size = permissions.length; i < size; i++) {
            log("onRequestPermissionsResult  " + permissions[i]);
            //找到相应的 subject
            PublishSubject<Permission> subject = mSubjects.get(permissions[i]);
            if (subject == null) {
                // 没有找到主题
                Log.e(RxPermissions.TAG, "RxPermissions.onRequestPermissionsResult invoked but didn't find the corresponding permission request.");
                return;
            }
            mSubjects.remove(permissions[i]);
            boolean granted = grantResults[i] == PackageManager.PERMISSION_GRANTED;
            subject.onNext(new Permission(permissions[i], granted, shouldShowRequestPermissionRationale[i]));
            subject.onComplete();
        }
    }

...省略部分代码

    public PublishSubject<Permission> getSubjectByPermission(@NonNull String permission) {
        return mSubjects.get(permission);
    }

    public void setSubjectForPermission(@NonNull String permission, @NonNull PublishSubject<Permission> subject) {
        mSubjects.put(permission, subject);
    }


>RxPermissions类

    @TargetApi(Build.VERSION_CODES.M)
    private Observable<Permission> requestImplementation(final String... permissions) {
        List<Observable<Permission>> list = new ArrayList<>(permissions.length);
        List<String> unrequestedPermissions = new ArrayList<>();

        // 在多个权限的情况下,我们为每个权限创建一个 Observable。
        // 最后,将观察到的值组合在一起以产生唯一的响应。
        for (String permission : permissions) {
            mRxPermissionsFragment.get().log("Requesting permission " + permission);
            if (isGranted(permission)) {
                // 已授予,或未授予 Android M
                // 返回授予的权限对象。
                list.add(Observable.just(new Permission(permission, true, false)));
                continue;
            }

            if (isRevoked(permission)) {
                // 被策略撤销,返回一个被拒绝的 Permission 对象。
                list.add(Observable.just(new Permission(permission, false, false)));
                continue;
            }

            PublishSubject<Permission> subject = mRxPermissionsFragment.get().getSubjectByPermission(permission);
            // 如果不存在则创建一个新主题
            if (subject == null) {
                unrequestedPermissions.add(permission);
                subject = PublishSubject.create();
                mRxPermissionsFragment.get().setSubjectForPermission(permission, subject);
            }

            list.add(subject);
        }

        if (!unrequestedPermissions.isEmpty()) {
            String[] unrequestedPermissionsArray = unrequestedPermissions.toArray(new String[unrequestedPermissions.size()]);
            requestPermissionsFromFragment(unrequestedPermissionsArray);
        }
        return Observable.concat(Observable.fromIterable(list));
    }

上面的代码,是应用申请相关隐私权限时的业务逻辑。 Map<String, PublishSubject<Permission>> mSubjects 里用一个Map,含有若干个用 PublishSubject 包裹的 Permission。在申请或拒绝完权限后,释放订阅后接收到的数据。

第二个例子:在应用首页,业务逻辑比较多,需要要按先后顺序处理对应逻辑,并做相应的 UI 展示。

>IndexFragment类

    private PublishSubject<List<ListBean>> mBannerPublishSubject; //banner
    private PublishSubject<Object> mGuidePublishSubject; //首页引导
    private PublishSubject<Object> mSignAwardPublishSubject; //签到奖励

    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        mBannerPublishSubject = PublishSubject.create();
        mGuidePublishSubject= PublishSubject.create();
        mMonthVipPublishSubject = PublishSubject.create();
        Observable.concat(mGuidePublishSubject, mBannerPublishSubject, mSignAwardPublishSubject)
                .subscribe(new DefaultObserver<Object>() {
                    @Override
                    public void onNext(@io.reactivex.rxjava3.annotations.NonNull Object object) {
                    }

                    @Override
                    public void onError(@io.reactivex.rxjava3.annotations.NonNull Throwable e) {
                    }

                    @Override
                    public void onComplete() {
                        //服务端状态为已完成,本地没结束引导,不重置引导状态
                        if (NewUserGuideUtils.isServerGuideEnd()) {
                            NewUserGuideUtils.setFirstLocalGuideEnd();
                            showDialogAfterGuide();
                        } else {
                            showNewGuideDialog();
                        }
                    }
                });
    }
    ...省略部分代码

    //当首页所需的banner数据取到时
    pravite void getBannerDataSuccess(){
         mBannerPublishSubject.onComplete();
    }

    //当首页所需的用户信息数据取到时
    pravite void getUserInfoDataSuccess(){
        mGuidPublishSubject.onNext(this);
        mGuidPublishSubject.onComplete();
    }
    
    //当首页所需的签到奖励的数据取到时
    pravite void getSignAwardDataSuccess(){
        mSignAwardPublishSubject.onNext(this);
        mSignAwardPublishSubject.onComplete();
    }

下面用 BehaviorSubject (释放订阅前最后一个数据和订阅后接收到的所有数据),举个例子:

我们正在开发的几个项目,都用到了这个开源库 RxLifecyclehttps://github.com/trello/RxLifecycle
官方对这个库的原话解释:Lifecycle handling APIs for Android apps using RxJava
其中 RxActivity 的源码中,就要到了 BehaviorSubject。这个 BehaviorSubject 会在不同的生命周期发射不同的 ActivityEvent ,比如在 onCreate() 生命周期发射 ActivityEvent.CREATE ,在 onStop() 发射 ActivityEvent.STOP

public final class BehaviorSubject<T> extends Subject<T> {

    final AtomicReference<Object> value; //原子操作类,当前接收到的最后一个数据

    final AtomicReference<BehaviorDisposable<T>[]> observers;//原子操作类,BehaviorDisposable内部存储了所有接受到的数据

    @SuppressWarnings("rawtypes")
    static final BehaviorDisposable[] EMPTY = new BehaviorDisposable[0];//标记,意味着一个空的BehaviorDisposable

    @SuppressWarnings("rawtypes")
    static final BehaviorDisposable[] TERMINATED = new BehaviorDisposable[0]; //标记,意味着已经达到了TERMINATED,终止数据的发射
    final ReadWriteLock lock;
    final Lock readLock;
    final Lock writeLock;

    final AtomicReference<Throwable> terminalEvent;

    ...省略

}
public abstract class RxActivity extends Activity implements LifecycleProvider<ActivityEvent> {

    private final BehaviorSubject<ActivityEvent> lifecycleSubject = BehaviorSubject.create();

    @Override
    @NonNull
    @CheckResult
    public final Observable<ActivityEvent> lifecycle() {
        return lifecycleSubject.hide();
    }

    @Override
    @NonNull
    @CheckResult
    public final <T> LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event) {
        return RxLifecycle.bindUntilEvent(lifecycleSubject, event);
    }

    @Override
    @NonNull
    @CheckResult
    public final <T> LifecycleTransformer<T> bindToLifecycle() {
        return RxLifecycleAndroid.bindActivity(lifecycleSubject);
    }

    @Override
    @CallSuper
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        lifecycleSubject.onNext(ActivityEvent.CREATE);
    }

    @Override
    @CallSuper
    protected void onStart() {
        super.onStart();
        lifecycleSubject.onNext(ActivityEvent.START);
    }

    @Override
    @CallSuper
    protected void onResume() {
        super.onResume();
        lifecycleSubject.onNext(ActivityEvent.RESUME);
    }

    @Override
    @CallSuper
    protected void onPause() {
        lifecycleSubject.onNext(ActivityEvent.PAUSE);
        super.onPause();
    }

    @Override
    @CallSuper
    protected void onStop() {
        lifecycleSubject.onNext(ActivityEvent.STOP);
        super.onStop();
    }

    @Override
    @CallSuper
    protected void onDestroy() {
        lifecycleSubject.onNext(ActivityEvent.DESTROY);
        super.onDestroy();
    }
}
上一篇 下一篇

猜你喜欢

热点阅读