66214ed000
Also return VDO_SUCCESS from vdo_make_funnel_queue. Signed-off-by: Mike Snitzer <snitzer@kernel.org> Signed-off-by: Chung Chung <cchung@redhat.com> Signed-off-by: Matthew Sakai <msakai@redhat.com>
280 lines
8.0 KiB
C
280 lines
8.0 KiB
C
// SPDX-License-Identifier: GPL-2.0-only
|
|
/*
|
|
* Copyright 2023 Red Hat
|
|
*/
|
|
|
|
#include "funnel-requestqueue.h"
|
|
|
|
#include <linux/atomic.h>
|
|
#include <linux/compiler.h>
|
|
#include <linux/wait.h>
|
|
|
|
#include "funnel-queue.h"
|
|
#include "logger.h"
|
|
#include "memory-alloc.h"
|
|
#include "thread-utils.h"
|
|
|
|
/*
|
|
* This queue will attempt to handle requests in reasonably sized batches instead of reacting
|
|
* immediately to each new request. The wait time between batches is dynamically adjusted up or
|
|
* down to try to balance responsiveness against wasted thread run time.
|
|
*
|
|
* If the wait time becomes long enough, the queue will become dormant and must be explicitly
|
|
* awoken when a new request is enqueued. The enqueue operation updates "newest" in the funnel
|
|
* queue via xchg (which is a memory barrier), and later checks "dormant" to decide whether to do a
|
|
* wakeup of the worker thread.
|
|
*
|
|
* When deciding to go to sleep, the worker thread sets "dormant" and then examines "newest" to
|
|
* decide if the funnel queue is idle. In dormant mode, the last examination of "newest" before
|
|
* going to sleep is done inside the wait_event_interruptible() macro, after a point where one or
|
|
* more memory barriers have been issued. (Preparing to sleep uses spin locks.) Even if the funnel
|
|
* queue's "next" field update isn't visible yet to make the entry accessible, its existence will
|
|
* kick the worker thread out of dormant mode and back into timer-based mode.
|
|
*
|
|
* Unbatched requests are used to communicate between different zone threads and will also cause
|
|
* the queue to awaken immediately.
|
|
*/
|
|
|
|
/* Time units (all expressed in nanoseconds) and the tunables that drive request batching. */
enum {
	/* Time units */
	NANOSECOND = 1,
	MICROSECOND = NANOSECOND * 1000,
	MILLISECOND = MICROSECOND * 1000,

	/* The starting wait between batches, and the range it is adjusted within */
	DEFAULT_WAIT_TIME = MICROSECOND * 20,
	MINIMUM_WAIT_TIME = DEFAULT_WAIT_TIME / 2,
	MAXIMUM_WAIT_TIME = MILLISECOND,

