1
0
mirror of https://github.com/systemd/systemd.git synced 2024-10-26 08:55:40 +03:00
This commit is contained in:
Andrii Chubatiuk 2024-10-26 08:00:12 +03:00 committed by GitHub
commit 8b04fd0edb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 583 additions and 60 deletions

View File

@ -611,7 +611,7 @@ SYSTEMD_HOME_DEBUG_SUFFIX=foo \
32-bit offsets. Enabled by default.
* `$SYSTEMD_JOURNAL_COMPRESS` Takes a boolean, or one of the compression
algorithms "XZ", "LZ4", and "ZSTD". If enabled, the default compression
algorithms "xz", "lz4", and "zstd". If enabled, the default compression
algorithm set at compile time will be used when opening a new journal file.
If disabled, the journal file compression will be disabled. Note that the
compression mode of existing journal files are not changed. To make the

View File

@ -60,6 +60,70 @@
<xi:include href="version-info.xml" xpointer="v232"/></listitem>
</varlistentry>
<varlistentry>
<term><varname>BatchMaxBytes=</varname></term>
<listitem><para>Max payload size of journal entries batch in a single HTTP request.</para>
<xi:include href="version-info.xml" xpointer="v257"/></listitem>
</varlistentry>
<varlistentry>
<term><varname>BatchMaxEntries=</varname></term>
<listitem><para>Max number of journal entries in a batch in a single HTTP request.</para>
<xi:include href="version-info.xml" xpointer="v257"/></listitem>
</varlistentry>
<varlistentry>
<term><varname>BatchTimeoutSec=</varname></term>
<listitem><para>Max time in seconds to wait since previous request before sending data.</para>
<xi:include href="version-info.xml" xpointer="v257"/></listitem>
</varlistentry>
<varlistentry>
<term><varname>Compression=</varname></term>
<listitem><para>Compression algorithm to be applied to logs data before sending.
Supported values are <literal>zstd</literal>, <literal>xz</literal>, <literal>lz4</literal>, <literal>none</literal>.
Defaults to <literal>none</literal>.</para>
<xi:include href="version-info.xml" xpointer="v257"/></listitem>
</varlistentry>
<varlistentry>
<term><varname>CompressionLevel=</varname></term>
<listitem><para>Specifies the compression level of data to be uploaded.
Each algorithm has it's own range of allowed integer values: xz - [1 - 9], zstd - [1 - 22], lz4 - [1 - 9].
Defaults to -1, and the default compression level for each compression algorithm will be used.</para>
<xi:include href="version-info.xml" xpointer="v257"/></listitem>
</varlistentry>
<varlistentry>
<term><varname>Header=</varname></term>
<listitem><para>HTTP header which should be added to each request to URL. Headers are expected to
be unquoted and it's expected to be no more than one header per each option.</para>
<para>Header name can contains alphanumeric values, "_" and "-" symbols additionally.</para>
<para>This option may be specified more than once, in which case all listed headers will be set. If
the same header name is listed twice, the later setting will override the earlier setting.
</para>
<para>Example:
<programlisting>Header=HeaderName: HeaderValue</programlisting>
adds <literal>HeaderName</literal> header with <literal>HeaderValue</literal> to each HTTP request.
</para>
<xi:include href="version-info.xml" xpointer="v257"/></listitem>
</varlistentry>
<varlistentry>
<term><varname>ServerKeyFile=</varname></term>

View File

