springbootSpring之路

Spring和Netty整合详解

2019-02-28  本文已影响11人  逍遥天扬

Spring和Netty整合详解

官方主页

Spring

Netty

概述

Netty是目前最流行的由JBOSS提供的一个Java开源框架NIO框架,Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

相比JDK原生NIO,Netty提供了相对十分简单易用的API,非常适合网络编程。Netty是完全基于NIO实现的,所以Netty是异步的。

Mina同样也是一款优秀的NIO框架,而且跟Netty是出自同一个人之手,但是Netty要晚一点,优点更多一些,想了解更多可以直接搜索mina和netty比较。

使用Netty,我们可以作为Socket服务器,也可以用来做Http服务器,这里,我们将这两种方式都详细介绍一下。

Git地址:
Gitee

项目地址:
品茗IT-Spring之路专题

品茗IT:提供在线快速构建Spring项目工具一站式Springboot项目生成

依赖Jar包

我们假定你已经建好了Spring环境。这里只引入Netty的jar包。

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.17.Final</version>
</dependency>

Socket服务器和Http服务器的使用时的些许差异

可以说,没有差异,只是ChannelHandler不同而已。

因此,这里先说明一些公共的使用,然后再针对socket和http做区分。

Netty服务器配置

我们可以编写一个公共的服务器配置模板NettyServiceTemplate。
NettyServiceTemplate:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public abstract class NettyServiceTemplate {
    static private EventLoopGroup bossGroup = new NioEventLoopGroup();
    static private EventLoopGroup workerGroup = new NioEventLoopGroup();

    abstract protected ChannelHandler[] createHandlers();

    abstract public int getPort();

    abstract public String getName();

    public void start() throws Exception {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelHandler[] handlers = createHandlers();
                        for (ChannelHandler handler : handlers) {
                            ch.pipeline().addLast(handler);
                        }
                    }
                }).option(ChannelOption.SO_BACKLOG, 128).option(ChannelOption.SO_REUSEADDR, true)
                .childOption(ChannelOption.SO_KEEPALIVE, true).childOption(ChannelOption.SO_REUSEADDR, true);

        ChannelFuture cf = b.bind(getPort()).await();
//      cf.channel().closeFuture().await();
        if (!cf.isSuccess()) {
            System.out.println("无法绑定端口:" + getPort());
            throw new Exception("无法绑定端口:" + getPort());
        }

        System.out.println("服务[{" + getName() + "}]启动完毕,监听端口[{" + getPort() + "}]");
    }

    public void stop() {
        bossGroup.shutdownGracefully().syncUninterruptibly();
        workerGroup.shutdownGracefully().syncUninterruptibly();
        System.out.println("服务[{" + getName() + "}]关闭。");
    }
}

代码贴出来之后,我们还是要讲下里面的一些重点:

bossGroup和workerGroup就是为了建立线程池,这个自行百度。

ChannelHandler使用抽象方法由子类去实现,这样就可以根据不同的ChannelHandler实现不同的功能。

以上注释了

cf.channel().closeFuture().await();

这部分代码是阻塞了当前主线程一直等待结束,后面的代码就不能执行了。

两次await的不同:两次await虽然都是针对ChannelFuture的,但是两次的ChannelFuture不一样,打断点看了下,分别是:

AbstractBootstrap$PendingRegistrationPromise@177655b7(success)
AbstractChannel$CloseFuture@3c4f9fbb(incomplete)

原理就不细说了,我也不知道,毕竟不是专精这方面的。

启动Server

因为后面要写不同的Server,所以这里先把启动Server写出来,免得眼晕。

我们可以直接启动,如果不放心也可以放到线程池中启动:

TestApp:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.netty.anno.PackageScan;
import com.netty.application.NettyApplication;
import com.netty.http.server.JsonHttpServer;
import com.netty.http.server.StringTcpServerTest;

@PackageScan("com.netty")
public class TestApp {
    
    public static void main(String args[]) throws ClassNotFoundException, InstantiationException, IllegalAccessException{
        NettyApplication.run(TestApp.class);
        
        StringTcpServerTest stringTcpServerTest = new StringTcpServerTest(8088);
        JsonHttpServer jsonHttpServer = new JsonHttpServer(8880);
        try {
            stringTcpServerTest.start();
            jsonHttpServer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
        
//      ExecutorService pool = Executors.newFixedThreadPool(4);
//      pool.submit(new Thread(){
//          @Override
//          public void run() {
//              StringTcpServerTest stringTcpServerTest = new StringTcpServerTest(8088);
//              try {
//                  stringTcpServerTest.start();
//              } catch (Exception e) {
//                  e.printStackTrace();
//              }
//          }
//          
//      });
//      
//      pool.submit(new Thread(){
//          @Override
//          public void run() {
//              JsonHttpServer jsonHttpServer = new JsonHttpServer(8880);
//              try {
//                  jsonHttpServer.start();
//              } catch (Exception e) {
//                  e.printStackTrace();
//              }
//          }
//      });
    }
}

Socket(Tcp)的ChannelHandler

如果我们单纯的使用tcp传递数据,比如String数据,我们可以这样定义一个Server:

StringTcpServerTest:

import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class StringTcpServerTest extends NettyServiceTemplate {
    int port = 8088;
    String name = "String Server";
    
    //SpringBeanService springBeanService;
    
    public StringTcpServerTest(int port) {
        this.port = port;
    }
    
    //可以启动server时将spring的bean传递进来。
    //public StringTcpServerTest(int port, SpringBeanService springBeanService) {
    //   this.port = port;
    //   this.springBeanService = springBeanService;
    //}

    @Override
    protected ChannelHandler[] createHandlers() {
        return new ChannelHandler[] { 
                new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()), 
                new StringDecoder(),
                new StringEncoder(), 
                
                //如果想使用spring的bean,可以将springBeanService传递给StringTcpServerHandler,如new StringTcpServerHandler(springBeanService)
                new StringTcpServerHandler()
                };
    }

