]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
Allow external creation of POOLs that can be shared. 2328/head
authorMartin Liska <mliska@suse.cz>
Fri, 25 Sep 2020 12:12:14 +0000 (14:12 +0200)
committerMartin Liska <mliska@suse.cz>
Wed, 7 Oct 2020 10:44:33 +0000 (12:44 +0200)
examples/streaming_compression_thread_pool.c [new file with mode: 0644]
lib/common/pool.c
lib/compress/zstd_compress.c
lib/compress/zstd_compress_internal.h
lib/compress/zstdmt_compress.c
lib/compress/zstdmt_compress.h
lib/zstd.h

diff --git a/examples/streaming_compression_thread_pool.c b/examples/streaming_compression_thread_pool.c
new file mode 100644 (file)
index 0000000..22c3b2e
--- /dev/null
@@ -0,0 +1,178 @@
+/*
+ * Copyright (c) 2020, Martin Liska, SUSE, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ * You may select, at your option, one of the above-listed licenses.
+ */
+
+
+#include <stdio.h>     // printf
+#include <stdlib.h>    // free
+#include <string.h>    // memset, strcat, strlen
+#include <zstd.h>      // presumes zstd library is installed
+#include "common.h"    // Helper functions, CHECK(), and CHECK_ZSTD()
+#include <pthread.h>
+
+typedef struct compress_args
+{
+  const char *fname;
+  char *outName;
+  int cLevel;
+#if defined(ZSTD_STATIC_LINKING_ONLY)
+  ZSTD_threadPool *pool;
+#endif
+} compress_args_t;
+
+static void *compressFile_orDie(void *data)
+{
+    compress_args_t *args = (compress_args_t *)data;
+    fprintf (stderr, "Starting compression of %s with level %d\n", args->fname, args->cLevel);
+    /* Open the input and output files. */
+    FILE* const fin  = fopen_orDie(args->fname, "rb");
+    FILE* const fout = fopen_orDie(args->outName, "wb");
+    /* Create the input and output buffers.
+     * They may be any size, but we recommend using these functions to size them.
+     * Performance will only suffer significantly for very tiny buffers.
+     */
+    size_t const buffInSize = ZSTD_CStreamInSize();
+    void*  const buffIn  = malloc_orDie(buffInSize);
+    size_t const buffOutSize = ZSTD_CStreamOutSize();
+    void*  const buffOut = malloc_orDie(buffOutSize);
+
+    /* Create the context. */
+    ZSTD_CCtx* const cctx = ZSTD_createCCtx();
+    CHECK(cctx != NULL, "ZSTD_createCCtx() failed!");
+
+#if defined(ZSTD_STATIC_LINKING_ONLY)
+    size_t r = ZSTD_CCtx_refThreadPool(cctx, args->pool);
+    CHECK(r == 0, "ZSTD_CCtx_refThreadPool failed!");
+#endif
+
+    /* Set any parameters you want.
+     * Here we set the compression level, and enable the checksum.
+     */
+    CHECK_ZSTD( ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, args->cLevel) );
+    CHECK_ZSTD( ZSTD_CCtx_setParameter(cctx, ZSTD_c_checksumFlag, 1) );
+    ZSTD_CCtx_setParameter(cctx, ZSTD_c_nbWorkers, 16);
+
+    /* This loop read from the input file, compresses that entire chunk,
+     * and writes all output produced to the output file.
+     */
+    size_t const toRead = buffInSize;
+    for (;;) {
+        size_t read = fread_orDie(buffIn, toRead, fin);
+        /* Select the flush mode.
+         * If the read may not be finished (read == toRead) we use
+         * ZSTD_e_continue. If this is the last chunk, we use ZSTD_e_end.
+         * Zstd optimizes the case where the first flush mode is ZSTD_e_end,
+         * since it knows it is compressing the entire source in one pass.
+         */
+        int const lastChunk = (read < toRead);
+        ZSTD_EndDirective const mode = lastChunk ? ZSTD_e_end : ZSTD_e_continue;
+        /* Set the input buffer to what we just read.
+         * We compress until the input buffer is empty, each time flushing the
+         * output.
+         */
+        ZSTD_inBuffer input = { buffIn, read, 0 };
+        int finished;
+        do {
+            /* Compress into the output buffer and write all of the output to
+             * the file so we can reuse the buffer next iteration.
+             */
+            ZSTD_outBuffer output = { buffOut, buffOutSize, 0 };
+            size_t const remaining = ZSTD_compressStream2(cctx, &output , &input, mode);
+            CHECK_ZSTD(remaining);
+            fwrite_orDie(buffOut, output.pos, fout);
+            /* If we're on the last chunk we're finished when zstd returns 0,
+             * which means its consumed all the input AND finished the frame.
+             * Otherwise, we're finished when we've consumed all the input.
+             */
+            finished = lastChunk ? (remaining == 0) : (input.pos == input.size);
+        } while (!finished);
+        CHECK(input.pos == input.size,
+              "Impossible: zstd only returns 0 when the input is completely consumed!");
+
+        if (lastChunk) {
+            break;
+        }
+    }
+
+    fprintf (stderr, "Finishing compression of %s\n", args->outName);
+
+    ZSTD_freeCCtx(cctx);
+    fclose_orDie(fout);
+    fclose_orDie(fin);
+    free(buffIn);
+    free(buffOut);
+    free(args->outName);
+
+    return NULL;
+}
+
+
+static char* createOutFilename_orDie(const char* filename)
+{
+    size_t const inL = strlen(filename);
+    size_t const outL = inL + 5;
+    void* const outSpace = malloc_orDie(outL);
+    memset(outSpace, 0, outL);
+    strcat(outSpace, filename);
+    strcat(outSpace, ".zst");
+    return (char*)outSpace;
+}
+
+int main(int argc, const char** argv)
+{
+    const char* const exeName = argv[0];
+
+    if (argc<=3) {
+        printf("wrong arguments\n");
+        printf("usage:\n");
+        printf("%s POOL_SIZE LEVEL FILES\n", exeName);
+        return 1;
+    }
+
+    int pool_size = atoi (argv[1]);
+    CHECK(pool_size != 0, "can't parse POOL_SIZE!");
+
+    int level = atoi (argv[2]);
+    CHECK(level != 0, "can't parse LEVEL!");
+
+    argc -= 3;
+    argv += 3;
+
+#if defined(ZSTD_STATIC_LINKING_ONLY)
+    ZSTD_threadPool *pool = ZSTD_createThreadPool (pool_size);
+    CHECK(pool != NULL, "ZSTD_createThreadPool() failed!");
+    fprintf (stderr, "Using shared thread pool of size %d\n", pool_size);
+#else
+    fprintf (stderr, "All threads use its own thread pool\n");
+#endif
+
+    pthread_t *threads = malloc_orDie(argc * sizeof(pthread_t));
+    compress_args_t *args = malloc_orDie(argc * sizeof(compress_args_t));
+
+    for (unsigned i = 0; i < argc; i++)
+    {
+      args[i].fname = argv[i];
+      args[i].outName = createOutFilename_orDie(args[i].fname);
+      args[i].cLevel = level;
+#if defined(ZSTD_STATIC_LINKING_ONLY)
+      args[i].pool = pool;
+#endif
+
+      pthread_create (&threads[i], NULL, compressFile_orDie, &args[i]);
+    }
+
+    for (unsigned i = 0; i < argc; i++)
+      pthread_join (threads[i], NULL);
+
+#if defined(ZSTD_STATIC_LINKING_ONLY)
+    ZSTD_freeThreadPool (pool);
+#endif
+
+    return 0;
+}
index 5def2bd556cfd46e43e7cbd2dd3210c2439674f3..4c1b83376f4e7c7a16ae844e282125575469cb21 100644 (file)
@@ -105,6 +105,10 @@ static void* POOL_thread(void* opaque) {
     assert(0);  /* Unreachable */
 }
 
