]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
Changed : input divided into roughly equal parts.
authorYann Collet <cyan@fb.com>
Thu, 29 Dec 2016 00:24:01 +0000 (01:24 +0100)
committerYann Collet <cyan@fb.com>
Thu, 29 Dec 2016 00:24:01 +0000 (01:24 +0100)
Debug : can measure time waiting for mutexes to unlock.

lib/compress/zstdmt_compress.c
programs/bench.c

index 0f14dbf3168fc039d912f1febcd860333a836ba3..c86be870153d9848aaef41b9c855bc646c68d930 100644 (file)
@@ -5,12 +5,41 @@
 
 #if 0
 #  include <stdio.h>
-   static unsigned g_debugLevel = 4;
+#  include <unistd.h>
+#  include <sys/times.h>
+   static unsigned g_debugLevel = 2;
 #  define DEBUGLOG(l, ...) if (l<=g_debugLevel) { fprintf(stderr, __VA_ARGS__); fprintf(stderr, " \n"); }
+
+static unsigned long long GetCurrentClockTimeMicroseconds()
+{
+   static clock_t _ticksPerSecond = 0;
+   if (_ticksPerSecond <= 0) _ticksPerSecond = sysconf(_SC_CLK_TCK);
+
+   struct tms junk; clock_t newTicks = (clock_t) times(&junk);
+   return ((((unsigned long long)newTicks)*(1000000))/_ticksPerSecond);
+}
+
+#define MUTEX_WAIT_TIME_DLEVEL 5
+#define PTHREAD_MUTEX_LOCK(mutex) \
+if (g_debugLevel>=MUTEX_WAIT_TIME_DLEVEL) { \
+   unsigned long long beforeTime = GetCurrentClockTimeMicroseconds(); \
+   pthread_mutex_lock(mutex); \
+   unsigned long long afterTime = GetCurrentClockTimeMicroseconds(); \
+   unsigned long long elapsedTime = (afterTime-beforeTime); \
+   if (elapsedTime > 1000) {  /* or whatever threshold you like; I'm using 1 millisecond here */ \
+      DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, "Thread %li took %llu microseconds to acquire mutex %s \n", \
+                (long int) pthread_self(), elapsedTime, #mutex); \
+  } \
+} else pthread_mutex_lock(mutex);
+
 #else
+
 #  define DEBUGLOG(l, ...)   /* disabled */
+#  define PTHREAD_MUTEX_LOCK(m) pthread_mutex_lock(m)
+
 #endif
 
+
 #define ZSTDMT_NBTHREADS_MAX 128
 #define ZSTDMT_NBSTACKEDFRAMES_MAX (2*ZSTDMT_NBTHREADS_MAX)
 
@@ -38,8 +67,9 @@ static ZSTDMT_dstBufferManager ZSTDMT_createDstBufferManager(void* dst, size_t d
     dbm.out.pos = 0;
     dbm.frameIDToWrite = 0;
     pthread_mutex_init(&dbm.frameTable_mutex, NULL);
-    pthread_mutex_init(&dbm.allFramesWritten_mutex, NULL);
-    pthread_mutex_lock(&dbm.allFramesWritten_mutex);  /* maybe could be merged into init ? */
+    pthread_mutex_t* const allFramesWritten_mutex = &dbm.allFramesWritten_mutex;
+    pthread_mutex_init(allFramesWritten_mutex, NULL);
+    PTHREAD_MUTEX_LOCK(allFramesWritten_mutex);  /* maybe could be merged into init ? */
     dbm.nbStackedFrames = 0;
     return dbm;
 }
