Reactor模式的实现
2018-01-01 本文已影响141人
秦汉邮侠
参考来源
实现
- C++实现
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<errno.h>
#include<netdb.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<netdb.h>
#include<sys/time.h>
#include<string.h>
#include<sys/select.h>
#include<pthread.h>
/*************************
*用于返回最大文件描述值
*
*************************/
int max_fd(int a[], int num)
{
int max = -1;
int i = 0;
for(i = 0; i < num; i++)
{
if(a[i] > max)
{
max = a[i];
}
}
return max;
}
int main(int argc, char *argv[])
{
int sockfd = 0;
int confd = 0;
int i = 0, j = 0;
fd_set fd_read[2];
int clifd[FD_SETSIZE]; /*存放监听以及已与客户连接的套接字*/
int fd_count = 0; /*已经连接的套接字个数*/
struct sockaddr_in seradd, cliadd;
int fd_ret = 0;
socklen_t cli_len = sizeof(cliadd);
char readbuf[1024];
char writebuf[1024];
/*create sock;*/
sockfd = socket(AF_INET, SOCK_STREAM, 0); /*此套接字是阻塞的*/
if(sockfd < 0)
{
perror("sock create fail !!");
exit(-1);
}
seradd.sin_family = AF_INET;
seradd.sin_port = htons(8080);
seradd.sin_addr.s_addr = htonl(INADDR_ANY);
/*bind*/
if(-1 == (bind(sockfd, (struct sockaddr *)&seradd, sizeof(struct sockaddr))))
{
perror("bind error");
exit(-1);
}
/*listen*/
if(-1 == (listen(sockfd, 5)))
{
perror("listen error");
exit(-1);
}
//memset(&fd_read[0],0,sizeof(fd_read[0]));
FD_ZERO(&fd_read[0]); /*清空fd_read[0]所有位*/
FD_SET(sockfd, &fd_read[0]); /*将sockfd添加到fd_read[0]描述集中,也就是说将sockfd对应的fd_read[0]位中置位*/
for(i = 0; i < FD_SETSIZE; i++)
{
clifd[i] = -1;
}
clifd[0] = sockfd;
printf("--ser start work---\n");
while(1)
{
FD_ZERO(&fd_read[1]);
fd_read[1] = fd_read[0]; /*每次select后,没有达到条件的描述将被清空,所以每次都需要重新赋值*/
/*进程将阻塞在select函数处,直到在整个队列中有读描述符可用为止,个人理解--内核检测整个队列,然后将可用的描述符返回(一个或多个,一个没有时将阻塞)*/
fd_ret = select(max_fd(clifd, FD_SETSIZE) + 1, &fd_read[1], NULL, NULL, NULL); /*我们只关心读描述符集*/
if(fd_ret < 0)
{
perror("select error");
}
else if(fd_ret > 0)
{
/*是监听套接字可读*/
if(FD_ISSET(sockfd, &fd_read[1]) && (fd_count < FD_SETSIZE - 1))
{
confd = accept(sockfd, (struct sockaddr *)&cliadd, &cli_len); /*获取与客户端连接套接字*/
if(-1 == confd)
{
perror("confd error");
}
for(i = 1; i < FD_SETSIZE; i++)
{
if(clifd[i] == -1)
{
clifd[i] = confd; /*将获得新连接套接字放到clifd数组中*/
FD_SET(confd, &fd_read[0]); /*将获得新连接套接字添加到读描述集中*/
fd_count++;
break;
}
}
}
/*连接套接字可读*/
for(j = 1; j < FD_SETSIZE; j++)
{
if(FD_ISSET(clifd[j], &fd_read[1]))
{
/*从clifd[i]套接字中读取数据*/
if(read(clifd[j], readbuf, sizeof(readbuf)) <= 0)
{
perror("read data error");
FD_CLR(clifd[j], &fd_read[0]); /*将clifd[j]描述从读描述符集中删除*/
close(clifd[j]); /*关闭该套接字*/
clifd[j] = -1;
fd_count--;
continue;
}
strcpy(writebuf, readbuf);
printf("read data:%s\n", readbuf);
if(write(clifd[j], writebuf, sizeof(writebuf)) <= 0)
{
perror("write data error");
FD_CLR(clifd[j], &fd_read[0]);
close(clifd[j]);
clifd[j] = -1;
fd_count--;
continue;
}
}
}
}
}
}
- Java实现
- Reactor
public class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), port);
serverSocket.socket().bind(address);
serverSocket.configureBlocking(false);
//向selector注册该channel
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("-->Start serverSocket.register!");
//利用sk的attache功能绑定Acceptor 如果有事情,触发Acceptor
sk.attach(new Acceptor());
System.out.println("-->attach(new Acceptor()!");
}
public void run() { // normally in a new Thread
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
//Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。
while (it.hasNext()) {
//来一个事件 第一次触发一个accepter线程
//以后触发SocketReadHandler
dispatch((SelectionKey) (it.next()));
}
selected.clear();
}
} catch (IOException ex) {
System.out.println("reactor stop!" + ex);
}
}
//运行Acceptor或SocketReadHandler
void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
if (r != null) {
r.run();
}
}
class Acceptor implements Runnable { // inner
public void run() {
try {
System.out.println("-->ready for accept!");
SocketChannel c = serverSocket.accept();
if (c != null)
//调用Handler来处理channel
new Handler(selector, c);
} catch (IOException ex) {
}
}
}
}
- Handler
public class Handler implements Runnable {
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(Integer.MAX_VALUE);
ByteBuffer output = ByteBuffer.allocate(Integer.MAX_VALUE);
static final int READING = 0, SENDING = 1;
int state = READING;
public Handler(Selector sel, SocketChannel c) throws IOException {
socket = c;
//设置为非阻塞模式
c.configureBlocking(false);
//此处的0,表示不关注任何时间
sk = socket.register(sel, 0);
//将SelectionKey绑定为本Handler 下一步有事件触发时,将调用本类的run方法
sk.attach(this);
//将SelectionKey标记为可读,以便读取,不可关注可写事件
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
}
boolean inputIsComplete() {
return false;
}
boolean outputIsComplete() {
return false;
}
//这里可以通过线程池处理数据
void process() {
}
public void run() {
try {
if (state == READING) {
read();
} else if (state == SENDING) {
send();
}
} catch (IOException ex) { /* ... */ }
}
void read() throws IOException {
socket.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
// Normally also do first write now
sk.interestOps(SelectionKey.OP_WRITE);
}
}
void send() throws IOException {
socket.write(output);
if (outputIsComplete()) {
//
sk.cancel();
}
}
}