]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
Documentation and minor refactor to clarify MT memory management. 3000/head
authorElliot Gorokhovsky <embg@fb.com>
Fri, 14 Jan 2022 18:11:41 +0000 (11:11 -0700)
committerElliot Gorokhovsky <embg@fb.com>
Tue, 18 Jan 2022 16:43:05 +0000 (09:43 -0700)
lib/compress/zstdmt_compress.c
lib/compress/zstdmt_compress.h

index f564822d4b2d9f25ec6667016a51462f3bd4469c..6bc14b035e17c3461e2f2022f31055408a92e832 100644 (file)
@@ -102,9 +102,8 @@ typedef struct ZSTDMT_bufferPool_s {
     buffer_t bTable[1];   /* variable size */
 } ZSTDMT_bufferPool;
 
-static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbWorkers, ZSTD_customMem cMem)
+static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned maxNbBuffers, ZSTD_customMem cMem)
 {
-    unsigned const maxNbBuffers = 2*nbWorkers + 3;
     ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)ZSTD_customCalloc(
         sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem);
     if (bufPool==NULL) return NULL;
@@ -160,9 +159,8 @@ static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* const bufPool, size_t const
 }
 
 
-static ZSTDMT_bufferPool* ZSTDMT_expandBufferPool(ZSTDMT_bufferPool* srcBufPool, U32 nbWorkers)
+static ZSTDMT_bufferPool* ZSTDMT_expandBufferPool(ZSTDMT_bufferPool* srcBufPool, unsigned maxNbBuffers)
 {
-    unsigned const maxNbBuffers = 2*nbWorkers + 3;
     if (srcBufPool==NULL) return NULL;
     if (srcBufPool->totalBuffers >= maxNbBuffers) /* good enough */
         return srcBufPool;
@@ -171,7 +169,7 @@ static ZSTDMT_bufferPool* ZSTDMT_expandBufferPool(ZSTDMT_bufferPool* srcBufPool,
         size_t const bSize = srcBufPool->bufferSize;   /* forward parameters */
         ZSTDMT_bufferPool* newBufPool;
         ZSTDMT_freeBufferPool(srcBufPool);
-        newBufPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
+        newBufPool = ZSTDMT_createBufferPool(maxNbBuffers, cMem);
         if (newBufPool==NULL) return newBufPool;
         ZSTDMT_setBufferSize(newBufPool, bSize);
         return newBufPool;
@@ -263,6 +261,16 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
     ZSTD_customFree(buf.start, bufPool->cMem);
 }
 
+/* We need 2 output buffers per worker since each dstBuff must be flushed after it is released.
+ * The 3 additional buffers are as follows:
+ *   1 buffer for input loading
+ *   1 buffer for "next input" when submitting current one
+ *   1 buffer stuck in queue */
+#define BUF_POOL_MAX_NB_BUFFERS(nbWorkers) 2*nbWorkers + 3
+
+/* After a worker releases its rawSeqStore, it is immediately ready for reuse.
+ * So we only need one seq buffer per worker. */
+#define SEQ_POOL_MAX_NB_BUFFERS(nbWorkers) nbWorkers
 
 /* =====   Seq Pool Wrapper   ====== */
 
@@ -316,7 +324,7 @@ static void ZSTDMT_setNbSeq(ZSTDMT_seqPool* const seqPool, size_t const nbSeq)
 
 static ZSTDMT_seqPool* ZSTDMT_createSeqPool(unsigned nbWorkers, ZSTD_customMem cMem)
 {
-    ZSTDMT_seqPool* const seqPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
+    ZSTDMT_seqPool* const seqPool = ZSTDMT_createBufferPool(SEQ_POOL_MAX_NB_BUFFERS(nbWorkers), cMem);
     if (seqPool == NULL) return NULL;
     ZSTDMT_setNbSeq(seqPool, 0);
     return seqPool;
@@ -329,7 +337,7 @@ static void ZSTDMT_freeSeqPool(ZSTDMT_seqPool* seqPool)
 
 static ZSTDMT_seqPool* ZSTDMT_expandSeqPool(ZSTDMT_seqPool* pool, U32 nbWorkers)
 {
-    return ZSTDMT_expandBufferPool(pool, nbWorkers);
+    return ZSTDMT_expandBufferPool(pool, SEQ_POOL_MAX_NB_BUFFERS(nbWorkers));
 }
 
 
@@ -936,7 +944,7 @@ MEM_STATIC ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers,
     mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, cMem);
     assert(nbJobs > 0); assert((nbJobs & (nbJobs - 1)) == 0);  /* ensure nbJobs is a power of 2 */
     mtctx->jobIDMask = nbJobs - 1;
-    mtctx->bufPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
+    mtctx->bufPool = ZSTDMT_createBufferPool(BUF_POOL_MAX_NB_BUFFERS(nbWorkers), cMem);
     mtctx->cctxPool = ZSTDMT_createCCtxPool(nbWorkers, cMem);
     mtctx->seqPool = ZSTDMT_createSeqPool(nbWorkers, cMem);
     initError = ZSTDMT_serialState_init(&mtctx->serial);
@@ -1039,7 +1047,7 @@ static size_t ZSTDMT_resize(ZSTDMT_CCtx* mtctx, unsigned nbWorkers)
 {
     if (POOL_resize(mtctx->factory, nbWorkers)) return ERROR(memory_allocation);
     FORWARD_IF_ERROR( ZSTDMT_expandJobsTable(mtctx, nbWorkers) , "");
-    mtctx->bufPool = ZSTDMT_expandBufferPool(mtctx->bufPool, nbWorkers);
+    mtctx->bufPool = ZSTDMT_expandBufferPool(mtctx->bufPool, BUF_POOL_MAX_NB_BUFFERS(nbWorkers));
     if (mtctx->bufPool == NULL) return ERROR(memory_allocation);
     mtctx->cctxPool = ZSTDMT_expandCCtxPool(mtctx->cctxPool, nbWorkers);
     if (mtctx->cctxPool == NULL) return ERROR(memory_allocation);
index 2fee2ec7455c2152fb5e39c5dbbb555299d85ca0..271eb1ac71f58a4c037937167d604b692d682545 100644 (file)
@@ -65,8 +65,11 @@ size_t ZSTDMT_nextInputSizeHint(const ZSTDMT_CCtx* mtctx);
  *  Private use only. Init streaming operation.
  *  expects params to be valid.
  *  must receive dict, or cdict, or none, but not both.
+ *  mtctx can be freshly constructed or reused from a prior compression.
+ *  If mtctx is reused, memory allocations from the prior compression may not be freed,
+ *  even if they are not needed for the current compression.
  *  @return : 0, or an error code */
-size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
+size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* mtctx,
                     const void* dict, size_t dictSize, ZSTD_dictContentType_e dictContentType,
                     const ZSTD_CDict* cdict,
                     ZSTD_CCtx_params params, unsigned long long pledgedSrcSize);