#include <stdlib.h> /* malloc, calloc, free */
#include "pool.h"
-#include <stdio.h>
-
/* ====== Compiler specifics ====== */
#if defined(_MSC_VER)
# pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */
pthread_t *threads;
size_t numThreads;
- size_t numThreadsBusy;
-
/* The queue is a circular buffer */
POOL_job *queue;
size_t queueHead;
size_t queueTail;
size_t queueSize;
- size_t jobsQueued;
-
- size_t marker;
+ /* The number of threads working on jobs */
+ size_t numThreadsBusy;
+ /* Indicates if the queue is empty */
+ int queueEmpty;
/* The mutex protects the queue */
pthread_mutex_t queueMutex;
for (;;) {
/* Lock the mutex and wait for a non-empty queue or until shutdown */
pthread_mutex_lock(&ctx->queueMutex);
-// while (ctx->queueHead == ctx->queueTail && !ctx->shutdown) {
- while (!ctx->jobsQueued && !ctx->shutdown && !ctx->marker) {
+ while (ctx->queueEmpty && !ctx->shutdown) {
pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
}
/* empty => shutting down: so stop */
- if (!ctx->jobsQueued && !ctx->marker) {
+ if (ctx->queueEmpty) {
pthread_mutex_unlock(&ctx->queueMutex);
return opaque;
}
{
POOL_job const job = ctx->queue[ctx->queueHead];
ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
- ctx->jobsQueued--;
ctx->numThreadsBusy++;
- ctx->marker = 0;
+ ctx->queueEmpty = ctx->queueHead == ctx->queueTail;
/* Unlock the mutex, signal a pusher, and run the job */
pthread_mutex_unlock(&ctx->queueMutex);
pthread_cond_signal(&ctx->queuePushCond);
ctx->queueHead = 0;
ctx->queueTail = 0;
ctx->numThreadsBusy = 0;
- ctx->jobsQueued = 0;
- ctx->marker = 0;
+ ctx->queueEmpty = 1;
(void)pthread_mutex_init(&ctx->queueMutex, NULL);
(void)pthread_cond_init(&ctx->queuePushCond, NULL);
(void)pthread_cond_init(&ctx->queuePopCond, NULL);
pthread_mutex_lock(&ctx->queueMutex);
{ POOL_job const job = {function, opaque};
- /* Wait until there is space in the queue for the new job */
+
+ // Wait until there is space in the queue for the new job.
+ // If the ctx->queueSize is 1 (the pool was created with an
+ // intended queueSize of 0) and there is no job already waiting,
+ // wait until there is a thread free for the new job.
size_t newTail = (ctx->queueTail + 1) % ctx->queueSize;
while (ctx->queueHead == newTail && !ctx->shutdown &&
(ctx->queueSize > 1 || ctx->numThreadsBusy == ctx->numThreads ||
- ctx->marker)) {
+ !ctx->queueEmpty)) {
pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
newTail = (ctx->queueTail + 1) % ctx->queueSize;
}
/* The queue is still going => there is space */
if (!ctx->shutdown) {
- if (ctx->queueSize > 1) {
- ctx->marker = 1;
- }
+ ctx->queueEmpty = 0;
ctx->queue[ctx->queueTail] = job;
ctx->queueTail = newTail;
- ctx->jobsQueued++;
}
}
pthread_mutex_unlock(&ctx->queueMutex);