IO模型代码实例

2020-06-08  本文已影响0人  南园故剑00
package com.gupao.edu.vip.bio;

import lombok.extern.slf4j.Slf4j;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * Minimal blocking-I/O (BIO) server demo.
 *
 * <p>Listens on a TCP port and serves one client at a time: it prints the
 * client's remote port and the first line the client sends, closes that
 * connection, and then waits for the next client.
 *
 * <p>It can be exercised with netcat, which can act as a TCP/UDP client or
 * listener, e.g. {@code nc 192.168.74.1 7777} connects to port 7777 of host
 * 192.168.74.1.
 *
 * @date : 2020/1/3 10:57
 * @author: zwz
 */
@Slf4j
public class ServerDemo {

    // Port used when the caller does not supply one.
    private static final int DEFAULT_PORT = 7777;

    // The single listening socket; non-null only while the server is running.
    private static ServerSocket serverSocket;

    /**
     * Starts the server on the default port.
     *
     * @param args ignored
     * @throws IOException if the listening socket cannot be opened or accepting fails
     */
    public static void main(String[] args) throws IOException {
        start();
    }

    /**
     * Starts the server on {@link #DEFAULT_PORT}.
     *
     * @throws IOException if the listening socket cannot be opened or accepting fails
     */
    public static void start() throws IOException {
        start(DEFAULT_PORT);
    }

    /**
     * Starts the server on the given port and serves clients until accepting fails.
     * Returns immediately if a server socket already exists.
     *
     * @param port TCP port to listen on
     * @throws IOException if the listening socket cannot be opened or accepting fails
     */
    public synchronized static void start(int port) throws IOException {
        if (serverSocket != null) {
            return;
        }

        try {
            serverSocket = new ServerSocket(port);

            System.out.println("step1: new ServerSocket(port)");
            log.info("服务端已启动,端口号:{}", port);
            System.out.println(("服务端已启动,端口号:" + port));

            // Clients are served strictly one after another: the next accept()
            // only happens once readLine() has returned for the current client.
            while (true) {
                // Blocks until a client connects.
                Socket accepted = serverSocket.accept();

                // try-with-resources closes the reader and the socket so that
                // every served client releases its file descriptor.
                try (Socket socket = accepted;
                     BufferedReader reader = new BufferedReader(new InputStreamReader(
                             socket.getInputStream(), java.nio.charset.StandardCharsets.UTF_8))) {
                    System.out.println("step2: socket " + socket.getPort());

                    // Blocks until the client sends a full line or closes the connection.
                    System.out.println("step3 " + reader.readLine());
                } catch (IOException e) {
                    // A failure on one connection must not take the listener down.
                    log.warn("client connection failed", e);
                }
            }
        } finally {
            if (serverSocket != null) {
                log.info("服务端已关闭");
                System.out.println(("服务端已关闭"));
                try {
                    serverSocket.close();
                } finally {
                    // Reset even if close() throws, otherwise later start() calls
                    // would return immediately without a working server.
                    serverSocket = null;
                }
            }
        }
    }
}
package com.gupao.edu.vip.nio.channel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.Random;

/**
 * Non-blocking I/O server demo built directly on JDK NIO channels, without a selector.
 *
 * <p>Once per second the loop polls the listening channel for a new connection
 * and then polls every connected client for data, printing whatever arrived.
 * Every connection has to be checked on every pass, whether or not it has data.
 *
 * @date : 2020/1/3 23:30
 * @author: zwz
 */
public class ServiceSocketChannelNIODemo {

