Reactor模式的实现

2018-01-01  本文已影响141人  秦汉邮侠

参考来源

实现

#include<stdio.h>
 
#include<stdlib.h>
 
#include<unistd.h>
 
#include<errno.h>
 
#include<netdb.h>
 
#include<sys/types.h>
 
#include<sys/socket.h>
 
#include<netinet/in.h>
 
#include<arpa/inet.h>
 
#include<netdb.h>
 
#include<sys/time.h>
 
#include<string.h>
 
#include<sys/select.h>
 
#include<pthread.h>
 
 
 
 
/*************************
*用于返回最大文件描述值
*
*************************/
int max_fd(int a[], int num)
{
    int max = -1;
    int i = 0;
    for(i = 0; i < num; i++)
    {
        if(a[i] > max)
        {
            max = a[i];
 
        }
    }
 
    return max;
}
 
int main(int argc, char *argv[])
{
    int sockfd = 0;
    int confd = 0;
    int i = 0, j = 0;
    fd_set fd_read[2];
    int clifd[FD_SETSIZE]; /*存放监听以及已与客户连接的套接字*/
    int fd_count = 0; /*已经连接的套接字个数*/
    struct sockaddr_in seradd, cliadd;
    int fd_ret = 0;
    socklen_t cli_len = sizeof(cliadd);
    char readbuf[1024];
    char writebuf[1024];
    /*create sock;*/
    sockfd = socket(AF_INET, SOCK_STREAM, 0); /*此套接字是阻塞的*/
    if(sockfd < 0)
    {
        perror("sock create fail !!");
        exit(-1);
    }
 
 
    seradd.sin_family = AF_INET;
    seradd.sin_port = htons(8080);
    seradd.sin_addr.s_addr = htonl(INADDR_ANY);
 
    /*bind*/
    if(-1 == (bind(sockfd, (struct sockaddr *)&seradd, sizeof(struct sockaddr))))
    {
        perror("bind error");
        exit(-1);
    }
 
    /*listen*/
    if(-1 == (listen(sockfd, 5)))
    {
        perror("listen error");
        exit(-1);
    }
 
    //memset(&fd_read[0],0,sizeof(fd_read[0]));
 
    FD_ZERO(&fd_read[0]); /*清空fd_read[0]所有位*/
    FD_SET(sockfd, &fd_read[0]); /*将sockfd添加到fd_read[0]描述集中,也就是说将sockfd对应的fd_read[0]位中置位*/
    for(i = 0; i < FD_SETSIZE; i++)
    {
        clifd[i] = -1;
    }
 
    clifd[0] = sockfd;
    printf("--ser start work---\n");
    while(1)
    {
        FD_ZERO(&fd_read[1]);
        fd_read[1] = fd_read[0]; /*每次select后,没有达到条件的描述将被清空,所以每次都需要重新赋值*/
        /*进程将阻塞在select函数处,直到在整个队列中有读描述符可用为止,个人理解--内核检测整个队列,然后将可用的描述符返回(一个或多个,一个没有时将阻塞)*/
        fd_ret = select(max_fd(clifd, FD_SETSIZE) + 1, &fd_read[1], NULL, NULL, NULL); /*我们只关心读描述符集*/
 
        if(fd_ret < 0)
        {
            perror("select error");
        }
        else if(fd_ret > 0)
        {
            /*是监听套接字可读*/
            if(FD_ISSET(sockfd, &fd_read[1]) && (fd_count < FD_SETSIZE - 1))
            {
 
                confd = accept(sockfd, (struct sockaddr *)&cliadd, &cli_len); /*获取与客户端连接套接字*/
                if(-1 == confd)
                {
                    perror("confd error");
                }
 
                for(i = 1; i < FD_SETSIZE; i++)
                {
                    if(clifd[i] == -1)
                    {
                        clifd[i] = confd; /*将获得新连接套接字放到clifd数组中*/
                        FD_SET(confd, &fd_read[0]); /*将获得新连接套接字添加到读描述集中*/
                        fd_count++;
                        break;
                    }
                }
            }
 
            /*连接套接字可读*/
            for(j = 1; j < FD_SETSIZE; j++)
            {
                if(FD_ISSET(clifd[j], &fd_read[1]))
                {
                    /*从clifd[i]套接字中读取数据*/
                    if(read(clifd[j], readbuf, sizeof(readbuf)) <= 0)
                    {
                        perror("read data error");
                        FD_CLR(clifd[j], &fd_read[0]); /*将clifd[j]描述从读描述符集中删除*/
                        close(clifd[j]); /*关闭该套接字*/
                        clifd[j] = -1;
                        fd_count--;
                        continue;
 
                    }
 
                    strcpy(writebuf, readbuf);
                    printf("read data:%s\n", readbuf);
                    if(write(clifd[j], writebuf, sizeof(writebuf)) <= 0)
                    {
                        perror("write data error");
                        FD_CLR(clifd[j], &fd_read[0]);
                        close(clifd[j]);
                        clifd[j] = -1;
                        fd_count--;
                        continue;
                    }
                }
            }
        }
 
 
    }
 
}
public class Reactor implements Runnable {

