From f63666de2ba9c1c3ac0ec57fc5d3032514ec80f1 Mon Sep 17 00:00:00 2001 From: Magnus Karlsson Date: Tue, 16 Apr 2019 14:58:08 +0200 Subject: [PATCH 1/5] xsk: fix XDP socket ring buffer memory ordering The ring buffer code of XDP sockets is missing a memory barrier on the consumer side between the load of the data and the write that signals that it is ok for the producer to put new data into the buffer. On architectures that does not guarantee that stores are not reordered with older loads, the producer might put data into the ring before the consumer had the chance to read it. As IA does guarantee this ordering, it would only need a compiler barrier here, but there are no primitives in Linux for this specific case (hinder writes to be ordered before older reads) so I had to add a smp_mb() here which will translate into a run-time synch operation on IA. Added a longish comment in the code explaining what each barrier in the ring implementation accomplishes and what would happen if we removed one of them. Signed-off-by: Magnus Karlsson Signed-off-by: Alexei Starovoitov --- net/xdp/xsk_queue.h | 56 +++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 52 insertions(+), 4 deletions(-) diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h index 610c0bdc0c2b..88b9ae24658d 100644 --- a/net/xdp/xsk_queue.h +++ b/net/xdp/xsk_queue.h @@ -43,6 +43,48 @@ struct xsk_queue { u64 invalid_descs; }; +/* The structure of the shared state of the rings are the same as the + * ring buffer in kernel/events/ring_buffer.c. For the Rx and completion + * ring, the kernel is the producer and user space is the consumer. For + * the Tx and fill rings, the kernel is the consumer and user space is + * the producer. + * + * producer consumer + * + * if (LOAD ->consumer) { LOAD ->producer + * (A) smp_rmb() (C) + * STORE $data LOAD $data + * smp_wmb() (B) smp_mb() (D) + * STORE ->producer STORE ->consumer + * } + * + * (A) pairs with (D), and (B) pairs with (C). + * + * Starting with (B), it protects the data from being written after + * the producer pointer. If this barrier was missing, the consumer + * could observe the producer pointer being set and thus load the data + * before the producer has written the new data. The consumer would in + * this case load the old data. + * + * (C) protects the consumer from speculatively loading the data before + * the producer pointer actually has been read. If we do not have this + * barrier, some architectures could load old data as speculative loads + * are not discarded as the CPU does not know there is a dependency + * between ->producer and data. + * + * (A) is a control dependency that separates the load of ->consumer + * from the stores of $data. In case ->consumer indicates there is no + * room in the buffer to store $data we do not. So no barrier is needed. + * + * (D) protects the load of the data to be observed to happen after the + * store of the consumer pointer. If we did not have this memory + * barrier, the producer could observe the consumer pointer being set + * and overwrite the data with a new value before the consumer got the + * chance to read the old value. The consumer would thus miss reading + * the old entry and very likely read the new entry twice, once right + * now and again after circling through the ring. + */ + /* Common functions operating for both RXTX and umem queues */ static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q) @@ -106,6 +148,7 @@ static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr) static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr) { if (q->cons_tail == q->cons_head) { + smp_mb(); /* D, matches A */ WRITE_ONCE(q->ring->consumer, q->cons_tail); q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE); @@ -128,10 +171,11 @@ static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr) if (xskq_nb_free(q, q->prod_tail, 1) == 0) return -ENOSPC; + /* A, matches D */ ring->desc[q->prod_tail++ & q->ring_mask] = addr; /* Order producer and data */ - smp_wmb(); + smp_wmb(); /* B, matches C */ WRITE_ONCE(q->ring->producer, q->prod_tail); return 0; @@ -144,6 +188,7 @@ static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr) if (xskq_nb_free(q, q->prod_head, LAZY_UPDATE_THRESHOLD) == 0) return -ENOSPC; + /* A, matches D */ ring->desc[q->prod_head++ & q->ring_mask] = addr; return 0; } @@ -152,7 +197,7 @@ static inline void xskq_produce_flush_addr_n(struct xsk_queue *q, u32 nb_entries) { /* Order producer and data */ - smp_wmb(); + smp_wmb(); /* B, matches C */ q->prod_tail += nb_entries; WRITE_ONCE(q->ring->producer, q->prod_tail); @@ -163,6 +208,7 @@ static inline int xskq_reserve_addr(struct xsk_queue *q) if (xskq_nb_free(q, q->prod_head, 1) == 0) return -ENOSPC; + /* A, matches D */ q->prod_head++; return 0; } @@ -204,11 +250,12 @@ static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q, struct xdp_desc *desc) { if (q->cons_tail == q->cons_head) { + smp_mb(); /* D, matches A */ WRITE_ONCE(q->ring->consumer, q->cons_tail); q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE); /* Order consumer and data */ - smp_rmb(); + smp_rmb(); /* C, matches B */ } return xskq_validate_desc(q, desc); @@ -228,6 +275,7 @@ static inline int xskq_produce_batch_desc(struct xsk_queue *q, if (xskq_nb_free(q, q->prod_head, 1) == 0) return -ENOSPC; + /* A, matches D */ idx = (q->prod_head++) & q->ring_mask; ring->desc[idx].addr = addr; ring->desc[idx].len = len; @@ -238,7 +286,7 @@ static inline int xskq_produce_batch_desc(struct xsk_queue *q, static inline void xskq_produce_flush_desc(struct xsk_queue *q) { /* Order producer and data */ - smp_wmb(); + smp_wmb(); /* B, matches C */ q->prod_tail = q->prod_head, WRITE_ONCE(q->ring->producer, q->prod_tail); From d5e63fdd443378531fd1a67a15a720a799635d93 Mon Sep 17 00:00:00 2001 From: Magnus Karlsson Date: Tue, 16 Apr 2019 14:58:09 +0200 Subject: [PATCH 2/5] libbpf: fix XDP socket ring buffer memory ordering The ring buffer code of XDP sockets is missing a memory barrier on the consumer side between the load of the data and the write that signals that it is ok for the producer to put new data into the buffer. On architectures that does not guarantee that stores are not reordered with older loads, the producer might put data into the ring before the consumer had the chance to read it. As IA does guarantee this ordering, it would only need a compiler barrier here, but there are no primitives in barrier.h for this specific case (hinder writes to be ordered before older reads) so I had to add a smp_mb() here which will translate into a run-time synch operation on IA. Signed-off-by: Magnus Karlsson Signed-off-by: Alexei Starovoitov --- tools/lib/bpf/xsk.h | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/tools/lib/bpf/xsk.h b/tools/lib/bpf/xsk.h index a497f00e2962..1b35c40dff73 100644 --- a/tools/lib/bpf/xsk.h +++ b/tools/lib/bpf/xsk.h @@ -36,6 +36,10 @@ struct name { \ DEFINE_XSK_RING(xsk_ring_prod); DEFINE_XSK_RING(xsk_ring_cons); +/* For a detailed explanation on the memory barriers associated with the + * ring, please take a look at net/xdp/xsk_queue.h. + */ + struct xsk_umem; struct xsk_socket; @@ -116,8 +120,8 @@ static inline size_t xsk_ring_prod__reserve(struct xsk_ring_prod *prod, static inline void xsk_ring_prod__submit(struct xsk_ring_prod *prod, size_t nb) { - /* Make sure everything has been written to the ring before signalling - * this to the kernel. + /* Make sure everything has been written to the ring before indicating + * this to the kernel by writing the producer pointer. */ smp_wmb(); @@ -144,6 +148,11 @@ static inline size_t xsk_ring_cons__peek(struct xsk_ring_cons *cons, static inline void xsk_ring_cons__release(struct xsk_ring_cons *cons, size_t nb) { + /* Make sure data has been read before indicating we are done + * with the entries by updating the consumer pointer. + */ + smp_mb(); + *cons->consumer += nb; } From a06d729646e87afc5ce06e539aecf762cc26c6e3 Mon Sep 17 00:00:00 2001 From: Magnus Karlsson Date: Tue, 16 Apr 2019 14:58:10 +0200 Subject: [PATCH 3/5] libbpf: remove likely/unlikely in xsk.h This patch removes the use of likely and unlikely in xsk.h since they create a dependency on Linux headers as reported by several users. There have also been reports that the use of these decreases performance as the compiler puts the code on two different cache lines instead of on a single one. All in all, I think we are better off without them. Fixes: 1cad07884239 ("libbpf: add support for using AF_XDP sockets") Signed-off-by: Magnus Karlsson Signed-off-by: Alexei Starovoitov --- tools/lib/bpf/xsk.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/lib/bpf/xsk.h b/tools/lib/bpf/xsk.h index 1b35c40dff73..f264f24f06ac 100644 --- a/tools/lib/bpf/xsk.h +++ b/tools/lib/bpf/xsk.h @@ -109,7 +109,7 @@ static inline __u32 xsk_cons_nb_avail(struct xsk_ring_cons *r, __u32 nb) static inline size_t xsk_ring_prod__reserve(struct xsk_ring_prod *prod, size_t nb, __u32 *idx) { - if (unlikely(xsk_prod_nb_free(prod, nb) < nb)) + if (xsk_prod_nb_free(prod, nb) < nb) return 0; *idx = prod->cached_prod; @@ -133,7 +133,7 @@ static inline size_t xsk_ring_cons__peek(struct xsk_ring_cons *cons, { size_t entries = xsk_cons_nb_avail(cons, nb); - if (likely(entries > 0)) { + if (entries > 0) { /* Make sure we do not speculatively read the data before * we have received the packet buffers from the ring. */ From b7e3a28019c92ffe1f55de278c5641de33b6259a Mon Sep 17 00:00:00 2001 From: Magnus Karlsson Date: Tue, 16 Apr 2019 14:58:11 +0200 Subject: [PATCH 4/5] libbpf: remove dependency on barrier.h in xsk.h The use of smp_rmb() and smp_wmb() creates a Linux header dependency on barrier.h that is unnecessary in most parts. This patch implements the two small defines that are needed from barrier.h. As a bonus, the new implementations are faster than the default ones as they default to sfence and lfence for x86, while we only need a compiler barrier in our case. Just as it is when the same ring access code is compiled in the kernel. Fixes: 1cad07884239 ("libbpf: add support for using AF_XDP sockets") Signed-off-by: Magnus Karlsson Signed-off-by: Alexei Starovoitov --- tools/lib/bpf/libbpf_util.h | 25 +++++++++++++++++++++++++ tools/lib/bpf/xsk.h | 7 ++++--- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/tools/lib/bpf/libbpf_util.h b/tools/lib/bpf/libbpf_util.h index 81ecda0cb9c9..71fbb2898b87 100644 --- a/tools/lib/bpf/libbpf_util.h +++ b/tools/lib/bpf/libbpf_util.h @@ -23,6 +23,31 @@ do { \ #define pr_info(fmt, ...) __pr(LIBBPF_INFO, fmt, ##__VA_ARGS__) #define pr_debug(fmt, ...) __pr(LIBBPF_DEBUG, fmt, ##__VA_ARGS__) +/* Use these barrier functions instead of smp_[rw]mb() when they are + * used in a libbpf header file. That way they can be built into the + * application that uses libbpf. + */ +#if defined(__i386__) || defined(__x86_64__) +# define libbpf_smp_rmb() asm volatile("" : : : "memory") +# define libbpf_smp_wmb() asm volatile("" : : : "memory") +# define libbpf_smp_mb() \ + asm volatile("lock; addl $0,-4(%%rsp)" : : : "memory", "cc") +#elif defined(__aarch64__) +# define libbpf_smp_rmb() asm volatile("dmb ishld" : : : "memory") +# define libbpf_smp_wmb() asm volatile("dmb ishst" : : : "memory") +# define libbpf_smp_mb() asm volatile("dmb ish" : : : "memory") +#elif defined(__arm__) +/* These are only valid for armv7 and above */ +# define libbpf_smp_rmb() asm volatile("dmb ish" : : : "memory") +# define libbpf_smp_wmb() asm volatile("dmb ishst" : : : "memory") +# define libbpf_smp_mb() asm volatile("dmb ish" : : : "memory") +#else +# warning Architecture missing native barrier functions in libbpf_util.h. +# define libbpf_smp_rmb() __sync_synchronize() +# define libbpf_smp_wmb() __sync_synchronize() +# define libbpf_smp_mb() __sync_synchronize() +#endif + #ifdef __cplusplus } /* extern "C" */ #endif diff --git a/tools/lib/bpf/xsk.h b/tools/lib/bpf/xsk.h index f264f24f06ac..2377c7a7f1b1 100644 --- a/tools/lib/bpf/xsk.h +++ b/tools/lib/bpf/xsk.h @@ -16,6 +16,7 @@ #include #include "libbpf.h" +#include "libbpf_util.h" #ifdef __cplusplus extern "C" { @@ -123,7 +124,7 @@ static inline void xsk_ring_prod__submit(struct xsk_ring_prod *prod, size_t nb) /* Make sure everything has been written to the ring before indicating * this to the kernel by writing the producer pointer. */ - smp_wmb(); + libbpf_smp_wmb(); *prod->producer += nb; } @@ -137,7 +138,7 @@ static inline size_t xsk_ring_cons__peek(struct xsk_ring_cons *cons, /* Make sure we do not speculatively read the data before * we have received the packet buffers from the ring. */ - smp_rmb(); + libbpf_smp_rmb(); *idx = cons->cached_cons; cons->cached_cons += entries; @@ -151,7 +152,7 @@ static inline void xsk_ring_cons__release(struct xsk_ring_cons *cons, size_t nb) /* Make sure data has been read before indicating we are done * with the entries by updating the consumer pointer. */ - smp_mb(); + libbpf_smp_mb(); *cons->consumer += nb; } From 2c5935f1b2b642cee8e1562396ec8a7781fc4c6d Mon Sep 17 00:00:00 2001 From: Magnus Karlsson Date: Tue, 16 Apr 2019 14:58:12 +0200 Subject: [PATCH 5/5] libbpf: optimize barrier for XDP socket rings The full memory barrier in the XDP socket rings on the consumer side between the load of the data and the store of the consumer ring is there to protect the store from being executed before the load of the data. If this was allowed to happen, the producer might overwrite the data field with a new entry before the consumer got the chance to read it. On x86, stores are guaranteed not to be reordered with older loads, so it does not need a full memory barrier here. A compile time barrier would be enough. This patch introdcues a new primitive in libbpf_util.h that implements a new barrier type (libbpf_smp_rwmb) hindering stores to be reordered with older loads. It is then used in the XDP socket ring access code in libbpf to improve performance. Signed-off-by: Magnus Karlsson Signed-off-by: Alexei Starovoitov --- tools/lib/bpf/libbpf_util.h | 5 +++++ tools/lib/bpf/xsk.h | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/tools/lib/bpf/libbpf_util.h b/tools/lib/bpf/libbpf_util.h index 71fbb2898b87..172b707e007b 100644 --- a/tools/lib/bpf/libbpf_util.h +++ b/tools/lib/bpf/libbpf_util.h @@ -32,20 +32,25 @@ do { \ # define libbpf_smp_wmb() asm volatile("" : : : "memory") # define libbpf_smp_mb() \ asm volatile("lock; addl $0,-4(%%rsp)" : : : "memory", "cc") +/* Hinders stores to be observed before older loads. */ +# define libbpf_smp_rwmb() asm volatile("" : : : "memory") #elif defined(__aarch64__) # define libbpf_smp_rmb() asm volatile("dmb ishld" : : : "memory") # define libbpf_smp_wmb() asm volatile("dmb ishst" : : : "memory") # define libbpf_smp_mb() asm volatile("dmb ish" : : : "memory") +# define libbpf_smp_rwmb() libbpf_smp_mb() #elif defined(__arm__) /* These are only valid for armv7 and above */ # define libbpf_smp_rmb() asm volatile("dmb ish" : : : "memory") # define libbpf_smp_wmb() asm volatile("dmb ishst" : : : "memory") # define libbpf_smp_mb() asm volatile("dmb ish" : : : "memory") +# define libbpf_smp_rwmb() libbpf_smp_mb() #else # warning Architecture missing native barrier functions in libbpf_util.h. # define libbpf_smp_rmb() __sync_synchronize() # define libbpf_smp_wmb() __sync_synchronize() # define libbpf_smp_mb() __sync_synchronize() +# define libbpf_smp_rwmb() __sync_synchronize() #endif #ifdef __cplusplus diff --git a/tools/lib/bpf/xsk.h b/tools/lib/bpf/xsk.h index 2377c7a7f1b1..82ea71a0f3ec 100644 --- a/tools/lib/bpf/xsk.h +++ b/tools/lib/bpf/xsk.h @@ -152,7 +152,7 @@ static inline void xsk_ring_cons__release(struct xsk_ring_cons *cons, size_t nb) /* Make sure data has been read before indicating we are done * with the entries by updating the consumer pointer. */ - libbpf_smp_mb(); + libbpf_smp_rwmb(); *cons->consumer += nb; }