From: Paul Cruz Date: Tue, 4 Jul 2017 03:05:42 +0000 (-0700) Subject: working I believe X-Git-Tag: v1.3.1^2~13^2^2~128 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=0b70152a9b262e6e2f4106c1ca261d7546663865;p=thirdparty%2Fzstd.git working I believe --- diff --git a/contrib/adaptive-compression/v2.c b/contrib/adaptive-compression/v2.c index edb2d0f3f..2f58e7503 100644 --- a/contrib/adaptive-compression/v2.c +++ b/contrib/adaptive-compression/v2.c @@ -33,6 +33,7 @@ typedef struct { unsigned compressionLevel; unsigned numActiveThreads; unsigned numJobs; + unsigned lastJobID; unsigned nextJobID; unsigned threadError; unsigned allJobsCompleted; @@ -63,7 +64,9 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename) pthread_mutex_init(&ctx->allJobsCompleted_mutex, NULL); pthread_cond_init(&ctx->allJobsCompleted_cond, NULL); ctx->numJobs = numJobs; + ctx->lastJobID = -1; /* intentional underflow */ ctx->jobs = calloc(1, numJobs*sizeof(jobDescription)); + DISPLAY("jobs %p\n", ctx->jobs); { unsigned u; for (u=0; ujobCompleted_cond); int const readyMutexError = pthread_mutex_destroy(&ctx->jobReady_mutex); int const readyCondError = pthread_cond_destroy(&ctx->jobReady_cond); + int const allJobsMutexError = pthread_mutex_destroy(&ctx->allJobsCompleted_mutex); + int const allJobsCondError = pthread_cond_destroy(&ctx->allJobsCompleted_cond); int const fileError = fclose(ctx->dstFile); freeCompressionJobs(ctx); free(ctx->jobs); - return completedMutexError | completedCondError | readyMutexError | readyCondError | fileError; + return completedMutexError | completedCondError | readyMutexError | readyCondError | fileError | allJobsMutexError | allJobsCondError; } } @@ -131,10 +136,10 @@ static void* compressionThread(void* arg) jobDescription* job = &ctx->jobs[currJob]; pthread_mutex_lock(job->jobReady_mutex); while(job->jobReady == 0) { + DISPLAY("waiting\n"); pthread_cond_wait(job->jobReady_cond, job->jobReady_mutex); } pthread_mutex_unlock(job->jobReady_mutex); - /* compress the data */ { size_t const compressedSize = ZSTD_compress(job->dst.start, job->dst.size, job->src.start, job->src.size, job->compressionLevel); @@ -145,8 +150,12 @@ static void* compressionThread(void* arg) } job->compressedSize = compressedSize; } + pthread_mutex_lock(job->jobCompleted_mutex); + job->jobCompleted = 1; + pthread_cond_signal(job->jobCompleted_cond); + pthread_mutex_unlock(job->jobCompleted_mutex); currJob++; - if (currJob >= ctx->numJobs || ctx->threadError) { + if (currJob >= ctx->lastJobID || ctx->threadError) { /* finished compressing all jobs */ break; } @@ -158,7 +167,6 @@ static void* outputThread(void* arg) { DISPLAY("started output thread\n"); adaptCCtx* ctx = (adaptCCtx*)arg; - DISPLAY("casted ctx\n"); unsigned currJob = 0; for ( ; ; ) { @@ -183,7 +191,7 @@ static void* outputThread(void* arg) } } currJob++; - if (currJob >= ctx->numJobs || ctx->threadError) { + if (currJob >= ctx->lastJobID || ctx->threadError) { /* finished with all jobs */ pthread_mutex_lock(&ctx->allJobsCompleted_mutex); ctx->allJobsCompleted = 1; @@ -220,29 +228,30 @@ static size_t getFileSize(const char* const filename) static int createCompressionJob(adaptCCtx* ctx, BYTE* data, size_t srcSize) { unsigned const nextJob = ctx->nextJobID; - jobDescription job = ctx->jobs[nextJob]; - job.compressionLevel = ctx->compressionLevel; - job.src.start = malloc(srcSize); - job.src.size = srcSize; - job.dst.size = ZSTD_compressBound(srcSize); - job.dst.start = malloc(job.dst.size); - job.jobCompleted = 0; - job.jobCompleted_cond = &ctx->jobCompleted_cond; - job.jobCompleted_mutex = &ctx->jobCompleted_mutex; - job.jobReady_cond = &ctx->jobReady_cond; - job.jobReady_mutex = &ctx->jobReady_mutex; - job.jobID = nextJob; - if (!job.src.start || !job.dst.start) { + jobDescription* job = &ctx->jobs[nextJob]; + job->compressionLevel = ctx->compressionLevel; + job->src.start = malloc(srcSize); + job->src.size = srcSize; + job->dst.size = ZSTD_compressBound(srcSize); + job->dst.start = malloc(job->dst.size); + job->jobCompleted = 0; + job->jobCompleted_cond = &ctx->jobCompleted_cond; + job->jobCompleted_mutex = &ctx->jobCompleted_mutex; + job->jobReady_cond = &ctx->jobReady_cond; + job->jobReady_mutex = &ctx->jobReady_mutex; + job->jobID = nextJob; + if (!job->src.start || !job->dst.start) { /* problem occurred, free things then return */ - if (job.src.start) free(job.src.start); - if (job.dst.start) free(job.dst.start); + DISPLAY("Error: problem occurred during job creation\n"); + if (job->src.start) free(job->src.start); + if (job->dst.start) free(job->dst.start); return 1; } - memcpy(job.src.start, data, srcSize); - pthread_mutex_lock(job.jobReady_mutex); - job.jobReady = 1; - pthread_cond_signal(job.jobReady_cond); - pthread_mutex_unlock(job.jobReady_mutex); + memcpy(job->src.start, data, srcSize); + pthread_mutex_lock(job->jobReady_mutex); + job->jobReady = 1; + pthread_cond_signal(job->jobReady_cond); + pthread_mutex_unlock(job->jobReady_mutex); ctx->nextJobID++; return 0; } @@ -305,14 +314,12 @@ int main(int argCount, const char* argv[]) /* creating jobs */ for ( ; ; ) { - DISPLAY("in job creation loop\n"); size_t const readSize = fread(src, 1, FILE_CHUNK_SIZE, srcFile); if (readSize != FILE_CHUNK_SIZE && !feof(srcFile)) { DISPLAY("Error: problem occurred during read from src file\n"); ret = 1; goto cleanup; } - DISPLAY("reading was fine\n"); /* reading was fine, now create the compression job */ { int const error = createCompressionJob(ctx, src, readSize); @@ -321,9 +328,12 @@ int main(int argCount, const char* argv[]) goto cleanup; } } - if (feof(srcFile)) break; + if (feof(srcFile)) { + ctx->lastJobID = ctx->nextJobID; + break; + } } - + cleanup: /* file compression completed */ ret |= (srcFile != NULL) ? fclose(srcFile) : 0;