#include <string.h> /* memcpy */
#include <pool.h> /* threadpool */
#include "threading.h" /* mutex */
-#include "zstd_internal.h" /* MIN, ERROR, ZSTD_* */
+#include "zstd_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */
#include "zstdmt_compress.h"
#if 0
+
# include <stdio.h>
# include <unistd.h>
# include <sys/times.h>
/* ===== Thread worker ===== */
+typedef struct {
+ buffer_t buffer;
+ size_t filled;
+} inBuff_t;
+
typedef struct {
ZSTD_CCtx* cctx;
+ buffer_t src;
const void* srcStart;
size_t srcSize;
buffer_t dstBuff;
}
+/* ------------------------------------------ */
/* ===== Multi-threaded compression ===== */
+/* ------------------------------------------ */
struct ZSTDMT_CCtx_s {
POOL_ctx* factory;
ZSTDMT_bufferPool* buffPool;
ZSTDMT_CCtxPool* cctxPool;
- unsigned nbThreads;
pthread_mutex_t jobCompleted_mutex;
pthread_cond_t jobCompleted_cond;
- ZSTDMT_jobDescription jobs[1]; /* variable size */
+ size_t targetSectionSize;
+ size_t inBuffSize;
+ inBuff_t inBuff;
+ ZSTD_parameters params;
+ unsigned nbThreads;
+ unsigned jobIDMask;
+ unsigned doneJobID;
+ unsigned nextJobID;
+ unsigned frameEnded;
+ ZSTDMT_jobDescription jobs[1]; /* variable size (must lies at the end) */
};
ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
{
ZSTDMT_CCtx* cctx;
+ U32 const minNbJobs = nbThreads + 1;
+ U32 const nbJobsLog2 = ZSTD_highbit32(minNbJobs) + 1;
+ U32 const nbJobs = 1 << nbJobsLog2;
+ DEBUGLOG(4, "nbThreads : %u ; minNbJobs : %u ; nbJobsLog2 : %u ; nbJobs : %u \n",
+ nbThreads, minNbJobs, nbJobsLog2, nbJobs);
if ((nbThreads < 1) | (nbThreads > ZSTDMT_NBTHREADS_MAX)) return NULL;
- cctx = (ZSTDMT_CCtx*) calloc(1, sizeof(ZSTDMT_CCtx) + nbThreads*sizeof(ZSTDMT_jobDescription));
+ cctx = (ZSTDMT_CCtx*) calloc(1, sizeof(ZSTDMT_CCtx) + nbJobs*sizeof(ZSTDMT_jobDescription));
if (!cctx) return NULL;
cctx->nbThreads = nbThreads;
+ cctx->jobIDMask = nbJobs - 1;
cctx->factory = POOL_create(nbThreads, 1);
cctx->buffPool = ZSTDMT_createBufferPool(nbThreads);
cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads);
/* ======= Streaming API ======= */
/* ====================================== */
-#if 0
+#if 1
size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
zcs->params = ZSTD_getParams(compressionLevel, 0, 0);
- zcs->targetSectionSize = 1 << (zcs->params.cParams.windowLog + 2);
+ zcs->targetSectionSize = (size_t)1 << (zcs->params.cParams.windowLog + 2);
zcs->inBuffSize = 5 * (1 << zcs->params.cParams.windowLog);
zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); /* check for NULL ! */
- zcs->inBuff.current = 0;
+ zcs->inBuff.filled = 0;
zcs->doneJobID = 0;
zcs->nextJobID = 0;
+ zcs->frameEnded = 0;
return 0;
}
-typedef struct {
- buffer_t buffer;
- unsigned current;
-} inBuff_t;
-
size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
{
+ if (zcs->frameEnded) return ERROR(stage_wrong);
+
/* fill input buffer */
- { size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.current);
- memcpy((char*)zcs->inBuff.buffer.start + zcs->inBuff.current, input->src, toLoad);
+ { size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled);
+ memcpy((char*)zcs->inBuff.buffer.start + zcs->inBuff.filled, input->src, toLoad);
input->pos += toLoad;
}
- if (zcs->inBuff.current == zcs->inBuffSize) { /* filled enough : let's compress */
+ if (zcs->inBuff.filled == zcs->inBuffSize) { /* filled enough : let's compress */
size_t const dstBufferCapacity = ZSTD_compressBound(zcs->targetSectionSize);
- buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->targetSectionSize); /* should check for NULL */
+ buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity); /* should check for NULL */
ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool); /* should check for NULL */
- unsigned const jobID = zcs->nextJobID & zcs->jobIDmask;
+ unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
- zcs->jobs[jobID].srcStart = zcs->inBuff.start;
+ zcs->jobs[jobID].src = zcs->inBuff.buffer;
+ zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
zcs->jobs[jobID].srcSize = zcs->targetSectionSize;
zcs->jobs[jobID].fullFrameSize = 0;
- zcs->jobs[jobID].compressionLevel = zcs->compressionLevel;
+ zcs->jobs[jobID].params = zcs->params;
zcs->jobs[jobID].dstBuff = dstBuffer;
zcs->jobs[jobID].cctx = cctx;
- zcs->jobs[jobID].frameID = (jobID>0);
+ zcs->jobs[jobID].firstChunk = (jobID==0);
+ zcs->jobs[jobID].lastChunk = 0;
zcs->jobs[jobID].jobCompleted = 0;
zcs->jobs[jobID].dstFlushed = 0;
zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
/* get a new buffer for next input - save remaining into it */
zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); /* check for NULL ! */
- zcs->inBuff.current = zcs->inBuffSize - zcs->targetSectionSize;
- memcpy(zcs->inBuff.buffer.start, (char*)zcs->jobs[jobID].srcStart + zcs->targetSectionSize, zcs->inBuff.current);
+ zcs->inBuff.filled = (U32)(zcs->inBuffSize - zcs->targetSectionSize);
+ memcpy(zcs->inBuff.buffer.start, (const char*)zcs->jobs[jobID].srcStart + zcs->targetSectionSize, zcs->inBuff.filled);
- DEBUGLOG(3, "posting job %u (%u bytes)", jobID, (U32)zcs->jobs[jobID].srcSize);
+ DEBUGLOG(3, "posting job %u (%u bytes)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize);
POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]);
zcs->nextJobID++;
}
/* check if there is any data available to flush */
- { unsigned const jobID = zcs->doneJobID & zcs->jobIDmask;
+ { unsigned const jobID = zcs->doneJobID & zcs->jobIDMask;
ZSTDMT_jobDescription job = zcs->jobs[jobID];
if (job.jobCompleted) { /* job completed : output can be flushed */
size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[jobID].cctx = NULL; /* release cctx for future task */
- free(job.srcStart); zcs->jobs[jobID].srcStart = NULL; /* note : need a buff_t for release */
- memcpy((char*)output->dst + output->pos, job.dstBuff.start + job.dstFlushed, toWrite);
+ ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[jobID].srcStart = NULL; zcs->jobs[jobID].src = (buffer_t) { NULL, 0 };
+ memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
output->pos += toWrite;
job.dstFlushed += toWrite;
if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => next one */
} }
/* recommended next input size : fill current input buffer */
- return zcs->inBuffSize - zcs->inBuff.current;
+ return zcs->inBuffSize - zcs->inBuff.filled;
+}
+
+static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned endFrame)
+{
+ size_t const srcSize = zcs->inBuff.filled;
+
+ if ((srcSize > 0) || (endFrame && !zcs->frameEnded)) {
+ size_t const dstBufferCapacity = ZSTD_compressBound(srcSize);
+ buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity); /* should check for NULL */
+ ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool); /* should check for NULL */
+ unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
+ zcs->jobs[jobID].src = zcs->inBuff.buffer;
+ zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
+ zcs->jobs[jobID].srcSize = srcSize;
+ zcs->jobs[jobID].fullFrameSize = 0;
+ zcs->jobs[jobID].params = zcs->params;
+ zcs->jobs[jobID].dstBuff = dstBuffer;
+ zcs->jobs[jobID].cctx = cctx;
+ zcs->jobs[jobID].firstChunk = (jobID==0);
+ zcs->jobs[jobID].lastChunk = endFrame;
+ zcs->jobs[jobID].jobCompleted = 0;
+ zcs->jobs[jobID].dstFlushed = 0;
+ zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
+ zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
+
+ /* get a new buffer for next input */
+ if (!endFrame) {
+ zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); /* check for NULL ! */
+ zcs->inBuff.filled = 0;
+ } else {
+ zcs->frameEnded = 1;
+ }
+
+ DEBUGLOG(3, "posting job %u (%u bytes)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize);
+ POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]);
+ zcs->nextJobID++;
+ }
+
+ /* check if there is any data available to flush */
+ { unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
+ ZSTDMT_jobDescription job = zcs->jobs[wJobID];
+ if (job.jobCompleted) { /* job completed : output can be flushed */
+ size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
+ ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[wJobID].cctx = NULL; /* release cctx for future task */
+ ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[wJobID].srcStart = NULL; zcs->jobs[wJobID].src = (buffer_t) { NULL, 0 };
+ memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
+ output->pos += toWrite;
+ job.dstFlushed += toWrite;
+ if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => next one */
+ ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); zcs->jobs[wJobID].dstBuff = (buffer_t) { NULL, 0 };
+ zcs->doneJobID++;
+ } else {
+ zcs->jobs[wJobID].dstFlushed = job.dstFlushed;
+ } }
+ /* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */
+ if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
+ return (zcs->doneJobID < zcs->nextJobID);
+ }
+}
+
+
+size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
+{
+ return ZSTDMT_flushStream_internal(zcs, output, 0);
+}
+
+size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
+{
+ return ZSTDMT_flushStream_internal(zcs, output, 1);
}
-size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output);
-size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output);
#endif