	/* Batches smaller or larger than these bounds cause the wait time to be adjusted */
	MINIMUM_BATCH = 32,
	MAXIMUM_BATCH = 64,
};
|
|
|
|
struct uds_request_queue {
|
|
/* Wait queue for synchronizing producers and consumer */
|
|
struct wait_queue_head wait_head;
|
|
/* Function to process a request */
|
|
uds_request_queue_processor_fn processor;
|
|
/* Queue of new incoming requests */
|
|
struct funnel_queue *main_queue;
|
|
/* Queue of old requests to retry */
|
|
struct funnel_queue *retry_queue;
|
|
/* The thread id of the worker thread */
|
|
struct thread *thread;
|
|
/* True if the worker was started */
|
|
bool started;
|
|
/* When true, requests can be enqueued */
|
|
bool running;
|
|
/* A flag set when the worker is waiting without a timeout */
|
|
atomic_t dormant;
|
|
};
|
|
|
|
static inline struct uds_request *poll_queues(struct uds_request_queue *queue)
|
|
{
|
|
struct funnel_queue_entry *entry;
|
|
|
|
entry = vdo_funnel_queue_poll(queue->retry_queue);
|
|
if (entry != NULL)
|
|
return container_of(entry, struct uds_request, queue_link);
|
|
|
|
entry = vdo_funnel_queue_poll(queue->main_queue);
|
|
if (entry != NULL)
|
|
return container_of(entry, struct uds_request, queue_link);
|
|
|
|
return NULL;
|
|
}
|
|
|
|
static inline bool are_queues_idle(struct uds_request_queue *queue)
|
|
{
|
|
return vdo_is_funnel_queue_idle(queue->retry_queue) &&
|
|
vdo_is_funnel_queue_idle(queue->main_queue);
|
|
}
|
|
|
|
/*
|
|
* Determine if there is a next request to process, and return it if there is. Also return flags
|
|
* indicating whether the worker thread can sleep (for the use of wait_event() macros) and whether
|
|
* the thread did sleep before returning a new request.
|
|
*/
|
|
static inline bool dequeue_request(struct uds_request_queue *queue,
|
|
struct uds_request **request_ptr, bool *waited_ptr)
|
|
{
|
|
struct uds_request *request = poll_queues(queue);
|
|
|
|
if (request != NULL) {
|
|
*request_ptr = request;
|
|
return true;
|
|
}
|
|
|
|
if (!READ_ONCE(queue->running)) {
|
|
/* Wake the worker thread so it can exit. */
|
|
*request_ptr = NULL;
|
|
return true;
|
|
}
|
|
|
|
*request_ptr = NULL;
|
|
*waited_ptr = true;
|
|
return false;
|
|
}
|
|
|
|
static void wait_for_request(struct uds_request_queue *queue, bool dormant,
|
|
unsigned long timeout, struct uds_request **request,
|
|
bool *waited)
|
|
{
|
|
if (dormant) {
|
|
wait_event_interruptible(queue->wait_head,
|
|
(dequeue_request(queue, request, waited) ||
|
|
!are_queues_idle(queue)));
|
|
return;
|
|
}
|
|
|
|
wait_event_interruptible_hrtimeout(queue->wait_head,
|
|
dequeue_request(queue, request, waited),
|
|
ns_to_ktime(timeout));
|
|
}
|
|
|
|
static void request_queue_worker(void *arg)
|
|
{
|
|
struct uds_request_queue *queue = arg;
|
|
struct uds_request *request = NULL;
|
|
unsigned long time_batch = DEFAULT_WAIT_TIME;
|
|
bool dormant = atomic_read(&queue->dormant);
|
|
bool waited = false;
|
|
long current_batch = 0;
|
|
|
|
for (;;) {
|
|
wait_for_request(queue, dormant, time_batch, &request, &waited);
|
|
if (likely(request != NULL)) {
|
|
current_batch++;
|
|
queue->processor(request);
|
|
} else if (!READ_ONCE(queue->running)) {
|
|
break;
|
|
}
|
|
|
|
if (dormant) {
|
|
/*
|
|
* The queue has been roused from dormancy. Clear the flag so enqueuers can
|
|
* stop broadcasting. No fence is needed for this transition.
|
|
*/
|
|
atomic_set(&queue->dormant, false);
|
|
dormant = false;
|
|
time_batch = DEFAULT_WAIT_TIME;
|
|
} else if (waited) {
|
|
/*
|
|
* We waited for this request to show up. Adjust the wait time to smooth
|
|
* out the batch size.
|
|
*/
|
|
if (current_batch < MINIMUM_BATCH) {
|
|
/*
|
|
* If the last batch of requests was too small, increase the wait
|
|
* time.
|
|
*/
|
|
time_batch += time_batch / 4;
|
|
if (time_batch >= MAXIMUM_WAIT_TIME) {
|
|
atomic_set(&queue->dormant, true);
|
|
dormant = true;
|
|
}
|
|
} else if (current_batch > MAXIMUM_BATCH) {
|
|
/*
|
|
* If the last batch of requests was too large, decrease the wait
|
|
* time.
|
|
*/
|
|
time_batch -= time_batch / 4;
|
|
if (time_batch < MINIMUM_WAIT_TIME)
|
|
time_batch = MINIMUM_WAIT_TIME;
|
|
}
|
|
current_batch = 0;
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Ensure that we process any remaining requests that were enqueued before trying to shut
|
|
* down. The corresponding write barrier is in uds_request_queue_finish().
|
|
*/
|
|
smp_rmb();
|
|
while ((request = poll_queues(queue)) != NULL)
|
|
queue->processor(request);
|
|
}
|
|
|
|
int uds_make_request_queue(const char *queue_name,
|
|
uds_request_queue_processor_fn processor,
|
|
struct uds_request_queue **queue_ptr)
|
|
{
|
|
int result;
|
|
struct uds_request_queue *queue;
|
|
|
|
result = vdo_allocate(1, struct uds_request_queue, __func__, &queue);
|
|
if (result != VDO_SUCCESS)
|
|
return result;
|
|
|
|
queue->processor = processor;
|
|
queue->running = true;
|
|
atomic_set(&queue->dormant, false);
|
|
init_waitqueue_head(&queue->wait_head);
|
|
|
|
result = vdo_make_funnel_queue(&queue->main_queue);
|
|
if (result != VDO_SUCCESS) {
|
|
uds_request_queue_finish(queue);
|
|
return result;
|
|
}
|
|
|
|
result = vdo_make_funnel_queue(&queue->retry_queue);
|
|
if (result != VDO_SUCCESS) {
|
|
uds_request_queue_finish(queue);
|
|
return result;
|
|
}
|
|
|
|
result = vdo_create_thread(request_queue_worker, queue, queue_name,
|
|
&queue->thread);
|
|
if (result != VDO_SUCCESS) {
|
|
uds_request_queue_finish(queue);
|
|
return result;
|
|
}
|
|
|
|
queue->started = true;
|
|
*queue_ptr = queue;
|
|
return UDS_SUCCESS;
|
|
}
|
|
|
|
static inline void wake_up_worker(struct uds_request_queue *queue)
|
|
{
|
|
if (wq_has_sleeper(&queue->wait_head))
|
|
wake_up(&queue->wait_head);
|
|
}
|
|
|
|
void uds_request_queue_enqueue(struct uds_request_queue *queue,
|
|
struct uds_request *request)
|
|
{
|
|
struct funnel_queue *sub_queue;
|
|
bool unbatched = request->unbatched;
|
|
|
|
sub_queue = request->requeued ? queue->retry_queue : queue->main_queue;
|
|
vdo_funnel_queue_put(sub_queue, &request->queue_link);
|
|
|
|
/*
|
|
* We must wake the worker thread when it is dormant. A read fence isn't needed here since
|
|
* we know the queue operation acts as one.
|
|
*/
|
|
if (atomic_read(&queue->dormant) || unbatched)
|
|
wake_up_worker(queue);
|
|
}
|
|
|
|
void uds_request_queue_finish(struct uds_request_queue *queue)
|
|
{
|
|
if (queue == NULL)
|
|
return;
|
|
|
|
/*
|
|
* This memory barrier ensures that any requests we queued will be seen. The point is that
|
|
* when dequeue_request() sees the following update to the running flag, it will also be
|
|
* able to see any change we made to a next field in the funnel queue entry. The
|
|
* corresponding read barrier is in request_queue_worker().
|
|
*/
|
|
smp_wmb();
|
|
WRITE_ONCE(queue->running, false);
|
|
|
|
if (queue->started) {
|
|
wake_up_worker(queue);
|
|
vdo_join_threads(queue->thread);
|
|
}
|
|
|
|
vdo_free_funnel_queue(queue->main_queue);
|
|
vdo_free_funnel_queue(queue->retry_queue);
|
|
vdo_free(queue);
|
|
}
|