]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
zstdmt : refactor a few member names
authorYann Collet <cyan@fb.com>
Fri, 26 Jan 2018 21:00:14 +0000 (13:00 -0800)
committerYann Collet <cyan@fb.com>
Fri, 26 Jan 2018 21:00:14 +0000 (13:00 -0800)
for clarity

lib/compress/zstdmt_compress.c

index c7df32d3723b0acea698f115a1d498aeb03f2ae3..3bf585d4937c462f961a804ffa4fd07c2a633680 100644 (file)
@@ -83,7 +83,7 @@ static unsigned long long GetCurrentClockTimeMicroseconds(void)
 
 typedef struct buffer_s {
     void* start;
-    size_t size;
+    size_t capacity;
 } buffer_t;
 
 static const buffer_t g_nullBuffer = { NULL, 0 };
@@ -136,7 +136,7 @@ static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool)
     size_t totalBufferSize = 0;
     ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
     for (u=0; u<bufPool->totalBuffers; u++)
-        totalBufferSize += bufPool->bTable[u].size;
+        totalBufferSize += bufPool->bTable[u].capacity;
     ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
 
     return poolSize + totalBufferSize;
@@ -165,12 +165,12 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool)
     ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
     if (bufPool->nbBuffers) {   /* try to use an existing buffer */
         buffer_t const buf = bufPool->bTable[--(bufPool->nbBuffers)];
-        size_t const availBufferSize = buf.size;
+        size_t const availBufferSize = buf.capacity;
         bufPool->bTable[bufPool->nbBuffers] = g_nullBuffer;
         if ((availBufferSize >= bSize) & ((availBufferSize>>3) <= bSize)) {
             /* large enough, but not too much */
             DEBUGLOG(5, "ZSTDMT_getBuffer: provide buffer %u of size %u",
-                        bufPool->nbBuffers, (U32)buf.size);
+                        bufPool->nbBuffers, (U32)buf.capacity);
             ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
             return buf;
         }
@@ -184,7 +184,7 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool)
     {   buffer_t buffer;
         void* const start = ZSTD_malloc(bSize, bufPool->cMem);
         buffer.start = start;   /* note : start can be NULL if malloc fails ! */
-        buffer.size = (start==NULL) ? 0 : bSize;
+        buffer.capacity = (start==NULL) ? 0 : bSize;
         if (start==NULL) {
             DEBUGLOG(5, "ZSTDMT_getBuffer: buffer allocation failure !!");
         } else {
@@ -203,7 +203,7 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
     if (bufPool->nbBuffers < bufPool->totalBuffers) {
         bufPool->bTable[bufPool->nbBuffers++] = buf;  /* stored for later use */
         DEBUGLOG(5, "ZSTDMT_releaseBuffer: stored buffer of size %u in slot %u",
-                    (U32)buf.size, (U32)(bufPool->nbBuffers-1));
+                    (U32)buf.capacity, (U32)(bufPool->nbBuffers-1));
         ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
         return;
     }
@@ -372,7 +372,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
                 goto _endJob;
     }   }   }
     if (!job->firstChunk) {  /* flush and overwrite frame header when it's not first job */
-        size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, 0);
+        size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, src, 0);
         if (ZSTD_isError(hSize)) { job->cSize = hSize; /* save error code */ goto _endJob; }
         DEBUGLOG(5, "ZSTDMT_compressChunk: flush and overwrite %u bytes of frame header (not first chunk)", (U32)hSize);
         ZSTD_invalidateRepCodes(cctx);
@@ -386,7 +386,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
         const BYTE* ip = (const BYTE*) src;
         BYTE* const ostart = (BYTE*)dstBuff.start;
         BYTE* op = ostart;
-        BYTE* oend = op + dstBuff.size;
+        BYTE* oend = op + dstBuff.capacity;
         int blockNb;
         DEBUGLOG(5, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks);
         assert(job->cSize == 0);
@@ -437,6 +437,8 @@ _endJob:
 
 typedef struct {
     buffer_t buffer;
+    size_t targetCapacity;  /* note : buffers provided by the pool may be larger than target capacity */
+    size_t prefixSize;
     size_t filled;
 } inBuff_t;
 
