]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
ZSTDMT_compress() creates a single frame
authorYann Collet <cyan@fb.com>
Wed, 11 Jan 2017 17:21:25 +0000 (18:21 +0100)
committerYann Collet <cyan@fb.com>
Wed, 11 Jan 2017 17:21:25 +0000 (18:21 +0100)
The new strategy involves cutting frame at block level.
The result is a single frame, preserving ZSTD_getDecompressedSize()

As a consequence, bench can now make a full round-trip,
since the result is compatible with ZSTD_decompress().

This strategy will not make it possible to decode the frame with multiple threads
since the exact cut between independent blocks is not known.
MT decoding needs further discussions.

Makefile
lib/compress/zstd_compress.c
lib/compress/zstdmt_compress.c
lib/zstd.h
programs/Makefile
programs/bench.c

index 19b12d0ef738a52964680f35c64c0d386c6ad6f0..0a3634c392e128427dd8c102e24ea9ad558b1625 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -88,7 +88,7 @@ travis-install:
        $(MAKE) install PREFIX=~/install_test_dir
 
 gpptest: clean
-       $(MAKE) -C programs all CC=g++ CFLAGS="-O3 -Wall -Wextra -Wundef -Wshadow -Wcast-align -Werror"
+       CC=g++ $(MAKE) -C programs all CFLAGS="-O3 -Wall -Wextra -Wundef -Wshadow -Wcast-align -Werror"
 
 gcc5test: clean
        gcc-5 -v
index 7626b33a685672bc66d993ed790f4f6f31916e48..c4dbb6ced5b7fa97989178007586437b816567c2 100644 (file)
@@ -2408,12 +2408,14 @@ static size_t ZSTD_compressContinue_internal (ZSTD_CCtx* cctx,
 
     cctx->nextSrc = ip + srcSize;
 
-    {   size_t const cSize = frame ?
+    if (srcSize) {
+        size_t const cSize = frame ?
                              ZSTD_compress_generic (cctx, dst, dstCapacity, src, srcSize, lastFrameChunk) :
                              ZSTD_compressBlock_internal (cctx, dst, dstCapacity, src, srcSize);
         if (ZSTD_isError(cSize)) return cSize;
         return cSize + fhSize;
-    }
+    } else
+        return fhSize;
 }
 
 
index 8471b7509127ba2a24e5dbab5d45a3549265c1f7..ae986468baeaec51bbce8dba822936514b13635d 100644 (file)
@@ -28,8 +28,8 @@ if (g_debugLevel>=MUTEX_WAIT_TIME_DLEVEL) { \
    unsigned long long afterTime = GetCurrentClockTimeMicroseconds(); \
    unsigned long long elapsedTime = (afterTime-beforeTime); \
    if (elapsedTime > 1000) {  /* or whatever threshold you like; I'm using 1 millisecond here */ \
-      DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, "Thread %li took %llu microseconds to acquire mutex %s \n", \
-                (long int) pthread_self(), elapsedTime, #mutex); \
+      DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, "Thread took %llu microseconds to acquire mutex %s \n", \
+               elapsedTime, #mutex); \
   } \
 } else pthread_mutex_lock(mutex);
 
@@ -112,6 +112,7 @@ typedef struct {
     buffer_t dstBuff;
     int compressionLevel;
     unsigned frameID;
+    unsigned long long fullFrameSize;
     size_t cSize;
     unsigned jobCompleted;
     pthread_mutex_t* jobCompleted_mutex;
@@ -122,9 +123,26 @@ typedef struct {
 void ZSTDMT_compressFrame(void* jobDescription)
 {
     ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
-    job->cSize = ZSTD_compressCCtx(job->cctx, job->dstBuff.start, job->dstBuff.size, job->srcStart, job->srcSize, job->compressionLevel);
+    buffer_t dstBuff = job->dstBuff;
+    ZSTD_parameters const params = ZSTD_getParams(job->compressionLevel, job->fullFrameSize, 0);
+    size_t hSize = ZSTD_compressBegin_advanced(job->cctx, NULL, 0, params, job->fullFrameSize);
+    if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; }
+    hSize = ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, 0);   /* flush frame header */
+    if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; }
+    if ((job->frameID & 1) == 0) {   /* preserve frame header when it is first beginning of frame */
+        dstBuff.start = (char*)dstBuff.start + hSize;
+        dstBuff.size -= hSize;
+    } else
+        hSize = 0;
+
+    job->cSize = (job->frameID>=2) ?   /* last chunk signal */
+                 ZSTD_compressEnd(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize) :
+                 ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize);
+    if (!ZSTD_isError(job->cSize)) job->cSize += hSize;
     DEBUGLOG(5, "frame %u : compressed %u bytes into %u bytes  ", (unsigned)job->frameID, (unsigned)job->srcSize, (unsigned)job->cSize);
-    pthread_mutex_lock(job->jobCompleted_mutex);
+
+_endJob:
+    PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
     job->jobCompleted = 1;
     pthread_cond_signal(job->jobCompleted_cond);
     pthread_mutex_unlock(job->jobCompleted_mutex);
@@ -254,10 +272,11 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
 
             mtctx->jobs[u].srcStart = srcStart + frameStartPos;
             mtctx->jobs[u].srcSize = frameSize;
