dpvs学习笔记: 9 arp和路由
相对于传统网卡,net_dev 在内核层,而 dpvs 在用户层抽象了一个网卡。所以相应的,arp 和路由也要有,又因为是 dpdk 程序,每个核尽可能不和其它核交互,所以就需要在核之间广播,这是大前提。
arp和路由的作用
网络方面菜鸟,更专业的建议看其它分享~~ 数据包流入网卡时,协议栈先看 mac 地址是否是本机,是的话向上层传递,不是则丢弃或转发。三层 ip 拿到数据包后,判断 ip 是否是本机,做同样操作。数据包写到协议栈时,查找路由,选择下一跳的 ip 地址,然后通过 arp 缓存表拿到对应的 mac, 填充到数据帧,发送到网卡。
为什么不能只有 ip 或是只有 mac 地址,非要结合呢?这个问题比较大,mac 地址是机器唯一的(确实可以修改),同一个物理网络之间传输数据没问题,但是全世界网络设备都互联,那这个物理网络传输的数据多到爆炸。引入 ip 的好处是带来了路由,也就是定位的功能,访问某个 ip 时会经过很多跳(hop),这就是 traceroute 路由的功能。但是 ip 地址是会改变的,所以不能只有 ip,需要结合。
初始化arp
初始化 main 调用 inet_init
时, 调用 neigh_init
初始化 arp, 调用 route_init
初始化路由。先看 arp
/*
 * Initialise the ARP/neighbour subsystem.
 *
 * - resets every bucket of the per-lcore neighbour hash table;
 * - picks the first slave lcore reported by netif_get_slave_lcores() and
 *   stores it in g_cid (the lcore that syncs entries to the master);
 * - records the calling lcore as master_cid;
 * - registers the ARP packet handler and the neighbour sockopts;
 * - registers neigh_process_ring() as a slow lcore loop job.
 *
 * Returns EDPVS_OK on success, or the error code of the first registration
 * step that failed.
 */
static int arp_init(void)
{
    int i, j;
    int err;
    uint64_t lcore_mask;
    lcoreid_t cid;

    for (i = 0; i < DPVS_MAX_LCORE; i++) {
        for (j = 0; j < ARP_TAB_SIZE; j++) {
            INIT_LIST_HEAD(&neigh_table[i][j]);
        }
    }

    /* choose one core to sync master */
    netif_get_slave_lcores(NULL, &lcore_mask);
    /*
     * The mask is 64 bits wide: shift an unsigned 64-bit one (a signed
     * "1L << cid" is undefined for the top bit, and for any shift >= the
     * width of long), and never shift by more than the mask can hold.
     */
    for (cid = 0; cid < DPVS_MAX_LCORE && cid < 8 * sizeof(lcore_mask); cid++) {
        if (lcore_mask & ((uint64_t)1 << cid)) {
            g_cid = cid;
            break;
        }
    }
    master_cid = rte_lcore_id();

    arp_pkt_type.type = rte_cpu_to_be_16(ETHER_TYPE_ARP);
    if ((err = netif_register_pkt(&arp_pkt_type)) != EDPVS_OK)
        return err;
    if ((err = sockopt_register(&neigh_sockopts)) != EDPVS_OK)
        return err;

    neigh_ring_init();

    /* get static arp entry from master */
    snprintf(neigh_sync_job.name, sizeof(neigh_sync_job.name) - 1, "%s", "neigh_sync");
    neigh_sync_job.func = neigh_process_ring;
    neigh_sync_job.data = NULL;
    neigh_sync_job.type = NETIF_LCORE_JOB_SLOW;
    neigh_sync_job.skip_loops = NEIGH_PROCESS_MAC_RING_INTERVAL;
    err = netif_lcore_loop_job_register(&neigh_sync_job);
    if (err != EDPVS_OK)
        return err;

    return EDPVS_OK;
}
- neigh_table 二维数组,每个 lcore 都要有自己的本地 arp 缓存表,neigh_table[i][j] 又是个链表,相当于固定长度的哈希桶。
- 选举一个 slave 核,设置到全局变量 g_cid
- 注册网卡 arp_pkt_type 数据包类型的处理回调,注册管理接口处理回调 neigh_sockopts
- 注册 loop 任务,每个 lcore 在大循环时,都会运行函数
neigh_process_ring
如何处理 arp 包
每个 lcore 都会启动大 loop, lcore_job_recv_fwd
负责转发所有数据,在调用 lcore_process_packets
处理数据包时,netif_deliver_mbuf
会截住 arp 包。
/*
 * Hand a received mbuf to the handler registered for its ether type.
 *
 * ARP replies that did not arrive through a ring (pkts_from_ring == false)
 * are first cloned into arp_ring[] of every other forwarding lcore, skipping
 * the current lcore (cid) and the master lcore.
 *
 * The ethernet header is then stripped and pt->func() is called. When the
 * handler returns EDPVS_KNICONTINUE the packet is either freed (it came from
 * a ring, or forward2kni is set) or has its stripped bytes restored and is
 * passed to kni_ingress().
 *
 * Returns EDPVS_INVPKT if the ethernet header cannot be removed, otherwise
 * EDPVS_OK (the handler's own return value is not propagated).
 */
