From 1cf2dc4d2395b616ea32752fb74de790fa12a244 Mon Sep 17 00:00:00 2001 From: Andrii Chubatiuk Date: Wed, 16 Oct 2024 14:39:44 +0300 Subject: [PATCH 1/4] basic: expose compression level argument in compress_blob functions --- docs/ENVIRONMENT.md | 2 +- src/basic/compress.c | 40 ++++++++++++------ src/basic/compress.h | 15 +++---- src/fuzz/fuzz-compress.c | 2 +- src/libsystemd/sd-journal/journal-file.c | 2 +- src/test/test-compress-benchmark.c | 10 ++--- src/test/test-compress.c | 52 ++++++++++++------------ 7 files changed, 70 insertions(+), 53 deletions(-) diff --git a/docs/ENVIRONMENT.md b/docs/ENVIRONMENT.md index cf5fb91eb96..afe257d17a3 100644 --- a/docs/ENVIRONMENT.md +++ b/docs/ENVIRONMENT.md @@ -611,7 +611,7 @@ SYSTEMD_HOME_DEBUG_SUFFIX=foo \ 32-bit offsets. Enabled by default. * `$SYSTEMD_JOURNAL_COMPRESS` – Takes a boolean, or one of the compression - algorithms "XZ", "LZ4", and "ZSTD". If enabled, the default compression + algorithms "xz", "lz4", and "zstd". If enabled, the default compression algorithm set at compile time will be used when opening a new journal file. If disabled, the journal file compression will be disabled. Note that the compression mode of existing journal files are not changed. To make the diff --git a/src/basic/compress.c b/src/basic/compress.c index 06db2eed7d7..aea745a0cc1 100644 --- a/src/basic/compress.c +++ b/src/basic/compress.c @@ -42,6 +42,7 @@ static DLSYM_PROTOTYPE(LZ4F_freeCompressionContext) = NULL; static DLSYM_PROTOTYPE(LZ4F_freeDecompressionContext) = NULL; static DLSYM_PROTOTYPE(LZ4F_isError) = NULL; DLSYM_PROTOTYPE(LZ4_compress_default) = NULL; +DLSYM_PROTOTYPE(LZ4_compress_HC) = NULL; DLSYM_PROTOTYPE(LZ4_decompress_safe) = NULL; DLSYM_PROTOTYPE(LZ4_decompress_safe_partial) = NULL; DLSYM_PROTOTYPE(LZ4_versionNumber) = NULL; @@ -93,6 +94,7 @@ static DLSYM_PROTOTYPE(lzma_easy_encoder) = NULL; static DLSYM_PROTOTYPE(lzma_end) = NULL; static DLSYM_PROTOTYPE(lzma_stream_buffer_encode) = NULL; static DLSYM_PROTOTYPE(lzma_stream_decoder) = NULL; +static DLSYM_PROTOTYPE(lzma_lzma_preset) = NULL; /* We can't just do _cleanup_(sym_lzma_end) because a compiler bug makes * this fail with: @@ -109,10 +111,10 @@ static inline void lzma_end_wrapper(lzma_stream *ls) { #define ALIGN_8(l) ALIGN_TO(l, sizeof(size_t)) static const char* const compression_table[_COMPRESSION_MAX] = { - [COMPRESSION_NONE] = "NONE", - [COMPRESSION_XZ] = "XZ", - [COMPRESSION_LZ4] = "LZ4", - [COMPRESSION_ZSTD] = "ZSTD", + [COMPRESSION_NONE] = "none", + [COMPRESSION_XZ] = "xz", + [COMPRESSION_LZ4] = "lz4", + [COMPRESSION_ZSTD] = "zstd", }; DEFINE_STRING_TABLE_LOOKUP(compression, Compression); @@ -141,12 +143,13 @@ int dlopen_lzma(void) { DLSYM_ARG(lzma_easy_encoder), DLSYM_ARG(lzma_end), DLSYM_ARG(lzma_stream_buffer_encode), + DLSYM_ARG(lzma_lzma_preset), DLSYM_ARG(lzma_stream_decoder)); } #endif int compress_blob_xz(const void *src, uint64_t src_size, - void *dst, size_t dst_alloc_size, size_t *dst_size) { + void *dst, size_t dst_alloc_size, size_t *dst_size, int level) { assert(src); assert(src_size > 0); @@ -155,12 +158,12 @@ int compress_blob_xz(const void *src, uint64_t src_size, assert(dst_size); #if HAVE_XZ - static const lzma_options_lzma opt = { + lzma_options_lzma opt = { 1u << 20u, NULL, 0, LZMA_LC_DEFAULT, LZMA_LP_DEFAULT, LZMA_PB_DEFAULT, LZMA_MODE_FAST, 128, LZMA_MF_HC3, 4 }; - static const lzma_filter filters[] = { - { LZMA_FILTER_LZMA2, (lzma_options_lzma*) &opt }, + const lzma_filter filters[] = { + { LZMA_FILTER_LZMA2, &opt }, { LZMA_VLI_UNKNOWN, NULL } }; lzma_ret ret; @@ -171,6 +174,12 @@ int compress_blob_xz(const void *src, uint64_t src_size, if (r < 0) return r; + if (level >= 0) { + r = sym_lzma_lzma_preset(&opt, (uint32_t) level); + if (r < 0) + return r; + } + /* Returns < 0 if we couldn't compress the data or the * compressed result is longer than the original */ @@ -210,6 +219,7 @@ int dlopen_lz4(void) { DLSYM_ARG(LZ4F_freeDecompressionContext), DLSYM_ARG(LZ4F_isError), DLSYM_ARG(LZ4_compress_default), + DLSYM_ARG(LZ4_compress_HC), DLSYM_ARG(LZ4_decompress_safe), DLSYM_ARG(LZ4_decompress_safe_partial), DLSYM_ARG(LZ4_versionNumber)); @@ -217,7 +227,7 @@ int dlopen_lz4(void) { #endif int compress_blob_lz4(const void *src, uint64_t src_size, - void *dst, size_t dst_alloc_size, size_t *dst_size) { + void *dst, size_t dst_alloc_size, size_t *dst_size, int level) { assert(src); assert(src_size > 0); @@ -237,7 +247,10 @@ int compress_blob_lz4(const void *src, uint64_t src_size, if (src_size < 9) return -ENOBUFS; - r = sym_LZ4_compress_default(src, (char*)dst + 8, src_size, (int) dst_alloc_size - 8); + if (level <= 0) + r = sym_LZ4_compress_default(src, (char*)dst + 8, src_size, (int) dst_alloc_size - 8); + else + r = sym_LZ4_compress_HC(src, (char*)dst + 8, src_size, (int) dst_alloc_size - 8, level); if (r <= 0) return -ENOBUFS; @@ -281,7 +294,7 @@ int dlopen_zstd(void) { int compress_blob_zstd( const void *src, uint64_t src_size, - void *dst, size_t dst_alloc_size, size_t *dst_size) { + void *dst, size_t dst_alloc_size, size_t *dst_size, int level) { assert(src); assert(src_size > 0); @@ -297,7 +310,10 @@ int compress_blob_zstd( if (r < 0) return r; - k = sym_ZSTD_compress(dst, dst_alloc_size, src, src_size, 0); + if (level < 0) + level = 0; + + k = sym_ZSTD_compress(dst, dst_alloc_size, src, src_size, level); if (sym_ZSTD_isError(k)) return zstd_ret_to_errno(k); diff --git a/src/basic/compress.h b/src/basic/compress.h index 1ad87ee8781..c72b1b7e6fe 100644 --- a/src/basic/compress.h +++ b/src/basic/compress.h @@ -8,6 +8,7 @@ #if HAVE_LZ4 #include +#include #include #endif @@ -28,11 +29,11 @@ Compression compression_from_string(const char *compression); bool compression_supported(Compression c); int compress_blob_xz(const void *src, uint64_t src_size, - void *dst, size_t dst_alloc_size, size_t *dst_size); + void *dst, size_t dst_alloc_size, size_t *dst_size, int level); int compress_blob_lz4(const void *src, uint64_t src_size, - void *dst, size_t dst_alloc_size, size_t *dst_size); + void *dst, size_t dst_alloc_size, size_t *dst_size, int level); int compress_blob_zstd(const void *src, uint64_t src_size, - void *dst, size_t dst_alloc_size, size_t *dst_size); + void *dst, size_t dst_alloc_size, size_t *dst_size, int level); int decompress_blob_xz(const void *src, uint64_t src_size, void **dst, size_t* dst_size, size_t dst_max); @@ -90,15 +91,15 @@ int dlopen_lzma(void); static inline int compress_blob( Compression compression, const void *src, uint64_t src_size, - void *dst, size_t dst_alloc_size, size_t *dst_size) { + void *dst, size_t dst_alloc_size, size_t *dst_size, int level) { switch (compression) { case COMPRESSION_ZSTD: - return compress_blob_zstd(src, src_size, dst, dst_alloc_size, dst_size); + return compress_blob_zstd(src, src_size, dst, dst_alloc_size, dst_size, level); case COMPRESSION_LZ4: - return compress_blob_lz4(src, src_size, dst, dst_alloc_size, dst_size); + return compress_blob_lz4(src, src_size, dst, dst_alloc_size, dst_size, level); case COMPRESSION_XZ: - return compress_blob_xz(src, src_size, dst, dst_alloc_size, dst_size); + return compress_blob_xz(src, src_size, dst, dst_alloc_size, dst_size, level); default: return -EOPNOTSUPP; } diff --git a/src/fuzz/fuzz-compress.c b/src/fuzz/fuzz-compress.c index c3f68f62dd2..6fcad736b18 100644 --- a/src/fuzz/fuzz-compress.c +++ b/src/fuzz/fuzz-compress.c @@ -42,7 +42,7 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { } size_t csize; - r = compress_blob(alg, h->data, data_len, buf, size, &csize); + r = compress_blob(alg, h->data, data_len, buf, size, &csize, /* level = */ -1); if (r < 0) { log_error_errno(r, "Compression failed: %m"); return 0; diff --git a/src/libsystemd/sd-journal/journal-file.c b/src/libsystemd/sd-journal/journal-file.c index 7e941edb199..57e5d0c8e33 100644 --- a/src/libsystemd/sd-journal/journal-file.c +++ b/src/libsystemd/sd-journal/journal-file.c @@ -1810,7 +1810,7 @@ static int maybe_compress_payload(JournalFile *f, uint8_t *dst, const uint8_t *s if (c == COMPRESSION_NONE || size < f->compress_threshold_bytes) return 0; - r = compress_blob(c, src, size, dst, size - 1, rsize); + r = compress_blob(c, src, size, dst, size - 1, rsize, /* level = */ -1); if (r < 0) return log_debug_errno(r, "Failed to compress data object using %s, ignoring: %m", compression_to_string(c)); diff --git a/src/test/test-compress-benchmark.c b/src/test/test-compress-benchmark.c index 1727db8134d..6b0aef31373 100644 --- a/src/test/test-compress-benchmark.c +++ b/src/test/test-compress-benchmark.c @@ -13,7 +13,7 @@ #include "tests.h" typedef int (compress_t)(const void *src, uint64_t src_size, void *dst, - size_t dst_alloc_size, size_t *dst_size); + size_t dst_alloc_size, size_t *dst_size, int level); typedef int (decompress_t)(const void *src, uint64_t src_size, void **dst, size_t* dst_size, size_t dst_max); @@ -100,7 +100,7 @@ static void test_compress_decompress(const char* label, const char* type, memzero(buf, MIN(size + 1000, MAX_SIZE)); - r = compress(text, size, buf, size, &j); + r = compress(text, size, buf, size, &j, /* level = */ -1); /* assume compression must be successful except for small or random inputs */ assert_se(r >= 0 || (size < 2048 && r == -ENOBUFS) || streq(type, "random")); @@ -160,13 +160,13 @@ int main(int argc, char *argv[]) { NULSTR_FOREACH(i, "zeros\0simple\0random\0") { #if HAVE_XZ - test_compress_decompress("XZ", i, compress_blob_xz, decompress_blob_xz); + test_compress_decompress("xz", i, compress_blob_xz, decompress_blob_xz); #endif #if HAVE_LZ4 - test_compress_decompress("LZ4", i, compress_blob_lz4, decompress_blob_lz4); + test_compress_decompress("lz4", i, compress_blob_lz4, decompress_blob_lz4); #endif #if HAVE_ZSTD - test_compress_decompress("ZSTD", i, compress_blob_zstd, decompress_blob_zstd); + test_compress_decompress("zstd", i, compress_blob_zstd, decompress_blob_zstd); #endif } return 0; diff --git a/src/test/test-compress.c b/src/test/test-compress.c index 86311c6217e..63b5659946e 100644 --- a/src/test/test-compress.c +++ b/src/test/test-compress.c @@ -33,7 +33,7 @@ #define HUGE_SIZE (4096*1024) typedef int (compress_blob_t)(const void *src, uint64_t src_size, - void *dst, size_t dst_alloc_size, size_t *dst_size); + void *dst, size_t dst_alloc_size, size_t *dst_size, int level); typedef int (decompress_blob_t)(const void *src, uint64_t src_size, void **dst, size_t* dst_size, size_t dst_max); @@ -62,7 +62,7 @@ _unused_ static void test_compress_decompress( log_info("/* testing %s %s blob compression/decompression */", compression, data); - r = compress(data, data_len, compressed, sizeof(compressed), &csize); + r = compress(data, data_len, compressed, sizeof(compressed), &csize, /* level = */ -1); if (r == -ENOBUFS) { log_info_errno(r, "compression failed: %m"); assert_se(may_fail); @@ -111,14 +111,14 @@ _unused_ static void test_decompress_startswith(const char *compression, compressed = compressed1 = malloc(BUFSIZE_1); assert_se(compressed1); - r = compress(data, data_len, compressed, BUFSIZE_1, &csize); + r = compress(data, data_len, compressed, BUFSIZE_1, &csize, /* level = */ -1); if (r == -ENOBUFS) { log_info_errno(r, "compression failed: %m"); assert_se(may_fail); compressed = compressed2 = malloc(BUFSIZE_2); assert_se(compressed2); - r = compress(data, data_len, compressed, BUFSIZE_2, &csize); + r = compress(data, data_len, compressed, BUFSIZE_2, &csize, /* level = */ -1); } assert_se(r >= 0); @@ -150,7 +150,7 @@ _unused_ static void test_decompress_startswith_short(const char *compression, log_info("/* %s with %s */", __func__, compression); - r = compress(TEXT, sizeof TEXT, buf, sizeof buf, &csize); + r = compress(TEXT, sizeof TEXT, buf, sizeof buf, &csize, /* level = */ -1); assert_se(r >= 0); for (size_t i = 1; i < strlen(TEXT); i++) { @@ -292,25 +292,25 @@ int main(int argc, char *argv[]) { random_bytes(data + 7, sizeof(data) - 7); #if HAVE_XZ - test_compress_decompress("XZ", compress_blob_xz, decompress_blob_xz, + test_compress_decompress("xz", compress_blob_xz, decompress_blob_xz, text, sizeof(text), false); - test_compress_decompress("XZ", compress_blob_xz, decompress_blob_xz, + test_compress_decompress("xz", compress_blob_xz, decompress_blob_xz, data, sizeof(data), true); - test_decompress_startswith("XZ", + test_decompress_startswith("xz", compress_blob_xz, decompress_startswith_xz, text, sizeof(text), false); - test_decompress_startswith("XZ", + test_decompress_startswith("xz", compress_blob_xz, decompress_startswith_xz, data, sizeof(data), true); - test_decompress_startswith("XZ", + test_decompress_startswith("xz", compress_blob_xz, decompress_startswith_xz, huge, HUGE_SIZE, true); - test_compress_stream("XZ", "xzcat", + test_compress_stream("xz", "xzcat", compress_stream_xz, decompress_stream_xz, srcfile); - test_decompress_startswith_short("XZ", compress_blob_xz, decompress_startswith_xz); + test_decompress_startswith_short("xz", compress_blob_xz, decompress_startswith_xz); #else log_info("/* XZ test skipped */"); @@ -318,27 +318,27 @@ int main(int argc, char *argv[]) { #if HAVE_LZ4 if (dlopen_lz4() >= 0) { - test_compress_decompress("LZ4", compress_blob_lz4, decompress_blob_lz4, + test_compress_decompress("lz4", compress_blob_lz4, decompress_blob_lz4, text, sizeof(text), false); - test_compress_decompress("LZ4", compress_blob_lz4, decompress_blob_lz4, + test_compress_decompress("lz4", compress_blob_lz4, decompress_blob_lz4, data, sizeof(data), true); - test_decompress_startswith("LZ4", + test_decompress_startswith("lz4", compress_blob_lz4, decompress_startswith_lz4, text, sizeof(text), false); - test_decompress_startswith("LZ4", + test_decompress_startswith("lz4", compress_blob_lz4, decompress_startswith_lz4, data, sizeof(data), true); - test_decompress_startswith("LZ4", + test_decompress_startswith("lz4", compress_blob_lz4, decompress_startswith_lz4, huge, HUGE_SIZE, true); - test_compress_stream("LZ4", "lz4cat", + test_compress_stream("lz4", "lz4cat", compress_stream_lz4, decompress_stream_lz4, srcfile); test_lz4_decompress_partial(); - test_decompress_startswith_short("LZ4", compress_blob_lz4, decompress_startswith_lz4); + test_decompress_startswith_short("lz4", compress_blob_lz4, decompress_startswith_lz4); } else log_error("/* Can't load liblz4 */"); #else @@ -346,25 +346,25 @@ int main(int argc, char *argv[]) { #endif #if HAVE_ZSTD - test_compress_decompress("ZSTD", compress_blob_zstd, decompress_blob_zstd, + test_compress_decompress("zstd", compress_blob_zstd, decompress_blob_zstd, text, sizeof(text), false); - test_compress_decompress("ZSTD", compress_blob_zstd, decompress_blob_zstd, + test_compress_decompress("zstd", compress_blob_zstd, decompress_blob_zstd, data, sizeof(data), true); - test_decompress_startswith("ZSTD", + test_decompress_startswith("zstd", compress_blob_zstd, decompress_startswith_zstd, text, sizeof(text), false); - test_decompress_startswith("ZSTD", + test_decompress_startswith("zstd", compress_blob_zstd, decompress_startswith_zstd, data, sizeof(data), true); - test_decompress_startswith("ZSTD", + test_decompress_startswith("zstd", compress_blob_zstd, decompress_startswith_zstd, huge, HUGE_SIZE, true); - test_compress_stream("ZSTD", "zstdcat", + test_compress_stream("zstd", "zstdcat", compress_stream_zstd, decompress_stream_zstd, srcfile); - test_decompress_startswith_short("ZSTD", compress_blob_zstd, decompress_startswith_zstd); + test_decompress_startswith_short("zstd", compress_blob_zstd, decompress_startswith_zstd); #else log_info("/* ZSTD test skipped */"); #endif From 151319aad572113db8d5cc15e70d6c82f2022688 Mon Sep 17 00:00:00 2001 From: Andrii Chubatiuk Date: Wed, 16 Oct 2024 14:58:59 +0300 Subject: [PATCH 2/4] journal-remote: added compression and compression level --- man/journal-upload.conf.xml | 20 ++++++++ src/journal-remote/journal-upload-journal.c | 22 ++++++++- src/journal-remote/journal-upload.c | 52 +++++++++++++++++++-- src/journal-remote/journal-upload.h | 4 ++ src/shared/conf-parser.c | 31 ++++++++++++ src/shared/conf-parser.h | 1 + 6 files changed, 124 insertions(+), 6 deletions(-) diff --git a/man/journal-upload.conf.xml b/man/journal-upload.conf.xml index 7d3f22f96b4..c5c2e8e8bbb 100644 --- a/man/journal-upload.conf.xml +++ b/man/journal-upload.conf.xml @@ -60,6 +60,26 @@ + + Compression= + + Compression algorithm to be applied to logs data before sending. + Supported values are zstd, xz, lz4, none. + Defaults to none. + + + + + + CompressionLevel= + + Specifies the compression level of data to be uploaded. + Each algorithm has it's own range of allowed integer values: xz - [1 - 9], zstd - [1 - 22], lz4 - [1 - 9]. + Defaults to -1, and the default compression level for each compression algorithm will be used. + + + + ServerKeyFile= diff --git a/src/journal-remote/journal-upload-journal.c b/src/journal-remote/journal-upload-journal.c index 23ad3b2d312..abaa50def63 100644 --- a/src/journal-remote/journal-upload-journal.c +++ b/src/journal-remote/journal-upload-journal.c @@ -251,6 +251,7 @@ static void check_update_watchdog(Uploader *u) { static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) { Uploader *u = ASSERT_PTR(userp); + _cleanup_free_ char *compression_buffer = NULL; int r; sd_journal *j; size_t filled = 0; @@ -262,6 +263,14 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void j = u->journal; + if (u->compression != COMPRESSION_NONE) { + compression_buffer = malloc_multiply(nmemb, size); + if (!compression_buffer) { + log_oom(); + return CURL_READFUNC_ABORT; + } + } + while (j && filled < size * nmemb) { if (u->entry_state == ENTRY_DONE) { r = sd_journal_next(j); @@ -284,7 +293,7 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void u->entry_state = ENTRY_CURSOR; } - w = write_entry((char*)buf + filled, size * nmemb - filled, u); + w = write_entry((compression_buffer ?: (char*) buf) + filled, size * nmemb - filled, u); if (w < 0) return CURL_READFUNC_ABORT; filled += w; @@ -300,6 +309,17 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void u->entries_sent, u->current_cursor); } + if (filled > 0 && u->compression != COMPRESSION_NONE) { + size_t compressed_size = 0; + r = compress_blob(u->compression, compression_buffer, filled, buf, size * nmemb, &compressed_size, u->compression_level); + if (r < 0) { + log_error_errno(r, "Failed to compress %zu bytes using Compression=%s, CompressionLevel=%d)", + filled, compression_to_string(u->compression), u->compression_level); + return CURL_READFUNC_ABORT; + } + return compressed_size; + } + return filled; } diff --git a/src/journal-remote/journal-upload.c b/src/journal-remote/journal-upload.c index eb36d03130c..66e2fd2ef23 100644 --- a/src/journal-remote/journal-upload.c +++ b/src/journal-remote/journal-upload.c @@ -11,6 +11,7 @@ #include "alloc-util.h" #include "build.h" +#include "compress.h" #include "conf-parser.h" #include "constants.h" #include "daemon-util.h" @@ -42,7 +43,7 @@ #define TRUST_FILE CERTIFICATE_ROOT "/ca/trusted.pem" #define DEFAULT_PORT 19532 -static const char* arg_url = NULL; +static const char *arg_url = NULL; static const char *arg_key = NULL; static const char *arg_cert = NULL; static const char *arg_trust = NULL; @@ -58,6 +59,8 @@ static bool arg_merge = false; static int arg_follow = -1; static const char *arg_save_state = NULL; static usec_t arg_network_timeout_usec = USEC_INFINITY; +static Compression arg_compression = COMPRESSION_NONE; +static int arg_compression_level = -1; STATIC_DESTRUCTOR_REGISTER(arg_file, strv_freep); @@ -203,6 +206,18 @@ int start_upload(Uploader *u, return log_oom(); h = l; + if (arg_compression != COMPRESSION_NONE) { + _cleanup_free_ char *header = NULL; + header = strjoin("Content-Encoding: ", compression_to_string(arg_compression)); + if (!header) + return log_oom(); + + l = curl_slist_append(h, header); + if (!l) + return log_oom(); + h = l; + } + u->header = TAKE_PTR(h); } @@ -292,8 +307,10 @@ int start_upload(Uploader *u, } static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *userp) { + int r; Uploader *u = ASSERT_PTR(userp); ssize_t n; + _cleanup_free_ char *compression_buffer = NULL; assert(nmemb < SSIZE_MAX / size); @@ -302,10 +319,31 @@ static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *user assert(!size_multiply_overflow(size, nmemb)); - n = read(u->input, buf, size * nmemb); - log_debug("%s: allowed %zu, read %zd", __func__, size*nmemb, n); - if (n > 0) - return n; + if (u->compression != COMPRESSION_NONE) { + compression_buffer = malloc_multiply(nmemb, size); + if (!compression_buffer) { + log_oom(); + return CURL_READFUNC_ABORT; + } + } + + n = read(u->input, compression_buffer ?: buf, size * nmemb); + log_debug("%s: allowed %zu, read %zd", __func__, size * nmemb, n); + + if (n > 0) { + if (u->compression == COMPRESSION_NONE) + return n; + + size_t compressed_size; + + r = compress_blob(u->compression, compression_buffer, n, buf, size * nmemb, &compressed_size, u->compression_level); + if (r < 0) { + log_error_errno(r, "Failed to compress %ld bytes using Compression=%s, CompressionLevel=%d): %m", + n, compression_to_string(u->compression), u->compression_level); + return CURL_READFUNC_ABORT; + } + return compressed_size; + } u->uploading = false; if (n < 0) { @@ -389,6 +427,8 @@ static int setup_uploader(Uploader *u, const char *url, const char *state_file) *u = (Uploader) { .input = -1, + .compression = arg_compression, + .compression_level = arg_compression_level, }; host = STARTSWITH_SET(url, "http://", "https://"); @@ -496,6 +536,8 @@ static int parse_config(void) { { "Upload", "ServerCertificateFile", config_parse_path_or_ignore, 0, &arg_cert }, { "Upload", "TrustedCertificateFile", config_parse_path_or_ignore, 0, &arg_trust }, { "Upload", "NetworkTimeoutSec", config_parse_sec, 0, &arg_network_timeout_usec }, + { "Upload", "Compression", config_parse_compression, 0, &arg_compression }, + { "Upload", "CompressionLevel", config_parse_int, 0, &arg_compression_level }, {} }; diff --git a/src/journal-remote/journal-upload.h b/src/journal-remote/journal-upload.h index 5ba3c4f1a0e..02199c17ae1 100644 --- a/src/journal-remote/journal-upload.h +++ b/src/journal-remote/journal-upload.h @@ -7,6 +7,8 @@ #include "sd-event.h" #include "sd-journal.h" +#include "compress.h" +#include "conf-parser.h" #include "time-util.h" typedef enum { @@ -53,6 +55,8 @@ typedef struct Uploader { char *last_cursor, *current_cursor; usec_t watchdog_timestamp; usec_t watchdog_usec; + Compression compression; + int compression_level; } Uploader; #define JOURNAL_UPLOAD_POLL_TIMEOUT (10 * USEC_PER_SEC) diff --git a/src/shared/conf-parser.c b/src/shared/conf-parser.c index 8c5a4a7013a..8872e96aaf1 100644 --- a/src/shared/conf-parser.c +++ b/src/shared/conf-parser.c @@ -11,6 +11,7 @@ #include "alloc-util.h" #include "chase.h" #include "calendarspec.h" +#include "compress.h" #include "conf-files.h" #include "conf-parser.h" #include "constants.h" @@ -1385,6 +1386,36 @@ int config_parse_warn_compat( return 0; } +int config_parse_compression( + const char *unit, + const char *filename, + unsigned line, + const char *section, + unsigned section_line, + const char *lvalue, + int ltype, + const char *rvalue, + void *data, + void *userdata) { + + char *compression = ASSERT_PTR(data); + + if (isempty(rvalue)) { + *compression = COMPRESSION_NONE; + return 0; + } + + Compression c = compression_from_string(rvalue); + if (c < 0) + return log_syntax_parse_error(unit, filename, line, c, lvalue, rvalue); + if (!compression_supported(c)) + return log_syntax(unit, LOG_WARNING, filename, line, 0, + "Compression=%s is not supported on a system", rvalue); + + *compression = c; + return 0; +} + int config_parse_log_facility( const char *unit, const char *filename, diff --git a/src/shared/conf-parser.h b/src/shared/conf-parser.h index 1599738f849..6865cdc2be0 100644 --- a/src/shared/conf-parser.h +++ b/src/shared/conf-parser.h @@ -288,6 +288,7 @@ CONFIG_PARSER_PROTOTYPE(config_parse_sec_def_unset); CONFIG_PARSER_PROTOTYPE(config_parse_nsec); CONFIG_PARSER_PROTOTYPE(config_parse_mode); CONFIG_PARSER_PROTOTYPE(config_parse_warn_compat); +CONFIG_PARSER_PROTOTYPE(config_parse_compression); CONFIG_PARSER_PROTOTYPE(config_parse_log_facility); CONFIG_PARSER_PROTOTYPE(config_parse_log_level); CONFIG_PARSER_PROTOTYPE(config_parse_signal); From 74e3f77ddc9a9984927f390bb310b059ec7c60cb Mon Sep 17 00:00:00 2001 From: Andrii Chubatiuk Date: Wed, 16 Oct 2024 15:06:19 +0300 Subject: [PATCH 3/4] journal-remote: added custom headers support --- man/journal-upload.conf.xml | 20 +++ src/journal-remote/header-util.c | 170 ++++++++++++++++++++++++++ src/journal-remote/header-util.h | 7 ++ src/journal-remote/journal-upload.c | 54 ++++++++ src/journal-remote/journal-upload.h | 2 + src/journal-remote/meson.build | 7 ++ src/journal-remote/test-header-util.c | 39 ++++++ 7 files changed, 299 insertions(+) create mode 100644 src/journal-remote/header-util.c create mode 100644 src/journal-remote/header-util.h create mode 100644 src/journal-remote/test-header-util.c diff --git a/man/journal-upload.conf.xml b/man/journal-upload.conf.xml index c5c2e8e8bbb..1f64ae38782 100644 --- a/man/journal-upload.conf.xml +++ b/man/journal-upload.conf.xml @@ -80,6 +80,26 @@ + + Header= + + HTTP header which should be added to each request to URL. Headers are expected to + be unquoted and it's expected to be no more than one header per each option. + + Header name can contains alphanumeric values, "_" and "-" symbols additionally. + + This option may be specified more than once, in which case all listed headers will be set. If + the same header name is listed twice, the later setting will override the earlier setting. + + + Example: + Header=HeaderName: HeaderValue + adds HeaderName header with HeaderValue to each HTTP request. + + + + + ServerKeyFile= diff --git a/src/journal-remote/header-util.c b/src/journal-remote/header-util.c new file mode 100644 index 00000000000..21885315a1a --- /dev/null +++ b/src/journal-remote/header-util.c @@ -0,0 +1,170 @@ +/* SPDX-License-Identifier: LGPL-2.1-or-later */ + +#include "header-util.h" +#include "strv.h" + +/* HTTP header name can contains: +- Alphanumeric characters: a-z, A-Z, and 0-9 +- The following special characters: - and _ */ +#define VALID_HEADER_NAME_CHARS \ + DIGITS LETTERS "_" \ + "-" + +#define VALID_HEADER_NAME_LENGTH 40 + +#define VALID_HEADER_VALUE_CHARS \ + DIGITS LETTERS "_" \ + " " \ + ":" \ + ";" \ + "." \ + "," \ + "\\" \ + "/" \ + "'" \ + "\"" \ + "?" \ + "!" \ + "(" \ + ")" \ + "{" \ + "}" \ + "[" \ + "]" \ + "@" \ + "<" \ + ">" \ + "=" \ + "-" \ + "+" \ + "*" \ + "#" \ + "$" \ + "&" \ + "`" \ + "|" \ + "~" \ + "^" \ + "%" + +static bool header_name_is_valid(const char *e, size_t n) { + if (!e) + return false; + + if (n > VALID_HEADER_NAME_LENGTH) + return false; + + for (const char *p = e; p < e + n; p++) + if (!strchr(VALID_HEADER_NAME_CHARS, *p)) + return false; + + return true; +} + +static bool header_value_is_valid(const char *e) { + if (!e) + return false; + + int n = strlen_ptr(e); + + if (n < 0) + return false; + + for (const char *p = e; p < e + n; p++) + if (!strchr(VALID_HEADER_VALUE_CHARS, *p)) + return false; + + return true; +} + +bool header_is_valid(const char *e) { + const char *eq; + + eq = strchr(e, ':'); + if (!eq) + return false; + + if (!header_name_is_valid(e, eq - e)) + return false; + + if (!header_value_is_valid(skip_leading_chars(eq + 1, WHITESPACE))) + return false; + + return true; +} + +static bool header_entry_has_name(const char *entry, const char *name) { + const char *t; + + assert(entry); + assert(name); + + t = startswith(entry, name); + if (!t) + return false; + + return *t == ':'; +} + +static char **strv_header_unset(char **l, const char *name) { + assert(name); + + if (!l) + return NULL; + + /* Drops every occurrence of the header var setting p in the + * string list. Edits in-place. */ + + char **f, **t; + for (f = t = l; *f; f++) { + if (header_entry_has_name(*f, name)) { + free(*f); + continue; + } + + *(t++) = *f; + } + + *t = NULL; + return l; +} + +int strv_header_replace_consume(char ***l, char *p) { + const char *t, *name; + int r; + + assert(p); + + /* p must be a valid "key: value" assignment. */ + + t = strchr(p, ':'); + if (!t) { + free(p); + return -EINVAL; + } + + if (!header_is_valid(p)) { + free(p); + return -EINVAL; + } + + name = strndupa(p, t - p); + if (!name) { + free(p); + return -ENOMEM; + } + + STRV_FOREACH(f, *l) + if (header_entry_has_name(*f, name)) { + free_and_replace(*f, p); + strv_header_unset(f + 1, name); + return 0; + } + + /* We didn't find a match, we need to append p or create a new strv */ + r = strv_consume(l, p); + if (r < 0) + return r; + + return 1; +} diff --git a/src/journal-remote/header-util.h b/src/journal-remote/header-util.h new file mode 100644 index 00000000000..d3e0500494b --- /dev/null +++ b/src/journal-remote/header-util.h @@ -0,0 +1,7 @@ +/* SPDX-License-Identifier: LGPL-2.1-or-later */ +#pragma once + +#include + +bool header_is_valid(const char *e); +int strv_header_replace_consume(char ***l, char *p); diff --git a/src/journal-remote/journal-upload.c b/src/journal-remote/journal-upload.c index 66e2fd2ef23..20743868caa 100644 --- a/src/journal-remote/journal-upload.c +++ b/src/journal-remote/journal-upload.c @@ -16,11 +16,13 @@ #include "constants.h" #include "daemon-util.h" #include "env-file.h" +#include "escape.h" #include "fd-util.h" #include "fileio.h" #include "format-util.h" #include "fs-util.h" #include "glob-util.h" +#include "header-util.h" #include "journal-upload.h" #include "journal-util.h" #include "log.h" @@ -48,6 +50,7 @@ static const char *arg_key = NULL; static const char *arg_cert = NULL; static const char *arg_trust = NULL; static const char *arg_directory = NULL; +static char **arg_headers = NULL; static char **arg_file = NULL; static const char *arg_cursor = NULL; static bool arg_after_cursor = false; @@ -63,6 +66,7 @@ static Compression arg_compression = COMPRESSION_NONE; static int arg_compression_level = -1; STATIC_DESTRUCTOR_REGISTER(arg_file, strv_freep); +STATIC_DESTRUCTOR_REGISTER(arg_headers, strv_freep); static void close_fd_input(Uploader *u); @@ -124,6 +128,48 @@ static int check_cursor_updating(Uploader *u) { return 0; } +int config_parse_header( + const char *unit, + const char *filename, + unsigned line, + const char *section, + unsigned section_line, + const char *lvalue, + int ltype, + const char *rvalue, + void *data, + void *userdata) { + + char ***headers = ASSERT_PTR(data); + char *unescaped; + int r; + + assert(filename); + assert(lvalue); + assert(rvalue); + + if (isempty(rvalue)) { + /* Empty assignment resets the list */ + *headers = strv_free(*headers); + return 0; + } + + r = cunescape(rvalue, 0, &unescaped); + if (r < 0) + return log_syntax(unit, LOG_WARNING, filename, line, r, + "Failed to unescape headers: %s", rvalue); + + if (!header_is_valid(rvalue)) + return log_syntax(unit, LOG_WARNING, filename, line, 0, + "Invalid header, ignoring: %s", rvalue); + + r = strv_header_replace_consume(headers, TAKE_PTR(unescaped)); + if (r < 0) + return log_syntax(unit, LOG_WARNING, filename, line, r, + "Failed to update headers: %s", rvalue); + return 0; +} + static int update_cursor_state(Uploader *u) { _cleanup_(unlink_and_freep) char *temp_path = NULL; _cleanup_fclose_ FILE *f = NULL; @@ -218,6 +264,13 @@ int start_upload(Uploader *u, h = l; } + STRV_FOREACH(header, arg_headers) { + l = curl_slist_append(h, *header); + if (!l) + return log_oom(); + h = l; + } + u->header = TAKE_PTR(h); } @@ -536,6 +589,7 @@ static int parse_config(void) { { "Upload", "ServerCertificateFile", config_parse_path_or_ignore, 0, &arg_cert }, { "Upload", "TrustedCertificateFile", config_parse_path_or_ignore, 0, &arg_trust }, { "Upload", "NetworkTimeoutSec", config_parse_sec, 0, &arg_network_timeout_usec }, + { "Upload", "Header", config_parse_header, 0, &arg_headers }, { "Upload", "Compression", config_parse_compression, 0, &arg_compression }, { "Upload", "CompressionLevel", config_parse_int, 0, &arg_compression_level }, {} diff --git a/src/journal-remote/journal-upload.h b/src/journal-remote/journal-upload.h index 02199c17ae1..560f04b5256 100644 --- a/src/journal-remote/journal-upload.h +++ b/src/journal-remote/journal-upload.h @@ -11,6 +11,8 @@ #include "conf-parser.h" #include "time-util.h" +CONFIG_PARSER_PROTOTYPE(config_parse_header); + typedef enum { ENTRY_CURSOR = 0, /* Nothing actually written yet. */ ENTRY_REALTIME, diff --git a/src/journal-remote/meson.build b/src/journal-remote/meson.build index 10a82751d77..2b8e52337f0 100644 --- a/src/journal-remote/meson.build +++ b/src/journal-remote/meson.build @@ -3,6 +3,7 @@ systemd_journal_upload_sources = files( 'journal-upload-journal.c', 'journal-upload.c', + 'header-util.c', ) libsystemd_journal_remote_sources = files( @@ -88,6 +89,12 @@ executables += [ }, ] +executables += [ + test_template + { + 'sources' : files('test-header-util.c', 'header-util.c'), + }, +] + in_files = [ ['journal-upload.conf', conf.get('ENABLE_REMOTE') == 1 and conf.get('HAVE_LIBCURL') == 1 and install_sysconfdir_samples], diff --git a/src/journal-remote/test-header-util.c b/src/journal-remote/test-header-util.c new file mode 100644 index 00000000000..bdec84c1dff --- /dev/null +++ b/src/journal-remote/test-header-util.c @@ -0,0 +1,39 @@ +/* SPDX-License-Identifier: LGPL-2.1-or-later */ + +#include "header-util.h" +#include "tests.h" + +static void check_headers_consume(char ***headers, const char *header, int idx) { + size_t prevLen; + _cleanup_free_ char *in = NULL; + + prevLen = strv_length(*headers); + in = strdup(header); + ASSERT_NOT_NULL(in); + if (idx >= 0) { + ASSERT_OK(strv_header_replace_consume(headers, TAKE_PTR(in))); + ASSERT_STREQ((*headers)[idx], header); + } else { + ASSERT_ERROR(strv_header_replace_consume(headers, TAKE_PTR(in)), EINVAL); + ASSERT_TRUE(strv_length(*headers) == prevLen); + } +} + +TEST(strv_header_replace_consume) { + _cleanup_strv_free_ char **headers = NULL; + + headers = strv_new("Name: Value1", "Other_Name: Value2"); + ASSERT_NOT_NULL(headers); + + check_headers_consume(&headers, "NewName: Val", 2); + check_headers_consume(&headers, "Name: Rewrite", 0); + check_headers_consume(&headers, "InvalidN@me: test", -1); +} + +TEST(header_is_valid) { + ASSERT_TRUE(header_is_valid("Name: Value1")); + ASSERT_TRUE(header_is_valid("Other_Name: Value2")); + ASSERT_FALSE(header_is_valid("N@me: Val")); +} + +DEFINE_TEST_MAIN(LOG_DEBUG); From 1f2e03ce63f6f6a0ddf3db8be11d54fa7d243214 Mon Sep 17 00:00:00 2001 From: Andrii Chubatiuk Date: Wed, 16 Oct 2024 15:19:40 +0300 Subject: [PATCH 4/4] journal-remote: added batch max events, batch max size, batch timeout params --- man/journal-upload.conf.xml | 24 +++++++++ src/journal-remote/journal-upload-journal.c | 17 ++++-- src/journal-remote/journal-upload.c | 60 +++++++++++++++++++-- src/journal-remote/journal-upload.h | 4 ++ 4 files changed, 97 insertions(+), 8 deletions(-) diff --git a/man/journal-upload.conf.xml b/man/journal-upload.conf.xml index 1f64ae38782..7e891dbfb48 100644 --- a/man/journal-upload.conf.xml +++ b/man/journal-upload.conf.xml @@ -60,6 +60,30 @@ + + BatchMaxBytes= + + Max payload size of journal entries batch in a single HTTP request. + + + + + + BatchMaxEntries= + + Max number of journal entries in a batch in a single HTTP request. + + + + + + BatchTimeoutSec= + + Max time in seconds to wait since previous request before sending data. + + + + Compression= diff --git a/src/journal-remote/journal-upload-journal.c b/src/journal-remote/journal-upload-journal.c index abaa50def63..24d62d14a99 100644 --- a/src/journal-remote/journal-upload-journal.c +++ b/src/journal-remote/journal-upload-journal.c @@ -255,23 +255,28 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void int r; sd_journal *j; size_t filled = 0; + size_t read_size = 0; ssize_t w; assert(nmemb <= SSIZE_MAX / size); + read_size = size * nmemb; + if (read_size > u->bytes_left) + read_size = u->bytes_left; + check_update_watchdog(u); j = u->journal; if (u->compression != COMPRESSION_NONE) { - compression_buffer = malloc_multiply(nmemb, size); + compression_buffer = malloc(read_size); if (!compression_buffer) { log_oom(); return CURL_READFUNC_ABORT; } } - while (j && filled < size * nmemb) { + while (j && filled < read_size && u->uploading) { if (u->entry_state == ENTRY_DONE) { r = sd_journal_next(j); if (r < 0) { @@ -293,7 +298,7 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void u->entry_state = ENTRY_CURSOR; } - w = write_entry((compression_buffer ?: (char*) buf) + filled, size * nmemb - filled, u); + w = write_entry((compression_buffer ?: (char*) buf) + filled, read_size - filled, u); if (w < 0) return CURL_READFUNC_ABORT; filled += w; @@ -307,11 +312,15 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void log_debug("Entry %zu (%s) has been uploaded.", u->entries_sent, u->current_cursor); + + u->entries_left--; + if (u->entries_left <= 1) + u->uploading = false; } if (filled > 0 && u->compression != COMPRESSION_NONE) { size_t compressed_size = 0; - r = compress_blob(u->compression, compression_buffer, filled, buf, size * nmemb, &compressed_size, u->compression_level); + r = compress_blob(u->compression, compression_buffer, filled, buf, read_size, &compressed_size, u->compression_level); if (r < 0) { log_error_errno(r, "Failed to compress %zu bytes using Compression=%s, CompressionLevel=%d)", filled, compression_to_string(u->compression), u->compression_level); diff --git a/src/journal-remote/journal-upload.c b/src/journal-remote/journal-upload.c index 20743868caa..4e826328530 100644 --- a/src/journal-remote/journal-upload.c +++ b/src/journal-remote/journal-upload.c @@ -64,6 +64,9 @@ static const char *arg_save_state = NULL; static usec_t arg_network_timeout_usec = USEC_INFINITY; static Compression arg_compression = COMPRESSION_NONE; static int arg_compression_level = -1; +static uint64_t arg_batch_max_bytes = UINT64_MAX; +static uint64_t arg_batch_max_entries = UINT64_MAX; +static usec_t arg_batch_timeout_usec = USEC_INFINITY; STATIC_DESTRUCTOR_REGISTER(arg_file, strv_freep); STATIC_DESTRUCTOR_REGISTER(arg_headers, strv_freep); @@ -205,6 +208,26 @@ fail: return log_error_errno(r, "Failed to save state %s: %m", u->state_file); } +static int refresh_timeout(Uploader *u) { + int r; + + assert(u); + + u->bytes_left = arg_batch_max_bytes; + u->entries_left = arg_batch_max_entries; + u->uploading = true; + + if (arg_batch_timeout_usec != USEC_INFINITY) { + r = sd_event_source_set_time_relative(u->batch_timeout_event, arg_batch_timeout_usec); + if (r < 0) + return log_error_errno(r, "Failed to create a batch timer: %m"); + r = sd_event_source_set_enabled(u->batch_timeout_event, SD_EVENT_ONESHOT); + if (r < 0) + return log_error_errno(r, "sd_event_source_set_enabled() failed: %m"); + } + return 0; +} + static int load_cursor_state(Uploader *u) { int r; @@ -229,6 +252,7 @@ int start_upload(Uploader *u, size_t nmemb, void *userdata), void *data) { + int r; CURLcode code; assert(u); @@ -347,6 +371,10 @@ int start_upload(Uploader *u, u->answer = mfree(u->answer); } + r = refresh_timeout(u); + if (r < 0) + return r; + /* upload to this place */ code = curl_easy_setopt(u->easy, CURLOPT_URL, u->url); if (code) @@ -363,6 +391,7 @@ static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *user int r; Uploader *u = ASSERT_PTR(userp); ssize_t n; + size_t read_size; _cleanup_free_ char *compression_buffer = NULL; assert(nmemb < SSIZE_MAX / size); @@ -372,16 +401,22 @@ static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *user assert(!size_multiply_overflow(size, nmemb)); + read_size = size * nmemb; + if (read_size > u->bytes_left) + read_size = u->bytes_left; + if (u->compression != COMPRESSION_NONE) { - compression_buffer = malloc_multiply(nmemb, size); + compression_buffer = malloc(read_size); if (!compression_buffer) { log_oom(); return CURL_READFUNC_ABORT; } } - n = read(u->input, compression_buffer ?: buf, size * nmemb); - log_debug("%s: allowed %zu, read %zd", __func__, size * nmemb, n); + n = read(u->input, compression_buffer ?: buf, read_size); + log_debug("%s: allowed %zu, read %zd", __func__, read_size, n); + + u->bytes_left -= n; if (n > 0) { if (u->compression == COMPRESSION_NONE) @@ -389,7 +424,7 @@ static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *user size_t compressed_size; - r = compress_blob(u->compression, compression_buffer, n, buf, size * nmemb, &compressed_size, u->compression_level); + r = compress_blob(u->compression, compression_buffer, n, buf, read_size, &compressed_size, u->compression_level); if (r < 0) { log_error_errno(r, "Failed to compress %ld bytes using Compression=%s, CompressionLevel=%d): %m", n, compression_to_string(u->compression), u->compression_level); @@ -471,6 +506,12 @@ static int open_file_for_upload(Uploader *u, const char *filename) { return r; } +static int finish_uploading(sd_event_source *s, uint64_t usec, void *userdata) { + Uploader *u = ASSERT_PTR(userdata); + u->uploading = false; + return 0; +} + static int setup_uploader(Uploader *u, const char *url, const char *state_file) { int r; const char *host, *proto = ""; @@ -516,6 +557,13 @@ static int setup_uploader(Uploader *u, const char *url, const char *state_file) if (r < 0) return log_error_errno(r, "Failed to install SIGINT/SIGTERM handlers: %m"); + if (arg_batch_timeout_usec != USEC_INFINITY) { + r = sd_event_add_time_relative(u->event, &u->batch_timeout_event, CLOCK_MONOTONIC, + arg_batch_timeout_usec, 0, finish_uploading, u); + if (r < 0) + return log_error_errno(r, "Failed to start batch timer: %m"); + } + (void) sd_watchdog_enabled(false, &u->watchdog_usec); return load_cursor_state(u); @@ -534,6 +582,7 @@ static void destroy_uploader(Uploader *u) { free(u->url); u->input_event = sd_event_source_unref(u->input_event); + u->batch_timeout_event = sd_event_source_unref(u->batch_timeout_event); close_fd_input(u); close_journal_input(u); @@ -592,6 +641,9 @@ static int parse_config(void) { { "Upload", "Header", config_parse_header, 0, &arg_headers }, { "Upload", "Compression", config_parse_compression, 0, &arg_compression }, { "Upload", "CompressionLevel", config_parse_int, 0, &arg_compression_level }, + { "Upload", "BatchMaxBytes", config_parse_iec_size, 0, &arg_batch_max_bytes }, + { "Upload", "BatchMaxEntries", config_parse_iec_size, 0, &arg_batch_max_entries }, + { "Upload", "BatchTimeoutSec", config_parse_sec, 0, &arg_batch_timeout_usec }, {} }; diff --git a/src/journal-remote/journal-upload.h b/src/journal-remote/journal-upload.h index 560f04b5256..8df5cc2b869 100644 --- a/src/journal-remote/journal-upload.h +++ b/src/journal-remote/journal-upload.h @@ -53,6 +53,10 @@ typedef struct Uploader { /* general metrics */ const char *state_file; + uint64_t bytes_left; + uint64_t entries_left; + sd_event_source *batch_timeout_event; + size_t entries_sent; char *last_cursor, *current_cursor; usec_t watchdog_timestamp;