#if 0
# include <stdio.h>
- static unsigned g_debugLevel = 4;
+# include <unistd.h>
+# include <sys/times.h>
+ static unsigned g_debugLevel = 2;
# define DEBUGLOG(l, ...) if (l<=g_debugLevel) { fprintf(stderr, __VA_ARGS__); fprintf(stderr, " \n"); }
+
+static unsigned long long GetCurrentClockTimeMicroseconds()
+{
+ static clock_t _ticksPerSecond = 0;
+ if (_ticksPerSecond <= 0) _ticksPerSecond = sysconf(_SC_CLK_TCK);
+
+ struct tms junk; clock_t newTicks = (clock_t) times(&junk);
+ return ((((unsigned long long)newTicks)*(1000000))/_ticksPerSecond);
+}
+
+#define MUTEX_WAIT_TIME_DLEVEL 5
+#define PTHREAD_MUTEX_LOCK(mutex) \
+if (g_debugLevel>=MUTEX_WAIT_TIME_DLEVEL) { \
+ unsigned long long beforeTime = GetCurrentClockTimeMicroseconds(); \
+ pthread_mutex_lock(mutex); \
+ unsigned long long afterTime = GetCurrentClockTimeMicroseconds(); \
+ unsigned long long elapsedTime = (afterTime-beforeTime); \
+ if (elapsedTime > 1000) { /* or whatever threshold you like; I'm using 1 millisecond here */ \
+ DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, "Thread %li took %llu microseconds to acquire mutex %s \n", \
+ (long int) pthread_self(), elapsedTime, #mutex); \
+ } \
+} else pthread_mutex_lock(mutex);
+
#else
+
# define DEBUGLOG(l, ...) /* disabled */
+# define PTHREAD_MUTEX_LOCK(m) pthread_mutex_lock(m)
+
#endif
+
#define ZSTDMT_NBTHREADS_MAX 128
#define ZSTDMT_NBSTACKEDFRAMES_MAX (2*ZSTDMT_NBTHREADS_MAX)
dbm.out.pos = 0;
dbm.frameIDToWrite = 0;
pthread_mutex_init(&dbm.frameTable_mutex, NULL);
- pthread_mutex_init(&dbm.allFramesWritten_mutex, NULL);
- pthread_mutex_lock(&dbm.allFramesWritten_mutex); /* maybe could be merged into init ? */
+ pthread_mutex_t* const allFramesWritten_mutex = &dbm.allFramesWritten_mutex;
+ pthread_mutex_init(allFramesWritten_mutex, NULL);
+ PTHREAD_MUTEX_LOCK(allFramesWritten_mutex); /* maybe could be merged into init ? */
dbm.nbStackedFrames = 0;
return dbm;
}
/* check if correct frame ordering; stack otherwise */
DEBUGLOG(5, "considering writing frame %u ", frameID);
- pthread_mutex_lock(&dstBufferManager->frameTable_mutex);
+ PTHREAD_MUTEX_LOCK(&dstBufferManager->frameTable_mutex);
if (frameID != dstBufferManager->frameIDToWrite) {
DEBUGLOG(4, "writing frameID %u : not possible, waiting for %u ", frameID, dstBufferManager->frameIDToWrite);
frameToWrite_t const frame = { src, srcSize, frameID, isLastFrame };
lastFrameWritten = isLastFrame;
/* check if more frames are stacked */
- pthread_mutex_lock(&dstBufferManager->frameTable_mutex);
+ PTHREAD_MUTEX_LOCK(&dstBufferManager->frameTable_mutex);
unsigned frameWritten = dstBufferManager->nbStackedFrames>0;
while (frameWritten) {
unsigned u;
lastFrameWritten = dstBufferManager->stackedFrame[u].isLastFrame;
dstBufferManager->frameIDToWrite = frameID+1;
/* remove frame from stack */
- pthread_mutex_lock(&dstBufferManager->frameTable_mutex);
+ PTHREAD_MUTEX_LOCK(&dstBufferManager->frameTable_mutex);
dstBufferManager->stackedFrame[u] = dstBufferManager->stackedFrame[dstBufferManager->nbStackedFrames-1];
dstBufferManager->nbStackedFrames -= 1;
frameWritten = dstBufferManager->nbStackedFrames>0;
static void ZSTDMT_postjob(ZSTDMT_jobAgency* jobAgency, ZSTDMT_jobDescription job)
{
DEBUGLOG(5, "starting job posting ");
- pthread_mutex_lock(&jobAgency->jobApply_mutex); /* wait for a thread to take previous job */
+ PTHREAD_MUTEX_LOCK(&jobAgency->jobApply_mutex); /* wait for a thread to take previous job */
DEBUGLOG(5, "job posting mutex acquired ");
jobAgency->jobAnnounce = job; /* post job */
pthread_mutex_unlock(&jobAgency->jobAnnounce_mutex); /* announce */
static ZSTDMT_jobDescription ZSTDMT_getjob(ZSTDMT_jobAgency* jobAgency)
{
- pthread_mutex_lock(&jobAgency->jobAnnounce_mutex); /* should check return code */
+ PTHREAD_MUTEX_LOCK(&jobAgency->jobAnnounce_mutex); /* should check return code */
ZSTDMT_jobDescription const job = jobAgency->jobAnnounce;
pthread_mutex_unlock(&jobAgency->jobApply_mutex);
return job;
static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize)
{
- pthread_mutex_lock(&pool->bufferPool_mutex);
+ PTHREAD_MUTEX_LOCK(&pool->bufferPool_mutex);
if (pool->nbBuffers) { /* try to use an existing buffer */
pool->nbBuffers--;
buffer_t const buf = pool->bTable[pool->nbBuffers];
/* effectively store buffer for later re-use, up to pool capacity */
static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* pool, buffer_t buf)
{
- pthread_mutex_lock(&pool->bufferPool_mutex);
+ PTHREAD_MUTEX_LOCK(&pool->bufferPool_mutex);
if (pool->nbBuffers >= ZSTDMT_NBBUFFERSPOOLED_MAX) {
pthread_mutex_unlock(&pool->bufferPool_mutex);
free(buf.start);
ZSTDMT_bufferPool* const pool = &mtctx->bufferPool;
ZSTD_CCtx* const cctx = ZSTD_createCCtx();
if (cctx==NULL) return NULL; /* allocation failure : thread not started */
+ DEBUGLOG(3, "thread %li created ", (long int)pthread_self());
for (;;) {
ZSTDMT_jobDescription const job = ZSTDMT_getjob(jobAgency);
if (job.src == NULL) {
DEBUGLOG(4, "start compressing frame %u", job.frameNumber);
//size_t const cSize = ZSTD_compress(dstBuffer.start, dstBuffer.bufferSize, job.src, job.srcSize, job.compressionLevel);
size_t const cSize = ZSTD_compressCCtx(cctx, dstBuffer.start, dstBuffer.bufferSize, job.src, job.srcSize, job.compressionLevel);
- if (ZSTD_isError(cSize)) return (void*)(cSize); /* error */
+ if (ZSTD_isError(cSize)) return (void*)(cSize); /* error - find a better way */
size_t const writeError = ZSTDMT_tryWriteFrame(dstBufferManager, dstBuffer.start, cSize, job.frameNumber, job.isLastFrame); /* pas clair */
if (ZSTD_isError(writeError)) return (void*)writeError;
if (job.frameNumber) ZSTDMT_releaseBuffer(pool, dstBuffer);
/* init jobAgency */
pthread_mutex_init(&cctx->jobAgency.jobAnnounce_mutex, NULL); /* check return value ? */
pthread_mutex_init(&cctx->jobAgency.jobApply_mutex, NULL);
- pthread_mutex_lock(&cctx->jobAgency.jobAnnounce_mutex); /* no job at beginning */
+ PTHREAD_MUTEX_LOCK(&cctx->jobAgency.jobAnnounce_mutex); /* no job at beginning */
/* init bufferPool */
pthread_mutex_init(&cctx->bufferPool.bufferPool_mutex, NULL);
/* start all workers */
ZSTDMT_jobAgency* jobAgency = &cctx->jobAgency;
ZSTD_parameters const params = ZSTD_getParams(compressionLevel, srcSize, 0);
size_t const frameSizeTarget = (size_t)1 << (params.cParams.windowLog + 2);
- unsigned const nbFrames = (unsigned)(srcSize / frameSizeTarget) + (srcSize < frameSizeTarget) /* min 1 */;
+ unsigned const nbFramesMax = (unsigned)(srcSize / frameSizeTarget) + (srcSize < frameSizeTarget) /* min 1 */;
+ unsigned const nbFrames = MIN(nbFramesMax, cctx->nbThreads);
size_t const avgFrameSize = (srcSize + (nbFrames-1)) / nbFrames;
size_t remainingSrcSize = srcSize;
const char* const srcStart = (const char*)src;
remainingSrcSize -= frameSize;
} }
- pthread_mutex_lock(&dbm.allFramesWritten_mutex);
+ PTHREAD_MUTEX_LOCK(&dbm.allFramesWritten_mutex);
DEBUGLOG(4, "compressed size : %u ", (U32)dbm.out.pos);
return dbm.out.pos;
}