]> git.ipfire.org Git - thirdparty/asterisk.git/commitdiff
taskprocessors: Improve logging and add new cli options
authorMike Bradeen <mbradeen@sangoma.com>
Tue, 28 Oct 2025 21:26:03 +0000 (15:26 -0600)
committergithub-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Thu, 4 Dec 2025 16:13:08 +0000 (16:13 +0000)
This change makes some small changes to improve log readability in
addition to the following changes:

Modified 'core show taskprocessors' to now show Low time and High time
for task execution.

New command 'core show taskprocessor name <taskprocessor-name>' to dump
taskprocessor info and current queue.

Addionally, a new test was added to demonstrate the 'show taskprocessor
name' functionality:
test execute category /main/taskprocessor/ name taskprocessor_cli_show

Setting 'core set debug 3 taskprocessor.c' will now log pushed tasks.
(Warning this is will cause extremely high levels of logging at even
low traffic levels.)

Resolves: #1566

UserNote: New CLI command has been added -
core show taskprocessor name <taskprocessor-name>

include/asterisk/res_pjsip.h
include/asterisk/taskpool.h
include/asterisk/taskprocessor.h
include/asterisk/threadpool.h
main/taskpool.c
main/taskprocessor.c
main/threadpool.c
res/res_pjsip.c
tests/test_taskprocessor.c

index 2fafd5790a2e5d69a9e774d96008507fb94d8c75..55461910a92cb7322745b6e262c14c763b2553b8 100644 (file)
@@ -2076,7 +2076,11 @@ struct ast_sip_endpoint *ast_sip_dialog_get_endpoint(pjsip_dialog *dlg);
  * \retval 0 Success
  * \retval -1 Failure
  */
-int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
+int __ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data,
+       const char *file, int line, const char *function);
+
+#define ast_sip_push_task(serializer, sip_task, task_data) \
+       __ast_sip_push_task(serializer, sip_task, task_data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 
 /*!
  * \brief Push a task to SIP servants and wait for it to complete.
@@ -2112,13 +2116,19 @@ int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void
  * \return sip_task() return value on success.
  * \retval -1 Failure to push the task.
  */
-int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
+int __ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data,
+       const char *file, int line, const char *function);
+#define ast_sip_push_task_wait_servant(serializer, sip_task, task_data) \
+       __ast_sip_push_task_wait_servant(serializer, sip_task, task_data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 
 /*!
  * \brief Push a task to SIP servants and wait for it to complete.
  * \deprecated Replaced with ast_sip_push_task_wait_servant().
  */
-int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
+int __ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data,
+       const char *file, int line, const char *function);
+#define ast_sip_push_task_synchronous(serializer, sip_task, task_data) \
+       __ast_sip_push_task_synchronous(serializer, sip_task, task_data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 
 /*!
  * \brief Push a task to the serializer and wait for it to complete.
@@ -2162,7 +2172,10 @@ int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*si
  * \return sip_task() return value on success.
  * \retval -1 Failure to push the task.
  */
-int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
+int __ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data,
+       const char *file, int line, const char *function);
+#define ast_sip_push_task_wait_serializer(serializer, sip_task, task_data) \
+       __ast_sip_push_task_wait_serializer(serializer, sip_task, task_data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 
 /*!
  * \brief Determine if the current thread is a SIP servant thread
index 2a4f963052955956dd19581164a4474692bea58d..a2adc3e93bf9249e8ecd4ff658c70330b796e13c 100644 (file)
@@ -197,8 +197,11 @@ long ast_taskpool_queue_size(struct ast_taskpool *pool);
  * \retval 0 success
  * \retval -1 failure
  */
-int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data)
+int __ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data,
+       const char *file, int line, const char *function)
        attribute_warn_unused_result;
+#define ast_taskpool_push(pool, task, data) \
+       __ast_taskpool_push(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 
 /*!
  * \brief Push a task to the taskpool, and wait for completion
@@ -214,8 +217,11 @@ int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *
  * \retval 0 success
  * \retval -1 failure
  */
-int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data)
+int __ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data,
+       const char *file, int line, const char *function)
        attribute_warn_unused_result;
