]> git.ipfire.org Git - thirdparty/asterisk.git/commitdiff
Revert "pjsip: Move from threadpool to taskpool"
authorJoshua C. Colp <smooth.egg8323@colp.dev>
Mon, 27 Oct 2025 13:06:49 +0000 (10:06 -0300)
committergithub-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Tue, 28 Oct 2025 12:45:00 +0000 (12:45 +0000)
This reverts commit bb6b76c2d8239b2665223dcbf6d507aa9aa4534e.

21 files changed:
channels/chan_pjsip.c
configs/samples/pjsip.conf.sample
contrib/ast-db-manage/config/versions/dc7c357dc178_add_taskpool_options_to_system.py [deleted file]
include/asterisk/res_pjsip.h
include/asterisk/res_pjsip_session.h
include/asterisk/taskpool.h
main/taskpool.c
res/res_pjsip.c
res/res_pjsip/config_system.c
res/res_pjsip/include/res_pjsip_private.h
res/res_pjsip/pjsip_config.xml
res/res_pjsip/pjsip_distributor.c
res/res_pjsip/pjsip_options.c
res/res_pjsip/pjsip_resolver.c
res/res_pjsip_mwi.c
res/res_pjsip_outbound_publish.c
res/res_pjsip_outbound_registration.c
res/res_pjsip_refer.c
res/res_pjsip_rfc3326.c
res/res_pjsip_rfc3329.c
res/res_pjsip_session.c

index f44c51edaea90a4570227cce93637ce3ee7e8f58..e9c037cc22fe7fa3ecd67bcebebf1168d8e24117 100644 (file)
@@ -720,7 +720,7 @@ static int answer(void *data)
                        ast_channel_name(session->channel), err);
                /*
                 * Return this value so we can distinguish between this
-                * failure and the taskpool synchronous push failing.
+                * failure and the threadpool synchronous push failing.
                 */
                SCOPE_EXIT_RTN_VALUE(-2, "pjproject failure\n");
        }
@@ -753,7 +753,7 @@ static int chan_pjsip_answer(struct ast_channel *ast)
        res = ast_sip_push_task_wait_serializer(session->serializer, answer, &ans_data);
        if (res) {
                if (res == -1) {
-                       ast_log(LOG_ERROR,"Cannot answer '%s': Unable to push answer task to the taskpool.\n",
+                       ast_log(LOG_ERROR,"Cannot answer '%s': Unable to push answer task to the threadpool.\n",
                                ast_channel_name(session->channel));
                }
                ao2_ref(session, -1);
@@ -2601,7 +2601,7 @@ static int chan_pjsip_hangup(struct ast_channel *ast)
        }
 
        if (ast_sip_push_task(channel->session->serializer, hangup, h_data)) {
-               ast_log(LOG_WARNING, "Unable to push hangup task to the taskpool. Expect bad things\n");
+               ast_log(LOG_WARNING, "Unable to push hangup task to the threadpool. Expect bad things\n");
                goto failure;
        }
 
index f3619909b626c5a084483175e4c2857a201d36ee..c399ba8a24489a89649bbad5832817f176ef0dde 100644 (file)
 ;timer_b=32000  ; Set transaction timer B value milliseconds (default: "32000")
 ;compact_headers=no     ; Use the short forms of common SIP header names
                         ; (default: "no")
-;taskpool_minimum_size=4      ; Minimum number of taskprocessors in the res_pjsip
-                              ; taskpool (default: "4")
-;taskpool_initial_size=4      ; Initial number of taskprocessors in the res_pjsip
-                              ; taskpool (default: "4")
-;taskpool_auto_increment=1    ; The amount by which the number of taskprocessors is
-                              ; incremented when necessary (default: "1")
-;taskpool_idle_timeout=60     ; Number of seconds before an idle taskprocessor
-                              ; should be disposed of (default: "60")
-;taskpool_max_size=50 ; Maximum number of taskprocessors in the res_pjsip taskpool
-                      ; A value of 0 indicates no maximum (default: "50")
+;threadpool_initial_size=0      ; Initial number of threads in the res_pjsip
+                                ; threadpool (default: "0")
+;threadpool_auto_increment=5    ; The amount by which the number of threads is
+                                ; incremented when necessary (default: "5")
+;threadpool_idle_timeout=60     ; Number of seconds before an idle thread
+                                ; should be disposed of (default: "60")
+;threadpool_max_size=0  ; Maximum number of threads in the res_pjsip threadpool
+                        ; A value of 0 indicates no maximum (default: "0")
 ;disable_tcp_switch=yes ; Disable automatic switching from UDP to TCP transports
                         ; if outgoing request is too large.
                         ; See RFC 3261 section 18.1.1.
diff --git a/contrib/ast-db-manage/config/versions/dc7c357dc178_add_taskpool_options_to_system.py b/contrib/ast-db-manage/config/versions/dc7c357dc178_add_taskpool_options_to_system.py
deleted file mode 100644 (file)
index b07c966..0000000
+++ /dev/null
@@ -1,29 +0,0 @@
-"""add taskpool options to system
-
-Revision ID: dc7c357dc178
-Revises: abdc9ede147d
-Create Date: 2025-09-24 09:45:17.609185
-
-"""
-
-# revision identifiers, used by Alembic.
-revision = 'dc7c357dc178'
-down_revision = 'abdc9ede147d'
-
-from alembic import op
-import sqlalchemy as sa
-
-
-def upgrade():
-    op.add_column('ps_systems', sa.Column('taskpool_minimum_size', sa.Integer))
-    op.add_column('ps_systems', sa.Column('taskpool_initial_size', sa.Integer))
-    op.add_column('ps_systems', sa.Column('taskpool_auto_increment', sa.Integer))
-    op.add_column('ps_systems', sa.Column('taskpool_idle_timeout', sa.Integer))
-    op.add_column('ps_systems', sa.Column('taskpool_max_size', sa.Integer))
-
-def downgrade():
-    op.drop_column('ps_systems', 'taskpool_minimum_size')
-    op.drop_column('ps_systems', 'taskpool_initial_size')
-    op.drop_column('ps_systems', 'taskpool_auto_increment')
-    op.drop_column('ps_systems', 'taskpool_idle_timeout')
-    op.drop_column('ps_systems', 'taskpool_max_size')
index c49e9dfac765bc9465bfad69bfe8e695445acabf..2fafd5790a2e5d69a9e774d96008507fb94d8c75 100644 (file)
@@ -1921,7 +1921,7 @@ struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void);
  * There are three major types of threads that SIP will have to deal with:
  * \li Asterisk threads
  * \li PJSIP threads
