dpvs学习笔记: 6 定时器实现及连接老化超时

2018-11-08  本文已影响105人  董泽润

定时器实现方式很多种,定时器个数也不同。比如 go 老版本只有一个全局定时器,所以有些高性能项目抛弃了,自己实现用户级别的定时器,并且开启 n 个。但是在新版本没必要了,默认 64 个。dpvs 由于流表可能巨大,并且处在不同状态的 tcp 连接超时时间还不一样,如果实现的低效,会非常影响性能。所以 dpvs 在利用 dpdk 定时器的基础上,自实现了一个,算法是常见的时间轮。

数据结构

struct timer_scheduler {
    /* wheels and cursors */
    rte_spinlock_t      lock;
    uint32_t            cursors[LEVEL_DEPTH];
    struct list_head    *hashs[LEVEL_DEPTH];

    /* leverage dpdk rte_timer to drive us */
    struct rte_timer    rte_tim;
};

lock 是锁,增删改查都要锁。rte_tim 是 dpdk 库自身的定时器,他只负责时间嘀嗒,也就是 tick ++ 操作。cursors 和 hashs 配合使用,构成时间轮。

cursors 保存嘀嗒计数,从 0 到 LEVEL_SIZE (2<<18), LEVEL_DEPTH 值为 2,也就是说 cursors[0] 可以保存 2<<18 个嘀嗒,如果 DPVS_TIMER_HZ = 1000,那么就是 524s. cursors[0] 驱动 cursors[1] 时间轮,两个轮一共 8.7 年时间。hashs 是一个长度为 2 的数组,每个元素是一个链表数组,长度是 LEVEL_SIZE (2<<18), 链表成员就是具体的定时器消息。

初始化

首先,dpvs 调用 rte_timer_subsystem_init 初始化 dpdk 库的定时器。然后 dpvs_timer_init 初始化 dpvs 定时器。

int dpvs_timer_init(void)
{
    lcoreid_t cid;
    int err;

    /* per-lcore timer */
    // 每个 logic core 一个定时器
    rte_eal_mp_remote_launch(timer_lcore_init, NULL, SKIP_MASTER);
    RTE_LCORE_FOREACH_SLAVE(cid) {
        err = rte_eal_wait_lcore(cid);
        if (err < 0) {
            RTE_LOG(ERR, DTIMER, "%s: lcore %d: %s.\n", 
                    __func__, cid, dpvs_strerror(err));
            return err;
        }
    }

    /* global timer */
    return timer_init_schedler(&g_timer_sched, rte_get_master_lcore());
}

看源码得知,每个 slave lcore 都要调 timer_lcore_init 初始化自己的。然后再初始化全局的 g_timer_sched.

初始化 timer_lcore_init

static int timer_init_schedler(struct timer_scheduler *sched, lcoreid_t cid)
{
    int i, l;

    rte_spinlock_init(&sched->lock);


    rte_spinlock_lock(&sched->lock);
    for (l = 0; l < LEVEL_DEPTH; l++) {
        sched->cursors[l] = 0;

        sched->hashs[l] = rte_malloc(NULL,
                                     sizeof(struct list_head) * LEVEL_SIZE, 0);
        if (!sched->hashs[l]) {
            RTE_LOG(ERR, DTIMER, "[%02d] no memory.\n", cid);
            return EDPVS_NOMEM;
        }

        for (i = 0; i < LEVEL_SIZE; i++)
            INIT_LIST_HEAD(&sched->hashs[l][i]);
    }
    rte_spinlock_unlock(&sched->lock);

    rte_timer_init(&sched->rte_tim);
    /* ticks should be exactly same with precision */
    if (rte_timer_reset(&sched->rte_tim, rte_get_timer_hz() / DPVS_TIMER_HZ,
                        PERIODICAL, cid, rte_timer_tick_cb, sched) != 0) {
        RTE_LOG(ERR, DTIMER, "[%02d] fail to reset rte timer.\n", cid);
        return EDPVS_INVAL;
    }

    RTE_LOG(DEBUG, DTIMER, "[%02d] timer initialized %p.\n", cid, sched);
    return EDPVS_OK;
}

代码蛮详细了,需要注意两点

  1. hashs 是一个二维数组,大小是 2 * sizeof(struct list_head) * LEVEL_SIZE,空间换时间
  2. 利用了 dpdk 自身的定时器去维护嘀嗒,通过 rte_timer_reset 来注册,回调函数是 rte_timer_tick_cb,间隔是 rte_get_timer_hz() / DPVS_TIMER_HZ ticks

