From: Yann Collet Date: Wed, 11 Jan 2017 17:21:25 +0000 (+0100) Subject: ZSTDMT_compress() creates a single frame X-Git-Tag: v1.1.3^2~19^2~47 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=5eb749e734120c3b50c4e434b45728ac7cdcc451;p=thirdparty%2Fzstd.git ZSTDMT_compress() creates a single frame The new strategy involves cutting frame at block level. The result is a single frame, preserving ZSTD_getDecompressedSize() As a consequence, bench can now make a full round-trip, since the result is compatible with ZSTD_decompress(). This strategy will not make it possible to decode the frame with multiple threads since the exact cut between independent blocks is not known. MT decoding needs further discussions. --- diff --git a/Makefile b/Makefile index 19b12d0ef..0a3634c39 100644 --- a/Makefile +++ b/Makefile @@ -88,7 +88,7 @@ travis-install: $(MAKE) install PREFIX=~/install_test_dir gpptest: clean - $(MAKE) -C programs all CC=g++ CFLAGS="-O3 -Wall -Wextra -Wundef -Wshadow -Wcast-align -Werror" + CC=g++ $(MAKE) -C programs all CFLAGS="-O3 -Wall -Wextra -Wundef -Wshadow -Wcast-align -Werror" gcc5test: clean gcc-5 -v diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 7626b33a6..c4dbb6ced 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -2408,12 +2408,14 @@ static size_t ZSTD_compressContinue_internal (ZSTD_CCtx* cctx, cctx->nextSrc = ip + srcSize; - { size_t const cSize = frame ? + if (srcSize) { + size_t const cSize = frame ? ZSTD_compress_generic (cctx, dst, dstCapacity, src, srcSize, lastFrameChunk) : ZSTD_compressBlock_internal (cctx, dst, dstCapacity, src, srcSize); if (ZSTD_isError(cSize)) return cSize; return cSize + fhSize; - } + } else + return fhSize; } diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 8471b7509..ae986468b 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -28,8 +28,8 @@ if (g_debugLevel>=MUTEX_WAIT_TIME_DLEVEL) { \ unsigned long long afterTime = GetCurrentClockTimeMicroseconds(); \ unsigned long long elapsedTime = (afterTime-beforeTime); \ if (elapsedTime > 1000) { /* or whatever threshold you like; I'm using 1 millisecond here */ \ - DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, "Thread %li took %llu microseconds to acquire mutex %s \n", \ - (long int) pthread_self(), elapsedTime, #mutex); \ + DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, "Thread took %llu microseconds to acquire mutex %s \n", \ + elapsedTime, #mutex); \ } \ } else pthread_mutex_lock(mutex); @@ -112,6 +112,7 @@ typedef struct { buffer_t dstBuff; int compressionLevel; unsigned frameID; + unsigned long long fullFrameSize; size_t cSize; unsigned jobCompleted; pthread_mutex_t* jobCompleted_mutex; @@ -122,9 +123,26 @@ typedef struct { void ZSTDMT_compressFrame(void* jobDescription) { ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; - job->cSize = ZSTD_compressCCtx(job->cctx, job->dstBuff.start, job->dstBuff.size, job->srcStart, job->srcSize, job->compressionLevel); + buffer_t dstBuff = job->dstBuff; + ZSTD_parameters const params = ZSTD_getParams(job->compressionLevel, job->fullFrameSize, 0); + size_t hSize = ZSTD_compressBegin_advanced(job->cctx, NULL, 0, params, job->fullFrameSize); + if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; } + hSize = ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, 0); /* flush frame header */ + if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; } + if ((job->frameID & 1) == 0) { /* preserve frame header when it is first beginning of frame */ + dstBuff.start = (char*)dstBuff.start + hSize; + dstBuff.size -= hSize; + } else + hSize = 0; + + job->cSize = (job->frameID>=2) ? /* last chunk signal */ + ZSTD_compressEnd(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize) : + ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize); + if (!ZSTD_isError(job->cSize)) job->cSize += hSize; DEBUGLOG(5, "frame %u : compressed %u bytes into %u bytes ", (unsigned)job->frameID, (unsigned)job->srcSize, (unsigned)job->cSize); - pthread_mutex_lock(job->jobCompleted_mutex); + +_endJob: + PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); job->jobCompleted = 1; pthread_cond_signal(job->jobCompleted_cond); pthread_mutex_unlock(job->jobCompleted_mutex); @@ -254,10 +272,11 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, mtctx->jobs[u].srcStart = srcStart + frameStartPos; mtctx->jobs[u].srcSize = frameSize; + mtctx->jobs[u].fullFrameSize = srcSize; mtctx->jobs[u].compressionLevel = compressionLevel; mtctx->jobs[u].dstBuff = dstBuffer; mtctx->jobs[u].cctx = cctx; - mtctx->jobs[u].frameID = u; + mtctx->jobs[u].frameID = (u>0) | ((u==nbFrames-1)<<1); mtctx->jobs[u].jobCompleted = 0; mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex; mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond; @@ -275,7 +294,7 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, for (frameID=0; frameIDjobCompleted_mutex); + PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex); while (mtctx->jobs[frameID].jobCompleted==0) { DEBUGLOG(4, "waiting for jobCompleted signal from frame %u", frameID); pthread_cond_wait(&mtctx->jobCompleted_cond, &mtctx->jobCompleted_mutex); diff --git a/lib/zstd.h b/lib/zstd.h index 55cc466d7..198f45eac 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -561,10 +561,10 @@ ZSTDLIB_API size_t ZSTD_sizeof_DStream(const ZSTD_DStream* zds); In which case, it will "discard" the relevant memory section from its history. Finish a frame with ZSTD_compressEnd(), which will write the last block(s) and optional checksum. - It's possible to use a NULL,0 src content, in which case, it will write a final empty block to end the frame, - Without last block mark, frames will be considered unfinished (broken) by decoders. + It's possible to use srcSize==0, in which case, it will write a final empty block to end the frame. + Without last block mark, frames will be considered unfinished (corrupted) by decoders. - You can then reuse `ZSTD_CCtx` (ZSTD_compressBegin()) to compress some new frame. + `ZSTD_CCtx` object can be re-used (ZSTD_compressBegin()) to compress some new frame. */ /*===== Buffer-less streaming compression functions =====*/ diff --git a/programs/Makefile b/programs/Makefile index 77d5ab6e4..ff95ddc62 100644 --- a/programs/Makefile +++ b/programs/Makefile @@ -129,7 +129,7 @@ gzstd: zstdmt: CPPFLAGS += -DZSTD_PTHREAD zstdmt: LDFLAGS += -lpthread -zstdmt: clean zstd +zstdmt: zstd generate_res: windres/generate_res.bat diff --git a/programs/bench.c b/programs/bench.c index a3c013a8b..40e1d4aba 100644 --- a/programs/bench.c +++ b/programs/bench.c @@ -321,7 +321,7 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize, memcpy(compressedBuffer, srcBuffer, loadedCompressedSize); } -#if 1 +#if 0 /* disable decompression test */ dCompleted=1; (void)totalDTime; (void)fastestD; (void)crcOrig; /* unused when decompression disabled */ #else @@ -330,13 +330,14 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize, UTIL_sleepMilli(1); /* give processor time to other processes */ UTIL_waitForNextTick(ticksPerSecond); - UTIL_getTime(&clockStart); if (!dCompleted) { U64 clockLoop = g_nbSeconds ? TIMELOOP_MICROSEC : 1; U32 nbLoops = 0; + clock_us_t clockStart; ZSTD_DDict* const ddict = ZSTD_createDDict(dictBuffer, dictBufferSize); if (!ddict) EXM_THROW(2, "ZSTD_createDDict() allocation failure"); + clockStart = BMK_clockMicroSec(); do { U32 blockNb; for (blockNb=0; blockNb= maxTime); } }