]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
zstdmt : move on when not enough memory for a new input buffer
authorYann Collet <cyan@fb.com>
Thu, 28 Sep 2017 18:46:19 +0000 (11:46 -0700)
committerYann Collet <cyan@fb.com>
Thu, 28 Sep 2017 18:46:19 +0000 (11:46 -0700)
just continue operations without input forward progress,
instead of an error that stops current compression session.

lib/compress/zstd_compress.c
lib/compress/zstdmt_compress.c

index eff6c57c6137da3e6197c9295655e97a067cc4db..117a1f58b2d8713750cd5692c10186c7b5ac2c41 100644 (file)
@@ -642,7 +642,7 @@ ZSTD_compressionParameters ZSTD_adjustCParams_internal(ZSTD_compressionParameter
     /* resize windowLog if src is small, to use less memory when necessary */
     ZSTD_STATIC_ASSERT(ZSTD_CONTENTSIZE_UNKNOWN == (0ULL - 1));
     if ( (dictSize || (srcSize+1 > 1))  /* srcSize test depends on static assert condition */
-      && (srcSize-1 < (1ULL<<ZSTD_WINDOWLOG_MAX)) ) /* no correction is srcSize is large enough */ {
+      && (srcSize-1 < (1ULL<<ZSTD_WINDOWLOG_MAX)) ) /* no correction when srcSize is large enough */ {
         U32 const minSrcSize = (srcSize==0) ? 513 : 0;
         U64 const rSize = srcSize + dictSize + minSrcSize;
         if (rSize < (1ULL<<ZSTD_WINDOWLOG_MAX)) {
index a1f4c5a704723a6cffb4076db445f00385543512..d83abaf39bfded476ca63aed6aca34c20739f73f 100644 (file)
@@ -976,7 +976,7 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
 
 
 /** ZSTDMT_compressStream_generic() :
- *  internal use only
+ *  internal use only - exposed to be invoked from zstd_compress.c
  *  assumption : output and input are valid (pos <= size)
  * @return : minimum amount of data remaining to flush, 0 if none */
 size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
@@ -985,6 +985,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
                                      ZSTD_EndDirective endOp)
 {
     size_t const newJobThreshold = mtctx->dictSize + mtctx->targetSectionSize;
+    unsigned forwardInputProgress = 0;
     assert(output->pos <= output->size);
     assert(input->pos  <= input->size);
     if ((mtctx->frameEnded) && (endOp==ZSTD_e_continue)) {
@@ -995,10 +996,10 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
         return ZSTD_compressStream_generic(mtctx->cctxPool->cctx[0], output, input, endOp);
     }
 
-    /* single-pass shortcut (note : this is synchronous-mode) */
-    if ( (mtctx->nextJobID==0)      /* just started */
-      && (mtctx->inBuff.filled==0)  /* nothing buffered */
-      && (endOp==ZSTD_e_end)        /* end order */
+    /* single-pass shortcut (note : synchronous-mode) */
+    if ( (mtctx->nextJobID == 0)     /* just started */
+      && (mtctx->inBuff.filled == 0) /* nothing buffered */
+      && (endOp == ZSTD_e_end)       /* end order */
       && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough room */
         size_t const cSize = ZSTDMT_compress_advanced_internal(mtctx,
                 (char*)output->dst + output->pos, output->size - output->pos,
@@ -1016,18 +1017,16 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
     /* fill input buffer */
     if (input->size > input->pos) {   /* support NULL input */
         if (mtctx->inBuff.buffer.start == NULL) {
-            mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool);
-            if (mtctx->inBuff.buffer.start == NULL) {
-                ZSTDMT_waitForAllJobsCompleted(mtctx);
-                return ERROR(memory_allocation);
-            }
+            mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool);  /* note : may fail, in which case, no forward input progress */
             mtctx->inBuff.filled = 0;
         }
-        {   size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuffSize - mtctx->inBuff.filled);
+        if (mtctx->inBuff.buffer.start) {
+            size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuffSize - mtctx->inBuff.filled);
             DEBUGLOG(5, "inBuff:%08X;  inBuffSize=%u;  ToCopy=%u", (U32)(size_t)mtctx->inBuff.buffer.start, (U32)mtctx->inBuffSize, (U32)toLoad);
             memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad);
             input->pos += toLoad;
             mtctx->inBuff.filled += toLoad;
+            forwardInputProgress = toLoad>0;
     }   }
 
     if ( (mtctx->inBuff.filled >= newJobThreshold)  /* filled enough : let's compress */
@@ -1036,7 +1035,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
     }
 
     /* check for potential compressed data ready to be flushed */
-    CHECK_F( ZSTDMT_flushNextJob(mtctx, output, (mtctx->inBuff.filled == mtctx->inBuffSize) /* blockToFlush */) ); /* block if it wasn't possible to create new job due to saturation */
+    CHECK_F( ZSTDMT_flushNextJob(mtctx, output, !forwardInputProgress /* blockToFlush */) ); /* block if there was no forward input progress */
 
     if (input->pos < input->size)  /* input not consumed : do not flush yet */
         endOp = ZSTD_e_continue;