RxJava 中的设计模式(三)代理模式之切换线程实现
2021-06-27 本文已影响0人
蓝笔头
代理模式介绍
在代理模式(Proxy Pattern
)中,一个类代表另一个类的功能。这种类型的设计模式属于结构型模式。
- 意图:为其他对象提供一种代理以控制对这个对象的访问。
- 何时使用:想在访问一个类时做一些控制。
- 如何解决:增加中间层。
注意事项:
- 1)和适配器模式(Adapter Pattern)的区别:适配器模式主要改变所考虑对象的接口,而代理模式不能改变所代理类的接口。
- 2)和装饰器模式(Decorator Pattern)的区别:装饰器模式为了增强功能,而代理模式是为了加以控制。
实现
UML 类图public interface Service {
void handle();
}
@Slf4j
public class RealService implements Service{
@Override
public void handle() {
log.info("I am the RealService");
}
}
@Slf4j
public class ProxyService implements Service {
private Service realService;
public ProxyService(Service realService) {
this.realService = realService;
}
@Override
public void handle() {
new Thread(() -> {
this.realService.handle();
}).start();
}
}
测试:
public class Main {
public static void main(String[] args) {
Service realService = new RealService();
realService.handle();
ProxyService proxyService = new ProxyService(realService);
proxyService.handle();
}
}
输出结果:
10:53:10.375 [main] INFO org.company.pattern.proxy.RealService - I am the RealService
10:53:10.425 [Thread-0] INFO org.company.pattern.proxy.RealService - I am the RealService
RxJava 切换线程实现
通过代理模式实现 RxJava 中的线程切换代码如下文所示。
订阅时(SubscribeOn)
订阅时,即在调用 subscribe() 方法时,切换线程。
1)实现调度器 Schedulers
。
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class Schedulers {
private static Scheduler IO = new IoScheduler("IO");
private static Scheduler NEW_THREAD = new NewThreadScheduler();
public static Scheduler io() {
return IO;
}
public static Scheduler newThread() {
return NEW_THREAD;
}
interface Scheduler {
void schedule(Runnable run);
}
static class NamedThreadFactory extends AtomicInteger implements ThreadFactory {
private String prefix;
public NamedThreadFactory(String prefix) {
this.prefix = prefix;
}
@Override
public Thread newThread(Runnable r) {
StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
String name = nameBuilder.toString();
return new Thread(r, name);
}
}
static class NewThreadScheduler implements Scheduler {
private static final NamedThreadFactory threadFactory = new NamedThreadFactory("New-Thread");
@Override
public void schedule(Runnable run) {
Thread thread = threadFactory.newThread(run);
thread.start();
}
}
static class IoScheduler implements Scheduler {
private static final NamedThreadFactory threadFactory = new NamedThreadFactory("IO");
private ExecutorService executorService;
public IoScheduler(String tag) {
executorService = Executors.newFixedThreadPool(1, threadFactory);
}
@Override
public void schedule(Runnable run) {
executorService.submit(run);
}
}
}
2)实现 ObservableSubscribeOn
。
@Slf4j
public class ObservableSubscribeOn<T> extends Observable<T>{
private Observable<T> upstream;
private Schedulers.Scheduler scheduler;
public ObservableSubscribeOn(Observable<T> upstream, Schedulers.Scheduler scheduler) {
this.upstream = upstream;
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(Observer<T> observer) {
scheduler.schedule(() -> {
log.info("schedule is called.");
this.upstream.subscribe(observer);
});
}
}
代理模式分析:
-
upstream
字段所引用的Observable
对象可以看作是上文的RealService
。 -
ObservableSubscribeOn
可以看作是上文的ProxyService
。
注意:这里代理的的是
Observable
。
观察时(ObserveOn)
观察时,即事件处理时(调用 Observer
的 onNext()
方法)。
这个时候也可以切换线程。
1)Schedulers.Scheduler
新增 createWorker()
方法。
public class Schedulers {
...
interface Scheduler {
...
ExecutorService createWorker();
}
static class NewThreadScheduler implements Scheduler {
...
@Override
public ExecutorService createWorker() {
return Executors.newFixedThreadPool(1, threadFactory);
}
}
static class IoScheduler implements Scheduler {
...
@Override
public ExecutorService createWorker() {
return Executors.newFixedThreadPool(1, threadFactory);
}
}
}
因为一个事件流中可以包含多个事件数据,所以要使用 Worker
(线程池)的形式执行事件监听处理(Observer
的 onNext()
方法)。
2)实现 ObservableObserveOn
。
public class ObservableObserveOn<T> extends Observable<T> {
private Observable<T> upstream;
private Schedulers.Scheduler scheduler;
public ObservableObserveOn(Observable<T> upstream, Schedulers.Scheduler scheduler) {
this.upstream = upstream;
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(Observer observer) {
ObserveOnObserver<T> observeOnObserver = new ObserveOnObserver(observer, this.scheduler);
this.upstream.subscribe(observeOnObserver);
}
public static class ObserveOnObserver<T> implements Observer<T> {
private Observer<T> downstream;
private ExecutorService worker;
public ObserveOnObserver(Observer<T> downstream, Schedulers.Scheduler scheduler) {
this.downstream = downstream;
this.worker = scheduler.createWorker();
}
@Override
public void onNext(T value) {
this.worker.submit(() -> {
this.downstream.onNext(value);
});
}
}
}
代理模式分析:
-
downstream
字段所引用的Observer
对象可以看作是上文的RealService
。 -
ObserveOnObserver
可以看作是上文的ProxyService
。
注意:这里代理的的是
Observer
。
测试代码如下所示:
@Slf4j
public class Main {
public static void main(String[] args) throws InterruptedException {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(Emitter<Integer> emitter) {
log.info("emitter begin");
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
})
.map(v -> v + 10) // 11 12 13
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(new Observer<Integer>() {
@Override
public void onNext(Integer o) {
log.info("onNext {}", o);
}
});
}
}
输出如下所示:
17:17:51.623 [IO-1] INFO org.company.rxjava.pattern.ObservableSubscribeOn - schedule is called.
17:17:51.639 [IO-1] INFO org.company.rxjava.pattern.Main2 - emitter begin
17:17:51.642 [New-Thread-1] INFO org.company.rxjava.pattern.Main2 - onNext 11
17:17:51.649 [New-Thread-1] INFO org.company.rxjava.pattern.Main2 - onNext 12
17:17:51.649 [New-Thread-1] INFO org.company.rxjava.pattern.Main2 - onNext 13
从输出中可以得知,订阅(subscribe()
) 在 [IO-0]
线程中执行,事件处理(onNext()
)在 [New-Thread-[worker]-0]
线程中执行。
说明实现符合预期。
总结
- 在
ObservableSubscribeOn
中用代理模式代理了upstream
字段引用的Observable
对象。 - 在
ObserveOnObserver
中用代理模式代理了downstream
字段所引用的Observer
对象。