+#define ast_taskpool_push_wait(pool, task, data) \
+       __ast_taskpool_push_wait(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 
 /*!
  * \brief Shut down a taskpool and remove the underlying taskprocessors
index 3e3886eb1ef536a9df944dbe82be2d625909438f..6477fc382b3277ac2e17444c2c307aff254960a6 100644 (file)
@@ -214,8 +214,10 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps);
  * \retval -1 failure
  * \since 1.6.1
  */
-int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
-       attribute_warn_unused_result;
+int __ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap,
+       const char *file, int line, const char *function) attribute_warn_unused_result;
+#define ast_taskprocessor_push(tps, task_exe, datap) \
+       __ast_taskprocessor_push(tps, task_exe, datap, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 
 /*! \brief Local data parameter */
 struct ast_taskprocessor_local {
@@ -240,9 +242,11 @@ struct ast_taskprocessor_local {
  * \retval -1 failure
  * \since 12.0.0
  */
-int ast_taskprocessor_push_local(struct ast_taskprocessor *tps,
-       int (*task_exe)(struct ast_taskprocessor_local *local), void *datap)
-       attribute_warn_unused_result;
+int __ast_taskprocessor_push_local(struct ast_taskprocessor *tps,
+       int (*task_exe)(struct ast_taskprocessor_local *local), void *datap,
+       const char *file, int line, const char *function) attribute_warn_unused_result;
+#define ast_taskprocessor_push_local(tps, task_exe, datap) \
+       __ast_taskprocessor_push_local(tps, task_exe, datap, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 
 /*!
  * \brief Indicate the taskprocessor is suspended.
index 72b2863c5d1b599b0b896c2bc6b321f7b037557b..2d759510cbb1b85bd03e93a3885a9871c9eff055 100644 (file)
@@ -188,8 +188,11 @@ void ast_threadpool_set_size(struct ast_threadpool *threadpool, unsigned int siz
  * \retval 0 success
  * \retval -1 failure
  */
-int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
-       attribute_warn_unused_result;
+int __ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data,
+       const char *file, int line, const char *function) attribute_warn_unused_result;
+
+#define ast_threadpool_push(pool, task, data) \
+       __ast_threadpool_push(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 
 /*!
  * \brief Shut down a threadpool and destroy it
index 59ac4b0c722c649354506b1c75ae49364a9086c5..c1d92a7a122ceb920c1a1ebd8225d46f200d52a7 100644 (file)
@@ -519,7 +519,13 @@ static void taskpool_dynamic_pool_grow(struct ast_taskpool *pool, struct taskpoo
        }
 }
 
-int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data)
+#undef ast_taskpool_push
+#define ast_taskpool_push_internal(pool, task, data) \
+       __ast_taskpool_push(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
+int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data);
+
+int __ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data,
+       const char *file, int line, const char *function)
 {
        RAII_VAR(struct taskpool_taskprocessor *, taskprocessor, NULL, ao2_cleanup);
 
@@ -555,13 +561,19 @@ int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *
                return -1;
        }
 
-       if (ast_taskprocessor_push(taskprocessor->taskprocessor, task, data)) {
+       if (__ast_taskprocessor_push(taskprocessor->taskprocessor, task, data, file, line, function)) {
                return -1;
        }
 
        return 0;
 }
 
+/* ABI compatibility: Provide actual function symbol for external modules */
+int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data)
+{
+       return __ast_taskpool_push(pool, task, data, NULL, 0, NULL);
+}
+
 /*!
  * \internal Structure used for synchronous task
  */
@@ -620,7 +632,8 @@ static int taskpool_sync_task(void *data)
        return ret;
 }
 
-int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data)
+int __ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data,
+       const char *file, int line, const char *function)
 {
        struct taskpool_sync_task sync_task;
 
@@ -635,7 +648,7 @@ int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), v
                return -1;
        }
 
-       if (ast_taskpool_push(pool, taskpool_sync_task, &sync_task)) {
+       if (__ast_taskpool_push(pool, taskpool_sync_task, &sync_task, file, line, function)) {
                taskpool_sync_task_cleanup(&sync_task);
                return -1;
        }
@@ -650,6 +663,15 @@ int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), v
        return sync_task.fail;
 }
 