+POOL_ctx* ZSTD_createThreadPool(size_t numThreads) {
+    return POOL_create (numThreads, 0);
+}
+
 POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
     return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
 }
@@ -184,7 +188,9 @@ void POOL_free(POOL_ctx *ctx) {
     ZSTD_customFree(ctx, ctx->customMem);
 }
 
-
+void ZSTD_freeThreadPool (ZSTD_threadPool* pool) {
+  POOL_free (pool);
+}
 
 size_t POOL_sizeof(POOL_ctx *ctx) {
     if (ctx==NULL) return 0;  /* supports sizeof NULL */
index fd47ccd3f60f8baac88d3eaf0ff1e500cf1f1e32..26bf1e12b1296e9dfbca04a853b98c008f4bcf0b 100644 (file)
@@ -958,6 +958,14 @@ size_t ZSTD_CCtx_refCDict(ZSTD_CCtx* cctx, const ZSTD_CDict* cdict)
     return 0;
 }
 
+size_t ZSTD_CCtx_refThreadPool(ZSTD_CCtx* cctx, ZSTD_threadPool* pool)
+{
+    RETURN_ERROR_IF(cctx->streamStage != zcss_init, stage_wrong,
+                    "Can't ref a pool when ctx not in init stage.");
+    cctx->pool = pool;
+    return 0;
+}
+
 size_t ZSTD_CCtx_refPrefix(ZSTD_CCtx* cctx, const void* prefix, size_t prefixSize)
 {
     return ZSTD_CCtx_refPrefix_advanced(cctx, prefix, prefixSize, ZSTD_dct_rawContent);
@@ -4115,7 +4123,7 @@ size_t ZSTD_compressStream2( ZSTD_CCtx* cctx,
             if (cctx->mtctx == NULL) {
                 DEBUGLOG(4, "ZSTD_compressStream2: creating new mtctx for nbWorkers=%u",
                             params.nbWorkers);
-                cctx->mtctx = ZSTDMT_createCCtx_advanced((U32)params.nbWorkers, cctx->customMem);
+                cctx->mtctx = ZSTDMT_createCCtx_advanced((U32)params.nbWorkers, cctx->customMem, cctx->pool);
                 RETURN_ERROR_IF(cctx->mtctx == NULL, memory_allocation, "NULL pointer!");
             }
             /* mt compression */
index b161a208cf0c37300440decfac969452ba73d9b2..b00366da2b13ef7e841514963f52ce22ccd92e64 100644 (file)
@@ -255,6 +255,7 @@ struct ZSTD_CCtx_s {
     unsigned long long producedCSize;
     XXH64_state_t xxhState;
     ZSTD_customMem customMem;
+    ZSTD_threadPool* pool;
     size_t staticSize;
     SeqCollector seqCollector;
     int isFirstBlock;
index baf6ef4ca6da2f472191773d49f343ded5ee392f..b809d602e616f18767fa2a509417c05d1f5dd92b 100644 (file)
@@ -831,6 +831,7 @@ struct ZSTDMT_CCtx_s {
     ZSTD_customMem cMem;
     ZSTD_CDict* cdictLocal;
     const ZSTD_CDict* cdict;
+    unsigned providedFactory: 1;
 };
 
 static void ZSTDMT_freeJobsTable(ZSTDMT_jobDescription* jobTable, U32 nbJobs, ZSTD_customMem cMem)
@@ -889,7 +890,7 @@ size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorker
     return ZSTD_CCtxParams_setParameter(params, ZSTD_c_nbWorkers, (int)nbWorkers);
 }
 
-MEM_STATIC ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers, ZSTD_customMem cMem)
+MEM_STATIC ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers, ZSTD_customMem cMem, ZSTD_threadPool* pool)
 {
     ZSTDMT_CCtx* mtctx;
     U32 nbJobs = nbWorkers + 2;
@@ -907,7 +908,14 @@ MEM_STATIC ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers,
     ZSTDMT_CCtxParam_setNbWorkers(&mtctx->params, nbWorkers);
     mtctx->cMem = cMem;
     mtctx->allJobsCompleted = 1;
-    mtctx->factory = POOL_create_advanced(nbWorkers, 0, cMem);
+    if (pool != NULL) {
+      mtctx->factory = pool;
+      mtctx->providedFactory = 1;
+    }
+    else {
+      mtctx->factory = POOL_create_advanced(nbWorkers, 0, cMem);
+      mtctx->providedFactory = 0;
+    }
     mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, cMem);
     assert(nbJobs > 0); assert((nbJobs & (nbJobs - 1)) == 0);  /* ensure nbJobs is a power of 2 */
     mtctx->jobIDMask = nbJobs - 1;
