使用两个线程完成基于Java NIO的服务器案例

2021-06-19  本文已影响0人  JohnYuCN
package johnyu.cn;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.function.Consumer;

/**
 * 编程思路:
 * 程序共使用了两个线程:
 * 1. 以非阻塞式,使用主线程,轮循ServerSocketChannel的accept
 * 2. 如有客户端的连接,则将SocketChannel注册给Selector,并向客户端打招呼
 * 3. 打开一个子线程,无限循环监听selector.select(),代码在addConsumer方法中
 * 4. select一旦返回,表明至少有一个客户端发言,则遍历selector.selectedKeys
 * 5. 将key交给Consumer的accept方法
 *
 */
public class ServerSocketChannelTest {

    public static void main(String[] args) throws IOException {
        final Selector selector = Selector.open();
        //启动新线程,监听selector,并使用consumer进行处理
        addConsumer(selector, createConsumer());

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress(50000));  //绑定端口
        serverSocketChannel.configureBlocking(false);  //设置成非阻塞模式 即不会再accept因为等待连接而阻塞  如果没有连接则立即返回null

        while (true) {
            SocketChannel socket = serverSocketChannel.accept();
            if (socket != null) {
                System.out.println("accept connect from " + socket.getLocalAddress());
                socket.configureBlocking(false);
                socket.register(selector, SelectionKey.OP_READ);
                selector.wakeup();//唤醒处于阻塞状态的子线程
                //向刚刚连接上的客户端,打招呼
                socket.write(ByteBuffer.wrap("我是服务器,您的名字是:\n".getBytes("UTF-8")));

            }
        }
    }
    //打开子程,监听selector,并使用Consumer进行处理
    public static void addConsumer(Selector selector, Consumer<SelectionKey> consumer){
        new Thread(){
            @Override
            public void run() {
                while (true) {
                    try {
                        selector.select();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        if (key.isReadable()) {
                            consumer.accept(key);
                        }
                        iterator.remove();
                    }
                }

            }
        }.start();
    }
    //Consuemr处理:接收到客户端的信息,加上前缀hello:,再将信息返回给客户端
    public static Consumer<SelectionKey> createConsumer(){
        return new Consumer<SelectionKey>() {
            @Override
            public void accept(SelectionKey key) {
                try {
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(16);
                    StringBuilder sb = new StringBuilder();
                    int len = socketChannel.read(buffer);
                    if (len == -1) {
                        socketChannel.close();
                        return;
                    }
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        sb.append((char) buffer.get());
                    }

                    socketChannel.write(ByteBuffer.wrap(("您好:"+sb.toString().trim()+"\n").getBytes("UTF-8")));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        };
    }

}
上一篇下一篇

猜你喜欢

热点阅读