unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
if ((cctx==NULL) || (dstBuffer.start==NULL)) {
- zcs->jobs[jobID].cSize = ERROR(memory_allocation);
zcs->jobs[jobID].jobCompleted = 1;
zcs->nextJobID++;
ZSTDMT_waitForAllJobsCompleted(zcs);
zcs->jobs[jobID].params = zcs->params;
zcs->jobs[jobID].dstBuff = dstBuffer;
zcs->jobs[jobID].cctx = cctx;
- zcs->jobs[jobID].firstChunk = (jobID==0);
+ zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
zcs->jobs[jobID].lastChunk = 0;
zcs->jobs[jobID].jobCompleted = 0;
zcs->jobs[jobID].dstFlushed = 0;
/* get a new buffer for next input - save remaining into it */
zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */
- zcs->jobs[jobID].cSize = ERROR(memory_allocation);
zcs->jobs[jobID].jobCompleted = 1;
zcs->nextJobID++;
ZSTDMT_waitForAllJobsCompleted(zcs);
zcs->jobs[jobID].cctx = NULL;
ZSTDMT_releaseBuffer(zcs->buffPool, job.src);
zcs->jobs[jobID].srcStart = NULL; zcs->jobs[jobID].src = g_nullBuffer;
+ if (ZSTD_isError(job.cSize)) {
+ ZSTDMT_waitForAllJobsCompleted(zcs);
+ ZSTDMT_releaseAllJobResources(zcs);
+ return job.cSize;
+ }
memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
output->pos += toWrite;
job.dstFlushed += toWrite;
unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
if ((cctx==NULL) || (dstBuffer.start==NULL)) {
- zcs->jobs[jobID].cSize = ERROR(memory_allocation);
zcs->jobs[jobID].jobCompleted = 1;
zcs->nextJobID++;
ZSTDMT_waitForAllJobsCompleted(zcs);
zcs->jobs[jobID].params = zcs->params;
zcs->jobs[jobID].dstBuff = dstBuffer;
zcs->jobs[jobID].cctx = cctx;
- zcs->jobs[jobID].firstChunk = (jobID==0);
+ zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
zcs->jobs[jobID].lastChunk = endFrame;
zcs->jobs[jobID].jobCompleted = 0;
zcs->jobs[jobID].dstFlushed = 0;
zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
zcs->inBuff.filled = 0;
if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */
- zcs->jobs[jobID].cSize = ERROR(memory_allocation);
zcs->jobs[jobID].jobCompleted = 1;
zcs->nextJobID++;
ZSTDMT_waitForAllJobsCompleted(zcs);
}
/* check if there is any data available to flush */
+ if (zcs->doneJobID == zcs->nextJobID) return 0; /* all flushed ! */
{ unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
while (zcs->jobs[wJobID].jobCompleted==0) {
size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[wJobID].cctx = NULL; /* release cctx for future task */
ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[wJobID].srcStart = NULL; zcs->jobs[wJobID].src = g_nullBuffer;
+ if (ZSTD_isError(job.cSize)) {
+ ZSTDMT_waitForAllJobsCompleted(zcs);
+ ZSTDMT_releaseAllJobResources(zcs);
+ return job.cSize;
+ }
memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
output->pos += toWrite;
job.dstFlushed += toWrite;