Netty

用Netty构建自定义协议

2017-08-28  本文已影响0人  程序员修炼笔记

在复杂的网络世界中,各种应用之间通信需要依赖各种各样的协议,比如:HTTP,Telnet,FTP,SMTP等等。
在开发过程中,有时候我们需要构建一些适应自己业务的应用层协议,比如银行业中通用的8583报文。Netty作为目前Java NIO方向最优秀的框架,可以帮助我们快速构建自定义协议,本文将以一个简洁的例子帮助大家来了解一下。

协议约定

协议名称: FF

image.png

协议规则:
如图所示,分为Header和Content两部分,Content的长度为变长,由header中的content-length来定义。

定义消息对象

FFHeader.java

package com.jack.study.netty01.customJianShu.mesage;

/**
 * Header of an FF protocol message.
 *
 * <p>Holds the protocol version, the length of the message content and the
 * session id. This is a plain mutable value holder; it performs no validation.
 */
public class FFHeader {

    /** Protocol version. */
    private int version;

    /** Length of the message content. */
    private int contentLength;

    /** Session identifier carried in the header. */
    private String sessionId;

    /**
     * Creates a header with all three fields populated.
     *
     * @param version       protocol version
     * @param contentLength length of the message content
     * @param sessionId     session identifier
     */
    public FFHeader(int version, int contentLength, String sessionId) {
        this.version = version;
        this.contentLength = contentLength;
        this.sessionId = sessionId;
    }

    // ---- accessors -------------------------------------------------------

    /** @return the protocol version */
    public int getVersion() {
        return this.version;
    }

    /** @return the length of the message content */
    public int getContentLength() {
        return this.contentLength;
    }

    /** @return the session identifier */
    public String getSessionId() {
        return this.sessionId;
    }

    // ---- mutators --------------------------------------------------------

    /** @param version the protocol version to store */
    public void setVersion(int version) {
        this.version = version;
    }

    /** @param contentLength the content length to store */
    public void setContentLength(int contentLength) {
        this.contentLength = contentLength;
    }

    /** @param sessionId the session identifier to store */
    public void setSessionId(String sessionId) {
        this.sessionId = sessionId;
    }
}

FFMessage.java

package com.jack.study.netty01.customJianShu.mesage;

/**
 * A complete FF protocol message: a {@link FFHeader} plus the textual content.
 */
public class FFMessage {

    /** Header describing this message. */
    private FFHeader luckHeader;

    /** Message content. */
    private String content;

    /**
     * Creates a message from a header and its content.
     *
     * @param luckHeader header of the message
     * @param content    message content
     */
    public FFMessage(FFHeader luckHeader, String content) {
        this.content = content;
        this.luckHeader = luckHeader;
    }

    /** @return the header of this message */
    public FFHeader getLuckHeader() {
        return this.luckHeader;
    }

    /** @return the message content */
    public String getContent() {
        return this.content;
    }

    /** @param luckHeader the header to store */
    public void setLuckHeader(FFHeader luckHeader) {
        this.luckHeader = luckHeader;
    }

    /** @param content the content to store */
    public void setContent(String content) {
        this.content = content;
    }

    /**
     * Renders the header fields followed by the content.
     *
     * @return a single-line, bracketed description of this message
     */
    @Override
    public String toString() {
        final FFHeader header = this.luckHeader;
        final String pattern = "[version=%d,contentLength=%d,sessionId=%s,content=%s]";
        return String.format(pattern,
                header.getVersion(),
                header.getContentLength(),
                header.getSessionId(),
                this.content);
    }
}

定义编码器

package com.jack.study.netty01.customJianShu.codec;

