abrupt end + downsizing with running jobs remaining in queue.
also : POOL_resize() requires numThreads >= 1
while ( ctx->queueEmpty
|| (ctx->numThreadsBusy >= ctx->threadLimit) ) {
- if (ctx->shutdown) {
- ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
- return opaque;
- }
+ if (ctx->shutdown) {
+ /* even if !queueEmpty, (possible if numThreadsBusy >= threadLimit),
+ * a few threads will be shutdown while !queueEmpty,
+ * but enough threads will remain active to finish the queue */
+ ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+ return opaque;
+ }
ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
}
/* Pop a job off the queue */
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
}
} /* for (;;) */
- assert(0); /* Unreachable */
+ assert(0); /* Unreachable */
}
POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
}
-/* note : only works if no job is running ! */
+/* @return : a working pool on success, NULL on failure
+ * note : starting context is considered consumed. */
static POOL_ctx* POOL_resize_internal(POOL_ctx* ctx, size_t numThreads)
{
if (numThreads <= ctx->threadCapacity) {
+ if (!numThreads) return NULL;
ctx->threadLimit = numThreads;
return ctx;
}
* note : new pool context might have same address as original one, but it's not guaranteed.
* consider starting context as consumed, only rely on returned one.
* note 2 : only numThreads can be resized, queueSize is unchanged.
+ * note 3 : `numThreads` must be at least 1
*/
POOL_ctx* POOL_resize(POOL_ctx* ctx, size_t numThreads);
void waitIncFn(void *opaque) {
abruptEndCanary_t* test = (abruptEndCanary_t*) opaque;
- UTIL_sleepMilli(1);
+ UTIL_sleepMilli(10);
ZSTD_pthread_mutex_lock(&test->mut);
test->val = test->val + 1;
ZSTD_pthread_mutex_unlock(&test->mut);
{
int const nbWaits = 16;
- POOL_ctx* const ctx = POOL_create(2 /*numThreads*/, nbWaits /*queueSize*/);
+ POOL_ctx* ctx = POOL_create(3 /*numThreads*/, nbWaits /*queueSize*/);
ASSERT_TRUE(ctx);
test.val = 0;
{ int i;
for (i=0; i<nbWaits; i++)
- POOL_add(ctx, &waitLongFn, &test); /* all jobs either processed on in the queue */
+ POOL_add(ctx, &waitIncFn, &test); /* all jobs pushed into queue */
}
+ ctx = POOL_resize(ctx, 1 /*numThreads*/); /* downsize numThreads, to try to break end condition */
POOL_free(ctx); /* must finish all jobs in queue before giving back control */
ASSERT_EQ(test.val, nbWaits);