From: Joshua C. Colp Date: Mon, 27 Oct 2025 13:06:49 +0000 (-0300) Subject: Revert "pjsip: Move from threadpool to taskpool" X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=74b1cafece998859f53196736c09f7b703b6f369;p=thirdparty%2Fasterisk.git Revert "pjsip: Move from threadpool to taskpool" This reverts commit bb6b76c2d8239b2665223dcbf6d507aa9aa4534e. --- diff --git a/channels/chan_pjsip.c b/channels/chan_pjsip.c index f44c51edae..e9c037cc22 100644 --- a/channels/chan_pjsip.c +++ b/channels/chan_pjsip.c @@ -720,7 +720,7 @@ static int answer(void *data) ast_channel_name(session->channel), err); /* * Return this value so we can distinguish between this - * failure and the taskpool synchronous push failing. + * failure and the threadpool synchronous push failing. */ SCOPE_EXIT_RTN_VALUE(-2, "pjproject failure\n"); } @@ -753,7 +753,7 @@ static int chan_pjsip_answer(struct ast_channel *ast) res = ast_sip_push_task_wait_serializer(session->serializer, answer, &ans_data); if (res) { if (res == -1) { - ast_log(LOG_ERROR,"Cannot answer '%s': Unable to push answer task to the taskpool.\n", + ast_log(LOG_ERROR,"Cannot answer '%s': Unable to push answer task to the threadpool.\n", ast_channel_name(session->channel)); } ao2_ref(session, -1); @@ -2601,7 +2601,7 @@ static int chan_pjsip_hangup(struct ast_channel *ast) } if (ast_sip_push_task(channel->session->serializer, hangup, h_data)) { - ast_log(LOG_WARNING, "Unable to push hangup task to the taskpool. Expect bad things\n"); + ast_log(LOG_WARNING, "Unable to push hangup task to the threadpool. Expect bad things\n"); goto failure; } diff --git a/configs/samples/pjsip.conf.sample b/configs/samples/pjsip.conf.sample index f3619909b6..c399ba8a24 100644 --- a/configs/samples/pjsip.conf.sample +++ b/configs/samples/pjsip.conf.sample @@ -1287,16 +1287,14 @@ ;timer_b=32000 ; Set transaction timer B value milliseconds (default: "32000") ;compact_headers=no ; Use the short forms of common SIP header names ; (default: "no") -;taskpool_minimum_size=4 ; Minimum number of taskprocessors in the res_pjsip - ; taskpool (default: "4") -;taskpool_initial_size=4 ; Initial number of taskprocessors in the res_pjsip - ; taskpool (default: "4") -;taskpool_auto_increment=1 ; The amount by which the number of taskprocessors is - ; incremented when necessary (default: "1") -;taskpool_idle_timeout=60 ; Number of seconds before an idle taskprocessor - ; should be disposed of (default: "60") -;taskpool_max_size=50 ; Maximum number of taskprocessors in the res_pjsip taskpool - ; A value of 0 indicates no maximum (default: "50") +;threadpool_initial_size=0 ; Initial number of threads in the res_pjsip + ; threadpool (default: "0") +;threadpool_auto_increment=5 ; The amount by which the number of threads is + ; incremented when necessary (default: "5") +;threadpool_idle_timeout=60 ; Number of seconds before an idle thread + ; should be disposed of (default: "60") +;threadpool_max_size=0 ; Maximum number of threads in the res_pjsip threadpool + ; A value of 0 indicates no maximum (default: "0") ;disable_tcp_switch=yes ; Disable automatic switching from UDP to TCP transports ; if outgoing request is too large. ; See RFC 3261 section 18.1.1. diff --git a/contrib/ast-db-manage/config/versions/dc7c357dc178_add_taskpool_options_to_system.py b/contrib/ast-db-manage/config/versions/dc7c357dc178_add_taskpool_options_to_system.py deleted file mode 100644 index b07c966bd2..0000000000 --- a/contrib/ast-db-manage/config/versions/dc7c357dc178_add_taskpool_options_to_system.py +++ /dev/null @@ -1,29 +0,0 @@ -"""add taskpool options to system - -Revision ID: dc7c357dc178 -Revises: abdc9ede147d -Create Date: 2025-09-24 09:45:17.609185 - -""" - -# revision identifiers, used by Alembic. -revision = 'dc7c357dc178' -down_revision = 'abdc9ede147d' - -from alembic import op -import sqlalchemy as sa - - -def upgrade(): - op.add_column('ps_systems', sa.Column('taskpool_minimum_size', sa.Integer)) - op.add_column('ps_systems', sa.Column('taskpool_initial_size', sa.Integer)) - op.add_column('ps_systems', sa.Column('taskpool_auto_increment', sa.Integer)) - op.add_column('ps_systems', sa.Column('taskpool_idle_timeout', sa.Integer)) - op.add_column('ps_systems', sa.Column('taskpool_max_size', sa.Integer)) - -def downgrade(): - op.drop_column('ps_systems', 'taskpool_minimum_size') - op.drop_column('ps_systems', 'taskpool_initial_size') - op.drop_column('ps_systems', 'taskpool_auto_increment') - op.drop_column('ps_systems', 'taskpool_idle_timeout') - op.drop_column('ps_systems', 'taskpool_max_size') diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h index c49e9dfac7..2fafd5790a 100644 --- a/include/asterisk/res_pjsip.h +++ b/include/asterisk/res_pjsip.h @@ -1921,7 +1921,7 @@ struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void); * There are three major types of threads that SIP will have to deal with: * \li Asterisk threads * \li PJSIP threads - * \li SIP taskpool threads (a.k.a. "servants") + * \li SIP threadpool threads (a.k.a. "servants") * * \par Asterisk Threads * @@ -1963,7 +1963,7 @@ struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void); * is NULL, then the work will be handed off to whatever servant can currently handle * the task. If this pointer is non-NULL, then the task will not be executed until * previous tasks pushed with the same serializer have completed. For more information - * on serializers and the benefits they provide, see \ref ast_taskpool_serializer + * on serializers and the benefits they provide, see \ref ast_threadpool_serializer * * \par Scheduler * @@ -1992,7 +1992,7 @@ typedef int (*ast_sip_task)(void *user_data); * \brief Create a new serializer for SIP tasks * \since 13.8.0 * - * See \ref ast_taskpool_serializer for more information on serializers. + * See \ref ast_threadpool_serializer for more information on serializers. * SIP creates serializers so that tasks operating on similar data will run * in sequence. * @@ -2009,7 +2009,7 @@ struct ast_serializer_shutdown_group; * \brief Create a new serializer for SIP tasks * \since 13.8.0 * - * See \ref ast_taskpool_serializer for more information on serializers. + * See \ref ast_threadpool_serializer for more information on serializers. * SIP creates serializers so that tasks operating on similar data will run * in sequence. * @@ -2251,7 +2251,7 @@ enum ast_sip_scheduler_task_flags { struct ast_sip_sched_task; /*! - * \brief Schedule a task to run in the res_pjsip taskpool + * \brief Schedule a task to run in the res_pjsip thread pool * \since 13.9.0 * * \param serializer The serializer to use. If NULL, don't use a serializer (see note below) @@ -2266,7 +2266,7 @@ struct ast_sip_sched_task; * \par Serialization * * Specifying a serializer guarantees serialized execution but NOT specifying a serializer - * may still result in tasks being effectively serialized if the taskpool is busy. + * may still result in tasks being effectively serialized if the thread pool is busy. * The point of the serializer BTW is not to prevent parallel executions of the SAME task. * That happens automatically (see below). It's to prevent the task from running at the same * time as other work using the same serializer, whether or not it's being run by the scheduler. @@ -3662,15 +3662,15 @@ int ast_sip_get_host_ip(int af, pj_sockaddr *addr); const char *ast_sip_get_host_ip_string(int af); /*! - * \brief Return the size of the SIP taskpool's task queue + * \brief Return the size of the SIP threadpool's task queue * \since 13.7.0 */ -long ast_sip_taskpool_queue_size(void); +long ast_sip_threadpool_queue_size(void); /*! - * \brief Retrieve the SIP taskpool object + * \brief Retrieve the SIP threadpool object */ -struct ast_taskpool *ast_sip_taskpool(void); +struct ast_threadpool *ast_sip_threadpool(void); /*! * \brief Retrieve transport state diff --git a/include/asterisk/res_pjsip_session.h b/include/asterisk/res_pjsip_session.h index 7279d0d448..75ba60e523 100644 --- a/include/asterisk/res_pjsip_session.h +++ b/include/asterisk/res_pjsip_session.h @@ -195,7 +195,7 @@ struct ast_sip_session { struct ao2_container *datastores; /*! Serializer for tasks relating to this SIP session */ struct ast_taskprocessor *serializer; - /*! \deprecated Non-null if the session serializer is suspended or being suspended. */ + /*! Non-null if the session serializer is suspended or being suspended. */ struct ast_sip_session_suspender *suspended; /*! Requests that could not be sent due to current inv_session state */ AST_LIST_HEAD_NOLOCK(, ast_sip_session_delayed_request) delayed_requests; diff --git a/include/asterisk/taskpool.h b/include/asterisk/taskpool.h index bf1c1901eb..2a4f963052 100644 --- a/include/asterisk/taskpool.h +++ b/include/asterisk/taskpool.h @@ -318,24 +318,4 @@ struct ast_taskprocessor *ast_taskpool_serializer_group(const char *name, */ int ast_taskpool_serializer_push_wait(struct ast_taskprocessor *serializer, int (*task)(void *data), void *data); -/*! - * \brief Suspend a serializer, causing tasks to be queued until unsuspended - * \since 23.1.0 - * \since 22.7.0 - * \since 20.17.0 - * - * \param serializer The serializer to suspend - */ -void ast_taskpool_serializer_suspend(struct ast_taskprocessor *serializer); - -/*! - * \brief Unsuspend a serializer, causing tasks to be executed - * \since 23.1.0 - * \since 22.7.0 - * \since 20.17.0 - * - * \param serializer The serializer to unsuspend - */ -void ast_taskpool_serializer_unsuspend(struct ast_taskprocessor *serializer); - #endif /* ASTERISK_TASKPOOL_H */ diff --git a/main/taskpool.c b/main/taskpool.c index 987ad3776f..59ac4b0c72 100644 --- a/main/taskpool.c +++ b/main/taskpool.c @@ -676,8 +676,6 @@ struct serializer { struct ast_taskpool *pool; /*! Which group will wait for this serializer to shutdown. */ struct ast_serializer_shutdown_group *shutdown_group; - /*! Whether the serializer is suspended or not. */ - unsigned int suspended:1; }; static void serializer_dtor(void *obj) @@ -729,15 +727,6 @@ static int execute_tasks(void *data) ast_threadstorage_set_ptr(¤t_taskpool_serializer, tps); for (remaining = ast_taskprocessor_size(tps); remaining > 0; remaining--) { requeue = ast_taskprocessor_execute(tps); - - /* If the serializer is suspended we will not execute any more tasks and - * we will not requeue the taskpool task. Instead it will be requeued when - * the serializer is unsuspended. - */ - if (ser->suspended) { - requeue = 0; - break; - } } ast_threadstorage_set_ptr(¤t_taskpool_serializer, NULL); @@ -927,72 +916,6 @@ int ast_taskpool_serializer_push_wait(struct ast_taskprocessor *serializer, int return sync_task.fail; } -/*! - * \internal A task that suspends the serializer after queuing an empty task - */ -static int taskpool_serializer_suspend_task(void *data) -{ - struct ast_taskprocessor *serializer = data; - struct ast_taskprocessor_listener *listener = ast_taskprocessor_listener(serializer); - struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener); - - /* If already suspended this is a no-op */ - if (ser->suspended) { - return 0; - } - - /* First we queue the empty task to ensure the serializer doesn't reach empty, this - * prevents any threads from queueing up a taskpool task that executes the serializer - * while it is suspended, allowing us to queue it ourselves when the serializer is - * unsuspended. - */ - if (ast_taskprocessor_push(serializer, taskpool_serializer_empty_task, NULL)) { - return 0; - } - - /* Next we suspend the serializer so that the execute_tasks currently executing stops - * and doesn't requeue. - */ - ser->suspended = 1; - - return 0; - } - -void ast_taskpool_serializer_suspend(struct ast_taskprocessor *serializer) -{ - if (ast_taskprocessor_is_task(serializer)) { - /* I am the session's serializer thread so I cannot suspend. */ - return; - } - - /* Once this returns there is no thread executing the tasks on the serializer, so they - * will accumulate until the serializer is unsuspended. - */ - ast_taskpool_serializer_push_wait(serializer, taskpool_serializer_suspend_task, serializer); -} - -void ast_taskpool_serializer_unsuspend(struct ast_taskprocessor *serializer) -{ - struct ast_taskprocessor_listener *listener = ast_taskprocessor_listener(serializer); - struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener); - - ao2_lock(ser); - - if (!ser->suspended) { - ao2_unlock(ser); - return; - } - - ser->suspended = 0; - - ao2_unlock(ser); - - /* And now we kick off handling of the queued tasks once again */ - if (ast_taskpool_push(ser->pool, execute_tasks, ao2_bump(serializer))) { - ast_taskprocessor_unreference(serializer); - } -} - /*! * \internal * \brief Clean up resources on Asterisk shutdown diff --git a/res/res_pjsip.c b/res/res_pjsip.c index e3e1c59852..5b954b2226 100644 --- a/res/res_pjsip.c +++ b/res/res_pjsip.c @@ -37,7 +37,7 @@ #include "asterisk/astobj2.h" #include "asterisk/module.h" #include "asterisk/serializer.h" -#include "asterisk/taskpool.h" +#include "asterisk/threadpool.h" #include "asterisk/taskprocessor.h" #include "asterisk/uuid.h" #include "asterisk/sorcery.h" @@ -65,9 +65,15 @@ #define MOD_DATA_CONTACT "contact" +/*! Number of serializers in pool if one not supplied. */ +#define SERIALIZER_POOL_SIZE 8 + +/*! Pool of serializers to use if not supplied. */ +static struct ast_serializer_pool *sip_serializer_pool; + static pjsip_endpoint *ast_pjsip_endpoint; -static struct ast_taskpool *sip_taskpool; +static struct ast_threadpool *sip_threadpool; /*! Local host address for IPv4 */ static pj_sockaddr host_ip_ipv4; @@ -2082,7 +2088,7 @@ int ast_sip_append_body(pjsip_tx_data *tdata, const char *body_text) struct ast_taskprocessor *ast_sip_create_serializer_group(const char *name, struct ast_serializer_shutdown_group *shutdown_group) { - return ast_taskpool_serializer_group(name, sip_taskpool, shutdown_group); + return ast_threadpool_serializer_group(name, sip_threadpool, shutdown_group); } struct ast_taskprocessor *ast_sip_create_serializer(const char *name) @@ -2093,18 +2099,67 @@ struct ast_taskprocessor *ast_sip_create_serializer(const char *name) int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) { if (!serializer) { - return ast_taskpool_push(sip_taskpool, sip_task, task_data); + serializer = ast_serializer_pool_get(sip_serializer_pool); } return ast_taskprocessor_push(serializer, sip_task, task_data); } +struct sync_task_data { + ast_mutex_t lock; + ast_cond_t cond; + int complete; + int fail; + int (*task)(void *); + void *task_data; +}; + +static int sync_task(void *data) +{ + struct sync_task_data *std = data; + int ret; + + std->fail = std->task(std->task_data); + + /* + * Once we unlock std->lock after signaling, we cannot access + * std again. The thread waiting within ast_sip_push_task_wait() + * is free to continue and release its local variable (std). + */ + ast_mutex_lock(&std->lock); + std->complete = 1; + ast_cond_signal(&std->cond); + ret = std->fail; + ast_mutex_unlock(&std->lock); + return ret; +} + static int ast_sip_push_task_wait(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) { - if (!serializer) { - return ast_taskpool_push_wait(sip_taskpool, sip_task, task_data); + /* This method is an onion */ + struct sync_task_data std; + + memset(&std, 0, sizeof(std)); + ast_mutex_init(&std.lock); + ast_cond_init(&std.cond, NULL); + std.task = sip_task; + std.task_data = task_data; + + if (ast_sip_push_task(serializer, sync_task, &std)) { + ast_mutex_destroy(&std.lock); + ast_cond_destroy(&std.cond); + return -1; + } + + ast_mutex_lock(&std.lock); + while (!std.complete) { + ast_cond_wait(&std.cond, &std.lock); } - return ast_taskpool_serializer_push_wait(serializer, sip_task, task_data); + ast_mutex_unlock(&std.lock); + + ast_mutex_destroy(&std.lock); + ast_cond_destroy(&std.cond); + return std.fail; } int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) @@ -2124,10 +2179,23 @@ int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*si int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) { if (!serializer) { - return ast_taskpool_push_wait(sip_taskpool, sip_task, task_data); + /* Caller doesn't care which PJSIP serializer the task executes under. */ + serializer = ast_serializer_pool_get(sip_serializer_pool); + if (!serializer) { + /* No serializer picked to execute the task */ + return -1; + } + } + if (ast_taskprocessor_is_task(serializer)) { + /* + * We are the requested serializer so we must execute + * the task now or deadlock waiting on ourself to + * execute it. + */ + return sip_task(task_data); } - return ast_taskpool_serializer_push_wait(serializer, sip_task, task_data); + return ast_sip_push_task_wait(serializer, sip_task, task_data); } void ast_copy_pj_str(char *dest, const pj_str_t *src, size_t size) @@ -3386,14 +3454,14 @@ static void remove_request_headers(pjsip_endpoint *endpt) } } -long ast_sip_taskpool_queue_size(void) +long ast_sip_threadpool_queue_size(void) { - return ast_taskpool_queue_size(sip_taskpool); + return ast_threadpool_queue_size(sip_threadpool); } -struct ast_taskpool *ast_sip_taskpool(void) +struct ast_threadpool *ast_sip_threadpool(void) { - return sip_taskpool; + return sip_threadpool; } int ast_sip_is_uri_sip_sips(pjsip_uri *uri) @@ -3733,7 +3801,7 @@ static int unload_pjsip(void *data) * These calls need the pjsip endpoint and serializer to clean up. * If they're not set, then there's nothing to clean up anyway. */ - if (ast_pjsip_endpoint) { + if (ast_pjsip_endpoint && sip_serializer_pool) { ast_res_pjsip_cleanup_options_handling(); ast_res_pjsip_cleanup_message_filter(); ast_sip_destroy_distributor(); @@ -3853,7 +3921,7 @@ pjsip_media_type pjsip_media_type_text_plain; static int load_module(void) { - struct ast_taskpool_options options; + struct ast_threadpool_options options; /* pjproject and config_system need to be initialized before all else */ if (pj_init() != PJ_SUCCESS) { @@ -3890,11 +3958,18 @@ static int load_module(void) goto error; } - /* The serializer needs taskpool and taskpool needs pjproject to be initialized so it's next */ - sip_get_taskpool_options(&options); + /* The serializer needs threadpool and threadpool needs pjproject to be initialized so it's next */ + sip_get_threadpool_options(&options); options.thread_start = sip_thread_start; - sip_taskpool = ast_taskpool_create("pjsip", &options); - if (!sip_taskpool) { + sip_threadpool = ast_threadpool_create("pjsip", NULL, &options); + if (!sip_threadpool) { + goto error; + } + + sip_serializer_pool = ast_serializer_pool_create( + "pjsip/default", SERIALIZER_POOL_SIZE, sip_threadpool, -1); + if (!sip_serializer_pool) { + ast_log(LOG_ERROR, "Failed to create SIP serializer pool. Aborting load\n"); goto error; } @@ -3974,7 +4049,8 @@ error: /* These functions all check for NULLs and are safe to call at any time */ ast_sip_destroy_scheduler(); - ast_taskpool_shutdown(sip_taskpool); + ast_serializer_pool_destroy(sip_serializer_pool); + ast_threadpool_shutdown(sip_threadpool); return AST_MODULE_LOAD_DECLINE; } @@ -4000,11 +4076,12 @@ static int unload_module(void) ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands)); /* The thread this is called from cannot call PJSIP/PJLIB functions, - * so we have to push the work to the taskpool to handle + * so we have to push the work to the threadpool to handle */ ast_sip_push_task_wait_servant(NULL, unload_pjsip, NULL); ast_sip_destroy_scheduler(); - ast_taskpool_shutdown(sip_taskpool); + ast_serializer_pool_destroy(sip_serializer_pool); + ast_threadpool_shutdown(sip_threadpool); return 0; } diff --git a/res/res_pjsip/config_system.c b/res/res_pjsip/config_system.c index c782c6f28e..e16738f610 100644 --- a/res/res_pjsip/config_system.c +++ b/res/res_pjsip/config_system.c @@ -24,7 +24,7 @@ #include "asterisk/res_pjsip.h" #include "asterisk/sorcery.h" #include "include/res_pjsip_private.h" -#include "asterisk/taskpool.h" +#include "asterisk/threadpool.h" #include "asterisk/dns.h" #include "asterisk/res_pjsip_cli.h" @@ -41,17 +41,15 @@ struct system_config { /*! Should we use short forms for headers? */ unsigned int compactheaders; struct { - /*! Minimum number of taskprocessors in the taskpool */ - int minimum_size; - /*! Initial number of taskprocessors in the taskpool */ + /*! Initial number of threads in the threadpool */ int initial_size; - /*! The amount by which the number of taskprocessors is incremented when necessary */ + /*! The amount by which the number of threads is incremented when necessary */ int auto_increment; - /*! Taskprocessor idle timeout in seconds */ + /*! Thread idle timeout in seconds */ int idle_timeout; - /*! Maxumum number of taskprocessors in the taskpool */ + /*! Maxumum number of threads in the threadpool */ int max_size; - } taskpool; + } threadpool; /*! Nonzero to disable switching from UDP to TCP transport */ unsigned int disable_tcp_switch; /*! @@ -65,13 +63,13 @@ struct system_config { unsigned int disable_rport; }; -static struct ast_taskpool_options sip_taskpool_options = { - .version = AST_TASKPOOL_OPTIONS_VERSION, +static struct ast_threadpool_options sip_threadpool_options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, }; -void sip_get_taskpool_options(struct ast_taskpool_options *taskpool_options) +void sip_get_threadpool_options(struct ast_threadpool_options *threadpool_options) { - *taskpool_options = sip_taskpool_options; + *threadpool_options = sip_threadpool_options; } static struct ast_sorcery *system_sorcery; @@ -127,11 +125,10 @@ static int system_apply(const struct ast_sorcery *sorcery, void *obj) #endif } - sip_taskpool_options.minimum_size = system->taskpool.minimum_size; - sip_taskpool_options.initial_size = system->taskpool.initial_size; - sip_taskpool_options.auto_increment = system->taskpool.auto_increment; - sip_taskpool_options.idle_timeout = system->taskpool.idle_timeout; - sip_taskpool_options.max_size = system->taskpool.max_size; + sip_threadpool_options.initial_size = system->threadpool.initial_size; + sip_threadpool_options.auto_increment = system->threadpool.auto_increment; + sip_threadpool_options.idle_timeout = system->threadpool.idle_timeout; + sip_threadpool_options.max_size = system->threadpool.max_size; pjsip_cfg()->endpt.disable_tcp_switch = system->disable_tcp_switch ? PJ_TRUE : PJ_FALSE; @@ -202,24 +199,14 @@ int ast_sip_initialize_system(void) OPT_UINT_T, 0, FLDSET(struct system_config, timerb)); ast_sorcery_object_field_register(system_sorcery, "system", "compact_headers", "no", OPT_BOOL_T, 1, FLDSET(struct system_config, compactheaders)); - ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_minimum_size", "4", - OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.minimum_size)); - ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_initial_size", "4", - OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.initial_size)); - ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_initial_size", "4", - OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.initial_size)); - ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_auto_increment", "1", - OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.auto_increment)); - ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_auto_increment", "1", - OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.auto_increment)); - ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_idle_timeout", "60", - OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.idle_timeout)); + ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_initial_size", "0", + OPT_UINT_T, 0, FLDSET(struct system_config, threadpool.initial_size)); + ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_auto_increment", "5", + OPT_UINT_T, 0, FLDSET(struct system_config, threadpool.auto_increment)); ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_idle_timeout", "60", - OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.idle_timeout)); - ast_sorcery_object_field_register(system_sorcery, "system", "taskpool_max_size", "50", - OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.max_size)); + OPT_UINT_T, 0, FLDSET(struct system_config, threadpool.idle_timeout)); ast_sorcery_object_field_register(system_sorcery, "system", "threadpool_max_size", "50", - OPT_UINT_T, 0, FLDSET(struct system_config, taskpool.max_size)); + OPT_UINT_T, 0, FLDSET(struct system_config, threadpool.max_size)); ast_sorcery_object_field_register(system_sorcery, "system", "disable_tcp_switch", "yes", OPT_BOOL_T, 1, FLDSET(struct system_config, disable_tcp_switch)); ast_sorcery_object_field_register(system_sorcery, "system", "follow_early_media_fork", "yes", diff --git a/res/res_pjsip/include/res_pjsip_private.h b/res/res_pjsip/include/res_pjsip_private.h index 11582ad1c4..dcb821cedc 100644 --- a/res/res_pjsip/include/res_pjsip_private.h +++ b/res/res_pjsip/include/res_pjsip_private.h @@ -18,7 +18,7 @@ #include "asterisk/compat.h" struct ao2_container; -struct ast_taskpool_options; +struct ast_threadpool_options; struct ast_sip_cli_context; /*! @@ -116,7 +116,7 @@ int ast_sip_destroy_sorcery_auth(void); * \brief Initialize the distributor module * * The distributor module is responsible for taking an incoming - * SIP message and placing it into the taskpool. Once in the taskpool, + * SIP message and placing it into the threadpool. Once in the threadpool, * the distributor will perform endpoint lookups and authentication, and * then distribute the message up the stack to any further modules. * @@ -278,9 +278,9 @@ void ast_res_pjsip_cleanup_message_filter(void); /*! * \internal - * \brief Get taskpool options + * \brief Get threadpool options */ -void sip_get_taskpool_options(struct ast_taskpool_options *taskpool_options); +void sip_get_threadpool_options(struct ast_threadpool_options *threadpool_options); /*! * \internal diff --git a/res/res_pjsip/pjsip_config.xml b/res/res_pjsip/pjsip_config.xml index 8019426da7..cef061b84c 100644 --- a/res/res_pjsip/pjsip_config.xml +++ b/res/res_pjsip/pjsip_config.xml @@ -3009,75 +3009,30 @@ Use the short forms of common SIP header names. - - - 23.1.0 - 22.7.0 - 20.17.0 - - Minimum number of taskprocessors in the res_pjsip taskpool. - - - - 23.1.0 - 22.7.0 - 20.17.0 - - Initial number of taskprocessors in the res_pjsip taskpool. - - - - 23.1.0 - 22.7.0 - 20.17.0 - - The amount by which the number of taskprocessors is incremented when necessary. - - - - 23.1.0 - 22.7.0 - 20.17.0 - - Number of seconds before an idle taskprocessor should be disposed of. - - - - 23.1.0 - 22.7.0 - 20.17.0 - - Maximum number of taskprocessors in the res_pjsip taskpool. - A value of 0 indicates no maximum. - - + 12.0.0 - Initial number of threads in the res_pjsip taskpool. - Deprecated in favor of taskpool_initiali_size. + Initial number of threads in the res_pjsip threadpool. - + 12.0.0 - The amount by which the number of threads is incremented when necessary. - Deprecated in favor of taskpool_auto_increment. + The amount by which the number of threads is incremented when necessary. 12.0.0 - Number of seconds before an idle taskprocessor should be disposed of. - Deprecated in favor of taskpool_idle_timeout. + Number of seconds before an idle thread should be disposed of. - + 13.7.0 - Maximum number of taskprocessors in the res_pjsip taskpool. - A value of 0 indicates no maximum. - Deprecated in favor of taskpool_max_size. + Maximum number of threads in the res_pjsip threadpool. + A value of 0 indicates no maximum. diff --git a/res/res_pjsip/pjsip_distributor.c b/res/res_pjsip/pjsip_distributor.c index adf18d3b8b..c960611101 100644 --- a/res/res_pjsip/pjsip_distributor.c +++ b/res/res_pjsip/pjsip_distributor.c @@ -24,7 +24,7 @@ #include "asterisk/acl.h" #include "include/res_pjsip_private.h" #include "asterisk/taskprocessor.h" -#include "asterisk/taskpool.h" +#include "asterisk/threadpool.h" #include "asterisk/res_pjsip_cli.h" static int distribute(void *data); @@ -75,7 +75,7 @@ static pj_status_t record_serializer(pjsip_tx_data *tdata) { struct ast_taskprocessor *serializer; - serializer = ast_taskpool_serializer_get_current(); + serializer = ast_threadpool_serializer_get_current(); if (serializer) { const char *name; diff --git a/res/res_pjsip/pjsip_options.c b/res/res_pjsip/pjsip_options.c index 1d75c903ed..93354b9309 100644 --- a/res/res_pjsip/pjsip_options.c +++ b/res/res_pjsip/pjsip_options.c @@ -33,7 +33,7 @@ #include "asterisk/statsd.h" #include "include/res_pjsip_private.h" #include "asterisk/taskprocessor.h" -#include "asterisk/serializer_shutdown_group.h" +#include "asterisk/threadpool.h" /* * This implementation for OPTIONS support is based around the idea diff --git a/res/res_pjsip/pjsip_resolver.c b/res/res_pjsip/pjsip_resolver.c index f5f4a1cfc5..2babfe40de 100644 --- a/res/res_pjsip/pjsip_resolver.c +++ b/res/res_pjsip/pjsip_resolver.c @@ -31,7 +31,7 @@ #include "asterisk/res_pjsip.h" #include "include/res_pjsip_private.h" #include "asterisk/taskprocessor.h" -#include "asterisk/taskpool.h" +#include "asterisk/threadpool.h" #ifdef HAVE_PJSIP_EXTERNAL_RESOLVER @@ -611,7 +611,7 @@ static void sip_resolve(pjsip_resolver_t *resolver, pj_pool_t *pool, const pjsip return; } - resolve->serializer = ao2_bump(ast_taskpool_serializer_get_current()); + resolve->serializer = ao2_bump(ast_threadpool_serializer_get_current()); ast_debug(2, "[%p] Starting initial resolution using parallel queries for target '%s'\n", resolve, host); ast_dns_query_set_resolve_async(resolve->queries, sip_resolve_callback, resolve); diff --git a/res/res_pjsip_mwi.c b/res/res_pjsip_mwi.c index 22c7a1f345..072ddce5cc 100644 --- a/res/res_pjsip_mwi.c +++ b/res/res_pjsip_mwi.c @@ -1573,8 +1573,8 @@ static int load_module(void) return AST_MODULE_LOAD_DECLINE; } - mwi_serializer_pool = ast_serializer_taskpool_create("pjsip/mwi", - MWI_SERIALIZER_POOL_SIZE, ast_sip_taskpool(), MAX_UNLOAD_TIMEOUT_TIME); + mwi_serializer_pool = ast_serializer_pool_create("pjsip/mwi", + MWI_SERIALIZER_POOL_SIZE, ast_sip_threadpool(), MAX_UNLOAD_TIMEOUT_TIME); if (!mwi_serializer_pool) { ast_log(AST_LOG_WARNING, "Failed to create MWI serializer pool. The default SIP pool will be used for MWI\n"); } diff --git a/res/res_pjsip_outbound_publish.c b/res/res_pjsip_outbound_publish.c index 538df56125..4b47f2b33f 100644 --- a/res/res_pjsip_outbound_publish.c +++ b/res/res_pjsip_outbound_publish.c @@ -33,8 +33,7 @@ #include "asterisk/res_pjsip_outbound_publish.h" #include "asterisk/module.h" #include "asterisk/taskprocessor.h" -#include "asterisk/taskpool.h" -#include "asterisk/serializer_shutdown_group.h" +#include "asterisk/threadpool.h" #include "asterisk/datastore.h" #include "res_pjsip/include/res_pjsip_private.h" diff --git a/res/res_pjsip_outbound_registration.c b/res/res_pjsip_outbound_registration.c index 2746eeaea9..75744affa9 100644 --- a/res/res_pjsip_outbound_registration.c +++ b/res/res_pjsip_outbound_registration.c @@ -35,8 +35,7 @@ #include "asterisk/cli.h" #include "asterisk/stasis_system.h" #include "asterisk/threadstorage.h" -#include "asterisk/taskpool.h" -#include "asterisk/serializer_shutdown_group.h" +#include "asterisk/threadpool.h" #include "asterisk/statsd.h" #include "res_pjsip/include/res_pjsip_private.h" #include "asterisk/vector.h" @@ -1670,7 +1669,7 @@ static void sip_outbound_registration_response_cb(struct pjsip_regc_cbparam *par * pjproject callback thread. */ if (ast_sip_push_task(client_state->serializer, handle_registration_response, response)) { - ast_log(LOG_WARNING, "Failed to pass incoming registration response to taskpool\n"); + ast_log(LOG_WARNING, "Failed to pass incoming registration response to threadpool\n"); ao2_cleanup(response); } } @@ -1691,7 +1690,7 @@ static void sip_outbound_registration_state_destroy(void *obj) ao2_ref(state->client_state, -1); } else if (ast_sip_push_task(state->client_state->serializer, handle_client_state_destruction, state->client_state)) { - ast_log(LOG_WARNING, "Failed to pass outbound registration client destruction to taskpool\n"); + ast_log(LOG_WARNING, "Failed to pass outbound registration client destruction to threadpool\n"); ao2_ref(state->client_state, -1); } } diff --git a/res/res_pjsip_refer.c b/res/res_pjsip_refer.c index 67d7836ce7..6616b1c0c9 100644 --- a/res/res_pjsip_refer.c +++ b/res/res_pjsip_refer.c @@ -343,7 +343,7 @@ static struct ast_frame *refer_progress_framehook(struct ast_channel *chan, stru } } - /* If a notification is due to be sent push it to the taskpool */ + /* If a notification is due to be sent push it to the thread pool */ if (notification) { /* If the subscription is being terminated we don't need the frame hook any longer */ if (notification->state == PJSIP_EVSUB_STATE_TERMINATED) { @@ -395,7 +395,7 @@ static struct ast_frame *refer_ari_progress_framehook(struct ast_channel *chan, progress->ari_state->last_response = *message; } - /* If a notification is due to be sent push it to the taskpool */ + /* If a notification is due to be sent push it to the thread pool */ if (notification) { /* If the subscription is being terminated we don't need the frame hook any longer */ if (notification->state == PJSIP_EVSUB_STATE_TERMINATED) { diff --git a/res/res_pjsip_rfc3326.c b/res/res_pjsip_rfc3326.c index a4992225fe..e4e4e1b12f 100644 --- a/res/res_pjsip_rfc3326.c +++ b/res/res_pjsip_rfc3326.c @@ -32,7 +32,7 @@ #include "asterisk/res_pjsip_session.h" #include "asterisk/module.h" #include "asterisk/causes.h" -#include "asterisk/taskpool.h" +#include "asterisk/threadpool.h" static void rfc3326_use_reason_header(struct ast_sip_session *session, struct pjsip_rx_data *rdata) { @@ -139,7 +139,7 @@ static void rfc3326_outgoing_request(struct ast_sip_session *session, struct pjs * checks so we must also be running under the call's serializer * thread. */ - || session->serializer != ast_taskpool_serializer_get_current()) { + || session->serializer != ast_threadpool_serializer_get_current()) { return; } @@ -152,7 +152,7 @@ static void rfc3326_outgoing_response(struct ast_sip_session *session, struct pj if (status.code < 300 || !session->channel - || session->serializer != ast_taskpool_serializer_get_current()) { + || session->serializer != ast_threadpool_serializer_get_current()) { return; } diff --git a/res/res_pjsip_rfc3329.c b/res/res_pjsip_rfc3329.c index d57392d4f8..f6faff2afe 100644 --- a/res/res_pjsip_rfc3329.c +++ b/res/res_pjsip_rfc3329.c @@ -32,6 +32,7 @@ #include "asterisk/res_pjsip_session.h" #include "asterisk/module.h" #include "asterisk/causes.h" +#include "asterisk/threadpool.h" /*! \brief Private data structure used with the modules's datastore */ struct rfc3329_store_data { diff --git a/res/res_pjsip_session.c b/res/res_pjsip_session.c index 135aa19279..bca226c857 100644 --- a/res/res_pjsip_session.c +++ b/res/res_pjsip_session.c @@ -42,7 +42,6 @@ #include "asterisk/uuid.h" #include "asterisk/pbx.h" #include "asterisk/taskprocessor.h" -#include "asterisk/taskpool.h" #include "asterisk/causes.h" #include "asterisk/sdp_srtp.h" #include "asterisk/dsp.h" @@ -3128,14 +3127,115 @@ struct ast_sip_session *ast_sip_session_alloc(struct ast_sip_endpoint *endpoint, return ret_session; } +/*! \brief struct controlling the suspension of the session's serializer. */ +struct ast_sip_session_suspender { + ast_cond_t cond_suspended; + ast_cond_t cond_complete; + int suspended; + int complete; +}; + +static void sip_session_suspender_dtor(void *vdoomed) +{ + struct ast_sip_session_suspender *doomed = vdoomed; + + ast_cond_destroy(&doomed->cond_suspended); + ast_cond_destroy(&doomed->cond_complete); +} + +/*! + * \internal + * \brief Block the session serializer thread task. + * + * \param data Pushed serializer task data for suspension. + * + * \retval 0 + */ +static int sip_session_suspend_task(void *data) +{ + struct ast_sip_session_suspender *suspender = data; + + ao2_lock(suspender); + + /* Signal that the serializer task is now suspended. */ + suspender->suspended = 1; + ast_cond_signal(&suspender->cond_suspended); + + /* Wait for the serializer suspension to be completed. */ + while (!suspender->complete) { + ast_cond_wait(&suspender->cond_complete, ao2_object_get_lockaddr(suspender)); + } + + ao2_unlock(suspender); + ao2_ref(suspender, -1); + + return 0; +} + void ast_sip_session_suspend(struct ast_sip_session *session) { - ast_taskpool_serializer_suspend(session->serializer); + struct ast_sip_session_suspender *suspender; + int res; + + ast_assert(session->suspended == NULL); + + if (ast_taskprocessor_is_task(session->serializer)) { + /* I am the session's serializer thread so I cannot suspend. */ + return; + } + + if (ast_taskprocessor_is_suspended(session->serializer)) { + /* The serializer already suspended. */ + return; + } + + suspender = ao2_alloc(sizeof(*suspender), sip_session_suspender_dtor); + if (!suspender) { + /* We will just have to hope that the system does not deadlock */ + return; + } + ast_cond_init(&suspender->cond_suspended, NULL); + ast_cond_init(&suspender->cond_complete, NULL); + + ao2_ref(suspender, +1); + res = ast_sip_push_task(session->serializer, sip_session_suspend_task, suspender); + if (res) { + /* We will just have to hope that the system does not deadlock */ + ao2_ref(suspender, -2); + return; + } + + session->suspended = suspender; + + /* Wait for the serializer to get suspended. */ + ao2_lock(suspender); + while (!suspender->suspended) { + ast_cond_wait(&suspender->cond_suspended, ao2_object_get_lockaddr(suspender)); + } + ao2_unlock(suspender); + + ast_taskprocessor_suspend(session->serializer); } void ast_sip_session_unsuspend(struct ast_sip_session *session) { - ast_taskpool_serializer_unsuspend(session->serializer); + struct ast_sip_session_suspender *suspender = session->suspended; + + if (!suspender) { + /* Nothing to do */ + return; + } + session->suspended = NULL; + + /* Signal that the serializer task suspension is now complete. */ + ao2_lock(suspender); + suspender->complete = 1; + ast_cond_signal(&suspender->cond_complete); + ao2_unlock(suspender); + + ao2_ref(suspender, -1); + + ast_taskprocessor_unsuspend(session->serializer); } /*!