回调函数 rte_timer_tick_cb

static void rte_timer_tick_cb(struct rte_timer *tim, void *arg)
{
    struct timer_scheduler *sched = arg;
    struct dpvs_timer *timer, *next;
    uint64_t left, hash, off;
    int level, lower;
    uint32_t *cursor;
    bool carry;

    rte_spinlock_lock(&sched->lock);
    /* drive timer to move and handle expired timers. */
    for (level = 0; level < LEVEL_DEPTH; level++) {
        cursor = &sched->cursors[level];
        (*cursor)++; // tick ++ 

        if (*cursor < LEVEL_SIZE) {
            carry = false;
        } else {
            /* reset the cursor and handle next level later. */
            *cursor = 0;
            carry = true;
        }
        // 遍历当前 cursor 链表
        list_for_each_entry_safe(timer, next,
                                 &sched->hashs[level][*cursor], list) {
            /* is all lower levels ticks empty ? */
            left = timer->delay % get_level_ticks(level);
            if (!left) {
                timer_expire(sched, timer);
            } else {
                /* drop to lower level wheel, note it may not drop to
                 * "next" lower level wheel. */
                list_del(&timer->list);

                lower = level;
                while (--lower >= 0) {
                    off = timer->delay / get_level_ticks(lower);
                    if (!off)
                        continue; /* next lower level */

                    hash = (*cursor + off) % LEVEL_SIZE;
                    list_add_tail(&timer->list, &sched->hashs[lower][hash]);
                    break;
                }
                assert(lower >= 0);
            }
        }
        if (!carry)
            break;
    }
    rte_spinlock_unlock(&sched->lock);
    return;
}
  1. 当 level 0 时,使用第一个轮,并调用 (*cursor)++ 将嘀嗒加一。如果嘀嗒值大于 LEVEL_SIZE 说明第一个轮用完了,重置为 0 并设置 carry 进位标记。
  2. 遍历当前轮的 hash 数组,get_level_ticks 获取每一层的轮步进一个单位代表多少嘀嗒,当 level=0 时返回 1, 当 level=1时返回 LEVEL_SIZE。left = timer->delay % get_level_ticks(level) 如果 left 为 0 说明这个定时任务属于当前时间轮,并且己经过期了,触发 timer_expire 回调。
  3. 如果 left 有值,说明还没到超时时间。但是时间轮己经触发一次了,所以要降级到 --lower 层。从原有的链表中调用 list_del 删除,然后 timer->delay / get_level_ticks(lower) 求出在当前层步进单位个数,(*cursor + off) % LEVEL_SIZE 求出 hash 索引,然后 list_add_tail 添加到对应链表中。
  4. 如果没有进位,也就是 carry 未标记,那么退出循环。否则处理下一层的轮。

添加超时任务

dp_vs_conn_new 新建连接时会调用 dpvs_timer_sched 添加超时任务

dpvs_timer_sched(&new->timer, &new->timeout, conn_expire, new, true);

看一下具体的 dpvs_timer_sched 实现

static int __dpvs_timer_sched(struct timer_scheduler *sched,
                              struct dpvs_timer *timer, struct timeval *delay,
                              dpvs_timer_cb_t handler, void *arg, bool period)
{
    uint32_t off, hash;
    int level;

    assert(timer && delay && handler);

    if (timer_pending(timer))
        RTE_LOG(WARNING, DTIMER, "schedule a pending timer ?\n");

    timer->handler = handler;
    timer->priv = arg;
    timer->is_period = period;
    timer->delay = timeval_to_ticks(delay);

    if (unlikely(timer->delay >= TIMER_MAX_TICKS)) {
        RTE_LOG(WARNING, DTIMER, "exceed timer range\n");
        return EDPVS_INVAL;
    }

    /*
     * to schedule a 0 delay timer is not make sence.
     * and it will never stopped (periodic) or never triggered (one-shut).
     */
    if (unlikely(!timer->delay)) {
        RTE_LOG(WARNING, DTIMER, "schedule 0 timeout timer.\n");
        return EDPVS_INVAL;
    }

    /* add to corresponding wheel, from higher level to lower. */
    for (level = LEVEL_DEPTH - 1; level >= 0; level--) {
        off = timer->delay / get_level_ticks(level);
        if (off > 0) {
            hash = (sched->cursors[level] + off) % LEVEL_SIZE;
            list_add_tail(&timer->list, &sched->hashs[level][hash]);
            return EDPVS_OK;
        }
    }

    /* not adopted by any wheel (never happend) */
    return EDPVS_INVAL;
}
  1. 生成 timer 任务结构体,最重要的是 timeval_to_ticks 根据超时时间生成嘀嗒,也就是说过了 delay 个嘀嗒后超时。
  2. 各种异常 case 检测,不能为 0,不能超过最大 TIMER_MAX_TICKS
  3. 从最大层开始遍历时间轮,添加到指定链表中。off 是在当前轮中需要多少步进单位,hash 求出索引。

