]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
refactor ZSTDMT streaming flush code
authorYann Collet <cyan@fb.com>
Mon, 23 Jan 2017 19:43:51 +0000 (11:43 -0800)
committerYann Collet <cyan@fb.com>
Mon, 23 Jan 2017 19:50:44 +0000 (11:50 -0800)
now shared by both ZSTDMT_compressStream() and ZSTDMT_flushStream()

lib/compress/zstdmt_compress.c

index 135d274f8a5b040f29f6e65d4b9bfc05ac27976e..9e7754b8889780a73dd8ee02b20080df07bf6717 100644 (file)
@@ -244,8 +244,8 @@ void ZSTDMT_compressChunk(void* jobDescription)
         ZSTD_invalidateRepCodes(job->cctx);
     }
 
-    DEBUGLOG(3, "Compressing : ");
-    DEBUG_PRINTHEX(3, job->srcStart, 12);
+    DEBUGLOG(4, "Compressing : ");
+    DEBUG_PRINTHEX(4, job->srcStart, 12);
     job->cSize = (job->lastChunk) ?   /* last chunk signal */
                  ZSTD_compressEnd(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize) :
                  ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize);
@@ -516,6 +516,54 @@ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
 }
 
 
+/* ZSTDMT_flushNextJob() :
+ * output : will be updated with amount of data flushed .
+ * blockToFlush : the function will block and wait if there is no data available to flush .
+ * @return : amount of data remaining within internal buffer, 1 if unknown but > 0, 0 if no more */
+static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
+{
+    unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
+    if (zcs->doneJobID == zcs->nextJobID) return 0;   /* all flushed ! */
+    PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
+    while (zcs->jobs[wJobID].jobCompleted==0) {
+        DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID);    /* block when nothing available to flush */
+        if (!blockToFlush) { pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */
+        pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
+    }
+    pthread_mutex_unlock(&zcs->jobCompleted_mutex);
+    /* compression job completed : output can be flushed */
+    {   ZSTDMT_jobDescription job = zcs->jobs[wJobID];
+        size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
+        DEBUGLOG(4, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
+        ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx);
+        zcs->jobs[wJobID].cctx = NULL;
+        ZSTDMT_releaseBuffer(zcs->buffPool, job.src);
+        zcs->jobs[wJobID].srcStart = NULL;
+        zcs->jobs[wJobID].src = g_nullBuffer;
+        if (ZSTD_isError(job.cSize)) {
+            ZSTDMT_waitForAllJobsCompleted(zcs);
+            ZSTDMT_releaseAllJobResources(zcs);
+            return job.cSize;
+        }
+        memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
+        output->pos += toWrite;
+        job.dstFlushed += toWrite;
+        if (job.dstFlushed == job.cSize) {   /* output buffer fully flushed => move to next one */
+            ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff);
+            zcs->jobs[wJobID].dstBuff = g_nullBuffer;
+            zcs->jobs[wJobID].jobCompleted = 0;
+            zcs->doneJobID++;
+        } else {
+            zcs->jobs[wJobID].dstFlushed = job.dstFlushed;
+        }
+        /* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */
+        if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
+        if (zcs->doneJobID < zcs->nextJobID) return 1;   /* still some buffer to flush */
+        zcs->allJobsCompleted = zcs->frameEnded;   /* frame completed and entirely flushed */
+        return 0;   /* everything flushed */
+}   }
+
+
 size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
 {
     if (zcs->frameEnded) return ERROR(stage_wrong);   /* current frame being ended. Only flush is allowed. Restart with init */
@@ -579,6 +627,8 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
     }
 
     /* check if there is any data available to flush */
+    ZSTDMT_flushNextJob(zcs, output, (zcs->inBuff.filled == zcs->inBuffSize));  /* we'll block if it wasn't possible to create new job due to saturation */
+#if 0
     {   unsigned const jobID = zcs->doneJobID & zcs->jobIDMask;
         unsigned jobCompleted;
         pthread_mutex_lock(&zcs->jobCompleted_mutex);
@@ -611,7 +661,7 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
                 zcs->jobs[jobID].jobCompleted = 0;
                 zcs->doneJobID++;
     }   }   }
-
+#endif
     /* recommended next input size : fill current input buffer */
     return zcs->inBuffSize - zcs->inBuff.filled;
 }
@@ -671,25 +721,27 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
             zcs->frameEnded = 1;
         }
 
-        DEBUGLOG(1, "posting job %u : %u bytes  (end:%u) (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask);
+        DEBUGLOG(3, "posting job %u : %u bytes  (end:%u) (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask);
         POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]);   /* this call is blocking when thread worker pool is exhausted */
         zcs->nextJobID++;
     }
 
     /* check if there is any data available to flush */
-    DEBUGLOG(1, "zcs->doneJobID : %u  ; zcs->nextJobID : %u ", zcs->doneJobID, zcs->nextJobID);
-    if (zcs->doneJobID == zcs->nextJobID) return 0;   /* all flushed ! */
+    DEBUGLOG(5, "zcs->doneJobID : %u  ; zcs->nextJobID : %u ", zcs->doneJobID, zcs->nextJobID);
+    return ZSTDMT_flushNextJob(zcs, output, 1);
+
+#if 0
     {   unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
         PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
         while (zcs->jobs[wJobID].jobCompleted==0) {
-            DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID);   /* we want to block when waiting for data to flush */
+            DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID);   /* block when nothing available to flush */
             pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
         }
         pthread_mutex_unlock(&zcs->jobCompleted_mutex);
-        {   /* job completed : output can be flushed */
-            ZSTDMT_jobDescription job = zcs->jobs[wJobID];
+        /* compression job completed : output can be flushed */
+        {   ZSTDMT_jobDescription job = zcs->jobs[wJobID];
             size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
-            DEBUGLOG(1, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
+            DEBUGLOG(4, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
             ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[wJobID].cctx = NULL;   /* release cctx for future task */
             ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[wJobID].srcStart = NULL; zcs->jobs[wJobID].src = g_nullBuffer;
             if (ZSTD_isError(job.cSize)) {
@@ -713,6 +765,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
             zcs->allJobsCompleted = zcs->frameEnded;
             return 0;
     }   }
+#endif
 }