static inline int netif_deliver_mbuf(struct rte_mbuf *mbuf,
uint16_t eth_type,
struct netif_port *dev,
struct netif_queue_conf *qconf,
bool forward2kni,
lcoreid_t cid,
bool pkts_from_ring)
{
struct pkt_type *pt;
int err;
uint16_t data_off;
assert(mbuf->port <= NETIF_MAX_PORTS);
assert(dev != NULL);
/* NOTE(review): pt is dereferenced below without a NULL check — confirm
 * pkt_type_get() can never return NULL for an unregistered eth_type. */
pt = pkt_type_get(eth_type, dev);
/*clone arp pkt to every queue*/
if (pt->type == rte_cpu_to_be_16(ETHER_TYPE_ARP) && !pkts_from_ring) {
struct rte_mempool *mbuf_pool;
struct rte_mbuf *mbuf_clone;
uint8_t i;
struct arp_hdr *arp;
unsigned socket_id;
socket_id = rte_socket_id();
mbuf_pool = pktmbuf_pool[socket_id];
/* temporarily skip the ethernet header to peek at the ARP header,
 * then put the ethernet header back */
rte_pktmbuf_adj(mbuf, sizeof(struct ether_hdr));
arp = rte_pktmbuf_mtod(mbuf, struct arp_hdr *);
rte_pktmbuf_prepend(mbuf,(uint16_t)sizeof(struct ether_hdr));
if (rte_be_to_cpu_16(arp->arp_op) == ARP_OP_REPLY) {
for (i = 0; i < DPVS_MAX_LCORE; i++) {
if ((i == cid) || (!is_lcore_id_fwd(i))
|| (i == rte_get_master_lcore()))
continue;
/*rte_pktmbuf_clone will not clone pkt.data, just copy pointer!*/
mbuf_clone = rte_pktmbuf_clone(mbuf, mbuf_pool);
if (mbuf_clone) {
int ret = rte_ring_enqueue(arp_ring[i], mbuf_clone);
/* the clone is not freed on -EDQUOT; presumably the object was still
 * enqueued in that case — confirm against the DPDK version in use */
if (unlikely(-EDQUOT == ret)) {
RTE_LOG(WARNING, NETIF, "%s: arp ring of lcore %d quota exceeded\n",
__func__, i);
}
else if (ret < 0) {
RTE_LOG(WARNING, NETIF, "%s: arp ring of lcore %d enqueue failed\n",
__func__, i);
rte_pktmbuf_free(mbuf_clone);
}
}
}
}
}
mbuf->l2_len = sizeof(struct ether_hdr);
/* Remove ether_hdr at the beginning of an mbuf */
/* remember the original offset so the stripped bytes can be restored
 * before handing the packet to KNI */
data_off = mbuf->data_off;
// advance the mbuf data pointer to the layer-3 header
if (unlikely(NULL == rte_pktmbuf_adj(mbuf, sizeof(struct ether_hdr))))
return EDPVS_INVPKT;
err = pt->func(mbuf, dev);
if (err == EDPVS_KNICONTINUE) {
if (pkts_from_ring || forward2kni) {
rte_pktmbuf_free(mbuf);
return EDPVS_OK;
}
/* restore exactly as many bytes as were consumed since data_off was saved */
if (likely(NULL != rte_pktmbuf_prepend(mbuf,
(mbuf->data_off - data_off)))) {
kni_ingress(mbuf, dev, qconf);
} else {
rte_pktmbuf_free(mbuf);
}
}
return EDPVS_OK;
}
- 判断 arp_op 是否是 ARP_OP_REPLY,是的话,需要 clone mbuf, 然后调用
rte_ring_enqueue
发送到其它每个核一份。这里没上锁,可能 rte_ring 本身是安全的,有机会研究下 - 调用 pt->func 处理 mbuf, 由前文可知,netif_register_pkt(&arp_pkt_type)) 注册了 arp 处理接口,对应回调是
neigh_resolve_input
int neigh_resolve_input(struct rte_mbuf *m, struct netif_port *port)
{
struct arp_hdr *arp = rte_pktmbuf_mtod(m, struct arp_hdr *);
struct ether_hdr *eth;
uint32_t ipaddr;
struct neighbour_entry *neighbour = NULL;
unsigned int hashkey;
struct route_entry *rt = NULL;
rt = route4_local(arp->arp_data.arp_tip, port);
if(!rt){
return EDPVS_KNICONTINUE;
}
route4_put(rt);
eth = (struct ether_hdr *)rte_pktmbuf_prepend(m,
(uint16_t)sizeof(struct ether_hdr));
if (rte_be_to_cpu_16(arp->arp_op) == ARP_OP_REQUEST) {
ether_addr_copy(ð->s_addr, ð->d_addr);
rte_memcpy(ð->s_addr, &port->addr, 6);
arp->arp_op = rte_cpu_to_be_16(ARP_OP_REPLY);
ether_addr_copy(&arp->arp_data.arp_sha, &arp->arp_data.arp_tha);//from to
ether_addr_copy(ð->s_addr, &arp->arp_data.arp_sha);
ipaddr = arp->arp_data.arp_sip;
arp->arp_data.arp_sip = arp->arp_data.arp_tip;
arp->arp_data.arp_tip = ipaddr;
m->l2_len = sizeof(struct ether_hdr);
m->l3_len = sizeof(struct arp_hdr);
netif_xmit(m, port);
return EDPVS_OK;
} else if(arp->arp_op == htons(ARP_OP_REPLY)) {
ipaddr = arp->arp_data.arp_sip;
hashkey = neigh_hashkey(ipaddr, port);
neighbour = neigh_lookup_entry(&ipaddr, port, hashkey);
if (neighbour && !(neighbour->flag & NEIGHBOUR_STATIC)) {
neigh_edit(neighbour, &arp->arp_data.arp_sha, hashkey);
neigh_entry_state_trans(neighbour, 1);
} else {
neighbour = neigh_add_table(ipaddr, &arp->arp_data.arp_sha, port, hashkey, 0);
if(!neighbour){
RTE_LOG(ERR, NEIGHBOUR, "[%s] add neighbour wrong\n", __func__);
rte_pktmbuf_free(m);
return EDPVS_NOMEM;
}
neigh_entry_state_trans(neighbour, 1);
}
neigh_send_mbuf_cach(neighbour);
return EDPVS_KNICONTINUE;
} else {
rte_pktmbuf_free(m);
return EDPVS_DROP;
}
}
- 判断 arp 请求类型,如果是 ARP_OP_REQUEST,生成 arp 应答包,调用
netif_xmit
发送出去 - 如果数据包是 ARP_OP_REPLY 应答,neigh_hashkey(ipaddr, port) 根据源 ip 和网卡生成 hashkey。如果表中已有对应条目且不是静态 arp,直接调用 neigh_edit 将其更新成 arp_sha 即源 hardware address;否则调用
neigh_add_table
添加到 arp 表中
static struct neighbour_entry *
neigh_add_table(uint32_t ipaddr, const struct ether_addr* eth_addr,
struct netif_port* port, unsigned int hashkey, int flag)
{
struct neighbour_entry *new_neighbour=NULL;
struct in_addr *ip_addr = (struct in_addr*)&ipaddr;
struct timeval delay;
lcoreid_t cid = rte_lcore_id();
new_neighbour = rte_zmalloc("new_neighbour_entry",
sizeof(struct neighbour_entry), RTE_CACHE_LINE_SIZE);
if(new_neighbour == NULL)
return NULL;
rte_memcpy(&new_neighbour->ip_addr, ip_addr,
sizeof(struct in_addr));
new_neighbour->flag = flag;
if(eth_addr){
rte_memcpy(&new_neighbour->eth_addr, eth_addr, 6);
new_neighbour->state = DPVS_NUD_S_REACHABLE;
}
else{
new_neighbour->state = DPVS_NUD_S_NONE;
}
new_neighbour->port = port;
new_neighbour->que_num = 0;
delay.tv_sec = nud_timeouts[new_neighbour->state];
delay.tv_usec = 0;
INIT_LIST_HEAD(&new_neighbour->queue_list);
if (!(new_neighbour->flag & NEIGHBOUR_STATIC) && cid != master_cid) {
dpvs_timer_sched(&new_neighbour->timer, &delay,
neighbour_timer_event, new_neighbour, false);
}
if ((g_cid == cid) && !(new_neighbour->flag & NEIGHBOUR_STATIC)) {
struct raw_neigh *mac_param;
mac_param = neigh_ring_clone_entry(new_neighbour, 1);
if (mac_param) {
int ret = rte_ring_enqueue(neigh_ring[master_cid], mac_param);
if (unlikely(-EDQUOT == ret))
RTE_LOG(WARNING, NEIGHBOUR, "%s: neigh ring quota exceeded\n",
__func__);
else if (ret < 0) {
rte_free(mac_param);
RTE_LOG(WARNING, NETIF, "%s: neigh ring enqueue failed\n",
__func__);
}
}
else
RTE_LOG(WARNING, NEIGHBOUR, "%s: clone ring param faild\n", __func__);
}
neigh_hash(new_neighbour, hashkey);
return new_neighbour;
}
- 创建新的 neighbour_entry arp 条目
- 根据是否可达设置 new_neighbour->state
- 加到定时器,如果处于 DPVS_NUD_S_NONE 状态,
neighbour_timer_event
会将条目删除 - 如果当前核是初始化时挑选的 g_cid,并且不是静态,调用 rte_ring_enqueue 入队,扔到 master lcore 队列。
- 调用 neigh_hash 将 arp 条目添加到 arp 表中。
master lcore 处理 arp
回到前文,arp_init 时会注册一个 loop 任务 neigh_process_ring
,负责处理 rte_ring 里的 arp 包
/*
 * Lcore loop job: drain up to NETIF_MAX_PKT_BURST neighbour sync requests
 * from this lcore's neigh_ring and apply them to the local neighbour table.
 *
 * An "add" request edits the matching entry or creates a new one; any other
 * request removes the matching entry, cancelling its timer where one was
 * scheduled and freeing every packet still queued on it. On the master lcore
 * num_neighbours is kept in step. Each request is freed after processing.
 *
 * @arg is unused.
 */
