From: Yann Collet Date: Mon, 23 Jan 2017 19:43:51 +0000 (-0800) Subject: refactor ZSTDMT streaming flush code X-Git-Tag: v1.1.3^2~19^2~10 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=94364bf87a320fed426b2312adbb74e403dc62e5;p=thirdparty%2Fzstd.git refactor ZSTDMT streaming flush code now shared by both ZSTDMT_compressStream() and ZSTDMT_flushStream() --- diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 135d274f8..9e7754b88 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -244,8 +244,8 @@ void ZSTDMT_compressChunk(void* jobDescription) ZSTD_invalidateRepCodes(job->cctx); } - DEBUGLOG(3, "Compressing : "); - DEBUG_PRINTHEX(3, job->srcStart, 12); + DEBUGLOG(4, "Compressing : "); + DEBUG_PRINTHEX(4, job->srcStart, 12); job->cSize = (job->lastChunk) ? /* last chunk signal */ ZSTD_compressEnd(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize) : ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize); @@ -516,6 +516,54 @@ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) { } +/* ZSTDMT_flushNextJob() : + * output : will be updated with amount of data flushed . + * blockToFlush : the function will block and wait if there is no data available to flush . + * @return : amount of data remaining within internal buffer, 1 if unknown but > 0, 0 if no more */ +static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush) +{ + unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask; + if (zcs->doneJobID == zcs->nextJobID) return 0; /* all flushed ! */ + PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex); + while (zcs->jobs[wJobID].jobCompleted==0) { + DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID); /* block when nothing available to flush */ + if (!blockToFlush) { pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */ + pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); + } + pthread_mutex_unlock(&zcs->jobCompleted_mutex); + /* compression job completed : output can be flushed */ + { ZSTDMT_jobDescription job = zcs->jobs[wJobID]; + size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); + DEBUGLOG(4, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID); + ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); + zcs->jobs[wJobID].cctx = NULL; + ZSTDMT_releaseBuffer(zcs->buffPool, job.src); + zcs->jobs[wJobID].srcStart = NULL; + zcs->jobs[wJobID].src = g_nullBuffer; + if (ZSTD_isError(job.cSize)) { + ZSTDMT_waitForAllJobsCompleted(zcs); + ZSTDMT_releaseAllJobResources(zcs); + return job.cSize; + } + memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite); + output->pos += toWrite; + job.dstFlushed += toWrite; + if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => move to next one */ + ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); + zcs->jobs[wJobID].dstBuff = g_nullBuffer; + zcs->jobs[wJobID].jobCompleted = 0; + zcs->doneJobID++; + } else { + zcs->jobs[wJobID].dstFlushed = job.dstFlushed; + } + /* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */ + if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed); + if (zcs->doneJobID < zcs->nextJobID) return 1; /* still some buffer to flush */ + zcs->allJobsCompleted = zcs->frameEnded; /* frame completed and entirely flushed */ + return 0; /* everything flushed */ +} } + + size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input) { if (zcs->frameEnded) return ERROR(stage_wrong); /* current frame being ended. Only flush is allowed. Restart with init */ @@ -579,6 +627,8 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu } /* check if there is any data available to flush */ + ZSTDMT_flushNextJob(zcs, output, (zcs->inBuff.filled == zcs->inBuffSize)); /* we'll block if it wasn't possible to create new job due to saturation */ +#if 0 { unsigned const jobID = zcs->doneJobID & zcs->jobIDMask; unsigned jobCompleted; pthread_mutex_lock(&zcs->jobCompleted_mutex); @@ -611,7 +661,7 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu zcs->jobs[jobID].jobCompleted = 0; zcs->doneJobID++; } } } - +#endif /* recommended next input size : fill current input buffer */ return zcs->inBuffSize - zcs->inBuff.filled; } @@ -671,25 +721,27 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp zcs->frameEnded = 1; } - DEBUGLOG(1, "posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask); + DEBUGLOG(3, "posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask); POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]); /* this call is blocking when thread worker pool is exhausted */ zcs->nextJobID++; } /* check if there is any data available to flush */ - DEBUGLOG(1, "zcs->doneJobID : %u ; zcs->nextJobID : %u ", zcs->doneJobID, zcs->nextJobID); - if (zcs->doneJobID == zcs->nextJobID) return 0; /* all flushed ! */ + DEBUGLOG(5, "zcs->doneJobID : %u ; zcs->nextJobID : %u ", zcs->doneJobID, zcs->nextJobID); + return ZSTDMT_flushNextJob(zcs, output, 1); + +#if 0 { unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask; PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex); while (zcs->jobs[wJobID].jobCompleted==0) { - DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID); /* we want to block when waiting for data to flush */ + DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID); /* block when nothing available to flush */ pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); } pthread_mutex_unlock(&zcs->jobCompleted_mutex); - { /* job completed : output can be flushed */ - ZSTDMT_jobDescription job = zcs->jobs[wJobID]; + /* compression job completed : output can be flushed */ + { ZSTDMT_jobDescription job = zcs->jobs[wJobID]; size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); - DEBUGLOG(1, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID); + DEBUGLOG(4, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID); ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[wJobID].cctx = NULL; /* release cctx for future task */ ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[wJobID].srcStart = NULL; zcs->jobs[wJobID].src = g_nullBuffer; if (ZSTD_isError(job.cSize)) { @@ -713,6 +765,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp zcs->allJobsCompleted = zcs->frameEnded; return 0; } } +#endif }