cond_t allJobsCompleted_cond;
mutex_t jobWrite_mutex;
cond_t jobWrite_cond;
- mutex_t completion_mutex;
- mutex_t wait_mutex;
+ mutex_t compressionCompletion_mutex;
+ mutex_t createCompletion_mutex;
+ mutex_t writeCompletion_mutex;
size_t lastDictSize;
inBuff_t input;
jobDescription* jobs;
error |= destroyCond(&ctx->allJobsCompleted_cond);
error |= destroyMutex(&ctx->jobWrite_mutex);
error |= destroyCond(&ctx->jobWrite_cond);
- error |= destroyMutex(&ctx->completion_mutex);
- error |= destroyMutex(&ctx->wait_mutex);
+ error |= destroyMutex(&ctx->compressionCompletion_mutex);
+ error |= destroyMutex(&ctx->createCompletion_mutex);
+ error |= destroyMutex(&ctx->writeCompletion_mutex);
error |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx));
free(ctx->input.buffer.start);
if (ctx->jobs){
pthreadError |= initCond(&ctx->allJobsCompleted_cond);
pthreadError |= initMutex(&ctx->jobWrite_mutex);
pthreadError |= initCond(&ctx->jobWrite_cond);
- pthreadError |= initMutex(&ctx->completion_mutex);
- pthreadError |= initMutex(&ctx->wait_mutex);
+ pthreadError |= initMutex(&ctx->compressionCompletion_mutex);
+ pthreadError |= initMutex(&ctx->createCompletion_mutex);
+ pthreadError |= initMutex(&ctx->writeCompletion_mutex);
if (pthreadError) return pthreadError;
}
ctx->numJobs = numJobs;
DEBUG(2, "adapting compression level %u\n", ctx->compressionLevel);
/* read and reset completion measurements */
- pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+
+ pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
DEBUG(2, "rc %f\n", ctx->createWaitCompressionCompletion);
- DEBUG(2, "cr %f\n", ctx->compressWaitCreateCompletion);
- DEBUG(2, "cw %f\n", ctx->compressWaitWriteCompletion);
DEBUG(2, "wc %f\n\n", ctx->writeWaitCompressionCompletion);
-
createWaitCompressionCompletion = ctx->createWaitCompressionCompletion;
- compressWaitCreateCompletion = ctx->compressWaitCreateCompletion;
- compressWaitWriteCompletion = ctx->compressWaitWriteCompletion;
writeWaitCompressionCompletion = ctx->writeWaitCompressionCompletion;
-
- DEBUG(2, "resetting adaptive variables\n");
ctx->createWaitCompressionCompletion = 1;
- ctx->compressWaitCreateCompletion = 1;
- ctx->compressWaitWriteCompletion = 1;
ctx->writeWaitCompressionCompletion = 1;
- pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+ pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
+
+ pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
+ DEBUG(2, "cw %f\n", ctx->compressWaitWriteCompletion);
+ compressWaitWriteCompletion = ctx->compressWaitWriteCompletion;
+ ctx->compressWaitWriteCompletion = 1;
+ pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
+
+ pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
+ DEBUG(2, "cr %f\n", ctx->compressWaitCreateCompletion);
+ compressWaitCreateCompletion = ctx->compressWaitCreateCompletion;
+ ctx->compressWaitCreateCompletion = 1;
+ pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
/* adaptation logic */
- if (1-createWaitCompressionCompletion > threshold && 1-writeWaitCompressionCompletion > threshold) {
- /* both create and write threads waiting on compression */
- /* use writeWaitCompressionCompletion */
+ if (1-createWaitCompressionCompletion > threshold || 1-writeWaitCompressionCompletion > threshold) {
+ /* compression waiting on either create or write */
+ /* use whichever one waited less because it was slower */
double const completion = MAX(createWaitCompressionCompletion, writeWaitCompressionCompletion);
unsigned const change = (unsigned)((1-completion) * MAX_COMPRESSION_LEVEL_CHANGE);
unsigned const boundChange = MIN(change, ctx->compressionLevel - 1);
if (currJob - ctx->jobWriteID >= ctx->numJobs) willWaitForWrite = 1;
pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
- pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+
if (willWaitForCreate || willWaitForWrite) {
- ctx->compressWaitCreateCompletion = ctx->createCompletion;
- ctx->compressWaitWriteCompletion = ctx->writeCompletion;
DEBUG(2, "compression will wait for create or write\n");
+
+ pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
+ ctx->compressWaitCreateCompletion = ctx->createCompletion;
DEBUG(2, "create completion %f\n", ctx->compressWaitCreateCompletion);
+ pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
+
+ pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
+ ctx->compressWaitWriteCompletion = ctx->writeCompletion;
DEBUG(2, "write completion %f\n", ctx->compressWaitWriteCompletion);
+ pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
}
- pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+
}
/* wait until job is ready */
}
pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
/* reset compression completion */
- pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+ pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
ctx->compressionCompletion = 0;
- pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+ pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
DEBUG(3, "compressionThread(): continuing after job ready\n");
DEBUG(3, "DICTIONARY ENDED\n");
blockNum++;
/* update completion */
- pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+ pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
ctx->compressionCompletion = 1 - (double)remaining/job->src.size;
DEBUG(2, "compression completion %u %f\n", currJob, ctx->compressionCompletion);
- pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+ pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
}
} while (remaining != 0);
job->dst.size = job->compressedSize;
DEBUG(3, "outputThread(): waiting on job compressed\n");
pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
- pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+ pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
/* write thread is waiting on compression thread */
ctx->writeWaitCompressionCompletion = ctx->compressionCompletion;
DEBUG(3, "write thread waiting : writeWaitCompressionCompletion %f\n", ctx->writeWaitCompressionCompletion);
DEBUG(2, "writer thread waiting for nextJob: %u, writeWaitCompressionCompletion %f\n", currJob, ctx->writeWaitCompressionCompletion);
- pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+ pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
}
pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
/* reset write completion */
- pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+ pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
ctx->writeCompletion = 0;
- pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+ pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
DEBUG(3, "outputThread(): continuing after job compressed\n");
{
remaining -= ret;
/* update completion variable for writing */
- pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+ pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
ctx->writeCompletion = 1 - (double)remaining/compressedSize;
DEBUG(2, "write completion %u %f\n", currJob, ctx->writeCompletion);
- pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+ pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
if (remaining == 0) break;
}
/* wait until the job has been compressed */
pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) {
- pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+ pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
/* creation thread is waiting, take measurement of completion */
ctx->createWaitCompressionCompletion = ctx->compressionCompletion;
DEBUG(3, "creation thread waiting : createWaitCompressionCompletion %f\n", ctx->createWaitCompressionCompletion);
DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion);
DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion);
- pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+ pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
}
pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
/* reset create completion */
- pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+ pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
ctx->createCompletion = 0;
- pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+ pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
DEBUG(3, "createCompressionJob(): continuing after job write\n");
DEBUG(3, "filled: %zu, srcSize: %zu\n", ctx->input.filled, srcSize);
}
pos += ret;
remaining -= ret;
- pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+ pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE);
DEBUG(2, "create completion %u %f\n", currJob, ctx->createCompletion);
- pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+ pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
}
if (remaining != 0 && !feof(srcFile)) {
DISPLAY("Error: problem occurred during read from src file\n");