void neigh_process_ring(void *arg)
{
    struct raw_neigh *params[NETIF_MAX_PKT_BURST];
    uint16_t nb_rb;
    unsigned int hash;
    struct neighbour_entry *neigh;
    struct raw_neigh *param;
    lcoreid_t cid = rte_lcore_id();

    nb_rb = rte_ring_dequeue_burst(neigh_ring[cid], (void **)params, NETIF_MAX_PKT_BURST, NULL);
    if (nb_rb > 0) {
        int i;

        for (i = 0; i < nb_rb; i++) {
            param = params[i];
            hash = neigh_hashkey(param->ip_addr.s_addr, param->port);
            neigh = neigh_lookup_entry(&param->ip_addr.s_addr, param->port, hash);
            if (param->add) {
                if (neigh) {
                    neigh_edit(neigh, &param->eth_addr, hash);
                }
                else {
                    neigh = neigh_add_table(param->ip_addr.s_addr, &param->eth_addr,
                                            param->port, hash, param->flag);
                    if ((cid == master_cid) && (neigh)) {
                        num_neighbours++;
                    }
                }
            }
            else {
                if (neigh) {
                    struct neighbour_mbuf_entry *mbuf, *mbuf_next;

                    /* timers are only scheduled for non-static entries on
                     * non-master lcores, so only those are cancelled */
                    if (!(neigh->flag & NEIGHBOUR_STATIC) &&
                        (cid != master_cid))
                        dpvs_timer_cancel(&neigh->timer, false);
                    neigh_unhash(neigh);

                    /* drop every packet still waiting for this neighbour */
                    list_for_each_entry_safe(mbuf, mbuf_next,
                                             &neigh->queue_list, neigh_mbuf_list) {
                        list_del(&mbuf->neigh_mbuf_list);
                        rte_pktmbuf_free(mbuf->m);
                        rte_free(mbuf);
                    }
                    rte_free(neigh);
                    if (cid == master_cid)
                        num_neighbours--;
                }
                else
                    RTE_LOG(WARNING, NEIGHBOUR, "%s: not exist\n", __func__);
            }
            rte_free(param);
        }
    }
}
-
rte_ring_dequeue_burst
从每个核的 neigh_ring 中取出最多 NETIF_MAX_PKT_BURST 条数据。 - 处理裸数据包,如果类型是 add,将之添加。
- 其它类型的调用
neigh_unhash
从表中删除。
初始化路由
初始化 main 调用 inet_init
时, 调用 neigh_init
初始化 arp, 调用 route_init
初始化路由。调用 inet_addr_init
设置路由对外接口
/*
 * Initialise routing: run route_lcore_init() on every lcore (master
 * included), register the multicast message handlers for route add/delete,
 * and register the route sockopts.
 *
 * Returns EDPVS_OK, or the error of the first step that failed.
 */