+/* ABI compatibility: Provide actual function symbol for external modules */
+#undef ast_taskpool_push_wait
+int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data);
+
+int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data)
+{
+       return __ast_taskpool_push_wait(pool, task, data, NULL, 0, NULL);
+}
+
 void ast_taskpool_shutdown(struct ast_taskpool *pool)
 {
        if (!pool) {
index c4015279c5bd1914dfcd9c9f0d36e60ed14cd409..8ba6e0f6c2ec8b7a9641416162c971810197e2b6 100644 (file)
@@ -55,6 +55,10 @@ struct tps_task {
        /*! \brief AST_LIST_ENTRY overhead */
        AST_LIST_ENTRY(tps_task) list;
        unsigned int wants_local:1;
+       /*! \brief Debug information about where the task was pushed from */
+       const char *file;
+       int line;
+       const char *function;
 };
 
 /*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */
@@ -63,6 +67,16 @@ struct tps_taskprocessor_stats {
        unsigned long max_qsize;
        /*! \brief This is the current number of tasks processed */
        unsigned long _tasks_processed_count;
+       /*! \brief Highest time (in microseconds) spent processing a task */
+       long highest_time_processed;
+       /*! \brief Lowest time (in microseconds) spent processing a task */
+       long lowest_time_processed;
+       /*! \brief File where the highest time task was pushed from */
+       const char *highest_time_task_file;
+       /*! \brief Line where the highest time task was pushed from */
+       int highest_time_task_line;
+       /*! \brief Function where the highest time task was pushed from */
+       const char *highest_time_task_function;
 };
 
 /*! \brief A ast_taskprocessor structure is a singleton by name */
@@ -155,6 +169,7 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
 static char *cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
 static char *cli_tps_reset_stats(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
 static char *cli_tps_reset_stats_all(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
+static char *cli_tps_show_taskprocessor(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
 
 static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags);
 
@@ -162,6 +177,7 @@ static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags);
 static struct ast_cli_entry taskprocessor_clis[] = {
        AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
        AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
+       AST_CLI_DEFINE(cli_tps_show_taskprocessor, "Display detailed info about a taskprocessor"),
        AST_CLI_DEFINE(cli_subsystem_alert_report, "List task processor subsystems in alert"),
        AST_CLI_DEFINE(cli_tps_reset_stats, "Reset a named task processor's stats"),
        AST_CLI_DEFINE(cli_tps_reset_stats_all, "Reset all task processors' stats"),
@@ -189,6 +205,17 @@ static void default_listener_pvt_dtor(struct ast_taskprocessor_listener *listene
        listener->user_data = NULL;
 }
 
+/* Keeping the old symbols for ABI compatibility */
+#undef ast_taskprocessor_push
+#define ast_taskprocessor_push_internal(tps, task_exe, datap) \
+       __ast_taskprocessor_push(tps, task_exe, datap, __FILE__, __LINE__, __PRETTY_FUNCTION__)
+int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap);
+
+#undef ast_taskprocessor_push_local
+#define ast_taskprocessor_push_local_internal(tps, task_exe, datap) \
+       __ast_taskprocessor_push_local(tps, task_exe, datap, __FILE__, __LINE__, __PRETTY_FUNCTION__)
+int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap);
+
 /*!
  * \brief Function that processes tasks in the taskprocessor
  * \internal
@@ -204,8 +231,8 @@ static void *default_tps_processing_function(void *data)
        while (!pvt->dead) {
                res = ast_sem_wait(&pvt->sem);
                if (res != 0 && errno != EINTR) {
-                       ast_log(LOG_ERROR, "ast_sem_wait(): %s\n",
-                               strerror(errno));
+                       ast_log(LOG_ERROR, "Taskprocessor '%s': Semaphore wait failed: %s\n",
+                               tps->name, strerror(errno));
                        /* Just give up */
                        break;
                }
@@ -238,8 +265,8 @@ static void default_task_pushed(struct ast_taskprocessor_listener *listener, int
        struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
 
        if (ast_sem_post(&pvt->sem) != 0) {
-               ast_log(LOG_ERROR, "Failed to notify of enqueued task: %s\n",
-                       strerror(errno));
+               ast_log(LOG_ERROR, "Taskprocessor '%s': Failed to signal task enqueue: %s\n",
+                       listener->tps->name, strerror(errno));
        }
 }
 
@@ -258,7 +285,7 @@ static void default_listener_shutdown(struct ast_taskprocessor_listener *listene
        /* Hold a reference during shutdown */
        ao2_t_ref(listener->tps, +1, "tps-shutdown");
 
-       if (ast_taskprocessor_push(listener->tps, default_listener_die, pvt)) {
+       if (ast_taskprocessor_push_internal(listener->tps, default_listener_die, pvt)) {
                /* This will cause the thread to exit early without completing tasks already
                 * in the queue.  This is probably the least bad option in this situation. */
                default_listener_die(pvt);
@@ -312,7 +339,7 @@ static void tps_shutdown(void)
        objcount = ao2_container_count(tps_singletons);
        if (objcount > 0) {
                ast_log(LOG_DEBUG,
-               "waiting for taskprocessor shutdown, %d tps object(s) still allocated.\n",
+                       "Taskprocessor shutdown: Waiting for %d taskprocessor(s) to complete.\n",
                        objcount);
 
                /* give the running taskprocessors a chance to finish, up to
@@ -327,8 +354,8 @@ static void tps_shutdown(void)
                        delay.tv_sec = 1;
                        delay.tv_nsec = 0;
                        ast_log(LOG_DEBUG,
-                       "waiting for taskprocessor shutdown, %d tps object(s) still allocated.\n",
-                               objcount);
+                               "Taskprocessor shutdown: Still waiting for %d taskprocessor(s) after %d second(s).\n",
+                               objcount, tries + 1);
                }
        }
 
@@ -336,17 +363,19 @@ static void tps_shutdown(void)
         * a taskprocessor was not cleaned up somewhere */
        if (objcount > 0) {
                ast_log(LOG_ERROR,
-               "Assertion may occur, the following taskprocessors are still running:\n");
+                       "Taskprocessor shutdown: %d taskprocessor(s) still running after %d seconds. Assertion may occur:\n",
+                       objcount, AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT);
 
                sorted_tps = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, tps_sort_cb,
                        NULL);
                if (!sorted_tps || ao2_container_dup(sorted_tps, tps_singletons, 0)) {
-                       ast_log(LOG_ERROR, "unable to get sorted list of taskprocessors");
+                       ast_log(LOG_ERROR, "Unable to get sorted list of taskprocessors for shutdown report\n");
                }
                else {
                        iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK);
                        while ((tps = ao2_iterator_next(&iter))) {
-                               ast_log(LOG_ERROR, "taskprocessor '%s'\n", tps->name);
+                               ast_log(LOG_ERROR, "  - Taskprocessor '%s' (queue size: %ld)\n",
+                                       tps->name, tps->tps_queue_size);
                        }
                }
 
@@ -354,7 +383,7 @@ static void tps_shutdown(void)
        }
        else {
                ast_log(LOG_DEBUG,
-                       "All waiting taskprocessors cleared!\n");
+                               "Taskprocessor shutdown: All taskprocessors completed successfully.\n");
        }
 
        ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
@@ -370,13 +399,13 @@ int ast_tps_init(void)
        tps_singletons = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
                TPS_MAX_BUCKETS, tps_hash_cb, NULL, tps_cmp_cb);
        if (!tps_singletons) {
-               ast_log(LOG_ERROR, "taskprocessor container failed to initialize!\n");
+               ast_log(LOG_ERROR, "Failed to initialize taskprocessor container!\n");
                return -1;
        }
 
        if (AST_VECTOR_RW_INIT(&overloaded_subsystems, 10)) {
                ao2_ref(tps_singletons, -1);
-               ast_log(LOG_ERROR, "taskprocessor subsystems vector failed to initialize!\n");
+               ast_log(LOG_ERROR, "Failed to initialize taskprocessor subsystems tracking vector!\n");
                return -1;
        }
 