@@ -449,8 +451,6 @@ struct ZSTDMT_CCtx_s {
     ZSTD_pthread_cond_t mtctx_cond;
     ZSTD_CCtx_params params;
     size_t targetSectionSize;
-    size_t inBuffSize;
-    size_t prefixSize;
     size_t targetPrefixSize;
     inBuff_t inBuff;
     int jobReady;        /* 1 => one job is already prepared, but pool has shortage of workers. Don't create another one. */
@@ -663,13 +663,13 @@ unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx)
  * Note : mutex will be acquired during statistics collection. */
 ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
 {
-    ZSTD_frameProgression fs;
+    ZSTD_frameProgression fps;
     DEBUGLOG(6, "ZSTDMT_getFrameProgression");
     ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
-    fs.consumed = mtctx->consumed;
-    fs.produced = mtctx->produced;
-    assert(mtctx->inBuff.filled >= mtctx->prefixSize);
-    fs.ingested = mtctx->consumed + (mtctx->inBuff.filled - mtctx->prefixSize);
+    fps.consumed = mtctx->consumed;
+    fps.produced = mtctx->produced;
+    assert(mtctx->inBuff.filled >= mtctx->inBuff.prefixSize);
+    fps.ingested = mtctx->consumed + (mtctx->inBuff.filled - mtctx->inBuff.prefixSize);
     {   unsigned jobNb;
         unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1);
         DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)",
@@ -678,13 +678,13 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
             unsigned const wJobID = jobNb & mtctx->jobIDMask;
             size_t const cResult = mtctx->jobs[wJobID].cSize;
             size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
-            fs.consumed += mtctx->jobs[wJobID].consumed;
-            fs.ingested += mtctx->jobs[wJobID].srcSize;
-            fs.produced += produced;
+            fps.consumed += mtctx->jobs[wJobID].consumed;
+            fps.ingested += mtctx->jobs[wJobID].srcSize;
+            fps.produced += produced;
         }
     }
     ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
-    return fs;
+    return fps;
 }
 
 
@@ -928,11 +928,11 @@ size_t ZSTDMT_initCStream_internal(
     if (mtctx->targetSectionSize < ZSTDMT_JOBSIZE_MIN) mtctx->targetSectionSize = ZSTDMT_JOBSIZE_MIN;
     if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize;  /* job size must be >= overlap size */
     DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), params.jobSize);
-    mtctx->inBuffSize = mtctx->targetPrefixSize + mtctx->targetSectionSize;
-    DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->inBuffSize>>10));
-    ZSTDMT_setBufferSize(mtctx->bufPool, MAX(mtctx->inBuffSize, ZSTD_compressBound(mtctx->targetSectionSize)) );
+    mtctx->inBuff.targetCapacity = mtctx->targetPrefixSize + mtctx->targetSectionSize;
+    DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->inBuff.targetCapacity>>10));
+    ZSTDMT_setBufferSize(mtctx->bufPool, MAX(mtctx->inBuff.targetCapacity, ZSTD_compressBound(mtctx->targetSectionSize)) );
     mtctx->inBuff.buffer = g_nullBuffer;
-    mtctx->prefixSize = 0;
+    mtctx->inBuff.prefixSize = 0;
     mtctx->doneJobID = 0;
     mtctx->nextJobID = 0;
     mtctx->frameEnded = 0;
@@ -1009,10 +1009,10 @@ static void ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job)
      * This shortcut will simply switch srcBuffer for dstBuffer, providing same outcome as a normal job */
     assert(job->dstBuff.start == NULL);   /* invoked from streaming variant only (otherwise, dstBuff might be user's output) */
     assert(job->srcBuff.start != NULL);   /* invoked from streaming variant only (otherwise, srcBuff might be user's input) */
-    assert(job->srcBuff.size >= ZSTD_blockHeaderSize);   /* no buffer should ever be that small */
+    assert(job->srcBuff.capacity >= ZSTD_blockHeaderSize);   /* no buffer should ever be that small */
     job->dstBuff = job->srcBuff;
     job->srcBuff = g_nullBuffer;
