面试精选网络

TCP粘包和拆包

2021-03-05  本文已影响0人  乙腾

TCP的粘包和拆包

粘包和拆包现象

image.png

客户端给服务端发送数据可能存在的场景:

1.无拆包粘包

服务端分两次读取到了两个独立的数据包,分别是D1和D2。

2.粘包

服务端一次接受到了一个数据包,但是这个数据包包含两个完整的消息D1和D2,D1和D2粘合在一起,称之为TCP粘包。(这里需要注意:一定是一个数据包包含两个完整的消息,才是粘包)

3.拆包

拆包有两种情况:

3.1

服务端分两次读取到了数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容。

3.2

服务端分两次读取到了数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余部分内容D1_2和完整的D2包。

notice:

拆包

一个消息被拆分为多个部分,分散在多个数据包中传输。

粘包

多个完整的消息封装在了一个数据包中传输。

产生拆包和粘包的原因

粗略说明:

由于TCP无消息保护边界,需要在接收端处理消息边界问题

TCP是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端)都要有一一成对的socket,因此,发送端为了将多个发给接收端的包,更有效的发给对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,因为面向流的通信是无消息保护边界的,由于TCP无消息保护边界,需要在接收端处理消息边界问题,也就是我们所说的粘包、拆包问题。

详细说明:

tcp是以流的方式进行传输的,传输的最小单元为一个报文段(segment)。tcp 通讯协议中的最大传输单元为MTU,一般是1500比特(187.5byte),超过这个量要分为多个报文段,其中可以传输的最大报文长度为mss,一般MSS=1500- 20(IP Header) -20 (TCP Header) = 1460比特,即180多个字节。

tcp为提高性能,发送端会将需要发送的数据发送到缓冲区,等待缓冲区满了之后,再将缓冲中的数据发送到接收方。同理,接收方也有缓冲区这样的机制,来接收数据。

发生TCP粘包、拆包主要是由于下面一些原因:

  1. 应用程序写入的数据大于套接字缓冲区大小,这将会发生拆包。
  2. 应用程序写入数据小于套接字缓冲区大小,网卡将应用多次写入的数据发送到网络上,这将会发生粘包。
  3. 进行mss(最大报文长度)大小的TCP分段,当TCP报文长度-TCP头部长度>mss的时候将发生拆包。
  4. 接收方法不及时读取套接字缓冲区数据,这将发生粘包。

解决方案

自定义协议+编解码器

服务器端每次读取数据长度的问题, 这个问题解决,就不会出现服务器多读或少读数据的问题,从而避免的TCP 粘包、拆包 。

image.png

每次在数据包前面加上一个固定的长度,来标识本次消息的长度。

code

客户端代码

MessageDecoder

package com.pl.netty.protocoltcp.decoder;

import com.pl.netty.protocoltcp.message.MessageProtocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;

/**
 * <p>
 *
 * @Description: TODO
 * </p>
 * @ClassName MessageDecoder  消息解码器
 * @Author pl
 * @Date 2021/3/5
 * @Version V1.0.0
 */
public class MessageDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("MessageDecoder 被调用");
        //关键点在这里
        //现获取int,判断本次数据的字节长度
        int length = in.readInt();
        //读取到字节长度后,从byteBuf中读取指定长度的字节
        byte[] content = new byte[length];
        in.readBytes(content);
        //封装成 MessageProtocol 对象,放入 out,传递到下一个Handler
        MessageProtocol messageProtocol = new MessageProtocol();
        messageProtocol.setLen(length);
        messageProtocol.setContent(content);
        out.add(messageProtocol);
    }
}

Client

package com.pl.netty.protocoltcp;

import com.pl.netty.protocoltcp.encoder.MessageEncoder;
import com.pl.netty.protocoltcp.message.MessageProtocol;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

import java.util.Scanner;

/**
 * <p>
 *
 * @Description: TODO
 * </p>
 * @ClassName Client
 * @Author pl
 * @Date 2021/2/16
 * @Version V1.0.0
 */
public class Client {

    //属性
    private final String host;
    private final int port;

    public Client(String host, int port) {
        this.port = port;
        this.host = host;
    }

