Project Reactor源码分析2-线程切换

2023-03-13  本文已影响0人  王侦

4种线程调度器

进行线程切换的函数有两个

        Flux.just("tom")
                .map(s -> {
                    System.out.println("(concat @qq.com) at [" + Thread.currentThread() + "]");
                    return s.concat("@qq.com");
                })
                .publishOn(Schedulers.newSingle("thread-a"))
                .map(s -> {
                    System.out.println("(to string) at [" + Thread.currentThread() + "]");
                    return s;
                })
                .subscribeOn(Schedulers.newSingle("source"))
                .subscribe(System.out::println);

结果:

(concat @qq.com) at [Thread[source-1,5,main]]
(concat foo) at [Thread[thread-a-3,5,main]]
(startsWith f) at [Thread[thread-a-3,5,main]]
(to string) at [Thread[thread-b-2,5,main]]
tom@qq.com-foo

1.声明阶段

2.subscribe、onSubscribe、request阶段

2.1 LambdaSubscriber@onSubscribe (subscribeOn线程影响)

FluxSubscribeOn#subscribeOrReturn

    public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
        Worker worker = Objects.requireNonNull(scheduler.createWorker(),
                "The scheduler returned a null Function");

        SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<>(source,
                actual, worker, requestOnSeparateThread);
        actual.onSubscribe(parent);

        try {
            worker.schedule(parent);
        }
        catch (RejectedExecutionException ree) {
            if (parent.s != Operators.cancelledSubscription()) {
                actual.onError(Operators.onRejectedExecution(ree, parent, null, null,
                        actual.currentContext()));
            }
        }
        return null;
    }

LambdaSubscriber@onSubscribe:

SubscribeOnSubscriber#request(Long.MAX_VALUE)

ExecutorServiceWorker#schedule(SubscribeOnSubscriber)

2.2 subscribe阶段

FluxMapFuseable#subscribe(SubscribeOnSubscriber)

2.3 onSubscribe

2.4 request及执行

从最外层往里面执行:

3.总结

参考

上一篇 下一篇

猜你喜欢

热点阅读