]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
ZSTDMT now supports frame checksum
authorYann Collet <cyan@fb.com>
Tue, 24 Jan 2017 19:48:40 +0000 (11:48 -0800)
committerYann Collet <cyan@fb.com>
Tue, 24 Jan 2017 19:48:40 +0000 (11:48 -0800)
lib/compress/zstdmt_compress.c
lib/compress/zstdmt_compress.h

index 9e7754b8889780a73dd8ee02b20080df07bf6717..0b91ad4ea82eea4b3191dbbf0141b00b29a92807 100644 (file)
@@ -29,6 +29,8 @@
 #include "threading.h"  /* mutex */
 #include "zstd_internal.h"   /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */
 #include "zstdmt_compress.h"
+#define XXH_STATIC_LINKING_ONLY   /* XXH64_state_t */
+#include "xxhash.h"
 
 
 /* ======   Debug   ====== */
@@ -217,6 +219,7 @@ typedef struct {
     unsigned firstChunk;
     unsigned lastChunk;
     unsigned jobCompleted;
+    unsigned jobScanned;
     pthread_mutex_t* jobCompleted_mutex;
     pthread_cond_t* jobCompleted_cond;
     ZSTD_parameters params;
@@ -254,6 +257,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
 _endJob:
     PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
     job->jobCompleted = 1;
+    job->jobScanned = 0;
     pthread_cond_signal(job->jobCompleted_cond);
     pthread_mutex_unlock(job->jobCompleted_mutex);
 }
@@ -273,6 +277,7 @@ struct ZSTDMT_CCtx_s {
     size_t inBuffSize;
     inBuff_t inBuff;
     ZSTD_parameters params;
+    XXH64_state_t xxhState;
     unsigned nbThreads;
     unsigned jobIDMask;
     unsigned doneJobID;
@@ -474,7 +479,6 @@ static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
         ZSTDMT_releaseAllJobResources(zcs);
         zcs->allJobsCompleted = 1;
     }
-    params.fParams.checksumFlag = 0;   /* current limitation : no checksum (to be lifted in a later version) */
     zcs->params = params;
     if (updateDict) {
         ZSTD_freeCDict(zcs->cdict); zcs->cdict = NULL;
@@ -492,6 +496,7 @@ static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
     zcs->nextJobID = 0;
     zcs->frameEnded = 0;
     zcs->allJobsCompleted = 0;
+    if (params.fParams.checksumFlag) XXH64_reset(&zcs->xxhState, 0);
     return 0;
 }
 
@@ -518,7 +523,7 @@ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
 
 /* ZSTDMT_flushNextJob() :
  * output : will be updated with amount of data flushed .
- * blockToFlush : the function will block and wait if there is no data available to flush .
+ * blockToFlush : if >0, the function will block and wait if there is no data available to flush .
  * @return : amount of data remaining within internal buffer, 1 if unknown but > 0, 0 if no more */
 static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
 {
@@ -526,28 +531,43 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
     if (zcs->doneJobID == zcs->nextJobID) return 0;   /* all flushed ! */
     PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
     while (zcs->jobs[wJobID].jobCompleted==0) {
-        DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID);    /* block when nothing available to flush */
-        if (!blockToFlush) { pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */
-        pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
+        DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID);
+        if (!blockToFlush) { pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; }  /* nothing ready to be flushed => skip */
+        pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);  /* block when nothing available to flush */
     }
     pthread_mutex_unlock(&zcs->jobCompleted_mutex);
     /* compression job completed : output can be flushed */
     {   ZSTDMT_jobDescription job = zcs->jobs[wJobID];
-        size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
-        DEBUGLOG(4, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
-        ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx);
-        zcs->jobs[wJobID].cctx = NULL;
-        ZSTDMT_releaseBuffer(zcs->buffPool, job.src);
-        zcs->jobs[wJobID].srcStart = NULL;
-        zcs->jobs[wJobID].src = g_nullBuffer;
-        if (ZSTD_isError(job.cSize)) {
-            ZSTDMT_waitForAllJobsCompleted(zcs);
-            ZSTDMT_releaseAllJobResources(zcs);
-            return job.cSize;
+        if (!job.jobScanned) {
+            if (ZSTD_isError(job.cSize)) {
+                DEBUGLOG(5, "compression error detected ");
+                ZSTDMT_waitForAllJobsCompleted(zcs);
+                ZSTDMT_releaseAllJobResources(zcs);
+                return job.cSize;
+            }
+            ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx);
+            zcs->jobs[wJobID].cctx = NULL;
+            DEBUGLOG(5, "zcs->params.fParams.checksumFlag : %u ", zcs->params.fParams.checksumFlag);
+            if (zcs->params.fParams.checksumFlag) {
+                XXH64_update(&zcs->xxhState, job.srcStart, job.srcSize);
+                if (zcs->frameEnded && (zcs->doneJobID+1 == zcs->nextJobID)) {  /* write checksum at end of last section */
+                    U32 const checksum = (U32)XXH64_digest(&zcs->xxhState);
+                    DEBUGLOG(4, "writing checksum : %08X \n", checksum);
+                    MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum);
+                    job.cSize += 4;
+                    zcs->jobs[wJobID].cSize += 4;
+            }   }
+            ZSTDMT_releaseBuffer(zcs->buffPool, job.src);
+            zcs->jobs[wJobID].srcStart = NULL;
+            zcs->jobs[wJobID].src = g_nullBuffer;
+            zcs->jobs[wJobID].jobScanned = 1;
+        }
+        {   size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
+            DEBUGLOG(4, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
+            memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
+            output->pos += toWrite;
+            job.dstFlushed += toWrite;
         }
-        memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
-        output->pos += toWrite;
-        job.dstFlushed += toWrite;
         if (job.dstFlushed == job.cSize) {   /* output buffer fully flushed => move to next one */
             ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff);
             zcs->jobs[wJobID].dstBuff = g_nullBuffer;
@@ -583,7 +603,7 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
         ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool);
         unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
 
