return poolSize + totalBufferSize;
}
+/* ZSTDMT_setBufferSize() :
+ * all future buffers provided by this buffer pool will have _at least_ this size
+ * note : it's better for all buffers to have same size,
+ * as they become freely interchangeable, reducing malloc/free usages and memory fragmentation */
static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* const bufPool, size_t const bSize)
{
ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
/* ZSTDMT_writeLastEmptyBlock()
- * Write a single empty block with an end-of-frame
- * to finish a frame.
- * Completed synchronously.
- * @return : 0, or an error code (can fail due to memory allocation)
+ * Write a single empty block with an end-of-frame to finish a frame.
+ * Job must be created from streaming variant.
+ * This function is always successfull if expected conditions are fulfilled.
*/
-static size_t ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job)
+static void ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job)
{
assert(job->lastChunk == 1);
assert(job->srcSize == 0); /* last chunk is empty -> will be simplified into a last empty block */
* It summons a dstBuffer itself, compresses into it, then releases srcBuffer, and gives result to mtctx.
* When done, srcBuffer is empty, while dstBuffer is filled, and will be released by mtctx.
* This shortcut will simply switch srcBuffer for dstBuffer, providing same outcome as a normal job */
- assert(job->dstBuff.start == NULL); /* invoked from streaming variant only (otherwise, dstBuff might be user's output) */
- assert(job->srcBuff.start != NULL); /* invoked from streaming variant only (otherwise, srcBuff might be user's input) */
- assert(job->srcBuff.size >= ZSTD_blockHeaderSize);
+ assert(job->dstBuff.start == NULL); /* invoked from streaming variant only (otherwise, dstBuff might be user's output) */
+ assert(job->srcBuff.start != NULL); /* invoked from streaming variant only (otherwise, srcBuff might be user's input) */
+ assert(job->srcBuff.size >= ZSTD_blockHeaderSize); /* no buffer should ever be that small */
job->dstBuff = job->srcBuff;
job->srcBuff = g_nullBuffer;
job->cSize = ZSTD_writeLastEmptyBlock(job->dstBuff.start, job->dstBuff.size);
assert(!ZSTD_isError(job->cSize));
assert(job->consumed == 0);
- return 0;
}
static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZSTD_EndDirective endOp)
if (!mtctx->jobReady) {
DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ",
mtctx->nextJobID, (U32)srcSize, (U32)mtctx->prefixSize);
+ assert(mtctx->jobs[jobID].srcBuff.start == NULL); /* no buffer left : supposed already released */
mtctx->jobs[jobID].srcBuff = mtctx->inBuff.buffer;
mtctx->jobs[jobID].prefixStart = mtctx->inBuff.buffer.start;
mtctx->jobs[jobID].prefixSize = mtctx->prefixSize;
if ( (srcSize == 0)
&& (mtctx->nextJobID>0)/*single chunk must also write frame header*/ ) {
assert(endOp == ZSTD_e_end); /* only possible case : need to end the frame with an empty last block */
- CHECK_F( ZSTDMT_writeLastEmptyBlock(mtctx->jobs + jobID) );
+ ZSTDMT_writeLastEmptyBlock(mtctx->jobs + jobID);
mtctx->nextJobID++;
return 0;
}
output->pos += toWrite;
job.dstFlushed += toWrite;
- if ( (job.consumed == job.srcSize)
- && (job.dstFlushed == job.cSize) ) { /* output buffer fully flushed => move to next one */
+ if ( (job.consumed == job.srcSize) /* job completed */
+ && (job.dstFlushed == job.cSize) ) { /* output buffer fully flushed => free this job position */
DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",
mtctx->doneJobID, (U32)job.dstFlushed);
+ assert(job.srcBuff.start == NULL); /* srcBuff supposed already released */
ZSTDMT_releaseBuffer(mtctx->bufPool, job.dstBuff);
mtctx->jobs[wJobID].dstBuff = g_nullBuffer;
mtctx->consumed += job.srcSize;