fp.ingested = cctx->consumedSrcSize + buffered;
fp.consumed = cctx->consumedSrcSize;
fp.produced = cctx->producedCSize;
+ fp.flushed = cctx->producedCSize; /* simplified; some data might still be left within streaming output buffer */
fp.currentJobID = 0;
+ fp.nbActiveWorkers = 0;
return fp;
} }
/*! ZSTDMT_updateCParams_whileCompressing() :
- * Updates only a selected set of compression parameters, to remain compatible with current frame.
+ * Updates a selected set of compression parameters, remaining compatible with currently active frame.
* New parameters will be applied to next compression job. */
void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* cctxParams)
{
/* ZSTDMT_getFrameProgression():
* tells how much data has been consumed (input) and produced (output) for current frame.
* able to count progression inside worker threads.
- * Note : mutex will be acquired during statistics collection. */
+ * Note : mutex will be acquired during statistics collection inside workers. */
ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
{
ZSTD_frameProgression fps;
DEBUGLOG(5, "ZSTDMT_getFrameProgression");
fps.ingested = mtctx->consumed + mtctx->inBuff.filled;
fps.consumed = mtctx->consumed;
- fps.produced = mtctx->produced;
+ fps.produced = fps.flushed = mtctx->produced;
fps.currentJobID = mtctx->nextJobID;
+ fps.nbActiveWorkers = 0;
{ unsigned jobNb;
unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1);
DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)",
mtctx->doneJobID, lastJobNb, mtctx->jobReady)
for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) {
unsigned const wJobID = jobNb & mtctx->jobIDMask;
- ZSTD_pthread_mutex_lock(&mtctx->jobs[wJobID].job_mutex);
- { size_t const cResult = mtctx->jobs[wJobID].cSize;
+ ZSTDMT_jobDescription* jobPtr = &mtctx->jobs[wJobID];
+ ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);
+ { size_t const cResult = jobPtr->cSize;
size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
- fps.ingested += mtctx->jobs[wJobID].src.size;
- fps.consumed += mtctx->jobs[wJobID].consumed;
+ fps.ingested += jobPtr->src.size;
+ fps.consumed += jobPtr->consumed;
fps.produced += produced;
+ fps.flushed += jobPtr->dstFlushed;
+ fps.nbActiveWorkers += (jobPtr->consumed < jobPtr->src.size);
}
ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
}
typedef struct {
- unsigned long long ingested;
- unsigned long long consumed;
- unsigned long long produced;
- unsigned currentJobID;
+ unsigned long long ingested; /* nb input bytes read and buffered */
+ unsigned long long consumed; /* nb input bytes actually compressed */
+ unsigned long long produced; /* nb of compressed bytes generated and buffered */
+ unsigned long long flushed; /* nb of compressed bytes flushed : not provided; can be tracked from caller side */
+ unsigned currentJobID; /* MT only : latest started job nb */
+ unsigned nbActiveWorkers; /* MT only : nb of workers actively compressing at probe time */
} ZSTD_frameProgression;
/* ZSTD_getFrameProgression():
/* check output speed */
if (zfp.currentJobID > 1) {
- static ZSTD_frameProgression cpszfp = { 0, 0, 0, 0 };
- static unsigned long long lastFlushedSize = 0;
+ static ZSTD_frameProgression cpszfp = { 0, 0, 0, 0, 0, 0 };
unsigned long long newlyProduced = zfp.produced - cpszfp.produced;
- unsigned long long newlyFlushed = compressedfilesize - lastFlushedSize;
+ unsigned long long newlyFlushed = zfp.flushed - cpszfp.flushed;
assert(zfp.produced >= cpszfp.produced);
+ cpszfp = zfp;
+
if ( (zfp.ingested == cpszfp.ingested)
&& (zfp.consumed == cpszfp.consumed) ) {
- DISPLAYLEVEL(6, "no data read nor consumed : buffers are full (?) or compression is slow + input has reached its limit. If buffers full : output is too slow => slow down \n")
+ DISPLAYLEVEL(2, "no data read nor consumed : buffers are full (?) output is too slow => slow down ; or compression is slow + input has reached its limit => can't tell \n")
speedChange = slower;
}
DISPLAYLEVEL(6, "production faster than flushing (%llu > %llu) but there is still %u bytes to flush => slow down \n", newlyProduced, newlyFlushed, (U32)stillToFlush);
speedChange = slower;
}
- cpszfp = zfp;
- lastFlushedSize = compressedfilesize;
}
/* course correct only if there is at least one new job completed */
DISPLAYLEVEL(6, "input is never blocked => input is too slow \n");
speedChange = slower;
} else if (speedChange == noChange) {
- static ZSTD_frameProgression csuzfp = { 0, 0, 0, 0 };
- static unsigned long long lastFlushedSize = 0;
+ static ZSTD_frameProgression csuzfp = { 0, 0, 0, 0, 0, 0 };
unsigned long long newlyIngested = zfp.ingested - csuzfp.ingested;
unsigned long long newlyConsumed = zfp.consumed - csuzfp.consumed;
unsigned long long newlyProduced = zfp.produced - csuzfp.produced;
- unsigned long long newlyFlushed = compressedfilesize - lastFlushedSize;
+ unsigned long long newlyFlushed = zfp.flushed - csuzfp.flushed;
csuzfp = zfp;
- lastFlushedSize = compressedfilesize;
assert(inputPresented > 0);
DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n",
inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100,