#define DISPLAY(...) fprintf(stderr, __VA_ARGS__)
#define DEBUGLOG(l, ...) { if (g_displayLevel>=l) { DISPLAY(__VA_ARGS__); } }
#define FILE_CHUNK_SIZE 4 << 20
-#define MAX_NUM_JOBS 100;
+#define MAX_NUM_JOBS 2;
#define stdinmark "/*stdin*\\"
#define stdoutmark "/*stdout*\\"
#define MAX_PATH 256
#define DEFAULT_DISPLAY_LEVEL 1
#define DEFAULT_COMPRESSION_LEVEL 6
+#define DEFAULT_ADAPT_PARAM 2
typedef unsigned char BYTE;
#include <stdio.h> /* fprintf */
typedef struct {
unsigned waitCompleted;
unsigned waitReady;
- unsigned waitWritten;
+ unsigned waitWrite;
+ unsigned readyCounter;
+ unsigned completedCounter;
+ unsigned writeCounter;
} stat_t;
typedef struct {
unsigned threadError;
unsigned jobReadyID;
unsigned jobCompletedID;
- unsigned jobWrittenID;
+ unsigned jobWriteID;
unsigned allJobsCompleted;
+ unsigned adaptParam;
pthread_mutex_t jobCompleted_mutex;
pthread_cond_t jobCompleted_cond;
pthread_mutex_t jobReady_mutex;
ctx->numJobs = numJobs;
ctx->jobReadyID = 0;
ctx->jobCompletedID = 0;
- ctx->jobWrittenID = 0;
+ ctx->jobWriteID = 0;
ctx->lastJobID = -1; /* intentional underflow */
ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
ctx->nextJobID = 0;
ctx->threadError = 0;
ctx->allJobsCompleted = 0;
+ ctx->adaptParam = DEFAULT_ADAPT_PARAM;
if (!ctx->jobs) {
DISPLAY("Error: could not allocate space for jobs during context creation\n");
freeCCtx(ctx);
pthread_mutex_unlock(&ctx->allJobsCompleted_mutex);
}
+static unsigned adaptCompressionLevel(adaptCCtx* ctx)
+{
+ unsigned reset = 0;
+ unsigned const allSlow = ctx->adaptParam < ctx->stats.completedCounter && ctx->adaptParam < ctx->stats.writeCounter && ctx->adaptParam < ctx->stats.readyCounter ? 1 : 0;
+ unsigned const compressWaiting = ctx->adaptParam < ctx->stats.readyCounter ? 1 : 0;
+ unsigned const writeWaiting = ctx->adaptParam < ctx->stats.completedCounter ? 1 : 0;
+ unsigned const createWaiting = ctx->adaptParam < ctx->stats.writeCounter ? 1 : 0;
+ unsigned const writeSlow = ((compressWaiting && createWaiting) || (createWaiting && !writeWaiting)) ? 1 : 0;
+ unsigned const compressSlow = ((writeWaiting && createWaiting) || (writeWaiting && !compressWaiting)) ? 1 : 0;
+ unsigned const createSlow = ((compressWaiting && writeWaiting) || (compressWaiting && !createWaiting)) ? 1 : 0;
+ // unsigned const writeSlow = ((compressWaiting && createWaiting)) ? 1 : 0;
+ // unsigned const compressSlow = ((writeWaiting && createWaiting)) ? 1 : 0;
+ // unsigned const createSlow = ((compressWaiting && writeWaiting)) ? 1 : 0;
+ DEBUGLOG(2, "ready: %u completed: %u write: %u\n", ctx->stats.readyCounter, ctx->stats.completedCounter, ctx->stats.writeCounter);
+ if (allSlow) {
+ reset = 1;
+ }
+ else if ((writeSlow || createSlow) && ctx->compressionLevel < (unsigned)ZSTD_maxCLevel()) {
+ DEBUGLOG(2, "increasing compression level %u\n", ctx->compressionLevel);
+ ctx->compressionLevel++;
+ reset = 1;
+ }
+ else if (compressSlow && ctx->compressionLevel > 1) {
+ DEBUGLOG(2, "decreasing compression level %u\n", ctx->compressionLevel);
+ ctx->compressionLevel--;
+ reset = 1;
+ }
+ if (reset) {
+ ctx->stats.readyCounter = 0;
+ ctx->stats.writeCounter = 0;
+ ctx->stats.completedCounter = 0;
+ }
+ return ctx->compressionLevel;
+}
+
static void* compressionThread(void* arg)
{
adaptCCtx* ctx = (adaptCCtx*)arg;
pthread_mutex_lock(&ctx->jobReady_mutex);
while(currJob + 1 > ctx->jobReadyID) {
ctx->stats.waitReady++;
+ ctx->stats.readyCounter++;
DEBUGLOG(2, "waiting on job ready, nextJob: %u\n", currJob);
pthread_cond_wait(&ctx->jobReady_cond, &ctx->jobReady_mutex);
}
// DEBUGLOG(2, "compressionThread(): continuing after job ready\n");
/* compress the data */
{
- size_t const compressedSize = ZSTD_compress(job->dst.start, job->dst.size, job->src.start, job->src.size, job->compressionLevel);
+ unsigned const cLevel = adaptCompressionLevel(ctx);
+ // unsigned const cLevel = job->compressionLevel;
+ DEBUGLOG(2, "cLevel used: %u\n", cLevel);
+ size_t const compressedSize = ZSTD_compress(job->dst.start, job->dst.size, job->src.start, job->src.size, cLevel);
if (ZSTD_isError(compressedSize)) {
ctx->threadError = 1;
DISPLAY("Error: something went wrong during compression: %s\n", ZSTD_getErrorName(compressedSize));
pthread_mutex_lock(&ctx->jobCompleted_mutex);
while (currJob + 1 > ctx->jobCompletedID) {
ctx->stats.waitCompleted++;
+ ctx->stats.completedCounter++;
DEBUGLOG(2, "waiting on job completed, nextJob: %u\n", currJob);
pthread_cond_wait(&ctx->jobCompleted_cond, &ctx->jobCompleted_mutex);
}
currJob++;
DEBUGLOG(2, "locking job write mutex\n");
pthread_mutex_lock(&ctx->jobWrite_mutex);
- ctx->jobWrittenID++;
+ ctx->jobWriteID++;
pthread_cond_signal(&ctx->jobWrite_cond);
pthread_mutex_unlock(&ctx->jobWrite_mutex);
DEBUGLOG(2, "unlocking job write mutex\n");
jobDescription* job = &ctx->jobs[nextJobIndex];
// DEBUGLOG(2, "createCompressionJob(): wait for job write\n");
pthread_mutex_lock(&ctx->jobWrite_mutex);
- // DEBUGLOG(2, "Creating new compression job -- nextJob: %u, jobCompletedID: %u, jobWrittenID: %u, numJObs: %u\n", nextJob,ctx->jobCompletedID, ctx->jobWrittenID, ctx->numJobs);
- while (nextJob - ctx->jobWrittenID >= ctx->numJobs) {
- ctx->stats.waitWritten++;
- DEBUGLOG(2, "waiting on job written, nextJob: %u\n", nextJob);
+ // DEBUGLOG(2, "Creating new compression job -- nextJob: %u, jobCompletedID: %u, jobWriteID: %u, numJObs: %u\n", nextJob,ctx->jobCompletedID, ctx->jobWriteID, ctx->numJobs);
+ while (nextJob - ctx->jobWriteID >= ctx->numJobs) {
+ ctx->stats.waitWrite++;
+ ctx->stats.writeCounter++;
+ DEBUGLOG(2, "waiting on job Write, nextJob: %u\n", nextJob);
pthread_cond_wait(&ctx->jobWrite_cond, &ctx->jobWrite_mutex);
}
pthread_mutex_unlock(&ctx->jobWrite_mutex);
// DEBUGLOG(2, "createCompressionJob(): continuing after job write\n");
+
+
job->compressionLevel = ctx->compressionLevel;
job->src.start = malloc(srcSize);
job->src.size = srcSize;
DISPLAY("========STATISTICS========\n");
DISPLAY("# times waited on job ready: %u\n", stats.waitReady);
DISPLAY("# times waited on job completed: %u\n", stats.waitCompleted);
- DISPLAY("# times waited on job written: %u\n\n", stats.waitWritten);
+ DISPLAY("# times waited on job Write: %u\n\n", stats.waitWrite);
}
static int compressFilename(const char* const srcFilename, const char* const dstFilename)