Redis源码学习笔记

Redis源码学习之事件

2019-05-17  本文已影响0人  lixin_karl

事件

   Redis服务器是一个事件驱动程序,服务器主要处理时间事件与文件事件。其中,时间事件是指服务器中的一些操作比如(serverCron函数)在给定的时间点执行函数,文件事件就是指服务器通过套接字跟客户端通信,redis中client结构就是套接字的一个封装。

一、文件事件

   Redis服务器采用了IO多路复用技术,实现高性能的监听。每来一个客户端连接或生成一个aeFileEvent,AE_READABLE表示此事件可读,即客户端发送命令到达服务器。AE_WRITABLE表示此事件可写,即服务器可以发送结果到客户端了。

二、时间事件

Redis目前的时间事件就是周期性处理,服务器将所有时间事件放到一个链表中,每次遍历链表,如果达到指定的时间点,那么就去处理事件。

三、结构与API
aeEventLoop *aeCreateEventLoop(int setsize);//创建并初始化事件循环结构体
void aeDeleteEventLoop(aeEventLoop *eventLoop);//析构 事件循环结构体
void aeStop(aeEventLoop *eventLoop);//停止事件循环
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData);//创建文件事件
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);//删除文件事件
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);//获得文件事件
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc);//创建时间事件
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);//删除时间事件
int aeProcessEvents(aeEventLoop *eventLoop, int flags);//处理所有事件
int aeWait(int fd, int mask, long long milliseconds);//等待
void aeMain(aeEventLoop *eventLoop);//循环等待处理aeProcessEvents函数
char *aeGetApiName(void);//返回 "select"
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);//设置休眠之前的处理函数
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep);//设置休眠之后的处理函数
int aeGetSetSize(aeEventLoop *eventLoop);//get setsize
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);//reallcoc fileevent set
        /*处理每个正在进行的时间事件,文件事件。没有特殊标志的函数休眠直到一些文件事件被触发,或者下一个时间事件发生
         *flags=0 啥也不做
         *flags=AE_ALL_EVENTS 处理所有事件
         *flags=AE_FILE_EVENTS 处理所有文件事件
         *flags=AE_TIME_EVENTS 处理所有时间事件
         *flags=AE_DONT_WAIT  返回ASAP 直到所有不需要被等待的可能被处理的事件都被处理
         *flags=AE_CALL_AFTER_SLEEP 调用休眠后函数*/
        int aeProcessEvents(aeEventLoop *eventLoop, int flags)
        {
            int processed = 0, numevents;
            /*如果不是时间事件也不是文件事件返回ASAP */
            if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
            //如果maxfd!=-1表示有事件 或者 需要处理时间事件且不处理不等待事件
            if (eventLoop->maxfd != -1 ||
                ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
                int j;aeTimeEvent *shortest = NULL;
                struct timeval tv, *tvp;
                //如果flags是指需要处理时间事件且不处理不等待事件,寻找时间参数最小的时间事件
                if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
                    shortest = aeSearchNearestTimer(eventLoop);
                if (shortest) {//只要存在时间事件 就会走到这步
                    long now_sec, now_ms;
                    aeGetTime(&now_sec, &now_ms);//得到现在的时间
                    tvp = &tv;
                    /* 下次事件触发需要多少微秒 */
                    long long ms=(shortest->when_sec - now_sec)*1000 +
                        shortest->when_ms - now_ms;
                    if (ms > 0) {
                        tvp->tv_sec = ms/1000;
                        tvp->tv_usec = (ms % 1000)*1000;
                    } else {
                        tvp->tv_sec = 0;
                        tvp->tv_usec = 0;
                    }
                } else {
                    if (flags & AE_DONT_WAIT) {
                        tv.tv_sec = tv.tv_usec = 0;
                        tvp = &tv;
                    } else {
                        /* Otherwise we can block */
                        tvp = NULL; /* wait forever */
                    }
                }
                numevents = aeApiPoll(eventLoop, tvp);//多路复用API 可读 可写结果都出来了
                /* 调用休眠之后的回调函数*/
                if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
                    eventLoop->aftersleep(eventLoop);
                for (j = 0; j < numevents; j++) {
                    aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
                    int mask = eventLoop->fired[j].mask;
                    int fd = eventLoop->fired[j].fd;
                    int fired = 0; /* 当前fd触发的文件事件的个数 */
                    /*
                     * 设置AE_BARRIER      读-写
                     * 未设置AE_BARRIER    写-读
                     * */
                    int invert = fe->mask & AE_BARRIER;
                    //如果没有设置AE_BARRIER 且可读 处理读事件
                    if (!invert && fe->mask & mask & AE_READABLE) {
                        fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                        fired++;
                    }
                    //处理写
                    if (fe->mask & mask & AE_WRITABLE) {
                        if (!fired || fe->wfileProc != fe->rfileProc) {
                            fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                            fired++;
                        }
                    }
                    //设置了AE_BARRIER 最后处理读
                    if (invert && fe->mask & mask & AE_READABLE) {
                        if (!fired || fe->wfileProc != fe->rfileProc) {
                            fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                            fired++;
                        }
                    }
                    processed++;
                }
            }
            /* 如果需要处理时间事件*/
            if (flags & AE_TIME_EVENTS)
                processed += processTimeEvents(eventLoop);//处理函数,返回被处理的时间事件的个数
            return processed; /* return the number of processed file/time events */
        }
3. processTimeEvents
        /* 处理时间事件*/
        static int processTimeEvents(aeEventLoop *eventLoop) {
            int processed = 0;
            aeTimeEvent *te;
            long long maxId;
            time_t now = time(NULL);
            if (now < eventLoop->lastTime) {//防止系统时间被往前调了,重新设置时间数据
                te = eventLoop->timeEventHead;
                while(te) {
                    te->when_sec = 0;
                    te = te->next;
                }
            }
            eventLoop->lastTime = now;//最近一次被处理时间
            te = eventLoop->timeEventHead;
            maxId = eventLoop->timeEventNextId-1;
            while(te) { //一个个的处理
                long now_sec, now_ms;
                long long id;
                /* id==-1的话 是要被删除的 */
                if (te->id == AE_DELETED_EVENT_ID) {
                    aeTimeEvent *next = te->next;
                    if (te->prev)
                        te->prev->next = te->next;
                    else
                        eventLoop->timeEventHead = te->next;
                    if (te->next)
                        te->next->prev = te->prev;
                    if (te->finalizerProc)
                        te->finalizerProc(eventLoop, te->clientData);
                    zfree(te);
                    te = next;
                    continue;
                }
                //再这次循环中我们不处理刚触发的时间事件
                if (te->id > maxId) {
                    te = te->next;
                    continue;
                }
                aeGetTime(&now_sec, &now_ms);
                //现在时间大于预设的时间
                if (now_sec > te->when_sec ||
                    (now_sec == te->when_sec && now_ms >= te->when_ms))
                {
                    int retval;
                    id = te->id;
                    retval = te->timeProc(eventLoop, id, te->clientData);//时间处理函数
                    processed++;
                    if (retval != AE_NOMORE) {//周期事件
                        aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
                    } else {//定时事件 处理完就删除
                                        te->id = AE_DELETED_EVENT_ID;
                    }
                }
                te = te->next;
            }
            return processed;
        }

四、参考

上一篇下一篇

猜你喜欢

热点阅读