]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
zstdmt : flush() only lock to read shared job members
authorYann Collet <cyan@fb.com>
Fri, 26 Jan 2018 20:15:43 +0000 (12:15 -0800)
committerYann Collet <cyan@fb.com>
Fri, 26 Jan 2018 20:15:43 +0000 (12:15 -0800)
Other job members are accessed directly.
This avoids a full job copy, which would access everything,
including a few members that are supposed to be used by worker only,
uselessly requiring additional locks to avoid race conditions.

lib/compress/zstdmt_compress.c

index 7b37c5b336dbfabe11c531b8431701e6da402bf3..c7df32d3723b0acea698f115a1d498aeb03f2ae3 100644 (file)
@@ -309,16 +309,16 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
 
 typedef struct {
     size_t   consumed;                   /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */
-    size_t   cSize;                      /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */
-    ZSTD_pthread_mutex_t* mtctx_mutex;   /* Thread-safe - used by mtctx and (all) worker */
-    ZSTD_pthread_cond_t* mtctx_cond;     /* Thread-safe - used by mtctx and (all) worker */
-    ZSTDMT_CCtxPool* cctxPool;           /* Thread-safe - used by mtctx and (all) worker */
-    ZSTDMT_bufferPool* bufPool;          /* Thread-safe - used by mtctx and (all) worker */
-    buffer_t dstBuff;                    /* set by worker (or mtctx), then read by worker, then modified by mtctx => no barrier */
+    size_t   cSize;                      /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx, then set0 by mtctx */
+    ZSTD_pthread_mutex_t* mtctx_mutex;   /* Thread-safe - used by mtctx and (all) workers */
+    ZSTD_pthread_cond_t* mtctx_cond;     /* Thread-safe - used by mtctx and (all) workers */
+    ZSTDMT_CCtxPool* cctxPool;           /* Thread-safe - used by mtctx and (all) workers */
+    ZSTDMT_bufferPool* bufPool;          /* Thread-safe - used by mtctx and (all) workers */
+    buffer_t dstBuff;                    /* set by worker (or mtctx), then read by worker & mtctx, then modified by mtctx => no barrier */
     buffer_t srcBuff;                    /* set by mtctx, then released by worker => no barrier */
-    const void* prefixStart;             /* set by mtctx, then read by worker => no barrier */
+    const void* prefixStart;             /* set by mtctx, then read and set0 by worker => no barrier */
     size_t   prefixSize;                 /* set by mtctx, then read by worker => no barrier */
-    size_t   srcSize;                    /* set by mtctx, then read by worker => no barrier */
+    size_t   srcSize;                    /* set by mtctx, then read by worker & mtctx => no barrier */
     unsigned firstChunk;                 /* set by mtctx, then read by worker => no barrier */
     unsigned lastChunk;                  /* set by mtctx, then read by worker => no barrier */
     ZSTD_CCtx_params params;             /* set by mtctx, then read by worker => no barrier */
@@ -341,15 +341,13 @@ void ZSTDMT_compressChunk(void* jobDescription)
         job->cSize = ERROR(memory_allocation);
         goto _endJob;
     }
-    if (dstBuff.start == NULL) {
+    if (dstBuff.start == NULL) {   /* streaming job : doesn't provide a dstBuffer */
         dstBuff = ZSTDMT_getBuffer(job->bufPool);
         if (dstBuff.start==NULL) {
             job->cSize = ERROR(memory_allocation);
             goto _endJob;
         }
-        ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
         job->dstBuff = dstBuff;   /* this value can be read in ZSTDMT_flush, when it copies the whole job */
-        ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
     }
 
     /* init */
@@ -1087,6 +1085,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
 
         if ( (srcSize == 0)
           && (mtctx->nextJobID>0)/*single chunk must also write frame header*/ ) {
+            DEBUGLOG(5, "ZSTDMT_createCompressionJob: creating a last empty block to end frame");
             assert(endOp == ZSTD_e_end);  /* only possible case : need to end the frame with an empty last block */
             ZSTDMT_writeLastEmptyBlock(mtctx->jobs + jobID);
             mtctx->nextJobID++;
@@ -1094,12 +1093,12 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
         }
     }
 
-    DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes  (end:%u) (note : doneJob = %u=>%u)",
+    DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes  (end:%u, jobNb == %u (mod:%u))",
                 mtctx->nextJobID,
                 (U32)mtctx->jobs[jobID].srcSize,
                 mtctx->jobs[jobID].lastChunk,
