IT@程序员猿媛SpringBoot精选

Netty整合JBoss Marshalling编解码

2019-10-26  本文已影响0人  程就人生

JBoss Marshalling也是在Netty框架中被经常使用的编解码技术,它对JDK默认的序列化框架做了优化,又保持和java.io.Serializable接口的兼容,有必要学习一下。现在看一下JBoss Marshalling在Netty中是如何编解码的,对于半包的问题是否有处理呢?

示例编码环境:
springboot2.1.4
jdk1.8

首先,在pom中引入必要的jar包:

<!-- JBoss Marshalling dependencies: the core marshalling library plus its "serial" implementation artifact -->
        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling</artifactId>
            <version>2.0.9.Final</version>
        </dependency>
        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling-serial</artifactId>
            <version>2.0.9.Final</version>
        </dependency>

第二步,服务端编码实现;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Sample Netty server that exchanges objects using the JBoss Marshalling codec.
 *
 * @author 程就人生
 * @date 2019-10-26
 */
public class MarshallingServer {

    // Event loop group handed to the bootstrap as the parent (accepting) group.
    private final EventLoopGroup bossGroup = new NioEventLoopGroup();

    // Event loop group handed to the bootstrap as the child (per-connection) group.
    private final EventLoopGroup workerGroup = new NioEventLoopGroup();

    /**
     * Binds the server to the given port and blocks until the server channel is closed.
     * Both event loop groups are shut down before this method returns, so an instance
     * can only be started once.
     *
     * @param port TCP port to listen on
     */
    public void start(int port) {
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    // NIO (non-blocking) server transport
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // JBoss Marshalling decoder (inbound)
                            ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                            // JBoss Marshalling encoder (outbound)
                            ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            // Business logic handler
                            ch.pipeline().addLast(new MarshallingServerHandler());
                        }

                    });
            // Bind the port and wait synchronously until the bind succeeds
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            // Block until the listening channel is closed
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            // sync() rethrows the failure cause of the future (e.g. a bind error)
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        MarshallingServer server = new MarshallingServer();
        server.start(7788);
    }
}

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Server-side inbound handler: receives decoded {@link Test} messages and echoes back
 * those whose body contains the expected marker text.
 *
 * @author 程就人生
 * @date 2019-10-26
 */
public class MarshallingServerHandler extends ChannelInboundHandlerAdapter {

    private static final Logger log = LoggerFactory.getLogger(MarshallingServerHandler.class);

    /**
     * Handles one inbound message. The message is cast to {@link Test}; when its body
     * contains the marker text it is logged and written back to the peer, otherwise it
     * is dropped.
     *
     * @param ctx channel context used to write the reply
     * @param msg the inbound message, expected to be a {@link Test}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Test test = (Test) msg;
        String body = test.getBody();
        // A message without a body cannot match; treat it as a non-match instead of
        // failing with a NullPointerException and tearing down the connection.
        if (body != null && body.contains("程就人生")) {
            log.info("接收到的消息:" + test.toString());
            ctx.writeAndFlush(test);
        }
    }

    /**
     * Logs the failure and closes the connection.
     *
     * @param ctx   channel context of the failing channel
     * @param cause the exception that was raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        // Record the cause before closing; otherwise the disconnect is impossible to diagnose
        log.error("Exception caught in server handler, closing connection", cause);
        ctx.close();
    }
}

import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;
/**
 * Factory for the Netty JBoss Marshalling encoder and decoder used by both the
 * server and the client pipelines.
 *
 * @author 程就人生
 * @date 2019-10-26
 */
public final class MarshallingCodeCFactory {

    // Name of the marshalling protocol implementation to look up
    private static final String PROTOCOL_NAME = "serial";

    // Version set on the marshalling configuration
    private static final int PROTOCOL_VERSION = 5;

    // Size limit handed to the decoder: 1 MiB
    private static final int MAX_OBJECT_SIZE = 1024 * 1024;

    // Static utility class; not meant to be instantiated
    private MarshallingCodeCFactory() {
    }

    /**
     * Creates a new JBoss Marshalling decoder.
     *
     * @return a new decoder configured with the "serial" protocol and a 1 MiB size limit
     * @throws IllegalStateException if the "serial" marshaller factory cannot be found
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        UnmarshallerProvider provider =
                new DefaultUnmarshallerProvider(lookupMarshallerFactory(), newConfiguration());
        return new MarshallingDecoder(provider, MAX_OBJECT_SIZE);
    }

    /**
     * Creates a new JBoss Marshalling encoder.
     *
     * @return a new encoder configured with the "serial" protocol
     * @throws IllegalStateException if the "serial" marshaller factory cannot be found
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        MarshallerProvider provider =
                new DefaultMarshallerProvider(lookupMarshallerFactory(), newConfiguration());
        return new MarshallingEncoder(provider);
    }

    // Looks up the marshaller factory and fails fast when the implementation is missing.
    private static MarshallerFactory lookupMarshallerFactory() {
        MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory(PROTOCOL_NAME);
        if (marshallerFactory == null) {
            // Without this check the failure would only surface later as an obscure
            // NullPointerException when the first message is encoded or decoded.
            throw new IllegalStateException(
                    "No JBoss Marshalling factory found for protocol '" + PROTOCOL_NAME
                            + "'; is jboss-marshalling-serial on the classpath?");
        }
        return marshallerFactory;
    }

    // Builds the marshalling configuration shared by the encoder and the decoder.
    private static MarshallingConfiguration newConfiguration() {
        MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(PROTOCOL_VERSION);
        return configuration;
    }

}

import java.io.Serializable;
/**
 * Plain message object exchanged between client and server. It must be
 * {@link Serializable} so the marshalling codec can encode and decode it.
 *
 * @author 程就人生
 * @date 2019-10-26
 */
public class Test implements Serializable {

    // Explicit stream version so the wire identity of this class does not depend on a
    // compiler-derived hash that changes whenever members are added or reordered.
    private static final long serialVersionUID = 1L;

    // Message identifier
    private String uid;
    // Message payload
    private String body;

    /** @return the message identifier, or {@code null} if it was never set */
    public String getUid() {
        return uid;
    }

    /** @param uid the message identifier */
    public void setUid(String uid) {
        this.uid = uid;
    }

    /** @return the message payload, or {@code null} if it was never set */
    public String getBody() {
        return body;
    }

    /** @param body the message payload */
    public void setBody(String body) {
        this.body = body;
    }

    @Override
    public String toString() {
        return "Test [uid=" + uid + ", body=" + body + "]";
    }
}

第三步,客户端编码实现;

import com.example.demo.server5.MarshallingCodeCFactory;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
 * Sample Netty client that exchanges objects with the server using the
 * JBoss Marshalling codec.
 *
 * @author 程就人生
 * @date 2019-10-13
 */
public class MarshallingClient {

    /**
     * Connects to the given server and blocks until the connection is closed.
     * The event loop group is shut down before this method returns.
     *
     * @param port TCP port of the server
     * @param host host name or address of the server
     */
    public void connect(int port, String host) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    // NIO (non-blocking) client transport
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // JBoss Marshalling decoder (inbound)
                            ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                            // JBoss Marshalling encoder (outbound)
                            ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            // Business logic handler
                            ch.pipeline().addLast(new MarshallingClientHandler());
                        }
                    });

            // Connect and wait for the attempt to finish; sync() rethrows the cause when
            // the connection fails, so a refused connection is reported instead of
            // being silently ignored.
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            // Block until the client channel is closed
            channelFuture.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new MarshallingClient().connect(7788, "localhost");
    }
}

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.example.demo.server5.Test;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * Client-side inbound handler: sends a burst of {@link Test} messages once the
 * connection is active and logs every message received back.
 *
 * @author 程就人生
 * @date 2019-10-26
 */
public class MarshallingClientHandler extends ChannelInboundHandlerAdapter {

    private static final Logger log = LoggerFactory.getLogger(MarshallingClientHandler.class);

    // Number of messages sent back-to-back when the connection becomes active
    private static final int MESSAGE_COUNT = 100;

    /**
     * Sends {@code MESSAGE_COUNT} messages in a row as soon as the channel becomes
     * active, each one written and flushed individually.
     *
     * @param ctx channel context used to write the messages
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            Test test = new Test();
            test.setUid(i + "");
            test.setBody("程就人生" + i);
            ctx.writeAndFlush(test);
        }
    }

    /**
     * Logs each message received from the server.
     *
     * @param ctx channel context of the receiving channel
     * @param msg the inbound message, expected to be a {@link Test}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Test test = (Test) msg;
        log.info("接收到的消息:" + test.toString());
    }

    /**
     * Logs the failure and closes the connection.
     *
     * @param ctx   channel context of the failing channel
     * @param cause the exception that was raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        // Record the cause before closing; otherwise the disconnect is impossible to diagnose
        log.error("Exception caught in client handler, closing connection", cause);
        ctx.close();
    }
}

第四步,测试;先启动服务端,再启动客户端。为了了解Marshalling对半包的处理,代码里客户端连接服务器成功后,就向服务端连续发送一百条消息,看看服务端是否能够完整接收,完整返回;

服务端测试结果
客户端测试结果

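从控制台的结果来看,服务器端很完整地接收到了客户端发送的消息,并没有发生粘包/半包的情况,客户端也完整地接收到了服务端发送的消息;通过本次测试,可以发现Netty提供的JBoss Marshalling编解码器是支持半包处理的:MarshallingEncoder会在每条消息前写入4字节的长度字段,而MarshallingDecoder继承自LengthFieldBasedFrameDecoder,会按该长度字段拆分出完整的消息帧。因此在服务器端的handler和客户端的handler中只是加了编解码,并未加入其它的对半包处理的代码,使用起来还是很方便的。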

上一篇下一篇

猜你喜欢

热点阅读