ovs中handler和revalidator线程
给ovs添加网桥后,ovs-vswitchd进程就会自动生成若干个handler和revalidator线程,如下所示:
root@master:~# ovs-vsctl add-br br1
root@master:~# ovs-vsctl show
ed1aefeb-6dbe-4634-bda8-bf97dac313e5
Bridge "br1"
Port "br1"
Interface "br1"
type: internal
root@master:~# ps -ef | grep ovs
root 11443 1 0 22:33 ? 00:00:00 ovs-vswitchd unix:/usr/local/var/run/openvswitch/db.sock
root@master:~# top -H -p 11443
top - 22:40:15 up 1 day, 9:46, 1 user, load average: 0.20, 0.19, 0.13
Threads: 6 total, 0 running, 6 sleeping, 0 stopped, 0 zombie
%Cpu(s): 1.7 us, 1.7 sy, 0.0 ni, 96.7 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
MiB Mem : 7961.7 total, 4198.7 free, 427.4 used, 3335.5 buff/cache
MiB Swap: 0.0 total, 0.0 free, 0.0 used. 7247.0 avail Mem
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
11443 root 20 0 374132 4324 3396 S 0.0 0.1 0:00.14 ovs-vswitchd
11737 root 20 0 374132 4324 3396 S 0.0 0.1 0:00.00 urcu4
17118 root 20 0 374132 4324 3396 S 0.0 0.1 0:00.00 handler24
17119 root 20 0 374132 4324 3396 S 0.0 0.1 0:00.00 handler23
17120 root 20 0 374132 4324 3396 S 0.0 0.1 0:00.00 revalidator25
17121 root 20 0 374132 4324 3396 S 0.0 0.1 0:00.00 revalidator22
handler和revalidator线程个数
线程个数由如下代码决定:
a. 如果配置了线程个数,则使用配置的值。使用如下命令配置,实时生效
ovs-vsctl --no-wait set Open_vSwitch . other_config:n-handler-threads=4
ovs-vsctl --no-wait set Open_vSwitch . other_config:n-revalidator-threads=4
b. 如果没有配置,则根据cpu个数来定。n_revalidators 等于cpu个数除4后加1,n_handlers 等于cpu个数减去n_revalidators 个数。
上面的测试环境只有四个cpu,所以有两个revalidator线程和两个handler。
static void
bridge_reconfigure(const struct ovsrec_open_vswitch *ovs_cfg)
ofproto_set_threads(
smap_get_int(&ovs_cfg->other_config, "n-handler-threads", 0),
smap_get_int(&ovs_cfg->other_config, "n-revalidator-threads", 0));
void
ofproto_set_threads(int n_handlers_, int n_revalidators_)
{
int threads = MAX(count_cpu_cores(), 2);
n_revalidators = MAX(n_revalidators_, 0);
n_handlers = MAX(n_handlers_, 0);
if (!n_revalidators) {
n_revalidators = n_handlers
? MAX(threads - (int) n_handlers, 1)
: threads / 4 + 1;
}
if (!n_handlers) {
n_handlers = MAX(threads - (int) n_revalidators, 1);
}
}
线程的作用
handler 线程
我们知道,ovs包含三个流表:microflow,megaflow和openflow。其中openflow流表是由用户或者controller配置的,其他两个流表是报文触发创建,一条流的首包查找microflow,megaflow失败后,会走慢速路径查找openflow流表(流表规则为normal的话,就会变为mac学习,根据fdb转发),然后将查找到的转发信息,再下发到microflow,megaflow,这样后面的报文就可以根据microflow,megaflow进行转发,这称为快速路径。这里的慢速路径的处理就是handler线程的工作内容。
有两点需要注意的是,一个是对于普通的ovs来说,microflow和megaflow存在于openvswitch.ko内核模块中,而对于ovs+dpdk来说,microflow和megaflow存在于用户态的ovs-vswitchd进程中。当然在ovs+dpdk模式下,也可以加载openvswitch.ko内核模块,这样两种模式可以共同存在,创建两种类型的桥。
另一个是handler线程只对普通ovs下,从内核upcall的消息进行处理。ovs+dpdk下,handler线程也存在,但是一直处于堵塞状态,实际上什么也没干。ovs+dpdk下的慢速路径的处理直接由收包线程执行,比如pmd线程。
revalidator线程
考虑下面这几种情况
a. 比如已经通过handler线程下发了一条megaflow流表(可通过ovs-appctl dpctl/dump-flows进行查看),如果一段时间内没有流量使用此流表,10s后就会被删除。
b. 又比如已经下发了一条megaflow流表,10s内将其对应的openflow流表删除,则这条megaflow流表也要立即被删除。
c. 再比如已经下发了一条megaflow流表,10s内修改其对应openflow流表的action后,则这条megaflow流表的action也要相应改变。
revalidator线程就是干上面的工作的,当然还有其他没考虑到的情况,总结来说就是revalidator线程用于megaflow流表的超时删除,响应openflow流表的改动,还有就是周期获取datapath流表的统计信息,一方面是为了确认流表是否被使用了,另一方面用来响应查看统计信息的命令(ovs-ofctl dump-flows br1)。
可以使用下面两条命令设置megaflow流表最大值(默认ofproto_flow_limit 20万)和流表超时时间(默认ofproto_max_idle 10s)
ovs-vsctl set Open_vSwitch . other_config:flow-limit=2
ovs-vsctl set Open_vSwitch . other_config:max-idle=1000000
revalidator线程在普通ovs和ovs+dpdk模式下都生效。
源码分析
handler线程
handler线程的主循环如下,只要exit_latch不设置就一直存在。
static void *
udpif_upcall_handler(void *arg)
{
struct handler *handler = arg;
struct udpif *udpif = handler->udpif;
while (!latch_is_set(&handler->udpif->exit_latch)) {
//recv_upcalls用于接收upcall消息。只有kernel path提供了recv函数dpif_netlink_recv,所以说handler线程对dpdk path不生效。
//recv_upcalls返回值大于0,说明已经处理过upcall消息,防止
//丢失后续的upcall消息,线程不能堵塞,需要再次调用recv_upcalls
//尝试接收upcall消息,所以需要调用poll_immediate_wake将
//timeout_when设置为最小值LLONG_MIN,这样poll_block调用poll函数就能立即返回,继续执行recv_upcalls。
//recv_upcalls返回值小于0,说明没有upcall消息,dpif_recv_wait调用等待upcall消息即可。
if (recv_upcalls(handler)) {
poll_immediate_wake();
} else {
//等待upcall消息
dpif_recv_wait(udpif->dpif, handler->handler_id);
//等待exit消息
latch_wait(&udpif->exit_latch);
}
//堵塞在poll函数上,超时时间timeout_when为最大值,等待事件发生
poll_block();
}
return NULL;
}
主处理函数recv_upcalls
static size_t
recv_upcalls(struct handler *handler)
{
struct udpif *udpif = handler->udpif;
uint64_t recv_stubs[UPCALL_MAX_BATCH][512 / 8];
struct ofpbuf recv_bufs[UPCALL_MAX_BATCH];
struct dpif_upcall dupcalls[UPCALL_MAX_BATCH];
struct upcall upcalls[UPCALL_MAX_BATCH];
struct flow flows[UPCALL_MAX_BATCH];
size_t n_upcalls, i;
n_upcalls = 0;
//每次最多处理50个upcall消息
while (n_upcalls < UPCALL_MAX_BATCH) {
struct ofpbuf *recv_buf = &recv_bufs[n_upcalls];
struct dpif_upcall *dupcall = &dupcalls[n_upcalls];
struct upcall *upcall = &upcalls[n_upcalls];
struct flow *flow = &flows[n_upcalls];
unsigned int mru;
int error;
ofpbuf_use_stub(recv_buf, recv_stubs[n_upcalls],
sizeof recv_stubs[n_upcalls]);
//从datapath接收upcall消息存储在dupcall中
if (dpif_recv(udpif->dpif, handler->handler_id, dupcall, recv_buf)) {
ofpbuf_uninit(recv_buf);
break;
}
//将dupcall消息中的key转换到flow中
if (odp_flow_key_to_flow(dupcall->key, dupcall->key_len, flow)
== ODP_FIT_ERROR) {
goto free_dupcall;
}
if (dupcall->mru) {
mru = nl_attr_get_u16(dupcall->mru);
} else {
mru = 0;
}
error = upcall_receive(upcall, udpif->backer, &dupcall->packet,
dupcall->type, dupcall->userdata, flow, mru,
&dupcall->ufid, PMD_ID_NULL);
if (error) {
if (error == ENODEV) {
/* Received packet on datapath port for which we couldn't
* associate an ofproto. This can happen if a port is removed
* while traffic is being received. Print a rate-limited
* message in case it happens frequently. */
dpif_flow_put(udpif->dpif, DPIF_FP_CREATE, dupcall->key,
dupcall->key_len, NULL, 0, NULL, 0,
&dupcall->ufid, PMD_ID_NULL, NULL);
VLOG_INFO_RL(&rl, "received packet on unassociated datapath "
"port %"PRIu32, flow->in_port.odp_port);
}
goto free_dupcall;
}
upcall->key = dupcall->key;
upcall->key_len = dupcall->key_len;
upcall->ufid = &dupcall->ufid;
upcall->out_tun_key = dupcall->out_tun_key;
upcall->actions = dupcall->actions;
pkt_metadata_from_flow(&dupcall->packet.md, flow);
flow_extract(&dupcall->packet, flow);
//开始处理,根据flow信息查找openflow流表
error = process_upcall(udpif, upcall,
&upcall->odp_actions, &upcall->wc);
if (error) {
goto cleanup;
}
n_upcalls++;
continue;
cleanup:
upcall_uninit(upcall);
free_dupcall:
dp_packet_uninit(&dupcall->packet);
ofpbuf_uninit(recv_buf);
}
if (n_upcalls) {
//如果查找openflow成功,则将相关信息安装到datapath中,后续的报文根据datapath中的流表转发
handle_upcalls(handler->udpif, upcalls, n_upcalls);
for (i = 0; i < n_upcalls; i++) {
dp_packet_uninit(&dupcalls[i].packet);
ofpbuf_uninit(&recv_bufs[i]);
upcall_uninit(&upcalls[i]);
}
}
return n_upcalls;
}
upcall消息有两种类型DPIF_UC_MISS和DPIF_UC_ACTION,前者表示查找流表失败,需要走慢速路径查找openflow流表,后者表示流表的action为将报文上送。这里只看一下DPIF_UC_MISS的情况。
static int
process_upcall(struct udpif *udpif, struct upcall *upcall,
struct ofpbuf *odp_actions, struct flow_wildcards *wc)
{
const struct nlattr *userdata = upcall->userdata;
const struct dp_packet *packet = upcall->packet;
const struct flow *flow = upcall->flow;
size_t actions_len = 0;
enum upcall_type upcall_type = classify_upcall(upcall->type, userdata);
//upcall类型为 MISS_UPCALL
switch (upcall_type) {
case MISS_UPCALL:
upcall_xlate(udpif, upcall, odp_actions, wc);
return 0;
...
}
}
static void
upcall_xlate(struct udpif *udpif, struct upcall *upcall,
struct ofpbuf *odp_actions, struct flow_wildcards *wc)
{
struct dpif_flow_stats stats;
struct xlate_in xin;
stats.n_packets = 1;
stats.n_bytes = dp_packet_size(upcall->packet);
stats.used = time_msec();
stats.tcp_flags = ntohs(upcall->flow->tcp_flags);
xlate_in_init(&xin, upcall->ofproto,
ofproto_dpif_get_tables_version(upcall->ofproto),
upcall->flow, upcall->in_port, NULL,
stats.tcp_flags, upcall->packet, wc, odp_actions);
if (upcall->type == DPIF_UC_MISS) {
xin.resubmit_stats = &stats;
if (xin.frozen_state) {
/* We may install a datapath flow only if we get a reference to the
* recirculation context (otherwise we could have recirculation
* upcalls using recirculation ID for which no context can be
* found). We may still execute the flow's actions even if we
* don't install the flow. */
upcall->recirc = recirc_id_node_from_state(xin.frozen_state);
upcall->have_recirc_ref = recirc_id_node_try_ref_rcu(upcall->recirc);
}
} else {
/* For non-miss upcalls, we are either executing actions (one of which
* is an userspace action) for an upcall, in which case the stats have
* already been taken care of, or there's a flow in the datapath which
* this packet was accounted to. Presumably the revalidators will deal
* with pushing its stats eventually. */
}
upcall->dump_seq = seq_read(udpif->dump_seq);
upcall->reval_seq = seq_read(udpif->reval_seq);
//关键函数,查找openflow流表
xlate_actions(&xin, &upcall->xout);
if (wc) {
/* Convert the input port wildcard from OFP to ODP format. There's no
* real way to do this for arbitrary bitmasks since the numbering spaces
* aren't the same. However, flow translation always exact matches the
* whole thing, so we can do the same here. */
WC_MASK_FIELD(wc, in_port.odp_port);
}
upcall->xout_initialized = true;
if (!upcall->xout.slow) {
ofpbuf_use_const(&upcall->put_actions,
odp_actions->data, odp_actions->size);
} else {
uint32_t smid = upcall->ofproto->up.slowpath_meter_id;
uint32_t cmid = upcall->ofproto->up.controller_meter_id;
/* upcall->put_actions already initialized by upcall_receive(). */
compose_slow_path(udpif, &upcall->xout, upcall->flow,
upcall->flow->in_port.odp_port,
&upcall->put_actions, smid, cmid);
}
/* This function is also called for slow-pathed flows. As we are only
* going to create new datapath flows for actual datapath misses, there is
* no point in creating a ukey otherwise. */
if (upcall->type == DPIF_UC_MISS) {
upcall->ukey = ukey_create_from_upcall(upcall, wc);
}
}
static void
handle_upcalls(struct udpif *udpif, struct upcall *upcalls,
size_t n_upcalls)
{
//ops 为upcall个数的两倍,其中一个type为DPIF_OP_FLOW_PUT,用于安装flow,
//另一个为DPIF_OP_EXECUTE,用于将upcall的报文安装action处理掉
struct dpif_op *opsp[UPCALL_MAX_BATCH * 2];
struct ukey_op ops[UPCALL_MAX_BATCH * 2];
size_t n_ops, n_opsp, i;
/* Handle the packets individually in order of arrival.
*
* - For SLOW_CFM, SLOW_LACP, SLOW_STP, SLOW_BFD, and SLOW_LLDP,
* translation is what processes received packets for these
* protocols.
*
* - For SLOW_CONTROLLER, translation sends the packet to the OpenFlow
* controller.
*
* - For SLOW_ACTION, translation executes the actions directly.
*
* The loop fills 'ops' with an array of operations to execute in the
* datapath. */
n_ops = 0;
for (i = 0; i < n_upcalls; i++) {
struct upcall *upcall = &upcalls[i];
const struct dp_packet *packet = upcall->packet;
struct ukey_op *op;
//should_install_flow决定是否install flow到datapath,比如是否超了flow_limit
if (should_install_flow(udpif, upcall)) {
struct udpif_key *ukey = upcall->ukey;
if (ukey_install(udpif, ukey)) {
upcall->ukey_persists = true;
put_op_init(&ops[n_ops++], ukey, DPIF_FP_CREATE);
}
}
if (upcall->odp_actions.size) {
op = &ops[n_ops++];
op->ukey = NULL;
op->dop.type = DPIF_OP_EXECUTE;
op->dop.u.execute.packet = CONST_CAST(struct dp_packet *, packet);
op->dop.u.execute.flow = upcall->flow;
odp_key_to_dp_packet(upcall->key, upcall->key_len,
op->dop.u.execute.packet);
op->dop.u.execute.actions = upcall->odp_actions.data;
op->dop.u.execute.actions_len = upcall->odp_actions.size;
op->dop.u.execute.needs_help = (upcall->xout.slow & SLOW_ACTION) != 0;
op->dop.u.execute.probe = false;
op->dop.u.execute.mtu = upcall->mru;
}
}
/* Execute batch. */
n_opsp = 0;
for (i = 0; i < n_ops; i++) {
opsp[n_opsp++] = &ops[i].dop;
}
//和datapath交互,将flow安装到datapath,并处理upcall报文
dpif_operate(udpif->dpif, opsp, n_opsp);
//设置 ukey 状态,ukey会在revalidator线程中使用
for (i = 0; i < n_ops; i++) {
struct udpif_key *ukey = ops[i].ukey;
if (ukey) {
ovs_mutex_lock(&ukey->mutex);
if (ops[i].dop.error) {
transition_ukey(ukey, UKEY_EVICTED);
} else if (ukey->state < UKEY_OPERATIONAL) {
transition_ukey(ukey, UKEY_OPERATIONAL);
}
ovs_mutex_unlock(&ukey->mutex);
}
}
}
revalidator线程
revalidator线程在以下几种情况下才会将poll中从堵塞变为运行
a. need_revalidate被设置,说明网桥配置或者流表发生变化,需要重新计算flow
b. pause_latch被设置,说明此线程需要暂时停止运行
c. exit_latch被设置,说明需要将此线程退出,比如将线程数量改小后,会有一部分线程需要退出
d. timeout 超时后运行,最小值为500ms
static void *
udpif_revalidator(void *arg)
{
/* Used by all revalidators. */
struct revalidator *revalidator = arg;
struct udpif *udpif = revalidator->udpif;
//第一个revalidator线程作为leader
bool leader = revalidator == &udpif->revalidators[0];
/* Used only by the leader. */
long long int start_time = 0;
uint64_t last_reval_seq = 0;
size_t n_flows = 0;
revalidator->id = ovsthread_id_self();
for (;;) {
if (leader) {
uint64_t reval_seq;
recirc_run(); /* Recirculation cleanup. */
//获取 reval_seq,只有 need_revalidate 变为true后,reval_seq才会加1,进而唤醒revalidator线程
reval_seq = seq_read(udpif->reval_seq);
last_reval_seq = reval_seq;
//获取datapath中flow个数
n_flows = udpif_get_n_flows(udpif);
udpif->max_n_flows = MAX(n_flows, udpif->max_n_flows);
udpif->avg_n_flows = (udpif->avg_n_flows + n_flows) / 2;
/* Only the leader checks the pause latch to prevent a race where
* some threads think it's false and proceed to block on
* reval_barrier and others think it's true and block indefinitely
* on the pause_barrier */
//在dp_purge_cb清理所有ukey时,会设置pause_latch,暂停revalidator线程执行
udpif->pause = latch_is_set(&udpif->pause_latch);
/* Only the leader checks the exit latch to prevent a race where
* some threads think it's true and exit and others think it's
* false and block indefinitely on the reval_barrier */
//如果不需要revalidator线程了,会在udpif_stop_threads设置exit_latch,停止此线程
udpif->reval_exit = latch_is_set(&udpif->exit_latch);
start_time = time_msec();
if (!udpif->reval_exit) {
bool terse_dump;
//如果revalidator线程没有被停止,则创建dump,用来dump datapath中的flow
terse_dump = udpif_use_ufid(udpif);
udpif->dump = dpif_flow_dump_create(udpif->dpif, terse_dump,
NULL);
}
}
/* Wait for the leader to start the flow dump. */
//等待所有revalidator线程都运行到此处,才会开始向下执行
ovs_barrier_block(&udpif->reval_barrier);
if (udpif->pause) {
revalidator_pause(revalidator);
}
//如果线程停止了,跳出循环
if (udpif->reval_exit) {
break;
}
//关键函数,下面会单独分析
revalidate(revalidator);
/* Wait for all flows to have been dumped before we garbage collect. */
ovs_barrier_block(&udpif->reval_barrier);
//删除状态为UKEY_EVICTED的ukey
revalidator_sweep(revalidator);
/* Wait for all revalidators to finish garbage collection. */
ovs_barrier_block(&udpif->reval_barrier);
if (leader) {
unsigned int flow_limit;
long long int duration;
atomic_read_relaxed(&udpif->flow_limit, &flow_limit);
dpif_flow_dump_destroy(udpif->dump);
seq_change(udpif->dump_seq);
duration = MAX(time_msec() - start_time, 1);
udpif->dump_duration = duration;
//flow比较多的情况下,最大限制flow_limit会发生变化
//如果dump时间大于2000ms,则flow_limit需要除duration对于1000ms的倍数
//如果dump时间大于1300ms,小于2000ms,则flow_limit变为当前值的四分之三
//如果dump时间小于1000ms,并且当前flow个数大于2000,并且xxx,则flow_limit增加1000
if (duration > 2000) {
flow_limit /= duration / 1000;
} else if (duration > 1300) {
flow_limit = flow_limit * 3 / 4;
} else if (duration < 1000 && n_flows > 2000
&& flow_limit < n_flows * 1000 / duration) {
flow_limit += 1000;
}
//flow_limit最大值不能超过ofproto_flow_limit,即20万
flow_limit = MIN(ofproto_flow_limit, MAX(flow_limit, 1000));
//保存flow_limit,handler线程安装flow时会判断是否超过此值
atomic_store_relaxed(&udpif->flow_limit, flow_limit);
if (duration > 2000) {
VLOG_INFO("Spent an unreasonably long %lldms dumping flows",
duration);
}
//用于计算poll函数超时时间
poll_timer_wait_until(start_time + MIN(ofproto_max_idle, 500));
//将seq插入poll_loop节点,等待 seq_change
seq_wait(udpif->reval_seq, last_reval_seq);
//将exit_latch插入poll_loop节点
latch_wait(&udpif->exit_latch);
//将pause_latch插入poll_loop节点
latch_wait(&udpif->pause_latch);
//调用poll函数,获取poll_loop节点上发生的事件,如果有事件发生,则继续执行,否则就等待超时后再继续执行。
poll_block();
if (!latch_is_set(&udpif->pause_latch) &&
!latch_is_set(&udpif->exit_latch)) {
long long int now = time_msec();
/* Block again if we are woken up within 5ms of the last start
* time. */
start_time += 5;
if (now < start_time) {
poll_timer_wait_until(start_time);
latch_wait(&udpif->exit_latch);
latch_wait(&udpif->pause_latch);
poll_block();
}
}
}
}
return NULL;
}
static void
revalidate(struct revalidator *revalidator)
{
uint64_t odp_actions_stub[1024 / 8];
struct ofpbuf odp_actions = OFPBUF_STUB_INITIALIZER(odp_actions_stub);
struct udpif *udpif = revalidator->udpif;
struct dpif_flow_dump_thread *dump_thread;
uint64_t dump_seq, reval_seq;
unsigned int flow_limit;
dump_seq = seq_read(udpif->dump_seq);
reval_seq = seq_read(udpif->reval_seq);
atomic_read_relaxed(&udpif->flow_limit, &flow_limit);
dump_thread = dpif_flow_dump_thread_create(udpif->dump);
for (;;) {
struct ukey_op ops[REVALIDATE_MAX_BATCH];
int n_ops = 0;
struct dpif_flow flows[REVALIDATE_MAX_BATCH];
const struct dpif_flow *f;
int n_dumped;
long long int max_idle;
long long int now;
size_t n_dp_flows;
bool kill_them_all;
//从datapath dump flow信息
n_dumped = dpif_flow_dump_next(dump_thread, flows, ARRAY_SIZE(flows));
if (!n_dumped) {
break;
}
now = time_msec();
/* In normal operation we want to keep flows around until they have
* been idle for 'ofproto_max_idle' milliseconds. However:
*
* - If the number of datapath flows climbs above 'flow_limit',
* drop that down to 100 ms to try to bring the flows down to
* the limit.
*
* - If the number of datapath flows climbs above twice
* 'flow_limit', delete all the datapath flows as an emergency
* measure. (We reassess this condition for the next batch of
* datapath flows, so we will recover before all the flows are
* gone.) */
//再次获取datapath中flow个数
n_dp_flows = udpif_get_n_flows(udpif);
//如果当前flow个数超过flow_limit的两倍,则删除获取的所有flow
kill_them_all = n_dp_flows > flow_limit * 2;
//如果当前flow个数超过 flow_limit,则max_idle为100ms,否则为默认的10s.
//max_idle为超时时间,如果在超时时间内,此flow没有被使用,则超时后就把flow删除。默认10s删除一次。
//如果flow个数超过最大限制flow_limit了,超时时间修改为100ms,需要更快的删除。
max_idle = n_dp_flows > flow_limit ? 100 : ofproto_max_idle;
for (f = flows; f < &flows[n_dumped]; f++) {
long long int used = f->stats.used;
struct recirc_refs recircs = RECIRC_REFS_EMPTY_INITIALIZER;
enum reval_result result;
struct udpif_key *ukey;
bool already_dumped;
int error;
//获取锁失败,说明正在被其他revalidate线程处理,继续处理下一个
if (ukey_acquire(udpif, f, &ukey, &error)) {
if (error == EBUSY) {
/* Another thread is processing this flow, so don't bother
* processing it.*/
COVERAGE_INC(upcall_ukey_contention);
} else {
log_unexpected_flow(f, error);
if (error != ENOENT) {
delete_op_init__(udpif, &ops[n_ops++], f);
}
}
continue;
}
//dump_seq 相等,说明已经被dump过,或者刚创建ukey,继续处理下一个
already_dumped = ukey->dump_seq == dump_seq;
if (already_dumped) {
/* The flow has already been handled during this flow dump
* operation. Skip it. */
if (ukey->xcache) {
COVERAGE_INC(dumped_duplicate_flow);
} else {
COVERAGE_INC(dumped_new_flow);
}
ovs_mutex_unlock(&ukey->mutex);
continue;
}
if (ukey->state <= UKEY_OPERATIONAL) {
/* The flow is now confirmed to be in the datapath. */
transition_ukey(ukey, UKEY_OPERATIONAL);
} else {
VLOG_INFO("Unexpected ukey transition from state %d "
"(last transitioned from thread %u at %s)",
ukey->state, ukey->state_thread, ukey->state_where);
ovs_mutex_unlock(&ukey->mutex);
continue;
}
if (!used) {
used = ukey->created;
}
//used 表示flow最近被用的时间,如果超了max_idle这么长时间不被使用就会被删除。
//比如只创建了一条flow,max_idle就是默认的10s,如果10s内此flow没有流量,则会被删除。
if (kill_them_all || (used && used < now - max_idle)) {
result = UKEY_DELETE;
} else {
//验证ukey,比如只创建了一条flow,10s内openflow流表发生改变后,需要验证megaflow中flow是否需要改变,
//比如同一条流的action发生改变。
//udpif->reval_seq和ukey->reval_seq不一致,说明openflow流表发生变化
result = revalidate_ukey(udpif, ukey, &f->stats, &odp_actions,
reval_seq, &recircs);
}
ukey->dump_seq = dump_seq;
//如果结果不是keep,则需要根据结果(删除/修改)初始化ops
if (result != UKEY_KEEP) {
/* Takes ownership of 'recircs'. */
reval_op_init(&ops[n_ops++], result, udpif, ukey, &recircs,
&odp_actions);
}
ovs_mutex_unlock(&ukey->mutex);
}
//和datapath交互,删除或者修改flow
if (n_ops) {
/* Push datapath ops but defer ukey deletion to 'sweep' phase. */
push_dp_ops(udpif, ops, n_ops);
}
ovsrcu_quiesce();
}
dpif_flow_dump_thread_destroy(dump_thread);
ofpbuf_uninit(&odp_actions);
}
revalidate_ukey用来验证openflow流表是否发生改变,方法是使用datapath获取的flow信息重新查找openflow流表,根据结果判断是否和datapath的action是否一致,如果不一致说明说明需要修改或者删除datapath流表。
/* Verifies that the datapath actions of 'ukey' are still correct, and pushes
* 'stats' for it.
*
* Returns a recommended action for 'ukey', options include:
* UKEY_DELETE The ukey should be deleted.
* UKEY_KEEP The ukey is fine as is.
* UKEY_MODIFY The ukey's actions should be changed but is otherwise
* fine. Callers should change the actions to those found
* in the caller supplied 'odp_actions' buffer. The
* recirculation references can be found in 'recircs' and
* must be handled by the caller.
*
* If the result is UKEY_MODIFY, then references to all recirc_ids used by the
* new flow will be held within 'recircs' (which may be none).
*
* The caller is responsible for both initializing 'recircs' prior this call,
* and ensuring any references are eventually freed.
*/
static enum reval_result
revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
const struct dpif_flow_stats *stats,
struct ofpbuf *odp_actions, uint64_t reval_seq,
struct recirc_refs *recircs)
OVS_REQUIRES(ukey->mutex)
{
//最关键的判断,只有流表发生改变就会设置need_revalidate
bool need_revalidate = ukey->reval_seq != reval_seq;
enum reval_result result = UKEY_DELETE;
struct dpif_flow_stats push;
ofpbuf_clear(odp_actions);
push.used = stats->used;
push.tcp_flags = stats->tcp_flags;
push.n_packets = (stats->n_packets > ukey->stats.n_packets
? stats->n_packets - ukey->stats.n_packets
: 0);
push.n_bytes = (stats->n_bytes > ukey->stats.n_bytes
? stats->n_bytes - ukey->stats.n_bytes
: 0);
//need_revalidate为true,说明openflow流表发生变化,需要验证mask/action是否改变
if (need_revalidate) {
if (should_revalidate(udpif, push.n_packets, ukey->stats.used)) {
if (!ukey->xcache) {
ukey->xcache = xlate_cache_new();
} else {
xlate_cache_clear(ukey->xcache);
}
result = revalidate_ukey__(udpif, ukey, push.tcp_flags,
odp_actions, recircs, ukey->xcache);
} /* else delete; too expensive to revalidate */
} else if (!push.n_packets || ukey->xcache
|| !populate_xcache(udpif, ukey, push.tcp_flags)) {
result = UKEY_KEEP;
}
/* Stats for deleted flows will be attributed upon flow deletion. Skip. */
if (result != UKEY_DELETE) {
xlate_push_stats(ukey->xcache, &push);
ukey->stats = *stats;
ukey->reval_seq = reval_seq;
}
return result;
}
配置流表后,会将 ofproto->backer->need_revalidate 设置为 REV_FLOW_TABLE,这样在 type_run 中就可以将reval_seq加1,唤醒revalidator线程进行处理(修改datapath流表)
handle_flow_mod__
ofproto_bump_tables_version(ofproto);
++ofproto->tables_version;
//set_tables_version
ofproto->ofproto_class->set_tables_version(ofproto, ofproto->tables_version);
struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
/* Use memory_order_release to signify that any prior memory accesses can
* not be reordered to happen after this atomic store. This makes sure the
* new version is properly set up when the readers can read this 'version'
* value. */
atomic_store_explicit(&ofproto->tables_version, version,
memory_order_release);
/* 'need_revalidate' can be reordered to happen before the atomic_store
* above, but it does not matter as this variable is not accessed by other
* threads. */
ofproto->backer->need_revalidate = REV_FLOW_TABLE;
static int
type_run(const char *type)
if (backer->need_revalidate) {
udpif_revalidate(backer->udpif);
//seq加1,并且唤醒正在等待seq的线程
seq_change(udpif->reval_seq);
}