buffer_t dst;
unsigned compressionLevel;
unsigned jobID;
- unsigned jobCompleted;
- unsigned jobReady;
- unsigned jobWritten;
- pthread_mutex_t* jobCompleted_mutex;
- pthread_cond_t* jobCompleted_cond;
- pthread_mutex_t* jobReady_mutex;
- pthread_cond_t* jobReady_cond;
- pthread_mutex_t* jobWrite_mutex;
- pthread_cond_t* jobWrite_cond;
size_t compressedSize;
} jobDescription;
unsigned lastJobID;
unsigned nextJobID;
unsigned threadError;
+ unsigned jobReadyID;
+ unsigned jobCompletedID;
+ unsigned jobWrittenID;
unsigned allJobsCompleted;
pthread_mutex_t jobCompleted_mutex;
pthread_cond_t jobCompleted_cond;
pthread_mutex_init(&ctx->jobWrite_mutex, NULL);
pthread_cond_init(&ctx->jobWrite_cond, NULL);
ctx->numJobs = numJobs;
+ ctx->jobReadyID = 0;
+ ctx->jobCompletedID = 0;
+ ctx->jobWrittenID = 0;
ctx->lastJobID = -1; /* intentional underflow */
ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
- {
- unsigned u;
- for (u=0; u<numJobs; u++) {
- ctx->jobs[u].jobCompleted_mutex = &ctx->jobCompleted_mutex;
- ctx->jobs[u].jobCompleted_cond = &ctx->jobCompleted_cond;
- ctx->jobs[u].jobReady_mutex = &ctx->jobReady_mutex;
- ctx->jobs[u].jobReady_cond = &ctx->jobReady_cond;
- ctx->jobs[u].jobWrite_mutex = &ctx->jobWrite_mutex;
- ctx->jobs[u].jobWrite_cond = &ctx->jobWrite_cond;
- ctx->jobs[u].jobWritten = 1;
- }
- }
ctx->nextJobID = 0;
ctx->threadError = 0;
ctx->allJobsCompleted = 0;
unsigned const currJobIndex = currJob % ctx->numJobs;
jobDescription* job = &ctx->jobs[currJobIndex];
// DEBUGLOG(2, "compressionThread(): waiting on job ready\n");
- pthread_mutex_lock(job->jobReady_mutex);
- while(job->jobReady == 0) {
- pthread_cond_wait(job->jobReady_cond, job->jobReady_mutex);
+ pthread_mutex_lock(&ctx->jobReady_mutex);
+ while(currJob + 1 > ctx->jobReadyID) {
+ pthread_cond_wait(&ctx->jobReady_cond, &ctx->jobReady_mutex);
}
- pthread_mutex_unlock(job->jobReady_mutex);
+ pthread_mutex_unlock(&ctx->jobReady_mutex);
// DEBUGLOG(2, "compressionThread(): continuing after job ready\n");
/* compress the data */
{
}
job->compressedSize = compressedSize;
}
- pthread_mutex_lock(job->jobCompleted_mutex);
- job->jobCompleted = 1;
+ pthread_mutex_lock(&ctx->jobCompleted_mutex);
+ ctx->jobCompletedID++;
DEBUGLOG(2, "signaling for job %u\n", currJob);
- pthread_cond_signal(job->jobCompleted_cond);
- pthread_mutex_unlock(job->jobCompleted_mutex);
+ pthread_cond_signal(&ctx->jobCompleted_cond);
+ pthread_mutex_unlock(&ctx->jobCompleted_mutex);
currJob++;
if (currJob >= ctx->lastJobID || ctx->threadError) {
/* finished compressing all jobs */
unsigned const currJobIndex = currJob % ctx->numJobs;
jobDescription* job = &ctx->jobs[currJobIndex];
DEBUGLOG(2, "outputThread(): waiting on job completed\n");
- pthread_mutex_lock(job->jobCompleted_mutex);
- while (job->jobCompleted == 0) {
+ pthread_mutex_lock(&ctx->jobCompleted_mutex);
+ while (currJob + 1 > ctx->jobCompletedID) {
DEBUGLOG(2, "inside job completed wait loop waiting on %u\n", currJob);
- pthread_cond_wait(job->jobCompleted_cond, job->jobCompleted_mutex);
+ pthread_cond_wait(&ctx->jobCompleted_cond, &ctx->jobCompleted_mutex);
}
- pthread_mutex_unlock(job->jobCompleted_mutex);
+ pthread_mutex_unlock(&ctx->jobCompleted_mutex);
DEBUGLOG(2, "outputThread(): continuing after job completed\n");
{
size_t const compressedSize = job->compressedSize;
}
currJob++;
DEBUGLOG(2, "locking job write mutex\n");
- pthread_mutex_lock(job->jobWrite_mutex);
- job->jobWritten = 1;
- pthread_cond_signal(job->jobWrite_cond);
- pthread_mutex_unlock(job->jobWrite_mutex);
+ pthread_mutex_lock(&ctx->jobWrite_mutex);
+ ctx->jobWrittenID++;
+ pthread_cond_signal(&ctx->jobWrite_cond);
+ pthread_mutex_unlock(&ctx->jobWrite_mutex);
DEBUGLOG(2, "unlocking job write mutex\n");
DEBUGLOG(2, "checking if done: %u/%u\n", currJob, ctx->lastJobID);
unsigned const nextJobIndex = nextJob % ctx->numJobs;
jobDescription* job = &ctx->jobs[nextJobIndex];
// DEBUGLOG(2, "createCompressionJob(): wait for job write\n");
- pthread_mutex_lock(job->jobWrite_mutex);
- while (job->jobWritten == 0) {
- pthread_cond_wait(job->jobWrite_cond, job->jobWrite_mutex);
+ pthread_mutex_lock(&ctx->jobWrite_mutex);
+ while (nextJob - ctx->jobWrittenID >= ctx->numJobs) {
+ pthread_cond_wait(&ctx->jobWrite_cond, &ctx->jobWrite_mutex);
}
- pthread_mutex_unlock(job->jobWrite_mutex);
+ pthread_mutex_unlock(&ctx->jobWrite_mutex);
// DEBUGLOG(2, "createCompressionJob(): continuing after job write\n");
job->compressionLevel = ctx->compressionLevel;
job->src.start = malloc(srcSize);
job->src.size = srcSize;
job->dst.size = ZSTD_compressBound(srcSize);
job->dst.start = malloc(job->dst.size);
- job->jobCompleted = 0;
- job->jobWritten = 0;
- job->jobCompleted_cond = &ctx->jobCompleted_cond;
- job->jobCompleted_mutex = &ctx->jobCompleted_mutex;
- job->jobReady_cond = &ctx->jobReady_cond;
- job->jobReady_mutex = &ctx->jobReady_mutex;
job->jobID = nextJob;
if (!job->src.start || !job->dst.start) {
/* problem occurred, free things then return */
return 1;
}
memcpy(job->src.start, data, srcSize);
- pthread_mutex_lock(job->jobReady_mutex);
- job->jobReady = 1;
- pthread_cond_signal(job->jobReady_cond);
- pthread_mutex_unlock(job->jobReady_mutex);
+ pthread_mutex_lock(&ctx->jobReady_mutex);
+ ctx->jobReadyID++;
+ pthread_cond_signal(&ctx->jobReady_cond);
+ pthread_mutex_unlock(&ctx->jobReady_mutex);
ctx->nextJobID++;
return 0;
}