- * \li SIP taskpool threads (a.k.a. "servants")
+ * \li SIP threadpool threads (a.k.a. "servants")
  *
  * \par Asterisk Threads
  *
@@ -1963,7 +1963,7 @@ struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void);
  * is NULL, then the work will be handed off to whatever servant can currently handle
  * the task. If this pointer is non-NULL, then the task will not be executed until
  * previous tasks pushed with the same serializer have completed. For more information
- * on serializers and the benefits they provide, see \ref ast_taskpool_serializer
+ * on serializers and the benefits they provide, see \ref ast_threadpool_serializer
  *
  * \par Scheduler
  *
@@ -1992,7 +1992,7 @@ typedef int (*ast_sip_task)(void *user_data);
  * \brief Create a new serializer for SIP tasks
  * \since 13.8.0
  *
- * See \ref ast_taskpool_serializer for more information on serializers.
+ * See \ref ast_threadpool_serializer for more information on serializers.
  * SIP creates serializers so that tasks operating on similar data will run
  * in sequence.
  *
@@ -2009,7 +2009,7 @@ struct ast_serializer_shutdown_group;
  * \brief Create a new serializer for SIP tasks
  * \since 13.8.0
  *
- * See \ref ast_taskpool_serializer for more information on serializers.
+ * See \ref ast_threadpool_serializer for more information on serializers.
  * SIP creates serializers so that tasks operating on similar data will run
  * in sequence.
  *
