zstdmt: added ability to flush current job before it's completed
author    Yann Collet <cyan@fb.com>
          Thu, 18 Jan 2018 19:03:27 +0000 (11:03 -0800)
committer Yann Collet <cyan@fb.com>
          Thu, 18 Jan 2018 19:03:27 +0000 (11:03 -0800)
however, zstdmt may still wait on the next available worker,
so it's not smooth yet.
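
For illustration only (not part of this commit): a minimal caller-side sketch of the behavior this change targets. With multithreading enabled and a deliberately small output buffer, each call can now hand back already-compressed blocks of a job that is still in flight. The sketch uses today's public streaming API (ZSTD_compressStream2, ZSTD_c_nbWorkers), which postdates the internal ZSTDMT names touched below, and assumes libzstd was built with ZSTD_MULTITHREAD.

/* Sketch only: assumes a multithread-capable libzstd; API names are the
 * current public ones, not the internal ZSTDMT entry points in this commit. */
#include <stdio.h>
#include <string.h>
#include <zstd.h>

int main(void)
{
    static char src[1 << 20];   /* 1 MiB of pretend input */
    static char dst[1 << 16];   /* small output buffer, so flushing happens in slices */
    ZSTD_CCtx* const cctx = ZSTD_createCCtx();
    ZSTD_inBuffer in = { src, sizeof(src), 0 };
    size_t remaining;

    memset(src, 'a', sizeof(src));
    ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, 3);
    ZSTD_CCtx_setParameter(cctx, ZSTD_c_nbWorkers, 2);   /* route compression through zstdmt */

    do {
        ZSTD_outBuffer out = { dst, sizeof(dst), 0 };
        /* Each call may return blocks of a job that is still being compressed,
         * instead of blocking until the whole job completes (it may still
         * wait for the next available worker). */
        remaining = ZSTD_compressStream2(cctx, &out, &in, ZSTD_e_end);
        if (ZSTD_isError(remaining)) {
            fprintf(stderr, "error: %s\n", ZSTD_getErrorName(remaining));
            break;
        }
        fwrite(dst, 1, out.pos, stdout);
    } while (remaining != 0);   /* 0 == frame fully flushed */

    ZSTD_freeCCtx(cctx);
    return 0;
}
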

lib/compress/zstdmt_compress.c

index 5624b5e3dd9d67a2deb3676c22cb407a21562323..812bc89b1d916d9517137d55dd3775743b83a4d4 100644
@@ -315,7 +315,7 @@ typedef struct {
     unsigned firstChunk;
     unsigned lastChunk;
     unsigned jobCompleted;
-    unsigned jobScanned;
+    unsigned checksumWritten;
     ZSTD_pthread_mutex_t* jobCompleted_mutex;
     ZSTD_pthread_cond_t* jobCompleted_cond;
     ZSTD_CCtx_params params;
@@ -389,7 +389,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
         BYTE* op = ostart;
         BYTE* oend = op + dstBuff.size;
         int blockNb;
-        DEBUGLOG(5, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks);
+        DEBUGLOG(2, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks);
         assert(job->cSize == 0);
         for (blockNb = 1; blockNb < nbBlocks; blockNb++) {
             size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, ZSTD_BLOCKSIZE_MAX);
@@ -400,10 +400,13 @@ void ZSTDMT_compressChunk(void* jobDescription)
             ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);   /* note : it's a mtctx mutex */
             job->cSize += cSize;
             job->consumed = ZSTD_BLOCKSIZE_MAX * blockNb;
+            DEBUGLOG(2, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)",
+                        (U32)cSize, (U32)job->cSize);
+            ZSTD_pthread_cond_signal(job->jobCompleted_cond);
             ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
         }
         /* last block */
-        if ((nbBlocks > 0) | job->lastChunk /*need to output a "last block" flag*/ ) {
+        if ((nbBlocks > 0) | job->lastChunk /*must output a "last block" flag*/ ) {
             size_t const lastBlockSize1 = job->srcSize & (ZSTD_BLOCKSIZE_MAX-1);
             size_t const lastBlockSize = ((lastBlockSize1==0) & (job->srcSize>=ZSTD_BLOCKSIZE_MAX)) ? ZSTD_BLOCKSIZE_MAX : lastBlockSize1;
             size_t const cSize = (job->lastChunk) ?
@@ -428,7 +431,6 @@ _endJob:
     ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
     job->consumed = job->srcSize;
     job->jobCompleted = 1;
-    job->jobScanned = 0;
     ZSTD_pthread_cond_signal(job->jobCompleted_cond);
     ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
 }
@@ -1017,6 +1019,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
     zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
     zcs->jobs[jobID].lastChunk = endFrame;
     zcs->jobs[jobID].jobCompleted = 0;
+    zcs->jobs[jobID].checksumWritten = 0;
     zcs->jobs[jobID].dstFlushed = 0;
     zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
     zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
@@ -1069,43 +1072,48 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
 static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
 {
     unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
-    DEBUGLOG(5, "ZSTDMT_flushNextJob (blocking:%u)", blockToFlush);
+    DEBUGLOG(2, "ZSTDMT_flushNextJob (blocking:%u)", blockToFlush);
     if (zcs->doneJobID == zcs->nextJobID) return 0;   /* all flushed ! */
     ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
-    while (zcs->jobs[wJobID].jobCompleted==0) {
-        DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID);
+    while (zcs->jobs[wJobID].dstFlushed == zcs->jobs[wJobID].cSize) {
+        DEBUGLOG(2, "waiting for something to flush from job %u (currently flushed: %u bytes)",
+                    zcs->doneJobID, (U32)zcs->jobs[wJobID].dstFlushed);
+        assert(zcs->jobs[wJobID].jobCompleted==0);
         if (!blockToFlush) { ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; }  /* nothing ready to be flushed => skip */
         ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);  /* block when nothing available to flush */
     }
-    ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex);
-    /* compression job completed : output can be flushed */
+
+    /* some output is available to be flushed */
     {   ZSTDMT_jobDescription job = zcs->jobs[wJobID];
-        if (!job.jobScanned) {
-            if (ZSTD_isError(job.cSize)) {
-                DEBUGLOG(5, "ZSTDMT_flushNextJob: job %u : compression error detected : %s",
-                            zcs->doneJobID, ZSTD_getErrorName(job.cSize));
-                ZSTDMT_waitForAllJobsCompleted(zcs);
-                ZSTDMT_releaseAllJobResources(zcs);
-                return job.cSize;
-            }
-            DEBUGLOG(5, "ZSTDMT_flushNextJob: zcs->params.fParams.checksumFlag : %u ", zcs->params.fParams.checksumFlag);
-            if (zcs->params.fParams.checksumFlag) {
-                if (zcs->frameEnded && (zcs->doneJobID+1 == zcs->nextJobID)) {  /* write checksum at end of last section */
-                    U32 const checksum = (U32)XXH64_digest(&zcs->xxhState);
-                    DEBUGLOG(5, "ZSTDMT_flushNextJob: writing checksum : %08X \n", checksum);
-                    MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum);
-                    job.cSize += 4;
-                    zcs->jobs[wJobID].cSize += 4;
-            }   }
-            zcs->jobs[wJobID].jobScanned = 1;
+        ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex);
+        if (ZSTD_isError(job.cSize)) {
+            DEBUGLOG(5, "ZSTDMT_flushNextJob: job %u : compression error detected : %s",
+                        zcs->doneJobID, ZSTD_getErrorName(job.cSize));
+            ZSTDMT_waitForAllJobsCompleted(zcs);
+            ZSTDMT_releaseAllJobResources(zcs);
+            return job.cSize;
+        }
+        /* add frame checksum if necessary */
+        if ( zcs->frameEnded
+          && (zcs->doneJobID+1 == zcs->nextJobID)
+          && (zcs->params.fParams.checksumFlag)
+          && (!job.checksumWritten) ) {
+            U32 const checksum = (U32)XXH64_digest(&zcs->xxhState);
+            DEBUGLOG(5, "ZSTDMT_flushNextJob: writing checksum : %08X \n", checksum);
+            MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum);
+            job.cSize += 4;
+            zcs->jobs[wJobID].cSize += 4;
+            zcs->jobs[wJobID].checksumWritten = 1;
         }
+        assert(job.cSize >= job.dstFlushed);
         {   size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
-            DEBUGLOG(5, "ZSTDMT_flushNextJob: Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
+            DEBUGLOG(2, "ZSTDMT_flushNextJob: Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
             memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
             output->pos += toWrite;
             job.dstFlushed += toWrite;
         }
-        if (job.dstFlushed == job.cSize) {   /* output buffer fully flushed => move to next one */
+        if ( job.jobCompleted
+          && (job.dstFlushed == job.cSize) ) {   /* output buffer fully flushed => move to next one */
             ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff);
             zcs->jobs[wJobID].dstBuff = g_nullBuffer;
             zcs->jobs[wJobID].jobCompleted = 0;
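
The heart of the change is the signaling pattern: ZSTDMT_compressChunk now signals jobCompleted_cond after every block, and ZSTDMT_flushNextJob waits until dstFlushed lags cSize rather than waiting for jobCompleted, releasing the destination buffer only once the job is both completed and fully flushed. Below is a standalone sketch of that producer/flusher pattern, using plain POSIX threads and illustrative names only (not zstd code):

/* Illustrative pattern only: the worker signals after every block so the
 * flusher can copy out partial results before the job ends. */
#include <pthread.h>
#include <stdio.h>

typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
    size_t produced;    /* bytes made available by the worker (like job->cSize)      */
    size_t flushed;     /* bytes already handed to the caller (like job->dstFlushed) */
    int    completed;   /* like job->jobCompleted */
} Job;

static void* worker(void* arg)
{
    Job* const job = (Job*)arg;
    int block;
    for (block = 0; block < 4; block++) {
        /* ...compress one block here... */
        pthread_mutex_lock(&job->mutex);
        job->produced += 1000;            /* pretend each block compresses to 1000 bytes */
        pthread_cond_signal(&job->cond);  /* wake the flusher: partial output is ready */
        pthread_mutex_unlock(&job->mutex);
    }
    pthread_mutex_lock(&job->mutex);
    job->completed = 1;
    pthread_cond_signal(&job->cond);
    pthread_mutex_unlock(&job->mutex);
    return NULL;
}

static void flusher(Job* job)
{
    for (;;) {
        pthread_mutex_lock(&job->mutex);
        while (job->flushed == job->produced && !job->completed)
            pthread_cond_wait(&job->cond, &job->mutex);   /* block until something to flush */
        {   size_t const toWrite = job->produced - job->flushed;
            if (toWrite) {
                /* ...memcpy toWrite bytes into the caller's output buffer here... */
                job->flushed += toWrite;
                printf("flushed %zu bytes (total %zu)\n", toWrite, job->flushed);
            }
        }
        if (job->completed && job->flushed == job->produced) {
            pthread_mutex_unlock(&job->mutex);
            return;   /* job done and fully flushed => move to the next job */
        }
        pthread_mutex_unlock(&job->mutex);
    }
}

int main(void)
{
    Job job = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, 0, 0 };
    pthread_t t;
    pthread_create(&t, NULL, worker, &job);
    flusher(&job);
    pthread_join(&t, NULL);
    return 0;
}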