int route_init(void)
{
    struct dpvs_msg_type msg_type;
    lcoreid_t lcore;
    int rc;

    rte_atomic32_set(&this_num_routes, 0);

    /* master core also need routes */
    rte_eal_mp_remote_launch(route_lcore_init, NULL, CALL_MASTER);
    RTE_LCORE_FOREACH_SLAVE(lcore) {
        rc = rte_eal_wait_lcore(lcore);
        if (rc < 0) {
            RTE_LOG(WARNING, ROUTE, "%s: lcore %d: %s.\n",
                    __func__, lcore, dpvs_strerror(rc));
            return rc;
        }
    }

    /* route add message */
    memset(&msg_type, 0, sizeof(msg_type));
    msg_type.type = MSG_TYPE_ROUTE_ADD;
    msg_type.mode = DPVS_MSG_MULTICAST;
    msg_type.cid = rte_lcore_id();
    msg_type.unicast_msg_cb = route_add_msg_cb;
    rc = msg_type_mc_register(&msg_type);
    if (rc != EDPVS_OK) {
        RTE_LOG(ERR, ROUTE, "%s: fail to register msg.\n", __func__);
        return rc;
    }

    /* route delete message */
    memset(&msg_type, 0, sizeof(msg_type));
    msg_type.type = MSG_TYPE_ROUTE_DEL;
    msg_type.mode = DPVS_MSG_MULTICAST;
    msg_type.cid = rte_lcore_id();
    msg_type.unicast_msg_cb = route_del_msg_cb;
    rc = msg_type_mc_register(&msg_type);
    if (rc != EDPVS_OK) {
        RTE_LOG(ERR, ROUTE, "%s: fail to register msg.\n", __func__);
        return rc;
    }

    rc = sockopt_register(&route_sockopts);
    if (rc != EDPVS_OK)
        return rc;

    return EDPVS_OK;
}
- rte_eal_mp_remote_launch 在每个核调用
route_lcore_init
初始化路由 - sockopt_register 注册管理接口 route_sockopts
/*
 * Per-lcore route table setup: empties every bucket of the local route
 * table and the net route list of the calling lcore.
 *
 * Returns EDPVS_DISABLED when the calling lcore is not enabled,
 * EDPVS_OK otherwise. @arg is unused.
 */
static int route_lcore_init(void *arg)
{
    int bucket = 0;

    if (!rte_lcore_is_enabled(rte_lcore_id()))
        return EDPVS_DISABLED;

    while (bucket < LOCAL_ROUTE_TAB_SIZE) {
        INIT_LIST_HEAD(&this_local_route_table[bucket]);
        bucket++;
    }
    INIT_LIST_HEAD(&this_net_route_table);

    return EDPVS_OK;
}
再看一下 route_lcore_init
, 很简单,初始化两张路由表,具体作用后文再说。可以看到为了高性能,所有数据都是每个核一份,不用上锁。
/*
 * Initialise the interface-address table: set up in_addr_lock, empty every
 * hash bucket of in_addr_tab under the write lock, then register the
 * ifa sockopts.
 *
 * Returns EDPVS_OK, or the error returned by sockopt_register().
 */
int inet_addr_init(void)
{
    int slot;
    int rc;

    rte_rwlock_init(&in_addr_lock);

    rte_rwlock_write_lock(&in_addr_lock);
    slot = 0;
    while (slot < INET_ADDR_HSIZE) {
        INIT_LIST_HEAD(&in_addr_tab[slot]);
        slot++;
    }
    rte_rwlock_write_unlock(&in_addr_lock);

    rc = sockopt_register(&ifa_sockopts);
    if (rc != EDPVS_OK)
        return rc;

    return EDPVS_OK;
}
初始化 in_addr_tab 链表,设置对外操作接口 ifa_sockopts
操作路由表项
由于 dpvs 抽象了网卡层,所以路由也是类似 ip net 的方式操作,可以参考官网说明,这里直接撸源码。控制入口函数是 sockopt_ctl
,程序 main 最后的 while 死循环会调用。系统命令通过 unix_socket 与 dpvs 通信。
/*
 * Serve at most one control request from the unix-socket server.
 *
 * Accepts one pending client on the non-blocking srv_fd, reads one message,
 * dispatches it to the matching sockopt's get/set callback, sends the reply
 * header (plus any reply data from a successful GET) and closes the client.
 *
 * Returns EDPVS_IO when no client could be accepted, the receive or send
 * error when one of those fails, and EDPVS_OK otherwise (including when no
 * sockopt matches the message, in which case no reply is sent).
 */
int sockopt_ctl(__rte_unused void *arg)
{
    struct dpvs_sock_msg_reply reply_hdr;
    struct dpvs_sockopts *skopt;
    struct dpvs_sock_msg *msg;
    struct sockaddr_un clt_addr;
    socklen_t clt_len = sizeof(clt_addr);
    size_t reply_data_len = 0;
    void *reply_data = NULL;
    int status = EDPVS_OK;
    int clt_fd;
    int ret;

    memset(&clt_addr, 0, sizeof(struct sockaddr_un));

    /* Note: srv_fd is nonblock */
    clt_fd = accept(srv_fd, (struct sockaddr*)&clt_addr, &clt_len);
    if (clt_fd < 0) {
        if (errno != EWOULDBLOCK) {
            RTE_LOG(WARNING, MSGMGR, "%s: Fail to accept client request\n", __func__);
        }
        return EDPVS_IO;
    }

    /* Note: clt_fd is block */
    ret = sockopt_msg_recv(clt_fd, &msg);
    if (unlikely(ret != EDPVS_OK)) {
        close(clt_fd);
        return ret;
    }

    skopt = sockopts_get(msg);
    if (skopt != NULL) {
        /* ret is still EDPVS_OK here if the type is neither GET nor SET */
        if (msg->type == SOCKOPT_GET) {
            ret = skopt->get(msg->id, msg->data, msg->len, &reply_data, &reply_data_len);
        } else if (msg->type == SOCKOPT_SET) {
            ret = skopt->set(msg->id, msg->data, msg->len);
        }

        if (ret < 0) {
            /* assume that reply_data is freed by user when callback fails */
            reply_data = NULL;
            reply_data_len = 0;
            RTE_LOG(INFO, MSGMGR, "%s: socket msg<type=%s, id=%d> callback failed\n",
                    __func__, msg->type == SOCKOPT_GET ? "GET" : "SET", msg->id);
        }

        memset(&reply_hdr, 0, sizeof(reply_hdr));
        reply_hdr.version = SOCKOPT_VERSION;
        reply_hdr.id = msg->id;
        reply_hdr.type = msg->type;
        reply_hdr.errcode = ret;
        /* header was zeroed above, so the string stays NUL-terminated */
        strncpy(reply_hdr.errstr, dpvs_strerror(ret), SOCKOPT_ERRSTR_LEN - 1);
        reply_hdr.len = reply_data_len;

        /* send response */
        ret = sockopt_msg_send(clt_fd, &reply_hdr, reply_data, reply_data_len);
        if (reply_data)
            rte_free(reply_data);

        /* a failed send becomes the return value; cleanup is shared below */
        if (ret != EDPVS_OK)
            status = ret;
    }

    sockopt_msg_free(msg);
    close(clt_fd);
    return status;
}
- accept 接收连接(srv_fd 是非阻塞的,没有新连接时立即返回 EDPVS_IO)
- sockopt_msg_recv 获取数据
- 判断 msg->type 类型,进行回调,路由操作的是
route_sockopt_set
路由操作入口
路由的添加删除需要命令行操作,最终调用的接口就是 route_sockopt_set
/*
 * Sockopt "set" entry point for routes.
 *
 * @opt   requested operation; only SOCKOPT_SET_ROUTE_ADD and
 *        SOCKOPT_SET_ROUTE_DEL are implemented.
 * @conf  a struct dp_vs_route_conf supplied by the caller.
 * @size  size of @conf in bytes.
 *
 * Derives the route flags from cf->scope (host and KNI routes must be /32
 * with a non-wildcard destination; everything else is a forward route, and
 * a wildcard destination additionally marks it as default), resolves the
 * device by name and calls route_add()/route_del().
 *
 * Returns EDPVS_INVAL for bad arguments or an unknown device,
 * EDPVS_NOTSUPP for unsupported address families or operations, otherwise
 * the result of route_add()/route_del().
 */
