2023-11-30跟着源码学IM(十二):基于Netty打造一

2023-11-29  本文已影响0人  jackjiang20212

本文由竹子爱熊猫分享,原题“(十一)Netty实战篇:基于Netty框架打造一款高性能的IM即时通讯程序”,本文有修订和改动。

1、引言

关于Netty网络框架的内容,前面已经讲了两个章节,但总归来说难以真正掌握,毕竟只是对其中一个个组件进行讲解,很难让诸位将其串起来形成一条线,所以本章中则会结合实战案例,对Netty进行更深层次的学习与掌握,实战案例也并不难,一个非常朴素的IM聊天程序。

原本打算做个多人斗地主练习程序,但那需要织入过多的业务逻辑,因此一方面会带来不必要的理解难度,让案例更为复杂化,另一方面代码量也会偏多,所以最终依旧选择实现基本的IM聊天程序,既简单,又能加深对Netty的理解。

2、配套源码

本文配套源码的开源托管地址是:

1)主地址:https://github.com/liuhaijieAdmin/springboot-netty

2)备地址:https://github.com/52im/springboot-netty

3、知识准备

关于 Netty 是什么,这里简单介绍下:

Netty 是一个 Java 开源框架。Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

也就是说,Netty 是一个基于 NIO 的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。

Netty 相当简化和流线化了网络应用的编程开发过程,例如,TCP 和 UDP 的 Socket 服务开发。

有关Netty的入门文章:

1)新手入门:目前为止最透彻的的Netty高性能原理和框架架构解析

2)写给初学者:Java高性能NIO框架Netty的学习方法和进阶策略

3)史上最通俗Netty框架入门长文:基本介绍、环境搭建、动手实战

如果你连Java NIO都不知道,下面的文章建议优先读:

1)少啰嗦!一分钟带你读懂Java的NIO和经典IO的区别

2)史上最强Java NIO入门:担心从入门到放弃的,请读这篇!

3)Java的BIO和NIO很难懂?用代码实践给你看,再不懂我转行!

Netty源码和API 在线查阅地址:

1)Netty-4.1.x 完整源码(在线阅读版)

2)Netty-4.1.x API文档(在线版)

4、基于Netty设计通信协议

协议,这玩意儿相信大家肯定不陌生了,简单回顾一下协议的概念:网络协议是指一种通信双方都必须遵守的约定,两个不同的端,按照一定的格式对数据进行“编码”,同时按照相同的规则进行“解码”,从而实现两者之间的数据传输与通信。

当自己想要打造一款IM通信程序时,对于消息的封装、拆分也同样需要设计一个协议,通信的两端都必须遵守该协议工作,这也是实现通信程序的前提。

但为什么需要通信协议呢?

因为TCP/IP中是基于流的方式传输消息,消息与消息之间没有边界,而协议的目的则在于约定消息的样式、边界等。

5、Redis通信的RESP协议参考学习

不知大家是否还记得之前我聊到的RESP客户端协议,这是Redis提供的一种客户端通信协议。如果想要操作Redis,就必须遵守该协议的格式发送数据。

这个协议特别简单,如下:

1)首先要求所有命令,都以*开头,后面跟着具体的子命令数量,接着用换行符分割;

2)接着需要先用$符号声明每个子命令的长度,然后再用换行符分割;

3)最后再拼接上具体的子命令,同样用换行符分割。

这样描述有些令人难懂,那就直接看个案例,例如一条简单set命令。

如下:

客户端命令:

    setname ZhuZi

转变为RESP指令:

    *3

    $3

    set

    $4

    name

    $5

    ZhuZi

按照Redis的规定,但凡满足RESP协议的客户端,都可以直接连接并操作Redis服务端,这也就意味着咱们可以直接通过Netty来手写一个Redis客户端。

代码如下:

// 基于Netty、RESP协议实现的Redis客户端

