]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
zstdmt: minor code refactor for clarity
authorYann Collet <cyan@fb.com>
Sat, 27 Jan 2018 01:08:58 +0000 (17:08 -0800)
committerYann Collet <cyan@fb.com>
Sat, 27 Jan 2018 01:08:58 +0000 (17:08 -0800)
lib/compress/zstdmt_compress.c

index d78257b2ec2cd86550fbb8db197174f2383bc775..e0486450751a1c21585c15b9154dbca69cfca81a 100644 (file)
@@ -379,35 +379,35 @@ void ZSTDMT_compressChunk(void* jobDescription)
     }
 
     /* compress */
-    if (sizeof(size_t) > sizeof(int))
-        assert(job->srcSize < ((size_t)INT_MAX) * ZSTD_BLOCKSIZE_MAX);   /* check overflow */
-
-    {   int const nbBlocks = (int)((job->srcSize + (ZSTD_BLOCKSIZE_MAX-1)) / ZSTD_BLOCKSIZE_MAX);
+    {   size_t const blockSize = ZSTD_BLOCKSIZE_MAX;
+        int const nbBlocks = (int)((job->srcSize + (blockSize-1)) / blockSize);
         const BYTE* ip = (const BYTE*) src;
         BYTE* const ostart = (BYTE*)dstBuff.start;
         BYTE* op = ostart;
         BYTE* oend = op + dstBuff.capacity;
         int blockNb;
+        if (sizeof(size_t) > sizeof(int)) assert(job->srcSize < ((size_t)INT_MAX) * blockSize);   /* check overflow */
         DEBUGLOG(5, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks);
         assert(job->cSize == 0);
         for (blockNb = 1; blockNb < nbBlocks; blockNb++) {
-            size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, ZSTD_BLOCKSIZE_MAX);
+            size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, blockSize);
             if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
-            ip += ZSTD_BLOCKSIZE_MAX;
+            ip += blockSize;
             op += cSize; assert(op < oend);
             /* stats */
             ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);   /* note : it's a mtctx mutex */
             job->cSize += cSize;
-            job->consumed = ZSTD_BLOCKSIZE_MAX * blockNb;
+            job->consumed = blockSize * blockNb;
             DEBUGLOG(5, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)",
                         (U32)cSize, (U32)job->cSize);
             ZSTD_pthread_cond_signal(job->mtctx_cond);   /* warns some more data is ready to be flushed */
             ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
         }
         /* last block */
+        assert(blockSize > 0); assert((blockSize & (blockSize - 1)) == 0);  /* blockSize must be power of 2 for mask==(blockSize-1) to work */
         if ((nbBlocks > 0) | job->lastChunk /*must output a "last block" flag*/ ) {
-            size_t const lastBlockSize1 = job->srcSize & (ZSTD_BLOCKSIZE_MAX-1);
-            size_t const lastBlockSize = ((lastBlockSize1==0) & (job->srcSize>=ZSTD_BLOCKSIZE_MAX)) ? ZSTD_BLOCKSIZE_MAX : lastBlockSize1;
+            size_t const lastBlockSize1 = job->srcSize & (blockSize-1);
+            size_t const lastBlockSize = ((lastBlockSize1==0) & (job->srcSize>=blockSize)) ? blockSize : lastBlockSize1;
             size_t const cSize = (job->lastChunk) ?
                  ZSTD_compressEnd     (cctx, op, oend-op, ip, lastBlockSize) :
                  ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize);
@@ -469,21 +469,10 @@ struct ZSTDMT_CCtx_s {
     const ZSTD_CDict* cdict;
 };
 
-/* Sets parameters relevant to the compression job, initializing others to
- * default values. Notably, nbThreads should probably be zero. */
-static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params)
-{
-    ZSTD_CCtx_params jobParams;
-    memset(&jobParams, 0, sizeof(jobParams));
-
-    jobParams.cParams = params.cParams;
-    jobParams.fParams = params.fParams;
-    jobParams.compressionLevel = params.compressionLevel;
-
-    jobParams.ldmParams = params.ldmParams;
-    return jobParams;
-}
-
+/* ZSTDMT_allocJobsTable()
+ * allocate, and just init to zero, a job table.
+ * update *nbJobsPtr to next power of 2 value, as size of table
+ * No reverse free() function is provided : just use ZSTD_free() */
 static ZSTDMT_jobDescription* ZSTDMT_allocJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem)
 {
     U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1;
@@ -524,6 +513,7 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
     mtctx->allJobsCompleted = 1;
     mtctx->factory = POOL_create_advanced(nbThreads, 0, cMem);
     mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, cMem);
+    assert(nbJobs > 0); assert((nbJobs & (nbJobs - 1)) == 0);  /* ensure nbJobs is a power of 2 */
     mtctx->jobIDMask = nbJobs - 1;
     mtctx->bufPool = ZSTDMT_createBufferPool(nbThreads, cMem);
     mtctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads, cMem);
