使用RxJava构建一个常用的useCase

2022-01-24  本文已影响0人  Jamesbond_5521

使用RxJava构建一个常用的useCase，功能主要有2种：
1、持续订阅RxJava对象（listener）：数据源每次发射数据都会触发回调
2、单次订阅RxJava对象（single）：订阅者只请求一条数据
FlowableUseCase代码如下

package com.example.commonui.utils;

import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers;
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Supplier;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subscribers.DisposableSubscriber;

public  class FlowableUseCase {

    public static  <T> Disposable listener(Flowable flowable, Action initiator, Consumer<T> callback) {
        if(initiator == null){
            return listener(flowable,callback);
        }
        return flowable.ambWith(Flowable.create(emitter -> initiator.run(), BackpressureStrategy.BUFFER)).map(o->(T)o)
                .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(callback);
    }

    public static  <T> Disposable listener(Flowable flowable, Consumer<T> callback) {
        return flowable.map(o->(T)o)
                .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(callback);
    }

    public static  <T> Disposable single(Flowable flowable, Action initiator, Consumer<T> callback) {
        SingleSubscriber subscriber = new SingleSubscriber() {
            @Override
            public void callback(Object o) {
                try {
                    if(callback!=null){
                        callback.accept((T) o);
                    }
                } catch (Throwable throwable) {
                    throwable.printStackTrace();
                }
            }
        };
        flowable.ambWith(Flowable.create(emitter -> initiator.run(), BackpressureStrategy.BUFFER))
                .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);
        return subscriber;
    }

    public static  <T> Disposable single(Supplier<T> initiator, Consumer<T> callback) {

        SingleSubscriber subscriber = new SingleSubscriber() {
            @Override
            public void callback(Object o) {
                try {
                    if(callback!=null){
                        callback.accept((T) o);
                    }
                } catch (Throwable throwable) {
                    throwable.printStackTrace();
                }
            }
        };
        Flowable.create(emitter -> emitter.onNext(initiator.get()), BackpressureStrategy.BUFFER)
                .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);
        return subscriber;
    }
    private static abstract class SingleSubscriber<T> extends DisposableSubscriber<T>{

        @Override
        protected void onStart() {
            request(1);
        }

        @Override
        public void onNext(T t) {
            callback(t);
        }

        public abstract void callback(T t);

        @Override
        public void onError(Throwable t) {

        }

        @Override
        public void onComplete() {

        }
    }
}

FlowableUserCaseCaller代码如下

package com.example.commonui.utils;

import android.util.ArrayMap;

import androidx.lifecycle.Lifecycle;
import androidx.lifecycle.LifecycleEventObserver;
import androidx.lifecycle.LifecycleOwner;

import javax.inject.Inject;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Supplier;

/**
 * Fluent, stateful front end for {@link FlowableUseCase}.
 *
 * <p>Typical use: pick a mode with {@link #listener(Flowable)}, {@link #singleListener(Flowable)}
 * or {@link #load(Supplier)}, optionally add an initiator with {@link #load(Action)}, set the
 * result handler with {@link #callback(Consumer)} and finally call {@link #start()}. Every
 * subscription started this way is collected and released by {@link #dispose()}.
 *
 * <p>Configuration fields are not reset by {@link #start()}: a value set for an earlier call
 * (for example the initiator action) stays in effect for later calls until it is overwritten.
 */
public class FlowableUserCaseCaller {
    // Left untyped so one instance can be configured with sources and callbacks of any item type.
    Flowable source;
    Action action;
    Supplier supplier;
    Consumer consumer;
    /** Selected mode: 1 = listener, 2 = single listener, 3 = supplier; 0 until a mode is chosen. */
    int flag;
    /** Maps each mode flag to the action that starts the matching subscription. */
    ArrayMap<Integer, Action> calls = new ArrayMap<>();
    /** Collects every subscription created by {@link #start()}. */
    CompositeDisposable disposable = new CompositeDisposable();

    /** Creates a caller with no mode selected and registers the three start actions. */
    @Inject
    public FlowableUserCaseCaller() {
        // The lambdas read the fields when they run, i.e. at start() time, not at construction.
        calls.put(1, () -> disposable.add(FlowableUseCase.listener(source, action, consumer)));
        calls.put(2, () -> disposable.add(FlowableUseCase.single(source, action, consumer)));
        calls.put(3, () -> disposable.add(FlowableUseCase.single(supplier, consumer)));
    }

