右耳菌-邓小白的Java架构师的修炼之路给自己

Reactive编程思想

2022-07-25  本文已影响0人  右耳菌

1. JDK9 Reactive - (真的要使用,建议使用jdk11)

Reactive响应式(反应式)编程是一种新的编程风格,其特点是异步或并发、事件驱动、推送PUSH机制以及观察者模式的衍生。reactive应用(响应式应用)允许开发人员构建事件驱动(event-driven),可扩展性,弹性的反应系统∶提供高度敏感的实时的用户体验感觉,可伸缩性和弹性的应用程序栈的支持,随时可以部署在多核和云计算架构。


2. Reactive 的主要接口

模拟一个例子

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class FlowDemo {

    public static void main(String[] args) throws InterruptedException {
        //1.定义发布者,数据类型是Integer
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

        //2.定义消费者
        Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<Integer>() {

            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                //订阅关系管理
                this.subscription = subscription;
                this.subscription.request(1);
            }

            @Override
            public void onNext(Integer integer) {
                //获取到数据后,开始处理
                System.out.println("我接收到的数据是:" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                throwable.printStackTrace();
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                System.out.println("数据处理完毕");

            }
        };

        // 发布者和订阅者建立联系
        publisher.subscribe(subscriber);

        //创建数据
        //TODO -- 数据库,redis,缓存 省略掉数据获取的步骤

        int data = 110;
        publisher.submit(data);
        publisher.close();

        Thread.currentThread().join(1000);
    }
}

执行结果

我接收到的数据是:110
数据处理完毕

3. 消费者的回调方法


4. Flow类的代码

package java.util.concurrent;

public final class Flow {
    static final int DEFAULT_BUFFER_SIZE = 256;

    private Flow() {
    }

    public static int defaultBufferSize() {
        return 256;
    }

    public interface Processor<T, R> extends Flow.Subscriber<T>, Flow.Publisher<R> {
    }

    public interface Subscription {
        void request(long var1);

        void cancel();
    }

    public interface Subscriber<T> {
        void onSubscribe(Flow.Subscription var1);

        void onNext(T var1);

        void onError(Throwable var1);

        void onComplete();
    }

    @FunctionalInterface
    public interface Publisher<T> {
        void subscribe(Flow.Subscriber<? super T> var1);
    }
}

可以看到上边也定义了一个Professor接口,所以按理来说我们是可以使用一个类来实现这个接口,然后处理相关的逻辑的,以下就用这种方式来实现类似上边的一个例子。


import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<Integer, String> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1);
    }

    @Override
    public void onNext(Integer integer) {
        System.out.println("打印当前的数据:" + integer);
        this.submit(String.valueOf(integer + 100));
    }

    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
        this.subscription.cancel();
    }

    @Override
    public void onComplete() {
        System.out.println("调用完成");
    }
}
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class MyFlowDemo {

    public static void main(String[] args) throws InterruptedException {
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

        MyProcessor myProcessor = new MyProcessor();

        publisher.subscribe(myProcessor);

        //定义消费者 --
        Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {

            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                subscription.request(1);
            }

            @Override
            public void onNext(String s) {
                System.out.println("接收到数据:" + s);
                throw new RuntimeException();
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("人为制造异常,执行了onerror!!!");
                throwable.printStackTrace();
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                System.out.println("发送方执行完毕");
            }
        };

        myProcessor.subscribe(subscriber);

        publisher.submit(111);
        publisher.close();

        Thread.currentThread().join(1000);

    }
}

如果觉得有收获就点个赞吧,更多知识,请点击关注查看我的主页信息哦~

上一篇 下一篇

猜你喜欢

热点阅读