netty编解码器示例

2020-03-28  本文已影响0人  wallimn

代码很简单,仅是个应用示范,无实际意义。客户端发送“长度+字节数组”形式的消息,服务器收到直接显示。客户端发送时,自动调用编码器编码信息;服务器收到消息,自动调用解码器,完整准确显示收到信息。

一、客户端程序

(一)编码器LenStringEncoder

package com.wallimn.iteye.netty.lenstr;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * 将传入的字符串以长度+字节数组的形式编码。
 * 客户端发送信息时,直接发送字符串类型数据,netty的handler调用这个编码器进行编码。
 * 
 * <br>
 * <br>时间:2019年9月11日 下午10:53:42,作者:wallimn
 */
public class LenStringEncoder extends MessageToByteEncoder<String> {

    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
        if(msg!=null){
            byte[] bs = msg.getBytes();
            out.writeInt(bs.length);
            out.writeBytes(bs);
        }
    }

}

(二)处理器ClientHandler

package com.wallimn.iteye.netty.lenstr;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * 客户端消息处理器
 * 监听连接Active事件,事件中发送一些测试信息。
 * 
 * <br>
 * <br>时间:2019年9月11日 下午10:58:38,作者:wallimn
 */
public class ClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String shortMsg = "hello netty, from wallimn.";
        for(int i=0; i<50;i++){
            ctx.writeAndFlush(shortMsg + " No."+i);
        }
        String longMsg = "仅仅问候一下,最近挺好的吧,工作忙吗? from wallimn.";
        for(int i=0; i<50;i++){
            ctx.writeAndFlush(longMsg + " No."+i);
        }
        System.out.println("消息发送完毕!");
        ctx.close();
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

(三)客户端Client

package com.wallimn.iteye.netty.lenstr;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;


/**
 * 启动命令:java -classpath .;netty-all-4.1.38.Final.jar com.wallimn.iteye.netty.lenstr.Client
 * 客户端,仅启动ClientHandler发送些信息,然后退出。
 * <br>
 * <br>时间:2019年9月11日 下午11:40:51,作者:wallimn
 */
public class Client {

    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup();
        //未使用函数链式操作,看起来容易懂一点儿。
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.option(ChannelOption.SO_SNDBUF, 10);//发送缓冲区长度
        bootstrap.option(ChannelOption.TCP_NODELAY, true);
        //bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
        //bootstrap.handler(new LoggingHandler(LogLevel.INFO));
        bootstrap.handler(new ChannelInitializer<SocketChannel>(){
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new LenStringEncoder());
                ch.pipeline().addLast(new ClientHandler());
            }
        });
        
        try {
            ChannelFuture future = bootstrap.connect("localhost",8585).sync();//连接服务器
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        finally{
            group.shutdownGracefully();
        }
    }

}

二、服务器端

(一)解码器LenStringDecoder

package com.wallimn.iteye.netty.lenstr;

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

/**
 * 解码器,用于将收到的长度+字节数组的消息解决成字符器。
 * 使用长度标识信息,可使信息免受粘包、拆包影响
 * 
 * <br>
 * <br>时间:2019年9月12日 下午1:33:37,作者:wallimn
 */
public class LenStringDecoder extends ByteToMessageDecoder {
    private final static int HEAD_LENGTH=4;
    
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if(in.readableBytes()<HEAD_LENGTH){
            return;
        }
        
        in.markReaderIndex();//标记一下读索引位置,信息不完整时恢复。
        int msglen = in.readInt();
        if(msglen==0){
            return;
        }
        else if(in.readableBytes()<msglen){
            in.resetReaderIndex();
            return;
        }
        
        byte[] msg = new byte[msglen];
        in.readBytes(msg);
        out.add(new String(msg,0,msg.length));
    }

}

(二)处理器ServerHandler

package com.wallimn.iteye.netty.lenstr;

import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;

/**
 * 服务器消息处理器
 * 显示解码器解完的消息。
 * 
 * <br>
 * <br>时间:2019年9月12日 下午1:44:54,作者:wallimn
 */
@Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String _msg = (String)msg;
        System.out.println("收到消息:"+_msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

(三)服务器Server

package com.wallimn.iteye.netty.lenstr;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;


/**
 * 启动命令:java -classpath .;netty-all-4.1.38.Final.jar com.wallimn.iteye.netty.lenstr.Server
 * 通信服务器的程序
 * <br>
 * <br>时间:2019年9月11日 下午11:40:18,作者:wallimn
 */
public class Server {

    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        
        ServerBootstrap bootstrap = new ServerBootstrap();//综合管理相关的组件、操纵组件
        bootstrap.group(boss,worker);
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>(){
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new LenStringDecoder());
                ch.pipeline().addLast(new ServerHandler());
            }
        });
        
        try {
            ChannelFuture future = bootstrap.bind(8585).sync();//绑定监听端口,成功后就可以响应客户端请求
            System.out.println("服务启动成功!");
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        finally{
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

}
上一篇下一篇

猜你喜欢

热点阅读