@@ -2251,7 +2251,7 @@ enum ast_sip_scheduler_task_flags {
 struct ast_sip_sched_task;
 
 /*!
- * \brief Schedule a task to run in the res_pjsip taskpool
+ * \brief Schedule a task to run in the res_pjsip thread pool
  * \since 13.9.0
  *
  * \param serializer The serializer to use.  If NULL, don't use a serializer (see note below)
@@ -2266,7 +2266,7 @@ struct ast_sip_sched_task;
  * \par Serialization
  *
  * Specifying a serializer guarantees serialized execution but NOT specifying a serializer
- * may still result in tasks being effectively serialized if the taskpool is busy.
+ * may still result in tasks being effectively serialized if the thread pool is busy.
  * The point of the serializer BTW is not to prevent parallel executions of the SAME task.
  * That happens automatically (see below).  It's to prevent the task from running at the same
  * time as other work using the same serializer, whether or not it's being run by the scheduler.
@@ -3662,15 +3662,15 @@ int ast_sip_get_host_ip(int af, pj_sockaddr *addr);
 const char *ast_sip_get_host_ip_string(int af);
 
 /*!
- * \brief Return the size of the SIP taskpool's task queue
+ * \brief Return the size of the SIP threadpool's task queue
  * \since 13.7.0
  */
-long ast_sip_taskpool_queue_size(void);
+long ast_sip_threadpool_queue_size(void);
 
 /*!
- * \brief Retrieve the SIP taskpool object
+ * \brief Retrieve the SIP threadpool object
  */
-struct ast_taskpool *ast_sip_taskpool(void);
+struct ast_threadpool *ast_sip_threadpool(void);
 
 /*!
  * \brief Retrieve transport state
index 7279d0d448f7da7f60db06473fc0292e48307cb4..75ba60e523b39b9b263268e0cc96d91d437f33f2 100644 (file)
@@ -195,7 +195,7 @@ struct ast_sip_session {
        struct ao2_container *datastores;
        /*! Serializer for tasks relating to this SIP session */
        struct ast_taskprocessor *serializer;
-       /*! \deprecated Non-null if the session serializer is suspended or being suspended. */
+       /*! Non-null if the session serializer is suspended or being suspended. */
        struct ast_sip_session_suspender *suspended;
        /*! Requests that could not be sent due to current inv_session state */
        AST_LIST_HEAD_NOLOCK(, ast_sip_session_delayed_request) delayed_requests;
index bf1c1901ebcea0fa34d86e07f6c70244e443c546..2a4f963052955956dd19581164a4474692bea58d 100644 (file)
@@ -318,24 +318,4 @@ struct ast_taskprocessor *ast_taskpool_serializer_group(const char *name,
  */
 int ast_taskpool_serializer_push_wait(struct ast_taskprocessor *serializer, int (*task)(void *data), void *data);
 
-/*!
- * \brief Suspend a serializer, causing tasks to be queued until unsuspended
- * \since 23.1.0
- * \since 22.7.0
- * \since 20.17.0
- *
- * \param serializer The serializer to suspend
- */
-void ast_taskpool_serializer_suspend(struct ast_taskprocessor *serializer);
-
-/*!
- * \brief Unsuspend a serializer, causing tasks to be executed
- * \since 23.1.0
- * \since 22.7.0
- * \since 20.17.0
- *
- * \param serializer The serializer to unsuspend
- */
-void ast_taskpool_serializer_unsuspend(struct ast_taskprocessor *serializer);
-
 #endif /* ASTERISK_TASKPOOL_H */
index 987ad3776f6985c83ddfd059c65bf26a3e39c7e8..59ac4b0c722c649354506b1c75ae49364a9086c5 100644 (file)
@@ -676,8 +676,6 @@ struct serializer {
        struct ast_taskpool *pool;
        /*! Which group will wait for this serializer to shutdown. */
        struct ast_serializer_shutdown_group *shutdown_group;
-       /*! Whether the serializer is suspended or not. */
-       unsigned int suspended:1;
 };
 
 static void serializer_dtor(void *obj)
@@ -729,15 +727,6 @@ static int execute_tasks(void *data)
        ast_threadstorage_set_ptr(&current_taskpool_serializer, tps);
        for (remaining = ast_taskprocessor_size(tps); remaining > 0; remaining--) {
                requeue = ast_taskprocessor_execute(tps);
-
-               /* If the serializer is suspended we will not execute any more tasks and
-                * we will not requeue the taskpool task. Instead it will be requeued when
-                * the serializer is unsuspended.
-                */
-               if (ser->suspended) {
-                       requeue = 0;
-                       break;
-               }
        }
        ast_threadstorage_set_ptr(&current_taskpool_serializer, NULL);
 
@@ -927,72 +916,6 @@ int ast_taskpool_serializer_push_wait(struct ast_taskprocessor *serializer, int
        return sync_task.fail;
 }
 
-/*!
- * \internal A task that suspends the serializer after queuing an empty task
- */
-static int taskpool_serializer_suspend_task(void *data)
-{
-       struct ast_taskprocessor *serializer = data;
-       struct ast_taskprocessor_listener *listener = ast_taskprocessor_listener(serializer);
-       struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
-
-       /* If already suspended this is a no-op */
-       if (ser->suspended) {
-               return 0;
-       }
-
-       /* First we queue the empty task to ensure the serializer doesn't reach empty, this
-        * prevents any threads from queueing up a taskpool task that executes the serializer
-        * while it is suspended, allowing us to queue it ourselves when the serializer is
-        * unsuspended.
-        */
-        if (ast_taskprocessor_push(serializer, taskpool_serializer_empty_task, NULL)) {
-               return 0;
-       }
-
-       /* Next we suspend the serializer so that the execute_tasks currently executing stops
-        * and doesn't requeue.
-        */
-       ser->suspended = 1;
-
-       return 0;
- }
-
-void ast_taskpool_serializer_suspend(struct ast_taskprocessor *serializer)
-{
-       if (ast_taskprocessor_is_task(serializer)) {
-               /* I am the session's serializer thread so I cannot suspend. */
-               return;
-       }
-
-       /* Once this returns there is no thread executing the tasks on the serializer, so they
-        * will accumulate until the serializer is unsuspended.
-        */
-       ast_taskpool_serializer_push_wait(serializer, taskpool_serializer_suspend_task, serializer);
-}
-
-void ast_taskpool_serializer_unsuspend(struct ast_taskprocessor *serializer)
-{
-       struct ast_taskprocessor_listener *listener = ast_taskprocessor_listener(serializer);
-       struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
-
-       ao2_lock(ser);
-
-       if (!ser->suspended) {
-               ao2_unlock(ser);
-               return;
-       }
-
-       ser->suspended = 0;
-
-       ao2_unlock(ser);
-
-       /* And now we kick off handling of the queued tasks once again */
-       if (ast_taskpool_push(ser->pool, execute_tasks, ao2_bump(serializer))) {
-               ast_taskprocessor_unreference(serializer);
-       }
-}
-
 /*!
  * \internal
  * \brief Clean up resources on Asterisk shutdown
index e3e1c59852731c101918da9070f749e60c92d8d3..5b954b222682ee404a14f1ddb487de68aab408b2 100644 (file)
@@ -37,7 +37,7 @@
 #include "asterisk/astobj2.h"
 #include "asterisk/module.h"
 #include "asterisk/serializer.h"
-#include "asterisk/taskpool.h"
+#include "asterisk/threadpool.h"
 #include "asterisk/taskprocessor.h"
 #include "asterisk/uuid.h"
 #include "asterisk/sorcery.h"
 
 #define MOD_DATA_CONTACT "contact"
 
+/*! Number of serializers in pool if one not supplied. */
+#define SERIALIZER_POOL_SIZE           8
+
+/*! Pool of serializers to use if not supplied. */
+static struct ast_serializer_pool *sip_serializer_pool;
+
 static pjsip_endpoint *ast_pjsip_endpoint;
 
-static struct ast_taskpool *sip_taskpool;
+static struct ast_threadpool *sip_threadpool;
 
 /*! Local host address for IPv4 */
 static pj_sockaddr host_ip_ipv4;
@@ -2082,7 +2088,7 @@ int ast_sip_append_body(pjsip_tx_data *tdata, const char *body_text)
 
 struct ast_taskprocessor *ast_sip_create_serializer_group(const char *name, struct ast_serializer_shutdown_group *shutdown_group)
 {
-       return ast_taskpool_serializer_group(name, sip_taskpool, shutdown_group);
+       return ast_threadpool_serializer_group(name, sip_threadpool, shutdown_group);
 }
 
 struct ast_taskprocessor *ast_sip_create_serializer(const char *name)
@@ -2093,18 +2099,67 @@ struct ast_taskprocessor *ast_sip_create_serializer(const char *name)
 int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
 {
        if (!serializer) {
-               return ast_taskpool_push(sip_taskpool, sip_task, task_data);
+               serializer = ast_serializer_pool_get(sip_serializer_pool);
        }
 
        return ast_taskprocessor_push(serializer, sip_task, task_data);
 }
 
+struct sync_task_data {
+       ast_mutex_t lock;
+       ast_cond_t cond;
+       int complete;
+       int fail;
+       int (*task)(void *);
+       void *task_data;
+};
+
+static int sync_task(void *data)
+{
+       struct sync_task_data *std = data;
+       int ret;
+
+       std->fail = std->task(std->task_data);
+
+       /*
+        * Once we unlock std->lock after signaling, we cannot access
+        * std again.  The thread waiting within ast_sip_push_task_wait()
+        * is free to continue and release its local variable (std).
+        */
+       ast_mutex_lock(&std->lock);
+       std->complete = 1;
+       ast_cond_signal(&std->cond);
+       ret = std->fail;
+       ast_mutex_unlock(&std->lock);
+       return ret;
+}
+
 static int ast_sip_push_task_wait(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
 {
-       if (!serializer) {
-               return ast_taskpool_push_wait(sip_taskpool, sip_task, task_data);
+       /* This method is an onion */
+       struct sync_task_data std;
+
+       memset(&std, 0, sizeof(std));
+       ast_mutex_init(&std.lock);
+       ast_cond_init(&std.cond, NULL);
+       std.task = sip_task;
+       std.task_data = task_data;
+
+       if (ast_sip_push_task(serializer, sync_task, &std)) {
+               ast_mutex_destroy(&std.lock);
+               ast_cond_destroy(&std.cond);
+               return -1;
+       }
+
+       ast_mutex_lock(&std.lock);
+       while (!std.complete) {
+               ast_cond_wait(&std.cond, &std.lock);
        }
-       return ast_taskpool_serializer_push_wait(serializer, sip_task, task_data);
+       ast_mutex_unlock(&std.lock);
+
+       ast_mutex_destroy(&std.lock);
+       ast_cond_destroy(&std.cond);
+       return std.fail;
 }
 
 int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
@@ -2124,10 +2179,23 @@ int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*si
 int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
 {
        if (!serializer) {
-               return ast_taskpool_push_wait(sip_taskpool, sip_task, task_data);
+               /* Caller doesn't care which PJSIP serializer the task executes under. */
+               serializer = ast_serializer_pool_get(sip_serializer_pool);
+               if (!serializer) {
+                       /* No serializer picked to execute the task */
+                       return -1;
+               }
+       }
+       if (ast_taskprocessor_is_task(serializer)) {
+               /*
+                * We are the requested serializer so we must execute
+                * the task now or deadlock waiting on ourself to
+                * execute it.
+                */
+               return sip_task(task_data);
        }
 
-       return ast_taskpool_serializer_push_wait(serializer, sip_task, task_data);
+       return ast_sip_push_task_wait(serializer, sip_task, task_data);
 }
 
 void ast_copy_pj_str(char *dest, const pj_str_t *src, size_t size)
@@ -3386,14 +3454,14 @@ static void remove_request_headers(pjsip_endpoint *endpt)
        }
 }
 
-long ast_sip_taskpool_queue_size(void)
+long ast_sip_threadpool_queue_size(void)
 {
-       return ast_taskpool_queue_size(sip_taskpool);
+       return ast_threadpool_queue_size(sip_threadpool);
 }
 
-struct ast_taskpool *ast_sip_taskpool(void)
+struct ast_threadpool *ast_sip_threadpool(void)
 {
-       return sip_taskpool;
+       return sip_threadpool;
 }
 
 int ast_sip_is_uri_sip_sips(pjsip_uri *uri)
@@ -3733,7 +3801,7 @@ static int unload_pjsip(void *data)
         * These calls need the pjsip endpoint and serializer to clean up.
         * If they're not set, then there's nothing to clean up anyway.
         */
-       if (ast_pjsip_endpoint) {
+       if (ast_pjsip_endpoint && sip_serializer_pool) {
                ast_res_pjsip_cleanup_options_handling();
                ast_res_pjsip_cleanup_message_filter();
                ast_sip_destroy_distributor();
@@ -3853,7 +3921,7 @@ pjsip_media_type pjsip_media_type_text_plain;
 
 static int load_module(void)
 {
-       struct ast_taskpool_options options;
+       struct ast_threadpool_options options;
 
        /* pjproject and config_system need to be initialized before all else */
        if (pj_init() != PJ_SUCCESS) {
@@ -3890,11 +3958,18 @@ static int load_module(void)
                goto error;
        }
 
-       /* The serializer needs taskpool and taskpool needs pjproject to be initialized so it's next */
-       sip_get_taskpool_options(&options);
+       /* The serializer needs threadpool and threadpool needs pjproject to be initialized so it's next */
+       sip_get_threadpool_options(&options);
        options.thread_start = sip_thread_start;
-       sip_taskpool = ast_taskpool_create("pjsip", &options);
-       if (!sip_taskpool) {
+       sip_threadpool = ast_threadpool_create("pjsip", NULL, &options);
+       if (!sip_threadpool) {
+               goto error;
+       }
+
+       sip_serializer_pool = ast_serializer_pool_create(
+               "pjsip/default", SERIALIZER_POOL_SIZE, sip_threadpool, -1);
+       if (!sip_serializer_pool) {
+               ast_log(LOG_ERROR, "Failed to create SIP serializer pool. Aborting load\n");
                goto error;
        }
 
@@ -3974,7 +4049,8 @@ error:
 
        /* These functions all check for NULLs and are safe to call at any time */
        ast_sip_destroy_scheduler();
-       ast_taskpool_shutdown(sip_taskpool);
+       ast_serializer_pool_destroy(sip_serializer_pool);
+       ast_threadpool_shutdown(sip_threadpool);
 
        return AST_MODULE_LOAD_DECLINE;
 }
@@ -4000,11 +4076,12 @@ static int unload_module(void)
        ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
 
        /* The thread this is called from cannot call PJSIP/PJLIB functions,
-        * so we have to push the work to the taskpool to handle
+        * so we have to push the work to the threadpool to handle
         */
        ast_sip_push_task_wait_servant(NULL, unload_pjsip, NULL);
        ast_sip_destroy_scheduler();
-       ast_taskpool_shutdown(sip_taskpool);
+       ast_serializer_pool_destroy(sip_serializer_pool);
+       ast_threadpool_shutdown(sip_threadpool);
 
        return 0;
 }
index c782c6f28ea7f81ec0c87e1285aad547c4f53bc8..e16738f610532860cc5040c053565220f69d873c 100644 (file)
@@ -24,7 +24,7 @@
 #include "asterisk/res_pjsip.h"
 #include "asterisk/sorcery.h"
 #include "include/res_pjsip_private.h"
-#include "asterisk/taskpool.h"
+#include "asterisk/threadpool.h"
 #include "asterisk/dns.h"
 #include "asterisk/res_pjsip_cli.h"
 
@@ -41,17 +41,15 @@ struct system_config {
        /*! Should we use short forms for headers? */
        unsigned int compactheaders;
        struct {
-               /*! Minimum number of taskprocessors in the taskpool */
-               int minimum_size;
-               /*! Initial number of taskprocessors in the taskpool */
+               /*! Initial number of threads in the threadpool */
                int initial_size;
-               /*! The amount by which the number of taskprocessors is incremented when necessary */
+               /*! The amount by which the number of threads is incremented when necessary */
                int auto_increment;
-               /*! Taskprocessor idle timeout in seconds */
+               /*! Thread idle timeout in seconds */
                int idle_timeout;
-               /*! Maxumum number of taskprocessors in the taskpool */
+               /*! Maxumum number of threads in the threadpool */
                int max_size;
-       } taskpool;
+       } threadpool;
        /*! Nonzero to disable switching from UDP to TCP transport */
        unsigned int disable_tcp_switch;
        /*!
@@ -65,13 +63,13 @@ struct system_config {
        unsigned int disable_rport;
 };
 
-static struct ast_taskpool_options sip_taskpool_options = {
-       .version = AST_TASKPOOL_OPTIONS_VERSION,
+static struct ast_threadpool_options sip_threadpool_options = {
+       .version = AST_THREADPOOL_OPTIONS_VERSION,
 };
 
-void sip_get_taskpool_options(struct ast_taskpool_options *taskpool_options)
+void sip_get_threadpool_options(struct ast_threadpool_options *threadpool_options)
 {
-       *taskpool_options = sip_taskpool_options;
+       *threadpool_options = sip_threadpool_options;
 }
 
 static struct ast_sorcery *system_sorcery;
@@ -127,11 +125,10 @@ static int system_apply(const struct ast_sorcery *sorcery, void *obj)
 #endif
        }
 
-       sip_taskpool_options.minimum_size = system->taskpool.minimum_size;
-       sip_taskpool_options.initial_size = system->taskpool.initial_size;
-       sip_taskpool_options.auto_increment = system->taskpool.auto_increment;
-       sip_taskpool_options.idle_timeout = system->taskpool.idle_timeout;
-       sip_taskpool_options.max_size = system->taskpool.max_size;
+       sip_threadpool_options.initial_size = system->threadpool.initial_size;
+       sip_threadpool_options.auto_increment = system->threadpool.auto_increment;
+       sip_threadpool_options.idle_timeout = system->threadpool.idle_timeout;
+       sip_threadpool_options.max_size = system->threadpool.max_size;
 
        pjsip_cfg()->endpt.disable_tcp_switch =
                system->disable_tcp_switch ? PJ_TRUE : PJ_FALSE;
@@ -202,24 +199,14 @@ int ast_sip_initialize_system(void)
                        OPT_UINT_T, 0, FLDSET(struct system_config, timerb));
        ast_sorcery_object_field_register(system_sorcery, "system", "compact_headers", "no",
                        OPT_BOOL_T, 1, FLDSET(struct system_config, compactheaders));
-       ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_minimum_size", "4",
-                       OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.minimum_size));
-       ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_initial_size", "4",
-                       OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.initial_size));
-       ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_initial_size", "4",
-                       OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.initial_size));
-       ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_auto_increment", "1",
-                       OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.auto_increment));
-       ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_auto_increment", "1",
-                       OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.auto_increment));
-       ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_idle_timeout", "60",
-                       OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.idle_timeout));
+       ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_initial_size", "0",
+                       OPT_UINT_T, 0, FLDSET(struct system_config, threadpool.initial_size));
+       ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_auto_increment", "5",
+                       OPT_UINT_T, 0, FLDSET(struct system_config, threadpool.auto_increment));
        ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_idle_timeout", "60",
-                       OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.idle_timeout));
-       ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_max_size", "50",
-                       OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.max_size));
+                       OPT_UINT_T, 0, FLDSET(struct system_config, threadpool.idle_timeout));
        ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_max_size", "50",
-                       OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.max_size));
+                       OPT_UINT_T, 0, FLDSET(struct system_config, threadpool.max_size));
        ast_sorcery_object_field_register(system_sorcery, "system", "disable_tcp_switch", "yes",
                        OPT_BOOL_T, 1, FLDSET(struct system_config, disable_tcp_switch));
        ast_sorcery_object_field_register(system_sorcery, "system", "follow_early_media_fork", "yes",
index 11582ad1c4536c31c3cad701b9d546a9b908e16e..dcb821cedcd6a30c5eca30a5719c722be7248a7c 100644 (file)
@@ -18,7 +18,7 @@
 #include "asterisk/compat.h"
 
 struct ao2_container;
-struct ast_taskpool_options;
+struct ast_threadpool_options;
 struct ast_sip_cli_context;
 
 /*!
@@ -116,7 +116,7 @@ int ast_sip_destroy_sorcery_auth(void);
  * \brief Initialize the distributor module
  *
  * The distributor module is responsible for taking an incoming
- * SIP message and placing it into the taskpool. Once in the taskpool,
+ * SIP message and placing it into the threadpool. Once in the threadpool,
  * the distributor will perform endpoint lookups and authentication, and
  * then distribute the message up the stack to any further modules.
  *
@@ -278,9 +278,9 @@ void ast_res_pjsip_cleanup_message_filter(void);
 
 /*!
  * \internal
- * \brief Get taskpool options
+ * \brief Get threadpool options
  */
-void sip_get_taskpool_options(struct ast_taskpool_options *taskpool_options);
+void sip_get_threadpool_options(struct ast_threadpool_options *threadpool_options);
 
 /*!
  * \internal
index 8019426da792de12d867068605a0bbe39eb4ea78..cef061b84cc10a11468cd2c4d77c0135ae92ca22 100644 (file)
                                        </since>
                                        <synopsis>Use the short forms of common SIP header names.</synopsis>
                                </configOption>
-                               <configOption name="taskpool_minimum_size" default="4">
-                                       <since>
-                                               <version>23.1.0</version>
-                                               <version>22.7.0</version>
-                                               <version>20.17.0</version>
-                                       </since>
-                                       <synopsis>Minimum number of taskprocessors in the res_pjsip taskpool.</synopsis>
-                               </configOption>
-                               <configOption name="taskpool_initial_size" default="4">
-                                       <since>
-                                               <version>23.1.0</version>
-                                               <version>22.7.0</version>
-                                               <version>20.17.0</version>
-                                       </since>
-                                       <synopsis>Initial number of taskprocessors in the res_pjsip taskpool.</synopsis>
-                               </configOption>
-                               <configOption name="taskpool_auto_increment" default="1">
-                                       <since>
-                                               <version>23.1.0</version>
-                                               <version>22.7.0</version>
-                                               <version>20.17.0</version>
-                                       </since>
-                                       <synopsis>The amount by which the number of taskprocessors is incremented when necessary.</synopsis>
-                               </configOption>
-                               <configOption name="taskpool_idle_timeout" default="60">
-                                       <since>
-                                               <version>23.1.0</version>
-                                               <version>22.7.0</version>
-                                               <version>20.17.0</version>
-                                       </since>
-                                       <synopsis>Number of seconds before an idle taskprocessor should be disposed of.</synopsis>
-                               </configOption>
-                               <configOption name="taskpool_max_size" default="50">
-                                       <since>
-                                               <version>23.1.0</version>
-                                               <version>22.7.0</version>
-                                               <version>20.17.0</version>
-                                       </since>
-                                       <synopsis>Maximum number of taskprocessors in the res_pjsip taskpool.
-                                       A value of 0 indicates no maximum.</synopsis>
-                               </configOption>
-                               <configOption name="threadpool_initial_size" default="4">
+                               <configOption name="threadpool_initial_size" default="0">
                                        <since>
                                                <version>12.0.0</version>
                                        </since>
-                                       <synopsis>Initial number of threads in the res_pjsip taskpool.
-                                               Deprecated in favor of taskpool_initiali_size.</synopsis>
+                                       <synopsis>Initial number of threads in the res_pjsip threadpool.</synopsis>
                                </configOption>
-                               <configOption name="threadpool_auto_increment" default="1">
+                               <configOption name="threadpool_auto_increment" default="5">
                                        <since>
                                                <version>12.0.0</version>
                                        </since>
-                                       <synopsis>The amount by which the number of threads is incremented when necessary.
-                                               Deprecated in favor of taskpool_auto_increment.</synopsis>
+                                       <synopsis>The amount by which the number of threads is incremented when necessary.</synopsis>
                                </configOption>
                                <configOption name="threadpool_idle_timeout" default="60">
                                        <since>
                                                <version>12.0.0</version>
                                        </since>
-                                       <synopsis>Number of seconds before an idle taskprocessor should be disposed of.
-                                               Deprecated in favor of taskpool_idle_timeout.</synopsis>
+                                       <synopsis>Number of seconds before an idle thread should be disposed of.</synopsis>
                                </configOption>
-                               <configOption name="threadpool_max_size" default="50">
+                               <configOption name="threadpool_max_size" default="0">
                                        <since>
                                                <version>13.7.0</version>
                                        </since>
-                                       <synopsis>Maximum number of taskprocessors in the res_pjsip taskpool.
-                                       A value of 0 indicates no maximum.
-                                       Deprecated in favor of taskpool_max_size.</synopsis>
+                                       <synopsis>Maximum number of threads in the res_pjsip threadpool.
+                                       A value of 0 indicates no maximum.</synopsis>
                                </configOption>
                                <configOption name="disable_tcp_switch" default="yes">
                                        <since>
index adf18d3b8b6b2ec2f78f2eb27755644c8d163a34..c96061110192e8a3b77a1875cae73f983536440f 100644 (file)
@@ -24,7 +24,7 @@
 #include "asterisk/acl.h"
 #include "include/res_pjsip_private.h"
 #include "asterisk/taskprocessor.h"
-#include "asterisk/taskpool.h"
+#include "asterisk/threadpool.h"
 #include "asterisk/res_pjsip_cli.h"
 
 static int distribute(void *data);
@@ -75,7 +75,7 @@ static pj_status_t record_serializer(pjsip_tx_data *tdata)
 {
        struct ast_taskprocessor *serializer;
 
-       serializer = ast_taskpool_serializer_get_current();
+       serializer = ast_threadpool_serializer_get_current();
        if (serializer) {
                const char *name;
 
index 1d75c903ed007d29386fcde0b299c87c42809687..93354b930949ee68a2be0ca1f97b41e5966d4c68 100644 (file)
@@ -33,7 +33,7 @@
 #include "asterisk/statsd.h"
 #include "include/res_pjsip_private.h"
 #include "asterisk/taskprocessor.h"
-#include "asterisk/serializer_shutdown_group.h"
+#include "asterisk/threadpool.h"
 
 /*
  * This implementation for OPTIONS support is based around the idea
index f5f4a1cfc5a1f11a8f092dd364f343bbf8c718b5..2babfe40de1bec2fafaab5131950fadc105cc049 100644 (file)
@@ -31,7 +31,7 @@
 #include "asterisk/res_pjsip.h"
 #include "include/res_pjsip_private.h"
 #include "asterisk/taskprocessor.h"
-#include "asterisk/taskpool.h"
+#include "asterisk/threadpool.h"
 
 #ifdef HAVE_PJSIP_EXTERNAL_RESOLVER
 
@@ -611,7 +611,7 @@ static void sip_resolve(pjsip_resolver_t *resolver, pj_pool_t *pool, const pjsip
                return;
        }
 
-       resolve->serializer = ao2_bump(ast_taskpool_serializer_get_current());
+       resolve->serializer = ao2_bump(ast_threadpool_serializer_get_current());
 
        ast_debug(2, "[%p] Starting initial resolution using parallel queries for target '%s'\n", resolve, host);
        ast_dns_query_set_resolve_async(resolve->queries, sip_resolve_callback, resolve);
index 22c7a1f345167fc5709c78a31170cfef217c6e89..072ddce5cca180ebe0dfc83f6dcd9aca644fd646 100644 (file)
@@ -1573,8 +1573,8 @@ static int load_module(void)
                return AST_MODULE_LOAD_DECLINE;
        }
 
-       mwi_serializer_pool = ast_serializer_taskpool_create("pjsip/mwi",
-               MWI_SERIALIZER_POOL_SIZE, ast_sip_taskpool(), MAX_UNLOAD_TIMEOUT_TIME);
+       mwi_serializer_pool = ast_serializer_pool_create("pjsip/mwi",
+               MWI_SERIALIZER_POOL_SIZE, ast_sip_threadpool(), MAX_UNLOAD_TIMEOUT_TIME);
        if (!mwi_serializer_pool) {
                ast_log(AST_LOG_WARNING, "Failed to create MWI serializer pool. The default SIP pool will be used for MWI\n");
        }
index 538df56125e1dc1d50be11dfc9c9c2108c97c15e..4b47f2b33f877937790990acb7cf5a6bc759fd6b 100644 (file)
@@ -33,8 +33,7 @@
 #include "asterisk/res_pjsip_outbound_publish.h"
 #include "asterisk/module.h"
 #include "asterisk/taskprocessor.h"
-#include "asterisk/taskpool.h"
-#include "asterisk/serializer_shutdown_group.h"
+#include "asterisk/threadpool.h"
 #include "asterisk/datastore.h"
 #include "res_pjsip/include/res_pjsip_private.h"
 
index 2746eeaea98cf86dac799ae4b36ca1106b569567..75744affa96e0e62aba052543b6e76538903f97e 100644 (file)
@@ -35,8 +35,7 @@
 #include "asterisk/cli.h"
 #include "asterisk/stasis_system.h"
 #include "asterisk/threadstorage.h"
-#include "asterisk/taskpool.h"
-#include "asterisk/serializer_shutdown_group.h"
+#include "asterisk/threadpool.h"
 #include "asterisk/statsd.h"
 #include "res_pjsip/include/res_pjsip_private.h"
 #include "asterisk/vector.h"
@@ -1670,7 +1669,7 @@ static void sip_outbound_registration_response_cb(struct pjsip_regc_cbparam *par
         * pjproject callback thread.
         */
        if (ast_sip_push_task(client_state->serializer, handle_registration_response, response)) {
-               ast_log(LOG_WARNING, "Failed to pass incoming registration response to taskpool\n");
+               ast_log(LOG_WARNING, "Failed to pass incoming registration response to threadpool\n");
                ao2_cleanup(response);
        }
 }
@@ -1691,7 +1690,7 @@ static void sip_outbound_registration_state_destroy(void *obj)
                ao2_ref(state->client_state, -1);
        } else if (ast_sip_push_task(state->client_state->serializer,
                handle_client_state_destruction, state->client_state)) {
-               ast_log(LOG_WARNING, "Failed to pass outbound registration client destruction to taskpool\n");
+               ast_log(LOG_WARNING, "Failed to pass outbound registration client destruction to threadpool\n");
                ao2_ref(state->client_state, -1);
        }
 }
index 67d7836ce732f8aa670d41c48d7a2d584c8c881f..6616b1c0c9123c228aa9c1bf607389470be3e2f6 100644 (file)
@@ -343,7 +343,7 @@ static struct ast_frame *refer_progress_framehook(struct ast_channel *chan, stru
                }
        }
 