publicclassRedisClient {

    // 换行符的ASCII码

    staticfinalbyte[] LINE = {13, 10};

    publicstaticvoidmain(String[] args) {

        EventLoopGroup worker = newNioEventLoopGroup();

        Bootstrap client = newBootstrap();

        try{

            client.group(worker);

            client.channel(NioSocketChannel.class);

            client.handler(newChannelInitializer<SocketChannel>() {

                @Override

                protectedvoidinitChannel(SocketChannel socketChannel)

                                                        throwsException {

                    ChannelPipeline pipeline = socketChannel.pipeline();

                    pipeline.addLast(newChannelInboundHandlerAdapter(){

                        // 通道建立成功后调用:向Redis发送一条set命令

                        @Override

                        publicvoidchannelActive(ChannelHandlerContext ctx)

                                                            throwsException {

                            String command = "set name ZhuZi";

                            ByteBuf buffer = respCommand(command);

                            ctx.channel().writeAndFlush(buffer);

                        }

                        // Redis响应数据时触发:打印Redis的响应结果

                        @Override

                        publicvoidchannelRead(ChannelHandlerContext ctx,

                                                Object msg) throwsException {

                            // 接受Redis服务端执行指令后的结果

                            ByteBuf buffer = (ByteBuf) msg;

                            System.out.println(buffer.toString(CharsetUtil.UTF_8));

                        }

                    });

                }

            });

            // 根据IP、端口连接Redis服务端

            client.connect("192.168.12.129", 6379).sync();

        } catch(Exception e){

            e.printStackTrace();

        }

    }

    privatestaticByteBuf respCommand(String command){

        // 先对传入的命令以空格进行分割

        String[] commands = command.split(" ");

        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();

        // 遵循RESP协议:先写入指令的个数

        buffer.writeBytes(("*"+ commands.length).getBytes());

        buffer.writeBytes(LINE);

        // 接着分别写入每个指令的长度以及具体值

        for(String s : commands) {

            buffer.writeBytes(("$"+ s.length()).getBytes());

            buffer.writeBytes(LINE);

            buffer.writeBytes(s.getBytes());

            buffer.writeBytes(LINE);

        }

        // 把转换成RESP格式的命令返回

        returnbuffer;

    }

}

在上述这个案例中,也仅仅只是通过respCommand()这个方法,对用户输入的指令进行了转换。同时在上面通过Netty,与Redis的地址、端口建立了连接。在连接建立成功后,就会向Redis发送一条转换成RESP指令的set命令。接着等待Redis的响应结果并输出,如下:

+OK

因为这是一条写指令,所以当Redis收到执行完成后,最终就会返回一个OK,大家也可直接去Redis中查询,也依旧能够查询到刚刚写入的name这个键值。

6、HTTP超文本传输协议参考学习

前面咱们自己针对于Redis的RESP协议,对用户指令进行了封装,然后发往Redis执行。

但对于这些常用的协议,Netty早已提供好了现成的处理器,想要使用时无需从头开发,可以直接使用现成的处理器来实现。

比如现在咱们可以基于Netty提供的处理器,实现一个简单的HTTP服务器。

代码如下:

// 基于Netty提供的处理器实现HTTP服务器

publicclassHttpServer {

    publicstaticvoidmain(String[] args) throwsInterruptedException {

        EventLoopGroup boss = newNioEventLoopGroup();

        EventLoopGroup worker = newNioEventLoopGroup();

        ServerBootstrap server = newServerBootstrap();

        server

            .group(boss,worker)

            .channel(NioServerSocketChannel.class)

            .childHandler(newChannelInitializer<NioSocketChannel>() {

                @Override

                protectedvoidinitChannel(NioSocketChannel ch) {

                    ChannelPipeline pipeline = ch.pipeline();

                    // 添加一个Netty提供的HTTP处理器

                    pipeline.addLast(newHttpServerCodec());

                    pipeline.addLast(newChannelInboundHandlerAdapter() {

                        @Override

                        publicvoidchannelRead(ChannelHandlerContext ctx,

                                                Object msg) throwsException {

                            // 在这里输出一下消息的类型

                            System.out.println("消息类型:"+ msg.getClass());

                            super.channelRead(ctx, msg);

                        }

                    });

                    pipeline.addLast(newSimpleChannelInboundHandler<HttpRequest>() {

                        @Override

                        protectedvoidchannelRead0(ChannelHandlerContext ctx,

                                                    HttpRequest msg) throwsException {

                            System.out.println("客户端的请求路径:"+ msg.uri());

                            // 创建一个响应对象,版本号与客户端保持一致,状态码为OK/200

                            DefaultFullHttpResponse response =

                                    newDefaultFullHttpResponse(

                                            msg.protocolVersion(),

                                            HttpResponseStatus.OK);

                            // 构造响应内容

                            byte[] content = "<h1>Hi, ZhuZi!</h1>".getBytes();

                            // 设置响应头:告诉客户端本次响应的数据长度

                            response.headers().setInt(

                                HttpHeaderNames.CONTENT_LENGTH,content.length);

                            // 设置响应主体

                            response.content().writeBytes(content);

                            // 向客户端写入响应数据

                            ctx.writeAndFlush(response);

                        }

                    });

                }

            })

            .bind("127.0.0.1",8888)

            .sync();

    }

}

