IO模型代码实例
2020-06-08 本文已影响0人
南园故剑00
package com.gupao.edu.vip.bio;
import lombok.extern.slf4j.Slf4j;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @description: BIO 服务端源码
*
* nc 实现任意TCP/UDP端口的侦听,nc可以作为server以TCP或UDP方式侦听指定端口
* nc 192.168.74.1 7777 通过nc去访问192.168.74.1主机的7777端口
*
* @date : 2020/1/3 10:57
* @author: zwz
*/
@Slf4j
public class ServerDemo {
//默认的端口号
private static final int DEFAULT_PORT = 7777;
//单例的serverSocket
private static ServerSocket serverSocket;
public static void main(String[] args) throws IOException {
start();
}
public static void start() throws IOException {
start(DEFAULT_PORT);
}
public synchronized static void start(int port) throws IOException {
if (serverSocket != null) {
return;
}
try {
serverSocket = new ServerSocket(port);
System.out.println("step1: new ServerSocket(port)");
log.info("服务端已启动,端口号:" + port);
System.out.println(("服务端已启动,端口号:" + port));
//自旋
while (true) {
//只能接受一次,while true也没卵用
Socket socket = serverSocket.accept();
//这里阻塞
System.out.println("step2: socket " + socket.getPort());
InputStream inputStream = socket.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
System.out.println("step3 " + reader.readLine());
}
} finally {
if (serverSocket != null) {
log.info("服务端已关闭");
System.out.println(("服务端已关闭"));
serverSocket.close();
serverSocket = null;
}
}
}
}
package com.gupao.edu.vip.nio.channel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.Random;
/**
* @description: 通过JDK实现的NIO
* @date : 2020/1/3 23:30
* @author: zwz
*/
public class ServiceSocketChannelNIODemo {
public static void main(String[] args) {
InetSocketAddress localAddress = new InetSocketAddress(8571);
LinkedList<SocketChannel> clients = new LinkedList<>();
Charset utf8 = StandardCharsets.UTF_8;
ServerSocketChannel ssc = null;
Random random = new Random();
try {
//创建服务器通道
ssc = ServerSocketChannel.open();
//配置通道为非阻塞
ssc.configureBlocking(false);
//设置监听服务器的端口,设置最大连接缓冲数为100
ssc.bind(localAddress, 100);
while (true) {
Thread.sleep(1000);
//不会阻塞
SocketChannel client = ssc.accept();
if (client == null) {
System.out.println("null ...");
} else {
client.configureBlocking(false);
int port = client.socket().getPort();
System.out.println("clinet port: " + port);
clients.add(client);
}
System.out.println("没有阻塞");
//缓冲区可以在堆里,也可以在堆外
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(4096);
//串行化
//资源浪费
//每个连接都是一条路,每条路都要看一眼
for (SocketChannel c : clients) {
//不会阻塞
int num = c.read(byteBuffer);
if (num > 0) {
/*
* 在写模式下调用flip() 之后,buffer从写模式变为读模式
* 在调用flip()之后,读/写指针指到缓冲区头部,并设置了最多只能读出之前写入的数据长度(而不是整个缓存的容量大小)
*
* public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
*/
byteBuffer.flip();
byte[] bytes = new byte[byteBuffer.limit()];
byteBuffer.get(bytes);
String b = new String(bytes);
// CharBuffer cb = utf8.decode(byteBuffer);
// String b = new String(cb.array());
System.out.println(c.socket().getPort() + " : " + b);
byteBuffer.clear();
}
}
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
package com.gupao.edu.vip.nio.channel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
/**
* @description: JDK 多路复用单线程
* ServerSocketChannel和SocketChannel 注册到同一个selector(一个线程对应一个selector)上
* 多路就是多个 channel
* @date : 2020/1/3 23:30
* @author: zwz
*/
public class ServiceSocketMuliSingleThreaDemo {
private static InetSocketAddress localAddress;
public static void main(String[] args) {
localAddress = new InetSocketAddress(8087);
Charset utf8 = StandardCharsets.UTF_8;
ServerSocketChannel ssc = null;
Selector selector = null;
Random random = new Random();
try {
//创建选择器
selector = Selector.open();
//创建服务器通道
ssc = ServerSocketChannel.open();
//配置通道为非阻塞
ssc.configureBlocking(false);
//设置监听服务器的端口,设置最大连接缓冲数为100
ssc.bind(localAddress, 100);
//服务器通道只能对tcp链接事件感兴趣.ssc注册到selector上
SelectionKey register = ssc.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("注册后selector.keys的数量:" + selector.keys().size()); // 1
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("server start with address" + localAddress);
//服务器线程被中断后退出
try {
while (!Thread.currentThread().isInterrupted()) {
//询问内核有没有事件
int n = selector != null ? selector.select(0) : 0;
if (n == 0) {
continue;
}
//从多路复用器取出有效的key
Set<SelectionKey> keySet = selector.selectedKeys();
Iterator<SelectionKey> iterator = keySet.iterator();
SelectionKey key = null;
while (iterator.hasNext()) {
key = iterator.next();
//防止下次select方法返回已处理过的通道
iterator.remove();
//若发现异常,说明客户端连接出现问题,但服务器要保证正常
try {
//ssc通道只能对链接事件感兴趣
if (key.isAcceptable()) { //是否可以连接。有新的客户端连接
ServerSocketChannel ssc1 = (ServerSocketChannel) key.channel();
//accept方法会返回一个普通通道,每个通道在内核中都对应一个socket缓冲区
SocketChannel sc = ssc1.accept();
sc.configureBlocking(false);
//向选择器注册这个通道和普通通道感兴趣的事件,同时提供这个新通道相关的缓冲区
int interestSet = SelectionKey.OP_READ;
ByteBuffer byteBuffer = ByteBuffer.allocate(8192);
//将channel和buffer一对一绑定
sc.register(selector, interestSet, byteBuffer);
System.out.println("客户端连接后 注册后selector.keys的数量:" + selector.keys().size()); // 2,3,4
System.out.println("---------------------");
System.out.println("accept from " + sc.getRemoteAddress());
System.out.println("---------------------");
}
//普通通道感兴趣读事件且有数据可读
if (key.isReadable()) {
System.out.println("一般数据到达-------------");
//通过selectionKey获取对应的通道
SocketChannel sc = (SocketChannel) key.channel();
//通过selectionKey获取通道对应的缓冲区
ByteBuffer buffers = (ByteBuffer) key.attachment();
buffers.clear();
int read = 0;
try {
while (true) {
read = sc.read(buffers);
if (read > 0) {
buffers.flip();
byte[] bytes = new byte[buffers.limit()];
buffers.get(bytes);
String b = new String(bytes);
// CharBuffer cb = utf8.decode(buffers);
System.out.println("读取的数据是 " + b);
while (buffers.hasRemaining()) {
sc.write(buffers);
}
buffers.clear();
} else if (read == 0) {
break;
}
// System.out.println("不加 -1 ,疯涨输出----------");
else { // -1 客户端close wait 死循环CPU 100%
sc.close();
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (IOException e) {
System.out.println("server encounter client error");
//若客户端连接出现异常,从selector中移除这个key
key.cancel();
key.channel().close();
}
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (selector != null) {
selector.close();
}
} catch (IOException e) {
System.out.println("selector close failed");
}
}
}
}
package com.gupao.edu.vip.nio.channel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @description: JDK 多路复用多线程
* @date : 2020/1/3 23:30
* @author: zwz
*/
public class ServiceSocketMuliThreadsDemo {
public static void main(String[] args) {
ServiceSocketMuliThreadsDemo service = new ServiceSocketMuliThreadsDemo();
service.initServer();
NioThread t1 = new NioThread(service.selector1, 2);
NioThread t2 = new NioThread(service.selector2);
NioThread t3 = new NioThread(service.selector3);
t1.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
t2.start();
t3.start();
System.out.println("服务器启动了--------------");
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
private ServerSocketChannel server = null;
private Selector selector1 = null;
private Selector selector2 = null;
private Selector selector3 = null;
private InetSocketAddress localAddress = new InetSocketAddress(9999);
public void initServer() {
try {
server = ServerSocketChannel.open();
server.configureBlocking(false);
server.bind(localAddress);
selector1 = Selector.open();
selector2 = Selector.open();
selector3 = Selector.open();
//这里将ServerSocketChannel 注册到 selector1 上
server.register(selector1, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
static class NioThread extends Thread {
Selector selector = null;
static int selectors = 0;
int id = 0;
boolean isBoss = false;
//静态变量:类变量。不论创建多少对象只初始化一次。
static BlockingQueue<SocketChannel>[] queue;
static AtomicInteger idx = new AtomicInteger();
NioThread(Selector sel, int n) {
this.isBoss = true;
this.selector = sel;
selectors = n;
int id = 0;
queue = new LinkedBlockingQueue[selectors];
for (int i = 0; i < n; i++) {
queue[i] = new LinkedBlockingQueue<>();
}
System.out.println("BOSS 启动");
}
NioThread(Selector sel) {
this.selector = sel;
id = idx.getAndIncrement() % selectors;
System.out.println("WORKER:" + id + " 启动");
}
@Override
public void run() {
try {
while (true) {
while (selector.select(10) > 0) { //阻塞10ms
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
acceptHandler(key);
} else if (key.isReadable()) {
readHandler(key);
}
}
}
//boss不参与。只有worker根据id分配
if (!isBoss && !queue[id].isEmpty()) {
ByteBuffer buffer = ByteBuffer.allocate(8192);
SocketChannel client = queue[id].take();
//注册在自己的 selector 上
client.register(selector, SelectionKey.OP_READ, buffer);
System.out.println("----------------------------");
System.out.println("新客户端:" + client.socket().getPort() + " 分配到worker:" + id);
System.out.println("----------------------------");
}
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
private void readHandler(SelectionKey key) throws IOException {
System.out.println("一般数据到达-------------");
//通过selectionKey获取对应的通道
SocketChannel sc = (SocketChannel) key.channel();
//通过selectionKey获取通道对应的缓冲区
ByteBuffer buffers = (ByteBuffer) key.attachment();
buffers.clear();
int read = 0;
try {
while (true) {
read = sc.read(buffers);
if (read > 0) {
buffers.flip();
byte[] bytes = new byte[buffers.limit()];
buffers.get(bytes);
String b = new String(bytes);
// CharBuffer cb = utf8.decode(buffers);
System.out.println("读取的数据是 " + b);
while (buffers.hasRemaining()) {
sc.write(buffers);
}
buffers.clear();
} else if (read == 0) {
break;
}
// System.out.println("不加 -1 ,疯涨输出----------");
else { // -1 客户端close wait 死循环CPU 100%
sc.close();
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void acceptHandler(SelectionKey key) throws IOException {
System.out.println("可读-------------");
//通过selectionKey获取对应的通道
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel client = ssc.accept();
client.configureBlocking(false);
int num = idx.getAndIncrement() % selectors; //0,1
queue[num].add(client);
}
}
}