]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
Simplified compressChunk job
authorYann Collet <cyan@fb.com>
Thu, 19 Jan 2017 18:18:17 +0000 (10:18 -0800)
committerYann Collet <cyan@fb.com>
Thu, 19 Jan 2017 18:18:17 +0000 (10:18 -0800)
minor refactoring : compression done in a single call on first chunk
Avoid a mutable hSize variable and eventual recombination to cSize at the end

lib/compress/zstdmt_compress.c

index d552acee0199fec020f3045f4b69316a73c77fb1..93220f5c9ccc5fada152de0858f669367846fdac 100644 (file)
 #  include <stdio.h>
 #  include <unistd.h>
 #  include <sys/times.h>
-   static unsigned g_debugLevel = 2;
-#  define DEBUGLOG(l, ...) if (l<=g_debugLevel) { fprintf(stderr, __VA_ARGS__); fprintf(stderr, " \n"); }
+   static unsigned g_debugLevel = 3;
+#  define DEBUGLOGRAW(l, ...) if (l<=g_debugLevel) { fprintf(stderr, __VA_ARGS__); }
+#  define DEBUGLOG(l, ...) if (l<=g_debugLevel) { fprintf(stderr, __FILE__ ": "); fprintf(stderr, __VA_ARGS__); fprintf(stderr, " \n"); }
+
+#  define DEBUG_PRINTHEX(l,p,n) { \
+    unsigned debug_u;                   \
+    for (debug_u=0; debug_u<(n); debug_u++)           \
+        DEBUGLOGRAW(l, "%02X ", ((const unsigned char*)(p))[debug_u]); \
+    DEBUGLOGRAW(l, " \n");       \
+}
 
 static unsigned long long GetCurrentClockTimeMicroseconds()
 {
@@ -39,6 +47,7 @@ if (g_debugLevel>=MUTEX_WAIT_TIME_DLEVEL) { \
 
 #  define DEBUGLOG(l, ...)      {}    /* disabled */
 #  define PTHREAD_MUTEX_LOCK(m) pthread_mutex_lock(m)
+#  define DEBUG_PRINTHEX(l,p,n) {}
 
 #endif
 
@@ -184,22 +193,20 @@ typedef struct {
 void ZSTDMT_compressChunk(void* jobDescription)
 {
     ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
-    buffer_t dstBuff = job->dstBuff;
-    size_t hSize = ZSTD_compressBegin_advanced(job->cctx, NULL, 0, job->params, job->fullFrameSize);
-    if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; }
-    hSize = ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, 0);   /* flush frame header */
-    if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; }
-    if (job->firstChunk) {   /* preserve frame header when it is first chunk */
-        dstBuff.start = (char*)dstBuff.start + hSize;
-        dstBuff.size -= hSize;
-    } else                  /* otherwise, overwrite */
-        hSize = 0;
+    buffer_t const dstBuff = job->dstBuff;
+    size_t const initError = ZSTD_compressBegin_advanced(job->cctx, NULL, 0, job->params, job->fullFrameSize);
+    if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
+    if (!job->firstChunk) {
+        size_t const hSize = ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, 0);   /* flush frame header */
+        if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; }
+    }
 
+    DEBUGLOG(3, "Compressing : ");
+    DEBUG_PRINTHEX(3, job->srcStart, 12);
     job->cSize = (job->lastChunk) ?   /* last chunk signal */
                  ZSTD_compressEnd(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize) :
                  ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize);
-    if (!ZSTD_isError(job->cSize)) job->cSize += hSize;
-    DEBUGLOG(5, "chunk %u : compressed %u bytes into %u bytes  ", (unsigned)job->lastChunk, (unsigned)job->srcSize, (unsigned)job->cSize);
+    DEBUGLOG(3, "compressed %u bytes into %u bytes   (first:%u) (last:%u)", (unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk);
 
 _endJob:
     PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
@@ -271,8 +278,10 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
         ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[jobID].cctx);
         mtctx->jobs[jobID].cctx = NULL;
     }
+    memset(mtctx->jobs, 0, (mtctx->jobIDMask+1)*sizeof(ZSTDMT_jobDescription));
     ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->inBuff.buffer);
     mtctx->inBuff.buffer = g_nullBuffer;
+    mtctx->allJobsCompleted = 1;
 }
 
 size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
@@ -335,6 +344,7 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
             mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond;
 
             DEBUGLOG(3, "posting job %u   (%u bytes)", u, (U32)chunkSize);
+            DEBUG_PRINTHEX(3, mtctx->jobs[u].srcStart, 12);
             POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[u]);
 
             frameStartPos += chunkSize;