在该案例中,咱们就未曾手动对HTTP的数据包进行拆包处理了,而是在服务端的pipeline上添加了一个HttpServerCodec处理器,这个处理器是Netty官方提供的。

其类继承关系如下:

publicfinalclassHttpServerCodec

    extendsCombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>

    implementsSourceCodec {

    // ......

}

观察会发现,该类继承自CombinedChannelDuplexHandler这个组合类,它组合了编码器、解码器。

这也就意味着HttpServerCodec即可以对客户端的数据做解码,也可以对服务端响应的数据做编码。

同时除开添加了这个处理器外,在第二个处理器中打印了一下客户端的消息类型,最后一个处理器中,对客户端的请求做出了响应,其实也就是返回了一句话而已。

此时在浏览器输入http://127.0.0.1:8888/index.html,结果如下:

消息类型:classio.netty.handler.codec.http.DefaultHttpRequest

消息类型:classio.netty.handler.codec.http.LastHttpContent$1

客户端的请求路径:/index.html

此时来看结果,客户端的请求会被解析成两个部分:

1)第一个是请求信息;

2)第二个是主体信息。

但按理来说浏览器发出的请求,属于GET类型的请求,GET请求是没有请求体信息的,但Netty依旧会解析成两部分~,只不过GET请求的第二部分是空的。

在第三个处理器中,咱们直接向客户端返回了一个h1标签,同时也要记得在响应头里面,加上响应内容的长度信息,否则浏览器的加载圈,会一直不同的转动,毕竟浏览器也不知道内容有多长,就会一直反复加载,尝试等待更多的数据。

在第三个处理器中,咱们直接向客户端返回了一个h1标签,同时也要记得在响应头里面,加上响应内容的长度信息,否则浏览器的加载圈,会一直不同的转动,毕竟浏览器也不知道内容有多长,就会一直反复加载,尝试等待更多的数据。

7、自定义消息传输协议

7.1概述

Netty除开提供了HTTP协议的处理器外,还提供了DNS、HaProxy、MemCache、MQTT、Protobuf、Redis、SCTP、RTSP.....一系列协议的实现,具体定义位于io.netty.handler.codec这个包下,当然,咱们也可以自己实现自定义协议,按照自己的逻辑对数据进行编解码处理。

很多基于Netty开发的中间件/组件,其内部基本上都开发了专属的通信协议,以此来作为不同节点间通信的基础,所以解下来咱们基于Netty也来自己设计一款通信协议,这也会作为后续实现聊天程序时的基础。

所谓的协议设计,其实仅仅只需要按照一定约束,实现编码器与解码器即可,发送方在发出数据之前,会经过编码器对数据进行处理,而接收方在收到数据之前,则会由解码器对数据进行处理。

7.2自定义协议的要素

在自定义传输协议时,咱们必然需要考虑几个因素,如下:

1)魔数:用来第一时间判断是否为自己需要的数据包;

2)版本号:提高协议的拓展性,方便后续对协议进行升级;

3)序列化算法:消息正文具体该使用哪种方式进行序列化传输,例如Json、ProtoBuf、JDK...;

4)消息类型:第一时间判断出当前消息的类型;

5)消息序号:为了实现双工通信,客户端和服务端之间收/发消息不会相互阻塞;

6)正文长度:提供给LTC解码器使用,防止解码时出现粘包、半包的现象;

7)消息正文:本次消息要传输的具体数据。

在设计协议时,一个完整的协议应该涵盖上述所说的几方面,这样才能提供双方通信时的基础。

基于上述几个字段,能够在第一时间内判断出:

1)消息是否可用;

2)当前协议版本;

3)消息的具体类型;

4)消息的长度等各类信息。

从而给后续处理器使用(自定义的协议规则本身就是一个编解码处理器而已)。

7.3自定义协议实战

前面简单聊到过,所谓的自定义协议就是自己规定消息格式,以及自己实现编/解码器对消息实现封装/拆解,所以这里想要自定义一个消息协议,就只需要满足前面两个条件即可。

因此实现如下:

@ChannelHandler.Sharable

