I/O-手写简单netty
2020-11-30 本文已影响0人
麦大大吃不胖
by shihang.mai
1. 概述
netty原理
一共有3个类:
MainThread:这个类负责程序入口,不做任何的业务
SelectorThreadGroup:如上图,会有多个Selector,故用这个类管理
SelectorThread:核心类,用来做Selector
2. 混杂版
不区分boss,worker线程
MainThread
public class MainThread {
public static void main(String[] args) {
SelectorThreadGroup stg = new SelectorThreadGroup(3);
stg.bind(9999);
}
}
SelectorThreadGroup
public class SelectorThreadGroup {
SelectorThread[] sts;
ServerSocketChannel server;
//用来做SelectorThread轮询
AtomicInteger xid = new AtomicInteger(0);
public SelectorThreadGroup(int num) {
sts = new SelectorThread[num];
for (int i = 0; i < num; i++) {
//传入当前的这个SelectorThreadGroup
sts[i] = new SelectorThread(this);
//这里点用了线程开始这里,会在线程的int nums = selector.select();阻塞
new Thread(sts[i]).start();
}
}
public void bind(int port) {
try {
server = ServerSocketChannel.open();
server.configureBlocking(false);
server.bind(new InetSocketAddress(port));
//选一个Selector
nextSelector(server);
} catch (IOException e) {
e.printStackTrace();
}
}
public void nextSelector(Channel c){
SelectorThread st = next();
/*
*为什么想到用LinkedBlockingDeque<Channel>通讯呢?
*1.当st.selector.wakeup()在上时,线程执行int nums = selector.select();阻塞了,然后调用st.selector.wakeup(),线程被打断,然后s.register(st.selector, SelectionKey.OP_ACCEPT)还没来得及,线程又阻塞了,即还没来得及注册上
*2.当st.selector.wakeup()在下时,根本不会执行,st.selector已经阻塞了
*/
//将Channel放入SelectorThread的LinkedBlockingDeque<Channel>,将Channel交由线程自己去处理
st.lbq.add(c);
//唤醒selector,那么子线程就会继续走下去
st.selector.wakeup();
}
private SelectorThread next() {
int index = xid.incrementAndGet() % sts.length;
return sts[index];
}
}
SelectorThread
public class SelectorThread implements Runnable{
Selector selector = null;
LinkedBlockingDeque<Channel> lbq = new LinkedBlockingDeque<>();
//因为需要用到选择Selector,故引入SelectorThreadGroup,直接调用即可
SelectorThreadGroup stg;
SelectorThread(SelectorThreadGroup stg){
try {
this.stg = stg;
selector = Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while(true){
try {
//阻塞
int nums = selector.select();
if(nums>0){
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
if(iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if(key.isAcceptable()){
acceptHandler(key);
}else if(key.isReadable()){
readHandler(key);
}else if(key.isWritable()){
}
}
}
if(!lbq.isEmpty()){
Channel c = lbq.take();
if(c instanceof ServerSocketChannel){
ServerSocketChannel server = (ServerSocketChannel) c;
server.register(selector,SelectionKey.OP_ACCEPT);
System.out.println(Thread.currentThread().getName()+" register listen");
}else if(c instanceof SocketChannel){
SocketChannel client = (SocketChannel) c;
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(4096);
client.register(selector,SelectionKey.OP_READ,byteBuffer);
System.out.println(Thread.currentThread().getName()+" register client:"+client.getRemoteAddress());
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void readHandler(SelectionKey key) {
System.out.println(Thread.currentThread().getName()+" read......");
ByteBuffer buffer = (ByteBuffer)key.attachment();
SocketChannel client=(SocketChannel)key.channel();
buffer.clear();
while(true){
try {
int num=client.read(buffer);
if(num>0){
buffer.flip();
while(buffer.hasRemaining()){
client.write(buffer);
}
buffer.clear();
}else if(num==0){
break;
}else if(num<0){
System.out.println("client:"+client.getRemoteAddress()+" is closed.....");
key.cancel();
break;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void acceptHandler(SelectionKey key) {
System.out.println(Thread.currentThread().getName()+" acceptHandler....");
ServerSocketChannel server=(ServerSocketChannel)key.channel();
try {
SocketChannel client = server.accept();
client.configureBlocking(false);
stg.nextSelector(client);
} catch (IOException e) {
e.printStackTrace();
}
}
}
3. 正常版
区分boss,worker线程
MainThread
public class MainThread {
public static void main(String[] args) {
//创建多个boss线程
SelectorThreadGroup boss = new SelectorThreadGroup(3);
//创建多个worker线程
SelectorThreadGroup worker = new SelectorThreadGroup(3);
//boss持有worker,因为当接收到客户端时,需要将客户端分配给worker线程
boss.setWorker(worker);
boss.bind(9999);
boss.bind(8888);
boss.bind(7777);
boss.bind(6666);
}
}
SelectorThreadGroup
public class SelectorThreadGroup {
SelectorThread[] sts;
ServerSocketChannel server;
AtomicInteger xid = new AtomicInteger(0);
//初始化的时候,指代自己
SelectorThreadGroup stg = this;
//在Main时,boss调用setWorker持有worker线程
public void setWorker(SelectorThreadGroup stg){
this.stg = stg;
}
public SelectorThreadGroup(int num) {
sts = new SelectorThread[num];
for (int i = 0; i < num; i++) {
sts[i] = new SelectorThread(this);
new Thread(sts[i]).start();
}
}
public void bind(int port) {
try {
server = ServerSocketChannel.open();
server.configureBlocking(false);
server.bind(new InetSocketAddress(port));
nextSelector(server);
} catch (IOException e) {
e.printStackTrace();
}
}
public void nextSelector(Channel c){
try {
if(c instanceof ServerSocketChannel){
//在自己的boss线程中选取
SelectorThread st = next();
st.lbq.put(c);
//当时ServerSocketChannel时,需要将worker传到SelectorThread中,以便接收到client,分配Selector
st.setWorker(stg);
st.selector.wakeup();
}else {
//在worker线程中选取
SelectorThread st = nextV2();
st.lbq.put(c);
st.selector.wakeup();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private SelectorThread next() {
int index = xid.incrementAndGet() % sts.length;
return sts[index];
}
private SelectorThread nextV2() {
int index = xid.incrementAndGet() % stg.sts.length;
return sts[index];
}
}
SelectorThread
public class SelectorThread extends ThreadLocal<LinkedBlockingDeque<Channel>> implements Runnable{
Selector selector = null;
LinkedBlockingDeque<Channel> lbq = get();
SelectorThreadGroup stg;
@Override
protected LinkedBlockingDeque<Channel> initialValue() {
return new LinkedBlockingDeque<>();
}
SelectorThread(SelectorThreadGroup stg){
try {
this.stg = stg;
selector = Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while(true){
try {
//阻塞
int nums = selector.select();
if(nums>0){
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
if(iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if(key.isAcceptable()){
acceptHandler(key);
}else if(key.isReadable()){
readHandler(key);
}else if(key.isWritable()){
}
}
}
if(!lbq.isEmpty()){
Channel c = lbq.take();
if(c instanceof ServerSocketChannel){
ServerSocketChannel server = (ServerSocketChannel) c;
server.register(selector,SelectionKey.OP_ACCEPT);
System.out.println(Thread.currentThread().getName()+" register listen");
}else if(c instanceof SocketChannel){
SocketChannel client = (SocketChannel) c;
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(4096);
client.register(selector,SelectionKey.OP_READ,byteBuffer);
System.out.println(Thread.currentThread().getName()+" register client:"+client.getRemoteAddress());
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void readHandler(SelectionKey key) {
System.out.println(Thread.currentThread().getName()+" read......");
ByteBuffer buffer = (ByteBuffer)key.attachment();
SocketChannel client=(SocketChannel)key.channel();
buffer.clear();
while(true){
try {
int num=client.read(buffer);
if(num>0){
buffer.flip();
while(buffer.hasRemaining()){
client.write(buffer);
}
buffer.clear();
}else if(num==0){
break;
}else if(num<0){
System.out.println("client:"+client.getRemoteAddress()+" is closed.....");
key.cancel();
break;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void acceptHandler(SelectionKey key) {
System.out.println(Thread.currentThread().getName()+" acceptHandler....");
ServerSocketChannel server=(ServerSocketChannel)key.channel();
try {
SocketChannel client = server.accept();
client.configureBlocking(false);
stg.nextSelector(client);
} catch (IOException e) {
e.printStackTrace();
}
}
public void setWorker(SelectorThreadGroup stgWorker) {
this.stg = stgWorker;
}
}