]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
switch to a single flush mutex+cond
authorYann Collet <cyan@fb.com>
Sun, 2 Feb 2025 02:37:23 +0000 (18:37 -0800)
committerYann Collet <cyan@fb.com>
Sun, 2 Feb 2025 02:37:23 +0000 (18:37 -0800)
which is transferred to the current oldest unfinished job.

lib/compress/huf_compress.c
lib/compress/zstd_opt.c
lib/compress/zstdmt_compress.c
tests/cli-tests/compression/adapt.sh

index ea000723209f97ef87d8e4191bc42ec22dc6ee25..3e68f76fb7d956af9baefe0e9be3c6b7b58fd152 100644 (file)
@@ -379,7 +379,7 @@ static U32 HUF_setMaxHeight(nodeElt* huffNode, U32 lastNonNull, U32 targetNbBits
     /* early exit : no elt > targetNbBits, so the tree is already valid. */
     if (largestBits <= targetNbBits) return largestBits;
 
-    DEBUGLOG(5, "HUF_setMaxHeight (targetNbBits = %u)", targetNbBits);
+    DEBUGLOG(6, "HUF_setMaxHeight (targetNbBits = %u)", targetNbBits);
 
     /* there are several too large elements (at least >= 2) */
     {   int totalCost = 0;
@@ -685,7 +685,7 @@ static int HUF_buildTree(nodeElt* huffNode, U32 maxSymbolValue)
     int lowS, lowN;
     int nodeNb = STARTNODE;
     int n, nodeRoot;
-    DEBUGLOG(5, "HUF_buildTree (alphabet size = %u)", maxSymbolValue + 1);
+    DEBUGLOG(6, "HUF_buildTree (alphabet size = %u)", maxSymbolValue + 1);
     /* init for parents */
     nonNullRank = (int)maxSymbolValue;
     while(huffNode[nonNullRank].count == 0) nonNullRank--;
@@ -764,7 +764,7 @@ HUF_buildCTable_wksp(HUF_CElt* CTable, const unsigned* count, U32 maxSymbolValue
 
     HUF_STATIC_ASSERT(HUF_CTABLE_WORKSPACE_SIZE == sizeof(HUF_buildCTable_wksp_tables));
 
-    DEBUGLOG(5, "HUF_buildCTable_wksp (alphabet size = %u)", maxSymbolValue+1);
+    DEBUGLOG(6, "HUF_buildCTable_wksp (alphabet size = %u)", maxSymbolValue+1);
 
     /* safety checks */
     if (wkspSize < sizeof(HUF_buildCTable_wksp_tables))
index 09de5a9d9be44b854536f3cdbcb957c56e7f5c40..22693c82e8a4b4d850024ffa1f2a75ba01093bfd 100644 (file)
@@ -124,7 +124,7 @@ static U32 ZSTD_scaleStats(unsigned* table, U32 lastEltIndex, U32 logTarget)
 {
     U32 const prevsum = sum_u32(table, lastEltIndex+1);
     U32 const factor = prevsum >> logTarget;
-    DEBUGLOG(5, "ZSTD_scaleStats (nbElts=%u, target=%u)", (unsigned)lastEltIndex+1, (unsigned)logTarget);
+    DEBUGLOG(6, "ZSTD_scaleStats (nbElts=%u, target=%u)", (unsigned)lastEltIndex+1, (unsigned)logTarget);
     assert(logTarget < 30);
     if (factor <= 1) return prevsum;
     return ZSTD_downscaleStats(table, lastEltIndex, ZSTD_highbit32(factor), base_1guaranteed);
index 3b572e4b00875d97b8f79a58c0a1317341ad4873..4af236fd00750e0e69e7a50fd3f2e0ff5cd341e9 100644 (file)
@@ -62,6 +62,7 @@ static unsigned long long GetCurrentClockTimeMicroseconds(void)
     do {                                                                                \
         if (DEBUGLEVEL >= MUTEX_WAIT_TIME_DLEVEL) {                                     \
             unsigned long long const beforeTime = GetCurrentClockTimeMicroseconds();    \
+            DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, "acquiring mutex %s", #mutex);             \
             ZSTD_pthread_mutex_lock(mutex);                                             \
             {   unsigned long long const afterTime = GetCurrentClockTimeMicroseconds(); \
                 unsigned long long const elapsedTime = (afterTime-beforeTime);          \
@@ -76,9 +77,22 @@ static unsigned long long GetCurrentClockTimeMicroseconds(void)
         }                                                                               \
     } while (0)
 
+#define COND_WAIT_DLEVEL 6
+#define ZSTD_PTHREAD_COND_WAIT(_cond, _mutex)                               \
+    do {                                                                    \
+        if (DEBUGLEVEL >= COND_WAIT_DLEVEL) {                               \
+            DEBUGLOG(COND_WAIT_DLEVEL, "waiting on condition %s", #_cond);  \
+            ZSTD_pthread_cond_wait(_cond,_mutex);                           \
+            DEBUGLOG(COND_WAIT_DLEVEL, "condition %s triggered", #_cond);   \
+        } else {                                                            \
+            ZSTD_pthread_cond_wait(_cond,_mutex);                           \
+        }                                                                   \
+    } while (0)
+
 #else
 
 #  define ZSTD_PTHREAD_MUTEX_LOCK(m) ZSTD_pthread_mutex_lock(m)
+#  define ZSTD_PTHREAD_COND_WAIT(c,m) ZSTD_pthread_cond_wait(c,m)
 #  define DEBUG_PRINTHEX(l,p,n) do { } while (0)
 
 #endif
@@ -147,7 +161,7 @@ static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool)
     size_t const arraySize = bufPool->totalBuffers * sizeof(Buffer);
     unsigned u;
     size_t totalBufferSize = 0;
-    ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
+    ZSTD_PTHREAD_MUTEX_LOCK(&bufPool->poolMutex);
     for (u=0; u<bufPool->totalBuffers; u++)
         totalBufferSize += bufPool->buffers[u].capacity;
     ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
@@ -161,7 +175,7 @@ static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool)
  * as they become freely interchangeable, reducing malloc/free usages and memory fragmentation */
 static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* const bufPool, size_t const bSize)
 {
-    ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
+    ZSTD_PTHREAD_MUTEX_LOCK(&bufPool->poolMutex);
     DEBUGLOG(4, "ZSTDMT_setBufferSize: bSize = %u", (U32)bSize);
     bufPool->bufferSize = bSize;
     ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
@@ -193,7 +207,7 @@ static Buffer ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool)
 {
     size_t const bSize = bufPool->bufferSize;
     DEBUGLOG(5, "ZSTDMT_getBuffer: bSize = %u", (U32)bufPool->bufferSize);
-    ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
+    ZSTD_PTHREAD_MUTEX_LOCK(&bufPool->poolMutex);
     if (bufPool->nbBuffers) {   /* try to use an existing buffer */
         Buffer const buf = bufPool->buffers[--(bufPool->nbBuffers)];
         size_t const availBufferSize = buf.capacity;
@@ -256,7 +270,7 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, Buffer buf)
 {
     DEBUGLOG(5, "ZSTDMT_releaseBuffer");
     if (buf.start == NULL) return;   /* compatible with release on NULL */
-    ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
+    ZSTD_PTHREAD_MUTEX_LOCK(&bufPool->poolMutex);
     if (bufPool->nbBuffers < bufPool->totalBuffers) {
         bufPool->buffers[bufPool->nbBuffers++] = buf;  /* stored for later use */
         DEBUGLOG(5, "ZSTDMT_releaseBuffer: stored buffer of size %u in slot %u",
@@ -417,7 +431,7 @@ static ZSTDMT_CCtxPool* ZSTDMT_expandCCtxPool(ZSTDMT_CCtxPool* srcPool,
 /* only works during initialization phase, not during compression */
 static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool)
 {
-    ZSTD_pthread_mutex_lock(&cctxPool->poolMutex);
+    ZSTD_PTHREAD_MUTEX_LOCK(&cctxPool->poolMutex);
     {   unsigned const nbWorkers = cctxPool->totalCCtx;
         size_t const poolSize = sizeof(*cctxPool);
         size_t const arraySize = cctxPool->totalCCtx * sizeof(ZSTD_CCtx*);
@@ -435,7 +449,7 @@ static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool)
 static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* cctxPool)
 {
     DEBUGLOG(5, "ZSTDMT_getCCtx");
-    ZSTD_pthread_mutex_lock(&cctxPool->poolMutex);
+    ZSTD_PTHREAD_MUTEX_LOCK(&cctxPool->poolMutex);
     if (cctxPool->availCCtx) {
         cctxPool->availCCtx--;
         {   ZSTD_CCtx* const cctx = cctxPool->cctxs[cctxPool->availCCtx];
@@ -450,7 +464,7 @@ static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* cctxPool)
 static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
 {
     if (cctx==NULL) return;   /* compatibility with release on NULL */
-    ZSTD_pthread_mutex_lock(&pool->poolMutex);
+    ZSTD_PTHREAD_MUTEX_LOCK(&pool->poolMutex);
     if (pool->availCCtx < pool->totalCCtx)
         pool->cctxs[pool->availCCtx++] = cctx;
     else {
@@ -586,7 +600,7 @@ ZSTDMT_serialState_genSequences(SerialState* serialState,
     ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex);
     while (serialState->nextJobID < jobID) {
         DEBUGLOG(5, "wait for serialState->cond");
-        ZSTD_pthread_cond_wait(&serialState->cond, &serialState->mutex);
+        ZSTD_PTHREAD_COND_WAIT(&serialState->cond, &serialState->mutex);
     }
     /* A future job may error and skip our job */
     if (serialState->nextJobID == jobID) {
@@ -663,7 +677,8 @@ typedef struct {
     size_t   consumed;                 /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */
     size_t   cSize;                    /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx, then set0 by mtctx */
     ZSTD_pthread_mutex_t job_mutex;    /* Thread-safe - used by mtctx and worker */
-    ZSTD_pthread_cond_t job_cond;      /* Thread-safe - used by mtctx and worker */
+    ZSTD_pthread_mutex_t* flush_mutex; /* Thread-safe - used by mtctx and worker */
+    ZSTD_pthread_cond_t* flush_cond;   /* Thread-safe - used by mtctx and worker */
     ZSTDMT_CCtxPool* cctxPool;         /* Thread-safe - used by mtctx and (all) workers */
     ZSTDMT_bufferPool* bufPool;        /* Thread-safe - used by mtctx and (all) workers */
     ZSTDMT_seqPool* seqPool;           /* Thread-safe - used by mtctx and (all) workers */
@@ -779,8 +794,14 @@ static void ZSTDMT_compressionJob(void* jobDescription)
             job->consumed = chunkSize * chunkNb;
             DEBUGLOG(5, "ZSTDMT_compressionJob: compress new block : cSize==%u bytes (total: %u)",
                         (U32)cSize, (U32)job->cSize);
-            ZSTD_pthread_cond_signal(&job->job_cond);   /* warns some more data is ready to be flushed */
-            ZSTD_pthread_mutex_unlock(&job->job_mutex);
+            if (job->flush_mutex != NULL) {
+                ZSTD_pthread_mutex_unlock(&job->job_mutex);
+                ZSTD_PTHREAD_MUTEX_LOCK(job->flush_mutex);
+                ZSTD_pthread_cond_signal(job->flush_cond);   /* warns some more data is ready to be flushed */
+                ZSTD_pthread_mutex_unlock(job->flush_mutex);
+            } else {
+                ZSTD_pthread_mutex_unlock(&job->job_mutex);
+            }
         }
         /* last block */
         assert(chunkSize > 0);
@@ -815,7 +836,15 @@ _endJob:
     if (ZSTD_isError(job->cSize)) assert(lastCBlockSize == 0);
     job->cSize += lastCBlockSize;
     job->consumed = job->src.size;  /* when job->consumed == job->src.size , compression job is presumed completed */
-    ZSTD_pthread_cond_signal(&job->job_cond);
+    if (job->flush_mutex != NULL) {
+        ZSTD_pthread_mutex_unlock(&job->job_mutex);
+        ZSTD_PTHREAD_MUTEX_LOCK(job->flush_mutex);
+        ZSTD_pthread_cond_signal(job->flush_cond);   /* warns some more data is ready to be flushed */
+        ZSTD_pthread_mutex_unlock(job->flush_mutex);
+        ZSTD_pthread_mutex_lock(&job->job_mutex);
+    }
+    job->flush_mutex = NULL;
+    job->flush_cond = NULL;
     ZSTD_pthread_mutex_unlock(&job->job_mutex);
 }
 
@@ -870,13 +899,15 @@ struct ZSTDMT_CCtx_s {
     ZSTDMT_CCtxPool* cctxPool;
     ZSTDMT_seqPool* seqPool;
     ZSTD_CCtx_params params;
-    size_t targetSectionSize;
+    size_t targetJobSize;
     size_t targetPrefixSize;
     int jobReady;        /* 1 => one job is already prepared, but pool has shortage of workers. Don't create a new job. */
     InBuff_t inBuff;
     RoundBuff_t roundBuff;
     SerialState serial;
     RSyncState_t rsync;
+    ZSTD_pthread_mutex_t flushMutex;
+    ZSTD_pthread_cond_t flushCond;
     unsigned jobIDMask;
     unsigned doneJobID;
     unsigned nextJobID;
@@ -897,7 +928,6 @@ static void ZSTDMT_freeJobsTable(ZSTDMT_jobDescription* jobTable, U32 nbJobs, ZS
     if (jobTable == NULL) return;
     for (jobNb=0; jobNb<nbJobs; jobNb++) {
         ZSTD_pthread_mutex_destroy(&jobTable[jobNb].job_mutex);
-        ZSTD_pthread_cond_destroy(&jobTable[jobNb].job_cond);
     }
     ZSTD_customFree(jobTable, cMem);
 }
@@ -918,7 +948,6 @@ static ZSTDMT_jobDescription* ZSTDMT_createJobsTable(U32* nbJobsPtr, ZSTD_custom
     *nbJobsPtr = nbJobs;
     for (jobNb=0; jobNb<nbJobs; jobNb++) {
         initError |= ZSTD_pthread_mutex_init(&jobTable[jobNb].job_mutex, NULL);
-        initError |= ZSTD_pthread_cond_init(&jobTable[jobNb].job_cond, NULL);
     }
     if (initError != 0) {
         ZSTDMT_freeJobsTable(jobTable, nbJobs, cMem);
@@ -983,6 +1012,8 @@ ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers, ZSTD_customMem cMem, ZST
     mtctx->cctxPool = ZSTDMT_createCCtxPool(nbWorkers, cMem);
     mtctx->seqPool = ZSTDMT_createSeqPool(nbWorkers, cMem);
     initError = ZSTDMT_serialState_init(&mtctx->serial);
+    initError |= ZSTD_pthread_mutex_init(&mtctx->flushMutex, NULL);
+    initError |= ZSTD_pthread_cond_init(&mtctx->flushCond, NULL);
     mtctx->roundBuff = kNullRoundBuff;
     if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool | !mtctx->seqPool | initError) {
         ZSTDMT_freeCCtx(mtctx);
@@ -1014,7 +1045,6 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
     for (jobID=0; jobID <= mtctx->jobIDMask; jobID++) {
         /* Copy the mutex/cond out */
         ZSTD_pthread_mutex_t const mutex = mtctx->jobs[jobID].job_mutex;
-        ZSTD_pthread_cond_t const cond = mtctx->jobs[jobID].job_cond;
 
         DEBUGLOG(4, "job%02u: release dst address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].dstBuff.start);
         ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff);
@@ -1022,7 +1052,6 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
         /* Clear the job description, but keep the mutex/cond */
         ZSTD_memset(&mtctx->jobs[jobID], 0, sizeof(mtctx->jobs[jobID]));
         mtctx->jobs[jobID].job_mutex = mutex;
-        mtctx->jobs[jobID].job_cond = cond;
     }
     mtctx->inBuff.buffer = g_nullBuffer;
     mtctx->inBuff.filled = 0;
@@ -1036,8 +1065,17 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* mtctx)
         unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask;
         ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex);
         while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].src.size) {
-            DEBUGLOG(4, "waiting for jobCompleted signal from job %u", mtctx->doneJobID);   /* we want to block when waiting for data to flush */
-            ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex);
+            DEBUGLOG(4, "waiting for jobCompleted signal from job %u", mtctx->doneJobID);
+            /* we want to block and wait for data to flush */
+            if (mtctx->jobs[jobID].flush_mutex == NULL) {
+                mtctx->jobs[jobID].flush_mutex = &mtctx->flushMutex;
+                mtctx->jobs[jobID].flush_cond = &mtctx->flushCond;
+            }
+            ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex);
+            DEBUGLOG(2, "ZSTDMT_waitForAllJobsCompleted: let's wait for job progress");
+            ZSTD_PTHREAD_COND_WAIT(&mtctx->flushCond, &mtctx->flushMutex);
+            DEBUGLOG(2, "ZSTDMT_waitForAllJobsCompleted: waiting completed");
+            ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex);
         }
         ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex);
         mtctx->doneJobID++;
@@ -1058,6 +1096,8 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
     ZSTD_freeCDict(mtctx->cdictLocal);
     if (mtctx->roundBuff.buffer)
         ZSTD_customFree(mtctx->roundBuff.buffer, mtctx->cMem);
+    ZSTD_pthread_mutex_destroy(&mtctx->flushMutex);
+    ZSTD_pthread_cond_destroy(&mtctx->flushCond);
     ZSTD_customFree(mtctx, mtctx->cMem);
     return 0;
 }
@@ -1129,7 +1169,7 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
         for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) {
             unsigned const wJobID = jobNb & mtctx->jobIDMask;
             ZSTDMT_jobDescription* jobPtr = &mtctx->jobs[wJobID];
-            ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);
+            ZSTD_PTHREAD_MUTEX_LOCK(&jobPtr->job_mutex);
             {   size_t const cResult = jobPtr->cSize;
                 size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
                 size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed;
@@ -1157,7 +1197,7 @@ size_t ZSTDMT_toFlushNow(ZSTDMT_CCtx* mtctx)
     /* look into oldest non-fully-flushed job */
     {   unsigned const wJobID = jobID & mtctx->jobIDMask;
         ZSTDMT_jobDescription* const jobPtr = &mtctx->jobs[wJobID];
-        ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);
+        ZSTD_PTHREAD_MUTEX_LOCK(&jobPtr->job_mutex);
         {   size_t const cResult = jobPtr->cSize;
             size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
             size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed;
@@ -1279,15 +1319,15 @@ size_t ZSTDMT_initCStream_internal(
     mtctx->frameContentSize = pledgedSrcSize;
     mtctx->targetPrefixSize = ZSTDMT_computeOverlapSize(&params);
     DEBUGLOG(4, "overlapLog=%i => %u KB", params.overlapLog, (U32)(mtctx->targetPrefixSize>>10));
-    mtctx->targetSectionSize = params.jobSize;
-    if (mtctx->targetSectionSize == 0) {
-        mtctx->targetSectionSize = 1ULL << ZSTDMT_computeTargetJobLog(&params);
+    mtctx->targetJobSize = params.jobSize;
+    if (mtctx->targetJobSize == 0) {
+        mtctx->targetJobSize = 1ULL << ZSTDMT_computeTargetJobLog(&params);
     }
-    assert(mtctx->targetSectionSize <= (size_t)ZSTDMT_JOBSIZE_MAX);
+    assert(mtctx->targetJobSize <= (size_t)ZSTDMT_JOBSIZE_MAX);
 
     if (params.rsyncable) {
         /* Aim for the targetsectionSize as the average job size. */
-        U32 const jobSizeKB = (U32)(mtctx->targetSectionSize >> 10);
+        U32 const jobSizeKB = (U32)(mtctx->targetJobSize >> 10);
         U32 const rsyncBits = (assert(jobSizeKB >= 1), ZSTD_highbit32(jobSizeKB) + 10);
         /* We refuse to create jobs < RSYNC_MIN_BLOCK_SIZE bytes, so make sure our
          * expected job size is at least 4x larger. */
@@ -1297,24 +1337,24 @@ size_t ZSTDMT_initCStream_internal(
         mtctx->rsync.hitMask = (1ULL << rsyncBits) - 1;
         mtctx->rsync.primePower = ZSTD_rollingHash_primePower(RSYNC_LENGTH);
     }
-    if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize;  /* job size must be >= overlap size */
-    DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), (U32)params.jobSize);
-    DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10));
-    ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(mtctx->targetSectionSize));
+    if (mtctx->targetJobSize < mtctx->targetPrefixSize) mtctx->targetJobSize = mtctx->targetPrefixSize;  /* job size must be >= overlap size */
+    DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetJobSize>>10), (U32)params.jobSize);
+    DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetJobSize>>10));
+    ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(mtctx->targetJobSize));
     {
         /* If ldm is enabled we need windowSize space. */
         size_t const windowSize = mtctx->params.ldmParams.enableLdm == ZSTD_ps_enable ? (1U << mtctx->params.cParams.windowLog) : 0;
         /* Two buffers of slack, plus extra space for the overlap
          * This is the minimum slack that LDM works with. One extra because
-         * flush might waste up to targetSectionSize-1 bytes. Another extra
+         * flush might waste up to targetJobSize-1 bytes. Another extra
          * for the overlap (if > 0), then one to fill which doesn't overlap
          * with the LDM window.
          */
         size_t const nbSlackBuffers = 2 + (mtctx->targetPrefixSize > 0);
-        size_t const slackSize = mtctx->targetSectionSize * nbSlackBuffers;
+        size_t const slackSize = mtctx->targetJobSize * nbSlackBuffers;
         /* Compute the total size, and always have enough slack */
         size_t const nbWorkers = MAX(mtctx->params.nbWorkers, 1);
-        size_t const sectionsSize = mtctx->targetSectionSize * nbWorkers;
+        size_t const sectionsSize = mtctx->targetJobSize * nbWorkers;
         size_t const capacity = MAX(windowSize, sectionsSize) + slackSize;
         if (mtctx->roundBuff.capacity < capacity) {
             if (mtctx->roundBuff.buffer)
@@ -1359,7 +1399,7 @@ size_t ZSTDMT_initCStream_internal(
         mtctx->cdict = cdict;
     }
 
-    if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, mtctx->targetSectionSize,
+    if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, mtctx->targetJobSize,
                                  dict, dictSize, dictContentType))
         return ERROR(memory_allocation);
 