@@ -390,43 +419,51 @@ int ast_tps_init(void)
 }
 
 /* allocate resources for the task */
-static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
+static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap,
+       const char *file, int line, const char *function)
 {
        struct tps_task *t;
        if (!task_exe) {
-               ast_log(LOG_ERROR, "task_exe is NULL!\n");
+               ast_log(LOG_ERROR, "Task callback function is NULL!\n");
                return NULL;
        }
 
        t = ast_calloc(1, sizeof(*t));
        if (!t) {
-               ast_log(LOG_ERROR, "failed to allocate task!\n");
+               ast_log(LOG_ERROR, "Failed to allocate memory for task!\n");
                return NULL;
        }
 
        t->callback.execute = task_exe;
        t->datap = datap;
+       t->file = file;
+       t->line = line;
+       t->function = function;
 
        return t;
 }
 
-static struct tps_task *tps_task_alloc_local(int (*task_exe)(struct ast_taskprocessor_local *local), void *datap)
+static struct tps_task *tps_task_alloc_local(int (*task_exe)(struct ast_taskprocessor_local *local), void *datap,
+       const char *file, int line, const char *function)
 {
        struct tps_task *t;
        if (!task_exe) {
-               ast_log(LOG_ERROR, "task_exe is NULL!\n");
+               ast_log(LOG_ERROR, "Task callback function is NULL!\n");
                return NULL;
        }
 
        t = ast_calloc(1, sizeof(*t));
        if (!t) {
-               ast_log(LOG_ERROR, "failed to allocate task!\n");
+               ast_log(LOG_ERROR, "Failed to allocate memory for task!\n");
                return NULL;
        }
 
        t->callback.execute_local = task_exe;
        t->datap = datap;
        t->wants_local = 1;
+       t->file = file;
+       t->line = line;
+       t->function = function;
 
        return t;
 }
@@ -520,7 +557,7 @@ static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args
        ts.tv_nsec = when.tv_usec * 1000;
 
        ast_mutex_lock(&cli_ping_cond_lock);
-       if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
+       if (ast_taskprocessor_push_internal(tps, tps_ping_handler, 0) < 0) {
                ast_mutex_unlock(&cli_ping_cond_lock);
                ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
                ast_taskprocessor_unreference(tps);
@@ -574,8 +611,8 @@ static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags)
        return cmp;
 }
 
