Traffic control: the ingress qdisc
The ingress qdisc handles packets in the receive direction: rate limiting, dropping, and so on.
The analysis below covers:
a. The format of the request sent by the tc command line. Understanding how the parameters are filled in on the userspace side helps a lot when reading the kernel code.
b. How the kernel parses those parameters and creates the qdisc and the filter rules.
//Add the ingress qdisc
tc qdisc add dev eth0 ingress
//Add a filter rule on the ingress qdisc.
//parent ffff: is the handle of the ingress qdisc.
//protocol and prio are parameters of the filter command.
//u32 is one kind of filter; the parameters that follow are u32's private options:
//match packets whose source IP is in 1.1.1.0/24 and apply the police action,
//i.e. rate-limit them and drop whatever exceeds the configured rate.
tc filter add dev eth0 parent ffff: protocol ip prio 10 u32 match ip src 1.1.1.0/24 police rate 2048kbps burst 1m drop
You can append -h to see the help for a given kind; for example, the u32 help below:
root@ubuntu:~# tc filter add dev ens9 u32 -h
What is "-h"?
Usage: ... u32 [ match SELECTOR ... ] [ link HTID ] [ classid CLASSID ]
[ action ACTION_SPEC ] [ offset OFFSET_SPEC ]
[ ht HTID ] [ hashkey HASHKEY_SPEC ]
[ sample SAMPLE ] [skip_hw | skip_sw]
or u32 divisor DIVISOR
Where: SELECTOR := SAMPLE SAMPLE ...
SAMPLE := { ip | ip6 | udp | tcp | icmp | u{32|16|8} | mark }
SAMPLE_ARGS [ divisor DIVISOR ]
FILTERID := X:Y:Z
NOTE: CLASSID is parsed at hexadecimal input.
c. How packets are processed in the ingress direction.
First, a diagram of how the kernel-side ingress qdisc and filter data structures are linked together:
[image: kernel-side ingress qdisc and filter data structure relationships]
1. Format of the tc request message
The structure below is the format tc uses when issuing a command; it is ultimately sent to the kernel, which parses it back using the same layout. The first part, struct nlmsghdr, is the common netlink message header and carries the specific command and flags; the second part, struct tcmsg, is tc's private header; the third part is a large byte array holding the configuration. The length actually sent depends on the configuration; nothing close to TCA_BUF_MAX is normally transmitted.
#define TCA_BUF_MAX (64*1024)
struct {
struct nlmsghdr n;
struct tcmsg t;
char buf[TCA_BUF_MAX];
}
For the tc subsystem, buf can hold up to TCA_MAX struct nlattr entries; each struct nlattr carries one kind of configuration, and each of them can in turn nest further attributes.
enum {
TCA_UNSPEC,
TCA_KIND, //kind of qdisc
TCA_OPTIONS, //qdisc's private options
TCA_STATS,
TCA_XSTATS,
TCA_RATE,
TCA_FCNT,
TCA_STATS2,
TCA_STAB,
__TCA_MAX
};
#define TCA_MAX (__TCA_MAX - 1)
struct nlmsghdr + struct tcmsg + struct nlattr[TCA_MAX + 1]
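To make the layout concrete, here is a minimal sketch of how one attribute is appended to the request buffer (a hypothetical helper, not the real iproute2 addattr_l, though it works the same way; struct rtattr is layout-compatible with struct nlattr): every configuration item becomes an attribute header followed by its payload, and nlmsg_len grows by the aligned size.
#include <string.h>
#include <linux/netlink.h>
#include <linux/rtnetlink.h>

/* Append one attribute (e.g. TCA_KIND = "ingress") to the request message. */
static int add_attr(struct nlmsghdr *n, int maxlen, int type,
		    const void *data, int len)
{
	int attr_len = RTA_LENGTH(len);		/* attribute header + payload */
	struct rtattr *rta;

	if (NLMSG_ALIGN(n->nlmsg_len) + RTA_ALIGN(attr_len) > maxlen)
		return -1;			/* would overflow buf[] */

	rta = (struct rtattr *)((char *)n + NLMSG_ALIGN(n->nlmsg_len));
	rta->rta_type = type;
	rta->rta_len = attr_len;
	memcpy(RTA_DATA(rta), data, len);
	n->nlmsg_len = NLMSG_ALIGN(n->nlmsg_len) + RTA_ALIGN(attr_len);
	return 0;
}
Calling add_attr(&req.n, sizeof(req), TCA_KIND, "ingress", sizeof("ingress")) produces exactly the TCA_KIND attribute that the kernel later finds in tca[TCA_KIND].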
2. Setting up the ingress qdisc
Here is the userspace and kernel code involved when the following command creates an ingress qdisc:
tc qdisc add dev eth0 ingress
Userspace (iproute2) code
//iproute2/tc/tc_qdisc.c
static int tc_qdisc_modify(int cmd, unsigned int flags, int argc, char **argv)
struct {
struct nlmsghdr n;
struct tcmsg t;
char buf[TCA_BUF_MAX];
} req = {
.n.nlmsg_len = NLMSG_LENGTH(sizeof(struct tcmsg)),
.n.nlmsg_flags = NLM_F_REQUEST | NLM_F_EXCL | NLM_F_CREATE,
.n.nlmsg_type = RTM_NEWQDISC,
.t.tcm_family = AF_UNSPEC,
//a fixed value
.t.tcm_parent = TC_H_INGRESS,
//keep only the upper 16 bits of TC_H_INGRESS, i.e. 0xffff0000 ("ffff:")
.t.tcm_handle = TC_H_MAKE(TC_H_INGRESS, 0),
.t.tcm_ifindex= ll_name_to_index("eth0"),
};
//The buf part only needs to carry the kind; there are no extra options.
//kind is "ingress"
addattr_l(&req.n, sizeof(req), TCA_KIND, k, strlen(k)+1);
//Finally, send the request to the kernel.
rtnl_talk(&rth, &req.n, NULL)
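The TC_H_* values used above come from include/uapi/linux/pkt_sched.h; quoting them (as found in mainline kernels of this era) makes the "ffff:" handle obvious:
#define TC_H_MAJ_MASK (0xFFFF0000U)
#define TC_H_MIN_MASK (0x0000FFFFU)
#define TC_H_MAJ(h) ((h) & TC_H_MAJ_MASK)
#define TC_H_MIN(h) ((h) & TC_H_MIN_MASK)
#define TC_H_MAKE(maj,min) (((maj)&TC_H_MAJ_MASK)|((min)&TC_H_MIN_MASK))

#define TC_H_ROOT	(0xFFFFFFFFU)
#define TC_H_INGRESS	(0xFFFFFFF1U)
So tcm_parent is 0xfffffff1 and tcm_handle is TC_H_MAKE(0xfffffff1, 0) = 0xffff0000, which tc displays as "ffff:".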
Kernel side (only the ingress-related code is shown)
//linux/net/sched/sch_api.c
static int tc_modify_qdisc(struct sk_buff *skb, struct nlmsghdr *n)
struct net_device *dev;
struct Qdisc *q;
struct tcmsg *tcm;
struct nlattr *tca[TCA_MAX + 1];
//parse the attributes from the message into tca
nlmsg_parse(n, sizeof(*tcm), tca, TCA_MAX, NULL);
tcm = nlmsg_data(n);
clid = tcm->tcm_parent;
dev = __dev_get_by_index(net, tcm->tcm_ifindex);
//a. Get the ingress queue; the ingress qdisc will be stored in it
if (dev_ingress_queue_create(dev)) {
q = dev_ingress_queue(dev)->qdisc_sleeping;
//b. Create the qdisc
if (clid == TC_H_INGRESS) {
if (dev_ingress_queue(dev))
q = qdisc_create(dev, dev_ingress_queue(dev), p,
tcm->tcm_parent, tcm->tcm_parent,
tca, &err);
else
err = -ENOENT;
}
//c. Attach the newly created qdisc to the ingress queue
qdisc_graft(dev, p, skb, n, clid, q, NULL);
a. Get the ingress_queue, creating it the first time this runs
struct netdev_queue *dev_ingress_queue_create(struct net_device *dev)
{
struct netdev_queue *queue = dev_ingress_queue(dev);
#ifdef CONFIG_NET_CLS_ACT
if (queue)
return queue;
queue = kzalloc(sizeof(*queue), GFP_KERNEL);
if (!queue)
return NULL;
netdev_init_one_queue(dev, queue, NULL);
queue->qdisc = &noop_qdisc;
queue->qdisc_sleeping = &noop_qdisc;
rcu_assign_pointer(dev->ingress_queue, queue);
#endif
return queue;
}
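dev_ingress_queue(), used above and throughout this path, is just an RTNL-protected read of dev->ingress_queue (include/linux/netdevice.h, kernels of this era):
static inline struct netdev_queue *dev_ingress_queue(struct net_device *dev)
{
	return rtnl_dereference(dev->ingress_queue);
}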
b. Create the qdisc, with kind "ingress"
static struct Qdisc *
qdisc_create(struct net_device *dev, struct netdev_queue *dev_queue, struct Qdisc *p, u32 parent, u32 handle, struct nlattr **tca, int *errp)
struct nlattr *kind = tca[TCA_KIND];
struct Qdisc *sch;
struct Qdisc_ops *ops;
//Look the kind up in the qdisc_base list to see whether it is already registered;
//if not, it is loaded on demand. For ingress, ops is ingress_qdisc_ops from sch_ingress.ko.
ops = qdisc_lookup_ops(kind);
//allocate the qdisc
sch = qdisc_alloc(dev_queue, ops);
//set parent; for ingress this is TC_H_INGRESS (0xfffffff1)
sch->parent = parent;
if (handle == TC_H_INGRESS) {
//set the TCQ_F_INGRESS flag
sch->flags |= TCQ_F_INGRESS;
//the handle is the upper 16 bits of TC_H_INGRESS
handle = TC_H_MAKE(TC_H_INGRESS, 0);
lockdep_set_class(qdisc_lock(sch), &qdisc_rx_lock);
sch->handle = handle;
//call ops->init if it is provided; for ingress there is none
ops->init(sch, tca[TCA_OPTIONS])
//see the note on qdisc_list_add below; for ingress this is a no-op
qdisc_list_add(sch);
return sch;
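For reference, qdisc_lookup_ops() is a simple walk of the global qdisc_base list that register_qdisc() maintains (roughly, kernel of this era). When it returns NULL, qdisc_create() calls request_module("sch_<kind>") and retries, which is how sch_ingress.ko gets loaded on demand.
static struct Qdisc_ops *qdisc_lookup_ops(struct nlattr *kind)
{
	struct Qdisc_ops *q = NULL;

	if (kind) {
		read_lock(&qdisc_mod_lock);
		/* walk the ops registered via register_qdisc() and match on id */
		for (q = qdisc_base; q; q = q->next) {
			if (nla_strcmp(kind, q->id) == 0) {
				if (!try_module_get(q->owner))
					q = NULL;
				break;
			}
		}
		read_unlock(&qdisc_mod_lock);
	}
	return q;
}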
qdisc_list_add() only inserts a qdisc into the dev->qdisc list when its parent is neither ROOT nor INGRESS; the first element of that list is the root qdisc. So the following three kinds of qdisc are never inserted into dev->qdisc:
a. The root qdisc (parent is TC_H_ROOT, 0xFFFFFFFFU): tc qdisc add dev xxx root ... This command assigns the new qdisc to dev->qdisc as the head of the list; only child qdiscs created later are inserted into it. The new qdisc is also assigned to struct net_device->_tx[i].
b. The ingress qdisc (parent is TC_H_INGRESS): tc qdisc add dev xxx ingress. The new qdisc is assigned to struct net_device->ingress_queue.
c. The clsact qdisc (parent is also TC_H_INGRESS): tc qdisc add dev xxx clsact. The new qdisc is assigned to struct net_device->ingress_queue.
void qdisc_list_add(struct Qdisc *q)
{
if ((q->parent != TC_H_ROOT) && !(q->flags & TCQ_F_INGRESS)) {
struct Qdisc *root = qdisc_dev(q)->qdisc;
WARN_ON_ONCE(root == &noop_qdisc);
list_add_tail(&q->list, &root->list);
}
}
c. Attach the newly created qdisc to the ingress queue
static int qdisc_graft(struct net_device *dev, struct Qdisc *parent,
struct sk_buff *skb, struct nlmsghdr *n, u32 classid,
struct Qdisc *new, struct Qdisc *old)
//for ingress, num_q is forced to 1 below and only the ingress queue is grafted
num_q = dev->num_tx_queues;
if ((q && q->flags & TCQ_F_INGRESS) ||
(new && new->flags & TCQ_F_INGRESS)) {
num_q = 1;
ingress = 1;
if (!dev_ingress_queue(dev))
return -ENOENT;
}
for (i = 0; i < num_q; i++) {
struct netdev_queue *dev_queue = dev_ingress_queue(dev);
if (!ingress)
dev_queue = netdev_get_tx_queue(dev, i);
//first assign the new qdisc to dev_queue->qdisc_sleeping
old = dev_graft_qdisc(dev_queue, new);
if (new && i > 0)
atomic_inc(&new->refcnt);
if (!ingress)
qdisc_destroy(old);
}
//Only when the device is up is dev_queue->qdisc_sleeping copied into
//dev_queue->qdisc; packet processing uses dev_queue->qdisc.
if (dev->flags & IFF_UP)
dev_activate(dev);
struct Qdisc *dev_graft_qdisc(struct netdev_queue *dev_queue,
struct Qdisc *qdisc)
{
struct Qdisc *oqdisc = dev_queue->qdisc_sleeping;
spinlock_t *root_lock;
root_lock = qdisc_lock(oqdisc);
spin_lock_bh(root_lock);
/* Prune old scheduler */
if (oqdisc && atomic_read(&oqdisc->refcnt) <= 1)
qdisc_reset(oqdisc);
/* ... and graft new one */
if (qdisc == NULL)
qdisc = &noop_qdisc;
//assign the new qdisc to dev_queue->qdisc_sleeping
dev_queue->qdisc_sleeping = qdisc;
rcu_assign_pointer(dev_queue->qdisc, &noop_qdisc);
spin_unlock_bh(root_lock);
return oqdisc;
}
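Before moving on to filters, it helps to see what sch_ingress.ko actually registers. Roughly (net/sched/sch_ingress.c in kernels of this era), the private data is nothing but a filter list, and the class ops exist only so that filters can be attached; the ->tcf_chain and ->enqueue callbacks are the two hooks used in the rest of this article.
struct ingress_qdisc_data {
	struct tcf_proto __rcu *filter_list;
};

/* cops->tcf_chain(): hand back the head of the filter list */
static struct tcf_proto __rcu **ingress_find_tcf(struct Qdisc *sch,
						 unsigned long cl)
{
	struct ingress_qdisc_data *p = qdisc_priv(sch);

	return &p->filter_list;
}

static const struct Qdisc_class_ops ingress_class_ops = {
	.leaf		=	ingress_leaf,
	.get		=	ingress_get,
	.put		=	ingress_put,
	.walk		=	ingress_walk,
	.tcf_chain	=	ingress_find_tcf,
	.bind_tcf	=	ingress_bind_filter,
	.unbind_tcf	=	ingress_put,
};

static struct Qdisc_ops ingress_qdisc_ops __read_mostly = {
	.cl_ops		=	&ingress_class_ops,
	.id		=	"ingress",
	.priv_size	=	sizeof(struct ingress_qdisc_data),
	.enqueue	=	ingress_enqueue,	/* runs the filters, see section 4 */
	.destroy	=	ingress_destroy,
	.dump		=	ingress_dump,
	.owner		=	THIS_MODULE,
};

static int __init ingress_module_init(void)
{
	return register_qdisc(&ingress_qdisc_ops);
}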
3. Setting up the filter
The following command attaches a filter to the ingress qdisc.
tc filter add dev eth0 parent ffff: protocol ip prio 10 u32 match ip src 1.1.1.0/24 police rate 2048kbps burst 1m drop
parent ffff: is the handle of the ingress qdisc.
protocol and prio are parameters of the filter command.
u32 is one kind of filter; the parameters that follow are u32's private options: packets whose source IP is in 1.1.1.0/24 are rate limited, and traffic beyond the configured rate is dropped.
Userspace (iproute2) code
//iproute2/tc/tc_filter.c
static int tc_filter_modify(int cmd, unsigned int flags, int argc, char **argv)
struct {
struct nlmsghdr n;
struct tcmsg t;
char buf[MAX_MSG];
} req = {
.n.nlmsg_len = NLMSG_LENGTH(sizeof(struct tcmsg)),
.n.nlmsg_flags = NLM_F_REQUEST | NLM_F_EXCL|NLM_F_CREATE,
.n.nlmsg_type = RTM_NEWTFILTER,
.t.tcm_family = AF_UNSPEC,
};
req.t.tcm_ifindex = ll_name_to_index("eth0");
req.t.tcm_parent = handle; //parent "ffff:" plus minor 0000 gives 0xffff0000
req.t.tcm_info = TC_H_MAKE(prio<<16, protocol); //prio and protocol packed together
addattr_l(&req.n, sizeof(req), TCA_KIND, k, strlen(k)+1); //kind is "u32"
addattr_l(n, MAX_MSG, TCA_U32_SEL, &sel, sizeof(sel.sel) +
sel.sel.nkeys * sizeof(struct tc_u32_key)); //the u32 selector (match keys)
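A worked example makes the encoding concrete. The hypothetical userspace program below (compiled against the uapi headers; a little-endian host is assumed for the htons() value) prints the tcm_parent/tcm_info values produced by the command above and fills in the single u32 key that "match ip src 1.1.1.0/24" turns into: a 4-byte compare at offset 12 of the IP header (the source address), with value and mask in network byte order.
#include <stdio.h>
#include <string.h>
#include <arpa/inet.h>
#include <linux/pkt_sched.h>	/* TC_H_MAKE, TC_H_MAJ, TC_H_MIN */
#include <linux/pkt_cls.h>	/* struct tc_u32_sel/tc_u32_key, TC_U32_TERMINAL */
#include <linux/if_ether.h>	/* ETH_P_IP */

int main(void)
{
	unsigned int parent = TC_H_MAKE(0xffffU << 16, 0);	  /* "ffff:" */
	unsigned int info = TC_H_MAKE(10 << 16, htons(ETH_P_IP)); /* prio 10, protocol ip */

	struct {
		struct tc_u32_sel sel;
		struct tc_u32_key key;
	} s;

	memset(&s, 0, sizeof(s));
	s.sel.nkeys = 1;
	s.sel.flags = TC_U32_TERMINAL;	/* a match terminates classification */
	s.key.off = 12;			/* offsetof(struct iphdr, saddr) */
	s.key.val = htonl(0x01010100);	/* 1.1.1.0 */
	s.key.mask = htonl(0xffffff00);	/* /24 */

	printf("tcm_parent         = 0x%08x\n", parent);		/* 0xffff0000 */
	printf("tcm_info           = 0x%08x\n", info);			/* 0x000a0008 on little endian */
	printf("prio  = TC_H_MAJ() = 0x%08x\n", TC_H_MAJ(info));	/* 0x000a0000 */
	printf("proto = TC_H_MIN() = 0x%04x\n", TC_H_MIN(info));	/* htons(ETH_P_IP) */
	return 0;
}
The sel/key pair is what gets serialized into TCA_U32_SEL; the kernel's u32 classifier later replays exactly this compare against every packet.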
Kernel code
//linux/net/sched/cls_api.c
static int tc_ctl_tfilter(struct sk_buff *skb, struct nlmsghdr *n)
struct nlattr *tca[TCA_MAX + 1];
struct tcmsg *t;
struct Qdisc *q;
struct net_device *dev;
struct tcf_proto __rcu **back;
struct tcf_proto __rcu **chain;
struct tcf_proto *tp;
const struct tcf_proto_ops *tp_ops;
const struct Qdisc_class_ops *cops;
nlmsg_parse(n, sizeof(*t), tca, TCA_MAX, NULL);
t = nlmsg_data(n);
protocol = TC_H_MIN(t->tcm_info);
prio = TC_H_MAJ(t->tcm_info);
nprio = prio;
parent = t->tcm_parent;
dev = __dev_get_by_index(net, t->tcm_ifindex);
q = qdisc_lookup(dev, TC_H_MAJ(t->tcm_parent));
//if cl_ops does not provide tcf_chain, the qdisc does not support filters
const struct Qdisc_class_ops *cops;
cops = q->ops->cl_ops;
if (cops->tcf_chain == NULL)
return -EOPNOTSUPP;
//If the lower 16 bits of parent name a class id, look that class up.
//The ingress qdisc is classless, though, and as the userspace code above
//shows, the lower 16 bits of parent are 0 here.
/* Do we search for filter, attached to class? */
if (TC_H_MIN(parent)) {
cl = cops->get(q, parent);
if (cl == 0)
return -ENOENT;
}
//For ingress this simply returns the head of the filter list:
//struct ingress_qdisc_data *p = qdisc_priv(sch);
//return &p->filter_list;
/* And the last stroke */
struct tcf_proto __rcu **chain;
chain = cops->tcf_chain(q, cl);
//Look for an existing tp with this priority: create it if missing, update it otherwise.
/* Check the chain for existence of proto-tcf with this priority */
for (back = chain;
(tp = rtnl_dereference(*back)) != NULL;
back = &tp->next) {
if (tp->prio >= prio) {
if (tp->prio == prio) {
if (!nprio ||
(tp->protocol != protocol && protocol))
goto errout;
} else
tp = NULL;
break;
}
}
if (tp == NULL) {
/* Proto-tcf does not exist, create new one */
if (tca[TCA_KIND] == NULL || !protocol)
goto errout;
err = -ENOENT;
if (n->nlmsg_type != RTM_NEWTFILTER ||
!(n->nlmsg_flags & NLM_F_CREATE))
goto errout;
/* Create new proto tcf */
err = -ENOBUFS;
tp = kzalloc(sizeof(*tp), GFP_KERNEL);
if (tp == NULL)
goto errout;
err = -ENOENT;
//Look up the kind. Kinds are registered via register_tcf_proto_ops; the u32
//classifier, for example, registers itself when cls_u32 is loaded (modprobe cls_u32).
tp_ops = tcf_proto_lookup_ops(tca[TCA_KIND]);
tp->ops = tp_ops;
tp->protocol = protocol;
tp->prio = nprio ? :TC_H_MAJ(tcf_auto_prio(rtnl_dereference(*back)));
tp->q = q;
tp->classify = tp_ops->classify; //u32_classify
tp->classid = parent;
err = tp_ops->init(tp); //u32_init
tp_created = 1;
}
if (tp_created) {
RCU_INIT_POINTER(tp->next, rtnl_dereference(*back));
rcu_assign_pointer(*back, tp);
}
4. How ingress processes packets
Inbound packets are handled by handle_ing() in __netif_receive_skb_core(), the entry function of the protocol stack.
static int __netif_receive_skb_core(struct sk_buff *skb, bool pfmemalloc)
#ifdef CONFIG_NET_CLS_ACT
skb = handle_ing(skb, &pt_prev, &ret, orig_dev);
if (!skb)
goto out;
ncls:
#endif
If handle_ing drops the skb, NULL is returned; otherwise the original skb is returned.
static inline struct sk_buff *handle_ing(struct sk_buff *skb,
struct packet_type **pt_prev,
int *ret, struct net_device *orig_dev)
{
//fetch the ingress_queue
struct netdev_queue *rxq = rcu_dereference(skb->dev->ingress_queue);
//if rxq is NULL or its qdisc is noop_qdisc, no ingress qdisc is configured
if (!rxq || rcu_access_pointer(rxq->qdisc) == &noop_qdisc)
goto out;
if (*pt_prev) {
*ret = deliver_skb(skb, *pt_prev, orig_dev);
*pt_prev = NULL;
}
//hand the skb to ing_filter
switch (ing_filter(skb, rxq)) {
case TC_ACT_SHOT:
case TC_ACT_STOLEN:
kfree_skb(skb);
return NULL;
}
out:
skb->tc_verd = 0;
return skb;
}
static int ing_filter(struct sk_buff *skb, struct netdev_queue *rxq)
struct net_device *dev = skb->dev;
int result = TC_ACT_OK;
//fetch the qdisc; if it is not the dummy one, run the skb through enqueue
q = rcu_dereference(rxq->qdisc);
if (q != &noop_qdisc) {
spin_lock(qdisc_lock(q));
if (likely(!test_bit(__QDISC_STATE_DEACTIVATED, &q->state)))
result = qdisc_enqueue_root(skb, q);
spin_unlock(qdisc_lock(q));
}
return result;
static inline int qdisc_enqueue_root(struct sk_buff *skb, struct Qdisc *sch)
{
qdisc_skb_cb(skb)->pkt_len = skb->len;
return qdisc_enqueue(skb, sch) & NET_XMIT_MASK;
}
static inline int qdisc_enqueue(struct sk_buff *skb, struct Qdisc *sch)
{
qdisc_calculate_pkt_len(skb, sch);
//for ingress, enqueue is ingress_enqueue
return sch->enqueue(skb, sch);
}
The enqueue operation of the ingress qdisc: despite the name, it does not queue anything; it only runs the filters.
static int ingress_enqueue(struct sk_buff *skb, struct Qdisc *sch)
{
struct ingress_qdisc_data *p = qdisc_priv(sch);
struct tcf_result res;
//get the head of the filter list
struct tcf_proto *fl = rcu_dereference_bh(p->filter_list);
int result;
result = tc_classify(skb, fl, &res);
}
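The body above is abridged: before returning, ingress_enqueue() in kernels of this era roughly updates the stats and maps the classification result onto a verdict (the drop-accounting helper differs slightly between versions), so a police "drop" comes back as TC_ACT_SHOT while everything else lets the packet continue:
	qdisc_bstats_update(sch, skb);
	switch (result) {
	case TC_ACT_SHOT:
		result = TC_ACT_SHOT;
		qdisc_qstats_drop(sch);	/* older kernels: sch->qstats.drops++ */
		break;
	case TC_ACT_STOLEN:
	case TC_ACT_QUEUED:
		result = TC_ACT_STOLEN;
		break;
	case TC_ACT_RECLASSIFY:
	case TC_ACT_OK:
		skb->tc_index = TC_H_MIN(res.classid);
	default:
		result = TC_ACT_OK;
		break;
	}

	return result;
That TC_ACT_SHOT is what makes handle_ing() call kfree_skb() in the switch shown earlier.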
//Walk the filter list and run each filter's classify callback to check the configured match.
int tc_classify(struct sk_buff *skb, const struct tcf_proto *tp,
struct tcf_result *res)
{
int err = 0;
#ifdef CONFIG_NET_CLS_ACT
const struct tcf_proto *otp = tp;
reclassify:
#endif
err = tc_classify_compat(skb, tp, res);
#ifdef CONFIG_NET_CLS_ACT
if (err == TC_ACT_RECLASSIFY) {
u32 verd = G_TC_VERD(skb->tc_verd);
tp = otp;
if (verd++ >= MAX_REC_LOOP) {
net_notice_ratelimited("%s: packet reclassify loop rule prio %u protocol %02x\n",
tp->q->ops->id,
tp->prio & 0xffff,
ntohs(tp->protocol));
return TC_ACT_SHOT;
}
skb->tc_verd = SET_TC_VERD(skb->tc_verd, verd);
goto reclassify;
}
#endif
return err;
}
int tc_classify_compat(struct sk_buff *skb, const struct tcf_proto *tp,
struct tcf_result *res)
{
__be16 protocol = skb->protocol;
int err;
for (; tp; tp = rcu_dereference_bh(tp->next)) {
if (tp->protocol != protocol &&
tp->protocol != htons(ETH_P_ALL))
continue;
//u32_classify: match against the u32 rules; on a hit, execute the action,
//e.g. the police action (tcf_act_police)
err = tp->classify(skb, tp, res);
if (err >= 0) {
#ifdef CONFIG_NET_CLS_ACT
if (err != TC_ACT_RECLASSIFY && skb->tc_verd)
skb->tc_verd = SET_TC_VERD(skb->tc_verd, 0);
#endif
return err;
}
}
return -1;
}