static int route_sockopt_set(sockoptid_t opt, const void *conf, size_t size)
{
    struct dp_vs_route_conf *cf = (void *)conf;
    struct netif_port *dev;
    uint32_t flags = 0;

    if (!conf || size < sizeof(*cf))
        return EDPVS_INVAL;

    if (!(cf->af == AF_INET || cf->af == AF_UNSPEC))
        return EDPVS_NOTSUPP;

    if (cf->scope == ROUTE_CF_SCOPE_HOST || cf->scope == ROUTE_CF_SCOPE_KNI) {
        flags |= (cf->scope == ROUTE_CF_SCOPE_HOST) ? RTF_LOCALIN : RTF_KNI;
        /* host/KNI routes describe a single, concrete address */
        if (inet_is_addr_any(cf->af, &cf->dst) || cf->plen != 32)
            return EDPVS_INVAL;
    } else {
        flags |= RTF_FORWARD;
        if (inet_is_addr_any(cf->af, &cf->dst))
            flags |= RTF_DEFAULT;
    }

    dev = netif_port_get_by_name(cf->ifname);
    if (dev == NULL) /* no dev is OK ? */
        return EDPVS_INVAL;

    if (opt == SOCKOPT_SET_ROUTE_ADD)
        return route_add(&cf->dst.in, cf->plen, flags,
                         &cf->via.in, dev, &cf->src.in, cf->mtu, cf->metric);

    if (opt == SOCKOPT_SET_ROUTE_DEL)
        return route_del(&cf->dst.in, cf->plen, flags,
                         &cf->via.in, dev, &cf->src.in, cf->mtu, cf->metric);

    /* SOCKOPT_SET_ROUTE_SET, SOCKOPT_SET_ROUTE_FLUSH and anything else */
    return EDPVS_NOTSUPP;
}
- 判断配置的是不是 AF_INET(或 AF_UNSPEC)网络协议,其它返回 EDPVS_NOTSUPP
- 根据配置的 cf->scope, 设置路由的作用域: RTF_LOCALIN ,RTF_KNI 这里还判断长度是否是 32,当前阅读的版本还不支持 ipv6
- 根据 opt 操作类型,分别调用 route_add 或是 route_del,其实这两个都会调用
route_add_del
实现,参数不同而已
static int route_add_del(bool add, struct in_addr* dest,
uint8_t netmask, uint32_t flag,
struct in_addr* gw, struct netif_port *port,
struct in_addr* src, unsigned long mtu,
short metric)
{
lcoreid_t cid = rte_lcore_id();
int err;
struct dpvs_msg *msg;
struct dp_vs_route_conf cf;
if (cid != rte_get_master_lcore()) {
RTE_LOG(INFO, ROUTE, "[%s] must set from master lcore\n", __func__);
return EDPVS_NOTSUPP;
}
/* set route on master lcore first */
if (add)
err = route_add_lcore(dest, netmask, flag, gw, port, src, mtu, metric);
else
err = route_del_lcore(dest, netmask, flag, gw, port, src, mtu, metric);
if (err != EDPVS_OK) {
RTE_LOG(INFO, ROUTE, "[%s] fail to set route\n", __func__);
return err;
}
/* set route on all slave lcores */
memset(&cf, 0, sizeof(struct dp_vs_route_conf));
if (dest)
cf.dst.in = *dest;
cf.plen = netmask;
cf.flags = flag;
if (gw)
cf.via.in = *gw;
snprintf(cf.ifname, sizeof(cf.ifname), "%s", port->name);
if (src)
cf.src.in = *src;
cf.mtu = mtu;
cf.metric = metric;
if (add)
msg = msg_make(MSG_TYPE_ROUTE_ADD, 0, DPVS_MSG_MULTICAST,
cid, sizeof(struct dp_vs_route_conf), &cf);
else
msg = msg_make(MSG_TYPE_ROUTE_DEL, 0, DPVS_MSG_MULTICAST,
cid, sizeof(struct dp_vs_route_conf), &cf);
err = multicast_msg_send(msg, 0/*DPVS_MSG_F_ASYNC*/, NULL);
if (err != EDPVS_OK) {
msg_destroy(&msg);
RTE_LOG(INFO, ROUTE, "[%s] fail to send multicast message\n", __func__);
return err;
}
msg_destroy(&msg);
return EDPVS_OK;
}
- cid != rte_get_master_lcore() 判断只有 master lcore 才可以操作路由
- 调用 route_add_lcore 或是 route_del_lcore 操作
- 调用
multicast_msg_send
将消息广播到哪呢?给其它 slave lcore, 因为 dpdk 程序,每个 lcore 都要有自己的本地数据,所以要广播一份。
路由操作入口 route_add_lcore
/*
 * Add a route on the calling lcore, choosing the table from @flag:
 * RTF_LOCALIN / RTF_KNI go to the local table, RTF_FORWARD / RTF_DEFAULT
 * to the net table. Returns EDPVS_INVAL when none of those flags is set.
 */
