]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
first zstdmt sketch
authorYann Collet <cyan@fb.com>
Tue, 27 Dec 2016 06:19:36 +0000 (07:19 +0100)
committerYann Collet <cyan@fb.com>
Tue, 27 Dec 2016 06:19:36 +0000 (07:19 +0100)
lib/compress/zstdmt_compress.c [new file with mode: 0644]
lib/compress/zstdmt_compress.h [new file with mode: 0644]
programs/Makefile
programs/bench.c

diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c
new file mode 100644 (file)
index 0000000..13cc194
--- /dev/null
@@ -0,0 +1,310 @@
+#include <stdlib.h>   /* malloc */
+#include <pthread.h>
+#include "zstd_internal.h"   /* MIN, ERROR */
+#include "zstdmt_compress.h"
+
+#if 0
+#  include <stdio.h>
+   static unsigned g_debugLevel = 4;
+#  define DEBUGLOG(l, ...) if (l<=g_debugLevel) { fprintf(stderr, __VA_ARGS__); fprintf(stderr, " \n"); }
+#else
+#  define DEBUGLOG(l, ...)   /* disabled */
+#endif
+
+#define ZSTDMT_NBTHREADS_MAX 128
+#define ZSTDMT_NBSTACKEDFRAMES_MAX (2*ZSTDMT_NBTHREADS_MAX)
+
+typedef struct frameToWrite_s {
+    const void* start;
+    size_t frameSize;
+    unsigned frameID;
+    unsigned isLastFrame;
+} frameToWrite_t;
+
+typedef struct ZSTDMT_dstBuffer_s {
+    ZSTD_outBuffer out;
+    unsigned frameIDToWrite;
+    pthread_mutex_t frameTable_mutex;
+    pthread_mutex_t allFramesWritten_mutex;
+    frameToWrite_t stackedFrame[ZSTDMT_NBSTACKEDFRAMES_MAX];
+    unsigned nbStackedFrames;
+} ZSTDMT_dstBufferManager;
+
+static ZSTDMT_dstBufferManager ZSTDMT_createDstBufferManager(void* dst, size_t dstCapacity)
+{
+    ZSTDMT_dstBufferManager dbm;
+    dbm.out.dst = dst;
+    dbm.out.size = dstCapacity;
+    dbm.out.pos = 0;
+    dbm.frameIDToWrite = 0;
+    pthread_mutex_init(&dbm.frameTable_mutex, NULL);
+    pthread_mutex_init(&dbm.allFramesWritten_mutex, NULL);
+    pthread_mutex_lock(&dbm.allFramesWritten_mutex);
+    dbm.nbStackedFrames = 0;
+    return dbm;
+}
+
+/* note : can fail if nbStackedFrames > ZSTDMT_NBSTACKEDFRAMES_MAX.
+ * note2 : can only be called from a section with frameTable_mutex already locked */
+static void ZSTDMT_stackFrameToWrite(ZSTDMT_dstBufferManager* dstBufferManager, frameToWrite_t frame) {
+    dstBufferManager->stackedFrame[dstBufferManager->nbStackedFrames++] = frame;
+}
+
+
+typedef struct buffer_s {
+    void* start;
+    size_t bufferSize;
+} buffer_t;
+
+static buffer_t ZSTDMT_getDstBuffer(const ZSTDMT_dstBufferManager* dstBufferManager)
+{
+    ZSTD_outBuffer const out = dstBufferManager->out;
+    buffer_t buf;
+    buf.start = (char*)(out.dst) + out.pos;
+    buf.bufferSize = out.size - out.pos;
+    return buf;
+}
+
+/* condition : stackNumber < dstBufferManager->nbStackedFrames.
+ * note : there can only be one write at a time, due to frameID condition */
+static size_t ZSTDMT_writeFrame(ZSTDMT_dstBufferManager* dstBufferManager, unsigned stackNumber)
+{
+    ZSTD_outBuffer const out = dstBufferManager->out;
+    size_t const frameSize = dstBufferManager->stackedFrame[stackNumber].frameSize;
+    const void* const frameStart = dstBufferManager->stackedFrame[stackNumber].start;
+    if (out.pos + frameSize > out.size)
+        return ERROR(dstSize_tooSmall);
+    DEBUGLOG(3, "writing frame %u (%u bytes) ", dstBufferManager->stackedFrame[stackNumber].frameID, (U32)frameSize);
+    memcpy((char*)out.dst + out.pos, frameStart, frameSize);
+    dstBufferManager->out.pos += frameSize;
+    dstBufferManager->frameIDToWrite = dstBufferManager->stackedFrame[stackNumber].frameID + 1;
+    return 0;
+}
+
+static size_t ZSTDMT_tryWriteFrame(ZSTDMT_dstBufferManager* dstBufferManager,
+                                   const void* src, size_t srcSize,
+                                   unsigned frameID, unsigned isLastFrame)
+{
+    unsigned lastFrameWritten = 0;
+
+    /* check if correct frame ordering; stack otherwise */
+    DEBUGLOG(5, "considering writing frame %u ", frameID);
+    pthread_mutex_lock(&dstBufferManager->frameTable_mutex);
+    if (frameID != dstBufferManager->frameIDToWrite) {
+        DEBUGLOG(4, "writing frameID %u : not possible, waiting for %u  ", frameID, dstBufferManager->frameIDToWrite);
+        frameToWrite_t frame = { src, srcSize, frameID, isLastFrame };
+        ZSTDMT_stackFrameToWrite(dstBufferManager, frame);
+        pthread_mutex_unlock(&dstBufferManager->frameTable_mutex);
+        return 0;
+    }
+    pthread_mutex_unlock(&dstBufferManager->frameTable_mutex);
+
+    /* write frame
+     * note : only one write possible due to frameID condition */
+    DEBUGLOG(3, "writing frame %u (%u bytes) ", frameID, (U32)srcSize);
+    ZSTD_outBuffer const out = dstBufferManager->out;
+    if (out.pos + srcSize > out.size)
+        return ERROR(dstSize_tooSmall);
+    if (frameID) /* frameID==0 compress directly in dst buffer */
+        memcpy((char*)out.dst + out.pos, src, srcSize);
+    dstBufferManager->out.pos += srcSize;
+    dstBufferManager->frameIDToWrite = frameID+1;
+    lastFrameWritten = isLastFrame;
+
+    /* check if more frames are stacked */
+    pthread_mutex_lock(&dstBufferManager->frameTable_mutex);
+    unsigned frameWritten = dstBufferManager->nbStackedFrames>0;
+    while (frameWritten) {
+        unsigned u;
+        frameID++;
+        frameWritten = 0;
+        for (u=0; u<dstBufferManager->nbStackedFrames; u++) {
+            if (dstBufferManager->stackedFrame[u].frameID == frameID) {
+                pthread_mutex_unlock(&dstBufferManager->frameTable_mutex);
+                { size_t const writeError = ZSTDMT_writeFrame(dstBufferManager, u);
+                  if (ZSTD_isError(writeError)) return writeError; }
+                lastFrameWritten = dstBufferManager->stackedFrame[u].isLastFrame;
+                /* remove frame from stack */
+                pthread_mutex_lock(&dstBufferManager->frameTable_mutex);
+                dstBufferManager->stackedFrame[u] = dstBufferManager->stackedFrame[dstBufferManager->nbStackedFrames-1];
+                dstBufferManager->nbStackedFrames -= 1;
+                frameWritten = dstBufferManager->nbStackedFrames>0;
+                break;
+    }   }   }
+    pthread_mutex_unlock(&dstBufferManager->frameTable_mutex);
+
+    /* end reached : last frame written */
+    if (lastFrameWritten) pthread_mutex_unlock(&dstBufferManager->allFramesWritten_mutex);
+    return 0;
+}
+
+
+
+typedef struct ZSTDMT_jobDescription_s {
+    const void* src;   /* NULL means : kill thread */
+    size_t srcSize;
+    int compressionLevel;
+    ZSTDMT_dstBufferManager* dstManager;
+    unsigned frameNumber;
+    unsigned isLastFrame;
+} ZSTDMT_jobDescription;
+
+typedef struct ZSTDMT_jobAgency_s {
+    pthread_mutex_t jobAnnounce_mutex;
+    pthread_mutex_t jobApply_mutex;
+    ZSTDMT_jobDescription jobAnnounce;
+} ZSTDMT_jobAgency;
+
+/* ZSTDMT_postjob() :
+ * This function is blocking as long as previous posted job is not taken.
+ * It could be made non-blocking, with a storage queue.
+ * But blocking has benefits : on top of memory savings,
+ * the caller will be able to measure delay, allowing dynamic speed throttle (via compression level).
+ */
+static void ZSTDMT_postjob(ZSTDMT_jobAgency* jobAgency, ZSTDMT_jobDescription job)
+{
+    DEBUGLOG(5, "starting job posting  ");
+    pthread_mutex_lock(&jobAgency->jobApply_mutex);   /* wait for a thread to take previous job */
+    DEBUGLOG(5, "job posting mutex acquired ");
+    jobAgency->jobAnnounce = job;   /* post job */
+    pthread_mutex_unlock(&jobAgency->jobAnnounce_mutex);   /* announce */
+    DEBUGLOG(5, "job available now ");
+}
+
+static ZSTDMT_jobDescription ZSTDMT_getjob(ZSTDMT_jobAgency* jobAgency)
+{
+    pthread_mutex_lock(&jobAgency->jobAnnounce_mutex);   /* should check return code */
+    ZSTDMT_jobDescription const job = jobAgency->jobAnnounce;
+    pthread_mutex_unlock(&jobAgency->jobApply_mutex);
+    return job;
+}
+
+
+
+#define ZSTDMT_NBBUFFERSPOOLED_MAX ZSTDMT_NBTHREADS_MAX
+typedef struct ZSTDMT_bufferPool_s {
+    buffer_t bTable[ZSTDMT_NBBUFFERSPOOLED_MAX];
+    unsigned nbBuffers;
+} ZSTDMT_bufferPool;
+
+static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize)
+{
+    if (pool->nbBuffers) {   /* try to use an existing buffer */
+        pool->nbBuffers--;
+        buffer_t const buf = pool->bTable[pool->nbBuffers];
+        size_t const availBufferSize = buf.bufferSize;
+        if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize))   /* large enough, but not too much */
+            return buf;
+        free(buf.start);   /* size conditions not respected : create a new buffer */
+    }
+    /* create new buffer */
+    buffer_t buf;
+    buf.bufferSize = bSize;
+    buf.start = calloc(1, bSize);
+    return buf;
+}
+
+/* effectively store buffer for later re-use, up to pool capacity */
+static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* pool, buffer_t buf)
+{
+    if (pool->nbBuffers >= ZSTDMT_NBBUFFERSPOOLED_MAX) {
+        free(buf.start);
+        return;
+    }
+    pool->bTable[pool->nbBuffers++] = buf;   /* store for later re-use */
+}
+
+
+
+struct ZSTDMT_CCtx_s {
+    pthread_t pthread[ZSTDMT_NBTHREADS_MAX];
+    unsigned nbThreads;
+    ZSTDMT_jobAgency jobAgency;
+    ZSTDMT_bufferPool bufferPool;
+};
+
+static void* ZSTDMT_compressionThread(void* arg)
+{
+    if (arg==NULL) return NULL;   /* error : should not be possible */
+    ZSTDMT_CCtx* const cctx = (ZSTDMT_CCtx*) arg;
+    ZSTDMT_jobAgency* const jobAgency = &cctx->jobAgency;
+    ZSTDMT_bufferPool* const pool = &cctx->bufferPool;
+    for (;;) {
+        ZSTDMT_jobDescription const job = ZSTDMT_getjob(jobAgency);
+        if (job.src == NULL) {
+            DEBUGLOG(4, "thread exit  ")
+            return NULL;
+        }
+        ZSTDMT_dstBufferManager* dstBufferManager = job.dstManager;
+        size_t const dstBufferCapacity = ZSTD_compressBound(job.srcSize);
+        DEBUGLOG(4, "requesting a dstBuffer for frame %u", job.frameNumber);
+        buffer_t const dstBuffer = job.frameNumber ? ZSTDMT_getBuffer(pool, dstBufferCapacity) : ZSTDMT_getDstBuffer(dstBufferManager);  /* lack params */
+        DEBUGLOG(4, "start compressing frame %u", job.frameNumber);
+        size_t const cSize = ZSTD_compress(dstBuffer.start, dstBuffer.bufferSize, job.src, job.srcSize, job.compressionLevel);
+        if (ZSTD_isError(cSize)) return (void*)(cSize);   /* error */
+        size_t const writeError = ZSTDMT_tryWriteFrame(dstBufferManager, dstBuffer.start, cSize, job.frameNumber, job.isLastFrame);   /* pas clair */
+        if (ZSTD_isError(writeError)) return (void*)writeError;
+        if (job.frameNumber) ZSTDMT_releaseBuffer(pool, dstBuffer);
+    }
+}
+
+ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
+{
+    if ((nbThreads < 1) | (nbThreads > ZSTDMT_NBTHREADS_MAX)) return NULL;
+    ZSTDMT_CCtx* const cctx = (ZSTDMT_CCtx*) calloc(1, sizeof(ZSTDMT_CCtx));
+    if (!cctx) return NULL;
+    pthread_mutex_init(&cctx->jobAgency.jobAnnounce_mutex, NULL);   /* check return value ? */
+    pthread_mutex_init(&cctx->jobAgency.jobApply_mutex, NULL);
+    pthread_mutex_lock(&cctx->jobAgency.jobAnnounce_mutex);   /* no job at beginning */
+    /* start all workers */
+    cctx->nbThreads = nbThreads;
+    DEBUGLOG(2, "nbThreads : %u \n", nbThreads);
+    unsigned t;
+    for (t = 0; t < nbThreads; t++) {
+        pthread_create(&cctx->pthread[t], NULL, ZSTDMT_compressionThread, cctx);  /* check return value ? */
+    }
+    return cctx;
+}
+
+size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* cctx)
+{
+    /* free threads */
+    /* free mutex (if necessary) */
+    /* free bufferPool */
+    free(cctx);   /* incompleted ! */
+    return 0;
+}
+
+size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* cctx,
+                           void* dst, size_t dstCapacity,
+                     const void* src, size_t srcSize,
+                           int compressionLevel)
+{
+    ZSTDMT_jobAgency* jobAgency = &cctx->jobAgency;
+    ZSTD_parameters const params = ZSTD_getParams(compressionLevel, srcSize, 0);
+    size_t const frameSizeTarget = (size_t)1 << (params.cParams.windowLog + 2);
+    unsigned const nbFrames = (unsigned)(srcSize / frameSizeTarget) + (srcSize < frameSizeTarget) /* min 1 */;
+    size_t const avgFrameSize = (srcSize + (nbFrames-1)) / nbFrames;
+    size_t remainingSrcSize = srcSize;
+    const char* const srcStart = (const char*)src;
+    size_t frameStartPos = 0;
+    ZSTDMT_dstBufferManager dbm = ZSTDMT_createDstBufferManager(dst, dstCapacity);
+
+    DEBUGLOG(2, "windowLog : %u   => frameSizeTarget : %u      ", params.cParams.windowLog, (U32)frameSizeTarget);
+    DEBUGLOG(2, "nbFrames : %u   (size : %u bytes)   ", nbFrames, (U32)avgFrameSize);
+
+    {   unsigned u;
+        for (u=0; u<nbFrames; u++) {
+            size_t const frameSize = MIN(remainingSrcSize, avgFrameSize);
+            DEBUGLOG(3, "posting job %u   (%u bytes)", u, (U32)frameSize);
+            ZSTDMT_jobDescription const job = { srcStart+frameStartPos, frameSize, compressionLevel,
+                                                &dbm, u, u==(nbFrames-1) };
+            ZSTDMT_postjob(jobAgency, job);
+            frameStartPos += frameSize;
+            remainingSrcSize -= frameSize;
+    }   }
+
+    pthread_mutex_lock(&dbm.allFramesWritten_mutex);
+    DEBUGLOG(4, "compressed size : %u  ", (U32)dbm.out.pos);
+    return dbm.out.pos;
+}
diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h
new file mode 100644 (file)
index 0000000..73ee379
--- /dev/null
@@ -0,0 +1,12 @@
+
+#include <stddef.h>   /* size_t */
+
+typedef struct ZSTDMT_CCtx_s ZSTDMT_CCtx;
+
+ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads);
+size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* cctx);
+
+size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* cctx,
+                           void* dst, size_t dstCapacity,
+                     const void* src, size_t srcSize,
+                           int compressionLevel);
index 8ec9fc698b1641abb4d7cb95bea0c726c0c1026f..156bf8980ae29095f6ee8900b96a390138a7bc68 100644 (file)
@@ -32,7 +32,7 @@ FLAGS    = $(CPPFLAGS) $(CFLAGS) $(LDFLAGS)
 
 
 ZSTDCOMMON_FILES := $(ZSTDDIR)/common/*.c
-ZSTDCOMP_FILES := $(ZSTDDIR)/compress/zstd_compress.c $(ZSTDDIR)/compress/fse_compress.c $(ZSTDDIR)/compress/huf_compress.c
+ZSTDCOMP_FILES := $(ZSTDDIR)/compress/*.c
 ZSTDDECOMP_FILES := $(ZSTDDIR)/decompress/huf_decompress.c
 ZSTD_FILES := $(ZSTDDECOMP_FILES) $(ZSTDCOMMON_FILES) $(ZSTDCOMP_FILES)
 ZDICT_FILES := $(ZSTDDIR)/dictBuilder/*.c
index 9a4732a314239d5f9dffa601ae0ed6c9f60b8191..4059072f0fb9ed2e7b57dbdc9a6344809cdfc233 100644 (file)
@@ -115,6 +115,7 @@ void BMK_SetBlockSize(size_t blockSize)
 
 void BMK_setDecodeOnly(unsigned decodeFlag) { g_decodeOnly = (decodeFlag>0); }
 
+
 /* ********************************************************
 *  Bench functions
 **********************************************************/
