Zuul 消息推送

2018-10-05  本文已影响0人  _fan凡

Zuul 2.0 支持消息推送,例如送服务端发送消息给客户端。自持两种协议,WebSockets和Server Sent Events(SSE) 来发送推送消息。
官方也提供了示例sample app来使用WebSockets和SSE发送消息。

授权 Authentication

Zuul 推送服务器必须对进入的推送连接进行授权验证。可以在Zuul 推送服务器中以插件化的方式实现自己的认证。可以通过继承抽象类PushAuthHandler实现该抽象类的doAuth()方法。可以参考官方提供的实例SamplePushAuthHandler
参考示例代码:

protected PushUserAuth doAuth(FullHttpRequest req) {
        final Cookies cookies = parseCookies(req);
        for (final Cookie c : cookies.getAll()) {
            if(c.getName().equals("userAuthCookie")) {
                final String customerId = c.getValue();
                if (!Strings.isNullOrEmpty(customerId)) {
                    return new SamplePushUserAuth(customerId);
                }
            }
        }
        return new SamplePushUserAuth(HttpResponseStatus.UNAUTHORIZED.code());
    }

该类的doAuth()方法经过自己的处理后返回一个PushUserAuth接口的实例,该接口定义了三个方法:

public interface PushUserAuth {
    boolean isSuccess();
    int statusCode();
    String getClientIdentity();
}

客户端注册和查找

在认证成功后,Zuul推送服务根据客户端或者用户身份注册记录每个授权的连接以便之后的查找和发送推送消息给指定的客户端或者用户。可以在认证通过后实现PushUserAuth接口并且在上文提到的Handler类PushAuthHandler类的doAuth()方法中返回PushUserAuth的实例。可以参考示例SamplePushUserAuth
示例源码如下:

public class SamplePushUserAuth implements PushUserAuth {
    private String customerId;
    private int statusCode;
    private SamplePushUserAuth(String customerId, int statusCode) {
        this.customerId = customerId;
        this.statusCode = statusCode;
    }
    // Successful auth
    public SamplePushUserAuth(String customerId) {
        this(customerId, 200);
    }
    // Failed auth
    public SamplePushUserAuth(int statusCode) {
        this("", statusCode);
    }
    @Override
    public boolean isSuccess() {
        return statusCode == 200;
    }
    @Override
    public int statusCode() {
        return statusCode;
    }
    @Override
    public String getClientIdentity() {
        return customerId;
    }

每个Zuul的推送服务使用PushConnectionRegistry管理了一个本地基于内存的与其连接的所有客户端注册表。对单个节点的推送集群来说,基于内存的本地记录是足够满足使用的。但是对于多节点推送集群的场景,除了内存注册表,还需要第二级别:即全局数据存储注册表,来将推送记录扩展单节点限制。在这种情况下查找一个特定的客户端需要两个步骤。

可以通过继承PushRegistrationHandler并且覆盖registerClient()方法来集成全局的数据数据推送注册表。Zuul推送服务插入任何类型的数据存储来作为你选择的全局数据存储注册表但是最好选择的数据存储支持如下的特性:

 public void init() throws Exception
    {
        ChannelConfig channelDeps = new ChannelConfig();
        addChannelDependencies(channelDeps);

        ChannelGroup clientChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        clientConnectionsShutdown = new ClientConnectionsShutdown(clientChannels,
                GlobalEventExecutor.INSTANCE, discoveryClient);

        portsToChannelInitializers = choosePortsAndChannels(clientChannels, channelDeps);

        server = new Server(portsToChannelInitializers, serverStatusManager, clientConnectionsShutdown, eventLoopGroupMetrics);
    }
protected Map<Integer, ChannelInitializer> choosePortsAndChannels(
            ChannelGroup clientChannels,
            ChannelConfig channelDependencies) {
        Map<Integer, ChannelInitializer> portsToChannels = new HashMap<>();

        int port = new DynamicIntProperty("zuul.server.port.main", 7001).get();

        ChannelConfig channelConfig = BaseServerStartup.defaultChannelConfig();
        int pushPort = new DynamicIntProperty("zuul.server.port.http.push", 7008).get();
        ServerSslConfig sslConfig;
        /* These settings may need to be tweaked depending if you're running behind an ELB HTTP listener, TCP listener,
         * or directly on the internet.
         */    
        channelConfig.set(CommonChannelConfigKeys.allowProxyHeadersWhen, StripUntrustedProxyHeadersHandler.AllowWhen.NEVER);
        channelConfig.set(CommonChannelConfigKeys.preferProxyProtocolForClientIp, true);
        channelConfig.set(CommonChannelConfigKeys.isSSlFromIntermediary, false);
        channelConfig.set(CommonChannelConfigKeys.withProxyProtocol, true);
        channelDependencies.set(ZuulDependencyKeys.pushConnectionRegistry, pushConnectionRegistry);
        portsToChannels.put(port, new SampleWebSocketPushChannelInitializer(port, channelConfig, channelDependencies, clientChannels));
                logPortConfigured(port, null);
        // port to accept push message from the backend, should be accessible on internal network only.
        portsToChannels.put(pushPort, pushSenderInitializer);
        logPortConfigured(pushPort, null);

        return portsToChannels;
    }

上面的倒数第四行代码,找到了SampleWebSocketPushChannelInitializer,把两个参数channelDependenciesclientChannels传给了类SampleWebSocketPushChannelInitializer

接受新的推送连接

SampleWebSocketPushChannelInitializerSampleSSEPushChannelInitializer分别展示了安装Netty通道管线(channel pipeline)来接受进入的WebSocket和SSE连接。这些类基于使用的协议为每个请求安装认证和注册表处理器,其实就是上边的最后一步。

负载均衡器与WebSockets和SSE的对比

推送连接与通常的HTTP类型的请求/响应是不同的。推送链接是持久的和长期存活的(long-lived)。一旦连接被创建,即使是没有请求进入,也会被客户端和服务端保持打开状态。这对常见的负载均衡器来说就会有个问题:在一定的空闲时间后连接会被断开。Amazon的ELB以及低版本的HAProxy和Nginx都会有该问题。你有两种选择来让集群和负载均衡器一起工作:
1.使用最新版本的支持WebSocket代理的负载均衡器如最新版本的HAProxy或者Nginx或者Amazon的ALB。
2.运行存在的负载均衡器作为4层网络协议的TCP负载均衡器,而不是7层网络协议的HTTP负载均衡器。多数负载均衡器,包括ELB,都支持作为TCP负载均衡器的模式。在这种模式下,它们只是代理TCP包的流入流出,不会对出现这些问题的应用层协议做解析或者阻断。
你也很可能需要增加负载均衡器的空闲超值,因为默认空间超时值通常以秒为单位这对典型的长时间存活、持久性和激活没有推送连接的情况是不够用的。

配置选项

名称 描述 默认值
zuul.push.registry.ttl.seconds Record expiry (TTL) of a client registration record in the global registry: 记录在全局注册表中客户注册记录的超时时间 1800 s
zuul.push.reconnect.dither.seconds Randomization window for each client's max connection lifetime. Helps in spreading subsequent client reconnects across time:每个客户端最大连接生存期的随机化窗口,有助于随后时间客户端重连 180 s
zuul.push.client.close.grace.period Number of seconds the server will wait for the client to close the connection before it closes it forcefully from its side:服务端在强制关掉其自身一端的连接之前等待客户端的时间 4 s

如果使用了Netflix OSS Auchaius模块,则可以在运行时修改上表的所有配置,服务不用重启配置就会立即生效。

上一篇 下一篇

猜你喜欢

热点阅读