mirror of
https://github.com/systemd/systemd.git
synced 2025-03-19 22:50:17 +03:00
journal-remote: added compression, compression-level and content-encoding negotiation
This commit is contained in:
parent
91d6f1ee53
commit
cfaf78001c
@ -58,6 +58,19 @@
|
||||
[Remote] section:</para>
|
||||
|
||||
<variablelist class='config-directives'>
|
||||
<varlistentry>
|
||||
<term><varname>Compression=</varname></term>
|
||||
|
||||
<listitem><para>Acceptable compression algorithms to be used by <command>systemd-journal-upload</command>. Compression algorithms are
|
||||
used for <literal>Accept-Encoding</literal> header contruction with priorities set according to an order in configuration.
|
||||
This parameter takes space separated list of compression algorithms. Example:
|
||||
<programlisting>Compression=zstd lz4</programlisting>
|
||||
This option can be specified multiple times. If an empty string is assigned, then all the previous assignments are cleared.
|
||||
</para>
|
||||
|
||||
<xi:include href="version-info.xml" xpointer="v258"/></listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry>
|
||||
<term><varname>Seal=</varname></term>
|
||||
|
||||
|
@ -60,6 +60,37 @@
|
||||
<xi:include href="version-info.xml" xpointer="v232"/></listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry>
|
||||
<term><varname>Compression=</varname></term>
|
||||
|
||||
<listitem><para>Takes a space separated list of compression algorithms to be applied to logs data before sending.
|
||||
Supported algorithms are <literal>none</literal>, <literal>zstd</literal>, <literal>xz</literal>,
|
||||
or <literal>lz4</literal>. Optionally, each algorithm (except for <literal>none</literal>)
|
||||
followed by a colon (<literal>:</literal>) and its compression level, for example <literal>zstd:4</literal>.
|
||||
The compression level is expected to be a positive integer. This option can be specified multiple times.
|
||||
If an empty string is assigned, then all previous assignments are cleared.
|
||||
Defaults to unset, and data will not be compressed.</para>
|
||||
|
||||
<para>Example:
|
||||
<programlisting>Compression=zstd:4 lz4:2</programlisting></para>
|
||||
|
||||
<para>Even when compression is enabled, the initial requests are sent without compression.
|
||||
It becomes effective either if <literal>ForceCompression=</literal> is enabled,
|
||||
or the server response contains <literal>Accept-Encoding</literal> headers with a list of
|
||||
compression algorithms that contains one of the algorithms specified in this option.</para>
|
||||
|
||||
<xi:include href="version-info.xml" xpointer="v258"/></listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry>
|
||||
<term><varname>ForceCompression=</varname></term>
|
||||
|
||||
<listitem><para>Takes a boolean value, enforces using compression without content encoding negotiation.
|
||||
Defaults to <literal>false</literal>.</para>
|
||||
|
||||
<xi:include href="version-info.xml" xpointer="v258"/></listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry>
|
||||
<term><varname>ServerKeyFile=</varname></term>
|
||||
|
||||
|
@ -122,7 +122,15 @@ static const char* const compression_table[_COMPRESSION_MAX] = {
|
||||
[COMPRESSION_ZSTD] = "ZSTD",
|
||||
};
|
||||
|
||||
static const char* const compression_lowercase_table[_COMPRESSION_MAX] = {
|
||||
[COMPRESSION_NONE] = "none",
|
||||
[COMPRESSION_XZ] = "xz",
|
||||
[COMPRESSION_LZ4] = "lz4",
|
||||
[COMPRESSION_ZSTD] = "zstd",
|
||||
};
|
||||
|
||||
DEFINE_STRING_TABLE_LOOKUP(compression, Compression);
|
||||
DEFINE_STRING_TABLE_LOOKUP(compression_lowercase, Compression);
|
||||
|
||||
bool compression_supported(Compression c) {
|
||||
static const unsigned supported =
|
||||
|
@ -24,6 +24,8 @@ typedef enum Compression {
|
||||
|
||||
const char* compression_to_string(Compression compression);
|
||||
Compression compression_from_string(const char *compression);
|
||||
const char* compression_lowercase_to_string(Compression compression);
|
||||
Compression compression_lowercase_from_string(const char *compression);
|
||||
|
||||
bool compression_supported(Compression c);
|
||||
|
||||
|
91
src/journal-remote/journal-compression-util.c
Normal file
91
src/journal-remote/journal-compression-util.c
Normal file
@ -0,0 +1,91 @@
|
||||
/* SPDX-License-Identifier: LGPL-2.1-or-later */
|
||||
|
||||
#include "extract-word.h"
|
||||
#include "journal-compression-util.h"
|
||||
#include "parse-util.h"
|
||||
|
||||
void compression_args_clear(CompressionArgs *args) {
|
||||
assert(args);
|
||||
args->size = 0;
|
||||
args->opts = mfree(args->opts);
|
||||
}
|
||||
|
||||
int config_parse_compression(
|
||||
const char *unit,
|
||||
const char *filename,
|
||||
unsigned line,
|
||||
const char *section,
|
||||
unsigned section_line,
|
||||
const char *lvalue,
|
||||
int ltype,
|
||||
const char *rvalue,
|
||||
void *data,
|
||||
void *userdata) {
|
||||
|
||||
CompressionArgs *args = ASSERT_PTR(data);
|
||||
bool parse_level = ltype;
|
||||
int r;
|
||||
|
||||
assert(filename);
|
||||
assert(lvalue);
|
||||
assert(rvalue);
|
||||
|
||||
if (isempty(rvalue)) {
|
||||
compression_args_clear(args);
|
||||
return 1;
|
||||
}
|
||||
|
||||
for (const char *p = rvalue;;) {
|
||||
_cleanup_free_ char *algorithm = NULL, *word = NULL;
|
||||
int level = -1;
|
||||
|
||||
r = extract_first_word(&p, &word, NULL, 0);
|
||||
if (r < 0)
|
||||
return log_syntax_parse_error(unit, filename, line, r, lvalue, rvalue);
|
||||
if (r == 0)
|
||||
return 1;
|
||||
|
||||
if (parse_level) {
|
||||
const char *q = word;
|
||||
r = extract_first_word(&q, &algorithm, ":", 0);
|
||||
if (r < 0)
|
||||
return log_syntax_parse_error(unit, filename, line, r, lvalue, rvalue);
|
||||
if (!isempty(q)) {
|
||||
r = safe_atoi(q, &level);
|
||||
if (r < 0) {
|
||||
log_syntax(unit, LOG_WARNING, filename, line, r,
|
||||
"Compression level %s should be positive, ignoring.", q);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} else
|
||||
algorithm = TAKE_PTR(word);
|
||||
|
||||
Compression c = compression_lowercase_from_string(algorithm);
|
||||
if (c < 0 || !compression_supported(c)) {
|
||||
log_syntax(unit, LOG_WARNING, filename, line, c,
|
||||
"Compression=%s is not supported on a system, ignoring.", algorithm);
|
||||
continue;
|
||||
}
|
||||
|
||||
bool found = false;
|
||||
FOREACH_ARRAY(opt, args->opts, args->size)
|
||||
if (opt->algorithm == c) {
|
||||
found = true;
|
||||
if (parse_level)
|
||||
opt->level = level;
|
||||
break;
|
||||
}
|
||||
|
||||
if (found)
|
||||
continue;
|
||||
|
||||
if (!GREEDY_REALLOC(args->opts, args->size + 1))
|
||||
return log_oom();
|
||||
|
||||
args->opts[args->size++] = (CompressionOpts) {
|
||||
.algorithm = c,
|
||||
.level = level,
|
||||
};
|
||||
}
|
||||
}
|
19
src/journal-remote/journal-compression-util.h
Normal file
19
src/journal-remote/journal-compression-util.h
Normal file
@ -0,0 +1,19 @@
|
||||
/* SPDX-License-Identifier: LGPL-2.1-or-later */
|
||||
#pragma once
|
||||
|
||||
#include "compress.h"
|
||||
#include "conf-parser.h"
|
||||
|
||||
typedef struct CompressionOpts {
|
||||
Compression algorithm;
|
||||
int level;
|
||||
} CompressionOpts;
|
||||
|
||||
typedef struct CompressionArgs {
|
||||
CompressionOpts *opts;
|
||||
size_t size;
|
||||
} CompressionArgs;
|
||||
|
||||
CONFIG_PARSER_PROTOTYPE(config_parse_compression);
|
||||
|
||||
void compression_args_clear(CompressionArgs *args);
|
@ -11,6 +11,7 @@
|
||||
#include "daemon-util.h"
|
||||
#include "fd-util.h"
|
||||
#include "fileio.h"
|
||||
#include "journal-compression-util.h"
|
||||
#include "journal-remote-write.h"
|
||||
#include "journal-remote.h"
|
||||
#include "logs-show.h"
|
||||
@ -37,6 +38,7 @@ static const char *arg_getter = NULL;
|
||||
static const char *arg_listen_raw = NULL;
|
||||
static const char *arg_listen_http = NULL;
|
||||
static const char *arg_listen_https = NULL;
|
||||
static CompressionArgs arg_compression = {};
|
||||
static char **arg_files = NULL; /* Do not free this. */
|
||||
static bool arg_compress = true;
|
||||
static bool arg_seal = false;
|
||||
@ -65,6 +67,7 @@ STATIC_DESTRUCTOR_REGISTER(arg_key, freep);
|
||||
STATIC_DESTRUCTOR_REGISTER(arg_cert, freep);
|
||||
STATIC_DESTRUCTOR_REGISTER(arg_trust, freep);
|
||||
STATIC_DESTRUCTOR_REGISTER(arg_output, freep);
|
||||
STATIC_DESTRUCTOR_REGISTER(arg_compression, compression_args_clear);
|
||||
|
||||
static const char* const journal_write_split_mode_table[_JOURNAL_WRITE_SPLIT_MAX] = {
|
||||
[JOURNAL_WRITE_SPLIT_NONE] = "none",
|
||||
@ -152,6 +155,22 @@ static int dispatch_http_event(sd_event_source *event,
|
||||
uint32_t revents,
|
||||
void *userdata);
|
||||
|
||||
static int build_accept_encoding(char **ret) {
|
||||
assert(ret);
|
||||
|
||||
float q = 1.0, step = 1.0 / arg_compression.size;
|
||||
_cleanup_free_ char *buf = NULL;
|
||||
FOREACH_ARRAY(opt, arg_compression.opts, arg_compression.size) {
|
||||
const char *c = compression_lowercase_to_string(opt->algorithm);
|
||||
if (strextendf_with_separator(&buf, ",", "%s;q=%.1f", c, q) < 0)
|
||||
return -ENOMEM;
|
||||
q -= step;
|
||||
}
|
||||
|
||||
*ret = TAKE_PTR(buf);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int request_meta(void **connection_cls, int fd, char *hostname) {
|
||||
RemoteSource *source;
|
||||
Writer *writer;
|
||||
@ -174,6 +193,11 @@ static int request_meta(void **connection_cls, int fd, char *hostname) {
|
||||
|
||||
log_debug("Added RemoteSource as connection metadata %p", source);
|
||||
|
||||
r = build_accept_encoding(&source->encoding);
|
||||
if (r < 0)
|
||||
return log_oom();
|
||||
|
||||
source->compression = COMPRESSION_NONE;
|
||||
*connection_cls = source;
|
||||
return 0;
|
||||
}
|
||||
@ -212,8 +236,17 @@ static int process_http_upload(
|
||||
if (*upload_data_size) {
|
||||
log_trace("Received %zu bytes", *upload_data_size);
|
||||
|
||||
r = journal_importer_push_data(&source->importer,
|
||||
upload_data, *upload_data_size);
|
||||
if (source->compression != COMPRESSION_NONE) {
|
||||
_cleanup_free_ char *buf = NULL;
|
||||
size_t buf_size;
|
||||
|
||||
r = decompress_blob(source->compression, upload_data, *upload_data_size, (void **) &buf, &buf_size, 0);
|
||||
if (r < 0)
|
||||
return mhd_respondf(connection, r, MHD_HTTP_BAD_REQUEST, "Decompression of received blob falied.");
|
||||
|
||||
r = journal_importer_push_data(&source->importer, buf, buf_size);
|
||||
} else
|
||||
r = journal_importer_push_data(&source->importer, upload_data, *upload_data_size);
|
||||
if (r < 0)
|
||||
return mhd_respond_oom(connection);
|
||||
|
||||
@ -253,7 +286,7 @@ static int process_http_upload(
|
||||
remaining);
|
||||
}
|
||||
|
||||
return mhd_respond(connection, MHD_HTTP_ACCEPTED, "OK.");
|
||||
return mhd_respond_with_encoding(connection, MHD_HTTP_ACCEPTED, source->encoding, "OK.");
|
||||
};
|
||||
|
||||
static mhd_result request_handler(
|
||||
@ -278,10 +311,20 @@ static mhd_result request_handler(
|
||||
|
||||
log_trace("Handling a connection %s %s %s", method, url, version);
|
||||
|
||||
if (*connection_cls)
|
||||
if (*connection_cls) {
|
||||
RemoteSource *source = *connection_cls;
|
||||
header = MHD_lookup_connection_value(connection, MHD_HEADER_KIND, "Content-Encoding");
|
||||
if (header) {
|
||||
Compression c = compression_lowercase_from_string(header);
|
||||
if (c < 0 || !compression_supported(c))
|
||||
return mhd_respondf(connection, 0, MHD_HTTP_UNSUPPORTED_MEDIA_TYPE,
|
||||
"Unsupported Content-Encoding type: %s", header);
|
||||
source->compression = c;
|
||||
}
|
||||
return process_http_upload(connection,
|
||||
upload_data, upload_data_size,
|
||||
*connection_cls);
|
||||
source);
|
||||
}
|
||||
|
||||
if (!streq(method, "POST"))
|
||||
return mhd_respond(connection, MHD_HTTP_NOT_ACCEPTABLE, "Unsupported method.");
|
||||
@ -722,6 +765,7 @@ static int parse_config(void) {
|
||||
{ "Remote", "MaxFileSize", config_parse_iec_uint64, 0, &arg_max_size },
|
||||
{ "Remote", "MaxFiles", config_parse_uint64, 0, &arg_n_max_files },
|
||||
{ "Remote", "KeepFree", config_parse_iec_uint64, 0, &arg_keep_free },
|
||||
{ "Remote", "Compression", config_parse_compression, 0, &arg_compression },
|
||||
{}
|
||||
};
|
||||
|
||||
|
@ -18,6 +18,7 @@ void source_free(RemoteSource *source) {
|
||||
sd_event_source_unref(source->event);
|
||||
sd_event_source_unref(source->buffer_event);
|
||||
|
||||
free(source->encoding);
|
||||
free(source);
|
||||
}
|
||||
|
||||
|
@ -3,6 +3,7 @@
|
||||
|
||||
#include "sd-event.h"
|
||||
|
||||
#include "compress.h"
|
||||
#include "journal-importer.h"
|
||||
#include "journal-remote-write.h"
|
||||
|
||||
@ -13,6 +14,8 @@ typedef struct RemoteSource {
|
||||
|
||||
sd_event_source *event;
|
||||
sd_event_source *buffer_event;
|
||||
Compression compression;
|
||||
char *encoding;
|
||||
} RemoteSource;
|
||||
|
||||
RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer);
|
||||
|
@ -251,6 +251,7 @@ static void check_update_watchdog(Uploader *u) {
|
||||
|
||||
static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
|
||||
Uploader *u = ASSERT_PTR(userp);
|
||||
_cleanup_free_ char *compression_buffer = NULL;
|
||||
int r;
|
||||
sd_journal *j;
|
||||
size_t filled = 0;
|
||||
@ -262,6 +263,14 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void
|
||||
|
||||
j = u->journal;
|
||||
|
||||
if (u->compression.algorithm != COMPRESSION_NONE) {
|
||||
compression_buffer = malloc_multiply(nmemb, size);
|
||||
if (!compression_buffer) {
|
||||
log_oom();
|
||||
return CURL_READFUNC_ABORT;
|
||||
}
|
||||
}
|
||||
|
||||
while (j && filled < size * nmemb) {
|
||||
if (u->entry_state == ENTRY_DONE) {
|
||||
r = sd_journal_next(j);
|
||||
@ -284,7 +293,7 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void
|
||||
u->entry_state = ENTRY_CURSOR;
|
||||
}
|
||||
|
||||
w = write_entry((char*)buf + filled, size * nmemb - filled, u);
|
||||
w = write_entry((compression_buffer ?: (char*) buf) + filled, size * nmemb - filled, u);
|
||||
if (w < 0)
|
||||
return CURL_READFUNC_ABORT;
|
||||
filled += w;
|
||||
@ -300,6 +309,19 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void
|
||||
u->entries_sent, u->current_cursor);
|
||||
}
|
||||
|
||||
if (filled > 0 && u->compression.algorithm != COMPRESSION_NONE) {
|
||||
size_t compressed_size;
|
||||
r = compress_blob(u->compression.algorithm, compression_buffer, filled, buf, size * nmemb, &compressed_size, u->compression.level);
|
||||
if (r < 0) {
|
||||
log_error_errno(r, "Failed to compress %zu bytes (Compression=%s, Level=%d): %m",
|
||||
filled, compression_lowercase_to_string(u->compression.algorithm), u->compression.level);
|
||||
return CURL_READFUNC_ABORT;
|
||||
}
|
||||
|
||||
assert(compressed_size <= size * nmemb);
|
||||
return compressed_size;
|
||||
}
|
||||
|
||||
return filled;
|
||||
}
|
||||
|
||||
|
@ -58,8 +58,11 @@ static bool arg_merge = false;
|
||||
static int arg_follow = -1;
|
||||
static const char *arg_save_state = NULL;
|
||||
static usec_t arg_network_timeout_usec = USEC_INFINITY;
|
||||
static CompressionArgs arg_compression = {};
|
||||
static bool arg_force_compression = false;
|
||||
|
||||
STATIC_DESTRUCTOR_REGISTER(arg_file, strv_freep);
|
||||
STATIC_DESTRUCTOR_REGISTER(arg_compression, compression_args_clear);
|
||||
|
||||
static void close_fd_input(Uploader *u);
|
||||
|
||||
@ -203,6 +206,17 @@ int start_upload(Uploader *u,
|
||||
return log_oom();
|
||||
h = l;
|
||||
|
||||
if (u->compression.algorithm != COMPRESSION_NONE) {
|
||||
_cleanup_free_ char *header = strjoin("Content-Encoding: ", compression_lowercase_to_string(u->compression.algorithm));
|
||||
if (!header)
|
||||
return log_oom();
|
||||
|
||||
l = curl_slist_append(h, header);
|
||||
if (!l)
|
||||
return log_oom();
|
||||
h = l;
|
||||
}
|
||||
|
||||
u->header = TAKE_PTR(h);
|
||||
}
|
||||
|
||||
@ -292,8 +306,10 @@ int start_upload(Uploader *u,
|
||||
}
|
||||
|
||||
static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
|
||||
_cleanup_free_ char *compression_buffer = NULL;
|
||||
Uploader *u = ASSERT_PTR(userp);
|
||||
ssize_t n;
|
||||
int r;
|
||||
|
||||
assert(nmemb < SSIZE_MAX / size);
|
||||
|
||||
@ -302,17 +318,35 @@ static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *user
|
||||
|
||||
assert(!size_multiply_overflow(size, nmemb));
|
||||
|
||||
n = read(u->input, buf, size * nmemb);
|
||||
log_debug("%s: allowed %zu, read %zd", __func__, size*nmemb, n);
|
||||
if (n > 0)
|
||||
return n;
|
||||
if (u->compression.algorithm != COMPRESSION_NONE) {
|
||||
compression_buffer = malloc_multiply(nmemb, size);
|
||||
if (!compression_buffer) {
|
||||
log_oom();
|
||||
return CURL_READFUNC_ABORT;
|
||||
}
|
||||
}
|
||||
|
||||
u->uploading = false;
|
||||
if (n < 0) {
|
||||
n = read(u->input, compression_buffer ?: buf, size * nmemb);
|
||||
if (n > 0) {
|
||||
log_debug("%s: allowed %zu, read %zd", __func__, size * nmemb, n);
|
||||
if (u->compression.algorithm == COMPRESSION_NONE)
|
||||
return n;
|
||||
|
||||
size_t compressed_size;
|
||||
r = compress_blob(u->compression.algorithm, compression_buffer, n, buf, size * nmemb, &compressed_size, u->compression.level);
|
||||
if (r < 0) {
|
||||
log_error_errno(r, "Failed to compress %zd bytes using (Compression=%s, Level=%d): %m",
|
||||
n, compression_lowercase_to_string(u->compression.algorithm), u->compression.level);
|
||||
return CURL_READFUNC_ABORT;
|
||||
}
|
||||
assert(compressed_size <= size * nmemb);
|
||||
return compressed_size;
|
||||
} else if (n < 0) {
|
||||
log_error_errno(errno, "Aborting transfer after read error on input: %m.");
|
||||
return CURL_READFUNC_ABORT;
|
||||
}
|
||||
|
||||
u->uploading = false;
|
||||
log_debug("Reached EOF");
|
||||
close_fd_input(u);
|
||||
return 0;
|
||||
@ -389,8 +423,13 @@ static int setup_uploader(Uploader *u, const char *url, const char *state_file)
|
||||
|
||||
*u = (Uploader) {
|
||||
.input = -1,
|
||||
.compression.algorithm = COMPRESSION_NONE,
|
||||
.compression.level = -1,
|
||||
};
|
||||
|
||||
if (arg_force_compression && arg_compression.size > 0)
|
||||
u->compression = arg_compression.opts[0];
|
||||
|
||||
host = STARTSWITH_SET(url, "http://", "https://");
|
||||
if (!host) {
|
||||
host = url;
|
||||
@ -448,6 +487,66 @@ static void destroy_uploader(Uploader *u) {
|
||||
sd_event_unref(u->event);
|
||||
}
|
||||
|
||||
#if LIBCURL_VERSION_NUM >= 0x075300
|
||||
static int update_content_encoding(Uploader *u, const char *accept_encoding) {
|
||||
int r;
|
||||
|
||||
assert(u);
|
||||
|
||||
for (const char *p = accept_encoding;;) {
|
||||
_cleanup_free_ char *encoding_value = NULL, *alg = NULL;
|
||||
Compression algorithm;
|
||||
CURLcode code;
|
||||
|
||||
r = extract_first_word(&p, &encoding_value, ",", 0);
|
||||
if (r < 0)
|
||||
return log_error_errno(r, "Failed to extract Accept-Encoding header value: %m");
|
||||
if (r == 0)
|
||||
return 0;
|
||||
|
||||
const char *q = encoding_value;
|
||||
r = extract_first_word(&q, &alg, ";", 0);
|
||||
if (r < 0)
|
||||
return log_error_errno(r, "Failed to extract compression algorithm from Accept-Encoding header: %m");
|
||||
|
||||
algorithm = compression_lowercase_from_string(alg);
|
||||
if (algorithm <= 0 || !compression_supported(algorithm)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
FOREACH_ARRAY(opt, arg_compression.opts, arg_compression.size) {
|
||||
if (opt->algorithm != algorithm)
|
||||
continue;
|
||||
|
||||
_cleanup_free_ char *header = strjoin("Content-Encoding: ", compression_lowercase_to_string(u->compression.algorithm));
|
||||
if (!header)
|
||||
return log_oom();
|
||||
|
||||
/* First, update existing Content-Encoding header. */
|
||||
bool found = false;
|
||||
for (struct curl_slist *l = u->header; l; l = l->next)
|
||||
if (startswith(l->data, "Content-Encoding:")) {
|
||||
free_and_replace(l->data, header);
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
|
||||
/* If Content-Encoding header is not found, append new one. */
|
||||
if (!found) {
|
||||
struct curl_slist *l = curl_slist_append(u->header, header);
|
||||
if (!l)
|
||||
return log_oom();
|
||||
u->header = l;
|
||||
}
|
||||
|
||||
easy_setopt(u->easy, CURLOPT_HTTPHEADER, u->header, LOG_ERR, return -EXFULL);
|
||||
u->compression = *opt;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
static int perform_upload(Uploader *u) {
|
||||
CURLcode code;
|
||||
long status;
|
||||
@ -480,9 +579,25 @@ static int perform_upload(Uploader *u) {
|
||||
return log_error_errno(SYNTHETIC_ERRNO(EIO),
|
||||
"Upload to %s finished with unexpected code %ld: %s",
|
||||
u->url, status, strna(u->answer));
|
||||
else
|
||||
else {
|
||||
#if LIBCURL_VERSION_NUM >= 0x075300
|
||||
int r;
|
||||
if (u->compression.algorithm == COMPRESSION_NONE) {
|
||||
struct curl_header *encoding_header;
|
||||
CURLHcode hcode;
|
||||
|
||||
hcode = curl_easy_header(u->easy, "Accept-Encoding", 0, CURLH_HEADER, -1, &encoding_header);
|
||||
if (hcode == CURLHE_OK && encoding_header && encoding_header->value) {
|
||||
r = update_content_encoding(u, encoding_header->value);
|
||||
if (r < 0)
|
||||
return r;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
log_debug("Upload finished successfully with code %ld: %s",
|
||||
status, strna(u->answer));
|
||||
}
|
||||
|
||||
free_and_replace(u->last_cursor, u->current_cursor);
|
||||
|
||||
@ -496,6 +611,8 @@ static int parse_config(void) {
|
||||
{ "Upload", "ServerCertificateFile", config_parse_path_or_ignore, 0, &arg_cert },
|
||||
{ "Upload", "TrustedCertificateFile", config_parse_path_or_ignore, 0, &arg_trust },
|
||||
{ "Upload", "NetworkTimeoutSec", config_parse_sec, 0, &arg_network_timeout_usec },
|
||||
{ "Upload", "Compression", config_parse_compression, true, &arg_compression },
|
||||
{ "Upload", "ForceCompression", config_parse_bool, 0, &arg_force_compression },
|
||||
{}
|
||||
};
|
||||
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include "sd-event.h"
|
||||
#include "sd-journal.h"
|
||||
|
||||
#include "journal-compression-util.h"
|
||||
#include "time-util.h"
|
||||
|
||||
typedef enum {
|
||||
@ -53,6 +54,7 @@ typedef struct Uploader {
|
||||
char *last_cursor, *current_cursor;
|
||||
usec_t watchdog_timestamp;
|
||||
usec_t watchdog_usec;
|
||||
CompressionOpts compression;
|
||||
} Uploader;
|
||||
|
||||
#define JOURNAL_UPLOAD_POLL_TIMEOUT (10 * USEC_PER_SEC)
|
||||
|
@ -1,11 +1,13 @@
|
||||
# SPDX-License-Identifier: LGPL-2.1-or-later
|
||||
|
||||
systemd_journal_upload_sources = files(
|
||||
'journal-compression-util.c',
|
||||
'journal-upload-journal.c',
|
||||
'journal-upload.c',
|
||||
)
|
||||
|
||||
libsystemd_journal_remote_sources = files(
|
||||
'journal-compression-util.c',
|
||||
'journal-remote-parse.c',
|
||||
'journal-remote-write.c',
|
||||
'journal-remote.c',
|
||||
|
@ -28,6 +28,7 @@ void microhttpd_logger(void *arg, const char *fmt, va_list ap) {
|
||||
int mhd_respond_internal(
|
||||
struct MHD_Connection *connection,
|
||||
enum MHD_RequestTerminationCode code,
|
||||
const char *encoding,
|
||||
const char *buffer,
|
||||
size_t size,
|
||||
enum MHD_ResponseMemoryMode mode) {
|
||||
@ -40,6 +41,10 @@ int mhd_respond_internal(
|
||||
return MHD_NO;
|
||||
|
||||
log_debug("Queueing response %u: %s", code, buffer);
|
||||
if (encoding)
|
||||
if (MHD_add_response_header(response, "Accept-Encoding", encoding) == MHD_NO)
|
||||
return MHD_NO;
|
||||
|
||||
if (MHD_add_response_header(response, "Content-Type", "text/plain") == MHD_NO)
|
||||
return MHD_NO;
|
||||
return MHD_queue_response(connection, code, response);
|
||||
@ -53,6 +58,7 @@ int mhd_respondf_internal(
|
||||
struct MHD_Connection *connection,
|
||||
int error,
|
||||
enum MHD_RequestTerminationCode code,
|
||||
const char *encoding,
|
||||
const char *format, ...) {
|
||||
|
||||
char *m;
|
||||
@ -72,7 +78,7 @@ int mhd_respondf_internal(
|
||||
if (r < 0)
|
||||
return respond_oom(connection);
|
||||
|
||||
return mhd_respond_internal(connection, code, m, r, MHD_RESPMEM_MUST_FREE);
|
||||
return mhd_respond_internal(connection, code, encoding, m, r, MHD_RESPMEM_MUST_FREE);
|
||||
}
|
||||
|
||||
#if HAVE_GNUTLS
|
||||
|
@ -65,29 +65,34 @@ void microhttpd_logger(void *arg, const char *fmt, va_list ap) _printf_(2, 0);
|
||||
int mhd_respond_internal(
|
||||
struct MHD_Connection *connection,
|
||||
enum MHD_RequestTerminationCode code,
|
||||
const char *encoding,
|
||||
const char *buffer,
|
||||
size_t size,
|
||||
enum MHD_ResponseMemoryMode mode);
|
||||
|
||||
#define mhd_respond(connection, code, message) \
|
||||
mhd_respond_internal( \
|
||||
connection, code, \
|
||||
message "\n", \
|
||||
strlen(message) + 1, \
|
||||
#define mhd_respond_with_encoding(connection, code, encoding, message) \
|
||||
mhd_respond_internal( \
|
||||
(connection), (code), (encoding), \
|
||||
message "\n", \
|
||||
strlen(message) + 1, \
|
||||
MHD_RESPMEM_PERSISTENT)
|
||||
|
||||
#define mhd_respond(connection, code, message) \
|
||||
mhd_respond_with_encoding(connection, code, NULL, message) \
|
||||
|
||||
int mhd_respond_oom(struct MHD_Connection *connection);
|
||||
|
||||
int mhd_respondf_internal(
|
||||
struct MHD_Connection *connection,
|
||||
int error,
|
||||
enum MHD_RequestTerminationCode code,
|
||||
const char *format, ...) _printf_(4,5);
|
||||
const char *encoding,
|
||||
const char *format, ...) _printf_(5,6);
|
||||
|
||||
#define mhd_respondf(connection, error, code, format, ...) \
|
||||
mhd_respondf_internal( \
|
||||
connection, error, code, \
|
||||
format "\n", \
|
||||
#define mhd_respondf(connection, error, code, format, ...) \
|
||||
mhd_respondf_internal( \
|
||||
connection, error, code, NULL, \
|
||||
format "\n", \
|
||||
##__VA_ARGS__)
|
||||
|
||||
int check_permissions(struct MHD_Connection *connection, int *code, char **hostname);
|
||||
|
@ -97,7 +97,7 @@ rm -rf /var/log/journal/remote/*
|
||||
echo "$TEST_MESSAGE" | systemd-cat -t "$TEST_TAG"
|
||||
journalctl --sync
|
||||
|
||||
mkdir /run/systemd/remote-pki
|
||||
mkdir -p /run/systemd/remote-pki
|
||||
cat >/run/systemd/remote-pki/ca.conf <<EOF
|
||||
[ req ]
|
||||
prompt = no
|
||||
@ -228,3 +228,47 @@ chmod -R g+rwX /run/systemd/journal-remote-tls
|
||||
systemctl restart systemd-journal-upload
|
||||
timeout 10 bash -xec 'while [[ "$(systemctl show -P ActiveState systemd-journal-upload)" != failed ]]; do sleep 1; done'
|
||||
(! systemctl status systemd-journal-upload)
|
||||
|
||||
systemctl stop systemd-journal-upload
|
||||
systemctl stop systemd-journal-remote.{socket,service}
|
||||
rm -rf /var/log/journal/remote/*
|
||||
|
||||
# Let's test sending data with compression enabled
|
||||
for c in none xz lz4 zstd; do
|
||||
echo "$TEST_MESSAGE" | systemd-cat -t "$TEST_TAG"
|
||||
journalctl --sync
|
||||
|
||||
cat >/run/systemd/journal-remote.conf.d/99-test.conf <<EOF
|
||||
[Remote]
|
||||
SplitMode=host
|
||||
Compression=zstd xz
|
||||
Compression=lz4
|
||||
ServerKeyFile=/run/systemd/remote-pki/server.key
|
||||
ServerCertificateFile=/run/systemd/remote-pki/server.crt
|
||||
TrustedCertificateFile=/run/systemd/remote-pki/ca.crt
|
||||
EOF
|
||||
cat >/run/systemd/journal-upload.conf.d/99-test.conf <<EOF
|
||||
[Upload]
|
||||
URL=https://localhost:19532
|
||||
Compression=${c}:3
|
||||
ServerKeyFile=/run/systemd/remote-pki/client.key
|
||||
ServerCertificateFile=/run/systemd/remote-pki/client.crt
|
||||
TrustedCertificateFile=/run/systemd/remote-pki/ca.crt
|
||||
EOF
|
||||
systemd-analyze cat-config systemd/journal-remote.conf
|
||||
systemd-analyze cat-config systemd/journal-upload.conf
|
||||
|
||||
systemctl restart systemd-journal-remote.socket
|
||||
systemctl restart systemd-journal-upload
|
||||
timeout 15 bash -xec 'until systemctl -q is-active systemd-journal-remote.service; do sleep 1; done'
|
||||
systemctl status systemd-journal-{remote,upload}
|
||||
|
||||
# It may take a bit until the whole journal is transferred
|
||||
timeout 30 bash -xec "until journalctl --directory=/var/log/journal/remote --identifier='$TEST_TAG' --grep='$TEST_MESSAGE'; do sleep 1; done"
|
||||
|
||||
systemctl stop systemd-journal-upload
|
||||
systemctl stop systemd-journal-remote.{socket,service}
|
||||
rm -rf /var/log/journal/remote/*
|
||||
rm /run/systemd/journal-upload.conf.d/99-test.conf
|
||||
rm /run/systemd/journal-remote.conf.d/99-test.conf
|
||||
done
|
||||
|
Loading…
x
Reference in New Issue
Block a user