deltas: Use delta indexes when pulling

If there is no delta index in the summary, try to fetch the
delta index for the commit we're going to and use that to find the
delta (if any).
This commit is contained in:
Alexander Larsson 2020-09-01 14:52:49 +02:00
parent 625606a7ec
commit df7f07fc6c
2 changed files with 252 additions and 66 deletions

View File

@ -78,7 +78,8 @@ typedef struct {
char *summary_sig_etag; char *summary_sig_etag;
guint64 summary_sig_last_modified; /* seconds since the epoch */ guint64 summary_sig_last_modified; /* seconds since the epoch */
GVariant *summary; GVariant *summary;
GHashTable *summary_deltas_checksums; GHashTable *summary_deltas_checksums; /* Filled from summary and delta indexes */
gboolean summary_has_deltas; /* True if the summary existed and had a delta index */
GHashTable *ref_original_commits; /* Maps checksum to commit, used by timestamp checks */ GHashTable *ref_original_commits; /* Maps checksum to commit, used by timestamp checks */
GHashTable *verified_commits; /* Set<checksum> of commits that have been verified */ GHashTable *verified_commits; /* Set<checksum> of commits that have been verified */
GHashTable *signapi_verified_commits; /* Map<checksum,verification> of commits that have been signapi verified */ GHashTable *signapi_verified_commits; /* Map<checksum,verification> of commits that have been signapi verified */
@ -93,6 +94,7 @@ typedef struct {
GHashTable *requested_fallback_content; /* Maps checksum to itself */ GHashTable *requested_fallback_content; /* Maps checksum to itself */
GHashTable *pending_fetch_metadata; /* Map<ObjectName,FetchObjectData> */ GHashTable *pending_fetch_metadata; /* Map<ObjectName,FetchObjectData> */
GHashTable *pending_fetch_content; /* Map<checksum,FetchObjectData> */ GHashTable *pending_fetch_content; /* Map<checksum,FetchObjectData> */
GHashTable *pending_fetch_delta_indexes; /* Set<FetchDeltaIndexData> */
GHashTable *pending_fetch_delta_superblocks; /* Set<FetchDeltaSuperData> */ GHashTable *pending_fetch_delta_superblocks; /* Set<FetchDeltaSuperData> */
GHashTable *pending_fetch_deltaparts; /* Set<FetchStaticDeltaData> */ GHashTable *pending_fetch_deltaparts; /* Set<FetchStaticDeltaData> */
guint n_outstanding_metadata_fetches; guint n_outstanding_metadata_fetches;

View File

@ -104,6 +104,14 @@ typedef struct {
guint n_retries_remaining; guint n_retries_remaining;
} FetchDeltaSuperData; } FetchDeltaSuperData;
typedef struct {
OtPullData *pull_data;
char *from_revision;
char *to_revision;
OstreeCollectionRef *requested_ref; /* (nullable) */
guint n_retries_remaining;
} FetchDeltaIndexData;
static void static void
variant_or_null_unref (gpointer data) variant_or_null_unref (gpointer data)
{ {
@ -116,6 +124,8 @@ static void start_fetch_deltapart (OtPullData *pull_data,
FetchStaticDeltaData *fetch); FetchStaticDeltaData *fetch);
static void start_fetch_delta_superblock (OtPullData *pull_data, static void start_fetch_delta_superblock (OtPullData *pull_data,
FetchDeltaSuperData *fetch_data); FetchDeltaSuperData *fetch_data);
static void start_fetch_delta_index (OtPullData *pull_data,
FetchDeltaIndexData *fetch_data);
static gboolean fetcher_queue_is_full (OtPullData *pull_data); static gboolean fetcher_queue_is_full (OtPullData *pull_data);
static void queue_scan_one_metadata_object (OtPullData *pull_data, static void queue_scan_one_metadata_object (OtPullData *pull_data,
const char *csum, const char *csum,
@ -135,6 +145,8 @@ static void queue_scan_one_metadata_object_c (OtPullData *pull_da
static void enqueue_one_object_request_s (OtPullData *pull_data, static void enqueue_one_object_request_s (OtPullData *pull_data,
FetchObjectData *fetch_data); FetchObjectData *fetch_data);
static void enqueue_one_static_delta_index_request_s (OtPullData *pull_data,
FetchDeltaIndexData *fetch_data);
static void enqueue_one_static_delta_superblock_request_s (OtPullData *pull_data, static void enqueue_one_static_delta_superblock_request_s (OtPullData *pull_data,
FetchDeltaSuperData *fetch_data); FetchDeltaSuperData *fetch_data);
static void enqueue_one_static_delta_part_request_s (OtPullData *pull_data, static void enqueue_one_static_delta_part_request_s (OtPullData *pull_data,
@ -149,6 +161,11 @@ static gboolean scan_one_metadata_object (OtPullData *pull_data,
GCancellable *cancellable, GCancellable *cancellable,
GError **error); GError **error);
static void scan_object_queue_data_free (ScanObjectQueueData *scan_data); static void scan_object_queue_data_free (ScanObjectQueueData *scan_data);
static gboolean initiate_delta_request (OtPullData *pull_data,
const OstreeCollectionRef *ref,
const char *to_revision,
const char *delta_from_revision,
GError **error);
static gboolean static gboolean
update_progress (gpointer user_data) update_progress (gpointer user_data)
@ -287,6 +304,7 @@ check_outstanding_requests_handle_error (OtPullData *pull_data,
g_queue_foreach (&pull_data->scan_object_queue, (GFunc) scan_object_queue_data_free, NULL); g_queue_foreach (&pull_data->scan_object_queue, (GFunc) scan_object_queue_data_free, NULL);
g_queue_clear (&pull_data->scan_object_queue); g_queue_clear (&pull_data->scan_object_queue);
g_hash_table_remove_all (pull_data->pending_fetch_metadata); g_hash_table_remove_all (pull_data->pending_fetch_metadata);
g_hash_table_remove_all (pull_data->pending_fetch_delta_indexes);
g_hash_table_remove_all (pull_data->pending_fetch_delta_superblocks); g_hash_table_remove_all (pull_data->pending_fetch_delta_superblocks);
g_hash_table_remove_all (pull_data->pending_fetch_deltaparts); g_hash_table_remove_all (pull_data->pending_fetch_deltaparts);
g_hash_table_remove_all (pull_data->pending_fetch_content); g_hash_table_remove_all (pull_data->pending_fetch_content);
@ -320,6 +338,16 @@ check_outstanding_requests_handle_error (OtPullData *pull_data,
g_variant_unref (objname); g_variant_unref (objname);
} }
/* Next, process delta index requests */
g_hash_table_iter_init (&hiter, pull_data->pending_fetch_delta_indexes);
while (!fetcher_queue_is_full (pull_data) &&
g_hash_table_iter_next (&hiter, &key, &value))
{
FetchDeltaIndexData *fetch = key;
g_hash_table_iter_steal (&hiter);
start_fetch_delta_index (pull_data, g_steal_pointer (&fetch));
}
/* Next, process delta superblock requests */ /* Next, process delta superblock requests */
g_hash_table_iter_init (&hiter, pull_data->pending_fetch_delta_superblocks); g_hash_table_iter_init (&hiter, pull_data->pending_fetch_delta_superblocks);
while (!fetcher_queue_is_full (pull_data) && while (!fetcher_queue_is_full (pull_data) &&
@ -2469,6 +2497,16 @@ fetch_delta_super_data_free (FetchDeltaSuperData *fetch_data)
g_free (fetch_data); g_free (fetch_data);
} }
static void
fetch_delta_index_data_free (FetchDeltaIndexData *fetch_data)
{
g_free (fetch_data->from_revision);
g_free (fetch_data->to_revision);
if (fetch_data->requested_ref)
ostree_collection_ref_free (fetch_data->requested_ref);
g_free (fetch_data);
}
static void static void
set_required_deltas_error (GError **error, set_required_deltas_error (GError **error,
const char *from_revision, const char *from_revision,
@ -2629,6 +2667,146 @@ validate_variant_is_csum (GVariant *csum,
return ostree_validate_structureof_csum_v (csum, error); return ostree_validate_structureof_csum_v (csum, error);
} }
static gboolean
collect_available_deltas_for_pull (OtPullData *pull_data,
GVariant *deltas,
GError **error)
{
gsize n;
n = deltas ? g_variant_n_children (deltas) : 0;
for (gsize i = 0; i < n; i++)
{
const char *delta;
g_autoptr(GVariant) csum_v = NULL;
g_autoptr(GVariant) ref = g_variant_get_child_value (deltas, i);
g_variant_get_child (ref, 0, "&s", &delta);
g_variant_get_child (ref, 1, "v", &csum_v);
if (!validate_variant_is_csum (csum_v, error))
return FALSE;
guchar *csum_data = g_malloc (OSTREE_SHA256_DIGEST_LEN);
memcpy (csum_data, ostree_checksum_bytes_peek (csum_v), 32);
g_hash_table_insert (pull_data->summary_deltas_checksums,
g_strdup (delta),
csum_data);
}
return TRUE;
}
static void
on_delta_index_fetched (GObject *src,
GAsyncResult *res,
gpointer data)
{
FetchDeltaIndexData *fetch_data = data;
OtPullData *pull_data = fetch_data->pull_data;
g_autoptr(GError) local_error = NULL;
GError **error = &local_error;
g_autoptr(GBytes) delta_index_data = NULL;
const char *from_revision = fetch_data->from_revision;
const char *to_revision = fetch_data->to_revision;
if (!_ostree_fetcher_request_to_membuf_finish ((OstreeFetcher*)src,
res,
&delta_index_data,
NULL, NULL, NULL,
error))
{
if (!g_error_matches (local_error, G_IO_ERROR, G_IO_ERROR_NOT_FOUND))
goto out;
g_clear_error (&local_error);
/* below call to initiate_delta_request() will fail finding the delta and fall back to commit */
}
else
{
g_autoptr(GVariant) delta_index = g_variant_ref_sink (g_variant_new_from_bytes (G_VARIANT_TYPE_VARDICT, delta_index_data, FALSE));
g_autoptr(GVariant) deltas = g_variant_lookup_value (delta_index, OSTREE_SUMMARY_STATIC_DELTAS, G_VARIANT_TYPE ("a{sv}"));
if (!collect_available_deltas_for_pull (pull_data, deltas, error))
goto out;
}
if (!initiate_delta_request (pull_data,
fetch_data->requested_ref,
to_revision,
from_revision,
&local_error))
goto out;
out:
g_assert (pull_data->n_outstanding_metadata_fetches > 0);
pull_data->n_outstanding_metadata_fetches--;
if (local_error == NULL)
pull_data->n_fetched_metadata++;
if (_ostree_fetcher_should_retry_request (local_error, fetch_data->n_retries_remaining--))
enqueue_one_static_delta_index_request_s (pull_data, g_steal_pointer (&fetch_data));
else
check_outstanding_requests_handle_error (pull_data, &local_error);
g_clear_pointer (&fetch_data, fetch_delta_index_data_free);
}
static void
start_fetch_delta_index (OtPullData *pull_data,
FetchDeltaIndexData *fetch_data)
{
g_autofree char *delta_name =
_ostree_get_relative_static_delta_index_path (fetch_data->to_revision);
_ostree_fetcher_request_to_membuf (pull_data->fetcher,
pull_data->content_mirrorlist,
delta_name, OSTREE_FETCHER_REQUEST_OPTIONAL_CONTENT,
NULL, 0,
OSTREE_MAX_METADATA_SIZE,
0, pull_data->cancellable,
on_delta_index_fetched,
g_steal_pointer (&fetch_data));
pull_data->n_outstanding_metadata_fetches++;
pull_data->n_requested_metadata++;
}
static void
enqueue_one_static_delta_index_request_s (OtPullData *pull_data,
FetchDeltaIndexData *fetch_data)
{
if (fetcher_queue_is_full (pull_data))
{
g_debug ("queuing fetch of static delta index to %s",
fetch_data->to_revision);
g_hash_table_add (pull_data->pending_fetch_delta_indexes,
g_steal_pointer (&fetch_data));
}
else
{
start_fetch_delta_index (pull_data, g_steal_pointer (&fetch_data));
}
}
/* Start a request for a static delta index */
static void
enqueue_one_static_delta_index_request (OtPullData *pull_data,
const char *to_revision,
const char *from_revision,
const OstreeCollectionRef *ref)
{
FetchDeltaIndexData *fdata = g_new0(FetchDeltaIndexData, 1);
fdata->pull_data = pull_data;
fdata->from_revision = g_strdup (from_revision);
fdata->to_revision = g_strdup (to_revision);
fdata->requested_ref = (ref != NULL) ? ostree_collection_ref_dup (ref) : NULL;
fdata->n_retries_remaining = pull_data->n_network_retries;
enqueue_one_static_delta_index_request_s (pull_data, g_steal_pointer (&fdata));
}
static gboolean static gboolean
_ostree_repo_verify_summary (OstreeRepo *self, _ostree_repo_verify_summary (OstreeRepo *self,
const char *name, const char *name,
@ -3259,47 +3437,12 @@ reinitialize_fetcher (OtPullData *pull_data, const char *remote_name,
return TRUE; return TRUE;
} }
/*
* initiate_request:
* @ref: Optional ref name and collection ID
* @to_revision: Target commit revision we want to fetch
*
* Start a request for either a ref or a commit. In the
* ref case, we know both the name and the target commit.
*
* This function primarily handles the semantics around
* `disable_static_deltas` and `require_static_deltas`.
*/
static gboolean static gboolean
initiate_request (OtPullData *pull_data, initiate_delta_request (OtPullData *pull_data,
const OstreeCollectionRef *ref, const OstreeCollectionRef *ref,
const char *to_revision, const char *to_revision,
const char *delta_from_revision,
GError **error) GError **error)
{
g_autofree char *delta_from_revision = NULL;
/* Are deltas disabled? OK, just start an object fetch and be done */
if (pull_data->disable_static_deltas)
{
queue_scan_one_metadata_object (pull_data, to_revision, OSTREE_OBJECT_TYPE_COMMIT, NULL, 0, ref);
return TRUE;
}
/* If doing a delta from a ref, look up the from-revision, since we need it
* on most paths below. */
if (ref != NULL)
{
g_autofree char *refspec = NULL;
if (pull_data->remote_name != NULL)
refspec = g_strdup_printf ("%s:%s", pull_data->remote_name, ref->ref_name);
if (!ostree_repo_resolve_rev (pull_data->repo,
refspec ?: ref->ref_name, TRUE,
&delta_from_revision, error))
return FALSE;
}
/* If we have a summary, we can use the newer logic */
if (pull_data->summary)
{ {
DeltaSearchResult deltares; DeltaSearchResult deltares;
@ -3349,6 +3492,61 @@ initiate_request (OtPullData *pull_data,
queue_scan_one_metadata_object (pull_data, to_revision, OSTREE_OBJECT_TYPE_COMMIT, NULL, 0, ref); queue_scan_one_metadata_object (pull_data, to_revision, OSTREE_OBJECT_TYPE_COMMIT, NULL, 0, ref);
} }
} }
return TRUE;
}
/*
* initiate_request:
* @ref: Optional ref name and collection ID
* @to_revision: Target commit revision we want to fetch
*
* Start a request for either a ref or a commit. In the
* ref case, we know both the name and the target commit.
*
* This function primarily handles the semantics around
* `disable_static_deltas` and `require_static_deltas`.
*/
static gboolean
initiate_request (OtPullData *pull_data,
const OstreeCollectionRef *ref,
const char *to_revision,
GError **error)
{
g_autofree char *delta_from_revision = NULL;
/* Are deltas disabled? OK, just start an object fetch and be done */
if (pull_data->disable_static_deltas)
{
queue_scan_one_metadata_object (pull_data, to_revision, OSTREE_OBJECT_TYPE_COMMIT, NULL, 0, ref);
return TRUE;
}
/* If doing a delta from a ref, look up the from-revision, since we need it
* on most paths below. */
if (ref != NULL)
{
g_autofree char *refspec = NULL;
if (pull_data->remote_name != NULL)
refspec = g_strdup_printf ("%s:%s", pull_data->remote_name, ref->ref_name);
if (!ostree_repo_resolve_rev (pull_data->repo,
refspec ?: ref->ref_name, TRUE,
&delta_from_revision, error))
return FALSE;
}
/* If we have a summary, we can use the newer logic */
if (pull_data->summary)
{
if (!pull_data->summary_has_deltas)
{
enqueue_one_static_delta_index_request (pull_data, to_revision, delta_from_revision, ref);
}
else
{
if (!initiate_delta_request (pull_data, ref, to_revision, delta_from_revision, error))
return FALSE;
}
} }
else if (ref != NULL) else if (ref != NULL)
{ {
@ -3657,6 +3855,7 @@ ostree_repo_pull_with_options (OstreeRepo *self,
pull_data->pending_fetch_metadata = g_hash_table_new_full (ostree_hash_object_name, g_variant_equal, pull_data->pending_fetch_metadata = g_hash_table_new_full (ostree_hash_object_name, g_variant_equal,
(GDestroyNotify)g_variant_unref, (GDestroyNotify)g_variant_unref,
(GDestroyNotify)fetch_object_data_free); (GDestroyNotify)fetch_object_data_free);
pull_data->pending_fetch_delta_indexes = g_hash_table_new_full (NULL, NULL, (GDestroyNotify) fetch_delta_index_data_free, NULL);
pull_data->pending_fetch_delta_superblocks = g_hash_table_new_full (NULL, NULL, (GDestroyNotify) fetch_delta_super_data_free, NULL); pull_data->pending_fetch_delta_superblocks = g_hash_table_new_full (NULL, NULL, (GDestroyNotify) fetch_delta_super_data_free, NULL);
pull_data->pending_fetch_deltaparts = g_hash_table_new_full (NULL, NULL, (GDestroyNotify)fetch_static_delta_data_free, NULL); pull_data->pending_fetch_deltaparts = g_hash_table_new_full (NULL, NULL, (GDestroyNotify)fetch_static_delta_data_free, NULL);
@ -4314,25 +4513,9 @@ ostree_repo_pull_with_options (OstreeRepo *self,
} }
deltas = g_variant_lookup_value (additional_metadata, OSTREE_SUMMARY_STATIC_DELTAS, G_VARIANT_TYPE ("a{sv}")); deltas = g_variant_lookup_value (additional_metadata, OSTREE_SUMMARY_STATIC_DELTAS, G_VARIANT_TYPE ("a{sv}"));
n = deltas ? g_variant_n_children (deltas) : 0; pull_data->summary_has_deltas = deltas != NULL && g_variant_n_children (deltas) > 0;
for (i = 0; i < n; i++) if (!collect_available_deltas_for_pull (pull_data, deltas, error))
{
const char *delta;
g_autoptr(GVariant) csum_v = NULL;
g_autoptr(GVariant) ref = g_variant_get_child_value (deltas, i);
g_variant_get_child (ref, 0, "&s", &delta);
g_variant_get_child (ref, 1, "v", &csum_v);
if (!validate_variant_is_csum (csum_v, error))
goto out; goto out;
guchar *csum_data = g_malloc (OSTREE_SHA256_DIGEST_LEN);
memcpy (csum_data, ostree_checksum_bytes_peek (csum_v), 32);
g_hash_table_insert (pull_data->summary_deltas_checksums,
g_strdup (delta),
csum_data);
}
} }
if (pull_data->summary && if (pull_data->summary &&
@ -4900,6 +5083,7 @@ ostree_repo_pull_with_options (OstreeRepo *self,
g_clear_pointer (&pull_data->requested_metadata, (GDestroyNotify) g_hash_table_unref); g_clear_pointer (&pull_data->requested_metadata, (GDestroyNotify) g_hash_table_unref);
g_clear_pointer (&pull_data->pending_fetch_content, (GDestroyNotify) g_hash_table_unref); g_clear_pointer (&pull_data->pending_fetch_content, (GDestroyNotify) g_hash_table_unref);
g_clear_pointer (&pull_data->pending_fetch_metadata, (GDestroyNotify) g_hash_table_unref); g_clear_pointer (&pull_data->pending_fetch_metadata, (GDestroyNotify) g_hash_table_unref);
g_clear_pointer (&pull_data->pending_fetch_delta_indexes, (GDestroyNotify) g_hash_table_unref);
g_clear_pointer (&pull_data->pending_fetch_delta_superblocks, (GDestroyNotify) g_hash_table_unref); g_clear_pointer (&pull_data->pending_fetch_delta_superblocks, (GDestroyNotify) g_hash_table_unref);
g_clear_pointer (&pull_data->pending_fetch_deltaparts, (GDestroyNotify) g_hash_table_unref); g_clear_pointer (&pull_data->pending_fetch_deltaparts, (GDestroyNotify) g_hash_table_unref);
g_queue_foreach (&pull_data->scan_object_queue, (GFunc) scan_object_queue_data_free, NULL); g_queue_foreach (&pull_data->scan_object_queue, (GFunc) scan_object_queue_data_free, NULL);