mirror of
https://github.com/ostreedev/ostree.git
synced 2025-02-01 09:47:45 +03:00
fetcher: Drop the libsoup queue
Now that we have queuing in the higher level pull logic, we don't need to do this anymore. It's tempting to keep it since the code diff is so small (without completely rewriting things), but dropping it here will make it easier to see when things go wrong at a higher level. Note that I kept an assertion. Closes: #654 Approved by: jlebon
This commit is contained in:
parent
c18628ecb8
commit
f4d1334e19
@ -62,8 +62,7 @@ typedef struct {
|
|||||||
GVariant *extra_headers;
|
GVariant *extra_headers;
|
||||||
int max_outstanding;
|
int max_outstanding;
|
||||||
|
|
||||||
/* Queue for libsoup, see bgo#708591 */
|
/* Our active HTTP requests */
|
||||||
GQueue pending_queue;
|
|
||||||
GHashTable *outstanding;
|
GHashTable *outstanding;
|
||||||
|
|
||||||
/* Shared across threads; be sure to lock. */
|
/* Shared across threads; be sure to lock. */
|
||||||
@ -77,9 +76,6 @@ typedef struct {
|
|||||||
|
|
||||||
} ThreadClosure;
|
} ThreadClosure;
|
||||||
|
|
||||||
static void
|
|
||||||
session_thread_process_pending_queue (ThreadClosure *thread_closure);
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
volatile int ref_count;
|
volatile int ref_count;
|
||||||
|
|
||||||
@ -187,18 +183,6 @@ idle_closure_free (IdleClosure *idle_closure)
|
|||||||
g_slice_free (IdleClosure, idle_closure);
|
g_slice_free (IdleClosure, idle_closure);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
|
||||||
pending_task_compare (gconstpointer a,
|
|
||||||
gconstpointer b,
|
|
||||||
gpointer unused)
|
|
||||||
{
|
|
||||||
gint priority_a = g_task_get_priority (G_TASK (a));
|
|
||||||
gint priority_b = g_task_get_priority (G_TASK (b));
|
|
||||||
|
|
||||||
return (priority_a == priority_b) ? 0 :
|
|
||||||
(priority_a < priority_b) ? -1 : 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
static OstreeFetcherPendingURI *
|
static OstreeFetcherPendingURI *
|
||||||
pending_uri_ref (OstreeFetcherPendingURI *pending)
|
pending_uri_ref (OstreeFetcherPendingURI *pending)
|
||||||
{
|
{
|
||||||
@ -403,30 +387,23 @@ static void
|
|||||||
on_request_sent (GObject *object, GAsyncResult *result, gpointer user_data);
|
on_request_sent (GObject *object, GAsyncResult *result, gpointer user_data);
|
||||||
|
|
||||||
static void
|
static void
|
||||||
session_thread_process_pending_queue (ThreadClosure *thread_closure)
|
start_pending_request (ThreadClosure *thread_closure,
|
||||||
|
GTask *task)
|
||||||
{
|
{
|
||||||
|
|
||||||
while (g_queue_peek_head (&thread_closure->pending_queue) != NULL &&
|
OstreeFetcherPendingURI *pending;
|
||||||
g_hash_table_size (thread_closure->outstanding) < thread_closure->max_outstanding)
|
GCancellable *cancellable;
|
||||||
{
|
|
||||||
GTask *task;
|
|
||||||
OstreeFetcherPendingURI *pending;
|
|
||||||
GCancellable *cancellable;
|
|
||||||
|
|
||||||
task = g_queue_pop_head (&thread_closure->pending_queue);
|
g_assert_cmpint (g_hash_table_size (thread_closure->outstanding), <, thread_closure->max_outstanding);
|
||||||
|
|
||||||
pending = g_task_get_task_data (task);
|
pending = g_task_get_task_data (task);
|
||||||
cancellable = g_task_get_cancellable (task);
|
cancellable = g_task_get_cancellable (task);
|
||||||
|
|
||||||
g_hash_table_add (thread_closure->outstanding, pending_uri_ref (pending));
|
g_hash_table_add (thread_closure->outstanding, pending_uri_ref (pending));
|
||||||
|
soup_request_send_async (pending->request,
|
||||||
soup_request_send_async (pending->request,
|
cancellable,
|
||||||
cancellable,
|
on_request_sent,
|
||||||
on_request_sent,
|
g_object_ref (task));
|
||||||
g_object_ref (task));
|
|
||||||
|
|
||||||
g_object_unref (task);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
@ -547,10 +524,7 @@ session_thread_request_uri (ThreadClosure *thread_closure,
|
|||||||
pending->out_tmpfile = tmpfile;
|
pending->out_tmpfile = tmpfile;
|
||||||
tmpfile = NULL; /* Transfer ownership */
|
tmpfile = NULL; /* Transfer ownership */
|
||||||
|
|
||||||
g_queue_insert_sorted (&thread_closure->pending_queue,
|
start_pending_request (thread_closure, task);
|
||||||
g_object_ref (task),
|
|
||||||
pending_task_compare, NULL);
|
|
||||||
session_thread_process_pending_queue (thread_closure);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -600,8 +574,6 @@ ostree_fetcher_session_thread (gpointer data)
|
|||||||
* unreference all data related to the SoupSession ourself to ensure
|
* unreference all data related to the SoupSession ourself to ensure
|
||||||
* it's freed in the same thread where it was created. */
|
* it's freed in the same thread where it was created. */
|
||||||
g_clear_pointer (&closure->outstanding, g_hash_table_unref);
|
g_clear_pointer (&closure->outstanding, g_hash_table_unref);
|
||||||
while (!g_queue_is_empty (&closure->pending_queue))
|
|
||||||
g_object_unref (g_queue_pop_head (&closure->pending_queue));
|
|
||||||
g_clear_pointer (&closure->session, g_object_unref);
|
g_clear_pointer (&closure->session, g_object_unref);
|
||||||
|
|
||||||
thread_closure_unref (closure);
|
thread_closure_unref (closure);
|
||||||
@ -903,11 +875,6 @@ finish_stream (OstreeFetcherPendingURI *pending,
|
|||||||
|
|
||||||
pending->state = OSTREE_FETCHER_STATE_COMPLETE;
|
pending->state = OSTREE_FETCHER_STATE_COMPLETE;
|
||||||
|
|
||||||
/* Now that we've finished downloading, continue with other queued
|
|
||||||
* requests.
|
|
||||||
*/
|
|
||||||
session_thread_process_pending_queue (pending->thread_closure);
|
|
||||||
|
|
||||||
if (!pending->is_membuf)
|
if (!pending->is_membuf)
|
||||||
{
|
{
|
||||||
if (stbuf.st_size < pending->content_length)
|
if (stbuf.st_size < pending->content_length)
|
||||||
@ -935,14 +902,13 @@ on_stream_read (GObject *object,
|
|||||||
gpointer user_data);
|
gpointer user_data);
|
||||||
|
|
||||||
static void
|
static void
|
||||||
remove_pending_rerun_queue (OstreeFetcherPendingURI *pending)
|
remove_pending (OstreeFetcherPendingURI *pending)
|
||||||
{
|
{
|
||||||
/* Hold a temporary ref to ensure the reference to
|
/* Hold a temporary ref to ensure the reference to
|
||||||
* pending->thread_closure is valid.
|
* pending->thread_closure is valid.
|
||||||
*/
|
*/
|
||||||
pending_uri_ref (pending);
|
pending_uri_ref (pending);
|
||||||
g_hash_table_remove (pending->thread_closure->outstanding, pending);
|
g_hash_table_remove (pending->thread_closure->outstanding, pending);
|
||||||
session_thread_process_pending_queue (pending->thread_closure);
|
|
||||||
pending_uri_unref (pending);
|
pending_uri_unref (pending);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -976,7 +942,7 @@ on_out_splice_complete (GObject *object,
|
|||||||
if (local_error)
|
if (local_error)
|
||||||
{
|
{
|
||||||
g_task_return_error (task, local_error);
|
g_task_return_error (task, local_error);
|
||||||
remove_pending_rerun_queue (pending);
|
remove_pending (pending);
|
||||||
}
|
}
|
||||||
|
|
||||||
g_object_unref (task);
|
g_object_unref (task);
|
||||||
@ -1018,7 +984,7 @@ on_stream_read (GObject *object,
|
|||||||
g_strdup (pending->out_tmpfile),
|
g_strdup (pending->out_tmpfile),
|
||||||
(GDestroyNotify) g_free);
|
(GDestroyNotify) g_free);
|
||||||
}
|
}
|
||||||
remove_pending_rerun_queue (pending);
|
remove_pending (pending);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -1057,7 +1023,7 @@ on_stream_read (GObject *object,
|
|||||||
if (local_error)
|
if (local_error)
|
||||||
{
|
{
|
||||||
g_task_return_error (task, local_error);
|
g_task_return_error (task, local_error);
|
||||||
remove_pending_rerun_queue (pending);
|
remove_pending (pending);
|
||||||
}
|
}
|
||||||
|
|
||||||
g_object_unref (task);
|
g_object_unref (task);
|
||||||
@ -1096,7 +1062,7 @@ on_request_sent (GObject *object,
|
|||||||
g_task_return_pointer (task,
|
g_task_return_pointer (task,
|
||||||
g_strdup (pending->out_tmpfile),
|
g_strdup (pending->out_tmpfile),
|
||||||
(GDestroyNotify) g_free);
|
(GDestroyNotify) g_free);
|
||||||
remove_pending_rerun_queue (pending);
|
remove_pending (pending);
|
||||||
goto out;
|
goto out;
|
||||||
}
|
}
|
||||||
else if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code))
|
else if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code))
|
||||||
@ -1110,10 +1076,8 @@ on_request_sent (GObject *object,
|
|||||||
goto out;
|
goto out;
|
||||||
|
|
||||||
(void) g_input_stream_close (pending->request_body, NULL, NULL);
|
(void) g_input_stream_close (pending->request_body, NULL, NULL);
|
||||||
g_queue_insert_sorted (&pending->thread_closure->pending_queue,
|
|
||||||
g_object_ref (task), pending_task_compare,
|
start_pending_request (pending->thread_closure, task);
|
||||||
NULL);
|
|
||||||
remove_pending_rerun_queue (pending);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -1204,7 +1168,7 @@ on_request_sent (GObject *object,
|
|||||||
if (pending->request_body)
|
if (pending->request_body)
|
||||||
(void) g_input_stream_close (pending->request_body, NULL, NULL);
|
(void) g_input_stream_close (pending->request_body, NULL, NULL);
|
||||||
g_task_return_error (task, local_error);
|
g_task_return_error (task, local_error);
|
||||||
remove_pending_rerun_queue (pending);
|
remove_pending (pending);
|
||||||
}
|
}
|
||||||
|
|
||||||
g_object_unref (task);
|
g_object_unref (task);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user