-    job->cSize = ZSTD_writeLastEmptyBlock(job->dstBuff.start, job->dstBuff.size);
+    job->cSize = ZSTD_writeLastEmptyBlock(job->dstBuff.start, job->dstBuff.capacity);
     assert(!ZSTD_isError(job->cSize));
     assert(job->consumed == 0);
 }
@@ -1030,15 +1030,15 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
 
     if (!mtctx->jobReady) {
         DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ",
-                    mtctx->nextJobID, (U32)srcSize, (U32)mtctx->prefixSize);
+                    mtctx->nextJobID, (U32)srcSize, (U32)mtctx->inBuff.prefixSize);
         assert(mtctx->jobs[jobID].srcBuff.start == NULL);   /* no buffer left : supposed already released */
         mtctx->jobs[jobID].srcBuff = mtctx->inBuff.buffer;
         mtctx->jobs[jobID].prefixStart = mtctx->inBuff.buffer.start;
-        mtctx->jobs[jobID].prefixSize = mtctx->prefixSize;
+        mtctx->jobs[jobID].prefixSize = mtctx->inBuff.prefixSize;
         mtctx->jobs[jobID].srcSize = srcSize;
+        assert(mtctx->inBuff.filled >= srcSize + mtctx->inBuff.prefixSize);
         mtctx->jobs[jobID].consumed = 0;
         mtctx->jobs[jobID].cSize = 0;
-        assert(mtctx->inBuff.filled >= srcSize + mtctx->prefixSize);
         mtctx->jobs[jobID].params = mtctx->params;
         /* do not calculate checksum within sections, but write it in header for first section */
         if (mtctx->nextJobID) mtctx->jobs[jobID].params.fParams.checksumFlag = 0;
@@ -1055,28 +1055,28 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
         mtctx->jobs[jobID].mtctx_cond = &mtctx->mtctx_cond;
 
         if (mtctx->params.fParams.checksumFlag)
-            XXH64_update(&mtctx->xxhState, (const char*)mtctx->inBuff.buffer.start + mtctx->prefixSize, srcSize);
+            XXH64_update(&mtctx->xxhState, (const char*)mtctx->inBuff.buffer.start + mtctx->inBuff.prefixSize, srcSize);
 
         /* get a new buffer for next input */
         if (!endFrame) {
-            size_t const newPrefixSize = MIN(srcSize + mtctx->prefixSize, mtctx->targetPrefixSize);
+            size_t const newPrefixSize = MIN(mtctx->inBuff.filled, mtctx->targetPrefixSize);
             mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool);