-#define FMT_HEADERS            "%-70s %10s %10s %10s %10s %10s\n"
-#define FMT_FIELDS             "%-70s %10lu %10lu %10lu %10lu %10lu\n"
+#define FMT_HEADERS            "%-70s %10s %10s %10s %10s %10s %10s %10s\n"
+#define FMT_FIELDS             "%-70s %10lu %10lu %10lu %10lu %10lu %10ld %10ld\n"
 
 /*!
  * \internal
@@ -589,7 +626,8 @@ static void tps_report_taskprocessor_list_helper(int fd, struct ast_taskprocesso
 {
        ast_cli(fd, FMT_FIELDS, tps->name, tps->stats._tasks_processed_count,
                tps->tps_queue_size, tps->stats.max_qsize, tps->tps_queue_low,
-               tps->tps_queue_high);
+               tps->tps_queue_high, tps->stats.lowest_time_processed,
+               tps->stats.highest_time_processed);
 }
 
 /*!
@@ -667,12 +705,94 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
                return CLI_SHOWUSAGE;
        }
 
-       ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water");
+       ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water", "Low time(us)", "High time(us)");
        ast_cli(a->fd, "\n%d taskprocessors\n\n", tps_report_taskprocessor_list(a->fd, like));
 
        return CLI_SUCCESS;
 }
 
+static char *cli_tps_show_taskprocessor(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+       const char *name;
+       struct ast_taskprocessor *tps;
+       struct tps_task *task;
+       int task_count = 0;
+
+       switch (cmd) {
+       case CLI_INIT:
+               e->command = "core show taskprocessor name";
+               e->usage =
+                       "Usage: core show taskprocessor name <taskprocessor>\n"
+                       "    Displays detailed information about a specific taskprocessor,\n"
+                       "    including all queued tasks and their origins (DEVMODE only).\n";
+               return NULL;
+       case CLI_GENERATE:
+               if (a->pos == 4) {
+                       return tps_taskprocessor_tab_complete(a);
+               }
+               return NULL;
+       }
+
+       if (a->argc != 5) {
+               return CLI_SHOWUSAGE;
+       }
+
+       name = a->argv[4];
+       tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS);
+       if (!tps) {
+               ast_cli(a->fd, "\nTaskprocessor '%s' not found\n\n", name);
+               return CLI_SUCCESS;
+       }
+
+       ao2_lock(tps);
+
+       ast_cli(a->fd, "\nTaskprocessor: %s\n", tps->name);
+       ast_cli(a->fd, "===========================================\n");
+       ast_cli(a->fd, "Subsystem:            %s\n", tps->subsystem[0] ? tps->subsystem : "(none)");
+       ast_cli(a->fd, "Tasks processed:      %lu\n", tps->stats._tasks_processed_count);
+       ast_cli(a->fd, "Current queue size:   %ld\n", tps->tps_queue_size);
+       ast_cli(a->fd, "Max queue depth:      %lu\n", tps->stats.max_qsize);
+       ast_cli(a->fd, "Low water mark:       %ld\n", tps->tps_queue_low);
+       ast_cli(a->fd, "High water mark:      %ld\n", tps->tps_queue_high);
+       ast_cli(a->fd, "High water alert:     %s\n", tps->high_water_alert ? "Yes" : "No");
+       ast_cli(a->fd, "Suspended:            %s\n", tps->suspended ? "Yes" : "No");
+       ast_cli(a->fd, "Currently executing:  %s\n", tps->executing ? "Yes" : "No");
+       ast_cli(a->fd, "Highest time (us):    %ld\n", tps->stats.highest_time_processed);
+       if (tps->stats.highest_time_task_file) {
+               ast_cli(a->fd, "  Highest task origin: %s:%d (%s)\n",
+                       tps->stats.highest_time_task_file,
+                       tps->stats.highest_time_task_line,
+                       tps->stats.highest_time_task_function);
+       }
+       ast_cli(a->fd, "Lowest time (us):     %ld\n", tps->stats.lowest_time_processed);
+
+       if (tps->tps_queue_size > 0) {
+               ast_cli(a->fd, "\nQueued Tasks:\n");
+               ast_cli(a->fd, "-------------------------------------------\n");
+
+               AST_LIST_TRAVERSE(&tps->tps_queue, task, list) {
+                       task_count++;
+                       if (task->file) {
+                               ast_cli(a->fd, "  Task #%d:\n", task_count);
+                               ast_cli(a->fd, "    Origin:   %s:%d\n", task->file, task->line);
+                               ast_cli(a->fd, "    Function: %s\n", task->function);
+                               ast_cli(a->fd, "    Type:     %s\n", task->wants_local ? "Local" : "Standard");
+                       } else {
+                               ast_cli(a->fd, "  Task #%d: (origin not available)\n", task_count);
+                       }
+               }
+               ast_cli(a->fd, "\nTotal queued tasks: %d\n", task_count);
+       } else {
+               ast_cli(a->fd, "\nNo tasks currently queued.\n");
+       }
+
+       ao2_unlock(tps);
+       ast_taskprocessor_unreference(tps);
+
+       ast_cli(a->fd, "\n");
+       return CLI_SUCCESS;
+}
+
 /* hash callback for astobj2 */
 static int tps_hash_cb(const void *obj, const int flags)
 {
@@ -866,8 +986,8 @@ static void tps_alert_add(struct ast_taskprocessor *tps, int delta)
        if (DEBUG_ATLEAST(3)
                /* and tps_alert_count becomes zero or non-zero */
                && !old != !tps_alert_count) {
-               ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n",
-                       tps->name, tps_alert_count ? "triggered" : "cleared");
+               ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert (total alerts: %u).\n",
+                       tps->name, tps_alert_count ? "triggered" : "cleared", tps_alert_count);
        }
 
        if (tps->subsystem[0] != '\0') {
@@ -1028,11 +1148,12 @@ static void *default_listener_pvt_alloc(void)
 
        pvt = ast_calloc(1, sizeof(*pvt));
        if (!pvt) {
+               ast_log(LOG_ERROR, "Failed to allocate memory for taskprocessor listener\n");
                return NULL;
        }
        pvt->poll_thread = AST_PTHREADT_NULL;
        if (ast_sem_init(&pvt->sem, 0, 0) != 0) {
-               ast_log(LOG_ERROR, "ast_sem_init(): %s\n", strerror(errno));
+               ast_log(LOG_ERROR, "Failed to initialize taskprocessor semaphore: %s\n", strerror(errno));
                ast_free(pvt);
                return NULL;
        }
@@ -1098,7 +1219,7 @@ static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, stru
 static struct ast_taskprocessor *__start_taskprocessor(struct ast_taskprocessor *p)
 {
        if (p && p->listener->callbacks->start(p->listener)) {
-               ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n",
+               ast_log(LOG_ERROR, "Failed to start taskprocessor listener for '%s'\n",
                        p->name);
                ast_taskprocessor_unreference(p);
 
@@ -1118,7 +1239,7 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o
        struct default_taskprocessor_listener_pvt *pvt;
 
        if (ast_strlen_zero(name)) {
-               ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
+               ast_log(LOG_ERROR, "Cannot get taskprocessor with empty name!\n");
                return NULL;
        }
        ao2_lock(tps_singletons);
@@ -1212,23 +1333,28 @@ static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
        int was_empty;
 
        if (!tps) {
-               ast_log(LOG_ERROR, "tps is NULL!\n");
+               ast_log(LOG_ERROR, "Taskprocessor is NULL!\n");
                return -1;
        }
 
        if (!t) {
-               ast_log(LOG_ERROR, "t is NULL!\n");
+               ast_log(LOG_ERROR, "Task is NULL!\n");
                return -1;
        }
 
+       if (t->file) {
+               ast_debug(3, "Taskprocessor '%s': Task pushed from %s:%d (%s)\n",
+                       tps->name, t->file, t->line, t->function);
+       }
+
        ao2_lock(tps);
        AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
        previous_size = tps->tps_queue_size++;
 
        if (tps->tps_queue_high <= tps->tps_queue_size) {
                if (!tps->high_water_alert) {
-                       ast_log(LOG_WARNING, "The '%s' task processor queue reached %ld scheduled tasks%s.\n",
-                               tps->name, tps->tps_queue_size, tps->high_water_warned ? " again" : "");
+                       ast_log(LOG_WARNING, "Taskprocessor '%s' queue reached %ld scheduled tasks (high water mark: %ld)%s.\n",
+                               tps->name, tps->tps_queue_size, tps->tps_queue_high, tps->high_water_warned ? " again" : "");
                        tps->high_water_warned = 1;
                        tps->high_water_alert = 1;
                        tps_alert_add(tps, +1);
@@ -1242,14 +1368,26 @@ static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
        return 0;
 }
 
+int __ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap,
+       const char *file, int line, const char *function)
+{
+       return taskprocessor_push(tps, tps_task_alloc(task_exe, datap, file, line, function));
+}
+
 int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
 {
-       return taskprocessor_push(tps, tps_task_alloc(task_exe, datap));
+       return __ast_taskprocessor_push(tps, task_exe, datap, NULL, 0, NULL);
+}
+
+int __ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap,
+       const char *file, int line, const char *function)
+{
+       return taskprocessor_push(tps, tps_task_alloc_local(task_exe, datap, file, line, function));
 }
 
 int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap)
 {
-       return taskprocessor_push(tps, tps_task_alloc_local(task_exe, datap));
+       return __ast_taskprocessor_push_local(tps, task_exe, datap, NULL, 0, NULL);
 }
 
 int ast_taskprocessor_suspend(struct ast_taskprocessor *tps)
@@ -1284,6 +1422,11 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
        struct ast_taskprocessor_local local;
        struct tps_task *t;
        long size;
+       struct timeval start;
+       long elapsed;
+       const char *task_file = NULL;
+       int task_line = 0;
+       const char *task_function = NULL;
 
        ao2_lock(tps);
        t = tps_taskprocessor_pop(tps);
@@ -1299,8 +1442,15 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
                local.local_data = tps->local_data;
                local.data = t->datap;
        }
