]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
XP: add a pre-splitter
authorYann Collet <cyan@fb.com>
Tue, 3 Sep 2024 21:35:29 +0000 (14:35 -0700)
committerYann Collet <cyan@fb.com>
Thu, 17 Oct 2024 18:40:13 +0000 (11:40 -0700)
instead of ingesting only full blocks, make an analysis of data, and infer where to split.

lib/compress/zstd_compress.c
lib/compress/zstd_preSplit.c [new file with mode: 0644]
lib/compress/zstd_preSplit.h [new file with mode: 0644]

index aad25049d7907e47362fa7c42ae00a9f13830194..040333e5904a71fc06a5645b6272b88b51b9c0af 100644 (file)
@@ -883,7 +883,7 @@ size_t ZSTD_CCtxParams_setParameter(ZSTD_CCtx_params* CCtxParams,
             value = ZSTDMT_JOBSIZE_MIN;
         FORWARD_IF_ERROR(ZSTD_cParam_clampBounds(param, &value), "");
         assert(value >= 0);
-        CCtxParams->jobSize = value;
+        CCtxParams->jobSize = (size_t)value;
         return CCtxParams->jobSize;
 #endif
 
@@ -1002,7 +1002,8 @@ size_t ZSTD_CCtxParams_setParameter(ZSTD_CCtx_params* CCtxParams,
     case ZSTD_c_maxBlockSize:
         if (value!=0)    /* 0 ==> default */
             BOUNDCHECK(ZSTD_c_maxBlockSize, value);
-        CCtxParams->maxBlockSize = value;
+        assert(value>=0);
+        CCtxParams->maxBlockSize = (size_t)value;
         return CCtxParams->maxBlockSize;
 
     case ZSTD_c_searchForExternalRepcodes:
@@ -2444,7 +2445,8 @@ static size_t ZSTD_resetCCtx_byCopyingCDict(ZSTD_CCtx* cctx,
     }
 
     /* Zero the hashTable3, since the cdict never fills it */
-    {   int const h3log = cctx->blockState.matchState.hashLog3;
+    assert(cctx->blockState.matchState.hashLog3 <= 31);
+    {   U32 const h3log = cctx->blockState.matchState.hashLog3;
         size_t const h3Size = h3log ? ((size_t)1 << h3log) : 0;
         assert(cdict->matchState.hashLog3 == 0);
         ZSTD_memset(cctx->blockState.matchState.hashTable3, 0, h3Size * sizeof(U32));
@@ -4484,6 +4486,18 @@ static void ZSTD_overflowCorrectIfNeeded(ZSTD_matchState_t* ms,
     }
 }
 
+#include "zstd_preSplit.h"
+
+static size_t ZSTD_optimalBlockSize(const void* src, size_t srcSize, size_t blockSizeMax, ZSTD_strategy strat)
+{
+    if (srcSize <= 128 KB || blockSizeMax < 128 KB)
+        return MIN(srcSize, blockSizeMax);
+    (void)strat;
+    if (strat >= ZSTD_btlazy2)
+        return ZSTD_splitBlock_4k(src, srcSize, blockSizeMax);
+    return 92 KB;
+}
+
 /*! ZSTD_compress_frameChunk() :
 *   Compress a chunk of data into one or multiple blocks.
 *   All blocks will be terminated, all input will be consumed.
@@ -4496,7 +4510,7 @@ static size_t ZSTD_compress_frameChunk(ZSTD_CCtx* cctx,
                                const void* src, size_t srcSize,
                                      U32 lastFrameChunk)
 {
-    size_t blockSize = cctx->blockSize;
+    size_t blockSizeMax = cctx->blockSize;
     size_t remaining = srcSize;
     const BYTE* ip = (const BYTE*)src;
     BYTE* const ostart = (BYTE*)dst;
@@ -4505,20 +4519,21 @@ static size_t ZSTD_compress_frameChunk(ZSTD_CCtx* cctx,
 
     assert(cctx->appliedParams.cParams.windowLog <= ZSTD_WINDOWLOG_MAX);
 
-    DEBUGLOG(4, "ZSTD_compress_frameChunk (blockSize=%u)", (unsigned)blockSize);
+    DEBUGLOG(4, "ZSTD_compress_frameChunk (blockSizeMax=%u)", (unsigned)blockSizeMax);
     if (cctx->appliedParams.fParams.checksumFlag && srcSize)
         XXH64_update(&cctx->xxhState, src, srcSize);
 
     while (remaining) {
         ZSTD_matchState_t* const ms = &cctx->blockState.matchState;
-        U32 const lastBlock = lastFrameChunk & (blockSize >= remaining);
+        U32 const lastBlock = lastFrameChunk & (blockSizeMax >= remaining);
+        size_t blockSize = ZSTD_optimalBlockSize(ip, remaining, blockSizeMax, cctx->appliedParams.cParams.strategy);
+        assert(blockSize <= remaining);
 
         /* TODO: See 3090. We reduced MIN_CBLOCK_SIZE from 3 to 2 so to compensate we are adding
          * additional 1. We need to revisit and change this logic to be more consistent */
         RETURN_ERROR_IF(dstCapacity < ZSTD_blockHeaderSize + MIN_CBLOCK_SIZE + 1,
                         dstSize_tooSmall,
                         "not enough space to store compressed block");
-        if (remaining < blockSize) blockSize = remaining;
 
         ZSTD_overflowCorrectIfNeeded(
             ms, &cctx->workspace, &cctx->appliedParams, ip, ip + blockSize);
@@ -5022,7 +5037,7 @@ size_t ZSTD_loadCEntropy(ZSTD_compressedBlockState_t* bs, void* workspace,
                 RETURN_ERROR_IF(bs->rep[u] > dictContentSize, dictionary_corrupted, "");
     }   }   }
 
-    return dictPtr - (const BYTE*)dict;
+    return (size_t)(dictPtr - (const BYTE*)dict);
 }
 
 /* Dictionary format :
diff --git a/lib/compress/zstd_preSplit.c b/lib/compress/zstd_preSplit.c
new file mode 100644 (file)
index 0000000..eaa1c05
--- /dev/null
@@ -0,0 +1,139 @@
+/*
+ * Copyright (c) Meta Platforms, Inc. and affiliates.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ * You may select, at your option, one of the above-listed licenses.
+ */
+
+#include "../common/mem.h" /* U64 */
+#include "zstd_preSplit.h"
+
+
+#define BLOCKSIZE_MIN 3500
+#define THRESHOLD_PENALTY_RATE 16
+#define THRESHOLD_BASE (THRESHOLD_PENALTY_RATE - 2)
+#define THRESHOLD_PENALTY 4
+
+#define HASHLENGTH 2
+#define HASHLOG 10
+#define HASHTABLESIZE (1 << HASHLOG)
+#define HASHMASK (HASHTABLESIZE - 1)
+#define KNUTH 0x9e3779b9
+
+static unsigned hash2(const void *p)
+{
+    return (U32)(MEM_read16(p)) * KNUTH >> (32 - HASHLOG);
+}
+
+
+/* ==================================== */
+/* Global array -> for testing only !!! */
+/* ==================================== */
+typedef struct {
+  int events[HASHTABLESIZE];
+  S64 nbEvents;
+} FingerPrint;
+static FingerPrint pastEvents = {};
+static FingerPrint newEvents = {};
+
+static void initStats(void) {
+  memset(&pastEvents, 0, sizeof(pastEvents));
+  memset(&newEvents, 0, sizeof(newEvents));
+}
+/* ==================================== */
+
+static void addToFingerprint(FingerPrint* fp, const void* src, size_t s) {
+  const char* p = src;
+  size_t limit = s - HASHLENGTH + 1;
+  assert(s >= HASHLENGTH);
+  for (size_t n = 0; n < limit; n++) {
+    fp->events[hash2(p++)]++;
+  }
+  fp->nbEvents += limit;
+}
+
+static void recordFingerprint(FingerPrint *fp, const void *src, size_t s) {
+  memset(fp, 0, sizeof(*fp));
+  addToFingerprint(fp, src, s);
+}
+
+static S64 abs64(S64 i) { return (i < 0) ? -i : i; }
+
+static S64 fpDistance(const FingerPrint *fp1, const FingerPrint *fp2) {
+  S64 distance = 0;
+  for (size_t n = 0; n < HASHTABLESIZE; n++) {
+    distance +=
+        abs64(fp1->events[n] * fp2->nbEvents - fp2->events[n] * fp1->nbEvents);
+  }
+  return distance;
+}
+
+// Compare newEvents with pastEvents
+// return 1 when considered "too different"
+// debug:write Deviation value in %
+static int compareFingerprints(const FingerPrint *ref,
+                            const FingerPrint *new,
+                            int penalty)
+{
+    if (ref->nbEvents <= BLOCKSIZE_MIN)
+        return 0;
+    {   S64 p50 = ref->nbEvents * new->nbEvents;
+        S64 deviation = fpDistance(ref, new);
+        // printf("Deviation: %.2f%% \n", (double)deviation / (double)ref * 100.);
+        S64 threshold = p50 * (THRESHOLD_BASE + penalty) / THRESHOLD_PENALTY_RATE;
+        return deviation >= threshold;
+    }
+}
+
+static void mergeEvents(FingerPrint *acc, const FingerPrint *new) {
+  for (size_t n = 0; n < HASHTABLESIZE; n++) {
+    acc->events[n] += new->events[n];
+  }
+  acc->nbEvents += new->nbEvents;
+}
+
+static void flushEvents(void) {
+  for (size_t n = 0; n < HASHTABLESIZE; n++) {
+    pastEvents.events[n] = newEvents.events[n];
+  }
+  pastEvents.nbEvents = newEvents.nbEvents;
+  memset(&newEvents, 0, sizeof(newEvents));
+}
+
+static void removeEvents(FingerPrint *acc, const FingerPrint *slice) {
+  for (size_t n = 0; n < HASHTABLESIZE; n++) {
+    assert(acc->events[n] >= slice->events[n]);
+    acc->events[n] -= slice->events[n];
+  }
+  acc->nbEvents -= slice->nbEvents;
+}
+
+#define CHUNKSIZE (8 << 10)
+/* Note: technically, we use CHUNKSIZE, so that's 8 KB */
+size_t ZSTD_splitBlock_4k(const void* src, size_t srcSize, size_t blockSizeMax)
+{
+    const char* p = src;
+    int penalty = THRESHOLD_PENALTY;
+    size_t pos = 0;
+    if (srcSize <= blockSizeMax) return srcSize;
+    assert(blockSizeMax == (128 << 10));
+
+    initStats();
+    for (pos = 0; pos < blockSizeMax;) {
+        assert(pos <= blockSizeMax - CHUNKSIZE);
+        recordFingerprint(&newEvents, p + pos, CHUNKSIZE);
+        if (compareFingerprints(&pastEvents, &newEvents, penalty)) {
+            return pos;
+        } else {
+            mergeEvents(&pastEvents, &newEvents);
+            memset(&newEvents, 0, sizeof(newEvents));
+            penalty = penalty - 1 + (penalty == 0);
+        }
+        pos += CHUNKSIZE;
+    }
+    return blockSizeMax;
+    (void)flushEvents; (void)removeEvents;
+}
diff --git a/lib/compress/zstd_preSplit.h b/lib/compress/zstd_preSplit.h
new file mode 100644 (file)
index 0000000..148fc19
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) Meta Platforms, Inc. and affiliates.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ * You may select, at your option, one of the above-listed licenses.
+ */
+
+#ifndef ZSTD_PRESPLIT_H
+#define ZSTD_PRESPLIT_H
+
+#include <stddef.h>  /* size_t */
+
+#if defined (__cplusplus)
+extern "C" {
+#endif
+
+size_t ZSTD_splitBlock_4k(const void* src, size_t srcSize, size_t blockSizeMax);
+
+#if defined (__cplusplus)
+}
+#endif
+
+#endif /* ZSTD_PRESPLIT_H */