DPDK stack源码分析

2020-01-26  本文已影响0人  fooboo

在之前分析过dpdk中的相关一些开源实现,然后当时的版本是16.07.2,现在最新的是19.11,代码方面还是有很大的区别,新增不少内容,不过这里不列举,有兴趣相关可以自己看下。这里把有变化的无锁环形队列在这里分析下,接着分析下stack实现,比较简单。这儿是早期的DPDK中无锁环形队列实现

因为早期的struct rte_ring中的struct prod和struct cons有几个数据成员是冗余的,完全可以提取出来作为struct rte_ring的成员:

 64 /* structure to hold a pair of head/tail values and other metadata */
 65 struct rte_ring_headtail {
 66     volatile uint32_t head;  /**< Prod/consumer head. */
 67     volatile uint32_t tail;  /**< Prod/consumer tail. */
 68     uint32_t single;         /**< True if single prod/cons */
 69 };

 81 struct rte_ring {
 88     int flags;               /**< Flags supplied at creation. */
 91     uint32_t size;           /**< Size of ring. */
 92     uint32_t mask;           /**< Mask (size-1) of ring. */
 93     uint32_t capacity;       /**< Usable size of ring */
 94 
 95     char pad0 __rte_cache_aligned; /**< empty cache line */
 96 
 97     /** Ring producer status. */
 98     struct rte_ring_headtail prod __rte_cache_aligned;
 99     char pad1 __rte_cache_aligned; /**< empty cache line */
100 
101     /** Ring consumer status. */
102     struct rte_ring_headtail cons __rte_cache_aligned;
103     char pad2 __rte_cache_aligned; /**< empty cache line */
104 };

这里在创建的时候根据RING_F_EXACT_SZ来决定是否要调整在使用时的大小,在最坏情况会浪费一些存储空间“Worst case, if a power-of-2 size is requested, half the ring space will be wasted.”,所以在使用的过程中,还是需要认真分析下代码和api的说明:

 96     if (flags & RING_F_EXACT_SZ) {
 97         r->size = rte_align32pow2(count + 1);
 98         r->mask = r->size - 1;
 99         r->capacity = count;//实际只有count
100     } else {
101         if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK)) {
105             return -EINVAL;
106         }
107         r->size = count;
108         r->mask = count - 1;
109         r->capacity = r->mask;
110     }

 47 /* return the size of memory occupied by a ring */
 48 ssize_t
 49 rte_ring_get_memsize(unsigned count)
 50 {   
 51     ssize_t sz;
 52     
 53     /* count must be a power of 2 */ 
 54     if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
 58         return -EINVAL;
 59     }
 60     
 61     sz = sizeof(struct rte_ring) + count * sizeof(void *);//实际存储空间在struct rte_ring末尾
 62     sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
 63     return sz;
 64 }

返回ring中元素的个数:

696 static inline unsigned
697 rte_ring_count(const struct rte_ring *r)
698 {   
699     uint32_t prod_tail = r->prod.tail;
700     uint32_t cons_tail = r->cons.tail;
701     uint32_t count = (prod_tail - cons_tail) & r->mask;
702     return (count > r->capacity) ? r->capacity : count;
703 }

因为都是无符号数,所以count一定是>=0的,不管prod_tail是在cons_tail后面,还是在前面,算出来的元素个数肯定是<= capacity的,但是因为有RING_F_EXACT_SZ的情况下,会直接返回capacity。

