RSocket协议初识-Springboot中使用(一)

2020-05-27  本文已影响0人  后厂村老司机

前言

前几天无聊翻SpringBoot官方文档,无意中发现文档中增加了一个章节叫RSocket协议的鬼东西,遂研究了一下。

RSocket是什么?

RSocket是一种二进制字节流传输协议,位于OSI模型中的5~6层,底层可以依赖TCP、WebSocket、Aeron协议。

RSocket设计目标是什么?

1、支持对象传输,包括request\response、request\stream、fire and forget、channel
2、支持应用层流量控制
3、支持单连接双向、多次复用
4、支持连接修复
5、更好的使用WebSocket和Aeron协议

RSocket与其他协议有什么区别?

对比Http1.x

对比Http2.x

对比grpc

对比TCP

对比WebSocket

RSocket协议的形式是什么?

结论:

基于RSocket协议,我们的业务数据会被打包成帧,并以帧流的形式在客户端与服务端互相传输。所以RSocket的所有特性都是基于这个帧流实现的。后续有时间会针对每个帧类型做解析。

RSocket适用于哪些场景?

1、移动设备与服务器的连接。

2、微服务场景。

3、由于微服务和移动设备的普及,RSocket火起来应该就是这几年的事儿。

BB了这么多你给我上个代码

SpringBoot中的使用

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.Instant;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {
    private String from;
    private String to;
    private long index;
    private long created = Instant.now().getEpochSecond();

    public Message(String from, String to) {
        this.from = from;
        this.to = to;
        this.index = 0;
    }

    public Message(String from, String to, long index) {
        this.from = from;
        this.to = to;
        this.index = index;
    }
}
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
import org.springframework.stereotype.Controller;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

@Slf4j
@Controller
public class RSocketController {

    private final List<RSocketRequester> CLIENTS = new ArrayList<>();

    @MessageMapping("request-response")
    public Message requestResponse(Message request) {
        log.info("收到请求: {}", request);
        return new Message("服务端", "客户端");
    }

    @MessageMapping("fire-and-forget")
    public void fireAndForget(Message request) {
        log.info("收到fire-and-forget请求: {}", request);
    }

    @MessageMapping("stream")
    Flux<Message> stream(Message request) {
        log.info("收到流式请求: {}", request);
        return Flux
                .interval(Duration.ofSeconds(1))
                .map(index -> new Message(”服务端“, "客户端", index))
                .log();
    }

    @MessageMapping("channel")
    Flux<Message> channel(final Flux<Duration> settings) {
        return settings
                .doOnNext(setting -> log.info("发射间隔为 {} 秒.", setting.getSeconds()))
                .switchMap(setting -> Flux.interval(setting)
                        .map(index -> new Message("服务端", "客户端", index)))
                .log();
    }
}
spring.main.lazy-initialization=true
spring.rsocket.server.port=7000

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.Instant;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {
    private String from;
    private String to;
    private long index;
    private long created = Instant.now().getEpochSecond();

    public Message(String from, String to) {
        this.from = from;
        this.to = to;
        this.index = 0;
    }

    public Message(String from, String to, long index) {
        this.from = from;
        this.to = to;
        this.index = index;
    }
}
import java.time.Duration;
import javax.annotation.PreDestroy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import lombok.extern.slf4j.Slf4j;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Slf4j
@RestController
public class RSocketClient {

    private final RSocketRequester rsocketRequester;
    private static Disposable disposable;

    @Autowired
    public RSocketClient(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies) {
        this.rsocketRequester = rsocketRequesterBuilder
                .rsocketStrategies(strategies)
                .connectTcp("localhost", 7000)
                .block();

        this.rsocketRequester.rsocket()
                .onClose()
                .doOnError(error -> log.warn("发生错误,链接关闭"))
                .doFinally(consumer -> log.info("链接关闭"))
                .subscribe();
    }

    @PreDestroy
    void shutdown() {
        rsocketRequester.rsocket().dispose();
    }

    @GetMapping("request-response")
    public Message requestResponse() {
        Message message = this.rsocketRequester
                .route("request-response")
                .data(new Message("客户端", "服务器"))
                .retrieveMono(Message.class)
                .block();
        log.info("客户端request-response收到响应 {}", message);
        return message;
    }

    @GetMapping("fire-and-forget")
    public String fireAndForget() {
        this.rsocketRequester
                .route("fire-and-forget")
                .data(new Message("客户端", "服务器"))
                .send()
                .block();
        return "fire and forget";
    }

    @GetMapping("stream")
    public String stream() {
        disposable = this.rsocketRequester
                .route("stream")
                .data(new Message("客户端", "服务器"))
                .retrieveFlux(Message.class)
                .subscribe(message -> log.info("客户端stream收到响应 {}", message));
        return "stream";
    }

    @GetMapping("channel")
    public String channel() {
        Mono<Duration> setting1 = Mono.just(Duration.ofSeconds(1));
        Mono<Duration> setting2 = Mono.just(Duration.ofSeconds(3)).delayElement(Duration.ofSeconds(5));
        Mono<Duration> setting3 = Mono.just(Duration.ofSeconds(5)).delayElement(Duration.ofSeconds(15));
        Flux<Duration> settings = Flux.concat(setting1, setting2, setting3)
                .doOnNext(d -> log.info("客户端channel发送消息 {}", d.getSeconds()));
        disposable = this.rsocketRequester
                .route("channel")
                .data(settings)
                .retrieveFlux(Message.class)
                .subscribe(message -> log.info("客户端channel收到响应 {}", message));
        return "channel";
    }

}

代码解析

What Next?

上一篇下一篇

猜你喜欢

热点阅读