-                mtctx->doneJobID,
-                mtctx->doneJobID & mtctx->jobIDMask);
+                mtctx->nextJobID,
+                jobID);
     if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[jobID])) {
         mtctx->nextJobID++;
         mtctx->jobReady = 0;
@@ -1118,15 +1117,17 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
 static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, unsigned blockToFlush, ZSTD_EndDirective end)
 {
     unsigned const wJobID = mtctx->doneJobID & mtctx->jobIDMask;
-    DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u)", blockToFlush);
+    DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u , job %u <= %u)",
+                blockToFlush, mtctx->doneJobID, mtctx->nextJobID);
     assert(output->size >= output->pos);
 
     ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
-    if (blockToFlush && (mtctx->doneJobID < mtctx->nextJobID)) {
+    if (  blockToFlush
+      && (mtctx->doneJobID < mtctx->nextJobID) ) {
         assert(mtctx->jobs[wJobID].dstFlushed <= mtctx->jobs[wJobID].cSize);
-        while (mtctx->jobs[wJobID].dstFlushed == mtctx->jobs[wJobID].cSize) {
+        while (mtctx->jobs[wJobID].dstFlushed == mtctx->jobs[wJobID].cSize) {  /* nothing to flush */
             if (mtctx->jobs[wJobID].consumed == mtctx->jobs[wJobID].srcSize) {
-                DEBUGLOG(5, "job %u is completely consumed (%u == %u) => don't wait for cond",
+                DEBUGLOG(5, "job %u is completely consumed (%u == %u) => don't wait for cond, there will be none",
                             mtctx->doneJobID, (U32)mtctx->jobs[wJobID].consumed, (U32)mtctx->jobs[wJobID].srcSize);
                 break;
             }
@@ -1135,60 +1136,60 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
             ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex);  /* block when nothing available to flush but more to come */
     }   }
 
-    /* some output is available to be flushed */
-    {   ZSTDMT_jobDescription job = mtctx->jobs[wJobID];
+    /* try to flush something */
+    {   size_t cSize = mtctx->jobs[wJobID].cSize;
+        size_t const srcConsumed = mtctx->jobs[wJobID].consumed;
         ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
-        if (ZSTD_isError(job.cSize)) {
+        if (ZSTD_isError(cSize)) {
             DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s",
-                        mtctx->doneJobID, ZSTD_getErrorName(job.cSize));
+                        mtctx->doneJobID, ZSTD_getErrorName(cSize));
             ZSTDMT_waitForAllJobsCompleted(mtctx);
             ZSTDMT_releaseAllJobResources(mtctx);
-            return job.cSize;
+            return cSize;
         }
         /* add frame checksum if necessary (can only happen once) */
-        assert(job.consumed <= job.srcSize);
-        if ( (job.consumed == job.srcSize)
-          && job.frameChecksumNeeded ) {
+        assert(srcConsumed <= mtctx->jobs[wJobID].srcSize);
+        if ( (srcConsumed == mtctx->jobs[wJobID].srcSize)   /* job completed -> worker no longer active */
+          && mtctx->jobs[wJobID].frameChecksumNeeded ) {
             U32 const checksum = (U32)XXH64_digest(&mtctx->xxhState);
             DEBUGLOG(4, "ZSTDMT_flushProduced: writing checksum : %08X \n", checksum);
-            MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum);
-            job.cSize += 4;
-            mtctx->jobs[wJobID].cSize += 4;
+            MEM_writeLE32((char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].cSize, checksum);
+            cSize += 4;
+            mtctx->jobs[wJobID].cSize += 4;  /* can write this shared value, as worker is no longer active */
             mtctx->jobs[wJobID].frameChecksumNeeded = 0;
         }
-        assert(job.cSize >= job.dstFlushed);
-        if (job.dstBuff.start != NULL) {  /* dst buffer present : some work is ongoing or completed */
-            size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
-            DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%.1f%%)",
-                        (U32)toWrite, mtctx->doneJobID, (double)job.consumed / (job.srcSize + !job.srcSize /*avoid div0*/) * 100);
-            memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
+        if (cSize > 0) {  /* compression is ongoing or completed */
+            size_t const toWrite = MIN(cSize - mtctx->jobs[wJobID].dstFlushed, output->size - output->pos);
+            DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u)",
+                        (U32)toWrite, mtctx->doneJobID, (U32)srcConsumed, (U32)mtctx->jobs[wJobID].srcSize);
+            assert(cSize >= mtctx->jobs[wJobID].dstFlushed);
+            memcpy((char*)output->dst + output->pos, (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed, toWrite);
             output->pos += toWrite;
-            job.dstFlushed += toWrite;
+            mtctx->jobs[wJobID].dstFlushed += toWrite;  /* can write : this value is only used by mtctx */
 
-            if ( (job.consumed == job.srcSize)      /* job completed */
-              && (job.dstFlushed == job.cSize) ) {  /* output buffer fully flushed => free this job position */
+            if ( (srcConsumed == mtctx->jobs[wJobID].srcSize)    /* job completed */
+              && (mtctx->jobs[wJobID].dstFlushed == cSize) ) {   /* output buffer fully flushed => free this job position */
                 DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",
-                        mtctx->doneJobID, (U32)job.dstFlushed);
-                assert(job.srcBuff.start == NULL);  /* srcBuff supposed already released */
-                ZSTDMT_releaseBuffer(mtctx->bufPool, job.dstBuff);
+                        mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
+                assert(mtctx->jobs[wJobID].srcBuff.start == NULL);  /* srcBuff supposed already released */
+                ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff);
                 mtctx->jobs[wJobID].dstBuff = g_nullBuffer;
-                mtctx->consumed += job.srcSize;
-                mtctx->produced += job.cSize;
+                mtctx->jobs[wJobID].cSize = 0;   /* ensure this job slot is considered "not started" in future check */
+                mtctx->consumed += mtctx->jobs[wJobID].srcSize;
+                mtctx->produced += cSize;
                 mtctx->doneJobID++;
-            } else {
-                mtctx->jobs[wJobID].dstFlushed = job.dstFlushed;   /* remember how much was flushed for next attempt */
         }   }
 
         /* return value : how many bytes left in buffer ; fake it to 1 when unknown but >0 */
