1
0
mirror of https://github.com/systemd/systemd.git synced 2025-01-19 14:04:03 +03:00

journal-remote: added batch max entries, batch max bytes, batch timeout params

This commit is contained in:
Andrii Chubatiuk 2024-10-16 15:19:40 +03:00
parent 74e3f77ddc
commit 1f2e03ce63
4 changed files with 97 additions and 8 deletions

View File

@ -60,6 +60,30 @@
<xi:include href="version-info.xml" xpointer="v232"/></listitem>
</varlistentry>
<varlistentry>
<term><varname>BatchMaxBytes=</varname></term>
<listitem><para>Maximum payload size of a batch of journal entries sent in a single HTTP request.</para>
<xi:include href="version-info.xml" xpointer="v257"/></listitem>
</varlistentry>
<varlistentry>
<term><varname>BatchMaxEntries=</varname></term>
<listitem><para>Maximum number of journal entries included in a single HTTP request.</para>
<xi:include href="version-info.xml" xpointer="v257"/></listitem>
</varlistentry>
<varlistentry>
<term><varname>BatchTimeoutSec=</varname></term>
<listitem><para>Maximum time to wait after the previous request before sending the accumulated data.</para>
<xi:include href="version-info.xml" xpointer="v257"/></listitem>
</varlistentry>
<varlistentry>
<term><varname>Compression=</varname></term>

View File

