mirror of
https://github.com/systemd/systemd.git
synced 2025-03-19 22:50:17 +03:00
journal: Move offlining logic to journald-file.c
With this change, the logic to write the final tag, emit the final change notification and to offline the file moves from journal_file_close() to journald_file_close(). Since all this logic is only executed when the journal file is writable and all code that writes journal files already uses journald_file_close() instead of journal_file_close(), this change should not introduce any changes in behaviour. Moving the offline related logic to journald-file.c allows us to use code from src/shared in the offlining logic, more specifically, we can use the file copying logic from copy.h to fix BTRFS filesystem compression for journal files when archiving.
This commit is contained in:
parent
035b0f8fe8
commit
764721cc07
@ -1,5 +1,8 @@
|
||||
/* SPDX-License-Identifier: LGPL-2.1-or-later */
|
||||
|
||||
#include <pthread.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include "chattr-util.h"
|
||||
#include "fd-util.h"
|
||||
#include "format-util.h"
|
||||
@ -10,10 +13,312 @@
|
||||
#include "set.h"
|
||||
#include "sync-util.h"
|
||||
|
||||
static int journald_file_truncate(JournalFile *f) {
|
||||
uint64_t p;
|
||||
int r;
|
||||
|
||||
/* truncate excess from the end of archives */
|
||||
r = journal_file_tail_end(f, &p);
|
||||
if (r < 0)
|
||||
return log_debug_errno(r, "Failed to determine end of tail object: %m");
|
||||
|
||||
/* arena_size can't exceed the file size, ensure it's updated before truncating */
|
||||
f->header->arena_size = htole64(p - le64toh(f->header->header_size));
|
||||
|
||||
if (ftruncate(f->fd, p) < 0)
|
||||
log_debug_errno(errno, "Failed to truncate %s: %m", f->path);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int journald_file_entry_array_punch_hole(JournalFile *f, uint64_t p, uint64_t n_entries) {
|
||||
Object o;
|
||||
uint64_t offset, sz, n_items = 0, n_unused;
|
||||
int r;
|
||||
|
||||
if (n_entries == 0)
|
||||
return 0;
|
||||
|
||||
for (uint64_t q = p; q != 0; q = le64toh(o.entry_array.next_entry_array_offset)) {
|
||||
r = journal_file_read_object(f, OBJECT_ENTRY_ARRAY, q, &o);
|
||||
if (r < 0)
|
||||
return r;
|
||||
|
||||
n_items += journal_file_entry_array_n_items(&o);
|
||||
p = q;
|
||||
}
|
||||
|
||||
if (p == 0)
|
||||
return 0;
|
||||
|
||||
if (n_entries > n_items)
|
||||
return -EBADMSG;
|
||||
|
||||
/* Amount of unused items in the final entry array. */
|
||||
n_unused = n_items - n_entries;
|
||||
|
||||
if (n_unused == 0)
|
||||
return 0;
|
||||
|
||||
offset = p + offsetof(Object, entry_array.items) +
|
||||
(journal_file_entry_array_n_items(&o) - n_unused) * sizeof(le64_t);
|
||||
sz = p + le64toh(o.object.size) - offset;
|
||||
|
||||
if (fallocate(f->fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, offset, sz) < 0)
|
||||
return log_debug_errno(errno, "Failed to punch hole in entry array of %s: %m", f->path);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int journald_file_punch_holes(JournalFile *f) {
|
||||
HashItem items[4096 / sizeof(HashItem)];
|
||||
uint64_t p, sz;
|
||||
size_t to_read;
|
||||
int r;
|
||||
|
||||
r = journald_file_entry_array_punch_hole(
|
||||
f, le64toh(f->header->entry_array_offset), le64toh(f->header->n_entries));
|
||||
if (r < 0)
|
||||
return r;
|
||||
|
||||
p = le64toh(f->header->data_hash_table_offset);
|
||||
sz = le64toh(f->header->data_hash_table_size);
|
||||
to_read = MIN((size_t) f->last_stat.st_blksize, sizeof(items));
|
||||
|
||||
for (uint64_t i = p; i < p + sz; i += sizeof(items)) {
|
||||
ssize_t n_read;
|
||||
|
||||
n_read = pread(f->fd, items, MIN(to_read, p + sz - i), i);
|
||||
if (n_read < 0)
|
||||
return n_read;
|
||||
|
||||
for (size_t j = 0; j < (size_t) n_read / sizeof(HashItem); j++) {
|
||||
Object o;
|
||||
|
||||
for (uint64_t q = le64toh(items[j].head_hash_offset); q != 0;
|
||||
q = le64toh(o.data.next_hash_offset)) {
|
||||
|
||||
r = journal_file_read_object(f, OBJECT_DATA, q, &o);
|
||||
if (r < 0) {
|
||||
log_debug_errno(r, "Invalid data object: %m, ignoring");
|
||||
break;
|
||||
}
|
||||
|
||||
if (le64toh(o.data.n_entries) == 0)
|
||||
continue;
|
||||
|
||||
(void) journald_file_entry_array_punch_hole(
|
||||
f, le64toh(o.data.entry_array_offset), le64toh(o.data.n_entries) - 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* This may be called from a separate thread to prevent blocking the caller for the duration of fsync().
|
||||
* As a result we use atomic operations on f->offline_state for inter-thread communications with
|
||||
* journal_file_set_offline() and journal_file_set_online(). */
|
||||
static void journald_file_set_offline_internal(JournaldFile *f) {
|
||||
assert(f);
|
||||
assert(f->file->fd >= 0);
|
||||
assert(f->file->header);
|
||||
|
||||
for (;;) {
|
||||
switch (f->file->offline_state) {
|
||||
case OFFLINE_CANCEL:
|
||||
if (!__sync_bool_compare_and_swap(&f->file->offline_state, OFFLINE_CANCEL, OFFLINE_DONE))
|
||||
continue;
|
||||
return;
|
||||
|
||||
case OFFLINE_AGAIN_FROM_SYNCING:
|
||||
if (!__sync_bool_compare_and_swap(&f->file->offline_state, OFFLINE_AGAIN_FROM_SYNCING, OFFLINE_SYNCING))
|
||||
continue;
|
||||
break;
|
||||
|
||||
case OFFLINE_AGAIN_FROM_OFFLINING:
|
||||
if (!__sync_bool_compare_and_swap(&f->file->offline_state, OFFLINE_AGAIN_FROM_OFFLINING, OFFLINE_SYNCING))
|
||||
continue;
|
||||
break;
|
||||
|
||||
case OFFLINE_SYNCING:
|
||||
if (f->file->archive) {
|
||||
(void) journald_file_truncate(f->file);
|
||||
(void) journald_file_punch_holes(f->file);
|
||||
}
|
||||
|
||||
(void) fsync(f->file->fd);
|
||||
|
||||
if (!__sync_bool_compare_and_swap(&f->file->offline_state, OFFLINE_SYNCING, OFFLINE_OFFLINING))
|
||||
continue;
|
||||
|
||||
f->file->header->state = f->file->archive ? STATE_ARCHIVED : STATE_OFFLINE;
|
||||
(void) fsync(f->file->fd);
|
||||
break;
|
||||
|
||||
case OFFLINE_OFFLINING:
|
||||
if (!__sync_bool_compare_and_swap(&f->file->offline_state, OFFLINE_OFFLINING, OFFLINE_DONE))
|
||||
continue;
|
||||
_fallthrough_;
|
||||
case OFFLINE_DONE:
|
||||
return;
|
||||
|
||||
case OFFLINE_JOINED:
|
||||
log_debug("OFFLINE_JOINED unexpected offline state for journal_file_set_offline_internal()");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void * journald_file_set_offline_thread(void *arg) {
|
||||
JournaldFile *f = arg;
|
||||
|
||||
(void) pthread_setname_np(pthread_self(), "journal-offline");
|
||||
|
||||
journald_file_set_offline_internal(f);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Trigger a restart if the offline thread is mid-flight in a restartable state. */
|
||||
static bool journald_file_set_offline_try_restart(JournaldFile *f) {
|
||||
for (;;) {
|
||||
switch (f->file->offline_state) {
|
||||
case OFFLINE_AGAIN_FROM_SYNCING:
|
||||
case OFFLINE_AGAIN_FROM_OFFLINING:
|
||||
return true;
|
||||
|
||||
case OFFLINE_CANCEL:
|
||||
if (!__sync_bool_compare_and_swap(&f->file->offline_state, OFFLINE_CANCEL, OFFLINE_AGAIN_FROM_SYNCING))
|
||||
continue;
|
||||
return true;
|
||||
|
||||
case OFFLINE_SYNCING:
|
||||
if (!__sync_bool_compare_and_swap(&f->file->offline_state, OFFLINE_SYNCING, OFFLINE_AGAIN_FROM_SYNCING))
|
||||
continue;
|
||||
return true;
|
||||
|
||||
case OFFLINE_OFFLINING:
|
||||
if (!__sync_bool_compare_and_swap(&f->file->offline_state, OFFLINE_OFFLINING, OFFLINE_AGAIN_FROM_OFFLINING))
|
||||
continue;
|
||||
return true;
|
||||
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Sets a journal offline.
|
||||
*
|
||||
* If wait is false then an offline is dispatched in a separate thread for a
|
||||
* subsequent journal_file_set_offline() or journal_file_set_online() of the
|
||||
* same journal to synchronize with.
|
||||
*
|
||||
* If wait is true, then either an existing offline thread will be restarted
|
||||
* and joined, or if none exists the offline is simply performed in this
|
||||
* context without involving another thread.
|
||||
*/
|
||||
int journald_file_set_offline(JournaldFile *f, bool wait) {
|
||||
int target_state;
|
||||
bool restarted;
|
||||
int r;
|
||||
|
||||
assert(f);
|
||||
|
||||
if (!f->file->writable)
|
||||
return -EPERM;
|
||||
|
||||
if (f->file->fd < 0 || !f->file->header)
|
||||
return -EINVAL;
|
||||
|
||||
target_state = f->file->archive ? STATE_ARCHIVED : STATE_OFFLINE;
|
||||
|
||||
/* An offlining journal is implicitly online and may modify f->header->state,
|
||||
* we must also join any potentially lingering offline thread when already in
|
||||
* the desired offline state.
|
||||
*/
|
||||
if (!journald_file_is_offlining(f) && f->file->header->state == target_state)
|
||||
return journal_file_set_offline_thread_join(f->file);
|
||||
|
||||
/* Restart an in-flight offline thread and wait if needed, or join a lingering done one. */
|
||||
restarted = journald_file_set_offline_try_restart(f);
|
||||
if ((restarted && wait) || !restarted) {
|
||||
r = journal_file_set_offline_thread_join(f->file);
|
||||
if (r < 0)
|
||||
return r;
|
||||
}
|
||||
|
||||
if (restarted)
|
||||
return 0;
|
||||
|
||||
/* Initiate a new offline. */
|
||||
f->file->offline_state = OFFLINE_SYNCING;
|
||||
|
||||
if (wait) /* Without using a thread if waiting. */
|
||||
journald_file_set_offline_internal(f);
|
||||
else {
|
||||
sigset_t ss, saved_ss;
|
||||
int k;
|
||||
|
||||
assert_se(sigfillset(&ss) >= 0);
|
||||
/* Don't block SIGBUS since the offlining thread accesses a memory mapped file.
|
||||
* Asynchronous SIGBUS signals can safely be handled by either thread. */
|
||||
assert_se(sigdelset(&ss, SIGBUS) >= 0);
|
||||
|
||||
r = pthread_sigmask(SIG_BLOCK, &ss, &saved_ss);
|
||||
if (r > 0)
|
||||
return -r;
|
||||
|
||||
r = pthread_create(&f->file->offline_thread, NULL, journald_file_set_offline_thread, f);
|
||||
|
||||
k = pthread_sigmask(SIG_SETMASK, &saved_ss, NULL);
|
||||
if (r > 0) {
|
||||
f->file->offline_state = OFFLINE_JOINED;
|
||||
return -r;
|
||||
}
|
||||
if (k > 0)
|
||||
return -k;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool journald_file_is_offlining(JournaldFile *f) {
|
||||
assert(f);
|
||||
|
||||
__sync_synchronize();
|
||||
|
||||
if (IN_SET(f->file->offline_state, OFFLINE_DONE, OFFLINE_JOINED))
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
JournaldFile* journald_file_close(JournaldFile *f) {
|
||||
if (!f)
|
||||
return NULL;
|
||||
|
||||
#if HAVE_GCRYPT
|
||||
/* Write the final tag */
|
||||
if (f->file->seal && f->file->writable) {
|
||||
int r;
|
||||
|
||||
r = journal_file_append_tag(f->file);
|
||||
if (r < 0)
|
||||
log_error_errno(r, "Failed to append tag when closing journal: %m");
|
||||
}
|
||||
#endif
|
||||
|
||||
if (f->file->post_change_timer) {
|
||||
if (sd_event_source_get_enabled(f->file->post_change_timer, NULL) > 0)
|
||||
journal_file_post_change(f->file);
|
||||
|
||||
sd_event_source_disable_unref(f->file->post_change_timer);
|
||||
}
|
||||
|
||||
journald_file_set_offline(f, true);
|
||||
|
||||
journal_file_close(f->file);
|
||||
|
||||
return mfree(f);
|
||||
@ -62,7 +367,7 @@ JournaldFile* journald_file_initiate_close(JournaldFile *f, Set *deferred_closes
|
||||
if (r < 0)
|
||||
log_debug_errno(r, "Failed to add file to deferred close set, closing immediately.");
|
||||
else {
|
||||
(void) journal_file_set_offline(f->file, false);
|
||||
(void) journald_file_set_offline(f, false);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
@ -21,6 +21,8 @@ int journald_file_open(
|
||||
JournaldFile *template,
|
||||
JournaldFile **ret);
|
||||
|
||||
int journald_file_set_offline(JournaldFile *f, bool wait);
|
||||
bool journald_file_is_offlining(JournaldFile *f);
|
||||
JournaldFile* journald_file_close(JournaldFile *f);
|
||||
DEFINE_TRIVIAL_CLEANUP_FUNC(JournaldFile*, journald_file_close);
|
||||
|
||||
|
@ -482,7 +482,7 @@ static void server_process_deferred_closes(Server *s) {
|
||||
|
||||
/* Perform any deferred closes which aren't still offlining. */
|
||||
SET_FOREACH(f, s->deferred_closes) {
|
||||
if (journal_file_is_offlining(f->file))
|
||||
if (journald_file_is_offlining(f))
|
||||
continue;
|
||||
|
||||
(void) set_remove(s->deferred_closes, f);
|
||||
@ -647,13 +647,13 @@ void server_sync(Server *s) {
|
||||
int r;
|
||||
|
||||
if (s->system_journal) {
|
||||
r = journal_file_set_offline(s->system_journal->file, false);
|
||||
r = journald_file_set_offline(s->system_journal, false);
|
||||
if (r < 0)
|
||||
log_warning_errno(r, "Failed to sync system journal, ignoring: %m");
|
||||
}
|
||||
|
||||
ORDERED_HASHMAP_FOREACH(f, s->user_journals) {
|
||||
r = journal_file_set_offline(f->file, false);
|
||||
r = journald_file_set_offline(f, false);
|
||||
if (r < 0)
|
||||
log_warning_errno(r, "Failed to sync user journal, ignoring: %m");
|
||||
}
|
||||
|
@ -35,6 +35,7 @@ libjournal_core = static_library(
|
||||
'journal-core',
|
||||
sources,
|
||||
include_directories : includes,
|
||||
dependencies: threads,
|
||||
install : false)
|
||||
|
||||
journal_includes = [includes, include_directories('.')]
|
||||
|
@ -91,7 +91,7 @@
|
||||
# pragma GCC diagnostic ignored "-Waddress-of-packed-member"
|
||||
#endif
|
||||
|
||||
static int journal_file_tail_end(JournalFile *f, uint64_t *ret_offset) {
|
||||
int journal_file_tail_end(JournalFile *f, uint64_t *ret_offset) {
|
||||
Object tail;
|
||||
uint64_t p;
|
||||
int r;
|
||||
@ -126,174 +126,7 @@ static int journal_file_tail_end(JournalFile *f, uint64_t *ret_offset) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int journal_file_truncate(JournalFile *f) {
|
||||
uint64_t p;
|
||||
int r;
|
||||
|
||||
/* truncate excess from the end of archives */
|
||||
r = journal_file_tail_end(f, &p);
|
||||
if (r < 0)
|
||||
return log_debug_errno(r, "Failed to determine end of tail object: %m");
|
||||
|
||||
/* arena_size can't exceed the file size, ensure it's updated before truncating */
|
||||
f->header->arena_size = htole64(p - le64toh(f->header->header_size));
|
||||
|
||||
if (ftruncate(f->fd, p) < 0)
|
||||
log_debug_errno(errno, "Failed to truncate %s: %m", f->path);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int journal_file_entry_array_punch_hole(JournalFile *f, uint64_t p, uint64_t n_entries) {
|
||||
Object o;
|
||||
uint64_t offset, sz, n_items = 0, n_unused;
|
||||
int r;
|
||||
|
||||
if (n_entries == 0)
|
||||
return 0;
|
||||
|
||||
for (uint64_t q = p; q != 0; q = le64toh(o.entry_array.next_entry_array_offset)) {
|
||||
r = journal_file_read_object(f, OBJECT_ENTRY_ARRAY, q, &o);
|
||||
if (r < 0)
|
||||
return r;
|
||||
|
||||
n_items += journal_file_entry_array_n_items(&o);
|
||||
p = q;
|
||||
}
|
||||
|
||||
if (p == 0)
|
||||
return 0;
|
||||
|
||||
if (n_entries > n_items)
|
||||
return -EBADMSG;
|
||||
|
||||
/* Amount of unused items in the final entry array. */
|
||||
n_unused = n_items - n_entries;
|
||||
|
||||
if (n_unused == 0)
|
||||
return 0;
|
||||
|
||||
offset = p + offsetof(Object, entry_array.items) +
|
||||
(journal_file_entry_array_n_items(&o) - n_unused) * sizeof(le64_t);
|
||||
sz = p + le64toh(o.object.size) - offset;
|
||||
|
||||
if (fallocate(f->fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, offset, sz) < 0)
|
||||
return log_debug_errno(errno, "Failed to punch hole in entry array of %s: %m", f->path);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int journal_file_punch_holes(JournalFile *f) {
|
||||
HashItem items[4096 / sizeof(HashItem)];
|
||||
uint64_t p, sz;
|
||||
size_t to_read;
|
||||
int r;
|
||||
|
||||
r = journal_file_entry_array_punch_hole(
|
||||
f, le64toh(f->header->entry_array_offset), le64toh(f->header->n_entries));
|
||||
if (r < 0)
|
||||
return r;
|
||||
|
||||
p = le64toh(f->header->data_hash_table_offset);
|
||||
sz = le64toh(f->header->data_hash_table_size);
|
||||
to_read = MIN((size_t) f->last_stat.st_blksize, sizeof(items));
|
||||
|
||||
for (uint64_t i = p; i < p + sz; i += sizeof(items)) {
|
||||
ssize_t n_read;
|
||||
|
||||
n_read = pread(f->fd, items, MIN(to_read, p + sz - i), i);
|
||||
if (n_read < 0)
|
||||
return n_read;
|
||||
|
||||
for (size_t j = 0; j < (size_t) n_read / sizeof(HashItem); j++) {
|
||||
Object o;
|
||||
|
||||
for (uint64_t q = le64toh(items[j].head_hash_offset); q != 0;
|
||||
q = le64toh(o.data.next_hash_offset)) {
|
||||
|
||||
r = journal_file_read_object(f, OBJECT_DATA, q, &o);
|
||||
if (r < 0) {
|
||||
log_debug_errno(r, "Invalid data object: %m, ignoring");
|
||||
break;
|
||||
}
|
||||
|
||||
if (le64toh(o.data.n_entries) == 0)
|
||||
continue;
|
||||
|
||||
(void) journal_file_entry_array_punch_hole(
|
||||
f, le64toh(o.data.entry_array_offset), le64toh(o.data.n_entries) - 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* This may be called from a separate thread to prevent blocking the caller for the duration of fsync().
|
||||
* As a result we use atomic operations on f->offline_state for inter-thread communications with
|
||||
* journal_file_set_offline() and journal_file_set_online(). */
|
||||
static void journal_file_set_offline_internal(JournalFile *f) {
|
||||
assert(f);
|
||||
assert(f->fd >= 0);
|
||||
assert(f->header);
|
||||
|
||||
for (;;) {
|
||||
switch (f->offline_state) {
|
||||
case OFFLINE_CANCEL:
|
||||
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_CANCEL, OFFLINE_DONE))
|
||||
continue;
|
||||
return;
|
||||
|
||||
case OFFLINE_AGAIN_FROM_SYNCING:
|
||||
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_AGAIN_FROM_SYNCING, OFFLINE_SYNCING))
|
||||
continue;
|
||||
break;
|
||||
|
||||
case OFFLINE_AGAIN_FROM_OFFLINING:
|
||||
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_AGAIN_FROM_OFFLINING, OFFLINE_SYNCING))
|
||||
continue;
|
||||
break;
|
||||
|
||||
case OFFLINE_SYNCING:
|
||||
if (f->archive) {
|
||||
(void) journal_file_truncate(f);
|
||||
(void) journal_file_punch_holes(f);
|
||||
}
|
||||
|
||||
(void) fsync(f->fd);
|
||||
|
||||
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_SYNCING, OFFLINE_OFFLINING))
|
||||
continue;
|
||||
|
||||
f->header->state = f->archive ? STATE_ARCHIVED : STATE_OFFLINE;
|
||||
(void) fsync(f->fd);
|
||||
break;
|
||||
|
||||
case OFFLINE_OFFLINING:
|
||||
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_OFFLINING, OFFLINE_DONE))
|
||||
continue;
|
||||
_fallthrough_;
|
||||
case OFFLINE_DONE:
|
||||
return;
|
||||
|
||||
case OFFLINE_JOINED:
|
||||
log_debug("OFFLINE_JOINED unexpected offline state for journal_file_set_offline_internal()");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void * journal_file_set_offline_thread(void *arg) {
|
||||
JournalFile *f = arg;
|
||||
|
||||
(void) pthread_setname_np(pthread_self(), "journal-offline");
|
||||
|
||||
journal_file_set_offline_internal(f);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int journal_file_set_offline_thread_join(JournalFile *f) {
|
||||
int journal_file_set_offline_thread_join(JournalFile *f) {
|
||||
int r;
|
||||
|
||||
assert(f);
|
||||
@ -313,110 +146,6 @@ static int journal_file_set_offline_thread_join(JournalFile *f) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Trigger a restart if the offline thread is mid-flight in a restartable state. */
|
||||
static bool journal_file_set_offline_try_restart(JournalFile *f) {
|
||||
for (;;) {
|
||||
switch (f->offline_state) {
|
||||
case OFFLINE_AGAIN_FROM_SYNCING:
|
||||
case OFFLINE_AGAIN_FROM_OFFLINING:
|
||||
return true;
|
||||
|
||||
case OFFLINE_CANCEL:
|
||||
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_CANCEL, OFFLINE_AGAIN_FROM_SYNCING))
|
||||
continue;
|
||||
return true;
|
||||
|
||||
case OFFLINE_SYNCING:
|
||||
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_SYNCING, OFFLINE_AGAIN_FROM_SYNCING))
|
||||
continue;
|
||||
return true;
|
||||
|
||||
case OFFLINE_OFFLINING:
|
||||
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_OFFLINING, OFFLINE_AGAIN_FROM_OFFLINING))
|
||||
continue;
|
||||
return true;
|
||||
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Sets a journal offline.
|
||||
*
|
||||
* If wait is false then an offline is dispatched in a separate thread for a
|
||||
* subsequent journal_file_set_offline() or journal_file_set_online() of the
|
||||
* same journal to synchronize with.
|
||||
*
|
||||
* If wait is true, then either an existing offline thread will be restarted
|
||||
* and joined, or if none exists the offline is simply performed in this
|
||||
* context without involving another thread.
|
||||
*/
|
||||
int journal_file_set_offline(JournalFile *f, bool wait) {
|
||||
int target_state;
|
||||
bool restarted;
|
||||
int r;
|
||||
|
||||
assert(f);
|
||||
|
||||
if (!f->writable)
|
||||
return -EPERM;
|
||||
|
||||
if (f->fd < 0 || !f->header)
|
||||
return -EINVAL;
|
||||
|
||||
target_state = f->archive ? STATE_ARCHIVED : STATE_OFFLINE;
|
||||
|
||||
/* An offlining journal is implicitly online and may modify f->header->state,
|
||||
* we must also join any potentially lingering offline thread when already in
|
||||
* the desired offline state.
|
||||
*/
|
||||
if (!journal_file_is_offlining(f) && f->header->state == target_state)
|
||||
return journal_file_set_offline_thread_join(f);
|
||||
|
||||
/* Restart an in-flight offline thread and wait if needed, or join a lingering done one. */
|
||||
restarted = journal_file_set_offline_try_restart(f);
|
||||
if ((restarted && wait) || !restarted) {
|
||||
r = journal_file_set_offline_thread_join(f);
|
||||
if (r < 0)
|
||||
return r;
|
||||
}
|
||||
|
||||
if (restarted)
|
||||
return 0;
|
||||
|
||||
/* Initiate a new offline. */
|
||||
f->offline_state = OFFLINE_SYNCING;
|
||||
|
||||
if (wait) /* Without using a thread if waiting. */
|
||||
journal_file_set_offline_internal(f);
|
||||
else {
|
||||
sigset_t ss, saved_ss;
|
||||
int k;
|
||||
|
||||
assert_se(sigfillset(&ss) >= 0);
|
||||
/* Don't block SIGBUS since the offlining thread accesses a memory mapped file.
|
||||
* Asynchronous SIGBUS signals can safely be handled by either thread. */
|
||||
assert_se(sigdelset(&ss, SIGBUS) >= 0);
|
||||
|
||||
r = pthread_sigmask(SIG_BLOCK, &ss, &saved_ss);
|
||||
if (r > 0)
|
||||
return -r;
|
||||
|
||||
r = pthread_create(&f->offline_thread, NULL, journal_file_set_offline_thread, f);
|
||||
|
||||
k = pthread_sigmask(SIG_SETMASK, &saved_ss, NULL);
|
||||
if (r > 0) {
|
||||
f->offline_state = OFFLINE_JOINED;
|
||||
return -r;
|
||||
}
|
||||
if (k > 0)
|
||||
return -k;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int journal_file_set_online(JournalFile *f) {
|
||||
bool wait = true;
|
||||
|
||||
@ -484,41 +213,10 @@ static int journal_file_set_online(JournalFile *f) {
|
||||
}
|
||||
}
|
||||
|
||||
bool journal_file_is_offlining(JournalFile *f) {
|
||||
assert(f);
|
||||
|
||||
__sync_synchronize();
|
||||
|
||||
if (IN_SET(f->offline_state, OFFLINE_DONE, OFFLINE_JOINED))
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
JournalFile* journal_file_close(JournalFile *f) {
|
||||
if (!f)
|
||||
return NULL;
|
||||
|
||||
#if HAVE_GCRYPT
|
||||
/* Write the final tag */
|
||||
if (f->seal && f->writable) {
|
||||
int r;
|
||||
|
||||
r = journal_file_append_tag(f);
|
||||
if (r < 0)
|
||||
log_error_errno(r, "Failed to append tag when closing journal: %m");
|
||||
}
|
||||
#endif
|
||||
|
||||
if (f->post_change_timer) {
|
||||
if (sd_event_source_get_enabled(f->post_change_timer, NULL) > 0)
|
||||
journal_file_post_change(f);
|
||||
|
||||
sd_event_source_disable_unref(f->post_change_timer);
|
||||
}
|
||||
|
||||
journal_file_set_offline(f, true);
|
||||
|
||||
if (f->mmap && f->cache_fd)
|
||||
mmap_cache_fd_free(f->cache_fd);
|
||||
|
||||
|
@ -141,8 +141,7 @@ int journal_file_open(
|
||||
JournalFile *template,
|
||||
JournalFile **ret);
|
||||
|
||||
int journal_file_set_offline(JournalFile *f, bool wait);
|
||||
bool journal_file_is_offlining(JournalFile *f);
|
||||
int journal_file_set_offline_thread_join(JournalFile *f);
|
||||
JournalFile* journal_file_close(JournalFile *j);
|
||||
int journal_file_fstat(JournalFile *f);
|
||||
DEFINE_TRIVIAL_CLEANUP_FUNC(JournalFile*, journal_file_close);
|
||||
@ -190,6 +189,8 @@ static inline bool VALID_EPOCH(uint64_t u) {
|
||||
int journal_file_move_to_object(JournalFile *f, ObjectType type, uint64_t offset, Object **ret);
|
||||
int journal_file_read_object(JournalFile *f, ObjectType type, uint64_t offset, Object *ret);
|
||||
|
||||
int journal_file_tail_end(JournalFile *f, uint64_t *ret_offset);
|
||||
|
||||
uint64_t journal_file_entry_n_items(Object *o) _pure_;
|
||||
uint64_t journal_file_entry_array_n_items(Object *o) _pure_;
|
||||
uint64_t journal_file_hash_table_n_items(Object *o) _pure_;
|
||||
|
Loading…
x
Reference in New Issue
Block a user