-        if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
-        if (job.srcSize > job.consumed) return 1;   /* current job not completely compressed */
+        if (cSize > mtctx->jobs[wJobID].dstFlushed) return (cSize - mtctx->jobs[wJobID].dstFlushed);
+        if (mtctx->jobs[wJobID].srcSize > srcConsumed) return 1;   /* current job not completely compressed */
     }
-    if (mtctx->doneJobID < mtctx->nextJobID) return 1;   /* some more jobs to flush */
-    if (mtctx->jobReady) return 1;   /* one job is ready and queued! */
-    if (mtctx->inBuff.filled > 0) return 1;   /* input not empty */
-    mtctx->allJobsCompleted = mtctx->frameEnded;   /* last frame entirely flushed */
-    if (end == ZSTD_e_end) return !mtctx->frameEnded;  /* for ZSTD_e_end, question becomes : is frame completed ? It may need a last null-block */
-    return 0;   /* everything flushed */
+    if (mtctx->doneJobID < mtctx->nextJobID) return 1;   /* some more jobs ongoing */
+    if (mtctx->jobReady) return 1;      /* one job is ready to push, just not yet in the list */
+    if (mtctx->inBuff.filled > 0) return 1;   /* input is not empty, and still needs to be converted into a job */
+    mtctx->allJobsCompleted = mtctx->frameEnded;   /* all chunks are entirely flushed => if this one is last one, frame is completed */
+    if (end == ZSTD_e_end) return !mtctx->frameEnded;  /* for ZSTD_e_end, question becomes : is frame completed ? instead of : are internal buffers fully flushed ? */
+    return 0;   /* internal buffers fully flushed */
 }
 
 
@@ -1217,10 +1218,10 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
     }
 
     /* single-pass shortcut (note : synchronous-mode) */
-    if ( (mtctx->nextJobID == 0)     /* just started */
-      && (mtctx->inBuff.filled == 0) /* nothing buffered */
-      && (!mtctx->jobReady)          /* no job already created */
-      && (endOp == ZSTD_e_end)       /* end order */
+    if ( (mtctx->nextJobID == 0)      /* just started */
+      && (mtctx->inBuff.filled == 0)  /* nothing buffered */
+      && (!mtctx->jobReady)           /* no job already created */
+      && (endOp == ZSTD_e_end)        /* end order */
       && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough space in dst */
         size_t const cSize = ZSTDMT_compress_advanced_internal(mtctx,
                 (char*)output->dst + output->pos, output->size - output->pos,