]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
zstd cli can now compress using multi-threading
authorYann Collet <cyan@fb.com>
Fri, 20 Jan 2017 00:59:56 +0000 (16:59 -0800)
committerYann Collet <cyan@fb.com>
Fri, 20 Jan 2017 01:04:28 +0000 (17:04 -0800)
added : command -T#
added : ZSTD_resetCStream() (zstdmt_compress)
added : FIO_setNbThreads()  (fileio)

lib/compress/zstdmt_compress.c
lib/compress/zstdmt_compress.h
programs/Makefile
programs/bench.c
programs/bench.h
programs/fileio.c
programs/fileio.h
programs/zstdcli.c

index dd1fd3452b49412ac3b57bb849a8d4884b38150e..de503227103d9543b3aaf4b658e50065a8663b00 100644 (file)
@@ -443,6 +443,13 @@ size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, const void* dict, size_t di
     return 0;
 }
 
+/* ZSTDMT_resetCStream() :
+ * pledgedSrcSize is optional and can be zero == unknown */
+size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize)
+{
+    return ZSTDMT_initCStream_advanced(zcs, zcs->dict, zcs->dictSize, zcs->params, pledgedSrcSize);
+}
+
 size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
     ZSTD_parameters const params = ZSTD_getParams(compressionLevel, 0, 0);
     return ZSTDMT_initCStream_advanced(zcs, NULL, 0, params, 0);
index d1b01a79e07039f5130abba66d4373ba19b81b6d..759906db135bcfa4502ae6c1268c5e1d246cc61c 100644 (file)
@@ -20,6 +20,7 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* cctx,
 /* ===   Streaming functions   === */
 
 size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel);
+size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize);   /**< pledgedSrcSize is optional and can be zero == unknown */
 size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize,
                                    ZSTD_parameters params, unsigned long long pledgedSrcSize);  /**< pledgedSrcSize is optional and can be zero == unknown ; current limitation : no checksum */
 
index 02c924f39aa51ab99086b1c78bccaebafb948706..f2a0ff26e5997d4828683238d12bea6010990c72 100644 (file)
@@ -22,7 +22,7 @@ else
 ALIGN_LOOP =
 endif
 
-CPPFLAGS+= -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(ZSTDDIR)/dictBuilder
+CPPFLAGS+= -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(ZSTDDIR)/compress -I$(ZSTDDIR)/dictBuilder
 CFLAGS  ?= -O3
 CFLAGS  += -Wall -Wextra -Wcast-qual -Wcast-align -Wshadow -Wstrict-aliasing=1 \
           -Wswitch-enum -Wdeclaration-after-statement -Wstrict-prototypes -Wundef \
index 5299b47120120b9910ec6b3fb33f569fc30510ba..74d26ca061d895fe17610f6ffd2646fac47e0315 100644 (file)
@@ -9,6 +9,14 @@
 
 
 
+/* **************************************
+*  Tuning parameters
+****************************************/
+#ifndef BMK_TIMETEST_DEFAULT_S   /* default minimum time per test */
+#define BMK_TIMETEST_DEFAULT_S 3
+#endif
+
+
 /* **************************************
 *  Compiler Warnings
 ****************************************/
@@ -43,7 +51,6 @@
 #  define ZSTD_GIT_COMMIT_STRING ZSTD_EXPAND_AND_QUOTE(ZSTD_GIT_COMMIT)
 #endif
 
-#define NBSECONDS             3
 #define TIMELOOP_MICROSEC     1*1000000ULL /* 1 second */
 #define ACTIVEPERIOD_MICROSEC 70*1000000ULL /* 70 seconds */
 #define COOLPERIOD_SEC        10
@@ -109,31 +116,36 @@ static clock_us_t BMK_clockMicroSec(void)
 /* *************************************
 *  Benchmark Parameters
 ***************************************/
-static U32 g_nbSeconds = NBSECONDS;
-static size_t g_blockSize = 0;
 static int g_additionalParam = 0;
 static U32 g_decodeOnly = 0;
