netty(十九)Netty优化 - option中的参数优化
经过前面的学习,我们已经学会了Netty的使用。本章节开始我们要进行一些细节方面的学习,使其能更好的运用在我们以后的工作当中。
一、什么是option?
前面学习了Netty的服务端,和客户端,知道了创建服务要分别使用ServerBootStrap和BootStrap,不知道有没有关注到其中有一个方法叫做Option的?
我们分别看下ServerBootStrap和BootStrap的option:
1)ServerBootStrap
如上图所示,有两种option,一个是自己的option(ServerSocketChannel),一个childOption(ScoketChannel的option)。
2)BootStrap
只有一个option方法。
无论是上述哪两种,参数都是ChannelOption<T>,而这个ChannelOption Netty已经帮我们准备好了,可以直接使用。
下面我们会针对几种重要的配置讲解一下。
二、常用参数
2.1 CONNECT_TIMEOUT_MILLIS
ScoketChannel的参数。
用在客户端建立连接时,如果超过指定的时间仍未连接,则抛出timeout异常。
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,500);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",8080);
//阻塞等待连接
channelFuture.sync();
//阻塞等待释放连接
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("server error:" + e);
} finally {
// 释放EventLoopGroup
worker.shutdownGracefully();
}
}
只启动客户端,抛出如下异常:
Exception in thread "main" io.netty.channel.ConnectTimeoutException: connection timed out: /127.0.0.1:8080
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:261)
at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
若果该参数设置过长,且服务端确实没启动,则会抛出java层面的异常,拒绝连接:
Exception in thread "main" io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: /127.0.0.1:8080
Caused by: java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
2.2 SO_TIMEOUT
这个参数适用于阻塞IO,比如阻塞IO当中的read,accept等方法,修饰阻塞的,如果不想一直阻塞,可以通过改参数设置超时时间。
不要与CONNECT_TIMEOUT_MILLIS弄混了。
2.3 SO_BACKLOG
ServerSocketChannal 参数。
在了解这个参数之前,要先了解下TCP的三次握手,sync_queue(半连接队列)和accept_queue(全连接队列)。
其中半连接队列是在首次握手时,将请求放入半连接队列,当三次握手全部成功后,将请求从半连接队列放入全连接队列。
下图展示netty和三次握手的关系:
NETTY 连接与三次握手关系.png- 第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列
- 第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
- 第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue。
在上面的过程中,提到的sync_queue和accept_queue是我们本篇文章需要关注的重点。
在linux2.2之前,backlog包括了两个队列的大小。在之后的版本当中,由如下两个参数来控制:
-
sync queue - 半连接队列
- 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在
syncookies
启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
- 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在
-
accept queue - 全连接队列
- 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数比较,取二者的较小值。
- 如果 accpet queue 队列满了,server 将发送一个拒绝连接的错误信息到 client。
下面回归正题,在netty当中,通过ChannelOption.SO_BACKLOG设置大小,如下所示:
public class Server {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
//此处为了模拟,设置为2
serverBootstrap.option(ChannelOption.SO_BACKLOG,2);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
//阻塞等待连接
channelFuture.sync();
//阻塞等待释放连接
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("server error:" + e);
} finally {
// 释放EventLoopGroup
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
如上代码所示,设置了一个backlog为2的值。然后我们需要启动至少三个客户端看结果。
通过前面的三次握手的图,可以知道,只有当服务端处理不过来时,才会使用全连接队列,并将其占满,否则会直接走accept()方法,导致我们看不到测试结果。
所以我们这里不做测试了。
我们看下这个backlog的默认值在nio当中是多少:
在NIO当中backlog在ServerSocketChannel当中的bind方法被调用,所以我们从这里跟踪进去找到bind方法:
public final ServerSocketChannel bind(SocketAddress local)
throws IOException
{
return bind(local, 0);
}
查看bind被哪些地方调用,NioServerSocketChannel:
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
跟踪config.getBacklog():
private final ServerSocketChannelConfig config;
这个config是接口,直接看它的实现DefaultServerSocketChannelConfig:
private volatile int backlog = NetUtil.SOMAXCONN;
找SOMAXCONN:
public static final int SOMAXCONN;
找到SOMAXCONN赋值的位置,默认是windows200,Linux或mac默认128,如果有前面我们提到的文件/proc/sys/net/core/somaxconn,则走此配置中的文件:
SOMAXCONN = AccessController.doPrivileged(new PrivilegedAction<Integer>() {
@Override
public Integer run() {
// Determine the default somaxconn (server socket backlog) value of the platform.
// The known defaults:
// - Windows NT Server 4.0+: 200
// - Linux and Mac OS X: 128
int somaxconn = PlatformDependent.isWindows() ? 200 : 128;
File file = new File("/proc/sys/net/core/somaxconn");
BufferedReader in = null;
try {
// file.exists() may throw a SecurityException if a SecurityManager is used, so execute it in the
// try / catch block.
// See https://github.com/netty/netty/issues/4936
if (file.exists()) {
in = new BufferedReader(new FileReader(file));
somaxconn = Integer.parseInt(in.readLine());
if (logger.isDebugEnabled()) {
logger.debug("{}: {}", file, somaxconn);
}
} else {
// Try to get from sysctl
Integer tmp = null;
if (SystemPropertyUtil.getBoolean("io.netty.net.somaxconn.trySysctl", false)) {
tmp = sysctlGetInt("kern.ipc.somaxconn");
if (tmp == null) {
tmp = sysctlGetInt("kern.ipc.soacceptqueue");
if (tmp != null) {
somaxconn = tmp;
}
} else {
somaxconn = tmp;
}
}
if (tmp == null) {
logger.debug("Failed to get SOMAXCONN from sysctl and file {}. Default: {}", file,
somaxconn);
}
}
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to get SOMAXCONN from sysctl and file {}. Default: {}",
file, somaxconn, e);
}
} finally {
if (in != null) {
try {
in.close();
} catch (Exception e) {
// Ignored.
}
}
}
return somaxconn;
}
});
2.4 ulimit
属于操作系统参数。
使用ulimit -n 可以查看当前的最大打开文件数。使用ulimit -a 可以查看当前系统的所有限制值。
linux默认1024,当服务器负载较大时,会发生too many open files的错误,所以我们为了提供并发量,需要手动将其调整。
使用如下命令可以将其调整,但是是临时性的,可以考虑将其放在启动脚本当中:
ulimit -n 4096
2.5 TCP_NODELAY
属于 SocketChannal 参数。
在前面的文章当中,我们提到过TCP的nagle算法,我们使用netty时,它的默认开始的。
nagle算法会使我们某些较小的数据包造成延迟,因为为了提升效率,nagle会等到收集到一定数据后进行发送,这样可能造成我们消息的延迟。
可以通过如下方式设置,开启无延迟的配置:
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
2.6 SO_SNDBUF & SO_RCVBUF
SO_SNDBUF 属于 SocketChannal 参数
SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数
这两个参数不建议我们手动进行设置,因为操作系统会根据当前占用,进行自动的调整。
2.7 ALLOCATOR
属于 SocketChannal 参数。
ByteBuf的分配器。
serverBootstrap.childOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);
这个参数只有一个DEFAULT可以使用。
这个参数与ch.alloc().buffer()命令有关,关系着我们分配的buf是池化还是非池化,是直接内存还是堆内存。
我们从上面的Default跟踪进去:
ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;
继续跟踪DEFAULT_ALLOCATOR:
static final ByteBufAllocator DEFAULT_ALLOCATOR;
找到对其赋值的位置,发现了如下的静态代码块,此处就是设置buf是pooled还是unpooled,通过环境变量:"io.netty.allocator.type" 指定,我们可以在启动项目时指定-Dio.netty.allocator.type=unpooled设置成非池化。从源码可以看到,安卓是unpooled,其他事pooled。
static {
MAX_BYTES_PER_CHAR_UTF8 = (int)CharsetUtil.encoder(CharsetUtil.UTF_8).maxBytesPerChar();
String allocType = SystemPropertyUtil.get("io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
allocType = allocType.toLowerCase(Locale.US).trim();
Object alloc;
if ("unpooled".equals(allocType)) {
alloc = UnpooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: {}", allocType);
} else if ("pooled".equals(allocType)) {
alloc = PooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: {}", allocType);
} else {
alloc = PooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);
}
DEFAULT_ALLOCATOR = (ByteBufAllocator)alloc;
THREAD_LOCAL_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 0);
logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", THREAD_LOCAL_BUFFER_SIZE);
MAX_CHAR_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.maxThreadLocalCharBufferSize", 16384);
logger.debug("-Dio.netty.maxThreadLocalCharBufferSize: {}", MAX_CHAR_BUFFER_SIZE);
FIND_NON_ASCII = new ByteProcessor() {
public boolean process(byte value) {
return value >= 0;
}
};
}
如上代码中的UnpooledByteBufAllocator.DEFAULT和PooledByteBufAllocator.DEFAULT就指定了我们使用的直接内存还是堆内存,跟踪其中的UnpooledByteBufAllocator.DEFAULT看一下:
public static final UnpooledByteBufAllocator DEFAULT = new UnpooledByteBufAllocator(PlatformDependent.directBufferPreferred());
跟踪PlatformDependent.directBufferPreferred():
private static final boolean DIRECT_BUFFER_PREFERRED;
找DIRECT_BUFFER_PREFERRED赋值的位置:
DIRECT_BUFFER_PREFERRED = CLEANER != NOOP && !SystemPropertyUtil.getBoolean("io.netty.noPreferDirect", false);
重点关注上述代码后半段!SystemPropertyUtil.getBoolean("io.netty.noPreferDirect", false);,我们可用通过-Dio.netty.noPreferDirect=true环境变量指定我们使用堆内存。
2.8 RCVBUF_ALLOCATOR
属于 SocketChannal 参数。
控制 netty 接收缓冲区大小。
这个RCVBUF_ALLOCATOR不要与前面的ALLOCATOR弄混。
负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定。
通俗的讲在handler内部分配的byteBuf可以是直接内存,也可以是堆内存,但是经过网络io的内存,netty会强制为直接内存。
我们启动一个客户端和服务端去验证一下上述的描述:
服务端:
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
super.channelRead(ctx, msg);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
//阻塞等待连接
channelFuture.sync();
//阻塞等待释放连接
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("server error:" + e);
} finally {
// 释放EventLoopGroup
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
客户端:
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf byteBuf = ctx.alloc().buffer();
byteBuf.writeBytes("hello world!".getBytes());
System.out.println(byteBuf);
ctx.writeAndFlush(byteBuf);
super.channelActive(ctx);
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080);
//阻塞等待连接
channelFuture.sync();
//阻塞等待释放连接
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("server error:" + e);
} finally {
// 释放EventLoopGroup
worker.shutdownGracefully();
}
}
我们重点关注服务端接收到的Object msg,它应该是一个Bytebuf对象,那么他是什么类型的?如何生成的?
我们通过打断点的方式,找到线程调用的堆栈,发现第一个channel的位置:
点击进去发现如下代码,并且就是我们得到的ByteBuf:
image.png重点关注这部分有注释代码:
ChannelPipeline pipeline = AbstractNioByteChannel.this.pipeline();
// 上一小节提到的allocator,负责ByteBuf是池化还是非池化
ByteBufAllocator allocator = config.getAllocator();
//此处Handle是RecvByteBufAllocator内部类
Handle allocHandle = this.recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
//allocate方法创建byteBuf
byteBuf = allocHandle.allocate(allocator);
下面重点关注这个allocate方法,这里会分配一个ioBuffer,即直接内存buffer:
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(this.guess());
}
如上的guess()会根据数据量大小,动态分配buffer的大小,范围如下,自适应的AdaptiveRecvByteBufAllocator,最小不会小于64,最大不会超过65536:
public AdaptiveRecvByteBufAllocator() {
this(64, 1024, 65536);
}
关于主要参数就介绍这么多,有用的话帮忙点个赞吧~~