unsigned doneJobID;
unsigned nextJobID;
unsigned frameEnded;
+ unsigned allJobsCompleted;
ZSTDMT_jobDescription jobs[1]; /* variable size (must lies at the end) */
};
if (!cctx) return NULL;
cctx->nbThreads = nbThreads;
cctx->jobIDMask = nbJobs - 1;
+ cctx->allJobsCompleted = 1;
cctx->factory = POOL_create(nbThreads, 1);
cctx->buffPool = ZSTDMT_createBufferPool(nbThreads);
cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads);
{
if (mtctx==NULL) return 0; /* compatible with free on NULL */
POOL_free(mtctx->factory);
- ZSTDMT_releaseAllJobResources(mtctx); /* kill workers first */
- ZSTDMT_freeBufferPool(mtctx->buffPool); /* release job resources first */
+ if (!mtctx->allJobsCompleted) ZSTDMT_releaseAllJobResources(mtctx); /* stop workers first */
+ ZSTDMT_freeBufferPool(mtctx->buffPool); /* release job resources into pools first */
ZSTDMT_freeCCtxPool(mtctx->cctxPool);
pthread_mutex_destroy(&mtctx->jobCompleted_mutex);
pthread_cond_destroy(&mtctx->jobCompleted_cond);
}
size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
+ if (zcs->allJobsCompleted == 0) { /* previous job not correctly finished */
+ ZSTDMT_waitForAllJobsCompleted(zcs);
+ ZSTDMT_releaseAllJobResources(zcs);
+ zcs->allJobsCompleted = 1;
+ }
zcs->params = ZSTD_getParams(compressionLevel, 0, 0);
zcs->targetSectionSize = (size_t)1 << (zcs->params.cParams.windowLog + 2);
zcs->inBuffSize = 5 * (1 << zcs->params.cParams.windowLog);
zcs->doneJobID = 0;
zcs->nextJobID = 0;
zcs->frameEnded = 0;
+ zcs->allJobsCompleted = 0;
return 0;
}
size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
{
- if (zcs->frameEnded) return ERROR(stage_wrong); /* current frame being ended. Finish it and restart a new one */
+ if (zcs->frameEnded) return ERROR(stage_wrong); /* current frame being ended. Only flush is allowed. Restart with init */
/* fill input buffer */
{ size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled);
}
/* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */
if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
- return (zcs->doneJobID < zcs->nextJobID);
+ if (zcs->doneJobID < zcs->nextJobID) return 1; /* still some buffer to flush */
+ zcs->allJobsCompleted = zcs->frameEnded;
+ return 0;
} }
}