@ -255,23 +255,28 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void
int r;
sd_journal *j;
size_t filled = 0;
size_t read_size = 0;
ssize_t w;
assert(nmemb <= SSIZE_MAX / size);
read_size = size * nmemb;
if (read_size > u->bytes_left)
read_size = u->bytes_left;
check_update_watchdog(u);
j = u->journal;
if (u->compression != COMPRESSION_NONE) {
compression_buffer = malloc_multiply(nmemb, size);
compression_buffer = malloc(read_size);
if (!compression_buffer) {
log_oom();
return CURL_READFUNC_ABORT;
}
}
while (j && filled < size * nmemb) {
while (j && filled < read_size && u->uploading) {
if (u->entry_state == ENTRY_DONE) {
r = sd_journal_next(j);
if (r < 0) {
@ -293,7 +298,7 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void
u->entry_state = ENTRY_CURSOR;
}
w = write_entry((compression_buffer ?: (char*) buf) + filled, size * nmemb - filled, u);
w = write_entry((compression_buffer ?: (char*) buf) + filled, read_size - filled, u);
if (w < 0)
return CURL_READFUNC_ABORT;
filled += w;
@ -307,11 +312,15 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void
log_debug("Entry %zu (%s) has been uploaded.",
u->entries_sent, u->current_cursor);
u->entries_left--;
if (u->entries_left <= 1)
u->uploading = false;
}
if (filled > 0 && u->compression != COMPRESSION_NONE) {
size_t compressed_size = 0;
r = compress_blob(u->compression, compression_buffer, filled, buf, size * nmemb, &compressed_size, u->compression_level);
r = compress_blob(u->compression, compression_buffer, filled, buf, read_size, &compressed_size, u->compression_level);
if (r < 0) {
log_error_errno(r, "Failed to compress %zu bytes using Compression=%s, CompressionLevel=%d)",
filled, compression_to_string(u->compression), u->compression_level);

View File

@ -64,6 +64,9 @@ static const char *arg_save_state = NULL;
static usec_t arg_network_timeout_usec = USEC_INFINITY;
static Compression arg_compression = COMPRESSION_NONE;
static int arg_compression_level = -1;
static uint64_t arg_batch_max_bytes = UINT64_MAX;
static uint64_t arg_batch_max_entries = UINT64_MAX;
static usec_t arg_batch_timeout_usec = USEC_INFINITY;
STATIC_DESTRUCTOR_REGISTER(arg_file, strv_freep);
STATIC_DESTRUCTOR_REGISTER(arg_headers, strv_freep);
@ -205,6 +208,26 @@ fail:
return log_error_errno(r, "Failed to save state %s: %m", u->state_file);
}
/* Reset the per-batch accounting counters and (re)arm the one-shot batch
 * timeout timer, so that the limits configured via BatchMaxBytes=,
 * BatchMaxEntries= and BatchTimeoutSec= apply to the upcoming batch.
 * Called from start_upload() before each request. Returns 0 on success,
 * negative errno on failure. */
static int refresh_timeout(Uploader *u) {
        int r;

        assert(u);

        u->bytes_left = arg_batch_max_bytes;
        u->entries_left = arg_batch_max_entries;
        u->uploading = true;

        if (arg_batch_timeout_usec != USEC_INFINITY) {
                /* The timer event source itself is created once in setup_uploader();
                 * here we only re-arm it relative to "now". */
                r = sd_event_source_set_time_relative(u->batch_timeout_event, arg_batch_timeout_usec);
                if (r < 0)
                        return log_error_errno(r, "Failed to set batch timer: %m");

                r = sd_event_source_set_enabled(u->batch_timeout_event, SD_EVENT_ONESHOT);
                if (r < 0)
                        return log_error_errno(r, "Failed to enable batch timer: %m");
        }

        return 0;
}
static int load_cursor_state(Uploader *u) {
int r;
@ -229,6 +252,7 @@ int start_upload(Uploader *u,
size_t nmemb,
void *userdata),
void *data) {
int r;
CURLcode code;
assert(u);
@ -347,6 +371,10 @@ int start_upload(Uploader *u,
u->answer = mfree(u->answer);
}
r = refresh_timeout(u);
if (r < 0)
return r;
/* upload to this place */
code = curl_easy_setopt(u->easy, CURLOPT_URL, u->url);
if (code)
@ -363,6 +391,7 @@ static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *user
int r;
Uploader *u = ASSERT_PTR(userp);
ssize_t n;
size_t read_size;
_cleanup_free_ char *compression_buffer = NULL;
assert(nmemb < SSIZE_MAX / size);
@ -372,16 +401,22 @@ static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *user
assert(!size_multiply_overflow(size, nmemb));
read_size = size * nmemb;
if (read_size > u->bytes_left)
read_size = u->bytes_left;
if (u->compression != COMPRESSION_NONE) {
compression_buffer = malloc_multiply(nmemb, size);
compression_buffer = malloc(read_size);
if (!compression_buffer) {
log_oom();
return CURL_READFUNC_ABORT;
}
}
n = read(u->input, compression_buffer ?: buf, size * nmemb);
log_debug("%s: allowed %zu, read %zd", __func__, size * nmemb, n);
n = read(u->input, compression_buffer ?: buf, read_size);
log_debug("%s: allowed %zu, read %zd", __func__, read_size, n);
u->bytes_left -= n;
if (n > 0) {
if (u->compression == COMPRESSION_NONE)
@ -389,7 +424,7 @@ static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *user
size_t compressed_size;
r = compress_blob(u->compression, compression_buffer, n, buf, size * nmemb, &compressed_size, u->compression_level);
r = compress_blob(u->compression, compression_buffer, n, buf, read_size, &compressed_size, u->compression_level);
if (r < 0) {
log_error_errno(r, "Failed to compress %ld bytes using Compression=%s, CompressionLevel=%d): %m",
n, compression_to_string(u->compression), u->compression_level);
@ -471,6 +506,12 @@ static int open_file_for_upload(Uploader *u, const char *filename) {
return r;
}
/* sd-event time callback for the batch timeout: once BatchTimeoutSec= has
 * elapsed since the batch timer was armed, stop accepting further entries
 * into the current batch so the accumulated data gets flushed. */
static int finish_uploading(sd_event_source *s, uint64_t usec, void *userdata) {
        Uploader *uploader = ASSERT_PTR(userdata);

        uploader->uploading = false;

        return 0;
}
static int setup_uploader(Uploader *u, const char *url, const char *state_file) {
int r;
const char *host, *proto = "";
@ -516,6 +557,13 @@ static int setup_uploader(Uploader *u, const char *url, const char *state_file)
if (r < 0)
return log_error_errno(r, "Failed to install SIGINT/SIGTERM handlers: %m");
if (arg_batch_timeout_usec != USEC_INFINITY) {
r = sd_event_add_time_relative(u->event, &u->batch_timeout_event, CLOCK_MONOTONIC,
arg_batch_timeout_usec, 0, finish_uploading, u);
if (r < 0)
return log_error_errno(r, "Failed to start batch timer: %m");
}
(void) sd_watchdog_enabled(false, &u->watchdog_usec);
return load_cursor_state(u);
@ -534,6 +582,7 @@ static void destroy_uploader(Uploader *u) {
free(u->url);
u->input_event = sd_event_source_unref(u->input_event);
u->batch_timeout_event = sd_event_source_unref(u->batch_timeout_event);
close_fd_input(u);
close_journal_input(u);
@ -592,6 +641,9 @@ static int parse_config(void) {
{ "Upload", "Header", config_parse_header, 0, &arg_headers },
{ "Upload", "Compression", config_parse_compression, 0, &arg_compression },
{ "Upload", "CompressionLevel", config_parse_int, 0, &arg_compression_level },
{ "Upload", "BatchMaxBytes", config_parse_iec_size, 0, &arg_batch_max_bytes },
{ "Upload", "BatchMaxEntries", config_parse_iec_size, 0, &arg_batch_max_entries },
{ "Upload", "BatchTimeoutSec", config_parse_sec, 0, &arg_batch_timeout_usec },
{}
};

View File

@ -53,6 +53,10 @@ typedef struct Uploader {
/* general metrics */
const char *state_file;
uint64_t bytes_left;
uint64_t entries_left;
sd_event_source *batch_timeout_event;
size_t entries_sent;
char *last_cursor, *current_cursor;
usec_t watchdog_timestamp;