#include "threading.h" /* mutex */
#include "zstd_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */
#include "zstdmt_compress.h"
+#define XXH_STATIC_LINKING_ONLY /* XXH64_state_t */
+#include "xxhash.h"
/* ====== Debug ====== */
unsigned firstChunk;
unsigned lastChunk;
unsigned jobCompleted;
+ unsigned jobScanned;
pthread_mutex_t* jobCompleted_mutex;
pthread_cond_t* jobCompleted_cond;
ZSTD_parameters params;
_endJob:
PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
job->jobCompleted = 1;
+ job->jobScanned = 0;
pthread_cond_signal(job->jobCompleted_cond);
pthread_mutex_unlock(job->jobCompleted_mutex);
}
size_t inBuffSize;
inBuff_t inBuff;
ZSTD_parameters params;
+ XXH64_state_t xxhState;
unsigned nbThreads;
unsigned jobIDMask;
unsigned doneJobID;
ZSTDMT_releaseAllJobResources(zcs);
zcs->allJobsCompleted = 1;
}
- params.fParams.checksumFlag = 0; /* current limitation : no checksum (to be lifted in a later version) */
zcs->params = params;
if (updateDict) {
ZSTD_freeCDict(zcs->cdict); zcs->cdict = NULL;
zcs->nextJobID = 0;
zcs->frameEnded = 0;
zcs->allJobsCompleted = 0;
+ if (params.fParams.checksumFlag) XXH64_reset(&zcs->xxhState, 0);
return 0;
}
/* ZSTDMT_flushNextJob() :
* output : will be updated with amount of data flushed .
- * blockToFlush : the function will block and wait if there is no data available to flush .
+ * blockToFlush : if >0, the function will block and wait if there is no data available to flush .
* @return : amount of data remaining within internal buffer, 1 if unknown but > 0, 0 if no more */
static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
{
if (zcs->doneJobID == zcs->nextJobID) return 0; /* all flushed ! */
PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
while (zcs->jobs[wJobID].jobCompleted==0) {
- DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID); /* block when nothing available to flush */
- if (!blockToFlush) { pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */
- pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
+ DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID);
+ if (!blockToFlush) { pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */
+ pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush */
}
pthread_mutex_unlock(&zcs->jobCompleted_mutex);
/* compression job completed : output can be flushed */
{ ZSTDMT_jobDescription job = zcs->jobs[wJobID];
- size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
- DEBUGLOG(4, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
- ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx);
- zcs->jobs[wJobID].cctx = NULL;
- ZSTDMT_releaseBuffer(zcs->buffPool, job.src);
- zcs->jobs[wJobID].srcStart = NULL;
- zcs->jobs[wJobID].src = g_nullBuffer;
- if (ZSTD_isError(job.cSize)) {
- ZSTDMT_waitForAllJobsCompleted(zcs);
- ZSTDMT_releaseAllJobResources(zcs);
- return job.cSize;
+ if (!job.jobScanned) {
+ if (ZSTD_isError(job.cSize)) {
+ DEBUGLOG(5, "compression error detected ");
+ ZSTDMT_waitForAllJobsCompleted(zcs);
+ ZSTDMT_releaseAllJobResources(zcs);
+ return job.cSize;
+ }
+ ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx);
+ zcs->jobs[wJobID].cctx = NULL;
+ DEBUGLOG(5, "zcs->params.fParams.checksumFlag : %u ", zcs->params.fParams.checksumFlag);
+ if (zcs->params.fParams.checksumFlag) {
+ XXH64_update(&zcs->xxhState, job.srcStart, job.srcSize);
+ if (zcs->frameEnded && (zcs->doneJobID+1 == zcs->nextJobID)) { /* write checksum at end of last section */
+ U32 const checksum = (U32)XXH64_digest(&zcs->xxhState);
+ DEBUGLOG(4, "writing checksum : %08X \n", checksum);
+ MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum);
+ job.cSize += 4;
+ zcs->jobs[wJobID].cSize += 4;
+ } }
+ ZSTDMT_releaseBuffer(zcs->buffPool, job.src);
+ zcs->jobs[wJobID].srcStart = NULL;
+ zcs->jobs[wJobID].src = g_nullBuffer;
+ zcs->jobs[wJobID].jobScanned = 1;
+ }
+ { size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
+ DEBUGLOG(4, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
+ memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
+ output->pos += toWrite;
+ job.dstFlushed += toWrite;
}
- memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
- output->pos += toWrite;
- job.dstFlushed += toWrite;
if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => move to next one */
ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff);
zcs->jobs[wJobID].dstBuff = g_nullBuffer;
ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool);
unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
- if ((cctx==NULL) || (dstBuffer.start==NULL)) {
+ if ((cctx==NULL) || (dstBuffer.start==NULL)) { /* cannot get resources for next job */
zcs->jobs[jobID].jobCompleted = 1;
zcs->nextJobID++;
ZSTDMT_waitForAllJobsCompleted(zcs);
return ERROR(memory_allocation);
}
- DEBUGLOG(1, "preparing job %u to compress %u bytes \n", (U32)zcs->nextJobID, (U32)zcs->targetSectionSize);
+ DEBUGLOG(4, "preparing job %u to compress %u bytes \n", (U32)zcs->nextJobID, (U32)zcs->targetSectionSize);
zcs->jobs[jobID].src = zcs->inBuff.buffer;
zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
zcs->jobs[jobID].srcSize = zcs->targetSectionSize;
zcs->jobs[jobID].params = zcs->params;
+ if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0; /* do not calculate checksum within sections, just keep it in header for first section */
zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
zcs->jobs[jobID].dict = NULL;
zcs->jobs[jobID].dictSize = 0;
zcs->nextJobID++;
}
- /* check if there is any data available to flush */
+ /* check for data to flush */
ZSTDMT_flushNextJob(zcs, output, (zcs->inBuff.filled == zcs->inBuffSize)); /* we'll block if it wasn't possible to create new job due to saturation */
-#if 0
- { unsigned const jobID = zcs->doneJobID & zcs->jobIDMask;
- unsigned jobCompleted;
- pthread_mutex_lock(&zcs->jobCompleted_mutex);
- while (zcs->jobs[jobID].jobCompleted == 0 && zcs->inBuff.filled == zcs->inBuffSize) {
- /* when no new job could be started, block until there is something to flush, ensuring forward progress */
- pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
- }
- jobCompleted = zcs->jobs[jobID].jobCompleted;
- pthread_mutex_unlock(&zcs->jobCompleted_mutex);
- if (jobCompleted) {
- ZSTDMT_jobDescription const job = zcs->jobs[jobID];
- size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
- DEBUGLOG(1, "flush %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
- ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx);
- zcs->jobs[jobID].cctx = NULL;
- ZSTDMT_releaseBuffer(zcs->buffPool, job.src);
- zcs->jobs[jobID].srcStart = NULL; zcs->jobs[jobID].src = g_nullBuffer;
- if (ZSTD_isError(job.cSize)) {
- ZSTDMT_waitForAllJobsCompleted(zcs);
- ZSTDMT_releaseAllJobResources(zcs);
- return job.cSize;
- }
- memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
- output->pos += toWrite;
- zcs->jobs[jobID].dstFlushed += toWrite;
- DEBUGLOG(1, "remaining : %u bytes ", (U32)(job.cSize - job.dstFlushed));
- if (zcs->jobs[jobID].dstFlushed == job.cSize) { /* output buffer fully flushed => go to next one */
- ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff);
- zcs->jobs[jobID].dstBuff = g_nullBuffer;
- zcs->jobs[jobID].jobCompleted = 0;
- zcs->doneJobID++;
- } } }
-#endif
+
/* recommended next input size : fill current input buffer */
- return zcs->inBuffSize - zcs->inBuff.filled;
+ return zcs->inBuffSize - zcs->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */
}
{
size_t const srcSize = zcs->inBuff.filled;
- DEBUGLOG(1, "flushing : %u bytes to compress", (U32)srcSize);
+ DEBUGLOG(4, "flushing : %u bytes left to compress", (U32)srcSize);
if ( ((srcSize > 0) || (endFrame && !zcs->frameEnded))
&& (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) {
size_t const dstBufferCapacity = ZSTD_compressBound(srcSize);
zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
zcs->jobs[jobID].srcSize = srcSize;
zcs->jobs[jobID].params = zcs->params;
+ if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0; /* do not calculate checksum within sections, just keep it in header for first section */
zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
zcs->jobs[jobID].dict = NULL;
zcs->jobs[jobID].dictSize = 0;
zcs->inBuff.buffer = g_nullBuffer;
zcs->inBuff.filled = 0;
zcs->frameEnded = 1;
+ if (zcs->nextJobID == 0)
+ zcs->params.fParams.checksumFlag = 0; /* single chunk : checksum is calculated directly within worker thread */
}
DEBUGLOG(3, "posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask);
/* check if there is any data available to flush */
DEBUGLOG(5, "zcs->doneJobID : %u ; zcs->nextJobID : %u ", zcs->doneJobID, zcs->nextJobID);
return ZSTDMT_flushNextJob(zcs, output, 1);
-
-#if 0
- { unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
- PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
- while (zcs->jobs[wJobID].jobCompleted==0) {
- DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID); /* block when nothing available to flush */
- pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
- }
- pthread_mutex_unlock(&zcs->jobCompleted_mutex);
- /* compression job completed : output can be flushed */
- { ZSTDMT_jobDescription job = zcs->jobs[wJobID];
- size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
- DEBUGLOG(4, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
- ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[wJobID].cctx = NULL; /* release cctx for future task */
- ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[wJobID].srcStart = NULL; zcs->jobs[wJobID].src = g_nullBuffer;
- if (ZSTD_isError(job.cSize)) {
- ZSTDMT_waitForAllJobsCompleted(zcs);
- ZSTDMT_releaseAllJobResources(zcs);
- return job.cSize;
- }
- memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
- output->pos += toWrite;
- job.dstFlushed += toWrite;
- if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => next one */
- ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); zcs->jobs[wJobID].dstBuff = g_nullBuffer;
- zcs->jobs[wJobID].jobCompleted = 0;
- zcs->doneJobID++;
- } else {
- zcs->jobs[wJobID].dstFlushed = job.dstFlushed;
- }
- /* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */
- if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
- if ((zcs->doneJobID < zcs->nextJobID) || (zcs->inBuff.filled)) return 1; /* still some buffer to flush */
- zcs->allJobsCompleted = zcs->frameEnded;
- return 0;
- } }
-#endif
}