* and full queues.
*/
ctx->queueSize = queueSize + 1;
- ctx->queue = (POOL_job *)malloc(ctx->queueSize * sizeof(POOL_job));
+ ctx->queue = (POOL_job*) malloc(ctx->queueSize * sizeof(POOL_job));
ctx->queueHead = 0;
ctx->queueTail = 0;
pthread_mutex_init(&ctx->queueMutex, NULL);
pthread_cond_init(&ctx->queuePopCond, NULL);
ctx->shutdown = 0;
/* Allocate space for the thread handles */
- ctx->threads = (pthread_t *)malloc(numThreads * sizeof(pthread_t));
+ ctx->threads = (pthread_t*)malloc(numThreads * sizeof(pthread_t));
ctx->numThreads = 0;
/* Check for errors */
if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
+ ctx->numThreads * sizeof(pthread_t);
}
-void POOL_add(void *ctxVoid, POOL_function function, void *opaque) {
- POOL_ctx *ctx = (POOL_ctx *)ctxVoid;
+void POOL_add(void* ctxVoid, POOL_function function, void *opaque) {
+ POOL_ctx* const ctx = (POOL_ctx*)ctxVoid;
if (!ctx) { return; }
pthread_mutex_lock(&ctx->queueMutex);
int data;
};
-POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) {
+POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
(void)numThreads;
(void)queueSize;
- return (POOL_ctx *)malloc(sizeof(POOL_ctx));
+ return (POOL_ctx*)malloc(sizeof(POOL_ctx));
}
-void POOL_free(POOL_ctx *ctx) {
- if (ctx) free(ctx);
+void POOL_free(POOL_ctx* ctx) {
+ free(ctx);
}
-void POOL_add(void *ctx, POOL_function function, void *opaque) {
+void POOL_add(void* ctx, POOL_function function, void* opaque) {
(void)ctx;
function(opaque);
}
-size_t POOL_sizeof(POOL_ctx *ctx) {
+size_t POOL_sizeof(POOL_ctx* ctx) {
if (ctx==NULL) return 0; /* supports sizeof NULL */
return sizeof(*ctx);
}
/* ===== Buffer Pool ===== */
+/* a single Buffer Pool can be invoked from multiple threads in parallel */
typedef struct buffer_s {
void* start;
static const buffer_t g_nullBuffer = { NULL, 0 };
typedef struct ZSTDMT_bufferPool_s {
+ pthread_mutex_t poolMutex;
unsigned totalBuffers;
unsigned nbBuffers;
ZSTD_customMem cMem;
ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)ZSTD_calloc(
sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem);
if (bufPool==NULL) return NULL;
+ pthread_mutex_init(&bufPool->poolMutex, NULL);
bufPool->totalBuffers = maxNbBuffers;
bufPool->nbBuffers = 0;
bufPool->cMem = cMem;
if (!bufPool) return; /* compatibility with free on NULL */
for (u=0; u<bufPool->totalBuffers; u++)
ZSTD_free(bufPool->bTable[u].start, bufPool->cMem);
+ pthread_mutex_destroy(&bufPool->poolMutex);
ZSTD_free(bufPool, bufPool->cMem);
}
+ (bufPool->totalBuffers - 1) * sizeof(buffer_t);
unsigned u;
size_t totalBufferSize = 0;
+ pthread_mutex_lock(&bufPool->poolMutex);
for (u=0; u<bufPool->totalBuffers; u++)
totalBufferSize += bufPool->bTable[u].size;
+ pthread_mutex_unlock(&bufPool->poolMutex);
return poolSize + totalBufferSize;
}
/** ZSTDMT_getBuffer() :
* assumption : invocation from main thread only ! */
-static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize)
+static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool, size_t bSize)
{
DEBUGLOG(2, "ZSTDMT_getBuffer");
- if (pool->nbBuffers) { /* try to use an existing buffer */
- buffer_t const buf = pool->bTable[--(pool->nbBuffers)];
+ pthread_mutex_lock(&bufPool->poolMutex);
+ if (bufPool->nbBuffers) { /* try to use an existing buffer */
+ buffer_t const buf = bufPool->bTable[--(bufPool->nbBuffers)];
size_t const availBufferSize = buf.size;
- if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize))
+ if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize)) {
/* large enough, but not too much */
+ pthread_mutex_unlock(&bufPool->poolMutex);
return buf;
+ }
/* size conditions not respected : scratch this buffer, create new one */
DEBUGLOG(2, "existing buffer does not meet size conditions => freeing");
- ZSTD_free(buf.start, pool->cMem);
+ ZSTD_free(buf.start, bufPool->cMem);
}
+ pthread_mutex_unlock(&bufPool->poolMutex);
/* create new buffer */
DEBUGLOG(2, "create a new buffer");
{ buffer_t buffer;
- void* const start = ZSTD_malloc(bSize, pool->cMem);
+ void* const start = ZSTD_malloc(bSize, bufPool->cMem);
if (start==NULL) bSize = 0;
buffer.start = start; /* note : start can be NULL if malloc fails ! */
buffer.size = bSize;
}
/* store buffer for later re-use, up to pool capacity */
-static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* pool, buffer_t buf)
+static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
{
DEBUGLOG(2, "ZSTDMT_releaseBuffer");
if (buf.start == NULL) return; /* release on NULL */
- if (pool->nbBuffers < pool->totalBuffers) {
- pool->bTable[pool->nbBuffers++] = buf; /* store for later re-use */
+ pthread_mutex_lock(&bufPool->poolMutex);
+ if (bufPool->nbBuffers < bufPool->totalBuffers) {
+ bufPool->bTable[bufPool->nbBuffers++] = buf; /* store for later re-use */
+ pthread_mutex_unlock(&bufPool->poolMutex);
return;
}
+ pthread_mutex_unlock(&bufPool->poolMutex);
/* Reached bufferPool capacity (should not happen) */
DEBUGLOG(2, "buffer pool capacity reached => freeing ");
- ZSTD_free(buf.start, pool->cMem);
+ ZSTD_free(buf.start, bufPool->cMem);
}
/* ===== CCtx Pool ===== */
-
-/* a single cctxPool can be called from multiple threads in parallel */
+/* a single CCtx Pool can be invoked from multiple threads in parallel */
typedef struct {
pthread_mutex_t poolMutex;
job->cSize = (job->lastChunk) ?
ZSTD_compressEnd (cctx, dstBuff.start, dstBuff.size, src, job->srcSize) :
ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, job->srcSize);
- DEBUGLOG(5, "compressed %u bytes into %u bytes (first:%u) (last:%u)",
+ DEBUGLOG(2, "compressed %u bytes into %u bytes (first:%u) (last:%u)",
(unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk);
DEBUGLOG(5, "dstBuff.size : %u ; => %s", (U32)dstBuff.size, ZSTD_getErrorName(job->cSize));