]> git.ipfire.org Git - thirdparty/systemd.git/commitdiff
journal-remote: added compression, compression-level and content-encoding negotiation 34822/head
authorAndrii Chubatiuk <andrew.chubatiuk@gmail.com>
Sat, 2 Nov 2024 20:03:26 +0000 (22:03 +0200)
committerAndrii Chubatiuk <andrew.chubatiuk@gmail.com>
Fri, 7 Feb 2025 05:05:02 +0000 (07:05 +0200)
16 files changed:
man/journal-remote.conf.xml
man/journal-upload.conf.xml
src/basic/compress.c
src/basic/compress.h
src/journal-remote/journal-compression-util.c [new file with mode: 0644]
src/journal-remote/journal-compression-util.h [new file with mode: 0644]
src/journal-remote/journal-remote-main.c
src/journal-remote/journal-remote-parse.c
src/journal-remote/journal-remote-parse.h
src/journal-remote/journal-upload-journal.c
src/journal-remote/journal-upload.c
src/journal-remote/journal-upload.h
src/journal-remote/meson.build
src/journal-remote/microhttpd-util.c
src/journal-remote/microhttpd-util.h
test/units/TEST-04-JOURNAL.journal-remote.sh

index 01dc94de5579891abb07dcbe6c905222a999b547..34ef21a326d23508f8625013b8146ab2a85c31a0 100644 (file)
     [Remote] section:</para>
 
     <variablelist class='config-directives'>
+      <varlistentry>
+        <term><varname>Compression=</varname></term>
+
+        <listitem><para>Acceptable compression algorithms to be used by <command>systemd-journal-upload</command>. Compression algorithms are
+        used for <literal>Accept-Encoding</literal> header contruction with priorities set according to an order in configuration.
+        This parameter takes space separated list of compression algorithms. Example:
+        <programlisting>Compression=zstd lz4</programlisting>
+        This option can be specified multiple times. If an empty string is assigned, then all the previous assignments are cleared.
+        </para>
+
+        <xi:include href="version-info.xml" xpointer="v258"/></listitem>
+      </varlistentry>
+
       <varlistentry>
         <term><varname>Seal=</varname></term>
 
index 7792617e51cb47cf1ba074c3df84399dae1f99b8..4785ef1ab3097c16c238669e6ab96ced07d45176 100644 (file)
         <xi:include href="version-info.xml" xpointer="v232"/></listitem>
       </varlistentry>
 
+      <varlistentry>
+        <term><varname>Compression=</varname></term>
+
+        <listitem><para>Takes a space separated list of compression algorithms to be applied to logs data before sending.
+        Supported algorithms are <literal>none</literal>, <literal>zstd</literal>, <literal>xz</literal>,
+        or <literal>lz4</literal>. Optionally, each algorithm (except for <literal>none</literal>)
+        followed by a colon (<literal>:</literal>) and its compression level, for example <literal>zstd:4</literal>.
+        The compression level is expected to be a positive integer. This option can be specified multiple times.
+        If an empty string is assigned, then all previous assignments are cleared.
+        Defaults to unset, and data will not be compressed.</para>
+
+        <para>Example:
+        <programlisting>Compression=zstd:4 lz4:2</programlisting></para>
+
+        <para>Even when compression is enabled, the initial requests are sent without compression.
+        It becomes effective either if <literal>ForceCompression=</literal> is enabled,
+        or the server response contains <literal>Accept-Encoding</literal> headers with a list of
+        compression algorithms that contains one of the algorithms specified in this option.</para>
+
+        <xi:include href="version-info.xml" xpointer="v258"/></listitem>
+      </varlistentry>
+
+      <varlistentry>
+        <term><varname>ForceCompression=</varname></term>
+
+        <listitem><para>Takes a boolean value, enforces using compression without content encoding negotiation.
+        Defaults to <literal>false</literal>.</para>
+
+        <xi:include href="version-info.xml" xpointer="v258"/></listitem>
+      </varlistentry>
+
       <varlistentry>
         <term><varname>ServerKeyFile=</varname></term>
 