    /**
     * Selects listener mode (flag 1) on the given source.
     *
     * @param source the flowable to observe
     * @return this caller, for chaining
     */
    public FlowableUserCaseCaller listener(Flowable source) {
        this.flag = 1;
        this.source = source;
        return this;
    }

    /**
     * Selects single-listener mode (flag 2) on the given source.
     *
     * @param source the flowable to observe
     * @return this caller, for chaining
     */
    public FlowableUserCaseCaller singleListener(Flowable source) {
        this.flag = 2;
        this.source = source;
        return this;
    }

    /**
     * Selects supplier mode (flag 3). Note that this replaces any mode chosen earlier with
     * {@link #listener(Flowable)} or {@link #singleListener(Flowable)}.
     *
     * @param supplier produces the value handed to the callback
     * @return this caller, for chaining
     */
    public <T> FlowableUserCaseCaller load(Supplier<T> supplier) {
        this.flag = 3;
        this.supplier = supplier;
        return this;
    }

    /**
     * Creates a caller that is disposed when {@code owner} reaches {@code ON_DESTROY}.
     *
     * @param owner lifecycle owner whose destruction releases the caller's subscriptions
     * @return a new caller tied to {@code owner}
     */
    public static FlowableUserCaseCaller bind(LifecycleOwner owner) {
        FlowableUserCaseCaller caseCaller = new FlowableUserCaseCaller();
        owner.getLifecycle().addObserver((LifecycleEventObserver) (lifecycleOwner, event) -> {
            if (event == Lifecycle.Event.ON_DESTROY) {
                caseCaller.dispose();
            }
        });
        return caseCaller;
    }

    /**
     * Sets the initiator action passed along in listener and single-listener mode. Does not
     * change the selected mode.
     *
     * @param action the initiator to run
     * @return this caller, for chaining
     */
    public FlowableUserCaseCaller load(Action action) {
        this.action = action;
        return this;
    }

    /**
     * Sets the handler that receives results. {@link #start()} does nothing without one.
     *
     * @param consumer the result handler
     * @return this caller, for chaining
     */
    public <T> FlowableUserCaseCaller callback(Consumer<T> consumer) {
        this.consumer = consumer;
        return this;
    }

    /**
     * Starts the subscription for the selected mode. Returns silently when no callback has been
     * set. Never throws: a missing mode or a failure while subscribing is printed to the error
     * stream instead.
     */
    public void start() {
        if (consumer == null) {
            return;
        }
        Action call = calls.get(flag);
        if (call == null) {
            // Report the misuse explicitly instead of relying on a NullPointerException.
            new IllegalStateException(
                    "start() called before listener(), singleListener() or load(Supplier); flag=" + flag)
                    .printStackTrace();
            return;
        }
        try {
            call.run();
        } catch (Throwable throwable) {
            throwable.printStackTrace();
        }
    }

    /**
     * Disposes every subscription started through this caller. Safe to call repeatedly.
     *
     * <p>NOTE: per the documented {@code CompositeDisposable} contract, anything added after
     * disposal is disposed immediately, so this instance should not be reused afterwards.
     */
    public void dispose() {
        disposable.dispose();
    }
}

具体使用如下
1、listener

// Observe userManager.name and push each value to the UI-facing `name` holder.
FlowableUserCaseCaller useCase = new FlowableUserCaseCaller();
useCase.listener(userManager.name)
        .callback(o -> name.setValue((String) o)) // notify the UI to update
        .start();

2、singleListener

useCase.singleListener(userManager.singleTest)
                .load(userManager::test)
                .callback(o -> {
                    name.setValue(o.toString());
                }).start();

3、单次执行方法（不监听数据源，直接把方法返回值交给回调）

useCase.load(userManager::getUser).callback((Consumer<Result<List<User>>>) o -> {
            if (o.isValid()) {
                datas.setValue(o.getData());
            } else {
                Log.i("xiaochangyan", "code:" + o.getCode() + " message:" + o.getMessage());
            }
            isRefreshing.notifyChange();
        }).start();

4、释放资源

// Release every subscription started through this caller.
useCase.dispose();
上一篇下一篇

猜你喜欢

热点阅读