typedef struct {
buffer_t src;
buffer_t dst;
- unsigned compressionLevel;
unsigned jobID;
unsigned lastJobPlusOne;
size_t compressedSize;
typedef struct {
unsigned compressionLevel;
- unsigned numActiveThreads;
unsigned numJobs;
unsigned nextJobID;
unsigned threadError;
mutex_t compressionCompletion_mutex;
mutex_t createCompletion_mutex;
mutex_t writeCompletion_mutex;
+ mutex_t compressionLevel_mutex;
size_t lastDictSize;
inBuff_t input;
jobDescription* jobs;
error |= destroyMutex(&ctx->compressionCompletion_mutex);
error |= destroyMutex(&ctx->createCompletion_mutex);
error |= destroyMutex(&ctx->writeCompletion_mutex);
+ error |= destroyMutex(&ctx->compressionLevel_mutex);
error |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx));
free(ctx->input.buffer.start);
if (ctx->jobs){
pthreadError |= initMutex(&ctx->compressionCompletion_mutex);
pthreadError |= initMutex(&ctx->createCompletion_mutex);
pthreadError |= initMutex(&ctx->writeCompletion_mutex);
+ pthreadError |= initMutex(&ctx->compressionLevel_mutex);
if (pthreadError) return pthreadError;
}
ctx->numJobs = numJobs;
double compressWaitWriteCompletion;
double writeWaitCompressionCompletion;
double const threshold = 0.00001;
- unsigned const prevCompressionLevel = ctx->compressionLevel;
+ unsigned prevCompressionLevel;
+
+ pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
+ prevCompressionLevel = ctx->compressionLevel;
+ pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
if (g_forceCompressionLevel) {
+ pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
ctx->compressionLevel = g_compressionLevel;
+ pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
return;
}
- DEBUG(2, "adapting compression level %u\n", ctx->compressionLevel);
+ DEBUG(2, "adapting compression level %u\n", prevCompressionLevel);
/* read and reset completion measurements */
pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
DEBUG(2, "convergence counter: %u\n", ctx->convergenceCounter);
- assert(g_minCLevel <= ctx->compressionLevel && g_maxCLevel >= ctx->compressionLevel);
+ assert(g_minCLevel <= prevCompressionLevel && g_maxCLevel >= prevCompressionLevel);
/* adaptation logic */
if (ctx->cooldown) ctx->cooldown--;
/* use whichever one waited less because it was slower */
double const completion = MAX(createWaitCompressionCompletion, writeWaitCompressionCompletion);
unsigned const change = convertCompletionToChange(completion);
- unsigned const boundChange = MIN(change, ctx->compressionLevel - g_minCLevel);
+ unsigned const boundChange = MIN(change, prevCompressionLevel - g_minCLevel);
if (ctx->convergenceCounter >= CONVERGENCE_LOWER_BOUND && boundChange != 0) {
/* reset convergence counter, might have been a spike */
ctx->convergenceCounter = 0;
DEBUG(2, "convergence counter reset, no change applied\n");
}
else if (boundChange != 0) {
+ pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
ctx->compressionLevel -= boundChange;
+ pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
ctx->cooldown = CLEVEL_DECREASE_COOLDOWN;
ctx->convergenceCounter = 1;
/* compress waiting on write */
double const completion = MIN(compressWaitWriteCompletion, compressWaitCreateCompletion);
unsigned const change = convertCompletionToChange(completion);
- unsigned const boundChange = MIN(change, g_maxCLevel - ctx->compressionLevel);
+ unsigned const boundChange = MIN(change, g_maxCLevel - prevCompressionLevel);
if (ctx->convergenceCounter >= CONVERGENCE_LOWER_BOUND && boundChange != 0) {
/* reset convergence counter, might have been a spike */
ctx->convergenceCounter = 0;
DEBUG(2, "convergence counter reset, no change applied\n");
}
else if (boundChange != 0) {
+ pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
ctx->compressionLevel += boundChange;
+ pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
ctx->cooldown = 0;
ctx->convergenceCounter = 1;
}
+ pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
if (ctx->compressionLevel == prevCompressionLevel) {
ctx->convergenceCounter++;
}
+ pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
}
static size_t getUseableDictSize(unsigned compressionLevel)
/* adapt compression level */
if (currJob) adaptCompressionLevel(ctx);
+ pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
DEBUG(2, "job %u compressed with level %u\n", currJob, ctx->compressionLevel);
+ pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
+
/* compress the data */
{
size_t const compressionBlockSize = ZSTD_BLOCKSIZE_MAX; /* 128 KB */
- unsigned const cLevel = ctx->compressionLevel;
+ unsigned cLevel;
unsigned blockNum = 0;
size_t remaining = job->src.size;
size_t srcPos = 0;
size_t dstPos = 0;
+
+ pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
+ cLevel = ctx->compressionLevel;
+ pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
+
/* reset compressed size */
job->compressedSize = 0;
DEBUG(2, "calling ZSTD_compressBegin()\n");
}
}
}
- displayProgress(ctx->compressionLevel, job->lastJobPlusOne == currJob + 1);
+ {
+ unsigned cLevel;
+ pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
+ cLevel = ctx->compressionLevel;
+ pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
+ displayProgress(cLevel, job->lastJobPlusOne == currJob + 1);
+ }
pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
ctx->jobWriteID++;
pthread_cond_signal(&ctx->jobWrite_cond.pCond);
jobDescription* const job = &ctx->jobs[nextJobIndex];
- job->compressionLevel = ctx->compressionLevel;
job->src.size = srcSize;
job->jobID = nextJob;
if (last) job->lastJobPlusOne = nextJob + 1;