From: Nick Terrell Date: Wed, 14 Nov 2018 20:15:20 +0000 (-0800) Subject: [lib] Add rsyncable mode X-Git-Tag: v1.3.8~46^2~3 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b9693d3a49d4d4775b91660a0429b6bfb448dd97;p=thirdparty%2Fzstd.git [lib] Add rsyncable mode - Add rsyncable mode to multithreaded mode - Factor out LDM's hash function for reuse --- diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index c6d72f584..99a00701d 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -254,6 +254,7 @@ static int ZSTD_isUpdateAuthorized(ZSTD_cParameter param) case ZSTD_p_nbWorkers: case ZSTD_p_jobSize: case ZSTD_p_overlapSizeLog: + case ZSTD_p_rsyncable: case ZSTD_p_enableLongDistanceMatching: case ZSTD_p_ldmHashLog: case ZSTD_p_ldmMinMatch: @@ -315,6 +316,7 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v case ZSTD_p_jobSize: case ZSTD_p_overlapSizeLog: + case ZSTD_p_rsyncable: return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); case ZSTD_p_enableLongDistanceMatching: @@ -441,6 +443,13 @@ size_t ZSTD_CCtxParam_setParameter( return ZSTDMT_CCtxParam_setMTCtxParameter(CCtxParams, ZSTDMT_p_overlapSectionLog, value); #endif + case ZSTD_p_rsyncable : +#ifndef ZSTD_MULTITHREAD + return ERROR(parameter_unsupported); +#else + return ZSTDMT_CCtxParam_setMTCtxParameter(CCtxParams, ZSTDMT_p_rsyncable, value); +#endif + case ZSTD_p_enableLongDistanceMatching : CCtxParams->ldmParams.enableLdm = (value>0); return CCtxParams->ldmParams.enableLdm; @@ -544,6 +553,13 @@ size_t ZSTD_CCtxParam_getParameter( #else *value = CCtxParams->overlapSizeLog; break; +#endif + case ZSTD_p_rsyncable : +#ifndef ZSTD_MULTITHREAD + return ERROR(parameter_unsupported); +#else + *value = CCtxParams->rsyncable; + break; #endif case ZSTD_p_enableLongDistanceMatching : *value = CCtxParams->ldmParams.enableLdm; @@ -1160,7 +1176,7 @@ static size_t ZSTD_resetCCtx_internal(ZSTD_CCtx* zc, ZSTD_ldm_adjustParameters(¶ms.ldmParams, ¶ms.cParams); assert(params.ldmParams.hashLog >= params.ldmParams.bucketSizeLog); assert(params.ldmParams.hashEveryLog < 32); - zc->ldmState.hashPower = ZSTD_ldm_getHashPower(params.ldmParams.minMatchLength); + zc->ldmState.hashPower = ZSTD_rollingHash_primePower(params.ldmParams.minMatchLength); } { size_t const windowSize = MAX(1, (size_t)MIN(((U64)1 << params.cParams.windowLog), pledgedSrcSize)); diff --git a/lib/compress/zstd_compress_internal.h b/lib/compress/zstd_compress_internal.h index ffbb53a78..608efd5a0 100644 --- a/lib/compress/zstd_compress_internal.h +++ b/lib/compress/zstd_compress_internal.h @@ -193,6 +193,7 @@ struct ZSTD_CCtx_params_s { unsigned nbWorkers; unsigned jobSize; unsigned overlapSizeLog; + unsigned rsyncable; /* Long distance matching parameters */ ldmParams_t ldmParams; @@ -492,6 +493,64 @@ MEM_STATIC size_t ZSTD_hashPtr(const void* p, U32 hBits, U32 mls) } } +/** ZSTD_ipow() : + * Return base^exponent. + */ +static U64 ZSTD_ipow(U64 base, U64 exponent) +{ + U64 power = 1; + while (exponent) { + if (exponent & 1) power *= base; + exponent >>= 1; + base *= base; + } + return power; +} + +#define ZSTD_ROLL_HASH_CHAR_OFFSET 10 + +/** ZSTD_rollingHash_append() : + * Add the buffer to the hash value. + */ +static U64 ZSTD_rollingHash_append(U64 hash, void const* buf, size_t size) +{ + BYTE const* istart = (BYTE const*)buf; + size_t pos; + for (pos = 0; pos < size; ++pos) { + hash *= prime8bytes; + hash += istart[pos] + ZSTD_ROLL_HASH_CHAR_OFFSET; + } + return hash; +} + +/** ZSTD_rollingHash_compute() : + * Compute the rolling hash value of the buffer. + */ +MEM_STATIC U64 ZSTD_rollingHash_compute(void const* buf, size_t size) +{ + return ZSTD_rollingHash_append(0, buf, size); +} + +/** ZSTD_rollingHash_primePower() : + * Compute the primePower to be passed to ZSTD_rollingHash_rotate() for a hash + * over a window of length bytes. + */ +MEM_STATIC U64 ZSTD_rollingHash_primePower(U32 length) +{ + return ZSTD_ipow(prime8bytes, length - 1); +} + +/** ZSTD_rollingHash_rotate() : + * Rotate the rolling hash by one byte. + */ +MEM_STATIC U64 ZSTD_rollingHash_rotate(U64 hash, BYTE toRemove, BYTE toAdd, U64 primePower) +{ + hash -= (toRemove + ZSTD_ROLL_HASH_CHAR_OFFSET) * primePower; + hash *= prime8bytes; + hash += toAdd + ZSTD_ROLL_HASH_CHAR_OFFSET; + return hash; +} + /*-************************************* * Round buffer management ***************************************/ diff --git a/lib/compress/zstd_ldm.c b/lib/compress/zstd_ldm.c index 6238ddecf..3f180ddbc 100644 --- a/lib/compress/zstd_ldm.c +++ b/lib/compress/zstd_ldm.c @@ -143,56 +143,6 @@ static void ZSTD_ldm_makeEntryAndInsertByTag(ldmState_t* ldmState, } } -/** ZSTD_ldm_getRollingHash() : - * Get a 64-bit hash using the first len bytes from buf. - * - * Giving bytes s = s_1, s_2, ... s_k, the hash is defined to be - * H(s) = s_1*(a^(k-1)) + s_2*(a^(k-2)) + ... + s_k*(a^0) - * - * where the constant a is defined to be prime8bytes. - * - * The implementation adds an offset to each byte, so - * H(s) = (s_1 + HASH_CHAR_OFFSET)*(a^(k-1)) + ... */ -static U64 ZSTD_ldm_getRollingHash(const BYTE* buf, U32 len) -{ - U64 ret = 0; - U32 i; - for (i = 0; i < len; i++) { - ret *= prime8bytes; - ret += buf[i] + LDM_HASH_CHAR_OFFSET; - } - return ret; -} - -/** ZSTD_ldm_ipow() : - * Return base^exp. */ -static U64 ZSTD_ldm_ipow(U64 base, U64 exp) -{ - U64 ret = 1; - while (exp) { - if (exp & 1) { ret *= base; } - exp >>= 1; - base *= base; - } - return ret; -} - -U64 ZSTD_ldm_getHashPower(U32 minMatchLength) { - DEBUGLOG(4, "ZSTD_ldm_getHashPower: mml=%u", minMatchLength); - assert(minMatchLength >= ZSTD_LDM_MINMATCH_MIN); - return ZSTD_ldm_ipow(prime8bytes, minMatchLength - 1); -} - -/** ZSTD_ldm_updateHash() : - * Updates hash by removing toRemove and adding toAdd. */ -static U64 ZSTD_ldm_updateHash(U64 hash, BYTE toRemove, BYTE toAdd, U64 hashPower) -{ - hash -= ((toRemove + LDM_HASH_CHAR_OFFSET) * hashPower); - hash *= prime8bytes; - hash += toAdd + LDM_HASH_CHAR_OFFSET; - return hash; -} - /** ZSTD_ldm_countBackwardsMatch() : * Returns the number of bytes that match backwards before pIn and pMatch. * @@ -261,9 +211,9 @@ static U64 ZSTD_ldm_fillLdmHashTable(ldmState_t* state, const BYTE* cur = lastHashed + 1; while (cur < iend) { - rollingHash = ZSTD_ldm_updateHash(rollingHash, cur[-1], - cur[ldmParams.minMatchLength-1], - state->hashPower); + rollingHash = ZSTD_rollingHash_rotate(rollingHash, cur[-1], + cur[ldmParams.minMatchLength-1], + state->hashPower); ZSTD_ldm_makeEntryAndInsertByTag(state, rollingHash, hBits, (U32)(cur - base), ldmParams); @@ -324,11 +274,11 @@ static size_t ZSTD_ldm_generateSequences_internal( size_t forwardMatchLength = 0, backwardMatchLength = 0; ldmEntry_t* bestEntry = NULL; if (ip != istart) { - rollingHash = ZSTD_ldm_updateHash(rollingHash, lastHashed[0], - lastHashed[minMatchLength], - hashPower); + rollingHash = ZSTD_rollingHash_rotate(rollingHash, lastHashed[0], + lastHashed[minMatchLength], + hashPower); } else { - rollingHash = ZSTD_ldm_getRollingHash(ip, minMatchLength); + rollingHash = ZSTD_rollingHash_compute(ip, minMatchLength); } lastHashed = ip; diff --git a/lib/compress/zstd_ldm.h b/lib/compress/zstd_ldm.h index 21fba4d59..5310e174d 100644 --- a/lib/compress/zstd_ldm.h +++ b/lib/compress/zstd_ldm.h @@ -86,10 +86,6 @@ size_t ZSTD_ldm_getTableSize(ldmParams_t params); */ size_t ZSTD_ldm_getMaxNbSeq(ldmParams_t params, size_t maxChunkSize); -/** ZSTD_ldm_getTableSize() : - * Return prime8bytes^(minMatchLength-1) */ -U64 ZSTD_ldm_getHashPower(U32 minMatchLength); - /** ZSTD_ldm_adjustParameters() : * If the params->hashEveryLog is not set, set it to its default value based on * windowLog and params->hashLog. diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index f4aba1d2c..43afbc1b9 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -471,7 +471,7 @@ static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* assert(params.ldmParams.hashLog >= params.ldmParams.bucketSizeLog); assert(params.ldmParams.hashEveryLog < 32); serialState->ldmState.hashPower = - ZSTD_ldm_getHashPower(params.ldmParams.minMatchLength); + ZSTD_rollingHash_primePower(params.ldmParams.minMatchLength); } else { memset(¶ms.ldmParams, 0, sizeof(params.ldmParams)); } @@ -777,6 +777,14 @@ typedef struct { static const roundBuff_t kNullRoundBuff = {NULL, 0, 0}; +#define RSYNC_LENGTH 32 + +typedef struct { + U64 hash; + U64 hitMask; + U64 primePower; +} rsyncState_t; + struct ZSTDMT_CCtx_s { POOL_ctx* factory; ZSTDMT_jobDescription* jobs; @@ -790,6 +798,7 @@ struct ZSTDMT_CCtx_s { inBuff_t inBuff; roundBuff_t roundBuff; serialState_t serial; + rsyncState_t rsync; unsigned singleBlockingThread; unsigned jobIDMask; unsigned doneJobID; @@ -988,6 +997,9 @@ size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, DEBUGLOG(4, "ZSTDMT_p_overlapSectionLog : %u", value); params->overlapSizeLog = (value >= 9) ? 9 : value; return value; + case ZSTDMT_p_rsyncable : + params->rsyncable = (value == 0 ? 0 : 1); + return value; default : return ERROR(parameter_unsupported); } @@ -996,15 +1008,7 @@ size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned value) { DEBUGLOG(4, "ZSTDMT_setMTCtxParameter"); - switch(parameter) - { - case ZSTDMT_p_jobSize : - return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value); - case ZSTDMT_p_overlapSectionLog : - return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value); - default : - return ERROR(parameter_unsupported); - } + return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value); } size_t ZSTDMT_getMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned* value) @@ -1016,6 +1020,9 @@ size_t ZSTDMT_getMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, case ZSTDMT_p_overlapSectionLog: *value = mtctx->params.overlapSizeLog; break; + case ZSTDMT_p_rsyncable: + *value = mtctx->params.rsyncable; + break; default: return ERROR(parameter_unsupported); } @@ -1381,6 +1388,16 @@ size_t ZSTDMT_initCStream_internal( if (mtctx->targetSectionSize == 0) { mtctx->targetSectionSize = 1ULL << ZSTDMT_computeTargetJobLog(params); } + if (params.rsyncable) { + /* Aim for the targetsectionSize as the average job size. */ + U32 const jobSizeMB = (U32)(mtctx->targetSectionSize >> 20); + U32 const rsyncBits = ZSTD_highbit32(jobSizeMB) + 20; + assert(jobSizeMB >= 1); + DEBUGLOG(4, "rsyncLog = %u", rsyncBits); + mtctx->rsync.hash = 0; + mtctx->rsync.hitMask = (1ULL << rsyncBits) - 1; + mtctx->rsync.primePower = ZSTD_rollingHash_primePower(RSYNC_LENGTH); + } if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize; /* job size must be >= overlap size */ DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), params.jobSize); DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10)); @@ -1818,6 +1835,80 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx) return 1; } +typedef struct { + size_t toLoad; /* The number of bytes to load from the input. */ + int flush; /* Boolean declaring if we must flush because we found a synchronization point. */ +} syncPoint_t; + +/** + * Searches through the input for a synchronization point. If one is found, we + * will instruct the caller to flush, and return the number of bytes to load. + * Otherwise, we will load as many bytes as possible and instruct the caller + * to continue as normal. + */ +static syncPoint_t findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input) { + BYTE const* const istart = (BYTE const*)input.src + input.pos; + U64 const primePower = mtctx->rsync.primePower; + U64 const hitMask = mtctx->rsync.hitMask; + + syncPoint_t syncPoint; + U64 hash; + BYTE const* prev; + size_t pos; + + syncPoint.toLoad = MIN(input.size - input.pos, mtctx->targetSectionSize - mtctx->inBuff.filled); + syncPoint.flush = 0; + if (!mtctx->params.rsyncable) + /* Rsync is disabled. */ + return syncPoint; + if (mtctx->inBuff.filled + syncPoint.toLoad < RSYNC_LENGTH) + /* Not enough to compute the hash. + * We will miss any synchronization points in this RSYNC_LENGTH byte + * window. However, since it depends only in the internal buffers, if the + * state is already synchronized, we will remain synchronized. + * Additionally, the probability that we miss a synchronization point is + * low: RSYNC_LENGTH / targetSectionSize. + */ + return syncPoint; + /* Initialize the loop variables. */ + if (mtctx->inBuff.filled >= RSYNC_LENGTH) { + /* We have enough bytes buffered to initialize the hash. + * Start scanning at the beginning of the input. + */ + pos = 0; + prev = (BYTE const*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled - RSYNC_LENGTH; + hash = ZSTD_rollingHash_compute(prev, RSYNC_LENGTH); + } else { + /* We don't have enough bytes buffered to initialize the hash, but + * we know we have at least RSYNC_LENGTH bytes total. + * Start scanning after the first RSYNC_LENGTH bytes less the bytes + * already buffered. + */ + pos = RSYNC_LENGTH - mtctx->inBuff.filled; + prev = (BYTE const*)mtctx->inBuff.buffer.start - pos; + hash = ZSTD_rollingHash_compute(mtctx->inBuff.buffer.start, mtctx->inBuff.filled); + hash = ZSTD_rollingHash_append(hash, istart, pos); + } + /* Starting with the hash of the previous RSYNC_LENGTH bytes, roll + * through the input. If we hit a synchronization point, then cut the + * job off, and tell the compressor to flush the job. Otherwise, load + * all the bytes and continue as normal. + * If we go too long without a synchronization point (targetSectionSize) + * then a block will be emitted anyways, but this is okay, since if we + * are already synchronized we will remain synchronized. + */ + for (; pos < syncPoint.toLoad; ++pos) { + BYTE const toRemove = pos < RSYNC_LENGTH ? prev[pos] : istart[pos - RSYNC_LENGTH]; + /* if (pos >= RSYNC_LENGTH) assert(ZSTD_rollingHash_compute(istart + pos - RSYNC_LENGTH, RSYNC_LENGTH) == hash); */ + hash = ZSTD_rollingHash_rotate(hash, toRemove, istart[pos], primePower); + if ((hash & hitMask) == hitMask) { + syncPoint.toLoad = pos + 1; + syncPoint.flush = 1; + break; + } + } + return syncPoint; +} /** ZSTDMT_compressStream_generic() : * internal use only - exposed to be invoked from zstd_compress.c @@ -1844,7 +1935,8 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, } /* single-pass shortcut (note : synchronous-mode) */ - if ( (mtctx->nextJobID == 0) /* just started */ + if ( (!mtctx->params.rsyncable) /* rsyncable mode is disabled */ + && (mtctx->nextJobID == 0) /* just started */ && (mtctx->inBuff.filled == 0) /* nothing buffered */ && (!mtctx->jobReady) /* no job already created */ && (endOp == ZSTD_e_end) /* end order */ @@ -1876,14 +1968,17 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, DEBUGLOG(5, "ZSTDMT_tryGetInputRange completed successfully : mtctx->inBuff.buffer.start = %p", mtctx->inBuff.buffer.start); } if (mtctx->inBuff.buffer.start != NULL) { - size_t const toLoad = MIN(input->size - input->pos, mtctx->targetSectionSize - mtctx->inBuff.filled); + syncPoint_t const syncPoint = findSynchronizationPoint(mtctx, *input); + if (syncPoint.flush && endOp == ZSTD_e_continue) { + endOp = ZSTD_e_flush; + } assert(mtctx->inBuff.buffer.capacity >= mtctx->targetSectionSize); DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u", - (U32)toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->targetSectionSize); - memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad); - input->pos += toLoad; - mtctx->inBuff.filled += toLoad; - forwardInputProgress = toLoad>0; + (U32)syncPoint.toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->targetSectionSize); + memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, syncPoint.toLoad); + input->pos += syncPoint.toLoad; + mtctx->inBuff.filled += syncPoint.toLoad; + forwardInputProgress = syncPoint.toLoad>0; } if ((input->pos < input->size) && (endOp == ZSTD_e_end)) endOp = ZSTD_e_flush; /* can't end now : not all input consumed */ diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index 12ad9f899..b6bcb9e5d 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -85,7 +85,8 @@ ZSTDLIB_API size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx, * List of parameters that can be set using ZSTDMT_setMTCtxParameter() */ typedef enum { ZSTDMT_p_jobSize, /* Each job is compressed in parallel. By default, this value is dynamically determined depending on compression parameters. Can be set explicitly here. */ - ZSTDMT_p_overlapSectionLog /* Each job may reload a part of previous job to enhance compressionr ratio; 0 == no overlap, 6(default) == use 1/8th of window, >=9 == use full window. This is a "sticky" parameter : its value will be re-used on next compression job */ + ZSTDMT_p_overlapSectionLog, /* Each job may reload a part of previous job to enhance compressionr ratio; 0 == no overlap, 6(default) == use 1/8th of window, >=9 == use full window. This is a "sticky" parameter : its value will be re-used on next compression job */ + ZSTDMT_p_rsyncable /* Enables rsyncable mode. */ } ZSTDMT_parameter; /* ZSTDMT_setMTCtxParameter() : diff --git a/lib/zstd.h b/lib/zstd.h index c7e9215da..6eb2dd835 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -1149,6 +1149,27 @@ typedef enum { * enum. See the comments on that enum for an * explanation of the feature. */ + ZSTD_p_rsyncable, /* Enables rsyncable mode, which makes compressed + * files more rsync friendly by adding periodic + * synchronization points to the compressed data. + * The target average block size is + * ZSTD_p_jobSize / 2. You can modify the job size + * to increase or decrease the granularity of the + * synchronization point. Once the jobSize is + * smaller than the window size, you will start to + * see degraded compression ratio. + * NOTE: This only works when multithreading is + * enabled. + * NOTE: You probably don't want to use this with + * long range mode, since that will decrease the + * effectiveness of the synchronization points, + * but your milage may vary. + * NOTE: Rsyncable mode will limit the maximum + * compression speed to approximately 400 MB/s. + * If your compression level is already running + * significantly slower than that (< 200 MB/s), + * the speed won't be significantly impacted. + */ } ZSTD_cParameter;