pull: Stop using GMainLoop

First of all, what we were doing with having GMainLoop in the internal
APIs is wrong.  Synchronous APIs should always create their own main
context and not iterate the caller's.  Doing the latter creates
potential for evil reentrancy issues.  Sync API should block, async
API is for not blocking.

Now that's out of the way, fix the pull code to do the clean

```
while (termination_condition (state))
  g_main_context_iteration (mainctx, TRUE);
```

model for looping.  This is a lot easier to understand and ultimately
more reliable than having other code call `g_main_loop_quit()`, as the
loop condition is in exactly one place.

We can also remove the idle source which only fired once.

Note we have to add a hack here to discard the synchronous session and
create a new one which we only use async.

https://bugzilla.gnome.org/show_bug.cgi?id=753336
This commit is contained in:
Colin Walters 2015-08-12 17:03:52 -04:00
parent 5c20ea920e
commit 9f3d586993
6 changed files with 133 additions and 243 deletions

View File

@ -714,7 +714,7 @@ _ostree_fetcher_bytes_transferred (OstreeFetcher *self)
typedef struct
{
GInputStream *result_stream;
GMainLoop *loop;
gboolean done;
GError **error;
}
FetchUriSyncData;
@ -728,7 +728,7 @@ fetch_uri_sync_on_complete (GObject *object,
data->result_stream = ostree_fetcher_stream_uri_finish ((OstreeFetcher*)object,
result, data->error);
g_main_loop_quit (data->loop);
data->done = TRUE;
}
gboolean
@ -737,7 +737,6 @@ _ostree_fetcher_request_uri_to_membuf (OstreeFetcher *fetcher,
gboolean add_nul,
gboolean allow_noent,
GBytes **out_contents,
GMainLoop *loop,
guint64 max_size,
GCancellable *cancellable,
GError **error)
@ -746,6 +745,7 @@ _ostree_fetcher_request_uri_to_membuf (OstreeFetcher *fetcher,
const guint8 nulchar = 0;
g_autofree char *ret_contents = NULL;
g_autoptr(GMemoryOutputStream) buf = NULL;
g_autoptr(GMainContext) mainctx = NULL;
FetchUriSyncData data;
g_assert (error != NULL);
@ -754,7 +754,10 @@ _ostree_fetcher_request_uri_to_membuf (OstreeFetcher *fetcher,
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return FALSE;
data.loop = loop;
mainctx = g_main_context_new ();
g_main_context_push_thread_default (mainctx);
data.done = FALSE;
data.error = error;
ostree_fetcher_stream_uri_async (fetcher, uri,
@ -762,8 +765,9 @@ _ostree_fetcher_request_uri_to_membuf (OstreeFetcher *fetcher,
OSTREE_FETCHER_DEFAULT_PRIORITY,
cancellable,
fetch_uri_sync_on_complete, &data);
while (!data.done)
g_main_context_iteration (mainctx, TRUE);
g_main_loop_run (loop);
if (!data.result_stream)
{
if (allow_noent)
@ -796,6 +800,8 @@ _ostree_fetcher_request_uri_to_membuf (OstreeFetcher *fetcher,
ret = TRUE;
*out_contents = g_memory_output_stream_steal_as_bytes (buf);
out:
if (mainctx)
g_main_context_pop_thread_default (mainctx);
g_clear_object (&(data.result_stream));
return ret;
}

View File

@ -87,7 +87,6 @@ gboolean _ostree_fetcher_request_uri_to_membuf (OstreeFetcher *fetcher,
gboolean add_nul,
gboolean allow_noent,
GBytes **out_contents,
GMainLoop *loop,
guint64 max_size,
GCancellable *cancellable,
GError **error);

View File

@ -56,7 +56,7 @@ typedef struct
{
OstreeMetalink *metalink;
GTask *task;
GCancellable *cancellable;
GMarkupParseContext *parser;
guint passthrough_depth;
@ -110,8 +110,7 @@ metalink_parser_start (GMarkupParseContext *context,
gpointer user_data,
GError **error)
{
GTask *task = user_data;
OstreeMetalinkRequest *self = g_task_get_task_data (task);
OstreeMetalinkRequest *self = user_data;
switch (self->state)
{
@ -273,8 +272,7 @@ metalink_parser_end (GMarkupParseContext *context,
gpointer user_data,
GError **error)
{
GTask *task = user_data;
OstreeMetalinkRequest *self = g_task_get_task_data (task);
OstreeMetalinkRequest *self = user_data;
switch (self->state)
{
@ -316,8 +314,7 @@ metalink_parser_text (GMarkupParseContext *context,
gpointer user_data,
GError **error)
{
GTask *task = user_data;
OstreeMetalinkRequest *self = g_task_get_task_data (task);
OstreeMetalinkRequest *self = user_data;
switch (self->state)
{
@ -414,9 +411,6 @@ _ostree_metalink_new (OstreeFetcher *fetcher,
return self;
}
static void
try_next_url (OstreeMetalinkRequest *self);
static gboolean
valid_hex_checksum (const char *s, gsize expected_len)
{
@ -425,47 +419,30 @@ valid_hex_checksum (const char *s, gsize expected_len)
return len == expected_len && s[len] == '\0';
}
static void
on_fetched_url (GObject *src,
GAsyncResult *res,
gpointer user_data)
static gboolean
try_one_url (OstreeMetalinkRequest *self,
SoupURI *uri,
GBytes **out_data,
GError **error)
{
GTask *task = user_data;
GCancellable *cancellable;
OstreeMetalinkRequest *self = g_task_get_task_data (task);
GError *local_error = NULL;
int parent_dfd = _ostree_fetcher_get_dfd (self->metalink->fetcher);
g_autoptr(GInputStream) instream = NULL;
g_autoptr(GOutputStream) outstream = NULL;
gboolean ret = FALSE;
g_autoptr(GBytes) bytes = NULL;
g_autofree char *path = NULL;
gssize n_bytes;
path = _ostree_fetcher_request_uri_with_partial_finish ((OstreeFetcher*)src, res, &local_error);
if (!path)
if (!_ostree_fetcher_request_uri_to_membuf (self->metalink->fetcher,
uri,
FALSE,
FALSE,
&bytes,
self->metalink->max_size,
self->cancellable,
error))
goto out;
cancellable = g_task_get_cancellable (task);
if (!ot_openat_read_stream (parent_dfd, path, FALSE, &instream,
cancellable, &local_error))
goto out;
outstream = g_memory_output_stream_new_resizable ();
n_bytes = g_output_stream_splice (outstream, instream,
G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
cancellable, &local_error);
if (n_bytes < 0)
goto out;
bytes = g_memory_output_stream_steal_as_bytes (G_MEMORY_OUTPUT_STREAM (outstream));
n_bytes = g_bytes_get_size (bytes);
if (n_bytes != self->size)
{
g_set_error (&local_error, G_IO_ERROR, G_IO_ERROR_FAILED,
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Expected size is %" G_GUINT64_FORMAT " bytes but content is %" G_GSSIZE_FORMAT " bytes",
self->size, n_bytes);
goto out;
@ -479,7 +456,7 @@ on_fetched_url (GObject *src,
if (strcmp (self->verification_sha512, actual) != 0)
{
g_set_error (&local_error, G_IO_ERROR, G_IO_ERROR_FAILED,
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Expected checksum is %s but actual is %s",
self->verification_sha512, actual);
goto out;
@ -493,57 +470,28 @@ on_fetched_url (GObject *src,
if (strcmp (self->verification_sha256, actual) != 0)
{
g_set_error (&local_error, G_IO_ERROR, G_IO_ERROR_FAILED,
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Expected checksum is %s but actual is %s",
self->verification_sha256, actual);
goto out;
}
}
ret = TRUE;
if (out_data)
*out_data = g_bytes_ref (bytes);
out:
if (local_error)
{
g_free (self->last_metalink_error);
self->last_metalink_error = g_strdup (local_error->message);
g_clear_error (&local_error);
/* And here we iterate on the next one if we hit an error */
self->current_url_index++;
try_next_url (self);
}
else
{
self->result = g_bytes_ref (bytes);
g_task_return_boolean (self->task, TRUE);
}
}
static void
try_next_url (OstreeMetalinkRequest *self)
{
if (self->current_url_index >= self->urls->len)
{
g_task_return_new_error (self->task, G_IO_ERROR, G_IO_ERROR_FAILED,
"Exhausted %u metalink targets, last error: %s",
self->urls->len, self->last_metalink_error);
}
else
{
SoupURI *next = self->urls->pdata[self->current_url_index];
_ostree_fetcher_request_uri_with_partial_async (self->metalink->fetcher, next,
self->metalink->max_size,
OSTREE_FETCHER_DEFAULT_PRIORITY,
g_task_get_cancellable (self->task),
on_fetched_url, self->task);
}
return ret;
}
static gboolean
start_target_request_phase (OstreeMetalinkRequest *self,
GError **error)
try_metalink_targets (OstreeMetalinkRequest *self,
SoupURI **out_target_uri,
GBytes **out_data,
GError **error)
{
gboolean ret = FALSE;
SoupURI *target_uri;
if (!self->found_a_file_element)
{
@ -590,24 +538,40 @@ start_target_request_phase (OstreeMetalinkRequest *self,
goto out;
}
try_next_url (self);
for (self->current_url_index = 0;
self->current_url_index < self->urls->len;
self->current_url_index++)
{
GError *temp_error = NULL;
target_uri = self->urls->pdata[self->current_url_index];
if (try_one_url (self, target_uri, out_data, &temp_error))
break;
else
{
g_free (self->last_metalink_error);
self->last_metalink_error = g_strdup (temp_error->message);
g_clear_error (&temp_error);
}
}
if (self->current_url_index >= self->urls->len)
{
g_assert (self->last_metalink_error != NULL);
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Exhausted %u metalink targets, last error: %s",
self->urls->len, self->last_metalink_error);
goto out;
}
ret = TRUE;
if (out_target_uri)
*out_target_uri = soup_uri_copy (target_uri);
out:
return ret;
}
static void
ostree_metalink_request_unref (gpointer data)
{
OstreeMetalinkRequest *request = data;
g_object_unref (request->metalink);
g_clear_pointer (&request->result, g_bytes_unref);
g_free (request->last_metalink_error);
g_ptr_array_unref (request->urls);
g_free (request);
}
static const GMarkupParser metalink_parser = {
metalink_parser_start,
metalink_parser_end,
@ -625,115 +589,55 @@ typedef struct
GMainLoop *loop;
} FetchMetalinkSyncData;
static gboolean
ostree_metalink_request_finish (OstreeMetalink *self,
GAsyncResult *result,
SoupURI **out_target_uri,
GBytes **out_data,
GError **error)
{
OstreeMetalinkRequest *request;
g_return_val_if_fail (g_task_is_valid (result, self), FALSE);
request = g_task_get_task_data ((GTask*)result);
if (g_task_propagate_boolean ((GTask*)result, error))
{
g_assert_cmpint (request->current_url_index, <, request->urls->len);
if (out_target_uri != NULL)
*out_target_uri = request->urls->pdata[request->current_url_index];
if (out_data != NULL)
*out_data = g_bytes_ref (request->result);
return TRUE;
}
else
return FALSE;
}
static void
on_metalink_fetched (GObject *src,
GAsyncResult *result,
gpointer user_data)
{
FetchMetalinkSyncData *data = user_data;
data->success = ostree_metalink_request_finish ((OstreeMetalink*)src,
result,
data->out_target_uri,
data->out_data,
data->error);
g_main_loop_quit (data->loop);
}
static gboolean
on_metalink_bytes_read (OstreeMetalinkRequest *self,
OstreeMetalinkRequest *request,
FetchMetalinkSyncData *sync_data,
GBytes *bytes,
GError **error)
{
gsize len;
const guint8 *data = g_bytes_get_data (bytes, &len);
if (!g_markup_parse_context_parse (self->parser, (const char*)data, len, error))
return FALSE;
if (!start_target_request_phase (self, error))
return FALSE;
return TRUE;
}
gboolean
_ostree_metalink_request_sync (OstreeMetalink *self,
GMainLoop *loop,
SoupURI **out_target_uri,
GBytes **out_data,
SoupURI **fetching_sync_uri,
GCancellable *cancellable,
GError **error)
{
OstreeMetalinkRequest *request = g_new0 (OstreeMetalinkRequest, 1);
FetchMetalinkSyncData data = { 0, };
GTask *task = g_task_new (self, cancellable, on_metalink_fetched, &data);
GBytes *out_contents = NULL;
gboolean ret = FALSE;
data.out_target_uri = out_target_uri;
data.out_data = out_data;
data.loop = loop;
data.error = error;
OstreeMetalinkRequest request = { 0, };
g_autoptr(GMainContext) mainctx = NULL;
GBytes *out_contents = NULL;
gsize len;
const guint8 *data;
if (fetching_sync_uri != NULL)
*fetching_sync_uri = _ostree_metalink_get_uri (self);
request->metalink = g_object_ref (self);
request->urls = g_ptr_array_new_with_free_func ((GDestroyNotify) soup_uri_free);
request->task = task; /* Unowned */
mainctx = g_main_context_new ();
g_main_context_push_thread_default (mainctx);
request->parser = g_markup_parse_context_new (&metalink_parser, G_MARKUP_PREFIX_ERROR_POSITION, task, NULL);
request.metalink = g_object_ref (self);
request.urls = g_ptr_array_new_with_free_func ((GDestroyNotify) soup_uri_free);
request.parser = g_markup_parse_context_new (&metalink_parser, G_MARKUP_PREFIX_ERROR_POSITION, &request, NULL);
g_task_set_task_data (task, request, ostree_metalink_request_unref);
if (! _ostree_fetcher_request_uri_to_membuf (self->fetcher,
self->uri,
FALSE,
FALSE,
&out_contents,
loop,
self->max_size,
cancellable,
error))
if (!_ostree_fetcher_request_uri_to_membuf (self->fetcher,
self->uri,
FALSE,
FALSE,
&out_contents,
self->max_size,
cancellable,
error))
goto out;
if (! on_metalink_bytes_read (request, request, &data, out_contents, error))
data = g_bytes_get_data (out_contents, &len);
if (!g_markup_parse_context_parse (request.parser, (const char*)data, len, error))
goto out;
g_main_loop_run (data.loop);
ret = data.success;
if (!try_metalink_targets (&request, out_target_uri, out_data, error))
goto out;
ret = TRUE;
out:
if (mainctx)
g_main_context_pop_thread_default (mainctx);
g_clear_object (&request.metalink);
g_clear_pointer (&request.urls, g_ptr_array_unref);
g_clear_pointer (&request.parser, g_markup_parse_context_free);
return ret;
}

View File

@ -51,7 +51,6 @@ OstreeMetalink *_ostree_metalink_new (OstreeFetcher *fetcher,
SoupURI *_ostree_metalink_get_uri (OstreeMetalink *self);
gboolean _ostree_metalink_request_sync (OstreeMetalink *self,
GMainLoop *loop,
SoupURI **out_target_uri,
GBytes **out_data,
SoupURI **fetching_sync_uri,

View File

@ -45,7 +45,6 @@ typedef struct {
OstreeRepo *remote_repo_local;
GMainContext *main_context;
GMainLoop *loop;
GCancellable *cancellable;
OstreeAsyncProgress *progress;
@ -233,28 +232,9 @@ update_progress (gpointer user_data)
return TRUE;
}
static void
throw_async_error (OtPullData *pull_data,
GError *error)
{
if (error)
{
if (!pull_data->caught_error)
{
pull_data->caught_error = TRUE;
g_propagate_error (pull_data->async_error, error);
g_main_loop_quit (pull_data->loop);
}
else
{
g_error_free (error);
}
}
}
static void
check_outstanding_requests_handle_error (OtPullData *pull_data,
GError *error)
/* The core logic function for whether we should continue the main loop */
static gboolean
pull_termination_condition (OtPullData *pull_data)
{
gboolean current_fetch_idle = (pull_data->n_outstanding_metadata_fetches == 0 &&
pull_data->n_outstanding_content_fetches == 0 &&
@ -264,30 +244,42 @@ check_outstanding_requests_handle_error (OtPullData *pull_data,
pull_data->n_outstanding_deltapart_write_requests == 0 );
gboolean current_idle = current_fetch_idle && current_write_idle;
throw_async_error (pull_data, error);
if (pull_data->caught_error)
return TRUE;
switch (pull_data->phase)
{
case OSTREE_PULL_PHASE_FETCHING_REFS:
if (!pull_data->fetching_sync_uri)
g_main_loop_quit (pull_data->loop);
return TRUE;
break;
case OSTREE_PULL_PHASE_FETCHING_OBJECTS:
if (current_idle && !pull_data->fetching_sync_uri)
{
g_debug ("pull: idle, exiting mainloop");
g_main_loop_quit (pull_data->loop);
return TRUE;
}
break;
}
return FALSE;
}
static gboolean
idle_check_outstanding_requests (gpointer user_data)
static void
check_outstanding_requests_handle_error (OtPullData *pull_data,
GError *error)
{
check_outstanding_requests_handle_error (user_data, NULL);
return FALSE;
if (error)
{
if (!pull_data->caught_error)
{
pull_data->caught_error = TRUE;
g_propagate_error (pull_data->async_error, error);
}
else
{
g_error_free (error);
}
}
}
typedef struct {
@ -311,7 +303,6 @@ fetch_uri_contents_membuf_sync (OtPullData *pull_data,
add_nul,
allow_noent,
out_contents,
pull_data->loop,
OSTREE_MAX_METADATA_SIZE,
cancellable,
error);
@ -1638,7 +1629,6 @@ ostree_repo_pull_with_options (OstreeRepo *self,
const char *dir_to_pull = NULL;
char **refs_to_fetch = NULL;
GSource *update_timeout = NULL;
GSource *idle_src;
gboolean disable_static_deltas = FALSE;
if (options)
@ -1664,7 +1654,6 @@ ostree_repo_pull_with_options (OstreeRepo *self,
pull_data->async_error = error;
pull_data->main_context = g_main_context_ref_thread_default ();
pull_data->loop = g_main_loop_new (pull_data->main_context, FALSE);
pull_data->flags = flags;
pull_data->repo = self;
@ -1757,7 +1746,6 @@ ostree_repo_pull_with_options (OstreeRepo *self,
soup_uri_free (metalink_uri);
if (! _ostree_metalink_request_sync (metalink,
pull_data->loop,
&target_uri,
&summary_bytes,
&pull_data->fetching_sync_uri,
@ -2015,6 +2003,14 @@ ostree_repo_pull_with_options (OstreeRepo *self,
pull_data->phase = OSTREE_PULL_PHASE_FETCHING_OBJECTS;
/* Now discard the previous fetcher, as it was bound to a temporary main context
* for synchronous requests.
*/
g_clear_object (&pull_data->fetcher);
pull_data->fetcher = _ostree_repo_remote_new_fetcher (self, remote_name_or_baseurl, error);
if (pull_data->fetcher == NULL)
goto out;
if (!ostree_repo_prepare_transaction (pull_data->repo, &pull_data->transaction_resuming,
cancellable, error))
goto out;
@ -2069,22 +2065,18 @@ ostree_repo_pull_with_options (OstreeRepo *self,
}
}
idle_src = g_idle_source_new ();
g_source_set_callback (idle_src, idle_check_outstanding_requests, pull_data, NULL);
g_source_attach (idle_src, pull_data->main_context);
g_source_unref (idle_src);
if (pull_data->progress)
{
update_timeout = g_timeout_source_new_seconds (1);
g_source_set_priority (update_timeout, G_PRIORITY_HIGH);
g_source_set_callback (update_timeout, update_progress, pull_data, NULL);
g_source_attach (update_timeout, g_main_loop_get_context (pull_data->loop));
g_source_attach (update_timeout, pull_data->main_context);
g_source_unref (update_timeout);
}
/* Now await work completion */
g_main_loop_run (pull_data->loop);
while (!pull_termination_condition (pull_data))
g_main_context_iteration (pull_data->main_context, TRUE);
if (pull_data->caught_error)
goto out;
@ -2195,8 +2187,6 @@ ostree_repo_pull_with_options (OstreeRepo *self,
g_main_context_unref (pull_data->main_context);
if (update_timeout)
g_source_destroy (update_timeout);
if (pull_data->loop)
g_main_loop_unref (pull_data->loop);
g_strfreev (configured_branches);
g_clear_object (&pull_data->fetcher);
g_clear_object (&pull_data->remote_repo_local);

View File

@ -1676,7 +1676,6 @@ _ostree_preload_metadata_file (OstreeRepo *self,
SoupURI *base_uri,
const char *filename,
gboolean is_metalink,
GMainLoop *main_loop,
GBytes **out_bytes,
GCancellable *cancellable,
GError **error)
@ -1691,8 +1690,7 @@ _ostree_preload_metadata_file (OstreeRepo *self,
OSTREE_MAX_METADATA_SIZE,
base_uri);
_ostree_metalink_request_sync (metalink, main_loop,
NULL, out_bytes, NULL,
_ostree_metalink_request_sync (metalink, NULL, out_bytes, NULL,
cancellable, &local_error);
if (g_error_matches (local_error, G_IO_ERROR, G_IO_ERROR_NOT_FOUND))
@ -1719,7 +1717,6 @@ _ostree_preload_metadata_file (OstreeRepo *self,
ret = _ostree_fetcher_request_uri_to_membuf (fetcher, uri,
FALSE, TRUE,
out_bytes,
main_loop,
OSTREE_MAX_METADATA_SIZE,
cancellable, error);
soup_uri_free (uri);
@ -1743,16 +1740,12 @@ repo_remote_fetch_summary (OstreeRepo *self,
GError **error)
{
glnx_unref_object OstreeFetcher *fetcher = NULL;
g_autoptr(GMainLoop) main_loop = NULL;
gboolean ret = FALSE;
SoupURI *base_uri = NULL;
uint i;
const char *filenames[] = {"summary", "summary.sig"};
GBytes **outputs[] = {out_summary, out_signatures};
main_loop = g_main_loop_new (g_main_context_get_thread_default (), FALSE);
fetcher = _ostree_repo_remote_new_fetcher (self, name, error);
if (fetcher == NULL)
goto out;
@ -1785,7 +1778,6 @@ repo_remote_fetch_summary (OstreeRepo *self,
base_uri,
filenames[i],
metalink_url_string ? TRUE : FALSE,
main_loop,
outputs[i],
cancellable,
error))