}
-/* @return : a working pool on success, NULL on failure
- * note : starting context is considered consumed. */
-static POOL_ctx* POOL_resize_internal(POOL_ctx* ctx, size_t numThreads)
+/* @return : 0 on success, 1 on error */
+static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads)
{
if (numThreads <= ctx->threadCapacity) {
- if (!numThreads) return NULL;
+ if (!numThreads) return 1;
ctx->threadLimit = numThreads;
- return ctx;
+ return 0;
}
/* numThreads > threadCapacity */
{ ZSTD_pthread_t* const threadPool = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), ctx->customMem);
- if (!threadPool) return NULL;
+ if (!threadPool) return 1;
/* replace existing thread pool */
- memcpy(threadPool, ctx->threads, ctx->threadCapacity * sizeof(ctx->threads[0]));
+ memcpy(threadPool, ctx->threads, ctx->threadCapacity * sizeof(*threadPool));
ZSTD_free(ctx->threads, ctx->customMem);
ctx->threads = threadPool;
/* Initialize additional threads */
{ size_t threadId;
for (threadId = ctx->threadCapacity; threadId < numThreads; ++threadId) {
if (ZSTD_pthread_create(&threadPool[threadId], NULL, &POOL_thread, ctx)) {
ctx->threadCapacity = threadId;
- ctx->threadLimit = threadId;
- return NULL; /* will release the pool */
+ return 1;
} }
} }
/* successfully expanded */
ctx->threadCapacity = numThreads;
ctx->threadLimit = numThreads;
- return ctx;
+ return 0;
}
-/* @return : a working pool on success, NULL on failure
- * note : starting context is considered consumed. */
-POOL_ctx* POOL_resize(POOL_ctx* ctx, size_t numThreads)
+/* @return : 0 on success, 1 on error */
+int POOL_resize(POOL_ctx* ctx, size_t numThreads)
{
- if (ctx==NULL) return NULL;
+ int result;
+ if (ctx==NULL) return 1;
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
- { POOL_ctx* const newCtx = POOL_resize_internal(ctx, numThreads);
- if (newCtx!=ctx) {
- POOL_free(ctx);
- return newCtx;
- } }
+ result = POOL_resize_internal(ctx, numThreads);
ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
- return ctx;
+ return result;
}
/**
(void)ctx;
}
-POOL_ctx* POOL_resize(POOL_ctx* ctx, size_t numThreads) {
- (void)numThreads;
- return ctx;
+int POOL_resize(POOL_ctx* ctx, size_t numThreads) {
+ (void)ctx; (void)numThreads;
+ return 0;
}
void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) {
/*! POOL_resize() :
* Expands or shrinks pool's number of threads.
- * This is more efficient than releasing and creating a new context.
- * @return : a new pool context on success, NULL on failure
- * note : new pool context might have same address as original one, but it's not guaranteed.
- * consider starting context as consumed, only rely on returned one.
- * note 2 : only numThreads can be resized, queueSize is unchanged.
- * note 3 : `numThreads` must be at least 1
+ * This is more efficient than releasing + creating a new context,
+ * since it tries to preserve and re-use existing threads.
+ * `numThreads` must be at least 1.
+ * @return : 0 when resize was successful,
+ * !0 (typically 1) if there is an error.
+ * note : only numThreads can be resized, queueSize remains unchanged.
*/
-POOL_ctx* POOL_resize(POOL_ctx* ctx, size_t numThreads);
+int POOL_resize(POOL_ctx* ctx, size_t numThreads);
/*! POOL_sizeof() :
* @return threadpool memory usage
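(Illustration only, not part of the patch : a minimal caller sketch under the new return convention. The function name exampleGrowPool, the thread counts and the queue size are made up for the example; only POOL_create(), POOL_resize() and POOL_free() are actual pool.h entry points.)

#include "pool.h"   /* POOL_create, POOL_resize, POOL_free */

static int exampleGrowPool(void)
{
    POOL_ctx* const pool = POOL_create(4 /*numThreads*/, 16 /*queueSize*/);
    if (pool == NULL) return 1;
    /* Previously, a failed resize returned NULL and consumed the context.
     * With the int return, the context remains valid and owned by the caller :
     * a non-zero result only means the thread count was not (fully) changed. */
    if (POOL_resize(pool, 8 /*numThreads*/)) {
        /* resize failed : keep using the pool at its previous (or partially expanded) size */
    }
    POOL_free(pool);
    return 0;
}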
/* ZSTDMT_resize() :
 * @return : error code if fails, 0 on success */
static size_t ZSTDMT_resize(ZSTDMT_CCtx* mtctx, unsigned nbWorkers)
{
- mtctx->factory = POOL_resize(mtctx->factory, nbWorkers);
- if (mtctx->factory == NULL) return ERROR(memory_allocation);
+ if (POOL_resize(mtctx->factory, nbWorkers)) return ERROR(memory_allocation);
CHECK_F( ZSTDMT_expandJobsTable(mtctx, nbWorkers) );
mtctx->bufPool = ZSTDMT_expandBufferPool(mtctx->bufPool, nbWorkers);
if (mtctx->bufPool == NULL) return ERROR(memory_allocation);
ZSTD_pthread_mutex_unlock(&test.mut);
time4threads = UTIL_clockSpanNano(startTime);
- ctx = POOL_resize(ctx, 2/*nbThreads*/);
- ASSERT_TRUE(ctx);
+ ASSERT_EQ( POOL_resize(ctx, 2/*nbThreads*/) , 0 );
test.val = 0;
startTime = UTIL_getTime();
{ int i;
static int testThreadReduction(void) {
int result;
poolTest_t test;
- POOL_ctx* ctx = POOL_create(4 /*nbThreads*/, 2 /*queueSize*/);
+ POOL_ctx* const ctx = POOL_create(4 /*nbThreads*/, 2 /*queueSize*/);
ASSERT_TRUE(ctx);
{
int const nbWaits = 16;
- POOL_ctx* ctx = POOL_create(3 /*numThreads*/, nbWaits /*queueSize*/);
+ POOL_ctx* const ctx = POOL_create(3 /*numThreads*/, nbWaits /*queueSize*/);
ASSERT_TRUE(ctx);
test.val = 0;
{ int i;
for (i=0; i<nbWaits; i++)
POOL_add(ctx, &waitIncFn, &test); /* all jobs pushed into queue */
}
- ctx = POOL_resize(ctx, 1 /*numThreads*/); /* downsize numThreads, to try to break end condition */
+ ASSERT_EQ( POOL_resize(ctx, 1 /*numThreads*/) , 0 ); /* downsize numThreads, to try to break end condition */
POOL_free(ctx); /* must finish all jobs in queue before giving back control */
ASSERT_EQ(test.val, nbWaits);