@@ -924,20 +932,21 @@ MEM_STATIC ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers,
     return mtctx;
 }
 
-ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem)
+ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem, ZSTD_threadPool* pool)
 {
 #ifdef ZSTD_MULTITHREAD
-    return ZSTDMT_createCCtx_advanced_internal(nbWorkers, cMem);
+    return ZSTDMT_createCCtx_advanced_internal(nbWorkers, cMem, pool);
 #else
     (void)nbWorkers;
     (void)cMem;
+    (void)pool;
     return NULL;
 #endif
 }
 
 ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbWorkers)
 {
-    return ZSTDMT_createCCtx_advanced(nbWorkers, ZSTD_defaultCMem);
+    return ZSTDMT_createCCtx_advanced(nbWorkers, ZSTD_defaultCMem, NULL);
 }
 
 
@@ -983,7 +992,8 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* mtctx)
 size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
 {
     if (mtctx==NULL) return 0;   /* compatible with free on NULL */
-    POOL_free(mtctx->factory);   /* stop and free worker threads */
+    if (!mtctx->providedFactory)
+        POOL_free(mtctx->factory);   /* stop and free worker threads */
     ZSTDMT_releaseAllJobResources(mtctx);  /* release job resources into pools first */
     ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);
     ZSTDMT_freeBufferPool(mtctx->bufPool);
