1
0
mirror of https://github.com/systemd/systemd.git synced 2025-01-21 22:04:01 +03:00

sd-journal: drop using pthread for offlining or archiving journal files

This makes systemd-journald (also journal-remote and friends)
- synchronously offline journal files. This mostly not change anything,
  as we effectively did so, as we called journal_file_offline_close()
  in most places.
- asynchronously archive journal files, but in a forked process rather
  than a thread, and do only the last processes like puch holing,
  calling fsync(), chattr() and so on.

Hopefully, no visible behavior change. But the code becomes much
simpler.
This commit is contained in:
Yu Watanabe 2024-02-13 22:08:48 +09:00
parent 6e27477666
commit 2791179cfb
10 changed files with 152 additions and 459 deletions

View File

@ -11,7 +11,7 @@
static int do_rotate(JournalFile **f, MMapCache *m, JournalFileFlags file_flags) {
int r;
r = journal_file_rotate(f, m, file_flags, UINT64_MAX, NULL);
r = journal_file_rotate(f, /* async = */ true, m, file_flags, UINT64_MAX);
if (r < 0) {
if (*f)
log_error_errno(r, "Failed to rotate %s: %m", (*f)->path);

View File

@ -86,8 +86,6 @@
* for a bit of additional metadata. */
#define DEFAULT_LINE_MAX (48*1024)
#define DEFERRED_CLOSES_MAX (4096)
#define IDLE_TIMEOUT_USEC (30*USEC_PER_SEC)
#define FAILED_TO_WRITE_ENTRY_RATELIMIT ((const RateLimit) { .interval = 1 * USEC_PER_SEC, .burst = 1 })
@ -286,8 +284,6 @@ static int server_open_journal(
(seal ? JOURNAL_SEAL : 0) |
JOURNAL_STRICT_ORDER;
set_clear_with_destructor(s->deferred_closes, journal_file_offline_close);
if (reliably)
r = journal_file_open_reliably(
fname,
@ -563,7 +559,7 @@ static int server_do_rotate(
(seal ? JOURNAL_SEAL : 0) |
JOURNAL_STRICT_ORDER;
r = journal_file_rotate(f, s->mmap, file_flags, s->compress.threshold_bytes, s->deferred_closes);
r = journal_file_rotate(f, /* async = */ true, s->mmap, file_flags, s->compress.threshold_bytes);
if (r < 0) {
if (*f)
return log_ratelimit_error_errno(r, JOURNAL_LOG_RATELIMIT,
@ -577,38 +573,6 @@ static int server_do_rotate(
return r;
}
static void server_process_deferred_closes(Server *s) {
JournalFile *f;
/* Perform any deferred closes which aren't still offlining. */
SET_FOREACH(f, s->deferred_closes) {
if (journal_file_is_offlining(f))
continue;
(void) set_remove(s->deferred_closes, f);
(void) journal_file_offline_close(f);
}
}
static void server_vacuum_deferred_closes(Server *s) {
assert(s);
/* Make some room in the deferred closes list, so that it doesn't grow without bounds */
if (set_size(s->deferred_closes) < DEFERRED_CLOSES_MAX)
return;
/* Let's first remove all journal files that might already have completed closing */
server_process_deferred_closes(s);
/* And now, let's close some more until we reach the limit again. */
while (set_size(s->deferred_closes) >= DEFERRED_CLOSES_MAX) {
JournalFile *f;
assert_se(f = set_steal_first(s->deferred_closes));
journal_file_offline_close(f);
}
}
static int server_archive_offline_user_journals(Server *s) {
_cleanup_closedir_ DIR *d = NULL;
int r;
@ -625,10 +589,10 @@ static int server_archive_offline_user_journals(Server *s) {
}
for (;;) {
_cleanup_(journal_file_offline_closep) JournalFile *f = NULL;
_cleanup_free_ char *full = NULL;
_cleanup_close_ int fd = -EBADF;
struct dirent *de;
JournalFile *f;
uid_t uid;
errno = 0;
@ -665,9 +629,6 @@ static int server_archive_offline_user_journals(Server *s) {
continue;
}
/* Make some room in the set of deferred close()s */
server_vacuum_deferred_closes(s);
/* Open the file briefly, so that we can archive it */
r = journal_file_open(
fd,
@ -699,12 +660,9 @@ static int server_archive_offline_user_journals(Server *s) {
TAKE_FD(fd); /* Donated to journal_file_open() */
journal_file_write_final_tag(f);
r = journal_file_archive(f, NULL);
r = journal_file_archive(&f, /* async = */ true);
if (r < 0)
log_debug_errno(r, "Failed to archive journal file '%s', ignoring: %m", full);
journal_file_initiate_close(TAKE_PTR(f), s->deferred_closes);
}
return 0;
@ -735,8 +693,6 @@ void server_rotate(Server *s) {
* actually have access to /var, i.e. are not in the log-to-runtime-journal mode). */
if (!s->runtime_journal)
(void) server_archive_offline_user_journals(s);
server_process_deferred_closes(s);
}
static void server_rotate_journal(Server *s, JournalFile *f, uid_t uid) {
@ -762,27 +718,17 @@ static void server_rotate_journal(Server *s, JournalFile *f, uid_t uid) {
/* Old file has been closed and deallocated */
ordered_hashmap_remove(s->user_journals, UID_TO_PTR(uid));
}
server_process_deferred_closes(s);
}
static void server_sync(Server *s, bool wait) {
JournalFile *f;
int r;
if (s->system_journal) {
r = journal_file_set_offline(s->system_journal, wait);
if (r < 0)
log_ratelimit_warning_errno(r, JOURNAL_LOG_RATELIMIT,
"Failed to sync system journal, ignoring: %m");
}
if (s->system_journal)
journal_file_finalize(s->system_journal, STATE_OFFLINE);
ORDERED_HASHMAP_FOREACH(f, s->user_journals) {
r = journal_file_set_offline(f, wait);
if (r < 0)
log_ratelimit_warning_errno(r, JOURNAL_LOG_RATELIMIT,
"Failed to sync user journal, ignoring: %m");
}
ORDERED_HASHMAP_FOREACH(f, s->user_journals)
journal_file_finalize(f, STATE_OFFLINE);
r = sd_event_source_set_enabled(s->sync_event_source, SD_EVENT_OFF);
if (r < 0)
@ -1481,7 +1427,6 @@ static int server_relinquish_var(Server *s) {
s->system_journal = journal_file_offline_close(s->system_journal);
ordered_hashmap_clear_with_destructor(s->user_journals, journal_file_offline_close);
set_clear_with_destructor(s->deferred_closes, journal_file_offline_close);
server_refresh_idle_timer(s);
return 0;
@ -2691,10 +2636,6 @@ int server_init(Server *s, const char *namespace) {
if (!s->mmap)
return log_oom();
s->deferred_closes = set_new(NULL);
if (!s->deferred_closes)
return log_oom();
r = sd_event_default(&s->event);
if (r < 0)
return log_error_errno(r, "Failed to create event loop: %m");
@ -2888,8 +2829,6 @@ Server* server_free(Server *s) {
free(s->namespace);
free(s->namespace_field);
set_free_with_destructor(s->deferred_closes, journal_file_offline_close);
while (s->stdout_streams)
stdout_stream_free(s->stdout_streams);

View File

@ -151,8 +151,6 @@ struct Server {
MMapCache *mmap;
Set *deferred_closes;
uint64_t *kernel_seqnum;
bool dev_kmsg_readable:1;
RateLimit kmsg_own_ratelimit;

View File

@ -4,7 +4,6 @@
#include <fcntl.h>
#include <linux/fs.h>
#include <linux/magic.h>
#include <pthread.h>
#include <stddef.h>
#include <sys/mman.h>
#include <sys/statvfs.h>
@ -180,95 +179,23 @@ int journal_file_tail_end_by_mmap(JournalFile *f, uint64_t *ret_offset) {
return 0;
}
int journal_file_set_offline_thread_join(JournalFile *f) {
int r;
assert(f);
if (f->offline_state == OFFLINE_JOINED)
return 0;
r = pthread_join(f->offline_thread, NULL);
if (r)
return -r;
f->offline_state = OFFLINE_JOINED;
if (mmap_cache_fd_got_sigbus(f->cache_fd))
return -EIO;
return 0;
}
static int journal_file_set_online(JournalFile *f) {
bool wait = true;
int journal_file_set_state(JournalFile *f, uint8_t state) {
assert(f);
assert(state < _STATE_MAX);
if (!journal_file_writable(f))
return -EPERM;
if (f->fd < 0 || !f->header)
if (!f->header)
return -EINVAL;
while (wait) {
switch (f->offline_state) {
case OFFLINE_JOINED:
/* No offline thread, no need to wait. */
wait = false;
break;
case OFFLINE_SYNCING: {
OfflineState tmp_state = OFFLINE_SYNCING;
if (!__atomic_compare_exchange_n(&f->offline_state, &tmp_state, OFFLINE_CANCEL,
false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
continue;
}
/* Canceled syncing prior to offlining, no need to wait. */
wait = false;
break;
case OFFLINE_AGAIN_FROM_SYNCING: {
OfflineState tmp_state = OFFLINE_AGAIN_FROM_SYNCING;
if (!__atomic_compare_exchange_n(&f->offline_state, &tmp_state, OFFLINE_CANCEL,
false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
continue;
}
/* Canceled restart from syncing, no need to wait. */
wait = false;
break;
case OFFLINE_AGAIN_FROM_OFFLINING: {
OfflineState tmp_state = OFFLINE_AGAIN_FROM_OFFLINING;
if (!__atomic_compare_exchange_n(&f->offline_state, &tmp_state, OFFLINE_CANCEL,
false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
continue;
}
/* Canceled restart from offlining, must wait for offlining to complete however. */
_fallthrough_;
default: {
int r;
r = journal_file_set_offline_thread_join(f);
if (r < 0)
return r;
wait = false;
break;
}
}
}
if (mmap_cache_fd_got_sigbus(f->cache_fd))
return -EIO;
switch (f->header->state) {
case STATE_ONLINE:
return 0;
case STATE_OFFLINE:
f->header->state = STATE_ONLINE;
(void) fsync(f->fd);
f->header->state = state;
return 0;
default:
@ -453,7 +380,7 @@ static int journal_file_refresh_header(JournalFile *f) {
/* We used to update the header's boot ID field here, but we don't do that anymore, as per
* HEADER_COMPATIBLE_TAIL_ENTRY_BOOT_ID */
r = journal_file_set_online(f);
r = journal_file_set_state(f, STATE_ONLINE);
/* Sync the online state to disk; likely just created a new file, also sync the directory this file
* is located in. */
@ -1235,7 +1162,7 @@ int journal_file_append_object(
assert(type > OBJECT_UNUSED && type < _OBJECT_TYPE_MAX);
assert(size >= sizeof(ObjectHeader));
r = journal_file_set_online(f);
r = journal_file_set_state(f, STATE_ONLINE);
if (r < 0)
return r;
@ -4330,7 +4257,7 @@ int journal_file_parse_uid_from_filename(const char *path, uid_t *ret_uid) {
return parse_uid(buf, ret_uid);
}
int journal_file_archive(JournalFile *f, char **ret_previous_path) {
int journal_file_rename_for_archiving(JournalFile *f, char **ret_previous_path) {
_cleanup_free_ char *p = NULL;
assert(f);
@ -4365,14 +4292,6 @@ int journal_file_archive(JournalFile *f, char **ret_previous_path) {
*ret_previous_path = TAKE_PTR(f->path);
free_and_replace(f->path, p);
/* Set as archive so offlining commits w/state=STATE_ARCHIVED. Previously we would set old_file->header->state
* to STATE_ARCHIVED directly here, but journal_file_set_offline() short-circuits when state != STATE_ONLINE,
* which would result in the rotated journal never getting fsync() called before closing. Now we simply queue
* the archive state by setting an archive bit, leaving the state as STATE_ONLINE so proper offlining
* occurs. */
f->archive = true;
return 0;
}

View File

@ -49,16 +49,6 @@ typedef enum LocationType {
LOCATION_SEEK,
} LocationType;
typedef enum OfflineState {
OFFLINE_JOINED,
OFFLINE_SYNCING,
OFFLINE_OFFLINING,
OFFLINE_CANCEL,
OFFLINE_AGAIN_FROM_SYNCING,
OFFLINE_AGAIN_FROM_OFFLINING,
OFFLINE_DONE,
} OfflineState;
typedef struct JournalFile {
int fd;
MMapFileDescriptor *cache_fd;
@ -67,7 +57,6 @@ typedef struct JournalFile {
int open_flags;
bool close_fd:1;
bool archive:1;
bool strict_order:1;
direction_t last_direction;
@ -96,9 +85,6 @@ typedef struct JournalFile {
OrderedHashmap *chain_cache;
pthread_t offline_thread;
volatile OfflineState offline_state;
unsigned last_seen_generation;
uint64_t compress_threshold_bytes;
@ -157,7 +143,7 @@ int journal_file_open(
JournalFile *template,
JournalFile **ret);
int journal_file_set_offline_thread_join(JournalFile *f);
int journal_file_set_state(JournalFile *f, uint8_t state);
JournalFile* journal_file_close(JournalFile *j);
int journal_file_fstat(JournalFile *f);
DEFINE_TRIVIAL_CLEANUP_FUNC(JournalFile*, journal_file_close);
@ -308,7 +294,7 @@ int journal_file_copy_entry(JournalFile *from, JournalFile *to, Object *o, uint6
void journal_file_dump(JournalFile *f);
void journal_file_print_header(JournalFile *f);
int journal_file_archive(JournalFile *f, char **ret_previous_path);
int journal_file_rename_for_archiving(JournalFile *f, char **ret_previous_path);
int journal_file_parse_uid_from_filename(const char *path, uid_t *uid);
int journal_file_dispose(int dir_fd, const char *fname);

View File

@ -155,7 +155,7 @@ static void test_journal_flush_one(int argc, char *argv[]) {
if (n == 0)
return (void) log_tests_skipped("No journal entry found");
/* Open the new journal before archiving and offlining the file. */
/* Open the new journal before archiving the file. */
sd_journal_close(j);
assert_se(sd_journal_open_directory(&j, dn, SD_JOURNAL_ASSUME_IMMUTABLE) >= 0);
@ -175,10 +175,9 @@ static void test_journal_flush_one(int argc, char *argv[]) {
}
/* Archive and offline file. */
assert_se(journal_file_archive(new_journal, NULL) >= 0);
assert_se(journal_file_set_offline(new_journal, /* wait = */ true) >= 0);
assert_se(journal_file_archive(&new_journal, /* async = */ false) >= 0);
/* Read the archived and offline journal. */
/* Read the archived journal. */
for (uint64_t q = ALIGN64(p + 1); q < (uint64_t) j->current_file->last_stat.st_size; q = ALIGN64(q + 1)) {
Object *o;

View File

@ -1139,10 +1139,10 @@ static void append_test_entry_full(
ASSERT_OK(journal_file_rotate(
f,
/* async = */ false,
m,
/* file_flags = */ JOURNAL_STRICT_ORDER,
/* compress_threshold_bytes = */ UINT64_MAX,
/* deferred_closes = */ NULL));
/* compress_threshold_bytes = */ UINT64_MAX));
}
ASSERT_OK(journal_file_append_entry(

View File

@ -100,8 +100,8 @@ static void test_non_empty_one(void) {
assert_se(journal_file_move_to_entry_by_seqnum(f, 10, DIRECTION_DOWN, &o, NULL) == 0);
journal_file_rotate(&f, m, JOURNAL_SEAL|JOURNAL_COMPRESS, UINT64_MAX, NULL);
journal_file_rotate(&f, m, JOURNAL_SEAL|JOURNAL_COMPRESS, UINT64_MAX, NULL);
journal_file_rotate(&f, /* async = */ false, m, JOURNAL_SEAL|JOURNAL_COMPRESS, UINT64_MAX);
journal_file_rotate(&f, /* async = */ false, m, JOURNAL_SEAL|JOURNAL_COMPRESS, UINT64_MAX);
(void) journal_file_offline_close(f);

View File

@ -1,6 +1,5 @@
/* SPDX-License-Identifier: LGPL-2.1-or-later */
#include <pthread.h>
#include <unistd.h>
#include "chattr-util.h"
@ -11,8 +10,7 @@
#include "journal-authenticate.h"
#include "journal-file-util.h"
#include "path-util.h"
#include "random-util.h"
#include "set.h"
#include "process-util.h"
#include "stat-util.h"
#include "sync-util.h"
@ -97,8 +95,14 @@ static int journal_file_punch_holes(JournalFile *f) {
ssize_t n = SSIZE_MAX;
int r;
assert(f);
r = journal_file_end_punch_hole(f);
if (r < 0)
return r;
r = journal_file_entry_array_punch_hole(
f, le64toh(f->header->entry_array_offset), le64toh(f->header->n_entries));
f, le64toh(f->header->entry_array_offset), le64toh(f->header->n_entries));
if (r < 0)
return r;
@ -142,299 +146,148 @@ static int journal_file_punch_holes(JournalFile *f) {
return 0;
}
/* This may be called from a separate thread to prevent blocking the caller for the duration of fsync().
* As a result we use atomic operations on f->offline_state for inter-thread communications with
* journal_file_set_offline() and journal_file_set_online(). */
static void journal_file_set_offline_internal(JournalFile *f) {
int r;
assert(f);
assert(f->fd >= 0);
assert(f->header);
for (;;) {
switch (f->offline_state) {
case OFFLINE_CANCEL: {
OfflineState tmp_state = OFFLINE_CANCEL;
if (!__atomic_compare_exchange_n(&f->offline_state, &tmp_state, OFFLINE_DONE,
false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
continue;
}
return;
case OFFLINE_AGAIN_FROM_SYNCING: {
OfflineState tmp_state = OFFLINE_AGAIN_FROM_SYNCING;
if (!__atomic_compare_exchange_n(&f->offline_state, &tmp_state, OFFLINE_SYNCING,
false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
continue;
}
break;
case OFFLINE_AGAIN_FROM_OFFLINING: {
OfflineState tmp_state = OFFLINE_AGAIN_FROM_OFFLINING;
if (!__atomic_compare_exchange_n(&f->offline_state, &tmp_state, OFFLINE_SYNCING,
false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
continue;
}
break;
case OFFLINE_SYNCING:
if (f->archive) {
(void) journal_file_end_punch_hole(f);
(void) journal_file_punch_holes(f);
}
(void) fsync(f->fd);
{
OfflineState tmp_state = OFFLINE_SYNCING;
if (!__atomic_compare_exchange_n(&f->offline_state, &tmp_state, OFFLINE_OFFLINING,
false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
continue;
}
f->header->state = f->archive ? STATE_ARCHIVED : STATE_OFFLINE;
(void) fsync(f->fd);
/* If we've archived the journal file, first try to re-enable COW on the file. If the
* FS_NOCOW_FL flag was never set or we successfully removed it, continue. If we fail
* to remove the flag on the archived file, rewrite the file without the NOCOW flag.
* We need this fallback because on some filesystems (BTRFS), the NOCOW flag cannot
* be removed after data has been written to a file. The only way to remove it is to
* copy all data to a new file without the NOCOW flag set. */
if (f->archive) {
r = chattr_fd(f->fd, 0, FS_NOCOW_FL, NULL);
if (r >= 0)
continue;
log_debug_errno(r, "Failed to re-enable copy-on-write for %s: %m, rewriting file", f->path);
/* Here, setting COPY_VERIFY_LINKED flag is crucial. Otherwise, a broken
* journal file may be created, if journal_directory_vacuum() ->
* unlinkat_deallocate() is called in the main thread while this thread is
* copying the file. See issue #24150 and #31222. */
r = copy_file_atomic_at_full(
f->fd, NULL, AT_FDCWD, f->path, f->mode,
0,
FS_NOCOW_FL,
COPY_REPLACE | COPY_FSYNC | COPY_HOLES | COPY_ALL_XATTRS | COPY_VERIFY_LINKED,
NULL, NULL);
if (r < 0) {
log_debug_errno(r, "Failed to rewrite %s: %m", f->path);
continue;
}
}
break;
case OFFLINE_OFFLINING: {
OfflineState tmp_state = OFFLINE_OFFLINING;
if (!__atomic_compare_exchange_n(&f->offline_state, &tmp_state, OFFLINE_DONE,
false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
continue;
}
_fallthrough_;
case OFFLINE_DONE:
return;
case OFFLINE_JOINED:
log_debug("OFFLINE_JOINED unexpected offline state for journal_file_set_offline_internal()");
return;
}
}
}
static void * journal_file_set_offline_thread(void *arg) {
JournalFile *f = arg;
(void) pthread_setname_np(pthread_self(), "journal-offline");
journal_file_set_offline_internal(f);
return NULL;
}
/* Trigger a restart if the offline thread is mid-flight in a restartable state. */
static bool journal_file_set_offline_try_restart(JournalFile *f) {
for (;;) {
switch (f->offline_state) {
case OFFLINE_AGAIN_FROM_SYNCING:
case OFFLINE_AGAIN_FROM_OFFLINING:
return true;
case OFFLINE_CANCEL: {
OfflineState tmp_state = OFFLINE_CANCEL;
if (!__atomic_compare_exchange_n(&f->offline_state, &tmp_state, OFFLINE_AGAIN_FROM_SYNCING,
false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
continue;
}
return true;
case OFFLINE_SYNCING: {
OfflineState tmp_state = OFFLINE_SYNCING;
if (!__atomic_compare_exchange_n(&f->offline_state, &tmp_state, OFFLINE_AGAIN_FROM_SYNCING,
false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
continue;
}
return true;
case OFFLINE_OFFLINING: {
OfflineState tmp_state = OFFLINE_OFFLINING;
if (!__atomic_compare_exchange_n(&f->offline_state, &tmp_state, OFFLINE_AGAIN_FROM_OFFLINING,
false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
continue;
}
return true;
default:
return false;
}
}
}
/* Sets a journal offline.
*
* If wait is false then an offline is dispatched in a separate thread for a
* subsequent journal_file_set_offline() or journal_file_set_online() of the
* same journal to synchronize with.
*
* If wait is true, then either an existing offline thread will be restarted
* and joined, or if none exists the offline is simply performed in this
* context without involving another thread.
*/
int journal_file_set_offline(JournalFile *f, bool wait) {
int target_state;
bool restarted;
void journal_file_finalize(JournalFile *f, uint8_t state) {
int r;
assert(f);
assert(IN_SET(state, STATE_OFFLINE, STATE_ARCHIVED));
if (!journal_file_writable(f))
return -EPERM;
if (f->fd < 0 || !f->header)
return -EINVAL;
target_state = f->archive ? STATE_ARCHIVED : STATE_OFFLINE;
/* An offlining journal is implicitly online and may modify f->header->state,
* we must also join any potentially lingering offline thread when already in
* the desired offline state.
*/
if (!journal_file_is_offlining(f) && f->header->state == target_state)
return journal_file_set_offline_thread_join(f);
/* Restart an in-flight offline thread and wait if needed, or join a lingering done one. */
restarted = journal_file_set_offline_try_restart(f);
if ((restarted && wait) || !restarted) {
r = journal_file_set_offline_thread_join(f);
if (r < 0)
return r;
}
if (restarted)
return 0;
/* Initiate a new offline. */
f->offline_state = OFFLINE_SYNCING;
if (wait) {
/* Without using a thread if waiting. */
journal_file_set_offline_internal(f);
assert(f->offline_state == OFFLINE_DONE);
f->offline_state = OFFLINE_JOINED;
} else {
sigset_t ss, saved_ss;
int k;
assert_se(sigfillset(&ss) >= 0);
/* Don't block SIGBUS since the offlining thread accesses a memory mapped file.
* Asynchronous SIGBUS signals can safely be handled by either thread. */
assert_se(sigdelset(&ss, SIGBUS) >= 0);
r = pthread_sigmask(SIG_BLOCK, &ss, &saved_ss);
if (r > 0)
return -r;
r = pthread_create(&f->offline_thread, NULL, journal_file_set_offline_thread, f);
k = pthread_sigmask(SIG_SETMASK, &saved_ss, NULL);
if (r > 0) {
f->offline_state = OFFLINE_JOINED;
return -r;
}
if (k > 0)
return -k;
}
return 0;
}
bool journal_file_is_offlining(JournalFile *f) {
assert(f);
__atomic_thread_fence(__ATOMIC_SEQ_CST);
if (IN_SET(f->offline_state, OFFLINE_DONE, OFFLINE_JOINED))
return false;
return true;
}
void journal_file_write_final_tag(JournalFile *f) {
assert(f);
#if HAVE_GCRYPT
if (!JOURNAL_HEADER_SEALED(f->header) || !journal_file_writable(f))
return;
int r = journal_file_append_tag(f);
#if HAVE_GCRYPT
/* Write the final tag. */
r = journal_file_append_tag(f);
if (r < 0)
log_debug_errno(r, "Failed to append tag when closing journal: %m");
log_debug_errno(r, "Failed to append tag on closing journal file '%s', ignoring: %m", f->path);
#endif
/* offlining the file. */
r = journal_file_set_state(f, state);
if (r < 0)
log_debug_errno(r, "Failed to offlining journal file '%s', ignoring: %m", f->path);
/* If there is a scheduled task, finish it now. */
if (sd_event_source_get_enabled(f->post_change_timer, NULL) > 0) {
journal_file_post_change(f);
r = sd_event_source_set_enabled(f->post_change_timer, SD_EVENT_OFF);
if (r < 0)
log_debug_errno(r, "Failed to disable post change timer for journal file '%s', ignoring: %m", f->path);
}
}
JournalFile* journal_file_offline_close(JournalFile *f) {
if (!f)
return NULL;
journal_file_write_final_tag(f);
if (!journal_file_writable(f))
return journal_file_close(f);
if (sd_event_source_get_enabled(f->post_change_timer, NULL) > 0)
journal_file_post_change(f);
f->post_change_timer = sd_event_source_disable_unref(f->post_change_timer);
journal_file_set_offline(f, true);
assert(f->close_fd);
assert(f->fd >= 0);
journal_file_finalize(f, STATE_OFFLINE);
(void) fsync(f->fd);
return journal_file_close(f);
}
JournalFile* journal_file_initiate_close(JournalFile *f, Set *deferred_closes) {
static void journal_file_post_archive_tasks(JournalFile *f) {
int r;
assert(f);
assert(f->fd >= 0);
assert(f->path);
if (deferred_closes) {
r = set_put(deferred_closes, f);
(void) journal_file_punch_holes(f);
if (fsync(f->fd) < 0)
log_debug_errno(errno, "Failed to fsync() journal file '%s', ignoring: %m", f->path);
/* If we've archived the journal file, first try to re-enable COW on the file. If the FS_NOCOW_FL
* flag was never set or we successfully removed it, continue. If we fail to remove the flag on the
* archived file, rewrite the file without the NOCOW flag. We need this fallback because on some
* filesystems (BTRFS), the NOCOW flag cannot be removed after data has been written to a file. The
* only way to remove it is to copy all data to a new file without the NOCOW flag set. */
r = chattr_fd(f->fd, 0, FS_NOCOW_FL, NULL);
if (r < 0) {
log_debug_errno(r, "Failed to re-enable copy-on-write for %s, rewriting file: %m", f->path);
/* Here, setting COPY_VERIFY_LINKED flag is crucial. Otherwise, a broken journal file may be
* created, if journal_directory_vacuum() -> unlinkat_deallocate() is called in the main
* process while this process is copying the file. See issue #24150 and #31222. */
r = copy_file_atomic_at_full(
f->fd, NULL, AT_FDCWD, f->path, f->mode,
0,
FS_NOCOW_FL,
COPY_REPLACE | COPY_FSYNC | COPY_HOLES | COPY_ALL_XATTRS | COPY_VERIFY_LINKED,
NULL, NULL);
if (r < 0)
log_debug_errno(r, "Failed to add file to deferred close set, closing immediately.");
else {
(void) journal_file_set_offline(f, false);
return NULL;
log_debug_errno(r, "Failed to rewrite %s, ignoring: %m", f->path);
}
}
static int journal_file_archive_impl(JournalFile *f, bool async, char **ret_original_name) {
int r;
assert(f);
assert(f->fd >= 0);
assert(f->header);
if (!journal_file_writable(f))
return -EPERM;
if (f->header->state == STATE_ARCHIVED)
return -EINVAL;
r = journal_file_rename_for_archiving(f, ret_original_name);
if (r < 0)
return r;
journal_file_finalize(f, STATE_ARCHIVED);
if (async) {
r = safe_fork_full("(journal-archiver)", NULL, &f->fd, 1,
FORK_RESET_SIGNALS|FORK_CLOSE_ALL_FDS|FORK_DETACH, NULL);
if (r == 0) {
/* Child process */
journal_file_post_archive_tasks(f);
_exit(EXIT_SUCCESS);
}
/* Parent process */
if (r < 0) {
log_debug_errno(r, "Failed to fork process for archiving journal file '%s', ignoring: %m", f->path);
/* On failure, process synchronously. */
async = false;
}
}
return journal_file_offline_close(f);
if (!async)
journal_file_post_archive_tasks(f);
return 0;
}
int journal_file_archive(JournalFile **f, bool async) {
int r;
assert(f);
assert(*f);
r = journal_file_archive_impl(*f, async, NULL);
if (r < 0)
return r;
*f = journal_file_close(*f);
return 0;
}
int journal_file_rotate(
JournalFile **f,
bool async,
MMapCache *mmap_cache,
JournalFileFlags file_flags,
uint64_t compress_threshold_bytes,
Set *deferred_closes) {
uint64_t compress_threshold_bytes) {
_cleanup_free_ char *path = NULL;
JournalFile *new_file = NULL;
@ -443,13 +296,10 @@ int journal_file_rotate(
assert(f);
assert(*f);
journal_file_write_final_tag(*f);
r = journal_file_archive(*f, &path);
r = journal_file_archive_impl(*f, async, &path);
if (r < 0)
return r;
set_clear_with_destructor(deferred_closes, journal_file_offline_close);
r = journal_file_open(
/* fd= */ -EBADF,
path,
@ -462,9 +312,9 @@ int journal_file_rotate(
/* template= */ *f,
&new_file);
journal_file_initiate_close(*f, deferred_closes);
*f = new_file;
journal_file_close(*f);
*f = new_file;
return r;
}

View File

@ -1,11 +1,12 @@
/* SPDX-License-Identifier: LGPL-2.1-or-later */
#pragma once
#include "journal-file.h"
#include <inttypes.h>
int journal_file_set_offline(JournalFile *f, bool wait);
bool journal_file_is_offlining(JournalFile *f);
void journal_file_write_final_tag(JournalFile *f);
#include "journal-file.h"
#include "macro.h"
void journal_file_finalize(JournalFile *f, uint8_t state);
JournalFile* journal_file_offline_close(JournalFile *f);
DEFINE_TRIVIAL_CLEANUP_FUNC(JournalFile*, journal_file_offline_close);
@ -19,10 +20,11 @@ int journal_file_open_reliably(
MMapCache *mmap_cache,
JournalFile **ret);
JournalFile* journal_file_initiate_close(JournalFile *f, Set *deferred_closes);
int journal_file_archive(JournalFile **f, bool async);
int journal_file_rotate(
JournalFile **f,
bool async,
MMapCache *mmap_cache,
JournalFileFlags file_flags,
uint64_t compress_threshold_bytes,
Set *deferred_closes);
uint64_t compress_threshold_bytes);