@@ -89,7 +119,7 @@ static size_t ZSTDMT_tryWriteFrame(ZSTDMT_dstBufferManager* dstBufferManager,
 
     /* check if correct frame ordering; stack otherwise */
     DEBUGLOG(5, "considering writing frame %u ", frameID);
-    pthread_mutex_lock(&dstBufferManager->frameTable_mutex);
+    PTHREAD_MUTEX_LOCK(&dstBufferManager->frameTable_mutex);
     if (frameID != dstBufferManager->frameIDToWrite) {
         DEBUGLOG(4, "writing frameID %u : not possible, waiting for %u  ", frameID, dstBufferManager->frameIDToWrite);
         frameToWrite_t const frame = { src, srcSize, frameID, isLastFrame };
@@ -112,7 +142,7 @@ static size_t ZSTDMT_tryWriteFrame(ZSTDMT_dstBufferManager* dstBufferManager,
     lastFrameWritten = isLastFrame;
 
     /* check if more frames are stacked */
-    pthread_mutex_lock(&dstBufferManager->frameTable_mutex);
+    PTHREAD_MUTEX_LOCK(&dstBufferManager->frameTable_mutex);
     unsigned frameWritten = dstBufferManager->nbStackedFrames>0;
     while (frameWritten) {
         unsigned u;
@@ -127,7 +157,7 @@ static size_t ZSTDMT_tryWriteFrame(ZSTDMT_dstBufferManager* dstBufferManager,
                 lastFrameWritten = dstBufferManager->stackedFrame[u].isLastFrame;
                 dstBufferManager->frameIDToWrite = frameID+1;
                 /* remove frame from stack */
-                pthread_mutex_lock(&dstBufferManager->frameTable_mutex);
+                PTHREAD_MUTEX_LOCK(&dstBufferManager->frameTable_mutex);
                 dstBufferManager->stackedFrame[u] = dstBufferManager->stackedFrame[dstBufferManager->nbStackedFrames-1];
                 dstBufferManager->nbStackedFrames -= 1;
                 frameWritten = dstBufferManager->nbStackedFrames>0;
@@ -166,7 +196,7 @@ typedef struct ZSTDMT_jobAgency_s {
 static void ZSTDMT_postjob(ZSTDMT_jobAgency* jobAgency, ZSTDMT_jobDescription job)
 {
     DEBUGLOG(5, "starting job posting  ");
-    pthread_mutex_lock(&jobAgency->jobApply_mutex);   /* wait for a thread to take previous job */
+    PTHREAD_MUTEX_LOCK(&jobAgency->jobApply_mutex);   /* wait for a thread to take previous job */
     DEBUGLOG(5, "job posting mutex acquired ");
     jobAgency->jobAnnounce = job;   /* post job */
     pthread_mutex_unlock(&jobAgency->jobAnnounce_mutex);   /* announce */
@@ -175,7 +205,7 @@ static void ZSTDMT_postjob(ZSTDMT_jobAgency* jobAgency, ZSTDMT_jobDescription jo
 
 static ZSTDMT_jobDescription ZSTDMT_getjob(ZSTDMT_jobAgency* jobAgency)
 {
-    pthread_mutex_lock(&jobAgency->jobAnnounce_mutex);   /* should check return code */
+    PTHREAD_MUTEX_LOCK(&jobAgency->jobAnnounce_mutex);   /* should check return code */
     ZSTDMT_jobDescription const job = jobAgency->jobAnnounce;
     pthread_mutex_unlock(&jobAgency->jobApply_mutex);
     return job;
@@ -192,7 +222,7 @@ typedef struct ZSTDMT_bufferPool_s {
 
 static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize)
 {
-    pthread_mutex_lock(&pool->bufferPool_mutex);
+    PTHREAD_MUTEX_LOCK(&pool->bufferPool_mutex);
     if (pool->nbBuffers) {   /* try to use an existing buffer */
         pool->nbBuffers--;
         buffer_t const buf = pool->bTable[pool->nbBuffers];
@@ -213,7 +243,7 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize)
 /* effectively store buffer for later re-use, up to pool capacity */
 static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* pool, buffer_t buf)
 {
-    pthread_mutex_lock(&pool->bufferPool_mutex);
+    PTHREAD_MUTEX_LOCK(&pool->bufferPool_mutex);
     if (pool->nbBuffers >= ZSTDMT_NBBUFFERSPOOLED_MAX) {
         pthread_mutex_unlock(&pool->bufferPool_mutex);
         free(buf.start);
@@ -240,6 +270,7 @@ static void* ZSTDMT_compressionThread(void* arg)
     ZSTDMT_bufferPool* const pool = &mtctx->bufferPool;
     ZSTD_CCtx* const cctx = ZSTD_createCCtx();
     if (cctx==NULL) return NULL;   /* allocation failure : thread not started */
+    DEBUGLOG(3, "thread %li created  ", (long int)pthread_self());
     for (;;) {
         ZSTDMT_jobDescription const job = ZSTDMT_getjob(jobAgency);
         if (job.src == NULL) {
@@ -254,7 +285,7 @@ static void* ZSTDMT_compressionThread(void* arg)
         DEBUGLOG(4, "start compressing frame %u", job.frameNumber);
         //size_t const cSize = ZSTD_compress(dstBuffer.start, dstBuffer.bufferSize, job.src, job.srcSize, job.compressionLevel);
         size_t const cSize = ZSTD_compressCCtx(cctx, dstBuffer.start, dstBuffer.bufferSize, job.src, job.srcSize, job.compressionLevel);
-        if (ZSTD_isError(cSize)) return (void*)(cSize);   /* error */
+        if (ZSTD_isError(cSize)) return (void*)(cSize);   /* error - find a better way */
         size_t const writeError = ZSTDMT_tryWriteFrame(dstBufferManager, dstBuffer.start, cSize, job.frameNumber, job.isLastFrame);   /* pas clair */
         if (ZSTD_isError(writeError)) return (void*)writeError;
         if (job.frameNumber) ZSTDMT_releaseBuffer(pool, dstBuffer);
@@ -269,7 +300,7 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
     /* init jobAgency */
     pthread_mutex_init(&cctx->jobAgency.jobAnnounce_mutex, NULL);   /* check return value ? */
     pthread_mutex_init(&cctx->jobAgency.jobApply_mutex, NULL);
-    pthread_mutex_lock(&cctx->jobAgency.jobAnnounce_mutex);   /* no job at beginning */
+    PTHREAD_MUTEX_LOCK(&cctx->jobAgency.jobAnnounce_mutex);   /* no job at beginning */
     /* init bufferPool */
     pthread_mutex_init(&cctx->bufferPool.bufferPool_mutex, NULL);
     /* start all workers */
@@ -299,7 +330,8 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* cctx,
     ZSTDMT_jobAgency* jobAgency = &cctx->jobAgency;
     ZSTD_parameters const params = ZSTD_getParams(compressionLevel, srcSize, 0);
     size_t const frameSizeTarget = (size_t)1 << (params.cParams.windowLog + 2);
-    unsigned const nbFrames = (unsigned)(srcSize / frameSizeTarget) + (srcSize < frameSizeTarget) /* min 1 */;
+    unsigned const nbFramesMax = (unsigned)(srcSize / frameSizeTarget) + (srcSize < frameSizeTarget) /* min 1 */;
+    unsigned const nbFrames = MIN(nbFramesMax, cctx->nbThreads);
     size_t const avgFrameSize = (srcSize + (nbFrames-1)) / nbFrames;
     size_t remainingSrcSize = srcSize;
     const char* const srcStart = (const char*)src;
@@ -320,7 +352,7 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* cctx,
             remainingSrcSize -= frameSize;
     }   }
 
-    pthread_mutex_lock(&dbm.allFramesWritten_mutex);
+    PTHREAD_MUTEX_LOCK(&dbm.allFramesWritten_mutex);
     DEBUGLOG(4, "compressed size : %u  ", (U32)dbm.out.pos);
     return dbm.out.pos;
 }
index 6009ebc7b5b31917121600da6dd1942f76d8673f..b5cc77eed0414f57721e4952fdb88230886b1d4c 100644 (file)
@@ -159,8 +159,6 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize,
     U32 nbBlocks;
     UTIL_time_t ticksPerSecond;
 
-    ZSTDMT_CCtx* const mtcctx = ZSTDMT_createCCtx(g_nbThreads);
-
     /* checks */
     if (!compressedBuffer || !resultBuffer || !blockTable || !ctx || !dctx)
         EXM_THROW(31, "allocation error : not enough memory");
@@ -228,6 +226,8 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize,
         const char* const marks[NB_MARKS] = { " |", " /", " =",  "\\" };
         U32 markNb = 0;
 
+        ZSTDMT_CCtx* const mtcctx = ZSTDMT_createCCtx(g_nbThreads);
+
         UTIL_getTime(&coolTime);
         DISPLAYLEVEL(2, "\r%79s\r", "");
         while (!cCompleted || !dCompleted) {