From 8a5cd31e5fa0b1313a196b902e4b3c4603e7dfdf Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Fri, 15 Dec 2017 22:24:52 +0100 Subject: [PATCH] sd-bus: optionally, use inotify to wait for bus sockets to appear This adds a "watch-bind" feature to sd-bus connections. If set and the AF_UNIX socket we are connecting to doesn't exist yet, we'll establish an inotify watch instead, and wait for the socket to appear. In other words, a missing AF_UNIX just makes connecting slower. This is useful for daemons such as networkd or resolved that shall be able to run during early-boot, before dbus-daemon is up, and want to connect to dbus-daemon as soon as it becomes ready. --- src/libsystemd/libsystemd.sym | 6 + src/libsystemd/sd-bus/bus-internal.h | 13 + src/libsystemd/sd-bus/bus-socket.c | 279 +++++++++++++++- src/libsystemd/sd-bus/bus-socket.h | 1 + src/libsystemd/sd-bus/sd-bus.c | 337 +++++++++++++------- src/libsystemd/sd-bus/test-bus-watch-bind.c | 239 ++++++++++++++ src/systemd/sd-bus.h | 4 +- src/test/meson.build | 4 + 8 files changed, 760 insertions(+), 123 deletions(-) create mode 100644 src/libsystemd/sd-bus/test-bus-watch-bind.c diff --git a/src/libsystemd/libsystemd.sym b/src/libsystemd/libsystemd.sym index 1a29b03e855..4229e0aeb18 100644 --- a/src/libsystemd/libsystemd.sym +++ b/src/libsystemd/libsystemd.sym @@ -530,3 +530,9 @@ global: sd_bus_message_new; sd_bus_message_seal; } LIBSYSTEMD_234; + +LIBSYSTEMD_237 { +global: + sd_bus_set_watch_bind; + sd_bus_get_watch_bind; +} LIBSYSTEMD_236; diff --git a/src/libsystemd/sd-bus/bus-internal.h b/src/libsystemd/sd-bus/bus-internal.h index 4175ca3efe0..629d8b3f370 100644 --- a/src/libsystemd/sd-bus/bus-internal.h +++ b/src/libsystemd/sd-bus/bus-internal.h @@ -157,6 +157,7 @@ struct sd_bus_slot { enum bus_state { BUS_UNSET, + BUS_WATCH_BIND, /* waiting for the socket to appear via inotify */ BUS_OPENING, BUS_AUTHENTICATING, BUS_HELLO, @@ -188,6 +189,7 @@ struct sd_bus { enum bus_state state; int input_fd, output_fd; + int inotify_fd; int message_version; int message_endian; @@ -210,6 +212,7 @@ struct sd_bus { bool exited:1; bool exit_triggered:1; bool is_local:1; + bool watch_bind:1; int use_memfd; @@ -293,6 +296,7 @@ struct sd_bus { sd_event_source *output_io_event_source; sd_event_source *time_event_source; sd_event_source *quit_event_source; + sd_event_source *inotify_event_source; sd_event *event; int event_priority; @@ -312,6 +316,9 @@ struct sd_bus { LIST_HEAD(sd_bus_slot, slots); LIST_HEAD(sd_bus_track, tracks); + + int *inotify_watches; + size_t n_inotify_watches; }; /* For method calls we time-out at 25s, like in the D-Bus reference implementation */ @@ -367,6 +374,12 @@ bool bus_pid_changed(sd_bus *bus); char *bus_address_escape(const char *v); +int bus_attach_io_events(sd_bus *b); +int bus_attach_inotify_event(sd_bus *b); + +void bus_close_inotify_fd(sd_bus *b); +void bus_close_io_fds(sd_bus *b); + #define OBJECT_PATH_FOREACH_PREFIX(prefix, path) \ for (char *_slash = ({ strcpy((prefix), (path)); streq((prefix), "/") ? NULL : strrchr((prefix), '/'); }) ; \ _slash && !(_slash[(_slash) == (prefix)] = 0); \ diff --git a/src/libsystemd/sd-bus/bus-socket.c b/src/libsystemd/sd-bus/bus-socket.c index 5034d51472d..9291fed0e75 100644 --- a/src/libsystemd/sd-bus/bus-socket.c +++ b/src/libsystemd/sd-bus/bus-socket.c @@ -32,9 +32,12 @@ #include "bus-socket.h" #include "fd-util.h" #include "format-util.h" +#include "fs-util.h" #include "hexdecoct.h" +#include "io-util.h" #include "macro.h" #include "missing.h" +#include "path-util.h" #include "selinux-util.h" #include "signal-util.h" #include "stdio-util.h" @@ -688,30 +691,249 @@ int bus_socket_start_auth(sd_bus *b) { return bus_socket_start_auth_client(b); } +static int bus_socket_inotify_setup(sd_bus *b) { + _cleanup_free_ int *new_watches = NULL; + _cleanup_free_ char *absolute = NULL; + size_t n_allocated = 0, n = 0, done = 0, i; + unsigned max_follow = 32; + const char *p; + int wd, r; + + assert(b); + assert(b->watch_bind); + assert(b->sockaddr.sa.sa_family == AF_UNIX); + assert(b->sockaddr.un.sun_path[0] != 0); + + /* Sets up an inotify fd in case watch_bind is enabled: wait until the configured AF_UNIX file system socket + * appears before connecting to it. The implemented is pretty simplistic: we just subscribe to relevant changes + * to all prefix components of the path, and every time we get an event for that we try to reconnect again, + * without actually caring what precisely the event we got told us. If we still can't connect we re-subscribe + * to all relevant changes of anything in the path, so that our watches include any possibly newly created path + * components. */ + + if (b->inotify_fd < 0) { + b->inotify_fd = inotify_init1(IN_NONBLOCK|IN_CLOEXEC); + if (b->inotify_fd < 0) + return -errno; + } + + /* Make sure the path is NUL terminated */ + p = strndupa(b->sockaddr.un.sun_path, sizeof(b->sockaddr.un.sun_path)); + + /* Make sure the path is absolute */ + r = path_make_absolute_cwd(p, &absolute); + if (r < 0) + goto fail; + + /* Watch all parent directories, and don't mind any prefix that doesn't exist yet. For the innermost directory + * that exists we want to know when files are created or moved into it. For all parents of it we just care if + * they are removed or renamed. */ + + if (!GREEDY_REALLOC(new_watches, n_allocated, n + 1)) { + r = -ENOMEM; + goto fail; + } + + /* Start with the top-level directory, which is a bit simpler than the rest, since it can't be a symlink, and + * always exists */ + wd = inotify_add_watch(b->inotify_fd, "/", IN_CREATE|IN_MOVED_TO); + if (wd < 0) { + r = log_debug_errno(errno, "Failed to add inotify watch on /: %m"); + goto fail; + } else + new_watches[n++] = wd; + + for (;;) { + _cleanup_free_ char *component = NULL, *prefix = NULL, *destination = NULL; + size_t n_slashes, n_component; + char *c = NULL; + + n_slashes = strspn(absolute + done, "/"); + n_component = n_slashes + strcspn(absolute + done + n_slashes, "/"); + + if (n_component == 0) /* The end */ + break; + + component = strndup(absolute + done, n_component); + if (!component) { + r = -ENOMEM; + goto fail; + } + + /* A trailing slash? That's a directory, and not a socket then */ + if (path_equal(component, "/")) { + r = -EISDIR; + goto fail; + } + + /* A single dot? Let's eat this up */ + if (path_equal(component, "/.")) { + done += n_component; + continue; + } + + prefix = strndup(absolute, done + n_component); + if (!prefix) { + r = -ENOMEM; + goto fail; + } + + if (!GREEDY_REALLOC(new_watches, n_allocated, n + 1)) { + r = -ENOMEM; + goto fail; + } + + wd = inotify_add_watch(b->inotify_fd, prefix, IN_DELETE_SELF|IN_MOVE_SELF|IN_ATTRIB|IN_CREATE|IN_MOVED_TO|IN_DONT_FOLLOW); + log_debug("Added inotify watch for %s on bus %s: %i", prefix, strna(b->description), wd); + + if (wd < 0) { + if (IN_SET(errno, ENOENT, ELOOP)) + break; /* This component doesn't exist yet, or the path contains a cyclic symlink right now */ + + r = log_debug_errno(errno, "Failed to add inotify watch on %s: %m", isempty(prefix) ? "/" : prefix); + goto fail; + } else + new_watches[n++] = wd; + + /* Check if this is possibly a symlink. If so, let's follow it and watch it too. */ + r = readlink_malloc(prefix, &destination); + if (r == -EINVAL) { /* not a symlink */ + done += n_component; + continue; + } + if (r < 0) + goto fail; + + if (isempty(destination)) { /* Empty symlink target? Yuck! */ + r = -EINVAL; + goto fail; + } + + if (max_follow <= 0) { /* Let's make sure we don't follow symlinks forever */ + r = -ELOOP; + goto fail; + } + + if (path_is_absolute(destination)) { + /* For absolute symlinks we build the new path and start anew */ + c = strjoin(destination, absolute + done + n_component); + done = 0; + } else { + _cleanup_free_ char *t = NULL; + + /* For relative symlinks we replace the last component, and try again */ + t = strndup(absolute, done); + if (!t) + return -ENOMEM; + + c = strjoin(t, "/", destination, absolute + done + n_component); + } + if (!c) { + r = -ENOMEM; + goto fail; + } + + free(absolute); + absolute = c; + + max_follow--; + } + + /* And now, let's remove all watches from the previous iteration we don't need anymore */ + for (i = 0; i < b->n_inotify_watches; i++) { + bool found = false; + size_t j; + + for (j = 0; j < n; j++) + if (new_watches[j] == b->inotify_watches[i]) { + found = true; + break; + } + + if (found) + continue; + + (void) inotify_rm_watch(b->inotify_fd, b->inotify_watches[i]); + } + + free_and_replace(b->inotify_watches, new_watches); + b->n_inotify_watches = n; + + return 0; + +fail: + bus_close_inotify_fd(b); + return r; +} + int bus_socket_connect(sd_bus *b) { + bool inotify_done = false; int r; assert(b); - assert(b->input_fd < 0); - assert(b->output_fd < 0); - assert(b->sockaddr.sa.sa_family != AF_UNSPEC); - b->input_fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0); - if (b->input_fd < 0) - return -errno; + for (;;) { + assert(b->input_fd < 0); + assert(b->output_fd < 0); + assert(b->sockaddr.sa.sa_family != AF_UNSPEC); - b->output_fd = b->input_fd; + b->input_fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0); + if (b->input_fd < 0) + return -errno; - bus_socket_setup(b); + b->output_fd = b->input_fd; + bus_socket_setup(b); - r = connect(b->input_fd, &b->sockaddr.sa, b->sockaddr_size); - if (r < 0) { - if (errno == EINPROGRESS) - return 1; + if (connect(b->input_fd, &b->sockaddr.sa, b->sockaddr_size) < 0) { + if (errno == EINPROGRESS) { - return -errno; + /* If we have any inotify watches open, close them now, we don't need them anymore, as + * we have successfully initiated a connection */ + bus_close_inotify_fd(b); + + /* Note that very likely we are already in BUS_OPENING state here, as we enter it when + * we start parsing the address string. The only reason we set the state explicitly + * here, is to undo BUS_WATCH_BIND, in case we did the inotify magic. */ + b->state = BUS_OPENING; + return 1; + } + + if (IN_SET(errno, ENOENT, ECONNREFUSED) && /* ENOENT → unix socket doesn't exist at all; ECONNREFUSED → unix socket stale */ + b->watch_bind && + b->sockaddr.sa.sa_family == AF_UNIX && + b->sockaddr.un.sun_path[0] != 0) { + + /* This connection attempt failed, let's release the socket for now, and start with a + * fresh one when reconnecting. */ + bus_close_io_fds(b); + + if (inotify_done) { + /* inotify set up already, don't do it again, just return now, and remember + * that we are waiting for inotify events now. */ + b->state = BUS_WATCH_BIND; + return 1; + } + + /* This is a file system socket, and the inotify logic is enabled. Let's create the necessary inotify fd. */ + r = bus_socket_inotify_setup(b); + if (r < 0) + return r; + + /* Let's now try to connect a second time, because in theory there's otherwise a race + * here: the socket might have been created in the time between our first connect() and + * the time we set up the inotify logic. But let's remember that we set up inotify now, + * so that we don't do the connect() more than twice. */ + inotify_done = true; + + } else + return -errno; + } else + break; } + /* Yay, established, we don't need no inotify anymore! */ + bus_close_inotify_fd(b); + return bus_socket_start_auth(b); } @@ -1069,3 +1291,34 @@ int bus_socket_process_authenticating(sd_bus *b) { return bus_socket_read_auth(b); } + +int bus_socket_process_watch_bind(sd_bus *b) { + int r, q; + + assert(b); + assert(b->state == BUS_WATCH_BIND); + assert(b->inotify_fd >= 0); + + r = flush_fd(b->inotify_fd); + if (r <= 0) + return r; + + log_debug("Got inotify event on bus %s.", strna(b->description)); + + /* We flushed events out of the inotify fd. In that case, maybe the socket is valid now? Let's try to connect + * to it again */ + + r = bus_socket_connect(b); + if (r < 0) + return r; + + q = bus_attach_io_events(b); + if (q < 0) + return q; + + q = bus_attach_inotify_event(b); + if (q < 0) + return q; + + return r; +} diff --git a/src/libsystemd/sd-bus/bus-socket.h b/src/libsystemd/sd-bus/bus-socket.h index 915a283f5ad..c180562f981 100644 --- a/src/libsystemd/sd-bus/bus-socket.h +++ b/src/libsystemd/sd-bus/bus-socket.h @@ -34,5 +34,6 @@ int bus_socket_read_message(sd_bus *bus); int bus_socket_process_opening(sd_bus *b); int bus_socket_process_authenticating(sd_bus *b); +int bus_socket_process_watch_bind(sd_bus *b); bool bus_socket_auth_needs_write(sd_bus *b); diff --git a/src/libsystemd/sd-bus/sd-bus.c b/src/libsystemd/sd-bus/sd-bus.c index 0866a09d1f8..3b65f51d04c 100644 --- a/src/libsystemd/sd-bus/sd-bus.c +++ b/src/libsystemd/sd-bus/sd-bus.c @@ -72,23 +72,33 @@ } while (false) static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec); -static int attach_io_events(sd_bus *b); -static void detach_io_events(sd_bus *b); +static void bus_detach_io_events(sd_bus *b); +static void bus_detach_inotify_event(sd_bus *b); static thread_local sd_bus *default_system_bus = NULL; static thread_local sd_bus *default_user_bus = NULL; static thread_local sd_bus *default_starter_bus = NULL; -static void bus_close_fds(sd_bus *b) { +void bus_close_io_fds(sd_bus *b) { assert(b); - detach_io_events(b); + bus_detach_io_events(b); if (b->input_fd != b->output_fd) safe_close(b->output_fd); b->output_fd = b->input_fd = safe_close(b->input_fd); } +void bus_close_inotify_fd(sd_bus *b) { + assert(b); + + bus_detach_inotify_event(b); + + b->inotify_fd = safe_close(b->inotify_fd); + b->inotify_watches = mfree(b->inotify_watches); + b->n_inotify_watches = 0; +} + static void bus_reset_queues(sd_bus *b) { assert(b); @@ -132,7 +142,8 @@ static void bus_free(sd_bus *b) { if (b->default_bus_ptr) *b->default_bus_ptr = NULL; - bus_close_fds(b); + bus_close_io_fds(b); + bus_close_inotify_fd(b); free(b->label); free(b->groups); @@ -182,6 +193,7 @@ _public_ int sd_bus_new(sd_bus **ret) { r->n_ref = REFCNT_INIT; r->input_fd = r->output_fd = -1; + r->inotify_fd = -1; r->message_version = 1; r->creds_mask |= SD_BUS_CREDS_WELL_KNOWN_NAMES|SD_BUS_CREDS_UNIQUE_NAME; r->hello_flags |= KDBUS_HELLO_ACCEPT_FD; @@ -379,6 +391,22 @@ _public_ int sd_bus_get_allow_interactive_authorization(sd_bus *bus) { return bus->allow_interactive_authorization; } +_public_ int sd_bus_set_watch_bind(sd_bus *bus, int b) { + assert_return(bus, -EINVAL); + assert_return(bus->state == BUS_UNSET, -EPERM); + assert_return(!bus_pid_changed(bus), -ECHILD); + + bus->watch_bind = b; + return 0; +} + +_public_ int sd_bus_get_watch_bind(sd_bus *bus) { + assert_return(bus, -EINVAL); + assert_return(!bus_pid_changed(bus), -ECHILD); + + return bus->watch_bind; +} + static int hello_callback(sd_bus_message *reply, void *userdata, sd_bus_error *error) { const char *s; sd_bus *bus; @@ -901,7 +929,8 @@ static int bus_start_address(sd_bus *b) { assert(b); for (;;) { - bus_close_fds(b); + bus_close_io_fds(b); + bus_close_inotify_fd(b); /* If you provide multiple different bus-addresses, we * try all of them in order and use the first one that @@ -909,20 +938,25 @@ static int bus_start_address(sd_bus *b) { if (b->exec_path) r = bus_socket_exec(b); - else if ((b->nspid > 0 || b->machine) && b->sockaddr.sa.sa_family != AF_UNSPEC) r = bus_container_connect_socket(b); - else if (b->sockaddr.sa.sa_family != AF_UNSPEC) r = bus_socket_connect(b); - else goto next; if (r >= 0) { - r = attach_io_events(b); - if (r >= 0) - return r; + int q; + + q = bus_attach_io_events(b); + if (q < 0) + return q; + + q = bus_attach_inotify_event(b); + if (q < 0) + return q; + + return r; } b->last_connect_error = -r; @@ -1305,7 +1339,8 @@ _public_ void sd_bus_close(sd_bus *bus) { * the bus object and the bus may be freed */ bus_reset_queues(bus); - bus_close_fds(bus); + bus_close_io_fds(bus); + bus_close_inotify_fd(bus); } _public_ sd_bus* sd_bus_flush_close_unref(sd_bus *bus) { @@ -1322,7 +1357,7 @@ _public_ sd_bus* sd_bus_flush_close_unref(sd_bus *bus) { static void bus_enter_closing(sd_bus *bus) { assert(bus); - if (!IN_SET(bus->state, BUS_OPENING, BUS_AUTHENTICATING, BUS_HELLO, BUS_RUNNING)) + if (!IN_SET(bus->state, BUS_WATCH_BIND, BUS_OPENING, BUS_AUTHENTICATING, BUS_HELLO, BUS_RUNNING)) return; bus->state = BUS_CLOSING; @@ -1973,7 +2008,16 @@ _public_ int sd_bus_get_fd(sd_bus *bus) { assert_return(bus->input_fd == bus->output_fd, -EPERM); assert_return(!bus_pid_changed(bus), -ECHILD); - return bus->input_fd; + if (bus->state == BUS_CLOSED) + return -ENOTCONN; + + if (bus->inotify_fd >= 0) + return bus->inotify_fd; + + if (bus->input_fd >= 0) + return bus->input_fd; + + return -ENOTCONN; } _public_ int sd_bus_get_events(sd_bus *bus) { @@ -1982,23 +2026,40 @@ _public_ int sd_bus_get_events(sd_bus *bus) { assert_return(bus, -EINVAL); assert_return(!bus_pid_changed(bus), -ECHILD); - if (!BUS_IS_OPEN(bus->state) && bus->state != BUS_CLOSING) + switch (bus->state) { + + case BUS_UNSET: + case BUS_CLOSED: return -ENOTCONN; - if (bus->state == BUS_OPENING) - flags |= POLLOUT; - else if (bus->state == BUS_AUTHENTICATING) { + case BUS_WATCH_BIND: + flags |= POLLIN; + break; + case BUS_OPENING: + flags |= POLLOUT; + break; + + case BUS_AUTHENTICATING: if (bus_socket_auth_needs_write(bus)) flags |= POLLOUT; flags |= POLLIN; + break; - } else if (IN_SET(bus->state, BUS_RUNNING, BUS_HELLO)) { + case BUS_RUNNING: + case BUS_HELLO: if (bus->rqueue_size <= 0) flags |= POLLIN; if (bus->wqueue_size > 0) flags |= POLLOUT; + break; + + case BUS_CLOSING: + break; + + default: + assert_not_reached("Unknown state"); } return flags; @@ -2019,39 +2080,45 @@ _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) { return 1; } - if (bus->state == BUS_CLOSING) { - *timeout_usec = 0; - return 1; - } + switch (bus->state) { - if (bus->state == BUS_AUTHENTICATING) { + case BUS_AUTHENTICATING: *timeout_usec = bus->auth_timeout; return 1; - } - if (!IN_SET(bus->state, BUS_RUNNING, BUS_HELLO)) { - *timeout_usec = (uint64_t) -1; - return 0; - } + case BUS_RUNNING: + case BUS_HELLO: + if (bus->rqueue_size > 0) { + *timeout_usec = 0; + return 1; + } - if (bus->rqueue_size > 0) { + c = prioq_peek(bus->reply_callbacks_prioq); + if (!c) { + *timeout_usec = (uint64_t) -1; + return 0; + } + + if (c->timeout == 0) { + *timeout_usec = (uint64_t) -1; + return 0; + } + + *timeout_usec = c->timeout; + return 1; + + case BUS_CLOSING: *timeout_usec = 0; return 1; - } - c = prioq_peek(bus->reply_callbacks_prioq); - if (!c) { + case BUS_WATCH_BIND: + case BUS_OPENING: *timeout_usec = (uint64_t) -1; return 0; - } - if (c->timeout == 0) { - *timeout_usec = (uint64_t) -1; - return 0; + default: + assert_not_reached("Unknown or unexpected stat"); } - - *timeout_usec = c->timeout; - return 1; } static int process_timeout(sd_bus *bus) { @@ -2114,8 +2181,8 @@ static int process_timeout(sd_bus *bus) { sd_bus_slot_unref(slot); - /* When this is the hello message and it failed, then make sure to propagate the error up, don't just log and - * ignore the callback handler's return value. */ + /* When this is the hello message and it timed out, then make sure to propagate the error up, don't just log + * and ignore the callback handler's return value. */ if (is_hello) return r; @@ -2219,8 +2286,8 @@ static int process_reply(sd_bus *bus, sd_bus_message *m) { sd_bus_slot_unref(slot); - /* When this is the hello message and it timed out, then make sure to propagate the error up, don't just log - * and ignore the callback handler's return value. */ + /* When this is the hello message and it failed, then make sure to propagate the error up, don't just log and + * ignore the callback handler's return value. */ if (is_hello) return r; @@ -2656,48 +2723,44 @@ static int bus_process_internal(sd_bus *bus, bool hint_priority, int64_t priorit case BUS_CLOSED: return -ECONNRESET; + case BUS_WATCH_BIND: + r = bus_socket_process_watch_bind(bus); + break; + case BUS_OPENING: r = bus_socket_process_opening(bus); - if (IN_SET(r, -ENOTCONN, -ECONNRESET, -EPIPE, -ESHUTDOWN)) { - bus_enter_closing(bus); - r = 1; - } else if (r < 0) - return r; - if (ret) - *ret = NULL; - return r; + break; case BUS_AUTHENTICATING: r = bus_socket_process_authenticating(bus); - if (IN_SET(r, -ENOTCONN, -ECONNRESET, -EPIPE, -ESHUTDOWN)) { - bus_enter_closing(bus); - r = 1; - } else if (r < 0) - return r; - - if (ret) - *ret = NULL; - - return r; + break; case BUS_RUNNING: case BUS_HELLO: r = process_running(bus, hint_priority, priority, ret); - if (IN_SET(r, -ENOTCONN, -ECONNRESET, -EPIPE, -ESHUTDOWN)) { - bus_enter_closing(bus); - r = 1; + if (r >= 0) + return r; - if (ret) - *ret = NULL; - } - - return r; + /* This branch initializes *ret, hence we don't use the generic error checking below */ + break; case BUS_CLOSING: return process_closing(bus, ret); + + default: + assert_not_reached("Unknown state"); } - assert_not_reached("Unknown state"); + if (IN_SET(r, -ENOTCONN, -ECONNRESET, -EPIPE, -ESHUTDOWN)) { + bus_enter_closing(bus); + r = 1; + } else if (r < 0) + return r; + + if (ret) + *ret = NULL; + + return r; } _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) { @@ -2710,7 +2773,7 @@ _public_ int sd_bus_process_priority(sd_bus *bus, int64_t priority, sd_bus_messa static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) { struct pollfd p[2] = {}; - int r, e, n; + int r, n; struct timespec ts; usec_t m = USEC_INFINITY; @@ -2722,45 +2785,52 @@ static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) { if (!BUS_IS_OPEN(bus->state)) return -ENOTCONN; - e = sd_bus_get_events(bus); - if (e < 0) - return e; + if (bus->state == BUS_WATCH_BIND) { + assert(bus->inotify_fd >= 0); - if (need_more) - /* The caller really needs some more data, he doesn't - * care about what's already read, or any timeouts - * except its own. */ - e |= POLLIN; - else { - usec_t until; - /* The caller wants to process if there's something to - * process, but doesn't care otherwise */ + p[0].events = POLLIN; + p[0].fd = bus->inotify_fd; + n = 1; + } else { + int e; - r = sd_bus_get_timeout(bus, &until); - if (r < 0) - return r; - if (r > 0) { - usec_t nw; - nw = now(CLOCK_MONOTONIC); - m = until > nw ? until - nw : 0; + e = sd_bus_get_events(bus); + if (e < 0) + return e; + + if (need_more) + /* The caller really needs some more data, he doesn't + * care about what's already read, or any timeouts + * except its own. */ + e |= POLLIN; + else { + usec_t until; + /* The caller wants to process if there's something to + * process, but doesn't care otherwise */ + + r = sd_bus_get_timeout(bus, &until); + if (r < 0) + return r; + if (r > 0) + m = usec_sub_unsigned(until, now(CLOCK_MONOTONIC)); + } + + p[0].fd = bus->input_fd; + if (bus->output_fd == bus->input_fd) { + p[0].events = e; + n = 1; + } else { + p[0].events = e & POLLIN; + p[1].fd = bus->output_fd; + p[1].events = e & POLLOUT; + n = 2; } } - if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m)) + if (timeout_usec != (uint64_t) -1 && (m == USEC_INFINITY || timeout_usec < m)) m = timeout_usec; - p[0].fd = bus->input_fd; - if (bus->output_fd == bus->input_fd) { - p[0].events = e; - n = 1; - } else { - p[0].events = e & POLLIN; - p[1].fd = bus->output_fd; - p[1].events = e & POLLOUT; - n = 2; - } - - r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL); + r = ppoll(p, n, m == USEC_INFINITY ? NULL : timespec_store(&ts, m), NULL); if (r < 0) return -errno; @@ -2796,6 +2866,10 @@ _public_ int sd_bus_flush(sd_bus *bus) { if (!BUS_IS_OPEN(bus->state)) return -ENOTCONN; + /* We never were connected? Don't hang in inotify for good, as there's no timeout set for it */ + if (bus->state == BUS_WATCH_BIND) + return -EUNATCH; + r = bus_ensure_running(bus); if (r < 0) return r; @@ -2966,6 +3040,8 @@ static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userd assert(bus); + /* Note that this is called both on input_fd, output_fd as well as inotify_fd events */ + r = sd_bus_process(bus, NULL); if (r < 0) { log_debug_errno(r, "Processing of bus failed, closing down: %m"); @@ -3053,7 +3129,7 @@ static int quit_callback(sd_event_source *event, void *userdata) { return 1; } -static int attach_io_events(sd_bus *bus) { +int bus_attach_io_events(sd_bus *bus) { int r; assert(bus); @@ -3107,7 +3183,7 @@ static int attach_io_events(sd_bus *bus) { return 0; } -static void detach_io_events(sd_bus *bus) { +static void bus_detach_io_events(sd_bus *bus) { assert(bus); if (bus->input_io_event_source) { @@ -3121,6 +3197,44 @@ static void detach_io_events(sd_bus *bus) { } } +int bus_attach_inotify_event(sd_bus *bus) { + int r; + + assert(bus); + + if (bus->inotify_fd < 0) + return 0; + + if (!bus->event) + return 0; + + if (!bus->inotify_event_source) { + r = sd_event_add_io(bus->event, &bus->inotify_event_source, bus->inotify_fd, EPOLLIN, io_callback, bus); + if (r < 0) + return r; + + r = sd_event_source_set_priority(bus->inotify_event_source, bus->event_priority); + if (r < 0) + return r; + + r = sd_event_source_set_description(bus->inotify_event_source, "bus-inotify"); + } else + r = sd_event_source_set_io_fd(bus->inotify_event_source, bus->inotify_fd); + if (r < 0) + return r; + + return 0; +} + +static void bus_detach_inotify_event(sd_bus *bus) { + assert(bus); + + if (bus->inotify_event_source) { + sd_event_source_set_enabled(bus->inotify_event_source, SD_EVENT_OFF); + bus->inotify_event_source = sd_event_source_unref(bus->inotify_event_source); + } +} + _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) { int r; @@ -3161,7 +3275,11 @@ _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) { if (r < 0) goto fail; - r = attach_io_events(bus); + r = bus_attach_io_events(bus); + if (r < 0) + goto fail; + + r = bus_attach_inotify_event(bus); if (r < 0) goto fail; @@ -3178,7 +3296,8 @@ _public_ int sd_bus_detach_event(sd_bus *bus) { if (!bus->event) return 0; - detach_io_events(bus); + bus_detach_io_events(bus); + bus_detach_inotify_event(bus); if (bus->time_event_source) { sd_event_source_set_enabled(bus->time_event_source, SD_EVENT_OFF); diff --git a/src/libsystemd/sd-bus/test-bus-watch-bind.c b/src/libsystemd/sd-bus/test-bus-watch-bind.c new file mode 100644 index 00000000000..aef5ba9486b --- /dev/null +++ b/src/libsystemd/sd-bus/test-bus-watch-bind.c @@ -0,0 +1,239 @@ +/* SPDX-License-Identifier: LGPL-2.1+ */ +/*** + This file is part of systemd. + + Copyright 2017 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 2.1 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with systemd; If not, see . +***/ + +#include + +#include "sd-bus.h" +#include "sd-event.h" +#include "sd-id128.h" + +#include "alloc-util.h" +#include "fd-util.h" +#include "fileio.h" +#include "fs-util.h" +#include "mkdir.h" +#include "path-util.h" +#include "random-util.h" +#include "rm-rf.h" +#include "socket-util.h" +#include "string-util.h" + +static int method_foobar(sd_bus_message *m, void *userdata, sd_bus_error *ret_error) { + log_info("Got Foobar() call."); + + assert_se(sd_event_exit(sd_bus_get_event(sd_bus_message_get_bus(m)), 0) >= 0); + return sd_bus_reply_method_return(m, NULL); +} + +static int method_exit(sd_bus_message *m, void *userdata, sd_bus_error *ret_error) { + log_info("Got Exit() call"); + assert_se(sd_event_exit(sd_bus_get_event(sd_bus_message_get_bus(m)), 1) >= 0); + return sd_bus_reply_method_return(m, NULL); +} + +static const sd_bus_vtable vtable[] = { + SD_BUS_VTABLE_START(0), + SD_BUS_METHOD("Foobar", NULL, NULL, method_foobar, SD_BUS_VTABLE_UNPRIVILEGED), + SD_BUS_METHOD("Exit", NULL, NULL, method_exit, SD_BUS_VTABLE_UNPRIVILEGED), + SD_BUS_VTABLE_END, +}; + +static void* thread_server(void *p) { + _cleanup_free_ char *suffixed = NULL, *suffixed2 = NULL, *d = NULL; + _cleanup_close_ int fd = -1; + union sockaddr_union u = { + .un.sun_family = AF_UNIX, + }; + const char *path = p; + + log_debug("Initializing server"); + + /* Let's play some games, by slowly creating the socket directory, and renaming it in the middle */ + (void) usleep(100 * USEC_PER_MSEC); + + assert_se(mkdir_parents(path, 0755) >= 0); + (void) usleep(100 * USEC_PER_MSEC); + + d = dirname_malloc(path); + assert_se(d); + assert_se(asprintf(&suffixed, "%s.%" PRIx64, d, random_u64()) >= 0); + assert_se(rename(d, suffixed) >= 0); + (void) usleep(100 * USEC_PER_MSEC); + + assert_se(asprintf(&suffixed2, "%s.%" PRIx64, d, random_u64()) >= 0); + assert_se(symlink(suffixed2, d) >= 0); + (void) usleep(100 * USEC_PER_MSEC); + + assert_se(symlink(basename(suffixed), suffixed2) >= 0); + (void) usleep(100 * USEC_PER_MSEC); + + strncpy(u.un.sun_path, path, sizeof(u.un.sun_path)); + + fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0); + assert_se(fd >= 0); + + assert_se(bind(fd, &u.sa, SOCKADDR_UN_LEN(u.un)) >= 0); + usleep(100 * USEC_PER_MSEC); + + assert_se(listen(fd, SOMAXCONN) >= 0); + usleep(100 * USEC_PER_MSEC); + + assert_se(touch(path) >= 0); + usleep(100 * USEC_PER_MSEC); + + log_debug("Initialized server"); + + for (;;) { + _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL; + _cleanup_(sd_event_unrefp) sd_event *event = NULL; + sd_id128_t id; + int bus_fd, code; + + assert_se(sd_id128_randomize(&id) >= 0); + + assert_se(sd_event_new(&event) >= 0); + + bus_fd = accept4(fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC); + assert_se(bus_fd >= 0); + + log_debug("Accepted server connection"); + + assert_se(sd_bus_new(&bus) >= 0); + assert_se(sd_bus_set_description(bus, "server") >= 0); + assert_se(sd_bus_set_fd(bus, bus_fd, bus_fd) >= 0); + assert_se(sd_bus_set_server(bus, true, id) >= 0); + /* assert_se(sd_bus_set_anonymous(bus, true) >= 0); */ + + assert_se(sd_bus_attach_event(bus, event, 0) >= 0); + + assert_se(sd_bus_add_object_vtable(bus, NULL, "/foo", "foo.TestInterface", vtable, NULL) >= 0); + + assert_se(sd_bus_start(bus) >= 0); + + assert_se(sd_event_loop(event) >= 0); + + assert_se(sd_event_get_exit_code(event, &code) >= 0); + + if (code > 0) + break; + } + + log_debug("Server done"); + + return NULL; +} + +static void* thread_client1(void *p) { + _cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL; + _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL; + const char *path = p, *t; + int r; + + log_debug("Initializing client1"); + + assert_se(sd_bus_new(&bus) >= 0); + assert_se(sd_bus_set_description(bus, "client1") >= 0); + + t = strjoina("unix:path=", path); + assert_se(sd_bus_set_address(bus, t) >= 0); + assert_se(sd_bus_set_watch_bind(bus, true) >= 0); + assert_se(sd_bus_start(bus) >= 0); + + r = sd_bus_call_method(bus, "foo.bar", "/foo", "foo.TestInterface", "Foobar", &error, NULL, NULL); + assert_se(r >= 0); + + log_debug("Client1 done"); + + return NULL; +} + +static int client2_callback(sd_bus_message *m, void *userdata, sd_bus_error *ret_error) { + assert_se(sd_bus_message_is_method_error(m, NULL) == 0); + assert_se(sd_event_exit(sd_bus_get_event(sd_bus_message_get_bus(m)), 0) >= 0); + return 0; +} + +static void* thread_client2(void *p) { + _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL; + _cleanup_(sd_event_unrefp) sd_event *event = NULL; + const char *path = p, *t; + + log_debug("Initializing client2"); + + assert_se(sd_event_new(&event) >= 0); + assert_se(sd_bus_new(&bus) >= 0); + assert_se(sd_bus_set_description(bus, "client2") >= 0); + + t = strjoina("unix:path=", path); + assert_se(sd_bus_set_address(bus, t) >= 0); + assert_se(sd_bus_set_watch_bind(bus, true) >= 0); + assert_se(sd_bus_attach_event(bus, event, 0) >= 0); + assert_se(sd_bus_start(bus) >= 0); + + assert_se(sd_bus_call_method_async(bus, NULL, "foo.bar", "/foo", "foo.TestInterface", "Foobar", client2_callback, NULL, NULL) >= 0); + + assert_se(sd_event_loop(event) >= 0); + + log_debug("Client2 done"); + + return NULL; +} + +static void request_exit(const char *path) { + _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL; + const char *t; + + assert_se(sd_bus_new(&bus) >= 0); + + t = strjoina("unix:path=", path); + assert_se(sd_bus_set_address(bus, t) >= 0); + assert_se(sd_bus_set_watch_bind(bus, true) >= 0); + assert_se(sd_bus_set_description(bus, "request-exit") >= 0); + assert_se(sd_bus_start(bus) >= 0); + + assert_se(sd_bus_call_method(bus, "foo.bar", "/foo", "foo.TestInterface", "Exit", NULL, NULL, NULL) >= 0); +} + +int main(int argc, char *argv[]) { + _cleanup_(rm_rf_physical_and_freep) char *d = NULL; + pthread_t server, client1, client2; + char *path; + + log_set_max_level(LOG_DEBUG); + + /* We use /dev/shm here rather than /tmp, since some weird distros might set up /tmp as some weird fs that + * doesn't support inotify properly. */ + assert_se(mkdtemp_malloc("/dev/shm/systemd-watch-bind-XXXXXX", &d) >= 0); + + path = strjoina(d, "/this/is/a/socket"); + + assert_se(pthread_create(&server, NULL, thread_server, path) == 0); + assert_se(pthread_create(&client1, NULL, thread_client1, path) == 0); + assert_se(pthread_create(&client2, NULL, thread_client2, path) == 0); + + assert_se(pthread_join(client1, NULL) == 0); + assert_se(pthread_join(client2, NULL) == 0); + + request_exit(path); + + assert_se(pthread_join(server, NULL) == 0); + + return 0; +} diff --git a/src/systemd/sd-bus.h b/src/systemd/sd-bus.h index c5c7096d55b..66bc48842bf 100644 --- a/src/systemd/sd-bus.h +++ b/src/systemd/sd-bus.h @@ -150,8 +150,10 @@ int sd_bus_set_allow_interactive_authorization(sd_bus *bus, int b); int sd_bus_get_allow_interactive_authorization(sd_bus *bus); int sd_bus_set_exit_on_disconnect(sd_bus *bus, int b); int sd_bus_get_exit_on_disconnect(sd_bus *bus); +int sd_bus_set_watch_bind(sd_bus *bus, int b); +int sd_bus_get_watch_bind(sd_bus *bus); -int sd_bus_start(sd_bus *ret); +int sd_bus_start(sd_bus *bus); int sd_bus_try_close(sd_bus *bus); void sd_bus_close(sd_bus *bus); diff --git a/src/test/meson.build b/src/test/meson.build index 71b440637d7..18e957ddc82 100644 --- a/src/test/meson.build +++ b/src/test/meson.build @@ -762,6 +762,10 @@ tests += [ [], [threads]], + [['src/libsystemd/sd-bus/test-bus-watch-bind.c'], + [], + [threads], '', 'timeout=120'], + [['src/libsystemd/sd-bus/test-bus-chat.c'], [], [threads]],