@@ -1436,6 +1476,8 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
         mtctx->jobs[jobID].lastJob = endFrame;
         mtctx->jobs[jobID].frameChecksumNeeded = mtctx->params.fParams.checksumFlag && endFrame && (mtctx->nextJobID>0);
         mtctx->jobs[jobID].dstFlushed = 0;
+        mtctx->jobs[jobID].flush_mutex = NULL;
+        mtctx->jobs[jobID].flush_cond = NULL;
 
         /* Update the round buffer pos and clear the input buffer to be reset */
         mtctx->roundBuff.pos += srcSize;
@@ -1455,12 +1497,12 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
         }   }
 
         if ( (srcSize == 0)
-          && (mtctx->nextJobID>0)/*single job must also write frame header*/ ) {
+          && (mtctx->nextJobID>0) /*single job must also write frame header*/ ) {
             DEBUGLOG(5, "ZSTDMT_createCompressionJob: creating a last empty block to end frame");
             assert(endOp == ZSTD_e_end);  /* only possible case : need to end the frame with an empty last block */
             ZSTDMT_writeLastEmptyBlock(mtctx->jobs + jobID);
             mtctx->nextJobID++;
-            return 0;
+            return 1;
         }
     }
 
@@ -1471,13 +1513,15 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
                 mtctx->nextJobID,
                 jobID);
 
