pull: Rework delta superblock fetches to be async

For the pending libcurl port, the backend is a bit more
sensitive to the main context setup.  The delta superblock
fetch here is a synchronous request in the section that's
supposed to be async.

Now, libsoup definitely supports mixing sync and async requests, but it wasn't
hard to help the libcurl port here by making this one async. Now fetchers are
either sync or async.

Closes: #636
Approved by: jlebon
This commit is contained in:
Colin Walters 2017-01-03 10:45:58 -05:00 committed by Atomic Bot
parent ced22f6c9b
commit d9f43cd2fb

View File

@ -67,6 +67,7 @@ typedef struct {
gint n_scanned_metadata;
gboolean gpg_verify;
gboolean require_static_deltas;
gboolean gpg_verify_summary;
gboolean has_tombstone_commits;
@ -178,6 +179,10 @@ update_progress (gpointer user_data)
if (! pull_data->progress)
return FALSE;
/* In dry run, we only emit progress once metadata is done */
if (pull_data->dry_run && pull_data->n_outstanding_metadata_fetches > 0)
return TRUE;
outstanding_writes = pull_data->n_outstanding_content_write_requests +
pull_data->n_outstanding_metadata_write_requests +
pull_data->n_outstanding_deltapart_write_requests;
@ -1416,75 +1421,6 @@ load_remote_repo_config (OtPullData *pull_data,
return ret;
}
static gboolean
request_static_delta_superblock_sync (OtPullData *pull_data,
const char *from_revision,
const char *to_revision,
GVariant **out_delta_superblock,
GCancellable *cancellable,
GError **error)
{
gboolean ret = FALSE;
g_autoptr(GVariant) ret_delta_superblock = NULL;
g_autofree char *delta_name =
_ostree_get_relative_static_delta_superblock_path (from_revision, to_revision);
g_autoptr(GBytes) delta_superblock_data = NULL;
if (!_ostree_fetcher_mirrored_request_to_membuf (pull_data->fetcher,
pull_data->content_mirrorlist,
delta_name, FALSE, TRUE,
&delta_superblock_data,
OSTREE_MAX_METADATA_SIZE,
pull_data->cancellable, error))
goto out;
if (delta_superblock_data)
{
{
g_autofree gchar *delta = NULL;
g_autofree guchar *ret_csum = NULL;
guchar *summary_csum;
g_autoptr (GInputStream) summary_is = NULL;
summary_is = g_memory_input_stream_new_from_data (g_bytes_get_data (delta_superblock_data, NULL),
g_bytes_get_size (delta_superblock_data),
NULL);
if (!ot_gio_checksum_stream (summary_is, &ret_csum, cancellable, error))
goto out;
delta = g_strconcat (from_revision ? from_revision : "", from_revision ? "-" : "", to_revision, NULL);
summary_csum = g_hash_table_lookup (pull_data->summary_deltas_checksums, delta);
/* At this point we've GPG verified the data, so in theory
* could trust that they provided the right data, but let's
* make this a hard error.
*/
if (pull_data->gpg_verify_summary && !summary_csum)
{
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"GPG verification enabled, but no summary signatures found (use gpg-verify-summary=false in remote config to disable)");
goto out;
}
if (summary_csum && memcmp (summary_csum, ret_csum, 32))
{
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Invalid checksum for static delta %s", delta);
goto out;
}
}
ret_delta_superblock = g_variant_ref_sink (g_variant_new_from_bytes ((GVariantType*)OSTREE_STATIC_DELTA_SUPERBLOCK_FORMAT,
delta_superblock_data, FALSE));
}
ret = TRUE;
if (out_delta_superblock)
*out_delta_superblock = g_steal_pointer (&ret_delta_superblock);
out:
return ret;
}
static gboolean
process_one_static_delta_fallback (OtPullData *pull_data,
gboolean delta_byteswap,
@ -1749,6 +1685,100 @@ process_one_static_delta (OtPullData *pull_data,
return ret;
}
typedef struct {
OtPullData *pull_data;
char *from_revision;
char *to_revision;
} FetchDeltaSuperData;
static void
on_superblock_fetched (GObject *src,
GAsyncResult *res,
gpointer data)
{
FetchDeltaSuperData *fdata = data;
OtPullData *pull_data = fdata->pull_data;
GError *local_error = NULL;
GError **error = &local_error;
g_autoptr(GBytes) delta_superblock_data = NULL;
const char *from_revision = fdata->from_revision;
const char *to_revision = fdata->to_revision;
if (!_ostree_fetcher_request_to_membuf_finish ((OstreeFetcher*)src,
res,
&delta_superblock_data,
error))
{
if (!g_error_matches (local_error, G_IO_ERROR, G_IO_ERROR_NOT_FOUND))
goto out;
g_clear_error (&local_error);
if (pull_data->require_static_deltas)
{
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Static deltas required, but none found for %s to %s",
from_revision, to_revision);
goto out;
}
queue_scan_one_metadata_object (pull_data, to_revision, OSTREE_OBJECT_TYPE_COMMIT, NULL, 0);
}
else
{
g_autofree gchar *delta = NULL;
g_autofree guchar *ret_csum = NULL;
guchar *summary_csum;
g_autoptr (GInputStream) summary_is = NULL;
g_autoptr(GVariant) delta_superblock = NULL;
summary_is = g_memory_input_stream_new_from_data (g_bytes_get_data (delta_superblock_data, NULL),
g_bytes_get_size (delta_superblock_data),
NULL);
if (!ot_gio_checksum_stream (summary_is, &ret_csum, pull_data->cancellable, error))
goto out;
delta = g_strconcat (from_revision ? from_revision : "", from_revision ? "-" : "", to_revision, NULL);
summary_csum = g_hash_table_lookup (pull_data->summary_deltas_checksums, delta);
/* At this point we've GPG verified the data, so in theory
* could trust that they provided the right data, but let's
* make this a hard error.
*/
if (pull_data->gpg_verify_summary && !summary_csum)
{
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"GPG verification enabled, but no summary signatures found (use gpg-verify-summary=false in remote config to disable)");
goto out;
}
if (summary_csum && memcmp (summary_csum, ret_csum, 32))
{
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Invalid checksum for static delta %s", delta);
goto out;
}
delta_superblock = g_variant_ref_sink (g_variant_new_from_bytes ((GVariantType*)OSTREE_STATIC_DELTA_SUPERBLOCK_FORMAT,
delta_superblock_data, FALSE));
g_ptr_array_add (pull_data->static_delta_superblocks, g_variant_ref (delta_superblock));
if (!process_one_static_delta (pull_data, from_revision, to_revision, delta_superblock,
pull_data->cancellable, error))
goto out;
}
out:
g_free (fdata->from_revision);
g_free (fdata->to_revision);
g_free (fdata);
g_assert (pull_data->n_outstanding_metadata_fetches > 0);
pull_data->n_outstanding_metadata_fetches--;
pull_data->n_fetched_metadata++;
check_outstanding_requests_handle_error (pull_data, local_error);
}
static gboolean
validate_variant_is_csum (GVariant *csum,
GError **error)
@ -2344,7 +2374,6 @@ ostree_repo_pull_with_options (OstreeRepo *self,
g_autofree char **override_commit_ids = NULL;
GSource *update_timeout = NULL;
gboolean disable_static_deltas = FALSE;
gboolean require_static_deltas = FALSE;
gboolean opt_gpg_verify_set = FALSE;
gboolean opt_gpg_verify_summary_set = FALSE;
const char *url_override = NULL;
@ -2368,7 +2397,7 @@ ostree_repo_pull_with_options (OstreeRepo *self,
g_variant_lookup (options, "gpg-verify-summary", "b", &pull_data->gpg_verify_summary);
(void) g_variant_lookup (options, "depth", "i", &pull_data->maxdepth);
(void) g_variant_lookup (options, "disable-static-deltas", "b", &disable_static_deltas);
(void) g_variant_lookup (options, "require-static-deltas", "b", &require_static_deltas);
(void) g_variant_lookup (options, "require-static-deltas", "b", &pull_data->require_static_deltas);
(void) g_variant_lookup (options, "override-commit-ids", "^a&s", &override_commit_ids);
(void) g_variant_lookup (options, "dry-run", "b", &pull_data->dry_run);
(void) g_variant_lookup (options, "override-url", "&s", &url_override);
@ -2386,11 +2415,11 @@ ostree_repo_pull_with_options (OstreeRepo *self,
for (i = 0; dirs_to_pull != NULL && dirs_to_pull[i] != NULL; i++)
g_return_val_if_fail (dirs_to_pull[i][0] == '/', FALSE);
g_return_val_if_fail (!(disable_static_deltas && require_static_deltas), FALSE);
g_return_val_if_fail (!(disable_static_deltas && pull_data->require_static_deltas), FALSE);
/* We only do dry runs with static deltas, because we don't really have any
* in-advance information for bare fetches.
*/
g_return_val_if_fail (!pull_data->dry_run || require_static_deltas, FALSE);
g_return_val_if_fail (!pull_data->dry_run || pull_data->require_static_deltas, FALSE);
pull_data->is_mirror = (flags & OSTREE_REPO_PULL_FLAGS_MIRROR) > 0;
pull_data->is_commit_only = (flags & OSTREE_REPO_PULL_FLAGS_COMMIT_ONLY) > 0;
@ -2655,7 +2684,7 @@ ostree_repo_pull_with_options (OstreeRepo *self,
/* For local pulls, default to disabling static deltas so that the
* exact object files are copied.
*/
if (pull_data->remote_repo_local && !require_static_deltas)
if (pull_data->remote_repo_local && !pull_data->require_static_deltas)
disable_static_deltas = TRUE;
pull_data->static_delta_superblocks = g_ptr_array_new_with_free_func ((GDestroyNotify)g_variant_unref);
@ -2710,7 +2739,7 @@ ostree_repo_pull_with_options (OstreeRepo *self,
goto out;
}
if (!bytes_summary && require_static_deltas)
if (!bytes_summary && pull_data->require_static_deltas)
{
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Fetch configured to require static deltas, but no summary found");
@ -2939,7 +2968,6 @@ ostree_repo_pull_with_options (OstreeRepo *self,
g_autofree char *from_revision = NULL;
const char *ref = key;
const char *to_revision = value;
g_autoptr(GVariant) delta_superblock = NULL;
if (!ostree_repo_resolve_rev (pull_data->repo, ref, TRUE,
&from_revision, error))
@ -2948,31 +2976,25 @@ ostree_repo_pull_with_options (OstreeRepo *self,
if (!(disable_static_deltas || mirroring_into_archive || pull_data->is_commit_only) &&
(from_revision == NULL || g_strcmp0 (from_revision, to_revision) != 0))
{
if (!request_static_delta_superblock_sync (pull_data, from_revision, to_revision,
&delta_superblock, cancellable, error))
goto out;
}
if (!delta_superblock)
{
if (require_static_deltas)
{
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Static deltas required, but none found for %s to %s",
from_revision, to_revision);
goto out;
}
g_debug ("no delta superblock for %s-%s", from_revision ? from_revision : "empty", to_revision);
queue_scan_one_metadata_object (pull_data, to_revision, OSTREE_OBJECT_TYPE_COMMIT, NULL, 0);
g_autofree char *delta_name =
_ostree_get_relative_static_delta_superblock_path (from_revision, to_revision);
FetchDeltaSuperData *fdata = g_new0(FetchDeltaSuperData, 1);
fdata->pull_data = pull_data;
fdata->from_revision = g_strdup (from_revision);
fdata->to_revision = g_strdup (to_revision);
_ostree_fetcher_request_to_membuf (pull_data->fetcher,
pull_data->content_mirrorlist,
delta_name, 0,
OSTREE_MAX_METADATA_SIZE,
0, pull_data->cancellable,
on_superblock_fetched, fdata);
pull_data->n_outstanding_metadata_fetches++;
pull_data->n_requested_metadata++;
}
else
{
g_debug ("processing delta superblock for %s-%s", from_revision ? from_revision : "empty", to_revision);
g_ptr_array_add (pull_data->static_delta_superblocks, g_variant_ref (delta_superblock));
if (!process_one_static_delta (pull_data, from_revision, to_revision,
delta_superblock,
cancellable, error))
goto out;
queue_scan_one_metadata_object (pull_data, to_revision, OSTREE_OBJECT_TYPE_COMMIT, NULL, 0);
}
}