其他入队和出队等一些操作和原来的实现差不多思想,只不过这里在获取偏移量的时候多了一个屏障原语,在某些平台上效果不一样,感受一下,比如在__rte_ring_move_prod_head的时候:

 65     do {
 66         /* Reset n to the initial burst count */
 67         n = max;
 69         *old_head = r->prod.head;
 70 
 71         /* add rmb barrier to avoid load/load reorder in weak
 72          * memory model. It is noop on x86
 73          */
 74         rte_smp_rmb();
 75 
 76         /*
 77          *  The subtraction is done between two unsigned 32bits value
 78          * (the result is always modulo 32 bits even if we have
 79          * *old_head > cons_tail). So 'free_entries' is always between 0
 80          * and capacity (which is < size).
 81          */
 82         *free_entries = (capacity + r->cons.tail - *old_head);
 64     *old_head = __atomic_load_n(&r->prod.head, __ATOMIC_RELAXED);
 65     do {
 66         /* Reset n to the initial burst count */
 67         n = max;
 69         /* Ensure the head is read before tail */
 70         __atomic_thread_fence(__ATOMIC_ACQUIRE);
 71 
 72         /* load-acquire synchronize with store-release of ht->tail
 73          * in update_tail.
 74          */
 75         cons_tail = __atomic_load_n(&r->cons.tail,
 76                     __ATOMIC_ACQUIRE);
 77 
 83         *free_entries = (capacity + cons_tail - *old_head);

stack

dpdk中的stack有两种,一种是加锁实现的一种是无锁:

 74 /* The RTE stack structure contains the LIFO structure itself, plus metadata
 75  * such as its name and memzone pointer.
 76  */
 77 struct rte_stack {
 82     uint32_t capacity; /**< Usable size of the stack. */
 83     uint32_t flags; /**< Flags supplied at creation. */
 85     union {
 86         struct rte_stack_lf stack_lf; /**< Lock-free LIFO structure. */
 87         struct rte_stack_std stack_std; /**< LIFO structure. */
 88     };
 89 } __rte_cache_aligned;

其中加锁实现的比较简单:

 68 struct rte_stack_std {
 69     rte_spinlock_t lock; /**< LIFO lock */
 70     uint32_t len; /**< LIFO len */
 71     void *objs[]; /**< LIFO pointer table */
 72 };

无锁的实现较复杂些,用list来组织:

 36 struct rte_stack_lf_elem {
 37     void *data;         /**< Data pointer */
 38     struct rte_stack_lf_elem *next; /**< Next pointer */
 39 };
 40 
 41 struct rte_stack_lf_head {
 42     struct rte_stack_lf_elem *top; /**< Stack top */
 43     uint64_t cnt; /**< Modification counter for avoiding ABA problem */
 44 };
 45 
 46 struct rte_stack_lf_list {
 47     /** List head */
 48     struct rte_stack_lf_head head __rte_aligned(16);
 49     /** List len */
 50     uint64_t len;
 51 };
 52 
 53 /* Structure containing two lock-free LIFO lists: the stack itself and a list
 54  * of free linked-list elements.
 55  */
 56 struct rte_stack_lf {
 57     /** LIFO list of elements */
 58     struct rte_stack_lf_list used __rte_cache_aligned;
 59     /** LIFO list of free elements */
 60     struct rte_stack_lf_list free __rte_cache_aligned;
 61     /** LIFO elements */
 62     struct rte_stack_lf_elem elems[] __rte_cache_aligned;
 63 };

这里直接贴上加锁的实现,因为代码比较简单:

 23 static __rte_always_inline unsigned int
 24 __rte_stack_std_push(struct rte_stack *s, void * const *obj_table,
 25              unsigned int n)
 26 {
 27     struct rte_stack_std *stack = &s->stack_std;
 28     unsigned int index;
 29     void **cache_objs;
 30     
 31     rte_spinlock_lock(&stack->lock);
 32     cache_objs = &stack->objs[stack->len];
 33         
 34     /* Is there sufficient space in the stack? */
 35     if ((stack->len + n) > s->capacity) {
 36         rte_spinlock_unlock(&stack->lock);
 37         return 0;
 38     }
 39 
 40     /* Add elements back into the cache */
 41     for (index = 0; index < n; ++index, obj_table++)
 42         cache_objs[index] = *obj_table;
 43 
 44     stack->len += n;
 45 
 46     rte_spinlock_unlock(&stack->lock);
 47     return n;
 48 }

 63 static __rte_always_inline unsigned int
 64 __rte_stack_std_pop(struct rte_stack *s, void **obj_table, unsigned int n)
 65 {
 66     struct rte_stack_std *stack = &s->stack_std;
 67     unsigned int index, len;
 68     void **cache_objs;
 69 
 70     rte_spinlock_lock(&stack->lock);
 71 
 72     if (unlikely(n > stack->len)) {
 73         rte_spinlock_unlock(&stack->lock);
 74         return 0;
 75     }
 76         
 77     cache_objs = stack->objs;
 78         
 79     for (index = 0, len = stack->len - 1; index < n;
 80             ++index, len--, obj_table++)
 81         *obj_table = cache_objs[len];
 82 
 83     stack->len -= n;
 84     rte_spinlock_unlock(&stack->lock);
 85 
 86     return n;
 87 }

push元素的时候,直接加spinlock并算出还有多少的空间,最后设置大小并解锁;pop元素,加spinlock并算出能否pop指定数量的元素,最后pop指定元素并减少大小并解锁;因为这里是要指定的n,如果要pop 5个但只有3个,这里并不pop 3个而是直接返回0个。

无锁的stack实现较复杂,先来看push元素的时候要做些什么:

 30 __rte_experimental
 31 static __rte_always_inline unsigned int
 32 __rte_stack_lf_push(struct rte_stack *s,
 33             void * const *obj_table,
 34             unsigned int n)
 35 {
 36     struct rte_stack_lf_elem *tmp, *first, *last = NULL;
 37     unsigned int i;
 38 
 39     if (unlikely(n == 0))
 40         return 0;
 41 
 42     /* Pop n free elements */
 43     first = __rte_stack_lf_pop_elems(&s->stack_lf.free, n, NULL, &last);
 44     if (unlikely(first == NULL))
 45         return 0;
 46 
 47     /* Construct the list elements */
 48     for (tmp = first, i = 0; i < n; i++, tmp = tmp->next)
 49         tmp->data = obj_table[n - i - 1];
 50 
 51     /* Push them to the used list */
 52     __rte_stack_lf_push_elems(&s->stack_lf.used, first, last, n);
 53 
 54     return n;
 55 }

 74 static __rte_always_inline struct rte_stack_lf_elem *
 75 __rte_stack_lf_pop_elems(struct rte_stack_lf_list *list,
 76              unsigned int num,
 77              void **obj_table,
 78              struct rte_stack_lf_elem **last)
 79 {   
 80     struct rte_stack_lf_head old_head;
 81     int success;
 82 
 83     /* Reserve num elements, if available */
 84     while (1) {
 85         uint64_t len = rte_atomic64_read((rte_atomic64_t *)&list->len);
 86     
 87         /* Does the list contain enough elements? */
 88         if (unlikely(len < num))
 89             return NULL;
 90 
 91         if (rte_atomic64_cmpset((volatile uint64_t *)&list->len,
 92                     len, len - num))
 93             break;
 94     }
 95 
 96     old_head = list->head;
 98     /* Pop num elements */
 99     do {
100         struct rte_stack_lf_head new_head;
101         struct rte_stack_lf_elem *tmp;
102         unsigned int i;
103 
104         /* An acquire fence (or stronger) is needed for weak memory
105          * models to ensure the LF LIFO element reads are properly
106          * ordered with respect to the head pointer read.
107          */
108         rte_smp_mb();
109 
110         rte_prefetch0(old_head.top);
111 
112         tmp = old_head.top;
113 
114         /* Traverse the list to find the new head. A next pointer will
115          * either point to another element or NULL; if a thread
116          * encounters a pointer that has already been popped, the CAS
117          * will fail.
118          */
119         for (i = 0; i < num && tmp != NULL; i++) {
120             rte_prefetch0(tmp->next);
121             if (obj_table)
122                 obj_table[i] = tmp->data;
123             if (last)
124                 *last = tmp;
125             tmp = tmp->next;
126         }
127 
128         /* If NULL was encountered, the list was modified while
129          * traversing it. Retry.
130          */
131         if (i != num)
132             continue;
133 
134         new_head.top = tmp;
135         new_head.cnt = old_head.cnt + 1;
136 
137         /* old_head is updated on failure */
138         success = rte_atomic128_cmp_exchange(
139                 (rte_int128_t *)&list->head,
140                 (rte_int128_t *)&old_head,
141                 (rte_int128_t *)&new_head,
142                 1, __ATOMIC_RELEASE,
143                 __ATOMIC_RELAXED);
144     } while (success == 0);
145 
146     return old_head.top;
147 }

 33 static __rte_always_inline void
 34 __rte_stack_lf_push_elems(struct rte_stack_lf_list *list,
 35               struct rte_stack_lf_elem *first,
 36               struct rte_stack_lf_elem *last,
 37               unsigned int num)
 38 {
 39     struct rte_stack_lf_head old_head;
 40     int success;
 41 
 42     old_head = list->head;
 43 
 44     do {
 45         struct rte_stack_lf_head new_head;
 46 
 47         /* An acquire fence (or stronger) is needed for weak memory
 48          * models to establish a synchronized-with relationship between
 49          * the list->head load and store-release operations (as part of
 50          * the rte_atomic128_cmp_exchange()).
 51          */
 52         rte_smp_mb();
 53 
 54         /* Swing the top pointer to the first element in the list and
 55          * make the last element point to the old top.
 56          */
 57         new_head.top = first;
 58         new_head.cnt = old_head.cnt + 1;
 59 
 60         last->next = old_head.top;
 62         /* old_head is updated on failure */
 63         success = rte_atomic128_cmp_exchange(
 64                 (rte_int128_t *)&list->head,
 65                 (rte_int128_t *)&old_head,
 66                 (rte_int128_t *)&new_head,
 67                 1, __ATOMIC_RELEASE,
 68                 __ATOMIC_RELAXED);
 69     } while (success == 0);
 70 
 71     rte_atomic64_add((rte_atomic64_t *)&list->len, num);
 72 }

以上push元素时,过程是有些复杂,先从free中pop出可存放一定数量元素的空间:先原子判断len是否达到n,如果不够直接返回null,否则cas设置直到成功,当然可能过程会set失败则继续判断等,这里假设成功后进行后续操作;取出rte_stack_lf_elem并设置,此时会进行预取到cache各层级,并对cnt加1如注释说明中防止aba问题,最后cas设置free新的head。然后把元素依次设置tmp->data = obj_table[n - i - 1]。最后需要更新到used:这里设置新的head,并把老的top更新到last->next,进行cas设置新的head和len。

 69 __rte_experimental
 70 static __rte_always_inline unsigned int
 71 __rte_stack_lf_pop(struct rte_stack *s, void **obj_table, unsigned int n)
 72 {
 73     struct rte_stack_lf_elem *first, *last = NULL;
 74 
 75     if (unlikely(n == 0))
 76         return 0;
 77 
 78     /* Pop n used elements */
 79     first = __rte_stack_lf_pop_elems(&s->stack_lf.used,
 80                      n, obj_table, &last);
 81     if (unlikely(first == NULL))
 82         return 0;
 83 
 84     /* Push the list elements to the free list */
 85     __rte_stack_lf_push_elems(&s->stack_lf.free, first, last, n);
 86 
 87     return n;
 88 }

从stack进行pop元素和push是相反的过程,具体代码一样。

 11 static __rte_always_inline unsigned int
 12 __rte_stack_lf_count(struct rte_stack *s)
 13 {   
 14     /* stack_lf_push() and stack_lf_pop() do not update the list's contents
 15      * and stack_lf->len atomically, which can cause the list to appear
 16      * shorter than it actually is if this function is called while other
 17      * threads are modifying the list.
 18      *
 19      * However, given the inherently approximate nature of the get_count
 20      * callback -- even if the list and its size were updated atomically,
 21      * the size could change between when get_count executes and when the
 22      * value is returned to the caller -- this is acceptable.
 23      *
 24      * The stack_lf->len updates are placed such that the list may appear to
 25      * have fewer elements than it does, but will never appear to have more
 26      * elements. If the mempool is near-empty to the point that this is a
 27      * concern, the user should consider increasing the mempool size.
 28      */
 29     return (unsigned int)rte_atomic64_read((rte_atomic64_t *)
 30             &s->stack_lf.used.len);
 31 }

由于这结构对于多线程同时push或pop或者push/pop同时进行,会出现各种操作序列,包括调用__rte_stack_lf_count时。这里以一个线程push,一个线程pop为例,其他情况同时push或pop时比较简单。而有线程在push或pop时,有另外线程在__rte_stack_lf_count时,因为更新同步有延迟,可能看到的并不一样,看__rte_stack_lf_count调用的具体场合,这里有兴趣自行分析各种情况。

其他的是创建和初始化:

  7 void
  8 rte_stack_lf_init(struct rte_stack *s, unsigned int count)
  9 {
 10     struct rte_stack_lf_elem *elems = s->stack_lf.elems;
 11     unsigned int i;
 12     
 13     for (i = 0; i < count; i++)
 14         __rte_stack_lf_push_elems(&s->stack_lf.free,
 15                       &elems[i], &elems[i], 1);
 16 }
 17         
 18 ssize_t 
 19 rte_stack_lf_get_memsize(unsigned int count)
 20 {
 21     ssize_t sz = sizeof(struct rte_stack);
 22     
 23     sz += RTE_CACHE_LINE_ROUNDUP(count * sizeof(struct rte_stack_lf_elem));
 24     
 25     /* Add padding to avoid false sharing conflicts caused by
 26      * next-line hardware prefetchers.
 27      */ 
 28     sz += 2 * RTE_CACHE_LINE_SIZE;
 29 
 30     return sz;
 31 }

由于过年在家因为病毒的复杂原因基本不出去,可能有更多的时间学些想学的,希望早点恢复正常。

上一篇下一篇

猜你喜欢

热点阅读