]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
new zstdmt version using generic treadpool
authorYann Collet <cyan@fb.com>
Sat, 31 Dec 2016 05:04:25 +0000 (06:04 +0100)
committerYann Collet <cyan@fb.com>
Sat, 31 Dec 2016 05:04:25 +0000 (06:04 +0100)
lib/common/pool.c
lib/compress/zstdmt_compress.c

index e38881949042738d4393dbdefdef598324b19982..4ec1dfffbba9abfedd23951cca24b0f0c48ec26b 100644 (file)
@@ -46,8 +46,8 @@ struct POOL_ctx_s {
    Waits for jobs and executes them.
    @returns : NULL on failure else non-null.
 */
-static void *POOL_thread(void *opaque) {
-    POOL_ctx *ctx = (POOL_ctx *)opaque;
+static void* POOL_thread(void* opaque) {
+    POOL_ctx* const ctx = (POOL_ctx*)opaque;
     if (!ctx) { return NULL; }
     for (;;) {
         /* Lock the mutex and wait for a non-empty queue or until shutdown */
@@ -61,7 +61,7 @@ static void *POOL_thread(void *opaque) {
             return opaque;
         }
         /* Pop a job off the queue */
-        {   POOL_job job = ctx->queue[ctx->queueHead];
+        {   POOL_job const job = ctx->queue[ctx->queueHead];
             ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
             /* Unlock the mutex, signal a pusher, and run the job */
             if (pthread_mutex_unlock(&ctx->queueMutex)) { return NULL; }
index a6a4972857dd242885a5a56ac29145b030cd1678..1b925914a47890bf7dfe452c51683b6aeecebfa5 100644 (file)
@@ -1,5 +1,6 @@
 #include <stdlib.h>   /* malloc */
-#include <pthread.h>  /* posix only, to be replaced by a more portable version */
+#include <pool.h>     /* threadpool */
+#include <pthread.h>  /* mutex */
 #include "zstd_internal.h"   /* MIN, ERROR */
 #include "zstdmt_compress.h"
 
@@ -43,176 +44,11 @@ if (g_debugLevel>=MUTEX_WAIT_TIME_DLEVEL) { \
 #define ZSTDMT_NBTHREADS_MAX 128
 #define ZSTDMT_NBSTACKEDFRAMES_MAX (2*ZSTDMT_NBTHREADS_MAX)
 
-typedef struct frameToWrite_s {
-    const void* start;
-    size_t frameSize;
-    unsigned frameID;
-    unsigned isLastFrame;
-} frameToWrite_t;
-
-typedef struct ZSTDMT_dstBuffer_s {
-    ZSTD_outBuffer out;
-    unsigned frameIDToWrite;
-    pthread_mutex_t frameTable_mutex;
-    pthread_mutex_t allFramesWritten_mutex;
-    frameToWrite_t stackedFrame[ZSTDMT_NBSTACKEDFRAMES_MAX];
-    unsigned nbStackedFrames;
-} ZSTDMT_dstBufferManager;
-
-static ZSTDMT_dstBufferManager ZSTDMT_createDstBufferManager(void* dst, size_t dstCapacity)
-{
-    ZSTDMT_dstBufferManager dbm;
-    dbm.out.dst = dst;
-    dbm.out.size = dstCapacity;
-    dbm.out.pos = 0;
-    dbm.frameIDToWrite = 0;
-    pthread_mutex_init(&dbm.frameTable_mutex, NULL);
-    pthread_mutex_t* const allFramesWritten_mutex = &dbm.allFramesWritten_mutex;
-    pthread_mutex_init(allFramesWritten_mutex, NULL);
-    PTHREAD_MUTEX_LOCK(allFramesWritten_mutex);  /* maybe could be merged into init ? */
-    dbm.nbStackedFrames = 0;
-    return dbm;
-}
-
-/* note : can fail if nbStackedFrames > ZSTDMT_NBSTACKEDFRAMES_MAX.
- * note2 : can only be called from a section with frameTable_mutex already locked */
-static void ZSTDMT_stackFrameToWrite(ZSTDMT_dstBufferManager* dstBufferManager, frameToWrite_t frame) {
-    dstBufferManager->stackedFrame[dstBufferManager->nbStackedFrames++] = frame;
-}
-
-
 typedef struct buffer_s {
     void* start;
-    size_t bufferSize;
+    size_t size;
 } buffer_t;
 
-static buffer_t ZSTDMT_getDstBuffer(const ZSTDMT_dstBufferManager* dstBufferManager)
-{
-    ZSTD_outBuffer const out = dstBufferManager->out;
-    buffer_t buf;
-    buf.start = (char*)(out.dst) + out.pos;
-    buf.bufferSize = out.size - out.pos;
-    return buf;
-}
-
-/* condition : stackNumber < dstBufferManager->nbStackedFrames.
- * note : there can only be one write at a time, due to frameID condition */
-static size_t ZSTDMT_writeFrame(ZSTDMT_dstBufferManager* dstBufferManager, unsigned stackNumber)
-{
-    ZSTD_outBuffer const out = dstBufferManager->out;
-    size_t const frameSize = dstBufferManager->stackedFrame[stackNumber].frameSize;
-    const void* const frameStart = dstBufferManager->stackedFrame[stackNumber].start;
-    if (out.pos + frameSize > out.size)
-        return ERROR(dstSize_tooSmall);
-    DEBUGLOG(3, "writing frame %u (%u bytes) ", dstBufferManager->stackedFrame[stackNumber].frameID, (U32)frameSize);
-    memcpy((char*)out.dst + out.pos, frameStart, frameSize);
-    dstBufferManager->out.pos += frameSize;
-    dstBufferManager->frameIDToWrite = dstBufferManager->stackedFrame[stackNumber].frameID + 1;
-    return 0;
-}
-
-static size_t ZSTDMT_tryWriteFrame(ZSTDMT_dstBufferManager* dstBufferManager,
-                                   const void* src, size_t srcSize,
-                                   unsigned frameID, unsigned isLastFrame)
-{
-    unsigned lastFrameWritten = 0;
-
-    /* check if correct frame ordering; stack otherwise */
-    DEBUGLOG(5, "considering writing frame %u ", frameID);
-    PTHREAD_MUTEX_LOCK(&dstBufferManager->frameTable_mutex);
-    if (frameID != dstBufferManager->frameIDToWrite) {
-        DEBUGLOG(4, "writing frameID %u : not possible, waiting for %u  ", frameID, dstBufferManager->frameIDToWrite);
-        frameToWrite_t const frame = { src, srcSize, frameID, isLastFrame };
-        ZSTDMT_stackFrameToWrite(dstBufferManager, frame);
-        pthread_mutex_unlock(&dstBufferManager->frameTable_mutex);
-        return 0;
-    }
-    pthread_mutex_unlock(&dstBufferManager->frameTable_mutex);
-
-    /* write frame
-     * note : only one write possible due to frameID condition */
-    DEBUGLOG(3, "writing frame %u (%u bytes) ", frameID, (U32)srcSize);
-    ZSTD_outBuffer const out = dstBufferManager->out;
-    if (out.pos + srcSize > out.size)
-        return ERROR(dstSize_tooSmall);
-    if (frameID) /* frameID==0 compress directly in dst buffer */
-        memcpy((char*)out.dst + out.pos, src, srcSize);
-    dstBufferManager->out.pos += srcSize;
-    dstBufferManager->frameIDToWrite = frameID+1;
-    lastFrameWritten = isLastFrame;
-
-    /* check if more frames are stacked */
-    PTHREAD_MUTEX_LOCK(&dstBufferManager->frameTable_mutex);
-    unsigned frameWritten = dstBufferManager->nbStackedFrames>0;
-    while (frameWritten) {
-        unsigned u;
-        frameID++;
-        frameWritten = 0;
-        for (u=0; u<dstBufferManager->nbStackedFrames; u++) {
-            if (dstBufferManager->stackedFrame[u].frameID == frameID) {
-                pthread_mutex_unlock(&dstBufferManager->frameTable_mutex);
-                DEBUGLOG(4, "catch up frame %u ", frameID);
-                { size_t const writeError = ZSTDMT_writeFrame(dstBufferManager, u);
-                  if (ZSTD_isError(writeError)) return writeError; }
-                lastFrameWritten = dstBufferManager->stackedFrame[u].isLastFrame;
-                dstBufferManager->frameIDToWrite = frameID+1;
-                /* remove frame from stack */
-                PTHREAD_MUTEX_LOCK(&dstBufferManager->frameTable_mutex);
-                dstBufferManager->stackedFrame[u] = dstBufferManager->stackedFrame[dstBufferManager->nbStackedFrames-1];
-                dstBufferManager->nbStackedFrames -= 1;
-                frameWritten = dstBufferManager->nbStackedFrames>0;
-                break;
-    }   }   }
-    pthread_mutex_unlock(&dstBufferManager->frameTable_mutex);
-
-    /* end reached : last frame written */
-    if (lastFrameWritten) pthread_mutex_unlock(&dstBufferManager->allFramesWritten_mutex);
-    return 0;
-}
-
-
-
-typedef struct ZSTDMT_jobDescription_s {
-    const void* src;   /* NULL means : kill thread */
-    size_t srcSize;
-    int compressionLevel;
-    ZSTDMT_dstBufferManager* dstManager;
-    unsigned frameNumber;
-    unsigned isLastFrame;
-} ZSTDMT_jobDescription;
-
-typedef struct ZSTDMT_jobAgency_s {
-    pthread_mutex_t jobAnnounce_mutex;
-    pthread_mutex_t jobApply_mutex;
-    ZSTDMT_jobDescription jobAnnounce;
-} ZSTDMT_jobAgency;
-
-/* ZSTDMT_postjob() :
- * This function is blocking as long as previous posted job is not taken.
- * It could be made non-blocking, with a storage queue.
- * But blocking has benefits : on top of memory savings,
- * the caller will be able to measure delay, allowing dynamic speed throttle (via compression level).
- */
-static void ZSTDMT_postjob(ZSTDMT_jobAgency* jobAgency, ZSTDMT_jobDescription job)
-{
-    DEBUGLOG(5, "starting job posting  ");
-    PTHREAD_MUTEX_LOCK(&jobAgency->jobApply_mutex);   /* wait for a thread to take previous job */
-    DEBUGLOG(5, "job posting mutex acquired ");
-    jobAgency->jobAnnounce = job;   /* post job */
-    pthread_mutex_unlock(&jobAgency->jobAnnounce_mutex);   /* announce */
-    DEBUGLOG(5, "job available now ");
-}
-
-static ZSTDMT_jobDescription ZSTDMT_getjob(ZSTDMT_jobAgency* jobAgency)
-{
-    PTHREAD_MUTEX_LOCK(&jobAgency->jobAnnounce_mutex);   /* should check return code */
-    ZSTDMT_jobDescription const job = jobAgency->jobAnnounce;
-    pthread_mutex_unlock(&jobAgency->jobApply_mutex);
-    return job;
-}
-
-
-
 #define ZSTDMT_NBBUFFERSPOOLED_MAX ZSTDMT_NBTHREADS_MAX
 typedef struct ZSTDMT_bufferPool_s {
     pthread_mutex_t bufferPool_mutex;
@@ -227,7 +63,7 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize)
         pool->nbBuffers--;
         buffer_t const buf = pool->bTable[pool->nbBuffers];
         pthread_mutex_unlock(&pool->bufferPool_mutex);
-        size_t const availBufferSize = buf.bufferSize;
+        size_t const availBufferSize = buf.size;
         if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize))   /* large enough, but not too much */
             return buf;
         free(buf.start);   /* size conditions not respected : create a new buffer */
@@ -235,7 +71,7 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize)
     pthread_mutex_unlock(&pool->bufferPool_mutex);
     /* create new buffer */
     buffer_t buf;
-    buf.bufferSize = bSize;
+    buf.size = bSize;
     buf.start = calloc(1, bSize);
     return buf;
 }
@@ -255,79 +91,119 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* pool, buffer_t buf)
 
 
 
-struct ZSTDMT_CCtx_s {
-    pthread_t pthread[ZSTDMT_NBTHREADS_MAX];
-    unsigned nbThreads;
-    ZSTDMT_jobAgency jobAgency;
-    ZSTDMT_bufferPool bufferPool;
-};
+typedef struct {
+    ZSTD_CCtx* cctx;
+    const void* srcStart;
+    size_t srcSize;
+    buffer_t dstBuff;
+    int compressionLevel;
+    unsigned frameID;
+    size_t cSize;
+    unsigned jobCompleted;
+    pthread_mutex_t* jobCompleted_mutex;
+} ZSTDMT_jobDescription;
 
-static void* ZSTDMT_compressionThread(void* arg)
+/* ZSTDMT_compressFrame() : POOL_function type */
+void ZSTDMT_compressFrame(void* jobDescription)
 {
-    if (arg==NULL) return NULL;   /* error : should not be possible */
-    ZSTDMT_CCtx* const mtctx = (ZSTDMT_CCtx*) arg;
-    ZSTDMT_jobAgency* const jobAgency = &mtctx->jobAgency;
-    ZSTDMT_bufferPool* const pool = &mtctx->bufferPool;
-    ZSTD_CCtx* const cctx = ZSTD_createCCtx();
-    if (cctx==NULL) return NULL;   /* allocation failure : thread not started */
-    DEBUGLOG(3, "thread %li created  ", (long int)pthread_self());
-    for (;;) {
-        ZSTDMT_jobDescription const job = ZSTDMT_getjob(jobAgency);
-        if (job.src == NULL) {
-            DEBUGLOG(4, "thread exit  ");
-            ZSTD_freeCCtx(cctx);
-            return NULL;
-        }
-        ZSTDMT_dstBufferManager* dstBufferManager = job.dstManager;
-        size_t const dstBufferCapacity = ZSTD_compressBound(job.srcSize);
-        DEBUGLOG(4, "requesting a dstBuffer for frame %u", job.frameNumber);
-        buffer_t const dstBuffer = job.frameNumber ? ZSTDMT_getBuffer(pool, dstBufferCapacity) : ZSTDMT_getDstBuffer(dstBufferManager);  /* lack params */
-        DEBUGLOG(4, "start compressing frame %u", job.frameNumber);
-        //size_t const cSize = ZSTD_compress(dstBuffer.start, dstBuffer.bufferSize, job.src, job.srcSize, job.compressionLevel);
-        size_t const cSize = ZSTD_compressCCtx(cctx, dstBuffer.start, dstBuffer.bufferSize, job.src, job.srcSize, job.compressionLevel);
-        if (ZSTD_isError(cSize)) return (void*)(cSize);   /* error - find a better way */
-        size_t const writeError = ZSTDMT_tryWriteFrame(dstBufferManager, dstBuffer.start, cSize, job.frameNumber, job.isLastFrame);   /* pas clair */
-        if (ZSTD_isError(writeError)) return (void*)writeError;
-        if (job.frameNumber) ZSTDMT_releaseBuffer(pool, dstBuffer);
+    DEBUGLOG(5, "Entering ZSTDMT_compressFrame() ");
+    ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
+    DEBUGLOG(5, "compressing %u bytes with ZSTD_compressCCtx : ", (unsigned)job->srcSize);
+    job->cSize = ZSTD_compressCCtx(job->cctx, job->dstBuff.start, job->dstBuff.size, job->srcStart, job->srcSize, job->compressionLevel);
+    DEBUGLOG(5, "compressed to %u bytes  ", (unsigned)job->cSize);
+    job->jobCompleted = 1;
+    DEBUGLOG(5, "unlocking mutex jobCompleted_mutex");
+    pthread_mutex_unlock(job->jobCompleted_mutex);
+    DEBUGLOG(5, "ZSTDMT_compressFrame completed");
+}
+
+
+/* note : calls to CCtxPool only from main thread */
+
+typedef struct {
+    unsigned totalCCtx;
+    unsigned availCCtx;
+    ZSTD_CCtx* cctx[1];   /* variable size */
+} ZSTDMT_CCtxPool;
+
+static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads)
+{
+    ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) calloc(1, sizeof(ZSTDMT_CCtxPool) + nbThreads*sizeof(ZSTD_CCtx*));
+    if (!cctxPool) return NULL;
+    {   unsigned u;
+        for (u=0; u<nbThreads; u++)
+            cctxPool->cctx[u] = ZSTD_createCCtx();   /* check for NULL result ! */
+    }
+    cctxPool->totalCCtx = cctxPool->availCCtx = nbThreads;
+    return cctxPool;
+}
+
+static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* pool)
+{
+    if (pool->availCCtx) {
+        pool->availCCtx--;
+        return pool->cctx[pool->availCCtx];
     }
+    /* should not be possible, since totalCCtx==nbThreads */
+    return ZSTD_createCCtx();
 }
 