    /**
     * Runs the polling server on port 8571 until an I/O error on the listening
     * channel occurs or the thread is interrupted.
     *
     * @param args ignored
     */
    public static void main(String[] args) {

        InetSocketAddress localAddress = new InetSocketAddress(8571);
        LinkedList<SocketChannel> clients = new LinkedList<>();

        Charset utf8 = StandardCharsets.UTF_8;

        // The buffer may live on-heap or off-heap; it is allocated once and reused
        // because direct buffers are expensive to create.
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(4096);

        // try-with-resources guarantees the listening channel is closed on exit.
        try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
            // Non-blocking mode: accept() returns null instead of waiting.
            ssc.configureBlocking(false);

            // Listen on the port with a connection backlog of 100.
            ssc.bind(localAddress, 100);

            while (true) {
                Thread.sleep(1000);

                // Does not block.
                SocketChannel client = ssc.accept();
                if (client == null) {
                    System.out.println("null ...");
                } else {
                    try {
                        client.configureBlocking(false);
                        int port = client.socket().getPort();
                        System.out.println("clinet port: " + port);
                        clients.add(client);
                    } catch (IOException e) {
                        // The new client is unusable; drop it but keep serving.
                        e.printStackTrace();
                        closeQuietly(client);
                    }
                }
                System.out.println("没有阻塞");

                // Serial scan: every connection is visited on every pass.
                // An explicit iterator is used so dead clients can be removed in place.
                java.util.Iterator<SocketChannel> it = clients.iterator();
                while (it.hasNext()) {
                    SocketChannel c = it.next();
                    try {
                        byteBuffer.clear();
                        // Does not block.
                        int num = c.read(byteBuffer);
                        if (num > 0) {
                            // flip() switches the buffer from filling to draining:
                            // limit = position, position = 0, so only the bytes
                            // just read are visible.
                            byteBuffer.flip();

                            byte[] bytes = new byte[byteBuffer.remaining()];
                            byteBuffer.get(bytes);
                            String b = new String(bytes, utf8);

                            System.out.println(c.socket().getPort() + " : " + b);
                        } else if (num < 0) {
                            // End of stream: the peer closed the connection.
                            it.remove();
                            closeQuietly(c);
                        }
                    } catch (IOException e) {
                        // A broken client must not take the whole server down.
                        e.printStackTrace();
                        it.remove();
                        closeQuietly(c);
                    }
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag for whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            for (SocketChannel c : clients) {
                closeQuietly(c);
            }
        }
    }

    /** Closes a client channel, ignoring any failure since nothing can be done about it. */
    private static void closeQuietly(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // Best-effort cleanup only.
        }
    }
}
package com.gupao.edu.vip.nio.channel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;

/**
 * Single-threaded multiplexed echo server built on JDK NIO.
 *
 * <p>The {@link ServerSocketChannel} and every accepted {@link SocketChannel}
 * are registered on the same {@link Selector}, which is driven by one thread.
 * "Multiplexed" here means many channels are watched through one selector.
 *
 * @date : 2020/1/3 23:30
 * @author: zwz
 */
public class ServiceSocketMuliSingleThreaDemo {

    private static InetSocketAddress localAddress;