-static U32 g_nbThreads = 1;
 
 void BMK_setNotificationLevel(unsigned level) { g_displayLevel=level; }
 
 void BMK_setAdditionalParam(int additionalParam) { g_additionalParam=additionalParam; }
 
-void BMK_SetNbSeconds(unsigned nbSeconds)
+static U32 g_nbSeconds = BMK_TIMETEST_DEFAULT_S;
+void BMK_setNbSeconds(unsigned nbSeconds)
 {
     g_nbSeconds = nbSeconds;
-    DISPLAYLEVEL(3, "- test >= %u seconds per compression / decompression -\n", g_nbSeconds);
+    DISPLAYLEVEL(3, "- test >= %u seconds per compression / decompression - \n", g_nbSeconds);
 }
 
-void BMK_SetBlockSize(size_t blockSize)
+static size_t g_blockSize = 0;
+void BMK_setBlockSize(size_t blockSize)
 {
     g_blockSize = blockSize;
-    DISPLAYLEVEL(2, "using blocks of size %u KB \n", (U32)(blockSize>>10));
+    if (g_blockSize) DISPLAYLEVEL(2, "using blocks of size %u KB \n", (U32)(blockSize>>10));
 }
 
 void BMK_setDecodeOnlyMode(unsigned decodeFlag) { g_decodeOnly = (decodeFlag>0); }
 
-void BMK_SetNbThreads(unsigned nbThreads) { g_nbThreads = nbThreads; }
+static U32 g_nbThreads = 1;
+void BMK_setNbThreads(unsigned nbThreads) {
+#ifndef ZSTD_MULTITHREAD
+    if (nbThreads > 1) DISPLAYLEVEL(2, "Note : multi-threading is disabled \n");
+#endif
+    g_nbThreads = nbThreads;
+}
 
 
 /* ********************************************************
index 87850bcc30ec26f67753530e1d1fc47ac261827e..2918c02bf1114fece509216bce7a34b0722f063f 100644 (file)
@@ -19,9 +19,9 @@ int BMK_benchFiles(const char** fileNamesTable, unsigned nbFiles,const char* dic
                    int cLevel, int cLevelLast, ZSTD_compressionParameters* compressionParams);
 
 /* Set Parameters */
-void BMK_SetNbSeconds(unsigned nbLoops);
-void BMK_SetBlockSize(size_t blockSize);
-void BMK_SetNbThreads(unsigned nbThreads);
+void BMK_setNbSeconds(unsigned nbLoops);
+void BMK_setBlockSize(size_t blockSize);
+void BMK_setNbThreads(unsigned nbThreads);
 void BMK_setNotificationLevel(unsigned level);
 void BMK_setAdditionalParam(int additionalParam);
 void BMK_setDecodeOnlyMode(unsigned decodeFlag);
index a112cc0499b6d3ffe54e974f6a1f39c89c8f239b..3864a5fabfd22962a5010d1e1e288b927291f70e 100644 (file)
 #include "fileio.h"
 #define ZSTD_STATIC_LINKING_ONLY   /* ZSTD_magicNumber, ZSTD_frameHeaderSize_max */
 #include "zstd.h"
-#ifdef ZSTD_GZDECOMPRESS
-#include "zlib.h"
-#if !defined(z_const)
-    #define z_const
+#ifdef ZSTD_MULTITHREAD
+#  include "zstdmt_compress.h"
 #endif
+#ifdef ZSTD_GZDECOMPRESS
+#  include "zlib.h"
+#  if !defined(z_const)
+#    define z_const
+#  endif
 #endif
 
 
@@ -103,7 +106,13 @@ static U32 g_removeSrcFile = 0;
 void FIO_setRemoveSrcFile(unsigned flag) { g_removeSrcFile = (flag>0); }
 static U32 g_memLimit = 0;
 void FIO_setMemLimit(unsigned memLimit) { g_memLimit = memLimit; }
