{
unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
- unsigned const limitID = zcs->doneJobID & zcs->jobIDMask;
- if ((zcs->doneJobID < zcs->nextJobID) & (jobID == limitID))
- return 0; /* new job would overwrite unflushed older job */
+ if (zcs->nextJobID > zcs->doneJobID + zcs->jobIDMask) {
+ DEBUGLOG(5, "ZSTDMT_createCompressionJob: will not create new job : table is full");
+ assert((zcs->nextJobID & zcs->jobIDMask) == (zcs->doneJobID & zcs->jobIDMask));
+ return 0;
+ }
if (!zcs->jobReady) {
DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ",
}
-/*! ZSTDMT_flushNextJob() :
- * `output` : will be updated with amount of data flushed .
+/*! ZSTDMT_flushProduced() :
+ * `output` : `pos` will be updated with amount of data flushed .
* `blockToFlush` : if >0, the function will block and wait if there is no data available to flush .
* @return : amount of data remaining within internal buffer, 0 if no more, 1 if unknown but > 0, or an error code */
-static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
+static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
{
unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
- DEBUGLOG(5, "ZSTDMT_flushNextJob (blocking:%u)", blockToFlush);
- if (zcs->doneJobID == zcs->nextJobID) {
- DEBUGLOG(5, "ZSTDMT_flushNextJob: doneJobID(%u)==(%u)nextJobID : nothing to flush !",
- zcs->doneJobID, zcs->nextJobID)
- return 0; /* all flushed ! */
- }
+ DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u)", blockToFlush);
+ assert(output->size >= output->pos);
+
ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
- while (zcs->jobs[wJobID].dstFlushed == zcs->jobs[wJobID].cSize) {
- if (!blockToFlush) { ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */
- if (zcs->jobs[wJobID].jobCompleted==1) break;
- DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",
- zcs->doneJobID, (U32)zcs->jobs[wJobID].dstFlushed);
- ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush but more to come */
- }
+ if (blockToFlush && (zcs->doneJobID < zcs->nextJobID)) {
+ while (zcs->jobs[wJobID].dstFlushed == zcs->jobs[wJobID].cSize) {
+ if (zcs->jobs[wJobID].jobCompleted==1) break;
+ DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",
+ zcs->doneJobID, (U32)zcs->jobs[wJobID].dstFlushed);
+ ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush but more to come */
+ } }
/* some output is available to be flushed */
{ ZSTDMT_jobDescription job = zcs->jobs[wJobID];
ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex);
if (ZSTD_isError(job.cSize)) {
- DEBUGLOG(5, "ZSTDMT_flushNextJob: job %u : compression error detected : %s",
+ DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s",
zcs->doneJobID, ZSTD_getErrorName(job.cSize));
ZSTDMT_waitForAllJobsCompleted(zcs);
ZSTDMT_releaseAllJobResources(zcs);
return job.cSize;
}
- /* add frame checksum if necessary */
+ /* add frame checksum if necessary (can only happen once) */
if ( job.jobCompleted
&& job.frameChecksumNeeded ) {
U32 const checksum = (U32)XXH64_digest(&zcs->xxhState);
- DEBUGLOG(5, "ZSTDMT_flushNextJob: writing checksum : %08X \n", checksum);
+ DEBUGLOG(5, "ZSTDMT_flushProduced: writing checksum : %08X \n", checksum);
MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum);
job.cSize += 4;
zcs->jobs[wJobID].cSize += 4;
zcs->jobs[wJobID].frameChecksumNeeded = 0;
}
assert(job.cSize >= job.dstFlushed);
- { size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
- DEBUGLOG(5, "ZSTDMT_flushNextJob: Flushing %u bytes from job %u (completion:%.1f%%)",
+ if (job.dstBuff.start != NULL) { /* one buffer present : some job is ongoing */
+ size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
+ DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%.1f%%)",
(U32)toWrite, zcs->doneJobID, (double)job.consumed / job.srcSize * 100);
memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
output->pos += toWrite;
job.dstFlushed += toWrite;
- }
- if ( job.jobCompleted
- && (job.dstFlushed == job.cSize) ) { /* output buffer fully flushed => move to next one */
- DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",
- zcs->doneJobID, (U32)job.dstFlushed);
- ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff);
- zcs->jobs[wJobID].dstBuff = g_nullBuffer;
- zcs->jobs[wJobID].jobCompleted = 0;
- zcs->consumed += job.srcSize;
- zcs->produced += job.cSize;
- zcs->doneJobID++;
- } else {
- zcs->jobs[wJobID].dstFlushed = job.dstFlushed;
- }
+
+ if ( job.jobCompleted
+ && (job.dstFlushed == job.cSize) ) { /* output buffer fully flushed => move to next one */
+ DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",
+ zcs->doneJobID, (U32)job.dstFlushed);
+ ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff);
+ zcs->jobs[wJobID].dstBuff = g_nullBuffer;
+ zcs->jobs[wJobID].jobCompleted = 0;
+ zcs->consumed += job.srcSize;
+ zcs->produced += job.cSize;
+ zcs->doneJobID++;
+ } else {
+ zcs->jobs[wJobID].dstFlushed = job.dstFlushed; /* remember how much was flushed for next attempt */
+ } }
+
/* return value : how many bytes left in buffer ; fake it to 1 when unknown but >0 */
if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
- if (zcs->doneJobID < zcs->nextJobID) return 1; /* still some more buffer to flush */
- if (zcs->jobReady) return 1; /* some more work to do ! */
- zcs->allJobsCompleted = zcs->frameEnded; /* last frame entirely flushed */
- return 0; /* everything flushed */
-} }
+ if (job.srcSize > job.consumed) return 1; /* current job not completely compressed */
+ }
+ if (zcs->doneJobID < zcs->nextJobID) return 1; /* some more jobs to flush */
+ if (zcs->jobReady) return 1; /* at least one more job to do ! */
+ if (zcs->inBuff.filled > 0) return 1; /* input not empty */
+ zcs->allJobsCompleted = zcs->frameEnded; /* last frame entirely flushed */
+ return 0; /* everything flushed */
+}
/** ZSTDMT_compressStream_generic() :
}
/* check for potential compressed data ready to be flushed */
- { size_t const remainingToFlush = ZSTDMT_flushNextJob(mtctx, output, !forwardInputProgress); /* block if there was no forward input progress */
+ { size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress); /* block if there was no forward input progress */
if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not flush yet */
return remainingToFlush;
}
if ( mtctx->jobReady /* one job ready for a worker to pick up */
|| (srcSize > 0) /* still some data within input buffer */
|| (endFrame && !mtctx->frameEnded)) { /* need a last 0-size block to end frame */
- DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job");
+ DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job (%u bytes, end:%u)",
+ (U32)srcSize, endFrame);
CHECK_F( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) );
}
/* check if there is any data available to flush */
- return ZSTDMT_flushNextJob(mtctx, output, 1 /* blockToFlush */);
+ return ZSTDMT_flushProduced(mtctx, output, 1 /* blockToFlush */);
}
if (cond) { \
DISPLAY("Error => "); \
DISPLAY(__VA_ARGS__); \
- DISPLAY(" (seed %u, test nb %u, line %u) \n", \
- seed, testNb, __LINE__); \
+ DISPLAY(" (seed %u, test nb %u, line %u (sig %08X) \n", \
+ seed, testNb, __LINE__, coreSeed); \
goto _output_error; \
} }
size_t cSize;
int testResult = 0;
U32 testNb = 1;
+ U32 coreSeed = 0; /* just to conform with CHECK_Z macro display */
ZSTD_CStream* zc = ZSTD_createCStream();
ZSTD_DStream* zd = ZSTD_createDStream();
ZSTDMT_CCtx* mtctx = ZSTDMT_createCCtx(2);
size_t maxTestSize;
/* init */
- if (nbTests >= testNb) { DISPLAYUPDATE(2, "\r%6u/%6u ", testNb, nbTests); }
- else { DISPLAYUPDATE(2, "\r%6u ", testNb); }
FUZ_rand(&coreSeed);
lseed = coreSeed ^ prime32;
+ if (nbTests >= testNb) {
+ DISPLAYUPDATE(2, "\r%6u/%6u (%08X) ", testNb, nbTests, lseed);
+ } else {
+ DISPLAYUPDATE(2, "\r%6u (%08X) ", testNb, lseed);
+ }
/* states full reset (deliberately not synchronized) */
/* some issues can only happen when reusing states */
UTIL_time_t const startClock = UTIL_getTime();
const BYTE* dict=NULL; /* can keep same dict on 2 consecutive tests */
size_t dictSize = 0;
- U32 oldTestLog = 0;
int const cLevelMax = bigTests ? (U32)ZSTD_maxCLevel()-1 : g_cLevelMax_smallTests;
U32 const nbThreadsMax = bigTests ? 4 : 2;
RDG_genBuffer(cNoiseBuffer[4], srcBufferSize, 1.00, 0., coreSeed); /* sparse content */
memset(copyBuffer, 0x65, copyBufferSize); /* make copyBuffer considered initialized */
ZSTD_initDStream_usingDict(zd, NULL, 0); /* ensure at least one init */
+ DISPLAYLEVEL(6, "Creating initial context with %u threads \n", nbThreads);
/* catch up testNb */
for (testNb=1; testNb < startTest; testNb++)
size_t totalTestSize, totalGenSize, cSize;
XXH64_state_t xxhState;
U64 crcOrig;
- U32 resetAllowed = 1;
size_t maxTestSize;
- /* init */
- if (testNb < nbTests) {
- DISPLAYUPDATE(2, "\r%6u/%6u ", testNb, nbTests);
- } else { DISPLAYUPDATE(2, "\r%6u ", testNb); }
FUZ_rand(&coreSeed);
+ if (nbTests >= testNb) {
+ DISPLAYUPDATE(2, "\r%6u/%6u (%08X) ", testNb, nbTests, coreSeed);
+ } else {
+ DISPLAYUPDATE(2, "\r%6u (%08X) ", testNb, coreSeed);
+ }
lseed = coreSeed ^ prime32;
/* states full reset (deliberately not synchronized) */
ZSTDMT_freeCCtx(zc);
zc = ZSTDMT_createCCtx(nbThreads);
CHECK(zc==NULL, "ZSTDMT_createCCtx allocation error")
- resetAllowed=0;
}
if ((FUZ_rand(&lseed) & 0xFF) == 132) {
ZSTD_freeDStream(zd);
}
/* compression init */
- if ((FUZ_rand(&lseed)&1) /* at beginning, to keep same nb of rand */
- && oldTestLog /* at least one test happened */ && resetAllowed) {
- maxTestSize = FUZ_randomLength(&lseed, oldTestLog+2);
- if (maxTestSize >= srcBufferSize) maxTestSize = srcBufferSize-1;
- { int const compressionLevel = (FUZ_rand(&lseed) % 5) + 1;
- DISPLAYLEVEL(5, "Init with compression level = %i \n", compressionLevel);
- CHECK_Z( ZSTDMT_initCStream(zc, compressionLevel) );
- }
- } else {
- U32 const testLog = FUZ_rand(&lseed) % maxSrcLog;
+ { U32 const testLog = FUZ_rand(&lseed) % maxSrcLog;
U32 const dictLog = FUZ_rand(&lseed) % maxSrcLog;
int const cLevelCandidate = ( FUZ_rand(&lseed)
% (ZSTD_maxCLevel() - (MAX(testLog, dictLog) / 2)) )
int const cLevelMin = MAX(cLevelThreadAdjusted, 1); /* no negative cLevel yet */
int const cLevel = MIN(cLevelMin, cLevelMax);
maxTestSize = FUZ_rLogLength(&lseed, testLog);
- oldTestLog = testLog;
- /* random dictionary selection */
- dictSize = ((FUZ_rand(&lseed)&63)==1) ? FUZ_rLogLength(&lseed, dictLog) : 0;
- { size_t const dictStart = FUZ_rand(&lseed) % (srcBufferSize - dictSize);
- dict = srcBuffer + dictStart;
- }
- { U64 const pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? ZSTD_CONTENTSIZE_UNKNOWN : maxTestSize;
- ZSTD_parameters params = ZSTD_getParams(cLevel, pledgedSrcSize, dictSize);
- DISPLAYLEVEL(5, "Init with windowLog = %u, pledgedSrcSize = %u, dictSize = %u \n",
- params.cParams.windowLog, (U32)pledgedSrcSize, (U32)dictSize);
- params.fParams.checksumFlag = FUZ_rand(&lseed) & 1;
- params.fParams.noDictIDFlag = FUZ_rand(&lseed) & 1;
- params.fParams.contentSizeFlag = FUZ_rand(&lseed) & 1;
- DISPLAYLEVEL(5, "checksumFlag : %u \n", params.fParams.checksumFlag);
- CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_overlapSectionLog, FUZ_rand(&lseed) % 12) );
- CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_jobSize, FUZ_rand(&lseed) % (2*maxTestSize+1)) ); /* custome job size */
- CHECK_Z( ZSTDMT_initCStream_advanced(zc, dict, dictSize, params, pledgedSrcSize) );
- } }
+
+ if (FUZ_rand(&lseed)&1) { /* simple init */
+ int const compressionLevel = (FUZ_rand(&lseed) % 5) + 1;
+ DISPLAYLEVEL(5, "Init with compression level = %i \n", compressionLevel);
+ CHECK_Z( ZSTDMT_initCStream(zc, compressionLevel) );
+ } else { /* advanced init */
+ /* random dictionary selection */
+ dictSize = ((FUZ_rand(&lseed)&63)==1) ? FUZ_rLogLength(&lseed, dictLog) : 0;
+ { size_t const dictStart = FUZ_rand(&lseed) % (srcBufferSize - dictSize);
+ dict = srcBuffer + dictStart;
+ }
+ { U64 const pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? ZSTD_CONTENTSIZE_UNKNOWN : maxTestSize;
+ ZSTD_parameters params = ZSTD_getParams(cLevel, pledgedSrcSize, dictSize);
+ DISPLAYLEVEL(5, "Init with windowLog = %u, pledgedSrcSize = %u, dictSize = %u \n",
+ params.cParams.windowLog, (U32)pledgedSrcSize, (U32)dictSize);
+ params.fParams.checksumFlag = FUZ_rand(&lseed) & 1;
+ params.fParams.noDictIDFlag = FUZ_rand(&lseed) & 1;
+ params.fParams.contentSizeFlag = FUZ_rand(&lseed) & 1;
+ DISPLAYLEVEL(5, "checksumFlag : %u \n", params.fParams.checksumFlag);
+ CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_overlapSectionLog, FUZ_rand(&lseed) % 12) );
+ CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_jobSize, FUZ_rand(&lseed) % (2*maxTestSize+1)) ); /* custome job size */
+ CHECK_Z( ZSTDMT_initCStream_advanced(zc, dict, dictSize, params, pledgedSrcSize) );
+ } } }
/* multi-segments compression test */
XXH64_reset(&xxhState, 0);
} }
crcOrig = XXH64_digest(&xxhState);
cSize = outBuff.pos;
- DISPLAYLEVEL(5, "Frame completed : %u bytes \n", (U32)cSize);
+ DISPLAYLEVEL(5, "Frame completed : %u bytes compressed into %u bytes \n",
+ (U32)totalTestSize, (U32)cSize);
}
/* multi - fragments decompression test */
+ assert(totalTestSize < dstBufferSize);
+ memset(dstBuffer, 170, totalTestSize); /* init dest area */
if (!dictSize /* don't reset if dictionary : could be different */ && (FUZ_rand(&lseed) & 1)) {
CHECK_Z( ZSTD_resetDStream(zd) );
} else {
size_t const dstBuffSize = MIN(dstBufferSize - totalGenSize, randomDstSize);
inBuff.size = inBuff.pos + readCSrcSize;
outBuff.size = outBuff.pos + dstBuffSize;
- DISPLAYLEVEL(6, "ZSTD_decompressStream input %u bytes \n", (U32)readCSrcSize);
+ DISPLAYLEVEL(6, "ZSTD_decompressStream input %u bytes into outBuff %u bytes \n",
+ (U32)readCSrcSize, (U32)dstBuffSize);
decompressionResult = ZSTD_decompressStream(zd, &outBuff, &inBuff);
if (ZSTD_isError(decompressionResult)) {
DISPLAY("ZSTD_decompressStream error : %s \n", ZSTD_getErrorName(decompressionResult));
findDiff(copyBuffer, dstBuffer, totalTestSize);
}
CHECK (ZSTD_isError(decompressionResult), "decompression error : %s", ZSTD_getErrorName(decompressionResult));
- DISPLAYLEVEL(6, "total ingested (inBuff.pos) = %u \n", (U32)inBuff.pos);
+ DISPLAYLEVEL(6, "total ingested (inBuff.pos) = %u and produced (outBuff.pos) = %u \n",
+ (U32)inBuff.pos, (U32)outBuff.pos);
}
CHECK (outBuff.pos != totalTestSize, "decompressed data : wrong size (%u != %u)", (U32)outBuff.pos, (U32)totalTestSize);
CHECK (inBuff.pos != cSize, "compressed data should be fully read (%u != %u)", (U32)inBuff.pos, (U32)cSize);