From: Paul Cruz Date: Wed, 5 Jul 2017 18:52:55 +0000 (-0700) Subject: removed calculation of file size and replaced with limited number of available jobs X-Git-Tag: v1.3.1^2~13^2^2~121 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=a2680e5b9605941b2b9d250f3a7915fa7f48d5b8;p=thirdparty%2Fzstd.git removed calculation of file size and replaced with limited number of available jobs --- diff --git a/contrib/adaptive-compression/multi.c b/contrib/adaptive-compression/multi.c index b89d0c239..a60f48ece 100644 --- a/contrib/adaptive-compression/multi.c +++ b/contrib/adaptive-compression/multi.c @@ -1,5 +1,6 @@ #define DISPLAY(...) fprintf(stderr, __VA_ARGS__) #define FILE_CHUNK_SIZE 4 << 20 +#define MAX_NUM_JOBS 30; typedef unsigned char BYTE; #include /* fprintf */ @@ -22,10 +23,13 @@ typedef struct { unsigned jobID; unsigned jobCompleted; unsigned jobReady; + unsigned jobWritten; pthread_mutex_t* jobCompleted_mutex; pthread_cond_t* jobCompleted_cond; pthread_mutex_t* jobReady_mutex; pthread_cond_t* jobReady_cond; + pthread_mutex_t* jobWrite_mutex; + pthread_cond_t* jobWrite_cond; size_t compressedSize; } jobDescription; @@ -43,6 +47,8 @@ typedef struct { pthread_cond_t jobReady_cond; pthread_mutex_t allJobsCompleted_mutex; pthread_cond_t allJobsCompleted_cond; + pthread_mutex_t jobWrite_mutex; + pthread_cond_t jobWrite_cond; jobDescription* jobs; FILE* dstFile; } adaptCCtx; @@ -66,12 +72,14 @@ static int freeCCtx(adaptCCtx* ctx) int const readyCondError = pthread_cond_destroy(&ctx->jobReady_cond); int const allJobsMutexError = pthread_mutex_destroy(&ctx->allJobsCompleted_mutex); int const allJobsCondError = pthread_cond_destroy(&ctx->allJobsCompleted_cond); + int const jobWriteMutexError = pthread_mutex_destroy(&ctx->jobWrite_mutex); + int const jobWriteCondError = pthread_cond_destroy(&ctx->jobWrite_cond); int const fileCloseError = ctx->dstFile != NULL ? fclose(ctx->dstFile) : 0; if (ctx->jobs){ freeCompressionJobs(ctx); free(ctx->jobs); } - return completedMutexError | completedCondError | readyMutexError | readyCondError | fileCloseError | allJobsMutexError | allJobsCondError; + return completedMutexError | completedCondError | readyMutexError | readyCondError | fileCloseError | allJobsMutexError | allJobsCondError | jobWriteMutexError | jobWriteCondError; } } @@ -91,6 +99,8 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename) pthread_cond_init(&ctx->jobReady_cond, NULL); pthread_mutex_init(&ctx->allJobsCompleted_mutex, NULL); pthread_cond_init(&ctx->allJobsCompleted_cond, NULL); + pthread_mutex_init(&ctx->jobWrite_mutex, NULL); + pthread_cond_init(&ctx->jobWrite_cond, NULL); ctx->numJobs = numJobs; ctx->lastJobID = -1; /* intentional underflow */ ctx->jobs = calloc(1, numJobs*sizeof(jobDescription)); @@ -101,6 +111,9 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename) ctx->jobs[u].jobCompleted_cond = &ctx->jobCompleted_cond; ctx->jobs[u].jobReady_mutex = &ctx->jobReady_mutex; ctx->jobs[u].jobReady_cond = &ctx->jobReady_cond; + ctx->jobs[u].jobWrite_mutex = &ctx->jobWrite_mutex; + ctx->jobs[u].jobWrite_cond = &ctx->jobWrite_cond; + ctx->jobs[u].jobWritten = 1; } } ctx->nextJobID = 0; @@ -139,7 +152,8 @@ static void* compressionThread(void* arg) adaptCCtx* ctx = (adaptCCtx*)arg; unsigned currJob = 0; for ( ; ; ) { - jobDescription* job = &ctx->jobs[currJob]; + unsigned const currJobIndex = currJob % ctx->numJobs; + jobDescription* job = &ctx->jobs[currJobIndex]; pthread_mutex_lock(job->jobReady_mutex); while(job->jobReady == 0) { pthread_cond_wait(job->jobReady_cond, job->jobReady_mutex); @@ -174,7 +188,8 @@ static void* outputThread(void* arg) unsigned currJob = 0; for ( ; ; ) { - jobDescription* job = &ctx->jobs[currJob]; + unsigned const currJobIndex = currJob % ctx->numJobs; + jobDescription* job = &ctx->jobs[currJobIndex]; pthread_mutex_lock(job->jobCompleted_mutex); while (job->jobCompleted == 0) { pthread_cond_wait(job->jobCompleted_cond, job->jobCompleted_mutex); @@ -187,7 +202,7 @@ static void* outputThread(void* arg) return arg; } { - size_t const writeSize = fwrite(ctx->jobs[currJob].dst.start, 1, compressedSize, ctx->dstFile); + size_t const writeSize = fwrite(job->dst.start, 1, compressedSize, ctx->dstFile); if (writeSize != compressedSize) { DISPLAY("Error: an error occurred during file write operation\n"); return arg; @@ -195,6 +210,10 @@ static void* outputThread(void* arg) } } currJob++; + pthread_mutex_lock(job->jobWrite_mutex); + job->jobWritten = 1; + pthread_cond_signal(job->jobWrite_cond); + pthread_mutex_unlock(job->jobWrite_mutex); if (currJob >= ctx->lastJobID || ctx->threadError) { /* finished with all jobs */ pthread_mutex_lock(&ctx->allJobsCompleted_mutex); @@ -232,13 +251,20 @@ static size_t getFileSize(const char* const filename) static int createCompressionJob(adaptCCtx* ctx, BYTE* data, size_t srcSize) { unsigned const nextJob = ctx->nextJobID; - jobDescription* job = &ctx->jobs[nextJob]; + unsigned const nextJobIndex = nextJob % ctx->numJobs; + jobDescription* job = &ctx->jobs[nextJobIndex]; + pthread_mutex_lock(job->jobWrite_mutex); + while (job->jobWritten == 0) { + pthread_cond_wait(job->jobWrite_cond, job->jobWrite_mutex); + } + pthread_mutex_unlock(job->jobWrite_mutex); job->compressionLevel = ctx->compressionLevel; job->src.start = malloc(srcSize); job->src.size = srcSize; job->dst.size = ZSTD_compressBound(srcSize); job->dst.start = malloc(job->dst.size); job->jobCompleted = 0; + job->jobWritten = 0; job->jobCompleted_cond = &ctx->jobCompleted_cond; job->jobCompleted_mutex = &ctx->jobCompleted_mutex; job->jobReady_cond = &ctx->jobReady_cond; @@ -265,8 +291,7 @@ static int compressFilename(const char* const srcFilename, const char* const dst BYTE* const src = malloc(FILE_CHUNK_SIZE); FILE* const srcFile = fopen(srcFilename, "rb"); size_t fileSize = getFileSize(srcFilename); - size_t const numJobsPrelim = (fileSize / ((size_t)FILE_CHUNK_SIZE)); - size_t const numJobs = (numJobsPrelim * FILE_CHUNK_SIZE) == fileSize ? numJobsPrelim : numJobsPrelim + 1; + size_t const numJobs = MAX_NUM_JOBS; int ret = 0; adaptCCtx* ctx = NULL; diff --git a/contrib/adaptive-compression/run.sh b/contrib/adaptive-compression/run.sh index b9c4caf7f..9d2e98706 100755 --- a/contrib/adaptive-compression/run.sh +++ b/contrib/adaptive-compression/run.sh @@ -1,6 +1,39 @@ make clean multi + ./multi tests/test2048.pdf tmp.zst zstd -d tmp.zst diff tmp tests/test2048.pdf echo "diff test complete" +rm tmp* + +./multi tests/test512.pdf tmp.zst +zstd -d tmp.zst +diff tmp tests/test512.pdf +echo "diff test complete" +rm tmp* + +./multi tests/test64.pdf tmp.zst +zstd -d tmp.zst +diff tmp tests/test64.pdf +echo "diff test complete" +rm tmp* + +./multi tests/test16.pdf tmp.zst +zstd -d tmp.zst +diff tmp tests/test16.pdf +echo "diff test complete" +rm tmp* + +./multi tests/test4.pdf tmp.zst +zstd -d tmp.zst +diff tmp tests/test4.pdf +echo "diff test complete" +rm tmp* + +./multi tests/test.pdf tmp.zst +zstd -d tmp.zst +diff tmp tests/test.pdf +echo "diff test complete" +rm tmp* + make clean