From f7a2b8a3fab21e476ef1a6cc9cc910397d2a49b1 Mon Sep 17 00:00:00 2001 From: Ethan Halsall Date: Fri, 23 Dec 2022 16:22:44 -0600 Subject: [PATCH] feat: add threads to zstd compression --- compat.c | 1 + options.c | 3 +++ token.c | 21 ++++++++++++--------- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/compat.c b/compat.c index 4ce8c6d01..5b3bfff0c 100644 --- a/compat.c +++ b/compat.c @@ -52,6 +52,7 @@ extern int need_messages_from_generator; extern int delete_mode, delete_before, delete_during, delete_after; extern int do_compression; extern int do_compression_level; +extern int do_compression_threads; extern int saw_stderr_opt; extern int msgs2stderr; extern char *shell_cmd; diff --git a/options.c b/options.c index 578507c6e..d85293ecd 100644 --- a/options.c +++ b/options.c @@ -86,6 +86,7 @@ int sparse_files = 0; int preallocate_files = 0; int do_compression = 0; int do_compression_level = CLVL_NOT_SPECIFIED; +int do_compression_threads = 0; int am_root = 0; /* 0 = normal, 1 = root, 2 = --super, -1 = --fake-super */ int am_server = 0; int am_sender = 0; @@ -754,6 +755,8 @@ static struct poptOption long_options[] = { {"compress-choice", 0, POPT_ARG_STRING, &compress_choice, 0, 0, 0 }, {"zc", 0, POPT_ARG_STRING, &compress_choice, 0, 0, 0 }, {"skip-compress", 0, POPT_ARG_STRING, &skip_compress, 0, 0, 0 }, + {"compress-threads", 0, POPT_ARG_INT, &do_compression_threads, 0, 0, 0 }, + {"zt", 0, POPT_ARG_INT, &do_compression_threads, 0, 0, 0 }, {"compress-level", 0, POPT_ARG_INT, &do_compression_level, 0, 0, 0 }, {"zl", 0, POPT_ARG_INT, &do_compression_level, 0, 0, 0 }, {0, 'P', POPT_ARG_NONE, 0, 'P', 0, 0 }, diff --git a/token.c b/token.c index c108b3af5..91091be1f 100644 --- a/token.c +++ b/token.c @@ -33,6 +33,7 @@ extern int do_compression; extern int protocol_version; extern int module_id; extern int do_compression_level; +extern int do_compression_threads; extern char *skip_compress; #ifndef Z_INSERT_ONLY @@ -692,6 +693,8 @@ static void send_zstd_token(int f, int32 token, struct map_struct *buf, OFF_T of obuf = new_array(char, OBUF_SIZE); ZSTD_CCtx_setParameter(zstd_cctx, ZSTD_c_compressionLevel, do_compression_level); + ZSTD_CCtx_setParameter(zstd_cctx, ZSTD_c_nbWorkers, do_compression_threads); + zstd_out_buff.dst = obuf + 2; comp_init_done = 1; @@ -729,12 +732,11 @@ static void send_zstd_token(int f, int32 token, struct map_struct *buf, OFF_T of zstd_in_buff.src = map_ptr(buf, offset, nb); zstd_in_buff.size = nb; zstd_in_buff.pos = 0; - + + int finished; do { - if (zstd_out_buff.size == 0) { - zstd_out_buff.size = MAX_DATA_COUNT; - zstd_out_buff.pos = 0; - } + zstd_out_buff.size = MAX_DATA_COUNT; + zstd_out_buff.pos = 0; /* File ended, flush */ if (token != -2) @@ -752,20 +754,21 @@ static void send_zstd_token(int f, int32 token, struct map_struct *buf, OFF_T of * state and send a smaller buffer so that the remote side can * finish the file. */ - if (zstd_out_buff.pos == zstd_out_buff.size || flush == ZSTD_e_flush) { + finished = (flush == ZSTD_e_flush) ? (r == 0) : (zstd_in_buff.pos == zstd_in_buff.size); + + if (zstd_out_buff.pos != 0) { n = zstd_out_buff.pos; obuf[0] = DEFLATED_DATA + (n >> 8); obuf[1] = n; write_buf(f, obuf, n+2); - - zstd_out_buff.size = 0; } /* * Loop while the input buffer isn't full consumed or the * internal state isn't fully flushed. */ - } while (zstd_in_buff.pos < zstd_in_buff.size || r > 0); + } while (!finished); + flush_pending = token == -2; } -- 2.47.3