core: Bound number of async imports by CPUs

I was doing a `--ex-unified-core` compose of FAW, and started hitting
the fd limit (1024), since we created hundreds of threads.  Since
it doesn't make sense to do that, let's bound the number of concurrent
imports by the number of CPUs.

This is another implementation of the "bounded async" pattern, like
c18628ecb8

Closes: #1317
Approved by: jlebon
This commit is contained in:
Colin Walters 2018-03-27 15:18:49 -04:00 committed by Atomic Bot
parent defa1dc38a
commit 2b7f33ca9d
2 changed files with 125 additions and 66 deletions

View File

@ -45,7 +45,13 @@ struct _RpmOstreeContext {
gboolean unprivileged;
OstreeSePolicy *sepolicy;
char *passwd_dir;
/* Used in async imports, not owned */
GVariant *rojig_xattr_table;
GHashTable *rojig_pkg_to_xattrs;
guint async_index; /* Offset into array if applicable */
guint n_async_running;
guint n_async_max;
gboolean async_running;
GCancellable *async_cancellable;
GError *async_error;

View File

@ -2269,6 +2269,10 @@ rpmostree_context_get_rojig_checksum (RpmOstreeContext *self)
return self->rojig_checksum;
}
static gboolean
async_imports_mainctx_iter (gpointer user_data);
/* Called on completion of an async import; runs on main thread */
static void
on_async_import_done (GObject *obj,
GAsyncResult *res,
@ -2288,10 +2292,97 @@ on_async_import_done (GObject *obj,
g_assert_cmpint (self->n_async_pkgs_imported, <, self->pkgs_to_import->len);
self->n_async_pkgs_imported++;
g_assert_cmpint (self->n_async_running, >, 0);
self->n_async_running--;
rpmostree_output_progress_n_items ("Importing", self->n_async_pkgs_imported,
self->pkgs_to_import->len);
if (self->n_async_pkgs_imported == self->pkgs_to_import->len)
self->async_running = FALSE;
async_imports_mainctx_iter (self);
}
/* Queue an asynchronous import of a package */
static gboolean
start_async_import_one_package (RpmOstreeContext *self, DnfPackage *pkg,
GCancellable *cancellable,
GError **error)
{
GVariant *rojig_xattrs = NULL;
if (self->rojig_pkg_to_xattrs)
{
rojig_xattrs = g_hash_table_lookup (self->rojig_pkg_to_xattrs, pkg);
if (!rojig_xattrs)
g_error ("Failed to find rojig xattrs for %s", dnf_package_get_nevra (pkg));
}
glnx_fd_close int fd = -1;
if (!rpmostree_context_consume_package (self, pkg, &fd, error))
return FALSE;
/* Only set SKIP_EXTRANEOUS for packages we know need it, so that
* people doing custom composes don't have files silently discarded.
* (This will also likely need to be configurable).
*/
const char *pkg_name = dnf_package_get_name (pkg);
int flags = 0;
if (g_str_equal (pkg_name, "filesystem") ||
g_str_equal (pkg_name, "rootfiles"))
flags |= RPMOSTREE_IMPORTER_FLAGS_SKIP_EXTRANEOUS;
{ gboolean docs;
g_assert (g_variant_dict_lookup (self->spec->dict, "documentation", "b", &docs));
if (!docs)
flags |= RPMOSTREE_IMPORTER_FLAGS_NODOCS;
}
/* TODO - tweak the unpacker flags for containers */
OstreeRepo *ostreerepo = get_pkgcache_repo (self);
g_autoptr(RpmOstreeImporter) unpacker =
rpmostree_importer_new_take_fd (&fd, ostreerepo, pkg, flags,
self->sepolicy, error);
if (!unpacker)
return FALSE;
if (rojig_xattrs)
{
g_assert (!self->sepolicy);
rpmostree_importer_set_rojig_mode (unpacker, self->rojig_xattr_table, rojig_xattrs);
}
rpmostree_importer_run_async (unpacker, cancellable, on_async_import_done, self);
return TRUE;
}
/* First function run on mainloop, and called after completion as well. Ensures
* that we have a bounded number of tasks concurrently executing until
* finishing.
*/
static gboolean
async_imports_mainctx_iter (gpointer user_data)
{
RpmOstreeContext *self = user_data;
while (self->async_index < self->pkgs_to_import->len &&
self->n_async_running < self->n_async_max &&
self->async_error == NULL)
{
DnfPackage *pkg = self->pkgs_to_import->pdata[self->async_index];
if (!start_async_import_one_package (self, pkg, self->async_cancellable, &self->async_error))
{
g_cancellable_cancel (self->async_cancellable);
break;
}
self->async_index++;
self->n_async_running++;
}
if (self->n_async_running == 0)
{
self->async_running = FALSE;
g_main_context_wakeup (g_main_context_get_thread_default ());
}
return FALSE;
}
gboolean
@ -2318,73 +2409,35 @@ rpmostree_context_import_rojig (RpmOstreeContext *self,
if (!rpmostree_repo_auto_transaction_start (&txn, repo, TRUE, cancellable, error))
return FALSE;
{
self->async_running = TRUE;
self->async_cancellable = cancellable;
self->rojig_xattr_table = rojig_xattr_table;
self->rojig_pkg_to_xattrs = rojig_pkg_to_xattrs;
self->async_running = TRUE;
self->async_index = 0;
self->n_async_running = 0;
/* We're CPU bound, so just use processors */
self->n_async_max = g_get_num_processors ();
self->async_cancellable = cancellable;
for (guint i = 0; i < self->pkgs_to_import->len; i++)
{
DnfPackage *pkg = self->pkgs_to_import->pdata[i];
GVariant *rojig_xattrs = NULL;
if (rojig_pkg_to_xattrs)
{
rojig_xattrs = g_hash_table_lookup (rojig_pkg_to_xattrs, pkg);
if (!rojig_xattrs)
g_error ("Failed to find rojig xattrs for %s", dnf_package_get_nevra (pkg));
}
glnx_fd_close int fd = -1;
if (!rpmostree_context_consume_package (self, pkg, &fd, error))
return FALSE;
/* Only set SKIP_EXTRANEOUS for packages we know need it, so that
* people doing custom composes don't have files silently discarded.
* (This will also likely need to be configurable).
*/
const char *pkg_name = dnf_package_get_name (pkg);
int flags = 0;
if (g_str_equal (pkg_name, "filesystem") ||
g_str_equal (pkg_name, "rootfiles"))
flags |= RPMOSTREE_IMPORTER_FLAGS_SKIP_EXTRANEOUS;
{ gboolean docs;
g_assert (g_variant_dict_lookup (self->spec->dict, "documentation", "b", &docs));
if (!docs)
flags |= RPMOSTREE_IMPORTER_FLAGS_NODOCS;
}
/* TODO - tweak the unpacker flags for containers */
OstreeRepo *ostreerepo = get_pkgcache_repo (self);
g_autoptr(RpmOstreeImporter) unpacker =
rpmostree_importer_new_take_fd (&fd, ostreerepo, pkg, flags,
self->sepolicy, error);
if (!unpacker)
return FALSE;
if (rojig_xattrs)
{
g_assert (!self->sepolicy);
rpmostree_importer_set_rojig_mode (unpacker, rojig_xattr_table, rojig_xattrs);
}
rpmostree_importer_run_async (unpacker, cancellable, on_async_import_done, self);
}
/* Wait for all of the imports to complete */
GMainContext *mainctx = g_main_context_get_thread_default ();
self->async_error = NULL;
while (self->async_running)
g_main_context_iteration (mainctx, TRUE);
if (self->async_error)
{
g_propagate_error (error, g_steal_pointer (&self->async_error));
return FALSE;
}
rpmostree_output_progress_end ();
/* Process imports */
GMainContext *mainctx = g_main_context_get_thread_default ();
{ g_autoptr(GSource) src = g_timeout_source_new (0);
g_source_set_priority (src, G_PRIORITY_HIGH);
g_source_set_callback (src, async_imports_mainctx_iter, self, NULL);
g_source_attach (src, mainctx); /* Note takes a ref */
}
self->async_error = NULL;
while (self->async_running)
g_main_context_iteration (mainctx, TRUE);
if (self->async_error)
{
g_propagate_error (error, g_steal_pointer (&self->async_error));
return FALSE;
}
rpmostree_output_progress_end ();
if (!ostree_repo_commit_transaction (repo, NULL, cancellable, error))
return FALSE;
txn.initialized = FALSE;