]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
make zstdmt resize its context
authorYann Collet <cyan@fb.com>
Wed, 20 Jun 2018 00:28:56 +0000 (17:28 -0700)
committerYann Collet <cyan@fb.com>
Wed, 20 Jun 2018 00:28:56 +0000 (17:28 -0700)
when nbThreads change.

Technically, it only expands.
But when instructed to use less threads,
the thread pool will limit nb of concurrent threads.

lib/compress/zstd_compress.c
lib/compress/zstdmt_compress.c
lib/compress/zstdmt_compress.h

index c071c57382f1fcd280469f27b7ea1549fc2066a4..d268cef08bc1d647e8ee7bd78c655a8dbc84fc18 100644 (file)
@@ -3645,13 +3645,9 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
         }
         if (params.nbWorkers > 0) {
             /* mt context creation */
-            if (cctx->mtctx == NULL || (params.nbWorkers != ZSTDMT_getNbWorkers(cctx->mtctx))) {
+            if (cctx->mtctx == NULL) {
                 DEBUGLOG(4, "ZSTD_compress_generic: creating new mtctx for nbWorkers=%u",
                             params.nbWorkers);
-                if (cctx->mtctx != NULL)
-                    DEBUGLOG(4, "ZSTD_compress_generic: previous nbWorkers was %u",
-                                ZSTDMT_getNbWorkers(cctx->mtctx));
-                ZSTDMT_freeCCtx(cctx->mtctx);
                 cctx->mtctx = ZSTDMT_createCCtx_advanced(params.nbWorkers, cctx->customMem);
                 if (cctx->mtctx == NULL) return ERROR(memory_allocation);
             }
index 0c59c6a72ce22ce1e2f51f30d8d65ee2cc4f2162..4a2a4cdd5a0d6bff9fd04a92adfe4666587c67eb 100644 (file)
@@ -159,6 +159,25 @@ static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* const bufPool, size_t const
     ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
 }
 
+
+static ZSTDMT_bufferPool* ZSTDMT_expandBufferPool(ZSTDMT_bufferPool* srcBufPool, U32 nbWorkers)
+{
+    unsigned const maxNbBuffers = 2*nbWorkers + 3;
+    if (srcBufPool==NULL) return NULL;
+    if (srcBufPool->totalBuffers >= maxNbBuffers) /* good enough */
+        return srcBufPool;
+    /* need a larger buffer pool */
+    {   ZSTD_customMem const cMem = srcBufPool->cMem;
+        size_t const bSize = srcBufPool->bufferSize;   /* forward parameters */
+        ZSTDMT_bufferPool* newBufPool;
+        ZSTDMT_freeBufferPool(srcBufPool);
+        newBufPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
+        if (newBufPool==NULL) return newBufPool;
+        ZSTDMT_setBufferSize(newBufPool, bSize);
+        return newBufPool;
+    }
+}
+
 /** ZSTDMT_getBuffer() :
  *  assumption : bufPool must be valid
  * @return : a buffer, with start pointer and size
@@ -309,6 +328,10 @@ static void ZSTDMT_freeSeqPool(ZSTDMT_seqPool* seqPool)
     ZSTDMT_freeBufferPool(seqPool);
 }
 
+static ZSTDMT_seqPool* ZSTDMT_expandSeqPool(ZSTDMT_seqPool* pool, U32 nbWorkers)
+{
+    return ZSTDMT_expandBufferPool(pool, nbWorkers);
+}
 
 
 /* =====   CCtx Pool   ===== */
@@ -354,6 +377,18 @@ static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbWorkers,
     return cctxPool;
 }
 
+static ZSTDMT_CCtxPool* ZSTDMT_expandCCtxPool(ZSTDMT_CCtxPool* srcPool,
+                                              unsigned nbWorkers)
+{
+    if (srcPool==NULL) return NULL;
+    if (nbWorkers <= srcPool->totalCCtx) return srcPool;   /* good enough */
+    /* need a larger cctx pool */
+    {   ZSTD_customMem const cMem = srcPool->cMem;
+        ZSTDMT_freeCCtxPool(srcPool);
+        return ZSTDMT_createCCtxPool(nbWorkers, cMem);
+    }
+}
+
 /* only works during initialization phase, not during compression */
 static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool)
 {
@@ -745,9 +780,9 @@ struct ZSTDMT_CCtx_s {
     ZSTD_CCtx_params params;
     size_t targetSectionSize;
     size_t targetPrefixSize;
-    roundBuff_t roundBuff;
+    int jobReady;        /* 1 => one job is already prepared, but pool has shortage of workers. Don't create a new job. */
     inBuff_t inBuff;
-    int jobReady;        /* 1 => one job is already prepared, but pool has shortage of workers. Don't create another one. */
+    roundBuff_t roundBuff;
     serialState_t serial;
     unsigned singleBlockingThread;
     unsigned jobIDMask;
@@ -798,6 +833,20 @@ static ZSTDMT_jobDescription* ZSTDMT_createJobsTable(U32* nbJobsPtr, ZSTD_custom
     return jobTable;
 }
 
+static size_t ZSTDMT_expandJobsTable (ZSTDMT_CCtx* mtctx, U32 nbWorkers) {
+    U32 nbJobs = nbWorkers + 2;
+    if (nbJobs > mtctx->jobIDMask+1) {  /* need more job capacity */
+        ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);
+        mtctx->jobIDMask = 0;
+        mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, mtctx->cMem);
+        if (mtctx->jobs==NULL) return ERROR(memory_allocation);
+        assert((nbJobs != 0) && ((nbJobs & (nbJobs - 1)) == 0));  /* ensure nbJobs is a power of 2 */
+        mtctx->jobIDMask = nbJobs - 1;
+    }
+    return 0;
+}
+
+
 /* ZSTDMT_CCtxParam_setNbWorkers():
  * Internal use only */
 size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers)
