]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
removed mtctx->cstream
authorYann Collet <cyan@fb.com>
Tue, 30 May 2017 23:37:19 +0000 (16:37 -0700)
committerYann Collet <cyan@fb.com>
Tue, 30 May 2017 23:37:19 +0000 (16:37 -0700)
use the first cctx in pool when ZSTDMT is used in single-thread mode
now that cctx and cstream are the same object.

lib/compress/zstdmt_compress.c

index bd3de4586e51f2177c214e7f6ef68990fde4ed09..ca5cd6aee7877c32e0b76ad35134f2e19a6833f4 100644 (file)
 
 
 /* ======   Debug   ====== */
-#if defined(ZSTDMT_DEBUG)
+#if defined(ZSTDMT_DEBUG) && (ZSTDMT_DEBUG>=1)
+#  include <assert.h>
+#else
+#  define assert(condition) ((void)0)
+#endif
+
+#if defined(ZSTDMT_DEBUG) && (ZSTDMT_DEBUG>=2)
 
 #  include <stdio.h>
 #  include <unistd.h>
@@ -309,7 +315,6 @@ struct ZSTDMT_CCtx_s {
     size_t sectionSize;
     ZSTD_customMem cMem;
     ZSTD_CDict* cdict;
-    ZSTD_CStream* cstream;
 };
 
 ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
@@ -343,11 +348,6 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
         ZSTDMT_freeCCtx(mtctx);
         return NULL;
     }
-    if (nbThreads==1) {
-        mtctx->cstream = ZSTD_createCStream_advanced(cMem);
-        if (!mtctx->cstream) {
-            ZSTDMT_freeCCtx(mtctx); return NULL;
-    }   }
     pthread_mutex_init(&mtctx->jobCompleted_mutex, NULL);   /* Todo : check init function return */
     pthread_cond_init(&mtctx->jobCompleted_cond, NULL);
     DEBUGLOG(4, "mt_cctx created, for %u threads", nbThreads);
@@ -387,7 +387,6 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
     ZSTD_free(mtctx->jobs, mtctx->cMem);
     ZSTDMT_freeCCtxPool(mtctx->cctxPool);
     ZSTD_freeCDict(mtctx->cdict);
-    ZSTD_freeCStream(mtctx->cstream);
     pthread_mutex_destroy(&mtctx->jobCompleted_mutex);
     pthread_cond_destroy(&mtctx->jobCompleted_cond);
     ZSTD_free(mtctx, mtctx->cMem);
@@ -540,8 +539,12 @@ static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
                                     ZSTD_parameters params, unsigned long long pledgedSrcSize)
 {
     ZSTD_customMem const cmem = { NULL, NULL, NULL };
-    DEBUGLOG(3, "Started new compression, with windowLog : %u", params.cParams.windowLog);
-    if (zcs->nbThreads==1) return ZSTD_initCStream_advanced(zcs->cstream, dict, dictSize, params, pledgedSrcSize);
+    DEBUGLOG(3, "Started new compression, with windowLog : %u",
+                    params.cParams.windowLog);
+    if (zcs->nbThreads==1)
+        return ZSTD_initCStream_advanced(zcs->cctxPool->cctx[0],
+                                        dict, dictSize,
+                                        params, pledgedSrcSize);
     if (zcs->allJobsCompleted == 0) {   /* previous job not correctly finished */
         ZSTDMT_waitForAllJobsCompleted(zcs);
         ZSTDMT_releaseAllJobResources(zcs);
@@ -587,7 +590,8 @@ size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs,
  * pledgedSrcSize is optional and can be zero == unknown */
 size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize)
 {
-    if (zcs->nbThreads==1) return ZSTD_resetCStream(zcs->cstream, pledgedSrcSize);
+    if (zcs->nbThreads==1)
+        return ZSTD_resetCStream(zcs->cctxPool->cctx[0], pledgedSrcSize);
     return ZSTDMT_initCStream_internal(zcs, NULL, 0, 0, zcs->params, pledgedSrcSize);
 }
 
@@ -612,13 +616,16 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
         return ERROR(memory_allocation);
     }
 
-    DEBUGLOG(4, "preparing job %u to compress %u bytes with %u preload ", zcs->nextJobID, (U32)srcSize, (U32)zcs->dictSize);
+    DEBUGLOG(4, "preparing job %u to compress %u bytes with %u preload ",
+                zcs->nextJobID, (U32)srcSize, (U32)zcs->dictSize);
     zcs->jobs[jobID].src = zcs->inBuff.buffer;
     zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
     zcs->jobs[jobID].srcSize = srcSize;