import com.jack.study.netty01.customJianShu.mesage.FFHeader;
import com.jack.study.netty01.customJianShu.mesage.FFMessage;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * Encoder that serializes an {@link FFMessage} into the FF wire format.
 *
 * <p>Layout written to the outbound buffer, in order:
 * <ol>
 *   <li>version — 4-byte int</li>
 *   <li>content length — 4-byte int, the number of encoded content bytes</li>
 *   <li>session id — exactly {@value #SESSION_ID_LENGTH} bytes</li>
 *   <li>content — {@code content length} bytes</li>
 * </ol>
 *
 * <p>Text is always encoded as UTF-8 so the bytes do not depend on the
 * platform default charset.
 */
public class FFEncoder extends MessageToByteEncoder<FFMessage> {

    /** Fixed on-wire size of the session id field; FFDecoder reads exactly this many bytes. */
    private static final int SESSION_ID_LENGTH = 36;

    /** Charset used for every string on the wire. */
    private static final java.nio.charset.Charset CHARSET = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Writes one message to {@code out}.
     *
     * <p>The length field is computed from the encoded content bytes rather than
     * taken from {@code header.getContentLength()}: a caller-supplied value (for
     * example {@code String.length()}, which counts chars, not bytes) may differ
     * from the number of bytes actually written and would corrupt the framing.
     *
     * @param ctx     channel context (unused)
     * @param message message to encode; a {@code null} content is written as an empty body
     * @param out     buffer receiving the encoded bytes
     * @throws IllegalArgumentException if the session id is {@code null} or does not
     *                                  encode to exactly {@value #SESSION_ID_LENGTH} bytes
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, FFMessage message, ByteBuf out) throws Exception {
        FFHeader header = message.getLuckHeader();

        // Encode and validate everything first so nothing is written for an invalid message.
        byte[] sessionId = encodeSessionId(header.getSessionId());
        String content = message.getContent();
        byte[] body = content == null ? new byte[0] : content.getBytes(CHARSET);

        // Header
        out.writeInt(header.getVersion());
        out.writeInt(body.length);
        out.writeBytes(sessionId);

        // Body
        out.writeBytes(body);
    }

    /** Encodes the session id and enforces the fixed field width expected by the decoder. */
    private static byte[] encodeSessionId(String sessionId) {
        if (sessionId == null) {
            throw new IllegalArgumentException("sessionId must not be null");
        }
        byte[] bytes = sessionId.getBytes(CHARSET);
        if (bytes.length != SESSION_ID_LENGTH) {
            throw new IllegalArgumentException("sessionId must encode to " + SESSION_ID_LENGTH
                    + " bytes but was " + bytes.length);
        }
        return bytes;
    }
}

这里没有什么好说的,就是按定义好的顺序输出即可。

定义解码器

package com.jack.study.netty01.customJianShu.codec;

import java.util.List;

import com.jack.study.netty01.customJianShu.mesage.FFHeader;
import com.jack.study.netty01.customJianShu.mesage.FFMessage;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

/**
 * 解码器
 *
 */
public class FFDecoder extends ByteToMessageDecoder {

    private final static int HEADER_LENGTH = 44;// header的长度

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 长度不足,退出
        if (in.readableBytes() < HEADER_LENGTH) {
            return;
        }
        // 获取协议的版本
        int version = in.readInt();
        // 获取消息长度
        int contentLength = in.readInt();
        // 获取SessionId
        byte[] sessionByte = new byte[36];
        in.readBytes(sessionByte);
        String sessionId = new String(sessionByte);
        // 组装协议头
        FFHeader header = new FFHeader(version, contentLength, sessionId);

        // 长度不足重置读index,退出
        if (in.readableBytes() < contentLength) {
            in.setIndex(in.readerIndex() - HEADER_LENGTH, in.writerIndex());
            return;
        }

        byte[] content = new byte[contentLength];
        // 读取消息内容
        in.readBytes(content);

        FFMessage message = new FFMessage(header, new String(content));

        out.add(message);
    }
}
这个类是核心的处理逻辑,其中两个IF代码段的处理是为了解决拆包粘包的问题。如果没有这两段判断,在连续接收多条消息时,消息的解析就会产生错乱。

Server监听消息

package com.jack.study.netty01.customJianShu.server;