-       /* If a notification is due to be sent push it to the taskpool */
+       /* If a notification is due to be sent push it to the thread pool */
        if (notification) {
                /* If the subscription is being terminated we don't need the frame hook any longer */
                if (notification->state == PJSIP_EVSUB_STATE_TERMINATED) {
@@ -395,7 +395,7 @@ static struct ast_frame *refer_ari_progress_framehook(struct ast_channel *chan,
                progress->ari_state->last_response = *message;
        }
 
-       /* If a notification is due to be sent push it to the taskpool */
+       /* If a notification is due to be sent push it to the thread pool */
        if (notification) {
                /* If the subscription is being terminated we don't need the frame hook any longer */
                if (notification->state == PJSIP_EVSUB_STATE_TERMINATED) {
index a4992225fe6783b7c15707b6cb3fb5cecf0adfdc..e4e4e1b12fc0363c8c4e02852df61ccf43b3a9f1 100644 (file)
@@ -32,7 +32,7 @@
 #include "asterisk/res_pjsip_session.h"
 #include "asterisk/module.h"
 #include "asterisk/causes.h"
-#include "asterisk/taskpool.h"
+#include "asterisk/threadpool.h"
 
 static void rfc3326_use_reason_header(struct ast_sip_session *session, struct pjsip_rx_data *rdata)
 {
@@ -139,7 +139,7 @@ static void rfc3326_outgoing_request(struct ast_sip_session *session, struct pjs
                 * checks so we must also be running under the call's serializer
                 * thread.
                 */
-               || session->serializer != ast_taskpool_serializer_get_current()) {
+               || session->serializer != ast_threadpool_serializer_get_current()) {
                return;
        }
 
@@ -152,7 +152,7 @@ static void rfc3326_outgoing_response(struct ast_sip_session *session, struct pj
 
        if (status.code < 300
                || !session->channel
-               || session->serializer != ast_taskpool_serializer_get_current()) {
+               || session->serializer != ast_threadpool_serializer_get_current()) {
                return;
        }
 
index d57392d4f89ad66c2ff33a44dba7dcaae0700505..f6faff2afe67e782ee4dbb4e119b8a8c33615423 100644 (file)
@@ -32,6 +32,7 @@
 #include "asterisk/res_pjsip_session.h"
 #include "asterisk/module.h"
 #include "asterisk/causes.h"
+#include "asterisk/threadpool.h"
 
 /*! \brief Private data structure used with the modules's datastore */
 struct rfc3329_store_data {
index 135aa1927950687851f70d53474ff1cf1382c52b..bca226c85753f03095eac00365b3cda5e0f6b70f 100644 (file)
@@ -42,7 +42,6 @@
 #include "asterisk/uuid.h"
 #include "asterisk/pbx.h"
 #include "asterisk/taskprocessor.h"
-#include "asterisk/taskpool.h"
 #include "asterisk/causes.h"
 #include "asterisk/sdp_srtp.h"
 #include "asterisk/dsp.h"
@@ -3128,14 +3127,115 @@ struct ast_sip_session *ast_sip_session_alloc(struct ast_sip_endpoint *endpoint,
        return ret_session;
 }
 
+/*! \brief struct controlling the suspension of the session's serializer. */
+struct ast_sip_session_suspender {
+       ast_cond_t cond_suspended;
+       ast_cond_t cond_complete;
+       int suspended;
+       int complete;
+};
+
+static void sip_session_suspender_dtor(void *vdoomed)
+{
+       struct ast_sip_session_suspender *doomed = vdoomed;
+
+       ast_cond_destroy(&doomed->cond_suspended);
+       ast_cond_destroy(&doomed->cond_complete);
+}
+
+/*!
+ * \internal
+ * \brief Block the session serializer thread task.
+ *
+ * \param data Pushed serializer task data for suspension.
+ *
+ * \retval 0
+ */
+static int sip_session_suspend_task(void *data)
+{
+       struct ast_sip_session_suspender *suspender = data;
+
+       ao2_lock(suspender);
+
+       /* Signal that the serializer task is now suspended. */
+       suspender->suspended = 1;
+       ast_cond_signal(&suspender->cond_suspended);
+
+       /* Wait for the serializer suspension to be completed. */
+       while (!suspender->complete) {
+               ast_cond_wait(&suspender->cond_complete, ao2_object_get_lockaddr(suspender));
+       }
+
+       ao2_unlock(suspender);
+       ao2_ref(suspender, -1);
+
+       return 0;
+}
+
 void ast_sip_session_suspend(struct ast_sip_session *session)
 {
-       ast_taskpool_serializer_suspend(session->serializer);
+       struct ast_sip_session_suspender *suspender;
+       int res;
+
+       ast_assert(session->suspended == NULL);
+
+       if (ast_taskprocessor_is_task(session->serializer)) {
+               /* I am the session's serializer thread so I cannot suspend. */
+               return;
+       }
+
+       if (ast_taskprocessor_is_suspended(session->serializer)) {
+               /* The serializer already suspended. */
+               return;
+       }
+
+       suspender = ao2_alloc(sizeof(*suspender), sip_session_suspender_dtor);
+       if (!suspender) {
+               /* We will just have to hope that the system does not deadlock */
+               return;
+       }
+       ast_cond_init(&suspender->cond_suspended, NULL);
+       ast_cond_init(&suspender->cond_complete, NULL);
+
+       ao2_ref(suspender, +1);
+       res = ast_sip_push_task(session->serializer, sip_session_suspend_task, suspender);
+       if (res) {
+               /* We will just have to hope that the system does not deadlock */
+               ao2_ref(suspender, -2);
+               return;
+       }
+
+       session->suspended = suspender;
+
+       /* Wait for the serializer to get suspended. */
+       ao2_lock(suspender);
+       while (!suspender->suspended) {
+               ast_cond_wait(&suspender->cond_suspended, ao2_object_get_lockaddr(suspender));
+       }
+       ao2_unlock(suspender);
+
+       ast_taskprocessor_suspend(session->serializer);
 }
 
 void ast_sip_session_unsuspend(struct ast_sip_session *session)
 {
-       ast_taskpool_serializer_unsuspend(session->serializer);
+       struct ast_sip_session_suspender *suspender = session->suspended;
+
+       if (!suspender) {
+               /* Nothing to do */
+               return;
+       }
+       session->suspended = NULL;
+
+       /* Signal that the serializer task suspension is now complete. */
+       ao2_lock(suspender);
+       suspender->complete = 1;
+       ast_cond_signal(&suspender->cond_complete);
+       ao2_unlock(suspender);
+
+       ao2_ref(suspender, -1);
+
+       ast_taskprocessor_unsuspend(session->serializer);
 }
 
 /*!