Design Patterns in RxJava (3): Implementing Thread Switching with the Proxy Pattern

2021-06-27  蓝笔头

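Introduction to the Proxy Pattern

In the Proxy Pattern, one class stands in for another class and exposes its functionality. It is classified as a structural design pattern.

Notes:

Implementation

UML class diagram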
public interface Service {
    void handle();
}

@Slf4j
public class RealService implements Service {
    @Override
    public void handle() {
        log.info("I am the RealService");
    }
}

public class ProxyService implements Service {
    // The real object that the proxy delegates to
    private final Service realService;

    public ProxyService(Service realService) {
        this.realService = realService;
    }

    @Override
    public void handle() {
        // Same interface as the real service, but the call runs on a new thread
        new Thread(() -> this.realService.handle()).start();
    }
}

Test:

public class Main {

    public static void main(String[] args) {
        Service realService = new RealService();
        realService.handle();

        ProxyService proxyService = new ProxyService(realService);
        proxyService.handle();
    }
}

Output:

10:53:10.375 [main] INFO org.company.pattern.proxy.RealService - I am the RealService
10:53:10.425 [Thread-0] INFO org.company.pattern.proxy.RealService - I am the RealService
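
The log above relies on the logger printing the thread name. As a minimal Lombok-free sketch of the same idea, the example below records which thread actually runs the real service. The names ProxyThreadDemo, ThreadProxy, and "proxy-thread" are made up for illustration and are not part of the original code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class ProxyThreadDemo {
    interface Service {
        void handle();
    }

    /** Proxy with the same interface as the target; it moves the call onto a new thread. */
    static final class ThreadProxy implements Service {
        private final Service target;

        ThreadProxy(Service target) {
            this.target = target;
        }

        @Override
        public void handle() {
            // "proxy-thread" is a hypothetical name chosen so the result is easy to check
            new Thread(target::handle, "proxy-thread").start();
        }
    }

    /** Calls the real service through the proxy and returns the name of the thread that ran it. */
    static String threadUsedByProxy() {
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        Service real = () -> {
            seen.set(Thread.currentThread().getName());
            done.countDown();
        };
        new ThreadProxy(real).handle(); // returns immediately; the work happens elsewhere
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("real service was not called in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(threadUsedByProxy()); // prints "proxy-thread"
    }
}
```

The caller only sees the Service interface, so swapping the real object for the proxy changes where the work runs without changing the calling code.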

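Implementing Thread Switching in RxJava

The code below uses the proxy pattern to implement RxJava-style thread switching.

At subscription time (SubscribeOn)

Subscription time means the moment subscribe() is called; the thread is switched at that point.

1) Implement the scheduler class Schedulers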

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class Schedulers {
    // Shared scheduler instances; they are never reassigned, so they are declared final
    private static final Scheduler IO = new IoScheduler("IO");
    private static final Scheduler NEW_THREAD = new NewThreadScheduler();

    public static Scheduler io() {
        return IO;
    }

    public static Scheduler newThread() {
        return NEW_THREAD;
    }

    interface Scheduler {
        void schedule(Runnable run);
    }

    static class NamedThreadFactory extends AtomicInteger implements ThreadFactory {
        private String prefix;

        public NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
            String name = nameBuilder.toString();
            return new Thread(r, name);
        }
    }

    static class NewThreadScheduler implements Scheduler {
        private static final NamedThreadFactory threadFactory = new NamedThreadFactory("New-Thread");

        @Override
        public void schedule(Runnable run) {
            Thread thread = threadFactory.newThread(run);
            thread.start();
        }
    }

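    static class IoScheduler implements Scheduler {
        // Thread names are derived from the tag passed to the constructor ("IO-1", ...)
        private final NamedThreadFactory threadFactory;
        private final ExecutorService executorService;

        public IoScheduler(String tag) {
            threadFactory = new NamedThreadFactory(tag);
            // A single-threaded pool: all scheduled tasks run on the same thread
            executorService = Executors.newFixedThreadPool(1, threadFactory);
        }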

        @Override
        public void schedule(Runnable run) {
            executorService.submit(run);
        }
    }
}
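
To see why the thread names in the later logs end in "-1", the following sketch reuses the naming idea of NamedThreadFactory on a single-threaded pool and collects the name of the thread that runs each task. The class SchedulerNamingDemo and the method threadNames are hypothetical helpers written for this illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SchedulerNamingDemo {
    /** Same idea as NamedThreadFactory above: a prefix plus a counter that starts at 1. */
    static final class NamedThreadFactory extends AtomicInteger implements ThreadFactory {
        private final String prefix;

        NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, prefix + "-" + incrementAndGet());
        }
    }

    /** Runs the given number of tasks on a one-thread pool and returns the thread name each task saw. */
    static List<String> threadNames(String prefix, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(1, new NamedThreadFactory(prefix));
        List<String> names = new CopyOnWriteArrayList<>();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // already queued tasks still run to completion
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(threadNames("IO", 3)); // prints [IO-1, IO-1, IO-1]
    }
}
```

Because the pool has exactly one thread, the factory is asked for a thread only once, and every task reports the same name.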

2) Implement ObservableSubscribeOn

@Slf4j
public class ObservableSubscribeOn<T> extends Observable<T> {
    // The Observable being proxied and the scheduler that runs the subscription
    private final Observable<T> upstream;
    private final Schedulers.Scheduler scheduler;

    public ObservableSubscribeOn(Observable<T> upstream, Schedulers.Scheduler scheduler) {
        this.upstream = upstream;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(Observer<T> observer) {
        scheduler.schedule(() -> {
            log.info("schedule is called.");
            this.upstream.subscribe(observer);
        });
    }
}

Proxy pattern analysis:

Note: the object being proxied here is the Observable.
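
The Observable and Observer base types are not shown in this article, so the following is only a sketch under assumed shapes: Observable.subscribe() is assumed to delegate to subscribeActual(), and a plain java.util.concurrent.Executor stands in for Schedulers.Scheduler. It illustrates how the proxy Observable keeps the same interface while moving the upstream subscription to another thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

class SubscribeOnSketch {
    interface Observer<T> {
        void onNext(T value);
    }

    /** Assumed base class: subscribe() simply delegates to subscribeActual(). */
    static abstract class Observable<T> {
        protected abstract void subscribeActual(Observer<T> observer);

        final void subscribe(Observer<T> observer) {
            subscribeActual(observer);
        }

        /** Returns a proxy Observable that subscribes to this one on the given executor. */
        final Observable<T> subscribeOn(Executor executor) {
            Observable<T> upstream = this;
            return new Observable<T>() {
                @Override
                protected void subscribeActual(Observer<T> observer) {
                    executor.execute(() -> upstream.subscribe(observer));
                }
            };
        }
    }

    /** Subscribes through the proxy and returns "threadName:value" for every event received. */
    static List<String> run() {
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        Observable<Integer> source = new Observable<Integer>() {
            @Override
            protected void subscribeActual(Observer<Integer> observer) {
                observer.onNext(1);
                observer.onNext(2);
                done.countDown();
            }
        };
        // Hypothetical stand-in for a scheduler: one fresh thread with a fixed name
        Executor io = task -> new Thread(task, "IO-1").start();
        source.subscribeOn(io)
              .subscribe(v -> seen.add(Thread.currentThread().getName() + ":" + v));
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("source did not emit in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [IO-1:1, IO-1:2]
    }
}
```

Since the emission happens inside the upstream's subscribeActual(), switching the thread of the subscription also switches the thread on which the events are emitted.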

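At observation time (ObserveOn)

Observation time means the moment an event is handled, that is, when the Observer's onNext() method is called.
Threads can be switched at this point as well.

1) Add a createWorker() method to Schedulers.Scheduler.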

public class Schedulers {
    ...
    interface Scheduler {
        ...
        ExecutorService createWorker();
    }

    static class NewThreadScheduler implements Scheduler {
        ...
        @Override
        public ExecutorService createWorker() {
            return Executors.newFixedThreadPool(1, threadFactory);
        }
    }

    static class IoScheduler implements Scheduler {
        ...
        @Override
        public ExecutorService createWorker() {
            return Executors.newFixedThreadPool(1, threadFactory);
        }
    }
}

Because a single event stream can contain multiple events, event handling (the Observer's onNext() method) is executed through a Worker (a thread pool).

2) Implement ObservableObserveOn
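
One property worth noting about a worker built with Executors.newFixedThreadPool(1) is that tasks submitted from one thread run one at a time, in submission order. The sketch below illustrates this with plain integers standing in for events; WorkerOrderDemo and deliver are hypothetical names used only here.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class WorkerOrderDemo {
    /** Submits the values 1..count to a single-threaded worker and returns them in arrival order. */
    static List<Integer> deliver(int count) {
        ExecutorService worker = Executors.newFixedThreadPool(1);
        List<Integer> received = new CopyOnWriteArrayList<>();
        for (int i = 1; i <= count; i++) {
            final int value = i; // lambdas can only capture effectively final variables
            worker.execute(() -> received.add(value));
        }
        worker.shutdown(); // already queued tasks still run to completion
        try {
            if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(deliver(5)); // prints [1, 2, 3, 4, 5]
    }
}
```

Starting a separate thread for every event would give no such ordering guarantee, which is one reason to route all events of a stream through the same worker.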

public class ObservableObserveOn<T> extends Observable<T> {
    private Observable<T> upstream;
    private Schedulers.Scheduler scheduler;

    public ObservableObserveOn(Observable<T> upstream, Schedulers.Scheduler scheduler) {
        this.upstream = upstream;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(Observer<T> observer) {
        // Wrap the downstream Observer in a proxy that forwards events on the scheduler's worker
        ObserveOnObserver<T> observeOnObserver = new ObserveOnObserver<>(observer, this.scheduler);
        this.upstream.subscribe(observeOnObserver);
    }

    public static class ObserveOnObserver<T> implements Observer<T> {
        private Observer<T> downstream;
        private ExecutorService worker;

        public ObserveOnObserver(Observer<T> downstream, Schedulers.Scheduler scheduler) {
            this.downstream = downstream;
            this.worker = scheduler.createWorker();
        }

        @Override
        public void onNext(T value) {
            this.worker.submit(() -> {
                this.downstream.onNext(value);
            });
        }
    }
}

Proxy pattern analysis:

Note: the object being proxied here is the Observer.

The test code is shown below:
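
As a self-contained illustration of the Observer proxy, the sketch below assumes a one-method Observer interface and passes an ExecutorService directly instead of going through Schedulers.Scheduler.createWorker(). The class ObserveOnSketch and the fixed thread name "New-Thread-1" are assumptions made for this example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ObserveOnSketch {
    interface Observer<T> {
        void onNext(T value);
    }

    /** Proxy Observer: same interface as the downstream, but each event is forwarded on the worker. */
    static final class ObserveOnObserver<T> implements Observer<T> {
        private final Observer<T> downstream;
        private final ExecutorService worker;

        ObserveOnObserver(Observer<T> downstream, ExecutorService worker) {
            this.downstream = downstream;
            this.worker = worker;
        }

        @Override
        public void onNext(T value) {
            worker.execute(() -> downstream.onNext(value));
        }
    }

    /** Emits 11, 12, 13 from the calling thread and returns "threadName:value" as seen downstream. */
    static List<String> run() {
        // Hypothetical worker: a single thread with a fixed name
        ExecutorService worker =
                Executors.newFixedThreadPool(1, r -> new Thread(r, "New-Thread-1"));
        List<String> seen = new CopyOnWriteArrayList<>();
        Observer<Integer> downstream =
                v -> seen.add(Thread.currentThread().getName() + ":" + v);
        Observer<Integer> proxy = new ObserveOnObserver<>(downstream, worker);
        for (int v = 11; v <= 13; v++) {
            proxy.onNext(v); // returns immediately; the downstream call runs on the worker
        }
        worker.shutdown(); // already queued events are still delivered
        try {
            if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [New-Thread-1:11, New-Thread-1:12, New-Thread-1:13]
    }
}
```

The upstream only knows it is calling an Observer, so it is unaware that the real handling has been moved to a different thread.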

@Slf4j
public class Main {

    public static void main(String[] args) throws InterruptedException {
        Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(Emitter<Integer> emitter) {
                        log.info("emitter begin");
                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onNext(3);
                    }
                })
                .map(v -> v + 10) // 11 12 13
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onNext(Integer o) {
                        log.info("onNext {}", o);
                    }
                });
    }
}

The output is shown below:

17:17:51.623 [IO-1] INFO org.company.rxjava.pattern.ObservableSubscribeOn - schedule is called.
17:17:51.639 [IO-1] INFO org.company.rxjava.pattern.Main2 - emitter begin
17:17:51.642 [New-Thread-1] INFO org.company.rxjava.pattern.Main2 - onNext 11
17:17:51.649 [New-Thread-1] INFO org.company.rxjava.pattern.Main2 - onNext 12
17:17:51.649 [New-Thread-1] INFO org.company.rxjava.pattern.Main2 - onNext 13

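The output shows that the subscription (subscribe()) runs on the [IO-1] thread and event handling (onNext()) runs on the [New-Thread-1] thread.

This shows that the implementation behaves as expected.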

Summary