publicclassChatMessageCodec extendsMessageToMessageCodec<ByteBuf, Message> {

    // 消息出站时会经过的编码方法(将原生消息对象封装成自定义协议的消息格式)

    @Override

    protectedvoidencode(ChannelHandlerContext ctx, Message msg,

                          List<Object> list) throwsException {

        ByteBuf outMsg = ctx.alloc().buffer();

        // 前五个字节作为魔数

        byte[] magicNumber = newbyte[]{'Z','h','u','Z','i'};

        outMsg.writeBytes(magicNumber);

        // 一个字节作为版本号

        outMsg.writeByte(1);

        // 一个字节表示序列化方式  0:JDK、1:Json、2:ProtoBuf.....

        outMsg.writeByte(0);

        // 一个字节用于表示消息类型

        outMsg.writeByte(msg.getMessageType());

        // 四个字节表示消息序号

        outMsg.writeInt(msg.getSequenceId());

        // 使用Java-Serializable的方式对消息对象进行序列化

        ByteArrayOutputStream bos = newByteArrayOutputStream();

        ObjectOutputStream oos = newObjectOutputStream(bos);

        oos.writeObject(msg);

        byte[] msgBytes = bos.toByteArray();

        // 使用四个字节描述消息正文的长度

        outMsg.writeInt(msgBytes.length);

        // 将序列化后的消息对象作为消息正文

        outMsg.writeBytes(msgBytes);

        // 将封装好的数据传递给下一个处理器

        list.add(outMsg);

    }

    // 消息入站时会经过的解码方法(将自定义格式的消息转变为具体的消息对象)

    @Override

    protectedvoiddecode(ChannelHandlerContext ctx,

                          ByteBuf inMsg, List<Object> list) throwsException {

        // 读取前五个字节得到魔数

        byte[] magicNumber = newbyte[5];

        inMsg.readBytes(magicNumber,0,5);

        // 再读取一个字节得到版本号

        byteversion = inMsg.readByte();

        // 再读取一个字节得到序列化方式

        byteserializableType = inMsg.readByte();

        // 再读取一个字节得到消息类型

        bytemessageType = inMsg.readByte();

        // 再读取四个字节得到消息序号

        intsequenceId = inMsg.readInt();

        // 再读取四个字节得到消息正文长度

        intmessageLength = inMsg.readInt();

        // 再根据正文长度读取序列化后的字节正文数据

        byte[] msgBytes = newbyte[messageLength];

        inMsg.readBytes(msgBytes,0,messageLength);

        // 对于读取到的消息正文进行反序列化,最终得到具体的消息对象

        ByteArrayInputStream bis = newByteArrayInputStream(msgBytes);

        ObjectInputStream ois = newObjectInputStream(bis);

        Message message = (Message) ois.readObject();

        // 最终把反序列化得到的消息对象传递给后续的处理器

        list.add(message);

    }

}

上面自定义的处理器中,继承了MessageToMessageCodec类,主要负责将数据在原生ByteBuf与Message之间进行相互转换,而Message对象是自定义的消息对象,这里暂且无需过多关心。

其中主要实现了两个方法:

1)encode():出站时会经过的编码方法,会将原生消息对象按自定义的协议封装成对应的字节数据;

2)decode():入站时会经过的解码方法,会将协议格式的字节数据,转变为具体的消息对象。

上述自定义的协议,也就是一定规则的字节数据,每条消息数据的组成如下:

1)魔数:使用第1~5个字节来描述,这个魔数值可以按自己的想法自定义;

2)版本号:使用第6个字节来描述,不同数字表示不同版本;

3)序列化算法:使用第7个字节来描述,不同数字表示不同序列化方式;

4)消息类型:使用第8个字节来描述,不同的消息类型使用不同数字表示;

5)消息序号:使用第9~12个字节来描述,其实就是一个四字节的整数;

6)正文长度:使用第13~16个字节来描述,也是一个四字节的整数;

7)消息正文:长度不固定,根据每次具体发送的数据来决定。

在其中,为了实现简单,这里的序列化方式,则采用的是JDK默认的Serializable接口方式,但这种方式生成的对象字节较大,实际情况中最好还是选择谷歌的ProtoBuf方式,这种算法属于序列化算法中,性能最佳的一种落地实现。

当然,这个自定义的协议是提供给后续的聊天业务使用的,但这种实战型的内容分享,基本上代码量较高,所以大家看起来会有些枯燥,而本文所使用的聊天室案例,是基于《B站-黑马Netty视频教程》二次改良的,因此如若感觉文字描述较为枯燥,可直接点击前面给出的链接,观看P101~P121视频进行学习。

最后来观察一下,大家会发现,在咱们定义的这个协议编解码处理器上,存在着一个@ChannelHandler.Sharable注解,这个注解的作用是干吗的呢?其实很简单,用来标识当前处理器是否可在多线程环境下使用,如果带有该注解的处理器,则表示可以在多个通道间共用,因此只需要创建一个即可,反之同理,如果不带有该注解的处理器,则每个通道需要单独创建使用。

PS:如果你想系统学习Protobuf,可以从以下文章入手:

如何选择即时通讯应用的数据传输格式

