Building a WebSocket IM Component on Netty (Part 1)

2020-06-17  原来Yhy

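Goals:

1. Stable, reliable message delivery

2. Point-to-point messages

3. One-to-many messages

4. Broadcast messages

5. Node scale-out

6. Service registration and discovery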

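Thoughts on each goal:

1. Stable, reliable message delivery

Netty is the network framework, and the component implements the WebSocket protocol over long-lived connections. If messages need to be persisted, they can be written to a database, and the receiving side acknowledges each message.

2. Point-to-point messages

Each channel is bound to an identity, and the message body specifies the message type.

3. One-to-many messages

Look up the groups a user belongs to and add the user's channel to the corresponding ChannelGroup.

4. Broadcast messages

Single-node mode: take all channels from the channel map and write the message to each of them.

Multi-node mode: first send the message to a publish/subscribe middleware. Every server node receives the broadcast from the middleware and forwards the message to all channels on that node.

5. Node scale-out

Use the publish/subscribe mode of a message middleware: incoming messages are first delivered to the middleware, and each server node consumes them and forwards them to the connections it holds.

6. Service registration and discovery

Use Spring Cloud for service registration and discovery (that is, any Spring Cloud service-registry implementation, such as Nacos or Eureka).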
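The acknowledgement idea from point 1 can be sketched as follows. This is a minimal illustration, not the component's code: `AckTrackerSketch`, `PendingStore` and its methods are hypothetical names, message ids are assumed to be strings, and the in-memory map only stands in for the database mentioned above.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

class AckTrackerSketch {

    /** Hypothetical pending-message store; the in-memory map stands in for the database. */
    static class PendingStore {
        private final Map<String, String> pending = new ConcurrentHashMap<>();

        /** Record the message before it is written to the channel. */
        void onSend(String messageId, String payload) {
            pending.put(messageId, payload);
        }

        /** Called when the receiver confirms; returns false for unknown or duplicate acks. */
        boolean onAck(String messageId) {
            return pending.remove(messageId) != null;
        }

        /** Ids still waiting for confirmation, for example candidates for a resend. */
        Set<String> unacked() {
            return new TreeSet<>(pending.keySet());
        }
    }

    public static void main(String[] args) {
        PendingStore store = new PendingStore();
        store.onSend("m1", "hello");
        store.onSend("m2", "world");
        store.onAck("m1");
        System.out.println(store.unacked()); // prints [m2]
    }
}
```

Anything left in `unacked()` after a timeout could be resent or delivered when the user reconnects; which policy fits depends on the business.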

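Implementation:

1. An event mechanism that dispatches on the message code. At startup the service collects the CmdProcess implementations from the Spring context and dispatches events to them by code.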

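public interface CmdProcess {

    /**
     * Handles a received message.
     * @param message message body
     * @param channel channel context
     * @return response message body (null if there is no response)
     */
    Message handler(Message message, Channel channel);

    /**
     * Returns the command code (it must match the code carried in the message packet).
     * @return command code
     */
    Byte getCmdCode();

}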
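To make the dispatch-by-code idea concrete, here is a small self-contained sketch. It is an assumption about how such a registry might look, not the project's ProcessManager: `SimpleCmdProcess` is a simplified stand-in for CmdProcess (plain strings replace Message and Channel), and `Dispatcher` and `echo` are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CmdDispatchSketch {

    /** Simplified stand-in for CmdProcess: a plain string replaces Message and Channel. */
    interface SimpleCmdProcess {
        String handler(String payload);
        Byte getCmdCode();
    }

    /** Hypothetical registry that indexes handlers by command code once at startup. */
    static class Dispatcher {
        private final Map<Byte, SimpleCmdProcess> processes = new HashMap<>();

        Dispatcher(List<SimpleCmdProcess> list) {
            for (SimpleCmdProcess p : list) {
                if (processes.put(p.getCmdCode(), p) != null) {
                    throw new IllegalStateException("duplicate command code: " + p.getCmdCode());
                }
            }
        }

        /** Returns the handler's response, or null when no handler owns the code. */
        String dispatch(byte code, String payload) {
            SimpleCmdProcess p = processes.get(code);
            return p == null ? null : p.handler(payload);
        }
    }

    /** Builds a trivial handler that prefixes the payload. */
    static SimpleCmdProcess echo(final byte code, final String prefix) {
        return new SimpleCmdProcess() {
            @Override public String handler(String payload) { return prefix + payload; }
            @Override public Byte getCmdCode() { return code; }
        };
    }

    public static void main(String[] args) {
        Dispatcher d = new Dispatcher(Arrays.asList(echo((byte) 1, "chat:"), echo((byte) 2, "ping:")));
        System.out.println(d.dispatch((byte) 1, "hello")); // prints chat:hello
        System.out.println(d.dispatch((byte) 9, "x"));     // prints null
    }
}
```

Rejecting duplicate codes at startup is a design choice made for this sketch; it surfaces configuration mistakes early instead of letting one handler silently shadow another.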

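2. A security authentication mechanism for connections (the event executed on the first handshake). A simple implementation is provided; define your own authentication implementation to fit your business.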

public abstract class AuthProcess {

    /**
     * Login event
     * @param username user name
     * @param password user password
     * @return IM user object
     */
    public abstract ImSession login(String username, String password);

}
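A custom authentication implementation might look roughly like the sketch below. The types are stand-ins, since ImSession is not shown in this post: `Session`, `SimpleAuth` and `TableAuth` are hypothetical names, and returning null for a failed login is an assumption. A real implementation would check a password hash or a token rather than plain text.

```java
import java.util.HashMap;
import java.util.Map;

class AuthSketch {

    /** Stand-in for ImSession; the real class is not shown in this post. */
    static class Session {
        final String userId;
        Session(String userId) { this.userId = userId; }
    }

    /** Stand-in with the same shape as AuthProcess. */
    static abstract class SimpleAuth {
        abstract Session login(String username, String password);
    }

    /** Hypothetical implementation backed by a fixed user table. */
    static class TableAuth extends SimpleAuth {
        private final Map<String, String> passwords = new HashMap<>();

        TableAuth add(String username, String password) {
            passwords.put(username, password);
            return this;
        }

        @Override
        Session login(String username, String password) {
            String expected = passwords.get(username);
            // Assumption: a failed login is signalled by returning null.
            if (expected == null || !expected.equals(password)) {
                return null;
            }
            return new Session(username);
        }
    }

    public static void main(String[] args) {
        SimpleAuth auth = new TableAuth().add("alice", "secret");
        System.out.println(auth.login("alice", "secret").userId); // prints alice
        System.out.println(auth.login("alice", "wrong"));         // prints null
    }
}
```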


3. Channel lifecycle events (a client connection being established and a client connection being closed)

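public interface LifeCycleEvent {

    /**
     * Binds the channel context.
     * @param login the logged-in IM session
     * @param ctx channel handler context
     */
    void bindContext(ImSession login, ChannelHandlerContext ctx);

    /**
     * Unbinds the channel context.
     * @param channel the channel to unbind
     */
    void cleanContext(Channel channel);

}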
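One way to picture what binding and cleaning a context involves is a two-way lookup between user ids and channels. The sketch below is an illustration under assumptions: `Registry` is a hypothetical name, the generic parameter `C` stands in for Netty's Channel, and the two maps are not updated atomically, which is acceptable for a sketch but would need care in production.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class SessionRegistrySketch {

    /** Hypothetical registry; C stands in for io.netty.channel.Channel. */
    static class Registry<C> {
        private final Map<String, C> channelByUser = new ConcurrentHashMap<>();
        private final Map<C, String> userByChannel = new ConcurrentHashMap<>();

        /** Called once the connection is authenticated. */
        void bind(String userId, C channel) {
            C old = channelByUser.put(userId, channel);
            if (old != null) {
                userByChannel.remove(old); // the user reconnected on a new channel
            }
            userByChannel.put(channel, userId);
        }

        /** Called when the connection closes; ignores channels that were already replaced. */
        void clean(C channel) {
            String userId = userByChannel.remove(channel);
            if (userId != null) {
                channelByUser.remove(userId, channel);
            }
        }

        C getChannel(String userId) {
            return channelByUser.get(userId);
        }
    }

    public static void main(String[] args) {
        Registry<String> registry = new Registry<>();
        registry.bind("u1", "channel-a");
        registry.bind("u1", "channel-b");   // reconnect
        registry.clean("channel-a");        // late close event of the old channel
        System.out.println(registry.getChannel("u1")); // prints channel-b
    }
}
```

The reverse map is what lets a close event, which only knows the channel, find the user to remove.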

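4. A reserved interface for message distribution in multi-node deployments (a configuration parameter decides whether multi-node mode is used)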

public interface ImClusterTopic {

    /**
     * Publishes a message.
     */
    void publish(ClusterMessage message);

    /**
     * Subscribes to messages (broadcast mode).
     */
    void consumer();

}
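The multi-node flow behind this interface can be simulated in memory. The sketch below is not an ImClusterTopic implementation: `InMemoryTopic` stands in for a real broker in broadcast mode, `Node` stands in for one server instance, and a per-user list of delivered texts replaces the local channels. All of these names are hypothetical.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

class ClusterTopicSketch {

    /** Stand-in for a broker topic in broadcast mode: every subscriber sees every message. */
    static class InMemoryTopic {
        private final List<BiConsumer<String, String>> subscribers = new CopyOnWriteArrayList<>();

        void subscribe(BiConsumer<String, String> subscriber) {
            subscribers.add(subscriber);
        }

        void publish(String userId, String text) {
            for (BiConsumer<String, String> s : subscribers) {
                s.accept(userId, text);
            }
        }
    }

    /** Stand-in for one server node; the inbox lists replace its local channels. */
    static class Node {
        private final Map<String, List<String>> inboxByUser = new ConcurrentHashMap<>();

        Node(InMemoryTopic topic) {
            topic.subscribe(this::onClusterMessage);
        }

        void connect(String userId) {
            inboxByUser.put(userId, new CopyOnWriteArrayList<String>());
        }

        /** Every node receives the message; only the one holding the connection delivers it. */
        void onClusterMessage(String userId, String text) {
            List<String> inbox = inboxByUser.get(userId);
            if (inbox != null) {
                inbox.add(text);
            }
        }

        List<String> delivered(String userId) {
            List<String> inbox = inboxByUser.get(userId);
            return inbox == null ? Collections.<String>emptyList() : inbox;
        }
    }

    public static void main(String[] args) {
        InMemoryTopic topic = new InMemoryTopic();
        Node a = new Node(topic);
        Node b = new Node(topic);
        a.connect("alice");
        b.connect("bob");
        topic.publish("bob", "hi");
        System.out.println(a.delivered("bob")); // prints []
        System.out.println(b.delivered("bob")); // prints [hi]
    }
}
```

Because the sender never needs to know which node holds the target connection, adding a node only means adding one more subscriber.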


5. Utility class for message distribution

package com.awy.common.ws.netty.toolkit;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.json.JSONUtil;
import com.awy.common.message.api.packets.Message;
import com.awy.common.ws.netty.cluster.ImClusterTopic;
import com.awy.common.ws.netty.context.GlobalContent;
import com.awy.common.ws.netty.context.SessionContext;
import com.awy.common.ws.netty.config.ImConfig;
import com.awy.common.ws.netty.model.ClusterMessage;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.Locale;
import java.util.Map;

@Slf4j
public class ImSendUtil {

    /**
     * Send to a specific user
     * @param userId user id
     * @param message message body
     */
    public static void sendUser(String userId, Message message){
        if(isCluster()){
            getImClusterTopic().publish(ClusterMessage.chatMessage(userId,message));
        }else {
            sendCurrentNodeUser(userId,message);
        }
    }

    /**
     * Send to a list of users
     * @param userIds list of user ids
     * @param message message body
     */
    public static void sendUsers(List<String> userIds,Message message){
        if(isCluster()){
            getImClusterTopic().publish(ClusterMessage.chatsMessage(userIds,message));
        }else {
            sendCurrentNodeUsers(userIds,message);
        }
    }

    /**
     * Send to a specific group
     * @param groupId group id
     * @param message message body
     */
    public static void sendGroup(String groupId,Message message){
        if(isCluster()){
            getImClusterTopic().publish(ClusterMessage.groupMessage(groupId,message));
        }else {
            sendCurrentNodeGroup(groupId,message);
        }
    }

    /**
     * Send to a list of groups
     * @param groupIds list of group ids
     * @param message message body
     */
    public static void sendGroups(List<String> groupIds,Message message){
        if(isCluster()){
            getImClusterTopic().publish(ClusterMessage.groupsMessage(groupIds,message));
        }else {
            sendCurrentNodeGroups(groupIds,message);
        }
    }

    /**
     * Send to all users
     * @param message message body
     */
    public static void sendAll(Message message){
        //if cluster
        if(isCluster()){
            getImClusterTopic().publish(ClusterMessage.noStateMessage(message));
        }else {
            //if standalone
            sendCurrentNodeAllChannel(message);
        }
    }

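    /**
     * Send to a specific user connected to the current node
     * @param userId user id
     * @param message message body
     */
    public static void sendCurrentNodeUser(String userId,Message message){
        Channel channel = SessionContext.getChannel(userId);
        if(channel != null){
            // getMessage returns null for a null message; never pass null to writeAndFlush
            TextWebSocketFrame frame = getMessage(message);
            if(frame != null){
                channel.writeAndFlush(frame);
            }
        }
    }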

    /**
     * Send to a list of users connected to the current node
     * @param userIds list of user ids
     * @param message message body
     */
    public static void sendCurrentNodeUsers(List<String> userIds,Message message){
        if(CollUtil.isNotEmpty(userIds)){
            if(userIds.size() == 1){
                sendCurrentNodeUser(userIds.get(0),message);
            }else {
                for (String userId : userIds) {
                    sendCurrentNodeUser(userId,message);
                }
            }
        }
    }

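    /**
     * Send to a specific group on the current node
     * @param groupId group id
     * @param message message body
     */
    public static void sendCurrentNodeGroup(String groupId,Message message){
        ChannelGroup channelGroup = SessionContext.getChannelGroup(groupId);
        if(channelGroup != null){
            // getMessage returns null for a null message; never pass null to writeAndFlush
            TextWebSocketFrame frame = getMessage(message);
            if(frame != null){
                channelGroup.writeAndFlush(frame);
            }
        }
    }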

    /**
     * Send to a list of groups on the current node
     * @param groupIds list of group ids
     * @param message message body
     */
    public static void sendCurrentNodeGroups(List<String> groupIds, Message message){
        if(CollUtil.isNotEmpty(groupIds)){
            if(groupIds.size() == 1){
                sendCurrentNodeGroup(groupIds.get(0),message);
            }else {
                for (String groupId : groupIds) {
                    sendCurrentNodeGroup(groupId,message);
                }
            }
        }
    }

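    /**
     * Send to all users connected to the current node
     * @param message message body
     */
    public static void sendCurrentNodeAllChannel(Message message){
        if(message == null){
            log.error(">>>>>>>>>>>> message can not be empty ");
            return;
        }
        for (Map.Entry<String, Channel> entry : SessionContext.getAllChannel().entrySet()){
            // build a fresh frame per channel: each write releases the frame it is given
            entry.getValue().writeAndFlush(getMessage(message));
        }
    }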

    /**
     * Whether multi-node (cluster) mode is enabled
     * @return true in cluster mode
     */
    private static boolean isCluster(){
        return ImConfig.getImConfig().getPropertiesConfig().isCluster();
    }

    /**
     * Get the cluster push topic
     * @return the cluster topic
     */
    private static ImClusterTopic getImClusterTopic(){
        return GlobalContent.getInstance().getImClusterTopic();
    }

    /**
     * Convert a message into a WebSocket frame
     * @param message message body
     * @return WebSocket text frame, or null if the message is null
     */
    public static TextWebSocketFrame getMessage(Message message){
        if(message == null){
            log.error(">>>>>>>>>>>> message can not be empty ");
            return null;
        }
        String result = JSONUtil.toJsonStr(message);
        return new TextWebSocketFrame(result);
    }

}

6. The WebSocket server implementation

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import lombok.extern.slf4j.Slf4j;

// Imports for the project's own classes (AuthProcess, LifeCycleEvent, ImClusterTopic,
// GlobalContent, WebSocketServerInitializer and so on) are omitted here.

@Slf4j
public class WebSocketServer {

    /**
     * Whether SSL is enabled
     */
    private boolean ssl = false;

    // listening port
    private int port;

    // WebSocket path prefix
    private String websocketPath;

    private ServerBootstrap serverBootstrap;

    private NioEventLoopGroup boss;

    private NioEventLoopGroup work;

    private WebSocketServer(){}

    public WebSocketServer(int port, String websocketPath, boolean ssl, AuthProcess authProcess, LifeCycleEvent lifeCycleEvent, ImClusterTopic imClusterTopic){
        this.ssl = ssl;
        this.port = port;
        this.websocketPath = websocketPath;

        // fall back to the simple default implementations
        if(authProcess == null){
            authProcess = new SimpleAuthProcess();
        }
        if(lifeCycleEvent == null){
            lifeCycleEvent = new SimpleLifeCycleEvent();
        }

        // populate the global context
        GlobalContent globalContent = GlobalContent.getInstance();
        globalContent.setAuthProcess(authProcess);
        globalContent.setImClusterTopic(imClusterTopic);
        globalContent.setLifeCycleEvent(lifeCycleEvent);

        // boss/worker (main/sub) Reactor model
        serverBootstrap = new ServerBootstrap();
        boss = new NioEventLoopGroup(1);
        work = new NioEventLoopGroup();
        serverBootstrap.group(boss,work)
                .channel(NioServerSocketChannel.class)
                .childHandler(new WebSocketServerInitializer(getSslContext(),websocketPath));
    }

    private SslContext getSslContext(){
        SslContext sslCtx = null;
        try{
            if (this.ssl) {
                // self-signed certificate: suitable for testing only
                SelfSignedCertificate ssc = new SelfSignedCertificate();
                sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
            }
        }catch (Exception e){
            log.error("failed to build the SSL context", e);
        }
        return sslCtx;
    }

    public void start(){
        try {
            serverBootstrap.bind(port).sync();
            log.info("WebSocket server listening at " +
                    (ssl ? "wss" : "ws") + "://127.0.0.1:" + port + websocketPath);
        } catch (Exception e){
            log.error("failed to start the WebSocket server", e);
        }
    }

    public void stop(){
        boss.shutdownGracefully();
        work.shutdownGracefully();
    }

}

7. Spring bootstrap class

@Slf4j
public class NettyWebSocketStarter {

    private Integer port;

    private String websocketPath;

    /**
     * Whether SSL is enabled
     */
    private boolean ssl = false;

    private WebSocketServer server;

    /**
     * Authentication processor
     */
    private AuthProcess authProcess;

    private LifeCycleEvent lifeCycleEvent;

    private ImClusterTopic imClusterTopic;

    private NettyWebSocketStarter(){}

    public NettyWebSocketStarter(AuthProcess authProcess){
        this(authProcess,null,null);
    }

    public NettyWebSocketStarter(AuthProcess authProcess, LifeCycleEvent lifeCycleEvent, ImClusterTopic imClusterTopic){
        this.authProcess = authProcess;
        this.lifeCycleEvent = lifeCycleEvent;
        this.imClusterTopic = imClusterTopic;
    }

    @PostConstruct
    public void init(){
        setAttributes(ImConfig.getImConfig().getPropertiesConfig());
        initCmdProcess();
        server = new WebSocketServer(port,websocketPath,ssl,authProcess,lifeCycleEvent,imClusterTopic);
        server.start();
        registerDiscovery(ImConfig.getImConfig().getPropertiesConfig());
    }

    private void setAttributes(ImPropertiesConfig propertiesConfig){
        String packetScanPath = "";
        if(propertiesConfig == null){
            log.error(">>>>>>>> im config prefix: im.ws ");
            log.error(">>>>>>>> im config attributes cannot be empty ");
            // a non-zero exit code signals the startup failure to the caller
            System.exit(1);
        }

        if(propertiesConfig.isCluster() && this.imClusterTopic == null){
            log.error(">>>>>>>> imClusterTopic must not be null in cluster mode");
            System.exit(1);
        }

        this.port = propertiesConfig.getPort();
        if(this.port == null){
            // no port configured: probe for a free one, starting at the default
            this.port = getPort();
        }
        this.websocketPath = propertiesConfig.getWebsocketPath();
        if(this.websocketPath == null || this.websocketPath.isEmpty()){
            this.websocketPath = ImCommonConstant.DEFAULT_WEBSOCKET_PATH;
        }

        this.ssl = propertiesConfig.isSsl();
        packetScanPath = propertiesConfig.getPacketScan();
        if(packetScanPath == null || packetScanPath.isEmpty()){
            log.error(">>>>>>>> im.ws.packetScan cannot be empty ");
            System.exit(1);
        }
        initPacket(packetScanPath);

    }

    private void registerDiscovery(ImPropertiesConfig propertiesConfig){
        if(propertiesConfig.isRegister()){
            Map<String, AbstractAutoServiceRegistration> serviceRegistrationMap = getApplicationContext().getBeansOfType(AbstractAutoServiceRegistration.class);
            for (Map.Entry<String,AbstractAutoServiceRegistration>  registrationEntry : serviceRegistrationMap.entrySet()){
                registrationEntry.getValue().start();
            }

        }
    }

    private int getPort(){
        int defaultPort = 8888;
        return getPort(defaultPort);
    }

    private int getPort(int port){
        ServerSocket socket = null;
        try{
            socket = new ServerSocket(port);
        }catch (IOException e){
            ++port;
            return getPort(port);
        }finally {
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    log.error("failed to close the probe socket", e);
                }
            }
        }
        return port;
    }

    private void initCmdProcess(){
        try{
            List<CmdProcess> list = new ArrayList<>();

            String[] beanDefinitionNames = getApplicationContext().getBeanDefinitionNames();
            Stream.of(beanDefinitionNames).forEach(beanName -> {
                Object bean = getApplicationContext().getBean(beanName);
                if(bean instanceof CmdProcess){
                    list.add((CmdProcess)bean);
                }
            });

            ProcessManager.getInstance().addCmdProcessList(list);
            log.info("init im process repository success ! count [" + list.size() + "]");
        }catch (Exception e){
            log.error("init im process repository error",e);
            System.exit(1);
        }
    }

    private  ApplicationContext getApplicationContext(){
        return ImConfig.getImConfig().getApplicationContext();
    }

    private void initPacket(String packetScanPath){
        try{
            // scan the configured package and register every Message implementation
            Set<Class<?>> set = ClassUtil.scanPackage(packetScanPath);
            Object obj;
            List<Message> list = new ArrayList<>();
            if(CollectionUtil.isNotEmpty(set)){
                for (Class<?> clazz : set) {
                    obj = ReflectUtil.newInstance(clazz);
                    if(obj instanceof Message){
                        list.add((Message) obj);
                    }
                }
            }

            MessageManager.getInstance().addMessages(list);
            log.info("init IM packet repository success ! count [" + list.size() + "]");
        }catch (Exception e){
            log.error("init packet repository error",e);
            System.exit(1);
        }
    }

    @PreDestroy
    public void stop(){
        server.stop();
    }
}
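The recursive port probe in getPort could also be written as a bounded loop. The following is a sketch rather than a drop-in replacement: `findFreePort` and its `maxAttempts` parameter are hypothetical, and the upper bound is an added assumption so that the search cannot run forever.

```java
import java.io.IOException;
import java.net.ServerSocket;

class PortProbeSketch {

    /**
     * Returns the first port in [start, start + maxAttempts) that can be bound.
     * The probe socket is closed again before returning.
     */
    static int findFreePort(int start, int maxAttempts) {
        for (int port = start; port < start + maxAttempts && port <= 65535; port++) {
            try (ServerSocket socket = new ServerSocket(port)) {
                return socket.getLocalPort();
            } catch (IOException e) {
                // port is in use or not bindable: try the next one
            }
        }
        throw new IllegalStateException("no free port found starting at " + start);
    }

    public static void main(String[] args) {
        System.out.println(findFreePort(8888, 100));
    }
}
```

Either version only proves that the port was free at probe time; another process can still take it before the server binds, so the bind itself should also handle failure.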

Note: the current implementation depends on Spring. Suggestions and corrections are welcome. The code is available at the address below.

GitHub: https://github.com/awyFamily/awy-common-all/tree/master/common-ws-netty
