Netty学习

2021-01-10  本文已影响0人  kafeimao

参考视频 https://www.bilibili.com/video/BV1DJ411m7NR

目录

1、BIO实现tcp通讯
2、NIO实现tcp通讯
3、线程模型
4、Netty入门,实现tcp协议通讯
5、Netty核心组件
6、Netty实现群聊
7、Netty实现http服务器
8、Netty实现dubbo
9、Netty实现websocket长连接
10、Netty源码解析

1、BIO实现tcp通讯

服务端

public class BioServer {
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        ServerSocket serverSocket = new ServerSocket(6666);
        System.out.println("服务启动");
        while (true) {
            Socket accept = serverSocket.accept();
            System.out.println("连接到一个客户端");
            executorService.execute(new Runnable() {
                @SneakyThrows
                @Override
                public void run() {
                    handle(accept);
                }
            });
        }
    }
    private static void handle(Socket accept) throws IOException {
        InputStream inputStream = accept.getInputStream();
        byte[] bytes = new byte[1024];
        while (true) {
            int read = inputStream.read(bytes);
            if (read != -1) {
                System.out.println(new String(bytes, 0, read));
            } else {
                break;
            }
        }
        accept.close();
    }
}

2、NIO实现tcp通讯

服务端

public class NioServer {
    public static void main(String[] args) throws Exception {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        Selector selector = Selector.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(6666));
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while(true){
            if(selector.select(1000)==0){
                System.out.println("等待了一秒,无连接");
                continue;
            }
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while(iterator.hasNext()){
                SelectionKey key = iterator.next();
                if(key.isAcceptable()){
                    SocketChannel accept = serverSocketChannel.accept();
                    accept.configureBlocking(false);
                    accept.register(selector,SelectionKey.OP_READ,ByteBuffer.allocate(1024));
                }
                if(key.isReadable()){
                    SocketChannel channel = (SocketChannel)key.channel();
                    ByteBuffer attachment = (ByteBuffer)key.attachment();
                    channel.read(attachment);
                    System.out.println("from 客户端"+ new String(attachment.array()));
                }
                iterator.remove();
            }
        }
    }
}

客户端

public class NioClient {
    public static void main(String[] args) throws Exception {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
        if(!socketChannel.connect(inetSocketAddress)){
            while (!socketChannel.finishConnect()){
                System.out.println("因为链接需要时间,没有成功时,可以做别的事");
            }
        }
        String str = "hello,nio";
        ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes());
        socketChannel.write(byteBuffer);
        System.in.read();
    }
}

3、线程模型

注释:黄色表示对象,蓝色表示线程,白色表示方法
1.传统线程模型


image.png

每个客户端发出连接请求,都有一个对应的线程进行处理
Reactor线程模型:I/O多路复用结合线程池,就是reactor的基本思想
2.单reactor单线程

image.png

所有请求都给reactor,reactor根据不同请求进行处理,在同一个线程中
3.单reactor多线程


image.png

4.主从reactor


image.png

4、Netty入门实现tcp通讯

依赖

<dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.20.Final</version>
        </dependency>

服务端