static int route_add_lcore(struct in_addr* dest,uint8_t netmask, uint32_t flag,
                           struct in_addr* gw, struct netif_port *port,
                           struct in_addr* src, unsigned long mtu,short metric)
{
    if (flag & (RTF_LOCALIN | RTF_KNI)) {
        return route_local_add(dest, netmask, flag, gw,
                               port, src, mtu, metric);
    }

    if (flag & (RTF_FORWARD | RTF_DEFAULT)) {
        return route_net_add(dest, netmask, flag, gw,
                             port, src, mtu, metric);
    }

    return EDPVS_INVAL;
}
根据 scope 作用域,将路由添加到不同路由表。
/*
 * Insert a host/KNI route into this lcore's local route hash table.
 *
 * Returns EDPVS_EXIST if the bucket already holds an entry that matches
 * net_cmp() and has the same destination address, EDPVS_NOMEM if a new
 * entry cannot be created, EDPVS_OK otherwise.
 */
static int route_local_add(struct in_addr* dest, uint8_t netmask, uint32_t flag,
                           struct in_addr* gw, struct netif_port *port,
                           struct in_addr* src, unsigned long mtu,short metric)
{
    struct route_entry *cur;
    struct route_entry *fresh;
    unsigned bucket;

    bucket = route_local_hashkey(dest->s_addr, NULL);

    /* reject duplicates */
    list_for_each_entry(cur, &this_local_route_table[bucket], list) {
        if (!net_cmp(port, dest->s_addr, netmask, cur))
            continue;
        if (dest->s_addr != cur->dest.s_addr)
            continue;
        return EDPVS_EXIST;
    }

    fresh = route_new_entry(dest, netmask, flag,
                            gw, port, src, mtu, metric);
    if (fresh == NULL)
        return EDPVS_NOMEM;

    route_local_hash(fresh);
    rte_atomic32_inc(&this_num_routes);
    return EDPVS_OK;
}
根据目的地址做 hashkey,如果当前路由表里已经有了,返回报错,否则添加路由条目
/*
 * Insert a forward/default route into this lcore's net route list.
 *
 * The new entry is placed in front of the first existing entry whose
 * netmask is smaller than @netmask, or at the tail if there is none.
 *
 * Returns EDPVS_EXIST if an entry examined before the insertion point
 * matches net_cmp() with the same netmask, EDPVS_NOMEM if the entry cannot
 * be created, EDPVS_OK otherwise.
 */
static int route_net_add(struct in_addr *dest, uint8_t netmask, uint32_t flag,
                         struct in_addr *gw, struct netif_port *port,
                         struct in_addr *src, unsigned long mtu,short metric)
{
    struct route_entry *cur;
    struct route_entry *before = NULL;   /* entry to insert in front of */
    struct route_entry *fresh;

    list_for_each_entry(cur, &this_net_route_table, list) {
        if (net_cmp(port, dest->s_addr, netmask, cur)
                && (netmask == cur->netmask)) {
            return EDPVS_EXIST;
        }
        if (cur->netmask < netmask) {
            before = cur;
            break;
        }
    }

    fresh = route_new_entry(dest, netmask, flag,
                            gw, port, src, mtu, metric);
    if (fresh == NULL)
        return EDPVS_NOMEM;

    if (before != NULL) {
        __list_add(&fresh->list, (&before->list)->prev,
                   &before->list);
    } else {
        list_add_tail(&fresh->list, &this_net_route_table);
    }

    rte_atomic32_inc(&this_num_routes);
    rte_atomic32_inc(&fresh->refcnt);
    return EDPVS_OK;
}
添加 net 作用域的路由条目
路由操作入口 route_del_lcore
/*
 * Delete a route on the calling lcore. The table is chosen from @flag the
 * same way as for adding: RTF_LOCALIN / RTF_KNI use the local table,
 * RTF_FORWARD / RTF_DEFAULT the net table. The entry is looked up and the
 * lookup result is handed straight to the matching delete routine, whose
 * return value is returned. EDPVS_INVAL when none of those flags is set.
 */
static int route_del_lcore(struct in_addr* dest,uint8_t netmask, uint32_t flag,
                           struct in_addr* gw, struct netif_port *port,
                           struct in_addr* src, unsigned long mtu,short metric)
{
    struct route_entry *victim = NULL;

    if (flag & (RTF_LOCALIN | RTF_KNI)) {
        victim = route_local_lookup(dest->s_addr, port);
        return route_local_del(victim);
    }

    if (flag & (RTF_FORWARD | RTF_DEFAULT)) {
        victim = route_net_lookup(port, dest, netmask);
        return route_net_del(victim);
    }

    return EDPVS_INVAL;
}
先查找,如果找到了再删除,也没啥好说的
dpvs 如何使用路由
dp_vs_in
最后会发送数据包 xmit_inbound
或是 xmit_outbound
,涉及到回调函数 packet_xmit
和 packet_out_xmit
,虽然每种转发模式的不同,但是路由操作是一致的,就只看 fnat 下的 dp_vs_xmit_fnat
函数。
/*
 * FNAT transmit path for the inbound direction (excerpt).
 *
 * Looks up the output route for (conn->laddr -> conn->daddr), records it on
 * the connection and on the mbuf, rewrites the IP source/destination to
 * laddr/daddr, lets the protocol's FNAT handlers translate L4, fixes the IP
 * checksum and hands the packet to the INET_HOOK_LOCAL_OUT hook with
 * ipv4_output as the final step.
 *
 * NOTE(review): part of the function is elided ("......") and the errout
 * label targeted by the goto statements is not visible in this excerpt.
 */
