]> git.ipfire.org Git - thirdparty/asterisk.git/commitdiff
pjsip: Move from threadpool to taskpool
authorJoshua C. Colp <jcolp@sangoma.com>
Tue, 23 Sep 2025 21:54:22 +0000 (18:54 -0300)
committerAsterisk Development Team <asteriskteam@digium.com>
Thu, 30 Oct 2025 16:09:11 +0000 (16:09 +0000)
This change moves the PJSIP module from the threadpool API
to the taskpool API. PJSIP-specific implementations for
task usage have been removed and replaced with calls to
the optimized taskpool implementations instead. The need
for a pool of serializers has also been removed as
taskpool inherently provides this. The default settings
have also been changed to be more realistic for common
usage.

UpgradeNote: The threadpool_* options in pjsip.conf have now
been deprecated though they continue to be read and used.
They have been replaced with taskpool options that give greater
control over the underlying taskpool used for PJSIP. An alembic
upgrade script has been added to add these options to realtime
as well.

21 files changed:
channels/chan_pjsip.c
configs/samples/pjsip.conf.sample
contrib/ast-db-manage/config/versions/dc7c357dc178_add_taskpool_options_to_system.py [new file with mode: 0644]
include/asterisk/res_pjsip.h
include/asterisk/res_pjsip_session.h
include/asterisk/taskpool.h
main/taskpool.c
res/res_pjsip.c
res/res_pjsip/config_system.c
res/res_pjsip/include/res_pjsip_private.h
res/res_pjsip/pjsip_config.xml
res/res_pjsip/pjsip_distributor.c
res/res_pjsip/pjsip_options.c
res/res_pjsip/pjsip_resolver.c
res/res_pjsip_mwi.c
res/res_pjsip_outbound_publish.c
res/res_pjsip_outbound_registration.c
res/res_pjsip_refer.c
res/res_pjsip_rfc3326.c
res/res_pjsip_rfc3329.c
res/res_pjsip_session.c

index 5b03f88951d8a6f8922baf318a10a4d294ff24a6..ae3f58a9b02e54635cb57f7711d95a7511dd18c6 100644 (file)
@@ -720,7 +720,7 @@ static int answer(void *data)
                        ast_channel_name(session->channel), err);
                /*
                 * Return this value so we can distinguish between this
-                * failure and the threadpool synchronous push failing.
+                * failure and the taskpool synchronous push failing.
                 */
                SCOPE_EXIT_RTN_VALUE(-2, "pjproject failure\n");
        }
