From: Paul Cruz Date: Sun, 23 Jul 2017 21:09:16 +0000 (-0700) Subject: reduced competition for completion mutex by separating mutex use based on which value... X-Git-Tag: v1.3.1^2~13^2^2~25 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=483d936b8760819f8f6b71c2eb0c635165293086;p=thirdparty%2Fzstd.git reduced competition for completion mutex by separating mutex use based on which values is updated --- diff --git a/contrib/adaptive-compression/adapt.c b/contrib/adaptive-compression/adapt.c index 248421457..4e5a86c72 100644 --- a/contrib/adaptive-compression/adapt.c +++ b/contrib/adaptive-compression/adapt.c @@ -92,8 +92,9 @@ typedef struct { cond_t allJobsCompleted_cond; mutex_t jobWrite_mutex; cond_t jobWrite_cond; - mutex_t completion_mutex; - mutex_t wait_mutex; + mutex_t compressionCompletion_mutex; + mutex_t createCompletion_mutex; + mutex_t writeCompletion_mutex; size_t lastDictSize; inBuff_t input; jobDescription* jobs; @@ -152,8 +153,9 @@ static int freeCCtx(adaptCCtx* ctx) error |= destroyCond(&ctx->allJobsCompleted_cond); error |= destroyMutex(&ctx->jobWrite_mutex); error |= destroyCond(&ctx->jobWrite_cond); - error |= destroyMutex(&ctx->completion_mutex); - error |= destroyMutex(&ctx->wait_mutex); + error |= destroyMutex(&ctx->compressionCompletion_mutex); + error |= destroyMutex(&ctx->createCompletion_mutex); + error |= destroyMutex(&ctx->writeCompletion_mutex); error |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx)); free(ctx->input.buffer.start); if (ctx->jobs){ @@ -192,8 +194,9 @@ static int initCCtx(adaptCCtx* ctx, unsigned numJobs) pthreadError |= initCond(&ctx->allJobsCompleted_cond); pthreadError |= initMutex(&ctx->jobWrite_mutex); pthreadError |= initCond(&ctx->jobWrite_cond); - pthreadError |= initMutex(&ctx->completion_mutex); - pthreadError |= initMutex(&ctx->wait_mutex); + pthreadError |= initMutex(&ctx->compressionCompletion_mutex); + pthreadError |= initMutex(&ctx->createCompletion_mutex); + pthreadError |= initMutex(&ctx->writeCompletion_mutex); if (pthreadError) return pthreadError; } ctx->numJobs = numJobs; @@ -323,28 +326,32 @@ static void adaptCompressionLevel(adaptCCtx* ctx) DEBUG(2, "adapting compression level %u\n", ctx->compressionLevel); /* read and reset completion measurements */ - pthread_mutex_lock(&ctx->completion_mutex.pMutex); + + pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); DEBUG(2, "rc %f\n", ctx->createWaitCompressionCompletion); - DEBUG(2, "cr %f\n", ctx->compressWaitCreateCompletion); - DEBUG(2, "cw %f\n", ctx->compressWaitWriteCompletion); DEBUG(2, "wc %f\n\n", ctx->writeWaitCompressionCompletion); - createWaitCompressionCompletion = ctx->createWaitCompressionCompletion; - compressWaitCreateCompletion = ctx->compressWaitCreateCompletion; - compressWaitWriteCompletion = ctx->compressWaitWriteCompletion; writeWaitCompressionCompletion = ctx->writeWaitCompressionCompletion; - - DEBUG(2, "resetting adaptive variables\n"); ctx->createWaitCompressionCompletion = 1; - ctx->compressWaitCreateCompletion = 1; - ctx->compressWaitWriteCompletion = 1; ctx->writeWaitCompressionCompletion = 1; - pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); + + pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex); + DEBUG(2, "cw %f\n", ctx->compressWaitWriteCompletion); + compressWaitWriteCompletion = ctx->compressWaitWriteCompletion; + ctx->compressWaitWriteCompletion = 1; + pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex); + + pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex); + DEBUG(2, "cr %f\n", ctx->compressWaitCreateCompletion); + compressWaitCreateCompletion = ctx->compressWaitCreateCompletion; + ctx->compressWaitCreateCompletion = 1; + pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex); /* adaptation logic */ - if (1-createWaitCompressionCompletion > threshold && 1-writeWaitCompressionCompletion > threshold) { - /* both create and write threads waiting on compression */ - /* use writeWaitCompressionCompletion */ + if (1-createWaitCompressionCompletion > threshold || 1-writeWaitCompressionCompletion > threshold) { + /* compression waiting on either create or write */ + /* use whichever one waited less because it was slower */ double const completion = MAX(createWaitCompressionCompletion, writeWaitCompressionCompletion); unsigned const change = (unsigned)((1-completion) * MAX_COMPRESSION_LEVEL_CHANGE); unsigned const boundChange = MIN(change, ctx->compressionLevel - 1); @@ -404,15 +411,21 @@ static void* compressionThread(void* arg) if (currJob - ctx->jobWriteID >= ctx->numJobs) willWaitForWrite = 1; pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex); - pthread_mutex_lock(&ctx->completion_mutex.pMutex); + if (willWaitForCreate || willWaitForWrite) { - ctx->compressWaitCreateCompletion = ctx->createCompletion; - ctx->compressWaitWriteCompletion = ctx->writeCompletion; DEBUG(2, "compression will wait for create or write\n"); + + pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex); + ctx->compressWaitCreateCompletion = ctx->createCompletion; DEBUG(2, "create completion %f\n", ctx->compressWaitCreateCompletion); + pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex); + + pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex); + ctx->compressWaitWriteCompletion = ctx->writeCompletion; DEBUG(2, "write completion %f\n", ctx->compressWaitWriteCompletion); + pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex); } - pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + } /* wait until job is ready */ @@ -429,9 +442,9 @@ static void* compressionThread(void* arg) } pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex); /* reset compression completion */ - pthread_mutex_lock(&ctx->completion_mutex.pMutex); + pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); ctx->compressionCompletion = 0; - pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); DEBUG(3, "compressionThread(): continuing after job ready\n"); DEBUG(3, "DICTIONARY ENDED\n"); @@ -502,10 +515,10 @@ static void* compressionThread(void* arg) blockNum++; /* update completion */ - pthread_mutex_lock(&ctx->completion_mutex.pMutex); + pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); ctx->compressionCompletion = 1 - (double)remaining/job->src.size; DEBUG(2, "compression completion %u %f\n", currJob, ctx->compressionCompletion); - pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); } } while (remaining != 0); job->dst.size = job->compressedSize; @@ -557,20 +570,20 @@ static void* outputThread(void* arg) DEBUG(3, "outputThread(): waiting on job compressed\n"); pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) { - pthread_mutex_lock(&ctx->completion_mutex.pMutex); + pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); /* write thread is waiting on compression thread */ ctx->writeWaitCompressionCompletion = ctx->compressionCompletion; DEBUG(3, "write thread waiting : writeWaitCompressionCompletion %f\n", ctx->writeWaitCompressionCompletion); DEBUG(2, "writer thread waiting for nextJob: %u, writeWaitCompressionCompletion %f\n", currJob, ctx->writeWaitCompressionCompletion); - pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); } pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); /* reset write completion */ - pthread_mutex_lock(&ctx->completion_mutex.pMutex); + pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex); ctx->writeCompletion = 0; - pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex); DEBUG(3, "outputThread(): continuing after job compressed\n"); { @@ -593,10 +606,10 @@ static void* outputThread(void* arg) remaining -= ret; /* update completion variable for writing */ - pthread_mutex_lock(&ctx->completion_mutex.pMutex); + pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex); ctx->writeCompletion = 1 - (double)remaining/compressedSize; DEBUG(2, "write completion %u %f\n", currJob, ctx->writeCompletion); - pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex); if (remaining == 0) break; } @@ -642,20 +655,20 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last) /* wait until the job has been compressed */ pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) { - pthread_mutex_lock(&ctx->completion_mutex.pMutex); + pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); /* creation thread is waiting, take measurement of completion */ ctx->createWaitCompressionCompletion = ctx->compressionCompletion; DEBUG(3, "creation thread waiting : createWaitCompressionCompletion %f\n", ctx->createWaitCompressionCompletion); DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion); DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion); - pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); } pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); /* reset create completion */ - pthread_mutex_lock(&ctx->completion_mutex.pMutex); + pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex); ctx->createCompletion = 0; - pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex); DEBUG(3, "createCompressionJob(): continuing after job write\n"); DEBUG(3, "filled: %zu, srcSize: %zu\n", ctx->input.filled, srcSize); @@ -735,10 +748,10 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA } pos += ret; remaining -= ret; - pthread_mutex_lock(&ctx->completion_mutex.pMutex); + pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex); ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE); DEBUG(2, "create completion %u %f\n", currJob, ctx->createCompletion); - pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex); } if (remaining != 0 && !feof(srcFile)) { DISPLAY("Error: problem occurred during read from src file\n");