From: Paul Cruz Date: Sat, 22 Jul 2017 00:49:39 +0000 (-0700) Subject: switched over to model where reading only waits on compression thread X-Git-Tag: v1.3.1^2~13^2^2~28 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=95bef759b3f1fc52bbcf5a82c1dddaf00906df8f;p=thirdparty%2Fzstd.git switched over to model where reading only waits on compression thread --- diff --git a/contrib/adaptive-compression/adapt.c b/contrib/adaptive-compression/adapt.c index e07333fc3..9057bfcfc 100644 --- a/contrib/adaptive-compression/adapt.c +++ b/contrib/adaptive-compression/adapt.c @@ -51,7 +51,7 @@ typedef struct { buffer_t dst; unsigned compressionLevel; unsigned jobID; - unsigned lastJob; + unsigned lastJobPlusOne; size_t compressedSize; size_t dictSize; } jobDescription; @@ -226,7 +226,7 @@ static int initCCtx(adaptCCtx* ctx, unsigned numJobs) jobDescription* job = &ctx->jobs[jobNum]; job->src.start = malloc(2 * FILE_CHUNK_SIZE); job->dst.start = malloc(ZSTD_compressBound(FILE_CHUNK_SIZE)); - job->lastJob = 0; + job->lastJobPlusOne = 0; if (!job->src.start || !job->dst.start) { DISPLAY("Could not allocate buffers for jobs\n"); return 1; @@ -400,22 +400,30 @@ static void* compressionThread(void* arg) unsigned const currJobIndex = currJob % ctx->numJobs; jobDescription* job = &ctx->jobs[currJobIndex]; DEBUG(3, "compressionThread(): waiting on job ready\n"); - - + /* wait until job is ready */ pthread_mutex_lock(&ctx->jobReady_mutex.pMutex); - while(currJob + 1 > ctx->jobReadyID && !ctx->threadError) { + while (currJob + 1 > ctx->jobReadyID && !ctx->threadError) { pthread_mutex_lock(&ctx->completion_mutex.pMutex); - /* compression thread is waiting, take measurements of write completion and read completion */ + /* compression thread is waiting on creation thread, take measurement */ ctx->compressWaitCreateCompletion = ctx->createCompletion; - ctx->compressWaitWriteCompletion = ctx->writeCompletion; - DEBUG(3, "compression thread waiting : compressWaitCreateCompletion %f : compressWaitWriteCompletion %f\n", ctx->compressWaitCreateCompletion, ctx->compressWaitWriteCompletion); + DEBUG(3, "compression thread waiting : compressWaitCreateCompletion %f\n", ctx->compressWaitCreateCompletion); DEBUG(3, "create completion: %f\n", ctx->createCompletion); - DEBUG(2, "compression thread waiting for nextJob: %u, compressWaitCreateCompletion %f, compressWaitWriteCompletion %f\n", currJob, ctx->compressWaitCreateCompletion, ctx->compressWaitWriteCompletion); + DEBUG(2, "compression thread waiting for ready: %u, compressWaitCreateCompletion %f\n", currJob, ctx->compressWaitCreateCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex); } pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex); + /* wait until job previously in this space is written */ + pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex); + while (currJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) { + pthread_mutex_lock(&ctx->completion_mutex.pMutex); + ctx->compressWaitWriteCompletion = ctx->writeCompletion; + DEBUG(2, "compression thread waiting for write: %u, compressWaitWriteCompletion %f\n", currJob, ctx->compressWaitWriteCompletion); + pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex); + } + pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex); /* reset compression completion */ pthread_mutex_lock(&ctx->completion_mutex.pMutex); ctx->compressionCompletion = 0; @@ -439,10 +447,8 @@ static void* compressionThread(void* arg) size_t dstPos = 0; DEBUG(3, "cLevel used: %u\n", cLevel); DEBUG(3, "compression level used: %u\n", cLevel); - /* reset compressed size */ job->compressedSize = 0; - /* begin compression */ { size_t const useDictSize = MIN(getUseableDictSize(cLevel), job->dictSize); @@ -474,10 +480,10 @@ static void* compressionThread(void* arg) ZSTD_invalidateRepCodes(ctx->cctx); } { - DEBUG(3, "write out ending: %d\n", job->lastJob && (remaining == actualBlockSize)); - DEBUG(3, "lastJob %u\n", job->lastJob); + DEBUG(3, "write out ending: %d\n", (job->lastJobPlusOne == currJob + 1) && (remaining == actualBlockSize)); + DEBUG(3, "lastJobPlusOne %u\n", job->lastJobPlusOne); DEBUG(3, "compressionBlockSize %zu\n", compressionBlockSize); - size_t const ret = (job->lastJob && remaining == actualBlockSize) ? + size_t const ret = (job->lastJobPlusOne == currJob + 1 && remaining == actualBlockSize) ? ZSTD_compressEnd (ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize) : ZSTD_compressContinue(ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize); if (ZSTD_isError(ret)) { @@ -503,15 +509,16 @@ static void* compressionThread(void* arg) pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); ctx->jobCompressedID++; DEBUG(3, "signaling for job %u\n", currJob); - pthread_cond_signal(&ctx->jobCompressed_cond.pCond); + pthread_cond_broadcast(&ctx->jobCompressed_cond.pCond); pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); DEBUG(3, "finished job compression %u\n", currJob); - currJob++; - if (job->lastJob || ctx->threadError) { + if (job->lastJobPlusOne == currJob + 1 || ctx->threadError) { /* finished compressing all jobs */ DEBUG(3, "all jobs finished compressing\n"); break; } + + currJob++; } return arg; } @@ -544,7 +551,6 @@ static void* outputThread(void* arg) unsigned const currJobIndex = currJob % ctx->numJobs; jobDescription* job = &ctx->jobs[currJobIndex]; DEBUG(3, "outputThread(): waiting on job compressed\n"); - pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) { pthread_mutex_lock(&ctx->completion_mutex.pMutex); @@ -599,8 +605,7 @@ static void* outputThread(void* arg) } } DEBUG(3, "finished job write %u\n", currJob); - currJob++; - displayProgress(currJob, ctx->compressionLevel, job->lastJob); + displayProgress(currJob, ctx->compressionLevel, job->lastJobPlusOne == currJob + 1); DEBUG(3, "locking job write mutex\n"); pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex); ctx->jobWriteID++; @@ -608,7 +613,7 @@ static void* outputThread(void* arg) pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex); DEBUG(3, "unlocking job write mutex\n"); - if (job->lastJob || ctx->threadError) { + if (job->lastJobPlusOne == currJob + 1 || ctx->threadError) { /* finished with all jobs */ DEBUG(3, "all jobs finished writing\n"); pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex); @@ -617,6 +622,7 @@ static void* outputThread(void* arg) pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex); break; } + currJob++; } return arg; @@ -628,21 +634,22 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last) unsigned const nextJobIndex = nextJob % ctx->numJobs; jobDescription* job = &ctx->jobs[nextJobIndex]; DEBUG(3, "createCompressionJob(): wait for job write\n"); - pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex); - DEBUG(3, "Creating new compression job -- nextJob: %u, jobCompressedID: %u, jobWriteID: %u, numJObs: %u\n", nextJob,ctx->jobCompressedID, ctx->jobWriteID, ctx->numJobs); - while (nextJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) { + + + /* wait until the job has been compressed */ + pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); + while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) { pthread_mutex_lock(&ctx->completion_mutex.pMutex); - /* creation thread is waiting, take measurement of compression completion */ + /* creation thread is waiting, take measurement of completion */ ctx->createWaitCompressionCompletion = ctx->compressionCompletion; ctx->createWaitWriteCompletion = ctx->writeCompletion; DEBUG(3, "creation thread waiting : createWaitCompressionCompletion %f : createWaitWriteCompletion %f\n", ctx->createWaitCompressionCompletion, ctx->createWaitWriteCompletion); DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion); DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f, createWaitWriteCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion, ctx->createWaitWriteCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); - pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex); + pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); } - pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex); - + pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); /* reset create completion */ pthread_mutex_lock(&ctx->completion_mutex.pMutex); ctx->createCompletion = 0; @@ -653,7 +660,7 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last) job->compressionLevel = ctx->compressionLevel; job->src.size = srcSize; job->jobID = nextJob; - job->lastJob = last; + if (last) job->lastJobPlusOne = nextJob + 1; { /* swap buffer */ void* const copy = job->src.start;