]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
reduced zstdmt latency when using small custom section sizes with high compression...
authorYann Collet <cyan@fb.com>
Fri, 27 Jan 2017 23:55:30 +0000 (15:55 -0800)
committerYann Collet <cyan@fb.com>
Fri, 27 Jan 2017 23:55:30 +0000 (15:55 -0800)
Previous version was requiring a fairly large initial amount of input data
before starting to create compression jobs.
This new version starts the process much sooner.

lib/compress/zstdmt_compress.c

index 5f0bf2ab15f864d04ba648940b175626612c2256..ca9bf6a26406cae7ffd42752bc6daf279f2b5f73 100644 (file)
@@ -273,6 +273,7 @@ struct ZSTDMT_CCtx_s {
     pthread_mutex_t jobCompleted_mutex;
     pthread_cond_t jobCompleted_cond;
     size_t targetSectionSize;
+    size_t marginSize;
     size_t inBuffSize;
     size_t dictSize;
     size_t targetDictSize;
@@ -514,8 +515,9 @@ static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
     zcs->frameContentSize = pledgedSrcSize;
     zcs->targetSectionSize = zcs->sectionSize ? zcs->sectionSize : (size_t)1 << (zcs->params.cParams.windowLog + 2);
     zcs->targetSectionSize = MAX(ZSTDMT_SECTION_SIZE_MIN, zcs->targetSectionSize);
+    zcs->marginSize = zcs->targetSectionSize >> 2;
     zcs->targetDictSize = zcs->overlapWrLog < 10 ? (size_t)1 << (zcs->params.cParams.windowLog - zcs->overlapWrLog) : 0;
-    zcs->inBuffSize = zcs->targetSectionSize + ((size_t)1 << zcs->params.cParams.windowLog) /* margin */ + zcs->targetDictSize;
+    zcs->inBuffSize = zcs->targetDictSize + zcs->targetSectionSize + zcs->marginSize;
     zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
     if (zcs->inBuff.buffer.start == NULL) return ERROR(memory_allocation);
     zcs->inBuff.filled = 0;
@@ -680,6 +682,7 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
 
 size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
 {
+    size_t const newJobThreshold = zcs->dictSize + zcs->targetSectionSize + zcs->marginSize;
     if (zcs->frameEnded) return ERROR(stage_wrong);   /* current frame being ended. Only flush is allowed. Restart with init */
     if (zcs->nbThreads==1) return ZSTD_compressStream(zcs->cstream, output, input);
 
@@ -690,7 +693,7 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
         zcs->inBuff.filled += toLoad;
     }
 
-    if ( (zcs->inBuff.filled == zcs->inBuffSize)  /* filled enough : let's compress */
+    if ( (zcs->inBuff.filled >= newJobThreshold)  /* filled enough : let's compress */
         && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) {   /* avoid overwriting job round buffer */
         CHECK_F( ZSTDMT_createCompressionJob(zcs, zcs->targetSectionSize, 0) );
     }