]> git.ipfire.org Git - thirdparty/rsync.git/commitdiff
feat: add threads to zstd compression
authorEthan Halsall <ethanhalsall11@augustana.edu>
Fri, 23 Dec 2022 22:22:44 +0000 (16:22 -0600)
committerAndrew Tridgell <andrew@tridgell.net>
Sat, 23 Aug 2025 07:13:49 +0000 (17:13 +1000)
compat.c
options.c
token.c

index 4ce8c6d010d8033838152b31a6b9c59d36381ee7..5b3bfff0c79b1ccaf105e17e518c5c59ed65bc92 100644 (file)
--- a/compat.c
+++ b/compat.c
@@ -52,6 +52,7 @@ extern int need_messages_from_generator;
 extern int delete_mode, delete_before, delete_during, delete_after;
 extern int do_compression;
 extern int do_compression_level;
+extern int do_compression_threads;
 extern int saw_stderr_opt;
 extern int msgs2stderr;
 extern char *shell_cmd;
index 578507c6e9cdcb8f76cec93de08ffaddf8279b70..d85293ecd813d28f39af1b4e4fad72a8d3253e8f 100644 (file)
--- a/options.c
+++ b/options.c
@@ -86,6 +86,7 @@ int sparse_files = 0;
 int preallocate_files = 0;
 int do_compression = 0;
 int do_compression_level = CLVL_NOT_SPECIFIED;
+int do_compression_threads = 0;
 int am_root = 0; /* 0 = normal, 1 = root, 2 = --super, -1 = --fake-super */
 int am_server = 0;
 int am_sender = 0;
@@ -754,6 +755,8 @@ static struct poptOption long_options[] = {
   {"compress-choice",  0,  POPT_ARG_STRING, &compress_choice, 0, 0, 0 },
   {"zc",               0,  POPT_ARG_STRING, &compress_choice, 0, 0, 0 },
   {"skip-compress",    0,  POPT_ARG_STRING, &skip_compress, 0, 0, 0 },
+  {"compress-threads", 0,  POPT_ARG_INT,    &do_compression_threads, 0, 0, 0 },
+  {"zt",               0,  POPT_ARG_INT,    &do_compression_threads, 0, 0, 0 },
   {"compress-level",   0,  POPT_ARG_INT,    &do_compression_level, 0, 0, 0 },
   {"zl",               0,  POPT_ARG_INT,    &do_compression_level, 0, 0, 0 },
   {0,                 'P', POPT_ARG_NONE,   0, 'P', 0, 0 },
diff --git a/token.c b/token.c
index c108b3af576e1a8c343b062e3552627ea7aa0373..91091be1f521f242de7f38d5329d592e7ac49032 100644 (file)
--- a/token.c
+++ b/token.c
@@ -33,6 +33,7 @@ extern int do_compression;
 extern int protocol_version;
 extern int module_id;
 extern int do_compression_level;
+extern int do_compression_threads;
 extern char *skip_compress;
 
 #ifndef Z_INSERT_ONLY
@@ -692,6 +693,8 @@ static void send_zstd_token(int f, int32 token, struct map_struct *buf, OFF_T of
                obuf = new_array(char, OBUF_SIZE);
 
                ZSTD_CCtx_setParameter(zstd_cctx, ZSTD_c_compressionLevel, do_compression_level);
+               ZSTD_CCtx_setParameter(zstd_cctx, ZSTD_c_nbWorkers, do_compression_threads);
+
                zstd_out_buff.dst = obuf + 2;
 
                comp_init_done = 1;
@@ -729,12 +732,11 @@ static void send_zstd_token(int f, int32 token, struct map_struct *buf, OFF_T of
                zstd_in_buff.src = map_ptr(buf, offset, nb);
                zstd_in_buff.size = nb;
                zstd_in_buff.pos = 0;
-
+               
+               int finished;
                do {
-                       if (zstd_out_buff.size == 0) {
-                               zstd_out_buff.size = MAX_DATA_COUNT;
-                               zstd_out_buff.pos = 0;
-                       }
+                       zstd_out_buff.size = MAX_DATA_COUNT;
+                       zstd_out_buff.pos = 0;
 
                        /* File ended, flush */
                        if (token != -2)
@@ -752,20 +754,21 @@ static void send_zstd_token(int f, int32 token, struct map_struct *buf, OFF_T of
                         * state and send a smaller buffer so that the remote side can
                         * finish the file.
                         */
-                       if (zstd_out_buff.pos == zstd_out_buff.size || flush == ZSTD_e_flush) {
+                       finished = (flush == ZSTD_e_flush) ? (r == 0) : (zstd_in_buff.pos == zstd_in_buff.size);
+
+                       if (zstd_out_buff.pos != 0) {
                                n = zstd_out_buff.pos;
 
                                obuf[0] = DEFLATED_DATA + (n >> 8);
                                obuf[1] = n;
                                write_buf(f, obuf, n+2);
-
-                               zstd_out_buff.size = 0;
                        }
                        /*
                         * Loop while the input buffer isn't full consumed or the
                         * internal state isn't fully flushed.
                         */
-               } while (zstd_in_buff.pos < zstd_in_buff.size || r > 0);
+               } while (!finished);
+
                flush_pending = token == -2;
        }