#include "pool.h" /* threadpool */
#include "threading.h" /* mutex */
#include "zstd_compress_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */
+#include "zstd_ldm.h"
#include "zstdmt_compress.h"
+/* Guards code to support resizing the SeqPool.
+ * We will want to resize the SeqPool to save memory in the future.
+ * Until then, the code is compiled out (ZSTD_RESIZE_SEQPOOL == 0) since it is unused.
+ */
+#define ZSTD_RESIZE_SEQPOOL 0
/* ====== Debug ====== */
#if defined(ZSTD_DEBUG) && (ZSTD_DEBUG>=2)
}
}
+#if ZSTD_RESIZE_SEQPOOL
+/** ZSTDMT_resizeBuffer() :
+ *  assumption : bufPool must be valid
+ * @return : a buffer that is at least the buffer pool buffer size.
+ *           If a reallocation happens, the data in the input buffer is copied
+ *           into the new buffer; the input buffer itself is not freed here.
+ *           If the allocation fails, the input buffer is returned unchanged.
+ */
+static buffer_t ZSTDMT_resizeBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buffer)
+{
+    size_t const bSize = bufPool->bufferSize;
+    if (buffer.capacity < bSize) {
+        void* const start = ZSTD_malloc(bSize, bufPool->cMem);
+        if (start != NULL) {
+            buffer_t newBuffer;
+            newBuffer.start = start;
+            newBuffer.capacity = bSize;
+            assert(newBuffer.capacity >= buffer.capacity);
+            /* An empty input buffer may have a NULL start : handing NULL to
+             * memcpy() is undefined behavior, even for a length of 0. */
+            if (buffer.capacity > 0)
+                memcpy(newBuffer.start, buffer.start, buffer.capacity);
+            DEBUGLOG(5, "ZSTDMT_resizeBuffer: created buffer of size %u", (U32)bSize);
+            return newBuffer;
+        }
+        DEBUGLOG(5, "ZSTDMT_resizeBuffer: buffer allocation failure !!");
+    }
+    return buffer;
+}
+#endif
+
/* store buffer for later re-use, up to pool capacity */
static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
{
}
+/* ===== Seq Pool Wrapper ====== */
+
+/* Empty sequence store : returned by value when no sequence buffer is needed.
+ * It is never written to, so it is const. */
+static const rawSeqStore_t kNullRawSeqStore = {NULL, 0, 0, 0};
+
+/* A sequence pool is a buffer pool whose buffers are viewed as arrays of rawSeq. */
+typedef ZSTDMT_bufferPool ZSTDMT_seqPool;
+
+/* ZSTDMT_sizeof_seqPool() :
+ * A seqPool is a bufferPool under another name,
+ * so its size is whatever the buffer pool reports. */
+static size_t ZSTDMT_sizeof_seqPool(ZSTDMT_seqPool* seqPool)
+{
+    size_t const poolBytes = ZSTDMT_sizeof_bufferPool(seqPool);
+    return poolBytes;
+}
+
+/* bufferToSeq() :
+ * Views a raw byte buffer as a sequence store :
+ * `seq` points at the buffer start and `capacity` is the number of whole
+ * rawSeq that fit in it. All other fields keep their zero initial value. */
+static rawSeqStore_t bufferToSeq(buffer_t buffer)
+{
+    size_t const maxNbSeq = buffer.capacity / sizeof(rawSeq);
+    rawSeqStore_t store = {NULL, 0, 0, 0};
+    store.capacity = maxNbSeq;
+    store.seq = (rawSeq*)buffer.start;
+    return store;
+}
+
+/* seqToBuffer() :
+ * Inverse view of bufferToSeq() : exposes the storage of a sequence store
+ * as a byte buffer, with the capacity converted from rawSeq count to bytes. */
+static buffer_t seqToBuffer(rawSeqStore_t seq)
+{
+    buffer_t asBuffer;
+    asBuffer.capacity = seq.capacity * sizeof(rawSeq);
+    asBuffer.start = seq.seq;
+    return asBuffer;
+}
+
+/* ZSTDMT_getSeq() :
+ * Fetches a buffer from the pool and returns it as a sequence store.
+ * When the pool's buffer size is 0, no buffer is requested and
+ * the null sequence store is returned instead. */
+static rawSeqStore_t ZSTDMT_getSeq(ZSTDMT_seqPool* seqPool)
+{
+    if (seqPool->bufferSize != 0) {
+        buffer_t const pooled = ZSTDMT_getBuffer(seqPool);
+        return bufferToSeq(pooled);
+    }
+    return kNullRawSeqStore;
+}
+
+#if ZSTD_RESIZE_SEQPOOL
+/* ZSTDMT_resizeSeq() :
+ * Sequence-store flavour of ZSTDMT_resizeBuffer() :
+ * convert to a buffer, resize it through the pool, convert back. */
+static rawSeqStore_t ZSTDMT_resizeSeq(ZSTDMT_seqPool* seqPool, rawSeqStore_t seq)
+{
+    buffer_t const current = seqToBuffer(seq);
+    buffer_t const resized = ZSTDMT_resizeBuffer(seqPool, current);
+    return bufferToSeq(resized);
+}
+#endif
+
+/* ZSTDMT_releaseSeq() :
+ * Hands the storage behind `seq` back to the pool, as a plain buffer. */
+static void ZSTDMT_releaseSeq(ZSTDMT_seqPool* seqPool, rawSeqStore_t seq)
+{
+    buffer_t const backing = seqToBuffer(seq);
+    ZSTDMT_releaseBuffer(seqPool, backing);
+}
+
+/* ZSTDMT_setNbSeq() :
+ * Sets the pool's buffer size so that each buffer can hold `nbSeq` rawSeq. */
+static void ZSTDMT_setNbSeq(ZSTDMT_seqPool* const seqPool, size_t const nbSeq)
+{
+    size_t const nbBytes = nbSeq * sizeof(rawSeq);
+    ZSTDMT_setBufferSize(seqPool, nbBytes);
+}
+
+/* ZSTDMT_createSeqPool() :
+ * Creates a sequence pool on top of a buffer pool, with an initial
+ * sequence count of 0.
+ * @return : the new pool, or NULL if the buffer pool could not be created. */
+static ZSTDMT_seqPool* ZSTDMT_createSeqPool(unsigned nbWorkers, ZSTD_customMem cMem)
+{
+    ZSTDMT_seqPool* const seqPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
+    /* Creation can fail (callers test the result against NULL) :
+     * never hand a NULL pool to ZSTDMT_setNbSeq(). */
+    if (seqPool == NULL) return NULL;
+    ZSTDMT_setNbSeq(seqPool, 0);
+    return seqPool;
+}
+
+/* ZSTDMT_freeSeqPool() :
+ * A seqPool is a bufferPool, so it is released by the buffer pool destructor. */
+static void ZSTDMT_freeSeqPool(ZSTDMT_seqPool* seqPool)
+{
+    ZSTDMT_bufferPool* const asBufferPool = seqPool;
+    ZSTDMT_freeBufferPool(asBufferPool);
+}
+
+
+
/* ===== CCtx Pool ===== */
/* a single CCtx Pool can be invoked from multiple threads in parallel */
} range_t;
typedef struct {
+ /* State for the work that jobs must perform one at a time, in job order.
+ * All variables in the struct are protected by mutex,
+ * except ldmWindow which is protected by ldmWindowMutex (see below). */
ZSTD_pthread_mutex_t mutex;
ZSTD_pthread_cond_t cond; /* Broadcast whenever nextJobID changes */
ZSTD_CCtx_params params; /* Parameters stored by the last successful reset */
+ ldmState_t ldmState; /* Long distance matching state; its hashTable and bucketOffsets are freed with this struct */
XXH64_state_t xxhState; /* Running checksum, updated when params.fParams.checksumFlag is set */
unsigned nextJobID; /* ID of the job whose turn it is to perform the serial step */
+ /* Protects ldmWindow.
+ * Must be acquired after the main mutex when acquiring both.
+ */
+ ZSTD_pthread_mutex_t ldmWindowMutex;
+ ZSTD_pthread_cond_t ldmWindowCond; /* Signaled when ldmWindow is updated */
+ ZSTD_window_t ldmWindow; /* A thread-safe copy of ldmState.window */
} serialState_t;
-static void ZSTDMT_serialState_reset(serialState_t* serialState, ZSTD_CCtx_params params)
+/* ZSTDMT_serialState_reset() :
+ * Prepares the serial state for a new compression using `params`.
+ * When long distance matching is enabled, this also sizes the buffers of
+ * `seqPool`, clears the LDM window, and (re)allocates then zeroes the LDM tables.
+ * @return : 0 on success,
+ *           1 if an LDM table could not be allocated
+ *             (serialState->params is then left unchanged). */
+static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* seqPool, ZSTD_CCtx_params params)
{
+    /* Adjust parameters */
+    if (params.ldmParams.enableLdm) {
+        DEBUGLOG(4, "LDM window size = %u KB", (1U << params.cParams.windowLog) >> 10);
+        params.ldmParams.windowLog = params.cParams.windowLog;
+        ZSTD_ldm_adjustParameters(&params.ldmParams, &params.cParams);
+        assert(params.ldmParams.hashLog >= params.ldmParams.bucketSizeLog);
+        assert(params.ldmParams.hashEveryLog < 32);
+        serialState->ldmState.hashPower =
+                ZSTD_ldm_getHashPower(params.ldmParams.minMatchLength);
+    }
    serialState->nextJobID = 0;
    if (params.fParams.checksumFlag)
        XXH64_reset(&serialState->xxhState, 0);
+    if (params.ldmParams.enableLdm) {
+        ZSTD_customMem cMem = params.customMem;
+        unsigned const hashLog = params.ldmParams.hashLog;
+        size_t const hashSize = ((size_t)1 << hashLog) * sizeof(ldmEntry_t);
+        unsigned const bucketLog =
+            params.ldmParams.hashLog - params.ldmParams.bucketSizeLog;
+        size_t const bucketSize = (size_t)1 << bucketLog;
+        unsigned const prevBucketLog =
+            serialState->params.ldmParams.hashLog -
+            serialState->params.ldmParams.bucketSizeLog;
+        /* Size the seq pool tables */
+        ZSTDMT_setNbSeq(seqPool, ZSTD_ldm_getMaxNbSeq(params.ldmParams, params.jobSize));
+        /* Reset the window */
+        ZSTD_window_clear(&serialState->ldmState.window);
+        serialState->ldmWindow = serialState->ldmState.window;
+        /* Resize tables and output space if necessary.
+         * A table is also allocated when it does not exist yet : the stored
+         * logs only describe the previously stored parameters, they do not
+         * prove that a table of that size was ever allocated (the previous
+         * reset may have run with LDM disabled, or bucketLog may be 0). */
+        if (serialState->ldmState.hashTable == NULL || serialState->params.ldmParams.hashLog < hashLog) {
+            ZSTD_free(serialState->ldmState.hashTable, cMem);
+            serialState->ldmState.hashTable = (ldmEntry_t*)ZSTD_malloc(hashSize, cMem);
+        }
+        if (serialState->ldmState.bucketOffsets == NULL || prevBucketLog < bucketLog) {
+            ZSTD_free(serialState->ldmState.bucketOffsets, cMem);
+            serialState->ldmState.bucketOffsets = (BYTE*)ZSTD_malloc(bucketSize, cMem);
+        }
+        if (!serialState->ldmState.hashTable || !serialState->ldmState.bucketOffsets)
+            return 1;
+        /* Zero the tables */
+        memset(serialState->ldmState.hashTable, 0, hashSize);
+        memset(serialState->ldmState.bucketOffsets, 0, bucketSize);
+    }
    serialState->params = params;
+    return 0;
}
/* ZSTDMT_serialState_init() :
 * Zeroes the whole serial state, then initializes its two mutexes and
 * its two condition variables.
 * @return : 0 if every initialization call returned 0, non-zero otherwise. */