-    if (ZSTDMT_anythingToFlush(mtctx)) {
+    if (1 || ZSTDMT_anythingToFlush(mtctx)) {
         if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID])) {
             mtctx->nextJobID++;
             mtctx->jobReady = 0;
+            return 1;
         } else {
-            DEBUGLOG(2, "ZSTDMT_createCompressionJob: no worker currently available for job %u", mtctx->nextJobID);
+            DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker currently available for job %u", mtctx->nextJobID);
             mtctx->jobReady = 1;
+            return 0;
         }
     } else {
         /* block here, wait for next available job */
@@ -1485,8 +1529,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
         mtctx->nextJobID++;
         mtctx->jobReady = 0;
     }
-
-    return 0;
+    return 1;
 }
 
 
@@ -1515,7 +1558,15 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
             }
             DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",
                         mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
-            ZSTD_pthread_cond_wait(&mtctx->jobs[wJobID].job_cond, &mtctx->jobs[wJobID].job_mutex);  /* block when nothing to flush but some to come */
+            if (mtctx->jobs[wJobID].flush_mutex == NULL) {
+                mtctx->jobs[wJobID].flush_mutex = &mtctx->flushMutex;
+                mtctx->jobs[wJobID].flush_cond = &mtctx->flushCond;
+            }
+            DEBUGLOG(6, "waiting to flush something (%zu left)",  mtctx->jobs[wJobID].src.size - mtctx->jobs[wJobID].consumed);
+            ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
+            ZSTD_PTHREAD_COND_WAIT(&mtctx->flushCond, &mtctx->flushMutex);  /* block waiting for something to flush */
+            ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[wJobID].job_mutex);
+            DEBUGLOG(6, "condition triggered: let's flush something (%zu bytes)", mtctx->jobs[wJobID].cSize - mtctx->jobs[wJobID].dstFlushed);
     }   }
 
     /* try to flush something */
