1
0
mirror of https://github.com/systemd/systemd.git synced 2025-03-25 18:50:18 +03:00

journal-remote: several follow-ups for compression support, and trivial cleanups (#36334)

Follow-ups for cfaf78001c3451d549bcb1ee4adca3e85b934e56 (#34822).
This commit is contained in:
Luca Boccassi 2025-02-16 14:33:47 +00:00 committed by GitHub
commit 1f1b403d03
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 401 additions and 307 deletions

View File

@ -58,19 +58,6 @@
[Remote] section:</para>
<variablelist class='config-directives'>
<varlistentry>
<term><varname>Compression=</varname></term>
<listitem><para>Acceptable compression algorithms to be used by <command>systemd-journal-upload</command>. Compression algorithms are
used for <literal>Accept-Encoding</literal> header construction with priorities set according to an order in configuration.
This parameter takes space separated list of compression algorithms. Example:
<programlisting>Compression=zstd lz4</programlisting>
This option can be specified multiple times. If an empty string is assigned, then all the previous assignments are cleared.
</para>
<xi:include href="version-info.xml" xpointer="v258"/></listitem>
</varlistentry>
<varlistentry>
<term><varname>Seal=</varname></term>
@ -142,6 +129,27 @@
<xi:include href="version-info.xml" xpointer="v253"/></listitem>
</varlistentry>
<varlistentry>
<term><varname>Compression=</varname></term>
<listitem>
<para>Configurs acceptable compression algorithms to be announced through
<literal>Accept-Encoding</literal> HTTP header. The header suggests
<command>systemd-journal-upload</command> to compress data to be sent. Takes a space separated
list of compression algorithms, or <literal>no</literal>. Supported algorithms are
<literal>zstd</literal>, <literal>xz</literal>, or <literal>lz4</literal>. When a list of
algorithms is specified, <literal>Accept-Encoding</literal> header will be constructed with
priorities based on the order of the algorithms in the list. When <literal>no</literal>,
<literal>Accept-Encoding</literal> header will not be sent. This option can be specified multiple
times. If an empty string is assigned, then all the previous assignments are cleared. Defaults to
unset and all supported compression algorithms will be listed in the header.</para>
<para>Example:
<programlisting>Compression=zstd lz4</programlisting></para>
<xi:include href="version-info.xml" xpointer="v258"/>
</listitem>
</varlistentry>
</variablelist>
</refsect1>

View File

@ -60,37 +60,6 @@
<xi:include href="version-info.xml" xpointer="v232"/></listitem>
</varlistentry>
<varlistentry>
<term><varname>Compression=</varname></term>
<listitem><para>Takes a space separated list of compression algorithms to be applied to logs data before sending.
Supported algorithms are <literal>none</literal>, <literal>zstd</literal>, <literal>xz</literal>,
or <literal>lz4</literal>. Optionally, each algorithm (except for <literal>none</literal>)
followed by a colon (<literal>:</literal>) and its compression level, for example <literal>zstd:4</literal>.
The compression level is expected to be a positive integer. This option can be specified multiple times.
If an empty string is assigned, then all previous assignments are cleared.
Defaults to unset, and data will not be compressed.</para>
<para>Example:
<programlisting>Compression=zstd:4 lz4:2</programlisting></para>
<para>Even when compression is enabled, the initial requests are sent without compression.
It becomes effective either if <literal>ForceCompression=</literal> is enabled,
or the server response contains <literal>Accept-Encoding</literal> headers with a list of
compression algorithms that contains one of the algorithms specified in this option.</para>
<xi:include href="version-info.xml" xpointer="v258"/></listitem>
</varlistentry>
<varlistentry>
<term><varname>ForceCompression=</varname></term>
<listitem><para>Takes a boolean value, enforces using compression without content encoding negotiation.
Defaults to <literal>false</literal>.</para>
<xi:include href="version-info.xml" xpointer="v258"/></listitem>
</varlistentry>
<varlistentry>
<term><varname>ServerKeyFile=</varname></term>
@ -128,6 +97,40 @@
<xi:include href="version-info.xml" xpointer="v249"/></listitem>
</varlistentry>
<varlistentry>
<term><varname>Compression=</varname></term>
<listitem>
<para>Configures compression algorithm to be applied to logs data before sending. Takes a space
separated list of compression algorithms, or <literal>no</literal>. Supported algorithms are
<literal>zstd</literal>, <literal>xz</literal>, or <literal>lz4</literal>. Optionally, each
algorithm followed by a colon (<literal>:</literal>) and its compression level, for example
<literal>zstd:4</literal>. The compression level is expected to be a positive integer. When
<literal>no</literal> is specified, no compression algorithm will be applied to data to be sent.
This option can be specified multiple times. If an empty string is assigned, then all previous
assignments are cleared. Defaults to unset, and all supported compression algorithms with their
default compression levels are listed.</para>
<para>Example:
<programlisting>Compression=zstd:4 lz4:2</programlisting></para>
<para>Even when compression is enabled, the initial requests are sent without compression.
It becomes effective either if <literal>ForceCompression=</literal> is enabled,
or the server response contains <literal>Accept-Encoding</literal> headers with a list of
compression algorithms that contains one of the algorithms specified in this option.</para>
<xi:include href="version-info.xml" xpointer="v258"/>
</listitem>
</varlistentry>
<varlistentry>
<term><varname>ForceCompression=</varname></term>
<listitem><para>Takes a boolean value, enforces using compression without content encoding negotiation.
Defaults to <literal>false</literal>.</para>
<xi:include href="version-info.xml" xpointer="v258"/></listitem>
</varlistentry>
</variablelist>
</refsect1>

View File

@ -4,10 +4,76 @@
#include "journal-compression-util.h"
#include "parse-util.h"
void compression_args_clear(CompressionArgs *args) {
assert(args);
args->size = 0;
args->opts = mfree(args->opts);
static int compression_config_put(OrderedHashmap **configs, Compression c, int level) {
assert(configs);
if (!compression_supported(c))
return 0;
/* If the compression algorithm is already specified, update the compression level. */
CompressionConfig *cc = ordered_hashmap_get(*configs, INT_TO_PTR(c));
if (cc)
cc->level = level;
else {
_cleanup_free_ CompressionConfig *new_config = new(CompressionConfig, 1);
if (!new_config)
return log_oom();
*new_config = (CompressionConfig) {
.algorithm = c,
.level = level,
};
if (ordered_hashmap_ensure_put(configs, &trivial_hash_ops_value_free, INT_TO_PTR(c), new_config) < 0)
return log_oom();
TAKE_PTR(new_config);
}
if (c == COMPRESSION_NONE) {
/* disables all configs except for 'none' */
ORDERED_HASHMAP_FOREACH(cc, *configs)
if (cc->algorithm != COMPRESSION_NONE)
free(ordered_hashmap_remove(*configs, INT_TO_PTR(cc->algorithm)));
} else
/* otherwise, drop 'none' if stored. */
free(ordered_hashmap_get(*configs, INT_TO_PTR(COMPRESSION_NONE)));
return 1;
}
int compression_configs_mangle(OrderedHashmap **configs) {
int r;
/* When compression is explicitly disabled, then free the list. */
if (ordered_hashmap_contains(*configs, INT_TO_PTR(COMPRESSION_NONE))) {
*configs = ordered_hashmap_free(*configs);
return 0;
}
/* When compression algorithms are exlicitly specifed, then honor the list. */
if (!ordered_hashmap_isempty(*configs))
return 0;
/* If nothing specified, then list all supported algorithms with the default compression level. */
_cleanup_(ordered_hashmap_freep) OrderedHashmap *h = NULL;
/* First, put the default algorithm. */
if (DEFAULT_COMPRESSION != COMPRESSION_NONE) {
r = compression_config_put(&h, DEFAULT_COMPRESSION, -1);
if (r < 0)
return r;
}
/* Then, list all other algorithms. */
for (Compression c = 1; c < _COMPRESSION_MAX; c++) {
r = compression_config_put(&h, c, -1);
if (r < 0)
return r;
}
return free_and_replace_full(*configs, h, ordered_hashmap_free);
}
int config_parse_compression(
@ -22,21 +88,22 @@ int config_parse_compression(
void *data,
void *userdata) {
CompressionArgs *args = ASSERT_PTR(data);
OrderedHashmap **configs = ASSERT_PTR(data);
bool parse_level = ltype;
int r;
assert(filename);
assert(lvalue);
assert(rvalue);
if (isempty(rvalue)) {
compression_args_clear(args);
/* an empty string clears the previous assignments. */
*configs = ordered_hashmap_free(*configs);
return 1;
}
if (parse_boolean(rvalue) == 0)
/* 'no' disables compression. To indicate that, store 'none'. */
return compression_config_put(configs, COMPRESSION_NONE, -1);
for (const char *p = rvalue;;) {
_cleanup_free_ char *algorithm = NULL, *word = NULL;
_cleanup_free_ char *word = NULL;
int level = -1;
r = extract_first_word(&p, &word, NULL, 0);
@ -46,46 +113,28 @@ int config_parse_compression(
return 1;
if (parse_level) {
const char *q = word;
r = extract_first_word(&q, &algorithm, ":", 0);
if (r < 0)
return log_syntax_parse_error(unit, filename, line, r, lvalue, rvalue);
if (!isempty(q)) {
char *q = strchr(word, ':');
if (q) {
*q++ = '\0';
r = safe_atoi(q, &level);
if (r < 0) {
log_syntax(unit, LOG_WARNING, filename, line, r,
"Compression level %s should be positive, ignoring.", q);
"Compression level must be positive, ignoring: %s", q);
continue;
}
}
} else
algorithm = TAKE_PTR(word);
}
Compression c = compression_lowercase_from_string(algorithm);
if (c < 0 || !compression_supported(c)) {
Compression c = compression_lowercase_from_string(word);
if (c <= 0 || !compression_supported(c)) {
log_syntax(unit, LOG_WARNING, filename, line, c,
"Compression=%s is not supported on a system, ignoring.", algorithm);
"Compression algorithm '%s' is not supported on the system, ignoring.", word);
continue;
}
bool found = false;
FOREACH_ARRAY(opt, args->opts, args->size)
if (opt->algorithm == c) {
found = true;
if (parse_level)
opt->level = level;
break;
}
if (found)
continue;
if (!GREEDY_REALLOC(args->opts, args->size + 1))
return log_oom();
args->opts[args->size++] = (CompressionOpts) {
.algorithm = c,
.level = level,
};
r = compression_config_put(configs, c, level);
if (r < 0)
return r;
}
}

View File

@ -3,17 +3,13 @@
#include "compress.h"
#include "conf-parser.h"
#include "hashmap.h"
typedef struct CompressionOpts {
typedef struct CompressionConfig {
Compression algorithm;
int level;
} CompressionOpts;
} CompressionConfig;
typedef struct CompressionArgs {
CompressionOpts *opts;
size_t size;
} CompressionArgs;
int compression_configs_mangle(OrderedHashmap **configs);
CONFIG_PARSER_PROTOTYPE(config_parse_compression);
void compression_args_clear(CompressionArgs *args);

View File

@ -43,12 +43,13 @@ static char *arg_cert_pem = NULL;
static char *arg_trust_pem = NULL;
static bool arg_merge = false;
static int arg_journal_type = 0;
static const char *arg_directory = NULL;
static char *arg_directory = NULL;
static char **arg_file = NULL;
STATIC_DESTRUCTOR_REGISTER(arg_key_pem, erase_and_freep);
STATIC_DESTRUCTOR_REGISTER(arg_cert_pem, freep);
STATIC_DESTRUCTOR_REGISTER(arg_trust_pem, freep);
STATIC_DESTRUCTOR_REGISTER(arg_directory, freep);
STATIC_DESTRUCTOR_REGISTER(arg_file, strv_freep);
typedef struct RequestMeta {
@ -1061,7 +1062,9 @@ static int parse_argv(int argc, char *argv[]) {
break;
case 'D':
arg_directory = optarg;
r = free_and_strdup_warn(&arg_directory, optarg);
if (r < 0)
return r;
break;
case ARG_FILE:

View File

@ -33,13 +33,12 @@
#define CERT_FILE CERTIFICATE_ROOT "/certs/journal-remote.pem"
#define TRUST_FILE CERTIFICATE_ROOT "/ca/trusted.pem"
static const char *arg_url = NULL;
static const char *arg_getter = NULL;
static const char *arg_listen_raw = NULL;
static const char *arg_listen_http = NULL;
static const char *arg_listen_https = NULL;
static CompressionArgs arg_compression = {};
static char **arg_files = NULL; /* Do not free this. */
static char *arg_url = NULL;
static char *arg_getter = NULL;
static char *arg_listen_raw = NULL;
static char *arg_listen_http = NULL;
static char *arg_listen_https = NULL;
static char **arg_files = NULL;
static bool arg_compress = true;
static bool arg_seal = false;
static int http_socket = -1, https_socket = -1;
@ -62,12 +61,20 @@ static uint64_t arg_max_size = UINT64_MAX;
static uint64_t arg_n_max_files = UINT64_MAX;
static uint64_t arg_keep_free = UINT64_MAX;
static OrderedHashmap *arg_compression = NULL;
STATIC_DESTRUCTOR_REGISTER(arg_url, freep);
STATIC_DESTRUCTOR_REGISTER(arg_getter, freep);
STATIC_DESTRUCTOR_REGISTER(arg_listen_raw, freep);
STATIC_DESTRUCTOR_REGISTER(arg_listen_http, freep);
STATIC_DESTRUCTOR_REGISTER(arg_listen_https, freep);
STATIC_DESTRUCTOR_REGISTER(arg_files, strv_freep);
STATIC_DESTRUCTOR_REGISTER(arg_gnutls_log, strv_freep);
STATIC_DESTRUCTOR_REGISTER(arg_key, freep);
STATIC_DESTRUCTOR_REGISTER(arg_cert, freep);
STATIC_DESTRUCTOR_REGISTER(arg_trust, freep);
STATIC_DESTRUCTOR_REGISTER(arg_output, freep);
STATIC_DESTRUCTOR_REGISTER(arg_compression, compression_args_clear);
STATIC_DESTRUCTOR_REGISTER(arg_compression, ordered_hashmap_freep);
static const char* const journal_write_split_mode_table[_JOURNAL_WRITE_SPLIT_MAX] = {
[JOURNAL_WRITE_SPLIT_NONE] = "none",
@ -158,10 +165,17 @@ static int dispatch_http_event(sd_event_source *event,
static int build_accept_encoding(char **ret) {
assert(ret);
float q = 1.0, step = 1.0 / arg_compression.size;
if (ordered_hashmap_isempty(arg_compression)) {
*ret = NULL;
return 0;
}
_cleanup_free_ char *buf = NULL;
FOREACH_ARRAY(opt, arg_compression.opts, arg_compression.size) {
const char *c = compression_lowercase_to_string(opt->algorithm);
float q = 1.0, step = 1.0 / ordered_hashmap_size(arg_compression);
const CompressionConfig *cc;
ORDERED_HASHMAP_FOREACH(cc, arg_compression) {
const char *c = compression_lowercase_to_string(cc->algorithm);
if (strextendf_with_separator(&buf, ",", "%s;q=%.1f", c, q) < 0)
return -ENOMEM;
q -= step;
@ -316,11 +330,13 @@ static mhd_result request_handler(
header = MHD_lookup_connection_value(connection, MHD_HEADER_KIND, "Content-Encoding");
if (header) {
Compression c = compression_lowercase_from_string(header);
if (c < 0 || !compression_supported(c))
if (c <= 0 || !compression_supported(c))
return mhd_respondf(connection, 0, MHD_HTTP_UNSUPPORTED_MEDIA_TYPE,
"Unsupported Content-Encoding type: %s", header);
source->compression = c;
}
} else
source->compression = COMPRESSION_NONE;
return process_http_upload(connection,
upload_data, upload_data_size,
source);
@ -866,27 +882,21 @@ static int parse_argv(int argc, char *argv[]) {
return version();
case ARG_URL:
if (arg_url)
return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
"Cannot currently set more than one --url=");
arg_url = optarg;
r = free_and_strdup_warn(&arg_url, optarg);
if (r < 0)
return r;
break;
case ARG_GETTER:
if (arg_getter)
return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
"Cannot currently use --getter= more than once");
arg_getter = optarg;
r = free_and_strdup_warn(&arg_getter, optarg);
if (r < 0)
return r;
break;
case ARG_LISTEN_RAW:
if (arg_listen_raw)
return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
"Cannot currently use --listen-raw= more than once");
arg_listen_raw = optarg;
r = free_and_strdup_warn(&arg_listen_raw, optarg);
if (r < 0)
return r;
break;
case ARG_LISTEN_HTTP:
@ -897,8 +907,11 @@ static int parse_argv(int argc, char *argv[]) {
r = negative_fd(optarg);
if (r >= 0)
http_socket = r;
else
arg_listen_http = optarg;
else {
r = free_and_strdup_warn(&arg_listen_http, optarg);
if (r < 0)
return r;
}
break;
case ARG_LISTEN_HTTPS:
@ -909,53 +922,36 @@ static int parse_argv(int argc, char *argv[]) {
r = negative_fd(optarg);
if (r >= 0)
https_socket = r;
else
arg_listen_https = optarg;
else {
r = free_and_strdup_warn(&arg_listen_https, optarg);
if (r < 0)
return r;
}
break;
case ARG_KEY:
if (arg_key)
return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
"Key file specified twice");
arg_key = strdup(optarg);
if (!arg_key)
return log_oom();
r = free_and_strdup_warn(&arg_key, optarg);
if (r < 0)
return r;
break;
case ARG_CERT:
if (arg_cert)
return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
"Certificate file specified twice");
arg_cert = strdup(optarg);
if (!arg_cert)
return log_oom();
r = free_and_strdup_warn(&arg_cert, optarg);
if (r < 0)
return r;
break;
case ARG_TRUST:
#if HAVE_GNUTLS
if (arg_trust)
return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
"Cannot use --trust more= than once");
arg_trust = strdup(optarg);
if (!arg_trust)
return log_oom();
r = free_and_strdup_warn(&arg_trust, optarg);
if (r < 0)
return r;
#else
return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
"Option --trust= is not available.");
return log_error_errno(SYNTHETIC_ERRNO(EINVAL), "Option --trust= is not available.");
#endif
break;
case 'o':
if (arg_output)
return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
"Cannot use --output=/-o more than once");
r = parse_path_argument(optarg, /* suppress_root = */ false, &arg_output);
if (r < 0)
return r;
@ -990,16 +986,13 @@ static int parse_argv(int argc, char *argv[]) {
if (r == 0)
break;
if (strv_push(&arg_gnutls_log, word) < 0)
if (strv_consume(&arg_gnutls_log, TAKE_PTR(word)) < 0)
return log_oom();
word = NULL;
}
break;
#else
return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
"Option --gnutls-log= is not available.");
return log_error_errno(SYNTHETIC_ERRNO(EINVAL), "Option --gnutls-log= is not available.");
#endif
break;
case '?':
return -EINVAL;
@ -1008,8 +1001,9 @@ static int parse_argv(int argc, char *argv[]) {
assert_not_reached();
}
if (optind < argc)
arg_files = argv + optind;
arg_files = strv_copy(strv_skip(argv, optind));
if (!arg_files)
return log_oom();
type_a = arg_getter || !strv_isempty(arg_files);
type_b = arg_url
@ -1121,6 +1115,10 @@ static int run(int argc, char **argv) {
if (r <= 0)
return r;
r = compression_configs_mangle(&arg_compression);
if (r < 0)
return r;
journal_browse_prepare();
if (arg_listen_http || arg_listen_https) {

View File

@ -31,7 +31,7 @@
#define filename_escape(s) xescape((s), "/ ")
#if HAVE_MICROHTTPD
MHDDaemonWrapper *MHDDaemonWrapper_free(MHDDaemonWrapper *d) {
MHDDaemonWrapper* MHDDaemonWrapper_free(MHDDaemonWrapper *d) {
if (!d)
return NULL;

View File

@ -26,3 +26,4 @@
# KeepFree=
# MaxFileSize=
# MaxFiles=
# Compression=zstd lz4 xz

View File

@ -21,7 +21,7 @@ struct MHDDaemonWrapper {
sd_event_source *timer_event;
};
MHDDaemonWrapper *MHDDaemonWrapper_free(MHDDaemonWrapper *d);
MHDDaemonWrapper* MHDDaemonWrapper_free(MHDDaemonWrapper *d);
DEFINE_TRIVIAL_CLEANUP_FUNC(MHDDaemonWrapper*, MHDDaemonWrapper_free);
#endif

View File

@ -263,7 +263,7 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void
j = u->journal;
if (u->compression.algorithm != COMPRESSION_NONE) {
if (u->compression) {
compression_buffer = malloc_multiply(nmemb, size);
if (!compression_buffer) {
log_oom();
@ -309,12 +309,12 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void
u->entries_sent, u->current_cursor);
}
if (filled > 0 && u->compression.algorithm != COMPRESSION_NONE) {
if (filled > 0 && u->compression) {
size_t compressed_size;
r = compress_blob(u->compression.algorithm, compression_buffer, filled, buf, size * nmemb, &compressed_size, u->compression.level);
r = compress_blob(u->compression->algorithm, compression_buffer, filled, buf, size * nmemb, &compressed_size, u->compression->level);
if (r < 0) {
log_error_errno(r, "Failed to compress %zu bytes (Compression=%s, Level=%d): %m",
filled, compression_lowercase_to_string(u->compression.algorithm), u->compression.level);
log_error_errno(r, "Failed to compress %zu bytes by %s with level %i: %m",
filled, compression_lowercase_to_string(u->compression->algorithm), u->compression->level);
return CURL_READFUNC_ABORT;
}

View File

@ -42,27 +42,36 @@
#define TRUST_FILE CERTIFICATE_ROOT "/ca/trusted.pem"
#define DEFAULT_PORT 19532
static const char *arg_url = NULL;
static const char *arg_key = NULL;
static const char *arg_cert = NULL;
static const char *arg_trust = NULL;
static const char *arg_directory = NULL;
static char *arg_url = NULL;
static char *arg_key = NULL;
static char *arg_cert = NULL;
static char *arg_trust = NULL;
static char *arg_directory = NULL;
static char **arg_file = NULL;
static const char *arg_cursor = NULL;
static char *arg_cursor = NULL;
static bool arg_after_cursor = false;
static int arg_journal_type = 0;
static int arg_namespace_flags = 0;
static const char *arg_machine = NULL;
static const char *arg_namespace = NULL;
static char *arg_machine = NULL;
static char *arg_namespace = NULL;
static bool arg_merge = false;
static int arg_follow = -1;
static const char *arg_save_state = NULL;
static char *arg_save_state = NULL;
static usec_t arg_network_timeout_usec = USEC_INFINITY;
static CompressionArgs arg_compression = {};
static OrderedHashmap *arg_compression = NULL;
static bool arg_force_compression = false;
STATIC_DESTRUCTOR_REGISTER(arg_url, freep);
STATIC_DESTRUCTOR_REGISTER(arg_key, freep);
STATIC_DESTRUCTOR_REGISTER(arg_cert, freep);
STATIC_DESTRUCTOR_REGISTER(arg_trust, freep);
STATIC_DESTRUCTOR_REGISTER(arg_directory, freep);
STATIC_DESTRUCTOR_REGISTER(arg_file, strv_freep);
STATIC_DESTRUCTOR_REGISTER(arg_compression, compression_args_clear);
STATIC_DESTRUCTOR_REGISTER(arg_cursor, freep);
STATIC_DESTRUCTOR_REGISTER(arg_machine, freep);
STATIC_DESTRUCTOR_REGISTER(arg_namespace, freep);
STATIC_DESTRUCTOR_REGISTER(arg_save_state, freep);
STATIC_DESTRUCTOR_REGISTER(arg_compression, ordered_hashmap_freep);
static void close_fd_input(Uploader *u);
@ -206,8 +215,8 @@ int start_upload(Uploader *u,
return log_oom();
h = l;
if (u->compression.algorithm != COMPRESSION_NONE) {
_cleanup_free_ char *header = strjoin("Content-Encoding: ", compression_lowercase_to_string(u->compression.algorithm));
if (u->compression) {
_cleanup_free_ char *header = strjoin("Content-Encoding: ", compression_lowercase_to_string(u->compression->algorithm));
if (!header)
return log_oom();
@ -318,7 +327,7 @@ static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *user
assert(!size_multiply_overflow(size, nmemb));
if (u->compression.algorithm != COMPRESSION_NONE) {
if (u->compression) {
compression_buffer = malloc_multiply(nmemb, size);
if (!compression_buffer) {
log_oom();
@ -329,14 +338,14 @@ static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *user
n = read(u->input, compression_buffer ?: buf, size * nmemb);
if (n > 0) {
log_debug("%s: allowed %zu, read %zd", __func__, size * nmemb, n);
if (u->compression.algorithm == COMPRESSION_NONE)
if (!u->compression)
return n;
size_t compressed_size;
r = compress_blob(u->compression.algorithm, compression_buffer, n, buf, size * nmemb, &compressed_size, u->compression.level);
r = compress_blob(u->compression->algorithm, compression_buffer, n, buf, size * nmemb, &compressed_size, u->compression->level);
if (r < 0) {
log_error_errno(r, "Failed to compress %zd bytes using (Compression=%s, Level=%d): %m",
n, compression_lowercase_to_string(u->compression.algorithm), u->compression.level);
log_error_errno(r, "Failed to compress %zd bytes by %s with level %i: %m",
n, compression_lowercase_to_string(u->compression->algorithm), u->compression->level);
return CURL_READFUNC_ABORT;
}
assert(compressed_size <= size * nmemb);
@ -423,12 +432,10 @@ static int setup_uploader(Uploader *u, const char *url, const char *state_file)
*u = (Uploader) {
.input = -1,
.compression.algorithm = COMPRESSION_NONE,
.compression.level = -1,
};
if (arg_force_compression && arg_compression.size > 0)
u->compression = arg_compression.opts[0];
if (arg_force_compression)
u->compression = ordered_hashmap_first(arg_compression);
host = STARTSWITH_SET(url, "http://", "https://");
if (!host) {
@ -488,64 +495,117 @@ static void destroy_uploader(Uploader *u) {
}
#if LIBCURL_VERSION_NUM >= 0x075300
static int update_content_encoding(Uploader *u, const char *accept_encoding) {
static int update_content_encoding_header(Uploader *u, const CompressionConfig *cc) {
bool update_header = false;
assert(u);
if (cc == u->compression)
return 0; /* Already picked the algorithm. Let's shortcut. */
if (cc) {
_cleanup_free_ char *header = strjoin("Content-Encoding: ", compression_lowercase_to_string(cc->algorithm));
if (!header)
return log_oom();
/* First, try to update existing Content-Encoding header. */
bool found = false;
for (struct curl_slist *l = u->header; l; l = l->next)
if (startswith(l->data, "Content-Encoding:")) {
free_and_replace(l->data, header);
found = true;
break;
}
/* If Content-Encoding header is not found, append new one. */
if (!found) {
struct curl_slist *l = curl_slist_append(u->header, header);
if (!l)
return log_oom();
u->header = l;
}
update_header = true;
} else
/* Remove Content-Encoding header. */
for (struct curl_slist *l = u->header, *prev = NULL; l; prev = l, l = l->next)
if (startswith(l->data, "Content-Encoding:")) {
if (prev)
prev->next = TAKE_PTR(l->next);
else
u->header = TAKE_PTR(l->next);
curl_slist_free_all(l);
update_header = true;
break;
}
if (update_header) {
CURLcode code;
easy_setopt(u->easy, CURLOPT_HTTPHEADER, u->header, LOG_WARNING, return -EXFULL);
}
u->compression = cc;
if (cc)
log_debug("Using compression algorithm %s with compression level %i.", compression_lowercase_to_string(cc->algorithm), cc->level);
else
log_debug("Disabled compression algorithm.");
return 0;
}
#endif
static int parse_accept_encoding_header(Uploader *u) {
#if LIBCURL_VERSION_NUM >= 0x075300
int r;
assert(u);
for (const char *p = accept_encoding;;) {
_cleanup_free_ char *encoding_value = NULL, *alg = NULL;
Compression algorithm;
CURLcode code;
if (ordered_hashmap_isempty(arg_compression))
return update_content_encoding_header(u, NULL);
r = extract_first_word(&p, &encoding_value, ",", 0);
struct curl_header *header;
CURLHcode hcode = curl_easy_header(u->easy, "Accept-Encoding", 0, CURLH_HEADER, -1, &header);
if (hcode != CURLHE_OK)
goto not_found;
for (const char *p = header->value;;) {
_cleanup_free_ char *word = NULL;
r = extract_first_word(&p, &word, ",", 0);
if (r < 0)
return log_error_errno(r, "Failed to extract Accept-Encoding header value: %m");
return log_warning_errno(r, "Failed to parse Accept-Encoding header value, ignoring: %m");
if (r == 0)
return 0;
break;
const char *q = encoding_value;
r = extract_first_word(&q, &alg, ";", 0);
if (r < 0)
return log_error_errno(r, "Failed to extract compression algorithm from Accept-Encoding header: %m");
/* Cut the quality value waiting. */
char *q = strchr(word, ';');
if (q)
*q = '\0';
algorithm = compression_lowercase_from_string(alg);
if (algorithm <= 0 || !compression_supported(algorithm)) {
continue;
}
if (streq(word, "*"))
return update_content_encoding_header(u, ordered_hashmap_first(arg_compression));
FOREACH_ARRAY(opt, arg_compression.opts, arg_compression.size) {
if (opt->algorithm != algorithm)
continue;
Compression c = compression_lowercase_from_string(word);
if (c <= 0 || !compression_supported(c))
continue; /* unsupported or invalid algorithm. */
_cleanup_free_ char *header = strjoin("Content-Encoding: ", compression_lowercase_to_string(u->compression.algorithm));
if (!header)
return log_oom();
const CompressionConfig *cc = ordered_hashmap_get(arg_compression, INT_TO_PTR(c));
if (!cc)
continue; /* The specified algorithm is not enabled. */
/* First, update existing Content-Encoding header. */
bool found = false;
for (struct curl_slist *l = u->header; l; l = l->next)
if (startswith(l->data, "Content-Encoding:")) {
free_and_replace(l->data, header);
found = true;
break;
}
/* If Content-Encoding header is not found, append new one. */
if (!found) {
struct curl_slist *l = curl_slist_append(u->header, header);
if (!l)
return log_oom();
u->header = l;
}
easy_setopt(u->easy, CURLOPT_HTTPHEADER, u->header, LOG_ERR, return -EXFULL);
u->compression = *opt;
return 0;
}
return update_content_encoding_header(u, cc);
}
}
not_found:
if (arg_force_compression)
return update_content_encoding_header(u, ordered_hashmap_first(arg_compression));
return update_content_encoding_header(u, NULL);
#else
return 0;
#endif
}
static int perform_upload(Uploader *u) {
CURLcode code;
@ -575,29 +635,15 @@ static int perform_upload(Uploader *u) {
return log_error_errno(SYNTHETIC_ERRNO(EIO),
"Upload to %s failed with code %ld: %s",
u->url, status, strna(u->answer));
else if (status < 200)
if (status < 200)
return log_error_errno(SYNTHETIC_ERRNO(EIO),
"Upload to %s finished with unexpected code %ld: %s",
u->url, status, strna(u->answer));
else {
#if LIBCURL_VERSION_NUM >= 0x075300
int r;
if (u->compression.algorithm == COMPRESSION_NONE) {
struct curl_header *encoding_header;
CURLHcode hcode;
hcode = curl_easy_header(u->easy, "Accept-Encoding", 0, CURLH_HEADER, -1, &encoding_header);
if (hcode == CURLHE_OK && encoding_header && encoding_header->value) {
r = update_content_encoding(u, encoding_header->value);
if (r < 0)
return r;
}
}
#endif
(void) parse_accept_encoding_header(u);
log_debug("Upload finished successfully with code %ld: %s",
status, strna(u->answer));
}
log_debug("Upload finished successfully with code %ld: %s",
status, strna(u->answer));
free_and_replace(u->last_cursor, u->current_cursor);
@ -611,7 +657,7 @@ static int parse_config(void) {
{ "Upload", "ServerCertificateFile", config_parse_path_or_ignore, 0, &arg_cert },
{ "Upload", "TrustedCertificateFile", config_parse_path_or_ignore, 0, &arg_trust },
{ "Upload", "NetworkTimeoutSec", config_parse_sec, 0, &arg_network_timeout_usec },
{ "Upload", "Compression", config_parse_compression, true, &arg_compression },
{ "Upload", "Compression", config_parse_compression, /* with_level */ true, &arg_compression },
{ "Upload", "ForceCompression", config_parse_bool, 0, &arg_force_compression },
{}
};
@ -716,35 +762,27 @@ static int parse_argv(int argc, char *argv[]) {
return version();
case 'u':
if (arg_url)
return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
"Cannot use more than one --url=");
arg_url = optarg;
r = free_and_strdup_warn(&arg_url, optarg);
if (r < 0)
return r;
break;
case ARG_KEY:
if (arg_key)
return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
"Cannot use more than one --key=");
arg_key = optarg;
r = free_and_strdup_warn(&arg_key, optarg);
if (r < 0)
return r;
break;
case ARG_CERT:
if (arg_cert)
return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
"Cannot use more than one --cert=");
arg_cert = optarg;
r = free_and_strdup_warn(&arg_cert, optarg);
if (r < 0)
return r;
break;
case ARG_TRUST:
if (arg_trust)
return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
"Cannot use more than one --trust=");
arg_trust = optarg;
r = free_and_strdup_warn(&arg_trust, optarg);
if (r < 0)
return r;
break;
case ARG_SYSTEM:
@ -760,36 +798,35 @@ static int parse_argv(int argc, char *argv[]) {
break;
case 'M':
if (arg_machine)
return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
"Cannot use more than one --machine=/-M");
arg_machine = optarg;
r = free_and_strdup_warn(&arg_machine, optarg);
if (r < 0)
return r;
break;
case ARG_NAMESPACE:
if (streq(optarg, "*")) {
arg_namespace_flags = SD_JOURNAL_ALL_NAMESPACES;
arg_namespace = NULL;
arg_namespace = mfree(arg_namespace);
r = 0;
} else if (startswith(optarg, "+")) {
arg_namespace_flags = SD_JOURNAL_INCLUDE_DEFAULT_NAMESPACE;
arg_namespace = optarg + 1;
r = free_and_strdup_warn(&arg_namespace, optarg + 1);
} else if (isempty(optarg)) {
arg_namespace_flags = 0;
arg_namespace = NULL;
arg_namespace = mfree(arg_namespace);
r = 0;
} else {
arg_namespace_flags = 0;
arg_namespace = optarg;
r = free_and_strdup_warn(&arg_namespace, optarg);
}
if (r < 0)
return r;
break;
case 'D':
if (arg_directory)
return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
"Cannot use more than one --directory=/-D");
arg_directory = optarg;
r = free_and_strdup_warn(&arg_directory, optarg);
if (r < 0)
return r;
break;
case ARG_FILE:
@ -799,20 +836,11 @@ static int parse_argv(int argc, char *argv[]) {
break;
case ARG_CURSOR:
if (arg_cursor)
return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
"Cannot use more than one --cursor=/--after-cursor=");
arg_cursor = optarg;
break;
case ARG_AFTER_CURSOR:
if (arg_cursor)
return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
"Cannot use more than one --cursor=/--after-cursor=");
arg_cursor = optarg;
arg_after_cursor = true;
r = free_and_strdup_warn(&arg_cursor, optarg);
if (r < 0)
return r;
arg_after_cursor = c == ARG_AFTER_CURSOR;
break;
case ARG_FOLLOW:
@ -823,7 +851,9 @@ static int parse_argv(int argc, char *argv[]) {
break;
case ARG_SAVE_STATE:
arg_save_state = optarg ?: STATE_FILE;
r = free_and_strdup_warn(&arg_save_state, optarg ?: STATE_FILE);
if (r < 0)
return r;
break;
case '?':
@ -891,6 +921,10 @@ static int run(int argc, char **argv) {
if (r <= 0)
return r;
r = compression_configs_mangle(&arg_compression);
if (r < 0)
return r;
journal_browse_prepare();
r = setup_uploader(&u, arg_url, arg_save_state);

View File

@ -21,3 +21,5 @@
# ServerKeyFile={{CERTIFICATE_ROOT}}/private/journal-upload.pem
# ServerCertificateFile={{CERTIFICATE_ROOT}}/certs/journal-upload.pem
# TrustedCertificateFile={{CERTIFICATE_ROOT}}/ca/trusted.pem
# Compression=zstd lz4 xz
# ForceCompression=no

View File

@ -54,7 +54,7 @@ typedef struct Uploader {
char *last_cursor, *current_cursor;
usec_t watchdog_timestamp;
usec_t watchdog_usec;
CompressionOpts compression;
const CompressionConfig *compression;
} Uploader;
#define JOURNAL_UPLOAD_POLL_TIMEOUT (10 * USEC_PER_SEC)