#include <stdlib.h> /* malloc, calloc, free */
#include "pool.h"
+#include <stdio.h>
+
/* ====== Compiler specifics ====== */
#if defined(_MSC_VER)
# pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */
pthread_t *threads;
size_t numThreads;
+ size_t numThreadsBusy;
+
/* The queue is a circular buffer */
POOL_job *queue;
size_t queueHead;
size_t queueTail;
size_t queueSize;
+
+ size_t jobsQueued;
+
+ size_t marker;
+
/* The mutex protects the queue */
pthread_mutex_t queueMutex;
/* Condition variable for pushers to wait on when the queue is full */
for (;;) {
/* Lock the mutex and wait for a non-empty queue or until shutdown */
pthread_mutex_lock(&ctx->queueMutex);
- while (ctx->queueHead == ctx->queueTail && !ctx->shutdown) {
+// while (ctx->queueHead == ctx->queueTail && !ctx->shutdown) {
+ while (!ctx->jobsQueued && !ctx->shutdown && !ctx->marker) {
pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
}
/* empty => shutting down: so stop */
- if (ctx->queueHead == ctx->queueTail) {
+ if (!ctx->jobsQueued && !ctx->marker) {
pthread_mutex_unlock(&ctx->queueMutex);
return opaque;
}
/* Pop a job off the queue */
- { POOL_job const job = ctx->queue[ctx->queueHead];
+ {
+ POOL_job const job = ctx->queue[ctx->queueHead];
ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
+ ctx->jobsQueued--;
+ ctx->numThreadsBusy++;
+ ctx->marker = 0;
/* Unlock the mutex, signal a pusher, and run the job */
pthread_mutex_unlock(&ctx->queueMutex);
pthread_cond_signal(&ctx->queuePushCond);
job.function(job.opaque);
+
+ pthread_mutex_lock(&ctx->queueMutex);
+ ctx->numThreadsBusy--;
+ pthread_mutex_unlock(&ctx->queueMutex);
}
}
/* Unreachable */
POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) {
POOL_ctx *ctx;
/* Check the parameters */
- if (!numThreads || !queueSize) { return NULL; }
+ if (!numThreads) { return NULL; }
/* Allocate the context and zero initialize */
ctx = (POOL_ctx *)calloc(1, sizeof(POOL_ctx));
if (!ctx) { return NULL; }
ctx->queue = (POOL_job*) malloc(ctx->queueSize * sizeof(POOL_job));
ctx->queueHead = 0;
ctx->queueTail = 0;
+ ctx->numThreadsBusy = 0;
+ ctx->jobsQueued = 0;
+ ctx->marker = 0;
(void)pthread_mutex_init(&ctx->queueMutex, NULL);
(void)pthread_cond_init(&ctx->queuePushCond, NULL);
(void)pthread_cond_init(&ctx->queuePopCond, NULL);
{ POOL_job const job = {function, opaque};
/* Wait until there is space in the queue for the new job */
size_t newTail = (ctx->queueTail + 1) % ctx->queueSize;
- while (ctx->queueHead == newTail && !ctx->shutdown) {
+ while (ctx->queueHead == newTail && !ctx->shutdown &&
+ (ctx->queueSize > 1 || ctx->numThreadsBusy == ctx->numThreads ||
+ ctx->marker)) {
pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
newTail = (ctx->queueTail + 1) % ctx->queueSize;
}
/* The queue is still going => there is space */
if (!ctx->shutdown) {
+ if (ctx->queueSize > 1) {
+ ctx->marker = 1;
+ }
ctx->queue[ctx->queueTail] = job;
ctx->queueTail = newTail;
+ ctx->jobsQueued++;
}
}
pthread_mutex_unlock(&ctx->queueMutex);
size_t numThreads;
for (numThreads = 1; numThreads <= 4; ++numThreads) {
size_t queueSize;
- for (queueSize = 1; queueSize <= 2; ++queueSize) {
+ for (queueSize = 0; queueSize <= 2; ++queueSize) {
if (testOrder(numThreads, queueSize)) {
printf("FAIL: testOrder\n");
return 1;
printf("PASS: testOrder\n");
(void)argc;
(void)argv;
- return (POOL_create(0, 1) || POOL_create(1, 0)) ? printf("FAIL: testInvalid\n"), 1
- : printf("PASS: testInvalid\n"), 0;
+ return (POOL_create(0, 1)) ? printf("FAIL: testInvalid\n"), 1
+ : printf("PASS: testInvalid\n"), 0;
return 0;
}