From: Paul Cruz Date: Wed, 26 Jul 2017 23:40:05 +0000 (-0700) Subject: fix leaky abstraction regarding measuring completion X-Git-Tag: v1.3.1^2~13^2^2~1 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=ab5a78547e62c9a80936e998f4db94fe43e6d232;p=thirdparty%2Fzstd.git fix leaky abstraction regarding measuring completion --- diff --git a/contrib/adaptive-compression/adapt.c b/contrib/adaptive-compression/adapt.c index 05da8b7f7..a16ab40cc 100644 --- a/contrib/adaptive-compression/adapt.c +++ b/contrib/adaptive-compression/adapt.c @@ -367,20 +367,16 @@ static void adaptCompressionLevel(adaptCCtx* ctx) DEBUG(2, "writeWaitCompressionCompletion %f\n", ctx->writeWaitCompressionCompletion); createWaitCompressionCompletion = ctx->createWaitCompressionCompletion; writeWaitCompressionCompletion = ctx->writeWaitCompressionCompletion; - ctx->createWaitCompressionCompletion = 1; - ctx->writeWaitCompressionCompletion = 1; pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex); DEBUG(2, "compressWaitWriteCompletion %f\n", ctx->compressWaitWriteCompletion); compressWaitWriteCompletion = ctx->compressWaitWriteCompletion; - ctx->compressWaitWriteCompletion = 1; pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex); pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex); DEBUG(2, "compressWaitCreateCompletion %f\n", ctx->compressWaitCreateCompletion); compressWaitCreateCompletion = ctx->compressWaitCreateCompletion; - ctx->compressWaitCreateCompletion = 1; pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex); DEBUG(2, "convergence counter: %u\n", ctx->convergenceCounter); @@ -462,22 +458,28 @@ static void* compressionThread(void* arg) pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex); + pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex); if (willWaitForCreate) { DEBUG(2, "compression will wait for create on job %u\n", currJob); - pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex); ctx->compressWaitCreateCompletion = ctx->createCompletion; DEBUG(2, "create completion %f\n", ctx->compressWaitCreateCompletion); - pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex); } + else { + ctx->compressWaitCreateCompletion = 1; + } + pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex); + pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex); if (willWaitForWrite) { DEBUG(2, "compression will wait for write on job %u\n", currJob); - pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex); ctx->compressWaitWriteCompletion = ctx->writeCompletion; DEBUG(2, "write completion %f\n", ctx->compressWaitWriteCompletion); - pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex); } + else { + ctx->compressWaitWriteCompletion = 1; + } + pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex); } @@ -610,14 +612,27 @@ static void* outputThread(void* arg) for ( ; ; ) { unsigned const currJobIndex = currJob % ctx->numJobs; jobDescription* const job = &ctx->jobs[currJobIndex]; + unsigned willWaitForCompress = 0; DEBUG(2, "starting write for job %u\n", currJob); + pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); - while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) { - pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); + if (currJob + 1 > ctx->jobCompressedID) willWaitForCompress = 1; + pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); + + + pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); + if (willWaitForCompress) { /* write thread is waiting on compression thread */ ctx->writeWaitCompressionCompletion = ctx->compressionCompletion; DEBUG(2, "writer thread waiting for nextJob: %u, writeWaitCompressionCompletion %f\n", currJob, ctx->writeWaitCompressionCompletion); - pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); + } + else { + ctx->writeWaitCompressionCompletion = 1; + } + pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); + + pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); + while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) { pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); } pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); @@ -751,17 +766,27 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA size_t const readBlockSize = 1 << 15; size_t remaining = FILE_CHUNK_SIZE; unsigned const nextJob = ctx->nextJobID; + unsigned willWaitForCompress = 0; DEBUG(2, "starting creation of job %u\n", currJob); - - /* wait until the job has been compressed */ pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); - while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) { - pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); + if (nextJob - ctx->jobCompressedID >= ctx->numJobs) willWaitForCompress = 1; + pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); + + pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); + if (willWaitForCompress) { /* creation thread is waiting, take measurement of completion */ ctx->createWaitCompressionCompletion = ctx->compressionCompletion; DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion); - pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); + } + else { + ctx->createWaitCompressionCompletion = 1; + } + pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); + + /* wait until the job has been compressed */ + pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); + while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) { pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); } pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);