]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
Optimized ZSTDMT single-pass mode speed on large sources
authorYann Collet <cyan@fb.com>
Fri, 30 Jun 2017 22:44:57 +0000 (15:44 -0700)
committerYann Collet <cyan@fb.com>
Fri, 30 Jun 2017 22:44:57 +0000 (15:44 -0700)
by ensuring job sizes remain "not too large"

lib/compress/zstdmt_compress.c

index f7ee7502db1399f25422cb5f5be9034a8a8273c8..c96ef48255977b2c33bb8ae99922dcfb52e285a9 100644 (file)
@@ -331,14 +331,20 @@ struct ZSTDMT_CCtx_s {
     const ZSTD_CDict* cdict;
 };
 
+static ZSTDMT_jobDescription* ZSTDMT_allocJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem)
+{
+    U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1;
+    U32 const nbJobs = 1 << nbJobsLog2;
+    *nbJobsPtr = nbJobs;
+    return (ZSTDMT_jobDescription*) ZSTD_calloc(
+                            nbJobs * sizeof(ZSTDMT_jobDescription), cMem);
+}
+
 ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
 {
     ZSTDMT_CCtx* mtctx;
-    U32 const minNbJobs = nbThreads + 2;
-    U32 const nbJobsLog2 = ZSTD_highbit32(minNbJobs) + 1;
-    U32 const nbJobs = 1 << nbJobsLog2;
-    DEBUGLOG(5, "nbThreads: %u ; minNbJobs: %u ; nbJobsLog2: %u ; nbJobs: %u",
-                nbThreads, minNbJobs, nbJobsLog2, nbJobs);
+    U32 nbJobs = nbThreads + 2;
+    DEBUGLOG(3, "ZSTDMT_createCCtx_advanced");
 
     if ((nbThreads < 1) | (nbThreads > ZSTDMT_NBTHREADS_MAX)) return NULL;
     if ((cMem.customAlloc!=NULL) ^ (cMem.customFree!=NULL))
@@ -349,13 +355,12 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
     if (!mtctx) return NULL;
     mtctx->cMem = cMem;
     mtctx->nbThreads = nbThreads;
-    mtctx->jobIDMask = nbJobs - 1;
     mtctx->allJobsCompleted = 1;
     mtctx->sectionSize = 0;
     mtctx->overlapRLog = 3;
     mtctx->factory = POOL_create(nbThreads, 1);
-    mtctx->jobs = (ZSTDMT_jobDescription*)ZSTD_calloc(
-        nbJobs * sizeof(*mtctx->jobs), cMem);
+    mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, cMem);
+    mtctx->jobIDMask = nbJobs - 1;
     mtctx->buffPool = ZSTDMT_createBufferPool(nbThreads, cMem);
     mtctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads, cMem);
     if (!mtctx->factory | !mtctx->jobs | !mtctx->buffPool | !mtctx->cctxPool) {
@@ -448,24 +453,39 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
                            unsigned overlapRLog)
 {
     size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (params.cParams.windowLog - overlapRLog);
-    size_t const chunkTargetSize = (size_t)1 << (params.cParams.windowLog + 2);
-    unsigned const nbChunksMax = (unsigned)(srcSize / chunkTargetSize) + 1;
-    unsigned nbChunks = MIN(nbChunksMax, mtctx->nbThreads);
+    size_t const chunkSizeTarget = (size_t)1 << (params.cParams.windowLog + 2);
+    size_t const chunkMaxSize = chunkSizeTarget << 2;
+    size_t const passSizeMax = chunkMaxSize * mtctx->nbThreads;
+    unsigned const multiplier = (unsigned)(srcSize / passSizeMax) + 1;
+    unsigned nbChunksLarge = multiplier * mtctx->nbThreads;
+    unsigned const nbChunksMax = (unsigned)(srcSize / chunkSizeTarget) + 1;
+    unsigned nbChunksSmall = MIN(nbChunksMax, mtctx->nbThreads);
+    unsigned nbChunks = (multiplier>1) ? nbChunksLarge : nbChunksSmall;
     size_t const proposedChunkSize = (srcSize + (nbChunks-1)) / nbChunks;
-    size_t const avgChunkSize = ((proposedChunkSize & 0x1FFFF) < 0xFFFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize;   /* avoid too small last block */
-    size_t remainingSrcSize = srcSize;
+    size_t const avgChunkSize = ((proposedChunkSize & 0x1FFFF) < 0x7FFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize;   /* avoid too small last block */
     const char* const srcStart = (const char*)src;
+    size_t remainingSrcSize = srcSize;
     unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbChunks : (unsigned)(dstCapacity / ZSTD_compressBound(avgChunkSize));  /* presumes avgChunkSize >= 256 KB, which should be the case */
     size_t frameStartPos = 0, dstBufferPos = 0;
 
-    DEBUGLOG(4, "windowLog : %2u => chunkTargetSize : %u bytes  ", params.cParams.windowLog, (U32)chunkTargetSize);
+    DEBUGLOG(4, "windowLog : %2u => chunkSizeTarget : %u bytes  ", params.cParams.windowLog, (U32)chunkSizeTarget);
     DEBUGLOG(4, "nbChunks  : %2u   (chunkSize : %u bytes)   ", nbChunks, (U32)avgChunkSize);
+    assert(avgChunkSize >= 256 KB);  /* required for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B) */
 
     if (nbChunks==1) {   /* fallback to single-thread mode */
         ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0];
         return ZSTD_compress_advanced(cctx, dst, dstCapacity, src, srcSize, NULL, 0, params);
     }
 
+    if (nbChunks > mtctx->jobIDMask+1) {  /* enlarge job table */
+        U32 nbJobs = nbChunks;
+        ZSTD_free(mtctx->jobs, mtctx->cMem);
+        mtctx->jobIDMask = 0;
+        mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, mtctx->cMem);
+        if (mtctx->jobs==NULL) return ERROR(memory_allocation);
+        mtctx->jobIDMask = nbJobs - 1;
+    }
+
     {   unsigned u;
         for (u=0; u<nbChunks; u++) {
             size_t const chunkSize = MIN(remainingSrcSize, avgChunkSize);
@@ -489,7 +509,7 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
             mtctx->jobs[u].fullFrameSize = srcSize;
             mtctx->jobs[u].params = params;
             /* do not calculate checksum within sections, but write it in header for first section */
-            if (mtctx->nextJobID) mtctx->jobs[u].params.fParams.checksumFlag = 0;
+            if (u!=0) mtctx->jobs[u].params.fParams.checksumFlag = 0;
             mtctx->jobs[u].dstBuff = dstBuffer;
             mtctx->jobs[u].cctx = cctx;
             mtctx->jobs[u].firstChunk = (u==0);
@@ -837,11 +857,12 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
     if (mtctx->nbThreads==1) {
         return ZSTD_compressStream_generic(mtctx->cctxPool->cctx[0], output, input, endOp);
     }
+
+    /* single-pass shortcut (note : this is blocking-mode) */
     if ( (mtctx->nextJobID==0)      /* just started */
       && (mtctx->inBuff.filled==0)  /* nothing buffered yet */
       && (endOp==ZSTD_e_end)        /* end order, immediately at beginning */
-      && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos))  /* enough room */
-      && (mtctx->cdict==NULL) ) {   /* no dictionary */
+      && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough room */
         size_t const cSize = ZSTDMT_compress_advanced(mtctx,
                 (char*)output->dst + output->pos, output->size - output->pos,
                 (const char*)input->src + input->pos, input->size - input->pos,