+
+       /* Save task origin info before we free the task */
+       task_file = t->file;
+       task_line = t->line;
+       task_function = t->function;
        ao2_unlock(tps);
 
+       start = ast_tvnow();
+
        if (t->wants_local) {
                t->callback.execute_local(&local);
        } else {
@@ -1324,6 +1474,18 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
        if (size >= tps->stats.max_qsize) {
                tps->stats.max_qsize = size + 1;
        }
+
+       elapsed = ast_tvdiff_us(ast_tvnow(), start);
+       if (elapsed > tps->stats.highest_time_processed) {
+               tps->stats.highest_time_processed = elapsed;
+               tps->stats.highest_time_task_file = task_file;
+               tps->stats.highest_time_task_line = task_line;
+               tps->stats.highest_time_task_function = task_function;
+       }
+       if (elapsed < tps->stats.lowest_time_processed) {
+               tps->stats.lowest_time_processed = elapsed;
+       }
+
        ao2_unlock(tps);
 
        /* If we executed a task, check for the transition to empty */
@@ -1393,6 +1555,11 @@ static void tps_reset_stats(struct ast_taskprocessor *tps)
        ao2_lock(tps);
        tps->stats._tasks_processed_count = 0;
        tps->stats.max_qsize = 0;
+       tps->stats.highest_time_processed = 0;
+       tps->stats.lowest_time_processed = 0;
+       tps->stats.highest_time_task_file = NULL;
+       tps->stats.highest_time_task_line = 0;
+       tps->stats.highest_time_task_function = NULL;
        ao2_unlock(tps);
 }
 
index 0969e627c36822054f127aa51fcf6cce63420780..1a815782f911f8f80073aed74506fbf6d36dbcb2 100644 (file)
@@ -608,8 +608,7 @@ static int queued_task_pushed(void *data)
  * \param listener The taskprocessor listener. The threadpool is the listener's private data
  * \param was_empty True if the taskprocessor was empty prior to the task being pushed
  */
-static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
-               int was_empty)
+static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
 {
        struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
        struct task_pushed_data *tpd;
@@ -954,15 +953,27 @@ struct ast_threadpool *ast_threadpool_create(const char *name,
        return pool;
 }
 
-int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
+#undef ast_threadpool_push
+#define ast_threadpool_push_internal(pool, task, data) \
+       __ast_threadpool_push(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
+int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data);
+
+int __ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data,
+       const char *file, int line, const char *function)
 {
        SCOPED_AO2LOCK(lock, pool);
        if (!pool->shutting_down) {
-               return ast_taskprocessor_push(pool->tps, task, data);
+               return __ast_taskprocessor_push(pool->tps, task, data, file, line, function);
        }
        return -1;
 }
 
