From: Yann Collet Date: Sun, 2 Feb 2025 03:42:23 +0000 (-0800) Subject: main thread is awakened when a job position is freed X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=de9b5c7c028183e5a0c5831cfe30f6df884aa6a1;p=thirdparty%2Fzstd.git main thread is awakened when a job position is freed resulting in less "wasted" idle time --- diff --git a/lib/common/pool.c b/lib/common/pool.c index 3adcefc9a..39f38dbea 100644 --- a/lib/common/pool.c +++ b/lib/common/pool.c @@ -57,6 +57,11 @@ struct POOL_ctx_s { ZSTD_pthread_cond_t queuePopCond; /* Indicates if the queue is shutting down */ int shutdown; + + /* external mutex for the external condition */ + ZSTD_pthread_mutex_t* extMutex; + /* external condition variable to set when a job is completed */ + ZSTD_pthread_cond_t* extCond; }; /* POOL_thread() : @@ -89,7 +94,15 @@ static void* POOL_thread(void* opaque) { ctx->queueEmpty = (ctx->queueHead == ctx->queueTail); /* Unlock the mutex, signal a pusher, and run the job */ ZSTD_pthread_cond_signal(&ctx->queuePushCond); - ZSTD_pthread_mutex_unlock(&ctx->queueMutex); + if (ctx->extMutex != NULL) { + assert(ctx->extCond != NULL); + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); + ZSTD_pthread_mutex_lock(ctx->extMutex); + ZSTD_pthread_cond_signal(ctx->extCond); + ZSTD_pthread_mutex_unlock(ctx->extMutex); + } else { + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); + } job.function(job.opaque); @@ -97,7 +110,15 @@ static void* POOL_thread(void* opaque) { ZSTD_pthread_mutex_lock(&ctx->queueMutex); ctx->numThreadsBusy--; ZSTD_pthread_cond_signal(&ctx->queuePushCond); - ZSTD_pthread_mutex_unlock(&ctx->queueMutex); + if (ctx->extMutex != NULL) { + assert(ctx->extCond != NULL); + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); + ZSTD_pthread_mutex_lock(ctx->extMutex); + ZSTD_pthread_cond_signal(ctx->extCond); + ZSTD_pthread_mutex_unlock(ctx->extMutex); + } else { + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); + } } } /* for (;;) */ assert(0); /* Unreachable */ @@ -138,6 
+159,8 @@ POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, error |= ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL); if (error) { POOL_free(ctx); return NULL; } } + ctx->extMutex = NULL; + ctx->extCond = NULL; ctx->shutdown = 0; /* Allocate space for the thread handles */ ctx->threads = (ZSTD_pthread_t*)ZSTD_customCalloc(numThreads * sizeof(ZSTD_pthread_t), customMem); @@ -210,6 +233,14 @@ size_t POOL_sizeof(const POOL_ctx* ctx) { + ctx->threadCapacity * sizeof(ZSTD_pthread_t); } +void POOL_setExtCond(POOL_ctx* ctx, ZSTD_pthread_mutex_t* mutex, ZSTD_pthread_cond_t* cond) +{ + ZSTD_pthread_mutex_lock(&ctx->queueMutex); + ctx->extMutex = mutex; + ctx->extCond = cond; + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); + return; +} /* @return : 0 on success, 1 on error */ static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads) @@ -309,6 +340,14 @@ int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) return 1; } +int POOL_canAcceptJob(POOL_ctx* ctx) +{ + int r; + ZSTD_pthread_mutex_lock(&ctx->queueMutex); + r = !isQueueFull(ctx); + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); + return r; +} #else /* ZSTD_MULTITHREAD not defined */ @@ -368,4 +407,10 @@ size_t POOL_sizeof(const POOL_ctx* ctx) { return sizeof(*ctx); } +void POOL_setExtCond(POOL_ctx* ctx, ZSTD_pthread_mutex_t* mutex, ZSTD_pthread_cond_t* cond) +{ + (void)ctx; (void)mutex; (void)cond; + return; +} + #endif /* ZSTD_MULTITHREAD */ diff --git a/lib/common/pool.h b/lib/common/pool.h index f39b7f1eb..7c06cacfb 100644 --- a/lib/common/pool.h +++ b/lib/common/pool.h @@ -15,6 +15,7 @@ #include "zstd_deps.h" #define ZSTD_STATIC_LINKING_ONLY /* ZSTD_customMem */ #include "../zstd.h" +#include "threading.h" /* ZSTD_pthread_mutex_t, ZSTD_pthread_cond_t */ typedef struct POOL_ctx_s POOL_ctx; @@ -35,11 +36,6 @@ POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, void POOL_free(POOL_ctx* ctx); -/*! POOL_joinJobs() : - * Waits for all queued jobs to finish executing. 
- */ -void POOL_joinJobs(POOL_ctx* ctx); - /*! POOL_resize() : * Expands or shrinks pool's number of threads. * This is more efficient than releasing + creating a new context, @@ -57,6 +53,13 @@ int POOL_resize(POOL_ctx* ctx, size_t numThreads); */ size_t POOL_sizeof(const POOL_ctx* ctx); + +/*! POOL_setExtCond() : + * Pass a condition (and its associated mutex) to set whenever a job slot gets freed. + * Note: can pass NULL to disable currently set condition. + */ +void POOL_setExtCond(POOL_ctx* ctx, ZSTD_pthread_mutex_t* mutex, ZSTD_pthread_cond_t* cond); + /*! POOL_function : * The function type that can be added to a thread pool. */ @@ -70,7 +73,6 @@ typedef void (*POOL_function)(void*); */ void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque); - /*! POOL_tryAdd() : * Add the job `function(opaque)` to thread pool _if_ a queue slot is available. * Returns immediately even if not (does not block). @@ -78,4 +80,15 @@ void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque); int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque); +/*! POOL_canAcceptJob() : + * Tells if the pool will be able to accept a new job without blocking. + * @return : 1 if true, 0 if not (queue full) + */ +int POOL_canAcceptJob(POOL_ctx* ctx); + +/*! POOL_joinJobs() : + * Waits for all queued jobs to finish executing. 
+ */ +void POOL_joinJobs(POOL_ctx* ctx); + #endif diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 4af236fd0..91f158084 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -1019,6 +1019,7 @@ ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers, ZSTD_customMem cMem, ZST ZSTDMT_freeCCtx(mtctx); return NULL; } + POOL_setExtCond(mtctx->factory, &mtctx->flushMutex, &mtctx->flushCond); DEBUGLOG(3, "mt_cctx created, for %u threads", nbWorkers); return mtctx; } @@ -1350,10 +1351,12 @@ size_t ZSTDMT_initCStream_internal( * for the overlap (if > 0), then one to fill which doesn't overlap * with the LDM window. */ - size_t const nbSlackBuffers = 2 + (mtctx->targetPrefixSize > 0); + size_t const nbWorkers = MAX((size_t)mtctx->params.nbWorkers, 1); + size_t const minSlackBuffers = 2 + (mtctx->targetPrefixSize > 0); + size_t const extraSlackBuffers = MAX(1, nbWorkers/4); /* for fluidity, when jobs are completed out of order */ + size_t const nbSlackBuffers = minSlackBuffers + extraSlackBuffers; size_t const slackSize = mtctx->targetJobSize * nbSlackBuffers; /* Compute the total size, and always have enough slack */ - size_t const nbWorkers = MAX(mtctx->params.nbWorkers, 1); size_t const sectionsSize = mtctx->targetJobSize * nbWorkers; size_t const capacity = MAX(windowSize, sectionsSize) + slackSize; if (mtctx->roundBuff.capacity < capacity) { @@ -1431,17 +1434,6 @@ static void ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job) assert(job->consumed == 0); } -/* @returns 1 if there is anything ready to flush */ -static int ZSTDMT_anythingToFlush(const ZSTDMT_CCtx* mtctx) -{ - unsigned const wJobID = mtctx->doneJobID & mtctx->jobIDMask; - int r = 0; - ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[wJobID].job_mutex); - r = mtctx->jobs[wJobID].dstFlushed < mtctx->jobs[wJobID].cSize; - ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); - return r; -} - static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, 
size_t srcSize, ZSTD_EndDirective endOp) { unsigned const jobID = mtctx->nextJobID & mtctx->jobIDMask; @@ -1513,23 +1505,15 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS mtctx->nextJobID, jobID); - if (1 || ZSTDMT_anythingToFlush(mtctx)) { - if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID])) { - mtctx->nextJobID++; - mtctx->jobReady = 0; - return 1; - } else { - DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker currently available for job %u", mtctx->nextJobID); - mtctx->jobReady = 1; - return 0; - } - } else { - /* block here, wait for next available job */ - POOL_add(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID]); + if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID])) { mtctx->nextJobID++; mtctx->jobReady = 0; + return 1; } - return 1; + + DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker currently available for job %u", mtctx->nextJobID); + mtctx->jobReady = 1; + return 0; } @@ -1556,17 +1540,20 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u mtctx->doneJobID, (U32)mtctx->jobs[wJobID].consumed, (U32)mtctx->jobs[wJobID].src.size); break; } - DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)", - mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed); + DEBUGLOG(5, "waiting for something to flush from job %u (%u input left)", + mtctx->doneJobID, (unsigned)(mtctx->jobs[wJobID].src.size - mtctx->jobs[wJobID].consumed)); if (mtctx->jobs[wJobID].flush_mutex == NULL) { mtctx->jobs[wJobID].flush_mutex = &mtctx->flushMutex; mtctx->jobs[wJobID].flush_cond = &mtctx->flushCond; } - DEBUGLOG(6, "waiting to flush something (%zu left)", mtctx->jobs[wJobID].src.size - mtctx->jobs[wJobID].consumed); ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); + /* note: if a job was completed between POOL_tryAdd() and this waiting condition, + * the signal, which was already issued, will be lost. 
+ * It just reduces an opportunity to start a new job immediately */ ZSTD_PTHREAD_COND_WAIT(&mtctx->flushCond, &mtctx->flushMutex); /* block waiting for something to flush */ ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[wJobID].job_mutex); - DEBUGLOG(6, "condition triggered: let's flush something (%zu bytes)", mtctx->jobs[wJobID].cSize - mtctx->jobs[wJobID].dstFlushed); + DEBUGLOG(6, "flushCond triggered: let's flush something (%zu bytes)", mtctx->jobs[wJobID].cSize - mtctx->jobs[wJobID].dstFlushed); + break; /* can be triggered with nothing to flush, when a job was just completed */ } } /* try to flush something */