lib/pull: Drop partial fetch code from libsoup backend

Doing this in prep for libglnx tmpdir porting, but I think we should also do
this because the partial fetch code IMO was never fully baked; among other
things it was never integrated into the scheme we came up with for "boot id
sync" that we use for complete/staged objects.

There's a lot of complexity here that while we have some coverage for, I think
we need to refocus on the core functionality. The libcurl backend doesn't have
an equivalent to this today.

In particular for small objects, this is simply overly complex. The downside is
clearly for large objects like FAH's 61MB initramfs; not being able to resume
fetches of those is unfortunate.

In practice though, I think most people should be using deltas, and we need to
make sure deltas work for large objects anyways.

Further ultimately the peer-to-peer work should help a lot for people
with truly unreliable connections.

Closes: #1176
Approved by: jlebon
This commit is contained in:
Colin Walters 2017-09-14 15:10:54 -04:00 committed by Atomic Bot
parent c32f234e9a
commit 0488b4870e
6 changed files with 65 additions and 145 deletions

View File

@ -379,14 +379,13 @@ check_multi_info (OstreeFetcher *fetcher)
g_autoptr(GError) local_error = NULL;
GError **error = &local_error;
/* TODO - share file naming with soup, and fix it */
g_autofree char *tmpfile_path =
g_compute_checksum_for_string (G_CHECKSUM_SHA256,
eff_url, strlen (eff_url));
ostree_fetcher_generate_url_tmpname (eff_url);
if (!ensure_tmpfile (req, error))
{
g_task_return_error (task, g_steal_pointer (&local_error));
}
/* This should match the libsoup chmod */
else if (fchmod (req->tmpf.fd, 0644) < 0)
{
glnx_set_error_from_errno (error);

View File

@ -55,10 +55,7 @@ typedef struct {
volatile gint running;
GError *initialization_error; /* Any failure to load the db */
int tmpdir_dfd;
char *remote_name;
char *tmpdir_name;
GLnxLockFile tmpdir_lock;
int base_tmpdir_dfd;
GVariant *extra_headers;
@ -94,7 +91,7 @@ typedef struct {
gboolean is_membuf;
OstreeFetcherRequestFlags flags;
GInputStream *request_body;
char *out_tmpfile;
GLnxTmpfile tmpf;
GOutputStream *out_stream;
guint64 max_size;
@ -155,17 +152,6 @@ thread_closure_unref (ThreadClosure *thread_closure)
g_clear_pointer (&thread_closure->extra_headers, (GDestroyNotify)g_variant_unref);
if (thread_closure->tmpdir_dfd != -1)
close (thread_closure->tmpdir_dfd);
/* Note: We don't remove the tmpdir here, because that would cause
us to not reuse it on resume. This happens because we use two
fetchers for each pull, so finalizing the first one would remove
all the files to be resumed from the previous second one */
g_free (thread_closure->tmpdir_name);
glnx_release_lock_file (&thread_closure->tmpdir_lock);
g_clear_pointer (&thread_closure->output_stream_set, g_hash_table_unref);
g_mutex_clear (&thread_closure->output_stream_set_lock);
@ -210,7 +196,7 @@ pending_uri_unref (OstreeFetcherPendingURI *pending)
g_free (pending->filename);
g_clear_object (&pending->request);
g_clear_object (&pending->request_body);
g_free (pending->out_tmpfile);
glnx_tmpfile_clear (&pending->tmpf);
g_clear_object (&pending->out_stream);
g_free (pending);
}
@ -477,59 +463,7 @@ session_thread_request_uri (ThreadClosure *thread_closure,
}
else
{
g_autofree char *uristring
= soup_uri_to_string (soup_request_get_uri (pending->request), FALSE);
g_autofree char *tmpfile = NULL;
struct stat stbuf;
gboolean exists;
/* The tmp directory is lazily created for each fetcher instance,
* since it may require superuser permissions and some instances
* only need _ostree_fetcher_request_uri_to_membuf() which keeps
* everything in memory buffers. */
if (thread_closure->tmpdir_name == NULL)
{
if (!_ostree_repo_allocate_tmpdir (thread_closure->base_tmpdir_dfd,
OSTREE_REPO_TMPDIR_FETCHER,
&thread_closure->tmpdir_name,
&thread_closure->tmpdir_dfd,
&thread_closure->tmpdir_lock,
NULL,
cancellable,
&local_error))
{
g_task_return_error (task, local_error);
return;
}
}
tmpfile = g_compute_checksum_for_string (G_CHECKSUM_SHA256, uristring, strlen (uristring));
if (fstatat (thread_closure->tmpdir_dfd, tmpfile, &stbuf, AT_SYMLINK_NOFOLLOW) == 0)
exists = TRUE;
else
{
if (errno == ENOENT)
exists = FALSE;
else
{
glnx_set_error_from_errno (&local_error);
g_task_return_error (task, local_error);
return;
}
}
if (SOUP_IS_REQUEST_HTTP (pending->request))
{
glnx_unref_object SoupMessage *msg = NULL;
msg = soup_request_http_get_message ((SoupRequestHTTP*) pending->request);
if (exists && stbuf.st_size > 0)
soup_message_headers_set_range (msg->request_headers, stbuf.st_size, -1);
}
pending->out_tmpfile = tmpfile;
tmpfile = NULL; /* Transfer ownership */
start_pending_request (thread_closure, task);
start_pending_request (thread_closure, task);
}
}
@ -658,7 +592,6 @@ _ostree_fetcher_constructed (GObject *object)
{
OstreeFetcher *self = OSTREE_FETCHER (object);
g_autoptr(GMainContext) main_context = NULL;
GLnxLockFile empty_lockfile = GLNX_LOCK_FILE_INIT;
const char *http_proxy;
main_context = g_main_context_new ();
@ -668,8 +601,6 @@ _ostree_fetcher_constructed (GObject *object)
self->thread_closure->main_context = g_main_context_ref (main_context);
self->thread_closure->running = 1;
self->thread_closure->transfer_gzip = (self->config_flags & OSTREE_FETCHER_FLAGS_TRANSFER_GZIP) != 0;
self->thread_closure->tmpdir_dfd = -1;
self->thread_closure->tmpdir_lock = empty_lockfile;
self->thread_closure->outstanding = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify)pending_uri_unref);
self->thread_closure->output_stream_set = g_hash_table_new_full (NULL, NULL,
@ -749,7 +680,7 @@ _ostree_fetcher_new (int tmpdir_dfd,
int
_ostree_fetcher_get_dfd (OstreeFetcher *fetcher)
{
return fetcher->thread_closure->tmpdir_dfd;
return fetcher->thread_closure->base_tmpdir_dfd;
}
void
@ -874,13 +805,8 @@ finish_stream (OstreeFetcherPendingURI *pending,
if (!pending->is_membuf)
{
if (fstatat (pending->thread_closure->tmpdir_dfd,
pending->out_tmpfile,
&stbuf, AT_SYMLINK_NOFOLLOW) != 0)
{
glnx_set_error_from_errno (error);
goto out;
}
if (!glnx_fstat (pending->tmpf.fd, &stbuf, error))
goto out;
}
pending->state = OSTREE_FETCHER_STATE_COMPLETE;
@ -973,11 +899,39 @@ on_stream_read (GObject *object,
pending = g_task_get_task_data (task);
cancellable = g_task_get_cancellable (task);
/* Only open the output stream on demand to ensure we use as
* few file descriptors as possible.
*/
if (!pending->out_stream)
{
if (!pending->is_membuf)
{
if (!glnx_open_tmpfile_linkable_at (pending->thread_closure->base_tmpdir_dfd, ".",
O_WRONLY | O_CLOEXEC, &pending->tmpf, &local_error))
goto out;
/* This should match the libcurl chmod */
if (!glnx_fchmod (pending->tmpf.fd, 0644, &local_error))
goto out;
pending->out_stream = g_unix_output_stream_new (pending->tmpf.fd, FALSE);
}
else
{
pending->out_stream = g_memory_output_stream_new_resizable ();
}
g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
g_hash_table_add (pending->thread_closure->output_stream_set,
g_object_ref (pending->out_stream));
g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
}
/* Get a GBytes buffer */
bytes = g_input_stream_read_bytes_finish ((GInputStream*)object, result, &local_error);
if (!bytes)
goto out;
bytes_read = g_bytes_get_size (bytes);
/* Was this the end of the stream? */
if (bytes_read == 0)
{
if (!finish_stream (pending, cancellable, &local_error))
@ -990,14 +944,24 @@ on_stream_read (GObject *object,
}
else
{
g_task_return_pointer (task,
g_strdup (pending->out_tmpfile),
(GDestroyNotify) g_free);
g_autofree char *uristring =
soup_uri_to_string (soup_request_get_uri (pending->request), FALSE);
g_autofree char *tmpfile_path =
ostree_fetcher_generate_url_tmpname (uristring);
if (!glnx_link_tmpfile_at (&pending->tmpf, GLNX_LINK_TMPFILE_REPLACE,
pending->thread_closure->base_tmpdir_dfd, tmpfile_path,
&local_error))
g_task_return_error (task, g_steal_pointer (&local_error));
else
g_task_return_pointer (task,
g_steal_pointer (&tmpfile_path),
(GDestroyNotify) g_free);
}
remove_pending (pending);
}
else
{
/* Verify max size */
if (pending->max_size > 0)
{
if (bytes_read > pending->max_size ||
@ -1011,7 +975,7 @@ on_stream_read (GObject *object,
goto out;
}
}
pending->current_size += bytes_read;
/* We do this instead of _write_bytes_async() as that's not
@ -1063,19 +1027,7 @@ on_request_sent (GObject *object,
if (SOUP_IS_REQUEST_HTTP (object))
{
msg = soup_request_http_get_message ((SoupRequestHTTP*) object);
if (!pending->is_membuf &&
msg->status_code == SOUP_STATUS_REQUESTED_RANGE_NOT_SATISFIABLE)
{
// We already have the whole file, so just use it.
pending->state = OSTREE_FETCHER_STATE_COMPLETE;
(void) g_input_stream_close (pending->request_body, NULL, NULL);
g_task_return_pointer (task,
g_strdup (pending->out_tmpfile),
(GDestroyNotify) g_free);
remove_pending (pending);
goto out;
}
else if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code))
if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code))
{
/* is there another mirror we can try? */
if (pending->mirrorlist_idx + 1 < pending->mirrorlist->len)
@ -1091,8 +1043,8 @@ on_request_sent (GObject *object,
}
else
{
g_autofree char *uristring
= soup_uri_to_string (soup_request_get_uri (pending->request), FALSE);
g_autofree char *uristring =
soup_uri_to_string (soup_request_get_uri (pending->request), FALSE);
GIOErrorEnum code;
switch (msg->status_code)
@ -1143,38 +1095,6 @@ on_request_sent (GObject *object,
pending->content_length = soup_request_get_content_length (pending->request);
if (!pending->is_membuf)
{
int oflags = O_CREAT | O_WRONLY | O_CLOEXEC;
int fd;
/* If we got partial content, we can append; if the server
* ignored our range request, we need to truncate.
*/
if (msg && msg->status_code == SOUP_STATUS_PARTIAL_CONTENT)
oflags |= O_APPEND;
else
oflags |= O_TRUNC;
fd = openat (pending->thread_closure->tmpdir_dfd,
pending->out_tmpfile, oflags, 0666);
if (fd == -1)
{
glnx_set_error_from_errno (&local_error);
goto out;
}
pending->out_stream = g_unix_output_stream_new (fd, TRUE);
}
else
{
pending->out_stream = g_memory_output_stream_new_resizable ();
}
g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
g_hash_table_add (pending->thread_closure->output_stream_set,
g_object_ref (pending->out_stream));
g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
g_input_stream_read_bytes_async (pending->request_body,
8192, G_PRIORITY_DEFAULT,
cancellable,

View File

@ -26,6 +26,16 @@
G_BEGIN_DECLS
/* FIXME - delete this and replace by having fetchers simply
* return O_TMPFILE fds, not file paths.
*/
static inline char *
ostree_fetcher_generate_url_tmpname (const char *url)
{
return g_compute_checksum_for_string (G_CHECKSUM_SHA256,
url, strlen (url));
}
gboolean _ostree_fetcher_mirrored_request_to_membuf (OstreeFetcher *fetcher,
GPtrArray *mirrorlist,
const char *filename,

View File

@ -1284,13 +1284,6 @@ cleanup_tmpdir (OstreeRepo *self,
if (!glnx_shutil_rm_rf_at (dfd_iter.fd, dent->d_name, cancellable, error))
return glnx_prefix_error (error, "Removing %s", dent->d_name);
}
/* FIXME - move OSTREE_REPO_TMPDIR_FETCHER underneath the
* staging/boot-id scheme as well, since all of the "did it get
* fsync'd" concerns apply to that as well. Then we can skip
* this special case.
*/
else if (g_str_has_prefix (dent->d_name, OSTREE_REPO_TMPDIR_FETCHER))
continue;
else
{
/* Now we do time-based cleanup. Ignore it if it's somehow

View File

@ -205,7 +205,6 @@ _ostree_repo_memory_cache_ref_destroy (OstreeRepoMemoryCacheRef *state);
G_DEFINE_AUTO_CLEANUP_CLEAR_FUNC(OstreeRepoMemoryCacheRef, _ostree_repo_memory_cache_ref_destroy)
#define OSTREE_REPO_TMPDIR_STAGING "staging-"
#define OSTREE_REPO_TMPDIR_FETCHER "fetcher-"
gboolean
_ostree_repo_allocate_tmpdir (int tmpdir_dfd,

View File

@ -5061,8 +5061,7 @@ ostree_repo_regenerate_summary (OstreeRepo *self,
gboolean
_ostree_repo_is_locked_tmpdir (const char *filename)
{
return g_str_has_prefix (filename, OSTREE_REPO_TMPDIR_STAGING) ||
g_str_has_prefix (filename, OSTREE_REPO_TMPDIR_FETCHER);
return g_str_has_prefix (filename, OSTREE_REPO_TMPDIR_STAGING);
}
gboolean