return ((((unsigned long long)newTicks)*(1000000))/_ticksPerSecond); }
}
-#define MUTEX_WAIT_TIME_DLEVEL 5
+#define MUTEX_WAIT_TIME_DLEVEL 6
#define PTHREAD_MUTEX_LOCK(mutex) { \
if (ZSTD_DEBUG>=MUTEX_WAIT_TIME_DLEVEL) { \
unsigned long long const beforeTime = GetCurrentClockTimeMicroseconds(); \
}
/* fill input buffer */
- { size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuffSize - mtctx->inBuff.filled);
+ if (input->src) { /* support NULL input */
+ size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuffSize - mtctx->inBuff.filled);
memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad);
input->pos += toLoad;
mtctx->inBuff.filled += toLoad;
size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
{
- size_t const newJobThreshold = zcs->dictSize + zcs->targetSectionSize + zcs->marginSize;
- if (zcs->frameEnded) {
- /* current frame being ended. Only flush is allowed. Or start new job with init */
- DEBUGLOG(5, "ZSTDMT_compressStream: zcs::frameEnded==1");
- return ERROR(stage_wrong);
- }
- if (zcs->nbThreads==1) {
- return ZSTD_compressStream(zcs->cctxPool->cctx[0], output, input);
- }
-
- /* fill input buffer */
- { size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled);
- memcpy((char*)zcs->inBuff.buffer.start + zcs->inBuff.filled, (const char*)input->src + input->pos, toLoad);
- input->pos += toLoad;
- zcs->inBuff.filled += toLoad;
- }
-
- if ( (zcs->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */
- && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) { /* avoid overwriting job round buffer */
- CHECK_F( ZSTDMT_createCompressionJob(zcs, zcs->targetSectionSize, 0 /* endFrame */) );
- }
-
- /* check for data to flush */
- CHECK_F( ZSTDMT_flushNextJob(zcs, output, (zcs->inBuff.filled == zcs->inBuffSize) /* blockToFlush */) ); /* block if it wasn't possible to create new job due to saturation */
+ CHECK_F( ZSTDMT_compressStream_generic(zcs, output, input, ZSTD_e_continue) );
/* recommended next input size : fill current input buffer */
return zcs->inBuffSize - zcs->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */
return ZSTD_endStream(zcs->cctxPool->cctx[0], output);
return ZSTDMT_flushStream_internal(zcs, output, 1 /* endFrame */);
}
-
-size_t ZSTDMT_compressStream_generic2(ZSTDMT_CCtx* mtctx,
- ZSTD_outBuffer* output,
- ZSTD_inBuffer* input,
- ZSTD_EndDirective endOp)
-{
- DEBUGLOG(5, "ZSTDMT_compressStream_generic");
- DEBUGLOG(5, "in: pos:%u / size:%u ; endOp=%u",
- (U32)input->pos, (U32)input->size, (U32)endOp);
- if (input->pos < input->size) /* some input to consume */
- CHECK_F(ZSTDMT_compressStream(mtctx, output, input));
- if (input->pos < input->size) /* input not consumed : do not flush yet */
- endOp = ZSTD_e_continue;
- switch(endOp)
- {
- case ZSTD_e_flush:
- return ZSTDMT_flushStream(mtctx, output);
- case ZSTD_e_end:
- DEBUGLOG(5, "endOp:%u; calling ZSTDMT_endStream", (U32)endOp);
- return ZSTDMT_endStream(mtctx, output);
- case ZSTD_e_continue:
- return 1;
- default:
- return ERROR(GENERIC); /* invalid endDirective */
- }
-}