
Integrating Spring and Netty: A Detailed Guide

2019-05-28  Author: 逍遥天扬


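This article covers how to make Netty work together with Spring. Frankly, there is little need to tie the two together, since something Spring-like could be built on Netty alone. However, the earlier article "Spring环境下使用Netty写Socket和Http详解" (Using Netty for Socket and HTTP in a Spring Environment) barely used Spring and was questioned by some readers for it, so this article focuses on the cooperation with Spring.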

Official home pages

Spring

Netty

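1. Overview

Netty is one of the most popular open-source NIO frameworks for Java. It was originally developed under JBoss and is now maintained by the Netty project community. It provides an asynchronous, event-driven network application framework and tools for quickly building high-performance, highly reliable network servers and clients.

Compared with the native NIO of the JDK, Netty offers a much simpler API that is well suited to network programming. Its default transport is built on NIO and its API is asynchronous: an I/O operation returns a ChannelFuture immediately instead of blocking until it finishes.

Mina is also an excellent NIO framework, and it comes from the same original author as Netty. Netty is the later of the two and has more advantages; for details, search for comparisons of Mina and Netty.

Netty can serve as a socket server or as an HTTP server. This article builds a TCP socket server whose components are managed by Spring.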

Git repository:
Gitee

First published at:
品茗IT


2. Dependencies

We assume you already have a Spring environment set up, so only the Netty jar is added here.

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.17.Final</version>
</dependency>

Complete pom.xml:

<?xml version="1.0"?>
<project
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
    xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>cn.pomit</groupId>
        <artifactId>SpringWork</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <artifactId>Netty</artifactId>
    <packaging>jar</packaging>
    <name>Netty</name>
    <url>http://maven.apache.org</url>
    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.17.Final</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
        </dependency>
    </dependencies>
    <build>
        <finalName>Netty</finalName>
    </build>
</project>

The parent POM manages the versions of all dependency jars. It is available at:
https://www.pomit.cn/spring/SpringWork/pom.xml

3. Netty server configuration

We write a TCP server that runs on its own thread.

NettyServer:

package cn.pomit.springwork.netty.server;


import org.springframework.beans.factory.annotation.Autowired;

import cn.pomit.springwork.netty.config.NettyServerInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer implements Runnable {
    private int port;
    // Written by the server thread and read by the thread that runs destory()
    private volatile Channel channel;
    private volatile EventLoopGroup bossGroup;
    private volatile EventLoopGroup workerGroup;
    private Thread nserver;
    @Autowired
    NettyServerInitializer nettyServerInitializer;

    // Start Netty on its own thread so that Spring context startup is not blocked
    public void init() {
        nserver = new Thread(this);
        nserver.start();
    }

    // The name "destory" (sic) is kept because the XML destroy-method refers to it
    public void destory() {
        System.out.println("destroy server resources");
        Channel ch = channel;
        if (null == ch) {
            System.out.println("server channel is null");
        } else {
            // Closing the channel completes closeFuture() and unblocks run()
            ch.close().syncUninterruptibly();
        }
        // Calling shutdownGracefully() again in run()'s finally block is harmless
        if (null != bossGroup) {
            bossGroup.shutdownGracefully();
        }
        if (null != workerGroup) {
            workerGroup.shutdownGracefully();
        }
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    @Override
    public void run() {
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();
        System.out.println(Thread.currentThread().getName() + "----position 4");
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup);
            b.channel(NioServerSocketChannel.class);
            b.childHandler(nettyServerInitializer);

            // Bind the port and wait until the bind has completed
            ChannelFuture f = b.bind(port).sync();
            // Store the channel before blocking so that destory() can close it
            channel = f.channel();
            // Block this thread until the server channel is closed
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption
            Thread.currentThread().interrupt();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

}

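With the code in place, a few key points are worth explaining:

bossGroup and workerGroup are two event loop groups (thread pools): bossGroup accepts incoming connections, and workerGroup handles the I/O of the accepted connections.

The code above uses

f.channel().closeFuture().sync();

This call blocks the calling thread (the server thread started in init(), not Spring's main thread) until the server channel is closed. Any statement placed after it, such as an assignment to the channel field, does not run until the server has shut down.

The two sync() calls differ: both are invoked on a ChannelFuture, but on different futures. The first, on the future returned by bind(port), waits for the bind to finish and rethrows the failure if the bind failed. The second, on closeFuture(), waits until the channel is closed. Replacing sync() with await() also waits, but await() does not rethrow the cause of a failed operation, so the outcome then has to be checked with isSuccess().

NettyServerInitializer is supplied here through Spring's bean injection.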
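The blocking behaviour described above can be modelled without Netty. The following is a minimal JDK-only sketch, not the article's server: the class name LifecycleSketch and its methods are hypothetical, and two CountDownLatch objects stand in for the bind future and for closeFuture(). It shows a server thread that stays blocked after "binding" and releases its resources only after another thread calls close().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// JDK-only model of the server thread's lifecycle; no Netty types are used.
final class LifecycleSketch implements Runnable {
    // Stand-in for the future returned by bind(port)
    private final CountDownLatch bound = new CountDownLatch(1);
    // Stand-in for channel.closeFuture()
    private final CountDownLatch closed = new CountDownLatch(1);
    private volatile boolean released;

    @Override
    public void run() {
        try {
            bound.countDown(); // the "bind" has completed
            closed.await();    // blocks like closeFuture().sync()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            released = true;   // where shutdownGracefully() would be called
        }
    }

    // Called from another thread, like the Spring destroy callback
    void close() {
        closed.countDown();
    }

    boolean isReleased() {
        return released;
    }

    // Runs the whole scenario. Returns true if the thread was still blocked
    // after the "bind" and released its resources after close().
    static boolean demo() {
        LifecycleSketch server = new LifecycleSketch();
        Thread t = new Thread(server);
        t.start();
        try {
            boolean bindSeen = server.bound.await(5, TimeUnit.SECONDS);
            boolean blockedAfterBind = t.isAlive() && !server.isReleased();
            server.close();
            t.join(5000);
            return bindSeen && blockedAfterBind && server.isReleased() && !t.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("lifecycle ok: " + demo());
    }
}
```

The point of the model is that the shutdown side needs a handle it can use to release the blocked thread, which is the role the channel field plays in NettyServer.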

4. Starting the server

Start NettyServer.

4.1 Spring XML configuration

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache"
    xmlns:jms="http://www.springframework.org/schema/jms" xmlns:jaxws="http://cxf.apache.org/jaxws"
    xsi:schemaLocation="
                    http://www.springframework.org/schema/beans
                    http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                    http://www.springframework.org/schema/tx 
                    http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
                    http://www.springframework.org/schema/aop 
                    http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
                    http://www.springframework.org/schema/context      
                    http://www.springframework.org/schema/context/spring-context-4.0.xsd
                    http://www.springframework.org/schema/cache 
                    http://www.springframework.org/schema/cache/spring-cache-4.0.xsd
                    http://www.springframework.org/schema/jms 
                    http://www.springframework.org/schema/jms/spring-jms-4.0.xsd   
                    http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd ">

    <context:annotation-config />
    <context:component-scan base-package="cn.pomit.springwork">
    </context:component-scan>
    
    <bean id="annotationPropertyConfigurerNetty"
        class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="order" value="1" />
        <property name="ignoreUnresolvablePlaceholders" value="true" />
        <property name="locations">
            <list>
                <value>classpath:netty.properties</value>
            </list>
        </property>
    </bean>
    
    <bean id="nettyDemoServer" class="cn.pomit.springwork.netty.server.NettyServer" init-method="init" destroy-method="destory">
        <property name="port" value="${netty_port}" />
    </bean>
    
    <bean id="closeFutureHandler" class="cn.pomit.springwork.netty.handler.CloseFutureHandler">
        <property name="nextHandler"><null/></property>
    </bean>
    
    <bean id="exceptionFutureHandler" class="cn.pomit.springwork.netty.handler.ExceptionFutureHandler">
        <property name="nextHandler"><null/></property>
    </bean>
    
    <bean id="bussinessFutureHandler" class="cn.pomit.springwork.netty.handler.BussinessFutureHandler">
        <property name="nextHandler"><null/></property>
    </bean>

</beans>

The configuration file netty.properties:

netty_port=4444

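4.2 Starting with annotations

This only takes the @Service and @PostConstruct annotations, plus @PreDestroy for shutdown. With this variant the nettyDemoServer bean definition from 4.1 should be removed, otherwise two NettyServer instances are created. The port still has to be supplied, for example through the setter or an @Value placeholder.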

package cn.pomit.springwork.netty.server;


import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import cn.pomit.springwork.netty.config.NettyServerInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

@Service
public class NettyServer implements Runnable {
    private int port;
    // Written by the server thread and read by the thread that runs destory()
    private volatile Channel channel;
    private volatile EventLoopGroup bossGroup;
    private volatile EventLoopGroup workerGroup;
    private Thread nserver;
    @Autowired
    NettyServerInitializer nettyServerInitializer;

    // Start Netty on its own thread so that Spring context startup is not blocked
    @PostConstruct
    public void init() {
        nserver = new Thread(this);
        nserver.start();
    }

    @PreDestroy
    public void destory() {
        System.out.println("destroy server resources");
        Channel ch = channel;
        if (null == ch) {
            System.out.println("server channel is null");
        } else {
            // Closing the channel completes closeFuture() and unblocks run()
            ch.close().syncUninterruptibly();
        }
        // Calling shutdownGracefully() again in run()'s finally block is harmless
        if (null != bossGroup) {
            bossGroup.shutdownGracefully();
        }
        if (null != workerGroup) {
            workerGroup.shutdownGracefully();
        }
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    @Override
    public void run() {
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();
        System.out.println(Thread.currentThread().getName() + "----position 4");
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup);
            b.channel(NioServerSocketChannel.class);
            b.childHandler(nettyServerInitializer);

            // Bind the port and wait until the bind has completed
            ChannelFuture f = b.bind(port).sync();
            // Store the channel before blocking so that destory() can close it
            channel = f.channel();
            // Block this thread until the server channel is closed
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption
            Thread.currentThread().interrupt();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

}

5. NettyServerInitializer

NettyServerInitializer configures the handlers of every channel that NettyServer accepts. It can be managed as a Spring bean, so @Component is used here to declare it as one.

NettyServerInitializer:

package cn.pomit.springwork.netty.config;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

@Component
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Autowired
    NettyServerHandler nettyServerHandler;

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        // Frame decoder that splits the stream on line delimiters ("\n" or "\r\n")
        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));

        // String decoder and encoder
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        System.out.println(Thread.currentThread().getName() + "----position 5");
        // Our own business-logic handler
        pipeline.addLast("handler", nettyServerHandler);
    }
}
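The "framer" step above exists because TCP delivers a byte stream, so one message can arrive in several chunks or several messages in one chunk. The following is a simplified JDK-only illustration of line-delimited framing, not Netty's implementation: the class LineFramerSketch is hypothetical, it works on strings rather than bytes, and it omits the maximum frame length (8192 in the initializer above).

```java
import java.util.ArrayList;
import java.util.List;

// Simplified illustration of line-delimited framing; not Netty's implementation.
final class LineFramerSketch {
    // Holds data received so far that does not yet end in a line delimiter
    private final StringBuilder buffer = new StringBuilder();

    // Feeds one received chunk and returns every complete line it finishes.
    List<String> feed(String chunk) {
        buffer.append(chunk);
        List<String> frames = new ArrayList<>();
        int newline;
        while ((newline = buffer.indexOf("\n")) >= 0) {
            String line = buffer.substring(0, newline);
            // Treat "\r\n" like "\n" by dropping the trailing carriage return
            if (line.endsWith("\r")) {
                line = line.substring(0, line.length() - 1);
            }
            frames.add(line);
            buffer.delete(0, newline + 1);
        }
        return frames;
    }

    public static void main(String[] args) {
        LineFramerSketch framer = new LineFramerSketch();
        System.out.println(framer.feed("hel"));          // no complete line yet
        System.out.println(framer.feed("lo\r\nwor"));    // completes "hello"
        System.out.println(framer.feed("ld\n"));         // completes "world"
    }
}
```

Only complete lines reach the next stage, which is why the business handler in the pipeline can treat each String it receives as one whole message.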


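6. Sharing a ChannelHandler in Netty

By default a Netty ChannelHandler instance cannot be shared: each instance may be added to only one pipeline, and every connection has its own pipeline. A handler managed directly as a Spring bean is a singleton by default, so the same instance would be added to the pipeline of every connection, which Netty rejects with an exception.

Therefore the ChannelHandler has to be annotated with @Sharable before Spring manages it. A sharable handler is invoked for many channels, possibly concurrently, so it should not keep per-connection state in its fields.

NettyServerHandler: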

package cn.pomit.springwork.netty.config;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import cn.pomit.springwork.netty.handler.Handler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

@Component
@Sharable
public class NettyServerHandler extends SimpleChannelInboundHandler<String>{
    @Autowired(required = false)
    @Qualifier("closeFutureHandler")
    public Handler closeFutureHandler;
    @Autowired(required = false)
    @Qualifier("exceptionFutureHandler")
    public Handler exceptionFutureHandler;
    @Autowired
    @Qualifier("bussinessFutureHandler")
    public Handler bussinessFutureHandler;
    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(Thread.currentThread().getName() + "----position 6");
        // Pass the message to the business handler and write its result back to the client.
        // StringEncoder does not append a line delimiter to the reply.
        String retMsg = bussinessFutureHandler.hander(msg);
        ctx.writeAndFlush(retMsg);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " channelRegistered " );
        super.channelRegistered(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " channelUnregistered " );
        super.channelUnregistered(ctx);
        if(closeFutureHandler !=null){
            closeFutureHandler.hander(ctx.name());
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " channelActive " );
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " channelInactive " );
        super.channelInactive(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " exceptionCaught :" + cause.getMessage() );
        super.exceptionCaught(ctx, cause);
        if(exceptionFutureHandler !=null){
            exceptionFutureHandler.hander(cause.getMessage());
        }
    }

}

This handler uses the three business handlers defined in the XML above.
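The sharing rule from this section can be modelled in plain Java. This is a rough sketch of the idea only, under the assumption that a pipeline refuses a handler instance that was already added elsewhere unless its class is marked sharable. Every name in it (SharableSketch, DemoSharable, DemoHandler, DemoPipeline) is hypothetical and none of it is Netty code.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the "one instance, one pipeline unless sharable" rule.
final class SharableSketch {
    // Hypothetical marker annotation standing in for Netty's @Sharable
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface DemoSharable {}

    // Hypothetical handler base class that remembers whether it was added before
    static class DemoHandler {
        boolean added;
    }

    @DemoSharable
    static class SharedHandler extends DemoHandler {}

    static class PlainHandler extends DemoHandler {}

    // Hypothetical pipeline; one is created per connection
    static class DemoPipeline {
        private final List<DemoHandler> handlers = new ArrayList<>();

        void addLast(DemoHandler handler) {
            boolean sharable = handler.getClass().isAnnotationPresent(DemoSharable.class);
            if (!sharable && handler.added) {
                throw new IllegalStateException(handler.getClass().getSimpleName()
                        + " is not sharable and was already added to a pipeline");
            }
            handler.added = true;
            handlers.add(handler);
        }
    }

    // Adds the same instance to two pipelines, as a singleton bean would be
    static boolean canShare(DemoHandler handler) {
        try {
            new DemoPipeline().addLast(handler);
            new DemoPipeline().addLast(handler);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("shared handler: " + canShare(new SharedHandler()));
        System.out.println("plain handler:  " + canShare(new PlainHandler()));
    }
}
```

The annotation only declares that sharing is safe; it does not make a handler thread-safe, so a shared handler still has to avoid mutable per-connection fields.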

7. Business handlers

7.1 The Handler interface

Handler:

package cn.pomit.springwork.netty.handler;


public interface Handler {
    public static String COMMONRET="200";
    public String hander(String msg);
}

7.2 BussinessFutureHandler

BussinessFutureHandler:

package cn.pomit.springwork.netty.handler;

public class BussinessFutureHandler implements Handler {
    private Handler nextHandler;

    public Handler getNextHandler() {
        return nextHandler;
    }

    public void setNextHandler(Handler nextHandler) {
        this.nextHandler = nextHandler;
    }

    @Override
    public String hander(String msg) {
        System.out.println("Message received: " + msg);
        if (nextHandler != null) {
            nextHandler.hander(msg);
        }
        return msg;
    }

}

7.3 CloseFutureHandler

CloseFutureHandler:

package cn.pomit.springwork.netty.handler;

public class CloseFutureHandler implements Handler {
    private Handler nextHandler;

    public Handler getNextHandler() {
        return nextHandler;
    }

    public void setNextHandler(Handler nextHandler) {
        this.nextHandler = nextHandler;
    }

    @Override
    public String hander(String msg) {
        System.out.println(msg + " is closing.");
        if (nextHandler != null) {
            nextHandler.hander(msg);
        }
        return COMMONRET;
    }

}


7.4 ExceptionFutureHandler

ExceptionFutureHandler:

package cn.pomit.springwork.netty.handler;

public class ExceptionFutureHandler implements Handler {
    private Handler nextHandler;

    public Handler getNextHandler() {
        return nextHandler;
    }

    public void setNextHandler(Handler nextHandler) {
        this.nextHandler = nextHandler;
    }

    @Override
    public String hander(String msg) {
        System.out.println("Exception occurred, message: " + msg);
        if (nextHandler != null) {
            nextHandler.hander(msg);
        }
        
        return COMMONRET;
    }

}
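All three classes above carry a nextHandler property that the XML sets to null, so no chaining actually happens in this article. The sketch below shows how such a chain could behave once a next handler is wired in. It is an illustration under assumptions: HandlerChainSketch and RecordingHandler are hypothetical names, and only the Handler interface shape (including the method name hander) is taken from the article.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration of chaining handlers through a nextHandler property.
final class HandlerChainSketch {
    // Same shape as the article's Handler interface (method name kept as "hander")
    interface Handler {
        String hander(String msg);
    }

    // Hypothetical handler that records what it sees, then delegates to nextHandler
    static class RecordingHandler implements Handler {
        private final String name;
        private final List<String> log;
        private Handler nextHandler;

        RecordingHandler(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        void setNextHandler(Handler nextHandler) {
            this.nextHandler = nextHandler;
        }

        @Override
        public String hander(String msg) {
            log.add(name + ":" + msg);
            if (nextHandler != null) {
                nextHandler.hander(msg);
            }
            return msg;
        }
    }

    // Wires business -> audit and sends one message through the chain
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        RecordingHandler audit = new RecordingHandler("audit", log);
        RecordingHandler business = new RecordingHandler("business", log);
        business.setNextHandler(audit);
        business.hander("ping");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the XML configuration the same wiring would be expressed by giving the nextHandler property a ref to another handler bean instead of a null value.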

That completes a simple way of integrating Netty with Spring.
