#define DISPLAY(...) fprintf(stderr, __VA_ARGS__)
#define FILE_CHUNK_SIZE 4 << 20
+#define MAX_NUM_JOBS 30;
typedef unsigned char BYTE;
#include <stdio.h> /* fprintf */
unsigned jobID;
unsigned jobCompleted;
unsigned jobReady;
+ unsigned jobWritten;
pthread_mutex_t* jobCompleted_mutex;
pthread_cond_t* jobCompleted_cond;
pthread_mutex_t* jobReady_mutex;
pthread_cond_t* jobReady_cond;
+ pthread_mutex_t* jobWrite_mutex;
+ pthread_cond_t* jobWrite_cond;
size_t compressedSize;
} jobDescription;
pthread_cond_t jobReady_cond;
pthread_mutex_t allJobsCompleted_mutex;
pthread_cond_t allJobsCompleted_cond;
+ pthread_mutex_t jobWrite_mutex;
+ pthread_cond_t jobWrite_cond;
jobDescription* jobs;
FILE* dstFile;
} adaptCCtx;
int const readyCondError = pthread_cond_destroy(&ctx->jobReady_cond);
int const allJobsMutexError = pthread_mutex_destroy(&ctx->allJobsCompleted_mutex);
int const allJobsCondError = pthread_cond_destroy(&ctx->allJobsCompleted_cond);
+ int const jobWriteMutexError = pthread_mutex_destroy(&ctx->jobWrite_mutex);
+ int const jobWriteCondError = pthread_cond_destroy(&ctx->jobWrite_cond);
int const fileCloseError = ctx->dstFile != NULL ? fclose(ctx->dstFile) : 0;
if (ctx->jobs){
freeCompressionJobs(ctx);
free(ctx->jobs);
}
- return completedMutexError | completedCondError | readyMutexError | readyCondError | fileCloseError | allJobsMutexError | allJobsCondError;
+ return completedMutexError | completedCondError | readyMutexError | readyCondError | fileCloseError | allJobsMutexError | allJobsCondError | jobWriteMutexError | jobWriteCondError;
}
}
pthread_cond_init(&ctx->jobReady_cond, NULL);
pthread_mutex_init(&ctx->allJobsCompleted_mutex, NULL);
pthread_cond_init(&ctx->allJobsCompleted_cond, NULL);
+ pthread_mutex_init(&ctx->jobWrite_mutex, NULL);
+ pthread_cond_init(&ctx->jobWrite_cond, NULL);
ctx->numJobs = numJobs;
ctx->lastJobID = -1; /* intentional underflow */
ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
ctx->jobs[u].jobCompleted_cond = &ctx->jobCompleted_cond;
ctx->jobs[u].jobReady_mutex = &ctx->jobReady_mutex;
ctx->jobs[u].jobReady_cond = &ctx->jobReady_cond;
+ ctx->jobs[u].jobWrite_mutex = &ctx->jobWrite_mutex;
+ ctx->jobs[u].jobWrite_cond = &ctx->jobWrite_cond;
+ ctx->jobs[u].jobWritten = 1;
}
}
ctx->nextJobID = 0;
adaptCCtx* ctx = (adaptCCtx*)arg;
unsigned currJob = 0;
for ( ; ; ) {
- jobDescription* job = &ctx->jobs[currJob];
+ unsigned const currJobIndex = currJob % ctx->numJobs;
+ jobDescription* job = &ctx->jobs[currJobIndex];
pthread_mutex_lock(job->jobReady_mutex);
while(job->jobReady == 0) {
pthread_cond_wait(job->jobReady_cond, job->jobReady_mutex);
unsigned currJob = 0;
for ( ; ; ) {
- jobDescription* job = &ctx->jobs[currJob];
+ unsigned const currJobIndex = currJob % ctx->numJobs;
+ jobDescription* job = &ctx->jobs[currJobIndex];
pthread_mutex_lock(job->jobCompleted_mutex);
while (job->jobCompleted == 0) {
pthread_cond_wait(job->jobCompleted_cond, job->jobCompleted_mutex);
return arg;
}
{
- size_t const writeSize = fwrite(ctx->jobs[currJob].dst.start, 1, compressedSize, ctx->dstFile);
+ size_t const writeSize = fwrite(job->dst.start, 1, compressedSize, ctx->dstFile);
if (writeSize != compressedSize) {
DISPLAY("Error: an error occurred during file write operation\n");
return arg;
}
}
currJob++;
+ pthread_mutex_lock(job->jobWrite_mutex);
+ job->jobWritten = 1;
+ pthread_cond_signal(job->jobWrite_cond);
+ pthread_mutex_unlock(job->jobWrite_mutex);
if (currJob >= ctx->lastJobID || ctx->threadError) {
/* finished with all jobs */
pthread_mutex_lock(&ctx->allJobsCompleted_mutex);
static int createCompressionJob(adaptCCtx* ctx, BYTE* data, size_t srcSize)
{
unsigned const nextJob = ctx->nextJobID;
- jobDescription* job = &ctx->jobs[nextJob];
+ unsigned const nextJobIndex = nextJob % ctx->numJobs;
+ jobDescription* job = &ctx->jobs[nextJobIndex];
+ pthread_mutex_lock(job->jobWrite_mutex);
+ while (job->jobWritten == 0) {
+ pthread_cond_wait(job->jobWrite_cond, job->jobWrite_mutex);
+ }
+ pthread_mutex_unlock(job->jobWrite_mutex);
job->compressionLevel = ctx->compressionLevel;
job->src.start = malloc(srcSize);
job->src.size = srcSize;
job->dst.size = ZSTD_compressBound(srcSize);
job->dst.start = malloc(job->dst.size);
job->jobCompleted = 0;
+ job->jobWritten = 0;
job->jobCompleted_cond = &ctx->jobCompleted_cond;
job->jobCompleted_mutex = &ctx->jobCompleted_mutex;
job->jobReady_cond = &ctx->jobReady_cond;
BYTE* const src = malloc(FILE_CHUNK_SIZE);
FILE* const srcFile = fopen(srcFilename, "rb");
size_t fileSize = getFileSize(srcFilename);
- size_t const numJobsPrelim = (fileSize / ((size_t)FILE_CHUNK_SIZE));
- size_t const numJobs = (numJobsPrelim * FILE_CHUNK_SIZE) == fileSize ? numJobsPrelim : numJobsPrelim + 1;
+ size_t const numJobs = MAX_NUM_JOBS;
int ret = 0;
adaptCCtx* ctx = NULL;