Project Reactor Source Code Analysis 1: How the Underlying Implementation Works

2023-03-13  王侦

        Flux.just("tom", "jack", "allen")
                .map(s -> s.concat("@qq.com"))
                .filter(s -> s.length() >= 11)
                .subscribe(System.out::println);

Output:

jack@qq.com
allen@qq.com
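
"tom" is missing from the output because "tom@qq.com" is only 10 characters long, so the filter (length >= 11) drops it. The following is a minimal sketch that reproduces the same map/filter arithmetic with java.util.stream. Streams are used here only because they need no Reactor dependency; the class name LengthFilterCheck is made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LengthFilterCheck {
    // Same mapper and predicate as the Flux pipeline, applied with java.util.stream.
    static List<String> run() {
        return Stream.of("tom", "jack", "allen")
                .map(s -> s.concat("@qq.com"))   // "@qq.com" adds 7 characters
                .filter(s -> s.length() >= 11)   // "tom@qq.com" has 10, so it is dropped
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```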

1. Declaration stage

Flux#just()

final class FluxArray<T> extends Flux<T> implements Fuseable, SourceProducer<T> {

    final T[] array;

    @SafeVarargs
    public FluxArray(T... array) {
        this.array = Objects.requireNonNull(array, "array");
    }

    // ... rest of the class omitted
}

Flux#map(Function<? super T, ? extends V> mapper)

final class FluxMapFuseable<T, R> extends InternalFluxOperator<T, R> implements Fuseable {

    final Function<? super T, ? extends R> mapper;

    FluxMapFuseable(Flux<? extends T> source,
            Function<? super T, ? extends R> mapper) {
        super(source);
        this.mapper = Objects.requireNonNull(mapper, "mapper");
    }

    // ... rest of the class omitted
}

Flux#filter(Predicate<? super T> p)
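
The constructors above only store their arguments: each operator keeps a reference to its source plus its own function, and nothing is executed yet. A minimal sketch of that idea follows. Node, ArrayNode, MapNode, FilterNode and AssemblyDemo are hypothetical stand-ins written for this illustration, not Reactor classes.

```java
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-ins for FluxArray, FluxMapFuseable and FluxFilterFuseable.
abstract class Node<T> {
    abstract String describe();

    <R> Node<R> map(Function<? super T, ? extends R> mapper) {
        return new MapNode<>(this, mapper);      // wraps the upstream node, runs nothing
    }

    Node<T> filter(Predicate<? super T> predicate) {
        return new FilterNode<>(this, predicate);
    }
}

final class ArrayNode<T> extends Node<T> {
    final T[] array;

    @SafeVarargs
    ArrayNode(T... array) { this.array = array; }

    String describe() { return "array"; }
}

final class MapNode<T, R> extends Node<R> {
    final Node<T> source;
    final Function<? super T, ? extends R> mapper;

    MapNode(Node<T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    String describe() { return source.describe() + " -> map"; }
}

final class FilterNode<T> extends Node<T> {
    final Node<T> source;
    final Predicate<? super T> predicate;

    FilterNode(Node<T> source, Predicate<? super T> predicate) {
        this.source = source;
        this.predicate = predicate;
    }

    String describe() { return source.describe() + " -> filter"; }
}

class AssemblyDemo {
    static int mapperCalls = 0; // counts how often the mapper lambda actually runs

    static Node<String> build() {
        return new ArrayNode<String>("tom", "jack", "allen")
                .map(s -> { mapperCalls++; return s.concat("@qq.com"); })
                .filter(s -> s.length() >= 11);
    }

    public static void main(String[] args) {
        Node<String> chain = build();
        System.out.println(chain.describe());
        System.out.println("mapper calls during declaration: " + mapperCalls);
    }
}
```

In this model, declaring the pipeline leaves the mapper call count at zero; work only starts once something subscribes.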

2. subscribe

2.1 subscribe stage

Flux#subscribe(Consumer) wraps the consumer in a LambdaSubscriber and ends up in Flux#subscribe(Subscriber actual):

    public final void subscribe(Subscriber<? super T> actual) {
        CorePublisher publisher = Operators.onLastAssembly(this);
        CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);

        if (subscriber instanceof Fuseable.QueueSubscription && this != publisher && this instanceof Fuseable && !(publisher instanceof Fuseable)) {
            subscriber = new FluxHide.SuppressFuseableSubscriber<>(subscriber);
        }

        try {
            if (publisher instanceof OptimizableOperator) {
                OptimizableOperator operator = (OptimizableOperator) publisher;
                while (true) {
                    subscriber = operator.subscribeOrReturn(subscriber);
                    if (subscriber == null) {
                        // null means "I will subscribe myself", returning...
                        return;
                    }
                    OptimizableOperator newSource = operator.nextOptimizableSource();
                    if (newSource == null) {
                        publisher = operator.source();
                        break;
                    }
                    operator = newSource;
                }
            }

            publisher.subscribe(subscriber);
        }
        catch (Throwable e) {
            Operators.reportThrowInSubscribe(subscriber, e);
            return;
        }
    }

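At the end of Flux#subscribe(Subscriber actual), publisher.subscribe(subscriber) is invoked on the source. In this pipeline the source is a FluxArray, so the call reaches FluxArray#subscribe(CoreSubscriber<? super T> s, T[] array):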

    public static <T> void subscribe(CoreSubscriber<? super T> s, T[] array) {
        if (array.length == 0) {
            Operators.complete(s);
            return;
        }
        if (s instanceof ConditionalSubscriber) {
            s.onSubscribe(new ArrayConditionalSubscription<>((ConditionalSubscriber<? super T>) s, array));
        }
        else {
            s.onSubscribe(new ArraySubscription<>(s, array));
        }
    }
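
The while loop in Flux#subscribe walks from the last operator back toward the source and lets each operator wrap the subscriber it is handed. A simplified, String-only sketch of that loop follows. Op, SourceOp, MapOp, FilterOp and SubscribeLoopDemo are hypothetical names, wrap() merely plays the role of subscribeOrReturn, and Subscription handling is left out entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical operator model; wrap() plays the role of subscribeOrReturn.
abstract class Op {
    final Op source; // upstream operator, or null for the source itself

    Op(Op source) { this.source = source; }

    abstract Consumer<String> wrap(Consumer<String> downstream, List<String> log);
}

final class SourceOp extends Op {
    SourceOp() { super(null); }

    Consumer<String> wrap(Consumer<String> downstream, List<String> log) { return downstream; }
}

final class MapOp extends Op {
    final Function<String, String> mapper;

    MapOp(Op source, Function<String, String> mapper) { super(source); this.mapper = mapper; }

    Consumer<String> wrap(Consumer<String> downstream, List<String> log) {
        log.add("wrap map");
        return v -> downstream.accept(mapper.apply(v));
    }
}

final class FilterOp extends Op {
    final Predicate<String> predicate;

    FilterOp(Op source, Predicate<String> predicate) { super(source); this.predicate = predicate; }

    Consumer<String> wrap(Consumer<String> downstream, List<String> log) {
        log.add("wrap filter");
        return v -> { if (predicate.test(v)) downstream.accept(v); };
    }
}

class SubscribeLoopDemo {
    // Walks from the last operator toward the source, wrapping the subscriber at each step.
    static Consumer<String> subscribe(Op last, Consumer<String> actual, List<String> log) {
        Consumer<String> subscriber = actual;
        Op op = last;
        while (op.source != null) {
            subscriber = op.wrap(subscriber, log);
            op = op.source;
        }
        return subscriber; // this is what the source would be subscribed with
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Op chain = new FilterOp(new MapOp(new SourceOp(), s -> s.concat("@qq.com")),
                s -> s.length() >= 11);
        Consumer<String> atSource = subscribe(chain, v -> log.add("onNext " + v), log);
        for (String s : new String[] {"tom", "jack", "allen"}) {
            atSource.accept(s); // the source pushes into the outermost wrapper
        }
        return log;
    }
}
```

The log shows the filter being wrapped before the map: subscribers are built downstream-first, so the source ends up holding the map wrapper, which in turn holds the filter wrapper.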

2.2 onSubscribe stage

FluxArray#subscribe(CoreSubscriber<? super T> s, T[] array) calls s.onSubscribe(...). The signal is passed down the subscriber chain until it reaches LambdaSubscriber#onSubscribe:

    public final void onSubscribe(Subscription s) {
        if (Operators.validate(subscription, s)) {
            this.subscription = s;
            if (subscriptionConsumer != null) {
                try {
                    subscriptionConsumer.accept(s);
                }
                catch (Throwable t) {
                    Exceptions.throwIfFatal(t);
                    s.cancel();
                    onError(t);
                }
            }
            else {
                s.request(Long.MAX_VALUE);
            }
        }
    }
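
The important branch here is the last one: when no subscriptionConsumer was supplied, the subscriber asks for an unbounded amount. The following sketch rebuilds that decision on top of the JDK's java.util.concurrent.Flow interfaces. UnboundedSubscriber and RecordingSubscription are hypothetical names, and the duplicate-subscription check is only a rough approximation of what Operators.validate guards against.

```java
import java.util.concurrent.Flow;
import java.util.function.Consumer;

// Hypothetical, simplified counterpart of LambdaSubscriber built on java.util.concurrent.Flow.
final class UnboundedSubscriber<T> implements Flow.Subscriber<T> {
    final Consumer<? super T> consumer;
    final Consumer<? super Flow.Subscription> subscriptionConsumer; // may be null
    Flow.Subscription subscription;

    UnboundedSubscriber(Consumer<? super T> consumer,
                        Consumer<? super Flow.Subscription> subscriptionConsumer) {
        this.consumer = consumer;
        this.subscriptionConsumer = subscriptionConsumer;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        if (subscription != null) {   // already subscribed: reject the second subscription
            s.cancel();
            return;
        }
        subscription = s;
        if (subscriptionConsumer != null) {
            subscriptionConsumer.accept(s);   // caller decides how much to request
        } else {
            s.request(Long.MAX_VALUE);        // default: unbounded demand
        }
    }

    @Override public void onNext(T item) { consumer.accept(item); }
    @Override public void onError(Throwable t) { t.printStackTrace(); }
    @Override public void onComplete() { }
}

// Test double that only records what was requested.
final class RecordingSubscription implements Flow.Subscription {
    long requested;
    boolean cancelled;

    @Override public void request(long n) { requested = n; }
    @Override public void cancel() { cancelled = true; }
}
```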

2.3 request stage

The request then travels back upstream: FilterFuseableSubscriber#request(Long.MAX_VALUE) forwards it until it reaches ArrayConditionalSubscription#request:

        public void request(long n) {
            if (Operators.validate(n)) {
                if (Operators.addCap(REQUESTED, this, n) == 0) {
                    if (n == Long.MAX_VALUE) {
                        fastPath();
                    }
                    else {
                        slowPath(n);
                    }
                }
            }
        }
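
The check `addCap(...) == 0` means that emission starts only when the previous outstanding demand was zero; if demand was already pending, an emission loop is presumably already in progress and only the counter needs to grow. Below is a minimal sketch of such a capped add that returns the previous value. It uses an AtomicLong rather than the field updater seen above, and DemandCounter is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demand counter illustrating a saturating add that returns the previous value.
final class DemandCounter {
    final AtomicLong requested = new AtomicLong();

    // Adds n to the counter, saturating at Long.MAX_VALUE, and returns the value seen BEFORE the add.
    long addCap(long n) {
        for (;;) {
            long current = requested.get();
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE;          // already unbounded, nothing to add
            }
            long next = current + n;
            if (next < 0L) {
                next = Long.MAX_VALUE;          // overflow: cap instead of wrapping around
            }
            if (requested.compareAndSet(current, next)) {
                return current;                 // CAS succeeded: report the previous demand
            }
            // CAS failed because another thread changed the counter: retry
        }
    }

    // True when this caller moved the demand away from zero and should start emitting.
    boolean shouldStartEmitting(long n) {
        return addCap(n) == 0;
    }
}
```

For example, the first request(5) sees a previous value of 0 and starts emitting, while a later request(Long.MAX_VALUE) sees 5 and only raises the counter to the cap.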

2.4 Invocation stage

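Because the requested amount is Long.MAX_VALUE, ArrayConditionalSubscription#request takes the fastPath() branch. The core of the execution logic is in ArrayConditionalSubscription#fastPath():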

        void fastPath() {
            final T[] a = array;
            final int len = a.length;
            final Subscriber<? super T> s = actual;

            for (int i = index; i != len; i++) {
                if (cancelled) {
                    return;
                }

                T t = a[i];

                if (t == null) {
                    s.onError(new NullPointerException("The " + i + "th array element was null"));
                    return;
                }

                s.onNext(t);
            }
            if (cancelled) {
                return;
            }
            s.onComplete();
        }
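
The loop re-reads the cancelled flag before every element, which is what lets a downstream subscriber stop the emission from inside onNext. A small sketch of this behaviour, again on the JDK Flow interfaces, is given below. ArrayFastPath, TakeTwo and FastPathDemo are hypothetical names, and bounded demand (the slow path) is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical subscription that only models the unbounded fast path.
final class ArrayFastPath<T> implements Flow.Subscription {
    final Flow.Subscriber<? super T> actual;
    final T[] array;
    volatile boolean cancelled;

    ArrayFastPath(Flow.Subscriber<? super T> actual, T[] array) {
        this.actual = actual;
        this.array = array;
    }

    @Override
    public void request(long n) {
        if (n == Long.MAX_VALUE) {
            fastPath();
        }
        // bounded demand is not handled in this sketch
    }

    @Override
    public void cancel() { cancelled = true; }

    void fastPath() {
        for (T t : array) {
            if (cancelled) {
                return;                      // stop as soon as downstream cancels
            }
            if (t == null) {
                actual.onError(new NullPointerException("array element was null"));
                return;
            }
            actual.onNext(t);
        }
        if (cancelled) {
            return;                          // no onComplete after cancellation
        }
        actual.onComplete();
    }
}

// Subscriber that cancels after receiving two elements.
final class TakeTwo implements Flow.Subscriber<String> {
    final List<String> events = new ArrayList<>();
    Flow.Subscription subscription;

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(Long.MAX_VALUE);
    }
    @Override public void onNext(String v) {
        events.add(v);
        if (events.size() == 2) {
            subscription.cancel();
        }
    }
    @Override public void onError(Throwable t) { events.add("error"); }
    @Override public void onComplete() { events.add("complete"); }
}

class FastPathDemo {
    static List<String> run() {
        TakeTwo subscriber = new TakeTwo();
        subscriber.onSubscribe(new ArrayFastPath<>(subscriber, new String[] {"a", "b", "c"}));
        return subscriber.events;
    }
}
```

Here the third element is never delivered and no completion signal follows, because the flag is checked both inside the loop and once more before onComplete.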

Summary
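
The stages covered in this article run in a fixed order: declaration builds the operator chain, subscribe travels from the last operator up to the source, onSubscribe travels back down, request travels up again, and only then do elements flow down through onNext. The sketch below logs that order using java.util.concurrent.Flow. MiniPipeline and its helper methods are hypothetical, the source treats every request as unbounded, and operators simply pass the upstream Subscription through, which is a simplification of what Reactor does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;
import java.util.function.Predicate;

class MiniPipeline {
    static final List<String> LOG = new ArrayList<>();

    // Source: emits all items on the first request (assumption: demand is always unbounded).
    static Flow.Publisher<String> just(String... items) {
        return subscriber -> {
            LOG.add("subscribe: source");
            subscriber.onSubscribe(new Flow.Subscription() {
                boolean done;

                @Override public void request(long n) {
                    LOG.add("request: source");
                    if (done) return;
                    done = true;
                    for (String item : items) subscriber.onNext(item);
                    subscriber.onComplete();
                }

                @Override public void cancel() { done = true; }
            });
        };
    }

    // One operator stage: transforms onNext, forwards every other signal unchanged.
    static Flow.Publisher<String> stage(String name, Flow.Publisher<String> source,
                                        Function<String, String> mapper, Predicate<String> keep) {
        return downstream -> {
            LOG.add("subscribe: " + name);
            source.subscribe(new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription s) {
                    LOG.add("onSubscribe: " + name);
                    downstream.onSubscribe(s);
                }
                @Override public void onNext(String v) {
                    String mapped = mapper.apply(v);
                    if (keep.test(mapped)) downstream.onNext(mapped);
                }
                @Override public void onError(Throwable t) { downstream.onError(t); }
                @Override public void onComplete() { downstream.onComplete(); }
            });
        };
    }

    static List<String> run() {
        LOG.clear();
        // Declaration: nothing is logged while the chain is being built.
        Flow.Publisher<String> mapped =
                stage("map", just("tom", "jack", "allen"), s -> s.concat("@qq.com"), s -> true);
        Flow.Publisher<String> filtered =
                stage("filter", mapped, s -> s, s -> s.length() >= 11);

        filtered.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                LOG.add("onSubscribe: subscriber");
                s.request(Long.MAX_VALUE);
            }
            @Override public void onNext(String v) { LOG.add("onNext: " + v); }
            @Override public void onError(Throwable t) { LOG.add("onError"); }
            @Override public void onComplete() { LOG.add("onComplete"); }
        });
        return new ArrayList<>(LOG);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```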
