}
+static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsigned endFrame)
+{
+ size_t const dstBufferCapacity = ZSTD_compressBound(srcSize);
+ buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity);
+ ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool);
+ unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
+
+ if ((cctx==NULL) || (dstBuffer.start==NULL)) {
+ zcs->jobs[jobID].jobCompleted = 1;
+ zcs->nextJobID++;
+ ZSTDMT_waitForAllJobsCompleted(zcs);
+ ZSTDMT_releaseAllJobResources(zcs);
+ return ERROR(memory_allocation);
+ }
+
+ DEBUGLOG(4, "preparing job %u to compress %u bytes \n", zcs->nextJobID, (U32)srcSize);
+ zcs->jobs[jobID].src = zcs->inBuff.buffer;
+ zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
+ zcs->jobs[jobID].srcSize = srcSize;
+ zcs->jobs[jobID].params = zcs->params;
+ if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0; /* do not calculate checksum within sections, just keep it in header for first section */
+ zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
+ zcs->jobs[jobID].dict = NULL;
+ zcs->jobs[jobID].dictSize = 0;
+ zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
+ zcs->jobs[jobID].dstBuff = dstBuffer;
+ zcs->jobs[jobID].cctx = cctx;
+ zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
+ zcs->jobs[jobID].lastChunk = endFrame;
+ zcs->jobs[jobID].jobCompleted = 0;
+ zcs->jobs[jobID].dstFlushed = 0;
+ zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
+ zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
+
+ /* get a new buffer for next input */
+ if (!endFrame) {
+ zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
+ if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */
+ zcs->jobs[jobID].jobCompleted = 1;
+ zcs->nextJobID++;
+ ZSTDMT_waitForAllJobsCompleted(zcs);
+ ZSTDMT_releaseAllJobResources(zcs);
+ return ERROR(memory_allocation);
+ }
+ zcs->inBuff.filled -= srcSize;
+ memcpy(zcs->inBuff.buffer.start, (const char*)zcs->jobs[jobID].srcStart + srcSize, zcs->inBuff.filled);
+ } else {
+ zcs->inBuff.buffer = g_nullBuffer;
+ zcs->inBuff.filled = 0;
+ zcs->frameEnded = 1;
+ if (zcs->nextJobID == 0)
+ zcs->params.fParams.checksumFlag = 0; /* single chunk : checksum is calculated directly within worker thread */
+ }
+
+ DEBUGLOG(3, "posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask);
+ POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]); /* this call is blocking when thread worker pool is exhausted */
+ zcs->nextJobID++;
+ return 0;
+}
+
+
/* ZSTDMT_flushNextJob() :
* output : will be updated with amount of data flushed .
* blockToFlush : if >0, the function will block and wait if there is no data available to flush .
- * @return : amount of data remaining within internal buffer, 1 if unknown but > 0, 0 if no more */
+ * @return : amount of data remaining within internal buffer, 1 if unknown but > 0, 0 if no more, or an error code */
static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
{
unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
if ( (zcs->inBuff.filled == zcs->inBuffSize) /* filled enough : let's compress */
&& (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) { /* avoid overwriting job round buffer */
- size_t const dstBufferCapacity = ZSTD_compressBound(zcs->targetSectionSize);
- buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity);
- ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool);
- unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
-
- if ((cctx==NULL) || (dstBuffer.start==NULL)) { /* cannot get resources for next job */
- zcs->jobs[jobID].jobCompleted = 1;
- zcs->nextJobID++;
- ZSTDMT_waitForAllJobsCompleted(zcs);
- ZSTDMT_releaseAllJobResources(zcs);
- return ERROR(memory_allocation);
- }
-
- DEBUGLOG(4, "preparing job %u to compress %u bytes \n", (U32)zcs->nextJobID, (U32)zcs->targetSectionSize);
- zcs->jobs[jobID].src = zcs->inBuff.buffer;
- zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
- zcs->jobs[jobID].srcSize = zcs->targetSectionSize;
- zcs->jobs[jobID].params = zcs->params;
- if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0; /* do not calculate checksum within sections, just keep it in header for first section */
- zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
- zcs->jobs[jobID].dict = NULL;
- zcs->jobs[jobID].dictSize = 0;
- zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
- zcs->jobs[jobID].dstBuff = dstBuffer;
- zcs->jobs[jobID].cctx = cctx;
- zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
- zcs->jobs[jobID].lastChunk = 0;
- zcs->jobs[jobID].jobCompleted = 0;
- zcs->jobs[jobID].dstFlushed = 0;
- zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
- zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
-
- /* get a new buffer for next input - save remaining into it */
- zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
- if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */
- zcs->jobs[jobID].jobCompleted = 1;
- zcs->nextJobID++;
- ZSTDMT_waitForAllJobsCompleted(zcs);
- ZSTDMT_releaseAllJobResources(zcs);
- return ERROR(memory_allocation);
- }
- zcs->inBuff.filled = (U32)(zcs->inBuffSize - zcs->targetSectionSize);
- memcpy(zcs->inBuff.buffer.start, (const char*)zcs->jobs[jobID].srcStart + zcs->targetSectionSize, zcs->inBuff.filled);
-
- DEBUGLOG(3, "posting job %u (%u bytes) (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask);
- POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]); /* This call is blocking if all workers are busy */
- zcs->nextJobID++;
+ CHECK_F( ZSTDMT_createCompressionJob(zcs, zcs->targetSectionSize, 0) );
}
/* check for data to flush */
- ZSTDMT_flushNextJob(zcs, output, (zcs->inBuff.filled == zcs->inBuffSize)); /* we'll block if it wasn't possible to create new job due to saturation */
+ CHECK_F( ZSTDMT_flushNextJob(zcs, output, (zcs->inBuff.filled == zcs->inBuffSize)) ); /* block if it wasn't possible to create new job due to saturation */
/* recommended next input size : fill current input buffer */
return zcs->inBuffSize - zcs->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */
DEBUGLOG(4, "flushing : %u bytes left to compress", (U32)srcSize);
if ( ((srcSize > 0) || (endFrame && !zcs->frameEnded))
&& (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) {
- size_t const dstBufferCapacity = ZSTD_compressBound(srcSize);
- buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity);
- ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool);
- unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
-
- if ((cctx==NULL) || (dstBuffer.start==NULL)) {
- zcs->jobs[jobID].jobCompleted = 1;
- zcs->nextJobID++;
- ZSTDMT_waitForAllJobsCompleted(zcs);
- ZSTDMT_releaseAllJobResources(zcs);
- return ERROR(memory_allocation);
- }
-
- zcs->jobs[jobID].src = zcs->inBuff.buffer;
- zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
- zcs->jobs[jobID].srcSize = srcSize;
- zcs->jobs[jobID].params = zcs->params;
- if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0; /* do not calculate checksum within sections, just keep it in header for first section */
- zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
- zcs->jobs[jobID].dict = NULL;
- zcs->jobs[jobID].dictSize = 0;
- zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
- zcs->jobs[jobID].dstBuff = dstBuffer;
- zcs->jobs[jobID].cctx = cctx;
- zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
- zcs->jobs[jobID].lastChunk = endFrame;
- zcs->jobs[jobID].jobCompleted = 0;
- zcs->jobs[jobID].dstFlushed = 0;
- zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
- zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
-
- /* get a new buffer for next input */
- if (!endFrame) {
- zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
- zcs->inBuff.filled = 0;
- if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */
- zcs->jobs[jobID].jobCompleted = 1;
- zcs->nextJobID++;
- ZSTDMT_waitForAllJobsCompleted(zcs);
- ZSTDMT_releaseAllJobResources(zcs);
- return ERROR(memory_allocation);
- }
- } else {
- zcs->inBuff.buffer = g_nullBuffer;
- zcs->inBuff.filled = 0;
- zcs->frameEnded = 1;
- if (zcs->nextJobID == 0)
- zcs->params.fParams.checksumFlag = 0; /* single chunk : checksum is calculated directly within worker thread */
- }
-
- DEBUGLOG(3, "posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask);
- POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]); /* this call is blocking when thread worker pool is exhausted */
- zcs->nextJobID++;
+ CHECK_F( ZSTDMT_createCompressionJob(zcs, srcSize, endFrame) );
}
/* check if there is any data available to flush */