diff --git a/src/iothread.cpp b/src/iothread.cpp index 62e760e8b..6ce2fec9a 100644 --- a/src/iothread.cpp +++ b/src/iothread.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -57,14 +58,18 @@ struct work_request_t { }; struct main_thread_request_t { - relaxed_atomic_bool_t done{false}; + // The function to execute. void_function_t func; + // Set by the main thread when the work is done. + std::promise done{}; + explicit main_thread_request_t(void_function_t &&f) : func(f) {} // No moving OR copying // main_thread_requests are always stack allocated, and we deal in pointers to them void operator=(const main_thread_request_t &) = delete; + void operator=(main_thread_request_t &&) = delete; main_thread_request_t(const main_thread_request_t &) = delete; main_thread_request_t(main_thread_request_t &&) = delete; }; @@ -138,11 +143,8 @@ static thread_pool_t &s_io_thread_pool = *(new thread_pool_t(1, IO_MAX_THREADS)) static owning_lock> s_result_queue; // "Do on main thread" support. -static std::mutex s_main_thread_performer_lock; // protects the main thread requests -static std::condition_variable s_main_thread_performer_cond; // protects the main thread requests - -/// The queue of main thread requests. This queue contains pointers to structs that are -/// stack-allocated on the requesting thread. +// The queue of main thread requests. This queue contains pointers to structs that are +// stack-allocated on the requesting thread. static owning_lock> s_main_thread_request_queue; // Pipes used for notifying. @@ -370,28 +372,13 @@ static void iothread_service_main_thread_requests() { std::queue request_queue; s_main_thread_request_queue.acquire()->swap(request_queue); - if (!request_queue.empty()) { - // Perform each of the functions. Note we are NOT responsible for deleting these. They are - // stack allocated in their respective threads! - while (!request_queue.empty()) { - main_thread_request_t *req = request_queue.front(); - request_queue.pop(); - req->func(); - req->done = true; - } - - // Ok, we've handled everybody. Announce the good news, and allow ourselves to be unlocked. - // Note we must do this while holding the lock. Otherwise we race with the waiting threads: - // - // 1. waiting thread checks for done, sees false - // 2. main thread performs request, sets done to true, posts to condition - // 3. waiting thread unlocks lock, waits on condition (forever) - // - // Because the waiting thread performs step 1 under the lock, if we take the lock, we avoid - // posting before the waiting thread is waiting. - // TODO: revisit this logic, this feels sketchy. - scoped_lock broadcast_lock(s_main_thread_performer_lock); - s_main_thread_performer_cond.notify_all(); + // Perform each of the functions. Note we are NOT responsible for deleting these. They are + // stack allocated in their respective threads! + while (!request_queue.empty()) { + main_thread_request_t *req = request_queue.front(); + request_queue.pop(); + req->func(); + req->done.set_value(); } } @@ -424,21 +411,11 @@ void iothread_perform_on_main(void_function_t &&func) { // Append it. Ensure we don't hold the lock after. s_main_thread_request_queue.acquire()->push(&req); - // Tell the pipe. + // Tell the pipe and then wait until our future is set. const char wakeup_byte = IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE; int notify_fd = get_notify_pipes().write; assert_with_errno(write_loop(notify_fd, &wakeup_byte, sizeof wakeup_byte) != -1); - - // Wait on the condition, until we're done. - std::unique_lock perform_lock(s_main_thread_performer_lock); - while (!req.done) { - // It would be nice to support checking for cancellation here, but the clients need a - // deterministic way to clean up to avoid leaks - s_main_thread_performer_cond.wait(perform_lock); - } - - // Ok, the request must now be done. - assert(req.done); + req.done.get_future().wait(); } bool make_detached_pthread(void *(*func)(void *), void *param) {