typedef struct ZSTDMT_bufferPool_s {
pthread_mutex_t poolMutex;
+ size_t bufferSize;
unsigned totalBuffers;
unsigned nbBuffers;
ZSTD_customMem cMem;
sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem);
if (bufPool==NULL) return NULL;
pthread_mutex_init(&bufPool->poolMutex, NULL);
+ bufPool->bufferSize = 64 KB;
bufPool->totalBuffers = maxNbBuffers;
bufPool->nbBuffers = 0;
bufPool->cMem = cMem;
return poolSize + totalBufferSize;
}
+static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* bufPool, size_t bSize)
+{
+ bufPool->bufferSize = bSize;
+}
+
/** ZSTDMT_getBuffer() :
- * assumption : invocation from main thread only ! */
-static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool, size_t bSize)
+ * assumption : bufPool must be valid */
+static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool)
{
+ size_t const bSize = bufPool->bufferSize;
DEBUGLOG(2, "ZSTDMT_getBuffer");
pthread_mutex_lock(&bufPool->poolMutex);
if (bufPool->nbBuffers) { /* try to use an existing buffer */
DEBUGLOG(2, "create a new buffer");
{ buffer_t buffer;
void* const start = ZSTD_malloc(bSize, bufPool->cMem);
- if (start==NULL) bSize = 0;
buffer.start = start; /* note : start can be NULL if malloc fails ! */
- buffer.size = bSize;
+ buffer.size = (start==NULL) ? 0 : bSize;
return buffer;
}
}
}
if (dstBuff.start == NULL) {
- size_t const dstCapacity = ZSTD_compressBound(job->srcSize);
- dstBuff = ZSTDMT_getBuffer(job->bufPool, dstCapacity);
+ dstBuff = ZSTDMT_getBuffer(job->bufPool);
if (dstBuff.start==NULL) {
job->cSize = ERROR(memory_allocation);
goto _endJob;
return ZSTD_compress_advanced(cctx, dst, dstCapacity, src, srcSize, NULL, 0, params);
}
assert(avgChunkSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), which is useful to avoid allocating extra buffers */
+ ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgChunkSize) );
if (nbChunks > mtctx->jobIDMask+1) { /* enlarge job table */
U32 nbJobs = nbChunks;
zcs->targetSectionSize = MAX(zcs->targetDictSize, zcs->targetSectionSize);
DEBUGLOG(4, "Section Size : %u KB", (U32)(zcs->targetSectionSize>>10));
zcs->inBuffSize = zcs->targetDictSize + zcs->targetSectionSize;
+ ZSTDMT_setBufferSize(zcs->bufPool, MAX(zcs->inBuffSize, ZSTD_compressBound(zcs->targetSectionSize)) );
zcs->inBuff.buffer = g_nullBuffer;
zcs->dictSize = 0;
zcs->doneJobID = 0;
if (!endFrame) {
size_t const newDictSize = MIN(srcSize + zcs->dictSize, zcs->targetDictSize);
DEBUGLOG(5, "ZSTDMT_createCompressionJob::endFrame = %u", endFrame);
- zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool, zcs->inBuffSize);
+ zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool);
if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */
zcs->jobs[jobID].jobCompleted = 1;
zcs->nextJobID++;
/* fill input buffer */
if (input->size > input->pos) { /* support NULL input */
if (mtctx->inBuff.buffer.start == NULL) {
- mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool, mtctx->inBuffSize);
+ mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool);
if (mtctx->inBuff.buffer.start == NULL) return ERROR(memory_allocation);
mtctx->inBuff.filled = 0;
}