static int ZSTDMT_serialState_init(serialState_t* serialState)
{
int initError = 0; /* accumulates (bitwise OR) the result of each init call */
+ memset(serialState, 0, sizeof(*serialState)); /* clears every field, including the ldmState table pointers and the stored params */
initError |= ZSTD_pthread_mutex_init(&serialState->mutex, NULL);
initError |= ZSTD_pthread_cond_init(&serialState->cond, NULL);
+ initError |= ZSTD_pthread_mutex_init(&serialState->ldmWindowMutex, NULL);
+ initError |= ZSTD_pthread_cond_init(&serialState->ldmWindowCond, NULL);
return initError;
}
/* ZSTDMT_serialState_free() :
 * Destroys the synchronization objects of the serial state and frees the
 * two LDM tables. The serialState_t object itself is not freed. */
static void ZSTDMT_serialState_free(serialState_t* serialState)
{
+ ZSTD_customMem cMem = serialState->params.customMem; /* allocator taken from the stored params */
ZSTD_pthread_mutex_destroy(&serialState->mutex);
ZSTD_pthread_cond_destroy(&serialState->cond);
+ ZSTD_pthread_mutex_destroy(&serialState->ldmWindowMutex);
+ ZSTD_pthread_cond_destroy(&serialState->ldmWindowCond);
+ ZSTD_free(serialState->ldmState.hashTable, cMem);
+ ZSTD_free(serialState->ldmState.bucketOffsets, cMem);
}
/* ZSTDMT_serialState_update() :
 * Serial step of a job, performed while holding serialState->mutex.
 * If it is this job's turn (nextJobID == jobID) :
 *  - when LDM is enabled, extends the LDM window with `src`, generates the
 *    long distance sequences into `seqStore`, then publishes the new window
 *    in ldmWindow under ldmWindowMutex and signals ldmWindowCond;
 *  - when the checksum is enabled and `src` is not empty, feeds `src` to xxhState.
 * In all cases nextJobID is incremented and cond is broadcast before unlocking.
 * Once the mutex is released, any generated sequences are referenced in `jobCCtx`.
 * `seqStore` is received by value : its fields are updated on the local copy only.
 * NOTE(review): this listing looks like diff hunks; statements between the
 * lock and the turn check may be elided here - confirm against the full file. */
