buffer_t dst;
unsigned compressionLevel;
unsigned jobID;
- unsigned lastJob;
+ unsigned lastJobPlusOne;
size_t compressedSize;
size_t dictSize;
} jobDescription;
jobDescription* job = &ctx->jobs[jobNum];
job->src.start = malloc(2 * FILE_CHUNK_SIZE);
job->dst.start = malloc(ZSTD_compressBound(FILE_CHUNK_SIZE));
- job->lastJob = 0;
+ job->lastJobPlusOne = 0;
if (!job->src.start || !job->dst.start) {
DISPLAY("Could not allocate buffers for jobs\n");
return 1;
unsigned const currJobIndex = currJob % ctx->numJobs;
jobDescription* job = &ctx->jobs[currJobIndex];
DEBUG(3, "compressionThread(): waiting on job ready\n");
-
-
+ /* wait until job is ready */
pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
- while(currJob + 1 > ctx->jobReadyID && !ctx->threadError) {
+ while (currJob + 1 > ctx->jobReadyID && !ctx->threadError) {
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
- /* compression thread is waiting, take measurements of write completion and read completion */
+ /* compression thread is waiting on creation thread, take measurement */
ctx->compressWaitCreateCompletion = ctx->createCompletion;
- ctx->compressWaitWriteCompletion = ctx->writeCompletion;
- DEBUG(3, "compression thread waiting : compressWaitCreateCompletion %f : compressWaitWriteCompletion %f\n", ctx->compressWaitCreateCompletion, ctx->compressWaitWriteCompletion);
+ DEBUG(3, "compression thread waiting : compressWaitCreateCompletion %f\n", ctx->compressWaitCreateCompletion);
DEBUG(3, "create completion: %f\n", ctx->createCompletion);
- DEBUG(2, "compression thread waiting for nextJob: %u, compressWaitCreateCompletion %f, compressWaitWriteCompletion %f\n", currJob, ctx->compressWaitCreateCompletion, ctx->compressWaitWriteCompletion);
+ DEBUG(2, "compression thread waiting for ready: %u, compressWaitCreateCompletion %f\n", currJob, ctx->compressWaitCreateCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex);
}
pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
+ /* wait until job previously in this space is written */
+ pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
+ while (currJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) {
+ pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+ ctx->compressWaitWriteCompletion = ctx->writeCompletion;
+ DEBUG(2, "compression thread waiting for write: %u, compressWaitWriteCompletion %f\n", currJob, ctx->compressWaitWriteCompletion);
+ pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+ pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
+ }
+ pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
/* reset compression completion */
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->compressionCompletion = 0;
size_t dstPos = 0;
DEBUG(3, "cLevel used: %u\n", cLevel);
DEBUG(3, "compression level used: %u\n", cLevel);
-
/* reset compressed size */
job->compressedSize = 0;
-
/* begin compression */
{
size_t const useDictSize = MIN(getUseableDictSize(cLevel), job->dictSize);
ZSTD_invalidateRepCodes(ctx->cctx);
}
{
- DEBUG(3, "write out ending: %d\n", job->lastJob && (remaining == actualBlockSize));
- DEBUG(3, "lastJob %u\n", job->lastJob);
+ DEBUG(3, "write out ending: %d\n", (job->lastJobPlusOne == currJob + 1) && (remaining == actualBlockSize));
+ DEBUG(3, "lastJobPlusOne %u\n", job->lastJobPlusOne);
DEBUG(3, "compressionBlockSize %zu\n", compressionBlockSize);
- size_t const ret = (job->lastJob && remaining == actualBlockSize) ?
+ size_t const ret = (job->lastJobPlusOne == currJob + 1 && remaining == actualBlockSize) ?
ZSTD_compressEnd (ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize) :
ZSTD_compressContinue(ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize);
if (ZSTD_isError(ret)) {
pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
ctx->jobCompressedID++;
DEBUG(3, "signaling for job %u\n", currJob);
- pthread_cond_signal(&ctx->jobCompressed_cond.pCond);
+ pthread_cond_broadcast(&ctx->jobCompressed_cond.pCond);
pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
DEBUG(3, "finished job compression %u\n", currJob);
- currJob++;
- if (job->lastJob || ctx->threadError) {
+ if (job->lastJobPlusOne == currJob + 1 || ctx->threadError) {
/* finished compressing all jobs */
DEBUG(3, "all jobs finished compressing\n");
break;
}
+
+ currJob++;
}
return arg;
}
unsigned const currJobIndex = currJob % ctx->numJobs;
jobDescription* job = &ctx->jobs[currJobIndex];
DEBUG(3, "outputThread(): waiting on job compressed\n");
-
pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
}
}
DEBUG(3, "finished job write %u\n", currJob);
- currJob++;
- displayProgress(currJob, ctx->compressionLevel, job->lastJob);
+ displayProgress(currJob, ctx->compressionLevel, job->lastJobPlusOne == currJob + 1);
DEBUG(3, "locking job write mutex\n");
pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
ctx->jobWriteID++;
pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
DEBUG(3, "unlocking job write mutex\n");
- if (job->lastJob || ctx->threadError) {
+ if (job->lastJobPlusOne == currJob + 1 || ctx->threadError) {
/* finished with all jobs */
DEBUG(3, "all jobs finished writing\n");
pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex);
pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
break;
}
+ currJob++;
}
return arg;
unsigned const nextJobIndex = nextJob % ctx->numJobs;
jobDescription* job = &ctx->jobs[nextJobIndex];
DEBUG(3, "createCompressionJob(): wait for job write\n");
- pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
- DEBUG(3, "Creating new compression job -- nextJob: %u, jobCompressedID: %u, jobWriteID: %u, numJObs: %u\n", nextJob,ctx->jobCompressedID, ctx->jobWriteID, ctx->numJobs);
- while (nextJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) {
+
+
+ /* wait until the job has been compressed */
+ pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
+ while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) {
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
- /* creation thread is waiting, take measurement of compression completion */
+ /* creation thread is waiting, take measurement of completion */
ctx->createWaitCompressionCompletion = ctx->compressionCompletion;
ctx->createWaitWriteCompletion = ctx->writeCompletion;
DEBUG(3, "creation thread waiting : createWaitCompressionCompletion %f : createWaitWriteCompletion %f\n", ctx->createWaitCompressionCompletion, ctx->createWaitWriteCompletion);
DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion);
DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f, createWaitWriteCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion, ctx->createWaitWriteCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
- pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
+ pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
}
- pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
-
+ pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
/* reset create completion */
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->createCompletion = 0;
job->compressionLevel = ctx->compressionLevel;
job->src.size = srcSize;
job->jobID = nextJob;
- job->lastJob = last;
+ if (last) job->lastJobPlusOne = nextJob + 1;
{
/* swap buffer */
void* const copy = job->src.start;