traffic control 之 INGRESS 队列

2020-10-26  本文已影响0人  分享放大价值

ingress队列用于报文入方向处理,限速,丢包等。

下面分析
a. 命令行下发格式。了解命令行下发时如何填充参数,对理解kernel代码是很有帮助的。
b. 内核端解析命令行参数,创建队列规则和filter规则。

//添加ingress qdisc
tc qdisc add dev eth0 ingress

//在ingress qdisc上添加过滤规则
//parent ffff:是ingress的handle。
//protocol和prio是filter命令的参数。
//u32是一种filter,后面的参数是u32的私有参数,表示对匹配源ip为网段
//1.1.1.0/24的报文,执行action为police,即进行限速,超过范围的drop掉。
tc filter add dev eth0 parent ffff: protocol ip prio 10 u32 match ip src 1.1.1.0/24 police rate 2048kbps burst 1m drop

可以加-h查看指定kind的help信息,比如下面查看u32类型的help信息

root@ubuntu:~# tc filter add dev ens9 u32 -h
What is "-h"?
Usage: ... u32 [ match SELECTOR ... ] [ link HTID ] [ classid CLASSID ]
               [ action ACTION_SPEC ] [ offset OFFSET_SPEC ]
               [ ht HTID ] [ hashkey HASHKEY_SPEC ]
               [ sample SAMPLE ] [skip_hw | skip_sw]
or         u32 divisor DIVISOR

Where: SELECTOR := SAMPLE SAMPLE ...
       SAMPLE := { ip | ip6 | udp | tcp | icmp | u{32|16|8} | mark }
                 SAMPLE_ARGS [ divisor DIVISOR ]
       FILTERID := X:Y:Z

NOTE: CLASSID is parsed at hexadecimal input.

c. 入方向报文处理流程。

下面先附上一张kernel端ingress qdisc和filter数据结构的关联图


image.png

1. tc下发命令格式

下面这个结构体是tc下发命令使用的格式,最终会下发给kernel端,kernel按照相同的格式解析出来。其中第一部分struct nlmsghdr是netlink下发消息的公共部分,包含具体命令和flag,第二部分struct tcmsg是tc的私有结构,第三部分是一个很大的字符型数组,可以保存配置。最终下发内容长度以实际配置为准,基本不会下发TCA_BUF_MAX这么长的内容。

/* Size of the attribute buffer that follows the two fixed headers. */
#define TCA_BUF_MAX (64*1024)
/* Layout of the request that tc sends to the kernel. */
struct {
        /* common netlink message header */
        struct nlmsghdr n;
        /* tc-specific header */
        struct tcmsg        t;
        /* space for the attributes carrying the actual configuration */
        char            buf[TCA_BUF_MAX];
}

对于tc子系统来说,buf可存储的内容为TCA_MAX 个struct nlattr,每个struct nlattr保存一种配置,每种配置还能嵌套配置。

/* Attribute types that may appear in the buffer of a tc request. */
enum {
    TCA_UNSPEC,
    TCA_KIND,    // kind of qdisc
    TCA_OPTIONS, // options private to that kind of qdisc
    TCA_STATS,
    TCA_XSTATS,
    TCA_RATE,
    TCA_FCNT,
    TCA_STATS2,
    TCA_STAB,
    __TCA_MAX
};
/* Highest valid attribute type (the last enumerator before __TCA_MAX). */
#define TCA_MAX (__TCA_MAX - 1)
struct nlmsghdr + struct tcmsg + struct nlattr[TCA_MAX + 1]

2. 设置ingress qdisc

这里看一下执行下面命令创建ingress qdisc时,命令行和kernel端代码

tc qdisc add dev eth0 ingress

命令行端代码

