通道服务的开发设计

2022-10-12  本文已影响0人  天草二十六_简村人

一、背景

在线课堂业务中,参与方的教师、学生和助教等人员,在实时上课的过程中,需要做到消息的及时传达。
一次上课,可以认为是创建了一个房间,教师把学生都加入到房间,然后就可以在房间里互发消息。
消息的发送又分为单播和广播两种模式。

本文主要是概述如何使用通道技术实现一个聊天系统,介绍一些我们公司的设计与实现思路,希望能够给到大家一些帮助。

二、目标

三、ChannelGroup与Channel

3.1、channel在分布式环境下的保存问题

由于它不能存储到redis,因为netty的channel是一个连接,是和机器的硬件绑定的,无法序列化。所以一般有两种做法,前者是在连接层上面再设计一个负载均衡层,由它来寻址转发到哪台后端机器;后者是让命令消息广播,让每个节点都尝试处理,如果当前用户的通道不在当前机器的内存中,则忽略。总之,保证消息有且只有一个后端服务的节点会处理消息。

3.1.1、使用rabbitmq的广播模式fanout

所有bind到此exchange的queue都可以接收消息。

代码示例:


    /**
     * 通道队列名
     */
    private String channelQueue = "ykt.channel.queue." + HostUtils.getMac();
    /**
     * 死信队列通道名
     */
    private String deadChannelQueue = "ykt.channel.dead.queue";
    /**
     * 通道交换机名
     */
    private String channelExchange = "ykt.channel.exchange";

    /**
     * 死信交换机名
     */
    private String deadExchange = "ykt.channel.dead.letter.exchange";
    /**
     * 死信路由名
     */
    private String deadRoutingKey = "ykt.channel.dead.letter.routeKey";
    /**
     * 通道队列路有名
     */
    private String channelRoutingKey = "ykt.channel.routeKey";

    /**
     * 通道队列
     *
     * @param: []
     * @return: org.springframework.amqp.core.Queue
     **/
    @Bean
    public Queue channelQueue() {
        Map<String, Object> params = new HashMap<>();
        params.put("x-message-ttl", 5 * 1000);
        params.put("x-dead-letter-exchange", deadExchange);
        params.put("x-dead-letter-routing-key", deadRoutingKey);
        return new Queue(channelQueue, false, false, false, params);
    }

    /**
     * 通道交换机 fanout广播模式
     *
     * @param: []
     * @return: org.springframework.amqp.core.Exchange
     **/
    @Bean
    public Exchange channelExchange() {
        return ExchangeBuilder.fanoutExchange(channelExchange).durable(true).build();
    }

    /**
     * 通道队列绑定交换机
     *
     * @param: []
     * @return: org.springframework.amqp.core.Binding
     **/
    @Bean
    public Binding channelBinding() {
        return BindingBuilder.bind(channelQueue()).to(channelExchange()).with(channelQueue).noargs();
    }

3.1.2、redis的订阅与广播

redis支持消息订阅与发布机制,可以使用该机制实现不同服务间的消息转发。
在广播消息时,需要携带能唯一标识接收者身份的字段(例如clientId)。消息广播结束后,所有服务端会收到该消息,服务端仅仅需要判断该消息接收者的是否是连接的自己作为服务端。若发现该接收者正是连接的自己,则直接将消息转发到该客户端即可。

3.2、channel和user的映射需要程序自己实现

建立通道与通道组,和用户与房间的映射关系。通过下面一图可见:

image.png

具体实现,我们是使用Map集合,将之存储在Jvm内存中。也就是说,在应用重启的时候,会丢失用户和通道,房间和通道组的关联。随之,也会断开和端建立的通道,用户会需要重新进入房间。
换句话说,应用在重启的时候,会给用户带来极大的不便,体验方面也影响挺大。这也是为什么IM系统都会在最外面有连接层,极少重启连接层,重启的都是后面的消息处理模块。

3.3、数据结构设计

保存所有的房间(通道组)及其里面的成员用户(通道)。

    /**
     * key是groupId 
     * value是ChannelGroup
     * 存放ChannelGroup
     * groupId 也就是roomId
     */
    private static Map<String, ChannelGroup> groupIdChannelGroupMap = new ConcurrentHashMap<>();

    /**
     * key是groupId 
     * value又是一个Key-Value格式,是每个用户对应的Channel
     * [groupId -> [clientId1->channelId1,clientId2->channelId2,clientId3->channelId3,****],]
    * 存放Channel
     * groupId 也就是roomId
     * clientId 也就是userId
     * channelId 也就是Channel的id
     */
    private static Map<String, Map<String, ChannelId>> groupIdClientIdChannelIdMap = new ConcurrentHashMap<>();

四、Netty的启动流程

先贴一张图:


netty架构概念图.png image.png

配置项

五、数模设计

image.png

六、多种业务通道的实现

image.png

初始化

应用重启的时候,扫描所有的注解及方法,读取上面的属性,然后缓存到集合。为后面的工厂方法做准备。

工厂方法

读取请求报文中的appKey和command参数,根据appKey查找到对应的EndpointService实现类,根据command查到对应的Method方法。
使用Method.invoke()方法执行具体的动作。

image.png
// 类Object的集合instanceMapper
Object instance = instanceMapper.getEndpointInstance(appKey);
// 方法Method的结合pointMapper
Method onCommand = pointMapper.getOnCommand(appKey, command);

// instance是指向类,第二个参数是Session,第三个参数text是消息内容
onCommand.invoke(instance, session, text);

七、通道的安全

每次请求都校验token的有效期,如果已过期,则提示重新登录或自动延长有效期。

八、参考链接

九、存在的问题

上一篇下一篇

猜你喜欢

热点阅读