    @Override
    public int getPort() {
        return port;
    }

    @Override
    public String getName() {
        return name;
    }

}

这里,DelimiterBasedFrameDecoder定义了 以("\n")为结尾分割的 解码器。

然后是String解码与编码器

然后是自定义的一个处理器Handler。

StringTcpServerHandler:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class StringTcpServerHandler extends SimpleChannelInboundHandler<String> {
    String charset = "UTF-8";

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("内容:" + msg);
        ctx.writeAndFlush("返回内容:" + msg);
    }
}

HTTP 的ChannelHandler

如果我们想使用http传输json数据,我们可以这样玩:

JsonHttpServer:

import com.netty.http.NettyServiceTemplate;
import com.netty.http.coder.json.HttpMsgRequestDecoder;
import com.netty.http.coder.json.HttpMsgResponseEncoder;
import com.netty.http.handler.JsonHttpServerHandler;

import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;

public class JsonHttpServer extends NettyServiceTemplate {
    int port = 8888;
    String name = "Json Server";
    private String charset = "UTF-8";
    private int timeout = 60;
    
    public JsonHttpServer(int port) {
        this.port = port;
    }

    @Override
    protected ChannelHandler[] createHandlers() {
        return new ChannelHandler[] { 
                new HttpResponseEncoder(), 
                new HttpRequestDecoder(),
                new HttpObjectAggregator(1048576), 
                new HttpMsgResponseEncoder(charset, timeout),
                new HttpMsgRequestDecoder(charset), 
                new JsonHttpServerHandler() };
    }

    @Override
    public int getPort() {
        return port;
    }

    @Override
    public String getName() {
        return name;
    }

}

其中,HttpResponseEncoder和HttpRequestDecoder是编码解码器。
HttpObjectAggregator将请求合并,如果没有,同一个http请求我们可能需要处理两次。

HttpMsgResponseEncoder和HttpMsgRequestDecoder是自定义的通用http处理方式。JsonHttpServerHandler是自定义的json处理器。

我们可以自定义一个http实体HttpResponseMsg,方便处理http响应等:

HttpResponseMsg:

public class HttpResponseMsg {
    public enum ResType {  
        HTML("text/html"),
        JSON("application/json"),
        JS("application/javascript"),
        PNG("image/png"),
        JPG("image/jpg");
        String value = null;
        ResType(String value) {
            this.value = value;
        }
        public String getValue() {
            return value;
        }
    }  
    
    public enum ResCode {  
        NOT_FOUND(404),
        OK(200),
        INTERNAL_ERROR(500);
        int value = 200;
        ResCode(int value) {
            this.value = value;
        }
        public int getValue() {
            return value;
        }
    }  
    public int resCode;
    
    public String resType;
    
    public String message;

    public int getResCode() {
        return resCode;
    }

    public void setResCode(int resCode) {
        this.resCode = resCode;
    }

    public String getResType() {
        return resType;
    }

    public void setResType(String resType) {
        this.resType = resType;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
    
}

HttpMsgResponseEncoder:

import java.util.List;

import com.netty.http.HttpResponseMsg;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;

public class HttpMsgResponseEncoder extends MessageToMessageEncoder<HttpResponseMsg> {
    private String charset;
    private int timeout;

    public HttpMsgResponseEncoder(String charset, int timeout) {
        super();
        this.charset = charset;
        this.timeout = timeout;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, HttpResponseMsg message, List<Object> out) {
        try {
            DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(message.getResCode()),
                    Unpooled.wrappedBuffer(message.getMessage().getBytes(charset)));
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, message.getResType()+";charset=" + charset);
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());

            // 强制keep-alive
            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
            response.headers().set("Keep-Alive", "timeout=" + timeout);

            out.add(response);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

HttpMsgRequestDecoder:

import java.nio.charset.Charset;
import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpObject;

public class HttpMsgRequestDecoder extends MessageToMessageDecoder<HttpObject>{
    private String charset;

    public HttpMsgRequestDecoder(String charset) {
        super();
        this.charset = charset;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, HttpObject in,
            List<Object> out) throws Exception {
        FullHttpRequest request = (FullHttpRequest) in;

        ByteBuf buf = request.content();
        String jsonStr = buf.toString(Charset.forName(charset));
        out.add(jsonStr);
    }
}

JsonHttpServerHandler:

import com.netty.http.HttpResponseMsg;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class JsonHttpServerHandler extends SimpleChannelInboundHandler<String> {
    String charset = "UTF-8";

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("post内容:" + msg);
        HttpResponseMsg hrm = new HttpResponseMsg();
        hrm.setResType(HttpResponseMsg.ResType.JSON.getValue());
        hrm.setResCode(HttpResponseMsg.ResCode.OK.getValue());
        hrm.setMessage(msg);
        ctx.writeAndFlush(hrm);
    }

}

至此,以上tcp和http处理方式已完成。

快速构建项目

Spring组件化构建

上一篇下一篇

猜你喜欢

热点阅读