+/* ABI compatibility: Provide actual function symbol for external modules */
+int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
+{
+       return __ast_threadpool_push(pool, task, data, NULL, 0, NULL);
+}
+
 void ast_threadpool_shutdown(struct ast_threadpool *pool)
 {
        if (!pool) {
index 5b954b222682ee404a14f1ddb487de68aab408b2..6e31084756c96975add69489dc1f1e48083e96e7 100644 (file)
@@ -2096,13 +2096,48 @@ struct ast_taskprocessor *ast_sip_create_serializer(const char *name)
        return ast_sip_create_serializer_group(name, NULL);
 }
 
-int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
+#undef ast_sip_push_task
+int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
+
+int __ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data,
+       const char *file, int line, const char *function)
 {
        if (!serializer) {
                serializer = ast_serializer_pool_get(sip_serializer_pool);
        }
 
-       return ast_taskprocessor_push(serializer, sip_task, task_data);
+       return __ast_taskprocessor_push(serializer, sip_task, task_data, file, line, function);
+}
+
+/* ABI compatibility: Provide actual function symbol for external modules */
+int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
+{
+       return __ast_sip_push_task(serializer, sip_task, task_data, NULL, 0, NULL);
+}
+
+/* ABI compatibility: Provide actual function symbols for wait functions */
+#undef ast_sip_push_task_wait_servant
+int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
+
+int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
+{
+       return __ast_sip_push_task_wait_servant(serializer, sip_task, task_data, NULL, 0, NULL);
+}
+
+#undef ast_sip_push_task_synchronous
+int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
+
+int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
+{
+       return __ast_sip_push_task_synchronous(serializer, sip_task, task_data, NULL, 0, NULL);
+}
+
+#undef ast_sip_push_task_wait_serializer
+int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
+
+int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
+{
+       return __ast_sip_push_task_wait_serializer(serializer, sip_task, task_data, NULL, 0, NULL);
 }
 
 struct sync_task_data {
@@ -2134,7 +2169,8 @@ static int sync_task(void *data)
        return ret;
 }
 
