Spring响应式

零基础学习WebFlux(持续更新中。。。)

2020-09-01  本文已影响0人  liuliuzo

前言

我们在学习WebFlux之前需要先学习JVM平台上的响应式流(Reactive Streams)规范。响应式流是一个倡议,用来为具有非阻塞后压的异步流处理提供一个标准。大家努力的目标集中在运行时环境(JVM和JavaScript)和网络协议上。响应式流其实就是一个规范,且这个规范已经被引入到JDK9里了。

API由以下组件组成,响应式流的实现必须提供它们:

它们其实是4个接口,先睹为快:

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
public interface Subscription {
    public void request(long n);
    public void cancel();
}
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

一个发布者是一个潜在的无限数量的序列元素的一个提供者,按照收到的来自于它的订阅者的需要来发布这些元素。

作为对发布者的subscribe(Subscriber)方法调用的响应,对于订阅者上的方法的可能调用顺序按下面的协议给出:

onSubscribe onNext* (onError | onComplete)?

这意味着onSubscribe方法总是被调用,后面跟着一个可能的无限数量onNext方法调用(因为订阅者的请求)。如果失败的话,后跟一个onError方法调用,或当没有更多的元素可用时,是一个onComplete方法调用,只要这个Subscription(订阅关系)没有被取消。

说明

Publisher
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}
Subscriber
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
Subscription
public interface Subscription {
    public void request(long n);
    public void cancel();
}

一个订阅必须支持无数次地调用request方法,必须支持到2^63 - 1(Long.MAX_VALUE)次。如果一个需求等于或大于2^63 - 1(Long.MAX_VALUE),或许被发布者认为是真正的无界。
一个订阅被一个发布者和一个订阅者共享,目的是为了在它们之间调节数据交换。这也是为什么subscribe()方法并没有返回创建的那个订阅而是返回void的原因。这个订阅只能通过onSubscriber回调方法传给订阅者。

Processor
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

WebFlux介绍

Spring官方为了让我们更加快速/平滑到WebFlux上,之前SpringMVC那套都是支持的。也就是说:我们可以像使用SpringMVC一样使用着WebFlux。

WebFlux使用的响应式流并不是用JDK9平台的,而是一个叫做Reactor响应式流库。所以,入门WebFlux其实更多是了解怎么使用Reactor的API,Spring5的已经支持Reactor模型,主要提供开发者使用的是Mono和Flux,在Reator遵循ReactiveStreams的标准的API的现,Rxjava是在Reactor之前实现反应流式库,Spring 5实现了在Reactor-Netty基础上实现SpringMVC的框架命名为Spring Webflux,这是取代传统的servlet API的异步框架。

Reactor库

接着在介绍WebFlux之前我们再了解下Reactor库

代码演示 Flux 和 和 Mono

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.1.5.RELEASE</version>
< dependency>
public class ReactorTest {
    //just声明元素
    public static void main(String[] args) {
        Flux.just(1, 2, 3, 4, 5, 6);
        Mono.just(1);
        //其他方法
        Integer[] a = {1, 2, 3, 4, 5, 6};
        Flux.fromArray(a);
        List<Integer> list = Arrays.asList(a);
        Flux.fromIterable(list);
        Stream<Integer> stream = list.stream();
        Flux.fromStream(stream);
    }
}
反应式数据处理
public Mono<ClientUser> currentUser () {
    return isAuthenticated ? Mono.just(new ClientUser("felord.cn", "reactive"))
            : Mono.empty();
}

Mono defer方法创建数据源属于懒汉型,与Mono.just等创建数据源则是恶汉型,我们在看下下面这个例子:

Mono<Date> m1 = Mono.just(new Date());
Mono<Date> m2 = Mono.defer(()-> Mono.just(new Date()));
m1.subscribe(System.out::println);
m2.subscribe(System.out::println);
try {
    Thread.sleep(5000);
} catch (InterruptedException e) {
    e.printStackTrace();
}
m1.subscribe(System.out::println);
m2.subscribe(System.out::println);
Tue Dec 08 09:18:48 CST 2020
Tue Dec 08 09:18:49 CST 2020
Tue Dec 08 09:18:48 CST 2020
Tue Dec 08 09:18:54 CST 2020

我们可以看到,创建了两个数据源,一个使用Mono.just创建,一个用Mono.defer创建,然后分别通过lambda表达式订阅这两个publisher,可以看到两个输出的时间是一样的,延迟5秒钟后重新订阅,Mono.just创建的数据源时间没变,但是Mono.defer创建的数据源时间相应的延迟了5秒钟,原因在于Mono.just会在声明阶段构造Date对象,只创建一次,但是Mono.defer却是在subscribe阶段才会创建对应的Date对象,每次调用subscribe方法都会创建Date对象,在webflux中:

@Override
public Mono<Void> filter(ServerWebExchange exchange) {
    return Mono.defer(() -> 
this.currentFilter != null && this.next != null ? 
this.currentFilter.filter(exchange, this.next) : this.handler.handle(exchange));
}

WebFlux的执行过程

reactor.netty.http.server.HttpServerHandle#onStateChange
->ReactorHttpHandlerAdapter.apply    //封装Request,Response。ReactorServerHttpRequest
-> HttpWebHandlerAdapter.handle      //创建ServerWebExchange
-> ExceptionHandlingWebHandler.handle
-> FilteringWebHandler.handle
-> DefaultWebFilterChain.handle      //使用WebFilter在请求处理前进行filter
-> DispatcherHandler.handle          //与Spring MVC对应,HandlerMapping,HandlerAdpater
-> handlerAdapter.handle
-> 调用我们的业务代码

样例

我们测试下webflux的线程执行情况,为什么webflux会有更高的性能。

@RestController
@RequestMapping("/hello/")
public class HelloController {

    private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(HelloController.class);

    @GetMapping("/common")
    public String commonHandle() {
        String uuid = UUID.randomUUID().toString();
        log.info("common-start"+":"+uuid);
        // 执行耗时操作
        String result = doThing("common handler");
        log.info("common-end"+":"+uuid);
        return result;
    }

    @GetMapping("/mono")
    public Mono<String> monoHandle() {
        String uuid = UUID.randomUUID().toString();
        log.info("mono-start"+":"+uuid);
        // 执行耗时操作
        Mono<String> mono = Mono.fromSupplier(() -> doThing("mono handle"));
        log.info("mono-end"+":"+uuid);
        // Mono表示包含0或1个元素的异步序列
        return mono;
    }

    // 定义耗时操作
    private String doThing(String msg) {
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return msg;
    }
}

日志

2020-12-01 11:37:25.286 [reactor-http-nio-4] INFO  com.liuliu.webflux.learning.demo06.HelloController - mono-start: 631e4b65-1c0b-4064-afc0-d9741e7af269
2020-12-01 11:37:25.287 [reactor-http-nio-4] INFO  com.liuliu.webflux.learning.demo06.HelloController - mono-end:   631e4b65-1c0b-4064-afc0-d9741e7af269
2020-12-01 11:37:35.291 [reactor-http-nio-4] DEBUG org.springframework.core.codec.CharSequenceEncoder - [e6c15c8c] Writing "mono handle:631e4b65-1c0b-4064-afc0-d9741e7af269"
2020-12-01 11:37:35.292 [reactor-http-nio-4] DEBUG reactor.netty.channel.ChannelOperationsHandler - [id: 0xe6c15c8c, L:/127.0.0.1:8080 - R:/127.0.0.1:62228] Writing object DefaultHttpResponse(decodeResult: success, version: HTTP/1.1)

2020-12-01 11:37:22.693 [reactor-http-nio-2] INFO  com.liuliu.webflux.learning.demo06.HelloController - common-start:   3c52e08f-f88b-4bb9-904d-908e9f8235cf
2020-12-01 11:37:32.694 [reactor-http-nio-2] INFO  com.liuliu.webflux.learning.demo06.HelloController - common-end:     3c52e08f-f88b-4bb9-904d-908e9f8235cf
2020-12-01 11:37:32.701 [reactor-http-nio-2] DEBUG org.springframework.core.codec.CharSequenceEncoder - [c4438344] Writing "common handler:3c52e08f-f88b-4bb9-904d-908e9f8235cf"
2020-12-01 11:37:32.706 [reactor-http-nio-2] DEBUG reactor.netty.channel.ChannelOperationsHandler - [id: 0xc4438344, L:/127.0.0.1:8080 - R:/127.0.0.1:62226] Writing object DefaultHttpResponse(decodeResult: success, version: HTTP/1.

从日志里可以看出来,reactor-http-nio-4 并不会阻塞,所以大大的增加了吞吐量。

转载自一下文章:
学习代码
Spring5新特性WebFlux学习
Webflux的执行流程和核心API
外行人都能看得懂的WebFlux,错过了血亏
JVM平台上的响应式流(Reactive Streams)规范

上一篇 下一篇

猜你喜欢

热点阅读