//iproute2/tc/tc_qdisc.c
static int tc_qdisc_modify(int cmd, unsigned int flags, int argc, char **argv)
    struct {
        struct nlmsghdr n;
        struct tcmsg        t;
        char            buf[TCA_BUF_MAX];
    } req = {
        .n.nlmsg_len = NLMSG_LENGTH(sizeof(struct tcmsg)),
        .n.nlmsg_flags = NLM_F_REQUEST | , NLM_F_EXCL|NLM_F_CREATE,
        .n.nlmsg_type = RTM_NEWQDISC,
        .t.tcm_family = AF_UNSPEC,
        //固定值
        .t.tcm_parent = TC_H_INGRESS, 
        //取TC_H_INGRESS的高16位
        .t.tcm_handle = TC_H_MAKE(TC_H_INGRESS, 0),
        .t.tcm_ifindex= ll_name_to_index("eth0"),
    };
    //buf部分只需要携带kind即可,没有额外的参数
    //kind 为 "ingress"
    addattr_l(&req.n, sizeof(req), TCA_KIND, k, strlen(k)+1);

    //最终调用此函数将请求内容发送到kernel端
    rtnl_talk(&rth, &req.n, NULL)

kernel端只保留和ingress相关代码

//linux/net/sched/sck_api.c
static int tc_modify_qdisc(struct sk_buff *skb, struct nlmsghdr *n)
    struct net_device *dev;
    struct Qdisc *q;
    struct tcmsg *tcm;
    struct nlattr *tca[TCA_MAX + 1];

    //解析出配置的参数,保存到tca中
    nlmsg_parse(n, sizeof(*tcm), tca, TCA_MAX, NULL);
    tcm = nlmsg_data(n);
    clid = tcm->tcm_parent;
    dev = __dev_get_by_index(net, tcm->tcm_ifindex);

    //a. 获取 ingress queue。ingress qdisc会保存到ingress queue中
    if (dev_ingress_queue_create(dev)) {
        q = dev_ingress_queue(dev)->qdisc_sleeping;

    //b. 创建qdisc
    if (clid == TC_H_INGRESS) {
        if (dev_ingress_queue(dev))
            q = qdisc_create(dev, dev_ingress_queue(dev), p,
                     tcm->tcm_parent, tcm->tcm_parent,
                     tca, &err);
        else
            err = -ENOENT;
    }
    
    //c. 将新创建的qdisc赋给ingress queue
    qdisc_graft(dev, p, skb, n, clid, q, NULL);

a. 获取ingress_queue, 如果是第一次执行则创建

/*
 * Return the device's ingress queue, allocating and publishing it on first
 * use when CONFIG_NET_CLS_ACT is enabled. A fresh queue starts with
 * noop_qdisc as both its active and its sleeping qdisc.
 * Returns NULL if no queue exists and none could be allocated.
 */
struct netdev_queue *dev_ingress_queue_create(struct net_device *dev)
{
    struct netdev_queue *queue = dev_ingress_queue(dev);

#ifdef CONFIG_NET_CLS_ACT
    if (!queue) {
        queue = kzalloc(sizeof(*queue), GFP_KERNEL);
        if (queue) {
            netdev_init_one_queue(dev, queue, NULL);
            queue->qdisc = &noop_qdisc;
            queue->qdisc_sleeping = &noop_qdisc;
            /* publish only after the queue is fully initialised */
            rcu_assign_pointer(dev->ingress_queue, queue);
        }
    }
#endif
    return queue;
}

b. 创建qdisc,kind为ingress

static struct Qdisc *
qdisc_create(struct net_device *dev, struct netdev_queue *dev_queue, struct Qdisc *p, u32 parent, u32 handle, struct nlattr **tca, int *errp)
    struct nlattr *kind = tca[TCA_KIND];
    struct Qdisc *sch;
    struct Qdisc_ops *ops;
    //根据kind到链表qdisc_base查找是否已经加载,如果没有,则动态加载
    //对于ingress来说,ops为ingress_qdisc_ops,需要加载sch_ingress.ko
    ops = qdisc_lookup_ops(kind);
    //申请qdisc内存
    sch = qdisc_alloc(dev_queue, ops);
    //设置parent,对于ingress来说,parent是TC_H_INGRESS(0xfffffff1)
    sch->parent = parent;
    if (handle == TC_H_INGRESS) {
        //设置标志位 TCQ_F_INGRESS
        sch->flags |= TCQ_F_INGRESS;
        //获取handle值,TC_H_INGRESS的高16位
        handle = TC_H_MAKE(TC_H_INGRESS, 0);
        lockdep_set_class(qdisc_lock(sch), &qdisc_rx_lock);
    sch->handle = handle;
    //如果ops提供了init函数,则调用,对于ingress来说,为空
    ops->init(sch, tca[TCA_OPTIONS])
    //参考下面对于qdisc_list_add的注释,对于ingress来说,空操作
    qdisc_list_add(sch);
    return sch;

函数qdisc_list_add的作用是: parent不是 ROOT,并且parent不是 INGRESS 时才会插入链表 dev->qdisc,此链表首元素是根队列,比如下面的三种队列不会插入链表 dev->qdisc:
a. 根队列--parent为root(0xFFFFFFFFU) tc qdisc add dev xxx root ... 此命令会将新创建的qdisc赋给 dev->qdisc 当做首元素,后面创建了子qdisc才会插入此链表。并且新创建的qdisc也会赋给 struct net_device->_tx[i]
b. ingress 队列--parent为 ingress(TC_H_INGRESS) tc qdisc add dev xxx ingress 新创建的 qdisc 赋给 struct net_device->ingress_queue
c. clsact 队列--parent为 ingress(TC_H_INGRESS) tc qdisc add dev xxx clsact 新创建的 qdisc 赋给 struct net_device->ingress_queue

/*
 * Append q to the list headed by the device's root qdisc.
 * Qdiscs whose parent is TC_H_ROOT and qdiscs flagged TCQ_F_INGRESS are
 * not put on that list.
 */
void qdisc_list_add(struct Qdisc *q)
{
    struct Qdisc *root;

    if (q->parent == TC_H_ROOT || (q->flags & TCQ_F_INGRESS))
        return;

    root = qdisc_dev(q)->qdisc;
    WARN_ON_ONCE(root == &noop_qdisc);
    list_add_tail(&q->list, &root->list);
}

c. 将新创建的qdisc赋给ingress queue

static int qdisc_graft(struct net_device *dev, struct Qdisc *parent,
               struct sk_buff *skb, struct nlmsghdr *n, u32 classid,
               struct Qdisc *new, struct Qdisc *old)
        // Excerpt. Default: one graft per TX queue. In the ingress case
        // only a single queue, the ingress queue, is handled.
        num_q = dev->num_tx_queues;
        if ((q && q->flags & TCQ_F_INGRESS) ||
            (new && new->flags & TCQ_F_INGRESS)) {
            num_q = 1;
            ingress = 1;
            if (!dev_ingress_queue(dev))
                return -ENOENT;
        }
        for (i = 0; i < num_q; i++) {
            struct netdev_queue *dev_queue = dev_ingress_queue(dev);

            if (!ingress)
                dev_queue = netdev_get_tx_queue(dev, i);
            // First store the new qdisc in dev_queue->qdisc_sleeping.
            old = dev_graft_qdisc(dev_queue, new);
            // Each queue after the first takes one more reference on new.
            if (new && i > 0)
                atomic_inc(&new->refcnt);

            if (!ingress)
                qdisc_destroy(old);
        }
        // Only when the device is up is dev_queue->qdisc_sleeping assigned
        // to dev_queue->qdisc; packet processing uses dev_queue->qdisc.
        if (dev->flags & IFF_UP)
            dev_activate(dev);

/*
 * Install qdisc as the sleeping qdisc of dev_queue and return the previous
 * sleeping qdisc. A NULL qdisc is replaced by noop_qdisc. The active qdisc
 * (dev_queue->qdisc) is set to noop_qdisc here, not to the new qdisc.
 */
struct Qdisc *dev_graft_qdisc(struct netdev_queue *dev_queue,
                  struct Qdisc *qdisc)
{
    struct Qdisc *oqdisc = dev_queue->qdisc_sleeping;
    spinlock_t *root_lock;

    /*
     * NOTE(review): oqdisc is passed to qdisc_lock() before the
     * `oqdisc &&` check below; presumably qdisc_sleeping is never NULL
     * here. Confirm.
     */
    root_lock = qdisc_lock(oqdisc);
    spin_lock_bh(root_lock);

    /* Prune old scheduler */
    if (oqdisc && atomic_read(&oqdisc->refcnt) <= 1)
        qdisc_reset(oqdisc);

    /* ... and graft new one */
    if (qdisc == NULL)
        qdisc = &noop_qdisc;
    // Store the new qdisc in dev_queue->qdisc_sleeping.
    dev_queue->qdisc_sleeping = qdisc;
    rcu_assign_pointer(dev_queue->qdisc, &noop_qdisc);

    spin_unlock_bh(root_lock);

    return oqdisc;
}

3. 设置filter

执行下面命令给ingress qdisc设置filter。

tc filter add dev eth0 parent ffff: protocol ip prio 10 u32 match ip src 1.1.1.0/24 police rate 2048kbps burst 1m drop

parent ffff:是ingress的handle。
protocol和prio是filter命令的参数。
u32是一种filter,后面的参数是u32的私有参数,表示对源ip为网段1.1.1.0/24的报文,执行限速,超过范围的drop掉。

命令行端代码

//iproute2/tc/tc_filter.c
static int tc_filter_modify(int cmd, unsigned int flags, int argc, char **argv)
    // Excerpt: the request built for the `tc filter add ... u32 ...` command.
    struct {
        struct nlmsghdr n;
        struct tcmsg        t;
        char            buf[MAX_MSG];
    } req = {
        .n.nlmsg_len = NLMSG_LENGTH(sizeof(struct tcmsg)),
        .n.nlmsg_flags = NLM_F_REQUEST | NLM_F_EXCL|NLM_F_CREATE,
        .n.nlmsg_type = RTM_NEWTFILTER,
        .t.tcm_family = AF_UNSPEC,
    };
    req.t.tcm_ifindex = ll_name_to_index("eth0");
    req.t.tcm_parent = handle;  // "parent ffff:" combined with 0000 gives ffff0000
    req.t.tcm_info = TC_H_MAKE(prio<<16, protocol); // prio and protocol packed together
    addattr_l(&req.n, sizeof(req), TCA_KIND, k, strlen(k)+1); // kind is "u32"
    addattr_l(n, MAX_MSG, TCA_U32_SEL, &sel, sizeof(sel.sel) +
              sel.sel.nkeys * sizeof(struct tc_u32_key)); // parameters of the u32 filter

kernel端代码

//linux/net/sched/cls_api.c
static int tc_ctl_tfilter(struct sk_buff *skb, struct nlmsghdr *n)
    // Excerpt: kernel-side handling of a filter request; error paths and
    // several local declarations are omitted.
    struct nlattr *tca[TCA_MAX + 1];
    struct tcmsg *t;
    struct Qdisc  *q;
    struct net_device *dev;
    struct tcf_proto __rcu **back;
    struct tcf_proto __rcu **chain;
    struct tcf_proto *tp;
    const struct tcf_proto_ops *tp_ops;
    const struct Qdisc_class_ops *cops;

    nlmsg_parse(n, sizeof(*t), tca, TCA_MAX, NULL);
    t = nlmsg_data(n);
    // tcm_info carries prio in its upper 16 bits and protocol in the lower 16.
    protocol = TC_H_MIN(t->tcm_info);
    prio = TC_H_MAJ(t->tcm_info);
    nprio = prio;
    parent = t->tcm_parent;
    dev = __dev_get_by_index(net, t->tcm_ifindex);
    q = qdisc_lookup(dev, TC_H_MAJ(t->tcm_parent));
    // If cl_ops provides no tcf_chain callback, the qdisc does not support
    // filters.
    cops = q->ops->cl_ops;
    if (cops->tcf_chain == NULL)
        return -EOPNOTSUPP;
    // The lower 16 bits of parent may name a class id. The ingress qdisc
    // has no classes, and as seen on the command-line side the lower
    // 16 bits of parent are 0.
    /* Do we search for filter, attached to class? */
    if (TC_H_MIN(parent)) {
        cl = cops->get(q, parent);
        if (cl == 0)
            return -ENOENT;
    }
    // For ingress, tcf_chain simply returns the head of the filter list:
    //   struct ingress_qdisc_data *p = qdisc_priv(sch);
    //   return &p->filter_list;
    /* And the last stroke */
    chain = cops->tcf_chain(q, cl);
    // Look for an existing tp with this priority; create one if none
    // exists, otherwise update the existing one.
    /* Check the chain for existence of proto-tcf with this priority */
    for (back = chain;
         (tp = rtnl_dereference(*back)) != NULL;
         back = &tp->next) {
        if (tp->prio >= prio) {
            if (tp->prio == prio) {
                if (!nprio ||
                    (tp->protocol != protocol && protocol))
                    goto errout;
            } else
                tp = NULL;
            break;
        }
    }
    if (tp == NULL) {
        /* Proto-tcf does not exist, create new one */

        if (tca[TCA_KIND] == NULL || !protocol)
            goto errout;

        err = -ENOENT;
        if (n->nlmsg_type != RTM_NEWTFILTER ||
            !(n->nlmsg_flags & NLM_F_CREATE))
            goto errout;

        /* Create new proto tcf */
        err = -ENOBUFS;
        tp = kzalloc(sizeof(*tp), GFP_KERNEL);
        if (tp == NULL)
            goto errout;
        err = -ENOENT;
        // Look up the kind. Kinds are registered through
        // register_tcf_proto_ops; u32, for example, is registered when
        // the cls_u32 module is loaded.
        tp_ops = tcf_proto_lookup_ops(tca[TCA_KIND]);
        tp->ops = tp_ops;
        tp->protocol = protocol;
        tp->prio = nprio ? :TC_H_MAJ(tcf_auto_prio(rtnl_dereference(*back)));
        tp->q = q;
        tp->classify = tp_ops->classify; // u32_classify
        tp->classid = parent;

        err = tp_ops->init(tp); // u32_init
        tp_created = 1;
    }
    // Link a newly created tp into the chain in front of *back.
    if (tp_created) {
        RCU_INIT_POINTER(tp->next, rtnl_dereference(*back));
        rcu_assign_pointer(*back, tp);
    }

4. ingress处理数据包流程

ingress方向的数据包在协议栈入口函数__netif_receive_skb_core中被handle_ing处理。

static int __netif_receive_skb_core(struct sk_buff *skb, bool pfmemalloc)
    // Excerpt: the ingress hook, present only with CONFIG_NET_CLS_ACT.
#ifdef CONFIG_NET_CLS_ACT
    skb = handle_ing(skb, &pt_prev, &ret, orig_dev);
    // A NULL result means handle_ing consumed the packet; stop processing.
    if (!skb)
        goto out;
ncls:
#endif

如果skb经过handle_ing处理后被丢了,则返回NULL,否则返回原始报文。

/*
 * Run the ingress qdisc, if one is attached, on an incoming packet.
 * Returns NULL when the filter verdict is TC_ACT_SHOT or TC_ACT_STOLEN
 * (the skb is freed here); otherwise returns skb with tc_verd cleared.
 */
static inline struct sk_buff *handle_ing(struct sk_buff *skb,
                     struct packet_type **pt_prev,
                     int *ret, struct net_device *orig_dev)
{
    struct netdev_queue *ingress_q = rcu_dereference(skb->dev->ingress_queue);
    int verdict;

    /* No ingress queue, or only noop_qdisc on it: no ingress qdisc is set. */
    if (ingress_q && rcu_access_pointer(ingress_q->qdisc) != &noop_qdisc) {
        /* Flush the pending packet_type handler before filtering. */
        if (*pt_prev) {
            *ret = deliver_skb(skb, *pt_prev, orig_dev);
            *pt_prev = NULL;
        }

        verdict = ing_filter(skb, ingress_q);
        if (verdict == TC_ACT_SHOT || verdict == TC_ACT_STOLEN) {
            kfree_skb(skb);
            return NULL;
        }
    }

    skb->tc_verd = 0;
    return skb;
}
static int ing_filter(struct sk_buff *skb, struct netdev_queue *rxq)
    // Excerpt. The result stays TC_ACT_OK unless the qdisc is actually run.
    struct net_device *dev = skb->dev;
    int result = TC_ACT_OK;
    // Fetch the qdisc; unless it is the noop placeholder, enqueue the skb.
    q = rcu_dereference(rxq->qdisc);
    if (q != &noop_qdisc) {
        spin_lock(qdisc_lock(q));
        // Skip the enqueue if the qdisc has been marked deactivated.
        if (likely(!test_bit(__QDISC_STATE_DEACTIVATED, &q->state)))
            result = qdisc_enqueue_root(skb, q);
        spin_unlock(qdisc_lock(q));
    }

    return result;

/*
 * Record the packet length in the skb's qdisc control block, enqueue the
 * skb on sch, and return the enqueue result masked with NET_XMIT_MASK.
 */
static inline int qdisc_enqueue_root(struct sk_buff *skb, struct Qdisc *sch)
{
    int rc;

    qdisc_skb_cb(skb)->pkt_len = skb->len;
    rc = qdisc_enqueue(skb, sch);

    return rc & NET_XMIT_MASK;
}
/*
 * Compute the packet length for sch, then hand the skb to the qdisc's
 * enqueue callback and return its result.
 */
static inline int qdisc_enqueue(struct sk_buff *skb, struct Qdisc *sch)
{
    int verdict;

    qdisc_calculate_pkt_len(skb, sch);
    /* for the ingress qdisc the callback is ingress_enqueue */
    verdict = sch->enqueue(skb, sch);

    return verdict;
}

ingress qdisc的入队操作,虽然名字叫入队,但实际上是执行filter

/*
 * Enqueue callback of the ingress qdisc: runs the filters attached to the
 * qdisc on the skb.
 * NOTE(review): this excerpt stops after the classify call and never
 * returns `result`, although the function is declared int. The rest of the
 * body appears to have been elided; check the kernel source.
 */
static int ingress_enqueue(struct sk_buff *skb, struct Qdisc *sch)
{
    struct ingress_qdisc_data *p = qdisc_priv(sch);
    struct tcf_result res;
    // Head of the filter list.
    struct tcf_proto *fl = rcu_dereference_bh(p->filter_list);
    int result;

    result = tc_classify(skb, fl, &res);
}
// Classify skb against the filter list starting at tp, by delegating to
// tc_classify_compat(). With CONFIG_NET_CLS_ACT, a TC_ACT_RECLASSIFY result
// restarts classification from the original list head; once the counter
// kept in skb->tc_verd reaches MAX_REC_LOOP a notice is logged and
// TC_ACT_SHOT is returned.
int tc_classify(struct sk_buff *skb, const struct tcf_proto *tp,
        struct tcf_result *res)
{
    int err = 0;
#ifdef CONFIG_NET_CLS_ACT
    // Remember the list head so a reclassify can start over from it.
    const struct tcf_proto *otp = tp;
reclassify:
#endif

    err = tc_classify_compat(skb, tp, res);
#ifdef CONFIG_NET_CLS_ACT
    if (err == TC_ACT_RECLASSIFY) {
        u32 verd = G_TC_VERD(skb->tc_verd);
        tp = otp;

        // Bound the number of reclassify rounds.
        if (verd++ >= MAX_REC_LOOP) {
            net_notice_ratelimited("%s: packet reclassify loop rule prio %u protocol %02x\n",
                           tp->q->ops->id,
                           tp->prio & 0xffff,
                           ntohs(tp->protocol));
            return TC_ACT_SHOT;
        }
        skb->tc_verd = SET_TC_VERD(skb->tc_verd, verd);
        goto reclassify;
    }
#endif
    return err;
}
/*
 * Walk the filter list starting at tp and run the classify callback of
 * every filter whose protocol equals the packet's protocol or is
 * ETH_P_ALL. The first non-negative result is returned; -1 is returned
 * when no filter produced one.
 */
int tc_classify_compat(struct sk_buff *skb, const struct tcf_proto *tp,
               struct tcf_result *res)
{
    const __be16 pkt_proto = skb->protocol;

    while (tp) {
        if (tp->protocol == pkt_proto ||
            tp->protocol == htons(ETH_P_ALL)) {
            /*
             * e.g. u32_classify: match the u32 rule and, on a hit,
             * run its action such as police (tcf_act_police)
             */
            int verdict = tp->classify(skb, tp, res);

            if (verdict >= 0) {
#ifdef CONFIG_NET_CLS_ACT
                if (verdict != TC_ACT_RECLASSIFY && skb->tc_verd)
                    skb->tc_verd = SET_TC_VERD(skb->tc_verd, 0);
#endif
                return verdict;
            }
        }
        tp = rcu_dereference_bh(tp->next);
    }

    return -1;
}
上一篇下一篇

猜你喜欢

热点阅读