ostree_fetcher: new function _ostree_fetcher_contents_membuf_sync

Move code from ostree-repo-pull.c

Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
This commit is contained in:
Giuseppe Scrivano 2014-11-03 16:53:19 +01:00 committed by Colin Walters
parent 5c9e83c02c
commit d5d73debd8
3 changed files with 166 additions and 108 deletions

View File

@ -755,3 +755,102 @@ _ostree_fetcher_get_n_requests (OstreeFetcher *self)
{
return self->total_requests;
}
typedef struct
{
GInputStream *result_stream;
GMainLoop *loop;
GError **error;
gpointer user_data;
}
FetchUriSyncData;
static gboolean
run_mainloop_monitor_fetcher (FetchUriSyncData *data)
{
g_main_loop_run (data->loop);
return TRUE;
}
static void
fetch_uri_sync_on_complete (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
FetchUriSyncData *data = user_data;
data->result_stream = _ostree_fetcher_stream_uri_finish ((OstreeFetcher*)object,
result, data->error);
g_main_loop_quit (data->loop);
}
gboolean
_ostree_fetcher_contents_membuf_sync (OstreeFetcher *fetcher,
SoupURI *uri,
gboolean add_nul,
gboolean allow_noent,
GBytes **out_contents,
GMainLoop *loop,
gpointer user_data,
GCancellable *cancellable,
GError **error)
{
gboolean ret = FALSE;
const guint8 nulchar = 0;
gs_free char *ret_contents = NULL;
gs_unref_object GMemoryOutputStream *buf = NULL;
FetchUriSyncData data;
g_assert (error != NULL);
data.result_stream = NULL;
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return FALSE;
data.user_data = user_data;
data.loop = loop;
data.error = error;
_ostree_fetcher_stream_uri_async (fetcher, uri,
OSTREE_MAX_METADATA_SIZE,
cancellable,
fetch_uri_sync_on_complete, &data);
run_mainloop_monitor_fetcher (&data);
if (!data.result_stream)
{
if (allow_noent)
{
if (g_error_matches (*error, G_IO_ERROR, G_IO_ERROR_NOT_FOUND))
{
g_clear_error (error);
ret = TRUE;
*out_contents = NULL;
}
}
goto out;
}
buf = (GMemoryOutputStream*)g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
if (g_output_stream_splice ((GOutputStream*)buf, data.result_stream,
G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE,
cancellable, error) < 0)
goto out;
if (add_nul)
{
if (!g_output_stream_write ((GOutputStream*)buf, &nulchar, 1, cancellable, error))
goto out;
}
if (!g_output_stream_close ((GOutputStream*)buf, cancellable, error))
goto out;
ret = TRUE;
*out_contents = g_memory_output_stream_steal_as_bytes (buf);
out:
g_clear_object (&(data.result_stream));
return ret;
}

View File

@ -81,16 +81,25 @@ GFile *_ostree_fetcher_request_uri_with_partial_finish (OstreeFetcher *self,
GError **error);
void _ostree_fetcher_stream_uri_async (OstreeFetcher *self,
SoupURI *uri,
guint64 max_size,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data);
SoupURI *uri,
guint64 max_size,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data);
GInputStream *_ostree_fetcher_stream_uri_finish (OstreeFetcher *self,
GAsyncResult *result,
GError **error);
gboolean _ostree_fetcher_contents_membuf_sync (OstreeFetcher *fetcher,
SoupURI *uri,
gboolean add_nul,
gboolean allow_noent,
GBytes **out_contents,
GMainLoop *loop,
gpointer user_data,
GCancellable *cancellable,
GError **error);
G_END_DECLS
#endif

View File

