lib/repo-pull: Support queuing delta superblock requests

Just like all the other requests made for delta parts and objects by the
pull code, use a queue for delta superblocks. Currently this doesn’t do
any prioritisation or retries after transient failures, but it could do
in future.

This means that delta superblocks are now subject to the parallel
request limit in the fetcher, which was a problem highlighted here:
https://github.com/ostreedev/ostree/pull/1453#discussion_r168321706.

Signed-off-by: Philip Withnall <withnall@endlessm.com>

Closes: #1600
Approved by: jlebon
This commit is contained in:
Philip Withnall 2018-05-28 16:55:05 +01:00 committed by Atomic Bot
parent 197644c406
commit f342e66c11

View File

@ -127,6 +127,7 @@ typedef struct {
GHashTable *requested_fallback_content; /* Maps checksum to itself */
GHashTable *pending_fetch_metadata; /* Map<ObjectName,FetchObjectData> */
GHashTable *pending_fetch_content; /* Map<checksum,FetchObjectData> */
GHashTable *pending_fetch_delta_superblocks; /* Set<FetchDeltaSuperData> */
GHashTable *pending_fetch_deltaparts; /* Set<FetchStaticDeltaData> */
guint n_outstanding_metadata_fetches;
guint n_outstanding_metadata_write_requests;
@ -206,6 +207,13 @@ typedef struct {
OstreeCollectionRef *requested_ref; /* (nullable) */
} ScanObjectQueueData;
typedef struct {
OtPullData *pull_data;
char *from_revision;
char *to_revision;
OstreeCollectionRef *requested_ref; /* (nullable) */
} FetchDeltaSuperData;
static void
variant_or_null_unref (gpointer data)
{
@ -216,6 +224,8 @@ variant_or_null_unref (gpointer data)
static void start_fetch (OtPullData *pull_data, FetchObjectData *fetch);
static void start_fetch_deltapart (OtPullData *pull_data,
FetchStaticDeltaData *fetch);
static void start_fetch_delta_superblock (OtPullData *pull_data,
FetchDeltaSuperData *fetch_data);
static gboolean fetcher_queue_is_full (OtPullData *pull_data);
static void queue_scan_one_metadata_object (OtPullData *pull_data,
const char *csum,
@ -235,6 +245,8 @@ static void queue_scan_one_metadata_object_c (OtPullData *pull_da
static void enqueue_one_object_request_s (OtPullData *pull_data,
FetchObjectData *fetch_data);
static void enqueue_one_static_delta_superblock_request_s (OtPullData *pull_data,
FetchDeltaSuperData *fetch_data);
static void enqueue_one_static_delta_part_request_s (OtPullData *pull_data,
FetchStaticDeltaData *fetch_data);
@ -391,6 +403,7 @@ check_outstanding_requests_handle_error (OtPullData *pull_data,
g_queue_foreach (&pull_data->scan_object_queue, (GFunc) scan_object_queue_data_free, NULL);
g_queue_clear (&pull_data->scan_object_queue);
g_hash_table_remove_all (pull_data->pending_fetch_metadata);
g_hash_table_remove_all (pull_data->pending_fetch_delta_superblocks);
g_hash_table_remove_all (pull_data->pending_fetch_deltaparts);
g_hash_table_remove_all (pull_data->pending_fetch_content);
}
@ -423,6 +436,16 @@ check_outstanding_requests_handle_error (OtPullData *pull_data,
g_variant_unref (objname);
}
/* Next, process delta superblock requests */
g_hash_table_iter_init (&hiter, pull_data->pending_fetch_delta_superblocks);
while (!fetcher_queue_is_full (pull_data) &&
g_hash_table_iter_next (&hiter, &key, &value))
{
FetchDeltaSuperData *fetch = key;
g_hash_table_iter_steal (&hiter);
start_fetch_delta_superblock (pull_data, g_steal_pointer (&fetch));
}
/* Now, process deltapart requests */
g_hash_table_iter_init (&hiter, pull_data->pending_fetch_deltaparts);
while (!fetcher_queue_is_full (pull_data) &&
@ -2648,13 +2671,6 @@ get_best_static_delta_start_for (OtPullData *pull_data,
return TRUE;
}
typedef struct {
OtPullData *pull_data;
char *from_revision;
char *to_revision;
OstreeCollectionRef *requested_ref; /* (nullable) */
} FetchDeltaSuperData;
static void
fetch_delta_super_data_free (FetchDeltaSuperData *fetch_data)
{
@ -2757,6 +2773,59 @@ on_superblock_fetched (GObject *src,
g_clear_pointer (&fetch_data, fetch_delta_super_data_free);
}
static void
start_fetch_delta_superblock (OtPullData *pull_data,
FetchDeltaSuperData *fetch_data)
{
g_autofree char *delta_name =
_ostree_get_relative_static_delta_superblock_path (fetch_data->from_revision,
fetch_data->to_revision);
_ostree_fetcher_request_to_membuf (pull_data->fetcher,
pull_data->content_mirrorlist,
delta_name, OSTREE_FETCHER_REQUEST_OPTIONAL_CONTENT,
OSTREE_MAX_METADATA_SIZE,
0, pull_data->cancellable,
on_superblock_fetched,
g_steal_pointer (&fetch_data));
pull_data->n_outstanding_metadata_fetches++;
pull_data->n_requested_metadata++;
}
static void
enqueue_one_static_delta_superblock_request_s (OtPullData *pull_data,
FetchDeltaSuperData *fetch_data)
{
if (fetcher_queue_is_full (pull_data))
{
g_debug ("queuing fetch of static delta superblock %s-%s",
fetch_data->from_revision ?: "empty",
fetch_data->to_revision);
g_hash_table_add (pull_data->pending_fetch_delta_superblocks,
g_steal_pointer (&fetch_data));
}
else
{
start_fetch_delta_superblock (pull_data, g_steal_pointer (&fetch_data));
}
}
/* Start a request for a static delta */
static void
enqueue_one_static_delta_superblock_request (OtPullData *pull_data,
const char *from_revision,
const char *to_revision,
const OstreeCollectionRef *ref)
{
FetchDeltaSuperData *fdata = g_new0(FetchDeltaSuperData, 1);
fdata->pull_data = pull_data;
fdata->from_revision = g_strdup (from_revision);
fdata->to_revision = g_strdup (to_revision);
fdata->requested_ref = (ref != NULL) ? ostree_collection_ref_dup (ref) : NULL;
enqueue_one_static_delta_superblock_request_s (pull_data, g_steal_pointer (&fdata));
}
static gboolean
validate_variant_is_csum (GVariant *csum,
GError **error)
@ -3260,31 +3329,6 @@ reinitialize_fetcher (OtPullData *pull_data, const char *remote_name,
return TRUE;
}
/* Start a request for a static delta */
static void
initiate_delta_request (OtPullData *pull_data,
const char *from_revision,
const char *to_revision,
const OstreeCollectionRef *ref)
{
g_autofree char *delta_name =
_ostree_get_relative_static_delta_superblock_path (from_revision, to_revision);
FetchDeltaSuperData *fdata = g_new0(FetchDeltaSuperData, 1);
fdata->pull_data = pull_data;
fdata->from_revision = g_strdup (from_revision);
fdata->to_revision = g_strdup (to_revision);
fdata->requested_ref = (ref != NULL) ? ostree_collection_ref_dup (ref) : NULL;
_ostree_fetcher_request_to_membuf (pull_data->fetcher,
pull_data->content_mirrorlist,
delta_name, OSTREE_FETCHER_REQUEST_OPTIONAL_CONTENT,
OSTREE_MAX_METADATA_SIZE,
0, pull_data->cancellable,
on_superblock_fetched, fdata);
pull_data->n_outstanding_metadata_fetches++;
pull_data->n_requested_metadata++;
}
/*
* initiate_request:
* @ref: Optional ref name and collection ID
@ -3335,10 +3379,10 @@ initiate_request (OtPullData *pull_data,
}
break;
case DELTA_SEARCH_RESULT_FROM:
initiate_delta_request (pull_data, deltares.from_revision, to_revision, ref);
enqueue_one_static_delta_superblock_request (pull_data, deltares.from_revision, to_revision, ref);
break;
case DELTA_SEARCH_RESULT_SCRATCH:
initiate_delta_request (pull_data, NULL, to_revision, ref);
enqueue_one_static_delta_superblock_request (pull_data, NULL, to_revision, ref);
break;
case DELTA_SEARCH_RESULT_UNCHANGED:
{
@ -3390,14 +3434,14 @@ initiate_request (OtPullData *pull_data,
if (delta_from_revision && g_str_equal (delta_from_revision, to_revision))
queue_scan_one_metadata_object (pull_data, to_revision, OSTREE_OBJECT_TYPE_COMMIT, NULL, 0, ref);
else
initiate_delta_request (pull_data, delta_from_revision ?: NULL, to_revision, ref);
enqueue_one_static_delta_superblock_request (pull_data, delta_from_revision ?: NULL, to_revision, ref);
}
else
{
/* Legacy path without a summary file - let's try a scratch delta, if that
* doesn't work, it'll drop down to object requests.
*/
initiate_delta_request (pull_data, NULL, to_revision, NULL);
enqueue_one_static_delta_superblock_request (pull_data, NULL, to_revision, NULL);
}
return TRUE;
@ -3597,6 +3641,7 @@ ostree_repo_pull_with_options (OstreeRepo *self,
pull_data->pending_fetch_metadata = g_hash_table_new_full (ostree_hash_object_name, g_variant_equal,
(GDestroyNotify)g_variant_unref,
(GDestroyNotify)fetch_object_data_free);
pull_data->pending_fetch_delta_superblocks = g_hash_table_new_full (NULL, NULL, (GDestroyNotify) fetch_delta_super_data_free, NULL);
pull_data->pending_fetch_deltaparts = g_hash_table_new_full (NULL, NULL, (GDestroyNotify)fetch_static_delta_data_free, NULL);
if (opt_localcache_repos && *opt_localcache_repos)
@ -4560,6 +4605,7 @@ ostree_repo_pull_with_options (OstreeRepo *self,
g_clear_pointer (&pull_data->requested_metadata, (GDestroyNotify) g_hash_table_unref);
g_clear_pointer (&pull_data->pending_fetch_content, (GDestroyNotify) g_hash_table_unref);
g_clear_pointer (&pull_data->pending_fetch_metadata, (GDestroyNotify) g_hash_table_unref);
g_clear_pointer (&pull_data->pending_fetch_delta_superblocks, (GDestroyNotify) g_hash_table_unref);
g_clear_pointer (&pull_data->pending_fetch_deltaparts, (GDestroyNotify) g_hash_table_unref);
g_queue_foreach (&pull_data->scan_object_queue, (GFunc) scan_object_queue_data_free, NULL);
g_queue_clear (&pull_data->scan_object_queue);