ZSTD_pthread_cond_t queuePopCond;
/* Indicates if the queue is shutting down */
int shutdown;
+
+ /* external mutex for the external condition */
+ ZSTD_pthread_mutex_t* extMutex;
+ /* external condition variable to set when a job is completed */
+ ZSTD_pthread_cond_t* extCond;
};
/* POOL_thread() :
ctx->queueEmpty = (ctx->queueHead == ctx->queueTail);
/* Unlock the mutex, signal a pusher, and run the job */
ZSTD_pthread_cond_signal(&ctx->queuePushCond);
- ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+ if (ctx->extMutex != NULL) {
+ assert(ctx->extCond != NULL);
+ ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+ ZSTD_pthread_mutex_lock(ctx->extMutex);
+ ZSTD_pthread_cond_signal(ctx->extCond);
+ ZSTD_pthread_mutex_unlock(ctx->extMutex);
+ } else {
+ ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+ }
job.function(job.opaque);
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
ctx->numThreadsBusy--;
ZSTD_pthread_cond_signal(&ctx->queuePushCond);
- ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+ if (ctx->extMutex != NULL) {
+ assert(ctx->extCond != NULL);
+ ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+ ZSTD_pthread_mutex_lock(ctx->extMutex);
+ ZSTD_pthread_cond_signal(ctx->extCond);
+ ZSTD_pthread_mutex_unlock(ctx->extMutex);
+ } else {
+ ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+ }
}
} /* for (;;) */
assert(0); /* Unreachable */
error |= ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL);
if (error) { POOL_free(ctx); return NULL; }
}
+ ctx->extMutex = NULL;
+ ctx->extCond = NULL;
ctx->shutdown = 0;
/* Allocate space for the thread handles */
ctx->threads = (ZSTD_pthread_t*)ZSTD_customCalloc(numThreads * sizeof(ZSTD_pthread_t), customMem);
+ ctx->threadCapacity * sizeof(ZSTD_pthread_t);
}
+void POOL_setExtCond(POOL_ctx* ctx, ZSTD_pthread_mutex_t* mutex, ZSTD_pthread_cond_t* cond)
+{
+ /* Register (or clear, when both are NULL) an external condition variable that
+  * worker threads signal whenever a job completes or a queue slot frees up.
+  * Taking queueMutex ensures no worker ever observes a half-updated
+  * (extMutex, extCond) pair. */
+ ZSTD_pthread_mutex_lock(&ctx->queueMutex);
+ ctx->extMutex = mutex;
+ ctx->extCond = cond;
+ ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+ return;
+}
/* @return : 0 on success, 1 on error */
static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads)
return 1;
}
+int POOL_canAcceptJob(POOL_ctx* ctx)
+{
+ int r;
+ /* queueMutex guards the queue-fullness state read by isQueueFull() */
+ ZSTD_pthread_mutex_lock(&ctx->queueMutex);
+ r = !isQueueFull(ctx);
+ ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+ /* note: result is advisory — the queue state may change again before the
+  * caller acts on it, so a subsequent POOL_add() can still block. */
+ return r;
+}
#else /* ZSTD_MULTITHREAD not defined */
return sizeof(*ctx);
}
+void POOL_setExtCond(POOL_ctx* ctx, ZSTD_pthread_mutex_t* mutex, ZSTD_pthread_cond_t* cond)
+{
+ /* no-op in single-threaded builds: jobs run synchronously inside POOL_add(),
+  * so there is never an asynchronous completion to signal. */
+ (void)ctx; (void)mutex; (void)cond;
+ return;
+}
+
#endif /* ZSTD_MULTITHREAD */
#include "zstd_deps.h"
#define ZSTD_STATIC_LINKING_ONLY /* ZSTD_customMem */
#include "../zstd.h"
+#include "threading.h" /* ZSTD_pthread_mutex_t, ZSTD_pthread_cond_t */
typedef struct POOL_ctx_s POOL_ctx;
void POOL_free(POOL_ctx* ctx);
-/*! POOL_joinJobs() :
- * Waits for all queued jobs to finish executing.
- */
-void POOL_joinJobs(POOL_ctx* ctx);
-
/*! POOL_resize() :
* Expands or shrinks pool's number of threads.
* This is more efficient than releasing + creating a new context,
*/
size_t POOL_sizeof(const POOL_ctx* ctx);
+
+/*! POOL_setExtCond() :
+ * Pass a condition variable (and its associated mutex) to be signaled whenever a job slot gets freed.
+ * Note: pass NULL to disable the currently set condition.
+ */
+void POOL_setExtCond(POOL_ctx* ctx, ZSTD_pthread_mutex_t* mutex, ZSTD_pthread_cond_t* cond);
+
/*! POOL_function :
* The function type that can be added to a thread pool.
*/
*/
void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque);
-
/*! POOL_tryAdd() :
* Add the job `function(opaque)` to thread pool _if_ a queue slot is available.
* Returns immediately even if not (does not block).
*/
int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque);
+/*! POOL_canAcceptJob() :
+ * Tells if the pool can accept a new job without blocking.
+ * @return : 1 if a queue slot is available, 0 if not (queue full)
+ */
+int POOL_canAcceptJob(POOL_ctx* ctx);
+
+/*! POOL_joinJobs() :
+ * Waits for all queued jobs to finish executing.
+ */
+void POOL_joinJobs(POOL_ctx* ctx);
+
#endif
ZSTDMT_freeCCtx(mtctx);
return NULL;
}
+ POOL_setExtCond(mtctx->factory, &mtctx->flushMutex, &mtctx->flushCond);
DEBUGLOG(3, "mt_cctx created, for %u threads", nbWorkers);
return mtctx;
}
* for the overlap (if > 0), then one to fill which doesn't overlap
* with the LDM window.
*/
- size_t const nbSlackBuffers = 2 + (mtctx->targetPrefixSize > 0);
+ size_t const nbWorkers = MAX((size_t)mtctx->params.nbWorkers, 1);
+ size_t const minSlackBuffers = 2 + (mtctx->targetPrefixSize > 0);
+ size_t const extraSlackBuffers = MAX(1, nbWorkers/4); /* for fluidity, when jobs are completed out of order */
+ size_t const nbSlackBuffers = minSlackBuffers + extraSlackBuffers;
size_t const slackSize = mtctx->targetJobSize * nbSlackBuffers;
/* Compute the total size, and always have enough slack */
- size_t const nbWorkers = MAX(mtctx->params.nbWorkers, 1);
size_t const sectionsSize = mtctx->targetJobSize * nbWorkers;
size_t const capacity = MAX(windowSize, sectionsSize) + slackSize;
if (mtctx->roundBuff.capacity < capacity) {
assert(job->consumed == 0);
}
-/* @returns 1 if there is anything ready to flush */
-static int ZSTDMT_anythingToFlush(const ZSTDMT_CCtx* mtctx)
-{
- unsigned const wJobID = mtctx->doneJobID & mtctx->jobIDMask;
- int r = 0;
- ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[wJobID].job_mutex);
- r = mtctx->jobs[wJobID].dstFlushed < mtctx->jobs[wJobID].cSize;
- ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
- return r;
-}
-
static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZSTD_EndDirective endOp)
{
unsigned const jobID = mtctx->nextJobID & mtctx->jobIDMask;
mtctx->nextJobID,
jobID);
- if (1 || ZSTDMT_anythingToFlush(mtctx)) {
- if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID])) {
- mtctx->nextJobID++;
- mtctx->jobReady = 0;
- return 1;
- } else {
- DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker currently available for job %u", mtctx->nextJobID);
- mtctx->jobReady = 1;
- return 0;
- }
- } else {
- /* block here, wait for next available job */
- POOL_add(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID]);
+ if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID])) {
mtctx->nextJobID++;
mtctx->jobReady = 0;
+ return 1;
}
- return 1;
+
+ DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker currently available for job %u", mtctx->nextJobID);
+ mtctx->jobReady = 1;
+ return 0;
}
mtctx->doneJobID, (U32)mtctx->jobs[wJobID].consumed, (U32)mtctx->jobs[wJobID].src.size);
break;
}
- DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",
- mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
+ DEBUGLOG(5, "waiting for something to flush from job %u (%u input left)",
+ mtctx->doneJobID, (unsigned)(mtctx->jobs[wJobID].src.size - mtctx->jobs[wJobID].consumed));
if (mtctx->jobs[wJobID].flush_mutex == NULL) {
mtctx->jobs[wJobID].flush_mutex = &mtctx->flushMutex;
mtctx->jobs[wJobID].flush_cond = &mtctx->flushCond;
}
- DEBUGLOG(6, "waiting to flush something (%zu left)", mtctx->jobs[wJobID].src.size - mtctx->jobs[wJobID].consumed);
ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
+ /* note: if a job was completed between POOL_tryAdd() and this waiting condition,
+ * the signal, which was already issued, will be lost.
+ * It just reduces an opportunity to start a new job immediately */
ZSTD_PTHREAD_COND_WAIT(&mtctx->flushCond, &mtctx->flushMutex); /* block waiting for something to flush */
ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[wJobID].job_mutex);
- DEBUGLOG(6, "condition triggered: let's flush something (%zu bytes)", mtctx->jobs[wJobID].cSize - mtctx->jobs[wJobID].dstFlushed);
+ DEBUGLOG(6, "flushCond triggered: let's flush something (%zu bytes)", mtctx->jobs[wJobID].cSize - mtctx->jobs[wJobID].dstFlushed);
+ break; /* can be triggered with nothing to flush, when a job was just completed */
} }
/* try to flush something */