static int g_displayLevel = DEFAULT_DISPLAY_LEVEL;
static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL;
-static unsigned g_displayStats = 0;
static UTIL_time_t g_startTime;
static size_t g_streamedSize = 0;
static unsigned g_useProgressBar = 0;
buffer_t buffer;
} inBuff_t;
-typedef struct {
- unsigned waitCompressed;
- unsigned waitReady;
- unsigned waitWrite;
- unsigned readyCounter;
- unsigned compressedCounter;
- unsigned writeCounter;
-} cStat_t;
-
typedef struct {
buffer_t src;
buffer_t dst;
mutex_t jobWrite_mutex;
cond_t jobWrite_cond;
mutex_t completion_mutex;
- mutex_t stats_mutex;
+ mutex_t wait_mutex;
size_t lastDictSize;
inBuff_t input;
- cStat_t stats;
jobDescription* jobs;
ZSTD_CCtx* cctx;
} adaptCCtx;
error |= destroyMutex(&ctx->jobWrite_mutex);
error |= destroyCond(&ctx->jobWrite_cond);
error |= destroyMutex(&ctx->completion_mutex);
- error |= destroyMutex(&ctx->stats_mutex);
+ error |= destroyMutex(&ctx->wait_mutex);
error |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx));
free(ctx->input.buffer.start);
if (ctx->jobs){
pthreadError |= initMutex(&ctx->jobWrite_mutex);
pthreadError |= initCond(&ctx->jobWrite_cond);
pthreadError |= initMutex(&ctx->completion_mutex);
- pthreadError |= initMutex(&ctx->stats_mutex);
+ pthreadError |= initMutex(&ctx->wait_mutex);
if (pthreadError) return pthreadError;
}
ctx->numJobs = numJobs;
ctx->jobCompressedID = 0;
ctx->jobWriteID = 0;
ctx->lastDictSize = 0;
+ ctx->createCompletionMeasured = 1;
+ ctx->compressionCompletionMeasured = 1;
+ ctx->writeCompletionMeasured = 1;
+
ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
if (!ctx->jobs) {
pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
}
-/* this function normalizes counters when compression level is changing */
-static void reduceCounters(adaptCCtx* ctx)
-{
- pthread_mutex_lock(&ctx->stats_mutex.pMutex);
- unsigned const min = MIN(ctx->stats.compressedCounter, MIN(ctx->stats.writeCounter, ctx->stats.readyCounter));
- ctx->stats.writeCounter -= min;
- ctx->stats.compressedCounter -= min;
- ctx->stats.readyCounter -= min;
- pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
-}
-
/*
* Compression level is changed depending on which part of the compression process is lagging
* Currently, three theads exist for job creation, compression, and file writing respectively.
ctx->compressionLevel = g_compressionLevel;
}
else {
- unsigned reset = 0;
- unsigned allSlow;
- unsigned compressWaiting;
- unsigned writeWaiting;
- unsigned createWaiting;
-
- pthread_mutex_lock(&ctx->stats_mutex.pMutex);
- allSlow = ctx->adaptParam < ctx->stats.compressedCounter && ctx->adaptParam < ctx->stats.writeCounter && ctx->adaptParam < ctx->stats.readyCounter;
- compressWaiting = ctx->adaptParam < ctx->stats.readyCounter;
- writeWaiting = ctx->adaptParam < ctx->stats.compressedCounter;
- createWaiting = ctx->adaptParam < ctx->stats.writeCounter;
- pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
- DEBUG(2, "createWaiting %u\n", createWaiting);
- DEBUG(2, "compressWaiting %u\n", compressWaiting);
- DEBUG(2, "writeWaiting %u\n\n", writeWaiting);
+ DEBUG(2, "compression level %u\n", ctx->compressionLevel);
+ /* check if compression is too slow */
+ unsigned createChange;
+ unsigned writeChange;
+ unsigned compressionChange;
+ pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+ createChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->createCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE;
+ writeChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->writeCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE;
+ compressionChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->compressionCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE;
+ DEBUG(2, "createCompletionMeasured %f\n", ctx->createCompletionMeasured);
+ DEBUG(2, "compressionCompletionMeasured %f\n", ctx->compressionCompletionMeasured);
+ DEBUG(2, "writeCompletionMeasured %f\n", ctx->writeCompletionMeasured);
+ pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+
{
- unsigned const writeSlow = (compressWaiting && createWaiting);
- unsigned const compressSlow = (writeWaiting && createWaiting);
- unsigned const createSlow = (compressWaiting && writeWaiting);
- DEBUG(3, "createWaiting: %u, compressWaiting: %u, writeWaiting: %u\n", createWaiting, compressWaiting, writeWaiting);
- if (allSlow) {
- reset = 1;
- }
- else if ((writeSlow || createSlow) && ctx->compressionLevel < (unsigned)ZSTD_maxCLevel()) {
- DEBUG(2, "increasing compression level %u\n", ctx->compressionLevel);
- double completion;
- pthread_mutex_lock(&ctx->completion_mutex.pMutex);
- completion = writeSlow ? ctx->writeCompletionMeasured : ctx->createCompletionMeasured;
- DEBUG(2, "write completion: %f, create completion: %f\n", ctx->writeCompletionMeasured, ctx->createCompletionMeasured);
- pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
- {
- unsigned const maxChange = MAX_COMPRESSION_LEVEL_CHANGE - (unsigned)(completion*MAX_COMPRESSION_LEVEL_CHANGE);
- unsigned const change = MIN(maxChange, ZSTD_maxCLevel() - ctx->compressionLevel);
- DEBUG(3, "writeSlow: %u, change: %u\n", writeSlow, change);
- DEBUG(3, "write completion: %f\n", completion);
- ctx->compressionLevel += change;
- reset = 1;
- }
- }
- else if (compressSlow && ctx->compressionLevel > 1) {
- double completion;
- pthread_mutex_lock(&ctx->completion_mutex.pMutex);
- completion = ctx->compressionCompletionMeasured;
- pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
- {
- unsigned const maxChange = MAX_COMPRESSION_LEVEL_CHANGE - (unsigned)(completion*MAX_COMPRESSION_LEVEL_CHANGE);
- unsigned const change = MIN(maxChange, ctx->compressionLevel - 1);
- DEBUG(2, "decreasing compression level %u\n", ctx->compressionLevel);
- DEBUG(2, "completion: %f\n", completion);
- ctx->compressionLevel -= change;
- reset = 1;
- }
+ unsigned const compressionFastChange = MIN(MIN(createChange, writeChange), ZSTD_maxCLevel() - ctx->compressionLevel);
+ DEBUG(2, "compressionFastChange %u\n", compressionFastChange);
+
+ if (compressionFastChange) {
+ DEBUG(2, "compression level too low\n");
+ ctx->compressionLevel += compressionFastChange;
}
- if (reset) {
- pthread_mutex_lock(&ctx->stats_mutex.pMutex);
- ctx->stats.readyCounter = 0;
- ctx->stats.writeCounter = 0;
- ctx->stats.compressedCounter = 0;
- pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
+ else {
+ unsigned const compressionSlowChange = MIN(compressionChange, ctx->compressionLevel-1);
+ DEBUG(2, "compression level too high\n");
+ ctx->compressionLevel -= compressionSlowChange;
}
}
+
+ /* reset */
+ pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+ ctx->createCompletionMeasured = 1;
+ ctx->compressionCompletionMeasured = 1;
+ ctx->writeCompletionMeasured = 1;
+ pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+ DEBUG(2, "\n");
}
}
unsigned const currJobIndex = currJob % ctx->numJobs;
jobDescription* job = &ctx->jobs[currJobIndex];
DEBUG(3, "compressionThread(): waiting on job ready\n");
+
+ /* new job, reset completion */
+ pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+ ctx->compressionCompletion = 0;
+ pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+
pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
while(currJob + 1 > ctx->jobReadyID && !ctx->threadError) {
- pthread_mutex_lock(&ctx->stats_mutex.pMutex);
- ctx->stats.waitReady++;
- ctx->stats.readyCounter++;
- pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
- reduceCounters(ctx);
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+ /* compression thread is waiting, take measurements of write completion and read completion */
ctx->createCompletionMeasured = ctx->createCompletion;
+ ctx->writeCompletionMeasured = ctx->writeCompletion;
+ DEBUG(2, "compression thread waiting : createCompletionMeasured %f : writeCompletionMeasured %f\n", ctx->createCompletionMeasured, ctx->writeCompletionMeasured);
DEBUG(3, "create completion: %f\n", ctx->createCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob);
pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex);
}
pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
+
+ /* reset create completion */
+ pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+ ctx->createCompletion = 0;
+ pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+
DEBUG(3, "compressionThread(): continuing after job ready\n");
DEBUG(3, "DICTIONARY ENDED\n");
DEBUG(3, "%.*s", (int)job->src.size, (char*)job->src.start);
/* update completion */
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->compressionCompletion = 1 - (double)remaining/job->src.size;
- DEBUG(2, "update on job %u: compression completion %f\n", currJob, ctx->compressionCompletion);
+ DEBUG(3, "update on job %u: compression completion %f\n", currJob, ctx->compressionCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
}
} while (remaining != 0);
unsigned const currJobIndex = currJob % ctx->numJobs;
jobDescription* job = &ctx->jobs[currJobIndex];
DEBUG(3, "outputThread(): waiting on job compressed\n");
+
+ /* new job, reset completion */
+ pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+ ctx->writeCompletion = 0;
+ pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+
pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
- pthread_mutex_lock(&ctx->stats_mutex.pMutex);
- ctx->stats.waitCompressed++;
- ctx->stats.compressedCounter++;
- pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
- reduceCounters(ctx);
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+ /* write thread is waiting, take measurement of compression completion */
ctx->compressionCompletionMeasured = ctx->compressionCompletion;
- DEBUG(2, "waited on job %u: compressionCompletion %f\n", currJob, ctx->compressionCompletion);
+ DEBUG(2, "write thread waiting : compressionCompletionMeasured %f\n", ctx->compressionCompletionMeasured);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob);
pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
}
pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
+
+ /* reset compression completion */
+ pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+ ctx->compressionCompletion = 0;
+ pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+
DEBUG(3, "outputThread(): continuing after job compressed\n");
{
size_t const compressedSize = job->compressedSize;
pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
break;
}
+
}
return arg;
}
pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
DEBUG(3, "Creating new compression job -- nextJob: %u, jobCompressedID: %u, jobWriteID: %u, numJObs: %u\n", nextJob,ctx->jobCompressedID, ctx->jobWriteID, ctx->numJobs);
while (nextJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) {
- pthread_mutex_lock(&ctx->stats_mutex.pMutex);
- ctx->stats.waitWrite++;
- ctx->stats.writeCounter++;
- pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
- reduceCounters(ctx);
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
- ctx->writeCompletionMeasured = ctx->writeCompletion;
+ /* creation thread is waiting, take measurement of compression completion */
+ ctx->compressionCompletionMeasured = ctx->compressionCompletion;
+ DEBUG(2, "creation thread waiting : compression completion measured : %f\n", ctx->compressionCompletionMeasured);
DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob);
pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
}
pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
+
+ /* reset write completion */
+ pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+ ctx->writeCompletion = 0;
+ pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(3, "createCompressionJob(): continuing after job write\n");
DEBUG(3, "filled: %zu, srcSize: %zu\n", ctx->input.filled, srcSize);
return 0;
}
-static void printStats(cStat_t stats)
-{
- DISPLAY("========STATISTICS========\n");
- DISPLAY("# times waited on job ready: %u\n", stats.waitReady);
- DISPLAY("# times waited on job compressed: %u\n", stats.waitCompressed);
- DISPLAY("# times waited on job Write: %u\n\n", stats.waitWrite);
-}
-
static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadArg* otArg)
{
if (!ctx || !srcFile || !otArg) {
return 1;
}
}
+ {
+ unsigned currJob = 0;
+ /* creating jobs */
+ for ( ; ; ) {
+ size_t pos = 0;
+ size_t const readBlockSize = 1 << 15;
+ size_t remaining = FILE_CHUNK_SIZE;
+
+ /* new job reset completion */
+ pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+ ctx->createCompletion = 0;
+ pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
- /* creating jobs */
- for ( ; ; ) {
- size_t pos = 0;
- size_t const readBlockSize = 1 << 15;
- size_t remaining = FILE_CHUNK_SIZE;
- while (remaining != 0 && !feof(srcFile)) {
- size_t const ret = fread(ctx->input.buffer.start + ctx->input.filled + pos, 1, readBlockSize, srcFile);
- if (ret != readBlockSize && !feof(srcFile)) {
- /* error could not read correct number of bytes */
+ while (remaining != 0 && !feof(srcFile)) {
+ size_t const ret = fread(ctx->input.buffer.start + ctx->input.filled + pos, 1, readBlockSize, srcFile);
+ if (ret != readBlockSize && !feof(srcFile)) {
+ /* error could not read correct number of bytes */
+ DISPLAY("Error: problem occurred during read from src file\n");
+ signalErrorToThreads(ctx);
+ return 1;
+ }
+ pos += ret;
+ remaining -= ret;
+ pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+ ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE);
+ DEBUG(3, "create completion: %f\n", ctx->createCompletion);
+ pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+ }
+ if (remaining != 0 && !feof(srcFile)) {
DISPLAY("Error: problem occurred during read from src file\n");
signalErrorToThreads(ctx);
return 1;
}
- pos += ret;
- remaining -= ret;
- pthread_mutex_lock(&ctx->completion_mutex.pMutex);
- ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE);
- DEBUG(3, "create completion: %f\n", ctx->createCompletion);
- pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
- }
- if (remaining != 0 && !feof(srcFile)) {
- DISPLAY("Error: problem occurred during read from src file\n");
- signalErrorToThreads(ctx);
- return 1;
- }
- g_streamedSize += pos;
- /* reading was fine, now create the compression job */
- {
- int const last = feof(srcFile);
- int const error = createCompressionJob(ctx, pos, last);
- if (error != 0) {
- signalErrorToThreads(ctx);
- return error;
+ g_streamedSize += pos;
+ /* reading was fine, now create the compression job */
+ {
+ int const last = feof(srcFile);
+ int const error = createCompressionJob(ctx, pos, last);
+ if (error != 0) {
+ signalErrorToThreads(ctx);
+ return error;
+ }
+ }
+ currJob++;
+ if (feof(srcFile)) {
+ DEBUG(3, "THE STREAM OF DATA ENDED %u\n", ctx->nextJobID);
+ break;
}
- }
- if (feof(srcFile)) {
- DEBUG(3, "THE STREAM OF DATA ENDED %u\n", ctx->nextJobID);
- break;
}
}
-
/* success -- created all jobs */
return 0;
}
{
int ret = 0;
waitUntilAllJobsCompleted(fcr->ctx);
- pthread_mutex_lock(&fcr->ctx->stats_mutex.pMutex);
- if (g_displayStats) printStats(fcr->ctx->stats);
- pthread_mutex_unlock(&fcr->ctx->stats_mutex.pMutex);
ret |= (fcr->srcFile != NULL) ? fclose(fcr->srcFile) : 0;
ret |= (fcr->ctx != NULL) ? freeCCtx(fcr->ctx) : 0;
if (fcr->otArg) {
PRINT(" -oFILE : specify the output file name\n");
PRINT(" -v : display debug information\n");
PRINT(" -i# : provide initial compression level\n");
- PRINT(" -s : display information stats\n");
PRINT(" -h : display help/information\n");
PRINT(" -f : force the compression level to stay constant\n");
}
g_compressionLevel = readU32FromChar(&argument);
DEBUG(3, "g_compressionLevel: %u\n", g_compressionLevel);
break;
- case 's':
- g_displayStats = 1;
- break;
case 'h':
help();
goto _main_exit;