/* this function normalizes counters when compression level is changing */
static void reduceCounters(adaptCCtx* ctx)
{
+ pthread_mutex_lock(&ctx->stats_mutex.pMutex);
unsigned const min = MIN(ctx->stats.compressedCounter, MIN(ctx->stats.writeCounter, ctx->stats.readyCounter));
ctx->stats.writeCounter -= min;
ctx->stats.compressedCounter -= min;
ctx->stats.readyCounter -= min;
+ pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
}
/*
}
else {
unsigned reset = 0;
- unsigned const allSlow = ctx->adaptParam < ctx->stats.compressedCounter && ctx->adaptParam < ctx->stats.writeCounter && ctx->adaptParam < ctx->stats.readyCounter;
- unsigned const compressWaiting = ctx->adaptParam < ctx->stats.readyCounter;
- unsigned const writeWaiting = ctx->adaptParam < ctx->stats.compressedCounter;
- unsigned const createWaiting = ctx->adaptParam < ctx->stats.writeCounter;
- unsigned const writeSlow = (compressWaiting && createWaiting);
- unsigned const compressSlow = (writeWaiting && createWaiting);
- unsigned const createSlow = (compressWaiting && writeWaiting);
- DEBUG(2, "createWaiting: %u, compressWaiting: %u, writeWaiting: %u\n", createWaiting, compressWaiting, writeWaiting);
- DEBUG(2, "ready: %u compressed: %u write: %u\n", ctx->stats.readyCounter, ctx->stats.compressedCounter, ctx->stats.writeCounter);
- if (allSlow) {
- reset = 1;
- }
- else if ((writeSlow || createSlow) && ctx->compressionLevel < (unsigned)ZSTD_maxCLevel()) {
- DEBUG(3, "increasing compression level %u\n", ctx->compressionLevel);
- double completion;
- pthread_mutex_lock(&ctx->completion_mutex.pMutex);
- completion = ctx->writeCompletion;
- pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
- {
- unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE - 1)) + 1;
- unsigned const change = writeSlow ? MIN(maxChange, ZSTD_maxCLevel() - ctx->compressionLevel) : 1;
- DEBUG(2, "writeSlow: %u, change: %u\n", writeSlow, change);
- DEBUG(2, "write completion: %f\n", completion);
- ctx->compressionLevel += change;
+ unsigned allSlow;
+ unsigned compressWaiting;
+ unsigned writeWaiting;
+ unsigned createWaiting;
+
+ pthread_mutex_lock(&ctx->stats_mutex.pMutex);
+ allSlow = ctx->adaptParam < ctx->stats.compressedCounter && ctx->adaptParam < ctx->stats.writeCounter && ctx->adaptParam < ctx->stats.readyCounter;
+ compressWaiting = ctx->adaptParam < ctx->stats.readyCounter;
+ writeWaiting = ctx->adaptParam < ctx->stats.compressedCounter;
+ createWaiting = ctx->adaptParam < ctx->stats.writeCounter;
+ pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
+ {
+ unsigned const writeSlow = (compressWaiting && createWaiting);
+ unsigned const compressSlow = (writeWaiting && createWaiting);
+ unsigned const createSlow = (compressWaiting && writeWaiting);
+ DEBUG(2, "createWaiting: %u, compressWaiting: %u, writeWaiting: %u\n", createWaiting, compressWaiting, writeWaiting);
+ if (allSlow) {
reset = 1;
}
- }
- else if (compressSlow && ctx->compressionLevel > 1) {
- double completion;
- pthread_mutex_lock(&ctx->completion_mutex.pMutex);
- completion = ctx->compressionCompletion;
- pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
- {
- unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE-1)) + 1;
- unsigned const change = MIN(maxChange, ctx->compressionLevel - 1);
- DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel);
- DEBUG(3, "completion: %f\n", completion);
- ctx->compressionLevel -= change;
- reset = 1;
+ else if ((writeSlow || createSlow) && ctx->compressionLevel < (unsigned)ZSTD_maxCLevel()) {
+ DEBUG(3, "increasing compression level %u\n", ctx->compressionLevel);
+ double completion;
+ pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+ completion = ctx->writeCompletion;
+ pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+ {
+ unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE - 1)) + 1;
+ unsigned const change = writeSlow ? MIN(maxChange, ZSTD_maxCLevel() - ctx->compressionLevel) : 1;
+ DEBUG(2, "writeSlow: %u, change: %u\n", writeSlow, change);
+ DEBUG(2, "write completion: %f\n", completion);
+ ctx->compressionLevel += change;
+ reset = 1;
+ }
+ }
+ else if (compressSlow && ctx->compressionLevel > 1) {
+ double completion;
+ pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+ completion = ctx->compressionCompletion;
+ pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+ {
+ unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE-1)) + 1;
+ unsigned const change = MIN(maxChange, ctx->compressionLevel - 1);
+ DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel);
+ DEBUG(3, "completion: %f\n", completion);
+ ctx->compressionLevel -= change;
+ reset = 1;
+ }
+ }
+ if (reset) {
+ pthread_mutex_lock(&ctx->stats_mutex.pMutex);
+ ctx->stats.readyCounter = 0;
+ ctx->stats.writeCounter = 0;
+ ctx->stats.compressedCounter = 0;
+ pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
+
+ pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+ ctx->compressionCompletion = 1;
+ ctx->compressionCompletionMeasured = 0;
+ ctx->writeCompletion = 1;
+ ctx->writeCompletionMeasured = 0;
+ pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
}
- }
- if (reset) {
- ctx->stats.readyCounter = 0;
- ctx->stats.writeCounter = 0;
- ctx->stats.compressedCounter = 0;
-
- pthread_mutex_lock(&ctx->completion_mutex.pMutex);
- ctx->compressionCompletion = 1;
- ctx->compressionCompletionMeasured = 0;
- ctx->writeCompletion = 1;
- ctx->writeCompletionMeasured = 0;
- pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
}
}
}
DEBUG(3, "compressionThread(): waiting on job ready\n");
pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
while(currJob + 1 > ctx->jobReadyID && !ctx->threadError) {
+ pthread_mutex_lock(&ctx->stats_mutex.pMutex);
ctx->stats.waitReady++;
ctx->stats.readyCounter++;
+ pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
reduceCounters(ctx);
adaptCompressionLevel(ctx);
DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob);
DEBUG(3, "outputThread(): waiting on job compressed\n");
pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
+ pthread_mutex_lock(&ctx->stats_mutex.pMutex);
ctx->stats.waitCompressed++;
ctx->stats.compressedCounter++;
+ pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
reduceCounters(ctx);
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
if (!ctx->compressionCompletionMeasured) {
pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
DEBUG(3, "Creating new compression job -- nextJob: %u, jobCompressedID: %u, jobWriteID: %u, numJObs: %u\n", nextJob,ctx->jobCompressedID, ctx->jobWriteID, ctx->numJobs);
while (nextJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) {
+ pthread_mutex_lock(&ctx->stats_mutex.pMutex);
ctx->stats.waitWrite++;
ctx->stats.writeCounter++;
+ pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
reduceCounters(ctx);
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->writeCompletionMeasured = 1;
{
int ret = 0;
waitUntilAllJobsCompleted(fcr->ctx);
+ pthread_mutex_lock(&fcr->ctx->stats_mutex.pMutex);
if (g_displayStats) printStats(fcr->ctx->stats);
+ pthread_mutex_unlock(&fcr->ctx->stats_mutex.pMutex);
ret |= (fcr->srcFile != NULL) ? fclose(fcr->srcFile) : 0;
ret |= (fcr->ctx != NULL) ? freeCCtx(fcr->ctx) : 0;
if (fcr->otArg) {