]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
refactor job creation
authorYann Collet <cyan@fb.com>
Wed, 25 Jan 2017 01:41:49 +0000 (17:41 -0800)
committerYann Collet <cyan@fb.com>
Wed, 25 Jan 2017 01:41:49 +0000 (17:41 -0800)
code shared accross ZSTDMT_{compress,flush,end}Stream(),
for easier maintenance

lib/compress/zstdmt_compress.c

index 1baccf0fca6e0e6a7d5106d736c68a03c6244610..e5790807812d7d56c5fffdf4b28559c0355df7a8 100644 (file)
@@ -536,10 +536,71 @@ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
 }
 
 
+static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsigned endFrame)
+{
+    size_t const dstBufferCapacity = ZSTD_compressBound(srcSize);
+    buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity);
+    ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool);
+    unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
+
+    if ((cctx==NULL) || (dstBuffer.start==NULL)) {
+        zcs->jobs[jobID].jobCompleted = 1;
+        zcs->nextJobID++;
+        ZSTDMT_waitForAllJobsCompleted(zcs);
+        ZSTDMT_releaseAllJobResources(zcs);
+        return ERROR(memory_allocation);
+    }
+
+    DEBUGLOG(4, "preparing job %u to compress %u bytes \n", zcs->nextJobID, (U32)srcSize);
+    zcs->jobs[jobID].src = zcs->inBuff.buffer;
+    zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
+    zcs->jobs[jobID].srcSize = srcSize;
+    zcs->jobs[jobID].params = zcs->params;
+    if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0;  /* do not calculate checksum within sections, just keep it in header for first section */
+    zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
+    zcs->jobs[jobID].dict = NULL;
+    zcs->jobs[jobID].dictSize = 0;
+    zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
+    zcs->jobs[jobID].dstBuff = dstBuffer;
+    zcs->jobs[jobID].cctx = cctx;
+    zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
+    zcs->jobs[jobID].lastChunk = endFrame;
+    zcs->jobs[jobID].jobCompleted = 0;
+    zcs->jobs[jobID].dstFlushed = 0;
+    zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
+    zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
+
+    /* get a new buffer for next input */
+    if (!endFrame) {
+        zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
+        if (zcs->inBuff.buffer.start == NULL) {   /* not enough memory to allocate next input buffer */
+            zcs->jobs[jobID].jobCompleted = 1;
+            zcs->nextJobID++;
+            ZSTDMT_waitForAllJobsCompleted(zcs);
+            ZSTDMT_releaseAllJobResources(zcs);
+            return ERROR(memory_allocation);
+        }
+        zcs->inBuff.filled -= srcSize;
+        memcpy(zcs->inBuff.buffer.start, (const char*)zcs->jobs[jobID].srcStart + srcSize, zcs->inBuff.filled);
+    } else {
+        zcs->inBuff.buffer = g_nullBuffer;
+        zcs->inBuff.filled = 0;
+        zcs->frameEnded = 1;
+        if (zcs->nextJobID == 0)
+            zcs->params.fParams.checksumFlag = 0;   /* single chunk : checksum is calculated directly within worker thread */
+    }
+
+    DEBUGLOG(3, "posting job %u : %u bytes  (end:%u) (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask);
+    POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]);   /* this call is blocking when thread worker pool is exhausted */
+    zcs->nextJobID++;
+    return 0;
+}
+
+
 /* ZSTDMT_flushNextJob() :
  * output : will be updated with amount of data flushed .
  * blockToFlush : if >0, the function will block and wait if there is no data available to flush .
- * @return : amount of data remaining within internal buffer, 1 if unknown but > 0, 0 if no more */
+ * @return : amount of data remaining within internal buffer, 1 if unknown but > 0, 0 if no more, or an error code */
 static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
 {
     unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
@@ -613,57 +674,11 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
 
     if ( (zcs->inBuff.filled == zcs->inBuffSize)  /* filled enough : let's compress */
         && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) {   /* avoid overwriting job round buffer */
-        size_t const dstBufferCapacity = ZSTD_compressBound(zcs->targetSectionSize);
-        buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity);
-        ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool);
-        unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
-
-        if ((cctx==NULL) || (dstBuffer.start==NULL)) {  /* cannot get resources for next job */
-            zcs->jobs[jobID].jobCompleted = 1;
-            zcs->nextJobID++;
-            ZSTDMT_waitForAllJobsCompleted(zcs);
-            ZSTDMT_releaseAllJobResources(zcs);
-            return ERROR(memory_allocation);
-        }
-
-        DEBUGLOG(4, "preparing job %u to compress %u bytes \n", (U32)zcs->nextJobID, (U32)zcs->targetSectionSize);
-        zcs->jobs[jobID].src = zcs->inBuff.buffer;
-        zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
-        zcs->jobs[jobID].srcSize = zcs->targetSectionSize;
-        zcs->jobs[jobID].params = zcs->params;
-        if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0;  /* do not calculate checksum within sections, just keep it in header for first section */
-        zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
-        zcs->jobs[jobID].dict = NULL;
-        zcs->jobs[jobID].dictSize = 0;
-        zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
-        zcs->jobs[jobID].dstBuff = dstBuffer;
-        zcs->jobs[jobID].cctx = cctx;
-        zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
-        zcs->jobs[jobID].lastChunk = 0;
-        zcs->jobs[jobID].jobCompleted = 0;
-        zcs->jobs[jobID].dstFlushed = 0;
-        zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
-        zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
-
-        /* get a new buffer for next input - save remaining into it */
-        zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
-        if (zcs->inBuff.buffer.start == NULL) {   /* not enough memory to allocate next input buffer */
-            zcs->jobs[jobID].jobCompleted = 1;
-            zcs->nextJobID++;
-            ZSTDMT_waitForAllJobsCompleted(zcs);
-            ZSTDMT_releaseAllJobResources(zcs);
-            return ERROR(memory_allocation);
-        }
-        zcs->inBuff.filled = (U32)(zcs->inBuffSize - zcs->targetSectionSize);
-        memcpy(zcs->inBuff.buffer.start, (const char*)zcs->jobs[jobID].srcStart + zcs->targetSectionSize, zcs->inBuff.filled);
-
-        DEBUGLOG(3, "posting job %u   (%u bytes)  (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask);
-        POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]);   /* This call is blocking if all workers are busy */
-        zcs->nextJobID++;
+        CHECK_F( ZSTDMT_createCompressionJob(zcs, zcs->targetSectionSize, 0) );
     }
 
     /* check for data to flush */
