Reactive编程思想
2022-07-25 本文已影响0人
右耳菌
1. JDK9 Reactive - (真的要使用,建议使用jdk11)
Reactive响应式(反应式)编程是一种新的编程风格,其特点是异步或并发、事件驱动、推送PUSH机制以及观察者模式的衍生。reactive应用(响应式应用)允许开发人员构建事件驱动(event-driven),可扩展性,弹性的反应系统∶提供高度敏感的实时的用户体验感觉,可伸缩性和弹性的应用程序栈的支持,随时可以部署在多核和云计算架构。
-
响应式编程与命令式编程的区别:
在命令式编程中,a:=b+c意味着将b+c的结果赋值给a,并且此后b或c的值发生变化不会影响到a的值。而在响应式编程中,a的值会随着b或c的改变而自动更新,并且不需要重新执行a:=b+c来确定当前分配给a的值。(PS:很像angularjs、vuejs这种MVVM框架,视图绑定模型,模型变了,视图自动就跟着变了)
2. Reactive 的主要接口
- Publisher: 发布者,数据的生产端。由它来提供数据的发生
- Subscriber:消费者,此处可以定义获取到数据后响应的操作
- Processor:消费者与发布者之间的数据处理
- back pressire:背压,消费者告诉发布者自己能够处理多少数据
模拟一个例子
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
public class FlowDemo {
public static void main(String[] args) throws InterruptedException {
//1.定义发布者,数据类型是Integer
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
//2.定义消费者
Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<Integer>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
//订阅关系管理
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(Integer integer) {
//获取到数据后,开始处理
System.out.println("我接收到的数据是:" + integer);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
this.subscription.cancel();
}
@Override
public void onComplete() {
System.out.println("数据处理完毕");
}
};
// 发布者和订阅者建立联系
publisher.subscribe(subscriber);
//创建数据
//TODO -- 数据库,redis,缓存 省略掉数据获取的步骤
int data = 110;
publisher.submit(data);
publisher.close();
Thread.currentThread().join(1000);
}
}
执行结果
我接收到的数据是:110
数据处理完毕
3. 消费者的回调方法
- onSubscribe:订阅关系处理,用它来响应发布者
- onNext:接收到数据后会响应的方法
- onError:出现任何错误时处理的方法
- onComplete:任务完成后响应的方法
4. Flow类的代码
package java.util.concurrent;
public final class Flow {
static final int DEFAULT_BUFFER_SIZE = 256;
private Flow() {
}
public static int defaultBufferSize() {
return 256;
}
public interface Processor<T, R> extends Flow.Subscriber<T>, Flow.Publisher<R> {
}
public interface Subscription {
void request(long var1);
void cancel();
}
public interface Subscriber<T> {
void onSubscribe(Flow.Subscription var1);
void onNext(T var1);
void onError(Throwable var1);
void onComplete();
}
@FunctionalInterface
public interface Publisher<T> {
void subscribe(Flow.Subscriber<? super T> var1);
}
}
可以看到上边也定义了一个Professor接口,所以按理来说我们是可以使用一个类来实现这个接口,然后处理相关的逻辑的,以下就用这种方式来实现类似上边的一个例子。
- MyProfessor
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
public class MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<Integer, String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(Integer integer) {
System.out.println("打印当前的数据:" + integer);
this.submit(String.valueOf(integer + 100));
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
this.subscription.cancel();
}
@Override
public void onComplete() {
System.out.println("调用完成");
}
}
- MyFlowDemo
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
public class MyFlowDemo {
public static void main(String[] args) throws InterruptedException {
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
MyProcessor myProcessor = new MyProcessor();
publisher.subscribe(myProcessor);
//定义消费者 --
Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(String s) {
System.out.println("接收到数据:" + s);
throw new RuntimeException();
}
@Override
public void onError(Throwable throwable) {
System.out.println("人为制造异常,执行了onerror!!!");
throwable.printStackTrace();
this.subscription.cancel();
}
@Override
public void onComplete() {
System.out.println("发送方执行完毕");
}
};
myProcessor.subscribe(subscriber);
publisher.submit(111);
publisher.close();
Thread.currentThread().join(1000);
}
}
如果觉得有收获就点个赞吧,更多知识,请点击关注查看我的主页信息哦~