buffer_t dst;
unsigned compressionLevel;
unsigned jobID;
+ unsigned lastJob;
size_t compressedSize;
} jobDescription;
unsigned compressionLevel;
unsigned numActiveThreads;
unsigned numJobs;
- unsigned lastJobID;
unsigned nextJobID;
unsigned threadError;
unsigned jobReadyID;
ctx->jobReadyID = 0;
ctx->jobCompressedID = 0;
ctx->jobWriteID = 0;
- ctx->lastJobID = -1; /* intentional underflow */
ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
- /* allocating buffers for jobs */
+ /* initializing jobs */
{
unsigned jobNum;
for (jobNum=0; jobNum<numJobs; jobNum++) {
jobDescription* job = &ctx->jobs[jobNum];
job->src.start = malloc(FILE_CHUNK_SIZE);
job->dst.start = malloc(ZSTD_compressBound(FILE_CHUNK_SIZE));
+ job->lastJob = 0;
if (!job->src.start || !job->dst.start) {
DISPLAY("Could not allocate buffers for jobs\n");
freeCCtx(ctx);
pthread_mutex_unlock(&ctx->jobCompressed_mutex);
DEBUGLOG(2, "finished job compression %u\n", currJob);
currJob++;
- if (currJob >= ctx->lastJobID || ctx->threadError) {
+ if (job->lastJob || ctx->threadError) {
/* finished compressing all jobs */
DEBUGLOG(2, "all jobs finished compressing\n");
break;
}
DEBUGLOG(2, "finished job write %u\n", currJob);
currJob++;
- displayProgress(currJob, ctx->compressionLevel, currJob >= ctx->lastJobID);
+ displayProgress(currJob, ctx->compressionLevel, job->lastJob);
DEBUGLOG(2, "locking job write mutex\n");
pthread_mutex_lock(&ctx->jobWrite_mutex);
ctx->jobWriteID++;
pthread_mutex_unlock(&ctx->jobWrite_mutex);
DEBUGLOG(2, "unlocking job write mutex\n");
- DEBUGLOG(2, "checking if done: %u/%u\n", currJob, ctx->lastJobID);
- if (currJob >= ctx->lastJobID || ctx->threadError) {
+ if (job->lastJob || ctx->threadError) {
/* finished with all jobs */
DEBUGLOG(2, "all jobs finished writing\n");
pthread_mutex_lock(&ctx->allJobsCompleted_mutex);
return arg;
}
-static int createCompressionJob(adaptCCtx* ctx, size_t srcSize)
+static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
{
unsigned const nextJob = ctx->nextJobID;
unsigned const nextJobIndex = nextJob % ctx->numJobs;
job->src.size = srcSize;
job->dst.size = ZSTD_compressBound(srcSize);
job->jobID = nextJob;
+ job->lastJob = last;
memcpy(job->src.start, ctx->input.buffer.start, srcSize);
pthread_mutex_lock(&ctx->jobReady_mutex);
ctx->jobReadyID++;
g_streamedSize += readSize;
/* reading was fine, now create the compression job */
{
- int const error = createCompressionJob(ctx, readSize);
+ int const last = feof(srcFile);
+ int const error = createCompressionJob(ctx, readSize, last);
if (error != 0) {
ret = error;
ctx->threadError = 1;
}
if (feof(srcFile)) {
DEBUGLOG(2, "THE STREAM OF DATA ENDED %u\n", ctx->nextJobID);
- ctx->lastJobID = ctx->nextJobID;
break;
}
}