-    ZSTDMT_flushNextJob(zcs, output, (zcs->inBuff.filled == zcs->inBuffSize));  /* we'll block if it wasn't possible to create new job due to saturation */
+    CHECK_F( ZSTDMT_flushNextJob(zcs, output, (zcs->inBuff.filled == zcs->inBuffSize)) ); /* block if it wasn't possible to create new job due to saturation */
 
     /* recommended next input size : fill current input buffer */
     return zcs->inBuffSize - zcs->inBuff.filled;   /* note : could be zero when input buffer is fully filled and no more availability to create new job */
@@ -677,59 +692,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
     DEBUGLOG(4, "flushing : %u bytes left to compress", (U32)srcSize);
     if ( ((srcSize > 0) || (endFrame && !zcs->frameEnded))
        && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) {
-        size_t const dstBufferCapacity = ZSTD_compressBound(srcSize);
-        buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity);
-        ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool);
-        unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
-
-        if ((cctx==NULL) || (dstBuffer.start==NULL)) {
-            zcs->jobs[jobID].jobCompleted = 1;
-            zcs->nextJobID++;
-            ZSTDMT_waitForAllJobsCompleted(zcs);
-            ZSTDMT_releaseAllJobResources(zcs);
-            return ERROR(memory_allocation);
-        }
-
-        zcs->jobs[jobID].src = zcs->inBuff.buffer;
-        zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
-        zcs->jobs[jobID].srcSize = srcSize;
-        zcs->jobs[jobID].params = zcs->params;
-        if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0;  /* do not calculate checksum within sections, just keep it in header for first section */
-        zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
-        zcs->jobs[jobID].dict = NULL;
-        zcs->jobs[jobID].dictSize = 0;
-        zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
-        zcs->jobs[jobID].dstBuff = dstBuffer;
-        zcs->jobs[jobID].cctx = cctx;
-        zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
-        zcs->jobs[jobID].lastChunk = endFrame;
-        zcs->jobs[jobID].jobCompleted = 0;
-        zcs->jobs[jobID].dstFlushed = 0;
-        zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
-        zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
-
-        /* get a new buffer for next input */
-        if (!endFrame) {
-            zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
-            zcs->inBuff.filled = 0;
-            if (zcs->inBuff.buffer.start == NULL) {   /* not enough memory to allocate next input buffer */
-                zcs->jobs[jobID].jobCompleted = 1;
-                zcs->nextJobID++;
-                ZSTDMT_waitForAllJobsCompleted(zcs);
-                ZSTDMT_releaseAllJobResources(zcs);
-                return ERROR(memory_allocation);
-            }
-        } else {
-            zcs->inBuff.buffer = g_nullBuffer;
-            zcs->inBuff.filled = 0;
-            zcs->frameEnded = 1;
-            if (zcs->nextJobID == 0)
-                zcs->params.fParams.checksumFlag = 0;   /* single chunk : checksum is calculated directly within worker thread */
-        }
-
-        DEBUGLOG(3, "posting job %u : %u bytes  (end:%u) (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask);
-        POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]);   /* this call is blocking when thread worker pool is exhausted */
-        zcs->nextJobID++;
+        CHECK_F( ZSTDMT_createCompressionJob(zcs, srcSize, endFrame) );
     }
 
     /* check if there is any data available to flush */