连接老化处理

在 tcp 每个阶段,都有不同的超时时间,并且超时到期后动作也有所不同,

static int conn_expire(void *priv)
{
    struct dp_vs_conn *conn = priv;
    struct dp_vs_proto *pp;
    struct rte_mbuf *cloned_syn_mbuf;
    struct dp_vs_synproxy_ack_pakcet *ack_mbuf, *t_ack_mbuf;
    struct rte_mempool *pool;
    assert(conn);

    /* set proper timeout */
    unsigned conn_timeout = 0;

    pp = dp_vs_proto_lookup(conn->proto);
    if (((conn->proto == IPPROTO_TCP) &&
        (conn->state == DPVS_TCP_S_ESTABLISHED)) ||
        ((conn->proto == IPPROTO_UDP) &&
        (conn->state == DPVS_UDP_S_NORMAL))) {
        conn_timeout = dp_vs_get_conn_timeout(conn);
        if (unlikely(conn_timeout > 0))
            conn->timeout.tv_sec = conn_timeout;
        else if (pp && pp->timeout_table)
            conn->timeout.tv_sec = pp->timeout_table[conn->state];
        else
            conn->timeout.tv_sec = 60;
    }
    else if (pp && pp->timeout_table)
        conn->timeout.tv_sec = pp->timeout_table[conn->state];
    else
        conn->timeout.tv_sec = 60;

dp_vs_get_conn_timeout 获取超时时间,查看源码是在配置 dpvs 时后端服务的超时时间

    dpvs_time_rand_delay(&conn->timeout, 1000000);
    rte_atomic32_inc(&conn->refcnt);

    /* retransmit syn packet to rs */
    if (conn->syn_mbuf && rte_atomic32_read(&conn->syn_retry_max) > 0) {
        if (likely(conn->packet_xmit != NULL)) {
            pool = get_mbuf_pool(conn, DPVS_CONN_DIR_INBOUND);
            if (unlikely(!pool)) {
                RTE_LOG(WARNING, IPVS, "%s: no route for syn_proxy rs's syn "
                        "retransmit\n", __func__);
            } else {
                cloned_syn_mbuf = rte_pktmbuf_clone(conn->syn_mbuf, pool);
                if (unlikely(!cloned_syn_mbuf)) {
                    RTE_LOG(WARNING, IPVS, "%s: no memory for syn_proxy rs's syn "
                            "retransmit\n", __func__);
                } else {
                    cloned_syn_mbuf->userdata = NULL;
                    conn->packet_xmit(pp, conn, cloned_syn_mbuf);
                }
            }
        }

        rte_atomic32_dec(&conn->syn_retry_max);
        dp_vs_estats_inc(SYNPROXY_RS_ERROR);

        /* expire later */
        dp_vs_conn_put(conn);
        return DTIMER_OK;
    }

如果支持 syn proxy, 并且当前 syn_mbuf 还在,syn_retry_max 重试次数大于 0,重发给 client

    /* somebody is controlled by me, expire later */
    if (rte_atomic32_read(&conn->n_control)) {
        dp_vs_conn_put(conn);
        return DTIMER_OK;
    }

其实不太理解这个 n_control 有什么用,慢慢看吧

    /* unhash it then no further user can get it,
     * even we cannot del it now. */
    conn_unhash(conn);

这是重点,删除连接,都超时了留着有啥用,清理流表

    /* refcnt == 1 means we are the only referer.
     * no one is using the conn and it's timed out. */
    if (rte_atomic32_read(&conn->refcnt) == 1) {
        struct dp_vs_proto *proto = dp_vs_proto_lookup(conn->proto);

        if (conn->flags & DPVS_CONN_F_TEMPLATE)
            dpvs_timer_cancel(&conn->timer, true);
        else
            dpvs_timer_cancel(&conn->timer, false);

        /* I was controlled by someone */
        if (conn->control)
            dp_vs_control_del(conn);

        if (proto && proto->conn_expire)
            proto->conn_expire(proto, conn);

        if (conn->dest->fwdmode == DPVS_FWD_MODE_SNAT
                && conn->proto != IPPROTO_ICMP) {
            struct sockaddr_in daddr, saddr;

            memset(&daddr, 0, sizeof(daddr));
            daddr.sin_family = AF_INET;
            daddr.sin_addr = conn->caddr.in;
            daddr.sin_port = conn->cport;

            memset(&saddr, 0, sizeof(saddr));
            saddr.sin_family = AF_INET;
            saddr.sin_addr = conn->vaddr.in;
            saddr.sin_port = conn->vport;

            sa_release(conn->out_dev, &daddr, &saddr);
        }

        conn_unbind_dest(conn);
        dp_vs_laddr_unbind(conn);

        /* free stored ack packet */
        list_for_each_entry_safe(ack_mbuf, t_ack_mbuf, &conn->ack_mbuf, list) {
            list_del_init(&ack_mbuf->list);
            rte_pktmbuf_free(ack_mbuf->mbuf);
            sp_dbg_stats32_dec(sp_ack_saved);
            rte_mempool_put(this_ack_mbufpool, ack_mbuf);
        }
        conn->ack_num = 0;

        /* free stored syn mbuf */
        if (conn->syn_mbuf) {
            rte_pktmbuf_free(conn->syn_mbuf);
            sp_dbg_stats32_dec(sp_syn_saved);
        }

        rte_atomic32_dec(&conn->refcnt);

        rte_mempool_put(conn->connpool, conn);
        this_conn_count--;

#ifdef CONFIG_DPVS_IPVS_STATS_DEBUG
        conn_stats_dump("del conn", conn);
#endif
#ifdef CONFIG_DPVS_IPVS_DEBUG
        conn_dump("del conn: ", conn);
#endif
        return DTIMER_STOP;
    }

如果引用计数 1,说明只有自己在用,获取协义,调用协义的连接老化函数 proto->conn_expire,这里还要兼容 DPVS_FWD_MODE_SNAT,我觉得代码写的丑... conn_unbind_dest 解除连接和后端的引用,dp_vs_laddr_unbind 释放本地地址。最后清理 ack_mbuf, syn_mbuf 等等

    conn_hash(conn);

如果走到这里,说明连接还有人在用,那么加回流表

    /* some one is using it when expire,
     * try del it again later */
    if (conn->flags & DPVS_CONN_F_TEMPLATE)
        dpvs_timer_update(&conn->timer, &conn->timeout, true);
    else
        dpvs_timer_update(&conn->timer, &conn->timeout, false);

    rte_atomic32_dec(&conn->refcnt);
    return DTIMER_OK;
}

如果走到这里,说明连接还有人在用,那么更新超时时间加回定时器。

tcp连接清除

static int tcp_conn_expire(struct dp_vs_proto *proto, 
                       struct dp_vs_conn *conn)
{
    int err;
    assert(proto && conn && conn->dest);

    if (conn->dest->fwdmode == DPVS_FWD_MODE_NAT 
            || conn->dest->fwdmode == DPVS_FWD_MODE_FNAT) {
        /* send RST to RS and client */
        err = tcp_send_rst(proto, conn, DPVS_CONN_DIR_INBOUND);
        if (err != EDPVS_OK)
            RTE_LOG(WARNING, IPVS, "%s: fail RST RS.\n", __func__);
        err = tcp_send_rst(proto, conn, DPVS_CONN_DIR_OUTBOUND);
        if (err != EDPVS_OK)
            RTE_LOG(WARNING, IPVS, "%s: fail RST Client.\n", __func__);
    }

    return EDPVS_OK;
}

代码很好理解,双向发送 rst

小结

这块实现的比较标准,大量连接时流表就是会很膨胀,很考验定时器性能。时间轮+多个定时器,想要高性能必备。

上一篇 下一篇

猜你喜欢

热点阅读