-[threadpool]
-;initial_size = 5 ; Initial size of the threadpool.
-; ; 0 means the threadpool has no threads initially
-; ; until a task needs a thread.
-;idle_timeout_sec = 20 ; Number of seconds a thread should be idle before
-; ; dying. 0 means threads never time out.
-;max_size = 50 ; Maximum number of threads in the Stasis threadpool.
-; ; 0 means no limit to the number of threads in the
-; ; threadpool.
+[taskpool]
+;minimum_size = 5 ; Minimum number of taskprocessors in taskpool.
+;initial_size = 5 ; Initial size of the taskpool.
+; ; 0 means the taskpool has no taskprocessors initially
+; ; until a task needs a taskprocessor.
+;idle_timeout_sec = 20 ; Number of seconds a taskprocessor should be idle before
+; ; dying. 0 means taskprocessors never time out.
+;max_size = 50 ; Maximum number of taskprocessors in the Stasis taskpool.
+; ; 0 means no limit to the number of taskprocessors in the
+; ; taskpool.
[declined_message_types]
; This config section contains the names of message types that should be prevented
int ast_file_init(void); /*!< Provided by file.c */
void ast_autoservice_init(void); /*!< Provided by autoservice.c */
int ast_tps_init(void); /*!< Provided by taskprocessor.c */
+int ast_taskpool_init(void); /*!< Provided by taskpool.c */
int ast_timing_init(void); /*!< Provided by timing.c */
void ast_stun_init(void); /*!< Provided by stun.c */
int ast_ssl_init(void); /*!< Provided by ssl.c */
#define _AST_SERIALIZER_H
struct ast_threadpool;
+struct ast_taskpool;
/*!
* Maintains a named pool of thread pooled taskprocessors. Also if configured
struct ast_serializer_pool *ast_serializer_pool_create(const char *name,
unsigned int size, struct ast_threadpool *threadpool, int timeout);
+/*!
+ * \brief Create a serializer pool on taskpool.
+ * \since 23.1.0
+ * \since 22.7.0
+ * \since 20.17.0
+ *
+ * Create a serializer pool with an optional shutdown group. If a timeout greater
+ * than -1 is specified then a shutdown group is enabled on the pool.
+ *
+ * \param name The base name for the pool, and used when building taskprocessor(s)
+ * \param size The size of the pool
+ * \param taskpool The backing taskpool to use
+ * \param timeout The timeout used if using a shutdown group (-1 = disabled)
+ *
+ * \return A newly allocated serializer pool object
+ * \retval NULL on error
+ */
+ struct ast_serializer_pool *ast_serializer_taskpool_create(const char *name,
+ unsigned int size, struct ast_taskpool *taskpool, int timeout);
+
/*!
* \brief Retrieve the base name of the serializer pool.
*
--- /dev/null
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2012-2013, Digium, Inc.
+ *
+ * Mark Michelson <mmmichelson@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+
+#ifndef _ASTERISK_SERIALIZER_SHUTDOWN_GROUP_H
+#define _ASTERISK_SERIALIZER_SHUTDOWN_GROUP_H
+
+struct ast_serializer_shutdown_group;
+
+/*!
+ * \brief Create a serializer group shutdown control object.
+ * \since 13.5.0
+ *
+ * \return ao2 object to control shutdown of a serializer group.
+ */
+struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void);
+
+/*!
+ * \brief Wait for the serializers in the group to shutdown with timeout.
+ * \since 13.5.0
+ *
+ * \param shutdown_group Group shutdown controller. (Returns 0 immediately if NULL)
+ * \param timeout Number of seconds to wait for the serializers in the group to shutdown.
+ * Zero if the timeout is disabled.
+ *
+ * \return Number of serializers that did not get shutdown within the timeout.
+ */
+int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout);
+
+/*!
+ * \brief Increment the number of serializer members in the group.
+ * \since 23.1.0
+ * \since 22.7.0
+ * \since 20.17.0
+ *
+ * \param shutdown_group Group shutdown controller.
+ */
+ void ast_serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group);
+
+ /*!
+ * \brief Decrement the number of serializer members in the group.
+ * \since 23.1.0
+ * \since 22.7.0
+ * \since 20.17.0
+ *
+ * \param shutdown_group Group shutdown controller.
+ */
+void ast_serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group);
+
+#endif /* ASTERISK_SERIALIZER_SHUTDOWN_GROUP_H */
--- /dev/null
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2025, Sangoma Technologies Corporation
+ *
+ * Joshua C. Colp <jcolp@sangoma.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ * \ref Taskpool
+ *
+ * \page Taskpool API providing queued task execution across threads.
+
+The taskpool API is a specialized API for the queueing of tasks
+in a synchronous or asynchronous manner, to be executed across
+a pool of threads. For cases where serialization is needed a
+serializer API is also provided ensuring that tasks queued to
+the serializer are executed in a serialized fashion within the
+taskpool.
+
+On creation of a taskpool various options can be set and used to
+control the operation of the pool. This includes how many taskprocessors
+are present, whether the pool can grow, whether the pool can shrink,
+and how long idle taskprocessors should exist before being terminated.
+This provides flexibility based on the specific needs of the user of
+the taskpool and the environment.
+
+The queueing of tasks to the taskpool is done using a selector. The
+selector examines the available taskprocessors and decides which one
+to queue the task to. This operation can also examine the state of
+the pool to see if it needs to grow and if enabled and possible does so.
+
+The taskpool API is preferred for many cases over the use of the
+threadpool due to the far lower overhead involved. Taskpools require
+no additional thread or task queue for management of the pool itself and
+the act of queueing tasks, the most common operation, is written to be as
+simple and minimal as possible. Threadpools are best used for long
+running tasks and operations.
+
+*/
+
+#ifndef _ASTERISK_TASKPOOL_H
+#define _ASTERISK_TASKPOOL_H
+
+struct ast_taskpool;
+struct ast_taskprocessor;
+struct ast_serializer_shutdown_group;
+
+/*!
+ * \brief Selectors for choosing which taskprocessor in a pool to use
+ */
+enum ast_taskpool_selector {
+ AST_TASKPOOL_SELECTOR_DEFAULT = 0, /* The selector that is generally the best for most use cases */
+ AST_TASKPOOL_SELECTOR_LEAST_FULL, /* Select the least full taskprocessor */
+ AST_TASKPOOL_SELECTOR_SEQUENTIAL, /* Select taskprocessors in a sequential manner */
+};
+
+struct ast_taskpool_options {
+#define AST_TASKPOOL_OPTIONS_VERSION 1
+ /*! Version of taskpool options in use */
+ int version;
+ /*!
+ * \brief The selector to use for choosing a taskprocessor
+ */
+ enum ast_taskpool_selector selector;
+ /*!
+ * \brief Time limit in seconds for idle dynamic taskprocessors
+ *
+ * A time of 0 or less will mean no timeout.
+ */
+ int idle_timeout;
+ /*!
+ * \brief Number of taskprocessors to increment the pool by
+ */
+ int auto_increment;
+ /*!
+ * \brief Number of taskprocessors that will always exist
+ *
+ * Zero is a valid value if the taskpool will never have taskprocessors
+ * that always exist, allowing the pool to drop to zero if not used.
+ */
+ int minimum_size;
+ /*!
+ * \brief Number of taskprocessors the pool will start with
+ *
+ * Zero is a valid value if the taskpool should start
+ * without any taskprocessors allocated.
+ *
+ * \note This must be equal to or greater than the minimum_size,
+ * otherwise the taskpool will adjust this to the minimum_size.
+ */
+ int initial_size;
+ /*!
+ * \brief Maximum number of taskprocessors a pool may have
+ *
+ * When the taskpool's size increases, it can never increase
+ * beyond this number of taskprocessors.
+ *
+ * Zero is a valid value if the taskpool does not have a
+ * maximum size for taskprocessors.
+ *
+ * \note This must be equal to or greater than the initial_size,
+ * otherwise the taskpool will adjust this to the initial_size.
+ */
+ int max_size;
+ /*!
+ * \brief The threshold for when to grow the pool
+ *
+ * This is the number of tasks that must be in queue before the pool will grow.
+ *
+ * \note If not specified a default of the 50% of the high water threshold defined
+ * in taskprocessor.h will be used.
+ */
+ int growth_threshold;
+ /*!
+ * \brief Function to call when a taskprocessor starts
+ *
+ * This is useful if there is something common that all
+ * taskprocessors in a taskpool need to do when they start.
+ */
+ void (*thread_start)(void);
+ /*!
+ * \brief Function to call when a taskprocessor ends
+ *
+ * This is useful if there is common cleanup to execute when
+ * a taskprocessor completes
+ */
+ void (*thread_end)(void);
+};
+
+/*!
+ * \brief Create a new taskpool
+ * \since 23.1.0
+ * \since 22.7.0
+ * \since 20.17.0
+ *
+ * This function creates a taskpool. Tasks may be pushed onto this task pool
+ * and will be automatically acted upon by taskprocessors within the pool.
+ *
+ * Only a single taskpool with a given name may exist. This function will fail
+ * if a taskpool with the given name already exists.
+ *
+ * \param name The unique name for the taskpool
+ * \param options The behavioral options for this taskpool
+ * \retval NULL Failed to create the taskpool
+ * \retval non-NULL The newly-created taskpool
+ *
+ * \note The \ref ast_taskpool_shutdown function must be called to shut down the
+ * taskpool and clean up underlying resources fully.
+ */
+struct ast_taskpool *ast_taskpool_create(const char *name,
+ const struct ast_taskpool_options *options);
+
+/*!
+ * \brief Get the current number of taskprocessors in the taskpool
+ * \since 23.1.0
+ * \since 22.7.0
+ * \since 20.17.0
+ *
+ * \param pool The taskpool to query
+ * \retval The number of taskprocessors in the taskpool
+ */
+size_t ast_taskpool_taskprocessors_count(struct ast_taskpool *pool);
+
+/*!
+ * \brief Get the current number of queued tasks in the taskpool
+ * \since 23.1.0
+ * \since 22.7.0
+ * \since 20.17.0
+ *
+ * \param pool The taskpool to query
+ * \retval The number of queued tasks in the taskpool
+ */
+long ast_taskpool_queue_size(struct ast_taskpool *pool);
+
+/*!
+ * \brief Push a task to the taskpool
+ * \since 23.1.0
+ * \since 22.7.0
+ * \since 20.17.0
+ *
+ * Tasks pushed into the taskpool will be automatically taken by
+ * one of the taskprocessors within
+ * \param pool The taskpool to add the task to
+ * \param task The task to add
+ * \param data The parameter for the task
+ * \retval 0 success
+ * \retval -1 failure
+ */
+int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data)
+ attribute_warn_unused_result;
+
+/*!
+ * \brief Push a task to the taskpool, and wait for completion
+ * \since 23.1.0
+ * \since 22.7.0
+ * \since 20.17.0
+ *
+ * Tasks pushed into the taskpool will be automatically taken by
+ * one of the taskprocessors within
+ * \param pool The taskpool to add the task to
+ * \param task The task to add
+ * \param data The parameter for the task
+ * \retval 0 success
+ * \retval -1 failure
+ */
+int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data)
+ attribute_warn_unused_result;
+
+/*!
+ * \brief Shut down a taskpool and remove the underlying taskprocessors
+ * \since 23.1.0
+ * \since 22.7.0
+ * \since 20.17.0
+ *
+ * \param pool The pool to shut down
+ *
+ * \note This will decrement the reference to the pool
+ */
+void ast_taskpool_shutdown(struct ast_taskpool *pool);
+
+/*!
+ * \brief Get the taskpool serializer currently associated with this thread.
+ *
+ * \note The returned pointer is valid while the serializer
+ * thread is running.
+ *
+ * \note Use ao2_ref() on serializer if you are going to keep it
+ * for another thread. To unref it you must then use
+ * ast_taskprocessor_unreference().
+ *
+ * \retval serializer on success.
+ * \retval NULL on error or no serializer associated with the thread.
+ */
+ struct ast_taskprocessor *ast_taskpool_serializer_get_current(void);
+
+/*!
+ * \brief Serialized execution of tasks within a \ref ast_taskpool.
+ * \since 23.1.0
+ * \since 22.7.0
+ * \since 20.17.0
+ *
+ * A \ref ast_taskprocessor with the same contract as a default taskprocessor
+ * (tasks execute serially) except instead of executing out of a dedicated
+ * thread, execution occurs in a taskprocessor from a \ref ast_taskpool.
+ *
+ * While it guarantees that each task will complete before executing the next,
+ * there is no guarantee as to which thread from the \c pool individual tasks
+ * will execute. This normally only matters if your code relies on thread
+ * specific information, such as thread locals.
+ *
+ * Use ast_taskprocessor_unreference() to dispose of the returned \ref
+ * ast_taskprocessor.
+ *
+ * Only a single taskprocessor with a given name may exist. This function will fail
+ * if a taskprocessor with the given name already exists.
+ *
+ * \param name Name of the serializer. (must be unique)
+ * \param pool \ref ast_taskpool for execution.
+ *
+ * \return \ref ast_taskprocessor for enqueuing work.
+ * \retval NULL on error.
+ */
+struct ast_taskprocessor *ast_taskpool_serializer(const char *name, struct ast_taskpool *pool);
+
+/*!
+ * \brief Serialized execution of tasks within a \ref ast_taskpool.
+ * \since 23.1.0
+ * \since 22.7.0
+ * \since 20.17.0
+ *
+ * A \ref ast_taskprocessor with the same contract as a default taskprocessor
+ * (tasks execute serially) except instead of executing out of a dedicated
+ * thread, execution occurs in a taskprocessor from a \ref ast_taskpool.
+ *
+ * While it guarantees that each task will complete before executing the next,
+ * there is no guarantee as to which thread from the \c pool individual tasks
+ * will execute. This normally only matters if your code relies on thread
+ * specific information, such as thread locals.
+ *
+ * Use ast_taskprocessor_unreference() to dispose of the returned \ref
+ * ast_taskprocessor.
+ *
+ * Only a single taskprocessor with a given name may exist. This function will fail
+ * if a taskprocessor with the given name already exists.
+ *
+ * \param name Name of the serializer. (must be unique)
+ * \param pool \ref ast_taskpool for execution.
+ * \param shutdown_group Group shutdown controller. (NULL if no group association)
+ *
+ * \return \ref ast_taskprocessor for enqueuing work.
+ * \retval NULL on error.
+ */
+struct ast_taskprocessor *ast_taskpool_serializer_group(const char *name,
+ struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group);
+
+/*!
+ * \brief Push a task to a serializer, and wait for completion
+ * \since 23.1.0
+ * \since 22.7.0
+ * \since 20.17.0
+ *
+ * \param serializer The serializer to add the task to
+ * \param task The task to add
+ * \param data The parameter for the task
+ * \retval 0 success
+ * \retval -1 failure
+ */
+int ast_taskpool_serializer_push_wait(struct ast_taskprocessor *serializer, int (*task)(void *data), void *data);
+
+#endif /* ASTERISK_TASKPOOL_H */
*/
long ast_taskprocessor_size(struct ast_taskprocessor *tps);
+/*!
+ * \brief Return the listener associated with the taskprocessor
+ */
+struct ast_taskprocessor_listener *ast_taskprocessor_listener(struct ast_taskprocessor *tps);
+
/*!
* \brief Get the current taskprocessor high water alert count.
* \since 13.10.0
struct ast_taskprocessor;
struct ast_threadpool_listener;
+#include "asterisk/serializer_shutdown_group.h"
+
struct ast_threadpool_listener_callbacks {
/*!
* \brief Indicates that the state of threads in the pool has changed
*/
void ast_threadpool_shutdown(struct ast_threadpool *pool);
-struct ast_serializer_shutdown_group;
-
-/*!
- * \brief Create a serializer group shutdown control object.
- * \since 13.5.0
- *
- * \return ao2 object to control shutdown of a serializer group.
- */
-struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void);
-
-/*!
- * \brief Wait for the serializers in the group to shutdown with timeout.
- * \since 13.5.0
- *
- * \param shutdown_group Group shutdown controller. (Returns 0 immediately if NULL)
- * \param timeout Number of seconds to wait for the serializers in the group to shutdown.
- * Zero if the timeout is disabled.
- *
- * \return Number of serializers that did not get shutdown within the timeout.
- */
-int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout);
-
/*!
* \brief Get the threadpool serializer currently associated with this thread.
* \since 14.0.0
check_init(ast_utils_init(), "Utilities");
check_init(ast_tps_init(), "Task Processor Core");
+ check_init(ast_taskpool_init(), "Taskpool Support");
check_init(ast_fd_init(), "File Descriptor Debugging");
check_init(ast_pbx_init(), "ast_pbx_init");
check_init(aco_init(), "Configuration Option Framework");
#include "asterisk/serializer.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/threadpool.h"
+#include "asterisk/taskpool.h"
#include "asterisk/utils.h"
#include "asterisk/vector.h"
+#include "asterisk/serializer_shutdown_group.h"
struct ast_serializer_pool {
/*! Shutdown group to monitor serializers. */
return pool;
}
+struct ast_serializer_pool *ast_serializer_taskpool_create(const char *name,
+ unsigned int size, struct ast_taskpool *taskpool, int timeout)
+{
+ struct ast_serializer_pool *pool;
+ char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
+ size_t idx;
+
+ ast_assert(size > 0);
+
+ pool = ast_malloc(sizeof(*pool) + strlen(name) + 1);
+ if (!pool) {
+ return NULL;
+ }
+
+ strcpy(pool->name, name); /* safe */
+
+ pool->shutdown_group_timeout = timeout;
+ pool->shutdown_group = timeout > -1 ? ast_serializer_shutdown_group_alloc() : NULL;
+
+ AST_VECTOR_RW_INIT(&pool->serializers, size);
+
+ for (idx = 0; idx < size; ++idx) {
+ struct ast_taskprocessor *tps;
+
+ /* Create name with seq number appended. */
+ ast_taskprocessor_name_append(tps_name, sizeof(tps_name), name);
+
+ tps = ast_taskpool_serializer_group(tps_name, taskpool, pool->shutdown_group);
+ if (!tps) {
+ ast_serializer_pool_destroy(pool);
+ ast_log(LOG_ERROR, "Pool create: unable to create named serializer '%s'\n",
+ tps_name);
+ return NULL;
+ }
+
+ if (AST_VECTOR_APPEND(&pool->serializers, tps)) {
+ ast_serializer_pool_destroy(pool);
+ ast_log(LOG_ERROR, "Pool create: unable to append named serializer '%s'\n",
+ tps_name);
+ return NULL;
+ }
+ }
+
+ return pool;
+}
+
const char *ast_serializer_pool_name(const struct ast_serializer_pool *pool)
{
return pool->name;
--- /dev/null
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2012-2013, Digium, Inc.
+ *
+ * Mark Michelson <mmmichelson@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+
+#include "asterisk.h"
+
+#include "asterisk/serializer_shutdown_group.h"
+#include "asterisk/astobj2.h"
+#include "asterisk/time.h"
+#include "asterisk/utils.h"
+
+/*! Serializer group shutdown control object. */
+struct ast_serializer_shutdown_group {
+ /*! Shutdown thread waits on this conditional. */
+ ast_cond_t cond;
+ /*! Count of serializers needing to shutdown. */
+ int count;
+};
+
+static void serializer_shutdown_group_dtor(void *vdoomed)
+{
+ struct ast_serializer_shutdown_group *doomed = vdoomed;
+
+ ast_cond_destroy(&doomed->cond);
+}
+
+struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void)
+{
+ struct ast_serializer_shutdown_group *shutdown_group;
+
+ shutdown_group = ao2_alloc(sizeof(*shutdown_group), serializer_shutdown_group_dtor);
+ if (!shutdown_group) {
+ return NULL;
+ }
+ ast_cond_init(&shutdown_group->cond, NULL);
+ return shutdown_group;
+}
+
+int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout)
+{
+ int remaining;
+ ast_mutex_t *lock;
+
+ if (!shutdown_group) {
+ return 0;
+ }
+
+ lock = ao2_object_get_lockaddr(shutdown_group);
+ ast_assert(lock != NULL);
+
+ ao2_lock(shutdown_group);
+ if (timeout) {
+ struct timeval start;
+ struct timespec end;
+
+ start = ast_tvnow();
+ end.tv_sec = start.tv_sec + timeout;
+ end.tv_nsec = start.tv_usec * 1000;
+ while (shutdown_group->count) {
+ if (ast_cond_timedwait(&shutdown_group->cond, lock, &end)) {
+ /* Error or timed out waiting for the count to reach zero. */
+ break;
+ }
+ }
+ } else {
+ while (shutdown_group->count) {
+ if (ast_cond_wait(&shutdown_group->cond, lock)) {
+ /* Error */
+ break;
+ }
+ }
+ }
+ remaining = shutdown_group->count;
+ ao2_unlock(shutdown_group);
+ return remaining;
+}
+
+void ast_serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
+{
+ ao2_lock(shutdown_group);
+ ++shutdown_group->count;
+ ao2_unlock(shutdown_group);
+}
+
+void ast_serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group)
+{
+ ao2_lock(shutdown_group);
+ --shutdown_group->count;
+ if (!shutdown_group->count) {
+ ast_cond_signal(&shutdown_group->cond);
+ }
+ ao2_unlock(shutdown_group);
+}
#include "asterisk/stasis_internal.h"
#include "asterisk/stasis.h"
#include "asterisk/taskprocessor.h"
-#include "asterisk/threadpool.h"
+#include "asterisk/taskpool.h"
#include "asterisk/utils.h"
#include "asterisk/uuid.h"
#include "asterisk/vector.h"
<synopsis>Maximum number of threads in the threadpool.</synopsis>
</configOption>
</configObject>
+ <configObject name="taskpool">
+ <since>
+ <version>24.0.0</version>
+ </since>
+ <synopsis>Settings that configure the taskpool Stasis uses to deliver some messages.</synopsis>
+ <configOption name="minimum_size" default="5">
+ <since>
+ <version>24.0.0</version>
+ </since>
+ <synopsis>Minimum number of taskprocessors in the message bus taskpool.</synopsis>
+ </configOption>
+ <configOption name="initial_size" default="5">
+ <since>
+ <version>24.0.0</version>
+ </since>
+ <synopsis>Initial number of taskprocessors in the message bus taskpool.</synopsis>
+ </configOption>
+ <configOption name="idle_timeout_sec" default="20">
+ <since>
+ <version>24.0.0</version>
+ </since>
+ <synopsis>Number of seconds before an idle taskprocessor is disposed of.</synopsis>
+ </configOption>
+ <configOption name="max_size" default="50">
+ <since>
+ <version>24.0.0</version>
+ </since>
+ <synopsis>Maximum number of taskprocessors in the taskpool.</synopsis>
+ </configOption>
+ </configObject>
<configObject name="declined_message_types">
<since>
<version>13.0.0</version>
/*! The number of buckets to use for topic pools */
#define TOPIC_POOL_BUCKETS 57
-/*! Thread pool for topics that don't want a dedicated taskprocessor */
-static struct ast_threadpool *threadpool;
+/*! Taskpool for topics that don't want a dedicated taskprocessor */
+static struct ast_taskpool *taskpool;
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
int messages_passed;
/*! \brief Using a mailbox to queue messages */
int uses_mailbox;
- /*! \brief Using stasis threadpool for handling messages */
- int uses_threadpool;
+ /*! \brief Using stasis taskpool for handling messages */
+ int uses_taskpool;
/*! \brief The line number where the subscription originates */
int lineno;
/*! \brief Pointer to the subscription (NOT refcounted, and must NOT be accessed) */
}
static struct stasis_subscription_statistics *stasis_subscription_statistics_create(struct stasis_subscription *sub,
- int needs_mailbox, int use_thread_pool, const char *file, int lineno,
+ int needs_mailbox, int use_taskpool, const char *file, int lineno,
const char *func)
{
struct stasis_subscription_statistics *statistics;
statistics->lineno = lineno;
statistics->func = func;
statistics->uses_mailbox = needs_mailbox;
- statistics->uses_threadpool = use_thread_pool;
+ statistics->uses_taskpool = use_taskpool;
strcpy(statistics->uniqueid, sub->uniqueid); /* SAFE */
statistics->sub = sub;
ao2_link(subscription_stats, statistics);
stasis_subscription_cb callback,
void *data,
int needs_mailbox,
- int use_thread_pool,
+ int use_taskpool,
const char *file,
int lineno,
const char *func)
#ifdef AST_DEVMODE
ret = ast_asprintf(&sub->uniqueid, "%s:%s-%d", file, stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1));
- sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_thread_pool, file, lineno, func);
+ sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_taskpool, file, lineno, func);
if (ret < 0 || !sub->statistics) {
ao2_ref(sub, -1);
return NULL;
/* Create name with seq number appended. */
ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "stasis/%c:%s",
- use_thread_pool ? 'p' : 'm',
+ use_taskpool ? 'p' : 'm',
stasis_topic_name(topic));
/*
* acceptable. For a large number of subscribers, a thread
* pool should be used.
*/
- if (use_thread_pool) {
- sub->mailbox = ast_threadpool_serializer(tps_name, threadpool);
+ if (use_taskpool) {
+ sub->mailbox = ast_taskpool_serializer(tps_name, taskpool);
} else {
sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
}
struct ao2_container *declined;
};
-/*! \brief Threadpool configuration options */
-struct stasis_threadpool_conf {
- /*! Initial size of the thread pool */
+/*! \brief Taskpool configuration options */
+struct stasis_taskpool_conf {
+ /*! Minimum size of the taskpool */
+ int minimum_size;
+ /*! Initial size of the taskpool */
int initial_size;
- /*! Time, in seconds, before we expire a thread */
+ /*! Time, in seconds, before we expire a taskprocessor */
int idle_timeout_sec;
- /*! Maximum number of thread to allow */
+ /*! Maximum number of taskprocessors to allow */
int max_size;
};
struct stasis_config {
- /*! Thread pool configuration options */
- struct stasis_threadpool_conf *threadpool_options;
+ /*! Taskpool configuration options */
+ struct stasis_taskpool_conf *taskpool_options;
/*! Declined message types */
struct stasis_declined_config *declined_message_types;
};
static struct aco_type threadpool_option = {
.type = ACO_GLOBAL,
.name = "threadpool",
- .item_offset = offsetof(struct stasis_config, threadpool_options),
+ .item_offset = offsetof(struct stasis_config, taskpool_options),
.category = "threadpool",
.category_match = ACO_WHITELIST_EXACT,
};
-static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
+static struct aco_type taskpool_option = {
+ .type = ACO_GLOBAL,
+ .name = "taskpool",
+ .item_offset = offsetof(struct stasis_config, taskpool_options),
+ .category = "taskpool",
+ .category_match = ACO_WHITELIST_EXACT,
+};
+
+static struct aco_type *taskpool_options[] = ACO_TYPES(&threadpool_option, &taskpool_option);
/*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
static struct aco_type declined_option = {
struct aco_file stasis_conf = {
.filename = "stasis.conf",
- .types = ACO_TYPES(&declined_option, &threadpool_option),
+ .types = ACO_TYPES(&declined_option, &threadpool_option, &taskpool_option),
};
/*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
struct stasis_config *cfg = obj;
ao2_cleanup(cfg->declined_message_types);
- ast_free(cfg->threadpool_options);
+ ast_free(cfg->taskpool_options);
}
static void *stasis_config_alloc(void)
return NULL;
}
- cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
- if (!cfg->threadpool_options) {
+ cfg->taskpool_options = ast_calloc(1, sizeof(*cfg->taskpool_options));
+ if (!cfg->taskpool_options) {
ao2_ref(cfg, -1);
return NULL;
}
ast_cli(a->fd, "Number of messages dropped due to filtering: %d\n", statistics->messages_dropped);
ast_cli(a->fd, "Number of messages passed to subscriber callback: %d\n", statistics->messages_passed);
ast_cli(a->fd, "Using mailbox to queue messages: %s\n", statistics->uses_mailbox ? "Yes" : "No");
- ast_cli(a->fd, "Using stasis threadpool for handling messages: %s\n", statistics->uses_threadpool ? "Yes" : "No");
+ ast_cli(a->fd, "Using stasis taskpool for handling messages: %s\n", statistics->uses_taskpool ? "Yes" : "No");
ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->lowest_time_invoked);
ast_cli(a->fd, "Highest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->highest_time_invoked);
ast_cli_unregister_multiple(cli_stasis, ARRAY_LEN(cli_stasis));
ao2_cleanup(topic_all);
topic_all = NULL;
- ast_threadpool_shutdown(threadpool);
- threadpool = NULL;
+ ast_taskpool_shutdown(taskpool);
+ taskpool = NULL;
STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_multi_user_event_type);
aco_info_destroy(&cfg_info);
{
struct stasis_config *cfg;
int cache_init;
- struct ast_threadpool_options threadpool_opts = { 0, };
+ struct ast_taskpool_options taskpool_opts = { 0, };
#ifdef AST_DEVMODE
struct ao2_container *subscription_stats;
struct ao2_container *topic_stats;
aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
declined_options, "", declined_handler, 0);
+ aco_option_register(&cfg_info, "minimum_size", ACO_EXACT,
+ taskpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
+ FLDSET(struct stasis_taskpool_conf, minimum_size), 0,
+ INT_MAX);
aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
- threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
- FLDSET(struct stasis_threadpool_conf, initial_size), 0,
+ taskpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
+ FLDSET(struct stasis_taskpool_conf, initial_size), 0,
INT_MAX);
aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
- threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
- FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
+ taskpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
+ FLDSET(struct stasis_taskpool_conf, idle_timeout_sec), 0,
INT_MAX);
aco_option_register(&cfg_info, "max_size", ACO_EXACT,
- threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
- FLDSET(struct stasis_threadpool_conf, max_size), 0,
+ taskpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
+ FLDSET(struct stasis_taskpool_conf, max_size), 0,
INT_MAX);
if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
return -1;
}
- if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
+ if (aco_set_defaults(&taskpool_option, "taskpool", default_cfg->taskpool_options)) {
ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
ao2_ref(default_cfg, -1);
}
}
- threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
- threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
- threadpool_opts.auto_increment = 1;
- threadpool_opts.max_size = cfg->threadpool_options->max_size;
- threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
- threadpool = ast_threadpool_create("stasis", NULL, &threadpool_opts);
+ taskpool_opts.version = AST_TASKPOOL_OPTIONS_VERSION;
+ taskpool_opts.minimum_size = cfg->taskpool_options->minimum_size;
+ taskpool_opts.initial_size = cfg->taskpool_options->initial_size;
+ taskpool_opts.auto_increment = 1;
+ taskpool_opts.max_size = cfg->taskpool_options->max_size;
+ taskpool_opts.idle_timeout = cfg->taskpool_options->idle_timeout_sec;
+ taskpool = ast_taskpool_create("stasis", &taskpool_opts);
ao2_ref(cfg, -1);
- if (!threadpool) {
- ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
+ if (!taskpool) {
+ ast_log(LOG_ERROR, "Failed to create 'stasis-core' taskpool\n");
return -1;
}
--- /dev/null
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2025, Sangoma Technologies Corporation
+ *
+ * Joshua Colp <jcolp@sangoma.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+
+#include "asterisk.h"
+
+#include "asterisk/_private.h"
+#include "asterisk/taskpool.h"
+#include "asterisk/taskprocessor.h"
+#include "asterisk/astobj2.h"
+#include "asterisk/serializer_shutdown_group.h"
+#include "asterisk/utils.h"
+#include "asterisk/time.h"
+#include "asterisk/sched.h"
+
+/*!
+ * \brief A taskpool taskprocessor
+ */
+struct taskpool_taskprocessor {
+ /*! The underlying taskprocessor */
+ struct ast_taskprocessor *taskprocessor;
+ /*! The last time a task was pushed to this taskprocessor */
+ struct timeval last_pushed;
+};
+
+/*!
+ * \brief A container of taskprocessors
+ */
+struct taskpool_taskprocessors {
+ /*! A vector of taskprocessors */
+ AST_VECTOR(, struct taskpool_taskprocessor *) taskprocessors;
+ /*! The next taskprocessor to use for pushing */
+ unsigned int taskprocessor_num;
+};
+
+typedef void (*taskpool_selector)(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors,
+ struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached);
+
+/*!
+ * \brief An opaque taskpool structure
+ *
+ * A taskpool is a collection of taskprocessors that
+ * execute tasks, each from their own queue. A selector
+ * determines which taskprocessor to queue to at push
+ * time.
+ */
+struct ast_taskpool {
+ /*! The static taskprocessors, those which will always exist */
+ struct taskpool_taskprocessors static_taskprocessors;
+ /*! The dynamic taskprocessors, those which will be created as needed */
+ struct taskpool_taskprocessors dynamic_taskprocessors;
+ /*! True if the taskpool is in the process of shutting down */
+ int shutting_down;
+ /*! Taskpool-specific options */
+ struct ast_taskpool_options options;
+ /*! Dynamic pool shrinking scheduled item */
+ int shrink_sched_id;
+ /*! The taskprocessor selector to use */
+ taskpool_selector selector;
+ /*! The name of the taskpool */
+ char name[0];
+};
+
+/*! \brief The threshold for a taskprocessor at which we consider the pool needing to grow (50% of high water threshold) */
+#define TASKPOOL_GROW_THRESHOLD (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 5) / 10
+
+/*! \brief Scheduler used for dynamic pool shrinking */
+static struct ast_sched_context *sched;
+
+/*! \brief Thread storage for the current taskpool */
+AST_THREADSTORAGE_RAW(current_taskpool_pool);
+
+/*!
+ * \internal
+ * \brief Get the current taskpool associated with this thread.
+ */
+static struct ast_taskpool *ast_taskpool_get_current(void)
+{
+ return ast_threadstorage_get_ptr(¤t_taskpool_pool);
+}
+
+/*!
+ * \internal
+ * \brief Shutdown task for taskpool taskprocessor
+ */
+ static int taskpool_taskprocessor_stop(void *data)
+ {
+ struct ast_taskpool *pool = ast_taskpool_get_current();
+
+ /* If a thread stop callback is set on the options, call it */
+ if (pool->options.thread_end) {
+ pool->options.thread_end();
+ }
+
+ ao2_cleanup(pool);
+
+ return 0;
+ }
+
+/*! \internal */
+static void taskpool_taskprocessor_dtor(void *obj)
+{
+ struct taskpool_taskprocessor *taskprocessor = obj;
+
+ if (taskprocessor->taskprocessor && ast_taskprocessor_push(taskprocessor->taskprocessor, taskpool_taskprocessor_stop, NULL)) {
+ /* We can't actually do anything if this fails, so just accept reality */
+ }
+
+ ast_taskprocessor_unreference(taskprocessor->taskprocessor);
+}
+
+/*!
+ * \internal
+ * \brief Startup task for taskpool taskprocessor
+ */
+static int taskpool_taskprocessor_start(void *data)
+{
+ struct ast_taskpool *pool = data;
+
+ /* Set the pool on the thread for this taskprocessor, inheriting the
+ * reference passed to the task itself.
+ */
+ ast_threadstorage_set_ptr(¤t_taskpool_pool, pool);
+
+ /* If a thread start callback is set on the options, call it */
+ if (pool->options.thread_start) {
+ pool->options.thread_start();
+ }
+
+ return 0;
+}
+
+/*!
+ * \internal
+ * \brief Allocate a taskpool specific taskprocessor
+ */
+static struct taskpool_taskprocessor *taskpool_taskprocessor_alloc(struct ast_taskpool *pool, char type)
+{
+ struct taskpool_taskprocessor *taskprocessor;
+ char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
+
+ /* We don't actually need locking for each pool taskprocessor, as the only thing
+ * mutable is the underlying taskprocessor which has its own internal locking.
+ */
+ taskprocessor = ao2_alloc_options(sizeof(*taskprocessor), taskpool_taskprocessor_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
+ if (!taskprocessor) {
+ return NULL;
+ }
+
+ /* Create name with seq number appended. */
+ ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "taskpool/%c:%s", type, pool->name);
+
+ taskprocessor->taskprocessor = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
+ if (!taskprocessor->taskprocessor) {
+ ao2_ref(taskprocessor, -1);
+ return NULL;
+ }
+
+ taskprocessor->last_pushed = ast_tvnow();
+
+ if (ast_taskprocessor_push(taskprocessor->taskprocessor, taskpool_taskprocessor_start, ao2_bump(pool))) {
+ ao2_ref(pool, -1);
+ /* Prevent the taskprocessor from queueing the stop task by explicitly unreferencing and setting it to
+ * NULL here.
+ */
+ ast_taskprocessor_unreference(taskprocessor->taskprocessor);
+ taskprocessor->taskprocessor = NULL;
+ return NULL;
+ }
+
+ return taskprocessor;
+}
+
+/*!
+ * \internal
+ * \brief Initialize the taskpool taskprocessors structure
+ */
+static int taskpool_taskprocessors_init(struct taskpool_taskprocessors *taskprocessors, unsigned int size)
+{
+ if (AST_VECTOR_INIT(&taskprocessors->taskprocessors, size)) {
+ return -1;
+ }
+
+ return 0;
+}
+
+/*!
+ * \internal
+ * \brief Clean up the taskpool taskprocessors structure
+ */
+static void taskpool_taskprocessors_cleanup(struct taskpool_taskprocessors *taskprocessors)
+{
+ /* Access/manipulation of taskprocessors is done with the lock held, and
+ * with a check of the shutdown flag done. This means that outside of holding
+ * the lock we can safely muck with it. Pushing to the taskprocessor is done
+ * outside of the lock, but with a reference to the taskprocessor held.
+ */
+ AST_VECTOR_CALLBACK_VOID(&taskprocessors->taskprocessors, ao2_cleanup);
+ AST_VECTOR_FREE(&taskprocessors->taskprocessors);
+}
+
+/*!
+ * \internal
+ * \brief Determine if a taskpool taskprocessor is idle
+ */
+#define TASKPROCESSOR_IS_IDLE(tps, timeout) (ast_tvdiff_ms(ast_tvnow(), tps->last_pushed) > (timeout))
+
+/*! \internal
+ * \brief Taskpool dynamic pool shrink function
+ */
+static int taskpool_dynamic_pool_shrink(const void *data)
+{
+ struct ast_taskpool *pool = (struct ast_taskpool *)data;
+ int num_removed;
+
+ ao2_lock(pool);
+
+ /* If the pool is shutting down, do nothing and don't reschedule */
+ if (pool->shutting_down) {
+ ao2_unlock(pool);
+ ao2_ref(pool, -1);
+ return 0;
+ }
+
+ /* Go through the dynamic taskprocessors and find any which have been idle long enough and remove them */
+ num_removed = AST_VECTOR_REMOVE_ALL_CMP_UNORDERED(&pool->dynamic_taskprocessors.taskprocessors, pool->options.idle_timeout * 1000,
+ TASKPROCESSOR_IS_IDLE, ao2_cleanup);
+ if (num_removed) {
+ /* If we've removed any taskprocessors the taskprocessor_num may no longer be valid, so update it */
+ if (pool->dynamic_taskprocessors.taskprocessor_num >= AST_VECTOR_SIZE(&pool->dynamic_taskprocessors.taskprocessors)) {
+ pool->dynamic_taskprocessors.taskprocessor_num = 0;
+ }
+ }
+
+ ao2_unlock(pool);
+
+ /* It is possible for the pool to have been shut down between unlocking and returning, this is
+ * inherently a race condition we can't eliminate so we will catch it on the next iteration.
+ */
+ return pool->options.idle_timeout * 1000;
+}
+
+/*!
+ * \internal
+ * \brief Sequential taskprocessor selector
+ */
+ static void taskpool_sequential_selector(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors,
+ struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)
+{
+ unsigned int taskprocessor_num = taskprocessors->taskprocessor_num;
+
+ if (!AST_VECTOR_SIZE(&taskprocessors->taskprocessors)) {
+ *growth_threshold_reached = 1;
+ return;
+ }
+
+ taskprocessors->taskprocessor_num++;
+ if (taskprocessors->taskprocessor_num == AST_VECTOR_SIZE(&taskprocessors->taskprocessors)) {
+ taskprocessors->taskprocessor_num = 0;
+ }
+
+ *taskprocessor = AST_VECTOR_GET(&taskprocessors->taskprocessors, taskprocessor_num);
+
+ /* Check to see if this has reached the growth threshold */
+ *growth_threshold_reached = (ast_taskprocessor_size((*taskprocessor)->taskprocessor) >= pool->options.growth_threshold) ? 1 : 0;
+}
+
+/*!
+ * \interal
+ * \brief Least full taskprocessor selector
+ */
+static void taskpool_least_full_selector(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors,
+ struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)
+{
+ struct taskpool_taskprocessor *least_full = NULL;
+ unsigned int i;
+
+ if (!AST_VECTOR_SIZE(&taskprocessors->taskprocessors)) {
+ *growth_threshold_reached = 1;
+ return;
+ }
+
+ /* We assume that the growth threshold has not yet been reached, until proven otherwise */
+ *growth_threshold_reached = 0;
+
+ for (i = 0; i < AST_VECTOR_SIZE(&taskprocessors->taskprocessors); i++) {
+ struct taskpool_taskprocessor *tp = AST_VECTOR_GET(&taskprocessors->taskprocessors, i);
+
+ /* If this taskprocessor has no outstanding tasks, it is the best choice */
+ if (!ast_taskprocessor_size(tp->taskprocessor)) {
+ *taskprocessor = tp;
+ return;
+ }
+
+ /* If any of the taskprocessors have reached the growth threshold then we should grow the pool */
+ if (ast_taskprocessor_size(tp->taskprocessor) >= pool->options.growth_threshold) {
+ *growth_threshold_reached = 1;
+ }
+
+ /* The taskprocessor with the fewest tasks should be used */
+ if (!least_full || ast_taskprocessor_size(tp->taskprocessor) < ast_taskprocessor_size(least_full->taskprocessor)) {
+ least_full = tp;
+ }
+ }
+
+ *taskprocessor = least_full;
+}
+
+struct ast_taskpool *ast_taskpool_create(const char *name,
+ const struct ast_taskpool_options *options)
+{
+ struct ast_taskpool *pool;
+
+ /* Enforce versioning on the passed-in options */
+ if (options->version != AST_TASKPOOL_OPTIONS_VERSION) {
+ return NULL;
+ }
+
+ pool = ao2_alloc(sizeof(*pool) + strlen(name) + 1, NULL);
+ if (!pool) {
+ return NULL;
+ }
+
+ strcpy(pool->name, name); /* Safe */
+ memcpy(&pool->options, options, sizeof(pool->options));
+ pool->shrink_sched_id = -1;
+
+ /* Verify the passed-in options are valid, and adjust if needed */
+ if (options->initial_size < options->minimum_size) {
+ pool->options.initial_size = options->minimum_size;
+ ast_log(LOG_WARNING, "Taskpool '%s' has an initial size of %d, which is less than the minimum size of %d. Adjusting to %d.\n",
+ name, options->initial_size, options->minimum_size, options->minimum_size);
+ }
+
+ if (options->max_size && pool->options.initial_size > options->max_size) {
+ pool->options.max_size = pool->options.initial_size;
+ ast_log(LOG_WARNING, "Taskpool '%s' has a max size of %d, which is less than the initial size of %d. Adjusting to %d.\n",
+ name, options->max_size, pool->options.initial_size, pool->options.initial_size);
+ }
+
+ if (!options->auto_increment) {
+ if (!pool->options.minimum_size) {
+ pool->options.minimum_size = 1;
+ ast_log(LOG_WARNING, "Taskpool '%s' has a minimum size of 0, which is not valid without auto increment. Adjusting to 1.\n", name);
+ }
+ if (!pool->options.max_size) {
+ pool->options.max_size = pool->options.minimum_size;
+ ast_log(LOG_WARNING, "Taskpool '%s' has a max size of 0, which is not valid without auto increment. Adjusting to %d.\n", name, pool->options.minimum_size);
+ }
+ if (pool->options.minimum_size != pool->options.max_size) {
+ pool->options.minimum_size = pool->options.max_size;
+ pool->options.initial_size = pool->options.max_size;
+ ast_log(LOG_WARNING, "Taskpool '%s' has a minimum size of %d, while max size is %d. Adjusting all sizes to %d due to lack of auto increment.\n",
+ name, options->minimum_size, pool->options.max_size, pool->options.max_size);
+ }
+ } else if (!options->growth_threshold) {
+ pool->options.growth_threshold = TASKPOOL_GROW_THRESHOLD;
+ }
+
+ if (options->selector == AST_TASKPOOL_SELECTOR_DEFAULT || options->selector == AST_TASKPOOL_SELECTOR_LEAST_FULL) {
+ pool->selector = taskpool_least_full_selector;
+ } else if (options->selector == AST_TASKPOOL_SELECTOR_SEQUENTIAL) {
+ pool->selector = taskpool_sequential_selector;
+ } else {
+ ast_log(LOG_WARNING, "Taskpool '%s' has an invalid selector of %d. Adjusting to default selector.\n",
+ name, options->selector);
+ pool->selector = taskpool_least_full_selector;
+ }
+
+ if (taskpool_taskprocessors_init(&pool->static_taskprocessors, pool->options.minimum_size)) {
+ ao2_ref(pool, -1);
+ return NULL;
+ }
+
+ /* Create the static taskprocessors based on the passed-in options */
+ for (int i = 0; i < pool->options.minimum_size; i++) {
+ struct taskpool_taskprocessor *taskprocessor;
+
+ taskprocessor = taskpool_taskprocessor_alloc(pool, 's');
+ if (!taskprocessor) {
+ /* The reference to pool is passed to ast_taskpool_shutdown */
+ ast_taskpool_shutdown(pool);
+ return NULL;
+ }
+
+ if (AST_VECTOR_APPEND(&pool->static_taskprocessors.taskprocessors, taskprocessor)) {
+ ao2_ref(taskprocessor, -1);
+ /* The reference to pool is passed to ast_taskpool_shutdown */
+ ast_taskpool_shutdown(pool);
+ return NULL;
+ }
+ }
+
+ if (taskpool_taskprocessors_init(&pool->dynamic_taskprocessors,
+ pool->options.initial_size - pool->options.minimum_size)) {
+ ast_taskpool_shutdown(pool);
+ return NULL;
+ }
+
+ /* Create the dynamic taskprocessor based on the passed-in options */
+ for (int i = 0; i < (pool->options.initial_size - pool->options.minimum_size); i++) {
+ struct taskpool_taskprocessor *taskprocessor;
+
+ taskprocessor = taskpool_taskprocessor_alloc(pool, 'd');
+ if (!taskprocessor) {
+ /* The reference to pool is passed to ast_taskpool_shutdown */
+ ast_taskpool_shutdown(pool);
+ return NULL;
+ }
+
+ if (AST_VECTOR_APPEND(&pool->dynamic_taskprocessors.taskprocessors, taskprocessor)) {
+ ao2_ref(taskprocessor, -1);
+ /* The reference to pool is passed to ast_taskpool_shutdown */
+ ast_taskpool_shutdown(pool);
+ return NULL;
+ }
+ }
+
+ /* If idle timeout support is enabled kick off a scheduled task to shrink the dynamic pool periodically, we do
+ * this no matter if there are dynamic taskprocessor present to reduce the work needed within the push function
+ * and to reduce complexity.
+ */
+ if (options->idle_timeout && options->auto_increment) {
+ pool->shrink_sched_id = ast_sched_add(sched, options->idle_timeout * 1000, taskpool_dynamic_pool_shrink, ao2_bump(pool));
+ if (pool->shrink_sched_id < 0) {
+ ao2_ref(pool, -1);
+ /* The second reference to pool is passed to ast_taskpool_shutdown */
+ ast_taskpool_shutdown(pool);
+ return NULL;
+ }
+ }
+
+ return pool;
+}
+
+size_t ast_taskpool_taskprocessors_count(struct ast_taskpool *pool)
+{
+ size_t count;
+
+ ao2_lock(pool);
+ count = AST_VECTOR_SIZE(&pool->static_taskprocessors.taskprocessors) + AST_VECTOR_SIZE(&pool->dynamic_taskprocessors.taskprocessors);
+ ao2_unlock(pool);
+
+ return count;
+}
+
+#define TASKPOOL_QUEUE_SIZE_ADD(tps, size) (size += ast_taskprocessor_size(tps->taskprocessor))
+
+long ast_taskpool_queue_size(struct ast_taskpool *pool)
+{
+ long queue_size = 0;
+
+ ao2_lock(pool);
+ AST_VECTOR_CALLBACK_VOID(&pool->static_taskprocessors.taskprocessors, TASKPOOL_QUEUE_SIZE_ADD, queue_size);
+ AST_VECTOR_CALLBACK_VOID(&pool->dynamic_taskprocessors.taskprocessors, TASKPOOL_QUEUE_SIZE_ADD, queue_size);
+ ao2_unlock(pool);
+
+ return queue_size;
+}
+
+/*! \internal
+ * \brief Taskpool dynamic pool grow function
+ */
+static void taskpool_dynamic_pool_grow(struct ast_taskpool *pool, struct taskpool_taskprocessor **taskprocessor)
+{
+ unsigned int num_to_add = pool->options.auto_increment;
+ int i;
+
+ if (!num_to_add) {
+ return;
+ }
+
+ /* If a maximum size is enforced, then determine if we have to limit how many taskprocessors we add */
+ if (pool->options.max_size) {
+ unsigned int current_size = AST_VECTOR_SIZE(&pool->dynamic_taskprocessors.taskprocessors) + AST_VECTOR_SIZE(&pool->static_taskprocessors.taskprocessors);
+
+ if (current_size + num_to_add > pool->options.max_size) {
+ num_to_add = pool->options.max_size - current_size;
+ }
+ }
+
+ for (i = 0; i < num_to_add; i++) {
+ struct taskpool_taskprocessor *new_taskprocessor;
+
+ new_taskprocessor = taskpool_taskprocessor_alloc(pool, 'd');
+ if (!new_taskprocessor) {
+ return;
+ }
+
+ if (AST_VECTOR_APPEND(&pool->dynamic_taskprocessors.taskprocessors, new_taskprocessor)) {
+ ao2_ref(new_taskprocessor, -1);
+ return;
+ }
+
+ if (i == 0) {
+ /* On the first iteration we return the taskprocessor we just added */
+ *taskprocessor = new_taskprocessor;
+ /* We assume we will be going back to the first taskprocessor, since we are at the end of the vector */
+ pool->dynamic_taskprocessors.taskprocessor_num = 0;
+ } else if (i == 1) {
+ /* On the second iteration we update the next taskprocessor to use to be this one */
+ pool->dynamic_taskprocessors.taskprocessor_num = AST_VECTOR_SIZE(&pool->dynamic_taskprocessors.taskprocessors) - 1;
+ }
+ }
+}
+
+int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data)
+{
+ RAII_VAR(struct taskpool_taskprocessor *, taskprocessor, NULL, ao2_cleanup);
+
+ /* Select the taskprocessor in the pool to use for pushing this task */
+ ao2_lock(pool);
+ if (!pool->shutting_down) {
+ unsigned int growth_threshold_reached = 0;
+
+ /* A selector doesn't set taskprocessor to NULL, it will only change the value if a better
+ * taskprocessor is found. This means that even if the selector for a dynamic taskprocessor
+ * fails for some reason, it will still fall back to the initially found static one if
+ * it is present.
+ */
+ pool->selector(pool, &pool->static_taskprocessors, &taskprocessor, &growth_threshold_reached);
+ if (pool->options.auto_increment && growth_threshold_reached) {
+ /* If we need to grow then try dynamic taskprocessors */
+ pool->selector(pool, &pool->dynamic_taskprocessors, &taskprocessor, &growth_threshold_reached);
+ if (growth_threshold_reached) {
+ /* If we STILL need to grow then grow the dynamic taskprocessor pool if allowed */
+ taskpool_dynamic_pool_grow(pool, &taskprocessor);
+ }
+
+ /* If a dynamic taskprocessor was used update its last push time */
+ if (taskprocessor) {
+ taskprocessor->last_pushed = ast_tvnow();
+ }
+ }
+ ao2_bump(taskprocessor);
+ }
+ ao2_unlock(pool);
+
+ if (!taskprocessor) {
+ return -1;
+ }
+
+ if (ast_taskprocessor_push(taskprocessor->taskprocessor, task, data)) {
+ return -1;
+ }
+
+ return 0;
+}
+
+/*!
+ * \internal Structure used for synchronous task
+ */
+struct taskpool_sync_task {
+ ast_mutex_t lock;
+ ast_cond_t cond;
+ int complete;
+ int fail;
+ int (*task)(void *);
+ void *task_data;
+};
+
+/*!
+ * \internal Initialization function for synchronous task
+ */
+static int taskpool_sync_task_init(struct taskpool_sync_task *sync_task, int (*task)(void *), void *data)
+{
+ ast_mutex_init(&sync_task->lock);
+ ast_cond_init(&sync_task->cond, NULL);
+ sync_task->complete = 0;
+ sync_task->fail = 0;
+ sync_task->task = task;
+ sync_task->task_data = data;
+ return 0;
+}
+
+/*!
+ * \internal Cleanup function for synchronous task
+ */
+static void taskpool_sync_task_cleanup(struct taskpool_sync_task *sync_task)
+{
+ ast_mutex_destroy(&sync_task->lock);
+ ast_cond_destroy(&sync_task->cond);
+}
+
+/*!
+ * \internal Function for executing a sychronous task
+ */
+static int taskpool_sync_task(void *data)
+{
+ struct taskpool_sync_task *sync_task = data;
+ int ret;
+
+ sync_task->fail = sync_task->task(sync_task->task_data);
+
+ /*
+ * Once we unlock sync_task->lock after signaling, we cannot access
+ * sync_task again. The thread waiting within ast_taskpool_push_wait()
+ * is free to continue and release its local variable (sync_task).
+ */
+ ast_mutex_lock(&sync_task->lock);
+ sync_task->complete = 1;
+ ast_cond_signal(&sync_task->cond);
+ ret = sync_task->fail;
+ ast_mutex_unlock(&sync_task->lock);
+ return ret;
+}
+
+int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data)
+{
+ struct taskpool_sync_task sync_task;
+
+ /* If we are already executing within a taskpool taskprocessor then
+ * don't bother pushing a new task, just directly execute the task.
+ */
+ if (ast_taskpool_get_current()) {
+ return task(data);
+ }
+
+ if (taskpool_sync_task_init(&sync_task, task, data)) {
+ return -1;
+ }
+
+ if (ast_taskpool_push(pool, taskpool_sync_task, &sync_task)) {
+ taskpool_sync_task_cleanup(&sync_task);
+ return -1;
+ }
+
+ ast_mutex_lock(&sync_task.lock);
+ while (!sync_task.complete) {
+ ast_cond_wait(&sync_task.cond, &sync_task.lock);
+ }
+ ast_mutex_unlock(&sync_task.lock);
+
+ taskpool_sync_task_cleanup(&sync_task);
+ return sync_task.fail;
+}
+
+void ast_taskpool_shutdown(struct ast_taskpool *pool)
+{
+ if (!pool) {
+ return;
+ }
+
+ /* Mark this pool as shutting down so nothing new is pushed */
+ ao2_lock(pool);
+ pool->shutting_down = 1;
+ ao2_unlock(pool);
+
+ /* Stop the shrink scheduled item if present */
+ AST_SCHED_DEL_UNREF(sched, pool->shrink_sched_id, ao2_ref(pool, -1));
+
+ /* Clean up all the taskprocessors */
+ taskpool_taskprocessors_cleanup(&pool->static_taskprocessors);
+ taskpool_taskprocessors_cleanup(&pool->dynamic_taskprocessors);
+
+ ao2_ref(pool, -1);
+}
+
+struct serializer {
+ /*! Taskpool the serializer will use to process the jobs. */
+ struct ast_taskpool *pool;
+ /*! Which group will wait for this serializer to shutdown. */
+ struct ast_serializer_shutdown_group *shutdown_group;
+};
+
+static void serializer_dtor(void *obj)
+{
+ struct serializer *ser = obj;
+
+ ao2_cleanup(ser->pool);
+ ser->pool = NULL;
+ ao2_cleanup(ser->shutdown_group);
+ ser->shutdown_group = NULL;
+}
+
+static struct serializer *serializer_create(struct ast_taskpool *pool,
+ struct ast_serializer_shutdown_group *shutdown_group)
+{
+ struct serializer *ser;
+
+ /* This object has a lock so it can be used to ensure exclusive access
+ * to the execution of tasks within the serializer.
+ */
+ ser = ao2_alloc(sizeof(*ser), serializer_dtor);
+ if (!ser) {
+ return NULL;
+ }
+ ser->pool = ao2_bump(pool);
+ ser->shutdown_group = ao2_bump(shutdown_group);
+ return ser;
+}
+
+AST_THREADSTORAGE_RAW(current_taskpool_serializer);
+
+static int execute_tasks(void *data)
+{
+ struct ast_taskpool *pool = ast_taskpool_get_current();
+ struct ast_taskprocessor *tps = data;
+ struct ast_taskprocessor_listener *listener = ast_taskprocessor_listener(tps);
+ struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
+ size_t remaining, requeue = 0;
+
+ /* In a normal scenario this lock will not be in contention with
+ * anything else. It is only if a synchronous task is pushed to
+ * the serializer that it may be blocked on the synchronous
+ * task thread. This is done to ensure that only one thread is executing
+ * tasks from the serializer at a given time, and not out of order
+ * either.
+ */
+ ao2_lock(ser);
+
+ ast_threadstorage_set_ptr(¤t_taskpool_serializer, tps);
+ for (remaining = ast_taskprocessor_size(tps); remaining > 0; remaining--) {
+ requeue = ast_taskprocessor_execute(tps);
+ }
+ ast_threadstorage_set_ptr(¤t_taskpool_serializer, NULL);
+
+ ao2_unlock(ser);
+
+ /* If there are remaining tasks we requeue, this way the serializer
+ * does not hold exclusivity of the taskpool taskprocessor
+ */
+ if (requeue) {
+ /* Ownership passes to the new task */
+ if (ast_taskpool_push(pool, execute_tasks, tps)) {
+ ast_taskprocessor_unreference(tps);
+ }
+ } else {
+ ast_taskprocessor_unreference(tps);
+ }
+
+ return 0;
+}
+
+static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
+{
+ if (was_empty) {
+ struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
+ struct ast_taskprocessor *tps = ast_taskprocessor_listener_get_tps(listener);
+
+ if (ast_taskpool_push(ser->pool, execute_tasks, tps)) {
+ ast_taskprocessor_unreference(tps);
+ }
+ }
+}
+
+static int serializer_start(struct ast_taskprocessor_listener *listener)
+{
+ /* No-op */
+ return 0;
+}
+
+static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
+{
+ struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
+
+ if (ser->shutdown_group) {
+ ast_serializer_shutdown_group_dec(ser->shutdown_group);
+ }
+ ao2_cleanup(ser);
+}
+
+static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks = {
+ .task_pushed = serializer_task_pushed,
+ .start = serializer_start,
+ .shutdown = serializer_shutdown,
+};
+
+struct ast_taskprocessor *ast_taskpool_serializer_get_current(void)
+{
+ return ast_threadstorage_get_ptr(¤t_taskpool_serializer);
+}
+
+struct ast_taskprocessor *ast_taskpool_serializer_group(const char *name,
+ struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
+{
+ struct serializer *ser;
+ struct ast_taskprocessor_listener *listener;
+ struct ast_taskprocessor *tps;
+
+ ser = serializer_create(pool, shutdown_group);
+ if (!ser) {
+ return NULL;
+ }
+
+ listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser);
+ if (!listener) {
+ ao2_ref(ser, -1);
+ return NULL;
+ }
+
+ tps = ast_taskprocessor_create_with_listener(name, listener);
+ if (!tps) {
+ /* ser ref transferred to listener but not cleaned without tps */
+ ao2_ref(ser, -1);
+ } else if (shutdown_group) {
+ ast_serializer_shutdown_group_inc(shutdown_group);
+ }
+
+ ao2_ref(listener, -1);
+ return tps;
+}
+
+struct ast_taskprocessor *ast_taskpool_serializer(const char *name, struct ast_taskpool *pool)
+{
+ return ast_taskpool_serializer_group(name, pool, NULL);
+}
+
+/*!
+ * \internal An empty task callback, used to ensure the serializer does not
+ * go empty. */
+static int taskpool_serializer_empty_task(void *data)
+{
+ return 0;
+}
+
+int ast_taskpool_serializer_push_wait(struct ast_taskprocessor *serializer, int (*task)(void *data), void *data)
+{
+ struct ast_taskprocessor_listener *listener = ast_taskprocessor_listener(serializer);
+ struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
+ struct ast_taskprocessor *prior_serializer;
+ struct taskpool_sync_task sync_task;
+
+ /* If not in a taskpool taskprocessor we can just queue the task like normal and
+ * wait. */
+ if (!ast_taskpool_get_current()) {
+ if (taskpool_sync_task_init(&sync_task, task, data)) {
+ return -1;
+ }
+
+ if (ast_taskprocessor_push(serializer, taskpool_sync_task, &sync_task)) {
+ taskpool_sync_task_cleanup(&sync_task);
+ return -1;
+ }
+
+ ast_mutex_lock(&sync_task.lock);
+ while (!sync_task.complete) {
+ ast_cond_wait(&sync_task.cond, &sync_task.lock);
+ }
+ ast_mutex_unlock(&sync_task.lock);
+
+ taskpool_sync_task_cleanup(&sync_task);
+ return sync_task.fail;
+ }
+
+ /* It is possible that we are already executing within a serializer, so stash the existing
+ * away so we can restore it.
+ */
+ prior_serializer = ast_taskpool_serializer_get_current();
+
+ ao2_lock(ser);
+
+ /* There are two cases where we can or have to directly execute this task:
+ * 1. There are no other tasks in the serializer
+ * 2. We are already in the serializer
+ * In the second case if we don't execute the task now, we will deadlock waiting
+ * on it as it will never occur.
+ */
+ if (!ast_taskprocessor_size(serializer) || prior_serializer == serializer) {
+ ast_threadstorage_set_ptr(¤t_taskpool_serializer, serializer);
+ sync_task.fail = task(data);
+ ao2_unlock(ser);
+ ast_threadstorage_set_ptr(¤t_taskpool_serializer, prior_serializer);
+ return sync_task.fail;
+ }
+
+ if (taskpool_sync_task_init(&sync_task, task, data)) {
+ ao2_unlock(ser);
+ return -1;
+ }
+
+ /* First we queue the serialized task */
+ if (ast_taskprocessor_push(serializer, taskpool_sync_task, &sync_task)) {
+ taskpool_sync_task_cleanup(&sync_task);
+ ao2_unlock(ser);
+ return -1;
+ }
+
+ /* Next we queue the empty task to ensure the serializer doesn't reach empty, this
+ * stops two tasks from being queued for the same serializer at the same time.
+ */
+ if (ast_taskprocessor_push(serializer, taskpool_serializer_empty_task, NULL)) {
+ taskpool_sync_task_cleanup(&sync_task);
+ ao2_unlock(ser);
+ return -1;
+ }
+
+ /* Now we execute the tasks on the serializer until our sync task is complete */
+ ast_threadstorage_set_ptr(¤t_taskpool_serializer, serializer);
+ while (!sync_task.complete) {
+ /* The sync task is guaranteed to be executed, so doing a while loop on the complete
+ * flag is safe.
+ */
+ ast_taskprocessor_execute(serializer);
+ }
+ taskpool_sync_task_cleanup(&sync_task);
+ ao2_unlock(ser);
+
+ ast_threadstorage_set_ptr(¤t_taskpool_serializer, prior_serializer);
+
+ return sync_task.fail;
+}
+
+/*!
+ * \internal
+ * \brief Clean up resources on Asterisk shutdown
+ */
+static void taskpool_shutdown(void)
+{
+ if (sched) {
+ ast_sched_context_destroy(sched);
+ sched = NULL;
+ }
+}
+
+int ast_taskpool_init(void)
+{
+ sched = ast_sched_context_create();
+ if (!sched) {
+ return -1;
+ }
+
+ if (ast_sched_start_thread(sched)) {
+ return -1;
+ }
+
+ ast_register_cleanup(taskpool_shutdown);
+
+ return 0;
+}
return (tps) ? tps->tps_queue_size : -1;
}
+struct ast_taskprocessor_listener *ast_taskprocessor_listener(struct ast_taskprocessor *tps)
+{
+ return tps ? tps->listener : NULL;
+}
+
/* taskprocessor name accessor */
const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
{
return 0;
}
-/*! Serializer group shutdown control object. */
-struct ast_serializer_shutdown_group {
- /*! Shutdown thread waits on this conditional. */
- ast_cond_t cond;
- /*! Count of serializers needing to shutdown. */
- int count;
-};
-
-static void serializer_shutdown_group_dtor(void *vdoomed)
-{
- struct ast_serializer_shutdown_group *doomed = vdoomed;
-
- ast_cond_destroy(&doomed->cond);
-}
-
-struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void)
-{
- struct ast_serializer_shutdown_group *shutdown_group;
-
- shutdown_group = ao2_alloc(sizeof(*shutdown_group), serializer_shutdown_group_dtor);
- if (!shutdown_group) {
- return NULL;
- }
- ast_cond_init(&shutdown_group->cond, NULL);
- return shutdown_group;
-}
-
-int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout)
-{
- int remaining;
- ast_mutex_t *lock;
-
- if (!shutdown_group) {
- return 0;
- }
-
- lock = ao2_object_get_lockaddr(shutdown_group);
- ast_assert(lock != NULL);
-
- ao2_lock(shutdown_group);
- if (timeout) {
- struct timeval start;
- struct timespec end;
-
- start = ast_tvnow();
- end.tv_sec = start.tv_sec + timeout;
- end.tv_nsec = start.tv_usec * 1000;
- while (shutdown_group->count) {
- if (ast_cond_timedwait(&shutdown_group->cond, lock, &end)) {
- /* Error or timed out waiting for the count to reach zero. */
- break;
- }
- }
- } else {
- while (shutdown_group->count) {
- if (ast_cond_wait(&shutdown_group->cond, lock)) {
- /* Error */
- break;
- }
- }
- }
- remaining = shutdown_group->count;
- ao2_unlock(shutdown_group);
- return remaining;
-}
-
-/*!
- * \internal
- * \brief Increment the number of serializer members in the group.
- * \since 13.5.0
- *
- * \param shutdown_group Group shutdown controller.
- */
-static void serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
-{
- ao2_lock(shutdown_group);
- ++shutdown_group->count;
- ao2_unlock(shutdown_group);
-}
-
-/*!
- * \internal
- * \brief Decrement the number of serializer members in the group.
- * \since 13.5.0
- *
- * \param shutdown_group Group shutdown controller.
- */
-static void serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group)
-{
- ao2_lock(shutdown_group);
- --shutdown_group->count;
- if (!shutdown_group->count) {
- ast_cond_signal(&shutdown_group->cond);
- }
- ao2_unlock(shutdown_group);
-}
-
struct serializer {
/*! Threadpool the serializer will use to process the jobs. */
struct ast_threadpool *pool;
struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
if (ser->shutdown_group) {
- serializer_shutdown_group_dec(ser->shutdown_group);
+ ast_serializer_shutdown_group_dec(ser->shutdown_group);
}
ao2_cleanup(ser);
}
/* ser ref transferred to listener but not cleaned without tps */
ao2_ref(ser, -1);
} else if (shutdown_group) {
- serializer_shutdown_group_inc(shutdown_group);
+ ast_serializer_shutdown_group_inc(shutdown_group);
}
ao2_ref(listener, -1);
--- /dev/null
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2025, Sangoma Technologies Inc
+ *
+ * Joshua Colp <jcolp@sangoma.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*!
+ * \file
+ * \brief taskpool unit tests
+ *
+ * \author Joshua Colp <jcolp@sangoma.com>
+ *
+ */
+
+/*** MODULEINFO
+ <depend>TEST_FRAMEWORK</depend>
+ <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+#include "asterisk/astobj2.h"
+#include "asterisk/lock.h"
+#include "asterisk/logger.h"
+#include "asterisk/module.h"
+#include "asterisk/taskprocessor.h"
+#include "asterisk/test.h"
+#include "asterisk/taskpool.h"
+#include "asterisk/cli.h"
+
+struct test_data {
+ ast_mutex_t lock;
+ ast_cond_t cond;
+ int executed;
+ struct ast_taskprocessor *taskprocessor;
+};
+
+static struct test_data *test_alloc(void)
+{
+ struct test_data *td = ast_calloc(1, sizeof(*td));
+ if (!td) {
+ return NULL;
+ }
+ ast_mutex_init(&td->lock);
+ ast_cond_init(&td->cond, NULL);
+ return td;
+}
+
+static void test_destroy(struct test_data *td)
+{
+ ast_mutex_destroy(&td->lock);
+ ast_cond_destroy(&td->cond);
+ ast_free(td);
+}
+
+static int simple_task(void *data)
+{
+ struct test_data *td = data;
+ SCOPED_MUTEX(lock, &td->lock);
+ td->taskprocessor = ast_taskpool_serializer_get_current();
+ td->executed = 1;
+ ast_cond_signal(&td->cond);
+ return 0;
+}
+
+AST_TEST_DEFINE(taskpool_push)
+{
+ struct ast_taskpool *pool = NULL;
+ struct test_data *td = NULL;
+ struct ast_taskpool_options options = {
+ .version = AST_TASKPOOL_OPTIONS_VERSION,
+ .idle_timeout = 0,
+ .auto_increment = 0,
+ .minimum_size = 1,
+ .initial_size = 1,
+ .max_size = 1,
+ };
+ enum ast_test_result_state res = AST_TEST_PASS;
+ struct timeval start;
+ struct timespec end;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = "push";
+ info->category = "/main/taskpool/";
+ info->summary = "Taskpool pushing test";
+ info->description =
+ "Pushes a single task into a taskpool asynchronously and ensures it is executed.";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+ td = test_alloc();
+ if (!td) {
+ return AST_TEST_FAIL;
+ }
+
+ pool = ast_taskpool_create(info->name, &options);
+ if (!pool) {
+ goto end;
+ }
+
+ if (ast_taskpool_push(pool, simple_task, td)) {
+ goto end;
+ }
+
+ /* It should not take more than 5 seconds for a single simple task to execute */
+ start = ast_tvnow();
+ end.tv_sec = start.tv_sec + 5;
+ end.tv_nsec = start.tv_usec * 1000;
+
+ ast_mutex_lock(&td->lock);
+ while (!td->executed && ast_cond_timedwait(&td->cond, &td->lock, &end) != ETIMEDOUT) {
+ }
+ ast_mutex_unlock(&td->lock);
+
+ if (!td->executed) {
+ ast_test_status_update(test, "Expected simple task to be executed but it was not\n");
+ res = AST_TEST_FAIL;
+ }
+
+end:
+ ast_taskpool_shutdown(pool);
+ test_destroy(td);
+ return res;
+}
+
+AST_TEST_DEFINE(taskpool_push_synchronous)
+{
+ struct ast_taskpool *pool = NULL;
+ struct test_data *td = NULL;
+ struct ast_taskpool_options options = {
+ .version = AST_TASKPOOL_OPTIONS_VERSION,
+ .idle_timeout = 0,
+ .auto_increment = 0,
+ .minimum_size = 1,
+ .initial_size = 1,
+ .max_size = 1,
+ };
+ enum ast_test_result_state res = AST_TEST_PASS;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = "push_synchronous";
+ info->category = "/main/taskpool/";
+ info->summary = "Taskpool synchronous pushing test";
+ info->description =
+ "Pushes a single task into a taskpool synchronously and ensures it is executed.";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+ td = test_alloc();
+ if (!td) {
+ return AST_TEST_FAIL;
+ }
+
+ pool = ast_taskpool_create(info->name, &options);
+ if (!pool) {
+ goto end;
+ }
+
+ if (ast_taskpool_push_wait(pool, simple_task, td)) {
+ goto end;
+ }
+
+ if (!td->executed) {
+ ast_test_status_update(test, "Expected simple task to be executed but it was not\n");
+ res = AST_TEST_FAIL;
+ }
+
+end:
+ ast_taskpool_shutdown(pool);
+ test_destroy(td);
+ return res;
+}
+
+AST_TEST_DEFINE(taskpool_push_serializer)
+{
+ struct ast_taskpool *pool = NULL;
+ struct test_data *td = NULL;
+ struct ast_taskpool_options options = {
+ .version = AST_TASKPOOL_OPTIONS_VERSION,
+ .idle_timeout = 0,
+ .auto_increment = 0,
+ .minimum_size = 1,
+ .initial_size = 1,
+ .max_size = 1,
+ };
+ enum ast_test_result_state res = AST_TEST_PASS;
+ struct ast_taskprocessor *serializer = NULL;
+ struct timeval start;
+ struct timespec end;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = "push_serializer";
+ info->category = "/main/taskpool/";
+ info->summary = "Taskpool serializer pushing test";
+ info->description =
+ "Pushes a single task into a taskpool serializer and ensures it is executed.";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+ td = test_alloc();
+ if (!td) {
+ return AST_TEST_FAIL;
+ }
+
+ pool = ast_taskpool_create(info->name, &options);
+ if (!pool) {
+ goto end;
+ }
+
+ serializer = ast_taskpool_serializer("serializer", pool);
+ if (!serializer) {
+ goto end;
+ }
+
+ if (ast_taskprocessor_push(serializer, simple_task, td)) {
+ goto end;
+ }
+
+ /* It should not take more than 5 seconds for a single simple task to execute */
+ start = ast_tvnow();
+ end.tv_sec = start.tv_sec + 5;
+ end.tv_nsec = start.tv_usec * 1000;
+
+ ast_mutex_lock(&td->lock);
+ while (!td->executed && ast_cond_timedwait(&td->cond, &td->lock, &end) != ETIMEDOUT) {
+ }
+ ast_mutex_unlock(&td->lock);
+
+ if (!td->executed) {
+ ast_test_status_update(test, "Expected simple task to be executed but it was not\n");
+ res = AST_TEST_FAIL;
+ }
+
+ if (td->taskprocessor != serializer) {
+ ast_test_status_update(test, "Expected taskprocessor to be same as serializer but it was not\n");
+ res = AST_TEST_FAIL;
+ }
+
+end:
+ ast_taskprocessor_unreference(serializer);
+ ast_taskpool_shutdown(pool);
+ test_destroy(td);
+ return res;
+}
+
+AST_TEST_DEFINE(taskpool_push_serializer_synchronous)
+{
+ struct ast_taskpool *pool = NULL;
+ struct test_data *td = NULL;
+ struct ast_taskpool_options options = {
+ .version = AST_TASKPOOL_OPTIONS_VERSION,
+ .idle_timeout = 0,
+ .auto_increment = 0,
+ .minimum_size = 1,
+ .initial_size = 1,
+ .max_size = 1,
+ };
+ enum ast_test_result_state res = AST_TEST_PASS;
+ struct ast_taskprocessor *serializer = NULL;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = "push_serializer_synchronous";
+ info->category = "/main/taskpool/";
+ info->summary = "Taskpool serializer synchronous pushing test";
+ info->description =
+ "Pushes a single task into a taskpool serializer synchronously and ensures it is executed.";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+ td = test_alloc();
+ if (!td) {
+ return AST_TEST_FAIL;
+ }
+
+ pool = ast_taskpool_create(info->name, &options);
+ if (!pool) {
+ goto end;
+ }
+
+ serializer = ast_taskpool_serializer("serializer", pool);
+ if (!serializer) {
+ goto end;
+ }
+
+ if (ast_taskpool_serializer_push_wait(serializer, simple_task, td)) {
+ goto end;
+ }
+
+ if (!td->executed) {
+ ast_test_status_update(test, "Expected simple task to be executed but it was not\n");
+ res = AST_TEST_FAIL;
+ }
+
+ if (td->taskprocessor != serializer) {
+ ast_test_status_update(test, "Expected taskprocessor to be same as serializer but it was not\n");
+ res = AST_TEST_FAIL;
+ }
+
+end:
+ ast_taskprocessor_unreference(serializer);
+ ast_taskpool_shutdown(pool);
+ test_destroy(td);
+ return res;
+}
+
+static int requeue_task(void *data)
+{
+ return ast_taskpool_serializer_push_wait(ast_taskpool_serializer_get_current(), simple_task, data);
+}
+
+AST_TEST_DEFINE(taskpool_push_serializer_synchronous_requeue)
+{
+ struct ast_taskpool *pool = NULL;
+ struct test_data *td = NULL;
+ struct ast_taskpool_options options = {
+ .version = AST_TASKPOOL_OPTIONS_VERSION,
+ .idle_timeout = 0,
+ .auto_increment = 0,
+ .minimum_size = 1,
+ .initial_size = 1,
+ .max_size = 1,
+ };
+ enum ast_test_result_state res = AST_TEST_PASS;
+ struct ast_taskprocessor *serializer = NULL;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = "push_serializer_synchronous_requeue";
+ info->category = "/main/taskpool/";
+ info->summary = "Taskpool serializer synchronous requeueing test";
+ info->description =
+ "Pushes a single task into a taskpool serializer synchronously and ensures it is requeued and executed.";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+ td = test_alloc();
+ if (!td) {
+ return AST_TEST_FAIL;
+ }
+
+ pool = ast_taskpool_create(info->name, &options);
+ if (!pool) {
+ goto end;
+ }
+
+ serializer = ast_taskpool_serializer("serializer", pool);
+ if (!serializer) {
+ goto end;
+ }
+
+ if (ast_taskpool_serializer_push_wait(serializer, requeue_task, td)) {
+ goto end;
+ }
+
+ if (!td->executed) {
+ ast_test_status_update(test, "Expected simple task to be executed but it was not\n");
+ res = AST_TEST_FAIL;
+ }
+
+ if (td->taskprocessor != serializer) {
+ ast_test_status_update(test, "Expected taskprocessor to be same as serializer but it was not\n");
+ res = AST_TEST_FAIL;
+ }
+
+end:
+ ast_taskprocessor_unreference(serializer);
+ ast_taskpool_shutdown(pool);
+ test_destroy(td);
+ return res;
+}
+
+AST_TEST_DEFINE(taskpool_push_grow)
+{
+ struct ast_taskpool *pool = NULL;
+ struct test_data *td = NULL;
+ struct ast_taskpool_options options = {
+ .version = AST_TASKPOOL_OPTIONS_VERSION,
+ .idle_timeout = 0,
+ .auto_increment = 1,
+ .minimum_size = 0,
+ .initial_size = 0,
+ .max_size = 1,
+ };
+ enum ast_test_result_state res = AST_TEST_PASS;
+ struct timeval start;
+ struct timespec end;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = "push_grow";
+ info->category = "/main/taskpool/";
+ info->summary = "Taskpool pushing test with auto-grow enabled";
+ info->description =
+ "Pushes a single task into a taskpool asynchronously, ensures it is executed and the pool grows.";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+ td = test_alloc();
+ if (!td) {
+ return AST_TEST_FAIL;
+ }
+
+ pool = ast_taskpool_create(info->name, &options);
+ if (!pool) {
+ goto end;
+ }
+
+ if (ast_taskpool_taskprocessors_count(pool) != 0) {
+ ast_test_status_update(test, "Expected taskpool to have 0 taskprocessors but it has %zu\n", ast_taskpool_taskprocessors_count(pool));
+ res = AST_TEST_FAIL;
+ goto end;
+ }
+
+ if (ast_taskpool_push(pool, simple_task, td)) {
+ goto end;
+ }
+
+ if (ast_taskpool_taskprocessors_count(pool) != 1) {
+ ast_test_status_update(test, "Expected taskpool to have 1 taskprocessor but it has %zu\n", ast_taskpool_taskprocessors_count(pool));
+ res = AST_TEST_FAIL;
+ goto end;
+ }
+
+ /* It should not take more than 5 seconds for a single simple task to execute */
+ start = ast_tvnow();
+ end.tv_sec = start.tv_sec + 5;
+ end.tv_nsec = start.tv_usec * 1000;
+
+ ast_mutex_lock(&td->lock);
+ while (!td->executed && ast_cond_timedwait(&td->cond, &td->lock, &end) != ETIMEDOUT) {
+ }
+ ast_mutex_unlock(&td->lock);
+
+ if (!td->executed) {
+ ast_test_status_update(test, "Expected simple task to be executed but it was not\n");
+ res = AST_TEST_FAIL;
+ }
+
+end:
+ ast_taskpool_shutdown(pool);
+ test_destroy(td);
+ return res;
+}
+
+AST_TEST_DEFINE(taskpool_push_shrink)
+{
+ struct ast_taskpool *pool = NULL;
+ struct test_data *td = NULL;
+ struct ast_taskpool_options options = {
+ .version = AST_TASKPOOL_OPTIONS_VERSION,
+ .idle_timeout = 1,
+ .auto_increment = 1,
+ .minimum_size = 0,
+ .initial_size = 0,
+ .max_size = 1,
+ };
+ enum ast_test_result_state res = AST_TEST_PASS;
+ struct timeval start;
+ struct timespec end;
+ int iterations = 0;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = "push_shrink";
+ info->category = "/main/taskpool/";
+ info->summary = "Taskpool pushing test with auto-shrink enabled";
+ info->description =
+ "Pushes a single task into a taskpool asynchronously, ensures it is executed and the pool shrinks.";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+ td = test_alloc();
+ if (!td) {
+ return AST_TEST_FAIL;
+ }
+
+ pool = ast_taskpool_create(info->name, &options);
+ if (!pool) {
+ goto end;
+ }
+
+ if (ast_taskpool_taskprocessors_count(pool) != 0) {
+ ast_test_status_update(test, "Expected taskpool to have 0 taskprocessors but it has %zu\n", ast_taskpool_taskprocessors_count(pool));
+ res = AST_TEST_FAIL;
+ goto end;
+ }
+
+ if (ast_taskpool_push(pool, simple_task, td)) {
+ res = AST_TEST_FAIL;
+ goto end;
+ }
+
+ if (ast_taskpool_taskprocessors_count(pool) != 1) {
+ ast_test_status_update(test, "Expected taskpool to have 1 taskprocessor but it has %zu\n", ast_taskpool_taskprocessors_count(pool));
+ res = AST_TEST_FAIL;
+ goto end;
+ }
+
+ /* We give 10 seconds for the pool to shrink back to normal, but if it happens earlier we
+ * stop our check early.
+ */
+ ast_mutex_lock(&td->lock);
+ do {
+ start = ast_tvnow();
+ end.tv_sec = start.tv_sec + 1;
+ end.tv_nsec = start.tv_usec * 1000;
+
+ if (ast_cond_timedwait(&td->cond, &td->lock, &end) == ETIMEDOUT) {
+ iterations++;
+ }
+ } while (ast_taskpool_taskprocessors_count(pool) != 0 && iterations != 10);
+
+ if (!td->executed) {
+ ast_test_status_update(test, "Expected simple task to be executed but it was not\n");
+ res = AST_TEST_FAIL;
+ }
+
+ if (ast_taskpool_taskprocessors_count(pool) != 0) {
+ ast_test_status_update(test, "Expected taskpool to have 0 taskprocessors but it has %zu\n", ast_taskpool_taskprocessors_count(pool));
+ res = AST_TEST_FAIL;
+ goto end;
+ }
+
+end:
+ ast_taskpool_shutdown(pool);
+ test_destroy(td);
+ return res;
+}
+
+struct efficiency_task_data {
+ struct ast_taskpool *pool;
+ int num_tasks_executed;
+ int shutdown;
+};
+
+static int efficiency_task(void *data)
+{
+ struct efficiency_task_data *etd = data;
+
+ if (etd->shutdown) {
+ ao2_ref(etd->pool, -1);
+ return 0;
+ }
+
+ ast_atomic_fetchadd_int(&etd->num_tasks_executed, +1);
+
+ if (ast_taskpool_push(etd->pool, efficiency_task, etd)) {
+ ao2_ref(etd->pool, -1);
+ return -1;
+ }
+
+ return 0;
+}
+
+static char *handle_cli_taskpool_push_efficiency(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+ struct ast_taskpool *pool = NULL;
+ struct test_data *td = NULL;
+ struct ast_taskpool_options options = {
+ .version = AST_TASKPOOL_OPTIONS_VERSION,
+ .idle_timeout = 0,
+ .auto_increment = 0,
+ .minimum_size = 5,
+ .initial_size = 5,
+ .max_size = 5,
+ };
+ struct efficiency_task_data etd = {
+ .pool = NULL,
+ .num_tasks_executed = 0,
+ .shutdown = 0,
+ };
+ struct timeval start;
+ struct timespec end;
+ int i;
+
+ switch (cmd) {
+ case CLI_INIT:
+ e->command = "taskpool push efficiency";
+ e->usage =
+ "Usage: taskpool push efficiency\n"
+ " Pushes 200 tasks to a taskpool and measures\n"
+ " the number of tasks executed within 30 seconds.\n";
+ return NULL;
+ case CLI_GENERATE:
+ return NULL;
+ }
+
+ td = test_alloc();
+ if (!td) {
+ return CLI_SUCCESS;
+ }
+
+ pool = ast_taskpool_create("taskpool_push_efficiency", &options);
+ if (!pool) {
+ goto end;
+ }
+
+ etd.pool = pool;
+
+ /* Push in 200 tasks, cause why not */
+ for (i = 0; i < 200; i++) {
+ /* Ensure that the task has a reference to the pool */
+ ao2_bump(pool);
+ if (ast_taskpool_push(pool, efficiency_task, &etd)) {
+ goto end;
+ }
+ }
+
+ /* Wait for 30 seconds */
+ start = ast_tvnow();
+ end.tv_sec = start.tv_sec + 30;
+ end.tv_nsec = start.tv_usec * 1000;
+
+ ast_mutex_lock(&td->lock);
+ while (ast_cond_timedwait(&td->cond, &td->lock, &end) != ETIMEDOUT) {
+ }
+ ast_mutex_unlock(&td->lock);
+
+ /* Give the total tasks executed, and tell each task to not requeue */
+ ast_cli(a->fd, "Total tasks executed in 30 seconds: %d\n", etd.num_tasks_executed);
+
+end:
+ etd.shutdown = 1;
+ ast_taskpool_shutdown(pool);
+ test_destroy(td);
+ return CLI_SUCCESS;
+}
+
+struct serializer_efficiency_task_data {
+ struct ast_taskprocessor *serializer[2];
+ int *num_tasks_executed;
+ int *shutdown;
+};
+
+static int serializer_efficiency_task(void *data)
+{
+ struct serializer_efficiency_task_data *etd = data;
+ struct ast_taskprocessor *taskprocessor = etd->serializer[0];
+
+ if (*etd->shutdown) {
+ return 0;
+ }
+
+ ast_atomic_fetchadd_int(etd->num_tasks_executed, +1);
+
+ /* We ping pong a task between a pair of taskprocessors to ensure that
+ * a single taskprocessor does not receive a thread from the threadpool
+ * exclusively.
+ */
+ if (taskprocessor == ast_taskpool_serializer_get_current()) {
+ taskprocessor = etd->serializer[1];
+ }
+
+ if (ast_taskprocessor_push(taskprocessor,
+ serializer_efficiency_task, etd)) {
+ return -1;
+ }
+
+ return 0;
+}
+
+static char *handle_cli_taskpool_push_serializer_efficiency(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+ struct ast_taskpool *pool = NULL;
+ struct test_data *td = NULL;
+ struct ast_taskpool_options options = {
+ .version = AST_TASKPOOL_OPTIONS_VERSION,
+ .idle_timeout = 0,
+ .auto_increment = 0,
+ .minimum_size = 5,
+ .initial_size = 5,
+ .max_size = 5,
+ };
+ struct serializer_efficiency_task_data etd[200];
+ struct timeval start;
+ struct timespec end;
+ int i;
+ int num_tasks_executed = 0;
+ int shutdown = 0;
+
+ switch (cmd) {
+ case CLI_INIT:
+ e->command = "taskpool push serializer efficiency";
+ e->usage =
+ "Usage: taskpool push serializer efficiency\n"
+ " Pushes 200 tasks to a taskpool in serializers and measures\n"
+ " the number of tasks executed within 30 seconds.\n";
+ return NULL;
+ case CLI_GENERATE:
+ return NULL;
+ }
+ td = test_alloc();
+ if (!td) {
+ return CLI_SUCCESS;
+ }
+
+ memset(&etd, 0, sizeof(etd));
+
+ pool = ast_taskpool_create("taskpool_push_serializer_efficiency", &options);
+ if (!pool) {
+ goto end;
+ }
+
+ /* We create 400 (200 pairs) of serializers */
+ for (i = 0; i < 200; i++) {
+ char serializer_name[AST_TASKPROCESSOR_MAX_NAME + 1];
+
+ ast_taskprocessor_build_name(serializer_name, sizeof(serializer_name), "serializer%d", i);
+ etd[i].serializer[0] = ast_taskpool_serializer(serializer_name, pool);
+ if (!etd[i].serializer[0]) {
+ goto end;
+ }
+
+ ast_taskprocessor_build_name(serializer_name, sizeof(serializer_name), "serializer%d", i);
+ etd[i].serializer[1] = ast_taskpool_serializer(serializer_name, pool);
+ if (!etd[i].serializer[1]) {
+ goto end;
+ }
+
+ etd[i].num_tasks_executed = &num_tasks_executed;
+ etd[i].shutdown = &shutdown;
+ }
+
+ /* And once created we push in 200 tasks */
+ for (i = 0; i < 200; i++) {
+ if (ast_taskprocessor_push(etd[i].serializer[0], serializer_efficiency_task, &etd[i])) {
+ goto end;
+ }
+ }
+
+ /* Wait for 30 seconds */
+ start = ast_tvnow();
+ end.tv_sec = start.tv_sec + 30;
+ end.tv_nsec = start.tv_usec * 1000;
+
+ ast_mutex_lock(&td->lock);
+ while (ast_cond_timedwait(&td->cond, &td->lock, &end) != ETIMEDOUT) {
+ }
+ ast_mutex_unlock(&td->lock);
+
+ /* Give the total tasks executed, and tell each task to not requeue */
+ ast_cli(a->fd, "Total tasks executed in 30 seconds: %d\n", num_tasks_executed);
+ shutdown = 1;
+
+end:
+ /* We need to unreference each serializer */
+ for (i = 0; i < 200; i++) {
+ ast_taskprocessor_unreference(etd[i].serializer[0]);
+ ast_taskprocessor_unreference(etd[i].serializer[1]);
+ }
+ ast_taskpool_shutdown(pool);
+ test_destroy(td);
+ return CLI_SUCCESS;
+}
+
+static struct ast_cli_entry cli[] = {
+ AST_CLI_DEFINE(handle_cli_taskpool_push_efficiency, "Push tasks to a taskpool and measure efficiency"),
+ AST_CLI_DEFINE(handle_cli_taskpool_push_serializer_efficiency, "Push tasks to a taskpool in serializers and measure efficiency"),
+};
+
+static int unload_module(void)
+{
+ ast_cli_unregister_multiple(cli, ARRAY_LEN(cli));
+ AST_TEST_UNREGISTER(taskpool_push);
+ AST_TEST_UNREGISTER(taskpool_push_synchronous);
+ AST_TEST_UNREGISTER(taskpool_push_serializer);
+ AST_TEST_UNREGISTER(taskpool_push_serializer_synchronous);
+ AST_TEST_UNREGISTER(taskpool_push_serializer_synchronous_requeue);
+ AST_TEST_UNREGISTER(taskpool_push_grow);
+ AST_TEST_UNREGISTER(taskpool_push_shrink);
+ return 0;
+}
+
+static int load_module(void)
+{
+ ast_cli_register_multiple(cli, ARRAY_LEN(cli));
+ AST_TEST_REGISTER(taskpool_push);
+ AST_TEST_REGISTER(taskpool_push_synchronous);
+ AST_TEST_REGISTER(taskpool_push_serializer);
+ AST_TEST_REGISTER(taskpool_push_serializer_synchronous);
+ AST_TEST_REGISTER(taskpool_push_serializer_synchronous_requeue);
+ AST_TEST_REGISTER(taskpool_push_grow);
+ AST_TEST_REGISTER(taskpool_push_shrink);
+ return AST_MODULE_LOAD_SUCCESS;
+}
+
+AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "taskpool test module");
}
if (!tld->empty_notice) {
- ast_test_status_update(test, "Test listener not notified that threadpool is empty\n");
+ if (test) {
+ ast_test_status_update(test, "Test listener not notified that threadpool is empty\n");
+ }
res = AST_TEST_FAIL;
}
return res;
}
+struct efficiency_task_data {
+ struct ast_threadpool *pool;
+ int num_tasks_executed;
+ int shutdown;
+};
+
+static int efficiency_task(void *data)
+{
+ struct efficiency_task_data *etd = data;
+
+ if (etd->shutdown) {
+ return 0;
+ }
+
+ ast_atomic_fetchadd_int(&etd->num_tasks_executed, +1);
+
+ if (ast_threadpool_push(etd->pool, efficiency_task, etd)) {
+ return -1;
+ }
+
+ return 0;
+}
+
+static char *handle_cli_threadpool_push_efficiency(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+ struct ast_threadpool *pool = NULL;
+ struct ast_threadpool_listener *listener = NULL;
+ struct test_listener_data *tld = NULL;
+ enum ast_test_result_state res = AST_TEST_FAIL;
+ struct ast_threadpool_options options = {
+ .version = AST_THREADPOOL_OPTIONS_VERSION,
+ .idle_timeout = 0,
+ .auto_increment = 0,
+ .initial_size = 5,
+ .max_size = 5,
+ };
+ struct efficiency_task_data etd = {
+ .pool = NULL,
+ .num_tasks_executed = 0,
+ .shutdown = 0,
+ };
+ struct timeval start;
+ struct timespec end;
+ int i;
+
+ switch (cmd) {
+ case CLI_INIT:
+ e->command = "threadpool push efficiency";
+ e->usage =
+ "Usage: threadpool push efficiency\n"
+ " Pushes 200 tasks to a threadpool and measures\n"
+ " the number of tasks executed within 30 seconds.\n";
+ return NULL;
+ case CLI_GENERATE:
+ return NULL;
+ }
+
+ tld = test_alloc();
+ if (!tld) {
+ return CLI_SUCCESS;
+ }
+
+ listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
+ if (!listener) {
+ goto end;
+ }
+
+ pool = ast_threadpool_create("threadpool_push_efficiency", listener, &options);
+ if (!pool) {
+ goto end;
+ }
+
+ etd.pool = pool;
+
+ /* Push in 200 tasks, cause why not */
+ for (i = 0; i < 200; i++) {
+ if (ast_threadpool_push(pool, efficiency_task, &etd)) {
+ goto end;
+ }
+ }
+
+ /* Wait for 30 seconds */
+ start = ast_tvnow();
+ end.tv_sec = start.tv_sec + 30;
+ end.tv_nsec = start.tv_usec * 1000;
+
+ ast_mutex_lock(&tld->lock);
+ while (ast_cond_timedwait(&tld->cond, &tld->lock, &end) != ETIMEDOUT) {
+ }
+ ast_mutex_unlock(&tld->lock);
+
+ /* Give the total tasks executed, and tell each task to not requeue */
+ ast_cli(a->fd, "Total tasks executed in 30 seconds: %d\n", etd.num_tasks_executed);
+ etd.shutdown = 1;
+
+ res = wait_for_empty_notice(NULL, tld);
+ if (res == AST_TEST_FAIL) {
+ goto end;
+ }
+
+end:
+ ast_threadpool_shutdown(pool);
+ ao2_cleanup(listener);
+ ast_free(tld);
+ return CLI_SUCCESS;
+}
+
AST_TEST_DEFINE(threadpool_initial_threads)
{
struct ast_threadpool *pool = NULL;
return res;
}
+struct serializer_efficiency_task_data {
+ struct ast_taskprocessor *serializer[2];
+ int *num_tasks_executed;
+ int *shutdown;
+};
+
+static int serializer_efficiency_task(void *data)
+{
+ struct serializer_efficiency_task_data *etd = data;
+ struct ast_taskprocessor *taskprocessor = etd->serializer[0];
+
+ if (*etd->shutdown) {
+ return 0;
+ }
+
+ ast_atomic_fetchadd_int(etd->num_tasks_executed, +1);
+
+ /* We ping pong a task between a pair of taskprocessors to ensure that
+ * a single taskprocessor does not receive a thread from the threadpool
+ * exclusively.
+ */
+ if (taskprocessor == ast_threadpool_serializer_get_current()) {
+ taskprocessor = etd->serializer[1];
+ }
+
+ if (ast_taskprocessor_push(taskprocessor,
+ serializer_efficiency_task, etd)) {
+ return -1;
+ }
+
+ return 0;
+}
+
+static char *handle_cli_threadpool_push_serializer_efficiency(struct ast_cli_entry *e, int cmd,
+ struct ast_cli_args *a)
+{
+ struct ast_threadpool *pool = NULL;
+ struct ast_threadpool_listener *listener = NULL;
+ struct test_listener_data *tld = NULL;
+ enum ast_test_result_state res = AST_TEST_FAIL;
+ struct ast_threadpool_options options = {
+ .version = AST_THREADPOOL_OPTIONS_VERSION,
+ .idle_timeout = 0,
+ .auto_increment = 0,
+ .initial_size = 5,
+ .max_size = 5,
+ };
+ struct serializer_efficiency_task_data etd[200];
+ struct timeval start;
+ struct timespec end;
+ int i;
+ int num_tasks_executed = 0;
+ int shutdown = 0;
+
+ switch (cmd) {
+ case CLI_INIT:
+ e->command = "threadpool push serializer efficiency";
+ e->usage =
+ "Usage: threadpool push serializer efficiency\n"
+ " Pushes 200 tasks to a threadpool in serializers and measures\n"
+ " the number of tasks executed within 30 seconds.\n";
+ return NULL;
+ case CLI_GENERATE:
+ return NULL;
+ }
+
+ tld = test_alloc();
+ if (!tld) {
+ return CLI_SUCCESS;
+ }
+
+ memset(&etd, 0, sizeof(etd));
+
+ listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
+ if (!listener) {
+ goto end;
+ }
+
+ pool = ast_threadpool_create("threadpool_push_serializer_efficiency", listener, &options);
+ if (!pool) {
+ goto end;
+ }
+
+ /* We create 400 (200 pairs) of serializers */
+ for (i = 0; i < 200; i++) {
+ char serializer_name[AST_TASKPROCESSOR_MAX_NAME + 1];
+
+ ast_taskprocessor_build_name(serializer_name, sizeof(serializer_name), "serializer%d", i);
+ etd[i].serializer[0] = ast_threadpool_serializer(serializer_name, pool);
+ if (!etd[i].serializer[0]) {
+ goto end;
+ }
+
+ ast_taskprocessor_build_name(serializer_name, sizeof(serializer_name), "serializer%d", i);
+ etd[i].serializer[1] = ast_threadpool_serializer(serializer_name, pool);
+ if (!etd[i].serializer[1]) {
+ goto end;
+ }
+
+ etd[i].num_tasks_executed = &num_tasks_executed;
+ etd[i].shutdown = &shutdown;
+ }
+
+ /* And once created we push in 200 tasks */
+ for (i = 0; i < 200; i++) {
+ if (ast_taskprocessor_push(etd[i].serializer[0], serializer_efficiency_task, &etd[i])) {
+ goto end;
+ }
+ }
+
+ /* Wait for 30 seconds */
+ start = ast_tvnow();
+ end.tv_sec = start.tv_sec + 30;
+ end.tv_nsec = start.tv_usec * 1000;
+
+ ast_mutex_lock(&tld->lock);
+ while (ast_cond_timedwait(&tld->cond, &tld->lock, &end) != ETIMEDOUT) {
+ }
+ ast_mutex_unlock(&tld->lock);
+
+ /* Give the total tasks executed, and tell each task to not requeue */
+ ast_cli(a->fd, "Total tasks executed in 30 seconds: %d\n", num_tasks_executed);
+ shutdown = 1;
+
+ res = wait_for_empty_notice(NULL, tld);
+ if (res == AST_TEST_FAIL) {
+ goto end;
+ }
+
+end:
+ /* We need to unreference each serializer */
+ for (i = 0; i < 200; i++) {
+ ast_taskprocessor_unreference(etd[i].serializer[0]);
+ ast_taskprocessor_unreference(etd[i].serializer[1]);
+ }
+ ast_threadpool_shutdown(pool);
+ ao2_cleanup(listener);
+ ast_free(tld);
+ return CLI_SUCCESS;
+}
+
AST_TEST_DEFINE(threadpool_serializer_dupe)
{
enum ast_test_result_state res = AST_TEST_FAIL;
return res;
}
+static struct ast_cli_entry cli[] = {
+ AST_CLI_DEFINE(handle_cli_threadpool_push_efficiency, "Push tasks to a threadpool and measure efficiency"),
+ AST_CLI_DEFINE(handle_cli_threadpool_push_serializer_efficiency, "Push tasks to a threadpool in serializers and measure efficiency"),
+};
+
static int unload_module(void)
{
ast_test_unregister(threadpool_push);
ast_test_unregister(threadpool_more_destruction);
ast_test_unregister(threadpool_serializer);
ast_test_unregister(threadpool_serializer_dupe);
+ ast_cli_unregister_multiple(cli, ARRAY_LEN(cli));
return 0;
}
static int load_module(void)
{
+ ast_cli_register_multiple(cli, ARRAY_LEN(cli));
ast_test_register(threadpool_push);
ast_test_register(threadpool_initial_threads);
ast_test_register(threadpool_thread_creation);