index 623fd97dff19f1ec342c1849e2614cea723242f8..8aad78c4f46a0f54f4d04b5fb36d55eb687a0d83 100644 (file)
@@ -60,7 +60,8 @@ typedef struct ZSTDMT_CCtx_s ZSTDMT_CCtx;
 ZSTDMT_API ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbWorkers);
 /* Requires ZSTD_MULTITHREAD to be defined during compilation, otherwise it will return NULL. */
 ZSTDMT_API ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers,
-                                                    ZSTD_customMem cMem);
+                                                    ZSTD_customMem cMem,
+                                                    ZSTD_threadPool *pool);
 ZSTDMT_API size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx);
 
 ZSTDMT_API size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx);
index 0a93ee6f6741cae5734c7c95a4785be9cc2f5758..c6043ed67347cd8991f7892f9992ef40db2af28f 100644 (file)
@@ -1409,6 +1409,21 @@ ZSTDLIB_API ZSTD_CDict* ZSTD_createCDict_advanced(const void* dict, size_t dictS
                                                   ZSTD_compressionParameters cParams,
                                                   ZSTD_customMem customMem);
 
+/* ! Thread pool :
+ * These prototypes make it possible to share a thread pool among multiple compression contexts.
+ * This can limit resources for applications with multiple threads where each one uses
+ * a threaded compression mode (via ZSTD_c_nbWorkers parameter).
+ * ZSTD_createThreadPool creates a new thread pool with a given number of threads.
+ * Note that the lifetime of such pool must exist while being used.
+ * ZSTD_CCtx_refThreadPool assigns a thread pool to a context (use NULL argument value
+ * to use an internal thread pool).
+ * ZSTD_freeThreadPool frees a thread pool.
+ */
+typedef struct POOL_ctx_s ZSTD_threadPool;
+ZSTDLIB_API ZSTD_threadPool* ZSTD_createThreadPool(size_t numThreads);
+ZSTDLIB_API void ZSTD_freeThreadPool (ZSTD_threadPool* pool);
+ZSTDLIB_API size_t ZSTD_CCtx_refThreadPool(ZSTD_CCtx* cctx, ZSTD_threadPool* pool);
+
 /**
  * This API is temporary and is expected to change or disappear in the future!
  */
@@ -1424,8 +1439,6 @@ ZSTDLIB_API ZSTD_DDict* ZSTD_createDDict_advanced(const void* dict, size_t dictS
                                                   ZSTD_dictContentType_e dictContentType,
                                                   ZSTD_customMem customMem);
 
-
-
 /***************************************
 *  Advanced compression functions
 ***************************************/