+static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
+{
+    if (pool->availCCtx < pool->totalCCtx)
+        pool->cctx[pool->availCCtx++] = cctx;
+    else
+    /* should not be possible, since totalCCtx==nbThreads */
+        ZSTD_freeCCtx(cctx);
+}
+
+static void ZSTDMT_freeCCtxPool(ZSTDMT_CCtxPool* pool)
+{
+    unsigned u;
+    for (u=0; u<pool->totalCCtx; u++)
+        ZSTD_freeCCtx(pool->cctx[u]);
+    free(pool);
+}
+
+
+struct ZSTDMT_CCtx_s {
+    POOL_ctx* factory;
+    ZSTDMT_bufferPool buffPool;
+    ZSTDMT_CCtxPool* cctxPool;
+    unsigned nbThreads;
+    pthread_mutex_t jobCompleted_mutex;
+    ZSTDMT_jobDescription jobs[1];   /* variable size */
+};
+
 ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
 {
     if ((nbThreads < 1) | (nbThreads > ZSTDMT_NBTHREADS_MAX)) return NULL;
-    ZSTDMT_CCtx* const cctx = (ZSTDMT_CCtx*) calloc(1, sizeof(ZSTDMT_CCtx));
+    ZSTDMT_CCtx* const cctx = (ZSTDMT_CCtx*) calloc(1, sizeof(ZSTDMT_CCtx) + nbThreads*sizeof(ZSTDMT_jobDescription));
     if (!cctx) return NULL;
-    /* init jobAgency */
-    pthread_mutex_init(&cctx->jobAgency.jobAnnounce_mutex, NULL);   /* check return value ? */
-    pthread_mutex_init(&cctx->jobAgency.jobApply_mutex, NULL);
-    PTHREAD_MUTEX_LOCK(&cctx->jobAgency.jobAnnounce_mutex);   /* no job at beginning */
-    /* init bufferPool */
-    pthread_mutex_init(&cctx->bufferPool.bufferPool_mutex, NULL);
-    /* start all workers */
     cctx->nbThreads = nbThreads;
-    DEBUGLOG(2, "nbThreads : %u \n", nbThreads);
-    unsigned t;
-    for (t = 0; t < nbThreads; t++) {
-        pthread_create(&cctx->pthread[t], NULL, ZSTDMT_compressionThread, cctx);  /* check return value ? */
-    }
+    cctx->factory = POOL_create(nbThreads, 1);
+    pthread_mutex_init(&cctx->buffPool.bufferPool_mutex, NULL);
+    cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads);
+    pthread_mutex_init(&cctx->jobCompleted_mutex, NULL);
     return cctx;
 }
 