-static void ZSTDMT_serialState_update(serialState_t* serialState, range_t src, unsigned jobID)
+static void ZSTDMT_serialState_update(serialState_t* serialState,
+ ZSTD_CCtx* jobCCtx, rawSeqStore_t seqStore,
+ range_t src, unsigned jobID)
{
/* Wait for our turn */
ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex);
/* A future job may error and skip our job */
if (serialState->nextJobID == jobID) {
/* It is now our turn, do any processing necessary */
+ if (serialState->params.ldmParams.enableLdm) {
+ size_t error;
+ assert(seqStore.seq != NULL && seqStore.pos == 0 &&
+ seqStore.size == 0 && seqStore.capacity > 0); /* expects a fresh, non-null, empty store */
+ ZSTD_window_update(&serialState->ldmState.window, src.start, src.size);
+ error = ZSTD_ldm_generateSequences(
+ &serialState->ldmState, &seqStore,
+ &serialState->params.ldmParams, src.start, src.size);
+ /* We provide a large enough buffer to never fail. */
+ assert(!ZSTD_isError(error)); (void)error;
+ /* Update ldmWindow to match the ldmState.window and signal the main
+ * thread if it is waiting for a buffer.
+ */
+ ZSTD_PTHREAD_MUTEX_LOCK(&serialState->ldmWindowMutex); /* taken while holding the main mutex, i.e. after it */
+ serialState->ldmWindow = serialState->ldmState.window;
+ ZSTD_pthread_cond_signal(&serialState->ldmWindowCond);
+ ZSTD_pthread_mutex_unlock(&serialState->ldmWindowMutex);
+ }
if (serialState->params.fParams.checksumFlag && src.size > 0)
XXH64_update(&serialState->xxhState, src.start, src.size);
}
serialState->nextJobID++; /* executed whether or not it was our turn */
ZSTD_pthread_cond_broadcast(&serialState->cond);
ZSTD_pthread_mutex_unlock(&serialState->mutex);
+
+ if (seqStore.size > 0) { /* sequences were generated above : done outside the lock */
+ size_t const err = ZSTD_referenceExternalSequences(
+ jobCCtx, seqStore.seq, seqStore.size);
+ assert(serialState->params.ldmParams.enableLdm);
+ assert(!ZSTD_isError(err));
+ (void)err;
+ }
}
static void ZSTDMT_serialState_ensureFinished(serialState_t* serialState,
DEBUGLOG(5, "Skipping past job %u because of error", jobID);
serialState->nextJobID = jobID + 1;
ZSTD_pthread_cond_broadcast(&serialState->cond);
+
+ ZSTD_PTHREAD_MUTEX_LOCK(&serialState->ldmWindowMutex);
+ ZSTD_window_clear(&serialState->ldmWindow);
+ ZSTD_pthread_cond_signal(&serialState->ldmWindowCond);
+ ZSTD_pthread_mutex_unlock(&serialState->ldmWindowMutex);
}
ZSTD_pthread_mutex_unlock(&serialState->mutex);
ZSTD_pthread_cond_t job_cond; /* Thread-safe - used by mtctx and worker */
ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) workers */
ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) workers */
+ ZSTDMT_seqPool* seqPool; /* Thread-safe - used by mtctx and (all) workers */
serialState_t* serial; /* Thread-safe - used by mtctx and (all) workers */
buffer_t dstBuff; /* set by worker (or mtctx), then read by worker & mtctx, then modified by mtctx => no barrier */
range_t prefix; /* set by mtctx, then read by worker & mtctx => no barrier */
ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
ZSTD_CCtx_params jobParams = job->params; /* do not modify job->params ! copy it, modify the copy */
ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool);
+ rawSeqStore_t rawSeqStore = ZSTDMT_getSeq(job->seqPool);
buffer_t dstBuff = job->dstBuff;
- /* Don't compute the checksum for chunks, but write it in the header */
+ /* Don't compute the checksum for chunks, since we compute it externally,
+ * but write it in the header.
+ */
if (job->jobID != 0) jobParams.fParams.checksumFlag = 0;
+ /* Don't run LDM for the chunks, since we handle it externally */
+ jobParams.ldmParams.enableLdm = 0;
/* ressources */
if (cctx==NULL) {
goto _endJob;
} } }
- /* Perform serial step as early as possible */
- ZSTDMT_serialState_update(job->serial, job->src, job->jobID);
+ /* Perform serial step as early as possible, but after CCtx initialization */
+ ZSTDMT_serialState_update(job->serial, cctx, rawSeqStore, job->src, job->jobID);
if (!job->firstJob) { /* flush and overwrite frame header when it's not first job */
size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, job->src.start, 0);
DEBUGLOG(5, "Finished with prefix: %zx", (size_t)job->prefix.start);
DEBUGLOG(5, "Finished with source: %zx", (size_t)job->src.start);
/* release resources */
+ ZSTDMT_releaseSeq(job->seqPool, rawSeqStore);
ZSTDMT_releaseCCtx(job->cctxPool, cctx);
/* report */
ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
ZSTDMT_jobDescription* jobs;
ZSTDMT_bufferPool* bufPool;
ZSTDMT_CCtxPool* cctxPool;
+ ZSTDMT_seqPool* seqPool;
ZSTD_CCtx_params params;
size_t targetSectionSize;
size_t targetPrefixSize;
mtctx->jobIDMask = nbJobs - 1;
mtctx->bufPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
mtctx->cctxPool = ZSTDMT_createCCtxPool(nbWorkers, cMem);
+ mtctx->seqPool = ZSTDMT_createSeqPool(nbWorkers, cMem);
initError = ZSTDMT_serialState_init(&mtctx->serial);
mtctx->roundBuff = kNullRoundBuff;
- if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool | initError) {
+ if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool | !mtctx->seqPool | initError) {
ZSTDMT_freeCCtx(mtctx);
return NULL;
}
ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);
ZSTDMT_freeBufferPool(mtctx->bufPool);
ZSTDMT_freeCCtxPool(mtctx->cctxPool);
+ ZSTDMT_freeSeqPool(mtctx->seqPool);
ZSTDMT_serialState_free(&mtctx->serial);
ZSTD_freeCDict(mtctx->cdictLocal);
if (mtctx->roundBuff.buffer)
+ ZSTDMT_sizeof_bufferPool(mtctx->bufPool)
+ (mtctx->jobIDMask+1) * sizeof(ZSTDMT_jobDescription)
+ ZSTDMT_sizeof_CCtxPool(mtctx->cctxPool)
+ + ZSTDMT_sizeof_seqPool(mtctx->seqPool)
+ ZSTD_sizeof_CDict(mtctx->cdictLocal)
+ mtctx->roundBuff.capacity;
}
jobParams.compressionLevel = params.compressionLevel;
jobParams.disableLiteralCompression = params.disableLiteralCompression;
- jobParams.ldmParams = params.ldmParams;
return jobParams;
}
/* ZSTDMT_computeTargetJobLog() :
 * @return : log2 of the default job size (callers compute 1U << result) :
 *           - with long distance matching : the larger of 21 and chainLog + 4;
 *           - otherwise : windowLog + 2. */