    final Selector selector;
    final ServerSocketChannel serverSocket;

    Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), port);
        serverSocket.socket().bind(address);
        serverSocket.configureBlocking(false);
        //向selector注册该channel
        SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);

        System.out.println("-->Start serverSocket.register!");

        //利用sk的attache功能绑定Acceptor 如果有事情,触发Acceptor
        sk.attach(new Acceptor());
        System.out.println("-->attach(new Acceptor()!");
    }


    public void run() { // normally in a new Thread
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                //Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。
                while (it.hasNext()) {
                    //来一个事件 第一次触发一个accepter线程
                    //以后触发SocketReadHandler
                    dispatch((SelectionKey) (it.next()));
                }
                selected.clear();
            }
        } catch (IOException ex) {
            System.out.println("reactor stop!" + ex);
        }
    }

    //运行Acceptor或SocketReadHandler
    void dispatch(SelectionKey k) {
        Runnable r = (Runnable) (k.attachment());
        if (r != null) {
            r.run();
        }
    }

    class Acceptor implements Runnable { // inner
        public void run() {
            try {
                System.out.println("-->ready for accept!");
                SocketChannel c = serverSocket.accept();
                if (c != null)
                    //调用Handler来处理channel
                    new Handler(selector, c);
            } catch (IOException ex) {
            }
        }
    }
}
public class Handler implements Runnable {

    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(Integer.MAX_VALUE);
    ByteBuffer output = ByteBuffer.allocate(Integer.MAX_VALUE);
    static final int READING = 0, SENDING = 1;
    int state = READING;


    public Handler(Selector sel, SocketChannel c) throws IOException {
        socket = c;
        //设置为非阻塞模式
        c.configureBlocking(false);
        //此处的0,表示不关注任何时间
        sk = socket.register(sel, 0);
        //将SelectionKey绑定为本Handler 下一步有事件触发时,将调用本类的run方法
        sk.attach(this);
        //将SelectionKey标记为可读,以便读取,不可关注可写事件
        sk.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
    }

    boolean inputIsComplete() {
        return false;
    }

    boolean outputIsComplete() {
        return false;
    }

    //这里可以通过线程池处理数据
    void process() {

    }


    public void run() {
        try {
            if (state == READING) {
                read();
            } else if (state == SENDING) {
                send();
            }
        } catch (IOException ex) { /* ... */ }

    }


    void read() throws IOException {
        socket.read(input);
        if (inputIsComplete()) {
            process();
            state = SENDING;
            // Normally also do first write now
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    }

    void send() throws IOException {
        socket.write(output);
        if (outputIsComplete()) {
            //
            sk.cancel();
        }
    }

}
上一篇 下一篇

猜你喜欢

热点阅读