Netty学习
2021-01-10 本文已影响0人
kafeimao
参考视频 https://www.bilibili.com/video/BV1DJ411m7NR
目录
1、BIO实现tcp通讯
2、NIO实现tcp通讯
3、线程模型
4、Netty入门,实现tcp协议通讯
5、Netty核心组件
6、Netty实现群聊
7、Netty实现http服务器
8、Netty实现dubbo
9、Netty实现websocket长连接
10、Netty源码解析
1、BIO实现tcp通讯
服务端
public class BioServer {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
ServerSocket serverSocket = new ServerSocket(6666);
System.out.println("服务启动");
while (true) {
Socket accept = serverSocket.accept();
System.out.println("连接到一个客户端");
executorService.execute(new Runnable() {
@SneakyThrows
@Override
public void run() {
handle(accept);
}
});
}
}
private static void handle(Socket accept) throws IOException {
InputStream inputStream = accept.getInputStream();
byte[] bytes = new byte[1024];
while (true) {
int read = inputStream.read(bytes);
if (read != -1) {
System.out.println(new String(bytes, 0, read));
} else {
break;
}
}
accept.close();
}
}
2、NIO实现tcp通讯
服务端
public class NioServer {
public static void main(String[] args) throws Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
Selector selector = Selector.open();
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true){
if(selector.select(1000)==0){
System.out.println("等待了一秒,无连接");
continue;
}
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
if(key.isAcceptable()){
SocketChannel accept = serverSocketChannel.accept();
accept.configureBlocking(false);
accept.register(selector,SelectionKey.OP_READ,ByteBuffer.allocate(1024));
}
if(key.isReadable()){
SocketChannel channel = (SocketChannel)key.channel();
ByteBuffer attachment = (ByteBuffer)key.attachment();
channel.read(attachment);
System.out.println("from 客户端"+ new String(attachment.array()));
}
iterator.remove();
}
}
}
}
客户端
public class NioClient {
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
if(!socketChannel.connect(inetSocketAddress)){
while (!socketChannel.finishConnect()){
System.out.println("因为链接需要时间,没有成功时,可以做别的事");
}
}
String str = "hello,nio";
ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes());
socketChannel.write(byteBuffer);
System.in.read();
}
}
3、线程模型
注释:黄色表示对象,蓝色表示线程,白色表示方法
1.传统线程模型
image.png
每个客户端发出连接请求,都有一个对应的线程进行处理
Reactor线程模型:I/O多路复用结合线程池,就是reactor的基本思想
2.单reactor单线程
所有请求都给reactor,reactor根据不同请求进行处理,在同一个线程中
3.单reactor多线程
image.png
4.主从reactor
image.png
4、Netty入门实现tcp通讯
依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.20.Final</version>
</dependency>
服务端
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//1、创建两个线程组,bossGroup和workGroup
//2、boosGroup只处理连接请求,真正和客户端业务处理,会交给workGroup
//3、两个都是无线循环
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try{
//创建服务器端的启动对象
ServerBootstrap serverBootstrap = new ServerBootstrap();
//配置参数
serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class)//服务器的通道实现
.option(ChannelOption.SO_BACKLOG,128)//设置线程队列等待连接个数
.childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new NettyServerHandler());
}
});//给workGroup的对应的管道设置处理器
System.out.println("服务器准备好了");
//绑定端口,并且同步
ChannelFuture channelFuture = serverBootstrap.bind(9998).sync();
//监听关闭通道(这里涉及到netty的异步模型)
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
服务端处理器
/**
* 自定义的handler 需要继承netty规定的HandlerAdapter
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取客户端数据事件
* @param ctx 上下文对象,含有管道,通道,地址
* @param msg 客户端发送的数据
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server ctx = "+ ctx);
//将msg转换为ByteBuf
ByteBuf byteBuf = (ByteBuf)msg;
System.out.println("客户端发送消息:"+byteBuf.toString(CharsetUtil.UTF_8));
}
/**
* 数据读取完毕
* @param ctx
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//给客户端回复消息,发送消息到缓冲,并刷新
ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端",CharsetUtil.UTF_8));
}
/**
* 发生异常事件
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
客户端
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
//客户端需要一个事件循环组
NioEventLoopGroup clientGroup = new NioEventLoopGroup();
try {
//客户端启动对象
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(clientGroup)
.channel(NioSocketChannel.class)//客户端通道的实现类
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new NettyClientHandler());
}
});
System.out.println("客户端ok");
//启动客户端连接服务端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9998).sync();
//监听通道关闭
channelFuture.channel().closeFuture().sync();
}finally {
clientGroup.shutdownGracefully();
}
}
}
客户端处理器
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//通道就绪事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("ctx = "+ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello 服务端", CharsetUtil.UTF_8));
}
//读取数据事件
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("服务器回复的消息:"+byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
5、Netty核心组件
ServerBootstrap:引导服务端启动的对象
Bootstrap:引导客户端启动的对象
Future:Netty中的所有操作都是异步的,但是可以注册一个监听,Future和ChannelFuture就是具体的实现
Channel:能够用于执行网络io,根据不同的协议,都有对应的channel
Selector:选择器,不断查询注册在其上面的channel是否有触发事件
ChannelHandler:
image.png
ChannelPipeline:包含了channelHandler的一个list
image.png
6、Netty实现群聊系统
服务端
public class GroupChatServer {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workGroup = new NioEventLoopGroup(8);
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE,true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
pipeline.addLast("decoder",new StringDecoder());//加入一个解码器
pipeline.addLast("encoder",new StringEncoder());//加入编码器
pipeline.addLast(new GroupChatServerHandler());
}
});
System.out.println("服务器启动");
ChannelFuture channelFuture = serverBootstrap.bind(9998).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
服务端处理器
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
/**
* 一旦建立连接,第一个执行
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.add(channel);
//将客户端加入聊天的信息,推送给其他在线的客户
channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"加入聊天室"+ simpleDateFormat.format(new Date()));
}
/**
* 表示channel是活跃状态
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + "上线了");
}
/**
* 表示channel不活跃了
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress()+ "离线了");
}
/**
* 表示断开连接
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"退出了群聊");
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
Channel channel = channelHandlerContext.channel();
channelGroup.forEach(ch -> {
if(channel!=ch){
ch.writeAndFlush("[客户]"+channel.remoteAddress()+"发送了消息"+s+"\n");
}else {
ch.writeAndFlush("[自己]发送了消息"+s+"\n");
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
客户端
public class GroupChatClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("decoder",new StringDecoder());//加入一个解码器
pipeline.addLast("encoder",new StringEncoder());//加入编码器
pipeline.addLast(new GroupChatClientHandler());
}
});
System.out.println("客户端启动");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9998).sync();
Channel channel = channelFuture.channel();
Scanner scanner = new Scanner(System.in);
while(scanner.hasNextLine()){
String msg = scanner.nextLine();
channel.writeAndFlush(msg+"\n");
}
}finally {
eventLoopGroup.shutdownGracefully();
}
}
}
客户端处理器
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
System.out.println(s.trim());
}
}
Netty实现http服务器
服务端
public class TestServer {
public static void main(String[] args) throws Exception{
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workGroup = new NioEventLoopGroup(8);
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("myHttpServerCodec",new HttpServerCodec());
pipeline.addLast("myHandle",new TestHttpServerHandler());
}
});
ChannelFuture cf = bootstrap.bind(9998).sync();
cf.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
处理器
public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
if(httpObject instanceof HttpRequest){
System.out.println("msg 的类型"+httpObject.getClass().getName());
System.out.println("客户端地址"+ channelHandlerContext.channel().remoteAddress());
HttpRequest httpRequest = (HttpRequest)httpObject;
URI uri = new URI(httpRequest.uri());
if("/favicon.ico".equals(uri.getPath())){
System.out.println("请求网站图标,不做响应");
return;
}
ByteBuf byteContent = Unpooled.copiedBuffer("我是服务器", CharsetUtil.UTF_8);
DefaultFullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteContent);
httpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain;charset=utf-8");
httpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH,byteContent.readableBytes());
channelHandlerContext.writeAndFlush(httpResponse);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
}
}
8、Netty实现dubbo
待提供的服务
public interface HelloService {
String hello(String msg);
}
服务实现
public class HelloServiceImpl implements HelloService {
@Override
public String hello(String msg) {
System.out.println("收到消费者消息:"+msg);
if(msg!=null){
return "你好,我收到"+msg;
}else {
return "你好";
}
}
}
dubbo服务提供者
public class NettyServer {
public static void main(String[] args) {
NioEventLoopGroup boosGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workGroup = new NioEventLoopGroup(8);
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boosGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("decoder",new StringDecoder());
pipeline.addLast("encoder",new StringEncoder());
pipeline.addLast(new NettyServerHandler());
}
});
System.out.println("服务端开始提供服务");
ChannelFuture channelFuture = serverBootstrap.bind(9998).sync();
channelFuture.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
boosGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
服务端处理器
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("msg="+msg);
if(msg.toString().startsWith(ClientBootstrap.provideName)){
String helloResult = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
ctx.writeAndFlush(helloResult);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}
dubbo服务消费者
public class NettyClient {
private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static NettyClientHandler client;
public Object getBean(final Class<?> serverClass,final String providerName){
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),new Class<?>[]{serverClass},(proxy,method,args)->{
if(client==null){
initClient();
}
client.setPara(providerName+args[0]);
return executorService.submit(client).get();
});
}
private static void initClient(){
client = new NettyClientHandler();
NioEventLoopGroup clientGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(clientGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(client);
}
});
try {
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9998).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
客户端处理器
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
private ChannelHandlerContext context;
private String result;
private String para;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
context = ctx;
}
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
result = msg.toString();
notify();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
@Override
public synchronized Object call() throws Exception {
context.writeAndFlush(para);
wait();
return result;
}
void setPara(String para){
this.para = para;
}
}
消费服务
public class ClientBootstrap {
public static final String provideName = "helloService#hello#";
public static void main(String[] args) {
NettyClient nettyClient = new NettyClient();
HelloService helloService = (HelloService)nettyClient.getBean(HelloService.class, provideName);
String hello = helloService.hello("你好 dubbo");
System.out.println("res:"+hello);
}
}
9、Netty实现websocket长连接
服务端
public class WebSocketServer {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(8192));
pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
pipeline.addLast(new WebSocketHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(9998).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
服务端处理器
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
System.out.println("服务器收到消息"+textWebSocketFrame.text());
channelHandlerContext.writeAndFlush(new TextWebSocketFrame("hello 客户端"));
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerAdded被调用"+ctx.channel().id().asLongText());
System.out.println("handlerAdded被调用"+ctx.channel().id().asShortText());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved被调用"+ctx.channel().id().asShortText());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}