-    zcs->jobs[jobID].dictSize = zcs->dictSize;   /* note : zcs->inBuff.filled is presumed >= srcSize + dictSize */
+    zcs->jobs[jobID].dictSize = zcs->dictSize;
+    assert(zcs->inBuff.filled >= srcSize + zcs->dictSize);
     zcs->jobs[jobID].params = zcs->params;
-    if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0;  /* do not calculate checksum within sections, just keep it in header for first section */
+    /* do not calculate checksum within sections, but write it in header for first section */
+    if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0;
     zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
     zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
     zcs->jobs[jobID].dstBuff = dstBuffer;
@@ -643,8 +650,12 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
         }
         DEBUGLOG(5, "inBuff filled to %u", (U32)zcs->inBuff.filled);
         zcs->inBuff.filled -= srcSize + zcs->dictSize - newDictSize;
-        DEBUGLOG(5, "new job : filled to %u, with %u dict and %u src", (U32)zcs->inBuff.filled, (U32)newDictSize, (U32)(zcs->inBuff.filled - newDictSize));
-        memmove(zcs->inBuff.buffer.start, (const char*)zcs->jobs[jobID].srcStart + zcs->dictSize + srcSize - newDictSize, zcs->inBuff.filled);
+        DEBUGLOG(5, "new job : filled to %u, with %u dict and %u src",
+                    (U32)zcs->inBuff.filled, (U32)newDictSize,
+                    (U32)(zcs->inBuff.filled - newDictSize));
+        memmove(zcs->inBuff.buffer.start,
+            (const char*)zcs->jobs[jobID].srcStart + zcs->dictSize + srcSize - newDictSize,
+            zcs->inBuff.filled);
         DEBUGLOG(5, "new inBuff pre-filled");
         zcs->dictSize = newDictSize;
     } else {
@@ -653,10 +664,16 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
         zcs->dictSize = 0;
         zcs->frameEnded = 1;
         if (zcs->nextJobID == 0)
-            zcs->params.fParams.checksumFlag = 0;   /* single chunk : checksum is calculated directly within worker thread */
+            /* single chunk exception : checksum is calculated directly within worker thread */
+            zcs->params.fParams.checksumFlag = 0;
     }
 
-    DEBUGLOG(3, "posting job %u : %u bytes  (end:%u) (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask);
+    DEBUGLOG(3, "posting job %u : %u bytes  (end:%u) (note : doneJob = %u=>%u)",
+                zcs->nextJobID,
+                (U32)zcs->jobs[jobID].srcSize,
+                zcs->jobs[jobID].lastChunk,
+                zcs->doneJobID,
+                zcs->doneJobID & zcs->jobIDMask);
     POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]);   /* this call is blocking when thread worker pool is exhausted */
     zcs->nextJobID++;
     return 0;
@@ -729,8 +746,11 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
 size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
 {
     size_t const newJobThreshold = zcs->dictSize + zcs->targetSectionSize + zcs->marginSize;
-    if (zcs->frameEnded) return ERROR(stage_wrong);   /* current frame being ended. Only flush is allowed. Restart with init */
-    if (zcs->nbThreads==1) return ZSTD_compressStream(zcs->cstream, output, input);
+    if (zcs->frameEnded)
+        /* current frame being ended. Only flush is allowed. Restart with init */
+        return ERROR(stage_wrong);
+    if (zcs->nbThreads==1)
+        return ZSTD_compressStream(zcs->cctxPool->cctx[0], output, input);
 
     /* fill input buffer */
     {   size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled);
@@ -770,12 +790,14 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
 
 size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
 {
-    if (zcs->nbThreads==1) return ZSTD_flushStream(zcs->cstream, output);
+    if (zcs->nbThreads==1)
+        return ZSTD_flushStream(zcs->cctxPool->cctx[0], output);
     return ZSTDMT_flushStream_internal(zcs, output, 0);
 }
 
 size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
 {
-    if (zcs->nbThreads==1) return ZSTD_endStream(zcs->cstream, output);
+    if (zcs->nbThreads==1)
+        return ZSTD_endStream(zcs->cctxPool->cctx[0], output);
     return ZSTDMT_flushStream_internal(zcs, output, 1);
 }