From: Mike Bradeen Date: Tue, 28 Oct 2025 21:26:03 +0000 (-0600) Subject: taskprocessors: Improve logging and add new cli options X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=f29ce05e3e5262a2040e86e89f98638453b032e0;p=thirdparty%2Fasterisk.git taskprocessors: Improve logging and add new cli options This change makes some small changes to improve log readability in addition to the following changes: Modified 'core show taskprocessors' to now show Low time and High time for task execution. New command 'core show taskprocessor name ' to dump taskprocessor info and current queue. Addionally, a new test was added to demonstrate the 'show taskprocessor name' functionality: test execute category /main/taskprocessor/ name taskprocessor_cli_show Setting 'core set debug 3 taskprocessor.c' will now log pushed tasks. (Warning this is will cause extremely high levels of logging at even low traffic levels.) Resolves: #1566 UserNote: New CLI command has been added - core show taskprocessor name --- diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h index 2fafd5790a..55461910a9 100644 --- a/include/asterisk/res_pjsip.h +++ b/include/asterisk/res_pjsip.h @@ -2076,7 +2076,11 @@ struct ast_sip_endpoint *ast_sip_dialog_get_endpoint(pjsip_dialog *dlg); * \retval 0 Success * \retval -1 Failure */ -int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data); +int __ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data, + const char *file, int line, const char *function); + +#define ast_sip_push_task(serializer, sip_task, task_data) \ + __ast_sip_push_task(serializer, sip_task, task_data, __FILE__, __LINE__, __PRETTY_FUNCTION__) /*! * \brief Push a task to SIP servants and wait for it to complete. @@ -2112,13 +2116,19 @@ int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void * \return sip_task() return value on success. * \retval -1 Failure to push the task. */ -int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data); +int __ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data, + const char *file, int line, const char *function); +#define ast_sip_push_task_wait_servant(serializer, sip_task, task_data) \ + __ast_sip_push_task_wait_servant(serializer, sip_task, task_data, __FILE__, __LINE__, __PRETTY_FUNCTION__) /*! * \brief Push a task to SIP servants and wait for it to complete. * \deprecated Replaced with ast_sip_push_task_wait_servant(). */ -int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data); +int __ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data, + const char *file, int line, const char *function); +#define ast_sip_push_task_synchronous(serializer, sip_task, task_data) \ + __ast_sip_push_task_synchronous(serializer, sip_task, task_data, __FILE__, __LINE__, __PRETTY_FUNCTION__) /*! * \brief Push a task to the serializer and wait for it to complete. @@ -2162,7 +2172,10 @@ int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*si * \return sip_task() return value on success. * \retval -1 Failure to push the task. */ -int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data); +int __ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data, + const char *file, int line, const char *function); +#define ast_sip_push_task_wait_serializer(serializer, sip_task, task_data) \ + __ast_sip_push_task_wait_serializer(serializer, sip_task, task_data, __FILE__, __LINE__, __PRETTY_FUNCTION__) /*! * \brief Determine if the current thread is a SIP servant thread diff --git a/include/asterisk/taskpool.h b/include/asterisk/taskpool.h index 2a4f963052..a2adc3e93b 100644 --- a/include/asterisk/taskpool.h +++ b/include/asterisk/taskpool.h @@ -197,8 +197,11 @@ long ast_taskpool_queue_size(struct ast_taskpool *pool); * \retval 0 success * \retval -1 failure */ -int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data) +int __ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data, + const char *file, int line, const char *function) attribute_warn_unused_result; +#define ast_taskpool_push(pool, task, data) \ + __ast_taskpool_push(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__) /*! * \brief Push a task to the taskpool, and wait for completion @@ -214,8 +217,11 @@ int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void * * \retval 0 success * \retval -1 failure */ -int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data) +int __ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data, + const char *file, int line, const char *function) attribute_warn_unused_result; +#define ast_taskpool_push_wait(pool, task, data) \ + __ast_taskpool_push_wait(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__) /*! * \brief Shut down a taskpool and remove the underlying taskprocessors diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h index 3e3886eb1e..6477fc382b 100644 --- a/include/asterisk/taskprocessor.h +++ b/include/asterisk/taskprocessor.h @@ -214,8 +214,10 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps); * \retval -1 failure * \since 1.6.1 */ -int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap) - attribute_warn_unused_result; +int __ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap, + const char *file, int line, const char *function) attribute_warn_unused_result; +#define ast_taskprocessor_push(tps, task_exe, datap) \ + __ast_taskprocessor_push(tps, task_exe, datap, __FILE__, __LINE__, __PRETTY_FUNCTION__) /*! \brief Local data parameter */ struct ast_taskprocessor_local { @@ -240,9 +242,11 @@ struct ast_taskprocessor_local { * \retval -1 failure * \since 12.0.0 */ -int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, - int (*task_exe)(struct ast_taskprocessor_local *local), void *datap) - attribute_warn_unused_result; +int __ast_taskprocessor_push_local(struct ast_taskprocessor *tps, + int (*task_exe)(struct ast_taskprocessor_local *local), void *datap, + const char *file, int line, const char *function) attribute_warn_unused_result; +#define ast_taskprocessor_push_local(tps, task_exe, datap) \ + __ast_taskprocessor_push_local(tps, task_exe, datap, __FILE__, __LINE__, __PRETTY_FUNCTION__) /*! * \brief Indicate the taskprocessor is suspended. diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h index 72b2863c5d..2d759510cb 100644 --- a/include/asterisk/threadpool.h +++ b/include/asterisk/threadpool.h @@ -188,8 +188,11 @@ void ast_threadpool_set_size(struct ast_threadpool *threadpool, unsigned int siz * \retval 0 success * \retval -1 failure */ -int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data) - attribute_warn_unused_result; +int __ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data, + const char *file, int line, const char *function) attribute_warn_unused_result; + +#define ast_threadpool_push(pool, task, data) \ + __ast_threadpool_push(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__) /*! * \brief Shut down a threadpool and destroy it diff --git a/main/taskpool.c b/main/taskpool.c index 59ac4b0c72..c1d92a7a12 100644 --- a/main/taskpool.c +++ b/main/taskpool.c @@ -519,7 +519,13 @@ static void taskpool_dynamic_pool_grow(struct ast_taskpool *pool, struct taskpoo } } -int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data) +#undef ast_taskpool_push +#define ast_taskpool_push_internal(pool, task, data) \ + __ast_taskpool_push(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__) +int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data); + +int __ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data, + const char *file, int line, const char *function) { RAII_VAR(struct taskpool_taskprocessor *, taskprocessor, NULL, ao2_cleanup); @@ -555,13 +561,19 @@ int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void * return -1; } - if (ast_taskprocessor_push(taskprocessor->taskprocessor, task, data)) { + if (__ast_taskprocessor_push(taskprocessor->taskprocessor, task, data, file, line, function)) { return -1; } return 0; } +/* ABI compatibility: Provide actual function symbol for external modules */ +int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data) +{ + return __ast_taskpool_push(pool, task, data, NULL, 0, NULL); +} + /*! * \internal Structure used for synchronous task */ @@ -620,7 +632,8 @@ static int taskpool_sync_task(void *data) return ret; } -int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data) +int __ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data, + const char *file, int line, const char *function) { struct taskpool_sync_task sync_task; @@ -635,7 +648,7 @@ int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), v return -1; } - if (ast_taskpool_push(pool, taskpool_sync_task, &sync_task)) { + if (__ast_taskpool_push(pool, taskpool_sync_task, &sync_task, file, line, function)) { taskpool_sync_task_cleanup(&sync_task); return -1; } @@ -650,6 +663,15 @@ int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), v return sync_task.fail; } +/* ABI compatibility: Provide actual function symbol for external modules */ +#undef ast_taskpool_push_wait +int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data); + +int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data) +{ + return __ast_taskpool_push_wait(pool, task, data, NULL, 0, NULL); +} + void ast_taskpool_shutdown(struct ast_taskpool *pool) { if (!pool) { diff --git a/main/taskprocessor.c b/main/taskprocessor.c index c4015279c5..8ba6e0f6c2 100644 --- a/main/taskprocessor.c +++ b/main/taskprocessor.c @@ -55,6 +55,10 @@ struct tps_task { /*! \brief AST_LIST_ENTRY overhead */ AST_LIST_ENTRY(tps_task) list; unsigned int wants_local:1; + /*! \brief Debug information about where the task was pushed from */ + const char *file; + int line; + const char *function; }; /*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */ @@ -63,6 +67,16 @@ struct tps_taskprocessor_stats { unsigned long max_qsize; /*! \brief This is the current number of tasks processed */ unsigned long _tasks_processed_count; + /*! \brief Highest time (in microseconds) spent processing a task */ + long highest_time_processed; + /*! \brief Lowest time (in microseconds) spent processing a task */ + long lowest_time_processed; + /*! \brief File where the highest time task was pushed from */ + const char *highest_time_task_file; + /*! \brief Line where the highest time task was pushed from */ + int highest_time_task_line; + /*! \brief Function where the highest time task was pushed from */ + const char *highest_time_task_function; }; /*! \brief A ast_taskprocessor structure is a singleton by name */ @@ -155,6 +169,7 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg static char *cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a); static char *cli_tps_reset_stats(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a); static char *cli_tps_reset_stats_all(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a); +static char *cli_tps_show_taskprocessor(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a); static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags); @@ -162,6 +177,7 @@ static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags); static struct ast_cli_entry taskprocessor_clis[] = { AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"), AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"), + AST_CLI_DEFINE(cli_tps_show_taskprocessor, "Display detailed info about a taskprocessor"), AST_CLI_DEFINE(cli_subsystem_alert_report, "List task processor subsystems in alert"), AST_CLI_DEFINE(cli_tps_reset_stats, "Reset a named task processor's stats"), AST_CLI_DEFINE(cli_tps_reset_stats_all, "Reset all task processors' stats"), @@ -189,6 +205,17 @@ static void default_listener_pvt_dtor(struct ast_taskprocessor_listener *listene listener->user_data = NULL; } +/* Keeping the old symbols for ABI compatibility */ +#undef ast_taskprocessor_push +#define ast_taskprocessor_push_internal(tps, task_exe, datap) \ + __ast_taskprocessor_push(tps, task_exe, datap, __FILE__, __LINE__, __PRETTY_FUNCTION__) +int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap); + +#undef ast_taskprocessor_push_local +#define ast_taskprocessor_push_local_internal(tps, task_exe, datap) \ + __ast_taskprocessor_push_local(tps, task_exe, datap, __FILE__, __LINE__, __PRETTY_FUNCTION__) +int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap); + /*! * \brief Function that processes tasks in the taskprocessor * \internal @@ -204,8 +231,8 @@ static void *default_tps_processing_function(void *data) while (!pvt->dead) { res = ast_sem_wait(&pvt->sem); if (res != 0 && errno != EINTR) { - ast_log(LOG_ERROR, "ast_sem_wait(): %s\n", - strerror(errno)); + ast_log(LOG_ERROR, "Taskprocessor '%s': Semaphore wait failed: %s\n", + tps->name, strerror(errno)); /* Just give up */ break; } @@ -238,8 +265,8 @@ static void default_task_pushed(struct ast_taskprocessor_listener *listener, int struct default_taskprocessor_listener_pvt *pvt = listener->user_data; if (ast_sem_post(&pvt->sem) != 0) { - ast_log(LOG_ERROR, "Failed to notify of enqueued task: %s\n", - strerror(errno)); + ast_log(LOG_ERROR, "Taskprocessor '%s': Failed to signal task enqueue: %s\n", + listener->tps->name, strerror(errno)); } } @@ -258,7 +285,7 @@ static void default_listener_shutdown(struct ast_taskprocessor_listener *listene /* Hold a reference during shutdown */ ao2_t_ref(listener->tps, +1, "tps-shutdown"); - if (ast_taskprocessor_push(listener->tps, default_listener_die, pvt)) { + if (ast_taskprocessor_push_internal(listener->tps, default_listener_die, pvt)) { /* This will cause the thread to exit early without completing tasks already * in the queue. This is probably the least bad option in this situation. */ default_listener_die(pvt); @@ -312,7 +339,7 @@ static void tps_shutdown(void) objcount = ao2_container_count(tps_singletons); if (objcount > 0) { ast_log(LOG_DEBUG, - "waiting for taskprocessor shutdown, %d tps object(s) still allocated.\n", + "Taskprocessor shutdown: Waiting for %d taskprocessor(s) to complete.\n", objcount); /* give the running taskprocessors a chance to finish, up to @@ -327,8 +354,8 @@ static void tps_shutdown(void) delay.tv_sec = 1; delay.tv_nsec = 0; ast_log(LOG_DEBUG, - "waiting for taskprocessor shutdown, %d tps object(s) still allocated.\n", - objcount); + "Taskprocessor shutdown: Still waiting for %d taskprocessor(s) after %d second(s).\n", + objcount, tries + 1); } } @@ -336,17 +363,19 @@ static void tps_shutdown(void) * a taskprocessor was not cleaned up somewhere */ if (objcount > 0) { ast_log(LOG_ERROR, - "Assertion may occur, the following taskprocessors are still running:\n"); + "Taskprocessor shutdown: %d taskprocessor(s) still running after %d seconds. Assertion may occur:\n", + objcount, AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT); sorted_tps = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, tps_sort_cb, NULL); if (!sorted_tps || ao2_container_dup(sorted_tps, tps_singletons, 0)) { - ast_log(LOG_ERROR, "unable to get sorted list of taskprocessors"); + ast_log(LOG_ERROR, "Unable to get sorted list of taskprocessors for shutdown report\n"); } else { iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK); while ((tps = ao2_iterator_next(&iter))) { - ast_log(LOG_ERROR, "taskprocessor '%s'\n", tps->name); + ast_log(LOG_ERROR, " - Taskprocessor '%s' (queue size: %ld)\n", + tps->name, tps->tps_queue_size); } } @@ -354,7 +383,7 @@ static void tps_shutdown(void) } else { ast_log(LOG_DEBUG, - "All waiting taskprocessors cleared!\n"); + "Taskprocessor shutdown: All taskprocessors completed successfully.\n"); } ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis)); @@ -370,13 +399,13 @@ int ast_tps_init(void) tps_singletons = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TPS_MAX_BUCKETS, tps_hash_cb, NULL, tps_cmp_cb); if (!tps_singletons) { - ast_log(LOG_ERROR, "taskprocessor container failed to initialize!\n"); + ast_log(LOG_ERROR, "Failed to initialize taskprocessor container!\n"); return -1; } if (AST_VECTOR_RW_INIT(&overloaded_subsystems, 10)) { ao2_ref(tps_singletons, -1); - ast_log(LOG_ERROR, "taskprocessor subsystems vector failed to initialize!\n"); + ast_log(LOG_ERROR, "Failed to initialize taskprocessor subsystems tracking vector!\n"); return -1; } @@ -390,43 +419,51 @@ int ast_tps_init(void) } /* allocate resources for the task */ -static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap) +static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap, + const char *file, int line, const char *function) { struct tps_task *t; if (!task_exe) { - ast_log(LOG_ERROR, "task_exe is NULL!\n"); + ast_log(LOG_ERROR, "Task callback function is NULL!\n"); return NULL; } t = ast_calloc(1, sizeof(*t)); if (!t) { - ast_log(LOG_ERROR, "failed to allocate task!\n"); + ast_log(LOG_ERROR, "Failed to allocate memory for task!\n"); return NULL; } t->callback.execute = task_exe; t->datap = datap; + t->file = file; + t->line = line; + t->function = function; return t; } -static struct tps_task *tps_task_alloc_local(int (*task_exe)(struct ast_taskprocessor_local *local), void *datap) +static struct tps_task *tps_task_alloc_local(int (*task_exe)(struct ast_taskprocessor_local *local), void *datap, + const char *file, int line, const char *function) { struct tps_task *t; if (!task_exe) { - ast_log(LOG_ERROR, "task_exe is NULL!\n"); + ast_log(LOG_ERROR, "Task callback function is NULL!\n"); return NULL; } t = ast_calloc(1, sizeof(*t)); if (!t) { - ast_log(LOG_ERROR, "failed to allocate task!\n"); + ast_log(LOG_ERROR, "Failed to allocate memory for task!\n"); return NULL; } t->callback.execute_local = task_exe; t->datap = datap; t->wants_local = 1; + t->file = file; + t->line = line; + t->function = function; return t; } @@ -520,7 +557,7 @@ static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args ts.tv_nsec = when.tv_usec * 1000; ast_mutex_lock(&cli_ping_cond_lock); - if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) { + if (ast_taskprocessor_push_internal(tps, tps_ping_handler, 0) < 0) { ast_mutex_unlock(&cli_ping_cond_lock); ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name); ast_taskprocessor_unreference(tps); @@ -574,8 +611,8 @@ static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags) return cmp; } -#define FMT_HEADERS "%-70s %10s %10s %10s %10s %10s\n" -#define FMT_FIELDS "%-70s %10lu %10lu %10lu %10lu %10lu\n" +#define FMT_HEADERS "%-70s %10s %10s %10s %10s %10s %10s %10s\n" +#define FMT_FIELDS "%-70s %10lu %10lu %10lu %10lu %10lu %10ld %10ld\n" /*! * \internal @@ -589,7 +626,8 @@ static void tps_report_taskprocessor_list_helper(int fd, struct ast_taskprocesso { ast_cli(fd, FMT_FIELDS, tps->name, tps->stats._tasks_processed_count, tps->tps_queue_size, tps->stats.max_qsize, tps->tps_queue_low, - tps->tps_queue_high); + tps->tps_queue_high, tps->stats.lowest_time_processed, + tps->stats.highest_time_processed); } /*! @@ -667,12 +705,94 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg return CLI_SHOWUSAGE; } - ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water"); + ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water", "Low time(us)", "High time(us)"); ast_cli(a->fd, "\n%d taskprocessors\n\n", tps_report_taskprocessor_list(a->fd, like)); return CLI_SUCCESS; } +static char *cli_tps_show_taskprocessor(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) +{ + const char *name; + struct ast_taskprocessor *tps; + struct tps_task *task; + int task_count = 0; + + switch (cmd) { + case CLI_INIT: + e->command = "core show taskprocessor name"; + e->usage = + "Usage: core show taskprocessor name \n" + " Displays detailed information about a specific taskprocessor,\n" + " including all queued tasks and their origins (DEVMODE only).\n"; + return NULL; + case CLI_GENERATE: + if (a->pos == 4) { + return tps_taskprocessor_tab_complete(a); + } + return NULL; + } + + if (a->argc != 5) { + return CLI_SHOWUSAGE; + } + + name = a->argv[4]; + tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS); + if (!tps) { + ast_cli(a->fd, "\nTaskprocessor '%s' not found\n\n", name); + return CLI_SUCCESS; + } + + ao2_lock(tps); + + ast_cli(a->fd, "\nTaskprocessor: %s\n", tps->name); + ast_cli(a->fd, "===========================================\n"); + ast_cli(a->fd, "Subsystem: %s\n", tps->subsystem[0] ? tps->subsystem : "(none)"); + ast_cli(a->fd, "Tasks processed: %lu\n", tps->stats._tasks_processed_count); + ast_cli(a->fd, "Current queue size: %ld\n", tps->tps_queue_size); + ast_cli(a->fd, "Max queue depth: %lu\n", tps->stats.max_qsize); + ast_cli(a->fd, "Low water mark: %ld\n", tps->tps_queue_low); + ast_cli(a->fd, "High water mark: %ld\n", tps->tps_queue_high); + ast_cli(a->fd, "High water alert: %s\n", tps->high_water_alert ? "Yes" : "No"); + ast_cli(a->fd, "Suspended: %s\n", tps->suspended ? "Yes" : "No"); + ast_cli(a->fd, "Currently executing: %s\n", tps->executing ? "Yes" : "No"); + ast_cli(a->fd, "Highest time (us): %ld\n", tps->stats.highest_time_processed); + if (tps->stats.highest_time_task_file) { + ast_cli(a->fd, " Highest task origin: %s:%d (%s)\n", + tps->stats.highest_time_task_file, + tps->stats.highest_time_task_line, + tps->stats.highest_time_task_function); + } + ast_cli(a->fd, "Lowest time (us): %ld\n", tps->stats.lowest_time_processed); + + if (tps->tps_queue_size > 0) { + ast_cli(a->fd, "\nQueued Tasks:\n"); + ast_cli(a->fd, "-------------------------------------------\n"); + + AST_LIST_TRAVERSE(&tps->tps_queue, task, list) { + task_count++; + if (task->file) { + ast_cli(a->fd, " Task #%d:\n", task_count); + ast_cli(a->fd, " Origin: %s:%d\n", task->file, task->line); + ast_cli(a->fd, " Function: %s\n", task->function); + ast_cli(a->fd, " Type: %s\n", task->wants_local ? "Local" : "Standard"); + } else { + ast_cli(a->fd, " Task #%d: (origin not available)\n", task_count); + } + } + ast_cli(a->fd, "\nTotal queued tasks: %d\n", task_count); + } else { + ast_cli(a->fd, "\nNo tasks currently queued.\n"); + } + + ao2_unlock(tps); + ast_taskprocessor_unreference(tps); + + ast_cli(a->fd, "\n"); + return CLI_SUCCESS; +} + /* hash callback for astobj2 */ static int tps_hash_cb(const void *obj, const int flags) { @@ -866,8 +986,8 @@ static void tps_alert_add(struct ast_taskprocessor *tps, int delta) if (DEBUG_ATLEAST(3) /* and tps_alert_count becomes zero or non-zero */ && !old != !tps_alert_count) { - ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n", - tps->name, tps_alert_count ? "triggered" : "cleared"); + ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert (total alerts: %u).\n", + tps->name, tps_alert_count ? "triggered" : "cleared", tps_alert_count); } if (tps->subsystem[0] != '\0') { @@ -1028,11 +1148,12 @@ static void *default_listener_pvt_alloc(void) pvt = ast_calloc(1, sizeof(*pvt)); if (!pvt) { + ast_log(LOG_ERROR, "Failed to allocate memory for taskprocessor listener\n"); return NULL; } pvt->poll_thread = AST_PTHREADT_NULL; if (ast_sem_init(&pvt->sem, 0, 0) != 0) { - ast_log(LOG_ERROR, "ast_sem_init(): %s\n", strerror(errno)); + ast_log(LOG_ERROR, "Failed to initialize taskprocessor semaphore: %s\n", strerror(errno)); ast_free(pvt); return NULL; } @@ -1098,7 +1219,7 @@ static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, stru static struct ast_taskprocessor *__start_taskprocessor(struct ast_taskprocessor *p) { if (p && p->listener->callbacks->start(p->listener)) { - ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n", + ast_log(LOG_ERROR, "Failed to start taskprocessor listener for '%s'\n", p->name); ast_taskprocessor_unreference(p); @@ -1118,7 +1239,7 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o struct default_taskprocessor_listener_pvt *pvt; if (ast_strlen_zero(name)) { - ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n"); + ast_log(LOG_ERROR, "Cannot get taskprocessor with empty name!\n"); return NULL; } ao2_lock(tps_singletons); @@ -1212,23 +1333,28 @@ static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t) int was_empty; if (!tps) { - ast_log(LOG_ERROR, "tps is NULL!\n"); + ast_log(LOG_ERROR, "Taskprocessor is NULL!\n"); return -1; } if (!t) { - ast_log(LOG_ERROR, "t is NULL!\n"); + ast_log(LOG_ERROR, "Task is NULL!\n"); return -1; } + if (t->file) { + ast_debug(3, "Taskprocessor '%s': Task pushed from %s:%d (%s)\n", + tps->name, t->file, t->line, t->function); + } + ao2_lock(tps); AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list); previous_size = tps->tps_queue_size++; if (tps->tps_queue_high <= tps->tps_queue_size) { if (!tps->high_water_alert) { - ast_log(LOG_WARNING, "The '%s' task processor queue reached %ld scheduled tasks%s.\n", - tps->name, tps->tps_queue_size, tps->high_water_warned ? " again" : ""); + ast_log(LOG_WARNING, "Taskprocessor '%s' queue reached %ld scheduled tasks (high water mark: %ld)%s.\n", + tps->name, tps->tps_queue_size, tps->tps_queue_high, tps->high_water_warned ? " again" : ""); tps->high_water_warned = 1; tps->high_water_alert = 1; tps_alert_add(tps, +1); @@ -1242,14 +1368,26 @@ static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t) return 0; } +int __ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap, + const char *file, int line, const char *function) +{ + return taskprocessor_push(tps, tps_task_alloc(task_exe, datap, file, line, function)); +} + int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap) { - return taskprocessor_push(tps, tps_task_alloc(task_exe, datap)); + return __ast_taskprocessor_push(tps, task_exe, datap, NULL, 0, NULL); +} + +int __ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap, + const char *file, int line, const char *function) +{ + return taskprocessor_push(tps, tps_task_alloc_local(task_exe, datap, file, line, function)); } int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap) { - return taskprocessor_push(tps, tps_task_alloc_local(task_exe, datap)); + return __ast_taskprocessor_push_local(tps, task_exe, datap, NULL, 0, NULL); } int ast_taskprocessor_suspend(struct ast_taskprocessor *tps) @@ -1284,6 +1422,11 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps) struct ast_taskprocessor_local local; struct tps_task *t; long size; + struct timeval start; + long elapsed; + const char *task_file = NULL; + int task_line = 0; + const char *task_function = NULL; ao2_lock(tps); t = tps_taskprocessor_pop(tps); @@ -1299,8 +1442,15 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps) local.local_data = tps->local_data; local.data = t->datap; } + + /* Save task origin info before we free the task */ + task_file = t->file; + task_line = t->line; + task_function = t->function; ao2_unlock(tps); + start = ast_tvnow(); + if (t->wants_local) { t->callback.execute_local(&local); } else { @@ -1324,6 +1474,18 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps) if (size >= tps->stats.max_qsize) { tps->stats.max_qsize = size + 1; } + + elapsed = ast_tvdiff_us(ast_tvnow(), start); + if (elapsed > tps->stats.highest_time_processed) { + tps->stats.highest_time_processed = elapsed; + tps->stats.highest_time_task_file = task_file; + tps->stats.highest_time_task_line = task_line; + tps->stats.highest_time_task_function = task_function; + } + if (elapsed < tps->stats.lowest_time_processed) { + tps->stats.lowest_time_processed = elapsed; + } + ao2_unlock(tps); /* If we executed a task, check for the transition to empty */ @@ -1393,6 +1555,11 @@ static void tps_reset_stats(struct ast_taskprocessor *tps) ao2_lock(tps); tps->stats._tasks_processed_count = 0; tps->stats.max_qsize = 0; + tps->stats.highest_time_processed = 0; + tps->stats.lowest_time_processed = 0; + tps->stats.highest_time_task_file = NULL; + tps->stats.highest_time_task_line = 0; + tps->stats.highest_time_task_function = NULL; ao2_unlock(tps); } diff --git a/main/threadpool.c b/main/threadpool.c index 0969e627c3..1a815782f9 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -608,8 +608,7 @@ static int queued_task_pushed(void *data) * \param listener The taskprocessor listener. The threadpool is the listener's private data * \param was_empty True if the taskprocessor was empty prior to the task being pushed */ -static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener, - int was_empty) +static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty) { struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener); struct task_pushed_data *tpd; @@ -954,15 +953,27 @@ struct ast_threadpool *ast_threadpool_create(const char *name, return pool; } -int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data) +#undef ast_threadpool_push +#define ast_threadpool_push_internal(pool, task, data) \ + __ast_threadpool_push(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__) +int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data); + +int __ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data, + const char *file, int line, const char *function) { SCOPED_AO2LOCK(lock, pool); if (!pool->shutting_down) { - return ast_taskprocessor_push(pool->tps, task, data); + return __ast_taskprocessor_push(pool->tps, task, data, file, line, function); } return -1; } +/* ABI compatibility: Provide actual function symbol for external modules */ +int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data) +{ + return __ast_threadpool_push(pool, task, data, NULL, 0, NULL); +} + void ast_threadpool_shutdown(struct ast_threadpool *pool) { if (!pool) { diff --git a/res/res_pjsip.c b/res/res_pjsip.c index 5b954b2226..6e31084756 100644 --- a/res/res_pjsip.c +++ b/res/res_pjsip.c @@ -2096,13 +2096,48 @@ struct ast_taskprocessor *ast_sip_create_serializer(const char *name) return ast_sip_create_serializer_group(name, NULL); } -int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) +#undef ast_sip_push_task +int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data); + +int __ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data, + const char *file, int line, const char *function) { if (!serializer) { serializer = ast_serializer_pool_get(sip_serializer_pool); } - return ast_taskprocessor_push(serializer, sip_task, task_data); + return __ast_taskprocessor_push(serializer, sip_task, task_data, file, line, function); +} + +/* ABI compatibility: Provide actual function symbol for external modules */ +int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) +{ + return __ast_sip_push_task(serializer, sip_task, task_data, NULL, 0, NULL); +} + +/* ABI compatibility: Provide actual function symbols for wait functions */ +#undef ast_sip_push_task_wait_servant +int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data); + +int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) +{ + return __ast_sip_push_task_wait_servant(serializer, sip_task, task_data, NULL, 0, NULL); +} + +#undef ast_sip_push_task_synchronous +int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data); + +int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) +{ + return __ast_sip_push_task_synchronous(serializer, sip_task, task_data, NULL, 0, NULL); +} + +#undef ast_sip_push_task_wait_serializer +int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data); + +int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) +{ + return __ast_sip_push_task_wait_serializer(serializer, sip_task, task_data, NULL, 0, NULL); } struct sync_task_data { @@ -2134,7 +2169,8 @@ static int sync_task(void *data) return ret; } -static int ast_sip_push_task_wait(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) +static int __ast_sip_push_task_wait(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data, + const char *file, int line, const char *function) { /* This method is an onion */ struct sync_task_data std; @@ -2145,7 +2181,7 @@ static int ast_sip_push_task_wait(struct ast_taskprocessor *serializer, int (*si std.task = sip_task; std.task_data = task_data; - if (ast_sip_push_task(serializer, sync_task, &std)) { + if (__ast_sip_push_task(serializer, sync_task, &std, file, line, function)) { ast_mutex_destroy(&std.lock); ast_cond_destroy(&std.cond); return -1; @@ -2162,21 +2198,24 @@ static int ast_sip_push_task_wait(struct ast_taskprocessor *serializer, int (*si return std.fail; } -int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) +int __ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data, + const char *file, int line, const char *function) { if (ast_sip_thread_is_servant()) { return sip_task(task_data); } - return ast_sip_push_task_wait(serializer, sip_task, task_data); + return __ast_sip_push_task_wait(serializer, sip_task, task_data, file, line, function); } -int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) +int __ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data, + const char *file, int line, const char *function) { - return ast_sip_push_task_wait_servant(serializer, sip_task, task_data); + return __ast_sip_push_task_wait_servant(serializer, sip_task, task_data, file, line, function); } -int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) +int __ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data, + const char *file, int line, const char *function) { if (!serializer) { /* Caller doesn't care which PJSIP serializer the task executes under. */ @@ -2195,7 +2234,7 @@ int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int return sip_task(task_data); } - return ast_sip_push_task_wait(serializer, sip_task, task_data); + return __ast_sip_push_task_wait(serializer, sip_task, task_data, file, line, function); } void ast_copy_pj_str(char *dest, const pj_str_t *src, size_t size) diff --git a/tests/test_taskprocessor.c b/tests/test_taskprocessor.c index 537f77c767..e2e827fb10 100644 --- a/tests/test_taskprocessor.c +++ b/tests/test_taskprocessor.c @@ -31,12 +31,15 @@ #include "asterisk.h" +#include + #include "asterisk/test.h" #include "asterisk/taskprocessor.h" #include "asterisk/module.h" #include "asterisk/astobj2.h" #include "asterisk/serializer.h" #include "asterisk/threadpool.h" +#include "asterisk/cli.h" /*! * \brief userdata associated with baseline taskprocessor test @@ -963,6 +966,170 @@ AST_TEST_DEFINE(serializer_pool) return AST_TEST_PASS; } +/*! + * \brief Test for CLI command "core show taskprocessor " + * + * This test creates a taskprocessor, queues tasks with controlled execution, + * and verifies that the CLI command displays the queued tasks correctly. + */ +AST_TEST_DEFINE(taskprocessor_cli_show) +{ + RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference); + struct task_data *task_data1 = NULL; + struct task_data *task_data2 = NULL; + struct task_data *task_data3 = NULL; + int task_queued1 = 0, task_queued2 = 0, task_queued3 = 0; + char cli_command[128]; + int cli_output_fd[2]; + char output_buffer[4096] = {0}; + ssize_t bytes_read; + int res = AST_TEST_FAIL; + + switch (cmd) { + case TEST_INIT: + info->name = "taskprocessor_cli_show"; + info->category = "/main/taskprocessor/"; + info->summary = "Test CLI command 'core show taskprocessor'"; + info->description = + "Verifies that the 'core show taskprocessor ' CLI command\n" + "displays taskprocessor information and queued tasks correctly."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + /* Create a pipe to capture CLI output */ + if (pipe(cli_output_fd) != 0) { + ast_test_status_update(test, "Failed to create pipe for CLI output\n"); + return AST_TEST_FAIL; + } + + /* Create taskprocessor */ + tps = ast_taskprocessor_get("test_cli_taskprocessor", TPS_REF_DEFAULT); + if (!tps) { + ast_test_status_update(test, "Unable to create test taskprocessor\n"); + close(cli_output_fd[0]); + close(cli_output_fd[1]); + return AST_TEST_FAIL; + } + + /* Create tasks that will wait so they stay in the queue */ + task_data1 = task_data_create(); + task_data2 = task_data_create(); + task_data3 = task_data_create(); + + if (!task_data1 || !task_data2 || !task_data3) { + ast_test_status_update(test, "Unable to create task_data\n"); + goto cleanup; + } + + /* Set a long wait time so tasks stay queued */ + task_data1->wait_time = 2000; /* 2 seconds */ + task_data2->wait_time = 2000; + task_data3->wait_time = 2000; + + /* Queue the tasks */ + if (ast_taskprocessor_push(tps, task, task_data1)) { + ast_test_status_update(test, "Failed to queue task 1\n"); + goto cleanup; + } + task_queued1 = 1; + + if (ast_taskprocessor_push(tps, task, task_data2)) { + ast_test_status_update(test, "Failed to queue task 2\n"); + goto cleanup; + } + task_queued2 = 1; + + if (ast_taskprocessor_push(tps, task, task_data3)) { + ast_test_status_update(test, "Failed to queue task 3\n"); + goto cleanup; + } + task_queued3 = 1; + + /* Execute the CLI command */ + snprintf(cli_command, sizeof(cli_command), "core show taskprocessor name test_cli_taskprocessor"); + + if (ast_cli_command(cli_output_fd[1], cli_command) != 0) { + ast_test_status_update(test, "CLI command execution failed\n"); + goto cleanup; + } + + /* Close write end and read the output */ + close(cli_output_fd[1]); + cli_output_fd[1] = -1; + + bytes_read = read(cli_output_fd[0], output_buffer, sizeof(output_buffer) - 1); + if (bytes_read <= 0) { + ast_test_status_update(test, "Failed to read CLI output\n"); + goto cleanup; + } + output_buffer[bytes_read] = '\0'; + + /* Log the output for inspection */ + ast_test_status_update(test, "CLI Output:\n%s\n", output_buffer); + + /* Verify the output contains expected information */ + if (!strstr(output_buffer, "test_cli_taskprocessor")) { + ast_test_status_update(test, "Output missing taskprocessor name\n"); + goto cleanup; + } + + if (!strstr(output_buffer, "Current queue size")) { + ast_test_status_update(test, "Output missing queue size information\n"); + goto cleanup; + } + + /* Check for queued tasks section (at least one task should be shown) */ + if (!strstr(output_buffer, "Queued Tasks") && !strstr(output_buffer, "Currently executing")) { + ast_test_status_update(test, "Output missing queued tasks or execution status\n"); + goto cleanup; + } + + /* Verify we see task information */ + if (!strstr(output_buffer, "Task #")) { + ast_test_status_update(test, "Output missing task information\n"); + goto cleanup; + } + + ast_test_status_update(test, "CLI command output validated successfully\n"); + res = AST_TEST_PASS; + +cleanup: + + ast_test_status_update(test, "Waiting for tasks to complete\n"); + + /* Wait for tasks to complete */ + if (task_data1) { + if (task_queued1) { + task_wait(task_data1); + } + ao2_cleanup(task_data1); + } + if (task_data2) { + if (task_queued2) { + task_wait(task_data2); + } + ao2_cleanup(task_data2); + } + if (task_data3) { + if (task_queued3) { + task_wait(task_data3); + } + ao2_cleanup(task_data3); + } + + if (cli_output_fd[0] >= 0) { + close(cli_output_fd[0]); + } + if (cli_output_fd[1] >= 0) { + close(cli_output_fd[1]); + } + + ast_test_status_update(test, "Tasks complete\n"); + return res; +} + static int unload_module(void) { ast_test_unregister(default_taskprocessor); @@ -972,6 +1139,7 @@ static int unload_module(void) ast_test_unregister(taskprocessor_shutdown); ast_test_unregister(taskprocessor_push_local); ast_test_unregister(serializer_pool); + ast_test_unregister(taskprocessor_cli_show); return 0; } @@ -984,6 +1152,7 @@ static int load_module(void) ast_test_register(taskprocessor_shutdown); ast_test_register(taskprocessor_push_local); ast_test_register(serializer_pool); + ast_test_register(taskprocessor_cli_show); return AST_MODULE_LOAD_SUCCESS; }