Spring and Netty Integration in Detail
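This article covers how Netty works together with Spring. Strictly speaking, there is little need to tie the two together, since Netty alone is enough to build something Spring-like. However, the earlier article "Using Netty for Socket and HTTP in a Spring Environment" barely used Spring, and some readers questioned that, so this one focuses on the Spring side of the integration.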
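1. Overview
Netty is one of the most popular open-source Java NIO frameworks. It originated at JBoss and is now maintained as an independent community project. It provides an asynchronous, event-driven network application framework and tools for quickly developing high-performance, highly reliable network servers and clients.
Compared with the JDK's native NIO, Netty offers a much simpler API and is well suited to network programming. Its I/O operations are asynchronous: they return a ChannelFuture right away instead of blocking the caller.
Mina is also an excellent NIO framework, written by the same author as Netty. Netty came later and has more advantages; for details, search for comparisons of Mina and Netty.
Netty can act as a socket server or as an HTTP server; the example in this article is a TCP socket server.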
Source code:
Gitee
First published at:
品茗IT
2. Dependency JARs
We assume a Spring environment is already set up, so only the Netty JAR is added here.
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.17.Final</version>
</dependency>
Full pom.xml:
<?xml version="1.0"?>
<project
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.pomit</groupId>
<artifactId>SpringWork</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>Netty</artifactId>
<packaging>jar</packaging>
<name>Netty</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.17.Final</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
</dependencies>
<build>
<finalName>Netty</finalName>
</build>
</project>
The parent POM manages the versions of all dependency JARs. It is available at:
https://www.pomit.cn/spring/SpringWork/pom.xml
3. Netty Server Configuration
We write a TCP server that runs in its own thread.
NettyServer:
package cn.pomit.springwork.netty.server;
import org.springframework.beans.factory.annotation.Autowired;
import cn.pomit.springwork.netty.config.NettyServerInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer implements Runnable {
private int port;
private Channel channel;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private Thread nserver;
@Autowired
NettyServerInitializer nettyServerInitializer;
public void init() {
nserver = new Thread(this);
nserver.start();
}
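    public void destory() {
        System.out.println("destroy server resources");
        // Close the server channel first; this also releases the thread blocked in run()
        if (channel != null) {
            channel.close().syncUninterruptibly();
        } else {
            System.out.println("server channel is null");
        }
        // shutdownGracefully() may be called more than once, so the call in run() is harmless.
        // The fields are not set to null because run() still uses them in its finally block.
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }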
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
@Override
public void run() {
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();
        System.out.println(Thread.currentThread().getName() + " ---- checkpoint 4");
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup);
            b.channel(NioServerSocketChannel.class);
            b.childHandler(nettyServerInitializer);
            // Bind the port and wait until the bind has completed
            ChannelFuture f = b.bind(port).sync();
            // Store the channel before blocking; otherwise the field stays null while the server runs
            channel = f.channel();
            // Block this thread until the server channel is closed
            f.channel().closeFuture().sync();
            // If the channel reference were not needed, this could be shortened to:
            /* b.bind(port).sync().channel().closeFuture().sync(); */
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
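With the code in place, a few key points are worth explaining:
bossGroup and workerGroup are the two thread pools (event loop groups): bossGroup accepts incoming connections, and workerGroup handles the I/O of the accepted connections.
The code above calls
f.channel().closeFuture().sync();
This blocks the calling thread (here the dedicated server thread started in init(), not the thread that starts Spring) until the server channel is closed, so nothing after it runs until then.
The two sync calls differ: both operate on a ChannelFuture, but not on the same one. The first waits for the bind operation to complete; the second waits on the channel's close future.
sync and await both wait for the future to complete, but sync also rethrows the cause if the operation failed, while await does not.
NettyServerInitializer is supplied here through Spring's bean injection.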
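The threading arrangement described above can be sketched without Netty. The following is a minimal plain-Java illustration, not the article's server: a CountDownLatch stands in for closeFuture(), and the class and method names are hypothetical. It shows why init() returns at once while the server thread stays blocked until shutdown is requested.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Plain-Java stand-in for the server thread; the latch plays the role of closeFuture(). */
class BlockingServerSketch implements Runnable {
    private final CountDownLatch started = new CountDownLatch(1);
    private final CountDownLatch closed = new CountDownLatch(1);
    private volatile boolean finished;
    private Thread thread;

    /** Starts the server thread and returns immediately, like init() in the article. */
    void init() {
        thread = new Thread(this, "server-sketch");
        thread.start();
    }

    /** Signals shutdown and waits briefly for the server thread to end. */
    void destroy() {
        closed.countDown();
        try {
            thread.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void run() {
        started.countDown();
        try {
            closed.await(); // blocks this thread only, like closeFuture().sync()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            finished = true; // the place where the event loop groups would be shut down
        }
    }

    /** Returns true once run() has begun, or false after a 2 second timeout. */
    boolean awaitStarted() {
        try {
            return started.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    boolean isFinished() {
        return finished;
    }

    public static void main(String[] args) {
        BlockingServerSketch server = new BlockingServerSketch();
        server.init();
        System.out.println("started: " + server.awaitStarted());
        System.out.println("finished before destroy: " + server.isFinished());
        server.destroy();
        System.out.println("finished after destroy: " + server.isFinished());
    }
}
```

Because the blocking wait happens on its own thread, the Spring container can finish starting up while the server keeps running.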
4. Starting the Server
There are two ways to start NettyServer.
4.1 Spring XML configuration
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache"
xmlns:jms="http://www.springframework.org/schema/jms" xmlns:jaxws="http://cxf.apache.org/jaxws"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/cache
http://www.springframework.org/schema/cache/spring-cache-4.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd ">
<context:annotation-config />
<context:component-scan base-package="cn.pomit.springwork">
</context:component-scan>
<bean id="annotationPropertyConfigurerNetty"
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="order" value="1" />
<property name="ignoreUnresolvablePlaceholders" value="true" />
<property name="locations">
<list>
<value>classpath:netty.properties</value>
</list>
</property>
</bean>
<bean id="nettyDemoServer" class="cn.pomit.springwork.netty.server.NettyServer" init-method="init" destroy-method="destory">
<property name="port" value="${netty_port}" />
</bean>
<bean id="closeFutureHandler" class="cn.pomit.springwork.netty.handler.CloseFutureHandler">
<property name="nextHandler"><null/></property>
</bean>
<bean id="exceptionFutureHandler" class="cn.pomit.springwork.netty.handler.ExceptionFutureHandler">
<property name="nextHandler"><null/></property>
</bean>
<bean id="bussinessFutureHandler" class="cn.pomit.springwork.netty.handler.BussinessFutureHandler">
<property name="nextHandler"><null/></property>
</bean>
</beans>
The configuration file netty.properties:
netty_port=4444
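4.2 Annotation-based startup
This simply uses the @Service, @PostConstruct and @PreDestroy annotations. If this variant is used, the nettyDemoServer bean definition from 4.1 should be removed; otherwise component scanning and the XML definition each create a server instance.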
package cn.pomit.springwork.netty.server;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import cn.pomit.springwork.netty.config.NettyServerInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
@Service
public class NettyServer implements Runnable {
    // Injected from netty.properties; without this the annotated bean never receives a port
    @Value("${netty_port}")
    private int port;
private Channel channel;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private Thread nserver;
@Autowired
NettyServerInitializer nettyServerInitializer;
@PostConstruct
public void init() {
nserver = new Thread(this);
nserver.start();
}
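    @PreDestroy
    public void destory() {
        System.out.println("destroy server resources");
        // Close the server channel first; this also releases the thread blocked in run()
        if (channel != null) {
            channel.close().syncUninterruptibly();
        } else {
            System.out.println("server channel is null");
        }
        // shutdownGracefully() may be called more than once, so the call in run() is harmless.
        // The fields are not set to null because run() still uses them in its finally block.
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }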
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
@Override
public void run() {
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();
        System.out.println(Thread.currentThread().getName() + " ---- checkpoint 4");
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup);
            b.channel(NioServerSocketChannel.class);
            b.childHandler(nettyServerInitializer);
            // Bind the port and wait until the bind has completed
            ChannelFuture f = b.bind(port).sync();
            // Store the channel before blocking; otherwise the field stays null while the server runs
            channel = f.channel();
            // Block this thread until the server channel is closed
            f.channel().closeFuture().sync();
            // If the channel reference were not needed, this could be shortened to:
            /* b.bind(port).sync().channel().closeFuture().sync(); */
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
5. NettyServerInitializer
NettyServerInitializer sets up the handlers for every channel that NettyServer accepts. It can be managed as a Spring bean, so @Component is used here to declare it as one.
NettyServerInitializer:
package cn.pomit.springwork.netty.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
@Component
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
@Autowired
NettyServerHandler nettyServerHandler;
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
        // Frame decoder that splits the byte stream on line delimiters ("\n" or "\r\n")
        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        // String decoder and encoder
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        System.out.println(Thread.currentThread().getName() + " ---- checkpoint 5");
        // Our own business-logic handler
        pipeline.addLast("handler", nettyServerHandler);
}
}
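To make the role of the "framer" step concrete, here is a minimal plain-Java sketch of line-delimited framing. It is not Netty's DelimiterBasedFrameDecoder, only an illustration of the idea under simplifying assumptions: the class name is hypothetical, the length limit counts characters rather than bytes, and chunk boundaries are assumed not to split a multi-byte character.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Stand-alone illustration of line-delimited framing; not Netty code. */
class LineFramerSketch {
    private final StringBuilder buffer = new StringBuilder();
    private final int maxFrameLength;

    LineFramerSketch(int maxFrameLength) {
        this.maxFrameLength = maxFrameLength;
    }

    /** Feeds one chunk as it arrives from the socket and returns every line it completes. */
    List<String> feed(byte[] chunk) {
        buffer.append(new String(chunk, StandardCharsets.UTF_8));
        List<String> frames = new ArrayList<>();
        int newline;
        while ((newline = buffer.indexOf("\n")) >= 0) {
            // Drop the delimiter itself, accepting both "\n" and "\r\n"
            int end = (newline > 0 && buffer.charAt(newline - 1) == '\r') ? newline - 1 : newline;
            frames.add(buffer.substring(0, end));
            buffer.delete(0, newline + 1);
        }
        // Whatever remains is an incomplete frame; refuse to buffer it without bound
        if (buffer.length() > maxFrameLength) {
            throw new IllegalStateException("frame exceeds " + maxFrameLength + " characters");
        }
        return frames;
    }

    public static void main(String[] args) {
        LineFramerSketch framer = new LineFramerSketch(8192);
        System.out.println(framer.feed("hel".getBytes(StandardCharsets.UTF_8)));     // []
        System.out.println(framer.feed("lo\nwor".getBytes(StandardCharsets.UTF_8))); // [hello]
        System.out.println(framer.feed("ld\r\n".getBytes(StandardCharsets.UTF_8)));  // [world]
    }
}
```

TCP delivers a byte stream with no message boundaries, which is why the framer has to come before the string decoder in the pipeline: the business handler then receives exactly one complete line per call.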
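6. Sharing a ChannelHandler in Netty
By default a Netty ChannelHandler cannot be shared: each instance is meant for a single channel's pipeline. A Spring bean is a singleton by default, so the same handler instance would be added to the pipeline of every new connection, and Netty reports an error (a ChannelPipelineException) when a handler that is not marked @Sharable is added a second time.
Therefore the ChannelHandler has to be annotated with @Sharable before it is handed over to Spring, and it should not keep per-connection state in its fields.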
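Since one shared instance serves every connection, any field on it is effectively global state. The following plain-Java sketch illustrates that point without Netty types; the class name and the string connection ids are hypothetical. In a real Netty handler, per-connection state is typically kept on the channel itself (for example through channel attributes) rather than in a map like this.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Plain-Java stand-in for a handler instance shared by all connections; not Netty code. */
class SharedHandlerSketch {
    // A field on the shared instance: every connection updates the same counter
    private final AtomicInteger total = new AtomicInteger();
    // Per-connection state has to be keyed by the connection (hypothetical string ids)
    private final Map<String, Integer> perConnection = new ConcurrentHashMap<>();

    /** Called once per received message, possibly from several I/O threads. */
    void onMessage(String connectionId) {
        total.incrementAndGet();
        perConnection.merge(connectionId, 1, Integer::sum);
    }

    int total() {
        return total.get();
    }

    int countFor(String connectionId) {
        return perConnection.getOrDefault(connectionId, 0);
    }

    public static void main(String[] args) {
        SharedHandlerSketch shared = new SharedHandlerSketch();
        shared.onMessage("conn-A");
        shared.onMessage("conn-A");
        shared.onMessage("conn-B");
        System.out.println(shared.total());            // 3: the field mixes both connections
        System.out.println(shared.countFor("conn-A")); // 2
        System.out.println(shared.countFor("conn-B")); // 1
    }
}
```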
NettyServerHandler:
package cn.pomit.springwork.netty.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import cn.pomit.springwork.netty.handler.Handler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
@Component
@Sharable
public class NettyServerHandler extends SimpleChannelInboundHandler<String>{
@Autowired(required = false)
@Qualifier("closeFutureHandler")
public Handler closeFutureHandler;
@Autowired(required = false)
@Qualifier("exceptionFutureHandler")
public Handler exceptionFutureHandler;
@Autowired
@Qualifier("bussinessFutureHandler")
public Handler bussinessFutureHandler;
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(Thread.currentThread().getName() + " ---- checkpoint 6");
        // Pass the message to the business handler and send its return value back to the client.
        // Note: the reply is written as-is, without a trailing line delimiter.
        String retMsg = bussinessFutureHandler.hander(msg);
        ctx.writeAndFlush(retMsg);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " channelRegistered " );
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " channelUnregistered " );
super.channelUnregistered(ctx);
if(closeFutureHandler !=null){
closeFutureHandler.hander(ctx.name());
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " channelActive " );
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " channelInactive " );
super.channelInactive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " exceptionCaught :" + cause.getMessage() );
super.exceptionCaught(ctx, cause);
if(exceptionFutureHandler !=null){
exceptionFutureHandler.hander(cause.getMessage());
}
}
}
This class uses the three business handlers defined in the XML above.
7. Business Handlers
7.1 The Handler interface
Handler :
package cn.pomit.springwork.netty.handler;
public interface Handler {
public static String COMMONRET="200";
public String hander(String msg);
}
7.2 BussinessFutureHandler
BussinessFutureHandler:
package cn.pomit.springwork.netty.handler;
public class BussinessFutureHandler implements Handler {
private Handler nextHandler;
public Handler getNextHandler() {
return nextHandler;
}
public void setNextHandler(Handler nextHandler) {
this.nextHandler = nextHandler;
}
@Override
public String hander(String msg) {
        System.out.println("Received message: " + msg);
if (nextHandler != null) {
nextHandler.hander(msg);
}
return msg;
}
}
7.3 CloseFutureHandler
CloseFutureHandler:
package cn.pomit.springwork.netty.handler;
public class CloseFutureHandler implements Handler {
private Handler nextHandler;
public Handler getNextHandler() {
return nextHandler;
}
public void setNextHandler(Handler nextHandler) {
this.nextHandler = nextHandler;
}
@Override
public String hander(String msg) {
        System.out.println(msg + " is closing.");
if (nextHandler != null) {
nextHandler.hander(msg);
}
return COMMONRET;
}
}
7.4 ExceptionFutureHandler
ExceptionFutureHandler:
package cn.pomit.springwork.netty.handler;
public class ExceptionFutureHandler implements Handler {
private Handler nextHandler;
public Handler getNextHandler() {
return nextHandler;
}
public void setNextHandler(Handler nextHandler) {
this.nextHandler = nextHandler;
}
@Override
public String hander(String msg) {
        System.out.println("Exception occurred: " + msg);
if (nextHandler != null) {
nextHandler.hander(msg);
}
return COMMONRET;
}
}
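All three handlers carry a nextHandler property, which the XML in 4.1 sets to null. The sketch below shows how such a chain behaves once a next handler is actually wired in. It is a self-contained illustration with hypothetical names (ChainHandler, RecordingHandler), mirroring the shape of the Handler interface above but not using the article's classes. In Spring XML the wiring would presumably be done with a ref on the nextHandler property instead of the null element.

```java
import java.util.ArrayList;
import java.util.List;

/** Same shape as the article's Handler interface; the method spelling follows the original. */
interface ChainHandler {
    String hander(String msg);
}

/** Hypothetical handler that records each call and then delegates to the next handler, if any. */
class RecordingHandler implements ChainHandler {
    private final String name;
    private final List<String> log;
    private ChainHandler nextHandler;

    RecordingHandler(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    void setNextHandler(ChainHandler nextHandler) {
        this.nextHandler = nextHandler;
    }

    @Override
    public String hander(String msg) {
        log.add(name + ":" + msg);
        if (nextHandler != null) {
            nextHandler.hander(msg); // the return value of the next handler is ignored, as in the article
        }
        return msg;
    }
}

class HandlerChainSketch {
    /** Builds a two-step chain (business -> audit) and returns the order in which handlers ran. */
    static List<String> run(String msg) {
        List<String> log = new ArrayList<>();
        RecordingHandler business = new RecordingHandler("business", log);
        RecordingHandler audit = new RecordingHandler("audit", log);
        business.setNextHandler(audit);
        business.hander(msg);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run("ping")); // [business:ping, audit:ping]
    }
}
```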
That completes a simple way of integrating Netty with Spring.
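To try the server out, a small client is enough. The sketch below uses only java.net.Socket and is an assumption-laden illustration, not part of the article's project: the class and method names are hypothetical. It sends one newline-terminated line, because the server's framer splits on line delimiters, and it reads raw bytes instead of calling readLine(), because the server writes its reply without a trailing newline. The stand-in echo server is included only so the sketch runs on its own; against the real server, pass its host and the port from netty.properties (4444).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

class LineClientSketch {
    /** Sends one newline-terminated line and returns whatever the first read delivers. */
    static String sendLine(String host, int port, String line) {
        try (Socket socket = new Socket(host, port)) {
            socket.setSoTimeout(3000); // do not wait forever for a reply
            OutputStream out = socket.getOutputStream();
            out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
            out.flush();
            byte[] buf = new byte[8192];
            int n = socket.getInputStream().read(buf); // a short reply normally arrives in one read
            return n < 0 ? "" : new String(buf, 0, n, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Hypothetical stand-in server: echoes one line back without a newline, then exits. */
    static int startStandInServer() {
        final ServerSocket server;
        try {
            server = new ServerSocket(0); // any free port
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Thread t = new Thread(() -> {
            try (ServerSocket s = server; Socket client = s.accept()) {
                BufferedReader in = new BufferedReader(
                        new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8));
                String line = in.readLine();
                if (line != null) {
                    OutputStream out = client.getOutputStream();
                    out.write(line.getBytes(StandardCharsets.UTF_8));
                    out.flush();
                }
            } catch (IOException ignored) {
                // The stand-in serves a single request; an error simply ends the thread
            }
        });
        t.setDaemon(true);
        t.start();
        return server.getLocalPort();
    }

    public static void main(String[] args) {
        int port = startStandInServer();
        System.out.println(sendLine("127.0.0.1", port, "hello"));
    }
}
```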