@ -143,18 +143,29 @@ suburi_new (SoupURI *base,
static gboolean
update_progress (gpointer user_data)
{
OtPullData *pull_data = user_data;
guint outstanding_writes = pull_data->n_outstanding_content_write_requests +
OtPullData *pull_data;
guint outstanding_writes;
guint outstanding_fetches;
guint64 bytes_transferred;
guint fetched;
guint requested;
guint n_scanned_metadata;
guint64 start_time;
pull_data = user_data;
if (! pull_data->progress)
return FALSE;
outstanding_writes = pull_data->n_outstanding_content_write_requests +
pull_data->n_outstanding_metadata_write_requests;
guint outstanding_fetches = pull_data->n_outstanding_content_fetches +
outstanding_fetches = pull_data->n_outstanding_content_fetches +
pull_data->n_outstanding_metadata_fetches;
guint64 bytes_transferred = _ostree_fetcher_bytes_transferred (pull_data->fetcher);
guint fetched = pull_data->n_fetched_metadata + pull_data->n_fetched_content;
guint requested = pull_data->n_requested_metadata + pull_data->n_requested_content;
guint n_scanned_metadata = pull_data->n_scanned_metadata;
guint64 start_time = pull_data->start_time;
g_assert (pull_data->progress);
bytes_transferred = _ostree_fetcher_bytes_transferred (pull_data->fetcher);
fetched = pull_data->n_fetched_metadata + pull_data->n_fetched_content;
requested = pull_data->n_requested_metadata + pull_data->n_requested_content;
n_scanned_metadata = pull_data->n_scanned_metadata;
start_time = pull_data->start_time;
ostree_async_progress_set_uint (pull_data->progress, "outstanding-fetches", outstanding_fetches);
ostree_async_progress_set_uint (pull_data->progress, "outstanding-writes", outstanding_writes);
@ -231,50 +242,11 @@ idle_check_outstanding_requests (gpointer user_data)
return FALSE;
}
static gboolean
run_mainloop_monitor_fetcher (OtPullData *pull_data)
{
GSource *update_timeout = NULL;
GSource *idle_src;
if (pull_data->progress)
{
update_timeout = g_timeout_source_new_seconds (1);
g_source_set_priority (update_timeout, G_PRIORITY_HIGH);
g_source_set_callback (update_timeout, update_progress, pull_data, NULL);
g_source_attach (update_timeout, g_main_loop_get_context (pull_data->loop));
g_source_unref (update_timeout);
}
idle_src = g_idle_source_new ();
g_source_set_callback (idle_src, idle_check_outstanding_requests, pull_data, NULL);
g_source_attach (idle_src, pull_data->main_context);
g_main_loop_run (pull_data->loop);
if (update_timeout)
g_source_destroy (update_timeout);
return !pull_data->caught_error;
}
typedef struct {
OtPullData *pull_data;
GInputStream *result_stream;
} OstreeFetchUriSyncData;
static void
fetch_uri_sync_on_complete (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
OstreeFetchUriSyncData *data = user_data;
data->result_stream = _ostree_fetcher_stream_uri_finish ((OstreeFetcher*)object,
result, data->pull_data->async_error);
data->pull_data->fetching_sync_uri = NULL;
g_main_loop_quit (data->pull_data->loop);
}
static gboolean
fetch_uri_contents_membuf_sync (OtPullData *pull_data,
SoupURI *uri,
@ -284,59 +256,18 @@ fetch_uri_contents_membuf_sync (OtPullData *pull_data,
GCancellable *cancellable,
GError **error)
{
gboolean ret = FALSE;
const guint8 nulchar = 0;
gs_free char *ret_contents = NULL;
gs_unref_object GMemoryOutputStream *buf = NULL;
OstreeFetchUriSyncData fetch_data = { 0, };
g_assert (error != NULL);
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return FALSE;
fetch_data.pull_data = pull_data;
gboolean ret;
pull_data->fetching_sync_uri = uri;
_ostree_fetcher_stream_uri_async (pull_data->fetcher, uri,
OSTREE_MAX_METADATA_SIZE,
cancellable,
fetch_uri_sync_on_complete, &fetch_data);
run_mainloop_monitor_fetcher (pull_data);
if (!fetch_data.result_stream)
{
if (allow_noent)
{
if (g_error_matches (*error, G_IO_ERROR, G_IO_ERROR_NOT_FOUND))
{
g_clear_error (error);
ret = TRUE;
*out_contents = NULL;
}
}
goto out;
}
buf = (GMemoryOutputStream*)g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
if (g_output_stream_splice ((GOutputStream*)buf, fetch_data.result_stream,
G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE,
cancellable, error) < 0)
goto out;
if (add_nul)
{
if (!g_output_stream_write ((GOutputStream*)buf, &nulchar, 1, cancellable, error))
goto out;
}
if (!g_output_stream_close ((GOutputStream*)buf, cancellable, error))
goto out;
ret = TRUE;
*out_contents = g_memory_output_stream_steal_as_bytes (buf);
out:
g_clear_object (&(fetch_data.result_stream));
ret = _ostree_fetcher_contents_membuf_sync (pull_data->fetcher,
uri,
add_nul,
allow_noent,
out_contents,
pull_data->loop,
pull_data,
cancellable,
error);
pull_data->fetching_sync_uri = NULL;
return ret;
}
@ -410,7 +341,7 @@ request_metalink_sync (OtPullData *pull_data,
pull_data->fetching_sync_uri = _ostree_metalink_get_uri (metalink);
_ostree_metalink_request_async (metalink, cancellable, on_metalink_fetched, &data);
run_mainloop_monitor_fetcher (pull_data);
g_main_loop_run (pull_data->loop);
return data.success;
}
@ -1278,6 +1209,8 @@ ostree_repo_pull_with_options (OstreeRepo *self,
const char *dir_to_pull = NULL;
char **refs_to_fetch = NULL;
gboolean is_mirror;
GSource *update_timeout = NULL;
GSource *idle_src;
if (options)
{
@ -1634,8 +1567,23 @@ ostree_repo_pull_with_options (OstreeRepo *self,
process_one_static_delta_meta (pull_data, pull_data->static_delta_metas->pdata[i]);
}
idle_src = g_idle_source_new ();
g_source_set_callback (idle_src, idle_check_outstanding_requests, pull_data, NULL);
g_source_attach (idle_src, pull_data->main_context);
g_source_unref (idle_src);
if (pull_data->progress)
{
update_timeout = g_timeout_source_new_seconds (1);
g_source_set_priority (update_timeout, G_PRIORITY_HIGH);
g_source_set_callback (update_timeout, update_progress, pull_data, NULL);
g_source_attach (update_timeout, g_main_loop_get_context (pull_data->loop));
g_source_unref (update_timeout);
}
/* Now await work completion */
if (!run_mainloop_monitor_fetcher (pull_data))
g_main_loop_run (pull_data->loop);
if (pull_data->caught_error)
goto out;
g_assert_cmpint (pull_data->n_outstanding_metadata_fetches, ==, 0);
@ -1713,6 +1661,8 @@ ostree_repo_pull_with_options (OstreeRepo *self,
ret = TRUE;
out:
g_main_context_unref (pull_data->main_context);
if (update_timeout)
g_source_destroy (update_timeout);
if (pull_data->loop)
g_main_loop_unref (pull_data->loop);
g_strfreev (configured_branches);