@@ -1565,6 +1616,8 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
                 DEBUGLOG(5, "dstBuffer released");
                 mtctx->jobs[wJobID].dstBuff = g_nullBuffer;
                 mtctx->jobs[wJobID].cSize = 0;   /* ensure this job slot is considered "not started" in future check */
+                mtctx->jobs[wJobID].flush_mutex = NULL;
+                mtctx->jobs[wJobID].flush_cond = NULL;
                 mtctx->consumed += srcSize;
                 mtctx->produced += cSize;
                 mtctx->doneJobID++;
@@ -1595,7 +1648,7 @@ static Range ZSTDMT_getInputDataInUse(ZSTDMT_CCtx* mtctx)
 
     /* no need to check during first round */
     size_t roundBuffCapacity = mtctx->roundBuff.capacity;
-    size_t nbJobs1stRoundMin = roundBuffCapacity / mtctx->targetSectionSize;
+    size_t nbJobs1stRoundMin = roundBuffCapacity / mtctx->targetJobSize;
     if (lastJobID < nbJobs1stRoundMin) return kNullRange;
 
     for (jobID = firstJobID; jobID < lastJobID; ++jobID) {
@@ -1676,7 +1729,7 @@ static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, Buffer buffer)
         ZSTD_PTHREAD_MUTEX_LOCK(mutex);
         while (ZSTDMT_doesOverlapWindow(buffer, mtctx->serial.ldmWindow)) {
             DEBUGLOG(5, "Waiting for LDM to finish...");
-            ZSTD_pthread_cond_wait(&mtctx->serial.ldmWindowCond, mutex);
+            ZSTD_PTHREAD_COND_WAIT(&mtctx->serial.ldmWindowCond, mutex);
         }
         DEBUGLOG(6, "Done waiting for LDM to finish");
         ZSTD_pthread_mutex_unlock(mutex);