int dp_vs_xmit_fnat(struct dp_vs_proto *proto,
struct dp_vs_conn *conn,
struct rte_mbuf *mbuf)
{
struct flow4 fl4;
struct ipv4_hdr *iph = ip4_hdr(mbuf);
struct route_entry *rt;
int err, mtu;
......
/* route lookup key: destination daddr, source laddr, TOS of the packet */
memset(&fl4, 0, sizeof(struct flow4));
fl4.daddr = conn->daddr.in;
fl4.saddr = conn->laddr.in;
fl4.tos = iph->type_of_service;
rt = route4_output(&fl4);
if (!rt) {
err = EDPVS_NOROUTE;
goto errout;
}
/*
* didn't cache the pointer to rt
* or route can't be deleted when there is conn ref
* this is for neighbour confirm
*/
dp_vs_conn_cache_rt(conn, rt, true);
mtu = rt->mtu;
/* packet larger than the route MTU with DF set: report "frag needed" */
if (mbuf->pkt_len > mtu
&& (iph->fragment_offset & htons(IPV4_HDR_DF_FLAG))) {
RTE_LOG(DEBUG, IPVS, "%s: frag needed.\n", __func__);
icmp_send(mbuf, ICMP_DEST_UNREACH, ICMP_UNREACH_NEEDFRAG, htonl(mtu));
err = EDPVS_FRAG;
goto errout;
}
/* the route travels with the mbuf; ipv4_output_fin2 reads it back */
mbuf->userdata = rt;
/* after route lookup and before translation */
if (xmit_ttl) {
if (unlikely(iph->time_to_live <= 1)) {
icmp_send(mbuf, ICMP_TIME_EXCEEDED, ICMP_EXC_TTL, 0);
err = EDPVS_DROP;
goto errout;
}
iph->time_to_live--;
}
/* pre-handler before translation */
if (proto->fnat_in_pre_handler) {
err = proto->fnat_in_pre_handler(proto, conn, mbuf);
if (err != EDPVS_OK)
goto errout;
/*
* re-fetch IP header
* the offset may changed during pre-handler
*/
iph = ip4_hdr(mbuf);
}
/* L3 translation before l4 re-csum */
iph->hdr_checksum = 0;
iph->src_addr = conn->laddr.in.s_addr;
iph->dst_addr = conn->daddr.in.s_addr;
/* L4 FNAT translation */
if (proto->fnat_in_handler) {
err = proto->fnat_in_handler(proto, conn, mbuf);
if (err != EDPVS_OK)
goto errout;
}
/* with PKT_TX_IP_CKSUM set the checksum field is left at zero (presumably
 * filled in by hardware offload — confirm); otherwise compute it here */
if (likely(mbuf->ol_flags & PKT_TX_IP_CKSUM)) {
iph->hdr_checksum = 0;
} else {
ip4_send_csum(iph);
}
return INET_HOOK(INET_HOOK_LOCAL_OUT, mbuf, NULL, rt->port, ipv4_output);
}
- 根据 flow4 查找路由,route4_output 根据目标 ip 地址,先查 local 路由表,如果没有再查 net 路由表
-
dp_vs_conn_cache_rt
将路由信息绑定到 conn 结构体。确定出入网卡设备,确认下一跳 ip 地址 - mbuf->userdata = rt, 将路由绑定到 mbuf
- ipv4_output 最终查找 arp 表,填写 mac 地址后发到网卡
dpvs 如何使用路由 dp_vs_conn_cache_rt
先看一下 dp_vs_conn_cache_rt
函数实现,都如何绑定的
/*
 * Remember on the connection which device and next hop a route resolves to.
 *
 * in == true  : route towards the real server (fills in_dev / in_nexthop)
 * in == false : route towards the client      (fills out_dev / out_nexthop)
 *
 * The next hop is the route's gateway, or the connection's peer address
 * (daddr inbound, caddr outbound) when the route has no gateway.
 *
 * NOTE(review): the early return fires when the device is already set and
 * the cached next hop EQUALS INADDR_ANY; if the intent is "skip when already
 * cached" the comparison may be inverted — confirm before relying on it.
 */
static void dp_vs_conn_cache_rt(struct dp_vs_conn *conn, struct route_entry *rt, bool in)
{
    if (in) {
        if (conn->in_dev && (conn->in_nexthop.in.s_addr == htonl(INADDR_ANY)))
            return;

        conn->in_dev = rt->port;
        conn->in_nexthop.in = (rt->gw.s_addr == htonl(INADDR_ANY))
                              ? conn->daddr.in
                              : rt->gw;
        return;
    }

    if (conn->out_dev && (conn->out_nexthop.in.s_addr == htonl(INADDR_ANY)))
        return;

    conn->out_dev = rt->port;
    conn->out_nexthop.in = (rt->gw.s_addr == htonl(INADDR_ANY))
                           ? conn->caddr.in
                           : rt->gw;
}
- 首先 in 是判断流量方向,是从 client 到 rs, 还是相反。然后判断:如果对应方向的 dev 已经设置,且 in_nexthop(或 out_nexthop)等于 INADDR_ANY,那么直接退出。
- conn->in_dev = rt->port 设置路由对应哪个网卡
- 如果网关是一个通配地址,那么就直接用目的地址来代替,否则下一跳地址设置为网关(rt->gw)。
dpvs 如何使用路由 ipv4_output
一路看代码,最终调用 ipv4_output_fin2
/*
 * Last IPv4 output step: pick the next hop from the route stored in
 * mbuf->userdata (the gateway, or the packet's destination when the route
 * has no gateway), stamp the L2/L3 metadata on the mbuf and pass it to
 * neigh_resolve_output(). The route reference is dropped afterwards.
 *
 * Returns whatever neigh_resolve_output() returns.
 */
