static deltas: Process each part as soon as its done

Directly when we allocate a new part we finish the old one,
writing the compressed data to a temporary file and generating
the delta header for it.

When all these are done we loop over them and collect the headers,
sizes and either copy the tempfile data into the inlined superblock
or link the tempfiles to disk with the proper names.

Closes: #1309
Approved by: cgwalters
This commit is contained in:
Alexander Larsson 2017-10-25 22:32:02 +02:00 committed by Atomic Bot
parent cbbd159a5d
commit de0e015908

View File

@ -44,6 +44,7 @@ typedef enum {
} DeltaOpts;
typedef struct {
guint64 compressed_size;
guint64 uncompressed_size;
GPtrArray *objects;
GString *payload;
@ -52,6 +53,8 @@ typedef struct {
GPtrArray *modes;
GHashTable *xattr_set; /* GVariant(ayay) -> offset */
GPtrArray *xattrs;
GLnxTmpfile part_tmpf;
GVariant *header;
} OstreeStaticDeltaPartBuilder;
typedef struct {
@ -66,6 +69,8 @@ typedef struct {
guint n_bsdiff;
guint n_fallback;
gboolean swap_endian;
int parts_dfd;
DeltaOpts delta_opts;
} OstreeStaticDeltaBuilder;
/* Get an input stream for a GVariant */
@ -119,6 +124,9 @@ ostree_static_delta_part_builder_unref (OstreeStaticDeltaPartBuilder *part_build
g_ptr_array_unref (part_builder->modes);
g_hash_table_unref (part_builder->xattr_set);
g_ptr_array_unref (part_builder->xattrs);
glnx_tmpfile_clear (&part_builder->part_tmpf);
if (part_builder->header)
g_variant_unref (part_builder->header);
g_free (part_builder);
}
@ -200,10 +208,123 @@ xattr_chunk_equals (const void *one, const void *two)
return memcmp (g_variant_get_data (v1), g_variant_get_data (v2), l1) == 0;
}
static gboolean
finish_part (OstreeStaticDeltaBuilder *builder, GError **error)
{
OstreeStaticDeltaPartBuilder *part_builder = builder->parts->pdata[builder->parts->len - 1];
g_autofree guchar *part_checksum = NULL;
g_autoptr(GBytes) objtype_checksum_array = NULL;
g_autoptr(GBytes) checksum_bytes = NULL;
g_autoptr(GOutputStream) part_temp_outstream = NULL;
g_autoptr(GInputStream) part_in = NULL;
g_autoptr(GInputStream) part_payload_in = NULL;
g_autoptr(GMemoryOutputStream) part_payload_out = NULL;
g_autoptr(GConverterOutputStream) part_payload_compressor = NULL;
g_autoptr(GConverter) compressor = NULL;
g_autoptr(GVariant) delta_part_content = NULL;
g_autoptr(GVariant) delta_part = NULL;
g_autoptr(GVariant) delta_part_header = NULL;
g_auto(GVariantBuilder) mode_builder = OT_VARIANT_BUILDER_INITIALIZER;
g_auto(GVariantBuilder) xattr_builder = OT_VARIANT_BUILDER_INITIALIZER;
guint8 compression_type_char;
g_variant_builder_init (&mode_builder, G_VARIANT_TYPE ("a(uuu)"));
g_variant_builder_init (&xattr_builder, G_VARIANT_TYPE ("aa(ayay)"));
guint j;
for (j = 0; j < part_builder->modes->len; j++)
g_variant_builder_add_value (&mode_builder, part_builder->modes->pdata[j]);
for (j = 0; j < part_builder->xattrs->len; j++)
g_variant_builder_add_value (&xattr_builder, part_builder->xattrs->pdata[j]);
{
g_autoptr(GBytes) payload_b;
g_autoptr(GBytes) operations_b;
payload_b = g_string_free_to_bytes (part_builder->payload);
part_builder->payload = NULL;
operations_b = g_string_free_to_bytes (part_builder->operations);
part_builder->operations = NULL;
delta_part_content = g_variant_new ("(a(uuu)aa(ayay)@ay@ay)",
&mode_builder, &xattr_builder,
ot_gvariant_new_ay_bytes (payload_b),
ot_gvariant_new_ay_bytes (operations_b));
g_variant_ref_sink (delta_part_content);
}
/* Hardcode xz for now */
compressor = (GConverter*)_ostree_lzma_compressor_new (NULL);
compression_type_char = 'x';
part_payload_in = variant_to_inputstream (delta_part_content);
part_payload_out = (GMemoryOutputStream*)g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
part_payload_compressor = (GConverterOutputStream*)g_converter_output_stream_new ((GOutputStream*)part_payload_out, compressor);
{
gssize n_bytes_written = g_output_stream_splice ((GOutputStream*)part_payload_compressor, part_payload_in,
G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET | G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE,
NULL, error);
if (n_bytes_written < 0)
return FALSE;
}
g_clear_pointer (&delta_part_content, g_variant_unref);
{ g_autoptr(GBytes) payload = g_memory_output_stream_steal_as_bytes (part_payload_out);
delta_part = g_variant_ref_sink (g_variant_new ("(y@ay)",
compression_type_char,
ot_gvariant_new_ay_bytes (payload)));
}
if (!glnx_open_tmpfile_linkable_at (builder->parts_dfd, ".", O_RDWR | O_CLOEXEC,
&part_builder->part_tmpf, error))
return FALSE;
part_temp_outstream = g_unix_output_stream_new (part_builder->part_tmpf.fd, FALSE);
part_in = variant_to_inputstream (delta_part);
if (!ot_gio_splice_get_checksum (part_temp_outstream, part_in,
&part_checksum,
NULL, error))
return FALSE;
checksum_bytes = g_bytes_new (part_checksum, OSTREE_SHA256_DIGEST_LEN);
objtype_checksum_array = objtype_checksum_array_new (part_builder->objects);
delta_part_header = g_variant_new ("(u@aytt@ay)",
maybe_swap_endian_u32 (builder->swap_endian, OSTREE_DELTAPART_VERSION),
ot_gvariant_new_ay_bytes (checksum_bytes),
maybe_swap_endian_u64 (builder->swap_endian, (guint64) g_variant_get_size (delta_part)),
maybe_swap_endian_u64 (builder->swap_endian, part_builder->uncompressed_size),
ot_gvariant_new_ay_bytes (objtype_checksum_array));
g_variant_ref_sink (delta_part_header);
part_builder->header = g_variant_ref (delta_part_header);
part_builder->compressed_size = g_variant_get_size (delta_part);
if (builder->delta_opts & DELTAOPT_FLAG_VERBOSE)
{
g_printerr ("part %u n:%u compressed:%" G_GUINT64_FORMAT " uncompressed:%" G_GUINT64_FORMAT "\n",
builder->parts->len, part_builder->objects->len,
part_builder->compressed_size,
part_builder->uncompressed_size);
}
return TRUE;
}
static OstreeStaticDeltaPartBuilder *
allocate_part (OstreeStaticDeltaBuilder *builder)
allocate_part (OstreeStaticDeltaBuilder *builder, GError **error)
{
OstreeStaticDeltaPartBuilder *part = g_new0 (OstreeStaticDeltaPartBuilder, 1);
if (builder->parts->len > 0)
{
if (!finish_part (builder, error))
return NULL;
}
part->objects = g_ptr_array_new_with_free_func ((GDestroyNotify)g_variant_unref);
part->payload = g_string_new (NULL);
part->operations = g_string_new (NULL);
@ -351,7 +472,10 @@ process_one_object (OstreeRepo *repo,
if (current_part->objects->len > 0 &&
current_part->payload->len + content_size > builder->max_chunk_size_bytes)
{
*current_part_val = current_part = allocate_part (builder);
current_part = allocate_part (builder, error);
if (current_part == NULL)
return FALSE;
*current_part_val = current_part;
}
guint64 compressed_size;
@ -590,7 +714,10 @@ process_one_rollsum (OstreeRepo *repo,
if (current_part->objects->len > 0 &&
current_part->payload->len > builder->max_chunk_size_bytes)
{
*current_part_val = current_part = allocate_part (builder);
current_part = allocate_part (builder, error);
if (current_part == NULL)
return FALSE;
*current_part_val = current_part;
}
g_autoptr(GBytes) tmp_to = NULL;
@ -705,7 +832,10 @@ process_one_bsdiff (OstreeRepo *repo,
if (current_part->objects->len > 0 &&
current_part->payload->len > builder->max_chunk_size_bytes)
{
*current_part_val = current_part = allocate_part (builder);
current_part = allocate_part (builder, error);
if (current_part == NULL)
return FALSE;
*current_part_val = current_part;
}
g_autoptr(GBytes) tmp_from = NULL;
@ -977,7 +1107,9 @@ generate_delta_lowlatency (OstreeRepo *repo,
g_hash_table_size (modified_regfile_content));
}
current_part = allocate_part (builder);
current_part = allocate_part (builder, error);
if (current_part == NULL)
return FALSE;
/* Pack the metadata first */
g_hash_table_iter_init (&hashiter, new_reachable_metadata);
@ -1093,6 +1225,9 @@ generate_delta_lowlatency (OstreeRepo *repo,
return FALSE;
}
if (!finish_part (builder, error))
return FALSE;
return TRUE;
}
@ -1258,6 +1393,26 @@ ostree_repo_static_delta_generate (OstreeRepo *self,
&to_commit, error))
goto out;
if (opt_filename)
{
g_autofree char *dnbuf = g_strdup (opt_filename);
const char *dn = dirname (dnbuf);
if (!glnx_opendirat (AT_FDCWD, dn, TRUE, &tmp_dfd, error))
goto out;
}
else
{
tmp_dfd = fcntl (self->tmp_dir_fd, F_DUPFD_CLOEXEC, 3);
if (tmp_dfd < 0)
{
glnx_set_error_from_errno (error);
goto out;
}
}
builder.parts_dfd = tmp_dfd;
builder.delta_opts = delta_opts;
/* Ignore optimization flags */
if (!generate_delta_lowlatency (self, from, to, delta_opts, &builder,
cancellable, error))
@ -1331,153 +1486,45 @@ ostree_repo_static_delta_generate (OstreeRepo *self,
goto out;
}
if (opt_filename)
{
g_autofree char *dnbuf = g_strdup (opt_filename);
const char *dn = dirname (dnbuf);
if (!glnx_opendirat (AT_FDCWD, dn, TRUE, &tmp_dfd, error))
goto out;
}
else
{
tmp_dfd = fcntl (self->tmp_dir_fd, F_DUPFD_CLOEXEC, 3);
if (tmp_dfd < 0)
{
glnx_set_error_from_errno (error);
goto out;
}
}
part_headers = g_variant_builder_new (G_VARIANT_TYPE ("a" OSTREE_STATIC_DELTA_META_ENTRY_FORMAT));
part_temp_paths = g_ptr_array_new_with_free_func ((GDestroyNotify)glnx_tmpfile_clear);
for (i = 0; i < builder.parts->len; i++)
{
OstreeStaticDeltaPartBuilder *part_builder = builder.parts->pdata[i];
g_autoptr(GBytes) payload_b;
g_autoptr(GBytes) operations_b;
g_autofree guchar *part_checksum = NULL;
g_autoptr(GBytes) objtype_checksum_array = NULL;
g_autoptr(GBytes) checksum_bytes = NULL;
g_autoptr(GOutputStream) part_temp_outstream = NULL;
g_autoptr(GInputStream) part_in = NULL;
g_autoptr(GInputStream) part_payload_in = NULL;
g_autoptr(GMemoryOutputStream) part_payload_out = NULL;
g_autoptr(GConverterOutputStream) part_payload_compressor = NULL;
g_autoptr(GConverter) compressor = NULL;
g_autoptr(GVariant) delta_part_content = NULL;
g_autoptr(GVariant) delta_part = NULL;
g_autoptr(GVariant) delta_part_header = NULL;
g_auto(GVariantBuilder) mode_builder = OT_VARIANT_BUILDER_INITIALIZER;
g_auto(GVariantBuilder) xattr_builder = OT_VARIANT_BUILDER_INITIALIZER;
guint8 compression_type_char;
g_variant_builder_init (&mode_builder, G_VARIANT_TYPE ("a(uuu)"));
g_variant_builder_init (&xattr_builder, G_VARIANT_TYPE ("aa(ayay)"));
{ guint j;
for (j = 0; j < part_builder->modes->len; j++)
g_variant_builder_add_value (&mode_builder, part_builder->modes->pdata[j]);
for (j = 0; j < part_builder->xattrs->len; j++)
g_variant_builder_add_value (&xattr_builder, part_builder->xattrs->pdata[j]);
}
payload_b = g_string_free_to_bytes (part_builder->payload);
part_builder->payload = NULL;
operations_b = g_string_free_to_bytes (part_builder->operations);
part_builder->operations = NULL;
/* FIXME - avoid duplicating memory here */
delta_part_content = g_variant_new ("(a(uuu)aa(ayay)@ay@ay)",
&mode_builder, &xattr_builder,
ot_gvariant_new_ay_bytes (payload_b),
ot_gvariant_new_ay_bytes (operations_b));
g_variant_ref_sink (delta_part_content);
/* Hardcode xz for now */
compressor = (GConverter*)_ostree_lzma_compressor_new (NULL);
compression_type_char = 'x';
part_payload_in = variant_to_inputstream (delta_part_content);
part_payload_out = (GMemoryOutputStream*)g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
part_payload_compressor = (GConverterOutputStream*)g_converter_output_stream_new ((GOutputStream*)part_payload_out, compressor);
{
gssize n_bytes_written = g_output_stream_splice ((GOutputStream*)part_payload_compressor, part_payload_in,
G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET | G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE,
cancellable, error);
if (n_bytes_written < 0)
goto out;
}
/* FIXME - avoid duplicating memory here */
{ g_autoptr(GBytes) payload = g_memory_output_stream_steal_as_bytes (part_payload_out);
delta_part = g_variant_ref_sink (g_variant_new ("(y@ay)",
compression_type_char,
ot_gvariant_new_ay_bytes (payload)));
}
if (inline_parts)
{
g_autofree char *part_relpath = _ostree_get_relative_static_delta_part_path (from, to, i);
if (!ot_variant_builder_add (descriptor_builder, error, "{sv}", part_relpath, delta_part))
lseek (part_builder->part_tmpf.fd, 0, SEEK_SET);
if (!ot_variant_builder_open (descriptor_builder, G_VARIANT_TYPE ("{sv}"), error) ||
!ot_variant_builder_add (descriptor_builder, error, "s", part_relpath) ||
!ot_variant_builder_open (descriptor_builder, G_VARIANT_TYPE ("v"), error) ||
!ot_variant_builder_add_from_fd (descriptor_builder, G_VARIANT_TYPE ("(yay)"), part_builder->part_tmpf.fd, part_builder->compressed_size, error) ||
!ot_variant_builder_close (descriptor_builder, error) ||
!ot_variant_builder_close (descriptor_builder, error))
goto out;
}
else
{
GLnxTmpfile *part_tmpf = g_new0 (GLnxTmpfile, 1);
g_autofree char *partstr = g_strdup_printf ("%u", i);
if (!glnx_open_tmpfile_linkable_at (tmp_dfd, ".", O_WRONLY | O_CLOEXEC,
part_tmpf, error))
if (fchmod (part_builder->part_tmpf.fd, 0644) < 0)
{
glnx_set_error_from_errno (error);
goto out;
}
if (!glnx_link_tmpfile_at (&part_builder->part_tmpf, GLNX_LINK_TMPFILE_REPLACE,
descriptor_dfd, partstr, error))
goto out;
/* Transfer tempfile ownership */
part_temp_outstream = g_unix_output_stream_new (part_tmpf->fd, FALSE);
g_ptr_array_add (part_temp_paths, g_steal_pointer (&part_tmpf));
}
part_in = variant_to_inputstream (delta_part);
if (!ot_gio_splice_get_checksum (part_temp_outstream, part_in,
&part_checksum,
cancellable, error))
goto out;
g_variant_builder_add_value (part_headers, g_variant_ref (part_builder->header));
checksum_bytes = g_bytes_new (part_checksum, OSTREE_SHA256_DIGEST_LEN);
objtype_checksum_array = objtype_checksum_array_new (part_builder->objects);
delta_part_header = g_variant_new ("(u@aytt@ay)",
maybe_swap_endian_u32 (builder.swap_endian, OSTREE_DELTAPART_VERSION),
ot_gvariant_new_ay_bytes (checksum_bytes),
maybe_swap_endian_u64 (builder.swap_endian, (guint64) g_variant_get_size (delta_part)),
maybe_swap_endian_u64 (builder.swap_endian, part_builder->uncompressed_size),
ot_gvariant_new_ay_bytes (objtype_checksum_array));
g_variant_builder_add_value (part_headers, g_variant_ref (delta_part_header));
total_compressed_size += g_variant_get_size (delta_part);
total_compressed_size += part_builder->compressed_size;
total_uncompressed_size += part_builder->uncompressed_size;
if (delta_opts & DELTAOPT_FLAG_VERBOSE)
{
g_printerr ("part %u n:%u compressed:%" G_GUINT64_FORMAT " uncompressed:%" G_GUINT64_FORMAT "\n",
i, part_builder->objects->len,
(guint64)g_variant_get_size (delta_part),
part_builder->uncompressed_size);
}
}
for (i = 0; i < part_temp_paths->len; i++)
{
g_autofree char *partstr = g_strdup_printf ("%u", i);
/* Take ownership of the path/fd here */
g_auto(GLnxTmpfile) tmpf = *((GLnxTmpfile*)part_temp_paths->pdata[i]);
g_clear_pointer (&(part_temp_paths->pdata[i]), g_free);
if (fchmod (tmpf.fd, 0644) < 0)
{
glnx_set_error_from_errno (error);
goto out;
}
if (!glnx_link_tmpfile_at (&tmpf, GLNX_LINK_TMPFILE_REPLACE,
descriptor_dfd, partstr, error))
goto out;
}
if (!get_fallback_headers (self, &builder, &fallback_headers,