From 2b7f33ca9d993fd2297ad258058fab6f619f8a90 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Tue, 27 Mar 2018 15:18:49 -0400 Subject: [PATCH] core: Bound number of async imports by CPUs I was doing a `--ex-unified-core` compose of FAW, and started hitting the fd limit (1024), since we created hundreds of threads. Since it doesn't make sense to do that, let's bound the number of concurrent imports by the number of CPUs. This is another implementation of the "bounded async" pattern, like https://github.com/ostreedev/ostree/commit/c18628ecb8b2ef591db0444fa57052cba60807a8 Closes: #1317 Approved by: jlebon --- src/libpriv/rpmostree-core-private.h | 6 + src/libpriv/rpmostree-core.c | 185 +++++++++++++++++---------- 2 files changed, 125 insertions(+), 66 deletions(-) diff --git a/src/libpriv/rpmostree-core-private.h b/src/libpriv/rpmostree-core-private.h index 3683e417..e10973f9 100644 --- a/src/libpriv/rpmostree-core-private.h +++ b/src/libpriv/rpmostree-core-private.h @@ -45,7 +45,13 @@ struct _RpmOstreeContext { gboolean unprivileged; OstreeSePolicy *sepolicy; char *passwd_dir; + /* Used in async imports, not owned */ + GVariant *rojig_xattr_table; + GHashTable *rojig_pkg_to_xattrs; + guint async_index; /* Offset into array if applicable */ + guint n_async_running; + guint n_async_max; gboolean async_running; GCancellable *async_cancellable; GError *async_error; diff --git a/src/libpriv/rpmostree-core.c b/src/libpriv/rpmostree-core.c index c748b3d1..d78b865c 100644 --- a/src/libpriv/rpmostree-core.c +++ b/src/libpriv/rpmostree-core.c @@ -2269,6 +2269,10 @@ rpmostree_context_get_rojig_checksum (RpmOstreeContext *self) return self->rojig_checksum; } +static gboolean +async_imports_mainctx_iter (gpointer user_data); + +/* Called on completion of an async import; runs on main thread */ static void on_async_import_done (GObject *obj, GAsyncResult *res, @@ -2288,10 +2292,97 @@ on_async_import_done (GObject *obj, g_assert_cmpint (self->n_async_pkgs_imported, <, self->pkgs_to_import->len); self->n_async_pkgs_imported++; + g_assert_cmpint (self->n_async_running, >, 0); + self->n_async_running--; rpmostree_output_progress_n_items ("Importing", self->n_async_pkgs_imported, self->pkgs_to_import->len); - if (self->n_async_pkgs_imported == self->pkgs_to_import->len) - self->async_running = FALSE; + async_imports_mainctx_iter (self); +} + +/* Queue an asynchronous import of a package */ +static gboolean +start_async_import_one_package (RpmOstreeContext *self, DnfPackage *pkg, + GCancellable *cancellable, + GError **error) +{ + GVariant *rojig_xattrs = NULL; + if (self->rojig_pkg_to_xattrs) + { + rojig_xattrs = g_hash_table_lookup (self->rojig_pkg_to_xattrs, pkg); + if (!rojig_xattrs) + g_error ("Failed to find rojig xattrs for %s", dnf_package_get_nevra (pkg)); + } + + glnx_fd_close int fd = -1; + if (!rpmostree_context_consume_package (self, pkg, &fd, error)) + return FALSE; + + /* Only set SKIP_EXTRANEOUS for packages we know need it, so that + * people doing custom composes don't have files silently discarded. + * (This will also likely need to be configurable). + */ + const char *pkg_name = dnf_package_get_name (pkg); + + int flags = 0; + if (g_str_equal (pkg_name, "filesystem") || + g_str_equal (pkg_name, "rootfiles")) + flags |= RPMOSTREE_IMPORTER_FLAGS_SKIP_EXTRANEOUS; + + { gboolean docs; + g_assert (g_variant_dict_lookup (self->spec->dict, "documentation", "b", &docs)); + if (!docs) + flags |= RPMOSTREE_IMPORTER_FLAGS_NODOCS; + } + + /* TODO - tweak the unpacker flags for containers */ + OstreeRepo *ostreerepo = get_pkgcache_repo (self); + g_autoptr(RpmOstreeImporter) unpacker = + rpmostree_importer_new_take_fd (&fd, ostreerepo, pkg, flags, + self->sepolicy, error); + if (!unpacker) + return FALSE; + + if (rojig_xattrs) + { + g_assert (!self->sepolicy); + rpmostree_importer_set_rojig_mode (unpacker, self->rojig_xattr_table, rojig_xattrs); + } + + rpmostree_importer_run_async (unpacker, cancellable, on_async_import_done, self); + + return TRUE; +} + +/* First function run on mainloop, and called after completion as well. Ensures + * that we have a bounded number of tasks concurrently executing until + * finishing. + */ +static gboolean +async_imports_mainctx_iter (gpointer user_data) +{ + RpmOstreeContext *self = user_data; + + while (self->async_index < self->pkgs_to_import->len && + self->n_async_running < self->n_async_max && + self->async_error == NULL) + { + DnfPackage *pkg = self->pkgs_to_import->pdata[self->async_index]; + if (!start_async_import_one_package (self, pkg, self->async_cancellable, &self->async_error)) + { + g_cancellable_cancel (self->async_cancellable); + break; + } + self->async_index++; + self->n_async_running++; + } + + if (self->n_async_running == 0) + { + self->async_running = FALSE; + g_main_context_wakeup (g_main_context_get_thread_default ()); + } + + return FALSE; } gboolean @@ -2318,73 +2409,35 @@ rpmostree_context_import_rojig (RpmOstreeContext *self, if (!rpmostree_repo_auto_transaction_start (&txn, repo, TRUE, cancellable, error)) return FALSE; - { - self->async_running = TRUE; - self->async_cancellable = cancellable; + self->rojig_xattr_table = rojig_xattr_table; + self->rojig_pkg_to_xattrs = rojig_pkg_to_xattrs; + self->async_running = TRUE; + self->async_index = 0; + self->n_async_running = 0; + /* We're CPU bound, so just use processors */ + self->n_async_max = g_get_num_processors (); + self->async_cancellable = cancellable; - for (guint i = 0; i < self->pkgs_to_import->len; i++) - { - DnfPackage *pkg = self->pkgs_to_import->pdata[i]; - GVariant *rojig_xattrs = NULL; - if (rojig_pkg_to_xattrs) - { - rojig_xattrs = g_hash_table_lookup (rojig_pkg_to_xattrs, pkg); - if (!rojig_xattrs) - g_error ("Failed to find rojig xattrs for %s", dnf_package_get_nevra (pkg)); - } - - glnx_fd_close int fd = -1; - if (!rpmostree_context_consume_package (self, pkg, &fd, error)) - return FALSE; - - /* Only set SKIP_EXTRANEOUS for packages we know need it, so that - * people doing custom composes don't have files silently discarded. - * (This will also likely need to be configurable). - */ - const char *pkg_name = dnf_package_get_name (pkg); - - int flags = 0; - if (g_str_equal (pkg_name, "filesystem") || - g_str_equal (pkg_name, "rootfiles")) - flags |= RPMOSTREE_IMPORTER_FLAGS_SKIP_EXTRANEOUS; - - { gboolean docs; - g_assert (g_variant_dict_lookup (self->spec->dict, "documentation", "b", &docs)); - if (!docs) - flags |= RPMOSTREE_IMPORTER_FLAGS_NODOCS; - } - - /* TODO - tweak the unpacker flags for containers */ - OstreeRepo *ostreerepo = get_pkgcache_repo (self); - g_autoptr(RpmOstreeImporter) unpacker = - rpmostree_importer_new_take_fd (&fd, ostreerepo, pkg, flags, - self->sepolicy, error); - if (!unpacker) - return FALSE; - - if (rojig_xattrs) - { - g_assert (!self->sepolicy); - rpmostree_importer_set_rojig_mode (unpacker, rojig_xattr_table, rojig_xattrs); - } - - rpmostree_importer_run_async (unpacker, cancellable, on_async_import_done, self); - } - - /* Wait for all of the imports to complete */ - GMainContext *mainctx = g_main_context_get_thread_default (); - self->async_error = NULL; - while (self->async_running) - g_main_context_iteration (mainctx, TRUE); - if (self->async_error) - { - g_propagate_error (error, g_steal_pointer (&self->async_error)); - return FALSE; - } - - rpmostree_output_progress_end (); + /* Process imports */ + GMainContext *mainctx = g_main_context_get_thread_default (); + { g_autoptr(GSource) src = g_timeout_source_new (0); + g_source_set_priority (src, G_PRIORITY_HIGH); + g_source_set_callback (src, async_imports_mainctx_iter, self, NULL); + g_source_attach (src, mainctx); /* Note takes a ref */ } + self->async_error = NULL; + while (self->async_running) + g_main_context_iteration (mainctx, TRUE); + if (self->async_error) + { + g_propagate_error (error, g_steal_pointer (&self->async_error)); + return FALSE; + } + + rpmostree_output_progress_end (); + + if (!ostree_repo_commit_transaction (repo, NULL, cancellable, error)) return FALSE; txn.initialized = FALSE;