diff --git a/src/libotutil/ot-worker-queue.c b/src/libotutil/ot-worker-queue.c index 79d48b71..48656625 100644 --- a/src/libotutil/ot-worker-queue.c +++ b/src/libotutil/ot-worker-queue.c @@ -31,21 +31,16 @@ struct OtWorkerQueue { GCond cond; GQueue queue; - volatile gint holds; - char *thread_name; gboolean complete; + gboolean is_idle; gboolean destroyed; GThread *worker; OtWorkerQueueFunc work_func; OtWorkerQueueFunc work_data; - - GMainContext *idle_context; - OtWorkerQueueIdleFunc idle_callback; - gpointer idle_data; }; static gpointer @@ -61,6 +56,8 @@ ot_worker_queue_new (const char *thread_name, g_cond_init (&queue->cond); g_queue_init (&queue->queue); + queue->is_idle = TRUE; + queue->thread_name = g_strdup (thread_name); queue->work_func = func; queue->work_data = data; @@ -72,40 +69,6 @@ void ot_worker_queue_start (OtWorkerQueue *queue) { queue->worker = g_thread_new (queue->thread_name, ot_worker_queue_thread_main, queue); - ot_worker_queue_push (queue, queue); /* Self marks end of (initial) queue */ -} - -void -ot_worker_queue_hold (OtWorkerQueue *queue) -{ - g_atomic_int_inc (&queue->holds); -} - -static gboolean -invoke_idle_callback (gpointer user_data) -{ - OtWorkerQueue *queue = user_data; - queue->idle_callback (queue->idle_data); - return FALSE; -} - -void -ot_worker_queue_release (OtWorkerQueue *queue) -{ - if (!g_atomic_int_dec_and_test (&queue->holds)) - return; - - g_mutex_lock (&queue->mutex); - - if (!g_queue_peek_tail_link (&queue->queue)) - { - if (queue->idle_callback) - g_main_context_invoke (queue->idle_context, - invoke_idle_callback, - queue); - } - - g_mutex_unlock (&queue->mutex); } void @@ -114,6 +77,7 @@ ot_worker_queue_push (OtWorkerQueue *queue, { g_mutex_lock (&queue->mutex); g_queue_push_head (&queue->queue, data); + queue->is_idle = FALSE; g_cond_signal (&queue->cond); g_mutex_unlock (&queue->mutex); } @@ -131,11 +95,7 @@ ot_worker_queue_thread_main (gpointer user_data) while (!g_queue_peek_tail_link (&queue->queue)) { - if (queue->idle_callback && queue->complete && - g_atomic_int_get (&queue->holds) == 0) - g_main_context_invoke (queue->idle_context, - invoke_idle_callback, - queue); + queue->is_idle = TRUE; g_cond_wait (&queue->cond, &queue->mutex); } @@ -146,27 +106,20 @@ ot_worker_queue_thread_main (gpointer user_data) if (!item) break; - if (item == queue) - queue->complete = TRUE; - else - queue->work_func (item, queue->work_data); + queue->work_func (item, queue->work_data); } return NULL; } -void -ot_worker_queue_set_idle_callback (OtWorkerQueue *queue, - GMainContext *context, - OtWorkerQueueIdleFunc idle_callback, - gpointer data) +gboolean +ot_worker_queue_is_idle (OtWorkerQueue *queue) { - g_assert (!queue->worker); - if (!context) - context = g_main_context_default (); - queue->idle_context = g_main_context_ref (context); - queue->idle_callback = idle_callback; - queue->idle_data = data; + gboolean ret; + g_mutex_lock (&queue->mutex); + ret = queue->is_idle; + g_mutex_unlock (&queue->mutex); + return ret; } void @@ -180,7 +133,6 @@ ot_worker_queue_unref (OtWorkerQueue *queue) g_free (queue->thread_name); - g_main_context_unref (queue->idle_context); g_mutex_clear (&queue->mutex); g_cond_clear (&queue->cond); g_queue_clear (&queue->queue); diff --git a/src/libotutil/ot-worker-queue.h b/src/libotutil/ot-worker-queue.h index 590480e0..cfd7a923 100644 --- a/src/libotutil/ot-worker-queue.h +++ b/src/libotutil/ot-worker-queue.h @@ -31,7 +31,6 @@ typedef struct OtWorkerQueue OtWorkerQueue; typedef void (*OtWorkerQueueFunc) (gpointer data, gpointer user_data); -typedef void (*OtWorkerQueueIdleFunc) (gpointer user_data); OtWorkerQueue *ot_worker_queue_new (const char *thread_name, OtWorkerQueueFunc func, @@ -39,13 +38,7 @@ OtWorkerQueue *ot_worker_queue_new (const char *thread_name, void ot_worker_queue_start (OtWorkerQueue *queue); -void ot_worker_queue_hold (OtWorkerQueue *queue); -void ot_worker_queue_release (OtWorkerQueue *queue); - -void ot_worker_queue_set_idle_callback (OtWorkerQueue *queue, - GMainContext *context, - OtWorkerQueueIdleFunc idle_callback, - gpointer data); +gboolean ot_worker_queue_is_idle (OtWorkerQueue *queue); void ot_worker_queue_push (OtWorkerQueue *queue, gpointer data); diff --git a/src/ostree/ostree-pull.c b/src/ostree/ostree-pull.c index 4ce781d4..f7fbf370 100644 --- a/src/ostree/ostree-pull.c +++ b/src/ostree/ostree-pull.c @@ -101,10 +101,10 @@ typedef struct { guint outstanding_uri_requests; GQueue queued_filemeta; - GThread *metadata_scan_thread; OtWorkerQueue *metadata_objects_to_scan; GHashTable *scanned_metadata; /* Maps object name to itself */ GHashTable *requested_content; /* Maps object name to itself */ + guint n_outstanding_metadata_fetches; guint n_fetched_content; guint outstanding_filemeta_requests; @@ -268,15 +268,23 @@ static void check_outstanding_requests_handle_error (OtPullData *pull_data, GError *error) { - if (!pull_data->metadata_scan_active && + if ((!pull_data->metadata_objects_to_scan || ot_worker_queue_is_idle (pull_data->metadata_objects_to_scan)) && pull_data->outstanding_uri_requests == 0 && pull_data->outstanding_filemeta_requests == 0 && pull_data->outstanding_filecontent_requests == 0 && + pull_data->n_outstanding_metadata_fetches == 0 && pull_data->outstanding_content_stage_requests == 0) g_main_loop_quit (pull_data->loop); throw_async_error (pull_data, error); } +static gboolean +idle_check_outstanding_requests (gpointer user_data) +{ + check_outstanding_requests_handle_error (user_data, NULL); + return FALSE; +} + static gboolean run_mainloop_monitor_fetcher (OtPullData *pull_data) { @@ -456,7 +464,6 @@ scan_dirtree_object (OtPullData *pull_data, g_hash_table_insert (pull_data->requested_content, duped_checksum, duped_checksum); g_atomic_int_inc (&pull_data->n_requested_content); - ot_worker_queue_hold (pull_data->metadata_objects_to_scan); g_main_context_invoke (NULL, idle_queue_content_request, idle_fetch_data); } } @@ -736,8 +743,6 @@ idle_queue_content_request (gpointer user_data) process_one_file_request (data); } - ot_worker_queue_release (pull_data->metadata_objects_to_scan); - return FALSE; } @@ -756,10 +761,10 @@ on_metadata_staged (GObject *object, OtPullData *pull_data = fetch_data->pull_data; pull_data->n_fetched_metadata++; + pull_data->n_outstanding_metadata_fetches--; ot_worker_queue_push (pull_data->metadata_objects_to_scan, g_variant_ref (fetch_data->object)); - ot_worker_queue_release (pull_data->metadata_objects_to_scan); (void) gs_file_unlink (fetch_data->temp_path, NULL, NULL); g_object_unref (fetch_data->temp_path); @@ -821,6 +826,7 @@ idle_fetch_metadata_object (gpointer data) objpath = ostree_get_relative_object_path (checksum, objtype, compressed); obj_uri = suburi_new (pull_data->base_uri, objpath, NULL); + pull_data->n_outstanding_metadata_fetches++; ostree_fetcher_request_uri_async (pull_data->fetcher, obj_uri, pull_data->cancellable, meta_fetch_on_complete, fetch_data); soup_uri_free (obj_uri); @@ -840,7 +846,6 @@ queue_metadata_object_fetch (OtPullData *pull_data, IdleFetchMetadataObjectData *fetch_data = g_new (IdleFetchMetadataObjectData, 1); fetch_data->pull_data = pull_data; fetch_data->object = g_variant_ref (object); - ot_worker_queue_hold (fetch_data->pull_data->metadata_objects_to_scan); g_idle_add (idle_fetch_metadata_object, fetch_data); } @@ -958,8 +963,9 @@ scan_one_metadata_object (OtPullData *pull_data, } g_hash_table_insert (pull_data->scanned_metadata, g_variant_ref (object), object); g_atomic_int_inc (&pull_data->n_scanned_metadata); - } + g_idle_add (idle_check_outstanding_requests, pull_data); + } ret = TRUE; out: @@ -1032,15 +1038,6 @@ scan_one_metadata_object_dispatch (gpointer item, } } -static void -on_metadata_worker_idle (gpointer user_data) -{ - OtPullData *pull_data = user_data; - - pull_data->metadata_scan_active = FALSE; - - check_outstanding_requests_handle_error (pull_data, NULL); -} static gboolean idle_start_worker (gpointer user_data) @@ -1362,8 +1359,6 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error) pull_data->metadata_objects_to_scan = ot_worker_queue_new ("metadatascan", scan_one_metadata_object_dispatch, pull_data); - ot_worker_queue_set_idle_callback (pull_data->metadata_objects_to_scan, - NULL, on_metadata_worker_idle, pull_data); g_hash_table_iter_init (&hash_iter, commits_to_fetch); while (g_hash_table_iter_next (&hash_iter, &key, &value)) @@ -1400,15 +1395,19 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error) g_hash_table_insert (updated_refs, g_strdup (ref), g_strdup (sha256)); } } - - g_idle_add (idle_start_worker, pull_data); - + /* Start metadata thread, which kicks off further metadata requests * as well as content fetches. */ - if (!run_mainloop_monitor_fetcher (pull_data)) - goto out; + if (!ot_worker_queue_is_idle (pull_data->metadata_objects_to_scan)) + { + g_idle_add (idle_start_worker, pull_data); + /* Now await work completion */ + if (!run_mainloop_monitor_fetcher (pull_data)) + goto out; + } + if (!ostree_repo_commit_transaction (pull_data->repo, cancellable, error)) goto out; @@ -1418,15 +1417,15 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error) const char *ref = key; const char *checksum = value; ot_lfree char *remote_ref = NULL; - + remote_ref = g_strdup_printf ("%s/%s", pull_data->remote_name, ref); - + if (!ostree_repo_write_ref (pull_data->repo, pull_data->remote_name, ref, checksum, error)) goto out; - + g_print ("remote %s is now %s\n", remote_ref, checksum); } - + end_time = g_get_monotonic_time (); bytes_transferred = ostree_fetcher_bytes_transferred (pull_data->fetcher);