-        if ((cctx==NULL) || (dstBuffer.start==NULL)) {
+        if ((cctx==NULL) || (dstBuffer.start==NULL)) {  /* cannot get resources for next job */
             zcs->jobs[jobID].jobCompleted = 1;
             zcs->nextJobID++;
             ZSTDMT_waitForAllJobsCompleted(zcs);
@@ -591,11 +611,12 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
             return ERROR(memory_allocation);
         }
 
-        DEBUGLOG(1, "preparing job %u to compress %u bytes \n", (U32)zcs->nextJobID, (U32)zcs->targetSectionSize);
+        DEBUGLOG(4, "preparing job %u to compress %u bytes \n", (U32)zcs->nextJobID, (U32)zcs->targetSectionSize);
         zcs->jobs[jobID].src = zcs->inBuff.buffer;
         zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
         zcs->jobs[jobID].srcSize = zcs->targetSectionSize;
         zcs->jobs[jobID].params = zcs->params;
+        if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0;  /* do not calculate checksum within sections, just keep it in header for first section */
         zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
         zcs->jobs[jobID].dict = NULL;
         zcs->jobs[jobID].dictSize = 0;
@@ -626,44 +647,11 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
         zcs->nextJobID++;
     }
 
-    /* check if there is any data available to flush */
+    /* check for data to flush */
     ZSTDMT_flushNextJob(zcs, output, (zcs->inBuff.filled == zcs->inBuffSize));  /* we'll block if it wasn't possible to create new job due to saturation */
-#if 0
-    {   unsigned const jobID = zcs->doneJobID & zcs->jobIDMask;
-        unsigned jobCompleted;
-        pthread_mutex_lock(&zcs->jobCompleted_mutex);
-        while (zcs->jobs[jobID].jobCompleted == 0 && zcs->inBuff.filled == zcs->inBuffSize) {
-            /* when no new job could be started, block until there is something to flush, ensuring forward progress */
-            pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
-        }
-        jobCompleted = zcs->jobs[jobID].jobCompleted;
-        pthread_mutex_unlock(&zcs->jobCompleted_mutex);
-        if (jobCompleted) {
-            ZSTDMT_jobDescription const job = zcs->jobs[jobID];
-            size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
-            DEBUGLOG(1, "flush %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
-            ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx);
-            zcs->jobs[jobID].cctx = NULL;
-            ZSTDMT_releaseBuffer(zcs->buffPool, job.src);
-            zcs->jobs[jobID].srcStart = NULL; zcs->jobs[jobID].src = g_nullBuffer;
-            if (ZSTD_isError(job.cSize)) {
-                ZSTDMT_waitForAllJobsCompleted(zcs);
-                ZSTDMT_releaseAllJobResources(zcs);
-                return job.cSize;
-            }
-            memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
-            output->pos += toWrite;
-            zcs->jobs[jobID].dstFlushed += toWrite;
-            DEBUGLOG(1, "remaining : %u bytes ", (U32)(job.cSize - job.dstFlushed));
-            if (zcs->jobs[jobID].dstFlushed == job.cSize) {   /* output buffer fully flushed => go to next one */
-                ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff);
-                zcs->jobs[jobID].dstBuff = g_nullBuffer;
-                zcs->jobs[jobID].jobCompleted = 0;
-                zcs->doneJobID++;
-    }   }   }
-#endif
+
     /* recommended next input size : fill current input buffer */
