]> git.ipfire.org Git - thirdparty/asterisk.git/commitdiff
main/stasis: Allow subscriptions to use a threadpool for message delivery
authorMatthew Jordan <mjordan@digium.com>
Mon, 1 Dec 2014 17:59:21 +0000 (17:59 +0000)
committerMatthew Jordan <mjordan@digium.com>
Mon, 1 Dec 2014 17:59:21 +0000 (17:59 +0000)
Prior to this patch, all Stasis subscriptions would receive a dedicated
thread for servicing published messages. In contrast, prior to r400178
(see review https://reviewboard.asterisk.org/r/2881/), the subscriptions
shared a thread pool. It was discovered during some initial work on Stasis
that, for a low subscription count with high message throughput, the
threadpool was not as performant as simply having a dedicated thread per
subscriber.

For situations where a subscriber receives a substantial number of messages
and is always present, the model of having a dedicated thread per subscriber
makes sense. While we still have plenty of subscriptions that would follow
this model, e.g., AMI, CDRs, CEL, etc., there are plenty that also fall into
the following two categories:
* Large number of subscriptions, specifically those tied to endpoints/peers.
* Low number of messages. Some subscriptions exist specifically to coordinate
  a single message - the subscription is created, a message is published, the
  delivery is synchronized, and the subscription is destroyed.
In both of the latter two cases, creating a dedicated thread is wasteful (and
in the case of a large number of peers/endpoints, harmful). In those cases,
having shared delivery threads is far more performant.

This patch adds the ability of a subscriber to Stasis to choose whether or not
their messages are dispatched on a dedicated thread or on a threadpool. The
threadpool is configurable through stasis.conf.

Review: https://reviewboard.asterisk.org/r/4193

ASTERISK-24533 #close
Reported by: xrobau
Tested by: xrobau
........

Merged revisions 428681 from http://svn.asterisk.org/svn/asterisk/branches/12
........

Merged revisions 428687 from http://svn.asterisk.org/svn/asterisk/branches/13

git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@428688 65c4cc65-6c06-0410-ace0-fbb531ad65f3

24 files changed:
CHANGES
apps/app_queue.c
channels/chan_dahdi.c
channels/chan_iax2.c
channels/chan_mgcp.c
channels/chan_sip.c
channels/chan_skinny.c
channels/sig_pri.c
configs/samples/stasis.conf.sample
include/asterisk/stasis.h
include/asterisk/stasis_internal.h
include/asterisk/stasis_message_router.h
main/endpoints.c
main/stasis.c
main/stasis_cache.c
main/stasis_message_router.c
res/parking/parking_applications.c
res/parking/parking_bridge_features.c
res/res_pjsip_mwi.c
res/res_pjsip_pubsub.c
res/res_pjsip_refer.c
res/res_stasis_device_state.c
res/res_xmpp.c
tests/test_stasis.c

diff --git a/CHANGES b/CHANGES
index 0c46e0d6dd2f57afe339f9f3ee897ca9c6930d38..f2c1314ddd9dd4b0e8ea2040185760c3fb210c2c 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -39,6 +39,16 @@ chan_pjsip
    the message will automatically be associated with the configured endpoint on the
    outbound registration.
 
+
+Core
+------------------
+ * The core of Asterisk uses a message bus called "Stasis" to distribute
+   information to internal components. For performance reasons, the message
+   distribution was modified to make use of a thread pool instead of a
+   dedicated thread per consumer in certain cases. The initial settings for
+   the thread pool can now be configured in 'stasis.conf'.
+
+
 Functions
 ------------------
 
@@ -355,7 +365,7 @@ AMI
  * AMI action PJSIPNotify may now send to a URI instead of only to a PJSIP
    endpoint as long as a default outbound endpoint is set. This also applies
    to the equivalent CLI command (pjsip send notify)
+
  * The AMI action PJSIPShowEndpoint now includes ContactStatusDetail sections
    that give information on Asterisk's attempts to qualify the endpoint.
 
index 4820c662d684f3f2113677900c985c702ef2a600..1cf1f6ad3d7ae6ca55741a7da4a0084c5b28ec01 100644 (file)
@@ -6097,7 +6097,7 @@ static int setup_stasis_subs(struct queue_ent *qe, struct ast_channel *peer, str
                return -1;
        }
 
-       queue_data->bridge_router = stasis_message_router_create(ast_bridge_topic_all());
+       queue_data->bridge_router = stasis_message_router_create_pool(ast_bridge_topic_all());
        if (!queue_data->bridge_router) {
                ao2_ref(queue_data, -1);
                return -1;
@@ -6112,7 +6112,7 @@ static int setup_stasis_subs(struct queue_ent *qe, struct ast_channel *peer, str
        stasis_message_router_set_default(queue_data->bridge_router,
                        queue_bridge_cb, queue_data);
 
-       queue_data->channel_router = stasis_message_router_create(ast_channel_topic_all());
+       queue_data->channel_router = stasis_message_router_create_pool(ast_channel_topic_all());
        if (!queue_data->channel_router) {
                /* Unsubscribing from the bridge router will remove the only ref of queue_data,
                 * thus beginning the destruction process
index 9ccec641f64881a8fb2de205da63ece662b4f28d..8ef0cec3020777948ab06c3665a5871b7ca3cb75 100644 (file)
@@ -12581,7 +12581,7 @@ static struct dahdi_pvt *mkintf(int channel, const struct dahdi_chan_conf *conf,
 
                        mailbox_specific_topic = ast_mwi_topic(tmp->mailbox);
                        if (mailbox_specific_topic) {
-                               tmp->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, NULL);
+                               tmp->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, NULL);
                        }
                }
 #ifdef HAVE_DAHDI_LINEREVERSE_VMWI
index d093438c8112b9dbded5718b2a393ac91ff32437..39861db0e3013a5226bfabab3b816dc6bd015a20 100644 (file)
@@ -13096,7 +13096,7 @@ static struct iax2_peer *build_peer(const char *name, struct ast_variable *v, st
 
                mailbox_specific_topic = ast_mwi_topic(peer->mailbox);
                if (mailbox_specific_topic) {
-                       peer->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, NULL);
+                       peer->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, NULL);
                }
        }
 
index 72898b8ff19671e84e84da014084eb2eaf58e862..95fa2dc7cdc84f6d0009bdbf220fc1c719131826 100644 (file)
@@ -4237,7 +4237,7 @@ static struct mgcp_gateway *build_gateway(char *cat, struct ast_variable *v)
 
                                        mailbox_specific_topic = ast_mwi_topic(e->mailbox);
                                        if (mailbox_specific_topic) {
-                                               e->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, NULL);
+                                               e->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, NULL);
                                        }
                                }
                                snprintf(e->rqnt_ident, sizeof(e->rqnt_ident), "%08lx", (unsigned long)ast_random());
index 192956362ce5e7cc1c44955ae6d9210127d904b8..2cbef5243a8aef7996dfe845bdfedafb85233a0b 100644 (file)
@@ -27258,7 +27258,7 @@ static void add_peer_mwi_subs(struct sip_peer *peer)
                        if (!peer_name) {
                                return;
                        }
-                       mailbox->event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, peer_name);
+                       mailbox->event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, peer_name);
                }
        }
 }
index 4b7f353a0a9ad71b9cda9cc53e338c1fe991c173..b8202d33b4250c87d738fc655df92c13c54d1dec 100644 (file)
@@ -8295,7 +8295,7 @@ static struct skinny_line *config_line(const char *lname, struct ast_variable *v
 
                mailbox_specific_topic = ast_mwi_topic(l->mailbox);
                if (mailbox_specific_topic) {
-                       l->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, l);
+                       l->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, l);
                }
        }
 
index e9e17322f2b42f9cfa2fbe257001af9ce640ea1d..a26b5661142edd788c229b76dc29c88b2af8da5e 100644 (file)
@@ -9174,7 +9174,7 @@ int sig_pri_start_pri(struct sig_pri_span *pri)
 
                mailbox_specific_topic = ast_mwi_topic(mbox_id);
                if (mailbox_specific_topic) {
-                       pri->mbox[i].sub = stasis_subscribe(mailbox_specific_topic, sig_pri_mwi_event_cb, pri);
+                       pri->mbox[i].sub = stasis_subscribe_pool(mailbox_specific_topic, sig_pri_mwi_event_cb, pri);
                }
                if (!pri->mbox[i].sub) {
                        ast_log(LOG_ERROR, "%s span %d could not subscribe to MWI events for %s(%s).\n",
index 3aac230cbb7580b5fb9441e089d25d0487935872..e591e7637fd651b0146cfa95574c3ad695288c2e 100644 (file)
@@ -1,3 +1,13 @@
+[threadpool]
+;initial_size = 5          ; Initial size of the threadpool.
+;                          ; 0 means the threadpool has no threads initially
+;                          ; until a task needs a thread.
+;idle_timeout_sec = 20     ; Number of seconds a thread should be idle before
+;                          ; dying. 0 means threads never time out.
+;max_size = 50             ; Maximum number of threads in the Stasis threadpool.
+;                          ; 0 means no limit to the number of threads in the
+;                          ; threadpool.
+
 [declined_message_types]
 ; This config section contains the names of message types that should be prevented
 ; from being created. By default, all message types are allowed to be created.
index 4189513ac4b7076eed9930cf8338b965cbcf748f..0b1b1e83f88ca01f5969af5c7993a794d0816af2 100644 (file)
@@ -541,6 +541,31 @@ typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *s
 struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
        stasis_subscription_cb callback, void *data);
 
+/*!
+ * \brief Create a subscription whose callbacks occur on a thread pool
+ *
+ * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
+ * up this reference), the subscription must be explicitly unsubscribed from its
+ * topic using stasis_unsubscribe().
+ *
+ * The invocations of the callback are serialized, but will almost certainly not
+ * always happen on the same thread. The invocation order of different subscriptions
+ * is unspecified.
+ *
+ * Unlike \ref stasis_subscribe, this function will explicitly use a threadpool to
+ * dispatch items to its \c callback. This form of subscription should be used
+ * when many subscriptions may be made to the specified \c topic.
+ *
+ * \param topic Topic to subscribe to.
+ * \param callback Callback function for subscription messages.
+ * \param data Data to be passed to the callback, in addition to the message.
+ * \return New \ref stasis_subscription object.
+ * \return \c NULL on error.
+ * \since 12.8.0
+ */
+struct stasis_subscription *stasis_subscribe_pool(struct stasis_topic *topic,
+       stasis_subscription_cb callback, void *data);
+
 /*!
  * \brief Cancel a subscription.
  *
index bb7b6cc0a2afbeafeb097a22814f6417ab802ae1..bc6122c2b2038ac9fd4842d5bbde1cdbfb87838f 100644 (file)
  * \param callback Callback function for subscription messages.
  * \param data Data to be passed to the callback, in addition to the message.
  * \param needs_mailbox Determines whether or not the subscription requires a mailbox.
- *  Subscriptions with mailboxes will be delivered on a thread in the Stasis threadpool;
+ *  Subscriptions with mailboxes will be delivered on some non-publisher thread;
  *  subscriptions without mailboxes will be delivered on the publisher thread.
+ * \param use_thread_pool Use the thread pool for the subscription. This is only
+ *  relevant if \c needs_mailbox is non-zero.
  * \return New \ref stasis_subscription object.
  * \return \c NULL on error.
  * \since 12
@@ -62,6 +64,7 @@ struct stasis_subscription *internal_stasis_subscribe(
        struct stasis_topic *topic,
        stasis_subscription_cb callback,
        void *data,
-       int needs_mailbox);
+       int needs_mailbox,
+       int use_thread_pool);
 
 #endif /* STASIS_INTERNAL_H_ */
index 613a2bd7f0469a2a98f77084898ea3a2362eb514..89657a5ee5eddd10dfe653cd8c72162af18e1093 100644 (file)
@@ -58,6 +58,22 @@ struct stasis_message_router;
 struct stasis_message_router *stasis_message_router_create(
        struct stasis_topic *topic);
 
+/*!
+ * \brief Create a new message router object.
+ *
+ * The subscription created for this message router will dispatch
+ * callbacks on a thread pool.
+ *
+ * \param topic Topic to subscribe route to.
+ *
+ * \return New \ref stasis_message_router.
+ * \return \c NULL on error.
+ *
+ * \since 12.8.0
+ */
+struct stasis_message_router *stasis_message_router_create_pool(
+       struct stasis_topic *topic);
+
 /*!
  * \brief Unsubscribe the router from the upstream topic.
  *
index cc2eccc705ee6b1eff3e3d8cc78b269b1d90b2dc..f8cca45b89f82a907bf83e20cef8300793b27e8e 100644 (file)
@@ -310,7 +310,7 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha
        }
 
        if (!ast_strlen_zero(resource)) {
-               endpoint->router = stasis_message_router_create(ast_endpoint_topic(endpoint));
+               endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint));
                if (!endpoint->router) {
                        return NULL;
                }
index b85135a5b828e5c82cb72ae029859cd753a64990..dbb6e4c12820df77384697be54836ec0d9168776 100644 (file)
@@ -35,6 +35,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
 #include "asterisk/stasis_internal.h"
 #include "asterisk/stasis.h"
 #include "asterisk/taskprocessor.h"
+#include "asterisk/threadpool.h"
 #include "asterisk/utils.h"
 #include "asterisk/uuid.h"
 #include "asterisk/vector.h"
@@ -63,6 +64,18 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
        </managerEvent>
        <configInfo name="stasis" language="en_US">
                <configFile name="stasis.conf">
+                       <configObject name="threadpool">
+                               <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
+                               <configOption name="initial_size" default="5">
+                                       <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
+                               </configOption>
+                               <configOption name="idle_timeout_sec" default="20">
+                                       <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
+                               </configOption>
+                               <configOption name="max_size" default="50">
+                                       <synopsis>Maximum number of threads in the threadpool.</synopsis>
+                               </configOption>
+                       </configObject>
                        <configObject name="declined_message_types">
                                <synopsis>Stasis message types for which to decline creation.</synopsis>
                                <configOption name="decline">
@@ -287,6 +300,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
 /*! The number of buckets to use for topic pools */
 #define TOPIC_POOL_BUCKETS 57
 
+/*! Thread pool for topics that don't want a dedicated taskprocessor */
+static struct ast_threadpool *pool;
+
 STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
 
 /*! \internal */
@@ -432,7 +448,8 @@ struct stasis_subscription *internal_stasis_subscribe(
        struct stasis_topic *topic,
        stasis_subscription_cb callback,
        void *data,
-       int needs_mailbox)
+       int needs_mailbox,
+       int use_thread_pool)
 {
        RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
 
@@ -445,19 +462,19 @@ struct stasis_subscription *internal_stasis_subscribe(
        if (!sub) {
                return NULL;
        }
-
        ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
 
        if (needs_mailbox) {
                /* With a small number of subscribers, a thread-per-sub is
-                * acceptable. If our usage changes so that we have larger
-                * numbers of subscribers, we'll probably want to consider
-                * a threadpool. We had that originally, but with so few
-                * subscribers it was actually a performance loss instead of
-                * a gain.
+                * acceptable. For larger number of subscribers, a thread
+                * pool should be used.
                 */
-               sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
-                       TPS_REF_DEFAULT);
+               if (use_thread_pool) {
+                       sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
+               } else {
+                       sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
+                               TPS_REF_DEFAULT);
+               }
                if (!sub->mailbox) {
                        return NULL;
                }
@@ -486,7 +503,15 @@ struct stasis_subscription *stasis_subscribe(
        stasis_subscription_cb callback,
        void *data)
 {
-       return internal_stasis_subscribe(topic, callback, data, 1);
+       return internal_stasis_subscribe(topic, callback, data, 1, 0);
+}
+
+struct stasis_subscription *stasis_subscribe_pool(
+       struct stasis_topic *topic,
+       stasis_subscription_cb callback,
+       void *data)
+{
+       return internal_stasis_subscribe(topic, callback, data, 1, 1);
 }
 
 static int sub_cleanup(void *data)
@@ -1365,11 +1390,33 @@ struct stasis_declined_config {
        struct ao2_container *declined;
 };
 
+/*! \brief Threadpool configuration options */
+struct stasis_threadpool_conf {
+       /*! Initial size of the thread pool */
+       int initial_size;
+       /*! Time, in seconds, before we expire a thread */
+       int idle_timeout_sec;
+       /*! Maximum number of thread to allow */
+       int max_size;
+};
 
 struct stasis_config {
+       /*! Thread pool configuration options */
+       struct stasis_threadpool_conf *threadpool_options;
+       /*! Declined message types */
        struct stasis_declined_config *declined_message_types;
 };
 
+static struct aco_type threadpool_option = {
+       .type = ACO_GLOBAL,
+       .name = "threadpool",
+       .item_offset = offsetof(struct stasis_config, threadpool_options),
+       .category = "^threadpool$",
+       .category_match = ACO_WHITELIST,
+};
+
+static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
+
 /*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
 static struct aco_type declined_option = {
        .type = ACO_GLOBAL,
@@ -1383,7 +1430,7 @@ struct aco_type *declined_options[] = ACO_TYPES(&declined_option);
 
 struct aco_file stasis_conf = {
         .filename = "stasis.conf",
-       .types = ACO_TYPES(&declined_option),
+       .types = ACO_TYPES(&declined_option, &threadpool_option),
 };
 
 /*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
@@ -1399,13 +1446,16 @@ CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,
 static void stasis_declined_config_destructor(void *obj)
 {
        struct stasis_declined_config *declined = obj;
+
        ao2_cleanup(declined->declined);
 }
 
 static void stasis_config_destructor(void *obj)
 {
        struct stasis_config *cfg = obj;
+
        ao2_cleanup(cfg->declined_message_types);
+       ast_free(cfg->threadpool_options);
 }
 
 static void *stasis_config_alloc(void)
@@ -1416,21 +1466,26 @@ static void *stasis_config_alloc(void)
                return NULL;
        }
 
-       /* Allocate/initialize memory */
-       cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types), stasis_declined_config_destructor);
+       cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
+       if (!cfg->threadpool_options) {
+               ao2_ref(cfg, -1);
+               return NULL;
+       }
+
+       cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types),
+               stasis_declined_config_destructor);
        if (!cfg->declined_message_types) {
-               goto error;
+               ao2_ref(cfg, -1);
+               return NULL;
        }
 
        cfg->declined_message_types->declined = ast_str_container_alloc(13);
        if (!cfg->declined_message_types->declined) {
-               goto error;
+               ao2_ref(cfg, -1);
+               return NULL;
        }
 
        return cfg;