@@ -964,6 +1013,25 @@ static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params)
     return jobParams;
 }
 
+
+/* ZSTDMT_resize() :
+ * @return : error code if fails, 0 on success */
+static size_t ZSTDMT_resize(ZSTDMT_CCtx* mtctx, unsigned nbWorkers)
+{
+    mtctx->factory = POOL_resize(mtctx->factory, nbWorkers);
+    if (mtctx->factory == NULL) return ERROR(memory_allocation);
+    CHECK_F( ZSTDMT_expandJobsTable(mtctx, nbWorkers) );
+    mtctx->bufPool = ZSTDMT_expandBufferPool(mtctx->bufPool, nbWorkers);
+    if (mtctx->bufPool == NULL) return ERROR(memory_allocation);
+    mtctx->cctxPool = ZSTDMT_expandCCtxPool(mtctx->cctxPool, nbWorkers);
+    if (mtctx->cctxPool == NULL) return ERROR(memory_allocation);
+    mtctx->seqPool = ZSTDMT_expandSeqPool(mtctx->seqPool, nbWorkers);
+    if (mtctx->seqPool == NULL) return ERROR(memory_allocation);
+    ZSTDMT_CCtxParam_setNbWorkers(&mtctx->params, nbWorkers);
+    return 0;
+}
+
+
 /*! ZSTDMT_updateCParams_whileCompressing() :
  *  Updates only a selected set of compression parameters, to remain compatible with current frame.
  *  New parameters will be applied to next compression job. */
@@ -980,15 +1048,6 @@ void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_p
     }
 }
 
-/* ZSTDMT_getNbWorkers():
- * @return nb threads currently active in mtctx.
- * mtctx must be valid */
-unsigned ZSTDMT_getNbWorkers(const ZSTDMT_CCtx* mtctx)
-{
-    assert(mtctx != NULL);
-    return mtctx->params.nbWorkers;
-}
-
 /* ZSTDMT_getFrameProgression():
  * tells how much data has been consumed (input) and produced (output) for current frame.
  * able to count progression inside worker threads.
@@ -1089,15 +1148,7 @@ static size_t ZSTDMT_compress_advanced_internal(
     if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params))
         return ERROR(memory_allocation);
 
-    if (nbJobs > mtctx->jobIDMask+1) {  /* enlarge job table */
-        U32 jobsTableSize = nbJobs;
-        ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);
-        mtctx->jobIDMask = 0;
-        mtctx->jobs = ZSTDMT_createJobsTable(&jobsTableSize, mtctx->cMem);
-        if (mtctx->jobs==NULL) return ERROR(memory_allocation);
-        assert((jobsTableSize != 0) && ((jobsTableSize & (jobsTableSize - 1)) == 0));  /* ensure jobsTableSize is a power of 2 */
-        mtctx->jobIDMask = jobsTableSize - 1;
-    }
+    CHECK_F( ZSTDMT_expandJobsTable(mtctx, nbJobs) );  /* only expands if necessary */
 
     {   unsigned u;
         for (u=0; u<nbJobs; u++) {
@@ -1222,12 +1273,15 @@ size_t ZSTDMT_initCStream_internal(
 {
     DEBUGLOG(4, "ZSTDMT_initCStream_internal (pledgedSrcSize=%u, nbWorkers=%u, cctxPool=%u)",
                 (U32)pledgedSrcSize, params.nbWorkers, mtctx->cctxPool->totalCCtx);
-    /* params are supposed to be fully validated at this point */
+
+    /* params supposed partially fully validated at this point */
     assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams)));
     assert(!((dict) && (cdict)));  /* either dict or cdict, not both */
-    assert(mtctx->cctxPool->totalCCtx == params.nbWorkers);
 
     /* init */
+    if (params.nbWorkers != mtctx->params.nbWorkers)
+        ZSTDMT_resize(mtctx, params.nbWorkers);
+
     if (params.jobSize == 0) {
         params.jobSize = 1U << ZSTDMT_computeTargetJobLog(params);
     }
index f79e3b4418004d4aba30683cd0957b430bcddea5..4249a82dea872eb3ce3a92f84e783e888e52e5b2 100644 (file)
@@ -126,11 +126,6 @@ size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorker
  *  New parameters will be applied to next compression job. */
 void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* cctxParams);
 
-/* ZSTDMT_getNbWorkers():
- * @return nb threads currently active in mtctx.
- * mtctx must be valid */
-unsigned ZSTDMT_getNbWorkers(const ZSTDMT_CCtx* mtctx);
-
 /* ZSTDMT_getFrameProgression():
  * tells how much data has been consumed (input) and produced (output) for current frame.
  * able to count progression inside worker threads.