Spring5新特性:Webflux响应式编程

2023-04-20  本文已影响0人  圆企鹅i

前言

之前看过一本《RxJava反应式编程》,然后就对响应式编程产生了浓厚的兴趣。
发现在Android应用开发中被广泛普及。
然后在网上众说纷纭。很多人表示学习成本高,收益少。还表示同事RxJava水平参差不齐,导致代码变成屎山。
但是在Spring开始接手之后,希望能够改变这一现象。

image.png

优势在于可以做到低延迟,高吞吐,弹性和消息驱动。
SpringWebflux 基于 Reactor,默认使用容器是 Netty,Netty 是高性能的 NIO 框架,异步非阻
塞的框架
但是大家真的不会去选择并发性能更好的go吗,毕竟协程不是闹着玩的。停留在JDK8的Java是没有虚拟线程的,好再还有Netty。

image.png

怎么优化同步阻塞模型?
这是Webflux的核心,在我们的理想情况下,肯定希望的模式是这样:
1.前端发送请求
2.后端的web服务使用一个线程把逻辑处理,发送给数据库CRUD操作,线程释放
3.数据库收到请求,处理完,主动告诉web服务器
4.web服务器使用一个线程把结果推到前端,线程释放

因为通常我们在第二步,数据的crud上,是非常耗时间的,同时会让一个线程堵塞住,浪费时间。因此要优化这种模型,数据库也需要有异步的能力

Postgres, MSSQL, H2,MySQL都开始陆续支持异步读取,并有了对应的实现R2DBC的驱动。

这也就是响应式编程优秀的地方。

观察者模式

jdk8其实也提供了观察者模式,刚好前段时间写了。
核心就是先指定好遇见某个事件需要触发的逻辑,然后再去等事件发生来触发他。解耦效果很好。

@Test
    public void jdkObs() {
        //todo demo1
        CustomObservable customObservable = new CustomObservable();

        //将观察者1放入集合中 订阅1
        customObservable.addObserver((ob, arg) -> {
            log.info("Custom consume data ...");
            CustomObservable boa = (CustomObservable) ob;
            System.out.println("lambda 接收到了:" + arg);
        });

        //将观察者2放入集合中 订阅2
        customObservable.addObserver(new CustomObserver());

        //告诉观察者 可以执行一次
        customObservable.setChanged();
        //发送事件 集合中订阅的对象们 挨个处理订阅的信息
        customObservable.notifyObservers(Arrays.asList(1, 2, 3, 4, 5));

        //告诉观察者 可以执行一次 没有setChanged则无法处理消息
        customObservable.setChanged();
        customObservable.notifyObservers(Arrays.asList(2, 5));

        //todo 无法读取
        customObservable.notifyObservers(Arrays.asList(1, 3));

    }

响应式编程(Reactor 实现)

(1)响应式编程操作中,Reactor 是满足 Reactive 规范框架
(2)Reactor 有两个核心类,Mono 和 Flux,这两个类实现接口 Publisher,提供丰富操作
符。Flux 对象实现发布者,返回 N 个元素;Mono 实现发布者,返回 0 或者 1 个元素
(3)Flux 和 Mono 都是数据流的发布者,使用 Flux 和 Mono 都可以发出三种数据信号:
1.元素值
2.错误信号:终止信号,流结束,同时把错误信号传给订阅者。
3.完成信号:终止信号,流结束

依赖

<dependency>
 <groupId>io.projectreactor</groupId>
 <artifactId>reactor-core</artifactId>
 <version>3.1.5.RELEASE</version>
</dependency>

代码

开启Flux流

public static void main(String[] args) {
 //just 方法直接声明 开启一个流,数据流并没有发出,只有进行订阅之后才会触发
 Flux.just(1,2,3,4);
 Mono.just(1);
 //其他的方法
 Integer[] array = {1,2,3,4};
 Flux.fromArray(array);

 List<Integer> list = Arrays.asList(array);
 Flux.fromIterable(list);
 Stream<Integer> stream = list.stream();
 Flux.fromStream(stream);
}

订阅消费流的逻辑

//源码
public abstract class Flux<T> implements Publisher<T> {
...
    //flux订阅消费逻辑 非空的消费者即可
    public final Disposable subscribe(Consumer<? super T> consumer) {
        Objects.requireNonNull(consumer, "consumer");
        return this.subscribe(consumer, (Consumer)null, (Runnable)null);
    }
}

    //应用编码
    @Test
    public void fluxDemo() {
        //just(T... data) 
        Flux.just("Hello", "World")
                .map(s->s)
                .distinct()
                .filter(s->true)
                //订阅一个打印逻辑
                .subscribe(System.out::println);
        //array
        Flux.fromArray(new Integer[]{1, 2, 3})
                .subscribe(System.out::println);
        //empty
        Flux.empty()
                .subscribe(System.out::println);
        //range
        Flux.range(1, 10)
                .subscribe(System.out::println);
        //interval(Duration period) 
        Flux.interval(Duration.of(10, ChronoUnit.SECONDS))
                .subscribe(System.out::println);
    }

工程化编码

SpringWebflux 实现方式有两种:注解编程模型和函数式编程模型
使用注解编程模型方式,和之前 SpringMVC 使用相似的,只需要把相关依赖配置到项目中,
SpringBoot 自动配置相关运行容器,默认情况下使用 Netty 服务器

Controller

@RestController
public class UserController {
 //注入 service
 @Autowired
 private UserService userService;
 //id 查询
 @GetMapping("/user/{id}")
 public Mono<User> geetUserId(@PathVariable int id) {
 return userService.getUserById(id);
 }
 //查询所有
 @GetMapping("/user")
 public Flux<User> getUsers() {
 return userService.getAllUser();
 }
 //添加
 @PostMapping("/saveuser")
 public Mono<Void> saveUser(@RequestBody User user) {
 Mono<User> userMono = Mono.just(user);
 return userService.saveUserInfo(userMono);
 }
}

其实不难发现,与SpringMVC的代码其实差距并不大。
SpringMVC 方式实现,同步阻塞的方式,基于 SpringMVC+Servlet+Tomcat
SpringWebflux 方式实现,异步非阻塞 方式,基于 SpringWebflux+Reactor+Netty

Spring官方文档框架区别示意图

Handler

/**
 * @author zhangxuecheng4441
 * @date 2022/11/14/014 15:08
 */
public class UserHandler {
    private final UserService userService;

    public UserHandler(UserService userService) {
        this.userService = userService;
    }

    //根据 id 查询
    public Mono<ServerResponse> getUserById(ServerRequest request) {
        //获取 id 值
        int userId = Integer.valueOf(request.pathVariable("id"));
        //空值处理
        Mono<ServerResponse> notFound = ServerResponse.notFound().build();
        //调用 service 方法得到数据
        Mono<User> userMono = this.userService.getUserById(userId);
        //把 userMono 进行转换返回
        //使用 Reactor 操作符 flatMap
        return userMono.flatMap(person -> ServerResponse
                        .ok()
                        .contentType(MediaType.APPLICATION_JSON)
                        .body(fromObject(person))
                )
                .switchIfEmpty(notFound);
    }

    //查询所有
    public Mono<ServerResponse> getAllUsers() {
        //调用 service 得到结果
        Flux<User> users = this.userService.getAllUser();
        return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(users, User.class);
    }

    //添加
    public Mono<ServerResponse> saveUser(ServerRequest request) {
        //得到 user 对象
        Mono<User> userMono = request.bodyToMono(User.class);
        return ServerResponse.ok().build(this.userService.saveUserInfo(userMono));
    }
}
上一篇下一篇

猜你喜欢

热点阅读