From: Yann Collet Date: Wed, 25 Jan 2017 01:41:49 +0000 (-0800) Subject: refactor job creation X-Git-Tag: v1.1.3^2~19^2~7 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=f14a669054dc5bc88ed6ecae31c1304d6d10e75f;p=thirdparty%2Fzstd.git refactor job creation code shared accross ZSTDMT_{compress,flush,end}Stream(), for easier maintenance --- diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 1baccf0fc..e57908078 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -536,10 +536,71 @@ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) { } +static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsigned endFrame) +{ + size_t const dstBufferCapacity = ZSTD_compressBound(srcSize); + buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity); + ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool); + unsigned const jobID = zcs->nextJobID & zcs->jobIDMask; + + if ((cctx==NULL) || (dstBuffer.start==NULL)) { + zcs->jobs[jobID].jobCompleted = 1; + zcs->nextJobID++; + ZSTDMT_waitForAllJobsCompleted(zcs); + ZSTDMT_releaseAllJobResources(zcs); + return ERROR(memory_allocation); + } + + DEBUGLOG(4, "preparing job %u to compress %u bytes \n", zcs->nextJobID, (U32)srcSize); + zcs->jobs[jobID].src = zcs->inBuff.buffer; + zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start; + zcs->jobs[jobID].srcSize = srcSize; + zcs->jobs[jobID].params = zcs->params; + if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0; /* do not calculate checksum within sections, just keep it in header for first section */ + zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL; + zcs->jobs[jobID].dict = NULL; + zcs->jobs[jobID].dictSize = 0; + zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize; + zcs->jobs[jobID].dstBuff = dstBuffer; + zcs->jobs[jobID].cctx = cctx; + zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0); + zcs->jobs[jobID].lastChunk = endFrame; + zcs->jobs[jobID].jobCompleted = 0; + zcs->jobs[jobID].dstFlushed = 0; + zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex; + zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond; + + /* get a new buffer for next input */ + if (!endFrame) { + zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); + if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */ + zcs->jobs[jobID].jobCompleted = 1; + zcs->nextJobID++; + ZSTDMT_waitForAllJobsCompleted(zcs); + ZSTDMT_releaseAllJobResources(zcs); + return ERROR(memory_allocation); + } + zcs->inBuff.filled -= srcSize; + memcpy(zcs->inBuff.buffer.start, (const char*)zcs->jobs[jobID].srcStart + srcSize, zcs->inBuff.filled); + } else { + zcs->inBuff.buffer = g_nullBuffer; + zcs->inBuff.filled = 0; + zcs->frameEnded = 1; + if (zcs->nextJobID == 0) + zcs->params.fParams.checksumFlag = 0; /* single chunk : checksum is calculated directly within worker thread */ + } + + DEBUGLOG(3, "posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask); + POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]); /* this call is blocking when thread worker pool is exhausted */ + zcs->nextJobID++; + return 0; +} + + /* ZSTDMT_flushNextJob() : * output : will be updated with amount of data flushed . * blockToFlush : if >0, the function will block and wait if there is no data available to flush . - * @return : amount of data remaining within internal buffer, 1 if unknown but > 0, 0 if no more */ + * @return : amount of data remaining within internal buffer, 1 if unknown but > 0, 0 if no more, or an error code */ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush) { unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask; @@ -613,57 +674,11 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu if ( (zcs->inBuff.filled == zcs->inBuffSize) /* filled enough : let's compress */ && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) { /* avoid overwriting job round buffer */ - size_t const dstBufferCapacity = ZSTD_compressBound(zcs->targetSectionSize); - buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity); - ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool); - unsigned const jobID = zcs->nextJobID & zcs->jobIDMask; - - if ((cctx==NULL) || (dstBuffer.start==NULL)) { /* cannot get resources for next job */ - zcs->jobs[jobID].jobCompleted = 1; - zcs->nextJobID++; - ZSTDMT_waitForAllJobsCompleted(zcs); - ZSTDMT_releaseAllJobResources(zcs); - return ERROR(memory_allocation); - } - - DEBUGLOG(4, "preparing job %u to compress %u bytes \n", (U32)zcs->nextJobID, (U32)zcs->targetSectionSize); - zcs->jobs[jobID].src = zcs->inBuff.buffer; - zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start; - zcs->jobs[jobID].srcSize = zcs->targetSectionSize; - zcs->jobs[jobID].params = zcs->params; - if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0; /* do not calculate checksum within sections, just keep it in header for first section */ - zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL; - zcs->jobs[jobID].dict = NULL; - zcs->jobs[jobID].dictSize = 0; - zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize; - zcs->jobs[jobID].dstBuff = dstBuffer; - zcs->jobs[jobID].cctx = cctx; - zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0); - zcs->jobs[jobID].lastChunk = 0; - zcs->jobs[jobID].jobCompleted = 0; - zcs->jobs[jobID].dstFlushed = 0; - zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex; - zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond; - - /* get a new buffer for next input - save remaining into it */ - zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); - if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */ - zcs->jobs[jobID].jobCompleted = 1; - zcs->nextJobID++; - ZSTDMT_waitForAllJobsCompleted(zcs); - ZSTDMT_releaseAllJobResources(zcs); - return ERROR(memory_allocation); - } - zcs->inBuff.filled = (U32)(zcs->inBuffSize - zcs->targetSectionSize); - memcpy(zcs->inBuff.buffer.start, (const char*)zcs->jobs[jobID].srcStart + zcs->targetSectionSize, zcs->inBuff.filled); - - DEBUGLOG(3, "posting job %u (%u bytes) (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask); - POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]); /* This call is blocking if all workers are busy */ - zcs->nextJobID++; + CHECK_F( ZSTDMT_createCompressionJob(zcs, zcs->targetSectionSize, 0) ); } /* check for data to flush */ - ZSTDMT_flushNextJob(zcs, output, (zcs->inBuff.filled == zcs->inBuffSize)); /* we'll block if it wasn't possible to create new job due to saturation */ + CHECK_F( ZSTDMT_flushNextJob(zcs, output, (zcs->inBuff.filled == zcs->inBuffSize)) ); /* block if it wasn't possible to create new job due to saturation */ /* recommended next input size : fill current input buffer */ return zcs->inBuffSize - zcs->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */ @@ -677,59 +692,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp DEBUGLOG(4, "flushing : %u bytes left to compress", (U32)srcSize); if ( ((srcSize > 0) || (endFrame && !zcs->frameEnded)) && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) { - size_t const dstBufferCapacity = ZSTD_compressBound(srcSize); - buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity); - ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool); - unsigned const jobID = zcs->nextJobID & zcs->jobIDMask; - - if ((cctx==NULL) || (dstBuffer.start==NULL)) { - zcs->jobs[jobID].jobCompleted = 1; - zcs->nextJobID++; - ZSTDMT_waitForAllJobsCompleted(zcs); - ZSTDMT_releaseAllJobResources(zcs); - return ERROR(memory_allocation); - } - - zcs->jobs[jobID].src = zcs->inBuff.buffer; - zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start; - zcs->jobs[jobID].srcSize = srcSize; - zcs->jobs[jobID].params = zcs->params; - if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0; /* do not calculate checksum within sections, just keep it in header for first section */ - zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL; - zcs->jobs[jobID].dict = NULL; - zcs->jobs[jobID].dictSize = 0; - zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize; - zcs->jobs[jobID].dstBuff = dstBuffer; - zcs->jobs[jobID].cctx = cctx; - zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0); - zcs->jobs[jobID].lastChunk = endFrame; - zcs->jobs[jobID].jobCompleted = 0; - zcs->jobs[jobID].dstFlushed = 0; - zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex; - zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond; - - /* get a new buffer for next input */ - if (!endFrame) { - zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); - zcs->inBuff.filled = 0; - if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */ - zcs->jobs[jobID].jobCompleted = 1; - zcs->nextJobID++; - ZSTDMT_waitForAllJobsCompleted(zcs); - ZSTDMT_releaseAllJobResources(zcs); - return ERROR(memory_allocation); - } - } else { - zcs->inBuff.buffer = g_nullBuffer; - zcs->inBuff.filled = 0; - zcs->frameEnded = 1; - if (zcs->nextJobID == 0) - zcs->params.fParams.checksumFlag = 0; /* single chunk : checksum is calculated directly within worker thread */ - } - - DEBUGLOG(3, "posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask); - POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]); /* this call is blocking when thread worker pool is exhausted */ - zcs->nextJobID++; + CHECK_F( ZSTDMT_createCompressionJob(zcs, srcSize, endFrame) ); } /* check if there is any data available to flush */