buffer_t bTable[1]; /* variable size */
} ZSTDMT_bufferPool;
-static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbWorkers, ZSTD_customMem cMem)
+static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned maxNbBuffers, ZSTD_customMem cMem)
{
- unsigned const maxNbBuffers = 2*nbWorkers + 3;
ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)ZSTD_customCalloc(
sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem);
if (bufPool==NULL) return NULL;
}
-static ZSTDMT_bufferPool* ZSTDMT_expandBufferPool(ZSTDMT_bufferPool* srcBufPool, U32 nbWorkers)
+static ZSTDMT_bufferPool* ZSTDMT_expandBufferPool(ZSTDMT_bufferPool* srcBufPool, unsigned maxNbBuffers)
{
- unsigned const maxNbBuffers = 2*nbWorkers + 3;
if (srcBufPool==NULL) return NULL;
if (srcBufPool->totalBuffers >= maxNbBuffers) /* good enough */
return srcBufPool;
size_t const bSize = srcBufPool->bufferSize; /* forward parameters */
ZSTDMT_bufferPool* newBufPool;
ZSTDMT_freeBufferPool(srcBufPool);
- newBufPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
+ newBufPool = ZSTDMT_createBufferPool(maxNbBuffers, cMem);
if (newBufPool==NULL) return newBufPool;
ZSTDMT_setBufferSize(newBufPool, bSize);
return newBufPool;
ZSTD_customFree(buf.start, bufPool->cMem);
}
+/* We need 2 output buffers per worker since each dstBuff must be flushed after it is released.
+ * The 3 additional buffers are as follows:
+ * 1 buffer for input loading
+ * 1 buffer for "next input" when submitting current one
+ * 1 buffer stuck in queue */
+#define BUF_POOL_MAX_NB_BUFFERS(nbWorkers) 2*nbWorkers + 3
+
+/* After a worker releases its rawSeqStore, it is immediately ready for reuse.
+ * So we only need one seq buffer per worker. */
+#define SEQ_POOL_MAX_NB_BUFFERS(nbWorkers) nbWorkers
/* ===== Seq Pool Wrapper ====== */
static ZSTDMT_seqPool* ZSTDMT_createSeqPool(unsigned nbWorkers, ZSTD_customMem cMem)
{
- ZSTDMT_seqPool* const seqPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
+ ZSTDMT_seqPool* const seqPool = ZSTDMT_createBufferPool(SEQ_POOL_MAX_NB_BUFFERS(nbWorkers), cMem);
if (seqPool == NULL) return NULL;
ZSTDMT_setNbSeq(seqPool, 0);
return seqPool;
static ZSTDMT_seqPool* ZSTDMT_expandSeqPool(ZSTDMT_seqPool* pool, U32 nbWorkers)
{
- return ZSTDMT_expandBufferPool(pool, nbWorkers);
+ return ZSTDMT_expandBufferPool(pool, SEQ_POOL_MAX_NB_BUFFERS(nbWorkers));
}
mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, cMem);
assert(nbJobs > 0); assert((nbJobs & (nbJobs - 1)) == 0); /* ensure nbJobs is a power of 2 */
mtctx->jobIDMask = nbJobs - 1;
- mtctx->bufPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
+ mtctx->bufPool = ZSTDMT_createBufferPool(BUF_POOL_MAX_NB_BUFFERS(nbWorkers), cMem);
mtctx->cctxPool = ZSTDMT_createCCtxPool(nbWorkers, cMem);
mtctx->seqPool = ZSTDMT_createSeqPool(nbWorkers, cMem);
initError = ZSTDMT_serialState_init(&mtctx->serial);
{
if (POOL_resize(mtctx->factory, nbWorkers)) return ERROR(memory_allocation);
FORWARD_IF_ERROR( ZSTDMT_expandJobsTable(mtctx, nbWorkers) , "");
- mtctx->bufPool = ZSTDMT_expandBufferPool(mtctx->bufPool, nbWorkers);
+ mtctx->bufPool = ZSTDMT_expandBufferPool(mtctx->bufPool, BUF_POOL_MAX_NB_BUFFERS(nbWorkers));
if (mtctx->bufPool == NULL) return ERROR(memory_allocation);
mtctx->cctxPool = ZSTDMT_expandCCtxPool(mtctx->cctxPool, nbWorkers);
if (mtctx->cctxPool == NULL) return ERROR(memory_allocation);
* Private use only. Init streaming operation.
* expects params to be valid.
* must receive dict, or cdict, or none, but not both.
+ * mtctx can be freshly constructed or reused from a prior compression.
+ * If mtctx is reused, memory allocations from the prior compression may not be freed,
+ * even if they are not needed for the current compression.
* @return : 0, or an error code */
-size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
+size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* mtctx,
const void* dict, size_t dictSize, ZSTD_dictContentType_e dictContentType,
const ZSTD_CDict* cdict,
ZSTD_CCtx_params params, unsigned long long pledgedSrcSize);