unsigned firstChunk;
unsigned lastChunk;
unsigned jobCompleted;
- unsigned jobScanned;
+ unsigned checksumWritten;
ZSTD_pthread_mutex_t* jobCompleted_mutex;
ZSTD_pthread_cond_t* jobCompleted_cond;
ZSTD_CCtx_params params;
BYTE* op = ostart;
BYTE* oend = op + dstBuff.size;
int blockNb;
- DEBUGLOG(5, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks);
+ DEBUGLOG(2, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks);
assert(job->cSize == 0);
for (blockNb = 1; blockNb < nbBlocks; blockNb++) {
size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, ZSTD_BLOCKSIZE_MAX);
ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); /* note : it's a mtctx mutex */
job->cSize += cSize;
job->consumed = ZSTD_BLOCKSIZE_MAX * blockNb;
+ DEBUGLOG(2, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)",
+ (U32)cSize, (U32)job->cSize);
+ ZSTD_pthread_cond_signal(job->jobCompleted_cond);
ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
}
/* last block */
- if ((nbBlocks > 0) | job->lastChunk /*need to output a "last block" flag*/ ) {
+ if ((nbBlocks > 0) | job->lastChunk /*must output a "last block" flag*/ ) {
size_t const lastBlockSize1 = job->srcSize & (ZSTD_BLOCKSIZE_MAX-1);
size_t const lastBlockSize = ((lastBlockSize1==0) & (job->srcSize>=ZSTD_BLOCKSIZE_MAX)) ? ZSTD_BLOCKSIZE_MAX : lastBlockSize1;
size_t const cSize = (job->lastChunk) ?
ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
job->consumed = job->srcSize;
job->jobCompleted = 1;
- job->jobScanned = 0;
ZSTD_pthread_cond_signal(job->jobCompleted_cond);
ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
}
zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
zcs->jobs[jobID].lastChunk = endFrame;
zcs->jobs[jobID].jobCompleted = 0;
+ zcs->jobs[jobID].checksumWritten = 0;
zcs->jobs[jobID].dstFlushed = 0;
zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
{
unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
- DEBUGLOG(5, "ZSTDMT_flushNextJob (blocking:%u)", blockToFlush);
+ DEBUGLOG(2, "ZSTDMT_flushNextJob (blocking:%u)", blockToFlush);
if (zcs->doneJobID == zcs->nextJobID) return 0; /* all flushed ! */
ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
- while (zcs->jobs[wJobID].jobCompleted==0) {
- DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID);
+ while (zcs->jobs[wJobID].dstFlushed == zcs->jobs[wJobID].cSize) {
+ DEBUGLOG(2, "waiting for something to flush from job %u (currently flushed: %u bytes)",
+ zcs->doneJobID, (U32)zcs->jobs[wJobID].dstFlushed);
+ assert(zcs->jobs[wJobID].jobCompleted==0);
if (!blockToFlush) { ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */
ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush */
}
- ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex);
- /* compression job completed : output can be flushed */
+
+ /* some output is available to be flushed */
{ ZSTDMT_jobDescription job = zcs->jobs[wJobID];
- if (!job.jobScanned) {
- if (ZSTD_isError(job.cSize)) {
- DEBUGLOG(5, "ZSTDMT_flushNextJob: job %u : compression error detected : %s",
- zcs->doneJobID, ZSTD_getErrorName(job.cSize));
- ZSTDMT_waitForAllJobsCompleted(zcs);
- ZSTDMT_releaseAllJobResources(zcs);
- return job.cSize;
- }
- DEBUGLOG(5, "ZSTDMT_flushNextJob: zcs->params.fParams.checksumFlag : %u ", zcs->params.fParams.checksumFlag);
- if (zcs->params.fParams.checksumFlag) {
- if (zcs->frameEnded && (zcs->doneJobID+1 == zcs->nextJobID)) { /* write checksum at end of last section */
- U32 const checksum = (U32)XXH64_digest(&zcs->xxhState);
- DEBUGLOG(5, "ZSTDMT_flushNextJob: writing checksum : %08X \n", checksum);
- MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum);
- job.cSize += 4;
- zcs->jobs[wJobID].cSize += 4;
- } }
- zcs->jobs[wJobID].jobScanned = 1;
+ ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex);
+ if (ZSTD_isError(job.cSize)) {
+ DEBUGLOG(5, "ZSTDMT_flushNextJob: job %u : compression error detected : %s",
+ zcs->doneJobID, ZSTD_getErrorName(job.cSize));
+ ZSTDMT_waitForAllJobsCompleted(zcs);
+ ZSTDMT_releaseAllJobResources(zcs);
+ return job.cSize;
+ }
+ /* add frame checksum if necessary */
+ if ( zcs->frameEnded
+ && (zcs->doneJobID+1 == zcs->nextJobID)
+ && (zcs->params.fParams.checksumFlag)
+ && (!job.checksumWritten) ) {
+ U32 const checksum = (U32)XXH64_digest(&zcs->xxhState);
+ DEBUGLOG(5, "ZSTDMT_flushNextJob: writing checksum : %08X \n", checksum);
+ MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum);
+ job.cSize += 4;
+ zcs->jobs[wJobID].cSize += 4;
+ zcs->jobs[wJobID].checksumWritten = 1;
}
+ assert(job.cSize >= job.dstFlushed);
{ size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
- DEBUGLOG(5, "ZSTDMT_flushNextJob: Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
+ DEBUGLOG(2, "ZSTDMT_flushNextJob: Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
output->pos += toWrite;
job.dstFlushed += toWrite;
}
- if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => move to next one */
+ if ( job.jobCompleted
+ && (job.dstFlushed == job.cSize) ) { /* output buffer fully flushed => move to next one */
ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff);
zcs->jobs[wJobID].dstBuff = g_nullBuffer;
zcs->jobs[wJobID].jobCompleted = 0;