git.ipfire.org Git - thirdparty/zstd.git/commitdiff
ZSTDMT_{flush,end}Stream() now block on next job completion when nothing to flush
author    Yann Collet <cyan@fb.com>
          Wed, 18 Jan 2017 00:15:18 +0000 (16:15 -0800)
committer Yann Collet <cyan@fb.com>
          Wed, 18 Jan 2017 00:15:18 +0000 (16:15 -0800)
The main issue was to avoid having a caller loop continually on {flush,end}Stream()
when there was nothing ready to be flushed but compression work was still ongoing in a worker thread.
Such a continuous loop would only waste energy.
The new version makes calls to {flush,end}Stream() block when there is nothing ready to be flushed yet.
Of course, once all jobs are completed and their output flushed, it returns zero (all flushes completed).

Note : there are still some remaining issues around reporting error codes
and properly returning resources to their pools when an error is triggered.
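
In practice this means a caller draining the stream no longer needs to poll. Below is a minimal sketch of such a drain loop (an illustration, not code from this commit), assuming the ZSTDMT_* streaming entry points declared in lib/compress/zstdmt_compress.h at the time, with ZSTDMT_endStream() returning the number of bytes still to be flushed and zero once everything is done:

    #include <stdio.h>
    #include "zstd.h"               /* ZSTD_outBuffer, ZSTD_isError */
    #include "zstdmt_compress.h"    /* ZSTDMT_CCtx, ZSTDMT_endStream (internal header) */

    /* Flush every remaining byte to fout. Each ZSTDMT_endStream() call either
     * returns some flushed data or blocks until the next worker job completes,
     * so this loop no longer spins while compression is still in flight. */
    static int drainAll(ZSTDMT_CCtx* mtctx, FILE* fout)
    {
        char out[1 << 17];
        size_t remaining;
        do {
            ZSTD_outBuffer ob = { out, sizeof(out), 0 };
            remaining = ZSTDMT_endStream(mtctx, &ob);
            if (ZSTD_isError(remaining)) return 1;   /* error reporting is still being improved, see note above */
            if (fwrite(out, 1, ob.pos, fout) != ob.pos) return 1;
        } while (remaining > 0);   /* zero : all jobs completed and flushed */
        return 0;
    }

With the previous behaviour, such a loop could spin with ob.pos == 0 while workers were still busy; now each iteration either makes progress or sleeps until a job finishes.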

lib/compress/zstdmt_compress.c

index fb9183f9e41393a99d291cba640e054362584aa2..57cc107f7396412e947f423a379e86a53ad67c94 100644 (file)
@@ -388,10 +388,15 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
 
     if (zcs->inBuff.filled == zcs->inBuffSize) {   /* filled enough : let's compress */
         size_t const dstBufferCapacity = ZSTD_compressBound(zcs->targetSectionSize);
-        buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity); /* should check for NULL */
-        ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool);   /* should check for NULL */
+        buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity);
+        ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool);
         unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
 
+        if ((cctx==NULL) || (dstBuffer.start==NULL)) {
+            zcs->jobs[jobID].cSize = ERROR(memory_allocation);   /* job result : how to collect that error ? */
+            zcs->jobs[jobID].jobCompleted = 1;
+        }
+
         zcs->jobs[jobID].src = zcs->inBuff.buffer;
         zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
         zcs->jobs[jobID].srcSize = zcs->targetSectionSize;
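
The NULL checks added in this hunk follow a common worker-pool pattern: an allocation failure is recorded directly in the job slot (as an error-valued cSize) and the job is marked completed, so whoever later waits on that job wakes up and can observe the failure instead of blocking forever. A self-contained sketch of the idea with invented names follows; the early return only keeps the sketch short, the commit itself still fills in the descriptor and lets it flow through the normal path:

    #include <stddef.h>

    #define JOB_ERROR_ALLOCATION ((size_t)-64)   /* stand-in for zstd's ERROR(memory_allocation) */

    typedef struct {
        size_t cSize;          /* compressed size once done, or an error code */
        int    jobCompleted;   /* set when the job's result (or failure) is available */
    } Job;

    static void postJob(Job* job, void* dstBuffer, void* cctx)
    {
        if (dstBuffer == NULL || cctx == NULL) {
            job->cSize = JOB_ERROR_ALLOCATION;   /* record the failure in the job slot */
            job->jobCompleted = 1;               /* a blocked flusher will wake up and see it */
            return;
        }
        /* ... otherwise fill in the descriptor and hand the job to the worker pool ... */
    }
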
@@ -426,17 +431,18 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
             memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
             output->pos += toWrite;
             job.dstFlushed += toWrite;
-            if (job.dstFlushed == job.cSize) {   /* output buffer fully flushed => next one */
-                ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff);
+            if (job.dstFlushed == job.cSize) {   /* output buffer fully flushed => go to next one */
+                ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); zcs->jobs[jobID].dstBuff = (buffer_t) { NULL, 0 };
                 zcs->doneJobID++;
-            } else
-                zcs->jobs[jobID].dstFlushed = job.dstFlushed;
-    }   }
+            } else {
+                zcs->jobs[jobID].dstFlushed = job.dstFlushed;   /* save flush level into zcs for later retrieval */
+    }   }   }
 
     /* recommended next input size : fill current input buffer */
     return zcs->inBuffSize - zcs->inBuff.filled;
 }
 
+
 static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned endFrame)
 {
     size_t const srcSize = zcs->inBuff.filled;
@@ -469,14 +475,20 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
         }
 
         DEBUGLOG(3, "posting job %u   (%u bytes)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize);
-        POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]);
+        POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]);   /* this call is blocking when thread worker pool is exhausted */
         zcs->nextJobID++;
     }
 
     /* check if there is any data available to flush */
     {   unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
-        ZSTDMT_jobDescription job = zcs->jobs[wJobID];
-        if (job.jobCompleted) {   /* job completed : output can be flushed */
+        PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
+        while (zcs->jobs[wJobID].jobCompleted==0) {
+            DEBUGLOG(4, "waiting for jobCompleted signal from chunk %u", zcs->doneJobID);   /* we want to block when waiting for data to flush */
+            pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
+        }
+        pthread_mutex_unlock(&zcs->jobCompleted_mutex);
+        {   /* job completed : output can be flushed */
+            ZSTDMT_jobDescription job = zcs->jobs[wJobID];
             size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
             ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[wJobID].cctx = NULL;   /* release cctx for future task */
             ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[wJobID].srcStart = NULL; zcs->jobs[wJobID].src = (buffer_t) { NULL, 0 };
@@ -488,11 +500,11 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
                 zcs->doneJobID++;
             } else {
                 zcs->jobs[wJobID].dstFlushed = job.dstFlushed;
-        }   }
-        /* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */
-        if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
-        return (zcs->doneJobID < zcs->nextJobID);
-    }
+            }
+            /* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */
+            if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
+            return (zcs->doneJobID < zcs->nextJobID);
+    }   }
 }
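
The new wait in ZSTDMT_flushStream_internal() is the standard condition-variable pattern: take the mutex, re-test the predicate in a loop (which also guards against spurious wakeups), sleep on the condition, and release the mutex once the predicate holds. A stripped-down, self-contained version with plain pthreads and invented names, mirroring the jobCompleted flag / jobCompleted_cond pair used above:

    #include <pthread.h>

    typedef struct {
        pthread_mutex_t mutex;
        pthread_cond_t  cond;
        int             jobCompleted;   /* predicate, protected by mutex */
    } JobGate;

    /* Flush side : block until a worker has marked the job completed. */
    static void waitJobCompleted(JobGate* g)
    {
        pthread_mutex_lock(&g->mutex);
        while (!g->jobCompleted)
            pthread_cond_wait(&g->cond, &g->mutex);   /* atomically releases the mutex while sleeping */
        pthread_mutex_unlock(&g->mutex);
    }

    /* Worker side : publish the result and wake any waiter. */
    static void signalJobCompleted(JobGate* g)
    {
        pthread_mutex_lock(&g->mutex);
        g->jobCompleted = 1;
        pthread_cond_signal(&g->cond);
        pthread_mutex_unlock(&g->mutex);
    }

The worker-side signal sketched here corresponds to what a compression job has to do when it finishes (set jobCompleted and signal jobCompleted_cond); pairing it with the wait above is what turns {flush,end}Stream() into blocking calls when nothing is ready to flush yet.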