import com.jack.study.netty01.customJianShu.codec.FFDecoder;
import com.jack.study.netty01.customJianShu.codec.FFEncoder;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Demo server: binds to {@link #PORT}, installs the FF codec on every accepted
 * connection and blocks until the listening channel is closed.
 */
public class Server {

    // Port the server listens on
    private static final int PORT = 8888;

    /**
     * Starts the server and waits for the listening channel to close.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while binding or waiting for close
     */
    public static void main(String[] args) throws InterruptedException {

        EventLoopGroup acceptGroup = new NioEventLoopGroup(1);
        EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            // Per-connection pipeline setup. A new encoder/decoder pair is created
            // for every channel: ByteToMessageDecoder subclasses cannot be
            // annotated @Sharable, so instances must not be shared.
            ChannelInitializer<SocketChannel> childSetup = new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    // codec
                    pipeline.addLast(new FFEncoder());
                    pipeline.addLast(new FFDecoder());
                    // business logic
                    pipeline.addLast(new ServerHandler());
                }
            };

            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptGroup, ioGroup);
            bootstrap.channel(NioServerSocketChannel.class); // NIO server channel
            bootstrap.option(ChannelOption.SO_BACKLOG, 1024); // socket option for the listening channel
            bootstrap.handler(new LoggingHandler(LogLevel.INFO));
            bootstrap.childHandler(childSetup);

            // Bind the port and start accepting connections
            Channel serverChannel = bootstrap.bind(PORT).sync().channel();

            System.out.printf("luck协议启动地址:127.0.0.1:%d/\n", PORT);

            // Block until the listening channel is closed
            serverChannel.closeFuture().sync();
        } finally {
            acceptGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }
}
package com.jack.study.netty01.customJianShu.server;

import com.jack.study.netty01.customJianShu.mesage.FFMessage;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Server-side business handler: prints every decoded {@link FFMessage} to
 * standard output.
 */
public class ServerHandler extends SimpleChannelInboundHandler<FFMessage> {

    /**
     * Prints the received message prefixed with a fixed label.
     *
     * @param ctx channel context (unused)
     * @param msg the decoded message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FFMessage msg) throws Exception {
        final String line = "接收:" + msg;
        System.out.println(line);
    }
}

Client端发送消息

package com.jack.study.netty01.customJianShu.client;

import java.util.UUID;

import com.jack.study.netty01.customJianShu.codec.FFDecoder;
import com.jack.study.netty01.customJianShu.codec.FFEncoder;
import com.jack.study.netty01.customJianShu.mesage.FFHeader;
import com.jack.study.netty01.customJianShu.mesage.FFMessage;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Demo client: connects to the server on 127.0.0.1:8888, sends a burst of FF
 * messages and then blocks until the channel is closed.
 */
public class Client {

    /**
     * Connects, sends the messages and waits for the channel to close.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while connecting or waiting for close
     */
    public static void main(String[] args) throws InterruptedException {

        EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            ChannelInitializer<SocketChannel> channelSetup = new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new FFEncoder());     // encoder
                    pipeline.addLast(new FFDecoder());     // decoder
                    pipeline.addLast(new ClientHandler()); // business handler (only prints the message)
                }
            };

            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(ioGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(channelSetup);

            // Connect to the server
            Channel channel = bootstrap.connect("127.0.0.1", 8888).sync().channel();

            final int version = 1;
            final String sessionId = UUID.randomUUID().toString();
            final String prefix = "Hello!";

            // Send 100000 messages; many messages are needed to exercise
            // the decoder's handling of split and merged packets.
            int seq = 0;
            while (seq < 100000) {
                String content = prefix + "----" + seq;
                FFMessage message = new FFMessage(new FFHeader(version, content.length(), sessionId), content);
                channel.writeAndFlush(message);
                seq++;
            }

            channel.closeFuture().sync();

        } finally {
            ioGroup.shutdownGracefully();
        }
    }
}

package com.jack.study.netty01.customJianShu.client;

import com.jack.study.netty01.customJianShu.mesage.FFMessage;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Client-side business handler: prints every decoded {@link FFMessage} to
 * standard output.
 */
public class ClientHandler extends SimpleChannelInboundHandler<FFMessage> {

    /**
     * Prints the received message using its {@code toString()} form.
     *
     * @param channelHandlerContext channel context (unused)
     * @param message               the decoded message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, FFMessage message) throws Exception {
        final String text = String.valueOf(message);
        System.out.println(text);
    }
}

这里为什么发送100000条消息,而不是1条?主要是为了测试消息处理的正确性,只发送1条是无法暴露拆包粘包问题的。

测试

  1. 启动Server.java
  2. 启动Client.java
    运行截图:
image.png image.png

参考文章:http://www.jianshu.com/p/ba21eb32ae97

上一篇下一篇

猜你喜欢

热点阅读