-error:
-       ao2_ref(cfg, -1);
-       return NULL;
 }
 
 int stasis_message_type_declined(const char *name)
@@ -1478,6 +1533,13 @@ STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
 
 /*! @} */
 
+/*! \brief Shutdown function */
+static void stasis_exit(void)
+{
+       ast_threadpool_shutdown(pool);
+       pool = NULL;
+}
+
 /*! \brief Cleanup function for graceful shutdowns */
 static void stasis_cleanup(void)
 {
@@ -1489,27 +1551,71 @@ static void stasis_cleanup(void)
 
 int stasis_init(void)
 {
+       RAII_VAR(struct stasis_config *, cfg, NULL, ao2_cleanup);
        int cache_init;
+       struct ast_threadpool_options threadpool_opts = { 0, };
 
        /* Be sure the types are cleaned up after the message bus */
        ast_register_cleanup(stasis_cleanup);
+       ast_register_atexit(stasis_exit);
 
        if (aco_info_init(&cfg_info)) {
                return -1;
        }
 
-       aco_option_register_custom(&cfg_info, "decline", ACO_EXACT, declined_options, "", declined_handler, 0);
+       aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
+               declined_options, "", declined_handler, 0);
+       aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
+               threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
+               FLDSET(struct stasis_threadpool_conf, initial_size), 0,
+               INT_MAX);
+       aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
+               threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
+               FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
+               INT_MAX);
+       aco_option_register(&cfg_info, "max_size", ACO_EXACT,
+               threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
+               FLDSET(struct stasis_threadpool_conf, max_size), 0,
+               INT_MAX);
 
        if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
