]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
ZSTDMT streaming : fall back to (regular) single thread mode
authorYann Collet <cyan@fb.com>
Mon, 23 Jan 2017 09:43:58 +0000 (01:43 -0800)
committerYann Collet <cyan@fb.com>
Mon, 23 Jan 2017 09:43:58 +0000 (01:43 -0800)
when nbThreads==1

lib/compress/zstdmt_compress.c

index 18ed7441786940797574ff1e12cab16fd6a2964c..135d274f8a5b040f29f6e65d4b9bfc05ac27976e 100644 (file)
@@ -172,7 +172,10 @@ static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads)
     ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) calloc(1, sizeof(ZSTDMT_CCtxPool) + (nbThreads-1)*sizeof(ZSTD_CCtx*));
     if (!cctxPool) return NULL;
     cctxPool->totalCCtx = nbThreads;
-    cctxPool->availCCtx = 0;
+    cctxPool->availCCtx = 1;   /* at least one cctx for single-thread mode */
+    cctxPool->cctx[0] = ZSTD_createCCtx();
+    if (!cctxPool->cctx[0]) { ZSTDMT_freeCCtxPool(cctxPool); return NULL; }
+    DEBUGLOG(1, "cctxPool created, with %u threads", nbThreads);
     return cctxPool;
 }
 
@@ -278,6 +281,7 @@ struct ZSTDMT_CCtx_s {
     unsigned allJobsCompleted;
     unsigned long long frameContentSize;
     ZSTD_CDict* cdict;
+    ZSTD_CStream* cstream;
     ZSTDMT_jobDescription jobs[1];   /* variable size (must lies at the end) */
 };
 
@@ -287,7 +291,7 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
     U32 const minNbJobs = nbThreads + 2;
     U32 const nbJobsLog2 = ZSTD_highbit32(minNbJobs) + 1;
     U32 const nbJobs = 1 << nbJobsLog2;
-    DEBUGLOG(4, "nbThreads : %u  ; minNbJobs : %u ;  nbJobsLog2 : %u ;  nbJobs : %u  \n",
+    DEBUGLOG(5, "nbThreads : %u  ; minNbJobs : %u ;  nbJobsLog2 : %u ;  nbJobs : %u  \n",
             nbThreads, minNbJobs, nbJobsLog2, nbJobs);
     if ((nbThreads < 1) | (nbThreads > ZSTDMT_NBTHREADS_MAX)) return NULL;
     cctx = (ZSTDMT_CCtx*) calloc(1, sizeof(ZSTDMT_CCtx) + nbJobs*sizeof(ZSTDMT_jobDescription));
@@ -302,8 +306,14 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
         ZSTDMT_freeCCtx(cctx);
         return NULL;
     }
+    if (nbThreads==1) {
+        cctx->cstream = ZSTD_createCStream();
+        if (!cctx->cstream) {
+            ZSTDMT_freeCCtx(cctx); return NULL;
+    }   }
     pthread_mutex_init(&cctx->jobCompleted_mutex, NULL);   /* Todo : check init function return */
     pthread_cond_init(&cctx->jobCompleted_cond, NULL);
+    DEBUGLOG(4, "mt_cctx created, for %u threads \n", nbThreads);
     return cctx;
 }
 
@@ -329,11 +339,12 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
 size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
 {
     if (mtctx==NULL) return 0;   /* compatible with free on NULL */
-    ZSTD_freeCDict(mtctx->cdict);
     POOL_free(mtctx->factory);
     if (!mtctx->allJobsCompleted) ZSTDMT_releaseAllJobResources(mtctx); /* stop workers first */
     ZSTDMT_freeBufferPool(mtctx->buffPool);  /* release job resources into pools first */
     ZSTDMT_freeCCtxPool(mtctx->cctxPool);
+    ZSTD_freeCDict(mtctx->cdict);
+    ZSTD_freeCStream(mtctx->cstream);
     pthread_mutex_destroy(&mtctx->jobCompleted_mutex);
     pthread_cond_destroy(&mtctx->jobCompleted_cond);
     free(mtctx);
@@ -361,12 +372,8 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
     params.fParams.contentSizeFlag = 1;
 
     if (nbChunks==1) {   /* fallback to single-thread mode */
-        size_t result;
-        ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(mtctx->cctxPool);
-        if (!cctx) return ERROR(memory_allocation);
-        result = ZSTD_compressCCtx(mtctx->cctxPool->cctx[0], dst, dstCapacity, src, srcSize, compressionLevel);
-        ZSTDMT_releaseCCtx(mtctx->cctxPool, cctx);
-        return result;
+        ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0];
+        return ZSTD_compressCCtx(cctx, dst, dstCapacity, src, srcSize, compressionLevel);
     }
 
     {   unsigned u;
@@ -461,6 +468,7 @@ static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
                                     ZSTD_parameters params, unsigned long long pledgedSrcSize)
 {
     ZSTD_customMem const cmem = { NULL, NULL, NULL };
+    if (zcs->nbThreads==1) return ZSTD_initCStream_advanced(zcs->cstream, dict, dictSize, params, pledgedSrcSize);
     if (zcs->allJobsCompleted == 0) {   /* previous job not correctly finished */
         ZSTDMT_waitForAllJobsCompleted(zcs);
         ZSTDMT_releaseAllJobResources(zcs);
@@ -498,6 +506,7 @@ size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs,
  * pledgedSrcSize is optional and can be zero == unknown */
 size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize)
 {
+    if (zcs->nbThreads==1) return ZSTD_resetCStream(zcs->cstream, pledgedSrcSize);
     return ZSTDMT_initCStream_internal(zcs, NULL, 0, 0, zcs->params, pledgedSrcSize);
 }
 
@@ -510,6 +519,7 @@ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
 size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
 {
     if (zcs->frameEnded) return ERROR(stage_wrong);   /* current frame being ended. Only flush is allowed. Restart with init */
+    if (zcs->nbThreads==1) return ZSTD_compressStream(zcs->cstream, output, input);
 
     /* fill input buffer */
     {   size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled);
@@ -708,10 +718,12 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
 
 size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
 {
+    if (zcs->nbThreads==1) return ZSTD_flushStream(zcs->cstream, output);
     return ZSTDMT_flushStream_internal(zcs, output, 0);
 }
 
 size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
 {
+    if (zcs->nbThreads==1) return ZSTD_endStream(zcs->cstream, output);
     return ZSTDMT_flushStream_internal(zcs, output, 1);
 }