@ -42,6 +42,7 @@ static DLSYM_PROTOTYPE(LZ4F_freeCompressionContext) = NULL;
static DLSYM_PROTOTYPE(LZ4F_freeDecompressionContext) = NULL;
static DLSYM_PROTOTYPE(LZ4F_isError) = NULL;
DLSYM_PROTOTYPE(LZ4_compress_default) = NULL;
DLSYM_PROTOTYPE(LZ4_compress_HC) = NULL;
DLSYM_PROTOTYPE(LZ4_decompress_safe) = NULL;
DLSYM_PROTOTYPE(LZ4_decompress_safe_partial) = NULL;
DLSYM_PROTOTYPE(LZ4_versionNumber) = NULL;
@ -93,6 +94,7 @@ static DLSYM_PROTOTYPE(lzma_easy_encoder) = NULL;
static DLSYM_PROTOTYPE(lzma_end) = NULL;
static DLSYM_PROTOTYPE(lzma_stream_buffer_encode) = NULL;
static DLSYM_PROTOTYPE(lzma_stream_decoder) = NULL;
static DLSYM_PROTOTYPE(lzma_lzma_preset) = NULL;
/* We can't just do _cleanup_(sym_lzma_end) because a compiler bug makes
* this fail with:
@ -109,10 +111,10 @@ static inline void lzma_end_wrapper(lzma_stream *ls) {
#define ALIGN_8(l) ALIGN_TO(l, sizeof(size_t))
static const char* const compression_table[_COMPRESSION_MAX] = {
[COMPRESSION_NONE] = "NONE",
[COMPRESSION_XZ] = "XZ",
[COMPRESSION_LZ4] = "LZ4",
[COMPRESSION_ZSTD] = "ZSTD",
[COMPRESSION_NONE] = "none",
[COMPRESSION_XZ] = "xz",
[COMPRESSION_LZ4] = "lz4",
[COMPRESSION_ZSTD] = "zstd",
};
DEFINE_STRING_TABLE_LOOKUP(compression, Compression);
@ -141,12 +143,13 @@ int dlopen_lzma(void) {
DLSYM_ARG(lzma_easy_encoder),
DLSYM_ARG(lzma_end),
DLSYM_ARG(lzma_stream_buffer_encode),
DLSYM_ARG(lzma_lzma_preset),
DLSYM_ARG(lzma_stream_decoder));
}
#endif
int compress_blob_xz(const void *src, uint64_t src_size,
void *dst, size_t dst_alloc_size, size_t *dst_size) {
void *dst, size_t dst_alloc_size, size_t *dst_size, int level) {
assert(src);
assert(src_size > 0);
@ -155,12 +158,12 @@ int compress_blob_xz(const void *src, uint64_t src_size,
assert(dst_size);
#if HAVE_XZ
static const lzma_options_lzma opt = {
lzma_options_lzma opt = {
1u << 20u, NULL, 0, LZMA_LC_DEFAULT, LZMA_LP_DEFAULT,
LZMA_PB_DEFAULT, LZMA_MODE_FAST, 128, LZMA_MF_HC3, 4
};
static const lzma_filter filters[] = {
{ LZMA_FILTER_LZMA2, (lzma_options_lzma*) &opt },
const lzma_filter filters[] = {
{ LZMA_FILTER_LZMA2, &opt },
{ LZMA_VLI_UNKNOWN, NULL }
};
lzma_ret ret;
@ -171,6 +174,12 @@ int compress_blob_xz(const void *src, uint64_t src_size,
if (r < 0)
return r;
if (level >= 0) {
r = sym_lzma_lzma_preset(&opt, (uint32_t) level);
if (r < 0)
return r;
}
/* Returns < 0 if we couldn't compress the data or the
* compressed result is longer than the original */
@ -210,6 +219,7 @@ int dlopen_lz4(void) {
DLSYM_ARG(LZ4F_freeDecompressionContext),
DLSYM_ARG(LZ4F_isError),
DLSYM_ARG(LZ4_compress_default),
DLSYM_ARG(LZ4_compress_HC),
DLSYM_ARG(LZ4_decompress_safe),
DLSYM_ARG(LZ4_decompress_safe_partial),
DLSYM_ARG(LZ4_versionNumber));
@ -217,7 +227,7 @@ int dlopen_lz4(void) {
#endif
int compress_blob_lz4(const void *src, uint64_t src_size,
void *dst, size_t dst_alloc_size, size_t *dst_size) {
void *dst, size_t dst_alloc_size, size_t *dst_size, int level) {
assert(src);
assert(src_size > 0);
@ -237,7 +247,10 @@ int compress_blob_lz4(const void *src, uint64_t src_size,
if (src_size < 9)
return -ENOBUFS;
r = sym_LZ4_compress_default(src, (char*)dst + 8, src_size, (int) dst_alloc_size - 8);
if (level <= 0)
r = sym_LZ4_compress_default(src, (char*)dst + 8, src_size, (int) dst_alloc_size - 8);
else
r = sym_LZ4_compress_HC(src, (char*)dst + 8, src_size, (int) dst_alloc_size - 8, level);
if (r <= 0)
return -ENOBUFS;
@ -281,7 +294,7 @@ int dlopen_zstd(void) {
int compress_blob_zstd(
const void *src, uint64_t src_size,
void *dst, size_t dst_alloc_size, size_t *dst_size) {
void *dst, size_t dst_alloc_size, size_t *dst_size, int level) {
assert(src);
assert(src_size > 0);
@ -297,7 +310,10 @@ int compress_blob_zstd(
if (r < 0)
return r;
k = sym_ZSTD_compress(dst, dst_alloc_size, src, src_size, 0);
if (level < 0)
level = 0;
k = sym_ZSTD_compress(dst, dst_alloc_size, src, src_size, level);
if (sym_ZSTD_isError(k))
return zstd_ret_to_errno(k);

View File

@ -8,6 +8,7 @@
#if HAVE_LZ4
#include <lz4.h>
#include <lz4hc.h>
#include <lz4frame.h>
#endif
@ -28,11 +29,11 @@ Compression compression_from_string(const char *compression);
bool compression_supported(Compression c);
int compress_blob_xz(const void *src, uint64_t src_size,
void *dst, size_t dst_alloc_size, size_t *dst_size);
void *dst, size_t dst_alloc_size, size_t *dst_size, int level);
int compress_blob_lz4(const void *src, uint64_t src_size,
void *dst, size_t dst_alloc_size, size_t *dst_size);
void *dst, size_t dst_alloc_size, size_t *dst_size, int level);
int compress_blob_zstd(const void *src, uint64_t src_size,
void *dst, size_t dst_alloc_size, size_t *dst_size);
void *dst, size_t dst_alloc_size, size_t *dst_size, int level);
int decompress_blob_xz(const void *src, uint64_t src_size,
void **dst, size_t* dst_size, size_t dst_max);
@ -90,15 +91,15 @@ int dlopen_lzma(void);
static inline int compress_blob(
Compression compression,
const void *src, uint64_t src_size,
void *dst, size_t dst_alloc_size, size_t *dst_size) {
void *dst, size_t dst_alloc_size, size_t *dst_size, int level) {
switch (compression) {
case COMPRESSION_ZSTD:
return compress_blob_zstd(src, src_size, dst, dst_alloc_size, dst_size);
return compress_blob_zstd(src, src_size, dst, dst_alloc_size, dst_size, level);
case COMPRESSION_LZ4:
return compress_blob_lz4(src, src_size, dst, dst_alloc_size, dst_size);
return compress_blob_lz4(src, src_size, dst, dst_alloc_size, dst_size, level);
case COMPRESSION_XZ:
return compress_blob_xz(src, src_size, dst, dst_alloc_size, dst_size);
return compress_blob_xz(src, src_size, dst, dst_alloc_size, dst_size, level);
default:
return -EOPNOTSUPP;
}

View File

@ -42,7 +42,7 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
}
size_t csize;
r = compress_blob(alg, h->data, data_len, buf, size, &csize);
r = compress_blob(alg, h->data, data_len, buf, size, &csize, /* level = */ -1);
if (r < 0) {
log_error_errno(r, "Compression failed: %m");
return 0;

View File

@ -0,0 +1,170 @@
/* SPDX-License-Identifier: LGPL-2.1-or-later */
#include "header-util.h"
#include "strv.h"
/* HTTP header name can contains:
- Alphanumeric characters: a-z, A-Z, and 0-9
- The following special characters: - and _ */
#define VALID_HEADER_NAME_CHARS \
DIGITS LETTERS "_" \
"-"
#define VALID_HEADER_NAME_LENGTH 40
#define VALID_HEADER_VALUE_CHARS \
DIGITS LETTERS "_" \
" " \
":" \
";" \
"." \
"," \
"\\" \
"/" \
"'" \
"\"" \
"?" \
"!" \
"(" \
")" \
"{" \
"}" \
"[" \
"]" \
"@" \
"<" \
">" \
"=" \
"-" \
"+" \
"*" \
"#" \
"$" \
"&" \
"`" \
"|" \
"~" \
"^" \
"%"
static bool header_name_is_valid(const char *e, size_t n) {
if (!e)
return false;
if (n > VALID_HEADER_NAME_LENGTH)
return false;
for (const char *p = e; p < e + n; p++)
if (!strchr(VALID_HEADER_NAME_CHARS, *p))
return false;
return true;
}
static bool header_value_is_valid(const char *e) {
if (!e)
return false;
int n = strlen_ptr(e);
if (n < 0)
return false;
for (const char *p = e; p < e + n; p++)
if (!strchr(VALID_HEADER_VALUE_CHARS, *p))
return false;
return true;
}
bool header_is_valid(const char *e) {
const char *eq;
eq = strchr(e, ':');
if (!eq)
return false;
if (!header_name_is_valid(e, eq - e))
return false;
if (!header_value_is_valid(skip_leading_chars(eq + 1, WHITESPACE)))
return false;
return true;
}
static bool header_entry_has_name(const char *entry, const char *name) {
const char *t;
assert(entry);
assert(name);
t = startswith(entry, name);
if (!t)
return false;
return *t == ':';
}
static char **strv_header_unset(char **l, const char *name) {
assert(name);
if (!l)
return NULL;
/* Drops every occurrence of the header var setting p in the
* string list. Edits in-place. */
char **f, **t;
for (f = t = l; *f; f++) {
if (header_entry_has_name(*f, name)) {
free(*f);
continue;
}
*(t++) = *f;
}
*t = NULL;
return l;
}
int strv_header_replace_consume(char ***l, char *p) {
const char *t, *name;
int r;
assert(p);
/* p must be a valid "key: value" assignment. */
t = strchr(p, ':');
if (!t) {
free(p);
return -EINVAL;
}
if (!header_is_valid(p)) {
free(p);
return -EINVAL;
}
name = strndupa(p, t - p);
if (!name) {
free(p);
return -ENOMEM;
}
STRV_FOREACH(f, *l)
if (header_entry_has_name(*f, name)) {
free_and_replace(*f, p);
strv_header_unset(f + 1, name);
return 0;
}
/* We didn't find a match, we need to append p or create a new strv */
r = strv_consume(l, p);
if (r < 0)
return r;
return 1;
}

View File

@ -0,0 +1,7 @@
/* SPDX-License-Identifier: LGPL-2.1-or-later */
#pragma once
#include <stdbool.h>
bool header_is_valid(const char *e);
int strv_header_replace_consume(char ***l, char *p);

View File

@ -251,18 +251,32 @@ static void check_update_watchdog(Uploader *u) {
static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
Uploader *u = ASSERT_PTR(userp);
_cleanup_free_ char *compression_buffer = NULL;
int r;
sd_journal *j;
size_t filled = 0;
size_t read_size = 0;
ssize_t w;
assert(nmemb <= SSIZE_MAX / size);
read_size = size * nmemb;
if (read_size > u->bytes_left)
read_size = u->bytes_left;
check_update_watchdog(u);
j = u->journal;
while (j && filled < size * nmemb) {
if (u->compression != COMPRESSION_NONE) {
compression_buffer = malloc(read_size);
if (!compression_buffer) {
log_oom();
return CURL_READFUNC_ABORT;
}
}
while (j && filled < read_size && u->uploading) {
if (u->entry_state == ENTRY_DONE) {
r = sd_journal_next(j);
if (r < 0) {
@ -284,7 +298,7 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void
u->entry_state = ENTRY_CURSOR;
}
w = write_entry((char*)buf + filled, size * nmemb - filled, u);
w = write_entry((compression_buffer ?: (char*) buf) + filled, read_size - filled, u);
if (w < 0)
return CURL_READFUNC_ABORT;
filled += w;
@ -298,6 +312,21 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void
log_debug("Entry %zu (%s) has been uploaded.",
u->entries_sent, u->current_cursor);
u->entries_left--;
if (u->entries_left <= 1)
u->uploading = false;
}
if (filled > 0 && u->compression != COMPRESSION_NONE) {
size_t compressed_size = 0;
r = compress_blob(u->compression, compression_buffer, filled, buf, read_size, &compressed_size, u->compression_level);
if (r < 0) {
log_error_errno(r, "Failed to compress %zu bytes using Compression=%s, CompressionLevel=%d)",
filled, compression_to_string(u->compression), u->compression_level);
return CURL_READFUNC_ABORT;
}
return compressed_size;
}
return filled;

View File

@ -11,15 +11,18 @@
#include "alloc-util.h"
#include "build.h"
#include "compress.h"
#include "conf-parser.h"
#include "constants.h"
#include "daemon-util.h"
#include "env-file.h"
#include "escape.h"
#include "fd-util.h"
#include "fileio.h"
#include "format-util.h"
#include "fs-util.h"
#include "glob-util.h"
#include "header-util.h"
#include "journal-upload.h"
#include "journal-util.h"
#include "log.h"
@ -42,11 +45,12 @@
#define TRUST_FILE CERTIFICATE_ROOT "/ca/trusted.pem"
#define DEFAULT_PORT 19532
static const char* arg_url = NULL;
static const char *arg_url = NULL;
static const char *arg_key = NULL;
static const char *arg_cert = NULL;
static const char *arg_trust = NULL;
static const char *arg_directory = NULL;
static char **arg_headers = NULL;
static char **arg_file = NULL;
static const char *arg_cursor = NULL;
static bool arg_after_cursor = false;
@ -58,8 +62,14 @@ static bool arg_merge = false;
static int arg_follow = -1;
static const char *arg_save_state = NULL;
static usec_t arg_network_timeout_usec = USEC_INFINITY;
static Compression arg_compression = COMPRESSION_NONE;
static int arg_compression_level = -1;
static uint64_t arg_batch_max_bytes = UINT64_MAX;
static uint64_t arg_batch_max_entries = UINT64_MAX;
static usec_t arg_batch_timeout_usec = USEC_INFINITY;
STATIC_DESTRUCTOR_REGISTER(arg_file, strv_freep);
STATIC_DESTRUCTOR_REGISTER(arg_headers, strv_freep);
static void close_fd_input(Uploader *u);
@ -121,6 +131,48 @@ static int check_cursor_updating(Uploader *u) {
return 0;
}
int config_parse_header(
const char *unit,
const char *filename,
unsigned line,
const char *section,
unsigned section_line,
const char *lvalue,
int ltype,
const char *rvalue,
void *data,
void *userdata) {
char ***headers = ASSERT_PTR(data);
char *unescaped;
int r;
assert(filename);
assert(lvalue);
assert(rvalue);
if (isempty(rvalue)) {
/* Empty assignment resets the list */
*headers = strv_free(*headers);
return 0;
}
r = cunescape(rvalue, 0, &unescaped);
if (r < 0)
return log_syntax(unit, LOG_WARNING, filename, line, r,
"Failed to unescape headers: %s", rvalue);
if (!header_is_valid(rvalue))
return log_syntax(unit, LOG_WARNING, filename, line, 0,
"Invalid header, ignoring: %s", rvalue);
r = strv_header_replace_consume(headers, TAKE_PTR(unescaped));
if (r < 0)
return log_syntax(unit, LOG_WARNING, filename, line, r,
"Failed to update headers: %s", rvalue);
return 0;
}
static int update_cursor_state(Uploader *u) {
_cleanup_(unlink_and_freep) char *temp_path = NULL;
_cleanup_fclose_ FILE *f = NULL;
@ -156,6 +208,26 @@ fail:
return log_error_errno(r, "Failed to save state %s: %m", u->state_file);
}
static int refresh_timeout(Uploader *u) {
int r;
assert(u);
u->bytes_left = arg_batch_max_bytes;
u->entries_left = arg_batch_max_entries;
u->uploading = true;
if (arg_batch_timeout_usec != USEC_INFINITY) {
r = sd_event_source_set_time_relative(u->batch_timeout_event, arg_batch_timeout_usec);
if (r < 0)
return log_error_errno(r, "Failed to create a batch timer: %m");
r = sd_event_source_set_enabled(u->batch_timeout_event, SD_EVENT_ONESHOT);
if (r < 0)
return log_error_errno(r, "sd_event_source_set_enabled() failed: %m");
}
return 0;
}
static int load_cursor_state(Uploader *u) {
int r;
@ -180,6 +252,7 @@ int start_upload(Uploader *u,
size_t nmemb,
void *userdata),
void *data) {
int r;
CURLcode code;
assert(u);
@ -203,6 +276,25 @@ int start_upload(Uploader *u,
return log_oom();
h = l;
if (arg_compression != COMPRESSION_NONE) {
_cleanup_free_ char *header = NULL;
header = strjoin("Content-Encoding: ", compression_to_string(arg_compression));
if (!header)
return log_oom();
l = curl_slist_append(h, header);
if (!l)
return log_oom();
h = l;
}
STRV_FOREACH(header, arg_headers) {
l = curl_slist_append(h, *header);
if (!l)
return log_oom();
h = l;
}
u->header = TAKE_PTR(h);
}
@ -279,6 +371,10 @@ int start_upload(Uploader *u,
u->answer = mfree(u->answer);
}
r = refresh_timeout(u);
if (r < 0)
return r;
/* upload to this place */
code = curl_easy_setopt(u->easy, CURLOPT_URL, u->url);
if (code)
@ -292,8 +388,11 @@ int start_upload(Uploader *u,
}
static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
int r;
Uploader *u = ASSERT_PTR(userp);
ssize_t n;
size_t read_size;
_cleanup_free_ char *compression_buffer = NULL;
assert(nmemb < SSIZE_MAX / size);
@ -302,10 +401,37 @@ static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *user
assert(!size_multiply_overflow(size, nmemb));
n = read(u->input, buf, size * nmemb);
log_debug("%s: allowed %zu, read %zd", __func__, size*nmemb, n);
if (n > 0)
return n;
read_size = size * nmemb;
if (read_size > u->bytes_left)
read_size = u->bytes_left;
if (u->compression != COMPRESSION_NONE) {
compression_buffer = malloc(read_size);
if (!compression_buffer) {
log_oom();
return CURL_READFUNC_ABORT;
}
}
n = read(u->input, compression_buffer ?: buf, read_size);
log_debug("%s: allowed %zu, read %zd", __func__, read_size, n);
u->bytes_left -= n;
if (n > 0) {
if (u->compression == COMPRESSION_NONE)
return n;
size_t compressed_size;
r = compress_blob(u->compression, compression_buffer, n, buf, read_size, &compressed_size, u->compression_level);
if (r < 0) {
log_error_errno(r, "Failed to compress %ld bytes using Compression=%s, CompressionLevel=%d): %m",
n, compression_to_string(u->compression), u->compression_level);
return CURL_READFUNC_ABORT;
}
return compressed_size;
}
u->uploading = false;
if (n < 0) {
@ -380,6 +506,12 @@ static int open_file_for_upload(Uploader *u, const char *filename) {
return r;
}
static int finish_uploading(sd_event_source *s, uint64_t usec, void *userdata) {
Uploader *u = ASSERT_PTR(userdata);
u->uploading = false;
return 0;
}
static int setup_uploader(Uploader *u, const char *url, const char *state_file) {
int r;
const char *host, *proto = "";
@ -389,6 +521,8 @@ static int setup_uploader(Uploader *u, const char *url, const char *state_file)
*u = (Uploader) {
.input = -1,
.compression = arg_compression,
.compression_level = arg_compression_level,
};
host = STARTSWITH_SET(url, "http://", "https://");
@ -423,6 +557,13 @@ static int setup_uploader(Uploader *u, const char *url, const char *state_file)
if (r < 0)
return log_error_errno(r, "Failed to install SIGINT/SIGTERM handlers: %m");
if (arg_batch_timeout_usec != USEC_INFINITY) {
r = sd_event_add_time_relative(u->event, &u->batch_timeout_event, CLOCK_MONOTONIC,
arg_batch_timeout_usec, 0, finish_uploading, u);
if (r < 0)
return log_error_errno(r, "Failed to start batch timer: %m");
}
(void) sd_watchdog_enabled(false, &u->watchdog_usec);
return load_cursor_state(u);
@ -441,6 +582,7 @@ static void destroy_uploader(Uploader *u) {
free(u->url);
u->input_event = sd_event_source_unref(u->input_event);
u->batch_timeout_event = sd_event_source_unref(u->batch_timeout_event);
close_fd_input(u);
close_journal_input(u);
@ -496,6 +638,12 @@ static int parse_config(void) {
{ "Upload", "ServerCertificateFile", config_parse_path_or_ignore, 0, &arg_cert },
{ "Upload", "TrustedCertificateFile", config_parse_path_or_ignore, 0, &arg_trust },
{ "Upload", "NetworkTimeoutSec", config_parse_sec, 0, &arg_network_timeout_usec },
{ "Upload", "Header", config_parse_header, 0, &arg_headers },
{ "Upload", "Compression", config_parse_compression, 0, &arg_compression },
{ "Upload", "CompressionLevel", config_parse_int, 0, &arg_compression_level },
{ "Upload", "BatchMaxBytes", config_parse_iec_size, 0, &arg_batch_max_bytes },
{ "Upload", "BatchMaxEntries", config_parse_iec_size, 0, &arg_batch_max_entries },
{ "Upload", "BatchTimeoutSec", config_parse_sec, 0, &arg_batch_timeout_usec },
{}
};

View File

@ -7,8 +7,12 @@
#include "sd-event.h"
#include "sd-journal.h"
#include "compress.h"
#include "conf-parser.h"
#include "time-util.h"
CONFIG_PARSER_PROTOTYPE(config_parse_header);
typedef enum {
ENTRY_CURSOR = 0, /* Nothing actually written yet. */
ENTRY_REALTIME,
@ -49,10 +53,16 @@ typedef struct Uploader {
/* general metrics */
const char *state_file;
uint64_t bytes_left;
uint64_t entries_left;
sd_event_source *batch_timeout_event;
size_t entries_sent;
char *last_cursor, *current_cursor;
usec_t watchdog_timestamp;
usec_t watchdog_usec;
Compression compression;
int compression_level;
} Uploader;
#define JOURNAL_UPLOAD_POLL_TIMEOUT (10 * USEC_PER_SEC)

View File

@ -3,6 +3,7 @@
systemd_journal_upload_sources = files(
'journal-upload-journal.c',
'journal-upload.c',
'header-util.c',
)
libsystemd_journal_remote_sources = files(
@ -88,6 +89,12 @@ executables += [
},
]
executables += [
test_template + {
'sources' : files('test-header-util.c', 'header-util.c'),
},
]
in_files = [
['journal-upload.conf',
conf.get('ENABLE_REMOTE') == 1 and conf.get('HAVE_LIBCURL') == 1 and install_sysconfdir_samples],

View File

@ -0,0 +1,39 @@
/* SPDX-License-Identifier: LGPL-2.1-or-later */
#include "header-util.h"
#include "tests.h"
static void check_headers_consume(char ***headers, const char *header, int idx) {
size_t prevLen;
_cleanup_free_ char *in = NULL;
prevLen = strv_length(*headers);
in = strdup(header);
ASSERT_NOT_NULL(in);
if (idx >= 0) {
ASSERT_OK(strv_header_replace_consume(headers, TAKE_PTR(in)));
ASSERT_STREQ((*headers)[idx], header);
} else {
ASSERT_ERROR(strv_header_replace_consume(headers, TAKE_PTR(in)), EINVAL);
ASSERT_TRUE(strv_length(*headers) == prevLen);
}
}
TEST(strv_header_replace_consume) {
_cleanup_strv_free_ char **headers = NULL;
headers = strv_new("Name: Value1", "Other_Name: Value2");
ASSERT_NOT_NULL(headers);
check_headers_consume(&headers, "NewName: Val", 2);
check_headers_consume(&headers, "Name: Rewrite", 0);
check_headers_consume(&headers, "InvalidN@me: test", -1);
}
TEST(header_is_valid) {
ASSERT_TRUE(header_is_valid("Name: Value1"));
ASSERT_TRUE(header_is_valid("Other_Name: Value2"));
ASSERT_FALSE(header_is_valid("N@me: Val"));
}
DEFINE_TEST_MAIN(LOG_DEBUG);

View File

@ -1810,7 +1810,7 @@ static int maybe_compress_payload(JournalFile *f, uint8_t *dst, const uint8_t *s
if (c == COMPRESSION_NONE || size < f->compress_threshold_bytes)
return 0;
r = compress_blob(c, src, size, dst, size - 1, rsize);
r = compress_blob(c, src, size, dst, size - 1, rsize, /* level = */ -1);
if (r < 0)
return log_debug_errno(r, "Failed to compress data object using %s, ignoring: %m", compression_to_string(c));

View File

@ -11,6 +11,7 @@
#include "alloc-util.h"
#include "chase.h"
#include "calendarspec.h"
#include "compress.h"
#include "conf-files.h"
#include "conf-parser.h"
#include "constants.h"
@ -1385,6 +1386,36 @@ int config_parse_warn_compat(
return 0;
}
int config_parse_compression(
const char *unit,
const char *filename,
unsigned line,
const char *section,
unsigned section_line,
const char *lvalue,
int ltype,
const char *rvalue,
void *data,
void *userdata) {
char *compression = ASSERT_PTR(data);
if (isempty(rvalue)) {
*compression = COMPRESSION_NONE;
return 0;
}
Compression c = compression_from_string(rvalue);
if (c < 0)
return log_syntax_parse_error(unit, filename, line, c, lvalue, rvalue);
if (!compression_supported(c))
return log_syntax(unit, LOG_WARNING, filename, line, 0,
"Compression=%s is not supported on a system", rvalue);
*compression = c;
return 0;
}
int config_parse_log_facility(
const char *unit,
const char *filename,

View File

@ -288,6 +288,7 @@ CONFIG_PARSER_PROTOTYPE(config_parse_sec_def_unset);
CONFIG_PARSER_PROTOTYPE(config_parse_nsec);
CONFIG_PARSER_PROTOTYPE(config_parse_mode);
CONFIG_PARSER_PROTOTYPE(config_parse_warn_compat);
CONFIG_PARSER_PROTOTYPE(config_parse_compression);
CONFIG_PARSER_PROTOTYPE(config_parse_log_facility);
CONFIG_PARSER_PROTOTYPE(config_parse_log_level);
CONFIG_PARSER_PROTOTYPE(config_parse_signal);

View File

@ -13,7 +13,7 @@
#include "tests.h"
typedef int (compress_t)(const void *src, uint64_t src_size, void *dst,
size_t dst_alloc_size, size_t *dst_size);
size_t dst_alloc_size, size_t *dst_size, int level);
typedef int (decompress_t)(const void *src, uint64_t src_size,
void **dst, size_t* dst_size, size_t dst_max);
@ -100,7 +100,7 @@ static void test_compress_decompress(const char* label, const char* type,
memzero(buf, MIN(size + 1000, MAX_SIZE));
r = compress(text, size, buf, size, &j);
r = compress(text, size, buf, size, &j, /* level = */ -1);
/* assume compression must be successful except for small or random inputs */
assert_se(r >= 0 || (size < 2048 && r == -ENOBUFS) || streq(type, "random"));
@ -160,13 +160,13 @@ int main(int argc, char *argv[]) {
NULSTR_FOREACH(i, "zeros\0simple\0random\0") {
#if HAVE_XZ
test_compress_decompress("XZ", i, compress_blob_xz, decompress_blob_xz);
test_compress_decompress("xz", i, compress_blob_xz, decompress_blob_xz);
#endif
#if HAVE_LZ4
test_compress_decompress("LZ4", i, compress_blob_lz4, decompress_blob_lz4);
test_compress_decompress("lz4", i, compress_blob_lz4, decompress_blob_lz4);
#endif
#if HAVE_ZSTD
test_compress_decompress("ZSTD", i, compress_blob_zstd, decompress_blob_zstd);
test_compress_decompress("zstd", i, compress_blob_zstd, decompress_blob_zstd);
#endif
}
return 0;

View File

@ -33,7 +33,7 @@
#define HUGE_SIZE (4096*1024)
typedef int (compress_blob_t)(const void *src, uint64_t src_size,
void *dst, size_t dst_alloc_size, size_t *dst_size);
void *dst, size_t dst_alloc_size, size_t *dst_size, int level);
typedef int (decompress_blob_t)(const void *src, uint64_t src_size,
void **dst,
size_t* dst_size, size_t dst_max);
@ -62,7 +62,7 @@ _unused_ static void test_compress_decompress(
log_info("/* testing %s %s blob compression/decompression */",
compression, data);
r = compress(data, data_len, compressed, sizeof(compressed), &csize);
r = compress(data, data_len, compressed, sizeof(compressed), &csize, /* level = */ -1);
if (r == -ENOBUFS) {
log_info_errno(r, "compression failed: %m");
assert_se(may_fail);
@ -111,14 +111,14 @@ _unused_ static void test_decompress_startswith(const char *compression,
compressed = compressed1 = malloc(BUFSIZE_1);
assert_se(compressed1);
r = compress(data, data_len, compressed, BUFSIZE_1, &csize);
r = compress(data, data_len, compressed, BUFSIZE_1, &csize, /* level = */ -1);
if (r == -ENOBUFS) {
log_info_errno(r, "compression failed: %m");
assert_se(may_fail);
compressed = compressed2 = malloc(BUFSIZE_2);
assert_se(compressed2);
r = compress(data, data_len, compressed, BUFSIZE_2, &csize);
r = compress(data, data_len, compressed, BUFSIZE_2, &csize, /* level = */ -1);
}
assert_se(r >= 0);
@ -150,7 +150,7 @@ _unused_ static void test_decompress_startswith_short(const char *compression,
log_info("/* %s with %s */", __func__, compression);
r = compress(TEXT, sizeof TEXT, buf, sizeof buf, &csize);
r = compress(TEXT, sizeof TEXT, buf, sizeof buf, &csize, /* level = */ -1);
assert_se(r >= 0);
for (size_t i = 1; i < strlen(TEXT); i++) {
@ -292,25 +292,25 @@ int main(int argc, char *argv[]) {
random_bytes(data + 7, sizeof(data) - 7);
#if HAVE_XZ
test_compress_decompress("XZ", compress_blob_xz, decompress_blob_xz,
test_compress_decompress("xz", compress_blob_xz, decompress_blob_xz,
text, sizeof(text), false);
test_compress_decompress("XZ", compress_blob_xz, decompress_blob_xz,
test_compress_decompress("xz", compress_blob_xz, decompress_blob_xz,
data, sizeof(data), true);
test_decompress_startswith("XZ",
test_decompress_startswith("xz",
compress_blob_xz, decompress_startswith_xz,
text, sizeof(text), false);
test_decompress_startswith("XZ",
test_decompress_startswith("xz",
compress_blob_xz, decompress_startswith_xz,
data, sizeof(data), true);
test_decompress_startswith("XZ",
test_decompress_startswith("xz",
compress_blob_xz, decompress_startswith_xz,
huge, HUGE_SIZE, true);
test_compress_stream("XZ", "xzcat",
test_compress_stream("xz", "xzcat",
compress_stream_xz, decompress_stream_xz, srcfile);
test_decompress_startswith_short("XZ", compress_blob_xz, decompress_startswith_xz);
test_decompress_startswith_short("xz", compress_blob_xz, decompress_startswith_xz);
#else
log_info("/* XZ test skipped */");
@ -318,27 +318,27 @@ int main(int argc, char *argv[]) {
#if HAVE_LZ4
if (dlopen_lz4() >= 0) {
test_compress_decompress("LZ4", compress_blob_lz4, decompress_blob_lz4,
test_compress_decompress("lz4", compress_blob_lz4, decompress_blob_lz4,
text, sizeof(text), false);
test_compress_decompress("LZ4", compress_blob_lz4, decompress_blob_lz4,
test_compress_decompress("lz4", compress_blob_lz4, decompress_blob_lz4,
data, sizeof(data), true);
test_decompress_startswith("LZ4",
test_decompress_startswith("lz4",
compress_blob_lz4, decompress_startswith_lz4,
text, sizeof(text), false);
test_decompress_startswith("LZ4",
test_decompress_startswith("lz4",
compress_blob_lz4, decompress_startswith_lz4,
data, sizeof(data), true);
test_decompress_startswith("LZ4",
test_decompress_startswith("lz4",
compress_blob_lz4, decompress_startswith_lz4,
huge, HUGE_SIZE, true);
test_compress_stream("LZ4", "lz4cat",
test_compress_stream("lz4", "lz4cat",
compress_stream_lz4, decompress_stream_lz4, srcfile);
test_lz4_decompress_partial();
test_decompress_startswith_short("LZ4", compress_blob_lz4, decompress_startswith_lz4);
test_decompress_startswith_short("lz4", compress_blob_lz4, decompress_startswith_lz4);
} else
log_error("/* Can't load liblz4 */");
#else
@ -346,25 +346,25 @@ int main(int argc, char *argv[]) {
#endif
#if HAVE_ZSTD
test_compress_decompress("ZSTD", compress_blob_zstd, decompress_blob_zstd,
test_compress_decompress("zstd", compress_blob_zstd, decompress_blob_zstd,
text, sizeof(text), false);
test_compress_decompress("ZSTD", compress_blob_zstd, decompress_blob_zstd,
test_compress_decompress("zstd", compress_blob_zstd, decompress_blob_zstd,
data, sizeof(data), true);
test_decompress_startswith("ZSTD",
test_decompress_startswith("zstd",
compress_blob_zstd, decompress_startswith_zstd,
text, sizeof(text), false);
test_decompress_startswith("ZSTD",
test_decompress_startswith("zstd",
compress_blob_zstd, decompress_startswith_zstd,
data, sizeof(data), true);
test_decompress_startswith("ZSTD",
test_decompress_startswith("zstd",
compress_blob_zstd, decompress_startswith_zstd,
huge, HUGE_SIZE, true);
test_compress_stream("ZSTD", "zstdcat",
test_compress_stream("zstd", "zstdcat",
compress_stream_zstd, decompress_stream_zstd, srcfile);
test_decompress_startswith_short("ZSTD", compress_blob_zstd, decompress_startswith_zstd);
test_decompress_startswith_short("zstd", compress_blob_zstd, decompress_startswith_zstd);
#else
log_info("/* ZSTD test skipped */");
#endif