pull: Hopefully squash race where we would exit early

This is a redesign (again) of the pull code.  It is simpler and
survives 20 minutes of testing in a loop, whereas the old code would
only last between 30 seconds and 2 minutes.

The problem with the old code was a race in which we might
determine idle state even while content requests were still in flight
between the metadata thread and the main one.

This code majorly reworks things - there's now only one IDLE message,
sent in a circle from the main thread, through the metadata scanner,
and back to the main one.

Crucially it's only sent when the *main* thread is idle.  Previously
we were looking at whether the metadata scanner was idle, but that
doesn't make a lot of sense.  First let's make sure the main thread is
idle, then verify that the metadata one is too.

This closes the loop because we'll have ensured we get any pending
requests.

https://bugzilla.gnome.org/show_bug.cgi?id=706456
This commit is contained in:
Colin Walters 2014-01-19 18:12:44 -05:00
parent 3802a0679b
commit 3cd866556c

View File

@ -63,8 +63,7 @@
typedef struct {
enum {
PULL_MSG_SCAN_IDLE,
PULL_MSG_MAIN_IDLE,
PULL_MSG_IDLE,
PULL_MSG_FETCH,
PULL_MSG_FETCH_DETACHED_METADATA,
PULL_MSG_SCAN,
@ -103,8 +102,9 @@ typedef struct {
GHashTable *scanned_metadata; /* Maps object name to itself */
GHashTable *requested_metadata; /* Maps object name to itself */
GHashTable *requested_content; /* Maps object name to itself */
guint metadata_scan_idle : 1; /* TRUE if we passed through an idle message */
guint idle_serial; /* Incremented when we get a SCAN_IDLE message */
guint checking_metadata_scan_complete : 1;
guint metadata_scan_complete : 1;
guint idle_serial;
guint n_outstanding_metadata_fetches;
guint n_outstanding_metadata_write_requests;
guint n_outstanding_content_fetches;
@ -220,8 +220,7 @@ pull_worker_message_new (int msgtype, gpointer data)
msg->t = msgtype;
switch (msgtype)
{
case PULL_MSG_SCAN_IDLE:
case PULL_MSG_MAIN_IDLE:
case PULL_MSG_IDLE:
msg->d.idle_serial = GPOINTER_TO_UINT (data);
break;
case PULL_MSG_SCAN:
@ -254,6 +253,24 @@ throw_async_error (OtPullData *pull_data,
}
}
/*
 * termination_condition:
 * @pull_data: pull state to inspect
 * @current_fetch_idle: caller's view of whether fetches are idle
 * @current_write_idle: caller's view of whether writes are idle
 *
 * Decide whether the pull may stop.
 *
 * Returns: TRUE when either
 *   - there is no metadata scan queue and no synchronous URI fetch is
 *     in progress, or
 *   - the scan queue exists, the metadata scan is marked complete, and
 *     both the fetch and write sides are idle;
 * FALSE otherwise.
 */
static gboolean
termination_condition (OtPullData *pull_data,
                       gboolean    current_fetch_idle,
                       gboolean    current_write_idle)
{
  gboolean io_idle;

  /* A NULL scan queue means we're in the phase when we're fetching refs;
   * only an in-progress synchronous URI fetch keeps us alive there. */
  if (pull_data->metadata_objects_to_scan == NULL)
    return pull_data->fetching_sync_uri ? FALSE : TRUE;

  io_idle = (current_fetch_idle && current_write_idle) ? TRUE : FALSE;

  if (!pull_data->metadata_scan_complete)
    return FALSE;

  return io_idle;
}
static void
check_outstanding_requests_handle_error (OtPullData *pull_data,
GError *error)
@ -263,22 +280,24 @@ check_outstanding_requests_handle_error (OtPullData *pull_data,
gboolean current_write_idle = (pull_data->n_outstanding_metadata_write_requests == 0 &&
pull_data->n_outstanding_content_write_requests == 0);
g_debug ("pull: scan: %u fetching: %u staging: %u",
!pull_data->metadata_scan_idle, !current_fetch_idle, !current_write_idle);
g_debug ("pull: scanning: %u fetching: %u staging: %u",
!pull_data->metadata_scan_complete, !current_fetch_idle, !current_write_idle);
throw_async_error (pull_data, error);
/* This is true in the phase when we're fetching refs */
if (pull_data->metadata_objects_to_scan == NULL)
if (pull_data->metadata_objects_to_scan &&
!pull_data->checking_metadata_scan_complete &&
!pull_data->metadata_scan_complete &&
(current_fetch_idle && current_write_idle))
{
if (!pull_data->fetching_sync_uri)
g_main_loop_quit (pull_data->loop);
return;
}
else if (pull_data->metadata_scan_idle && current_fetch_idle && current_write_idle)
{
g_main_loop_quit (pull_data->loop);
pull_data->checking_metadata_scan_complete = TRUE;
pull_data->idle_serial++;
g_debug ("Sending new MSG_IDLE with serial %u", pull_data->idle_serial);
ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
pull_worker_message_new (PULL_MSG_IDLE, GUINT_TO_POINTER (pull_data->idle_serial)));
}
else if (termination_condition (pull_data, current_fetch_idle, current_write_idle))
g_main_loop_quit (pull_data->loop);
}
static gboolean
@ -437,6 +456,7 @@ scan_dirtree_object (OtPullData *pull_data,
{
g_hash_table_insert (pull_data->requested_content, file_checksum, file_checksum);
g_debug ("queued fetch of content %s", file_checksum);
ot_waitable_queue_push (pull_data->metadata_objects_to_fetch,
pull_worker_message_new (PULL_MSG_FETCH,
ostree_object_name_serialize (file_checksum, OSTREE_OBJECT_TYPE_FILE)));
@ -606,6 +626,7 @@ on_metadata_writed (GObject *object,
OstreeObjectType objtype;
gs_free char *checksum = NULL;
gs_free guchar *csum = NULL;
gs_free char *stringified_object = NULL;
if (!ostree_repo_write_metadata_finish ((OstreeRepo*)object, result,
&csum, error))
@ -616,7 +637,8 @@ on_metadata_writed (GObject *object,
ostree_object_name_deserialize (fetch_data->object, &expected_checksum, &objtype);
g_assert (OSTREE_OBJECT_TYPE_IS_META (objtype));
g_debug ("write of %s complete", ostree_object_to_string (checksum, objtype));
stringified_object = ostree_object_to_string (checksum, objtype);
g_debug ("write of %s complete", stringified_object);
if (strcmp (checksum, expected_checksum) != 0)
{
@ -626,7 +648,7 @@ on_metadata_writed (GObject *object,
goto out;
}
pull_data->metadata_scan_idle = FALSE;
pull_data->metadata_scan_complete = FALSE;
ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
pull_worker_message_new (PULL_MSG_SCAN,
g_variant_ref (fetch_data->object)));
@ -893,7 +915,7 @@ on_metadata_objects_to_scan_ready (gint fd,
g_variant_unref (msg->d.item);
g_free (msg);
}
else if (msg->t == PULL_MSG_MAIN_IDLE)
else if (msg->t == PULL_MSG_IDLE)
{
g_free (last_idle_msg);
last_idle_msg = msg;
@ -910,16 +932,11 @@ on_metadata_objects_to_scan_ready (gint fd,
if (last_idle_msg)
{
g_debug ("pull: Processing PULL_MSG_MAIN_IDLE");
g_debug ("pull: Processing PULL_MSG_IDLE");
ot_waitable_queue_push (pull_data->metadata_objects_to_fetch,
last_idle_msg);
}
/* When we have no queue to process, notify the main thread */
g_debug ("pull: Sending SCAN_IDLE");
ot_waitable_queue_push (pull_data->metadata_objects_to_fetch,
pull_worker_message_new (PULL_MSG_SCAN_IDLE, GUINT_TO_POINTER (0)));
out:
if (local_error)
{
@ -1015,29 +1032,18 @@ on_metadata_objects_to_fetch_ready (gint fd,
if (!ot_waitable_queue_pop (pull_data->metadata_objects_to_fetch, (gpointer*)&msg))
goto out;
if (msg->t == PULL_MSG_MAIN_IDLE)
if (msg->t == PULL_MSG_IDLE)
{
pull_data->checking_metadata_scan_complete = FALSE;
if (msg->d.idle_serial == pull_data->idle_serial)
{
g_assert (!pull_data->metadata_scan_idle);
pull_data->metadata_scan_idle = TRUE;
g_debug ("pull: metadata scan is idle");
}
}
else if (msg->t == PULL_MSG_SCAN_IDLE)
{
if (!pull_data->metadata_scan_idle)
{
g_debug ("pull: queue MAIN_IDLE");
pull_data->idle_serial++;
ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
pull_worker_message_new (PULL_MSG_MAIN_IDLE, GUINT_TO_POINTER (pull_data->idle_serial)));
}
pull_data->metadata_scan_complete = TRUE;
}
else if (msg->t == PULL_MSG_FETCH || msg->t == PULL_MSG_FETCH_DETACHED_METADATA)
{
gboolean is_detached_meta;
pull_data->metadata_scan_complete = FALSE;
is_detached_meta = msg->t == PULL_MSG_FETCH_DETACHED_METADATA;
enqueue_one_object_request (pull_data, msg->d.item, is_detached_meta);
@ -1364,6 +1370,8 @@ ostree_repo_pull (OstreeRepo *self,
cancellable, error))
goto out;
g_debug ("resuming transaction: %s", pull_data->transaction_resuming ? "true" : " false");
pull_data->metadata_objects_to_fetch = ot_waitable_queue_new ();
pull_data->metadata_objects_to_scan = ot_waitable_queue_new ();
pull_data->metadata_thread = g_thread_new ("metadatascan", metadata_thread_main, pull_data);
@ -1397,11 +1405,6 @@ ostree_repo_pull (OstreeRepo *self,
g_source_unref (queue_src);
}
/* Prime the message queue */
pull_data->idle_serial++;
ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
pull_worker_message_new (PULL_MSG_MAIN_IDLE, GUINT_TO_POINTER (pull_data->idle_serial)));
/* Now await work completion */
if (!run_mainloop_monitor_fetcher (pull_data))
goto out;