]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
trap compression errors, collect back resources from workers
authorYann Collet <cyan@fb.com>
Wed, 18 Jan 2017 22:11:37 +0000 (14:11 -0800)
committerYann Collet <cyan@fb.com>
Wed, 18 Jan 2017 22:11:37 +0000 (14:11 -0800)
lib/compress/zstdmt_compress.c

index f880e8525da87fd8bbb8e8fd7364404201f555fb..3762f5a2592223e51ac918145f0190bc9c41b993 100644 (file)
@@ -423,7 +423,6 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
         unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
 
         if ((cctx==NULL) || (dstBuffer.start==NULL)) {
-            zcs->jobs[jobID].cSize = ERROR(memory_allocation);
             zcs->jobs[jobID].jobCompleted = 1;
             zcs->nextJobID++;
             ZSTDMT_waitForAllJobsCompleted(zcs);
@@ -438,7 +437,7 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
         zcs->jobs[jobID].params = zcs->params;
         zcs->jobs[jobID].dstBuff = dstBuffer;
         zcs->jobs[jobID].cctx = cctx;
-        zcs->jobs[jobID].firstChunk = (jobID==0);
+        zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
         zcs->jobs[jobID].lastChunk = 0;
         zcs->jobs[jobID].jobCompleted = 0;
         zcs->jobs[jobID].dstFlushed = 0;
@@ -448,7 +447,6 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
         /* get a new buffer for next input - save remaining into it */
         zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
         if (zcs->inBuff.buffer.start == NULL) {   /* not enough memory to allocate next input buffer */
-            zcs->jobs[jobID].cSize = ERROR(memory_allocation);
             zcs->jobs[jobID].jobCompleted = 1;
             zcs->nextJobID++;
             ZSTDMT_waitForAllJobsCompleted(zcs);
@@ -472,6 +470,11 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
             zcs->jobs[jobID].cctx = NULL;
             ZSTDMT_releaseBuffer(zcs->buffPool, job.src);
             zcs->jobs[jobID].srcStart = NULL; zcs->jobs[jobID].src = g_nullBuffer;
+            if (ZSTD_isError(job.cSize)) {
+                ZSTDMT_waitForAllJobsCompleted(zcs);
+                ZSTDMT_releaseAllJobResources(zcs);
+                return job.cSize;
+            }
             memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
             output->pos += toWrite;
             job.dstFlushed += toWrite;
@@ -499,7 +502,6 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
         unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
 
         if ((cctx==NULL) || (dstBuffer.start==NULL)) {
-            zcs->jobs[jobID].cSize = ERROR(memory_allocation);
             zcs->jobs[jobID].jobCompleted = 1;
             zcs->nextJobID++;
             ZSTDMT_waitForAllJobsCompleted(zcs);
@@ -514,7 +516,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
         zcs->jobs[jobID].params = zcs->params;
         zcs->jobs[jobID].dstBuff = dstBuffer;
         zcs->jobs[jobID].cctx = cctx;
-        zcs->jobs[jobID].firstChunk = (jobID==0);
+        zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
         zcs->jobs[jobID].lastChunk = endFrame;
         zcs->jobs[jobID].jobCompleted = 0;
         zcs->jobs[jobID].dstFlushed = 0;
@@ -526,7 +528,6 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
             zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
             zcs->inBuff.filled = 0;
             if (zcs->inBuff.buffer.start == NULL) {   /* not enough memory to allocate next input buffer */
-                zcs->jobs[jobID].cSize = ERROR(memory_allocation);
                 zcs->jobs[jobID].jobCompleted = 1;
                 zcs->nextJobID++;
                 ZSTDMT_waitForAllJobsCompleted(zcs);
@@ -543,6 +544,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
     }
 
     /* check if there is any data available to flush */
+    if (zcs->doneJobID == zcs->nextJobID) return 0;   /* all flushed ! */
     {   unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
         PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
         while (zcs->jobs[wJobID].jobCompleted==0) {
@@ -555,6 +557,11 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
             size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
             ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[wJobID].cctx = NULL;   /* release cctx for future task */
             ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[wJobID].srcStart = NULL; zcs->jobs[wJobID].src = g_nullBuffer;
+            if (ZSTD_isError(job.cSize)) {
+                ZSTDMT_waitForAllJobsCompleted(zcs);
+                ZSTDMT_releaseAllJobResources(zcs);
+                return job.cSize;
+            }
             memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
             output->pos += toWrite;
             job.dstFlushed += toWrite;