@@ -753,7 +753,7 @@ static int chan_pjsip_answer(struct ast_channel *ast)
        res = ast_sip_push_task_wait_serializer(session->serializer, answer, &ans_data);
        if (res) {
                if (res == -1) {
-                       ast_log(LOG_ERROR,"Cannot answer '%s': Unable to push answer task to the threadpool.\n",
+                       ast_log(LOG_ERROR,"Cannot answer '%s': Unable to push answer task to the taskpool.\n",
                                ast_channel_name(session->channel));
                }
                ao2_ref(session, -1);
@@ -2599,7 +2599,7 @@ static int chan_pjsip_hangup(struct ast_channel *ast)
        }
 
        if (ast_sip_push_task(channel->session->serializer, hangup, h_data)) {
-               ast_log(LOG_WARNING, "Unable to push hangup task to the threadpool. Expect bad things\n");
+               ast_log(LOG_WARNING, "Unable to push hangup task to the taskpool. Expect bad things\n");
                goto failure;
        }
 
index c399ba8a24489a89649bbad5832817f176ef0dde..f3619909b626c5a084483175e4c2857a201d36ee 100644 (file)
 ;timer_b=32000  ; Set transaction timer B value milliseconds (default: "32000")
 ;compact_headers=no     ; Use the short forms of common SIP header names
                         ; (default: "no")
-;threadpool_initial_size=0      ; Initial number of threads in the res_pjsip
-                                ; threadpool (default: "0")
-;threadpool_auto_increment=5    ; The amount by which the number of threads is
-                                ; incremented when necessary (default: "5")
-;threadpool_idle_timeout=60     ; Number of seconds before an idle thread
-                                ; should be disposed of (default: "60")
-;threadpool_max_size=0  ; Maximum number of threads in the res_pjsip threadpool
-                        ; A value of 0 indicates no maximum (default: "0")
+;taskpool_minimum_size=4      ; Minimum number of taskprocessors in the res_pjsip
+                              ; taskpool (default: "4")
+;taskpool_initial_size=4      ; Initial number of taskprocessors in the res_pjsip
+                              ; taskpool (default: "4")
+;taskpool_auto_increment=1    ; The amount by which the number of taskprocessors is
+                              ; incremented when necessary (default: "1")
+;taskpool_idle_timeout=60     ; Number of seconds before an idle taskprocessor
+                              ; should be disposed of (default: "60")
+;taskpool_max_size=50 ; Maximum number of taskprocessors in the res_pjsip taskpool
+                      ; A value of 0 indicates no maximum (default: "50")
 ;disable_tcp_switch=yes ; Disable automatic switching from UDP to TCP transports
                         ; if outgoing request is too large.
                         ; See RFC 3261 section 18.1.1.
diff --git a/contrib/ast-db-manage/config/versions/dc7c357dc178_add_taskpool_options_to_system.py b/contrib/ast-db-manage/config/versions/dc7c357dc178_add_taskpool_options_to_system.py
new file mode 100644 (file)
index 0000000..b07c966
--- /dev/null
@@ -0,0 +1,29 @@
+"""add taskpool options to system
+
+Revision ID: dc7c357dc178
+Revises: abdc9ede147d
+Create Date: 2025-09-24 09:45:17.609185
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = 'dc7c357dc178'
+down_revision = 'abdc9ede147d'
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+    op.add_column('ps_systems', sa.Column('taskpool_minimum_size', sa.Integer))
+    op.add_column('ps_systems', sa.Column('taskpool_initial_size', sa.Integer))
+    op.add_column('ps_systems', sa.Column('taskpool_auto_increment', sa.Integer))
+    op.add_column('ps_systems', sa.Column('taskpool_idle_timeout', sa.Integer))
+    op.add_column('ps_systems', sa.Column('taskpool_max_size', sa.Integer))
+
+def downgrade():
+    op.drop_column('ps_systems', 'taskpool_minimum_size')
+    op.drop_column('ps_systems', 'taskpool_initial_size')
+    op.drop_column('ps_systems', 'taskpool_auto_increment')
+    op.drop_column('ps_systems', 'taskpool_idle_timeout')
+    op.drop_column('ps_systems', 'taskpool_max_size')
index 2fafd5790a2e5d69a9e774d96008507fb94d8c75..c49e9dfac765bc9465bfad69bfe8e695445acabf 100644 (file)
@@ -1921,7 +1921,7 @@ struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void);
  * There are three major types of threads that SIP will have to deal with:
  * \li Asterisk threads
  * \li PJSIP threads
- * \li SIP threadpool threads (a.k.a. "servants")
+ * \li SIP taskpool threads (a.k.a. "servants")
  *
  * \par Asterisk Threads
  *
@@ -1963,7 +1963,7 @@ struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void);
  * is NULL, then the work will be handed off to whatever servant can currently handle
  * the task. If this pointer is non-NULL, then the task will not be executed until
  * previous tasks pushed with the same serializer have completed. For more information
- * on serializers and the benefits they provide, see \ref ast_threadpool_serializer
+ * on serializers and the benefits they provide, see \ref ast_taskpool_serializer
  *
  * \par Scheduler
  *
@@ -1992,7 +1992,7 @@ typedef int (*ast_sip_task)(void *user_data);
  * \brief Create a new serializer for SIP tasks
  * \since 13.8.0
  *
- * See \ref ast_threadpool_serializer for more information on serializers.
+ * See \ref ast_taskpool_serializer for more information on serializers.
  * SIP creates serializers so that tasks operating on similar data will run
  * in sequence.
  *
@@ -2009,7 +2009,7 @@ struct ast_serializer_shutdown_group;
  * \brief Create a new serializer for SIP tasks
  * \since 13.8.0
  *
- * See \ref ast_threadpool_serializer for more information on serializers.
+ * See \ref ast_taskpool_serializer for more information on serializers.
  * SIP creates serializers so that tasks operating on similar data will run
  * in sequence.
  *
@@ -2251,7 +2251,7 @@ enum ast_sip_scheduler_task_flags {
 struct ast_sip_sched_task;
 
 /*!
- * \brief Schedule a task to run in the res_pjsip thread pool
+ * \brief Schedule a task to run in the res_pjsip taskpool
  * \since 13.9.0
  *
  * \param serializer The serializer to use.  If NULL, don't use a serializer (see note below)
@@ -2266,7 +2266,7 @@ struct ast_sip_sched_task;
  * \par Serialization
  *
  * Specifying a serializer guarantees serialized execution but NOT specifying a serializer
- * may still result in tasks being effectively serialized if the thread pool is busy.
+ * may still result in tasks being effectively serialized if the taskpool is busy.
  * The point of the serializer BTW is not to prevent parallel executions of the SAME task.
  * That happens automatically (see below).  It's to prevent the task from running at the same
  * time as other work using the same serializer, whether or not it's being run by the scheduler.
@@ -3662,15 +3662,15 @@ int ast_sip_get_host_ip(int af, pj_sockaddr *addr);
 const char *ast_sip_get_host_ip_string(int af);
 
 /*!
- * \brief Return the size of the SIP threadpool's task queue
+ * \brief Return the size of the SIP taskpool's task queue
  * \since 13.7.0
  */
-long ast_sip_threadpool_queue_size(void);
+long ast_sip_taskpool_queue_size(void);
 
 /*!
- * \brief Retrieve the SIP threadpool object
+ * \brief Retrieve the SIP taskpool object
  */
-struct ast_threadpool *ast_sip_threadpool(void);
+struct ast_taskpool *ast_sip_taskpool(void);
 
 /*!
  * \brief Retrieve transport state
index 75ba60e523b39b9b263268e0cc96d91d437f33f2..7279d0d448f7da7f60db06473fc0292e48307cb4 100644 (file)
@@ -195,7 +195,7 @@ struct ast_sip_session {
        struct ao2_container *datastores;
        /*! Serializer for tasks relating to this SIP session */
        struct ast_taskprocessor *serializer;
-       /*! Non-null if the session serializer is suspended or being suspended. */
+       /*! \deprecated Non-null if the session serializer is suspended or being suspended. */
        struct ast_sip_session_suspender *suspended;
        /*! Requests that could not be sent due to current inv_session state */
        AST_LIST_HEAD_NOLOCK(, ast_sip_session_delayed_request) delayed_requests;
index 2a4f963052955956dd19581164a4474692bea58d..bf1c1901ebcea0fa34d86e07f6c70244e443c546 100644 (file)
@@ -318,4 +318,24 @@ struct ast_taskprocessor *ast_taskpool_serializer_group(const char *name,
  */
 int ast_taskpool_serializer_push_wait(struct ast_taskprocessor *serializer, int (*task)(void *data), void *data);
 
+/*!
+ * \brief Suspend a serializer, causing tasks to be queued until unsuspended
+ * \since 23.1.0
+ * \since 22.7.0
+ * \since 20.17.0
+ *
+ * \param serializer The serializer to suspend
+ */
+void ast_taskpool_serializer_suspend(struct ast_taskprocessor *serializer);
+
+/*!
+ * \brief Unsuspend a serializer, causing tasks to be executed
+ * \since 23.1.0
+ * \since 22.7.0
+ * \since 20.17.0
+ *
+ * \param serializer The serializer to unsuspend
+ */
+void ast_taskpool_serializer_unsuspend(struct ast_taskprocessor *serializer);
+
 #endif /* ASTERISK_TASKPOOL_H */
index 59ac4b0c722c649354506b1c75ae49364a9086c5..987ad3776f6985c83ddfd059c65bf26a3e39c7e8 100644 (file)
@@ -676,6 +676,8 @@ struct serializer {
        struct ast_taskpool *pool;
        /*! Which group will wait for this serializer to shutdown. */
        struct ast_serializer_shutdown_group *shutdown_group;
+       /*! Whether the serializer is suspended or not. */
+       unsigned int suspended:1;
 };
 
 static void serializer_dtor(void *obj)
@@ -727,6 +729,15 @@ static int execute_tasks(void *data)
        ast_threadstorage_set_ptr(&current_taskpool_serializer, tps);
        for (remaining = ast_taskprocessor_size(tps); remaining > 0; remaining--) {
                requeue = ast_taskprocessor_execute(tps);
+
+               /* If the serializer is suspended we will not execute any more tasks and
+                * we will not requeue the taskpool task. Instead it will be requeued when
+                * the serializer is unsuspended.
+                */
+               if (ser->suspended) {
+                       requeue = 0;
+                       break;
+               }
        }
        ast_threadstorage_set_ptr(&current_taskpool_serializer, NULL);
 
@@ -916,6 +927,72 @@ int ast_taskpool_serializer_push_wait(struct ast_taskprocessor *serializer, int
        return sync_task.fail;
 }
 
+/*!
+ * \internal A task that suspends the serializer after queuing an empty task
+ */
+static int taskpool_serializer_suspend_task(void *data)
+{
+       struct ast_taskprocessor *serializer = data;
+       struct ast_taskprocessor_listener *listener = ast_taskprocessor_listener(serializer);
+       struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
+
+       /* If already suspended this is a no-op */
+       if (ser->suspended) {
+               return 0;
+       }
+
+       /* First we queue the empty task to ensure the serializer doesn't reach empty, this
+        * prevents any threads from queueing up a taskpool task that executes the serializer
+        * while it is suspended, allowing us to queue it ourselves when the serializer is
+        * unsuspended.
+        */
+        if (ast_taskprocessor_push(serializer, taskpool_serializer_empty_task, NULL)) {
+               return 0;
+       }
+
+       /* Next we suspend the serializer so that the execute_tasks currently executing stops
+        * and doesn't requeue.
+        */
+       ser->suspended = 1;
+
+       return 0;
+ }
+
+void ast_taskpool_serializer_suspend(struct ast_taskprocessor *serializer)
+{
+       if (ast_taskprocessor_is_task(serializer)) {
+               /* I am the session's serializer thread so I cannot suspend. */
+               return;
+       }
+
+       /* Once this returns there is no thread executing the tasks on the serializer, so they
+        * will accumulate until the serializer is unsuspended.
+        */
+       ast_taskpool_serializer_push_wait(serializer, taskpool_serializer_suspend_task, serializer);
+}
+
+void ast_taskpool_serializer_unsuspend(struct ast_taskprocessor *serializer)
+{
+       struct ast_taskprocessor_listener *listener = ast_taskprocessor_listener(serializer);
+       struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
+
+       ao2_lock(ser);
+
+       if (!ser->suspended) {
+               ao2_unlock(ser);
+               return;
+       }
+
+       ser->suspended = 0;
+
+       ao2_unlock(ser);
+
+       /* And now we kick off handling of the queued tasks once again */
+       if (ast_taskpool_push(ser->pool, execute_tasks, ao2_bump(serializer))) {
+               ast_taskprocessor_unreference(serializer);
+       }
+}
+
 /*!
  * \internal
  * \brief Clean up resources on Asterisk shutdown
index 5b954b222682ee404a14f1ddb487de68aab408b2..e3e1c59852731c101918da9070f749e60c92d8d3 100644 (file)
@@ -37,7 +37,7 @@
 #include "asterisk/astobj2.h"
 #include "asterisk/module.h"
 #include "asterisk/serializer.h"
-#include "asterisk/threadpool.h"
+#include "asterisk/taskpool.h"
 #include "asterisk/taskprocessor.h"
 #include "asterisk/uuid.h"
 #include "asterisk/sorcery.h"
 
 #define MOD_DATA_CONTACT "contact"
 
-/*! Number of serializers in pool if one not supplied. */
-#define SERIALIZER_POOL_SIZE           8
-
-/*! Pool of serializers to use if not supplied. */
-static struct ast_serializer_pool *sip_serializer_pool;
-
 static pjsip_endpoint *ast_pjsip_endpoint;
 
-static struct ast_threadpool *sip_threadpool;
+static struct ast_taskpool *sip_taskpool;
 
 /*! Local host address for IPv4 */
 static pj_sockaddr host_ip_ipv4;
@@ -2088,7 +2082,7 @@ int ast_sip_append_body(pjsip_tx_data *tdata, const char *body_text)
 
 struct ast_taskprocessor *ast_sip_create_serializer_group(const char *name, struct ast_serializer_shutdown_group *shutdown_group)
 {
-       return ast_threadpool_serializer_group(name, sip_threadpool, shutdown_group);
+       return ast_taskpool_serializer_group(name, sip_taskpool, shutdown_group);
 }
 
 struct ast_taskprocessor *ast_sip_create_serializer(const char *name)
@@ -2099,67 +2093,18 @@ struct ast_taskprocessor *ast_sip_create_serializer(const char *name)
 int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
 {
        if (!serializer) {
-               serializer = ast_serializer_pool_get(sip_serializer_pool);
+               return ast_taskpool_push(sip_taskpool, sip_task, task_data);
        }
 
        return ast_taskprocessor_push(serializer, sip_task, task_data);
 }
 
-struct sync_task_data {
-       ast_mutex_t lock;
-       ast_cond_t cond;
-       int complete;
-       int fail;
-       int (*task)(void *);
-       void *task_data;
-};
-
-static int sync_task(void *data)
-{
-       struct sync_task_data *std = data;
-       int ret;
-
-       std->fail = std->task(std->task_data);
-
-       /*
-        * Once we unlock std->lock after signaling, we cannot access
-        * std again.  The thread waiting within ast_sip_push_task_wait()
-        * is free to continue and release its local variable (std).
-        */
-       ast_mutex_lock(&std->lock);
-       std->complete = 1;
-       ast_cond_signal(&std->cond);
-       ret = std->fail;
-       ast_mutex_unlock(&std->lock);
-       return ret;
-}
-
 static int ast_sip_push_task_wait(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
 {
-       /* This method is an onion */
-       struct sync_task_data std;
-
-       memset(&std, 0, sizeof(std));
-       ast_mutex_init(&std.lock);
-       ast_cond_init(&std.cond, NULL);
-       std.task = sip_task;
-       std.task_data = task_data;
-
-       if (ast_sip_push_task(serializer, sync_task, &std)) {
-               ast_mutex_destroy(&std.lock);
-               ast_cond_destroy(&std.cond);
-               return -1;
-       }
-
-       ast_mutex_lock(&std.lock);
-       while (!std.complete) {
-               ast_cond_wait(&std.cond, &std.lock);
+       if (!serializer) {
+               return ast_taskpool_push_wait(sip_taskpool, sip_task, task_data);
        }
-       ast_mutex_unlock(&std.lock);
-
-       ast_mutex_destroy(&std.lock);
-       ast_cond_destroy(&std.cond);
-       return std.fail;
+       return ast_taskpool_serializer_push_wait(serializer, sip_task, task_data);
 }
 
 int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
@@ -2179,23 +2124,10 @@ int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*si
 int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
 {
        if (!serializer) {
-               /* Caller doesn't care which PJSIP serializer the task executes under. */
-               serializer = ast_serializer_pool_get(sip_serializer_pool);
-               if (!serializer) {
-                       /* No serializer picked to execute the task */
-                       return -1;
-               }
-       }
-       if (ast_taskprocessor_is_task(serializer)) {
-               /*
-                * We are the requested serializer so we must execute
-                * the task now or deadlock waiting on ourself to
-                * execute it.
-                */
-               return sip_task(task_data);
+               return ast_taskpool_push_wait(sip_taskpool, sip_task, task_data);
        }
 
-       return ast_sip_push_task_wait(serializer, sip_task, task_data);
+       return ast_taskpool_serializer_push_wait(serializer, sip_task, task_data);
 }
 
 void ast_copy_pj_str(char *dest, const pj_str_t *src, size_t size)
@@ -3454,14 +3386,14 @@ static void remove_request_headers(pjsip_endpoint *endpt)
        }
 }
 
-long ast_sip_threadpool_queue_size(void)
+long ast_sip_taskpool_queue_size(void)
 {
-       return ast_threadpool_queue_size(sip_threadpool);
+       return ast_taskpool_queue_size(sip_taskpool);
 }
 
-struct ast_threadpool *ast_sip_threadpool(void)
+struct ast_taskpool *ast_sip_taskpool(void)
 {
-       return sip_threadpool;
+       return sip_taskpool;
 }
 
 int ast_sip_is_uri_sip_sips(pjsip_uri *uri)
@@ -3801,7 +3733,7 @@ static int unload_pjsip(void *data)
         * These calls need the pjsip endpoint and serializer to clean up.
         * If they're not set, then there's nothing to clean up anyway.
         */
-       if (ast_pjsip_endpoint && sip_serializer_pool) {
+       if (ast_pjsip_endpoint) {
                ast_res_pjsip_cleanup_options_handling();
                ast_res_pjsip_cleanup_message_filter();
                ast_sip_destroy_distributor();
@@ -3921,7 +3853,7 @@ pjsip_media_type pjsip_media_type_text_plain;
 
 static int load_module(void)
 {
-       struct ast_threadpool_options options;
+       struct ast_taskpool_options options;
 
        /* pjproject and config_system need to be initialized before all else */
        if (pj_init() != PJ_SUCCESS) {
@@ -3958,18 +3890,11 @@ static int load_module(void)
                goto error;
        }
 
-       /* The serializer needs threadpool and threadpool needs pjproject to be initialized so it's next */
-       sip_get_threadpool_options(&options);
+       /* The serializer needs taskpool and taskpool needs pjproject to be initialized so it's next */
+       sip_get_taskpool_options(&options);
        options.thread_start = sip_thread_start;
-       sip_threadpool = ast_threadpool_create("pjsip", NULL, &options);
-       if (!sip_threadpool) {
-               goto error;
-       }
-
-       sip_serializer_pool = ast_serializer_pool_create(
-               "pjsip/default", SERIALIZER_POOL_SIZE, sip_threadpool, -1);
-       if (!sip_serializer_pool) {
-               ast_log(LOG_ERROR, "Failed to create SIP serializer pool. Aborting load\n");
+       sip_taskpool = ast_taskpool_create("pjsip", &options);
+       if (!sip_taskpool) {
                goto error;
        }
 
@@ -4049,8 +3974,7 @@ error:
 
        /* These functions all check for NULLs and are safe to call at any time */
        ast_sip_destroy_scheduler();
-       ast_serializer_pool_destroy(sip_serializer_pool);
-       ast_threadpool_shutdown(sip_threadpool);
+       ast_taskpool_shutdown(sip_taskpool);
 
        return AST_MODULE_LOAD_DECLINE;
 }
@@ -4076,12 +4000,11 @@ static int unload_module(void)
        ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
 
        /* The thread this is called from cannot call PJSIP/PJLIB functions,
-        * so we have to push the work to the threadpool to handle
+        * so we have to push the work to the taskpool to handle
         */
        ast_sip_push_task_wait_servant(NULL, unload_pjsip, NULL);
        ast_sip_destroy_scheduler();
-       ast_serializer_pool_destroy(sip_serializer_pool);
-       ast_threadpool_shutdown(sip_threadpool);
+       ast_taskpool_shutdown(sip_taskpool);
 
        return 0;
 }
index e16738f610532860cc5040c053565220f69d873c..c782c6f28ea7f81ec0c87e1285aad547c4f53bc8 100644 (file)
@@ -24,7 +24,7 @@
 #include "asterisk/res_pjsip.h"
 #include "asterisk/sorcery.h"
 #include "include/res_pjsip_private.h"
-#include "asterisk/threadpool.h"
+#include "asterisk/taskpool.h"
 #include "asterisk/dns.h"
 #include "asterisk/res_pjsip_cli.h"
 
@@ -41,15 +41,17 @@ struct system_config {
        /*! Should we use short forms for headers? */
        unsigned int compactheaders;
        struct {
-               /*! Initial number of threads in the threadpool */
+               /*! Minimum number of taskprocessors in the taskpool */
+               int minimum_size;
+               /*! Initial number of taskprocessors in the taskpool */
                int initial_size;
-               /*! The amount by which the number of threads is incremented when necessary */
+               /*! The amount by which the number of taskprocessors is incremented when necessary */
                int auto_increment;
-               /*! Thread idle timeout in seconds */
+               /*! Taskprocessor idle timeout in seconds */
                int idle_timeout;
-               /*! Maxumum number of threads in the threadpool */
+               /*! Maxumum number of taskprocessors in the taskpool */
                int max_size;
-       } threadpool;
+       } taskpool;
        /*! Nonzero to disable switching from UDP to TCP transport */
        unsigned int disable_tcp_switch;
        /*!
@@ -63,13 +65,13 @@ struct system_config {
        unsigned int disable_rport;
 };
 
-static struct ast_threadpool_options sip_threadpool_options = {
-       .version = AST_THREADPOOL_OPTIONS_VERSION,
+static struct ast_taskpool_options sip_taskpool_options = {
+       .version = AST_TASKPOOL_OPTIONS_VERSION,
 };
 
-void sip_get_threadpool_options(struct ast_threadpool_options *threadpool_options)
+void sip_get_taskpool_options(struct ast_taskpool_options *taskpool_options)
 {
-       *threadpool_options = sip_threadpool_options;
+       *taskpool_options = sip_taskpool_options;
 }
 
 static struct ast_sorcery *system_sorcery;
@@ -125,10 +127,11 @@ static int system_apply(const struct ast_sorcery *sorcery, void *obj)
 #endif
        }
 
-       sip_threadpool_options.initial_size = system->threadpool.initial_size;
-       sip_threadpool_options.auto_increment = system->threadpool.auto_increment;
-       sip_threadpool_options.idle_timeout = system->threadpool.idle_timeout;
-       sip_threadpool_options.max_size = system->threadpool.max_size;
+       sip_taskpool_options.minimum_size = system->taskpool.minimum_size;
+       sip_taskpool_options.initial_size = system->taskpool.initial_size;
+       sip_taskpool_options.auto_increment = system->taskpool.auto_increment;
+       sip_taskpool_options.idle_timeout = system->taskpool.idle_timeout;
+       sip_taskpool_options.max_size = system->taskpool.max_size;
 
        pjsip_cfg()->endpt.disable_tcp_switch =
                system->disable_tcp_switch ? PJ_TRUE : PJ_FALSE;
@@ -199,14 +202,24 @@ int ast_sip_initialize_system(void)
                        OPT_UINT_T, 0, FLDSET(struct system_config, timerb));
        ast_sorcery_object_field_register(system_sorcery, "system", "compact_headers", "no",
                        OPT_BOOL_T, 1, FLDSET(struct system_config, compactheaders));
-       ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_initial_size", "0",
-                       OPT_UINT_T, 0, FLDSET(struct system_config, threadpool.initial_size));
-       ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_auto_increment", "5",
-                       OPT_UINT_T, 0, FLDSET(struct system_config, threadpool.auto_increment));
+       ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_minimum_size", "4",
+                       OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.minimum_size));
+       ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_initial_size", "4",
+                       OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.initial_size));
+       ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_initial_size", "4",
+                       OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.initial_size));
+       ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_auto_increment", "1",
+                       OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.auto_increment));
+       ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_auto_increment", "1",
+                       OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.auto_increment));
+       ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_idle_timeout", "60",
+                       OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.idle_timeout));
        ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_idle_timeout", "60",
-                       OPT_UINT_T, 0, FLDSET(struct system_config, threadpool.idle_timeout));
+                       OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.idle_timeout));
+       ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_max_size", "50",
+                       OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.max_size));
        ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_max_size", "50",
-                       OPT_UINT_T, 0, FLDSET(struct system_config, threadpool.max_size));
+                       OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.max_size));
        ast_sorcery_object_field_register(system_sorcery, "system", "disable_tcp_switch", "yes",
                        OPT_BOOL_T, 1, FLDSET(struct system_config, disable_tcp_switch));
        ast_sorcery_object_field_register(system_sorcery, "system", "follow_early_media_fork", "yes",
index dcb821cedcd6a30c5eca30a5719c722be7248a7c..11582ad1c4536c31c3cad701b9d546a9b908e16e 100644 (file)
@@ -18,7 +18,7 @@
 #include "asterisk/compat.h"
 
 struct ao2_container;
-struct ast_threadpool_options;
+struct ast_taskpool_options;
 struct ast_sip_cli_context;
 
 /*!
@@ -116,7 +116,7 @@ int ast_sip_destroy_sorcery_auth(void);
  * \brief Initialize the distributor module
  *
  * The distributor module is responsible for taking an incoming
- * SIP message and placing it into the threadpool. Once in the threadpool,
+ * SIP message and placing it into the taskpool. Once in the taskpool,
  * the distributor will perform endpoint lookups and authentication, and
  * then distribute the message up the stack to any further modules.
  *
@@ -278,9 +278,9 @@ void ast_res_pjsip_cleanup_message_filter(void);
 
 /*!
  * \internal
- * \brief Get threadpool options
+ * \brief Get taskpool options
  */
-void sip_get_threadpool_options(struct ast_threadpool_options *threadpool_options);
+void sip_get_taskpool_options(struct ast_taskpool_options *taskpool_options);
 
 /*!
  * \internal
index cef061b84cc10a11468cd2c4d77c0135ae92ca22..8019426da792de12d867068605a0bbe39eb4ea78 100644 (file)
                                        </since>
                                        <synopsis>Use the short forms of common SIP header names.</synopsis>
                                </configOption>
-                               <configOption name="threadpool_initial_size" default="0">
+                               <configOption name="taskpool_minimum_size" default="4">
+                                       <since>
+                                               <version>23.1.0</version>
+                                               <version>22.7.0</version>
+                                               <version>20.17.0</version>
+                                       </since>
+                                       <synopsis>Minimum number of taskprocessors in the res_pjsip taskpool.</synopsis>
+                               </configOption>
+                               <configOption name="taskpool_initial_size" default="4">
+                                       <since>
+                                               <version>23.1.0</version>
+                                               <version>22.7.0</version>
+                                               <version>20.17.0</version>
+                                       </since>
+                                       <synopsis>Initial number of taskprocessors in the res_pjsip taskpool.</synopsis>
+                               </configOption>
+                               <configOption name="taskpool_auto_increment" default="1">
+                                       <since>
+                                               <version>23.1.0</version>
+                                               <version>22.7.0</version>
+                                               <version>20.17.0</version>
+                                       </since>
+                                       <synopsis>The amount by which the number of taskprocessors is incremented when necessary.</synopsis>
+                               </configOption>
+                               <configOption name="taskpool_idle_timeout" default="60">
+                                       <since>
+                                               <version>23.1.0</version>
+                                               <version>22.7.0</version>
+                                               <version>20.17.0</version>
+                                       </since>
+                                       <synopsis>Number of seconds before an idle taskprocessor should be disposed of.</synopsis>
+                               </configOption>
+                               <configOption name="taskpool_max_size" default="50">
+                                       <since>
+                                               <version>23.1.0</version>
+                                               <version>22.7.0</version>
+                                               <version>20.17.0</version>
+                                       </since>
+                                       <synopsis>Maximum number of taskprocessors in the res_pjsip taskpool.
+                                       A value of 0 indicates no maximum.</synopsis>
+                               </configOption>
+                               <configOption name="threadpool_initial_size" default="4">
                                        <since>
                                                <version>12.0.0</version>
                                        </since>
-                                       <synopsis>Initial number of threads in the res_pjsip threadpool.</synopsis>
+                                       <synopsis>Initial number of threads in the res_pjsip taskpool.
+                                               Deprecated in favor of taskpool_initiali_size.</synopsis>
                                </configOption>
-                               <configOption name="threadpool_auto_increment" default="5">
+                               <configOption name="threadpool_auto_increment" default="1">
                                        <since>
                                                <version>12.0.0</version>
                                        </since>
-                                       <synopsis>The amount by which the number of threads is incremented when necessary.</synopsis>
+                                       <synopsis>The amount by which the number of threads is incremented when necessary.
+                                               Deprecated in favor of taskpool_auto_increment.</synopsis>
                                </configOption>
                                <configOption name="threadpool_idle_timeout" default="60">
                                        <since>
                                                <version>12.0.0</version>
                                        </since>
-                                       <synopsis>Number of seconds before an idle thread should be disposed of.</synopsis>
+                                       <synopsis>Number of seconds before an idle taskprocessor should be disposed of.
+                                               Deprecated in favor of taskpool_idle_timeout.</synopsis>
                                </configOption>
-                               <configOption name="threadpool_max_size" default="0">
+                               <configOption name="threadpool_max_size" default="50">
                                        <since>
                                                <version>13.7.0</version>
                                        </since>
-                                       <synopsis>Maximum number of threads in the res_pjsip threadpool.
-                                       A value of 0 indicates no maximum.</synopsis>
+                                       <synopsis>Maximum number of taskprocessors in the res_pjsip taskpool.
+                                       A value of 0 indicates no maximum.
+                                       Deprecated in favor of taskpool_max_size.</synopsis>
                                </configOption>
                                <configOption name="disable_tcp_switch" default="yes">
                                        <since>
index c96061110192e8a3b77a1875cae73f983536440f..adf18d3b8b6b2ec2f78f2eb27755644c8d163a34 100644 (file)
@@ -24,7 +24,7 @@
 #include "asterisk/acl.h"
 #include "include/res_pjsip_private.h"
 #include "asterisk/taskprocessor.h"
-#include "asterisk/threadpool.h"
+#include "asterisk/taskpool.h"
 #include "asterisk/res_pjsip_cli.h"
 
 static int distribute(void *data);
@@ -75,7 +75,7 @@ static pj_status_t record_serializer(pjsip_tx_data *tdata)
 {
        struct ast_taskprocessor *serializer;
 
-       serializer = ast_threadpool_serializer_get_current();
+       serializer = ast_taskpool_serializer_get_current();
        if (serializer) {
                const char *name;
 
index 93354b930949ee68a2be0ca1f97b41e5966d4c68..1d75c903ed007d29386fcde0b299c87c42809687 100644 (file)
@@ -33,7 +33,7 @@
 #include "asterisk/statsd.h"
 #include "include/res_pjsip_private.h"
 #include "asterisk/taskprocessor.h"
-#include "asterisk/threadpool.h"
+#include "asterisk/serializer_shutdown_group.h"
 
 /*
  * This implementation for OPTIONS support is based around the idea
index 2babfe40de1bec2fafaab5131950fadc105cc049..f5f4a1cfc5a1f11a8f092dd364f343bbf8c718b5 100644 (file)
@@ -31,7 +31,7 @@
 #include "asterisk/res_pjsip.h"
 #include "include/res_pjsip_private.h"
 #include "asterisk/taskprocessor.h"
-#include "asterisk/threadpool.h"
+#include "asterisk/taskpool.h"
 
 #ifdef HAVE_PJSIP_EXTERNAL_RESOLVER
 
@@ -611,7 +611,7 @@ static void sip_resolve(pjsip_resolver_t *resolver, pj_pool_t *pool, const pjsip
                return;
        }
 
-       resolve->serializer = ao2_bump(ast_threadpool_serializer_get_current());
+       resolve->serializer = ao2_bump(ast_taskpool_serializer_get_current());
 
        ast_debug(2, "[%p] Starting initial resolution using parallel queries for target '%s'\n", resolve, host);
        ast_dns_query_set_resolve_async(resolve->queries, sip_resolve_callback, resolve);
index 072ddce5cca180ebe0dfc83f6dcd9aca644fd646..22c7a1f345167fc5709c78a31170cfef217c6e89 100644 (file)
@@ -1573,8 +1573,8 @@ static int load_module(void)
                return AST_MODULE_LOAD_DECLINE;
        }
 
-       mwi_serializer_pool = ast_serializer_pool_create("pjsip/mwi",
-               MWI_SERIALIZER_POOL_SIZE, ast_sip_threadpool(), MAX_UNLOAD_TIMEOUT_TIME);
+       mwi_serializer_pool = ast_serializer_taskpool_create("pjsip/mwi",
+               MWI_SERIALIZER_POOL_SIZE, ast_sip_taskpool(), MAX_UNLOAD_TIMEOUT_TIME);
        if (!mwi_serializer_pool) {
                ast_log(AST_LOG_WARNING, "Failed to create MWI serializer pool. The default SIP pool will be used for MWI\n");
        }
index 4b47f2b33f877937790990acb7cf5a6bc759fd6b..538df56125e1dc1d50be11dfc9c9c2108c97c15e 100644 (file)
@@ -33,7 +33,8 @@
 #include "asterisk/res_pjsip_outbound_publish.h"
 #include "asterisk/module.h"
 #include "asterisk/taskprocessor.h"
-#include "asterisk/threadpool.h"
+#include "asterisk/taskpool.h"
+#include "asterisk/serializer_shutdown_group.h"
 #include "asterisk/datastore.h"
 #include "res_pjsip/include/res_pjsip_private.h"
 
index 75744affa96e0e62aba052543b6e76538903f97e..2746eeaea98cf86dac799ae4b36ca1106b569567 100644 (file)
@@ -35,7 +35,8 @@
 #include "asterisk/cli.h"
 #include "asterisk/stasis_system.h"
 #include "asterisk/threadstorage.h"
-#include "asterisk/threadpool.h"
+#include "asterisk/taskpool.h"
+#include "asterisk/serializer_shutdown_group.h"
 #include "asterisk/statsd.h"
 #include "res_pjsip/include/res_pjsip_private.h"
 #include "asterisk/vector.h"
@@ -1669,7 +1670,7 @@ static void sip_outbound_registration_response_cb(struct pjsip_regc_cbparam *par
         * pjproject callback thread.
         */
        if (ast_sip_push_task(client_state->serializer, handle_registration_response, response)) {
-               ast_log(LOG_WARNING, "Failed to pass incoming registration response to threadpool\n");
+               ast_log(LOG_WARNING, "Failed to pass incoming registration response to taskpool\n");
                ao2_cleanup(response);
        }
 }
@@ -1690,7 +1691,7 @@ static void sip_outbound_registration_state_destroy(void *obj)
                ao2_ref(state->client_state, -1);
        } else if (ast_sip_push_task(state->client_state->serializer,
                handle_client_state_destruction, state->client_state)) {
-               ast_log(LOG_WARNING, "Failed to pass outbound registration client destruction to threadpool\n");
+               ast_log(LOG_WARNING, "Failed to pass outbound registration client destruction to taskpool\n");
                ao2_ref(state->client_state, -1);
        }
 }
index 6616b1c0c9123c228aa9c1bf607389470be3e2f6..67d7836ce732f8aa670d41c48d7a2d584c8c881f 100644 (file)
@@ -343,7 +343,7 @@ static struct ast_frame *refer_progress_framehook(struct ast_channel *chan, stru
                }
        }
 
-       /* If a notification is due to be sent push it to the thread pool */
+       /* If a notification is due to be sent push it to the taskpool */
        if (notification) {
                /* If the subscription is being terminated we don't need the frame hook any longer */
                if (notification->state == PJSIP_EVSUB_STATE_TERMINATED) {
@@ -395,7 +395,7 @@ static struct ast_frame *refer_ari_progress_framehook(struct ast_channel *chan,
                progress->ari_state->last_response = *message;
        }
 
-       /* If a notification is due to be sent push it to the thread pool */
+       /* If a notification is due to be sent push it to the taskpool */
        if (notification) {
                /* If the subscription is being terminated we don't need the frame hook any longer */
                if (notification->state == PJSIP_EVSUB_STATE_TERMINATED) {
index e4e4e1b12fc0363c8c4e02852df61ccf43b3a9f1..a4992225fe6783b7c15707b6cb3fb5cecf0adfdc 100644 (file)
@@ -32,7 +32,7 @@
 #include "asterisk/res_pjsip_session.h"
 #include "asterisk/module.h"
 #include "asterisk/causes.h"
-#include "asterisk/threadpool.h"
+#include "asterisk/taskpool.h"
 
 static void rfc3326_use_reason_header(struct ast_sip_session *session, struct pjsip_rx_data *rdata)
 {
@@ -139,7 +139,7 @@ static void rfc3326_outgoing_request(struct ast_sip_session *session, struct pjs
                 * checks so we must also be running under the call's serializer
                 * thread.
                 */
-               || session->serializer != ast_threadpool_serializer_get_current()) {
+               || session->serializer != ast_taskpool_serializer_get_current()) {
                return;
        }
 
@@ -152,7 +152,7 @@ static void rfc3326_outgoing_response(struct ast_sip_session *session, struct pj
 
        if (status.code < 300
                || !session->channel
-               || session->serializer != ast_threadpool_serializer_get_current()) {
+               || session->serializer != ast_taskpool_serializer_get_current()) {
                return;
        }
 
index f6faff2afe67e782ee4dbb4e119b8a8c33615423..d57392d4f89ad66c2ff33a44dba7dcaae0700505 100644 (file)
@@ -32,7 +32,6 @@
 #include "asterisk/res_pjsip_session.h"
 #include "asterisk/module.h"
 #include "asterisk/causes.h"
-#include "asterisk/threadpool.h"
 
 /*! \brief Private data structure used with the modules's datastore */
 struct rfc3329_store_data {
index bca226c85753f03095eac00365b3cda5e0f6b70f..135aa1927950687851f70d53474ff1cf1382c52b 100644 (file)
@@ -42,6 +42,7 @@
 #include "asterisk/uuid.h"
 #include "asterisk/pbx.h"
 #include "asterisk/taskprocessor.h"
+#include "asterisk/taskpool.h"
 #include "asterisk/causes.h"
 #include "asterisk/sdp_srtp.h"
 #include "asterisk/dsp.h"
@@ -3127,115 +3128,14 @@ struct ast_sip_session *ast_sip_session_alloc(struct ast_sip_endpoint *endpoint,
        return ret_session;
 }
 
-/*! \brief struct controlling the suspension of the session's serializer. */
-struct ast_sip_session_suspender {
-       ast_cond_t cond_suspended;
-       ast_cond_t cond_complete;
-       int suspended;
-       int complete;
-};
-
-static void sip_session_suspender_dtor(void *vdoomed)
-{
-       struct ast_sip_session_suspender *doomed = vdoomed;
-
-       ast_cond_destroy(&doomed->cond_suspended);
-       ast_cond_destroy(&doomed->cond_complete);
-}
-
-/*!
- * \internal
- * \brief Block the session serializer thread task.
- *
- * \param data Pushed serializer task data for suspension.
- *
- * \retval 0
- */
-static int sip_session_suspend_task(void *data)
-{
-       struct ast_sip_session_suspender *suspender = data;
-
-       ao2_lock(suspender);
-
-       /* Signal that the serializer task is now suspended. */
-       suspender->suspended = 1;
-       ast_cond_signal(&suspender->cond_suspended);
-
-       /* Wait for the serializer suspension to be completed. */
-       while (!suspender->complete) {
-               ast_cond_wait(&suspender->cond_complete, ao2_object_get_lockaddr(suspender));
-       }
-
-       ao2_unlock(suspender);
-       ao2_ref(suspender, -1);
-
-       return 0;
-}
-
 void ast_sip_session_suspend(struct ast_sip_session *session)
 {
-       struct ast_sip_session_suspender *suspender;
-       int res;
-
-       ast_assert(session->suspended == NULL);
-
-       if (ast_taskprocessor_is_task(session->serializer)) {
-               /* I am the session's serializer thread so I cannot suspend. */
-               return;
-       }
-
-       if (ast_taskprocessor_is_suspended(session->serializer)) {
-               /* The serializer already suspended. */
-               return;
-       }
-
-       suspender = ao2_alloc(sizeof(*suspender), sip_session_suspender_dtor);
-       if (!suspender) {
-               /* We will just have to hope that the system does not deadlock */
-               return;
-       }
-       ast_cond_init(&suspender->cond_suspended, NULL);
-       ast_cond_init(&suspender->cond_complete, NULL);
-
-       ao2_ref(suspender, +1);
-       res = ast_sip_push_task(session->serializer, sip_session_suspend_task, suspender);
-       if (res) {
-               /* We will just have to hope that the system does not deadlock */
-               ao2_ref(suspender, -2);
-               return;
-       }
-
-       session->suspended = suspender;
-
-       /* Wait for the serializer to get suspended. */
-       ao2_lock(suspender);
-       while (!suspender->suspended) {
-               ast_cond_wait(&suspender->cond_suspended, ao2_object_get_lockaddr(suspender));
-       }
-       ao2_unlock(suspender);
-
-       ast_taskprocessor_suspend(session->serializer);
+       ast_taskpool_serializer_suspend(session->serializer);
 }
 
 void ast_sip_session_unsuspend(struct ast_sip_session *session)
 {
-       struct ast_sip_session_suspender *suspender = session->suspended;
-
-       if (!suspender) {
-               /* Nothing to do */
-               return;
-       }
-       session->suspended = NULL;
-
-       /* Signal that the serializer task suspension is now complete. */
-       ao2_lock(suspender);
-       suspender->complete = 1;
-       ast_cond_signal(&suspender->cond_complete);
-       ao2_unlock(suspender);
-
-       ao2_ref(suspender, -1);
-
-       ast_taskprocessor_unsuspend(session->serializer);
+       ast_taskpool_serializer_unsuspend(session->serializer);
 }
 
 /*!