@@ -1692,7 +1745,7 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx)
 {
     Range const inUse = ZSTDMT_getInputDataInUse(mtctx);
     size_t const spaceLeft = mtctx->roundBuff.capacity - mtctx->roundBuff.pos;
-    size_t const spaceNeeded = mtctx->targetSectionSize;
+    size_t const spaceNeeded = mtctx->targetJobSize;
     Buffer buffer;
 
     DEBUGLOG(5, "ZSTDMT_tryGetInputRange");
@@ -1765,7 +1818,7 @@ findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input)
     BYTE const* prev;
     size_t pos;
 
-    syncPoint.toLoad = MIN(input.size - input.pos, mtctx->targetSectionSize - mtctx->inBuff.filled);
+    syncPoint.toLoad = MIN(input.size - input.pos, mtctx->targetJobSize - mtctx->inBuff.filled);
     syncPoint.flush = 0;
     if (!mtctx->params.rsyncable)
         /* Rsync is disabled. */
@@ -1781,7 +1834,7 @@ findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input)
          * window. However, since it depends only in the internal buffers, if the
          * state is already synchronized, we will remain synchronized.
          * Additionally, the probability that we miss a synchronization point is
-         * low: RSYNC_LENGTH / targetSectionSize.
+         * low: RSYNC_LENGTH / targetJobSize.
          */
         return syncPoint;
     /* Initialize the loop variables. */
