*/
static void adaptCompressionLevel(adaptCCtx* ctx)
{
- /* check if compression is too slow */
- unsigned createChange;
- unsigned writeChange;
- unsigned compressionChange;
+ double createCompletion, compressionCompletion, writeCompletion;
+ double const threshold = 0.00001;
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
- createChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->createCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE;
- writeChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->writeCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE;
- compressionChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->compressionCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE;
- DEBUG(2, "compression level %u\n", ctx->compressionLevel);
- DEBUG(2, "createCompletionMeasured %f\n", ctx->createCompletionMeasured);
- DEBUG(2, "compressionCompletionMeasured %f\n", ctx->compressionCompletionMeasured);
- DEBUG(2, "writeCompletionMeasured %f\n", ctx->writeCompletionMeasured);
+ createCompletion = ctx->createCompletionMeasured;
+ compressionCompletion = ctx->compressionCompletionMeasured;
+ writeCompletion = ctx->writeCompletionMeasured;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
- {
- unsigned const compressionFastChange = MIN(MIN(createChange, writeChange), ZSTD_maxCLevel() - ctx->compressionLevel);
-
- DEBUG(2, "compressionFastChange %u\n", compressionFastChange);
-
- if (compressionFastChange) {
- DEBUG(2, "compression level too low\n");
- ctx->compressionLevel += compressionFastChange;
- }
- else {
- unsigned const compressionSlowChange = MIN(compressionChange, ctx->compressionLevel-1);
- DEBUG(2, "compression level too high\n");
- ctx->compressionLevel -= compressionSlowChange;
- }
+ DEBUG(2, "create completion: %f\n", createCompletion);
+ DEBUG(2, "compression completion: %f\n", compressionCompletion);
+ DEBUG(2, "write completion: %f\n", writeCompletion);
+ /* adapt compression based on bottleneck */
+ if (1 - createCompletion > threshold) {
+ /* job creation was not finished, compression thread waited */
+ unsigned const change = MAX_COMPRESSION_LEVEL_CHANGE - createCompletion * MAX_COMPRESSION_LEVEL_CHANGE;
+ DEBUG(2, "increasing compression level %u by %u\n", ctx->compressionLevel, change);
+ ctx->compressionLevel += change;
+ }
+ else if (1 - writeCompletion > threshold) {
+ /* write thread was not finished, compression thread waited */
+ unsigned const change = MAX_COMPRESSION_LEVEL_CHANGE - writeCompletion * MAX_COMPRESSION_LEVEL_CHANGE;
+ DEBUG(2, "increasing compression level %u by %u\n", ctx->compressionLevel, change);
+ ctx->compressionLevel += change;
+ }
+ else if (1 - compressionCompletion > threshold) {
+ /* compression thread was not finished, one of the other two threads waited */
+ unsigned const change = MAX_COMPRESSION_LEVEL_CHANGE - compressionCompletion * MAX_COMPRESSION_LEVEL_CHANGE;
+ DEBUG(2, "decreasing compression level %u by %u\n", ctx->compressionLevel, change);
+ ctx->compressionLevel -= change;
}
-
/* reset */
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->createCompletionMeasured = 1;
ctx->compressionCompletionMeasured = 1;
ctx->writeCompletionMeasured = 1;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
- DEBUG(2, "\n");
if (g_forceCompressionLevel) {
ctx->compressionLevel = g_compressionLevel;
/* compression thread is waiting, take measurements of write completion and read completion */
ctx->createCompletionMeasured = ctx->createCompletion;
ctx->writeCompletionMeasured = ctx->writeCompletion;
- DEBUG(2, "compression thread waiting : createCompletionMeasured %f : writeCompletionMeasured %f\n", ctx->createCompletionMeasured, ctx->writeCompletionMeasured);
+ DEBUG(3, "compression thread waiting : createCompletionMeasured %f : writeCompletionMeasured %f\n", ctx->createCompletionMeasured, ctx->writeCompletionMeasured);
DEBUG(3, "create completion: %f\n", ctx->createCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob);
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
/* write thread is waiting, take measurement of compression completion */
ctx->compressionCompletionMeasured = ctx->compressionCompletion;
- DEBUG(2, "write thread waiting : compressionCompletionMeasured %f\n", ctx->compressionCompletionMeasured);
+ DEBUG(3, "write thread waiting : compressionCompletionMeasured %f\n", ctx->compressionCompletionMeasured);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob);
pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
/* creation thread is waiting, take measurement of compression completion */
ctx->compressionCompletionMeasured = ctx->compressionCompletion;
- DEBUG(2, "creation thread waiting : compression completion measured : %f\n", ctx->compressionCompletionMeasured);
+ DEBUG(3, "creation thread waiting : compression completion measured : %f\n", ctx->compressionCompletionMeasured);
DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob);