}
-/* note : only works if no job is running !
- * return : 1 on success, 0 on failure */
-static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads)
+/* note : only works if no job is running ! */
+static POOL_ctx* POOL_resize_internal(POOL_ctx* ctx, size_t numThreads)
{
- if (ctx->numThreadsBusy > 0) return 0;
+ if (ctx->numThreadsBusy > 0) return NULL;
if (numThreads <= ctx->threadCapacity) {
ctx->threadLimit = numThreads;
- return 1;
+ return ctx;
}
/* numThreads > threadCapacity */
{ ZSTD_pthread_t* const threadPool = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), ctx->customMem);
- if (!threadPool) return 0;
+ if (!threadPool) return NULL;
+ /* replace existing thread pool */
+ memcpy(threadPool, ctx->threads, ctx->threadCapacity);
+ ZSTD_free(ctx->threads, ctx->customMem);
+ ctx->threads = threadPool;
/* Initialize additional threads */
{ size_t threadId;
for (threadId = ctx->threadCapacity; threadId < numThreads; ++threadId) {
break;
} }
if (threadId != numThreads) { /* not all threads successfully init */
- /* how to destroy existing threads ? */
- /* POOL_join destroy all existing threads, not just newly created ones */
- return 0;
- }
- }
- /* replace existing thread pool */
- memcpy(threadPool, ctx->threads, ctx->threadCapacity);
- ZSTD_free(ctx->threads, ctx->customMem);
- ctx->threads = threadPool;
- }
+ ctx->threadCapacity = threadId;
+ return NULL; /* will release the pool */
+ } } }
+ /* successfully expanded */
ctx->threadCapacity = numThreads;
ctx->threadLimit = numThreads;
- return 1;
+ return ctx;
}
-/* return : 1 on success, 0 on failure */
-int POOL_resize(POOL_ctx* ctx, size_t numThreads)
+/* @return : a working pool on success, NULL on failure
+ * note : starting context is considered consumed. */
+POOL_ctx* POOL_resize(POOL_ctx* ctx, size_t numThreads)
{
- int result;
- if (!ctx) return 0;
+ POOL_ctx* newCtx;
+ if (ctx==NULL) return NULL;
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
- result = POOL_resize_internal(ctx, numThreads);
+ newCtx = POOL_resize_internal(ctx, numThreads);
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
- return result;
+ if (newCtx!=ctx) POOL_free(ctx);
+ return newCtx;
}
/**
(void)ctx;
}
+POOL_ctx* POOL_resize(POOL_ctx* ctx, size_t numThreads) {
+ (void)numThreads;
+ return ctx;
+}
+
void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) {
(void)ctx;
function(opaque);
*/
void POOL_free(POOL_ctx* ctx);
+/*! POOL_resize() :
+ * Expands or shrinks pool's number of threads.
+ * This is more efficient than releasing and creating a new context.
+ * @return : a new pool context on success, NULL on failure
+ * note : new pool context might have same address as original one, but it's not guaranteed.
+ * consider starting context as consumed, only rely on returned one.
+ * note 2 : only numThreads can be resized, queueSize is unchanged.
+ */
+POOL_ctx* POOL_resize(POOL_ctx* ctx, size_t numThreads);
+
/*! POOL_sizeof() :
* @return threadpool memory usage
* note : compatible with NULL (returns 0 in this case)