static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
{
DEBUGLOG(2, "ZSTDMT_releaseBuffer");
- if (buf.start == NULL) return; /* release on NULL */
+ if (buf.start == NULL) return; /* compatible with release on NULL */
pthread_mutex_lock(&bufPool->poolMutex);
if (bufPool->nbBuffers < bufPool->totalBuffers) {
bufPool->bTable[bufPool->nbBuffers++] = buf; /* store for later re-use */
/* ===== Thread worker ===== */
-typedef struct {
- buffer_t buffer;
- size_t filled;
-} inBuff_t;
-
typedef struct {
buffer_t src;
const void* srcStart;
- size_t srcSize;
size_t dictSize;
+ size_t srcSize;
buffer_t dstBuff;
size_t cSize;
size_t dstFlushed;
_endJob:
ZSTDMT_releaseCCtx(job->cctxPool, cctx);
+ ZSTDMT_releaseBuffer(job->bufPool, job->src);
+ job->src = g_nullBuffer; job->srcStart = NULL;
PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
job->jobCompleted = 1;
job->jobScanned = 0;
/* ===== Multi-threaded compression ===== */
/* ------------------------------------------ */
+typedef struct {
+ buffer_t buffer;
+ size_t filled;
+} inBuff_t;
+
struct ZSTDMT_CCtx_s {
POOL_ctx* factory;
ZSTDMT_jobDescription* jobs;
}
+/* Note : missing checksum at the end ! */
size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
void* dst, size_t dstCapacity,
const void* src, size_t srcSize,
buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : g_nullBuffer;
size_t dictSize = u ? overlapSize : 0;
+ mtctx->jobs[u].src = g_nullBuffer;
mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize;
mtctx->jobs[u].dictSize = dictSize;
mtctx->jobs[u].srcSize = chunkSize;
zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
+ if (zcs->params.fParams.checksumFlag)
+ XXH64_update(&zcs->xxhState, (const char*)zcs->inBuff.buffer.start + zcs->dictSize, srcSize);
+
/* get a new buffer for next input */
if (!endFrame) {
size_t const newDictSize = MIN(srcSize + zcs->dictSize, zcs->targetDictSize);
- DEBUGLOG(5, "ZSTDMT_createCompressionJob::endFrame = %u", endFrame);
zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool);
if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */
zcs->jobs[jobID].jobCompleted = 1;
ZSTDMT_releaseAllJobResources(zcs);
return ERROR(memory_allocation);
}
- DEBUGLOG(5, "inBuff currently filled to %u", (U32)zcs->inBuff.filled);
zcs->inBuff.filled -= srcSize + zcs->dictSize - newDictSize;
- DEBUGLOG(5, "new job : inBuff filled to %u, with %u dict and %u src",
- (U32)zcs->inBuff.filled, (U32)newDictSize,
- (U32)(zcs->inBuff.filled - newDictSize));
memmove(zcs->inBuff.buffer.start,
(const char*)zcs->jobs[jobID].srcStart + zcs->dictSize + srcSize - newDictSize,
zcs->inBuff.filled);
- DEBUGLOG(5, "new inBuff pre-filled");
zcs->dictSize = newDictSize;
} else { /* if (endFrame==1) */
- DEBUGLOG(5, "ZSTDMT_createCompressionJob::endFrame = %u", endFrame);
zcs->inBuff.buffer = g_nullBuffer;
zcs->inBuff.filled = 0;
zcs->dictSize = 0;
}
DEBUGLOG(5, "zcs->params.fParams.checksumFlag : %u ", zcs->params.fParams.checksumFlag);
if (zcs->params.fParams.checksumFlag) {
- XXH64_update(&zcs->xxhState, (const char*)job.srcStart + job.dictSize, job.srcSize);
if (zcs->frameEnded && (zcs->doneJobID+1 == zcs->nextJobID)) { /* write checksum at end of last section */
U32 const checksum = (U32)XXH64_digest(&zcs->xxhState);
DEBUGLOG(5, "writing checksum : %08X \n", checksum);
job.cSize += 4;
zcs->jobs[wJobID].cSize += 4;
} }
- ZSTDMT_releaseBuffer(zcs->bufPool, job.src);
- zcs->jobs[wJobID].srcStart = NULL;
- zcs->jobs[wJobID].src = g_nullBuffer;
zcs->jobs[wJobID].jobScanned = 1;
}
{ size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);