-size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* cctx)
+size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)  /* incompleted ! */
 {
-    /* free threads */
-    /* free mutex (if necessary) */
+    POOL_free(mtctx->factory);
+    /* free mutexes (if necessary) */
     /* free bufferPool */
-    free(cctx);   /* incompleted ! */
+    ZSTDMT_freeCCtxPool(mtctx->cctxPool);
+    free(mtctx);
     return 0;
 }
 
+
 size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
                            void* dst, size_t dstCapacity,
                      const void* src, size_t srcSize,
                            int compressionLevel)
 {
-    ZSTDMT_jobAgency* jobAgency = &mtctx->jobAgency;
     ZSTD_parameters const params = ZSTD_getParams(compressionLevel, srcSize, 0);
     size_t const frameSizeTarget = (size_t)1 << (params.cParams.windowLog + 2);
     unsigned const nbFramesMax = (unsigned)(srcSize / frameSizeTarget) + (srcSize < frameSizeTarget) /* min 1 */;
@@ -336,7 +212,7 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
     size_t remainingSrcSize = srcSize;
     const char* const srcStart = (const char*)src;
     size_t frameStartPos = 0;
-    ZSTDMT_dstBufferManager dbm = ZSTDMT_createDstBufferManager(dst, dstCapacity);
+
 
     DEBUGLOG(2, "windowLog : %u   => frameSizeTarget : %u      ", params.cParams.windowLog, (U32)frameSizeTarget);
     DEBUGLOG(2, "nbFrames : %u   (size : %u bytes)   ", nbFrames, (U32)avgFrameSize);
@@ -344,15 +220,46 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
     {   unsigned u;
         for (u=0; u<nbFrames; u++) {
             size_t const frameSize = MIN(remainingSrcSize, avgFrameSize);
+            size_t const dstBufferCapacity = u ? ZSTD_compressBound(frameSize) : dstCapacity;
+            buffer_t const dstBuffer = u ? ZSTDMT_getBuffer(&mtctx->buffPool, dstBufferCapacity) : (buffer_t){ dst, dstCapacity };
+            ZSTD_CCtx* cctx = ZSTDMT_getCCtx(mtctx->cctxPool);
+
+            mtctx->jobs[u].srcStart = srcStart + frameStartPos;
+            mtctx->jobs[u].srcSize = frameSize;
+            mtctx->jobs[u].compressionLevel = compressionLevel;
+            mtctx->jobs[u].dstBuff = dstBuffer;
+            mtctx->jobs[u].cctx = cctx;
+            mtctx->jobs[u].frameID = u;
+            mtctx->jobs[u].jobCompleted = 0;
+            mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex;
+
             DEBUGLOG(3, "posting job %u   (%u bytes)", u, (U32)frameSize);
-            ZSTDMT_jobDescription const job = { srcStart+frameStartPos, frameSize, compressionLevel,
-                                                &dbm, u, u==(nbFrames-1) };
-            ZSTDMT_postjob(jobAgency, job);
+            POOL_add(mtctx->factory, ZSTDMT_compressFrame, &mtctx->jobs[u]);
+
             frameStartPos += frameSize;
             remainingSrcSize -= frameSize;
     }   }
+    /* note : since nbFrames <= nbThreads, all jobs should be running immediately in parallel */
+
+    {   unsigned frameID;
+        size_t dstPos = 0;
+        for (frameID=0; frameID<nbFrames; frameID++) {
+            DEBUGLOG(3, "ready to write frame %u ", frameID);
+            while (mtctx->jobs[frameID].jobCompleted==0) {
+                DEBUGLOG(4, "waiting for signal jobCompleted_mutex")
+                pthread_mutex_lock(&mtctx->jobCompleted_mutex);
+            }
+            {   size_t const cSize = mtctx->jobs[frameID].cSize;
+                if (ZSTD_isError(cSize)) return cSize;
+                if (dstPos + cSize > dstCapacity) return ERROR(dstSize_tooSmall);
+                if (frameID) memcpy((char*)dst + dstPos, mtctx->jobs[frameID].dstBuff.start, mtctx->jobs[frameID].cSize);
+                dstPos += cSize ;
+            }
+            ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[frameID].cctx);
+            ZSTDMT_releaseBuffer(&mtctx->buffPool, mtctx->jobs[frameID].dstBuff);
+        }
+        DEBUGLOG(4, "compressed size : %u  ", (U32)dstPos);
+        return dstPos;
+    }
 
-    PTHREAD_MUTEX_LOCK(&dbm.allFramesWritten_mutex);
-    DEBUGLOG(4, "compressed size : %u  ", (U32)dbm.out.pos);
-    return dbm.out.pos;
 }