+            mtctx->jobs[u].fullFrameSize = srcSize;
             mtctx->jobs[u].compressionLevel = compressionLevel;
             mtctx->jobs[u].dstBuff = dstBuffer;
             mtctx->jobs[u].cctx = cctx;
-            mtctx->jobs[u].frameID = u;
+            mtctx->jobs[u].frameID = (u>0) | ((u==nbFrames-1)<<1);
             mtctx->jobs[u].jobCompleted = 0;
             mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex;
             mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond;
@@ -275,7 +294,7 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
         for (frameID=0; frameID<nbFrames; frameID++) {
             DEBUGLOG(3, "ready to write frame %u ", frameID);
 
-            pthread_mutex_lock(&mtctx->jobCompleted_mutex);
+            PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex);
             while (mtctx->jobs[frameID].jobCompleted==0) {
                 DEBUGLOG(4, "waiting for jobCompleted signal from frame %u", frameID);
                 pthread_cond_wait(&mtctx->jobCompleted_cond, &mtctx->jobCompleted_mutex);
index 55cc466d70377505a39eb70663123ec4618225c9..198f45eacad680eed2d14d3bbcd23812a37b0289 100644 (file)
@@ -561,10 +561,10 @@ ZSTDLIB_API size_t ZSTD_sizeof_DStream(const ZSTD_DStream* zds);
     In which case, it will "discard" the relevant memory section from its history.
 
   Finish a frame with ZSTD_compressEnd(), which will write the last block(s) and optional checksum.
-  It's possible to use a NULL,0 src content, in which case, it will write a final empty block to end the frame,
-  Without last block mark, frames will be considered unfinished (broken) by decoders.
+  It's possible to use srcSize==0, in which case, it will write a final empty block to end the frame.
+  Without last block mark, frames will be considered unfinished (corrupted) by decoders.
 
-  You can then reuse `ZSTD_CCtx` (ZSTD_compressBegin()) to compress some new frame.
+  `ZSTD_CCtx` object can be re-used (ZSTD_compressBegin()) to compress some new frame.
 */
 
 /*=====   Buffer-less streaming compression functions  =====*/
index 77d5ab6e41a793d389c65b0718d43ab8ece1ea4b..ff95ddc62c7de34cca5efa4f63b9361aaba932a6 100644 (file)
@@ -129,7 +129,7 @@ gzstd:
 
 zstdmt: CPPFLAGS += -DZSTD_PTHREAD
 zstdmt: LDFLAGS += -lpthread
-zstdmt: clean zstd
+zstdmt: zstd
 
 generate_res:
        windres/generate_res.bat
index a3c013a8bfb5972045bc7402f9d39e4e6825cd3c..40e1d4abaa8e4556d96f894f488182f797207f5a 100644 (file)
@@ -321,7 +321,7 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize,
                 memcpy(compressedBuffer, srcBuffer, loadedCompressedSize);
             }
 
-#if 1
+#if 0       /* disable decompression test */
             dCompleted=1;
             (void)totalDTime; (void)fastestD; (void)crcOrig;   /*  unused when decompression disabled */
 #else
@@ -330,13 +330,14 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize,
 
             UTIL_sleepMilli(1); /* give processor time to other processes */
             UTIL_waitForNextTick(ticksPerSecond);
-            UTIL_getTime(&clockStart);
 
             if (!dCompleted) {
                 U64 clockLoop = g_nbSeconds ? TIMELOOP_MICROSEC : 1;
                 U32 nbLoops = 0;
+                clock_us_t clockStart;
                 ZSTD_DDict* const ddict = ZSTD_createDDict(dictBuffer, dictBufferSize);
                 if (!ddict) EXM_THROW(2, "ZSTD_createDDict() allocation failure");
+                clockStart = BMK_clockMicroSec();
                 do {
                     U32 blockNb;
                     for (blockNb=0; blockNb<nbBlocks; blockNb++) {
@@ -345,19 +346,19 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize,
                             blockTable[blockNb].cPtr, blockTable[blockNb].cSize,
                             ddict);
                         if (ZSTD_isError(regenSize)) {
-                            DISPLAY("ZSTD_decompress_usingDDict() failed on block %u : %s  \n",
-                                      blockNb, ZSTD_getErrorName(regenSize));
+                            DISPLAY("ZSTD_decompress_usingDDict() failed on block %u of size %u : %s  \n",
+                                      blockNb, (U32)blockTable[blockNb].cSize, ZSTD_getErrorName(regenSize));
                             clockLoop = 0;   /* force immediate test end */
                             break;
                         }
                         blockTable[blockNb].resSize = regenSize;
                     }
                     nbLoops++;
-                } while (UTIL_clockSpanMicro(clockStart, ticksPerSecond) < clockLoop);
+                } while (BMK_clockMicroSec() - clockStart < clockLoop);
                 ZSTD_freeDDict(ddict);
-                {   U64 const clockSpan = UTIL_clockSpanMicro(clockStart, ticksPerSecond);
-                    if (clockSpan < fastestD*nbLoops) fastestD = clockSpan / nbLoops;
-                    totalDTime += clockSpan;
+                {   clock_us_t const clockSpanMicro = BMK_clockMicroSec() - clockStart;
+                    if (clockSpanMicro < fastestD*nbLoops) fastestD = clockSpanMicro / nbLoops;
+                    totalDTime += clockSpanMicro;
                     dCompleted = (totalDTime >= maxTime);
             }   }