#define DEFAULT_DISPLAY_LEVEL 1
#define DEFAULT_COMPRESSION_LEVEL 6
#define DEFAULT_ADAPT_PARAM 1
+#define MAX_COMPRESSION_LEVEL_CHANGE 10
static int g_displayLevel = DEFAULT_DISPLAY_LEVEL;
static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL;
size_t dictSize;
} jobDescription;
+typedef struct {
+ pthread_mutex_t pMutex;
+ int noError;
+} mutex_t;
+
+typedef struct {
+ pthread_cond_t pCond;
+ int noError;
+} cond_t;
+
typedef struct {
unsigned compressionLevel;
unsigned numActiveThreads;
unsigned jobWriteID;
unsigned allJobsCompleted;
unsigned adaptParam;
- pthread_mutex_t jobCompressed_mutex;
- pthread_cond_t jobCompressed_cond;
- pthread_mutex_t jobReady_mutex;
- pthread_cond_t jobReady_cond;
- pthread_mutex_t allJobsCompleted_mutex;
- pthread_cond_t allJobsCompleted_cond;
- pthread_mutex_t jobWrite_mutex;
- pthread_cond_t jobWrite_cond;
+ unsigned completionMeasured;
+ double completion;
+ mutex_t jobCompressed_mutex;
+ cond_t jobCompressed_cond;
+ mutex_t jobReady_mutex;
+ cond_t jobReady_cond;
+ mutex_t allJobsCompleted_mutex;
+ cond_t allJobsCompleted_cond;
+ mutex_t jobWrite_mutex;
+ cond_t jobWrite_cond;
size_t lastDictSize;
inBuff_t input;
cStat_t stats;
}
}
+static int destroyMutex(mutex_t* mutex)
+{
+ if (mutex->noError) {
+ int const ret = pthread_mutex_destroy(&mutex->pMutex);
+ return ret;
+ }
+ return 0;
+}
+
+static int destroyCond(cond_t* cond)
+{
+ if (cond->noError) {
+ int const ret = pthread_cond_destroy(&cond->pCond);
+ return ret;
+ }
+ return 0;
+}
+
static int freeCCtx(adaptCCtx* ctx)
{
if (!ctx) return 0;
{
int error = 0;
- error |= pthread_mutex_destroy(&ctx->jobCompressed_mutex);
- error |= pthread_cond_destroy(&ctx->jobCompressed_cond);
- error |= pthread_mutex_destroy(&ctx->jobReady_mutex);
- error |= pthread_cond_destroy(&ctx->jobReady_cond);
- error |= pthread_mutex_destroy(&ctx->allJobsCompleted_mutex);
- error |= pthread_cond_destroy(&ctx->allJobsCompleted_cond);
- error |= pthread_mutex_destroy(&ctx->jobWrite_mutex);
- error |= pthread_cond_destroy(&ctx->jobWrite_cond);
- error |= (ctx->dstFile != NULL && ctx->dstFile != stdout) ? fclose(ctx->dstFile) : 0;
+ error |= destroyMutex(&ctx->jobCompressed_mutex);
+ error |= destroyCond(&ctx->jobCompressed_cond);
+ error |= destroyMutex(&ctx->jobReady_mutex);
+ error |= destroyCond(&ctx->jobReady_cond);
+ error |= destroyMutex(&ctx->allJobsCompleted_mutex);
+ error |= destroyCond(&ctx->allJobsCompleted_cond);
+ error |= destroyMutex(&ctx->jobWrite_mutex);
+ error |= destroyCond(&ctx->jobWrite_cond);
+ error |= (ctx->dstFile != NULL && ctx->dstFile != stdout) ? fclose(ctx->dstFile) : 0;
error |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx));
free(ctx->input.buffer.start);
if (ctx->jobs){
}
}
+static int initMutex(mutex_t* mutex)
+{
+ int const ret = pthread_mutex_init(&mutex->pMutex, NULL);
+ mutex->noError = !ret;
+ return ret;
+}
+
+static int initCond(cond_t* cond)
+{
+ int const ret = pthread_cond_init(&cond->pCond, NULL);
+ cond->noError = !ret;
+ return ret;
+}
+
static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename)
{
- adaptCCtx* ctx = calloc(1, sizeof(adaptCCtx));
+ adaptCCtx* const ctx = calloc(1, sizeof(adaptCCtx));
if (ctx == NULL) {
DISPLAY("Error: could not allocate space for context\n");
return NULL;
}
ctx->compressionLevel = g_compressionLevel;
- pthread_mutex_init(&ctx->jobCompressed_mutex, NULL);
- pthread_cond_init(&ctx->jobCompressed_cond, NULL);
- pthread_mutex_init(&ctx->jobReady_mutex, NULL);
- pthread_cond_init(&ctx->jobReady_cond, NULL);
- pthread_mutex_init(&ctx->allJobsCompleted_mutex, NULL);
- pthread_cond_init(&ctx->allJobsCompleted_cond, NULL);
- pthread_mutex_init(&ctx->jobWrite_mutex, NULL);
- pthread_cond_init(&ctx->jobWrite_cond, NULL);
+ {
+ int pthreadError = 0;
+ pthreadError |= initMutex(&ctx->jobCompressed_mutex);
+ pthreadError |= initCond(&ctx->jobCompressed_cond);
+ pthreadError |= initMutex(&ctx->jobReady_mutex);
+ pthreadError |= initCond(&ctx->jobReady_cond);
+ pthreadError |= initMutex(&ctx->allJobsCompleted_mutex);
+ pthreadError |= initCond(&ctx->allJobsCompleted_cond);
+ pthreadError |= initMutex(&ctx->jobWrite_mutex);
+ pthreadError |= initCond(&ctx->jobWrite_cond);
+ if (pthreadError) return NULL;
+ }
ctx->numJobs = numJobs;
ctx->jobReadyID = 0;
ctx->jobCompressedID = 0;
static void waitUntilAllJobsCompleted(adaptCCtx* ctx)
{
if (!ctx) return;
- pthread_mutex_lock(&ctx->allJobsCompleted_mutex);
+ pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex);
while (ctx->allJobsCompleted == 0) {
- pthread_cond_wait(&ctx->allJobsCompleted_cond, &ctx->allJobsCompleted_mutex);
+ pthread_cond_wait(&ctx->allJobsCompleted_cond.pCond, &ctx->allJobsCompleted_mutex.pMutex);
}
- pthread_mutex_unlock(&ctx->allJobsCompleted_mutex);
+ pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
}
/*
reset = 1;
}
else if (compressSlow && ctx->compressionLevel > 1) {
+ double const completion = ctx->completion;
+ unsigned const maxChange = (unsigned)((1-completion) * MAX_COMPRESSION_LEVEL_CHANGE);
+ unsigned const change = MIN(maxChange, ctx->compressionLevel - 1);
DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel);
- ctx->compressionLevel--;
+ DEBUG(2, "completion: %f\n", completion);
+ ctx->compressionLevel -= change;
reset = 1;
}
if (reset) {
ctx->stats.readyCounter = 0;
ctx->stats.writeCounter = 0;
ctx->stats.compressedCounter = 0;
+ ctx->completion = 1;
+ ctx->completionMeasured = 0;
}
return ctx->compressionLevel;
}
unsigned const currJobIndex = currJob % ctx->numJobs;
jobDescription* job = &ctx->jobs[currJobIndex];
DEBUG(3, "compressionThread(): waiting on job ready\n");
- pthread_mutex_lock(&ctx->jobReady_mutex);
+ pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
while(currJob + 1 > ctx->jobReadyID) {
ctx->stats.waitReady++;
ctx->stats.readyCounter++;
DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob);
- pthread_cond_wait(&ctx->jobReady_cond, &ctx->jobReady_mutex);
+ pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex);
}
- pthread_mutex_unlock(&ctx->jobReady_mutex);
+ pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
DEBUG(3, "compressionThread(): continuing after job ready\n");
DEBUG(3, "DICTIONARY ENDED\n");
DEBUG(3, "%.*s", (int)job->src.size, (char*)job->src.start);
DEBUG(3, "compression level used: %u\n", cLevel);
/* begin compression */
{
- size_t useDictSize = MIN(getUseableDictSize(cLevel), job->dictSize);
+ size_t const useDictSize = MIN(getUseableDictSize(cLevel), job->dictSize);
DEBUG(2, "useDictSize: %zu, job->dictSize: %zu\n", useDictSize, job->dictSize);
size_t const dictModeError = ZSTD_setCCtxParameter(ctx->cctx, ZSTD_p_forceRawDict, 1);
size_t const initError = ZSTD_compressBegin_usingDict(ctx->cctx, job->src.start + job->dictSize - useDictSize, useDictSize, cLevel);
}
job->dst.size = job->compressedSize;
}
- pthread_mutex_lock(&ctx->jobCompressed_mutex);
+ pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
ctx->jobCompressedID++;
DEBUG(3, "signaling for job %u\n", currJob);
- pthread_cond_signal(&ctx->jobCompressed_cond);
- pthread_mutex_unlock(&ctx->jobCompressed_mutex);
+ pthread_cond_signal(&ctx->jobCompressed_cond.pCond);
+ pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
DEBUG(3, "finished job compression %u\n", currJob);
currJob++;
if (job->lastJob || ctx->threadError) {
unsigned const currJobIndex = currJob % ctx->numJobs;
jobDescription* job = &ctx->jobs[currJobIndex];
DEBUG(3, "outputThread(): waiting on job compressed\n");
- pthread_mutex_lock(&ctx->jobCompressed_mutex);
+ pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
while (currJob + 1 > ctx->jobCompressedID) {
ctx->stats.waitCompressed++;
ctx->stats.compressedCounter++;
+ if (!ctx->completionMeasured) {
+ ctx->completion = ZSTD_getCompletion(ctx->cctx);
+ ctx->completionMeasured = 1;
+ }
+ DEBUG(2, "output detected completion: %f\n", ctx->completion);
DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob);
- pthread_cond_wait(&ctx->jobCompressed_cond, &ctx->jobCompressed_mutex);
+ pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
}
- pthread_mutex_unlock(&ctx->jobCompressed_mutex);
+ pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
DEBUG(3, "outputThread(): continuing after job compressed\n");
{
size_t const compressedSize = job->compressedSize;
currJob++;
displayProgress(currJob, ctx->compressionLevel, job->lastJob);
DEBUG(3, "locking job write mutex\n");
- pthread_mutex_lock(&ctx->jobWrite_mutex);
+ pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
ctx->jobWriteID++;
- pthread_cond_signal(&ctx->jobWrite_cond);
- pthread_mutex_unlock(&ctx->jobWrite_mutex);
+ pthread_cond_signal(&ctx->jobWrite_cond.pCond);
+ pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
DEBUG(3, "unlocking job write mutex\n");
if (job->lastJob || ctx->threadError) {
/* finished with all jobs */
DEBUG(3, "all jobs finished writing\n");
- pthread_mutex_lock(&ctx->allJobsCompleted_mutex);
+ pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex);
ctx->allJobsCompleted = 1;
- pthread_cond_signal(&ctx->allJobsCompleted_cond);
- pthread_mutex_unlock(&ctx->allJobsCompleted_mutex);
+ pthread_cond_signal(&ctx->allJobsCompleted_cond.pCond);
+ pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
break;
}
}
unsigned const nextJobIndex = nextJob % ctx->numJobs;
jobDescription* job = &ctx->jobs[nextJobIndex];
DEBUG(3, "createCompressionJob(): wait for job write\n");
- pthread_mutex_lock(&ctx->jobWrite_mutex);
+ pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
DEBUG(3, "Creating new compression job -- nextJob: %u, jobCompressedID: %u, jobWriteID: %u, numJObs: %u\n", nextJob,ctx->jobCompressedID, ctx->jobWriteID, ctx->numJobs);
while (nextJob - ctx->jobWriteID >= ctx->numJobs) {
ctx->stats.waitWrite++;
ctx->stats.writeCounter++;
+ if (!ctx->completionMeasured) {
+ ctx->completion = ZSTD_getCompletion(ctx->cctx);
+ ctx->completionMeasured = 1;
+ }
+ DEBUG(2, "job creation detected completion %f\n", ctx->completion);
DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob);
- pthread_cond_wait(&ctx->jobWrite_cond, &ctx->jobWrite_mutex);
+ pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
}
- pthread_mutex_unlock(&ctx->jobWrite_mutex);
+ pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
DEBUG(3, "createCompressionJob(): continuing after job write\n");
DEBUG(3, "filled: %zu, srcSize: %zu\n", ctx->input.filled, srcSize);
job->lastJob = last;
memcpy(job->src.start, ctx->input.buffer.start, ctx->lastDictSize + srcSize);
job->dictSize = ctx->lastDictSize;
- pthread_mutex_lock(&ctx->jobReady_mutex);
+ pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
ctx->jobReadyID++;
- pthread_cond_signal(&ctx->jobReady_cond);
- pthread_mutex_unlock(&ctx->jobReady_mutex);
+ pthread_cond_signal(&ctx->jobReady_cond.pCond);
+ pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
DEBUG(3, "finished job creation %u\n", nextJob);
ctx->nextJobID++;
DEBUG(3, "filled: %zu, srcSize: %zu\n", ctx->input.filled, srcSize);