fetcher: Move the SoupSession to a separate thread

Move the SoupSession to a separate thread with its own isolated main
context and main loop.  All interaction with the SoupSession occurs
by way of idle sources attached to the session's main context, which
execute on the session's thread.

This should solve the problem of running an asynchronous fetch request
synchronously by pushing a new thread-default main context and iterating
a main loop until the request completes.  Prior to this, the new thread-
default main context would interfere with the SoupSession's own async
processing.
This commit is contained in:
Matthew Barnes 2015-11-13 15:44:20 -05:00
parent af30fc764a
commit 54066420cf

View File

@ -41,7 +41,29 @@ typedef enum {
} OstreeFetcherState;
typedef struct {
OstreeFetcher *self;
volatile int ref_count;
SoupSession *session;
GMainContext *main_context;
GMainLoop *main_loop;
int tmpdir_dfd;
int max_outstanding;
/* Queue for libsoup, see bgo#708591 */
GQueue pending_queue;
GHashTable *outstanding;
/* Shared across threads; be sure to lock. */
GHashTable *output_stream_set; /* set<GOutputStream> */
GMutex output_stream_set_lock;
/* Also protected by output_stream_set_lock. */
guint64 total_downloaded;
} ThreadClosure;
typedef struct {
ThreadClosure *thread_closure;
SoupURI *uri;
OstreeFetcherState state;
@ -60,29 +82,30 @@ typedef struct {
GTask *task;
} OstreeFetcherPendingURI;
/* Used by session_thread_idle_add() */
typedef void (*SessionThreadFunc) (ThreadClosure *thread_closure,
gpointer data);
/* Used by session_thread_idle_add() */
typedef struct {
ThreadClosure *thread_closure;
SessionThreadFunc function;
gpointer data;
GDestroyNotify notify;
} IdleClosure;
struct OstreeFetcher
{
GObject parent_instance;
OstreeFetcherConfigFlags config_flags;
int tmpdir_dfd;
char *tmpdir_name;
GLnxLockFile tmpdir_lock;
int base_tmpdir_dfd;
GTlsCertificate *client_cert;
SoupSession *session;
SoupRequester *requester;
GHashTable *output_stream_set; /* set<GOutputStream> */
guint64 total_downloaded;
/* Queue for libsoup, see bgo#708591 */
GQueue pending_queue;
GHashTable *outstanding;
gint max_outstanding;
GThread *session_thread;
ThreadClosure *thread_closure;
};
enum {
@ -92,6 +115,56 @@ enum {
G_DEFINE_TYPE (OstreeFetcher, _ostree_fetcher, G_TYPE_OBJECT)
static ThreadClosure *
thread_closure_ref (ThreadClosure *thread_closure)
{
g_return_val_if_fail (thread_closure != NULL, NULL);
g_return_val_if_fail (thread_closure->ref_count > 0, NULL);
g_atomic_int_inc (&thread_closure->ref_count);
return thread_closure;
}
static void
thread_closure_unref (ThreadClosure *thread_closure)
{
g_return_if_fail (thread_closure != NULL);
g_return_if_fail (thread_closure->ref_count > 0);
if (g_atomic_int_dec_and_test (&thread_closure->ref_count))
{
g_clear_object (&thread_closure->session);
g_clear_pointer (&thread_closure->main_context, g_main_context_unref);
g_clear_pointer (&thread_closure->main_loop, g_main_loop_unref);
if (thread_closure->tmpdir_dfd != -1)
close (thread_closure->tmpdir_dfd);
while (!g_queue_is_empty (&thread_closure->pending_queue))
g_object_unref (g_queue_pop_head (&thread_closure->pending_queue));
g_clear_pointer (&thread_closure->outstanding, g_hash_table_unref);
g_clear_pointer (&thread_closure->output_stream_set, g_hash_table_unref);
g_mutex_clear (&thread_closure->output_stream_set_lock);
g_slice_free (ThreadClosure, thread_closure);
}
}
static void
idle_closure_free (IdleClosure *idle_closure)
{
g_clear_pointer (&idle_closure->thread_closure, thread_closure_unref);
if (idle_closure->notify != NULL)
idle_closure->notify (idle_closure->data);
g_slice_free (IdleClosure, idle_closure);
}
static int
pending_task_compare (gconstpointer a,
gconstpointer b,
@ -107,10 +180,11 @@ pending_task_compare (gconstpointer a,
static void
pending_uri_free (OstreeFetcherPendingURI *pending)
{
g_hash_table_remove (pending->self->outstanding, pending);
g_hash_table_remove (pending->thread_closure->outstanding, pending);
g_clear_pointer (&pending->thread_closure, thread_closure_unref);
soup_uri_free (pending->uri);
g_clear_object (&pending->self);
g_clear_object (&pending->request);
g_clear_object (&pending->request_body);
g_free (pending->out_tmpfile);
@ -118,6 +192,250 @@ pending_uri_free (OstreeFetcherPendingURI *pending)
g_free (pending);
}
static gboolean
session_thread_idle_dispatch (gpointer data)
{
IdleClosure *idle_closure = data;
idle_closure->function (idle_closure->thread_closure,
idle_closure->data);
return G_SOURCE_REMOVE;
}
static void
session_thread_idle_add (ThreadClosure *thread_closure,
SessionThreadFunc function,
gpointer data,
GDestroyNotify notify)
{
IdleClosure *idle_closure;
g_return_if_fail (thread_closure != NULL);
g_return_if_fail (function != NULL);
idle_closure = g_slice_new (IdleClosure);
idle_closure->thread_closure = thread_closure_ref (thread_closure);
idle_closure->function = function;
idle_closure->data = data;
idle_closure->notify = notify;
g_main_context_invoke_full (thread_closure->main_context,
G_PRIORITY_DEFAULT,
session_thread_idle_dispatch,
idle_closure, /* takes ownership */
(GDestroyNotify) idle_closure_free);
}
static void
session_thread_add_logger (ThreadClosure *thread_closure,
gpointer data)
{
glnx_unref_object SoupLogger *logger = NULL;
logger = soup_logger_new (SOUP_LOGGER_LOG_BODY, 500);
soup_session_add_feature (thread_closure->session,
SOUP_SESSION_FEATURE (logger));
}
static void
session_thread_config_flags (ThreadClosure *thread_closure,
gpointer data)
{
OstreeFetcherConfigFlags config_flags;
config_flags = GPOINTER_TO_UINT (data);
if ((config_flags & OSTREE_FETCHER_FLAGS_TLS_PERMISSIVE) > 0)
{
g_object_set (thread_closure->session,
SOUP_SESSION_SSL_STRICT,
FALSE, NULL);
}
}
static void
session_thread_set_proxy_cb (ThreadClosure *thread_closure,
gpointer data)
{
SoupURI *proxy_uri = data;
g_object_set (thread_closure->session,
SOUP_SESSION_PROXY_URI,
proxy_uri, NULL);
}
static void
session_thread_set_tls_interaction_cb (ThreadClosure *thread_closure,
gpointer data)
{
GTlsInteraction *interaction = data;
g_object_set (thread_closure->session,
SOUP_SESSION_TLS_INTERACTION,
interaction, NULL);
}
static void
session_thread_set_tls_database_cb (ThreadClosure *thread_closure,
gpointer data)
{
GTlsDatabase *database = data;
if (database != NULL)
{
g_object_set (thread_closure->session,
SOUP_SESSION_TLS_DATABASE,
database, NULL);
}
else
{
g_object_set (thread_closure->session,
SOUP_SESSION_SSL_USE_SYSTEM_CA_FILE,
TRUE, NULL);
}
}
static void
on_request_sent (GObject *object, GAsyncResult *result, gpointer user_data);
static void
session_thread_process_pending_queue (ThreadClosure *thread_closure)
{
while (g_queue_peek_head (&thread_closure->pending_queue) != NULL &&
g_hash_table_size (thread_closure->outstanding) < thread_closure->max_outstanding)
{
GTask *task;
OstreeFetcherPendingURI *pending;
GCancellable *cancellable;
task = g_queue_pop_head (&thread_closure->pending_queue);
pending = g_task_get_task_data (task);
cancellable = g_task_get_cancellable (task);
/* pending_uri_free() removes this. */
g_hash_table_add (thread_closure->outstanding, pending);
soup_request_send_async (pending->request,
cancellable,
on_request_sent,
g_object_ref (task));
g_object_unref (task);
}
}
static void
session_thread_request_uri (ThreadClosure *thread_closure,
gpointer data)
{
GTask *task = G_TASK (data);
OstreeFetcherPendingURI *pending;
GCancellable *cancellable;
GError *local_error = NULL;
pending = g_task_get_task_data (task);
cancellable = g_task_get_cancellable (task);
pending->request = soup_session_request_uri (thread_closure->session,
pending->uri,
&local_error);
if (local_error != NULL)
{
g_task_return_error (task, local_error);
return;
}
if (pending->is_stream)
{
soup_request_send_async (pending->request,
cancellable,
on_request_sent,
g_object_ref (task));
}
else
{
g_autofree char *uristring = soup_uri_to_string (pending->uri, FALSE);
g_autofree char *tmpfile = NULL;
struct stat stbuf;
gboolean exists;
tmpfile = g_compute_checksum_for_string (G_CHECKSUM_SHA256, uristring, strlen (uristring));
if (fstatat (thread_closure->tmpdir_dfd, tmpfile, &stbuf, AT_SYMLINK_NOFOLLOW) == 0)
exists = TRUE;
else
{
if (errno == ENOENT)
exists = FALSE;
else
{
gs_set_error_from_errno (&local_error, errno);
g_task_return_error (task, local_error);
return;
}
}
if (SOUP_IS_REQUEST_HTTP (pending->request))
{
glnx_unref_object SoupMessage *msg = NULL;
msg = soup_request_http_get_message ((SoupRequestHTTP*) pending->request);
if (exists && stbuf.st_size > 0)
soup_message_headers_set_range (msg->request_headers, stbuf.st_size, -1);
}
pending->out_tmpfile = tmpfile;
tmpfile = NULL; /* Transfer ownership */
g_queue_insert_sorted (&thread_closure->pending_queue,
g_object_ref (task),
pending_task_compare, NULL);
session_thread_process_pending_queue (thread_closure);
}
}
static gpointer
ostree_fetcher_session_thread (gpointer data)
{
ThreadClosure *closure = data;
gint max_conns;
/* This becomes the GMainContext that SoupSession schedules async
* callbacks and emits signals from. Make it the thread-default
* context for this thread before creating the session. */
g_main_context_push_thread_default (closure->main_context);
closure->session = soup_session_async_new_with_options (SOUP_SESSION_USER_AGENT, "ostree ",
SOUP_SESSION_SSL_USE_SYSTEM_CA_FILE, TRUE,
SOUP_SESSION_USE_THREAD_CONTEXT, TRUE,
SOUP_SESSION_ADD_FEATURE_BY_TYPE, SOUP_TYPE_REQUESTER,
SOUP_SESSION_TIMEOUT, 60,
SOUP_SESSION_IDLE_TIMEOUT, 60,
NULL);
g_object_get (closure->session, "max-conns-per-host", &max_conns, NULL);
if (max_conns < 8)
{
/* We download a lot of small objects in ostree, so this
* helps a lot. Also matches what most modern browsers do. */
max_conns = 8;
g_object_set (closure->session,
"max-conns-per-host",
max_conns, NULL);
}
closure->max_outstanding = 3 * max_conns;
g_main_loop_run (closure->main_loop);
g_main_context_pop_thread_default (closure->main_context);
thread_closure_unref (closure);
return NULL;
}
static void
_ostree_fetcher_set_property (GObject *object,
guint prop_id,
@ -159,12 +477,12 @@ _ostree_fetcher_get_property (GObject *object,
static void
_ostree_fetcher_finalize (GObject *object)
{
OstreeFetcher *self;
OstreeFetcher *self = OSTREE_FETCHER (object);
self = OSTREE_FETCHER (object);
if (self->tmpdir_dfd != -1)
close (self->tmpdir_dfd);
/* Terminate the session thread. */
g_main_loop_quit (self->thread_closure->main_loop);
g_clear_pointer (&self->session_thread, g_thread_unref);
g_clear_pointer (&self->thread_closure, thread_closure_unref);
/* Note: We don't remove the tmpdir here, because that would cause
us to not reuse it on resume. This happens because we use two
@ -174,19 +492,57 @@ _ostree_fetcher_finalize (GObject *object)
g_free (self->tmpdir_name);
glnx_release_lock_file (&self->tmpdir_lock);
g_clear_object (&self->session);
g_clear_object (&self->client_cert);
g_hash_table_destroy (self->output_stream_set);
while (!g_queue_is_empty (&self->pending_queue))
g_object_unref (g_queue_pop_head (&self->pending_queue));
g_hash_table_destroy (self->outstanding);
G_OBJECT_CLASS (_ostree_fetcher_parent_class)->finalize (object);
}
static void
_ostree_fetcher_constructed (GObject *object)
{
OstreeFetcher *self = OSTREE_FETCHER (object);
g_autoptr(GMainContext) main_context = NULL;
const char *http_proxy;
main_context = g_main_context_new ();
self->thread_closure = g_slice_new0 (ThreadClosure);
self->thread_closure->ref_count = 1;
self->thread_closure->main_context = g_main_context_ref (main_context);
self->thread_closure->main_loop = g_main_loop_new (main_context, FALSE);
self->thread_closure->tmpdir_dfd = -1;
self->thread_closure->outstanding = g_hash_table_new (NULL, NULL);
self->thread_closure->output_stream_set = g_hash_table_new_full (NULL, NULL,
(GDestroyNotify) NULL,
(GDestroyNotify) g_object_unref);
if (g_getenv ("OSTREE_DEBUG_HTTP"))
{
session_thread_idle_add (self->thread_closure,
session_thread_add_logger,
NULL, (GDestroyNotify) NULL);
}
if (self->config_flags != 0)
{
session_thread_idle_add (self->thread_closure,
session_thread_config_flags,
GUINT_TO_POINTER (self->config_flags),
(GDestroyNotify) NULL);
}
http_proxy = g_getenv ("http_proxy");
if (http_proxy != NULL)
_ostree_fetcher_set_proxy (self, http_proxy);
/* FIXME Maybe implement GInitableIface and use g_thread_try_new()
* so we can try to handle thread creation errors gracefully? */
self->session_thread = g_thread_new ("fetcher-session-thread",
ostree_fetcher_session_thread,
thread_closure_ref (self->thread_closure));
G_OBJECT_CLASS (_ostree_fetcher_parent_class)->constructed (object);
}
static void
_ostree_fetcher_class_init (OstreeFetcherClass *klass)
{
@ -195,6 +551,7 @@ _ostree_fetcher_class_init (OstreeFetcherClass *klass)
gobject_class->set_property = _ostree_fetcher_set_property;
gobject_class->get_property = _ostree_fetcher_get_property;
gobject_class->finalize = _ostree_fetcher_finalize;
gobject_class->constructed = _ostree_fetcher_constructed;
g_object_class_install_property (gobject_class,
PROP_CONFIG_FLAGS,
@ -211,50 +568,9 @@ _ostree_fetcher_class_init (OstreeFetcherClass *klass)
static void
_ostree_fetcher_init (OstreeFetcher *self)
{
gint max_conns;
const char *http_proxy;
GLnxLockFile empty_lockfile = GLNX_LOCK_FILE_INIT;
g_queue_init (&self->pending_queue);
self->session = soup_session_async_new_with_options (SOUP_SESSION_USER_AGENT, "ostree ",
SOUP_SESSION_SSL_USE_SYSTEM_CA_FILE, TRUE,
SOUP_SESSION_USE_THREAD_CONTEXT, TRUE,
SOUP_SESSION_ADD_FEATURE_BY_TYPE, SOUP_TYPE_REQUESTER,
SOUP_SESSION_TIMEOUT, 60,
SOUP_SESSION_IDLE_TIMEOUT, 60,
NULL);
http_proxy = g_getenv ("http_proxy");
if (http_proxy)
{
_ostree_fetcher_set_proxy (self, http_proxy);
}
if (g_getenv ("OSTREE_DEBUG_HTTP"))
soup_session_add_feature (self->session, (SoupSessionFeature*)soup_logger_new (SOUP_LOGGER_LOG_BODY, 500));
if ((self->config_flags & OSTREE_FETCHER_FLAGS_TLS_PERMISSIVE) > 0)
g_object_set (self->session, SOUP_SESSION_SSL_STRICT, FALSE, NULL);
self->requester = (SoupRequester *)soup_session_get_feature (self->session, SOUP_TYPE_REQUESTER);
g_object_get (self->session, "max-conns-per-host", &max_conns, NULL);
if (max_conns <= 8)
{
// We download a lot of small objects in ostree, so this helps a
// lot. Also matches what most modern browsers do.
max_conns = 8;
g_object_set (self->session, "max-conns-per-host", max_conns, NULL);
}
self->max_outstanding = 3 * max_conns;
self->output_stream_set = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify)g_object_unref);
self->outstanding = g_hash_table_new_full (NULL, NULL, NULL, NULL);
self->tmpdir_dfd = -1;
self->tmpdir_lock = empty_lockfile;
}
OstreeFetcher *
@ -270,13 +586,13 @@ _ostree_fetcher_new (int tmpdir_dfd,
if (!_ostree_repo_allocate_tmpdir (tmpdir_dfd,
"fetcher-",
&self->tmpdir_name,
&self->tmpdir_dfd,
&self->thread_closure->tmpdir_dfd,
&self->tmpdir_lock,
NULL,
cancellable, error))
return NULL;
self->tmpdir_dfd = tmpdir_dfd;
self->base_tmpdir_dfd = tmpdir_dfd;
return self;
}
@ -284,81 +600,69 @@ _ostree_fetcher_new (int tmpdir_dfd,
int
_ostree_fetcher_get_dfd (OstreeFetcher *fetcher)
{
return fetcher->tmpdir_dfd;
return fetcher->thread_closure->tmpdir_dfd;
}
void
_ostree_fetcher_set_proxy (OstreeFetcher *self,
const char *http_proxy)
{
SoupURI *proxy_uri = soup_uri_new (http_proxy);
SoupURI *proxy_uri;
g_return_if_fail (OSTREE_IS_FETCHER (self));
g_return_if_fail (http_proxy != NULL);
proxy_uri = soup_uri_new (http_proxy);
if (!proxy_uri)
{
g_warning ("Invalid proxy URI '%s'", http_proxy);
}
else
{
g_object_set (self->session, SOUP_SESSION_PROXY_URI, proxy_uri, NULL);
soup_uri_free (proxy_uri);
session_thread_idle_add (self->thread_closure,
session_thread_set_proxy_cb,
proxy_uri, /* takes ownership */
(GDestroyNotify) soup_uri_free);
}
}
void
_ostree_fetcher_set_client_cert (OstreeFetcher *fetcher,
GTlsCertificate *cert)
_ostree_fetcher_set_client_cert (OstreeFetcher *self,
GTlsCertificate *cert)
{
g_clear_object (&fetcher->client_cert);
fetcher->client_cert = g_object_ref (cert);
if (fetcher->client_cert)
{
g_return_if_fail (OSTREE_IS_FETCHER (self));
g_return_if_fail (G_IS_TLS_CERTIFICATE (cert));
#ifdef HAVE_LIBSOUP_CLIENT_CERTS
g_autoptr(GTlsInteraction) interaction =
(GTlsInteraction*)_ostree_tls_cert_interaction_new (fetcher->client_cert);
g_object_set (fetcher->session, "tls-interaction", interaction, NULL);
session_thread_idle_add (self->thread_closure,
session_thread_set_tls_interaction_cb,
_ostree_tls_cert_interaction_new (cert),
(GDestroyNotify) g_object_unref);
#else
g_warning ("This version of OSTree is compiled without client side certificate support");
g_warning ("This version of OSTree is compiled without client side certificate support");
#endif
}
}
void
_ostree_fetcher_set_tls_database (OstreeFetcher *self,
GTlsDatabase *db)
{
if (db)
g_object_set ((GObject*)self->session, "tls-database", db, NULL);
else
g_object_set ((GObject*)self->session, "ssl-use-system-ca-file", TRUE, NULL);
}
g_return_if_fail (OSTREE_IS_FETCHER (self));
g_return_if_fail (db == NULL || G_IS_TLS_DATABASE (db));
static void
on_request_sent (GObject *object, GAsyncResult *result, gpointer user_data);
static void
ostree_fetcher_process_pending_queue (OstreeFetcher *self)
{
while (g_queue_peek_head (&self->pending_queue) != NULL &&
g_hash_table_size (self->outstanding) < self->max_outstanding)
if (db != NULL)
{
GTask *task;
OstreeFetcherPendingURI *pending;
GCancellable *cancellable;
task = g_queue_pop_head (&self->pending_queue);
pending = g_task_get_task_data (task);
cancellable = g_task_get_cancellable (task);
/* pending_uri_free() removes this. */
g_hash_table_add (self->outstanding, pending);
soup_request_send_async (pending->request,
cancellable,
on_request_sent,
g_object_ref (task));
g_object_unref (task);
session_thread_idle_add (self->thread_closure,
session_thread_set_tls_database_cb,
g_object_ref (db),
(GDestroyNotify) g_object_unref);
}
else
{
session_thread_idle_add (self->thread_closure,
session_thread_set_tls_database_cb,
NULL, (GDestroyNotify) NULL);
}
}
@ -377,11 +681,17 @@ finish_stream (OstreeFetcherPendingURI *pending,
{
if (!g_output_stream_close (pending->out_stream, cancellable, error))
goto out;
g_hash_table_remove (pending->self->output_stream_set, pending->out_stream);
g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
g_hash_table_remove (pending->thread_closure->output_stream_set,
pending->out_stream);
g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
}
pending->state = OSTREE_FETCHER_STATE_COMPLETE;
if (fstatat (pending->self->tmpdir_dfd, pending->out_tmpfile, &stbuf, AT_SYMLINK_NOFOLLOW) != 0)
if (fstatat (pending->thread_closure->tmpdir_dfd,
pending->out_tmpfile,
&stbuf, AT_SYMLINK_NOFOLLOW) != 0)
{
gs_set_error_from_errno (error, errno);
goto out;
@ -390,7 +700,7 @@ finish_stream (OstreeFetcherPendingURI *pending,
/* Now that we've finished downloading, continue with other queued
* requests.
*/
ostree_fetcher_process_pending_queue (pending->self);
session_thread_process_pending_queue (pending->thread_closure);
if (stbuf.st_size < pending->content_length)
{
@ -399,7 +709,9 @@ finish_stream (OstreeFetcherPendingURI *pending,
}
else
{
pending->self->total_downloaded += stbuf.st_size;
g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
pending->thread_closure->total_downloaded += stbuf.st_size;
g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
}
ret = TRUE;
@ -593,14 +905,20 @@ on_request_sent (GObject *object,
else
oflags |= O_TRUNC;
fd = openat (pending->self->tmpdir_dfd, pending->out_tmpfile, oflags, 0666);
fd = openat (pending->thread_closure->tmpdir_dfd,
pending->out_tmpfile, oflags, 0666);
if (fd == -1)
{
gs_set_error_from_errno (&local_error, errno);
goto out;
}
pending->out_stream = g_unix_output_stream_new (fd, TRUE);
g_hash_table_add (pending->self->output_stream_set, g_object_ref (pending->out_stream));
g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
g_hash_table_add (pending->thread_closure->output_stream_set,
g_object_ref (pending->out_stream));
g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
g_input_stream_read_bytes_async (pending->request_body,
8192, G_PRIORITY_DEFAULT,
cancellable,
@ -636,13 +954,15 @@ ostree_fetcher_request_uri_internal (OstreeFetcher *self,
gpointer user_data,
gpointer source_tag)
{
GTask *task;
OstreeFetcherPendingURI *pending = g_new0 (OstreeFetcherPendingURI, 1);
GError *local_error = NULL;
g_autoptr(GTask) task = NULL;
OstreeFetcherPendingURI *pending;
pending->request = soup_requester_request_uri (self->requester, uri, &local_error);
g_return_if_fail (OSTREE_IS_FETCHER (self));
g_return_if_fail (uri != NULL);
pending->self = g_object_ref (self);
/* SoupRequest is created in session thread. */
pending = g_new0 (OstreeFetcherPendingURI, 1);
pending->thread_closure = thread_closure_ref (self->thread_closure);
pending->uri = soup_uri_copy (uri);
pending->max_size = max_size;
pending->is_stream = is_stream;
@ -654,55 +974,10 @@ ostree_fetcher_request_uri_internal (OstreeFetcher *self,
/* We'll use the GTask priority for our own priority queue. */
g_task_set_priority (task, priority);
if (is_stream)
{
soup_request_send_async (pending->request,
cancellable,
on_request_sent,
g_object_ref (task));
}
else
{
g_autofree char *uristring = soup_uri_to_string (uri, FALSE);
g_autofree char *tmpfile = NULL;
struct stat stbuf;
gboolean exists;
tmpfile = g_compute_checksum_for_string (G_CHECKSUM_SHA256, uristring, strlen (uristring));
if (fstatat (self->tmpdir_dfd, tmpfile, &stbuf, AT_SYMLINK_NOFOLLOW) == 0)
exists = TRUE;
else
{
if (errno == ENOENT)
exists = FALSE;
else
{
gs_set_error_from_errno (&local_error, errno);
goto out;
}
}
if (SOUP_IS_REQUEST_HTTP (pending->request))
{
glnx_unref_object SoupMessage *msg = NULL;
msg = soup_request_http_get_message ((SoupRequestHTTP*) pending->request);
if (exists && stbuf.st_size > 0)
soup_message_headers_set_range (msg->request_headers, stbuf.st_size, -1);
}
pending->out_tmpfile = tmpfile;
tmpfile = NULL; /* Transfer ownership */
g_queue_insert_sorted (&self->pending_queue,
g_object_ref (task),
pending_task_compare, NULL);
ostree_fetcher_process_pending_queue (self);
}
g_assert_no_error (local_error);
out:
g_object_unref (task);
session_thread_idle_add (self->thread_closure,
session_thread_request_uri,
g_object_ref (task),
(GDestroyNotify) g_object_unref);
}
void
@ -760,11 +1035,17 @@ ostree_fetcher_stream_uri_finish (OstreeFetcher *self,
guint64
_ostree_fetcher_bytes_transferred (OstreeFetcher *self)
{
guint64 ret = self->total_downloaded;
GHashTableIter hiter;
gpointer key, value;
guint64 ret;
g_hash_table_iter_init (&hiter, self->output_stream_set);
g_return_val_if_fail (OSTREE_IS_FETCHER (self), 0);
g_mutex_lock (&self->thread_closure->output_stream_set_lock);
ret = self->thread_closure->total_downloaded;
g_hash_table_iter_init (&hiter, self->thread_closure->output_stream_set);
while (g_hash_table_iter_next (&hiter, &key, &value))
{
GFileOutputStream *stream = key;
@ -776,7 +1057,9 @@ _ostree_fetcher_bytes_transferred (OstreeFetcher *self)
ret += stbuf.st_size;
}
}
g_mutex_unlock (&self->thread_closure->output_stream_set_lock);
return ret;
}