diff --git a/src/journal/journald-file.c b/src/journal/journald-file.c
index b12f40599b0..5dba7915170 100644
--- a/src/journal/journald-file.c
+++ b/src/journal/journald-file.c
@@ -1,5 +1,8 @@
 /* SPDX-License-Identifier: LGPL-2.1-or-later */
 
+#include <pthread.h>
+#include <unistd.h>
+
 #include "chattr-util.h"
 #include "fd-util.h"
 #include "format-util.h"
@@ -10,10 +13,312 @@
 #include "set.h"
 #include "sync-util.h"
 
+/* Cuts a journal file off right after its tail object.
+ *
+ * The header's arena_size is rewritten first, since it must never exceed the file size.
+ *
+ * Returns 0 on success, or a negative errno-style value if the end of the tail object cannot be
+ * determined or ftruncate() fails. */
+static int journald_file_truncate(JournalFile *f) {
+        uint64_t p;
+        int r;
+
+        /* truncate excess from the end of archives */
+        r = journal_file_tail_end(f, &p);
+        if (r < 0)
+                return log_debug_errno(r, "Failed to determine end of tail object: %m");
+
+        /* arena_size can't exceed the file size, ensure it's updated before truncating */
+        f->header->arena_size = htole64(p - le64toh(f->header->header_size));
+
+        /* Report the failure instead of only logging it; the caller decides whether to ignore it. */
+        if (ftruncate(f->fd, p) < 0)
+                return log_debug_errno(errno, "Failed to truncate %s: %m", f->path);
+
+        return 0;
+}
+
+/* Punches a hole over the unused slots at the end of the entry array chain that starts at offset p.
+ *
+ * n_entries is the number of slots in use across the whole chain.
+ *
+ * Returns 0 if a hole was punched or there was nothing to do (no entries, empty chain, no unused
+ * slots), -EBADMSG if the counts do not fit the chain, or a negative errno-style value if an object
+ * cannot be read or fallocate() fails. */
+static int journald_file_entry_array_punch_hole(JournalFile *f, uint64_t p, uint64_t n_entries) {
+        Object o;
+        uint64_t offset, sz, n_items = 0, n_unused, n_last;
+        int r;
+
+        if (n_entries == 0)
+                return 0;
+
+        /* Walk to the end of the chain, summing up the slots of every array on the way. Afterwards p is
+         * the offset of the last array and o is the object that was read from there. */
+        for (uint64_t q = p; q != 0; q = le64toh(o.entry_array.next_entry_array_offset)) {
+                r = journal_file_read_object(f, OBJECT_ENTRY_ARRAY, q, &o);
+                if (r < 0)
+                        return r;
+
+                n_items += journal_file_entry_array_n_items(&o);
+                p = q;
+        }
+
+        if (p == 0)
+                return 0;
+
+        if (n_entries > n_items)
+                return -EBADMSG;
+
+        /* Amount of unused items in the final entry array. */
+        n_unused = n_items - n_entries;
+
+        if (n_unused == 0)
+                return 0;
+
+        /* All unused slots have to sit in the final array. If there are more of them than that array
+         * has slots, the subtraction below would wrap around and the hole would land outside of the
+         * array, so refuse to touch the file. */
+        n_last = journal_file_entry_array_n_items(&o);
+        if (n_unused > n_last)
+                return -EBADMSG;
+
+        offset = p + offsetof(Object, entry_array.items) + (n_last - n_unused) * sizeof(le64_t);
+        sz = p + le64toh(o.object.size) - offset;
+
+        if (fallocate(f->fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, offset, sz) < 0)
+                return log_debug_errno(errno, "Failed to punch hole in entry array of %s: %m", f->path);
+
+        return 0;
+}
+
+/* Punches holes over the unused tails of the global entry array chain and of the entry array chain of
+ * every data object reachable from the data hash table.
+ *
+ * Returns 0 on success, or a negative errno-style value if the global chain cannot be processed or
+ * the hash table cannot be read. Problems with individual data objects are not reported. */
+static int journald_file_punch_holes(JournalFile *f) {
+        HashItem items[4096 / sizeof(HashItem)];
+        uint64_t p, sz;
+        size_t to_read;
+        int r;
+
+        r = journald_file_entry_array_punch_hole(
+                        f, le64toh(f->header->entry_array_offset), le64toh(f->header->n_entries));
+        if (r < 0)
+                return r;
+
+        p = le64toh(f->header->data_hash_table_offset);
+        sz = le64toh(f->header->data_hash_table_size);
+        to_read = MIN((size_t) f->last_stat.st_blksize, sizeof(items));
+
+        for (uint64_t i = p; i < p + sz;) {
+                ssize_t n_read;
+                size_t n;
+
+                n_read = pread(f->fd, items, MIN(to_read, p + sz - i), i);
+                if (n_read < 0)
+                        return -errno; /* pread() itself only returns -1, the reason is in errno. */
+
+                /* Only complete hash items can be looked at. Stop if not even one was read (end of file
+                 * or a trailing fragment), as no progress could be made from here. */
+                n = (size_t) n_read / sizeof(HashItem);
+                if (n == 0)
+                        break;
+
+                for (size_t j = 0; j < n; j++) {
+                        Object o;
+
+                        /* Follow the chain of data objects hanging off this hash bucket. */
+                        for (uint64_t q = le64toh(items[j].head_hash_offset); q != 0;
+                             q = le64toh(o.data.next_hash_offset)) {
+
+                                r = journal_file_read_object(f, OBJECT_DATA, q, &o);
+                                if (r < 0) {
+                                        log_debug_errno(r, "Invalid data object: %m, ignoring");
+                                        break;
+                                }
+
+                                if (le64toh(o.data.n_entries) == 0)
+                                        continue;
+
+                                /* One entry less than n_entries is expected in the array chain, presumably
+                                 * because the first entry is referenced from the data object itself;
+                                 * verify against the journal file format. Failures are ignored here. */
+                                (void) journald_file_entry_array_punch_hole(
+                                                f, le64toh(o.data.entry_array_offset), le64toh(o.data.n_entries) - 1);
+                        }
+                }
+
+                /* Advance by what was actually consumed, so that no bucket is skipped when less than a
+                 * full buffer was read (small st_blksize, short read). */
+                i += n * sizeof(HashItem);
+        }
+
+        return 0;
+}
+
+/* This may be called from a separate thread to prevent blocking the caller for the duration of fsync().
+ * As a result we use atomic operations on f->offline_state for inter-thread communications with
+ * journal_file_set_offline() and journal_file_set_online(). 
*/
+static void journald_file_set_offline_internal(JournaldFile *f) {
+        assert(f);
+        assert(f->file->fd >= 0);
+        assert(f->file->header);
+
+        for (;;) { /* Dispatch on the current state over and over until one of the cases returns. */
+                switch (f->file->offline_state) { /* Every transition below is a compare-and-swap. If it fails,
+                                                   * the state was changed in the meantime and "continue"
+                                                   * makes the loop look at the new state. */
+                case OFFLINE_CANCEL: /* Goes straight to OFFLINE_DONE and returns, nothing is synced on this path. */
+                        if (!__sync_bool_compare_and_swap(&f->file->offline_state, OFFLINE_CANCEL, OFFLINE_DONE))
+                                continue;
+                        return;
+
+                case OFFLINE_AGAIN_FROM_SYNCING: /* Restart request: go back to OFFLINE_SYNCING. */
+                        if (!__sync_bool_compare_and_swap(&f->file->offline_state, OFFLINE_AGAIN_FROM_SYNCING, OFFLINE_SYNCING))
+                                continue;
+                        break; /* Only leaves the switch, the next loop iteration handles the new state. */
+
+                case OFFLINE_AGAIN_FROM_OFFLINING: /* Restart request: this one goes back to OFFLINE_SYNCING, too. */
+                        if (!__sync_bool_compare_and_swap(&f->file->offline_state, OFFLINE_AGAIN_FROM_OFFLINING, OFFLINE_SYNCING))
+                                continue;
+                        break;
+
+                case OFFLINE_SYNCING:
+                        if (f->file->archive) { /* Archives are trimmed first; both steps are best-effort, errors are dropped. */
+                                (void) journald_file_truncate(f->file);
+                                (void) journald_file_punch_holes(f->file);
+                        }
+
+                        (void) fsync(f->file->fd); /* First sync, done before the header state is changed. */
+
+                        if (!__sync_bool_compare_and_swap(&f->file->offline_state, OFFLINE_SYNCING, OFFLINE_OFFLINING))
+                                continue; /* State changed during the sync: the header is left alone, start over. */
+
+                        f->file->header->state = f->file->archive ? STATE_ARCHIVED : STATE_OFFLINE;
+                        (void) fsync(f->file->fd); /* Second sync, after the header state was written. */
+                        break;
+
+                case OFFLINE_OFFLINING: /* Header state written and synced: finish by moving to OFFLINE_DONE. */
+                        if (!__sync_bool_compare_and_swap(&f->file->offline_state, OFFLINE_OFFLINING, OFFLINE_DONE))
+                                continue;
+                        _fallthrough_;
+                case OFFLINE_DONE:
+                        return;
+
+                case OFFLINE_JOINED: /* Not expected while this function runs: only logged, nothing else is done. */
+                        log_debug("OFFLINE_JOINED unexpected offline state for journal_file_set_offline_internal()");
+                        return;
+                }
+        }
+}
+
+static void * journald_file_set_offline_thread(void *arg) { /* Thread start routine, arg is the JournaldFile to take offline. */
+        JournaldFile *f = arg;
+
+        (void) pthread_setname_np(pthread_self(), "journal-offline"); /* Naming the thread is best-effort. */
+
+        journald_file_set_offline_internal(f);
+
+        return NULL; /* No result is passed back through the thread's return value. */
+}
+
+/* Trigger a restart if the offline thread is mid-flight in a restartable state. 
*/
+static bool journald_file_set_offline_try_restart(JournaldFile *f) {
+        for (;;) {
+                switch (f->file->offline_state) {
+                case OFFLINE_AGAIN_FROM_SYNCING:
+                case OFFLINE_AGAIN_FROM_OFFLINING:
+                        /* A restart has been requested already. */
+                        return true;
+
+                case OFFLINE_CANCEL:
+                        if (!__sync_bool_compare_and_swap(&f->file->offline_state, OFFLINE_CANCEL, OFFLINE_AGAIN_FROM_SYNCING))
+                                continue; /* The state changed in the meantime, look at it again. */
+                        return true;
+
+                case OFFLINE_SYNCING:
+                        if (!__sync_bool_compare_and_swap(&f->file->offline_state, OFFLINE_SYNCING, OFFLINE_AGAIN_FROM_SYNCING))
+                                continue;
+                        return true;
+
+                case OFFLINE_OFFLINING:
+                        if (!__sync_bool_compare_and_swap(&f->file->offline_state, OFFLINE_OFFLINING, OFFLINE_AGAIN_FROM_OFFLINING))
+                                continue;
+                        return true;
+
+                default:
+                        /* Any other state (OFFLINE_DONE and OFFLINE_JOINED, going by the states handled in
+                         * journald_file_set_offline_internal()): nothing is in flight that could be restarted. */
+                        return false;
+                }
+        }
+}
+
+/* Sets a journal offline.
+ *
+ * If wait is false then an offline is dispatched in a separate thread for a
+ * subsequent journald_file_set_offline() or journal_file_set_online() of the
+ * same journal to synchronize with.
+ *
+ * If wait is true, then either an existing offline thread will be restarted
+ * and joined, or if none exists the offline is simply performed in this
+ * context without involving another thread.
+ *
+ * Returns 0 on success, -EPERM if the file is not writable, -EINVAL if it has no
+ * open fd or no header, or a negative errno-style value if joining the offline
+ * thread, changing the signal mask or creating the thread fails.
+ */
+int journald_file_set_offline(JournaldFile *f, bool wait) {
+        int target_state;
+        bool restarted;
+        int r;
+
+        assert(f);
+
+        if (!f->file->writable)
+                return -EPERM;
+
+        if (f->file->fd < 0 || !f->file->header)
+                return -EINVAL;
+
+        target_state = f->file->archive ? STATE_ARCHIVED : STATE_OFFLINE;
+
+        /* An offlining journal is implicitly online and may modify f->header->state,
+         * we must also join any potentially lingering offline thread when already in
+         * the desired offline state.
+         */
+        if (!journald_file_is_offlining(f) && f->file->header->state == target_state)
+                return journal_file_set_offline_thread_join(f->file);
+
+        /* Restart an in-flight offline thread and wait if needed, or join a lingering done one. */
+        restarted = journald_file_set_offline_try_restart(f);
+        if ((restarted && wait) || !restarted) {
+                r = journal_file_set_offline_thread_join(f->file);
+                if (r < 0)
+                        return r;
+        }
+
+        if (restarted)
+                return 0;
+
+        /* Initiate a new offline. */
+        f->file->offline_state = OFFLINE_SYNCING;
+
+        if (wait) /* Without using a thread if waiting. */
+                journald_file_set_offline_internal(f);
+        else {
+                sigset_t ss, saved_ss;
+                int k;
+
+                assert_se(sigfillset(&ss) >= 0);
+                /* Don't block SIGBUS since the offlining thread accesses a memory mapped file.
+                 * Asynchronous SIGBUS signals can safely be handled by either thread. */
+                assert_se(sigdelset(&ss, SIGBUS) >= 0);
+
+                r = pthread_sigmask(SIG_BLOCK, &ss, &saved_ss);
+                if (r > 0) {
+                        /* No thread was started. Don't leave OFFLINE_SYNCING behind, otherwise
+                         * journald_file_is_offlining() would report an offline in progress forever.
+                         * This mirrors the pthread_create() failure path below. */
+                        f->file->offline_state = OFFLINE_JOINED;
+                        return -r;
+                }
+
+                r = pthread_create(&f->file->offline_thread, NULL, journald_file_set_offline_thread, f);
+
+                /* Restore the caller's signal mask no matter whether the thread could be created. */
+                k = pthread_sigmask(SIG_SETMASK, &saved_ss, NULL);
+                if (r > 0) {
+                        f->file->offline_state = OFFLINE_JOINED;
+                        return -r;
+                }
+                if (k > 0)
+                        return -k;
+        }
+
+        return 0;
+}
+
+/* Returns true unless the offline state is OFFLINE_DONE or OFFLINE_JOINED. A full memory barrier is
+ * issued before the state is read. */
+bool journald_file_is_offlining(JournaldFile *f) {
+        assert(f);
+
+        __sync_synchronize();
+
+        if (IN_SET(f->file->offline_state, OFFLINE_DONE, OFFLINE_JOINED))
+                return false;
+
+        return true;
+}
+
 JournaldFile* journald_file_close(JournaldFile *f) {
         if (!f)
                 return NULL;
 
+#if HAVE_GCRYPT
+        /* Write the final tag */
+        if (f->file->seal && f->file->writable) {
+                int r;
+
+                r = journal_file_append_tag(f->file);
+                if (r < 0)
+                        log_error_errno(r, "Failed to append tag when closing journal: %m");
+        }
+#endif
+
+        if (f->file->post_change_timer) {
+                /* If the timer is still enabled, run the post-change handling now, before the timer goes away. */
+                if (sd_event_source_get_enabled(f->file->post_change_timer, NULL) > 0)
+                        journal_file_post_change(f->file);
+
+                sd_event_source_disable_unref(f->file->post_change_timer);
+        }
+
+        /* Take the file offline synchronously. Failures (e.g. -EPERM for files that are not writable)
+         * are deliberately ignored, the file is closed either way. */
+        (void) journald_file_set_offline(f, true);
+
         journal_file_close(f->file);
 
         return mfree(f);
@@ -62,7 +367,7 @@ JournaldFile* journald_file_initiate_close(JournaldFile *f, Set *deferred_closes
                 if (r < 0)
                         log_debug_errno(r, "Failed to add file to deferred close 
set, closing immediately."); else { - (void) journal_file_set_offline(f->file, false); + (void) journald_file_set_offline(f, false); return NULL; } } diff --git a/src/journal/journald-file.h b/src/journal/journald-file.h index 7a299bd9756..341043c8362 100644 --- a/src/journal/journald-file.h +++ b/src/journal/journald-file.h @@ -21,6 +21,8 @@ int journald_file_open( JournaldFile *template, JournaldFile **ret); +int journald_file_set_offline(JournaldFile *f, bool wait); +bool journald_file_is_offlining(JournaldFile *f); JournaldFile* journald_file_close(JournaldFile *f); DEFINE_TRIVIAL_CLEANUP_FUNC(JournaldFile*, journald_file_close); diff --git a/src/journal/journald-server.c b/src/journal/journald-server.c index 3d40c3822e0..5e97cd41fd9 100644 --- a/src/journal/journald-server.c +++ b/src/journal/journald-server.c @@ -482,7 +482,7 @@ static void server_process_deferred_closes(Server *s) { /* Perform any deferred closes which aren't still offlining. */ SET_FOREACH(f, s->deferred_closes) { - if (journal_file_is_offlining(f->file)) + if (journald_file_is_offlining(f)) continue; (void) set_remove(s->deferred_closes, f); @@ -647,13 +647,13 @@ void server_sync(Server *s) { int r; if (s->system_journal) { - r = journal_file_set_offline(s->system_journal->file, false); + r = journald_file_set_offline(s->system_journal, false); if (r < 0) log_warning_errno(r, "Failed to sync system journal, ignoring: %m"); } ORDERED_HASHMAP_FOREACH(f, s->user_journals) { - r = journal_file_set_offline(f->file, false); + r = journald_file_set_offline(f, false); if (r < 0) log_warning_errno(r, "Failed to sync user journal, ignoring: %m"); } diff --git a/src/journal/meson.build b/src/journal/meson.build index 3eeed772cff..383ed355fd5 100644 --- a/src/journal/meson.build +++ b/src/journal/meson.build @@ -35,6 +35,7 @@ libjournal_core = static_library( 'journal-core', sources, include_directories : includes, + dependencies: threads, install : false) journal_includes = [includes, 
include_directories('.')] diff --git a/src/libsystemd/sd-journal/journal-file.c b/src/libsystemd/sd-journal/journal-file.c index b2ec6cde542..9755bf14b94 100644 --- a/src/libsystemd/sd-journal/journal-file.c +++ b/src/libsystemd/sd-journal/journal-file.c @@ -91,7 +91,7 @@ # pragma GCC diagnostic ignored "-Waddress-of-packed-member" #endif -static int journal_file_tail_end(JournalFile *f, uint64_t *ret_offset) { +int journal_file_tail_end(JournalFile *f, uint64_t *ret_offset) { Object tail; uint64_t p; int r; @@ -126,174 +126,7 @@ static int journal_file_tail_end(JournalFile *f, uint64_t *ret_offset) { return 0; } -static int journal_file_truncate(JournalFile *f) { - uint64_t p; - int r; - - /* truncate excess from the end of archives */ - r = journal_file_tail_end(f, &p); - if (r < 0) - return log_debug_errno(r, "Failed to determine end of tail object: %m"); - - /* arena_size can't exceed the file size, ensure it's updated before truncating */ - f->header->arena_size = htole64(p - le64toh(f->header->header_size)); - - if (ftruncate(f->fd, p) < 0) - log_debug_errno(errno, "Failed to truncate %s: %m", f->path); - - return 0; -} - -static int journal_file_entry_array_punch_hole(JournalFile *f, uint64_t p, uint64_t n_entries) { - Object o; - uint64_t offset, sz, n_items = 0, n_unused; - int r; - - if (n_entries == 0) - return 0; - - for (uint64_t q = p; q != 0; q = le64toh(o.entry_array.next_entry_array_offset)) { - r = journal_file_read_object(f, OBJECT_ENTRY_ARRAY, q, &o); - if (r < 0) - return r; - - n_items += journal_file_entry_array_n_items(&o); - p = q; - } - - if (p == 0) - return 0; - - if (n_entries > n_items) - return -EBADMSG; - - /* Amount of unused items in the final entry array. 
*/ - n_unused = n_items - n_entries; - - if (n_unused == 0) - return 0; - - offset = p + offsetof(Object, entry_array.items) + - (journal_file_entry_array_n_items(&o) - n_unused) * sizeof(le64_t); - sz = p + le64toh(o.object.size) - offset; - - if (fallocate(f->fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, offset, sz) < 0) - return log_debug_errno(errno, "Failed to punch hole in entry array of %s: %m", f->path); - - return 0; -} - -static int journal_file_punch_holes(JournalFile *f) { - HashItem items[4096 / sizeof(HashItem)]; - uint64_t p, sz; - size_t to_read; - int r; - - r = journal_file_entry_array_punch_hole( - f, le64toh(f->header->entry_array_offset), le64toh(f->header->n_entries)); - if (r < 0) - return r; - - p = le64toh(f->header->data_hash_table_offset); - sz = le64toh(f->header->data_hash_table_size); - to_read = MIN((size_t) f->last_stat.st_blksize, sizeof(items)); - - for (uint64_t i = p; i < p + sz; i += sizeof(items)) { - ssize_t n_read; - - n_read = pread(f->fd, items, MIN(to_read, p + sz - i), i); - if (n_read < 0) - return n_read; - - for (size_t j = 0; j < (size_t) n_read / sizeof(HashItem); j++) { - Object o; - - for (uint64_t q = le64toh(items[j].head_hash_offset); q != 0; - q = le64toh(o.data.next_hash_offset)) { - - r = journal_file_read_object(f, OBJECT_DATA, q, &o); - if (r < 0) { - log_debug_errno(r, "Invalid data object: %m, ignoring"); - break; - } - - if (le64toh(o.data.n_entries) == 0) - continue; - - (void) journal_file_entry_array_punch_hole( - f, le64toh(o.data.entry_array_offset), le64toh(o.data.n_entries) - 1); - } - } - } - - return 0; -} - -/* This may be called from a separate thread to prevent blocking the caller for the duration of fsync(). - * As a result we use atomic operations on f->offline_state for inter-thread communications with - * journal_file_set_offline() and journal_file_set_online(). 
*/ -static void journal_file_set_offline_internal(JournalFile *f) { - assert(f); - assert(f->fd >= 0); - assert(f->header); - - for (;;) { - switch (f->offline_state) { - case OFFLINE_CANCEL: - if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_CANCEL, OFFLINE_DONE)) - continue; - return; - - case OFFLINE_AGAIN_FROM_SYNCING: - if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_AGAIN_FROM_SYNCING, OFFLINE_SYNCING)) - continue; - break; - - case OFFLINE_AGAIN_FROM_OFFLINING: - if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_AGAIN_FROM_OFFLINING, OFFLINE_SYNCING)) - continue; - break; - - case OFFLINE_SYNCING: - if (f->archive) { - (void) journal_file_truncate(f); - (void) journal_file_punch_holes(f); - } - - (void) fsync(f->fd); - - if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_SYNCING, OFFLINE_OFFLINING)) - continue; - - f->header->state = f->archive ? STATE_ARCHIVED : STATE_OFFLINE; - (void) fsync(f->fd); - break; - - case OFFLINE_OFFLINING: - if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_OFFLINING, OFFLINE_DONE)) - continue; - _fallthrough_; - case OFFLINE_DONE: - return; - - case OFFLINE_JOINED: - log_debug("OFFLINE_JOINED unexpected offline state for journal_file_set_offline_internal()"); - return; - } - } -} - -static void * journal_file_set_offline_thread(void *arg) { - JournalFile *f = arg; - - (void) pthread_setname_np(pthread_self(), "journal-offline"); - - journal_file_set_offline_internal(f); - - return NULL; -} - -static int journal_file_set_offline_thread_join(JournalFile *f) { +int journal_file_set_offline_thread_join(JournalFile *f) { int r; assert(f); @@ -313,110 +146,6 @@ static int journal_file_set_offline_thread_join(JournalFile *f) { return 0; } -/* Trigger a restart if the offline thread is mid-flight in a restartable state. 
*/ -static bool journal_file_set_offline_try_restart(JournalFile *f) { - for (;;) { - switch (f->offline_state) { - case OFFLINE_AGAIN_FROM_SYNCING: - case OFFLINE_AGAIN_FROM_OFFLINING: - return true; - - case OFFLINE_CANCEL: - if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_CANCEL, OFFLINE_AGAIN_FROM_SYNCING)) - continue; - return true; - - case OFFLINE_SYNCING: - if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_SYNCING, OFFLINE_AGAIN_FROM_SYNCING)) - continue; - return true; - - case OFFLINE_OFFLINING: - if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_OFFLINING, OFFLINE_AGAIN_FROM_OFFLINING)) - continue; - return true; - - default: - return false; - } - } -} - -/* Sets a journal offline. - * - * If wait is false then an offline is dispatched in a separate thread for a - * subsequent journal_file_set_offline() or journal_file_set_online() of the - * same journal to synchronize with. - * - * If wait is true, then either an existing offline thread will be restarted - * and joined, or if none exists the offline is simply performed in this - * context without involving another thread. - */ -int journal_file_set_offline(JournalFile *f, bool wait) { - int target_state; - bool restarted; - int r; - - assert(f); - - if (!f->writable) - return -EPERM; - - if (f->fd < 0 || !f->header) - return -EINVAL; - - target_state = f->archive ? STATE_ARCHIVED : STATE_OFFLINE; - - /* An offlining journal is implicitly online and may modify f->header->state, - * we must also join any potentially lingering offline thread when already in - * the desired offline state. - */ - if (!journal_file_is_offlining(f) && f->header->state == target_state) - return journal_file_set_offline_thread_join(f); - - /* Restart an in-flight offline thread and wait if needed, or join a lingering done one. 
*/ - restarted = journal_file_set_offline_try_restart(f); - if ((restarted && wait) || !restarted) { - r = journal_file_set_offline_thread_join(f); - if (r < 0) - return r; - } - - if (restarted) - return 0; - - /* Initiate a new offline. */ - f->offline_state = OFFLINE_SYNCING; - - if (wait) /* Without using a thread if waiting. */ - journal_file_set_offline_internal(f); - else { - sigset_t ss, saved_ss; - int k; - - assert_se(sigfillset(&ss) >= 0); - /* Don't block SIGBUS since the offlining thread accesses a memory mapped file. - * Asynchronous SIGBUS signals can safely be handled by either thread. */ - assert_se(sigdelset(&ss, SIGBUS) >= 0); - - r = pthread_sigmask(SIG_BLOCK, &ss, &saved_ss); - if (r > 0) - return -r; - - r = pthread_create(&f->offline_thread, NULL, journal_file_set_offline_thread, f); - - k = pthread_sigmask(SIG_SETMASK, &saved_ss, NULL); - if (r > 0) { - f->offline_state = OFFLINE_JOINED; - return -r; - } - if (k > 0) - return -k; - } - - return 0; -} - static int journal_file_set_online(JournalFile *f) { bool wait = true; @@ -484,41 +213,10 @@ static int journal_file_set_online(JournalFile *f) { } } -bool journal_file_is_offlining(JournalFile *f) { - assert(f); - - __sync_synchronize(); - - if (IN_SET(f->offline_state, OFFLINE_DONE, OFFLINE_JOINED)) - return false; - - return true; -} - JournalFile* journal_file_close(JournalFile *f) { if (!f) return NULL; -#if HAVE_GCRYPT - /* Write the final tag */ - if (f->seal && f->writable) { - int r; - - r = journal_file_append_tag(f); - if (r < 0) - log_error_errno(r, "Failed to append tag when closing journal: %m"); - } -#endif - - if (f->post_change_timer) { - if (sd_event_source_get_enabled(f->post_change_timer, NULL) > 0) - journal_file_post_change(f); - - sd_event_source_disable_unref(f->post_change_timer); - } - - journal_file_set_offline(f, true); - if (f->mmap && f->cache_fd) mmap_cache_fd_free(f->cache_fd); diff --git a/src/libsystemd/sd-journal/journal-file.h 
b/src/libsystemd/sd-journal/journal-file.h index 0fbd84e31cb..6916cc4ac3a 100644 --- a/src/libsystemd/sd-journal/journal-file.h +++ b/src/libsystemd/sd-journal/journal-file.h @@ -141,8 +141,7 @@ int journal_file_open( JournalFile *template, JournalFile **ret); -int journal_file_set_offline(JournalFile *f, bool wait); -bool journal_file_is_offlining(JournalFile *f); +int journal_file_set_offline_thread_join(JournalFile *f); JournalFile* journal_file_close(JournalFile *j); int journal_file_fstat(JournalFile *f); DEFINE_TRIVIAL_CLEANUP_FUNC(JournalFile*, journal_file_close); @@ -190,6 +189,8 @@ static inline bool VALID_EPOCH(uint64_t u) { int journal_file_move_to_object(JournalFile *f, ObjectType type, uint64_t offset, Object **ret); int journal_file_read_object(JournalFile *f, ObjectType type, uint64_t offset, Object *ret); +int journal_file_tail_end(JournalFile *f, uint64_t *ret_offset); + uint64_t journal_file_entry_n_items(Object *o) _pure_; uint64_t journal_file_entry_array_n_items(Object *o) _pure_; uint64_t journal_file_hash_table_n_items(Object *o) _pure_;