#define ZSTDMT_NBTHREADS_MAX 128
-#define ZSTDMT_NBSTACKEDFRAMES_MAX (2*ZSTDMT_NBTHREADS_MAX)
+
+/* === Buffer Pool ===
+ * buffer_t describes one heap block : `start` is its address and
+ * `size` the number of bytes requested when it was allocated. */
typedef struct buffer_s {
void* start;
size_t size;
} buffer_t;
-#define ZSTDMT_NBBUFFERSPOOLED_MAX ZSTDMT_NBTHREADS_MAX
typedef struct ZSTDMT_bufferPool_s {
- pthread_mutex_t bufferPool_mutex;
- buffer_t bTable[ZSTDMT_NBBUFFERSPOOLED_MAX];
+ unsigned totalBuffers; /* capacity : number of slots usable in bTable */
unsigned nbBuffers;
+ buffer_t bTable[1]; /* variable size : extra slots are allocated right after the struct */
} ZSTDMT_bufferPool;
+/* ZSTDMT_createBufferPool() :
+ * Allocate an empty pool with room for (2*nbThreads + 2) buffers.
+ * Header and slot table live in a single zero-initialized block.
+ * @return : the new pool, or NULL when the allocation fails. */
+static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbThreads)
+{
+    unsigned const capacity = 2*nbThreads + 2;
+    size_t const poolSize = sizeof(ZSTDMT_bufferPool) + capacity * sizeof(buffer_t);
+    ZSTDMT_bufferPool* const pool = (ZSTDMT_bufferPool*)calloc(1, poolSize);
+    if (pool != NULL) pool->totalBuffers = capacity;
+    return pool;
+}
+
+/* ZSTDMT_freeBufferPool() :
+ * Free every buffer currently stored in the pool, then the pool itself.
+ * Accepts NULL, like free().
+ * Only slots [0, nbBuffers) belong to the pool : ZSTDMT_getBuffer() lowers
+ * nbBuffers without clearing the slot it takes from, so a slot at or above
+ * nbBuffers may still hold the address of a block that was already freed or
+ * handed to the caller. Walking up to totalBuffers could free it twice. */
+static void ZSTDMT_freeBufferPool(ZSTDMT_bufferPool* bufPool)
+{
+    unsigned u;
+    if (!bufPool) return;   /* compatibility with free on NULL */
+    for (u=0; u<bufPool->nbBuffers; u++)
+        free(bufPool->bTable[u].start);
+    free(bufPool);
+}
+
+/* ZSTDMT_getBuffer() :
+ * Provide a buffer of at least bSize bytes.
+ * When the pool holds buffers, the most recently stored one is taken out :
+ * it is returned as-is if its size lies within [bSize, 10*bSize],
+ * otherwise it is freed. In that case, or when the pool is empty,
+ * a new buffer is allocated with malloc() (content is not zeroed).
+ * The pool slot is not cleared when a buffer is taken out of it.
+ * If malloc() fails, the result has start==NULL while size still equals bSize.
+ * note : invocation only from main thread ! */
static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize)
{
- PTHREAD_MUTEX_LOCK(&pool->bufferPool_mutex);
if (pool->nbBuffers) { /* try to use an existing buffer */
pool->nbBuffers--;
buffer_t const buf = pool->bTable[pool->nbBuffers];
- pthread_mutex_unlock(&pool->bufferPool_mutex);
size_t const availBufferSize = buf.size;
if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize)) /* large enough, but not too much */
return buf;
free(buf.start); /* size conditions not respected : create a new buffer */
}
- pthread_mutex_unlock(&pool->bufferPool_mutex);
/* create new buffer */
buffer_t buf;
buf.size = bSize;
- buf.start = calloc(1, bSize);
+ buf.start = malloc(bSize);
return buf;
}
/* effectively store buffer for later re-use, up to pool capacity */
static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* pool, buffer_t buf)
{
- PTHREAD_MUTEX_LOCK(&pool->bufferPool_mutex);
- if (pool->nbBuffers >= ZSTDMT_NBBUFFERSPOOLED_MAX) {
- pthread_mutex_unlock(&pool->bufferPool_mutex);
- free(buf.start);
+ if (pool->nbBuffers < pool->totalBuffers) {
+ pool->bTable[pool->nbBuffers++] = buf; /* store for later re-use : appended after the buffers already stored */
return;
}
- pool->bTable[pool->nbBuffers++] = buf; /* store for later re-use */
- pthread_mutex_unlock(&pool->bufferPool_mutex);
+ /* Reached bufferPool capacity (should not happen) :
+  * no slot is left, so the buffer is freed instead of being stored */
+ free(buf.start);
}
}
-/* note : calls to CCtxPool only from main thread */
+/* === CCtx Pool ===
+ * Table of ZSTD_CCtx pointers. `cctx` is declared with a single slot;
+ * ZSTDMT_createCCtxPool() allocates extra room after the struct for more. */
typedef struct {
unsigned totalCCtx;
ZSTD_CCtx* cctx[1]; /* variable size */
} ZSTDMT_CCtxPool;
+/* note : CCtxPool invocation only from main thread */
+
static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads)
{
ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) calloc(1, sizeof(ZSTDMT_CCtxPool) + nbThreads*sizeof(ZSTD_CCtx*));
struct ZSTDMT_CCtx_s {
POOL_ctx* factory;
- ZSTDMT_bufferPool buffPool;
+ ZSTDMT_bufferPool* buffPool;
ZSTDMT_CCtxPool* cctxPool;
unsigned nbThreads;
pthread_mutex_t jobCompleted_mutex;
if (!cctx) return NULL;
cctx->nbThreads = nbThreads;
cctx->factory = POOL_create(nbThreads, 1);
- pthread_mutex_init(&cctx->buffPool.bufferPool_mutex, NULL);
+ cctx->buffPool = ZSTDMT_createBufferPool(nbThreads);
cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads);
pthread_mutex_init(&cctx->jobCompleted_mutex, NULL);
return cctx;
+/* ZSTDMT_freeCCtx() :
+ * Tear down a multi-threaded context : POOL_free() its factory, free its
+ * buffer pool and its CCtx pool, destroy jobCompleted_mutex, then free the
+ * context itself. NULL is accepted and is a no-op.
+ * @return : 0 in all cases. */
size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx) /* incompleted ! */
{
+ if (mtctx==NULL) return 0; /* compatible with free on NULL */
POOL_free(mtctx->factory);
- /* free mutexes (if necessary) */
- /* free bufferPool */
+ ZSTDMT_freeBufferPool(mtctx->buffPool);
ZSTDMT_freeCCtxPool(mtctx->cctxPool);
+ pthread_mutex_destroy(&mtctx->jobCompleted_mutex);
free(mtctx);
return 0;
}
for (u=0; u<nbFrames; u++) {
size_t const frameSize = MIN(remainingSrcSize, avgFrameSize);
size_t const dstBufferCapacity = u ? ZSTD_compressBound(frameSize) : dstCapacity;
- buffer_t const dstBuffer = u ? ZSTDMT_getBuffer(&mtctx->buffPool, dstBufferCapacity) : (buffer_t){ dst, dstCapacity };
+ buffer_t const dstBuffer = u ? ZSTDMT_getBuffer(mtctx->buffPool, dstBufferCapacity) : (buffer_t){ dst, dstCapacity };
ZSTD_CCtx* cctx = ZSTDMT_getCCtx(mtctx->cctxPool);
mtctx->jobs[u].srcStart = srcStart + frameStartPos;
{ size_t const cSize = mtctx->jobs[frameID].cSize;
if (ZSTD_isError(cSize)) return cSize;
if (dstPos + cSize > dstCapacity) return ERROR(dstSize_tooSmall);
- if (frameID) memcpy((char*)dst + dstPos, mtctx->jobs[frameID].dstBuff.start, mtctx->jobs[frameID].cSize);
+ if (frameID) {
+ memcpy((char*)dst + dstPos, mtctx->jobs[frameID].dstBuff.start, cSize);
+ ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[frameID].dstBuff);
+ }
dstPos += cSize ;
}
ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[frameID].cctx);
- ZSTDMT_releaseBuffer(&mtctx->buffPool, mtctx->jobs[frameID].dstBuff);
}
- DEBUGLOG(4, "compressed size : %u ", (U32)dstPos);
+ DEBUGLOG(3, "compressed size : %u ", (U32)dstPos);
return dstPos;
}