/* ====== Dependencies ====== */
#include <string.h> /* memcpy, memset */
+#include <limits.h> /* INT_MAX */
#include "pool.h" /* threadpool */
#include "threading.h" /* mutex */
#include "zstd_compress_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */
static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool)
{
size_t const poolSize = sizeof(*bufPool)
- + (bufPool->totalBuffers - 1) * sizeof(buffer_t);
+ + (bufPool->totalBuffers - 1) * sizeof(buffer_t);
unsigned u;
size_t totalBufferSize = 0;
ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
ZSTD_free(buf.start, bufPool->cMem);
}
-/* Sets parameters relevant to the compression job, initializing others to
- * default values. Notably, nbThreads should probably be zero. */
-static ZSTD_CCtx_params ZSTDMT_makeJobCCtxParams(ZSTD_CCtx_params const params)
-{
- ZSTD_CCtx_params jobParams;
- memset(&jobParams, 0, sizeof(jobParams));
-
- jobParams.cParams = params.cParams;
- jobParams.fParams = params.fParams;
- jobParams.compressionLevel = params.compressionLevel;
-
- jobParams.ldmParams = params.ldmParams;
- return jobParams;
-}
/* ===== CCtx Pool ===== */
/* a single CCtx Pool can be invoked from multiple threads in parallel */
}
-/* ===== Thread worker ===== */
+/* ------------------------------------------ */
+/* ===== Thread worker ===== */
+/* ------------------------------------------ */
typedef struct {
buffer_t src;
const void* srcStart;
size_t prefixSize;
size_t srcSize;
+ size_t readSize;
buffer_t dstBuff;
size_t cSize;
size_t dstFlushed;
unsigned long long fullFrameSize;
} ZSTDMT_jobDescription;
-/* ZSTDMT_compressChunk() : POOL_function type */
+/* ZSTDMT_compressChunk() is a POOL_function type */
void ZSTDMT_compressChunk(void* jobDescription)
{
ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool);
const void* const src = (const char*)job->srcStart + job->prefixSize;
buffer_t dstBuff = job->dstBuff;
- DEBUGLOG(5, "ZSTDMT_compressChunk: job (first:%u) (last:%u) : prefixSize %u, srcSize %u ",
- job->firstChunk, job->lastChunk, (U32)job->prefixSize, (U32)job->srcSize);
+ /* ressources */
if (cctx==NULL) {
job->cSize = ERROR(memory_allocation);
goto _endJob;
}
-
if (dstBuff.start == NULL) {
dstBuff = ZSTDMT_getBuffer(job->bufPool);
if (dstBuff.start==NULL) {
goto _endJob;
}
job->dstBuff = dstBuff;
- DEBUGLOG(5, "ZSTDMT_compressChunk: received dstBuff of size %u", (U32)dstBuff.size);
}
+ /* init */
if (job->cdict) {
size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dm_auto, job->cdict, job->params, job->fullFrameSize);
- DEBUGLOG(4, "ZSTDMT_compressChunk: init using CDict (windowLog=%u)", job->params.cParams.windowLog);
assert(job->firstChunk); /* only allowed for first job */
if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
} else { /* srcStart points at reloaded section */
U64 const pledgedSrcSize = job->firstChunk ? job->fullFrameSize : ZSTD_CONTENTSIZE_UNKNOWN;
ZSTD_CCtx_params jobParams = job->params; /* do not modify job->params ! copy it, modify the copy */
- size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstChunk);
- if (ZSTD_isError(forceWindowError)) {
- DEBUGLOG(5, "ZSTD_CCtxParam_setParameter error : %s ", ZSTD_getErrorName(forceWindowError));
- job->cSize = forceWindowError;
- goto _endJob;
- }
- DEBUGLOG(5, "ZSTDMT_compressChunk: invoking ZSTD_compressBegin_advanced_internal with windowLog = %u ", jobParams.cParams.windowLog);
+ { size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstChunk);
+ if (ZSTD_isError(forceWindowError)) {
+ job->cSize = forceWindowError;
+ goto _endJob;
+ } }
{ size_t const initError = ZSTD_compressBegin_advanced_internal(cctx,
job->srcStart, job->prefixSize, ZSTD_dm_rawContent, /* load dictionary in "content-only" mode (no header analysis) */
NULL,
jobParams, pledgedSrcSize);
if (ZSTD_isError(initError)) {
- DEBUGLOG(5, "ZSTD_compressBegin_advanced_internal error : %s ", ZSTD_getErrorName(initError));
job->cSize = initError;
goto _endJob;
} }
ZSTD_invalidateRepCodes(cctx);
}
- DEBUGLOG(5, "Compressing into dstBuff of size %u", (U32)dstBuff.size);
- DEBUG_PRINTHEX(6, job->srcStart, 12);
+ /* compress */
+#if 1
job->cSize = (job->lastChunk) ?
ZSTD_compressEnd (cctx, dstBuff.start, dstBuff.size, src, job->srcSize) :
ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, job->srcSize);
- DEBUGLOG(5, "compressed %u bytes into %u bytes (first:%u) (last:%u) ",
- (unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk);
- DEBUGLOG(5, "dstBuff.size : %u ; => %s ", (U32)dstBuff.size, ZSTD_getErrorName(job->cSize));
+#else
+ if (sizeof(size_t) > sizeof(int))
+ assert(job->srcSize < ((size_t)INT_MAX) * ZSTD_BLOCKSIZE_MAX); /* check overflow */
+ { int const nbBlocks = (int)((job->srcSize + (ZSTD_BLOCKSIZE_MAX-1)) / ZSTD_BLOCKSIZE_MAX);
+ const BYTE* ip = (const BYTE*) src;
+ BYTE* const ostart = (BYTE*)dstBuff.start;
+ BYTE* op = ostart;
+ BYTE* oend = op + dstBuff.size;
+ int blockNb;
+ job->cSize = 0;
+ for (blockNb = 0; blockNb < nbBlocks-1; blockNb++) {
+ size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, ZSTD_BLOCKSIZE_MAX);
+ if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
+ ip += ZSTD_BLOCKSIZE_MAX;
+ op += cSize; assert(op < oend);
+ /* stats */
+ job->cSize += cSize;
+ job->readSize = ZSTD_BLOCKSIZE_MAX * (blockNb+1);
+ }
+ /* last block */
+ { size_t const lastBlockSize1 = job->srcSize & (ZSTD_BLOCKSIZE_MAX-1);
+ size_t const lastBlockSize = (lastBlockSize1==0) ? ZSTD_BLOCKSIZE_MAX : lastBlockSize1;
+ size_t const cSize = (job->lastChunk) ?
+ ZSTD_compressEnd (cctx, op, oend-op, ip, lastBlockSize) :
+ ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize);
+ if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
+ /* stats */
+ job->cSize += cSize;
+ job->readSize = ZSTD_BLOCKSIZE_MAX * (blockNb+1);
+ }
+ }
+#endif
_endJob:
+ /* release */
ZSTDMT_releaseCCtx(job->cctxPool, cctx);
ZSTDMT_releaseBuffer(job->bufPool, job->src);
job->src = g_nullBuffer; job->srcStart = NULL;
+ /* report */
ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
job->jobCompleted = 1;
job->jobScanned = 0;
const ZSTD_CDict* cdict;
};
+/* Sets parameters relevant to the compression job, initializing others to
+ * default values. Notably, nbThreads should probably be zero. */
+static ZSTD_CCtx_params ZSTDMT_makeJobCCtxParams(ZSTD_CCtx_params const params)
+{
+ ZSTD_CCtx_params jobParams;
+ memset(&jobParams, 0, sizeof(jobParams));
+
+ jobParams.cParams = params.cParams;
+ jobParams.fParams = params.fParams;
+ jobParams.compressionLevel = params.compressionLevel;
+
+ jobParams.ldmParams = params.ldmParams;
+ return jobParams;
+}
+
static ZSTDMT_jobDescription* ZSTDMT_allocJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem)
{
U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1;
zcs->jobs[jobID].src = zcs->inBuff.buffer;
zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
zcs->jobs[jobID].srcSize = srcSize;
+ zcs->jobs[jobID].readSize = 0;
zcs->jobs[jobID].prefixSize = zcs->dictSize;
assert(zcs->inBuff.filled >= srcSize + zcs->dictSize);
zcs->jobs[jobID].params = zcs->params;