@@ -345,14 +355,14 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
     {   unsigned chunkID;
         size_t error = 0, dstPos = 0;
         for (chunkID=0; chunkID<nbChunks; chunkID++) {
-            DEBUGLOG(3, "ready to write chunk %u ", chunkID);
-
+            DEBUGLOG(3, "waiting for chunk %u ", chunkID);
             PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex);
             while (mtctx->jobs[chunkID].jobCompleted==0) {
                 DEBUGLOG(4, "waiting for jobCompleted signal from chunk %u", chunkID);
                 pthread_cond_wait(&mtctx->jobCompleted_cond, &mtctx->jobCompleted_mutex);
             }
             pthread_mutex_unlock(&mtctx->jobCompleted_mutex);
+            DEBUGLOG(3, "ready to write chunk %u ", chunkID);
 
             ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[chunkID].cctx);
             mtctx->jobs[chunkID].cctx = NULL;
@@ -422,6 +432,7 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
     {   size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled);
         memcpy((char*)zcs->inBuff.buffer.start + zcs->inBuff.filled, input->src, toLoad);
         input->pos += toLoad;
+        zcs->inBuff.filled += toLoad;
     }
 
     if (zcs->inBuff.filled == zcs->inBuffSize) {   /* filled enough : let's compress */
@@ -438,6 +449,7 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
             return ERROR(memory_allocation);
         }
 
+        DEBUGLOG(1, "preparing job %u to compress %u bytes \n", (U32)zcs->nextJobID, (U32)zcs->targetSectionSize);
         zcs->jobs[jobID].src = zcs->inBuff.buffer;
         zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
         zcs->jobs[jobID].srcSize = zcs->targetSectionSize;
@@ -474,6 +486,7 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
         ZSTDMT_jobDescription job = zcs->jobs[jobID];
         if (job.jobCompleted) {   /* job completed : output can be flushed */
             size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
+            DEBUGLOG(1, "trying to flush compressed data from job %u \n", (U32)zcs->doneJobID);
             ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx);
             zcs->jobs[jobID].cctx = NULL;
             ZSTDMT_releaseBuffer(zcs->buffPool, job.src);
@@ -489,6 +502,7 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
             if (job.dstFlushed == job.cSize) {   /* output buffer fully flushed => go to next one */
                 ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff);
                 zcs->jobs[jobID].dstBuff = g_nullBuffer;
+                zcs->jobs[jobID].jobCompleted = 0;
                 zcs->doneJobID++;
             } else {
                 zcs->jobs[jobID].dstFlushed = job.dstFlushed;   /* save flush level into zcs for later retrieval */
@@ -503,6 +517,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
 {
     size_t const srcSize = zcs->inBuff.filled;
 
+    DEBUGLOG(1, "flushing : %u bytes to compress", (U32)srcSize);
     if ((srcSize > 0) || (endFrame && !zcs->frameEnded)) {
         size_t const dstBufferCapacity = ZSTD_compressBound(srcSize);
         buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity);
@@ -548,12 +563,13 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
             zcs->frameEnded = 1;
         }
 
-        DEBUGLOG(3, "posting job %u   (%u bytes)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize);
+        DEBUGLOG(1, "posting job %u : %u bytes  (end:%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk);
         POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]);   /* this call is blocking when thread worker pool is exhausted */
         zcs->nextJobID++;
     }
 
     /* check if there is any data available to flush */
+    DEBUGLOG(1, "zcs->doneJobID : %u  ; zcs->nextJobID : %u ", zcs->doneJobID, zcs->nextJobID);
     if (zcs->doneJobID == zcs->nextJobID) return 0;   /* all flushed ! */
     {   unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
         PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
@@ -565,6 +581,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
         {   /* job completed : output can be flushed */
             ZSTDMT_jobDescription job = zcs->jobs[wJobID];
             size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
+            DEBUGLOG(1, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
             ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[wJobID].cctx = NULL;   /* release cctx for future task */
             ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[wJobID].srcStart = NULL; zcs->jobs[wJobID].src = g_nullBuffer;
             if (ZSTD_isError(job.cSize)) {
@@ -577,6 +594,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
             job.dstFlushed += toWrite;
             if (job.dstFlushed == job.cSize) {   /* output buffer fully flushed => next one */
                 ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); zcs->jobs[wJobID].dstBuff = g_nullBuffer;
+                zcs->jobs[wJobID].jobCompleted = 0;
                 zcs->doneJobID++;
             } else {
                 zcs->jobs[wJobID].dstFlushed = job.dstFlushed;