]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
buffer pool : all buffers have same size
authorYann Collet <cyan@fb.com>
Tue, 11 Jul 2017 22:17:25 +0000 (15:17 -0700)
committerYann Collet <cyan@fb.com>
Tue, 11 Jul 2017 22:17:25 +0000 (15:17 -0700)
to reduce memory fragmentation.
They can be used for in or out, interchangeably.

lib/compress/zstdmt_compress.c

index 9f30d31814073a17eb82cf93091c5f98468aea8c..703d25e3070f4b1bf62d41af878eaf1033a5659c 100644 (file)
@@ -84,6 +84,7 @@ static const buffer_t g_nullBuffer = { NULL, 0 };
 
 typedef struct ZSTDMT_bufferPool_s {
     pthread_mutex_t poolMutex;
+    size_t bufferSize;
     unsigned totalBuffers;
     unsigned nbBuffers;
     ZSTD_customMem cMem;
@@ -97,6 +98,7 @@ static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbThreads, ZSTD_custo
         sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem);
     if (bufPool==NULL) return NULL;
     pthread_mutex_init(&bufPool->poolMutex, NULL);
+    bufPool->bufferSize = 64 KB;
     bufPool->totalBuffers = maxNbBuffers;
     bufPool->nbBuffers = 0;
     bufPool->cMem = cMem;
@@ -128,10 +130,16 @@ static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool)
     return poolSize + totalBufferSize;
 }
 
+static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* bufPool, size_t bSize)
+{
+    bufPool->bufferSize = bSize;
+}
+
 /** ZSTDMT_getBuffer() :
- *  assumption : invocation from main thread only ! */
-static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool, size_t bSize)
+ *  assumption : bufPool must be valid */
+static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool)
 {
+    size_t const bSize = bufPool->bufferSize;
     DEBUGLOG(2, "ZSTDMT_getBuffer");
     pthread_mutex_lock(&bufPool->poolMutex);
     if (bufPool->nbBuffers) {   /* try to use an existing buffer */
@@ -151,9 +159,8 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool, size_t bSize)
     DEBUGLOG(2, "create a new buffer");
     {   buffer_t buffer;
         void* const start = ZSTD_malloc(bSize, bufPool->cMem);
-        if (start==NULL) bSize = 0;
         buffer.start = start;   /* note : start can be NULL if malloc fails ! */
-        buffer.size = bSize;
+        buffer.size = (start==NULL) ? 0 : bSize;
         return buffer;
     }
 }
@@ -306,8 +313,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
     }
 
     if (dstBuff.start == NULL) {
-        size_t const dstCapacity = ZSTD_compressBound(job->srcSize);
-        dstBuff = ZSTDMT_getBuffer(job->bufPool, dstCapacity);
+        dstBuff = ZSTDMT_getBuffer(job->bufPool);
         if (dstBuff.start==NULL) {
             job->cSize = ERROR(memory_allocation);
             goto _endJob;
@@ -530,6 +536,7 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
         return ZSTD_compress_advanced(cctx, dst, dstCapacity, src, srcSize, NULL, 0, params);
     }
     assert(avgChunkSize >= 256 KB);  /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), which is useful to avoid allocating extra buffers */
+    ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgChunkSize) );
 
     if (nbChunks > mtctx->jobIDMask+1) {  /* enlarge job table */
         U32 nbJobs = nbChunks;
@@ -690,6 +697,7 @@ size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
     zcs->targetSectionSize = MAX(zcs->targetDictSize, zcs->targetSectionSize);
     DEBUGLOG(4, "Section Size : %u KB", (U32)(zcs->targetSectionSize>>10));
     zcs->inBuffSize = zcs->targetDictSize + zcs->targetSectionSize;
+    ZSTDMT_setBufferSize(zcs->bufPool, MAX(zcs->inBuffSize, ZSTD_compressBound(zcs->targetSectionSize)) );
     zcs->inBuff.buffer = g_nullBuffer;
     zcs->dictSize = 0;
     zcs->doneJobID = 0;
@@ -767,7 +775,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
     if (!endFrame) {
         size_t const newDictSize = MIN(srcSize + zcs->dictSize, zcs->targetDictSize);
         DEBUGLOG(5, "ZSTDMT_createCompressionJob::endFrame = %u", endFrame);
-        zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool, zcs->inBuffSize);
+        zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool);
         if (zcs->inBuff.buffer.start == NULL) {   /* not enough memory to allocate next input buffer */
             zcs->jobs[jobID].jobCompleted = 1;
             zcs->nextJobID++;
@@ -910,7 +918,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
     /* fill input buffer */
     if (input->size > input->pos) {   /* support NULL input */
         if (mtctx->inBuff.buffer.start == NULL) {
-            mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool, mtctx->inBuffSize);
+            mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool);
             if (mtctx->inBuff.buffer.start == NULL) return ERROR(memory_allocation);
             mtctx->inBuff.filled = 0;
         }