强列建议将Protobuf作为你的即时通讯应用数据传输格式

IM通讯协议专题学习(一):Protobuf从入门到精通,一篇就够!

IM通讯协议专题学习(二):快速理解Protobuf的背景、原理、使用、优缺点

IM通讯协议专题学习(三):由浅入深,从根上理解Protobuf的编解码原理

IM通讯协议专题学习(四):从Base64到Protobuf,详解Protobuf的数据编码原理

IM通讯协议专题学习(八):金蝶随手记团队的Protobuf应用实践(原理篇)

最后来观察一下,大家会发现,在咱们定义的这个协议编解码处理器上,存在着一个@ChannelHandler.Sharable注解,这个注解的作用是干吗的呢?其实很简单,用来标识当前处理器是否可在多线程环境下使用,如果带有该注解的处理器,则表示可以在多个通道间共用,因此只需要创建一个即可,反之同理,如果不带有该注解的处理器,则每个通道需要单独创建使用。

PS:如果你想系统学习Protobuf,可以从以下文章入手:

如何选择即时通讯应用的数据传输格式

强列建议将Protobuf作为你的即时通讯应用数据传输格式

IM通讯协议专题学习(一):Protobuf从入门到精通,一篇就够!

IM通讯协议专题学习(二):快速理解Protobuf的背景、原理、使用、优缺点

IM通讯协议专题学习(三):由浅入深,从根上理解Protobuf的编解码原理

IM通讯协议专题学习(四):从Base64到Protobuf,详解Protobuf的数据编码原理

IM通讯协议专题学习(八):金蝶随手记团队的Protobuf应用实践(原理篇)

12、系列文章

跟着源码学IM(一):手把手教你用Netty实现心跳机制、断线重连机制

跟着源码学IM(二):自已开发IM很难?手把手教你撸一个Andriod版IM

跟着源码学IM(三):基于Netty,从零开发一个IM服务端

跟着源码学IM(四):拿起键盘就是干,教你徒手开发一套分布式IM系统

跟着源码学IM(五):正确理解IM长连接、心跳及重连机制,并动手实现

跟着源码学IM(六):手把手教你用Go快速搭建高性能、可扩展的IM系统

跟着源码学IM(七):手把手教你用WebSocket打造Web端IM聊天

跟着源码学IM(八):万字长文,手把手教你用Netty打造IM聊天

跟着源码学IM(九):基于Netty实现一套分布式IM系统

跟着源码学IM(十):基于Netty,搭建高性能IM集群(含技术思路+源码)

跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)

跟着源码学IM(十二):基于Netty打造一款高性能的IM即时通讯程序》(* 本文

SpringBoot集成开源IM框架MobileIMSDK,实现即时通讯IM聊天功能

13、参考资料

[1]浅谈IM系统的架构设计

[2]简述移动端IM开发的那些坑:架构设计、通信协议和客户端

[3]一套海量在线用户的移动端IM架构设计实践分享(含详细图文)

[4]一套原创分布式即时通讯(IM)系统理论架构方案

[5]一套亿级用户的IM架构技术干货(上篇):整体架构、服务拆分等

[6]一套亿级用户的IM架构技术干货(下篇):可靠性、有序性、弱网优化等

[7]史上最通俗Netty框架入门长文:基本介绍、环境搭建、动手实战

[8]强列建议将Protobuf作为你的即时通讯应用数据传输格式

[9]IM通讯协议专题学习(一):Protobuf从入门到精通,一篇就够!

[10]融云技术分享:全面揭秘亿级IM消息的可靠投递机制

[11]IM群聊消息如此复杂,如何保证不丢不重?

[12]零基础IM开发入门(四):什么是IM系统的消息时序一致性?

[13]如何保证IM实时消息的“时序性”与“一致性”?

[14]微信的海量IM聊天消息序列号生成实践(算法原理篇)

[15]网易云信技术分享:IM中的万人群聊技术方案实践总结

[16]融云IM技术分享:万人群聊消息投递方案的思考和实践

[17]为何基于TCP协议的移动端IM仍然需要心跳保活机制?

[18]一文读懂即时通讯应用中的网络心跳包机制:作用、原理、实现思路等

[19]微信团队原创分享:Android版微信后台保活实战分享(网络保活篇)

[20]融云技术分享:融云安卓端IM产品的网络链路保活技术实践

[21]彻底搞懂TCP协议层的KeepAlive保活机制

[22]深度解密钉钉即时消息服务DTIM的技术设计

(本文已同步发布于:http://www.52im.net/thread-4530-1-1.html

上一篇下一篇

猜你喜欢

热点阅读