static int ipv4_output_fin2(struct rte_mbuf *mbuf)
{
    struct route_entry *rt = mbuf->userdata;
    struct in_addr nexthop;
    int rc;

    if (rt->gw.s_addr != htonl(INADDR_ANY))
        nexthop = rt->gw;
    else
        nexthop.s_addr = ip4_hdr(mbuf)->dst_addr;

    /**
     * XXX:
     * because lacking of suitable fields in mbuf
     * (m.l3_type is only 4 bits, too short),
     * m.packet_type is used to save ether_type
     * e.g., 0x0800 for IPv4.
     * note it was used in RX path for eth_type_t.
     * really confusing.
     */
    mbuf->packet_type = ETHER_TYPE_IPv4;
    mbuf->l3_len = ip4_hdrlen(mbuf);

    /* reuse @userdata/@udata64 for prio (used by tc:pfifo_fast) */
    mbuf->udata64 = ((ip4_hdr(mbuf)->type_of_service >> 1) & 15);

    rc = neigh_resolve_output(&nexthop, mbuf, rt->port);
    route4_put(rt);
    return rc;
}
- 通过之前设置的 mbuf->userdata 获取路由条目 route_entry
- 再次检测下一跳 nexthop 是否有效
- 根据 nexthop 调用
neigh_resolve_output
将 mbuf 发送到路由指定的网卡 rt->port
/*
 * Send mbuf m to next hop nexhop through port, resolving the MAC address
 * from the neighbour table.
 *
 * - Port flagged NETIF_PORT_FLAG_NO_ARP: the packet is sent as is.
 * - Entry in REACHABLE / PROBE / DELAY state: the MAC is filled in and the
 *   packet is sent; in PROBE state an ARP confirm is also issued.
 * - Entry in NONE / SEND state, or no entry at all (one is created without a
 *   MAC): the packet is queued on the entry, and in NONE state an ARP
 *   confirm is issued. When the queue is over arp_unres_qlen or the queue
 *   node cannot be allocated the packet is freed and EDPVS_DROP returned.
 *
 * Returns EDPVS_OK when the packet was sent or queued, EDPVS_NOMEM when a
 * new entry cannot be created (packet freed), EDPVS_IDLE for an entry in any
 * other state (packet is neither sent nor freed here), or netif_xmit()'s
 * result on the NO_ARP path.
 */
int neigh_resolve_output(struct in_addr *nexhop, struct rte_mbuf *m,
struct netif_port *port)
{
struct neighbour_entry *neighbour;
struct neighbour_mbuf_entry *m_buf;
unsigned int hashkey;
uint32_t nexhop_addr = nexhop->s_addr;
if (port->flag & NETIF_PORT_FLAG_NO_ARP)
return netif_xmit(m, port);
hashkey = neigh_hashkey(nexhop_addr, port);
neighbour = neigh_lookup_entry(&nexhop_addr, port, hashkey);
if (neighbour) {
/* MAC not resolved yet: park the packet on the entry */
if ((neighbour->state == DPVS_NUD_S_NONE) ||
(neighbour->state == DPVS_NUD_S_SEND)) {
if (neighbour->que_num > arp_unres_qlen) {
/*don't need arp request now,
since neighbour will not be confirmed
and it will be released late*/
rte_pktmbuf_free(m);
RTE_LOG(ERR, NEIGHBOUR, "[%s] arp_unres_queue is full, drop packet\n", __func__);
return EDPVS_DROP;
}
m_buf = rte_zmalloc("neigh_new_mbuf",
sizeof(struct neighbour_mbuf_entry), RTE_CACHE_LINE_SIZE);
if (!m_buf) {
rte_pktmbuf_free(m);
return EDPVS_DROP;
}
m_buf->m = m;
list_add_tail(&m_buf->neigh_mbuf_list, &neighbour->queue_list);
neighbour->que_num++;
/* only the NONE state triggers a new ARP confirm; SEND does not */
if (neighbour->state == DPVS_NUD_S_NONE) {
neigh_arp_confirm(neighbour);
neigh_entry_state_trans(neighbour, 0);
}
return EDPVS_OK;
}
/* MAC known: fill it in and transmit right away */
else if ((neighbour->state == DPVS_NUD_S_REACHABLE) ||
(neighbour->state == DPVS_NUD_S_PROBE) ||
(neighbour->state == DPVS_NUD_S_DELAY)) {
neigh_fill_mac(neighbour, m);
netif_xmit(m, neighbour->port);
if (neighbour->state == DPVS_NUD_S_PROBE) {
neigh_arp_confirm(neighbour);
neigh_entry_state_trans(neighbour, 0);
}
return EDPVS_OK;
}
/* NOTE(review): m is neither sent nor freed on this path — confirm the
 * caller handles EDPVS_IDLE */
return EDPVS_IDLE;
}
else{
/* no entry yet: create one without a MAC and queue the packet on it */
neighbour = neigh_add_table(nexhop_addr, NULL, port, hashkey, 0);
if(!neighbour){
RTE_LOG(ERR, NEIGHBOUR, "[%s] add neighbour wrong\n", __func__);
rte_pktmbuf_free(m);
return EDPVS_NOMEM;
}
if(neighbour->que_num > arp_unres_qlen){
rte_pktmbuf_free(m);
return EDPVS_DROP;
}
m_buf = rte_zmalloc("neigh_new_mbuf",
sizeof(struct neighbour_mbuf_entry), RTE_CACHE_LINE_SIZE);
if(!m_buf){
rte_pktmbuf_free(m);
return EDPVS_DROP;
}
m_buf->m = m;
list_add_tail(&m_buf->neigh_mbuf_list, &neighbour->queue_list);
neighbour->que_num++;
if (neighbour->state == DPVS_NUD_S_NONE) {
neigh_arp_confirm(neighbour);
neigh_entry_state_trans(neighbour, 0);
}
return EDPVS_OK;
}
}
- 根据下一跳地址,网卡计算 hashkey, 从 arp 表中找到 neighbour
- 判断是否有对应的 arp 表项。如果 arp 表项处于可用状态 DPVS_NUD_S_REACHABLE 等等,那么调用
neigh_fill_mac
填充下一跳的 mac 后调用netif_xmit
发送数据包到网卡 - 如果有 arp 表项,但不可用,将 mbuf 入队,缓存下来等待下一次发送。队列满了丢弃。
- arp 表项不存在,或不可用,都可能会触发 arp 广播。
neigh_arp_confirm
发送 arp request 广播到网卡。修改 arp 表项状态并重置定时器
总结
暂时代码不涉及 ipv6, 大致 arp 和路由交互操作就这些。半路出家,难免有错~~