-    return zcs->inBuffSize - zcs->inBuff.filled;
+    return zcs->inBuffSize - zcs->inBuff.filled;   /* note : could be zero when input buffer is fully filled and no more availability to create new job */
 }
 
 
@@ -671,7 +659,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
 {
     size_t const srcSize = zcs->inBuff.filled;
 
-    DEBUGLOG(1, "flushing : %u bytes to compress", (U32)srcSize);
+    DEBUGLOG(4, "flushing : %u bytes left to compress", (U32)srcSize);
     if ( ((srcSize > 0) || (endFrame && !zcs->frameEnded))
        && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) {
         size_t const dstBufferCapacity = ZSTD_compressBound(srcSize);
@@ -691,6 +679,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
         zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
         zcs->jobs[jobID].srcSize = srcSize;
         zcs->jobs[jobID].params = zcs->params;
+        if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0;  /* do not calculate checksum within sections, just keep it in header for first section */
         zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
         zcs->jobs[jobID].dict = NULL;
         zcs->jobs[jobID].dictSize = 0;
@@ -719,6 +708,8 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
             zcs->inBuff.buffer = g_nullBuffer;
             zcs->inBuff.filled = 0;
             zcs->frameEnded = 1;
+            if (zcs->nextJobID == 0)
+                zcs->params.fParams.checksumFlag = 0;   /* single chunk : checksum is calculated directly within worker thread */
         }
 
         DEBUGLOG(3, "posting job %u : %u bytes  (end:%u) (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask);
@@ -729,43 +720,6 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
     /* check if there is any data available to flush */
     DEBUGLOG(5, "zcs->doneJobID : %u  ; zcs->nextJobID : %u ", zcs->doneJobID, zcs->nextJobID);
     return ZSTDMT_flushNextJob(zcs, output, 1);
-
-#if 0
-    {   unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
-        PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
-        while (zcs->jobs[wJobID].jobCompleted==0) {
-            DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID);   /* block when nothing available to flush */
-            pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
-        }
-        pthread_mutex_unlock(&zcs->jobCompleted_mutex);
-        /* compression job completed : output can be flushed */
-        {   ZSTDMT_jobDescription job = zcs->jobs[wJobID];
-            size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
-            DEBUGLOG(4, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
-            ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[wJobID].cctx = NULL;   /* release cctx for future task */
-            ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[wJobID].srcStart = NULL; zcs->jobs[wJobID].src = g_nullBuffer;
-            if (ZSTD_isError(job.cSize)) {
-                ZSTDMT_waitForAllJobsCompleted(zcs);
-                ZSTDMT_releaseAllJobResources(zcs);
-                return job.cSize;
-            }
-            memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
-            output->pos += toWrite;
-            job.dstFlushed += toWrite;
-            if (job.dstFlushed == job.cSize) {   /* output buffer fully flushed => next one */
-                ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); zcs->jobs[wJobID].dstBuff = g_nullBuffer;
-                zcs->jobs[wJobID].jobCompleted = 0;
-                zcs->doneJobID++;
-            } else {
-                zcs->jobs[wJobID].dstFlushed = job.dstFlushed;
-            }
-            /* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */
-            if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
-            if ((zcs->doneJobID < zcs->nextJobID) || (zcs->inBuff.filled)) return 1;   /* still some buffer to flush */
-            zcs->allJobsCompleted = zcs->frameEnded;
-            return 0;
-    }   }
-#endif
 }
 
 
index bdc4caabc03b433044125de69075c2f6a96cb7d2..84d25f7386d40b9025850bffbbd0411773bf1aca 100644 (file)
@@ -28,9 +28,9 @@ ZSTDLIB_API size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* cctx,
 /* ===   Streaming functions   === */
 
 ZSTDLIB_API size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel);
-ZSTDLIB_API size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize);   /**< pledgedSrcSize is optional and can be zero == unknown */
-ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize,   /**< dict can be released after init */
-                                   ZSTD_parameters params, unsigned long long pledgedSrcSize);  /**< params current limitation : no checksum ; pledgedSrcSize is optional and can be zero == unknown */
+ZSTDLIB_API size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize);    /**< pledgedSrcSize is optional and can be zero == unknown */
+ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize,   /**< dict can be released after init, a local copy is preserved within zcs */
+                                   ZSTD_parameters params, unsigned long long pledgedSrcSize);  /**< pledgedSrcSize is optional and can be zero == unknown */
 
 ZSTDLIB_API size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input);