    /**
     * Starts the echo server on port 8087 and runs the event loop until the
     * thread is interrupted or the selector fails.
     *
     * @param args ignored
     */
    public static void main(String[] args) {

        localAddress = new InetSocketAddress(8087);
        Charset utf8 = StandardCharsets.UTF_8;
        ServerSocketChannel ssc = null;
        Selector selector = null;
        try {
            selector = Selector.open();

            ssc = ServerSocketChannel.open();

            // Non-blocking mode is required for selector registration.
            ssc.configureBlocking(false);

            // Listen on the port with a connection backlog of 100.
            ssc.bind(localAddress, 100);

            // The listening channel is only interested in accept events.
            ssc.register(selector, SelectionKey.OP_ACCEPT);

            System.out.println("注册后selector.keys的数量:" + selector.keys().size()); // 1

            System.out.println("server start with address" + localAddress);

            // Only reached when setup succeeded, so the loop never runs
            // against a missing selector.
            eventLoop(selector, utf8);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (ssc != null) {
                try {
                    ssc.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            try {
                if (selector != null) {
                    selector.close();
                }
            } catch (IOException e) {
                System.out.println("selector close failed");
            }
        }
    }

    /** Dispatches ready keys until the current thread is interrupted. */
    private static void eventLoop(Selector selector, Charset charset) throws IOException {
        while (!Thread.currentThread().isInterrupted()) {
            // Ask the kernel for ready events; a timeout of 0 blocks until something is ready.
            if (selector.select(0) == 0) {
                continue;
            }

            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                // Remove the key so the next select() does not report it again.
                iterator.remove();

                if (!key.isValid()) {
                    continue;
                }
                if (key.isAcceptable()) {
                    acceptClient(selector, key);
                }
                // Re-check validity: the key may have been cancelled meanwhile.
                if (key.isValid() && key.isReadable()) {
                    try {
                        echo(key, charset);
                    } catch (IOException e) {
                        // A failing client must not affect the server: drop only that client.
                        System.out.println("server encounter client error");
                        e.printStackTrace();
                        key.cancel();
                        closeQuietly((SocketChannel) key.channel());
                    }
                }
            }
        }
    }

    /** Accepts a pending connection and registers it for reads with its own buffer. */
    private static void acceptClient(Selector selector, SelectionKey key) {
        ServerSocketChannel listener = (ServerSocketChannel) key.channel();
        SocketChannel sc = null;
        try {
            sc = listener.accept();
            if (sc == null) {
                // Non-blocking accept: the pending connection is already gone.
                return;
            }
            sc.configureBlocking(false);

            // Each client channel gets a dedicated buffer, attached to its key.
            ByteBuffer byteBuffer = ByteBuffer.allocate(8192);
            sc.register(selector, SelectionKey.OP_READ, byteBuffer);
            System.out.println("客户端连接后 注册后selector.keys的数量:" + selector.keys().size());  // 2,3,4

            System.out.println("---------------------");
            System.out.println("accept from " + sc.getRemoteAddress());
            System.out.println("---------------------");
        } catch (IOException e) {
            // Only the new client is discarded; the listening channel stays open.
            System.out.println("server encounter client error");
            e.printStackTrace();
            if (sc != null) {
                closeQuietly(sc);
            }
        }
    }

    /** Reads everything currently available from the client, prints it and echoes it back. */
    private static void echo(SelectionKey key, Charset charset) throws IOException {
        System.out.println("一般数据到达-------------");
        SocketChannel sc = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.clear();
        while (true) {
            int read = sc.read(buffer);
            if (read > 0) {
                buffer.flip();
                byte[] bytes = new byte[buffer.remaining()];
                buffer.get(bytes);
                System.out.println("读取的数据是 " + new String(bytes, charset));

                // get() consumed the buffer; rewind so the same bytes can be written back.
                buffer.rewind();
                // NOTE: this spins while the peer's receive window is full.
                while (buffer.hasRemaining()) {
                    sc.write(buffer);
                }

                buffer.clear();
            } else if (read == 0) {
                // Nothing more to read for now.
                break;
            } else {
                // -1: the peer closed the connection. Without closing here the key
                // stays readable forever and the loop burns 100% CPU.
                key.cancel();
                sc.close();
                break;
            }
        }
    }

    /** Closes a client channel, ignoring any failure since nothing can be done about it. */
    private static void closeQuietly(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // Best-effort cleanup only.
        }
    }

}
package com.gupao.edu.vip.nio.channel;


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Multi-threaded multiplexed echo server built on JDK NIO.
 *
 * <p>One "boss" thread owns the selector that the listening channel is
 * registered on and only accepts connections. Each accepted client is handed,
 * round-robin, to one of the worker queues; every worker thread registers the
 * clients from its own queue on its own selector and echoes their data.
 *
 * @date : 2020/1/3 23:30
 * @author: zwz
 */
public class ServiceSocketMuliThreadsDemo {