@@ -1825,7 +1878,7 @@ findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input)
      * through the input. If we hit a synchronization point, then cut the
      * job off, and tell the compressor to flush the job. Otherwise, load
      * all the bytes and continue as normal.
-     * If we go too long without a synchronization point (targetSectionSize)
+     * If we go too long without a synchronization point (targetJobSize)
      * then a block will be emitted anyways, but this is okay, since if we
      * are already synchronized we will remain synchronized.
      */
@@ -1852,8 +1905,8 @@ findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input)
 
 size_t ZSTDMT_nextInputSizeHint(const ZSTDMT_CCtx* mtctx)
 {
-    size_t hintInSize = mtctx->targetSectionSize - mtctx->inBuff.filled;
-    if (hintInSize==0) hintInSize = mtctx->targetSectionSize;
+    size_t hintInSize = mtctx->targetJobSize - mtctx->inBuff.filled;
+    if (hintInSize==0) hintInSize = mtctx->targetJobSize;
     return hintInSize;
 }
 
@@ -1866,7 +1919,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
                                      ZSTD_inBuffer* input,
                                      ZSTD_EndDirective endOp)
 {
-    unsigned forwardInputProgress = 0;
+    unsigned forwardProgress = 0;
     DEBUGLOG(5, "ZSTDMT_compressStream_generic (endOp=%u, srcSize=%u)",
                 (U32)endOp, (U32)(input->size - input->pos));
     assert(output->pos <= output->size);
@@ -1896,13 +1949,13 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
             if (syncPoint.flush && endOp == ZSTD_e_continue) {
                 endOp = ZSTD_e_flush;
             }
-            assert(mtctx->inBuff.buffer.capacity >= mtctx->targetSectionSize);
+            assert(mtctx->inBuff.buffer.capacity >= mtctx->targetJobSize);
             DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u",
-                        (U32)syncPoint.toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->targetSectionSize);
+                        (U32)syncPoint.toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->targetJobSize);
             ZSTD_memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, syncPoint.toLoad);
             input->pos += syncPoint.toLoad;
             mtctx->inBuff.filled += syncPoint.toLoad;
