xsk: Update rings for load-acquire/store-release barriers
Currently, the AF_XDP rings uses general smp_{r,w,}mb() barriers on the kernel-side. On most modern architectures load-acquire/store-release barriers perform better, and results in simpler code for circular ring buffers. This change updates the XDP socket rings to use load-acquire/store-release barriers. It is important to note that changing from the old smp_{r,w,}mb() barriers, to load-acquire/store-release barriers does not break compatibility. The old semantics work with the new one, and vice versa. As pointed out by "Documentation/memory-barriers.txt" in the "SMP BARRIER PAIRING" section: "General barriers pair with each other, though they also pair with most other types of barriers, albeit without multicopy atomicity. An acquire barrier pairs with a release barrier, but both may also pair with other barriers, including of course general barriers." How different barriers behaves and pairs is outlined in "tools/memory-model/Documentation/cheatsheet.txt". In order to make sure that compatibility is not broken, LKMM herd7 based litmus tests can be constructed and verified. We generalize the XDP socket ring to a one entry ring, and create two scenarios; One where the ring is full, where only the consumer can proceed, followed by the producer. One where the ring is empty, where only the producer can proceed, followed by the consumer. Each scenario is then expanded to four different tests: general producer/general consumer, general producer/acqrel consumer, acqrel producer/general consumer, acqrel producer/acqrel consumer. In total eight tests. The empty ring test: C spsc-rb+empty // Simple one entry ring: // prod cons allowed action prod cons // 0 0 => prod => 1 0 // 0 1 => cons => 0 0 // 1 0 => cons => 1 1 // 1 1 => prod => 0 1 {} // We start at prod==0, cons==0, data==0, i.e. nothing has been // written to the ring. From here only the producer can start, and // should write 1. Afterwards, consumer can continue and read 1 to // data. Can we enter state prod==1, cons==1, but consumer observed // the incorrect value of 0? P0(int *prod, int *cons, int *data) { ... producer } P1(int *prod, int *cons, int *data) { ... consumer } exists( 1:d=0 /\ prod=1 /\ cons=1 ); The full ring test: C spsc-rb+full // Simple one entry ring: // prod cons allowed action prod cons // 0 0 => prod => 1 0 // 0 1 => cons => 0 0 // 1 0 => cons => 1 1 // 1 1 => prod => 0 1 { prod = 1; } // We start at prod==1, cons==0, data==1, i.e. producer has // written 0, so from here only the consumer can start, and should // consume 0. Afterwards, producer can continue and write 1 to // data. Can we enter state prod==0, cons==1, but consumer observed // the write of 1? P0(int *prod, int *cons, int *data) { ... producer } P1(int *prod, int *cons, int *data) { ... consumer } exists( 1:d=1 /\ prod=0 /\ cons=1 ); where P0 and P1 are: P0(int *prod, int *cons, int *data) { int p; p = READ_ONCE(*prod); if (READ_ONCE(*cons) == p) { WRITE_ONCE(*data, 1); smp_wmb(); WRITE_ONCE(*prod, p ^ 1); } } P0(int *prod, int *cons, int *data) { int p; p = READ_ONCE(*prod); if (READ_ONCE(*cons) == p) { WRITE_ONCE(*data, 1); smp_store_release(prod, p ^ 1); } } P1(int *prod, int *cons, int *data) { int c; int d = -1; c = READ_ONCE(*cons); if (READ_ONCE(*prod) != c) { smp_rmb(); d = READ_ONCE(*data); smp_mb(); WRITE_ONCE(*cons, c ^ 1); } } P1(int *prod, int *cons, int *data) { int c; int d = -1; c = READ_ONCE(*cons); if (smp_load_acquire(prod) != c) { d = READ_ONCE(*data); smp_store_release(cons, c ^ 1); } } The full LKMM litmus tests are found at [1]. On x86-64 systems the l2fwd AF_XDP xdpsock sample performance increases by 1%. This is mostly due to that the smp_mb() is removed, which is a relatively expensive operation on these platforms. Weakly-ordered platforms, such as ARM64 might benefit even more. [1] https://github.com/bjoto/litmus-xsk Signed-off-by: Björn Töpel <bjorn.topel@intel.com> Signed-off-by: Andrii Nakryiko <andrii@kernel.org> Acked-by: Toke Høiland-Jørgensen <toke@redhat.com> Link: https://lore.kernel.org/bpf/20210305094113.413544-2-bjorn.topel@gmail.com
This commit is contained in:
parent
299194a914
commit
a23b3f5697
@ -47,19 +47,18 @@ struct xsk_queue {
|
||||
u64 queue_empty_descs;
|
||||
};
|
||||
|
||||
/* The structure of the shared state of the rings are the same as the
|
||||
* ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
|
||||
* ring, the kernel is the producer and user space is the consumer. For
|
||||
* the Tx and fill rings, the kernel is the consumer and user space is
|
||||
* the producer.
|
||||
/* The structure of the shared state of the rings are a simple
|
||||
* circular buffer, as outlined in
|
||||
* Documentation/core-api/circular-buffers.rst. For the Rx and
|
||||
* completion ring, the kernel is the producer and user space is the
|
||||
* consumer. For the Tx and fill rings, the kernel is the consumer and
|
||||
* user space is the producer.
|
||||
*
|
||||
* producer consumer
|
||||
*
|
||||
* if (LOAD ->consumer) { LOAD ->producer
|
||||
* (A) smp_rmb() (C)
|
||||
* if (LOAD ->consumer) { (A) LOAD.acq ->producer (C)
|
||||
* STORE $data LOAD $data
|
||||
* smp_wmb() (B) smp_mb() (D)
|
||||
* STORE ->producer STORE ->consumer
|
||||
* STORE.rel ->producer (B) STORE.rel ->consumer (D)
|
||||
* }
|
||||
*
|
||||
* (A) pairs with (D), and (B) pairs with (C).
|
||||
@ -78,7 +77,8 @@ struct xsk_queue {
|
||||
*
|
||||
* (A) is a control dependency that separates the load of ->consumer
|
||||
* from the stores of $data. In case ->consumer indicates there is no
|
||||
* room in the buffer to store $data we do not. So no barrier is needed.
|
||||
* room in the buffer to store $data we do not. The dependency will
|
||||
* order both of the stores after the loads. So no barrier is needed.
|
||||
*
|
||||
* (D) protects the load of the data to be observed to happen after the
|
||||
* store of the consumer pointer. If we did not have this memory
|
||||
@ -227,15 +227,13 @@ static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q,
|
||||
|
||||
static inline void __xskq_cons_release(struct xsk_queue *q)
|
||||
{
|
||||
smp_mb(); /* D, matches A */
|
||||
WRITE_ONCE(q->ring->consumer, q->cached_cons);
|
||||
smp_store_release(&q->ring->consumer, q->cached_cons); /* D, matchees A */
|
||||
}
|
||||
|
||||
static inline void __xskq_cons_peek(struct xsk_queue *q)
|
||||
{
|
||||
/* Refresh the local pointer */
|
||||
q->cached_prod = READ_ONCE(q->ring->producer);
|
||||
smp_rmb(); /* C, matches B */
|
||||
q->cached_prod = smp_load_acquire(&q->ring->producer); /* C, matches B */
|
||||
}
|
||||
|
||||
static inline void xskq_cons_get_entries(struct xsk_queue *q)
|
||||
@ -397,9 +395,7 @@ static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
|
||||
|
||||
static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx)
|
||||
{
|
||||
smp_wmb(); /* B, matches C */
|
||||
|
||||
WRITE_ONCE(q->ring->producer, idx);
|
||||
smp_store_release(&q->ring->producer, idx); /* B, matches C */
|
||||
}
|
||||
|
||||
static inline void xskq_prod_submit(struct xsk_queue *q)
|
||||
|
Loading…
Reference in New Issue
Block a user