-               RAII_VAR(struct stasis_config *, stasis_cfg, stasis_config_alloc(), ao2_cleanup);
+               struct stasis_config *default_cfg = stasis_config_alloc();
+
+               if (!default_cfg) {
+                       return -1;
+               }
 
-               if (aco_set_defaults(&declined_option, "declined_message_types", stasis_cfg->declined_message_types)) {
+               if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
+                       ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
+                       ao2_ref(default_cfg, -1);
+                       return -1;
+               }
+
+               if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
                        ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
                        return -1;
                }
 
-               ast_log(LOG_NOTICE, "Could not load stasis config; using defaults\n");
-               ao2_global_obj_replace_unref(globals, stasis_cfg);
+               ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
+               ao2_global_obj_replace_unref(globals, default_cfg);
+               cfg = default_cfg;
+       } else {
+               cfg = ao2_global_obj_ref(globals);
+               if (!cfg) {
+                       ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
+                       return -1;
+               }
+       }
+
+       threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
+       threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
+       threadpool_opts.auto_increment = 1;
+       threadpool_opts.max_size = cfg->threadpool_options->max_size;
+       threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
+       pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts);
+       if (!pool) {
+               ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
+               return -1;
        }
 
        cache_init = stasis_cache_init();
index c492307d63aef631cf15ca299b31e177f5fa0e39..9129c0064c9cfb5c93e99b5319e3037c6eaa3bd0 100644 (file)
@@ -894,7 +894,7 @@ struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *or
        ao2_ref(cache, +1);
        caching_topic->cache = cache;
 