    /**
     * Starts one boss thread and two worker threads, then waits for input on stdin.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ServiceSocketMuliThreadsDemo service = new ServiceSocketMuliThreadsDemo();
        service.initServer();
        if (service.server == null) {
            // Initialisation failed (stack trace already printed); starting the
            // threads would only produce NullPointerExceptions.
            return;
        }

        NioThread t1 = new NioThread(service.selector1, 2);
        NioThread t2 = new NioThread(service.selector2);
        NioThread t3 = new NioThread(service.selector3);

        t1.start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }

        t2.start();
        t3.start();

        System.out.println("服务器启动了--------------");

        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private ServerSocketChannel server = null;
    private Selector selector1 = null;
    private Selector selector2 = null;
    private Selector selector3 = null;

    private InetSocketAddress localAddress = new InetSocketAddress(9999);

    /**
     * Opens the listening channel on port 9999 and the three selectors, and
     * registers the listening channel on {@code selector1} for accept events.
     * On failure the stack trace is printed, everything opened so far is closed
     * and all fields are left {@code null}.
     */
    public void initServer() {
        try {
            server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(localAddress);

            selector1 = Selector.open();
            selector2 = Selector.open();
            selector3 = Selector.open();

            // The listening channel is registered on selector1 (the boss selector).
            server.register(selector1, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
            releaseResources();
        }
    }

    /** Closes whatever initServer managed to open and resets the fields to null. */
    private void releaseResources() {
        if (server != null) {
            try {
                server.close();
            } catch (IOException ignored) {
                // Best-effort cleanup only.
            }
        }
        closeSelector(selector1);
        closeSelector(selector2);
        closeSelector(selector3);
        server = null;
        selector1 = null;
        selector2 = null;
        selector3 = null;
    }

    /** Closes a selector if present, ignoring any failure. */
    private static void closeSelector(Selector sel) {
        if (sel == null) {
            return;
        }
        try {
            sel.close();
        } catch (IOException ignored) {
            // Best-effort cleanup only.
        }
    }

    /**
     * Selector-driven thread acting either as the boss (accepts connections)
     * or as a worker (serves the clients assigned to its queue).
     *
     * <p>The boss must be constructed before any worker because its constructor
     * initialises the shared worker count and queues.
     */
    static class NioThread extends Thread {
        Selector selector = null;
        // Number of worker queues; set by the boss constructor.
        static int selectors = 0;
        // Index of this worker's queue (unused by the boss).
        int id = 0;
        boolean isBoss = false;

        // Class-level state shared by all threads: one hand-over queue per worker.
        static BlockingQueue<SocketChannel>[] queue;

        // Shared round-robin counter, used for worker ids and for client assignment.
        static AtomicInteger idx = new AtomicInteger();

        /**
         * Creates the boss thread and the shared worker queues.
         *
         * @param sel selector the listening channel is registered on
         * @param n   number of worker queues to create; must be positive
         * @throws IllegalArgumentException if {@code n} is not positive
         */
        @SuppressWarnings("unchecked") // generic array creation; only BlockingQueue<SocketChannel> is stored
        NioThread(Selector sel, int n) {
            if (n <= 0) {
                throw new IllegalArgumentException("worker count must be positive: " + n);
            }
            this.isBoss = true;
            this.selector = sel;
            BlockingQueue<SocketChannel>[] queues = new LinkedBlockingQueue[n];
            for (int i = 0; i < n; i++) {
                queues[i] = new LinkedBlockingQueue<>();
            }
            queue = queues;
            selectors = n;
            System.out.println("BOSS 启动");
        }

        /**
         * Creates a worker thread bound to the next queue index.
         *
         * @param sel selector this worker registers its clients on
         * @throws IllegalStateException if no boss thread has been created yet
         */
        NioThread(Selector sel) {
            if (selectors <= 0) {
                throw new IllegalStateException("the boss NioThread must be created before any worker");
            }
            this.selector = sel;
            id = nextIndex();
            System.out.println("WORKER:" + id + " 启动");
        }

        /** Returns the next round-robin queue index; masking keeps it non-negative after overflow. */
        private static int nextIndex() {
            return (idx.getAndIncrement() & Integer.MAX_VALUE) % selectors;
        }

        /** Event loop: dispatch ready keys, then (workers only) adopt newly assigned clients. */
        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    // Wait at most 10 ms so queued clients are picked up promptly.
                    if (selector.select(10) > 0) {
                        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                        while (iterator.hasNext()) {
                            SelectionKey key = iterator.next();
                            iterator.remove();
                            dispatch(key);
                        }
                    }
                    // The boss never serves clients; workers drain their own queue.
                    if (!isBoss) {
                        registerPendingClients();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /** Routes one ready key to its handler; a failing client is dropped without stopping the thread. */
        private void dispatch(SelectionKey key) {
            try {
                if (!key.isValid()) {
                    return;
                }
                if (key.isAcceptable()) {
                    acceptHandler(key);
                } else if (key.isReadable()) {
                    readHandler(key);
                }
            } catch (IOException e) {
                e.printStackTrace();
                // Only client channels are closed; the listening channel stays open.
                if (key.channel() instanceof SocketChannel) {
                    key.cancel();
                    closeClient((SocketChannel) key.channel());
                }
            }
        }

        /** Registers every client waiting in this worker's queue on this worker's selector. */
        private void registerPendingClients() {
            SocketChannel client;
            while ((client = queue[id].poll()) != null) {
                try {
                    ByteBuffer buffer = ByteBuffer.allocate(8192);
                    // Register on this worker's own selector, with a dedicated buffer.
                    client.register(selector, SelectionKey.OP_READ, buffer);
                    System.out.println("----------------------------");
                    System.out.println("新客户端:" + client.socket().getPort() + " 分配到worker:" + id);
                    System.out.println("----------------------------");
                } catch (IOException e) {
                    // e.g. the client closed before it could be registered.
                    e.printStackTrace();
                    closeClient(client);
                }
            }
        }

        /**
         * Reads everything currently available from the client, prints it and echoes it back.
         *
         * @param key ready key of a client channel; its attachment is the channel's buffer
         * @throws IOException if reading from or writing to the client fails
         */
        private void readHandler(SelectionKey key) throws IOException {
            System.out.println("一般数据到达-------------");
            SocketChannel sc = (SocketChannel) key.channel();
            ByteBuffer buffers = (ByteBuffer) key.attachment();
            buffers.clear();
            while (true) {
                int read = sc.read(buffers);
                if (read > 0) {
                    buffers.flip();
                    byte[] bytes = new byte[buffers.remaining()];
                    buffers.get(bytes);
                    String b = new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("读取的数据是 " + b);

                    // get() consumed the buffer; rewind so the same bytes can be written back.
                    buffers.rewind();
                    // NOTE: this spins while the peer's receive window is full.
                    while (buffers.hasRemaining()) {
                        sc.write(buffers);
                    }

                    buffers.clear();
                } else if (read == 0) {
                    // Nothing more to read for now.
                    break;
                } else {
                    // -1: the peer closed the connection. Without closing here the key
                    // stays readable forever and the loop burns 100% CPU.
                    key.cancel();
                    sc.close();
                    break;
                }
            }
        }

        /**
         * Accepts a pending connection and hands it to the next worker queue.
         *
         * @param key ready key of the listening channel
         * @throws IOException if accepting or configuring the client fails
         */
        private void acceptHandler(SelectionKey key) throws IOException {
            System.out.println("可连接-------------");
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            SocketChannel client = ssc.accept();
            if (client == null) {
                // Non-blocking accept: the pending connection is already gone.
                return;
            }
            try {
                client.configureBlocking(false);
            } catch (IOException e) {
                closeClient(client);
                throw e;
            }
            queue[nextIndex()].add(client);
        }

        /** Closes a client channel, ignoring any failure since nothing can be done about it. */
        private static void closeClient(SocketChannel channel) {
            try {
                channel.close();
            } catch (IOException ignored) {
                // Best-effort cleanup only.
            }
        }
    }

}
上一篇下一篇

猜你喜欢

热点阅读