pull: Download and checksum asynchronously

This is quite a noticeable speedup when downloading loose objects.
This commit is contained in:
Colin Walters 2012-05-20 16:21:57 -04:00
parent df54408e5d
commit e485bace01
6 changed files with 813 additions and 144 deletions

View File

@ -53,6 +53,8 @@ if USE_LIBSOUP_GNOME
bin_PROGRAMS += ostree-pull
ostree_pull_SOURCES = src/ostree/ot-main.h \
src/ostree/ot-main.c \
src/ostree/ostree-fetcher.h \
src/ostree/ostree-fetcher.c \
src/ostree/ostree-pull.c
ostree_pull_CFLAGS = $(ostree_bin_shared_cflags) $(OT_DEP_SOUP_CFLAGS)

View File

@ -358,6 +358,50 @@ ot_gio_checksum_stream (GInputStream *in,
return ot_gio_splice_get_checksum (NULL, in, out_csum, cancellable, error);
}
static void
checksum_stream_thread (GSimpleAsyncResult *result,
GObject *object,
GCancellable *cancellable)
{
GError *error = NULL;
guchar *csum;
if (!ot_gio_checksum_stream ((GInputStream*)object, &csum,
cancellable, &error))
g_simple_async_result_take_error (result, error);
else
g_simple_async_result_set_op_res_gpointer (result, csum, g_free);
}
void
ot_gio_checksum_stream_async (GInputStream *in,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
GSimpleAsyncResult *result;
result = g_simple_async_result_new ((GObject*) in,
callback, user_data,
ot_gio_checksum_stream_async);
g_simple_async_result_run_in_thread (result, checksum_stream_thread, io_priority, cancellable);
g_object_unref (result);
}
guchar *
ot_gio_checksum_stream_finish (GInputStream *in,
GAsyncResult *result,
GError **error)
{
GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == ot_gio_checksum_stream_async);
return g_memdup (g_simple_async_result_get_op_res_gpointer (simple), 32);
}
gboolean
ot_gfile_merge_dirs (GFile *destination,
GFile *src,

View File

@ -88,6 +88,16 @@ gboolean ot_gio_checksum_stream (GInputStream *in,
GCancellable *cancellable,
GError **error);
void ot_gio_checksum_stream_async (GInputStream *in,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data);
guchar * ot_gio_checksum_stream_finish (GInputStream *in,
GAsyncResult *result,
GError **error);
gboolean ot_gfile_merge_dirs (GFile *destination,
GFile *src,
GCancellable *cancellable,

313
src/ostree/ostree-fetcher.c Normal file
View File

@ -0,0 +1,313 @@
/* -*- mode: C; c-file-style: "gnu"; indent-tabs-mode: nil; -*-
*
* Copyright (C) 2011 Colin Walters <walters@verbum.org>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*
* Author: Colin Walters <walters@verbum.org>
*/
#include "config.h"
#include "ostree-fetcher.h"
#include "ostree.h"
typedef enum {
OSTREE_FETCHER_STATE_PENDING,
OSTREE_FETCHER_STATE_DOWNLOADING,
OSTREE_FETCHER_STATE_COMPLETE
} OstreeFetcherState;
typedef struct {
OstreeFetcher *self;
SoupURI *uri;
OstreeFetcherState state;
SoupRequest *request;
GFile *tmpfile;
GInputStream *request_body;
GOutputStream *out_stream;
guint64 content_length;
GCancellable *cancellable;
GSimpleAsyncResult *result;
} OstreeFetcherPendingURI;
static void
pending_uri_free (OstreeFetcherPendingURI *pending)
{
g_clear_object (&pending->self);
g_clear_object (&pending->tmpfile);
g_clear_object (&pending->request);
g_clear_object (&pending->request_body);
g_clear_object (&pending->out_stream);
g_clear_object (&pending->cancellable);
g_free (pending);
}
struct OstreeFetcher
{
GObject parent_instance;
GFile *tmpdir;
SoupSession *session;
SoupRequester *requester;
SoupMessage *sending_message;
GHashTable *message_to_request;
guint64 total_downloaded;
};
G_DEFINE_TYPE (OstreeFetcher, ostree_fetcher, G_TYPE_OBJECT)
static void
ostree_fetcher_finalize (GObject *object)
{
OstreeFetcher *self;
self = OSTREE_FETCHER (object);
g_clear_object (&self->session);
G_OBJECT_CLASS (ostree_fetcher_parent_class)->finalize (object);
}
static void
ostree_fetcher_class_init (OstreeFetcherClass *klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
gobject_class->finalize = ostree_fetcher_finalize;
}
static void
on_request_started (SoupSession *session,
SoupMessage *msg,
SoupSocket *socket,
gpointer user_data)
{
OstreeFetcher *self = user_data;
self->sending_message = msg;
}
static void
on_request_unqueued (SoupSession *session,
SoupMessage *msg,
gpointer user_data)
{
OstreeFetcher *self = user_data;
if (msg == self->sending_message)
{
self->sending_message = NULL;
g_hash_table_remove (self->message_to_request, msg);
}
}
static void
ostree_fetcher_init (OstreeFetcher *self)
{
self->session = soup_session_async_new_with_options (SOUP_SESSION_USER_AGENT, "ostree ",
SOUP_SESSION_USE_THREAD_CONTEXT, TRUE,
SOUP_SESSION_ADD_FEATURE_BY_TYPE, SOUP_TYPE_REQUESTER,
NULL);
self->requester = (SoupRequester *)soup_session_get_feature (self->session, SOUP_TYPE_REQUESTER);
g_signal_connect (self->session, "request-started",
G_CALLBACK (on_request_started), self);
g_signal_connect (self->session, "request-unqueued",
G_CALLBACK (on_request_unqueued), self);
self->message_to_request = g_hash_table_new_full (NULL, NULL, (GDestroyNotify)g_object_unref, NULL);
}
OstreeFetcher *
ostree_fetcher_new (GFile *tmpdir)
{
OstreeFetcher *self = (OstreeFetcher*)g_object_new (OSTREE_TYPE_FETCHER, NULL);
self->tmpdir = g_object_ref (tmpdir);
return self;
}
static void
on_splice_complete (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
OstreeFetcherPendingURI *pending = user_data;
ot_lobj GFileInfo *file_info = NULL;
pending->state = OSTREE_FETCHER_STATE_COMPLETE;
file_info = g_file_query_info (pending->tmpfile, OSTREE_GIO_FAST_QUERYINFO,
G_FILE_QUERY_INFO_NOFOLLOW_SYMLINKS,
NULL, NULL);
if (file_info)
pending->self->total_downloaded += g_file_info_get_size (file_info);
(void) g_input_stream_close (pending->request_body, NULL, NULL);
g_simple_async_result_complete (pending->result);
g_object_unref (pending->result);
}
static void
on_request_sent (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
OstreeFetcherPendingURI *pending = user_data;
GError *local_error = NULL;
pending->request_body = soup_request_send_finish ((SoupRequest*) object,
result, &local_error);
if (!pending->request_body)
{
pending->state = OSTREE_FETCHER_STATE_COMPLETE;
g_simple_async_result_take_error (pending->result, local_error);
g_simple_async_result_complete (pending->result);
}
else
{
GOutputStreamSpliceFlags flags = G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET;
pending->state = OSTREE_FETCHER_STATE_DOWNLOADING;
pending->content_length = soup_request_get_content_length (pending->request);
/* TODO - make this async */
if (!ostree_create_temp_regular_file (pending->self->tmpdir,
NULL, NULL,
&pending->tmpfile,
&pending->out_stream,
NULL, &local_error))
{
g_simple_async_result_take_error (pending->result, local_error);
g_simple_async_result_complete (pending->result);
return;
}
g_output_stream_splice_async (pending->out_stream, pending->request_body, flags, G_PRIORITY_DEFAULT,
pending->cancellable, on_splice_complete, pending);
}
}
void
ostree_fetcher_request_uri_async (OstreeFetcher *self,
SoupURI *uri,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
OstreeFetcherPendingURI *pending;
GError *local_error = NULL;
pending = g_new0 (OstreeFetcherPendingURI, 1);
pending->self = g_object_ref (self);
pending->uri = soup_uri_copy (uri);
pending->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
pending->request = soup_requester_request_uri (self->requester, uri, &local_error);
g_assert_no_error (local_error);
g_hash_table_insert (self->message_to_request,
soup_request_http_get_message ((SoupRequestHTTP*)pending->request),
pending);
pending->result = g_simple_async_result_new ((GObject*) self,
callback, user_data,
ostree_fetcher_request_uri_async);
g_simple_async_result_set_op_res_gpointer (pending->result, pending,
(GDestroyNotify) pending_uri_free);
soup_request_send_async (pending->request, cancellable,
on_request_sent, pending);
}
GFile *
ostree_fetcher_request_uri_finish (OstreeFetcher *self,
GAsyncResult *result,
GError **error)
{
GSimpleAsyncResult *simple;
OstreeFetcherPendingURI *pending;
g_return_val_if_fail (g_simple_async_result_is_valid (result, (GObject*)self, ostree_fetcher_request_uri_async), FALSE);
simple = G_SIMPLE_ASYNC_RESULT (result);
if (g_simple_async_result_propagate_error (simple, error))
return NULL;
pending = g_simple_async_result_get_op_res_gpointer (simple);
return g_object_ref (pending->tmpfile);
}
static char *
format_size_pair (guint64 start,
guint64 max)
{
if (max < 1024)
return g_strdup_printf ("%lu/%lu bytes",
(gulong) start,
(gulong) max);
else
return g_strdup_printf ("%.1f/%.1f KiB", ((double) start) / 1024,
((double) max) / 1024);
}
char *
ostree_fetcher_query_state_text (OstreeFetcher *self)
{
OstreeFetcherPendingURI *active;
if (self->sending_message)
active = g_hash_table_lookup (self->message_to_request, self->sending_message);
else
active = NULL;
if (active)
{
ot_lfree char *active_uri = soup_uri_to_string (active->uri, TRUE);
if (active->tmpfile)
{
ot_lobj GFileInfo *file_info = NULL;
file_info = g_file_query_info (active->tmpfile, OSTREE_GIO_FAST_QUERYINFO,
G_FILE_QUERY_INFO_NOFOLLOW_SYMLINKS,
NULL, NULL);
if (file_info)
{
ot_lfree char *size = format_size_pair (g_file_info_get_size (file_info),
active->content_length);
return g_strdup_printf ("Downloading %s [ %s, %.1f KiB downloaded ]",
active_uri, size, ((double)self->total_downloaded) / 1024);
}
}
else
{
return g_strdup_printf ("Requesting %s [ %.1f KiB downloaded ]",
active_uri, ((double)self->total_downloaded) / 1024);
}
}
return g_strdup_printf ("Idle [ %.1f KiB downloaded ]", ((double)self->total_downloaded) / 1024);
}

View File

@ -0,0 +1,64 @@
/* -*- mode: C; c-file-style: "gnu"; indent-tabs-mode: nil; -*-
*
* Copyright (C) 2012 Colin Walters <walters@verbum.org>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef _OSTREE_FETCHER
#define _OSTREE_FETCHER
#define LIBSOUP_USE_UNSTABLE_REQUEST_API
#include <libsoup/soup.h>
#include <libsoup/soup-requester.h>
#include <libsoup/soup-request-http.h>
G_BEGIN_DECLS
#define OSTREE_TYPE_FETCHER (ostree_fetcher_get_type ())
#define OSTREE_FETCHER(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), OSTREE_TYPE_FETCHER, OstreeFetcher))
#define OSTREE_FETCHER_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), OSTREE_TYPE_FETCHER, OstreeFetcherClass))
#define OSTREE_IS_FETCHER(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), OSTREE_TYPE_FETCHER))
#define OSTREE_IS_FETCHER_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), OSTREE_TYPE_FETCHER))
#define OSTREE_FETCHER_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), OSTREE_TYPE_FETCHER, OstreeFetcherClass))
typedef struct OstreeFetcherClass OstreeFetcherClass;
typedef struct OstreeFetcher OstreeFetcher;
struct OstreeFetcherClass
{
GObjectClass parent_class;
};
GType ostree_fetcher_get_type (void) G_GNUC_CONST;
OstreeFetcher *ostree_fetcher_new (GFile *tmpdir);
char * ostree_fetcher_query_state_text (OstreeFetcher *self);
void ostree_fetcher_request_uri_async (OstreeFetcher *self,
SoupURI *uri,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data);
GFile *ostree_fetcher_request_uri_finish (OstreeFetcher *self,
GAsyncResult *result,
GError **error);
G_END_DECLS
#endif

View File

@ -20,13 +20,49 @@
* Author: Colin Walters <walters@verbum.org>
*/
/**
* DESIGN:
*
* Pull refs
* For each ref:
* Pull commit
*
* Pull commits:
* For each commit:
* Verify checksum
* Import
* Traverse and queue dirtree/dirmeta
*
* Pull dirtrees:
* For each dirtree:
* Verify checksum
* Import
* Traverse and queue content/dirtree/dirmeta
*
* Pull content meta:
* For each content:
* Pull meta
* If contentcontent needed:
* Queue contentcontent
* else:
* Import
*
* Pull contentcontent:
* For each contentcontent
* Verify checksum
* Import
*
*
*/
#include "config.h"
#include <libsoup/soup.h>
#include "ostree.h"
#include "ot-main.h"
#include "ostree-fetcher.h"
gboolean verbose;
gboolean opt_prefer_loose;
gboolean opt_related;
@ -43,7 +79,7 @@ static GOptionEntry options[] = {
typedef struct {
OstreeRepo *repo;
char *remote_name;
SoupSession *session;
OstreeFetcher *fetcher;
SoupURI *base_uri;
gboolean fetched_packs;
@ -52,10 +88,22 @@ typedef struct {
GHashTable *file_checksums_to_fetch;
gboolean stdout_is_tty;
GMainLoop *loop;
guint64 dl_current_bytes;
guint64 dl_total_bytes;
/* Used in meta fetch phase */
guint outstanding_uri_requests;
guint outstanding_meta_requests;
/* Used in content fetch phase */
guint outstanding_filemeta_requests;
guint outstanding_filecontent_requests;
guint outstanding_checksum_requests;
GHashTable *loose_files;
GError **async_error;
gboolean caught_error;
gboolean stdout_is_tty;
} OtPullData;
static SoupURI *
@ -96,55 +144,87 @@ suburi_new (SoupURI *base,
return ret;
}
static gboolean
uri_fetch_update_status (gpointer user_data)
{
OtPullData *pull_data = user_data;
ot_lfree char *fetcher_status;
GString *status;
status = g_string_new ("");
if (pull_data->loose_files != NULL)
g_string_append_printf (status, "%u loose files to fetch: ",
g_hash_table_size (pull_data->loose_files)
+ pull_data->outstanding_filemeta_requests
+ pull_data->outstanding_filecontent_requests);
if (pull_data->outstanding_checksum_requests > 0)
g_string_append_printf (status, "Calculating %u checksums; ",
pull_data->outstanding_checksum_requests);
fetcher_status = ostree_fetcher_query_state_text (pull_data->fetcher);
g_string_append (status, fetcher_status);
g_print ("%s\n", status->str);
g_string_free (status, TRUE);
return TRUE;
}
static void
check_outstanding_requests_handle_error (OtPullData *pull_data,
GError *error)
{
if (pull_data->outstanding_uri_requests == 0 &&
pull_data->outstanding_meta_requests == 0 &&
pull_data->outstanding_filemeta_requests == 0 &&
pull_data->outstanding_filecontent_requests == 0 &&
pull_data->outstanding_checksum_requests == 0 &&
(pull_data->loose_files == NULL || g_hash_table_size (pull_data->loose_files) == 0))
g_main_loop_quit (pull_data->loop);
if (error)
{
pull_data->caught_error = TRUE;
if (pull_data->async_error)
g_error_free (error);
else
g_propagate_error (pull_data->async_error, error);
}
}
static void
run_mainloop_monitor_fetcher (OtPullData *pull_data)
{
GSource *update_timeout = NULL;
update_timeout = g_timeout_source_new_seconds (1);
g_source_set_callback (update_timeout, uri_fetch_update_status, pull_data, NULL);
g_source_attach (update_timeout, g_main_loop_get_context (pull_data->loop));
g_source_unref (update_timeout);
g_main_loop_run (pull_data->loop);
g_source_destroy (update_timeout);
}
typedef struct {
OtPullData *pull_data;
GOutputStream *stream;
gboolean had_error;
GError **error;
} OstreeSoupChunkData;
GFile *result_file;
} OstreeFetchUriData;
static void
sync_progress (OtPullData *pull_data)
uri_fetch_on_complete (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
if (pull_data->stdout_is_tty)
{
g_print ("%c8%" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT " KiB",
0x1b, (pull_data->dl_current_bytes / 1024), (pull_data->dl_total_bytes / 1024));
}
}
OstreeFetchUriData *data = user_data;
GError *local_error = NULL;
static void
on_got_chunk (SoupMessage *msg,
SoupBuffer *buf,
gpointer user_data)
{
OstreeSoupChunkData *data = user_data;
gsize bytes_written;
data->pull_data->dl_current_bytes += buf->length;
sync_progress (data->pull_data);
if (!g_output_stream_write_all (data->stream, buf->data, buf->length,
&bytes_written, NULL, data->error))
{
data->had_error = TRUE;
soup_session_cancel_message (data->pull_data->session, msg, 500);
}
}
static void
on_got_content_length (SoupMessage *msg,
OtPullData *pull_data)
{
goffset size;
g_assert (msg->response_headers);
size = soup_message_headers_get_content_length (msg->response_headers);
if (size > 0)
pull_data->dl_total_bytes = (guint64) size;
sync_progress (pull_data);
data->result_file = ostree_fetcher_request_uri_finish ((OstreeFetcher*)object,
result, &local_error);
data->pull_data->outstanding_uri_requests--;
check_outstanding_requests_handle_error (data->pull_data, local_error);
}
static gboolean
@ -156,67 +236,31 @@ fetch_uri (OtPullData *pull_data,
GError **error)
{
gboolean ret = FALSE;
guint response;
ot_lfree char *uri_string = NULL;
ot_lobj GFile *ret_temp_filename = NULL;
ot_lobj GOutputStream *output_stream = NULL;
ot_lobj SoupMessage *msg = NULL;
OstreeSoupChunkData chunkdata;
ot_lobj SoupRequest *request = NULL;
OstreeFetchUriData fetch_data;
if (!ostree_create_temp_regular_file (ostree_repo_get_tmpdir (pull_data->repo),
tmp_prefix, NULL,
&ret_temp_filename,
&output_stream,
NULL, error))
goto out;
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return FALSE;
memset (&fetch_data, 0, sizeof (fetch_data));
fetch_data.pull_data = pull_data;
chunkdata.pull_data = pull_data;
chunkdata.stream = output_stream;
chunkdata.had_error = FALSE;
chunkdata.error = error;
uri_string = soup_uri_to_string (uri, FALSE);
g_print ("Fetching %s\n", uri_string);
if (pull_data->stdout_is_tty)
{
g_print ("%c7", 0x1B);
g_print ("0/? KiB");
pull_data->dl_current_bytes = 0;
pull_data->dl_total_bytes = 0;
sync_progress (pull_data);
}
pull_data->outstanding_uri_requests++;
ostree_fetcher_request_uri_async (pull_data->fetcher, uri, cancellable,
uri_fetch_on_complete, &fetch_data);
msg = soup_message_new_from_uri (SOUP_METHOD_GET, uri);
run_mainloop_monitor_fetcher (pull_data);
soup_message_body_set_accumulate (msg->response_body, FALSE);
soup_message_add_header_handler (msg, "got-headers",
"Content-Length",
G_CALLBACK (on_got_content_length),
pull_data);
g_signal_connect (msg, "got-chunk", G_CALLBACK (on_got_chunk), &chunkdata);
response = soup_session_send_message (pull_data->session, msg);
if (response != 200)
{
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Failed to retrieve '%s': %d %s",
uri_string, response, msg->reason_phrase);
goto out;
}
if (!g_output_stream_close (output_stream, NULL, error))
if (pull_data->caught_error)
goto out;
if (pull_data->stdout_is_tty)
g_print ("\n");
ret = TRUE;
ot_transfer_out_value (out_temp_filename, &ret_temp_filename);
ot_transfer_out_value (out_temp_filename, &fetch_data.result_file);
out:
if (ret_temp_filename)
(void) unlink (ot_gfile_get_path_cached (ret_temp_filename));
return ret;
}
@ -924,6 +968,231 @@ store_file_from_pack (OtPullData *pull_data,
return ret;
}
typedef struct {
OtPullData *pull_data;
gboolean fetching_content;
GFile *meta_path;
GFile *content_path;
char *checksum;
} OtFetchOneContentItemData;
static void
destroy_fetch_one_content_item_data (OtFetchOneContentItemData *data)
{
if (data->meta_path)
(void) ot_gfile_unlink (data->meta_path, NULL, NULL);
g_clear_object (&data->meta_path);
if (data->content_path)
(void) ot_gfile_unlink (data->content_path, NULL, NULL);
g_clear_object (&data->content_path);
g_free (data->checksum);
g_free (data);
}
static void
content_fetch_on_checksum_complete (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
OtFetchOneContentItemData *data = user_data;
GError *local_error = NULL;
GError **error = &local_error;
guint64 length;
GCancellable *cancellable = NULL;
ot_lfree guchar *csum;
ot_lvariant GVariant *file_meta = NULL;
ot_lobj GFileInfo *file_info = NULL;
ot_lvariant GVariant *xattrs = NULL;
ot_lobj GInputStream *content_input = NULL;
ot_lobj GInputStream *file_object_input = NULL;
ot_lfree char *checksum;
csum = ot_gio_checksum_stream_finish ((GInputStream*)object, result, error);
if (!csum)
goto out;
if (!ot_util_variant_map (data->meta_path, OSTREE_FILE_HEADER_GVARIANT_FORMAT, FALSE,
&file_meta, error))
goto out;
if (!ostree_file_header_parse (file_meta, &file_info, &xattrs, error))
goto out;
if (data->content_path)
{
content_input = (GInputStream*)g_file_read (data->content_path, cancellable, error);
if (!content_input)
goto out;
}
if (!ostree_raw_file_to_content_stream (content_input, file_info, xattrs,
&file_object_input, &length,
cancellable, error))
goto out;
checksum = ostree_checksum_from_bytes (csum);
if (strcmp (checksum, data->checksum) != 0)
{
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Corrupted object %s (actual checksum is %s)",
data->checksum, checksum);
goto out;
}
if (!ostree_repo_stage_file_object_trusted (data->pull_data->repo, checksum,
FALSE, file_object_input, length,
cancellable, error))
goto out;
out:
data->pull_data->outstanding_checksum_requests--;
check_outstanding_requests_handle_error (data->pull_data, local_error);
destroy_fetch_one_content_item_data (data);
}
static void
enqueue_loose_meta_requests (OtPullData *pull_data);
static void
content_fetch_on_complete (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
OtFetchOneContentItemData *data = user_data;
GError *local_error = NULL;
GError **error = &local_error;
GCancellable *cancellable = NULL;
gboolean was_content_fetch = FALSE;
gboolean need_content_fetch = FALSE;
ot_lvariant GVariant *file_meta = NULL;
ot_lobj GFileInfo *file_info = NULL;
ot_lobj GInputStream *content_input = NULL;
ot_lobj GInputStream *file_object_input = NULL;
ot_lvariant GVariant *xattrs = NULL;
was_content_fetch = data->fetching_content;
if (was_content_fetch)
{
data->content_path = ostree_fetcher_request_uri_finish ((OstreeFetcher*)object, result, error);
if (!data->content_path)
goto out;
}
else
{
data->meta_path = ostree_fetcher_request_uri_finish ((OstreeFetcher*)object, result, error);
if (!data->meta_path)
goto out;
}
if (!was_content_fetch)
{
if (!ot_util_variant_map (data->meta_path, OSTREE_FILE_HEADER_GVARIANT_FORMAT, FALSE,
&file_meta, error))
goto out;
if (!ostree_file_header_parse (file_meta, &file_info, &xattrs, error))
goto out;
if (g_file_info_get_file_type (file_info) == G_FILE_TYPE_REGULAR)
{
ot_lfree char *content_path = ostree_get_relative_archive_content_path (data->checksum);
SoupURI *content_uri;
content_uri = suburi_new (data->pull_data->base_uri, content_path, NULL);
data->pull_data->outstanding_filecontent_requests++;
need_content_fetch = TRUE;
data->fetching_content = TRUE;
ostree_fetcher_request_uri_async (data->pull_data->fetcher, content_uri, cancellable,
content_fetch_on_complete, data);
soup_uri_free (content_uri);
}
}
if (!need_content_fetch)
{
if (data->content_path)
{
content_input = (GInputStream*)g_file_read (data->content_path, cancellable, error);
if (!content_input)
goto out;
}
if (file_meta == NULL)
{
if (!ot_util_variant_map (data->meta_path, OSTREE_FILE_HEADER_GVARIANT_FORMAT, FALSE,
&file_meta, error))
goto out;
if (!ostree_file_header_parse (file_meta, &file_info, &xattrs, error))
goto out;
}
if (!ostree_raw_file_to_content_stream (content_input, file_info, xattrs,
&file_object_input, NULL,
cancellable, error))
goto out;
data->pull_data->outstanding_checksum_requests++;
ot_gio_checksum_stream_async (file_object_input, G_PRIORITY_DEFAULT, NULL,
content_fetch_on_checksum_complete, data);
}
out:
if (was_content_fetch)
data->pull_data->outstanding_filecontent_requests--;
else
{
data->pull_data->outstanding_filemeta_requests--;
enqueue_loose_meta_requests (data->pull_data);
}
check_outstanding_requests_handle_error (data->pull_data, local_error);
}
static void
enqueue_loose_meta_requests (OtPullData *pull_data)
{
GHashTableIter hash_iter;
gpointer key, value;
GCancellable *cancellable = NULL;
g_hash_table_iter_init (&hash_iter, pull_data->loose_files);
while (g_hash_table_iter_next (&hash_iter, &key, &value))
{
const char *checksum = key;
ot_lfree char *objpath = NULL;
SoupURI *obj_uri = NULL;
OtFetchOneContentItemData *one_item_data;
one_item_data = g_new0 (OtFetchOneContentItemData, 1);
one_item_data->pull_data = pull_data;
one_item_data->checksum = g_strdup (checksum);
one_item_data->fetching_content = FALSE;
objpath = ostree_get_relative_object_path (checksum, OSTREE_OBJECT_TYPE_FILE);
obj_uri = suburi_new (pull_data->base_uri, objpath, NULL);
ostree_fetcher_request_uri_async (pull_data->fetcher, obj_uri, cancellable,
content_fetch_on_complete, one_item_data);
soup_uri_free (obj_uri);
pull_data->outstanding_filemeta_requests++;
g_hash_table_iter_remove (&hash_iter);
/* Don't let too many requests queue up; when we're fetching
* files we need to process the actual content.
*/
if (pull_data->outstanding_filemeta_requests > 20)
break;
}
}
static gboolean
fetch_content (OtPullData *pull_data,
GCancellable *cancellable,
@ -1015,51 +1284,16 @@ fetch_content (OtPullData *pull_data,
if (g_hash_table_size (loose_files) > 0)
g_print ("Fetching %u loose objects\n",
g_hash_table_size (loose_files));
g_hash_table_iter_init (&hash_iter, loose_files);
while (g_hash_table_iter_next (&hash_iter, &key, &value))
pull_data->loose_files = loose_files;
if (g_hash_table_size (loose_files) > 0)
{
const char *checksum = key;
guint64 length;
ot_lobj GInputStream *file_object_input = NULL;
ot_lvariant GVariant *file_meta = NULL;
ot_lobj GFileInfo *file_info = NULL;
ot_lvariant GVariant *xattrs = NULL;
ot_lobj GInputStream *content_input = NULL;
enqueue_loose_meta_requests (pull_data);
if (!fetch_loose_object (pull_data, checksum, OSTREE_OBJECT_TYPE_FILE, &temp_path,
cancellable, error))
goto out;
run_mainloop_monitor_fetcher (pull_data);
if (!ot_util_variant_map (temp_path, OSTREE_FILE_HEADER_GVARIANT_FORMAT, FALSE,
&file_meta, error))
goto out;
if (!ostree_file_header_parse (file_meta, &file_info, &xattrs, error))
goto out;
if (g_file_info_get_file_type (file_info) == G_FILE_TYPE_REGULAR)
{
ot_lfree char *content_path = ostree_get_relative_archive_content_path (checksum);
content_uri = suburi_new (pull_data->base_uri, content_path, NULL);
if (!fetch_uri (pull_data, content_uri, "filecontent", &content_temp_path,
cancellable, error))
goto out;
content_input = (GInputStream*)g_file_read (content_temp_path, cancellable, error);
if (!content_input)
goto out;
}
if (!ostree_raw_file_to_content_stream (content_input, file_info, xattrs,
&file_object_input, &length,
cancellable, error))
goto out;
if (!ostree_repo_stage_file_object (pull_data->repo, checksum,
file_object_input, length,
cancellable, error))
if (pull_data->caught_error)
goto out;
}
@ -1203,6 +1437,9 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
if (!ostree_repo_check (repo, error))
goto out;
pull_data->async_error = error;
pull_data->loop = g_main_loop_new (NULL, FALSE);
pull_data->repo = repo;
pull_data->file_checksums_to_fetch = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
@ -1215,10 +1452,7 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
pull_data->stdout_is_tty = isatty (1);
pull_data->remote_name = g_strdup (argv[1]);
pull_data->session = soup_session_sync_new_with_options (SOUP_SESSION_USER_AGENT, "ostree ",
SOUP_SESSION_ADD_FEATURE_BY_TYPE, SOUP_TYPE_COOKIE_JAR,
NULL);
pull_data->fetcher = ostree_fetcher_new (ostree_repo_get_tmpdir (pull_data->repo));
config = ostree_repo_get_config (repo);
remote_key = g_strdup_printf ("remote \"%s\"", pull_data->remote_name);
@ -1384,10 +1618,12 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
ret = TRUE;
out:
if (pull_data->loop)
g_main_loop_unref (pull_data->loop);
g_strfreev (configured_branches);
if (context)
g_option_context_free (context);
g_clear_object (&pull_data->session);
g_clear_object (&pull_data->fetcher);
g_free (pull_data->remote_name);
if (pull_data->base_uri)
soup_uri_free (pull_data->base_uri);