-            forwardInputProgress = syncPoint.toLoad>0;
+            forwardProgress = syncPoint.toLoad>0;
         }
     }
     if ((input->pos < input->size) && (endOp == ZSTD_e_end)) {
@@ -1912,21 +1965,23 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
             * - We filled the input buffer: flush this job but don't end the frame.
             * - We hit a synchronization point: flush this job but don't end the frame.
             */
-        assert(mtctx->inBuff.filled == 0 || mtctx->inBuff.filled == mtctx->targetSectionSize || mtctx->params.rsyncable);
+        assert(mtctx->inBuff.filled == 0 || mtctx->inBuff.filled == mtctx->targetJobSize || mtctx->params.rsyncable);
         endOp = ZSTD_e_flush;
     }
 
     if ( (mtctx->jobReady)
-      || (mtctx->inBuff.filled >= mtctx->targetSectionSize)  /* filled enough : let's compress */
+      || (mtctx->inBuff.filled >= mtctx->targetJobSize)  /* filled enough : let's compress */
       || ((endOp != ZSTD_e_continue) && (mtctx->inBuff.filled > 0))  /* something to flush : let's go */
       || ((endOp == ZSTD_e_end) && (!mtctx->frameEnded)) ) {   /* must finish the frame with a zero-size block */
         size_t const jobSize = mtctx->inBuff.filled;
-        assert(mtctx->inBuff.filled <= mtctx->targetSectionSize);
-        FORWARD_IF_ERROR( ZSTDMT_createCompressionJob(mtctx, jobSize, endOp) , "");
+        size_t const jobPosted = ZSTDMT_createCompressionJob(mtctx, jobSize, endOp);
+        assert(mtctx->inBuff.filled <= mtctx->targetJobSize);
+        FORWARD_IF_ERROR(jobPosted , "");
+        if (jobPosted) forwardProgress = 1;
     }
 
     /* check for potential compressed data ready to be flushed */