-static int ast_sip_push_task_wait(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
+static int __ast_sip_push_task_wait(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data,
+       const char *file, int line, const char *function)
 {
        /* This method is an onion */
        struct sync_task_data std;
@@ -2145,7 +2181,7 @@ static int ast_sip_push_task_wait(struct ast_taskprocessor *serializer, int (*si
        std.task = sip_task;
        std.task_data = task_data;
 
-       if (ast_sip_push_task(serializer, sync_task, &std)) {
+       if (__ast_sip_push_task(serializer, sync_task, &std, file, line, function)) {
                ast_mutex_destroy(&std.lock);
                ast_cond_destroy(&std.cond);
                return -1;
@@ -2162,21 +2198,24 @@ static int ast_sip_push_task_wait(struct ast_taskprocessor *serializer, int (*si
        return std.fail;
 }
 
-int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
+int __ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data,
+       const char *file, int line, const char *function)
 {
        if (ast_sip_thread_is_servant()) {
                return sip_task(task_data);
        }
 
-       return ast_sip_push_task_wait(serializer, sip_task, task_data);
+       return __ast_sip_push_task_wait(serializer, sip_task, task_data, file, line, function);
 }
 
-int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
+int __ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data,
+       const char *file, int line, const char *function)
 {
-       return ast_sip_push_task_wait_servant(serializer, sip_task, task_data);
+       return __ast_sip_push_task_wait_servant(serializer, sip_task, task_data, file, line, function);
 }
 
-int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
+int __ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data,
+       const char *file, int line, const char *function)
 {
        if (!serializer) {
                /* Caller doesn't care which PJSIP serializer the task executes under. */
@@ -2195,7 +2234,7 @@ int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int
                return sip_task(task_data);
        }
 
-       return ast_sip_push_task_wait(serializer, sip_task, task_data);
+       return __ast_sip_push_task_wait(serializer, sip_task, task_data, file, line, function);
 }
 
 void ast_copy_pj_str(char *dest, const pj_str_t *src, size_t size)
index 537f77c7673b7756b72ac4404dddac6bc52a4bc2..e2e827fb10c5cbff685cfc98f35ed71e22185e3f 100644 (file)
 
 #include "asterisk.h"
 
+#include <unistd.h>
+
 #include "asterisk/test.h"
 #include "asterisk/taskprocessor.h"
 #include "asterisk/module.h"
 #include "asterisk/astobj2.h"
 #include "asterisk/serializer.h"
 #include "asterisk/threadpool.h"
+#include "asterisk/cli.h"
 
 /*!
  * \brief userdata associated with baseline taskprocessor test
@@ -963,6 +966,170 @@ AST_TEST_DEFINE(serializer_pool)
        return AST_TEST_PASS;
 }
 
+/*!
+ * \brief Test for CLI command "core show taskprocessor <name>"
+ *
+ * This test creates a taskprocessor, queues tasks with controlled execution,
+ * and verifies that the CLI command displays the queued tasks correctly.
+ */
+AST_TEST_DEFINE(taskprocessor_cli_show)
+{
+       RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference);
+       struct task_data *task_data1 = NULL;
+       struct task_data *task_data2 = NULL;
+       struct task_data *task_data3 = NULL;
+       int task_queued1 = 0, task_queued2 = 0, task_queued3 = 0;
+       char cli_command[128];
+       int cli_output_fd[2];
+       char output_buffer[4096] = {0};
+       ssize_t bytes_read;
+       int res = AST_TEST_FAIL;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = "taskprocessor_cli_show";
+               info->category = "/main/taskprocessor/";
+               info->summary = "Test CLI command 'core show taskprocessor'";
+               info->description =
+                       "Verifies that the 'core show taskprocessor <name>' CLI command\n"
+                       "displays taskprocessor information and queued tasks correctly.";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       /* Create a pipe to capture CLI output */
+       if (pipe(cli_output_fd) != 0) {
+               ast_test_status_update(test, "Failed to create pipe for CLI output\n");
+               return AST_TEST_FAIL;
+       }
+
+       /* Create taskprocessor */
+       tps = ast_taskprocessor_get("test_cli_taskprocessor", TPS_REF_DEFAULT);
+       if (!tps) {
+               ast_test_status_update(test, "Unable to create test taskprocessor\n");
+               close(cli_output_fd[0]);
+               close(cli_output_fd[1]);
+               return AST_TEST_FAIL;
+       }
+
+       /* Create tasks that will wait so they stay in the queue */
+       task_data1 = task_data_create();
+       task_data2 = task_data_create();
+       task_data3 = task_data_create();
+
+       if (!task_data1 || !task_data2 || !task_data3) {
+               ast_test_status_update(test, "Unable to create task_data\n");
+               goto cleanup;
+       }
+
+       /* Set a long wait time so tasks stay queued */
+       task_data1->wait_time = 2000; /* 2 seconds */
+       task_data2->wait_time = 2000;
+       task_data3->wait_time = 2000;
+
+       /* Queue the tasks */
+       if (ast_taskprocessor_push(tps, task, task_data1)) {
+               ast_test_status_update(test, "Failed to queue task 1\n");
+               goto cleanup;
+       }
+       task_queued1 = 1;
+
+       if (ast_taskprocessor_push(tps, task, task_data2)) {
+               ast_test_status_update(test, "Failed to queue task 2\n");
+               goto cleanup;
+       }
+       task_queued2 = 1;
+
+       if (ast_taskprocessor_push(tps, task, task_data3)) {
+               ast_test_status_update(test, "Failed to queue task 3\n");
+               goto cleanup;
+       }
+       task_queued3 = 1;
+
+       /* Execute the CLI command */
+       snprintf(cli_command, sizeof(cli_command), "core show taskprocessor name test_cli_taskprocessor");
+
+       if (ast_cli_command(cli_output_fd[1], cli_command) != 0) {
+               ast_test_status_update(test, "CLI command execution failed\n");
+               goto cleanup;
+       }
+
+       /* Close write end and read the output */
+       close(cli_output_fd[1]);
+       cli_output_fd[1] = -1;
+
+       bytes_read = read(cli_output_fd[0], output_buffer, sizeof(output_buffer) - 1);
+       if (bytes_read <= 0) {
+               ast_test_status_update(test, "Failed to read CLI output\n");
+               goto cleanup;
+       }
+       output_buffer[bytes_read] = '\0';
+
+       /* Log the output for inspection */
+       ast_test_status_update(test, "CLI Output:\n%s\n", output_buffer);
+
+       /* Verify the output contains expected information */
+       if (!strstr(output_buffer, "test_cli_taskprocessor")) {
+               ast_test_status_update(test, "Output missing taskprocessor name\n");
+               goto cleanup;
+       }
+
+       if (!strstr(output_buffer, "Current queue size")) {
+               ast_test_status_update(test, "Output missing queue size information\n");
+               goto cleanup;
+       }
+
+       /* Check for queued tasks section (at least one task should be shown) */
+       if (!strstr(output_buffer, "Queued Tasks") && !strstr(output_buffer, "Currently executing")) {
+               ast_test_status_update(test, "Output missing queued tasks or execution status\n");
+               goto cleanup;
+       }
+
+       /* Verify we see task information */
+       if (!strstr(output_buffer, "Task #")) {
+               ast_test_status_update(test, "Output missing task information\n");
+               goto cleanup;
+       }
+
+       ast_test_status_update(test, "CLI command output validated successfully\n");
+       res = AST_TEST_PASS;
+
+cleanup:
+
+       ast_test_status_update(test, "Waiting for tasks to complete\n");
+
+       /* Wait for tasks to complete */
+       if (task_data1) {
+               if (task_queued1) {
+                       task_wait(task_data1);
+               }
+               ao2_cleanup(task_data1);
+       }
+       if (task_data2) {
+               if (task_queued2) {
+                       task_wait(task_data2);
+               }
+               ao2_cleanup(task_data2);
+       }
+       if (task_data3) {
+               if (task_queued3) {
+                       task_wait(task_data3);
+               }
+               ao2_cleanup(task_data3);
+       }
+
+       if (cli_output_fd[0] >= 0) {
+               close(cli_output_fd[0]);
+       }
+       if (cli_output_fd[1] >= 0) {
+               close(cli_output_fd[1]);
+       }
+
+       ast_test_status_update(test, "Tasks complete\n");
+       return res;
+}
+
 static int unload_module(void)
 {
        ast_test_unregister(default_taskprocessor);
@@ -972,6 +1139,7 @@ static int unload_module(void)
        ast_test_unregister(taskprocessor_shutdown);
        ast_test_unregister(taskprocessor_push_local);
        ast_test_unregister(serializer_pool);
+       ast_test_unregister(taskprocessor_cli_show);
        return 0;
 }
 
@@ -984,6 +1152,7 @@ static int load_module(void)
        ast_test_register(taskprocessor_shutdown);
        ast_test_register(taskprocessor_push_local);
        ast_test_register(serializer_pool);
+       ast_test_register(taskprocessor_cli_show);
        return AST_MODULE_LOAD_SUCCESS;
 }