@@ -649,6 +639,21 @@ size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter,
     }
 }
 
+/* Sets parameters relevant to the compression job, initializing others to
+ * default values. Notably, nbThreads should probably be zero. */
+static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params)
+{
+    ZSTD_CCtx_params jobParams;
+    memset(&jobParams, 0, sizeof(jobParams));
+
+    jobParams.cParams = params.cParams;
+    jobParams.fParams = params.fParams;
+    jobParams.compressionLevel = params.compressionLevel;
+
+    jobParams.ldmParams = params.ldmParams;
+    return jobParams;
+}
+
 /* ZSTDMT_getNbThreads():
  * @return nb threads currently active in mtctx.
  * mtctx must be valid */
@@ -1139,8 +1144,9 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
     }   }
 
     /* try to flush something */
-    {   size_t cSize = mtctx->jobs[wJobID].cSize;
-        size_t const srcConsumed = mtctx->jobs[wJobID].consumed;
+    {   size_t cSize = mtctx->jobs[wJobID].cSize;                  /* shared */
+        size_t const srcConsumed = mtctx->jobs[wJobID].consumed;   /* shared */
+        size_t const srcSize = mtctx->jobs[wJobID].srcSize;        /* read-only, could be done after mutex lock, but no-declaration-after-statement */
         ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
         if (ZSTD_isError(cSize)) {
             DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s",
@@ -1150,8 +1156,8 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
             return cSize;
         }
         /* add frame checksum if necessary (can only happen once) */
-        assert(srcConsumed <= mtctx->jobs[wJobID].srcSize);
-        if ( (srcConsumed == mtctx->jobs[wJobID].srcSize)   /* job completed -> worker no longer active */
+        assert(srcConsumed <= srcSize);
+        if ( (srcConsumed == srcSize)   /* job completed -> worker no longer active */
           && mtctx->jobs[wJobID].frameChecksumNeeded ) {
             U32 const checksum = (U32)XXH64_digest(&mtctx->xxhState);
             DEBUGLOG(4, "ZSTDMT_flushProduced: writing checksum : %08X \n", checksum);
@@ -1161,17 +1167,19 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
             mtctx->jobs[wJobID].frameChecksumNeeded = 0;
         }
         if (cSize > 0) {   /* compression is ongoing or completed */
-            size_t const toWrite = MIN(cSize - mtctx->jobs[wJobID].dstFlushed, output->size - output->pos);
+            size_t const toFlush = MIN(cSize - mtctx->jobs[wJobID].dstFlushed, output->size - output->pos);
             DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u, generated:%u)",
-                        (U32)toWrite, mtctx->doneJobID, (U32)srcConsumed, (U32)mtctx->jobs[wJobID].srcSize, (U32)cSize);
+                        (U32)toFlush, mtctx->doneJobID, (U32)srcConsumed, (U32)srcSize, (U32)cSize);
             assert(mtctx->doneJobID < mtctx->nextJobID);
             assert(cSize >= mtctx->jobs[wJobID].dstFlushed);
             assert(mtctx->jobs[wJobID].dstBuff.start != NULL);
-            memcpy((char*)output->dst + output->pos, (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed, toWrite);
-            output->pos += toWrite;
-            mtctx->jobs[wJobID].dstFlushed += toWrite;  /* can write : this value is only used by mtctx */
+            memcpy((char*)output->dst + output->pos,
+                   (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed,
+                   toFlush);
+            output->pos += toFlush;
+            mtctx->jobs[wJobID].dstFlushed += toFlush;  /* can write : this value is only used by mtctx */
 
-            if ( (srcConsumed == mtctx->jobs[wJobID].srcSize)    /* job completed */
+            if ( (srcConsumed == srcSize)    /* job completed */
               && (mtctx->jobs[wJobID].dstFlushed == cSize) ) {   /* output buffer fully flushed => free this job position */
                 DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",
                         mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
@@ -1179,14 +1187,14 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
                 ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff);
                 mtctx->jobs[wJobID].dstBuff = g_nullBuffer;
                 mtctx->jobs[wJobID].cSize = 0;   /* ensure this job slot is considered "not started" in future check */
-                mtctx->consumed += mtctx->jobs[wJobID].srcSize;
+                mtctx->consumed += srcSize;
                 mtctx->produced += cSize;
                 mtctx->doneJobID++;
         }   }
 
         /* return value : how many bytes left in buffer ; fake it to 1 when unknown but >0 */
         if (cSize > mtctx->jobs[wJobID].dstFlushed) return (cSize - mtctx->jobs[wJobID].dstFlushed);
-        if (mtctx->jobs[wJobID].srcSize > srcConsumed) return 1;   /* current job not completely compressed */
+        if (srcSize > srcConsumed) return 1;   /* current job not completely compressed */
     }
     if (mtctx->doneJobID < mtctx->nextJobID) return 1;   /* some more jobs ongoing */
     if (mtctx->jobReady) return 1;      /* one job is ready to push, just not yet in the list */