-    {   size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress, endOp); /* block if there was no forward input progress */
+    {   size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardProgress, endOp); /* block if there was no forward input progress */
         if (input->pos < input->size) return MAX(remainingToFlush, 1);  /* input not consumed : do not end flush yet */
         DEBUGLOG(5, "end of ZSTDMT_compressStream_generic: remainingToFlush = %u", (U32)remainingToFlush);
         return remainingToFlush;
index 30b9afaa03bd225194e115898a677cfec7971df1..2981ada3858669d6ffecd253e0b3d50b4e7f0a9b 100755 (executable)
@@ -8,7 +8,7 @@ zstd -f file --adapt -c | zstd -t
 datagen -g100M > file100M
 
 # Pick parameters to force fast adaptation, even on slow systems
-zstd --adapt -vvvv -19 --zstd=wlog=10 file100M -o /dev/null 2>&1 | grep -q "faster speed , lighter compression"
+zstd --adapt -vvvv -19 --zstd=wlog=18 file100M -o /dev/null 2>&1 | grep -q "faster speed , lighter compression"
 
 # Adaption still happens with --no-progress
-zstd --no-progress --adapt -vvvv -19 --zstd=wlog=10 file100M -o /dev/null 2>&1 | grep -q "faster speed , lighter compression"
+zstd --no-progress --adapt -vvvv -19 --zstd=wlog=18 file100M -o /dev/null 2>&1 | grep -q "faster speed , lighter compression"