-       sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0);
+       sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0);
        if (sub == NULL) {
                return NULL;
        }
index da288e864cee214513eaf48e49a2688e4b38db44..a9e458456fcd3c27119bc0ccda677d9877534e02 100644 (file)
@@ -206,8 +206,8 @@ static void router_dispatch(void *data,
        }
 }
 
-struct stasis_message_router *stasis_message_router_create(
-       struct stasis_topic *topic)
+static struct stasis_message_router *stasis_message_router_create_internal(
+       struct stasis_topic *topic, int use_thread_pool)
 {
        int res;
        RAII_VAR(struct stasis_message_router *, router, NULL, ao2_cleanup);
@@ -224,7 +224,11 @@ struct stasis_message_router *stasis_message_router_create(
                return NULL;
        }
 
-       router->subscription = stasis_subscribe(topic, router_dispatch, router);
+       if (use_thread_pool) {
+               router->subscription = stasis_subscribe_pool(topic, router_dispatch, router);
+       } else {
+               router->subscription = stasis_subscribe(topic, router_dispatch, router);
+       }
        if (!router->subscription) {
                return NULL;
        }
@@ -233,6 +237,18 @@ struct stasis_message_router *stasis_message_router_create(
        return router;
 }
 
+struct stasis_message_router *stasis_message_router_create(
+       struct stasis_topic *topic)
+{
+       return stasis_message_router_create_internal(topic, 0);
+}
+
+struct stasis_message_router *stasis_message_router_create_pool(
+       struct stasis_topic *topic)
+{
+       return stasis_message_router_create_internal(topic, 1);
+}
+
 void stasis_message_router_unsubscribe(struct stasis_message_router *router)
 {
        if (!router) {
index 8bb57b62b2b709962faf3ea748e15a24565d8aed..c5214b36a336323d5808e22b2a1ee9a0267e0aae 100644 (file)
@@ -832,7 +832,7 @@ static int park_and_announce_app_exec(struct ast_channel *chan, const char *data
                return -1;
        }
 
-       if (!(parking_subscription = stasis_subscribe(ast_parking_topic(), park_announce_update_cb, pa_data))) {
+       if (!(parking_subscription = stasis_subscribe_pool(ast_parking_topic(), park_announce_update_cb, pa_data))) {
                /* Failed to create subscription */
                park_announce_subscription_data_destroy(pa_data);
                return -1;
index 61cb85f008d45baab6d243ea3f1542abed83134b..a21be90687ec565a32c600a6ba4df5db11c4a499 100644 (file)
@@ -192,7 +192,7 @@ static int create_parked_subscription_full(struct ast_channel *chan, const char
        strcpy(subscription_data->parkee_uuid, parkee_uuid);
        strcpy(subscription_data->parker_uuid, parker_uuid);
 
-       if (!(parked_datastore->parked_subscription = stasis_subscribe(ast_parking_topic(), parker_update_cb, subscription_data))) {
+       if (!(parked_datastore->parked_subscription = stasis_subscribe_pool(ast_parking_topic(), parker_update_cb, subscription_data))) {
                return -1;
        }
 
index eaf0f32afcc4c6d5b34a6d10d30a638cdcd9e8de..bf0925dd480e0c678bdbf60a3713474237377d83 100644 (file)
@@ -138,7 +138,7 @@ static struct mwi_stasis_subscription *mwi_stasis_subscription_alloc(const char
        strcpy(mwi_stasis_sub->mailbox, mailbox);
        ao2_ref(mwi_sub, +1);
        ast_debug(3, "Creating stasis MWI subscription to mailbox %s for endpoint %s\n", mailbox, mwi_sub->id);
-       mwi_stasis_sub->stasis_sub = stasis_subscribe(topic, mwi_stasis_cb, mwi_sub);
+       mwi_stasis_sub->stasis_sub = stasis_subscribe_pool(topic, mwi_stasis_cb, mwi_sub);
        return mwi_stasis_sub;
 }
 
index 344bda3cd5a68bb468f4c0a22643f3cdd4420bc2..02deeb668e43f9fe3dbb32da717dd6f390ed5388 100644 (file)
@@ -4257,7 +4257,7 @@ static int load_module(void)
        if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
                ast_sip_push_task(NULL, subscription_persistence_load, NULL);
        } else {
-               stasis_subscribe(ast_manager_get_topic(), subscription_persistence_event_cb, NULL);
+               stasis_subscribe_pool(ast_manager_get_topic(), subscription_persistence_event_cb, NULL);
        }
 
        ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM,
index 99d43fd2f0d127056311fd423a655084d52e2719..7b8c53761970f57d6395ac9ecda3cce21ffab93c 100644 (file)
@@ -550,7 +550,7 @@ static void refer_blind_callback(struct ast_channel *chan, struct transfer_chann
                /* We also will need to detect if the transferee enters a bridge. This is currently the only reliable way to
                 * detect if the transfer target has answered the call
                 */
-               refer->progress->bridge_sub = stasis_subscribe(ast_bridge_topic_all(), refer_progress_bridge, refer->progress);
+               refer->progress->bridge_sub = stasis_subscribe_pool(ast_bridge_topic_all(), refer_progress_bridge, refer->progress);
                if (!refer->progress->bridge_sub) {
                        struct refer_progress_notification *notification = refer_progress_notification_alloc(refer->progress, 200,
                                PJSIP_EVSUB_STATE_TERMINATED);
index 40219c007cd59ee60603b751edfe069def881aeb..8a1c230491864689ef70f8c84b0965666f754cf4 100644 (file)
@@ -330,7 +330,7 @@ static int subscribe_device_state(struct stasis_app *app, void *obj)
                return 0;
        }
 
-       if (!(sub->sub = stasis_subscribe(
+       if (!(sub->sub = stasis_subscribe_pool(
                        ast_device_state_topic(sub->device_name),
                        device_state_cb, sub))) {
                ast_log(LOG_ERROR, "Unable to subscribe to device %s\n",
index 3cb6fc572cdd62dd5379f56f0df30940d09f9b40..e3eff9390b3d66fdd706fd48548e93bc1c470f95 100644 (file)
@@ -1606,7 +1606,7 @@ static void xmpp_init_event_distribution(struct ast_xmpp_client *client)
        xmpp_pubsub_unsubscribe(client, "device_state");
        xmpp_pubsub_unsubscribe(client, "message_waiting");
 
-       if (!(client->mwi_sub = stasis_subscribe(ast_mwi_topic_all(), xmpp_pubsub_mwi_cb, client))) {
+       if (!(client->mwi_sub = stasis_subscribe_pool(ast_mwi_topic_all(), xmpp_pubsub_mwi_cb, client))) {
                return;
        }
 
index ba82e83adb06aa625d949bbb9834071d252f8ceb..2e83e3b7096eecbd0b91d9e47b170ebdd442858a 100644 (file)
@@ -361,6 +361,61 @@ AST_TEST_DEFINE(subscription_messages)
        return AST_TEST_PASS;
 }
 
+AST_TEST_DEFINE(subscription_pool_messages)
+{
+       RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
+       RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
+       RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+       RAII_VAR(char *, expected_uniqueid, NULL, ast_free);
+       int complete;
+       struct stasis_subscription_change *change;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = test_category;
+               info->summary = "Test subscribe/unsubscribe messages using a threadpool subscription";
+               info->description = "Test subscribe/unsubscribe messages using a threadpool subscription";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       topic = stasis_topic_create("TestTopic");
+       ast_test_validate(test, NULL != topic);
+
+       consumer = consumer_create(0);
+       ast_test_validate(test, NULL != consumer);
+
+       uut = stasis_subscribe_pool(topic, consumer_exec, consumer);
+       ast_test_validate(test, NULL != uut);
+       ao2_ref(consumer, +1);
+       expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut));
+
+       uut = stasis_unsubscribe(uut);
+       complete = consumer_wait_for_completion(consumer);
+       ast_test_validate(test, 1 == complete);
+
+       ast_test_validate(test, 2 == consumer->messages_rxed_len);
+       ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[0]));
+       ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[1]));
+
+       change = stasis_message_data(consumer->messages_rxed[0]);
+       ast_test_validate(test, topic == change->topic);
+       ast_test_validate(test, 0 == strcmp("Subscribe", change->description));
+       ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
+
+       change = stasis_message_data(consumer->messages_rxed[1]);
+       ast_test_validate(test, topic == change->topic);
+       ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description));
+       ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
+
+       return AST_TEST_PASS;
+}
+
 AST_TEST_DEFINE(publish)
 {
        RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
@@ -455,6 +510,55 @@ AST_TEST_DEFINE(publish_sync)
        return AST_TEST_PASS;
 }
 
+AST_TEST_DEFINE(publish_pool)
+{
+       RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
+       RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
+       RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+       int actual_len;
+       const char *actual;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = test_category;
+               info->summary = "Test publishing with a threadpool";
+               info->description = "Test publishing to a subscriber whose\n"
+                       "subscription dictates messages are received through a\n"
+                       "threadpool.";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       topic = stasis_topic_create("TestTopic");
+       ast_test_validate(test, NULL != topic);
+
+       consumer = consumer_create(1);
+       ast_test_validate(test, NULL != consumer);
+
+       uut = stasis_subscribe_pool(topic, consumer_exec, consumer);
+       ast_test_validate(test, NULL != uut);
+       ao2_ref(consumer, +1);
+
+       test_data = ao2_alloc(1, NULL);
+       ast_test_validate(test, NULL != test_data);
+       test_message_type = stasis_message_type_create("TestMessage", NULL);
+       test_message = stasis_message_create(test_message_type, test_data);
+
+       stasis_publish(topic, test_message);
+
+       actual_len = consumer_wait_for(consumer, 1);
+       ast_test_validate(test, 1 == actual_len);
+       actual = stasis_message_data(consumer->messages_rxed[0]);
+       ast_test_validate(test, test_data == actual);
+
+       return AST_TEST_PASS;
+}
+
 AST_TEST_DEFINE(unsubscribe_stops_messages)
 {
        RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
@@ -650,6 +754,106 @@ AST_TEST_DEFINE(interleaving)
        return AST_TEST_PASS;
 }
 
+AST_TEST_DEFINE(subscription_interleaving)
+{
+       RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
+
+       RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+
+       RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+
+       RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
+
+       RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel);
+       RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel);
+       RAII_VAR(struct stasis_subscription *, sub1, NULL, stasis_unsubscribe);
+       RAII_VAR(struct stasis_subscription *, sub2, NULL, stasis_unsubscribe);
+
+       RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
+       RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
+
+       int actual_len;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = test_category;
+               info->summary = "Test sending interleaved events to a parent topic with different subscribers";
+               info->description = "Test sending events to a parent topic.\n"
+                       "This test creates three topics (one parent, two children)\n"
+                       "and publishes messages alternately between the children.\n"
+                       "It verifies that the messages are received in the expected\n"
+                       "order, for different subscription types: one with a dedicated\n"
+                       "thread, the other on the Stasis threadpool.\n";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       test_message_type = stasis_message_type_create("test", NULL);
+       ast_test_validate(test, NULL != test_message_type);
+
+       test_data = ao2_alloc(1, NULL);
+       ast_test_validate(test, NULL != test_data);
+
+       test_message1 = stasis_message_create(test_message_type, test_data);
+       ast_test_validate(test, NULL != test_message1);
+       test_message2 = stasis_message_create(test_message_type, test_data);
+       ast_test_validate(test, NULL != test_message2);
+       test_message3 = stasis_message_create(test_message_type, test_data);
+       ast_test_validate(test, NULL != test_message3);
+
+       parent_topic = stasis_topic_create("ParentTestTopic");
+       ast_test_validate(test, NULL != parent_topic);
+       topic1 = stasis_topic_create("Topic1");
+       ast_test_validate(test, NULL != topic1);
+       topic2 = stasis_topic_create("Topic2");
+       ast_test_validate(test, NULL != topic2);
+
+       forward_sub1 = stasis_forward_all(topic1, parent_topic);
+       ast_test_validate(test, NULL != forward_sub1);
+       forward_sub2 = stasis_forward_all(topic2, parent_topic);
+       ast_test_validate(test, NULL != forward_sub2);
+
+       consumer1 = consumer_create(1);
+       ast_test_validate(test, NULL != consumer1);
+
+       consumer2 = consumer_create(1);
+       ast_test_validate(test, NULL != consumer2);
+
+       sub1 = stasis_subscribe(parent_topic, consumer_exec, consumer1);
+       ast_test_validate(test, NULL != sub1);
+       ao2_ref(consumer1, +1);
+
+       sub2 = stasis_subscribe_pool(parent_topic, consumer_exec, consumer2);
+       ast_test_validate(test, NULL != sub2);
+       ao2_ref(consumer2, +1);
+
+       stasis_publish(topic1, test_message1);
+       stasis_publish(topic2, test_message2);
+       stasis_publish(topic1, test_message3);
+
+       actual_len = consumer_wait_for(consumer1, 3);
+       ast_test_validate(test, 3 == actual_len);
+
+       actual_len = consumer_wait_for(consumer2, 3);
+       ast_test_validate(test, 3 == actual_len);
+
+       ast_test_validate(test, test_message1 == consumer1->messages_rxed[0]);
+       ast_test_validate(test, test_message2 == consumer1->messages_rxed[1]);
+       ast_test_validate(test, test_message3 == consumer1->messages_rxed[2]);
+
+       ast_test_validate(test, test_message1 == consumer2->messages_rxed[0]);
+       ast_test_validate(test, test_message2 == consumer2->messages_rxed[1]);
+       ast_test_validate(test, test_message3 == consumer2->messages_rxed[2]);
+
+       return AST_TEST_PASS;
+}
+
 struct cache_test_data {
        char *id;
        char *value;
@@ -1389,6 +1593,104 @@ AST_TEST_DEFINE(router)
        return AST_TEST_PASS;
 }
 
+AST_TEST_DEFINE(router_pool)
+{
+       RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
+       RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
+       RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
+       RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
+       RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
+       int actual_len, ret;
+       struct stasis_message *actual;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = test_category;
+               info->summary = "Test message routing via threadpool";
+               info->description = "Test simple message routing when\n"
+                       "the subscriptions dictate usage of the Stasis\n"
+                       "threadpool.\n";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       topic = stasis_topic_create("TestTopic");
+       ast_test_validate(test, NULL != topic);
+
+       consumer1 = consumer_create(1);
+       ast_test_validate(test, NULL != consumer1);
+       consumer2 = consumer_create(1);
+       ast_test_validate(test, NULL != consumer2);
+       consumer3 = consumer_create(1);
+       ast_test_validate(test, NULL != consumer3);
+
+       test_message_type1 = stasis_message_type_create("TestMessage1", NULL);
+       ast_test_validate(test, NULL != test_message_type1);
+       test_message_type2 = stasis_message_type_create("TestMessage2", NULL);
+       ast_test_validate(test, NULL != test_message_type2);
+       test_message_type3 = stasis_message_type_create("TestMessage3", NULL);
+       ast_test_validate(test, NULL != test_message_type3);
+
+       uut = stasis_message_router_create_pool(topic);
+       ast_test_validate(test, NULL != uut);
+
+       ret = stasis_message_router_add(
+               uut, test_message_type1, consumer_exec, consumer1);
+       ast_test_validate(test, 0 == ret);
+       ao2_ref(consumer1, +1);
+       ret = stasis_message_router_add(
+               uut, test_message_type2, consumer_exec, consumer2);
+       ast_test_validate(test, 0 == ret);
+       ao2_ref(consumer2, +1);
+       ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
+       ast_test_validate(test, 0 == ret);
+       ao2_ref(consumer3, +1);
+
+       test_data = ao2_alloc(1, NULL);
+       ast_test_validate(test, NULL != test_data);
+       test_message1 = stasis_message_create(test_message_type1, test_data);
+       ast_test_validate(test, NULL != test_message1);
+       test_message2 = stasis_message_create(test_message_type2, test_data);
+       ast_test_validate(test, NULL != test_message2);
+       test_message3 = stasis_message_create(test_message_type3, test_data);
+       ast_test_validate(test, NULL != test_message3);
+
+       stasis_publish(topic, test_message1);
+       stasis_publish(topic, test_message2);
+       stasis_publish(topic, test_message3);
+
+       actual_len = consumer_wait_for(consumer1, 1);
+       ast_test_validate(test, 1 == actual_len);
+       actual_len = consumer_wait_for(consumer2, 1);
+       ast_test_validate(test, 1 == actual_len);
+       actual_len = consumer_wait_for(consumer3, 1);
+       ast_test_validate(test, 1 == actual_len);
+
+       actual = consumer1->messages_rxed[0];
+       ast_test_validate(test, test_message1 == actual);
+
+       actual = consumer2->messages_rxed[0];
+       ast_test_validate(test, test_message2 == actual);
+
+       actual = consumer3->messages_rxed[0];
+       ast_test_validate(test, test_message3 == actual);
+
+       /* consumer1 and consumer2 do not get the final message. */
+       ao2_cleanup(consumer1);
+       ao2_cleanup(consumer2);
+
+       return AST_TEST_PASS;
+}
+
 static const char *cache_simple(struct stasis_message *message)
 {
        const char *type_name =
@@ -1748,8 +2050,10 @@ static int unload_module(void)
        AST_TEST_UNREGISTER(message_type);
        AST_TEST_UNREGISTER(message);
        AST_TEST_UNREGISTER(subscription_messages);
+       AST_TEST_UNREGISTER(subscription_pool_messages);
        AST_TEST_UNREGISTER(publish);
        AST_TEST_UNREGISTER(publish_sync);
+       AST_TEST_UNREGISTER(publish_pool);
        AST_TEST_UNREGISTER(unsubscribe_stops_messages);
        AST_TEST_UNREGISTER(forward);
        AST_TEST_UNREGISTER(cache_filter);
@@ -1757,8 +2061,10 @@ static int unload_module(void)
        AST_TEST_UNREGISTER(cache_dump);
        AST_TEST_UNREGISTER(cache_eid_aggregate);
        AST_TEST_UNREGISTER(router);
+       AST_TEST_UNREGISTER(router_pool);
        AST_TEST_UNREGISTER(router_cache_updates);
        AST_TEST_UNREGISTER(interleaving);
+       AST_TEST_UNREGISTER(subscription_interleaving);
        AST_TEST_UNREGISTER(no_to_json);
        AST_TEST_UNREGISTER(to_json);
        AST_TEST_UNREGISTER(no_to_ami);
@@ -1773,8 +2079,10 @@ static int load_module(void)
        AST_TEST_REGISTER(message_type);
        AST_TEST_REGISTER(message);
        AST_TEST_REGISTER(subscription_messages);
+       AST_TEST_REGISTER(subscription_pool_messages);
        AST_TEST_REGISTER(publish);
        AST_TEST_REGISTER(publish_sync);
+       AST_TEST_REGISTER(publish_pool);
        AST_TEST_REGISTER(unsubscribe_stops_messages);
        AST_TEST_REGISTER(forward);
        AST_TEST_REGISTER(cache_filter);
@@ -1782,8 +2090,10 @@ static int load_module(void)
        AST_TEST_REGISTER(cache_dump);
        AST_TEST_REGISTER(cache_eid_aggregate);
        AST_TEST_REGISTER(router);
+       AST_TEST_REGISTER(router_pool);
        AST_TEST_REGISTER(router_cache_updates);
        AST_TEST_REGISTER(interleaving);
+       AST_TEST_REGISTER(subscription_interleaving);
        AST_TEST_REGISTER(no_to_json);
        AST_TEST_REGISTER(to_json);
        AST_TEST_REGISTER(no_to_ami);