-            if (mtctx->inBuff.buffer.start == NULL) {   /* not enough memory to allocate next input buffer */
+            if (mtctx->inBuff.buffer.start == NULL) {    /* not enough memory to allocate a new input buffer */
                 mtctx->jobs[jobID].srcSize = mtctx->jobs[jobID].consumed = 0;
                 mtctx->nextJobID++;
                 ZSTDMT_waitForAllJobsCompleted(mtctx);
                 ZSTDMT_releaseAllJobResources(mtctx);
                 return ERROR(memory_allocation);
             }
-            mtctx->inBuff.filled -= srcSize + mtctx->prefixSize - newPrefixSize;
+            mtctx->inBuff.filled -= (mtctx->inBuff.prefixSize + srcSize) - newPrefixSize;
             memmove(mtctx->inBuff.buffer.start,   /* copy end of current job into next job, as "prefix" */
-                (const char*)mtctx->jobs[jobID].prefixStart + mtctx->prefixSize + srcSize - newPrefixSize,
+                (const char*)mtctx->jobs[jobID].prefixStart + mtctx->inBuff.prefixSize + srcSize - newPrefixSize,
                 mtctx->inBuff.filled);
-            mtctx->prefixSize = newPrefixSize;
+            mtctx->inBuff.prefixSize = newPrefixSize;
         } else {   /* endFrame==1 => no need for another input buffer */
             mtctx->inBuff.buffer = g_nullBuffer;
             mtctx->inBuff.filled = 0;
-            mtctx->prefixSize = 0;
+            mtctx->inBuff.prefixSize = 0;
             mtctx->frameEnded = endFrame;
             if (mtctx->nextJobID == 0) {
                 /* single chunk exception : checksum is already calculated directly within worker thread */
@@ -1202,7 +1202,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
                                      ZSTD_inBuffer* input,
                                      ZSTD_EndDirective endOp)
 {
-    size_t const newJobThreshold = mtctx->prefixSize + mtctx->targetSectionSize;
+    size_t const newJobThreshold = mtctx->inBuff.prefixSize + mtctx->targetSectionSize;
     unsigned forwardInputProgress = 0;
     DEBUGLOG(5, "ZSTDMT_compressStream_generic (endOp=%u)", (U32)endOp);
     assert(output->pos <= output->size);
@@ -1240,16 +1240,18 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
     if ( (!mtctx->jobReady)
       && (input->size > input->pos) ) {   /* support NULL input */
         if (mtctx->inBuff.buffer.start == NULL) {
-            mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool);  /* note : allocation can fail, in which case, no forward input progress */
+            mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool);  /* note : allocation can fail, in which case, buffer.start==NULL */
             mtctx->inBuff.filled = 0;
-            if ( (mtctx->inBuff.buffer.start == NULL)    /* allocation failure */
+            if ( (mtctx->inBuff.buffer.start == NULL)        /* allocation failure */
               && (mtctx->doneJobID == mtctx->nextJobID) ) {  /* and nothing to flush */
-                return ERROR(memory_allocation);   /* no forward progress possible => output an error */
-        }   }
-        if (mtctx->inBuff.buffer.start != NULL) {
-            size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuffSize - mtctx->inBuff.filled);
+                return ERROR(memory_allocation);    /* no forward progress possible => output an error */
+            }
+            assert(mtctx->inBuff.buffer.capacity >= mtctx->inBuff.targetCapacity);  /* pool must provide a buffer >= targetCapacity */
+        }
+        if (mtctx->inBuff.buffer.start != NULL) {   /* no buffer for input, but it's possible to flush, and then reclaim the buffer */
+            size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuff.targetCapacity - mtctx->inBuff.filled);
             DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u",
-                        (U32)toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->inBuffSize);
+                        (U32)toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->inBuff.targetCapacity);
             memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad);
             input->pos += toLoad;
             mtctx->inBuff.filled += toLoad;
@@ -1263,7 +1265,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
       || (mtctx->inBuff.filled >= newJobThreshold)  /* filled enough : let's compress */
       || ((endOp != ZSTD_e_continue) && (mtctx->inBuff.filled > 0))  /* something to flush : let's go */
       || ((endOp == ZSTD_e_end) && (!mtctx->frameEnded)) ) {   /* must finish the frame with a zero-size block */
-        size_t const jobSize = MIN(mtctx->inBuff.filled - mtctx->prefixSize, mtctx->targetSectionSize);
+        size_t const jobSize = MIN(mtctx->inBuff.filled - mtctx->inBuff.prefixSize, mtctx->targetSectionSize);
         CHECK_F( ZSTDMT_createCompressionJob(mtctx, jobSize, endOp) );
     }
 
@@ -1280,13 +1282,13 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_in
     CHECK_F( ZSTDMT_compressStream_generic(mtctx, output, input, ZSTD_e_continue) );
 
     /* recommended next input size : fill current input buffer */
-    return mtctx->inBuffSize - mtctx->inBuff.filled;   /* note : could be zero when input buffer is fully filled and no more availability to create new job */
+    return mtctx->inBuff.targetCapacity - mtctx->inBuff.filled;   /* note : could be zero when input buffer is fully filled and no more availability to create new job */
 }
 
 
 static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_EndDirective endFrame)
 {
-    size_t const srcSize = mtctx->inBuff.filled - mtctx->prefixSize;
+    size_t const srcSize = mtctx->inBuff.filled - mtctx->inBuff.prefixSize;
     DEBUGLOG(5, "ZSTDMT_flushStream_internal");
 
     if ( mtctx->jobReady     /* one job ready for a worker to pick up */