public class NettyServer {
    public static void main(String[] args) throws InterruptedException {
         //1、创建两个线程组,bossGroup和workGroup
         //2、boosGroup只处理连接请求,真正和客户端业务处理,会交给workGroup
         //3、两个都是无线循环
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        try{
            //创建服务器端的启动对象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //配置参数
            serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class)//服务器的通道实现
                    .option(ChannelOption.SO_BACKLOG,128)//设置线程队列等待连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });//给workGroup的对应的管道设置处理器
            System.out.println("服务器准备好了");
            //绑定端口,并且同步
            ChannelFuture channelFuture = serverBootstrap.bind(9998).sync();
            //监听关闭通道(这里涉及到netty的异步模型)
            channelFuture.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

服务端处理器

/**
 * 自定义的handler 需要继承netty规定的HandlerAdapter
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 读取客户端数据事件
     * @param ctx 上下文对象,含有管道,通道,地址
     * @param msg  客户端发送的数据
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server ctx = "+ ctx);
        //将msg转换为ByteBuf
        ByteBuf byteBuf = (ByteBuf)msg;
        System.out.println("客户端发送消息:"+byteBuf.toString(CharsetUtil.UTF_8));
    }
    /**
     * 数据读取完毕
     * @param ctx
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //给客户端回复消息,发送消息到缓冲,并刷新
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端",CharsetUtil.UTF_8));
    }
    /**
     * 发生异常事件
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

客户端

public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        //客户端需要一个事件循环组
        NioEventLoopGroup clientGroup = new NioEventLoopGroup();
        try {
            //客户端启动对象
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(clientGroup)
                    .channel(NioSocketChannel.class)//客户端通道的实现类
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new NettyClientHandler());
                        }
                    });
            System.out.println("客户端ok");
            //启动客户端连接服务端
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9998).sync();
            //监听通道关闭
            channelFuture.channel().closeFuture().sync();
        }finally {
            clientGroup.shutdownGracefully();
        }
    }
}

客户端处理器

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    //通道就绪事件
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("ctx = "+ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello 服务端", CharsetUtil.UTF_8));
    }
    //读取数据事件
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("服务器回复的消息:"+byteBuf.toString(CharsetUtil.UTF_8));
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

5、Netty核心组件

ServerBootstrap:引导服务端启动的对象
Bootstrap:引导客户端启动的对象
Future:Netty中的所有操作都是异步的,但是可以注册一个监听,Future和ChannelFuture就是具体的实现
Channel:能够用于执行网络io,根据不同的协议,都有对应的channel
Selector:选择器,不断查询注册在其上面的channel是否有触发事件
ChannelHandler:


image.png

ChannelPipeline:包含了channelHandler的一个list


image.png

6、Netty实现群聊系统

服务端

public class GroupChatServer {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workGroup = new NioEventLoopGroup(8);
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,128)
                    .childOption(ChannelOption.SO_KEEPALIVE,true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new LoggingHandler(LogLevel.INFO));
                            pipeline.addLast("decoder",new StringDecoder());//加入一个解码器
                            pipeline.addLast("encoder",new StringEncoder());//加入编码器
                            pipeline.addLast(new GroupChatServerHandler());
                        }
                    });
            System.out.println("服务器启动");
            ChannelFuture channelFuture = serverBootstrap.bind(9998).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

服务端处理器

public class GroupChatServerHandler  extends SimpleChannelInboundHandler<String> {
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    /**
     * 一旦建立连接,第一个执行
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.add(channel);
        //将客户端加入聊天的信息,推送给其他在线的客户
        channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"加入聊天室"+ simpleDateFormat.format(new Date()));
    }
    /**
     * 表示channel是活跃状态
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + "上线了");
    }
    /**
     * 表示channel不活跃了
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress()+ "离线了");
    }
    /**
     * 表示断开连接
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"退出了群聊");
    }
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        Channel channel = channelHandlerContext.channel();
        channelGroup.forEach(ch -> {
            if(channel!=ch){
                ch.writeAndFlush("[客户]"+channel.remoteAddress()+"发送了消息"+s+"\n");
            }else {
                ch.writeAndFlush("[自己]发送了消息"+s+"\n");
            }
        });
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

客户端

public class GroupChatClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try{
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast("decoder",new StringDecoder());//加入一个解码器
                            pipeline.addLast("encoder",new StringEncoder());//加入编码器
                            pipeline.addLast(new GroupChatClientHandler());
                        }
                    });
            System.out.println("客户端启动");
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9998).sync();
            Channel channel = channelFuture.channel();
            Scanner scanner = new Scanner(System.in);
            while(scanner.hasNextLine()){
                String msg = scanner.nextLine();
                channel.writeAndFlush(msg+"\n");
            }
        }finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

客户端处理器

public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println(s.trim());
    }
}

Netty实现http服务器

服务端

public class TestServer {
    public static void main(String[] args) throws Exception{
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workGroup = new NioEventLoopGroup(8);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup,workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast("myHttpServerCodec",new HttpServerCodec());
                            pipeline.addLast("myHandle",new TestHttpServerHandler());
                        }
                    });
            ChannelFuture cf = bootstrap.bind(9998).sync();
            cf.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

处理器

public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
        if(httpObject instanceof HttpRequest){
            System.out.println("msg 的类型"+httpObject.getClass().getName());
            System.out.println("客户端地址"+ channelHandlerContext.channel().remoteAddress());
            HttpRequest httpRequest = (HttpRequest)httpObject;
            URI uri = new URI(httpRequest.uri());
            if("/favicon.ico".equals(uri.getPath())){
                System.out.println("请求网站图标,不做响应");
                return;
            }
            ByteBuf byteContent = Unpooled.copiedBuffer("我是服务器", CharsetUtil.UTF_8);
            DefaultFullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteContent);
            httpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain;charset=utf-8");
            httpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH,byteContent.readableBytes());
            channelHandlerContext.writeAndFlush(httpResponse);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
    }
}

8、Netty实现dubbo

待提供的服务

public interface HelloService {
   String hello(String msg);
}

服务实现

public class HelloServiceImpl implements HelloService {
    @Override
    public String hello(String msg) {
        System.out.println("收到消费者消息:"+msg);
        if(msg!=null){
            return "你好,我收到"+msg;
        }else {
            return "你好";
        }
    }
}

dubbo服务提供者

public class NettyServer {
    public static void main(String[] args) {
        NioEventLoopGroup boosGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workGroup = new NioEventLoopGroup(8);
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boosGroup,workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast("decoder",new StringDecoder());
                            pipeline.addLast("encoder",new StringEncoder());
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });
            System.out.println("服务端开始提供服务");
            ChannelFuture channelFuture = serverBootstrap.bind(9998).sync();
            channelFuture.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            boosGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

服务端处理器

public class NettyServerHandler extends ChannelInboundHandlerAdapter  {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("msg="+msg);
        if(msg.toString().startsWith(ClientBootstrap.provideName)){
            String helloResult = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
            ctx.writeAndFlush(helloResult);
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}

dubbo服务消费者

public class NettyClient {
   private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
   private static NettyClientHandler client;
   public Object getBean(final Class<?> serverClass,final String providerName){
       return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),new Class<?>[]{serverClass},(proxy,method,args)->{
          if(client==null){
              initClient();
          }
          client.setPara(providerName+args[0]);
          return executorService.submit(client).get();
       });
   }
   private static void  initClient(){
       client = new NettyClientHandler();

       NioEventLoopGroup clientGroup = new NioEventLoopGroup();
       Bootstrap bootstrap = new Bootstrap();
       bootstrap.group(clientGroup)
               .channel(NioSocketChannel.class)
               .option(ChannelOption.TCP_NODELAY,true)
               .handler(new ChannelInitializer<SocketChannel>() {
                   @Override
                   protected void initChannel(SocketChannel socketChannel) throws Exception {
                       ChannelPipeline pipeline = socketChannel.pipeline();
                       pipeline.addLast(new StringDecoder());
                       pipeline.addLast(new StringEncoder());
                       pipeline.addLast(client);
                   }
               });
       try {
           ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9998).sync();
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
   }
}

客户端处理器

public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
    private ChannelHandlerContext context;
    private String result;
    private String para;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        context = ctx;
    }
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        result = msg.toString();
        notify();
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
    @Override
    public synchronized Object call() throws Exception {
        context.writeAndFlush(para);
        wait();
        return result;
    }
    void setPara(String para){
        this.para = para;
    }
}

消费服务

public class ClientBootstrap {
    public static final String provideName = "helloService#hello#";

    public static void main(String[] args) {
        NettyClient nettyClient = new NettyClient();
        HelloService helloService = (HelloService)nettyClient.getBean(HelloService.class, provideName);
        String hello = helloService.hello("你好 dubbo");
        System.out.println("res:"+hello);
    }
}

9、Netty实现websocket长连接

服务端

public class WebSocketServer {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new HttpServerCodec());
                            pipeline.addLast(new ChunkedWriteHandler());
                            pipeline.addLast(new HttpObjectAggregator(8192));
                            pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
                            pipeline.addLast(new WebSocketHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(9998).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

服务端处理器

public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
        System.out.println("服务器收到消息"+textWebSocketFrame.text());
        channelHandlerContext.writeAndFlush(new TextWebSocketFrame("hello 客户端"));
    }
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerAdded被调用"+ctx.channel().id().asLongText());
        System.out.println("handlerAdded被调用"+ctx.channel().id().asShortText());
    }
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerRemoved被调用"+ctx.channel().id().asShortText());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

10、Netty源码解析

上一篇下一篇

猜你喜欢

热点阅读