-
+static U32 g_nbThreads = 1;
+void FIO_setNbThreads(unsigned nbThreads) {
+#ifndef ZSTD_MULTITHREAD
+    if (nbThreads > 1) DISPLAYLEVEL(2, "Note : multi-threading is disabled \n");
+#endif
+    g_nbThreads = nbThreads;
+}
 
 
 /*-*************************************
@@ -226,22 +235,30 @@ static size_t FIO_loadFile(void** bufferPtr, const char* fileName)
 *  Compression
 ************************************************************************/
 typedef struct {
+    FILE* srcFile;
+    FILE* dstFile;
     void*  srcBuffer;
     size_t srcBufferSize;
     void*  dstBuffer;
     size_t dstBufferSize;
+#ifdef ZSTD_MULTITHREAD
+    ZSTDMT_CCtx* cctx;
+#else
     ZSTD_CStream* cctx;
-    FILE* dstFile;
-    FILE* srcFile;
+#endif
 } cRess_t;
 
-static cRess_t FIO_createCResources(const char* dictFileName, int cLevel, 
+static cRess_t FIO_createCResources(const char* dictFileName, int cLevel,
                                     U64 srcSize, ZSTD_compressionParameters* comprParams)
 {
     cRess_t ress;
     memset(&ress, 0, sizeof(ress));
 
+#ifdef ZSTD_MULTITHREAD
+    ress.cctx = ZSTDMT_createCCtx(g_nbThreads);
+#else
     ress.cctx = ZSTD_createCStream();
+#endif
     if (ress.cctx == NULL) EXM_THROW(30, "zstd: allocation error : can't create ZSTD_CStream");
     ress.srcBufferSize = ZSTD_CStreamInSize();
     ress.srcBuffer = malloc(ress.srcBufferSize);
@@ -264,7 +281,11 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel,
             if (comprParams->searchLength) params.cParams.searchLength = comprParams->searchLength;
             if (comprParams->targetLength) params.cParams.targetLength = comprParams->targetLength;
             if (comprParams->strategy) params.cParams.strategy = (ZSTD_strategy)(comprParams->strategy - 1);
+#ifdef ZSTD_MULTITHREAD
+            {   size_t const errorCode = ZSTDMT_initCStream_advanced(ress.cctx, dictBuffer, dictBuffSize, params, srcSize);
+#else
             {   size_t const errorCode = ZSTD_initCStream_advanced(ress.cctx, dictBuffer, dictBuffSize, params, srcSize);
+#endif
                 if (ZSTD_isError(errorCode)) EXM_THROW(33, "Error initializing CStream : %s", ZSTD_getErrorName(errorCode));
         }   }
         free(dictBuffer);
@@ -277,7 +298,11 @@ static void FIO_freeCResources(cRess_t ress)
 {
     free(ress.srcBuffer);
     free(ress.dstBuffer);
+#ifdef ZSTD_MULTITHREAD
+    ZSTDMT_freeCCtx(ress.cctx);
+#else
     ZSTD_freeCStream(ress.cctx);   /* never fails */
+#endif
 }
 
 
@@ -296,7 +321,11 @@ static int FIO_compressFilename_internal(cRess_t ress,
     U64 const fileSize = UTIL_getFileSize(srcFileName);
 
     /* init */
+#ifdef ZSTD_MULTITHREAD
+    {   size_t const resetError = ZSTDMT_resetCStream(ress.cctx, fileSize);
+#else
     {   size_t const resetError = ZSTD_resetCStream(ress.cctx, fileSize);
+#endif
         if (ZSTD_isError(resetError)) EXM_THROW(21, "Error initializing compression : %s", ZSTD_getErrorName(resetError));
     }
 
@@ -311,11 +340,14 @@ static int FIO_compressFilename_internal(cRess_t ress,
         /* Compress using buffered streaming */
         {   ZSTD_inBuffer  inBuff = { ress.srcBuffer, inSize, 0 };
             ZSTD_outBuffer outBuff= { ress.dstBuffer, ress.dstBufferSize, 0 };
-            { size_t const result = ZSTD_compressStream(ress.cctx, &outBuff, &inBuff);
-              if (ZSTD_isError(result)) EXM_THROW(23, "Compression error : %s ", ZSTD_getErrorName(result)); }
-            if (inBuff.pos != inBuff.size)
-                /* inBuff should be entirely consumed since buffer sizes are recommended ones */
-                EXM_THROW(24, "Compression error : input block not fully consumed");
+            while (inBuff.pos != inBuff.size) {   /* note : is there any possibility of endless loop ? for example, if outBuff is not large enough ? */
+#ifdef ZSTD_MULTITHREAD
+                size_t const result = ZSTDMT_compressStream(ress.cctx, &outBuff, &inBuff);
+#else
+                size_t const result = ZSTD_compressStream(ress.cctx, &outBuff, &inBuff);
+#endif
+                if (ZSTD_isError(result)) EXM_THROW(23, "Compression error : %s ", ZSTD_getErrorName(result));
+            }
 
             /* Write cBlock */
             { size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
@@ -326,13 +358,19 @@ static int FIO_compressFilename_internal(cRess_t ress,
     }
 
     /* End of Frame */
-    {   ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 };
-        size_t const result = ZSTD_endStream(ress.cctx, &outBuff);
-        if (result!=0) EXM_THROW(26, "Compression error : cannot create frame end");
-
-        { size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
-          if (sizeCheck!=outBuff.pos) EXM_THROW(27, "Write error : cannot write frame end into %s", dstFileName); }
-        compressedfilesize += outBuff.pos;
+    {   size_t result = 1;
+        while (result!=0) {   /* note : is there any possibility of endless loop ? */
+            ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 };
+#ifdef ZSTD_MULTITHREAD
+            result = ZSTDMT_endStream(ress.cctx, &outBuff);
+#else
+            result = ZSTD_endStream(ress.cctx, &outBuff);
+#endif
+            if (ZSTD_isError(result)) EXM_THROW(26, "Compression error during frame end : %s", ZSTD_getErrorName(result));
+            { size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
+              if (sizeCheck!=outBuff.pos) EXM_THROW(27, "Write error : cannot write frame end into %s", dstFileName); }
+            compressedfilesize += outBuff.pos;
+        }
     }
 
     /* Status */
@@ -632,7 +670,7 @@ unsigned long long FIO_decompressFrame(dRess_t* ress,
         if (ZSTD_isError(readSizeHint)) EXM_THROW(36, "Decoding error : %s", ZSTD_getErrorName(readSizeHint));
 
         /* Write block */
-        storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, outBuff.pos, storedSkips);       
+        storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, outBuff.pos, storedSkips);
         frameSize += outBuff.pos;
         DISPLAYUPDATE(2, "\rDecoded : %u MB...     ", (U32)((alreadyDecoded+frameSize)>>20) );
 
index b716583342333a6012197ac0a84280a587a72a0e..9ef4492945d141b32a66cdba7c535bec01f1d7e4 100644 (file)
@@ -40,6 +40,7 @@ void FIO_setDictIDFlag(unsigned dictIDFlag);
 void FIO_setChecksumFlag(unsigned checksumFlag);
 void FIO_setRemoveSrcFile(unsigned flag);
 void FIO_setMemLimit(unsigned memLimit);
+void FIO_setNbThreads(unsigned nbThreads);
 
 
 /*-*************************************
index c9d8100eb79e57d80ca4b6441fd1f30bc8078272..de25d0f0ac6781f1ac692347778984238d770b07 100644 (file)
@@ -110,12 +110,15 @@ static int usage_advanced(const char* programName)
     DISPLAY( " -q     : suppress warnings; specify twice to suppress errors too\n");
     DISPLAY( " -c     : force write to standard output, even if it is the console\n");
 #ifdef UTIL_HAS_CREATEFILELIST
-    DISPLAY( " -r     : operate recursively on directories\n");
+    DISPLAY( " -r     : operate recursively on directories \n");
 #endif
 #ifndef ZSTD_NOCOMPRESS
     DISPLAY( "--ultra : enable levels beyond %i, up to %i (requires more memory)\n", ZSTDCLI_CLEVEL_MAX, ZSTD_maxCLevel());
     DISPLAY( "--no-dictID : don't write dictID into header (dictionary compression)\n");
-    DISPLAY( "--[no-]check : integrity check (default:enabled)\n");
+    DISPLAY( "--[no-]check : integrity check (default:enabled) \n");
+#ifdef ZSTD_MULTITHREAD
+    DISPLAY( " -T#    : use # threads for compression (default:1) \n");
+#endif
 #endif
 #ifndef ZSTD_NODECOMPRESS
     DISPLAY( "--test  : test compressed file integrity \n");
@@ -233,7 +236,10 @@ int main(int argCount, const char* argv[])
         nextArgumentIsDictID=0,
         nextArgumentsAreFiles=0,
         ultra=0,
-        lastCommand = 0;
+        lastCommand = 0,
+        nbThreads = 1;
+    unsigned bench_nbSeconds = 3;   /* would be better if this value was synchronized from bench */
+    size_t blockSize = 0;
     zstd_operation_mode operation = zom_compress;
     ZSTD_compressionParameters compressionParams;
     int cLevel = ZSTDCLI_CLEVEL_DEFAULT;
@@ -396,39 +402,37 @@ int main(int argCount, const char* argv[])
 
 #ifndef ZSTD_NOBENCH
                         /* Benchmark */
-                    case 'b': operation=zom_bench; argument++; break;
+                    case 'b':
+                        operation=zom_bench;
+                        argument++;
+                        break;
 
                         /* range bench (benchmark only) */
                     case 'e':
-                            /* compression Level */
-                            argument++;
-                            cLevelLast = readU32FromChar(&argument);
-                            break;
+                        /* compression Level */
+                        argument++;
+                        cLevelLast = readU32FromChar(&argument);
+                        break;
 
                         /* Modify Nb Iterations (benchmark only) */
                     case 'i':
                         argument++;
-                        {   U32 const iters = readU32FromChar(&argument);
-                            BMK_setNotificationLevel(displayLevel);
-                            BMK_SetNbSeconds(iters);
-                        }
+                        bench_nbSeconds = readU32FromChar(&argument);
                         break;
 
                         /* cut input into blocks (benchmark only) */
                     case 'B':
                         argument++;
-                        {   size_t const bSize = readU32FromChar(&argument);
-                            BMK_setNotificationLevel(displayLevel);
-                            BMK_SetBlockSize(bSize);
-                        }
+                        blockSize = readU32FromChar(&argument);
                         break;
 
+#endif   /* ZSTD_NOBENCH */
+
                         /* nb of threads (hidden option) */
                     case 'T':
                         argument++;
-                        BMK_SetNbThreads(readU32FromChar(&argument));
+                        nbThreads = readU32FromChar(&argument);
                         break;
-#endif   /* ZSTD_NOBENCH */
 
                         /* Dictionary Selection level */
                     case 's':
@@ -518,6 +522,9 @@ int main(int argCount, const char* argv[])
     if (operation==zom_bench) {
 #ifndef ZSTD_NOBENCH
         BMK_setNotificationLevel(displayLevel);
+        BMK_setBlockSize(blockSize);
+        BMK_setNbThreads(nbThreads);
+        BMK_setNbSeconds(bench_nbSeconds);
         BMK_benchFiles(filenameTable, filenameIdx, dictFileName, cLevel, cLevelLast, &compressionParams);
 #endif
         goto _end;
@@ -569,6 +576,7 @@ int main(int argCount, const char* argv[])
     FIO_setNotificationLevel(displayLevel);
     if (operation==zom_compress) {
 #ifndef ZSTD_NOCOMPRESS
+        FIO_setNbThreads(nbThreads);
         if ((filenameIdx==1) && outFileName)
           operationResult = FIO_compressFilename(outFileName, filenameTable[0], dictFileName, cLevel, &compressionParams);
         else