websocket推送信息

2020-10-19  本文已影响0人  半日孤独

1.添加依赖

 <!--websocket依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

2.websocket配置


/**
 * 配置WebSocket
 */
@Configuration
//注解开启使用STOMP协议来传输基于代理(message broker)的消息,这时控制器支持使用@MessageMapping,就像使用@RequestMapping一样
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
    @Override
    //注册STOMP协议的节点(endpoint),并映射指定的url
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        //注册一个STOMP的endpoint,并指定使用SockJS协议
        registry.addEndpoint("/endpointOyzc").setAllowedOrigins("*").withSockJS();
    }

    @Override
    //配置消息代理(Message Broker)
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        //点对点应配置一个/user消息代理,广播式应配置一个/topic消息代理
        registry.enableSimpleBroker("/topic", "/user");
        //点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/
        registry.setUserDestinationPrefix("/user");
    }
}

3.使用

          //使用springboot封装好的调用方法
          @Autowired
          private SimpMessagingTemplate template;
           //给所有人推送
            this.template.convertAndSend("/topic/getResponse", message);
            //单独给某个用户推送
            this.template.convertAndSendToUser("thfyth", "/queue/getResponse", "今天是个好日子");

4.前端

var stompClient = null; 
    //加载完浏览器后  调用connect(),打开双通道
    $(function(){   
    //打开双通道
    connect()
    })
    //强制关闭浏览器  调用websocket.close(),进行正常关闭
    window.onunload = function() {
        disconnect()
    }
    function connect(){
        
        var socket = new SockJS('http://127.0.0.1:9004/hr-user/endpointOyzc'); //连接SockJS的endpoint名称为"endpointOyzc"
        stompClient = Stomp.over(socket);//使用STMOP子协议的WebSocket客户端
        stompClient.connect({},function(frame){//连接WebSocket服务端     
            console.log('Connected:' + frame);
            //通过stompClient.subscribe订阅/topic/getResponse 目标(destination)发送的消息
           //广播信息
            stompClient.subscribe('/topic/getResponse',function(response){
                showResponse(JSON.parse(response.body));
              //个人信息
             //stompClient.subscribe('user'+'thfyth'+/queue/getResponse',function(response){
            });
        });
    }
 
    //关闭双通道
    function disconnect(){
        if(stompClient != null) {
            stompClient.disconnect();
        }
        console.log("Disconnected");
    }
    function showResponse(message){
        var response = $("#response");
        response.append("<p>"+message.userName+"</p>");
    }

4.gataway集成websocket

4.1 协议修改

websocket推送信息前,会进行一个连接握手操作,发送一个/*/info请求,
但是协议是ws的,所以我们需要进行拦截修改操作
 @Component
public class WebsocketFilter implements GlobalFilter, Ordered {
    //项目是分布式的,user表示项目名,如不是微服务项目,则去掉
    private final static String DEFAULT_FILTER_PATH = "/user/info";

    /**
     * 设置请求方式,将ws协议改为http
     * @param exchange ServerWebExchange是一个HTTP请求-响应交互的契约。提供对HTTP请求和响应的访问,
     *                 并公开额外的 服务器 端处理相关属性和特性,如请求属性
     * @param chain
     * @return
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI requestUrl =  exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
        String scheme = requestUrl.getScheme();
        if (!"ws".equals(scheme) && !"wss".equals(scheme)) {
            return chain.filter(exchange);
        } else if (DEFAULT_FILTER_PATH.equals(requestUrl.getPath())) {
            String wsScheme = convertWsToHttp(scheme);
            URI wsRequestUrl = UriComponentsBuilder.fromUri(requestUrl).scheme(wsScheme).build().toUri();
            exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, wsRequestUrl);
        }
        return chain.filter(exchange);
    }

    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE - 2;
    }

    static String convertWsToHttp(String scheme) {
        scheme = scheme.toLowerCase();
        return "ws".equals(scheme) ? "http" : "wss".equals(scheme) ? "https" : scheme;
    }

}

4.2 跨域问题配置

@Component
public class ResponseFilter implements GlobalFilter, Ordered {


    private static final String ALL = "*";
    private static final String MAX_AGE = "18000L";

    @Bean
    public RouteDefinitionLocator discoveryClientRouteDefinitionLocator(DiscoveryClient discoveryClient,
                                                                        DiscoveryLocatorProperties properties) {
        return new DiscoveryClientRouteDefinitionLocator(discoveryClient, properties);
    }

    @Bean
    public ServerCodecConfigurer serverCodecConfigurer() {
        return new DefaultServerCodecConfigurer();
    }
  //添加请求头 
    @Bean
    public WebFilter corsFilter() {
        return (ServerWebExchange ctx, WebFilterChain chain) -> {
            ServerHttpRequest request = ctx.getRequest();
            if (!CorsUtils.isCorsRequest(request)) {
                return chain.filter(ctx);
            }
            HttpHeaders requestHeaders = request.getHeaders();
            ServerHttpResponse response = ctx.getResponse();
            HttpMethod requestMethod = requestHeaders.getAccessControlRequestMethod();
            HttpHeaders headers = response.getHeaders();
            headers.add(HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN, requestHeaders.getOrigin());
            headers.addAll(HttpHeaders.ACCESS_CONTROL_ALLOW_HEADERS, requestHeaders.getAccessControlRequestHeaders());
            if (requestMethod != null) {
                headers.add(HttpHeaders.ACCESS_CONTROL_ALLOW_METHODS, requestMethod.name());
            }
            headers.add(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS, "true");
            headers.add(HttpHeaders.ACCESS_CONTROL_EXPOSE_HEADERS, ALL);
            headers.add(HttpHeaders.ACCESS_CONTROL_MAX_AGE, MAX_AGE);
            if (request.getMethod() == HttpMethod.OPTIONS) {
                response.setStatusCode(HttpStatus.OK);
                return Mono.empty();
            }
            return chain.filter(ctx);
        };
    }
  //将请求头中有多个值的去掉 这是该版本的一个bug
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        return chain.filter(exchange).then(Mono.defer(() -> {
            exchange.getResponse().getHeaders().entrySet().stream()
                    .filter(kv -> (kv.getValue() != null && kv.getValue().size() > 1))
                    .filter(kv -> (kv.getKey().equals(HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN)
                            || kv.getKey().equals(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS)))
                    .forEach(kv -> {
                        kv.setValue(new ArrayList<String>() {{
                            add(kv.getValue().get(0));
                        }});
                    });

            return chain.filter(exchange);
        }));
    }

    @Override
    public int getOrder() {
        // 指定此过滤器位于NettyWriteResponseFilter之后
        // 即待处理完响应体后接着处理响应头
        return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER + 1;
    }

}

4.3 路由配置

gateway:
      discovery:
        locator:
          enabled: true #开启从注册中心动态创建路由的功能,利用微服务名进行路由
      routes:
      - id: user_routh #user_route    #路由的ID,没有固定规则但要求唯一,建议配合服务名
        order: 3
        uri: lb://user #匹配后提供服务的路由地址
        predicates:
        - Path=/api/user/*/**         # 断言,路径相匹配的进行路由
        filters:
        - StripPrefix=1
      - id: user_routh
        uri: lb:ws://user
        order: 2
        predicates:
          - Path=/user/endpointOyzc/**
        filters:
          - StripPrefix=1

5.备注

在网关使用websocket时,跨域配置中access_control_allow_credentials这个设置为true之后,access_control_allow_origin设置最好不要为'*'

如有问题,请自觉查找,因为不一定完全匹配。

上一篇 下一篇

猜你喜欢

热点阅读