static size_t ZSTDMT_computeTargetJobLog(ZSTD_CCtx_params const params)
{
+ if (params.ldmParams.enableLdm)
+ return MAX(21, params.cParams.chainLog + 4); /* with LDM the job size is derived from chainLog, never below 1 << 21 */
return params.cParams.windowLog + 2;
}
/* ZSTDMT_computeOverlapLog() :
 * @return : log2 of the overlap size (callers compute (size_t)1 << result).
 *           overlapRLog is the reduction applied to the base log :
 *           9 - overlapSizeLog, or 0 when overlapSizeLog is above 9.
 *           - with long distance matching, the base log is the smaller of
 *             windowLog and (target job log - 2);
 *           - otherwise the base log is windowLog, and the result is 0 when
 *             overlapRLog >= 9.
 * NOTE(review): the LDM branch has no `overlapRLog >= 9` special case, unlike
 * the branch below - confirm this asymmetry is intended. */
static size_t ZSTDMT_computeOverlapLog(ZSTD_CCtx_params const params)
{
unsigned const overlapRLog = (params.overlapSizeLog>9) ? 0 : 9-params.overlapSizeLog;
+ if (params.ldmParams.enableLdm)
+ return (MIN(params.cParams.windowLog, ZSTDMT_computeTargetJobLog(params) - 2) - overlapRLog);
return overlapRLog >= 9 ? 0 : (params.cParams.windowLog - overlapRLog);
}
void* dst, size_t dstCapacity,
const void* src, size_t srcSize,
const ZSTD_CDict* cdict,
- ZSTD_CCtx_params const params)
+ ZSTD_CCtx_params params)
{
ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(params);
size_t const overlapSize = (size_t)1 << ZSTDMT_computeOverlapLog(params);
assert(jobParams.nbWorkers == 0);
assert(mtctx->cctxPool->totalCCtx == params.nbWorkers);
+ params.jobSize = (U32)avgJobSize;
DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbJobs=%2u (rawSize=%u bytes; fixedSize=%u) ",
nbJobs, (U32)proposedJobSize, (U32)avgJobSize);
assert(avgJobSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */
ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) );
- ZSTDMT_serialState_reset(&mtctx->serial, params);
+ if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params))
+ return ERROR(memory_allocation);
if (nbJobs > mtctx->jobIDMask+1) { /* enlarge job table */
U32 jobsTableSize = nbJobs;
mtctx->jobs[u].dstBuff = dstBuffer;
mtctx->jobs[u].cctxPool = mtctx->cctxPool;
mtctx->jobs[u].bufPool = mtctx->bufPool;
+ mtctx->jobs[u].seqPool = mtctx->seqPool;
mtctx->jobs[u].serial = &mtctx->serial;
mtctx->jobs[u].jobID = u;
mtctx->jobs[u].firstJob = (u==0);
/* init */
if (params.jobSize == 0) {
- if (params.cParams.windowLog >= 29)
- params.jobSize = ZSTDMT_JOBSIZE_MAX;
- else
- params.jobSize = 1U << ZSTDMT_computeTargetJobLog(params);
+ params.jobSize = 1U << ZSTDMT_computeTargetJobLog(params);
}
if (params.jobSize > ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX;
DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10));
ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(mtctx->targetSectionSize));
{
+ /* If ldm is enabled we need windowSize space. */
+ size_t const windowSize = mtctx->params.ldmParams.enableLdm ? (1U << mtctx->params.cParams.windowLog) : 0;
/* Two buffers of slack, plus extra space for the overlap */
size_t const nbWorkers = MAX(mtctx->params.nbWorkers, 1);
size_t const nbSlackBuffers = MIN(nbWorkers, 2) + (mtctx->targetPrefixSize > 0);
- size_t const nbSections = nbWorkers + nbSlackBuffers;
- size_t const capacity = mtctx->targetSectionSize * nbSections;
+ size_t const slackSize = mtctx->targetSectionSize * nbSlackBuffers;
+ /* Compute the total size, and always have enough slack */
+ size_t const sectionsSize = mtctx->targetSectionSize * nbWorkers;
+ size_t const capacity = MAX(windowSize, sectionsSize) + slackSize;
if (mtctx->roundBuff.capacity < capacity) {
if (mtctx->roundBuff.buffer)
ZSTD_free(mtctx->roundBuff.buffer, mtctx->cMem);
mtctx->allJobsCompleted = 0;
mtctx->consumed = 0;
mtctx->produced = 0;
- ZSTDMT_serialState_reset(&mtctx->serial, params);
+ if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params))
+ return ERROR(memory_allocation);
return 0;
}
mtctx->jobs[jobID].dstBuff = g_nullBuffer;
mtctx->jobs[jobID].cctxPool = mtctx->cctxPool;
mtctx->jobs[jobID].bufPool = mtctx->bufPool;
+ mtctx->jobs[jobID].seqPool = mtctx->seqPool;
mtctx->jobs[jobID].serial = &mtctx->serial;
mtctx->jobs[jobID].jobID = mtctx->nextJobID;
mtctx->jobs[jobID].firstJob = (mtctx->nextJobID==0);
return bufferStart < rangeEnd && rangeStart < bufferEnd;
}
+/* ZSTDMT_doesOverlapWindow() :
+ * Describes `window` as two ranges (its extDict part and its prefix part)
+ * and tests `buffer` against each of them.
+ * @return : 1 if `buffer` overlaps either range, 0 otherwise. */
+static int ZSTDMT_doesOverlapWindow(buffer_t buffer, ZSTD_window_t window)
+{
+    range_t dictSegment;
+    range_t prefixSegment;
+
+    prefixSegment.start = window.base + window.dictLimit;
+    prefixSegment.size = window.nextSrc - (window.base + window.dictLimit);
+
+    dictSegment.start = window.dictBase + window.lowLimit;
+    dictSegment.size = window.dictLimit - window.lowLimit;
+
+    DEBUGLOG(5, "extDict [0x%zx, 0x%zx)",
+                (size_t)dictSegment.start,
+                (size_t)dictSegment.start + dictSegment.size);
+    DEBUGLOG(5, "prefix [0x%zx, 0x%zx)",
+                (size_t)prefixSegment.start,
+                (size_t)prefixSegment.start + prefixSegment.size);
+
+    /* The prefix is only tested when the extDict does not already overlap */
+    if (ZSTDMT_isOverlapped(buffer, dictSegment))
+        return 1;
+    return ZSTDMT_isOverlapped(buffer, prefixSegment) != 0;
+}
+
+/* ZSTDMT_waitForLdmComplete() :
+ * Does nothing unless long distance matching is enabled.
+ * Otherwise, blocks on ldmWindowCond (under ldmWindowMutex) for as long as
+ * `buffer` overlaps the published LDM window. */
+static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, buffer_t buffer)
+{
+    ZSTD_pthread_mutex_t* const windowLock = &mtctx->serial.ldmWindowMutex;
+
+    if (!mtctx->params.ldmParams.enableLdm)
+        return;
+
+    DEBUGLOG(5, "source [0x%zx, 0x%zx)",
+                (size_t)buffer.start,
+                (size_t)buffer.start + buffer.capacity);
+    ZSTD_PTHREAD_MUTEX_LOCK(windowLock);
+    /* The overlap test is redone after every wake-up */
+    while (ZSTDMT_doesOverlapWindow(buffer, mtctx->serial.ldmWindow)) {
+        DEBUGLOG(6, "Waiting for LDM to finish...");
+        ZSTD_pthread_cond_wait(&mtctx->serial.ldmWindowCond, windowLock);
+    }
+    DEBUGLOG(6, "Done waiting for LDM to finish");
+    ZSTD_pthread_mutex_unlock(windowLock);
+}
+
/**
* Attempts to set the inBuff to the next section to fill.
* If any part of the new section is still in use we give up.
DEBUGLOG(6, "Waiting for buffer...");
return 0;
}
+ ZSTDMT_waitForLdmComplete(mtctx, buffer);
memmove(start, mtctx->inBuff.prefix.start, prefixSize);
mtctx->inBuff.prefix.start = start;
mtctx->roundBuff.pos = prefixSize;
return 0;
}
assert(!ZSTDMT_isOverlapped(buffer, mtctx->inBuff.prefix));
+
+ ZSTDMT_waitForLdmComplete(mtctx, buffer);
+
DEBUGLOG(5, "Using prefix range [%zx, %zx)",
(size_t)mtctx->inBuff.prefix.start,
(size_t)mtctx->inBuff.prefix.start + mtctx->inBuff.prefix.size);