    public void run() throws InterruptedException {
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(eventExecutors)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            //加入Handler
                       //     pipeline.addLast("encoder", new MessageEncoder());
                            pipeline.addLast("encoder",new MessageEncoder());
                            pipeline.addLast(new GroupChatClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            //得到channel
            Channel channel = channelFuture.channel();
            System.out.println("--------" + channel.localAddress() + "---------");
            //客户端需要输入信息,创建一个扫描器

        } finally {
            eventExecutors.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new Client("127.0.0.1", 7000).run();
    }
}

GroupChatClientHandler

package com.pl.netty.protocoltcp;

import com.pl.netty.protocoltcp.message.MessageProtocol;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

/**
 * <p>
 *
 * @Description: TODO
 * </p>
 * @ClassName GroupChatClientHandler
 * @Author pl
 * @Date 2021/2/16
 * @Version V1.0.0
 */
public class GroupChatClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 3; i++) {
            String message = "Server" + i;
            byte[] content = message.getBytes(CharsetUtil.UTF_8);
            int length = content.length;
            //创建协议包对象
            MessageProtocol messageProtocol = new MessageProtocol();
            messageProtocol.setLen(length);
            messageProtocol.setContent(content);
            ctx.writeAndFlush(messageProtocol);
        }
    }


    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageProtocol messageProtocol) throws Exception {

    }
}

服务端代码

MessageEncoder

package com.pl.netty.protocoltcp.encoder;

import com.pl.netty.protocoltcp.message.MessageProtocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * <p>
 *
 * @Description: TODO
 * </p>
 * @ClassName MessageEncoder  消息编码器
 * @Author pl
 * @Date 2021/3/5
 * @Version V1.0.0
 */
public class MessageEncoder extends MessageToByteEncoder<MessageProtocol> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
        System.out.println("MessageEncoder 方法被调用");
        //编码器的关键,每次传输消息数据的时候,先给定一个固定长度
        out.writeInt(msg.getLen());
        out.writeBytes(msg.getContent());
    }
}

Server

package com.pl.netty.protocoltcp;

import com.pl.netty.protocoltcp.decoder.MessageDecoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * <p>
 *
 * @Description: TODO
 * </p>
 * @ClassName Server
 * @Author pl
 * @Date 2021/2/16
 * @Version V1.0.0
 */
public class Server {
    private int port;

    public Server(int port) {
        this.port = port;
    }

    public void run(){
        //bossGroup 用于接收连接
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        //workerGroup 用于具体的处理
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast("decoder", new MessageDecoder());
                           // pipeline.addLast("decoder",new MessageEncoder());
                            pipeline.addLast(new MssgHandler());
                        }
                    });
            System.out.println("netty 服务端启动");
            //Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future 和 ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //监听关闭事件
            channelFuture.channel().closeFuture().sync();
        }catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        Server server = new Server(7000);
        server.run();
    }
}

MssgHandler

package com.pl.netty.protocoltcp;

import com.pl.netty.protocoltcp.message.MessageProtocol;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;

/**
 * <p>
 *
 * @Description: TODO
 * </p>
 * @ClassName GroupchatServerHandler  继承 SimpleChannelInboundHandler 入栈
 * @Author pl
 * @Date 2021/2/16
 * @Version V1.0.0
 */
public class MssgHandler extends SimpleChannelInboundHandler<MessageProtocol> {

    private int count;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
        //接收到数据,并处理
        int len = msg.getLen();
        byte[] content = msg.getContent();
        System.out.println("服务器第 " + (++count) +" 次接收到信息如下:");
        System.out.println("长度:" + len);
        System.out.println("内容:" + new String(content, CharsetUtil.UTF_8));
        //回复消息
        String response = UUID.randomUUID().toString();
        int length = response.getBytes(CharsetUtil.UTF_8).length;
        MessageProtocol messageProtocol = new MessageProtocol();
        messageProtocol.setLen(length);
        messageProtocol.setContent(response.getBytes());
        ctx.writeAndFlush(messageProtocol);

    }
}

输出

image.png
上一篇下一篇

猜你喜欢

热点阅读