ZSTD_invalidateRepCodes(job->cctx);
}
- DEBUGLOG(3, "Compressing : ");
- DEBUG_PRINTHEX(3, job->srcStart, 12);
+ DEBUGLOG(4, "Compressing : ");
+ DEBUG_PRINTHEX(4, job->srcStart, 12);
job->cSize = (job->lastChunk) ? /* last chunk signal */
ZSTD_compressEnd(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize) :
ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize);
}
+/* ZSTDMT_flushNextJob() :
+ * output : will be updated with amount of data flushed .
+ * blockToFlush : the function will block and wait if there is no data available to flush .
+ * @return : amount of data remaining within internal buffer, 1 if unknown but > 0, 0 if no more */
+static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
+{
+ unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
+ if (zcs->doneJobID == zcs->nextJobID) return 0; /* all flushed ! */
+ PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
+ while (zcs->jobs[wJobID].jobCompleted==0) {
+ DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID); /* block when nothing available to flush */
+ if (!blockToFlush) { pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */
+ pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
+ }
+ pthread_mutex_unlock(&zcs->jobCompleted_mutex);
+ /* compression job completed : output can be flushed */
+ { ZSTDMT_jobDescription job = zcs->jobs[wJobID];
+ size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
+ DEBUGLOG(4, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
+ ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx);
+ zcs->jobs[wJobID].cctx = NULL;
+ ZSTDMT_releaseBuffer(zcs->buffPool, job.src);
+ zcs->jobs[wJobID].srcStart = NULL;
+ zcs->jobs[wJobID].src = g_nullBuffer;
+ if (ZSTD_isError(job.cSize)) {
+ ZSTDMT_waitForAllJobsCompleted(zcs);
+ ZSTDMT_releaseAllJobResources(zcs);
+ return job.cSize;
+ }
+ memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
+ output->pos += toWrite;
+ job.dstFlushed += toWrite;
+ if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => move to next one */
+ ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff);
+ zcs->jobs[wJobID].dstBuff = g_nullBuffer;
+ zcs->jobs[wJobID].jobCompleted = 0;
+ zcs->doneJobID++;
+ } else {
+ zcs->jobs[wJobID].dstFlushed = job.dstFlushed;
+ }
+ /* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */
+ if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
+ if (zcs->doneJobID < zcs->nextJobID) return 1; /* still some buffer to flush */
+ zcs->allJobsCompleted = zcs->frameEnded; /* frame completed and entirely flushed */
+ return 0; /* everything flushed */
+} }
+
+
size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
{
if (zcs->frameEnded) return ERROR(stage_wrong); /* current frame being ended. Only flush is allowed. Restart with init */
}
/* check if there is any data available to flush */
+ ZSTDMT_flushNextJob(zcs, output, (zcs->inBuff.filled == zcs->inBuffSize)); /* we'll block if it wasn't possible to create new job due to saturation */
+#if 0
{ unsigned const jobID = zcs->doneJobID & zcs->jobIDMask;
unsigned jobCompleted;
pthread_mutex_lock(&zcs->jobCompleted_mutex);
zcs->jobs[jobID].jobCompleted = 0;
zcs->doneJobID++;
} } }
-
+#endif
/* recommended next input size : fill current input buffer */
return zcs->inBuffSize - zcs->inBuff.filled;
}
zcs->frameEnded = 1;
}
- DEBUGLOG(1, "posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask);
+ DEBUGLOG(3, "posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask);
POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]); /* this call is blocking when thread worker pool is exhausted */
zcs->nextJobID++;
}
/* check if there is any data available to flush */
- DEBUGLOG(1, "zcs->doneJobID : %u ; zcs->nextJobID : %u ", zcs->doneJobID, zcs->nextJobID);
- if (zcs->doneJobID == zcs->nextJobID) return 0; /* all flushed ! */
+ DEBUGLOG(5, "zcs->doneJobID : %u ; zcs->nextJobID : %u ", zcs->doneJobID, zcs->nextJobID);
+ return ZSTDMT_flushNextJob(zcs, output, 1);
+
+#if 0
{ unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
while (zcs->jobs[wJobID].jobCompleted==0) {
- DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID); /* we want to block when waiting for data to flush */
+ DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID); /* block when nothing available to flush */
pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
}
pthread_mutex_unlock(&zcs->jobCompleted_mutex);
- { /* job completed : output can be flushed */
- ZSTDMT_jobDescription job = zcs->jobs[wJobID];
+ /* compression job completed : output can be flushed */
+ { ZSTDMT_jobDescription job = zcs->jobs[wJobID];
size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
- DEBUGLOG(1, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
+ DEBUGLOG(4, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[wJobID].cctx = NULL; /* release cctx for future task */
ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[wJobID].srcStart = NULL; zcs->jobs[wJobID].src = g_nullBuffer;
if (ZSTD_isError(job.cSize)) {
zcs->allJobsCompleted = zcs->frameEnded;
return 0;
} }
+#endif
}