@@ -132,6 +133,8 @@ typedef struct {
 #define MIN(a,b) ((a)<(b) ? (a) : (b))
 #define MAX(a,b) ((a)>(b) ? (a) : (b))
 
+#include "compress/zstdmt_compress.h"
+
 static int BMK_benchMem(const void* srcBuffer, size_t srcSize,
                         const char* displayName, int cLevel,
                         const size_t* fileSizes, U32 nbFiles,
@@ -153,6 +156,8 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize,
     U32 nbBlocks;
     UTIL_time_t ticksPerSecond;
 
+    ZSTDMT_CCtx* const mtcctx = ZSTDMT_createCCtx(1);
+
     /* checks */
     if (!compressedBuffer || !resultBuffer || !blockTable || !ctx || !dctx)
         EXM_THROW(31, "allocation error : not enough memory");
@@ -264,6 +269,11 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize,
                                                 blockTable[blockNb].cPtr,  blockTable[blockNb].cRoom,
                                                 blockTable[blockNb].srcPtr,blockTable[blockNb].srcSize,
                                                 cdict);
+                            } else if (1) {
+                                rSize = ZSTDMT_compressCCtx(mtcctx,
+                                                blockTable[blockNb].cPtr,  blockTable[blockNb].cRoom,
+                                                blockTable[blockNb].srcPtr,blockTable[blockNb].srcSize,
+                                                cLevel);
                             } else {
                                 rSize = ZSTD_compress_advanced (ctx,
                                                 blockTable[blockNb].cPtr,  blockTable[blockNb].cRoom,
@@ -292,8 +302,10 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize,
                 memcpy(compressedBuffer, srcBuffer, loadedCompressedSize);
             }
 
-            (void)fastestD; (void)crcOrig;   /*  unused when decompression disabled */
 #if 1
+            dCompleted=1;
+            (void)totalDTime; (void)fastestD; (void)crcOrig;   /*  unused when decompression disabled */
+#else
             /* Decompression */
             if (!dCompleted) memset(resultBuffer, 0xD6, srcSize);  /* warm result buffer */