index 7f30fe61dae461a5636f4302019c2d2e9b4da328..1d16f8603840ec0e378ffca1dd0ab13dd8085b5a 100644 (file)
@@ -122,7 +122,15 @@ static const char* const compression_table[_COMPRESSION_MAX] = {
         [COMPRESSION_ZSTD] = "ZSTD",
 };
 
+static const char* const compression_lowercase_table[_COMPRESSION_MAX] = {
+        [COMPRESSION_NONE] = "none",
+        [COMPRESSION_XZ]   = "xz",
+        [COMPRESSION_LZ4]  = "lz4",
+        [COMPRESSION_ZSTD] = "zstd",
+};
+
 DEFINE_STRING_TABLE_LOOKUP(compression, Compression);
+DEFINE_STRING_TABLE_LOOKUP(compression_lowercase, Compression);
 
 bool compression_supported(Compression c) {
         static const unsigned supported =
index 2f3ab2f39c972ae874b41d028b4089de77e19211..fd265bb02616ed7b393bb07219ab3cb51a76d7a9 100644 (file)
@@ -24,6 +24,8 @@ typedef enum Compression {
 
 const char* compression_to_string(Compression compression);
 Compression compression_from_string(const char *compression);
+const char* compression_lowercase_to_string(Compression compression);
+Compression compression_lowercase_from_string(const char *compression);
 
 bool compression_supported(Compression c);
 
diff --git a/src/journal-remote/journal-compression-util.c b/src/journal-remote/journal-compression-util.c
new file mode 100644 (file)
index 0000000..4def84b
--- /dev/null
@@ -0,0 +1,91 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+
+#include "extract-word.h"
+#include "journal-compression-util.h"
+#include "parse-util.h"
+
+void compression_args_clear(CompressionArgs *args) {
+        assert(args);
+        args->size = 0;
+        args->opts = mfree(args->opts);
+}
+
+int config_parse_compression(
+                const char *unit,
+                const char *filename,
+                unsigned line,
+                const char *section,
+                unsigned section_line,
+                const char *lvalue,
+                int ltype,
+                const char *rvalue,
+                void *data,
+                void *userdata) {
+
+        CompressionArgs *args = ASSERT_PTR(data);
+        bool parse_level = ltype;
+        int r;
+
+        assert(filename);
+        assert(lvalue);
+        assert(rvalue);
+
+        if (isempty(rvalue)) {
+                compression_args_clear(args);
+                return 1;
+        }
+
+        for (const char *p = rvalue;;) {
+                _cleanup_free_ char *algorithm = NULL, *word = NULL;
+                int level = -1;
+
+                r = extract_first_word(&p, &word, NULL, 0);
+                if (r < 0)
+                        return log_syntax_parse_error(unit, filename, line, r, lvalue, rvalue);
+                if (r == 0)
+                        return 1;
+
+                if (parse_level) {
+                        const char *q = word;
+                        r = extract_first_word(&q, &algorithm, ":", 0);
+                        if (r < 0)
+                                return log_syntax_parse_error(unit, filename, line, r, lvalue, rvalue);
+                        if (!isempty(q)) {
+                                r = safe_atoi(q, &level);
+                                if (r < 0) {
+                                        log_syntax(unit, LOG_WARNING, filename, line, r,
+                                                   "Compression level %s should be positive, ignoring.", q);
+                                        continue;
+                                }
+                        }
+                } else
+                        algorithm = TAKE_PTR(word);
+
+                Compression c = compression_lowercase_from_string(algorithm);
+                if (c < 0 || !compression_supported(c)) {
+                        log_syntax(unit, LOG_WARNING, filename, line, c,
+                                   "Compression=%s is not supported on a system, ignoring.", algorithm);
+                        continue;
+                }
+
+                bool found = false;
+                FOREACH_ARRAY(opt, args->opts, args->size)
+                        if (opt->algorithm == c) {
+                                found = true;
+                                if (parse_level)
+                                        opt->level = level;
+                                break;
+                        }
+
+                if (found)
+                        continue;
+
+                if (!GREEDY_REALLOC(args->opts, args->size + 1))
+                        return log_oom();
+
+                args->opts[args->size++] = (CompressionOpts) {
+                        .algorithm = c,
+                        .level = level,
+                };
+        }
+}
diff --git a/src/journal-remote/journal-compression-util.h b/src/journal-remote/journal-compression-util.h
new file mode 100644 (file)
index 0000000..785ede6
--- /dev/null
@@ -0,0 +1,19 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+#pragma once
+
+#include "compress.h"
+#include "conf-parser.h"
+
+typedef struct CompressionOpts {
+        Compression algorithm;
+        int level;
+} CompressionOpts;
+
+typedef struct CompressionArgs {
+        CompressionOpts *opts;
+        size_t size;
+} CompressionArgs;
+
+CONFIG_PARSER_PROTOTYPE(config_parse_compression);
+
+void compression_args_clear(CompressionArgs *args);
index c48b7df2014e45134d7049c3d3d27bf7df0a515c..3c780b8f76235a1fe78c5e6e8f34352fa5f6b669 100644 (file)
@@ -11,6 +11,7 @@
 #include "daemon-util.h"
 #include "fd-util.h"
 #include "fileio.h"
+#include "journal-compression-util.h"
 #include "journal-remote-write.h"
 #include "journal-remote.h"
 #include "logs-show.h"
@@ -37,6 +38,7 @@ static const char *arg_getter = NULL;
 static const char *arg_listen_raw = NULL;
 static const char *arg_listen_http = NULL;
 static const char *arg_listen_https = NULL;
+static CompressionArgs arg_compression = {};
 static char **arg_files = NULL; /* Do not free this. */
 static bool arg_compress = true;
 static bool arg_seal = false;
@@ -65,6 +67,7 @@ STATIC_DESTRUCTOR_REGISTER(arg_key, freep);
 STATIC_DESTRUCTOR_REGISTER(arg_cert, freep);
 STATIC_DESTRUCTOR_REGISTER(arg_trust, freep);
 STATIC_DESTRUCTOR_REGISTER(arg_output, freep);
+STATIC_DESTRUCTOR_REGISTER(arg_compression, compression_args_clear);
 
 static const char* const journal_write_split_mode_table[_JOURNAL_WRITE_SPLIT_MAX] = {
         [JOURNAL_WRITE_SPLIT_NONE] = "none",
@@ -152,6 +155,22 @@ static int dispatch_http_event(sd_event_source *event,
                                uint32_t revents,
                                void *userdata);
 
+static int build_accept_encoding(char **ret) {
+        assert(ret);
+
+        float q = 1.0, step = 1.0 / arg_compression.size;
+        _cleanup_free_ char *buf = NULL;
+        FOREACH_ARRAY(opt, arg_compression.opts, arg_compression.size) {
+                const char *c = compression_lowercase_to_string(opt->algorithm);
+                if (strextendf_with_separator(&buf, ",", "%s;q=%.1f", c, q) < 0)
+                        return -ENOMEM;
+                q -= step;
+        }
+
+        *ret = TAKE_PTR(buf);
+        return 0;
+}
+
 static int request_meta(void **connection_cls, int fd, char *hostname) {
         RemoteSource *source;
         Writer *writer;
@@ -174,6 +193,11 @@ static int request_meta(void **connection_cls, int fd, char *hostname) {
 
         log_debug("Added RemoteSource as connection metadata %p", source);
 
+        r = build_accept_encoding(&source->encoding);
+        if (r < 0)
+                return log_oom();
+
+        source->compression = COMPRESSION_NONE;
         *connection_cls = source;
         return 0;
 }
@@ -212,8 +236,17 @@ static int process_http_upload(
         if (*upload_data_size) {
                 log_trace("Received %zu bytes", *upload_data_size);
 
-                r = journal_importer_push_data(&source->importer,
-                                               upload_data, *upload_data_size);
+                if (source->compression != COMPRESSION_NONE) {
+                        _cleanup_free_ char *buf = NULL;
+                        size_t buf_size;
+
+                        r = decompress_blob(source->compression, upload_data, *upload_data_size, (void **) &buf, &buf_size, 0);
+                        if (r < 0)
+                                return mhd_respondf(connection, r, MHD_HTTP_BAD_REQUEST, "Decompression of received blob falied.");
+
+                        r = journal_importer_push_data(&source->importer, buf, buf_size);
+                } else
+                        r = journal_importer_push_data(&source->importer, upload_data, *upload_data_size);
                 if (r < 0)
                         return mhd_respond_oom(connection);
 
@@ -253,7 +286,7 @@ static int process_http_upload(
                                     remaining);
         }
 
-        return mhd_respond(connection, MHD_HTTP_ACCEPTED, "OK.");
+        return mhd_respond_with_encoding(connection, MHD_HTTP_ACCEPTED, source->encoding, "OK.");
 };
 
 static mhd_result request_handler(
@@ -278,10 +311,20 @@ static mhd_result request_handler(
 
         log_trace("Handling a connection %s %s %s", method, url, version);
 
-        if (*connection_cls)
+        if (*connection_cls) {
+                RemoteSource *source = *connection_cls;
+                header = MHD_lookup_connection_value(connection, MHD_HEADER_KIND, "Content-Encoding");
+                if (header) {
+                        Compression c = compression_lowercase_from_string(header);
+                        if (c < 0 || !compression_supported(c))
+                                return mhd_respondf(connection, 0, MHD_HTTP_UNSUPPORTED_MEDIA_TYPE,
+                                                    "Unsupported Content-Encoding type: %s", header);
+                        source->compression = c;
+                }
                 return process_http_upload(connection,
                                            upload_data, upload_data_size,
-                                           *connection_cls);
+                                           source);
+        }
 
         if (!streq(method, "POST"))
                 return mhd_respond(connection, MHD_HTTP_NOT_ACCEPTABLE, "Unsupported method.");
@@ -722,6 +765,7 @@ static int parse_config(void) {
                 { "Remote",  "MaxFileSize",            config_parse_iec_uint64,       0, &arg_max_size    },
                 { "Remote",  "MaxFiles",               config_parse_uint64,           0, &arg_n_max_files },
                 { "Remote",  "KeepFree",               config_parse_iec_uint64,       0, &arg_keep_free   },
+                { "Remote",  "Compression",            config_parse_compression,      0, &arg_compression },
                 {}
         };
 
index e23012c472e171888d483cbebabfeeae87e69dc2..d743b217b011a10c5da9ac93cf9ffa64db8135e6 100644 (file)
@@ -18,6 +18,7 @@ void source_free(RemoteSource *source) {
         sd_event_source_unref(source->event);
         sd_event_source_unref(source->buffer_event);
 
+        free(source->encoding);
         free(source);
 }
 
index 703035b1ecbb43df5828eb4d470443c6cfeeda63..89d30b8721b444dbd61c7569c61b31cf6bb12ac0 100644 (file)
@@ -3,6 +3,7 @@
 
 #include "sd-event.h"
 
+#include "compress.h"
 #include "journal-importer.h"
 #include "journal-remote-write.h"
 
@@ -13,6 +14,8 @@ typedef struct RemoteSource {
 
         sd_event_source *event;
         sd_event_source *buffer_event;
+        Compression compression;
+        char *encoding;
 } RemoteSource;
 
 RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer);
index 23ad3b2d3121b0940a55d968efc52edbd2f5b30a..ecb323b21796393b33af428859779ff3f013567c 100644 (file)
@@ -251,6 +251,7 @@ static void check_update_watchdog(Uploader *u) {
 
 static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
         Uploader *u = ASSERT_PTR(userp);
+        _cleanup_free_ char *compression_buffer = NULL;
         int r;
         sd_journal *j;
         size_t filled = 0;
@@ -262,6 +263,14 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void
 
         j = u->journal;
 
+        if (u->compression.algorithm != COMPRESSION_NONE) {
+                compression_buffer = malloc_multiply(nmemb, size);
+                if (!compression_buffer) {
+                        log_oom();
+                        return CURL_READFUNC_ABORT;
+                }
+        }
+
         while (j && filled < size * nmemb) {
                 if (u->entry_state == ENTRY_DONE) {
                         r = sd_journal_next(j);
@@ -284,7 +293,7 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void
                         u->entry_state = ENTRY_CURSOR;
                 }
 
-                w = write_entry((char*)buf + filled, size * nmemb - filled, u);
+                w = write_entry((compression_buffer ?: (char*) buf) + filled, size * nmemb - filled, u);
                 if (w < 0)
                         return CURL_READFUNC_ABORT;
                 filled += w;
@@ -300,6 +309,19 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void
                           u->entries_sent, u->current_cursor);
         }
 
+        if (filled > 0 && u->compression.algorithm != COMPRESSION_NONE) {
+                size_t compressed_size;
+                r = compress_blob(u->compression.algorithm, compression_buffer, filled, buf, size * nmemb, &compressed_size, u->compression.level);
+                if (r < 0) {
+                        log_error_errno(r, "Failed to compress %zu bytes (Compression=%s, Level=%d): %m",
+                                        filled, compression_lowercase_to_string(u->compression.algorithm), u->compression.level);
+                        return CURL_READFUNC_ABORT;
+                }
+
+                assert(compressed_size <= size * nmemb);
+                return compressed_size;
+        }
+
         return filled;
 }
 
index c702b00806ec192aede95083b3fbaf0fd1af9680..ca891a4e00e61fd5993f5874fb2f164b6dcfa422 100644 (file)
@@ -58,8 +58,11 @@ static bool arg_merge = false;
 static int arg_follow = -1;
 static const char *arg_save_state = NULL;
 static usec_t arg_network_timeout_usec = USEC_INFINITY;
+static CompressionArgs arg_compression = {};
+static bool arg_force_compression = false;
 
 STATIC_DESTRUCTOR_REGISTER(arg_file, strv_freep);
+STATIC_DESTRUCTOR_REGISTER(arg_compression, compression_args_clear);
 
 static void close_fd_input(Uploader *u);
 
@@ -203,6 +206,17 @@ int start_upload(Uploader *u,
                         return log_oom();
                 h = l;
 
+                if (u->compression.algorithm != COMPRESSION_NONE) {
+                        _cleanup_free_ char *header = strjoin("Content-Encoding: ", compression_lowercase_to_string(u->compression.algorithm));
+                        if (!header)
+                                return log_oom();
+
+                        l = curl_slist_append(h, header);
+                        if (!l)
+                                return log_oom();
+                        h = l;
+                }
+
                 u->header = TAKE_PTR(h);
         }
 
@@ -292,8 +306,10 @@ int start_upload(Uploader *u,
 }
 
 static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
+        _cleanup_free_ char *compression_buffer = NULL;
         Uploader *u = ASSERT_PTR(userp);
         ssize_t n;
+        int r;
 
         assert(nmemb < SSIZE_MAX / size);
 
@@ -302,17 +318,35 @@ static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *user
 
         assert(!size_multiply_overflow(size, nmemb));
 
-        n = read(u->input, buf, size * nmemb);
-        log_debug("%s: allowed %zu, read %zd", __func__, size*nmemb, n);
-        if (n > 0)
-                return n;
+        if (u->compression.algorithm != COMPRESSION_NONE) {
+                compression_buffer = malloc_multiply(nmemb, size);
+                if (!compression_buffer) {
+                        log_oom();
+                        return CURL_READFUNC_ABORT;
+                }
+        }
+
+        n = read(u->input, compression_buffer ?: buf, size * nmemb);
+        if (n > 0) {
+                log_debug("%s: allowed %zu, read %zd", __func__, size * nmemb, n);
+                if (u->compression.algorithm == COMPRESSION_NONE)
+                        return n;
 
-        u->uploading = false;
-        if (n < 0) {
+                size_t compressed_size;
+                r = compress_blob(u->compression.algorithm, compression_buffer, n, buf, size * nmemb, &compressed_size, u->compression.level);
+                if (r < 0) {
+                        log_error_errno(r, "Failed to compress %zd bytes using (Compression=%s, Level=%d): %m",
+                                        n, compression_lowercase_to_string(u->compression.algorithm), u->compression.level);
+                        return CURL_READFUNC_ABORT;
+                }
+                assert(compressed_size <= size * nmemb);
+                return compressed_size;
+        } else if (n < 0) {
                 log_error_errno(errno, "Aborting transfer after read error on input: %m.");
                 return CURL_READFUNC_ABORT;
         }
 
+        u->uploading = false;
         log_debug("Reached EOF");
         close_fd_input(u);
         return 0;
@@ -389,8 +423,13 @@ static int setup_uploader(Uploader *u, const char *url, const char *state_file)
 
         *u = (Uploader) {
                 .input = -1,
+                .compression.algorithm = COMPRESSION_NONE,
+                .compression.level = -1,
         };
 
+        if (arg_force_compression && arg_compression.size > 0)
+                u->compression = arg_compression.opts[0];
+
         host = STARTSWITH_SET(url, "http://", "https://");
         if (!host) {
                 host = url;
@@ -448,6 +487,66 @@ static void destroy_uploader(Uploader *u) {
         sd_event_unref(u->event);
 }
 
+#if LIBCURL_VERSION_NUM >= 0x075300
+static int update_content_encoding(Uploader *u, const char *accept_encoding) {
+        int r;
+
+        assert(u);
+
+        for (const char *p = accept_encoding;;) {
+                _cleanup_free_ char *encoding_value = NULL, *alg = NULL;
+                Compression algorithm;
+                CURLcode code;
+
+                r = extract_first_word(&p, &encoding_value, ",", 0);
+                if (r < 0)
+                        return log_error_errno(r, "Failed to extract Accept-Encoding header value: %m");
+                if (r == 0)
+                        return 0;
+
+                const char *q = encoding_value;
+                r = extract_first_word(&q, &alg, ";", 0);
+                if (r < 0)
+                        return log_error_errno(r, "Failed to extract compression algorithm from Accept-Encoding header: %m");
+
+                algorithm = compression_lowercase_from_string(alg);
+                if (algorithm <= 0 || !compression_supported(algorithm)) {
+                        continue;
+                }
+
+                FOREACH_ARRAY(opt, arg_compression.opts, arg_compression.size) {
+                        if (opt->algorithm != algorithm)
+                                continue;
+
+                        _cleanup_free_ char *header = strjoin("Content-Encoding: ", compression_lowercase_to_string(u->compression.algorithm));
+                        if (!header)
+                                return log_oom();
+
+                        /* First, update existing Content-Encoding header. */
+                        bool found = false;
+                        for (struct curl_slist *l = u->header; l; l = l->next)
+                                if (startswith(l->data, "Content-Encoding:")) {
+                                        free_and_replace(l->data, header);
+                                        found = true;
+                                        break;
+                                }
+
+                        /* If Content-Encoding header is not found, append new one. */
+                        if (!found) {
+                                struct curl_slist *l = curl_slist_append(u->header, header);
+                                if (!l)
+                                        return log_oom();
+                                u->header = l;
+                        }
+
+                        easy_setopt(u->easy, CURLOPT_HTTPHEADER, u->header, LOG_ERR, return -EXFULL);
+                        u->compression = *opt;
+                        return 0;
+                }
+        }
+}
+#endif
+
 static int perform_upload(Uploader *u) {
         CURLcode code;
         long status;
@@ -480,9 +579,25 @@ static int perform_upload(Uploader *u) {
                 return log_error_errno(SYNTHETIC_ERRNO(EIO),
                                        "Upload to %s finished with unexpected code %ld: %s",
                                        u->url, status, strna(u->answer));
-        else
+        else {
+#if LIBCURL_VERSION_NUM >= 0x075300
+                int r;
+                if (u->compression.algorithm == COMPRESSION_NONE) {
+                        struct curl_header *encoding_header;
+                        CURLHcode hcode;
+
+                        hcode = curl_easy_header(u->easy, "Accept-Encoding", 0, CURLH_HEADER, -1, &encoding_header);
+                        if (hcode == CURLHE_OK && encoding_header && encoding_header->value) {
+                                r = update_content_encoding(u, encoding_header->value);
+                                if (r < 0)
+                                        return r;
+                        }
+                }
+#endif
+
                 log_debug("Upload finished successfully with code %ld: %s",
                           status, strna(u->answer));
+        }
 
         free_and_replace(u->last_cursor, u->current_cursor);
 
@@ -496,6 +611,8 @@ static int parse_config(void) {
                 { "Upload",  "ServerCertificateFile",  config_parse_path_or_ignore, 0,                        &arg_cert                 },
                 { "Upload",  "TrustedCertificateFile", config_parse_path_or_ignore, 0,                        &arg_trust                },
                 { "Upload",  "NetworkTimeoutSec",      config_parse_sec,            0,                        &arg_network_timeout_usec },
+                { "Upload",  "Compression",            config_parse_compression,    true,                     &arg_compression          },
+                { "Upload",  "ForceCompression",       config_parse_bool,           0,                        &arg_force_compression    },
                 {}
         };
 
index fe6abb75a9e5427be7e016841675cf4dda64fdb2..95f79a26383afb1b11d5c6977359fa31473cdd72 100644 (file)
@@ -7,6 +7,7 @@
 #include "sd-event.h"
 #include "sd-journal.h"
 
+#include "journal-compression-util.h"
 #include "time-util.h"
 
 typedef enum {
@@ -53,6 +54,7 @@ typedef struct Uploader {
         char *last_cursor, *current_cursor;
         usec_t watchdog_timestamp;
         usec_t watchdog_usec;
+        CompressionOpts compression;
 } Uploader;
 
 #define JOURNAL_UPLOAD_POLL_TIMEOUT (10 * USEC_PER_SEC)
index 10a82751d77f5b361f7d75a011e641f4f380c5bc..0f3a91a621dc11faa29cca6bfb0d7368c2b87959 100644 (file)
@@ -1,11 +1,13 @@
 # SPDX-License-Identifier: LGPL-2.1-or-later
 
 systemd_journal_upload_sources = files(
+        'journal-compression-util.c',
         'journal-upload-journal.c',
         'journal-upload.c',
 )
 
 libsystemd_journal_remote_sources = files(
+        'journal-compression-util.c',
         'journal-remote-parse.c',
         'journal-remote-write.c',
         'journal-remote.c',
index 0d5997a03fb6a4c5ae1c321915fbe9ad12c98dcf..9f68cc65ca3e16cae69e3f1ff424971285affc34 100644 (file)
@@ -28,6 +28,7 @@ void microhttpd_logger(void *arg, const char *fmt, va_list ap) {
 int mhd_respond_internal(
                 struct MHD_Connection *connection,
                 enum MHD_RequestTerminationCode code,
+                const char *encoding,
                 const char *buffer,
                 size_t size,
                 enum MHD_ResponseMemoryMode mode) {
@@ -40,6 +41,10 @@ int mhd_respond_internal(
                 return MHD_NO;
 
         log_debug("Queueing response %u: %s", code, buffer);
+        if (encoding)
+                if (MHD_add_response_header(response, "Accept-Encoding", encoding) == MHD_NO)
+                        return MHD_NO;
+
         if (MHD_add_response_header(response, "Content-Type", "text/plain") == MHD_NO)
                 return MHD_NO;
         return MHD_queue_response(connection, code, response);
@@ -53,6 +58,7 @@ int mhd_respondf_internal(
                 struct MHD_Connection *connection,
                 int error,
                 enum MHD_RequestTerminationCode code,
+                const char *encoding,
                 const char *format, ...) {
 
         char *m;
@@ -72,7 +78,7 @@ int mhd_respondf_internal(
         if (r < 0)
                 return respond_oom(connection);
 
-        return mhd_respond_internal(connection, code, m, r, MHD_RESPMEM_MUST_FREE);
+        return mhd_respond_internal(connection, code, encoding, m, r, MHD_RESPMEM_MUST_FREE);
 }
 
 #if HAVE_GNUTLS
index 309c39aab08917e2059d18214c815c62844d096b..bb888e8aedad1705686d367fe68a011d32d0a6b9 100644 (file)
@@ -65,29 +65,34 @@ void microhttpd_logger(void *arg, const char *fmt, va_list ap) _printf_(2, 0);
 int mhd_respond_internal(
                 struct MHD_Connection *connection,
                 enum MHD_RequestTerminationCode code,
+                const char *encoding,
                 const char *buffer,
                 size_t size,
                 enum MHD_ResponseMemoryMode mode);
 
-#define mhd_respond(connection, code, message)                  \
-        mhd_respond_internal(                                   \
-             connection, code,                                  \
-             message "\n",                                      \
-             strlen(message) + 1,                               \
+#define mhd_respond_with_encoding(connection, code, encoding, message)   \
+        mhd_respond_internal(                                            \
+             (connection), (code), (encoding),                           \
+             message "\n",                                               \
+             strlen(message) + 1,                                        \
              MHD_RESPMEM_PERSISTENT)
 
+#define mhd_respond(connection, code, message)                     \
+        mhd_respond_with_encoding(connection, code, NULL, message) \
+
 int mhd_respond_oom(struct MHD_Connection *connection);
 
 int mhd_respondf_internal(
                 struct MHD_Connection *connection,
                 int error,
                 enum MHD_RequestTerminationCode code,
-                const char *format, ...) _printf_(4,5);
+                const char *encoding,
+                const char *format, ...) _printf_(5,6);
 
-#define mhd_respondf(connection, error, code, format, ...)      \
-        mhd_respondf_internal(                                  \
-                connection, error, code,                        \
-                format "\n",                                    \
+#define mhd_respondf(connection, error, code, format, ...)   \
+        mhd_respondf_internal(                               \
+                connection, error, code, NULL,               \
+                format "\n",                                 \
                 ##__VA_ARGS__)
 
 int check_permissions(struct MHD_Connection *connection, int *code, char **hostname);
index c7b99b11fbb7ffdcab8f00740168a5c879163e32..df39a50b049443c5bb30e4c03a37cc91f3294c2d 100755 (executable)
@@ -97,7 +97,7 @@ rm -rf /var/log/journal/remote/*
 echo "$TEST_MESSAGE" | systemd-cat -t "$TEST_TAG"
 journalctl --sync
 
-mkdir /run/systemd/remote-pki
+mkdir -p /run/systemd/remote-pki
 cat >/run/systemd/remote-pki/ca.conf <<EOF
 [ req ]
 prompt = no
@@ -228,3 +228,47 @@ chmod -R g+rwX /run/systemd/journal-remote-tls
 systemctl restart systemd-journal-upload
 timeout 10 bash -xec 'while [[ "$(systemctl show -P ActiveState systemd-journal-upload)" != failed ]]; do sleep 1; done'
 (! systemctl status systemd-journal-upload)
+
+systemctl stop systemd-journal-upload
+systemctl stop systemd-journal-remote.{socket,service}
+rm -rf /var/log/journal/remote/*
+
+# Let's test sending data with compression enabled
+for c in none xz lz4 zstd; do
+    echo "$TEST_MESSAGE" | systemd-cat -t "$TEST_TAG"
+    journalctl --sync
+
+    cat >/run/systemd/journal-remote.conf.d/99-test.conf <<EOF
+[Remote]
+SplitMode=host
+Compression=zstd xz
+Compression=lz4
+ServerKeyFile=/run/systemd/remote-pki/server.key
+ServerCertificateFile=/run/systemd/remote-pki/server.crt
+TrustedCertificateFile=/run/systemd/remote-pki/ca.crt
+EOF
+    cat >/run/systemd/journal-upload.conf.d/99-test.conf <<EOF
+[Upload]
+URL=https://localhost:19532
+Compression=${c}:3
+ServerKeyFile=/run/systemd/remote-pki/client.key
+ServerCertificateFile=/run/systemd/remote-pki/client.crt
+TrustedCertificateFile=/run/systemd/remote-pki/ca.crt
+EOF
+    systemd-analyze cat-config systemd/journal-remote.conf
+    systemd-analyze cat-config systemd/journal-upload.conf
+
+    systemctl restart systemd-journal-remote.socket
+    systemctl restart systemd-journal-upload
+    timeout 15 bash -xec 'until systemctl -q is-active systemd-journal-remote.service; do sleep 1; done'
+    systemctl status systemd-journal-{remote,upload}
+
+    # It may take a bit until the whole journal is transferred
+    timeout 30 bash -xec "until journalctl --directory=/var/log/journal/remote --identifier='$TEST_TAG' --grep='$TEST_MESSAGE'; do sleep 1; done"
+
+    systemctl stop systemd-journal-upload
+    systemctl stop systemd-journal-remote.{socket,service}
+    rm -rf /var/log/journal/remote/*
+    rm /run/systemd/journal-upload.conf.d/99-test.conf
+    rm /run/systemd/journal-remote.conf.d/99-test.conf
+done