typedef struct buffer_s {
void* start;
size_t capacity;
-} buffer_t;
+} Buffer;
-static const buffer_t g_nullBuffer = { NULL, 0 };
+static const Buffer g_nullBuffer = { NULL, 0 };
typedef struct ZSTDMT_bufferPool_s {
ZSTD_pthread_mutex_t poolMutex;
unsigned totalBuffers;
unsigned nbBuffers;
ZSTD_customMem cMem;
- buffer_t* buffers;
+ Buffer* buffers;
} ZSTDMT_bufferPool;
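/* The pool is a bounded LIFO cache : buffers[0..nbBuffers) hold idle buffers,
 * nbBuffers <= totalBuffers always holds, and poolMutex guards every access.
 * A sketch of the release-side bookkeeping (the mirror of the pop in
 * ZSTDMT_getBuffer below; helper name illustrative, locking omitted) : */
static void example_cacheOrFree(ZSTDMT_bufferPool* pool, Buffer buf)
{
    if (pool->nbBuffers < pool->totalBuffers) {
        pool->buffers[pool->nbBuffers++] = buf;   /* keep idle buffer for re-use */
    } else {
        ZSTD_customFree(buf.start, pool->cMem);   /* cache full : free for real */
    }
}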
static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned maxNbBuffers, ZSTD_customMem cMem)
ZSTD_customFree(bufPool, cMem);
return NULL;
}
- bufPool->buffers = (buffer_t*)ZSTD_customCalloc(maxNbBuffers * sizeof(buffer_t), cMem);
+ bufPool->buffers = (Buffer*)ZSTD_customCalloc(maxNbBuffers * sizeof(Buffer), cMem);
if (bufPool->buffers==NULL) {
ZSTDMT_freeBufferPool(bufPool);
return NULL;
static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool)
{
size_t const poolSize = sizeof(*bufPool);
- size_t const arraySize = bufPool->totalBuffers * sizeof(buffer_t);
+ size_t const arraySize = bufPool->totalBuffers * sizeof(Buffer);
unsigned u;
size_t totalBufferSize = 0;
ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
/* ZSTDMT_getBuffer() :
 * assumption : bufPool must be valid
* @return : a buffer, with start pointer and size
* note: allocation may fail, in this case, start==NULL and size==0 */
-static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool)
+static Buffer ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool)
{
size_t const bSize = bufPool->bufferSize;
DEBUGLOG(5, "ZSTDMT_getBuffer: bSize = %u", (U32)bufPool->bufferSize);
ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
if (bufPool->nbBuffers) { /* try to use an existing buffer */
- buffer_t const buf = bufPool->buffers[--(bufPool->nbBuffers)];
+ Buffer const buf = bufPool->buffers[--(bufPool->nbBuffers)];
size_t const availBufferSize = buf.capacity;
bufPool->buffers[bufPool->nbBuffers] = g_nullBuffer;
if ((availBufferSize >= bSize) & ((availBufferSize>>3) <= bSize)) {
ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
/* create new buffer */
DEBUGLOG(5, "ZSTDMT_getBuffer: create a new buffer");
- { buffer_t buffer;
+ { Buffer buffer;
void* const start = ZSTD_customMalloc(bSize, bufPool->cMem);
buffer.start = start; /* note : start can be NULL if malloc fails ! */
buffer.capacity = (start==NULL) ? 0 : bSize;
/* ZSTDMT_resizeBuffer() :
 * @return : a buffer that is at least the buffer pool buffer size.
* If a reallocation happens, the data in the input buffer is copied.
*/
-static buffer_t ZSTDMT_resizeBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buffer)
+static Buffer ZSTDMT_resizeBuffer(ZSTDMT_bufferPool* bufPool, Buffer buffer)
{
size_t const bSize = bufPool->bufferSize;
if (buffer.capacity < bSize) {
void* const start = ZSTD_customMalloc(bSize, bufPool->cMem);
- buffer_t newBuffer;
+ Buffer newBuffer;
newBuffer.start = start;
newBuffer.capacity = start == NULL ? 0 : bSize;
if (start != NULL) {
#endif
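/* A sketch of the grow-and-copy step ZSTDMT_resizeBuffer performs when the
 * existing buffer is too small; the helper name is illustrative. The caller
 * keeps ownership of oldBuf and must still release it, as the real function
 * does. */
static Buffer example_growBuffer(Buffer oldBuf, size_t newSize, ZSTD_customMem cMem)
{
    Buffer newBuf;
    void* const start = ZSTD_customMalloc(newSize, cMem);
    newBuf.start = start;
    newBuf.capacity = (start == NULL) ? 0 : newSize;
    if (start != NULL && oldBuf.start != NULL)
        ZSTD_memcpy(start, oldBuf.start, oldBuf.capacity);  /* preserve old contents */
    return newBuf;
}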
/* store buffer for later re-use, up to pool capacity */
-static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
+static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, Buffer buf)
{
DEBUGLOG(5, "ZSTDMT_releaseBuffer");
if (buf.start == NULL) return; /* compatible with release on NULL */
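/* Caller-side contract for the pool, as the comments above describe it :
 * allocation failure is reported in-band (start==NULL, capacity==0), and
 * releasing a null buffer is a no-op. A sketch with an illustrative helper,
 * assuming zstd's internal ERROR() code macro is in scope : */
static size_t example_withPooledBuffer(ZSTDMT_bufferPool* bufPool)
{
    Buffer const buf = ZSTDMT_getBuffer(bufPool);
    if (buf.start == NULL) return ERROR(memory_allocation);  /* in-band failure */
    /* ... use up to buf.capacity bytes starting at buf.start ... */
    ZSTDMT_releaseBuffer(bufPool, buf);   /* cache it for re-use, or free it */
    return 0;
}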
return ZSTDMT_sizeof_bufferPool(seqPool);
}
-static RawSeqStore_t bufferToSeq(buffer_t buffer)
+static RawSeqStore_t bufferToSeq(Buffer buffer)
{
RawSeqStore_t seq = kNullRawSeqStore;
seq.seq = (rawSeq*)buffer.start;
return seq;
}
-static buffer_t seqToBuffer(RawSeqStore_t seq)
+static Buffer seqToBuffer(RawSeqStore_t seq)
{
- buffer_t buffer;
+ Buffer buffer;
buffer.start = seq.seq;
buffer.capacity = seq.capacity * sizeof(rawSeq);
return buffer;
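/* The two converters reinterpret a single allocation; a sketch of the size
 * arithmetic, assuming the elided line of bufferToSeq sets
 * seq.capacity = buffer.capacity / sizeof(rawSeq). The round-trip preserves
 * the start pointer and rounds the byte capacity down to a whole number of
 * rawSeq entries. */
static void example_seqRoundTrip(Buffer buf)
{
    RawSeqStore_t const seq = bufferToSeq(buf);
    Buffer const back = seqToBuffer(seq);
    assert(back.start == buf.start);
    assert(back.capacity == (buf.capacity / sizeof(rawSeq)) * sizeof(rawSeq));
}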
typedef struct {
void const* start;
size_t size;
-} range_t;
+} Range;
typedef struct {
/* All variables in the struct are protected by mutex. */
ZSTD_pthread_mutex_t ldmWindowMutex;
ZSTD_pthread_cond_t ldmWindowCond; /* Signaled when ldmWindow is updated */
ZSTD_window_t ldmWindow; /* A thread-safe copy of ldmState.window */
-} serialState_t;
+} SerialState;
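/* ldmWindow is published under ldmWindowMutex, and consumers sleep on
 * ldmWindowCond until it changes; a sketch of the writer side, with an
 * illustrative helper name : */
static void example_publishLdmWindow(SerialState* s, ZSTD_window_t newWindow)
{
    ZSTD_PTHREAD_MUTEX_LOCK(&s->ldmWindowMutex);
    s->ldmWindow = newWindow;                         /* publish under the lock */
    ZSTD_pthread_cond_broadcast(&s->ldmWindowCond);   /* wake all waiting readers */
    ZSTD_pthread_mutex_unlock(&s->ldmWindowMutex);
}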
static int
-ZSTDMT_serialState_reset(serialState_t* serialState,
+ZSTDMT_serialState_reset(SerialState* serialState,
ZSTDMT_seqPool* seqPool,
ZSTD_CCtx_params params,
size_t jobSize,
return 0;
}
-static int ZSTDMT_serialState_init(serialState_t* serialState)
+static int ZSTDMT_serialState_init(SerialState* serialState)
{
int initError = 0;
ZSTD_memset(serialState, 0, sizeof(*serialState));
return initError;
}
-static void ZSTDMT_serialState_free(serialState_t* serialState)
+static void ZSTDMT_serialState_free(SerialState* serialState)
{
ZSTD_customMem cMem = serialState->params.customMem;
ZSTD_pthread_mutex_destroy(&serialState->mutex);
ZSTD_customFree(serialState->ldmState.bucketOffsets, cMem);
}
-static void ZSTDMT_serialState_update(serialState_t* serialState,
+static void ZSTDMT_serialState_update(SerialState* serialState,
ZSTD_CCtx* jobCCtx, RawSeqStore_t seqStore,
- range_t src, unsigned jobID)
+ Range src, unsigned jobID)
{
/* Wait for our turn */
ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex);
}
}
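/* "Wait for our turn" above is a ticket protocol : jobs enter the serial
 * section strictly in jobID order. A sketch, assuming the mutex/cond/nextJobID
 * members of SerialState that the struct hunk above elides : */
static void example_serialSection(SerialState* serialState, unsigned jobID)
{
    ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex);
    while (serialState->nextJobID < jobID)            /* not our turn yet */
        ZSTD_pthread_cond_wait(&serialState->cond, &serialState->mutex);
    /* ... serial work (LDM sequence generation, window update) ... */
    serialState->nextJobID = jobID + 1;               /* pass the baton */
    ZSTD_pthread_cond_broadcast(&serialState->cond);  /* wake the next job */
    ZSTD_pthread_mutex_unlock(&serialState->mutex);
}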
-static void ZSTDMT_serialState_ensureFinished(serialState_t* serialState,
+static void ZSTDMT_serialState_ensureFinished(SerialState* serialState,
unsigned jobID, size_t cSize)
{
ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex);
/* ===== Worker thread ===== */
/* ------------------------------------------ */
-static const range_t kNullRange = { NULL, 0 };
+static const Range kNullRange = { NULL, 0 };
typedef struct {
size_t consumed; /* SHARED - set to 0 by mtctx, then modified by worker AND read by mtctx */
ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) workers */
ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) workers */
ZSTDMT_seqPool* seqPool; /* Thread-safe - used by mtctx and (all) workers */
- serialState_t* serial; /* Thread-safe - used by mtctx and (all) workers */
- buffer_t dstBuff; /* set by worker (or mtctx), then read by worker & mtctx, then modified by mtctx => no barrier */
- range_t prefix; /* set by mtctx, then read by worker & mtctx => no barrier */
- range_t src; /* set by mtctx, then read by worker & mtctx => no barrier */
+ SerialState* serial; /* Thread-safe - used by mtctx and (all) workers */
+ Buffer dstBuff; /* set by worker (or mtctx), then read by worker & mtctx, then modified by mtctx => no barrier */
+ Range prefix; /* set by mtctx, then read by worker & mtctx => no barrier */
+ Range src; /* set by mtctx, then read by worker & mtctx => no barrier */
unsigned jobID; /* set by mtctx, then read by worker => no barrier */
unsigned firstJob; /* set by mtctx, then read by worker => no barrier */
unsigned lastJob; /* set by mtctx, then read by worker => no barrier */
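/* Fields marked SHARED above are read cross-thread only under the job's
 * mutex; the "no barrier" fields are written before the job is posted and
 * never raced afterwards. A sketch of the mtctx-side progress read, assuming
 * the job_mutex member and the ZSTDMT_jobDescription name of the full
 * struct : */
static size_t example_readProgress(ZSTDMT_jobDescription* job)
{
    size_t consumed;
    ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
    consumed = job->consumed;   /* snapshot of how much input the worker consumed */
    ZSTD_pthread_mutex_unlock(&job->job_mutex);
    return consumed;
}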
ZSTD_CCtx_params jobParams = job->params; /* do not modify job->params ! copy it, modify the copy */
ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool);
RawSeqStore_t rawSeqStore = ZSTDMT_getSeq(job->seqPool);
- buffer_t dstBuff = job->dstBuff;
+ Buffer dstBuff = job->dstBuff;
size_t lastCBlockSize = 0;
/* resources */
/* ------------------------------------------ */
typedef struct {
- range_t prefix; /* read-only non-owned prefix buffer */
- buffer_t buffer;
+ Range prefix; /* read-only non-owned prefix buffer */
+ Buffer buffer;
size_t filled;
-} inBuff_t;
+} InBuff_t;
typedef struct {
  BYTE* buffer;    /* The round input buffer. All jobs get references
                    * to pieces of the buffer. ZSTDMT_tryGetInputRange()
                    * handles handing out job input buffers, and makes
                    * sure they don't overlap with any pieces still in use.
                    */
  size_t capacity; /* The capacity of the buffer. */
  size_t pos;      /* The position of the current inBuff in the round
                    * buffer. Updated past the end of the inBuff once
                    * the inBuff is sent to the worker thread.
                    * pos <= capacity.
                    */
-} roundBuff_t;
+} RoundBuff_t;
-static const roundBuff_t kNullRoundBuff = {NULL, 0, 0};
+static const RoundBuff_t kNullRoundBuff = {NULL, 0, 0};
#define RSYNC_LENGTH 32
/* Don't create chunks smaller than the zstd block size. */
U64 hash;
U64 hitMask;
U64 primePower;
-} rsyncState_t;
+} RSyncState_t;
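/* hitMask sets the average chunk length : with the low k bits set, the test
 * (hash & hitMask) == hitMask succeeds with probability 2^-k per position on
 * a uniform hash, giving chunks of ~2^k bytes on average. An illustrative
 * helper : */
static U64 example_hitMaskForLog2(unsigned targetLog2)
{
    return ((U64)1 << targetLog2) - 1;   /* low targetLog2 bits set */
}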
struct ZSTDMT_CCtx_s {
POOL_ctx* factory;
size_t targetSectionSize;
size_t targetPrefixSize;
int jobReady; /* 1 => one job is already prepared, but pool has shortage of workers. Don't create a new job. */
- inBuff_t inBuff;
- roundBuff_t roundBuff;
- serialState_t serial;
- rsyncState_t rsync;
+ InBuff_t inBuff;
+ RoundBuff_t roundBuff;
+ SerialState serial;
+ RSyncState_t rsync;
unsigned jobIDMask;
unsigned doneJobID;
unsigned nextJobID;
/**
 * Returns the range of data used by the earliest job that is not yet complete.
 * If the data of the first job is broken up into two segments, we cover both
* sections.
*/
-static range_t ZSTDMT_getInputDataInUse(ZSTDMT_CCtx* mtctx)
+static Range ZSTDMT_getInputDataInUse(ZSTDMT_CCtx* mtctx)
{
unsigned const firstJobID = mtctx->doneJobID;
unsigned const lastJobID = mtctx->nextJobID;
ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
if (consumed < mtctx->jobs[wJobID].src.size) {
- range_t range = mtctx->jobs[wJobID].prefix;
+ Range range = mtctx->jobs[wJobID].prefix;
if (range.size == 0) {
/* Empty prefix */
range = mtctx->jobs[wJobID].src;
/**
* Returns non-zero iff buffer and range overlap.
*/
-static int ZSTDMT_isOverlapped(buffer_t buffer, range_t range)
+static int ZSTDMT_isOverlapped(Buffer buffer, Range range)
{
BYTE const* const bufferStart = (BYTE const*)buffer.start;
BYTE const* const rangeStart = (BYTE const*)range.start;
}
}
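/* The test above is the standard half-open interval predicate, plus guards
 * for NULL and empty operands; a condensed, self-contained sketch : */
static int example_rangesOverlap(BYTE const* aStart, size_t aSize,
                                 BYTE const* bStart, size_t bSize)
{
    if (aStart == NULL || bStart == NULL) return 0;
    if (aSize == 0 || bSize == 0) return 0;   /* empty ranges never overlap */
    return (aStart < bStart + bSize) && (bStart < aStart + aSize);
}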
-static int ZSTDMT_doesOverlapWindow(buffer_t buffer, ZSTD_window_t window)
+static int ZSTDMT_doesOverlapWindow(Buffer buffer, ZSTD_window_t window)
{
- range_t extDict;
- range_t prefix;
+ Range extDict;
+ Range prefix;
DEBUGLOG(5, "ZSTDMT_doesOverlapWindow");
extDict.start = window.dictBase + window.lowLimit;
|| ZSTDMT_isOverlapped(buffer, prefix);
}
-static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, buffer_t buffer)
+static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, Buffer buffer)
{
if (mtctx->params.ldmParams.enableLdm == ZSTD_ps_enable) {
ZSTD_pthread_mutex_t* mutex = &mtctx->serial.ldmWindowMutex;
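/* The elided body waits until the LDM window no longer overlaps the buffer;
 * a sketch of that loop, pairing with example_publishLdmWindow above : */
static void example_waitNoOverlap(ZSTDMT_CCtx* mtctx, Buffer buffer)
{
    ZSTD_pthread_mutex_t* const mutex = &mtctx->serial.ldmWindowMutex;
    ZSTD_PTHREAD_MUTEX_LOCK(mutex);
    while (ZSTDMT_doesOverlapWindow(buffer, mtctx->serial.ldmWindow))
        ZSTD_pthread_cond_wait(&mtctx->serial.ldmWindowCond, mutex);  /* sleep until updated */
    ZSTD_pthread_mutex_unlock(mutex);
}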
/**
 * Attempts to set the inBuff to the next section to fill.
 * If any part of the new section is still in use we give up.
 * Returns non-zero if the buffer is filled.
 */
static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx)
{
- range_t const inUse = ZSTDMT_getInputDataInUse(mtctx);
+ Range const inUse = ZSTDMT_getInputDataInUse(mtctx);
size_t const spaceLeft = mtctx->roundBuff.capacity - mtctx->roundBuff.pos;
size_t const target = mtctx->targetSectionSize;
- buffer_t buffer;
+ Buffer buffer;
DEBUGLOG(5, "ZSTDMT_tryGetInputRange");
assert(mtctx->inBuff.buffer.start == NULL);
typedef struct {
size_t toLoad; /* The number of bytes to load from the input. */
int flush; /* Boolean declaring if we must flush because we found a synchronization point. */
-} syncPoint_t;
+} SyncPoint;
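/* A condensed sketch of the scan that produces a SyncPoint, assuming
 * ZSTD_rollingHash_rotate() from zstd's rolling-hash helpers and a hash
 * already primed over the RSYNC_LENGTH bytes just before istart; the real
 * function below additionally primes the hash itself and enforces a minimum
 * block size before allowing a split. */
static SyncPoint example_scan(RSyncState_t* rs, BYTE const* istart, size_t length)
{
    BYTE const* const prev = istart - RSYNC_LENGTH;  /* bytes falling out of the window */
    SyncPoint sp;
    size_t pos;
    sp.toLoad = length;   /* default : load everything, no forced flush */
    sp.flush = 0;
    for (pos = 0; pos < length; ++pos) {
        /* slide the window one byte : drop prev[pos], absorb istart[pos] */
        rs->hash = ZSTD_rollingHash_rotate(rs->hash, prev[pos], istart[pos], rs->primePower);
        if ((rs->hash & rs->hitMask) == rs->hitMask) {
            sp.toLoad = pos + 1;   /* cut right after the matching position */
            sp.flush = 1;          /* tell the caller to flush this chunk */
            break;
        }
    }
    return sp;
}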
/**
 * Searches through the input for a synchronization point. If one is found, we
 * will instruct the caller to flush everything up to and including that point.
 * Otherwise, we will load as many bytes as possible and instruct the caller
* to continue as normal.
*/
-static syncPoint_t
+static SyncPoint
findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input)
{
BYTE const* const istart = (BYTE const*)input.src + input.pos;
U64 const primePower = mtctx->rsync.primePower;
U64 const hitMask = mtctx->rsync.hitMask;
- syncPoint_t syncPoint;
+ SyncPoint syncPoint;
U64 hash;
BYTE const* prev;
size_t pos;
DEBUGLOG(5, "ZSTDMT_tryGetInputRange completed successfully : mtctx->inBuff.buffer.start = %p", mtctx->inBuff.buffer.start);
}
if (mtctx->inBuff.buffer.start != NULL) {
- syncPoint_t const syncPoint = findSynchronizationPoint(mtctx, *input);
+ SyncPoint const syncPoint = findSynchronizationPoint(mtctx, *input);
if (syncPoint.flush && endOp == ZSTD_e_continue) {
endOp = ZSTD_e_flush;
}