From ab5a78547e62c9a80936e998f4db94fe43e6d232 Mon Sep 17 00:00:00 2001 From: Paul Cruz Date: Wed, 26 Jul 2017 16:40:05 -0700 Subject: [PATCH] fix leaky abstraction regarding measuring completion --- contrib/adaptive-compression/adapt.c | 57 ++++++++++++++++++++-------- 1 file changed, 41 insertions(+), 16 deletions(-) diff --git a/contrib/adaptive-compression/adapt.c b/contrib/adaptive-compression/adapt.c index 05da8b7f7..a16ab40cc 100644 --- a/contrib/adaptive-compression/adapt.c +++ b/contrib/adaptive-compression/adapt.c @@ -367,20 +367,16 @@ static void adaptCompressionLevel(adaptCCtx* ctx) DEBUG(2, "writeWaitCompressionCompletion %f\n", ctx->writeWaitCompressionCompletion); createWaitCompressionCompletion = ctx->createWaitCompressionCompletion; writeWaitCompressionCompletion = ctx->writeWaitCompressionCompletion; - ctx->createWaitCompressionCompletion = 1; - ctx->writeWaitCompressionCompletion = 1; pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex); DEBUG(2, "compressWaitWriteCompletion %f\n", ctx->compressWaitWriteCompletion); compressWaitWriteCompletion = ctx->compressWaitWriteCompletion; - ctx->compressWaitWriteCompletion = 1; pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex); pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex); DEBUG(2, "compressWaitCreateCompletion %f\n", ctx->compressWaitCreateCompletion); compressWaitCreateCompletion = ctx->compressWaitCreateCompletion; - ctx->compressWaitCreateCompletion = 1; pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex); DEBUG(2, "convergence counter: %u\n", ctx->convergenceCounter); @@ -462,22 +458,28 @@ static void* compressionThread(void* arg) pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex); + pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex); if (willWaitForCreate) { DEBUG(2, "compression will wait for create on job %u\n", currJob); - pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex); ctx->compressWaitCreateCompletion = ctx->createCompletion; DEBUG(2, "create completion %f\n", ctx->compressWaitCreateCompletion); - pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex); } + else { + ctx->compressWaitCreateCompletion = 1; + } + pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex); + pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex); if (willWaitForWrite) { DEBUG(2, "compression will wait for write on job %u\n", currJob); - pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex); ctx->compressWaitWriteCompletion = ctx->writeCompletion; DEBUG(2, "write completion %f\n", ctx->compressWaitWriteCompletion); - pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex); } + else { + ctx->compressWaitWriteCompletion = 1; + } + pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex); } @@ -610,14 +612,27 @@ static void* outputThread(void* arg) for ( ; ; ) { unsigned const currJobIndex = currJob % ctx->numJobs; jobDescription* const job = &ctx->jobs[currJobIndex]; + unsigned willWaitForCompress = 0; DEBUG(2, "starting write for job %u\n", currJob); + pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); - while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) { - pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); + if (currJob + 1 > ctx->jobCompressedID) willWaitForCompress = 1; + pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); + + + pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); + if (willWaitForCompress) { /* write thread is waiting on compression thread */ ctx->writeWaitCompressionCompletion = ctx->compressionCompletion; DEBUG(2, "writer thread waiting for nextJob: %u, writeWaitCompressionCompletion %f\n", currJob, ctx->writeWaitCompressionCompletion); - pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); + } + else { + ctx->writeWaitCompressionCompletion = 1; + } + pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); + + pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); + while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) { pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); } pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); @@ -751,17 +766,27 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA size_t const readBlockSize = 1 << 15; size_t remaining = FILE_CHUNK_SIZE; unsigned const nextJob = ctx->nextJobID; + unsigned willWaitForCompress = 0; DEBUG(2, "starting creation of job %u\n", currJob); - - /* wait until the job has been compressed */ pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); - while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) { - pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); + if (nextJob - ctx->jobCompressedID >= ctx->numJobs) willWaitForCompress = 1; + pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); + + pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); + if (willWaitForCompress) { /* creation thread is waiting, take measurement of completion */ ctx->createWaitCompressionCompletion = ctx->compressionCompletion; DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion); - pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); + } + else { + ctx->createWaitCompressionCompletion = 1; + } + pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); + + /* wait until the job has been compressed */ + pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); + while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) { pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); } pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); -- 2.47.2