unsigned firstChunk;
unsigned lastChunk;
unsigned jobCompleted;
- unsigned checksumWritten;
+ unsigned frameChecksumNeeded;
ZSTD_pthread_mutex_t* jobCompleted_mutex;
ZSTD_pthread_cond_t* jobCompleted_cond;
ZSTD_CCtx_params params;
zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
zcs->jobs[jobID].lastChunk = endFrame;
zcs->jobs[jobID].jobCompleted = 0;
- zcs->jobs[jobID].checksumWritten = 0;
+ zcs->jobs[jobID].frameChecksumNeeded = endFrame && (zcs->nextJobID>0) && zcs->params.fParams.checksumFlag;
zcs->jobs[jobID].dstFlushed = 0;
zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
}
-/* ZSTDMT_flushNextJob() :
- * output : will be updated with amount of data flushed .
- * blockToFlush : if >0, the function will block and wait if there is no data available to flush .
- * @return : amount of data remaining within internal buffer, 1 if unknown but > 0, 0 if no more, or an error code */
+/*! ZSTDMT_flushNextJob() :
+ * `output` : will be updated with amount of data flushed .
+ * `blockToFlush` : if >0, the function will block and wait if there is no data available to flush .
+ * @return : amount of data remaining within internal buffer, 0 if no more, 1 if unknown but > 0, or an error code */
static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
{
unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
DEBUGLOG(2, "ZSTDMT_flushNextJob (blocking:%u)", blockToFlush);
- if (zcs->doneJobID == zcs->nextJobID) return 0; /* all flushed ! */
+ if (zcs->doneJobID == zcs->nextJobID) {
+ DEBUGLOG(2, "ZSTDMT_flushNextJob: doneJobID==nextJobID : nothing to flush !")
+ return 0; /* all flushed ! */
+ }
ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
while (zcs->jobs[wJobID].dstFlushed == zcs->jobs[wJobID].cSize) {
+ if (!blockToFlush) { ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */
+ if (zcs->jobs[wJobID].jobCompleted==1) break;
DEBUGLOG(2, "waiting for something to flush from job %u (currently flushed: %u bytes)",
zcs->doneJobID, (U32)zcs->jobs[wJobID].dstFlushed);
- assert(zcs->jobs[wJobID].jobCompleted==0);
- if (!blockToFlush) { ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */
- ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush */
+ ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush but more to come */
}
/* some output is available to be flushed */
return job.cSize;
}
/* add frame checksum if necessary */
- if ( zcs->frameEnded
- && (zcs->doneJobID+1 == zcs->nextJobID)
- && (zcs->params.fParams.checksumFlag)
- && (!job.checksumWritten) ) {
+ if ( job.jobCompleted
+ && job.frameChecksumNeeded ) {
U32 const checksum = (U32)XXH64_digest(&zcs->xxhState);
DEBUGLOG(5, "ZSTDMT_flushNextJob: writing checksum : %08X \n", checksum);
MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum);
job.cSize += 4;
zcs->jobs[wJobID].cSize += 4;
- zcs->jobs[wJobID].checksumWritten = 1;
+ zcs->jobs[wJobID].frameChecksumNeeded = 0;
}
assert(job.cSize >= job.dstFlushed);
{ size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
}
if ( job.jobCompleted
&& (job.dstFlushed == job.cSize) ) { /* output buffer fully flushed => move to next one */
+ DEBUGLOG(2, "Job %u completed, moving to next one", zcs->doneJobID);
ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff);
zcs->jobs[wJobID].dstBuff = g_nullBuffer;
zcs->jobs[wJobID].jobCompleted = 0;
- zcs->doneJobID++;
zcs->consumed += job.srcSize;
zcs->produced += job.cSize;
+ zcs->doneJobID++;
} else {
zcs->jobs[wJobID].dstFlushed = job.dstFlushed;
}
- /* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */
+ /* return value : how many bytes left in buffer ; fake it to 1 when unknown but >0 */
if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
- if (zcs->doneJobID < zcs->nextJobID) return 1; /* still some buffer to flush */
- zcs->allJobsCompleted = zcs->frameEnded; /* frame completed and entirely flushed */
+ if (zcs->doneJobID < zcs->nextJobID) return 1; /* still some more buffer to flush */
+ zcs->allJobsCompleted = zcs->frameEnded; /* last frame entirely flushed */
return 0; /* everything flushed */
} }
outBuff.size = outBuff.pos + dstBuffSize;
DISPLAYLEVEL(6, "ZSTD_decompressStream input %u bytes \n", (U32)readCSrcSize);
decompressionResult = ZSTD_decompressStream(zd, &outBuff, &inBuff);
+ if (ZSTD_getErrorCode(decompressionResult) == ZSTD_error_corruption_detected) {
+ DISPLAY("ZSTD_decompressStream: checksum error : \n");
+ findDiff(copyBuffer, dstBuffer, totalTestSize);
+ }
CHECK (ZSTD_isError(decompressionResult), "decompression error : %s", ZSTD_getErrorName(decompressionResult));
- DISPLAYLEVEL(6, "inBuff.pos = %u \n", (U32)readCSrcSize);
+ DISPLAYLEVEL(6, "total ingested (inBuff.pos) = %u \n", (U32)inBuff.pos);
}
CHECK (outBuff.pos != totalTestSize, "decompressed data : wrong size (%u != %u)", (U32)outBuff.pos, (U32)totalTestSize);
CHECK (inBuff.pos != cSize, "compressed data should be fully read (%u != %u)", (U32)inBuff.pos, (U32)cSize);