size_t size;
} buffer_t;
+static const buffer_t g_nullBuffer = (buffer_t) { NULL, 0 };
+
typedef struct ZSTDMT_bufferPool_s {
unsigned totalBuffers;;
unsigned nbBuffers;
return cctx;
}
+/* ZSTDMT_releaseAllJobResources() :
+ * Ensure all workers are killed first. */
+static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
+{
+ unsigned jobID;
+ for (jobID=0; jobID <= mtctx->jobIDMask; jobID++) {
+ ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[jobID].dstBuff);
+ mtctx->jobs[jobID].dstBuff = g_nullBuffer;
+ ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[jobID].src);
+ mtctx->jobs[jobID].src = g_nullBuffer;
+ ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[jobID].cctx);
+ mtctx->jobs[jobID].cctx = NULL;
+ }
+}
+
size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
{
+ if (mtctx==NULL) return 0; /* compatible with free on NULL */
POOL_free(mtctx->factory);
- ZSTDMT_freeBufferPool(mtctx->buffPool);
+ ZSTDMT_releaseAllJobResources(mtctx); /* kill workers first */
+ ZSTDMT_freeBufferPool(mtctx->buffPool); /* release job resources first */
ZSTDMT_freeCCtxPool(mtctx->cctxPool);
pthread_mutex_destroy(&mtctx->jobCompleted_mutex);
pthread_cond_destroy(&mtctx->jobCompleted_cond);
pthread_mutex_unlock(&mtctx->jobCompleted_mutex);
ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[chunkID].cctx);
+ mtctx->jobs[chunkID].cctx = NULL;
+ mtctx->jobs[chunkID].srcStart = NULL;
{ size_t const cSize = mtctx->jobs[chunkID].cSize;
if (ZSTD_isError(cSize)) error = cSize;
if ((!error) && (dstPos + cSize > dstCapacity)) error = ERROR(dstSize_tooSmall);
if (chunkID) { /* note : chunk 0 is already written directly into dst */
if (!error) memcpy((char*)dst + dstPos, mtctx->jobs[chunkID].dstBuff.start, cSize);
ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[chunkID].dstBuff);
+ mtctx->jobs[chunkID].dstBuff = g_nullBuffer;
}
dstPos += cSize ;
}
#if 1
+static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs) {
+ while (zcs->doneJobID < zcs->nextJobID) {
+ unsigned const jobID = zcs->doneJobID & zcs->jobIDMask;
+ PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
+ while (zcs->jobs[jobID].jobCompleted==0) {
+ DEBUGLOG(4, "waiting for jobCompleted signal from chunk %u", zcs->doneJobID); /* we want to block when waiting for data to flush */
+ pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
+ }
+ pthread_mutex_unlock(&zcs->jobCompleted_mutex);
+ zcs->doneJobID++;
+ }
+}
+
size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
zcs->params = ZSTD_getParams(compressionLevel, 0, 0);
zcs->targetSectionSize = (size_t)1 << (zcs->params.cParams.windowLog + 2);
unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
if ((cctx==NULL) || (dstBuffer.start==NULL)) {
- zcs->jobs[jobID].cSize = ERROR(memory_allocation); /* job result : how to collect that error ? */
+ zcs->jobs[jobID].cSize = ERROR(memory_allocation);
zcs->jobs[jobID].jobCompleted = 1;
+ zcs->nextJobID++;
+ ZSTDMT_waitForAllJobsCompleted(zcs);
+ ZSTDMT_releaseAllJobResources(zcs);
+ return ERROR(memory_allocation);
}
zcs->jobs[jobID].src = zcs->inBuff.buffer;
zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
/* get a new buffer for next input - save remaining into it */
- zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); /* check for NULL ! */
+ zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
+ if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */
+ zcs->jobs[jobID].cSize = ERROR(memory_allocation);
+ zcs->jobs[jobID].jobCompleted = 1;
+ zcs->nextJobID++;
+ ZSTDMT_waitForAllJobsCompleted(zcs);
+ ZSTDMT_releaseAllJobResources(zcs);
+ return ERROR(memory_allocation);
+ }
zcs->inBuff.filled = (U32)(zcs->inBuffSize - zcs->targetSectionSize);
memcpy(zcs->inBuff.buffer.start, (const char*)zcs->jobs[jobID].srcStart + zcs->targetSectionSize, zcs->inBuff.filled);
ZSTDMT_jobDescription job = zcs->jobs[jobID];
if (job.jobCompleted) { /* job completed : output can be flushed */
size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
- ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[jobID].cctx = NULL; /* release cctx for future task */
- ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[jobID].srcStart = NULL; zcs->jobs[jobID].src = (buffer_t) { NULL, 0 };
+ ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx);
+ zcs->jobs[jobID].cctx = NULL;
+ ZSTDMT_releaseBuffer(zcs->buffPool, job.src);
+ zcs->jobs[jobID].srcStart = NULL; zcs->jobs[jobID].src = g_nullBuffer;
memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
output->pos += toWrite;
job.dstFlushed += toWrite;
if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => go to next one */
- ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); zcs->jobs[jobID].dstBuff = (buffer_t) { NULL, 0 };
+ ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff);
+ zcs->jobs[jobID].dstBuff = g_nullBuffer;
zcs->doneJobID++;
} else {
zcs->jobs[jobID].dstFlushed = job.dstFlushed; /* save flush level into zcs for later retrieval */
if ((srcSize > 0) || (endFrame && !zcs->frameEnded)) {
size_t const dstBufferCapacity = ZSTD_compressBound(srcSize);
- buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity); /* should check for NULL */
- ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool); /* should check for NULL */
+ buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity);
+ ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool);
unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
+
+ if ((cctx==NULL) || (dstBuffer.start==NULL)) {
+ zcs->jobs[jobID].cSize = ERROR(memory_allocation);
+ zcs->jobs[jobID].jobCompleted = 1;
+ zcs->nextJobID++;
+ ZSTDMT_waitForAllJobsCompleted(zcs);
+ ZSTDMT_releaseAllJobResources(zcs);
+ return ERROR(memory_allocation);
+ }
+
zcs->jobs[jobID].src = zcs->inBuff.buffer;
zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
zcs->jobs[jobID].srcSize = srcSize;
/* get a new buffer for next input */
if (!endFrame) {
- zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); /* check for NULL ! */
+ zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
zcs->inBuff.filled = 0;
+ if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */
+ zcs->jobs[jobID].cSize = ERROR(memory_allocation);
+ zcs->jobs[jobID].jobCompleted = 1;
+ zcs->nextJobID++;
+ ZSTDMT_waitForAllJobsCompleted(zcs);
+ ZSTDMT_releaseAllJobResources(zcs);
+ return ERROR(memory_allocation);
+ }
} else {
zcs->frameEnded = 1;
}
ZSTDMT_jobDescription job = zcs->jobs[wJobID];
size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[wJobID].cctx = NULL; /* release cctx for future task */
- ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[wJobID].srcStart = NULL; zcs->jobs[wJobID].src = (buffer_t) { NULL, 0 };
+ ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[wJobID].srcStart = NULL; zcs->jobs[wJobID].src = g_nullBuffer;
memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
output->pos += toWrite;
job.dstFlushed += toWrite;
if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => next one */
- ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); zcs->jobs[wJobID].dstBuff = (buffer_t) { NULL, 0 };
+ ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); zcs->jobs[wJobID].dstBuff = g_nullBuffer;
zcs->doneJobID++;
} else {
zcs->jobs[wJobID].dstFlushed = job.dstFlushed;