/* ------------------------------------------ */
typedef struct {
- size_t consumed; /* SHARED - init0 by mtctx, then modified by worker AND read by mtctx */
- size_t cSize; /* SHARED - init0 by mtctx, then modified by worker AND read by mtctx */
+ size_t consumed; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */
+ size_t cSize; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */
ZSTD_pthread_mutex_t* mtctx_mutex; /* Thread-safe - used by mtctx and (all) worker */
ZSTD_pthread_cond_t* mtctx_cond; /* Thread-safe - used by mtctx and (all) worker */
ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) worker */
ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) worker */
- buffer_t dstBuff; /* set by worker (or mtctx), then read by worker, then modified by mtctx => no barrier */
- buffer_t src; /* set by mtctx, then modified by worker => no barrier */
- const void* srcStart; /* set by mtctx, then read by worker => no barrier */
- size_t prefixSize; /* set by mtctx, then read by worker => no barrier */
- size_t srcSize; /* set by mtctx, then read by worker => no barrier */
- unsigned firstChunk; /* set by mtctx, then read by worker => no barrier */
- unsigned lastChunk; /* set by mtctx, then read by worker => no barrier */
+ buffer_t dstBuff; /* set by worker (or mtctx), then read by worker, then modified by mtctx => no barrier */
+ buffer_t srcBuff; /* set by mtctx, then released by worker => no barrier */
+ const void* prefixStart; /* set by mtctx, then read by worker => no barrier */
+ size_t prefixSize; /* set by mtctx, then read by worker => no barrier */
+ size_t srcSize; /* set by mtctx, then read by worker => no barrier */
+ unsigned firstChunk; /* set by mtctx, then read by worker => no barrier */
+ unsigned lastChunk; /* set by mtctx, then read by worker => no barrier */
ZSTD_CCtx_params params; /* set by mtctx, then read by worker => no barrier */
const ZSTD_CDict* cdict; /* set by mtctx, then read by worker => no barrier */
unsigned long long fullFrameSize; /* set by mtctx, then read by worker => no barrier */
{
ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool);
- const void* const src = (const char*)job->srcStart + job->prefixSize;
+ const void* const src = (const char*)job->prefixStart + job->prefixSize;
buffer_t dstBuff = job->dstBuff;
/* ressources */
goto _endJob;
} }
{ size_t const initError = ZSTD_compressBegin_advanced_internal(cctx,
- job->srcStart, job->prefixSize, ZSTD_dm_rawContent, /* load dictionary in "content-only" mode (no header analysis) */
+ job->prefixStart, job->prefixSize, ZSTD_dm_rawContent, /* load dictionary in "content-only" mode (no header analysis) */
NULL, /*cdict*/
jobParams, pledgedSrcSize);
if (ZSTD_isError(initError)) {
_endJob:
/* release resources */
ZSTDMT_releaseCCtx(job->cctxPool, cctx);
- ZSTDMT_releaseBuffer(job->bufPool, job->src);
- job->src = g_nullBuffer; job->srcStart = NULL;
+ ZSTDMT_releaseBuffer(job->bufPool, job->srcBuff);
+ job->srcBuff = g_nullBuffer; job->prefixStart = NULL;
/* report */
ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
job->consumed = job->srcSize;
DEBUGLOG(4, "job%02u: release dst address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].dstBuff.start);
ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff);
mtctx->jobs[jobID].dstBuff = g_nullBuffer;
- DEBUGLOG(4, "job%02u: release src address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].src.start);
- ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].src);
- mtctx->jobs[jobID].src = g_nullBuffer;
+ DEBUGLOG(4, "job%02u: release src address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].srcBuff.start);
+ ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].srcBuff);
+ mtctx->jobs[jobID].srcBuff = g_nullBuffer;
}
memset(mtctx->jobs, 0, (mtctx->jobIDMask+1)*sizeof(ZSTDMT_jobDescription));
DEBUGLOG(4, "input: release address %08X", (U32)(size_t)mtctx->inBuff.buffer.start);
buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : g_nullBuffer;
size_t dictSize = u ? overlapSize : 0;
- mtctx->jobs[u].src = g_nullBuffer;
- mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize;
+ mtctx->jobs[u].srcBuff = g_nullBuffer;
+ mtctx->jobs[u].prefixStart = srcStart + frameStartPos - dictSize;
mtctx->jobs[u].prefixSize = dictSize;
mtctx->jobs[u].srcSize = chunkSize; assert(chunkSize > 0); /* avoid job.srcSize == 0 */
mtctx->jobs[u].consumed = 0;
}
DEBUGLOG(5, "ZSTDMT_compress_advanced_internal: posting job %u (%u bytes)", u, (U32)chunkSize);
- DEBUG_PRINTHEX(6, mtctx->jobs[u].srcStart, 12);
+ DEBUG_PRINTHEX(6, mtctx->jobs[u].prefixStart, 12);
POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[u]);
frameStartPos += chunkSize;
ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
DEBUGLOG(5, "ready to write chunk %u ", chunkID);
- mtctx->jobs[chunkID].srcStart = NULL;
+ mtctx->jobs[chunkID].prefixStart = NULL;
{ size_t const cSize = mtctx->jobs[chunkID].cSize;
if (ZSTD_isError(cSize)) error = cSize;
if ((!error) && (dstPos + cSize > dstCapacity)) error = ERROR(dstSize_tooSmall);
*/
static size_t ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job)
{
- assert(job->srcSize == 0);
assert(job->lastChunk == 1);
- assert(job->firstChunk == 0); /* first chunk needs to create frame header too */
- assert(job->dstBuff.start == NULL); /* invoked from streaming variant only */
- { buffer_t const dstBuff = ZSTDMT_getBuffer(job->bufPool);
- if (dstBuff.start==NULL) return ERROR(memory_allocation);
- job->dstBuff = dstBuff; /* will be released by ZSTDMT_flushProduced() */
- assert(dstBuff.size >= ZSTD_blockHeaderSize);
- job->cSize = ZSTD_writeLastEmptyBlock(dstBuff.start, dstBuff.size);
- assert(!ZSTD_isError(job->cSize));
- assert(job->consumed == 0);
- }
+ assert(job->srcSize == 0); /* last chunk is empty -> will be simplified into a last empty block */
+ assert(job->firstChunk == 0); /* cannot be first chunk, as it also needs to create frame header */
+ /* A job created by streaming variant starts with a src buffer, but no dst buffer.
+ * It summons a dstBuffer itself, compresses into it, then releases srcBuffer, and gives result to mtctx.
+ * When done, srcBuffer is empty, while dstBuffer is filled, and will be released by mtctx.
+ * This shortcut will simply switch srcBuffer for dstBuffer, providing same outcome as a normal job */
+ assert(job->dstBuff.start == NULL); /* invoked from streaming variant only (otherwise, dstBuff might be user's output) */
+ assert(job->srcBuff.start != NULL); /* invoked from streaming variant only (otherwise, srcBuff might be user's input) */
+ assert(job->srcBuff.size >= ZSTD_blockHeaderSize);
+ job->dstBuff = job->srcBuff;
+ job->srcBuff = g_nullBuffer;
+ job->cSize = ZSTD_writeLastEmptyBlock(job->dstBuff.start, job->dstBuff.size);
+ assert(!ZSTD_isError(job->cSize));
+ assert(job->consumed == 0);
return 0;
}
if (!mtctx->jobReady) {
DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ",
mtctx->nextJobID, (U32)srcSize, (U32)mtctx->prefixSize);
- mtctx->jobs[jobID].src = mtctx->inBuff.buffer;
- mtctx->jobs[jobID].srcStart = mtctx->inBuff.buffer.start;
+ mtctx->jobs[jobID].srcBuff = mtctx->inBuff.buffer;
+ mtctx->jobs[jobID].prefixStart = mtctx->inBuff.buffer.start;
+ mtctx->jobs[jobID].prefixSize = mtctx->prefixSize;
mtctx->jobs[jobID].srcSize = srcSize;
mtctx->jobs[jobID].consumed = 0;
mtctx->jobs[jobID].cSize = 0;
- mtctx->jobs[jobID].prefixSize = mtctx->prefixSize;
assert(mtctx->inBuff.filled >= srcSize + mtctx->prefixSize);
mtctx->jobs[jobID].params = mtctx->params;
/* do not calculate checksum within sections, but write it in header for first section */
}
mtctx->inBuff.filled -= srcSize + mtctx->prefixSize - newPrefixSize;
memmove(mtctx->inBuff.buffer.start, /* copy end of current job into next job, as "prefix" */
- (const char*)mtctx->jobs[jobID].srcStart + mtctx->prefixSize + srcSize - newPrefixSize,
+ (const char*)mtctx->jobs[jobID].prefixStart + mtctx->prefixSize + srcSize - newPrefixSize,
mtctx->inBuff.filled);
mtctx->prefixSize = newPrefixSize;
} else { /* endFrame==1 => no need for another input buffer */