]> git.ipfire.org Git - thirdparty/asterisk.git/commitdiff
main/stasis: Allow subscriptions to use a threadpool for message delivery
authorMatthew Jordan <mjordan@digium.com>
Mon, 1 Dec 2014 15:53:02 +0000 (15:53 +0000)
committerMatthew Jordan <mjordan@digium.com>
Mon, 1 Dec 2014 15:53:02 +0000 (15:53 +0000)
Prior to this patch, all Stasis subscriptions would receive a dedicated
thread for servicing published messages. In contrast, prior to r400178
(see review https://reviewboard.asterisk.org/r/2881/), the subscriptions
shared a thread pool. It was discovered during some initial work on Stasis
that, for a low subscription count with high message throughput, the
threadpool was not as performant as simply having a dedicated thread per
subscriber.

For situations where a subscriber receives a substantial number of messages
and is always present, the model of having a dedicated thread per subscriber
makes sense. While we still have plenty of subscriptions that would follow
this model, e.g., AMI, CDRs, CEL, etc., there are plenty that also fall into
the following two categories:
* Large number of subscriptions, specifically those tied to endpoints/peers.
* Low number of messages. Some subscriptions exist specifically to coordinate
  a single message - the subscription is created, a message is published, the
  delivery is synchronized, and the subscription is destroyed.
In both of the latter two cases, creating a dedicated thread is wasteful (and
in the case of a large number of peers/endpoints, harmful). In those cases,
having shared delivery threads is far more performant.

This patch adds the ability of a subscriber to Stasis to choose whether or not
their messages are dispatched on a dedicated thread or on a threadpool. The
threadpool is configurable through stasis.conf.

Review: https://reviewboard.asterisk.org/r/4193

ASTERISK-24533 #close
Reported by: xrobau
Tested by: xrobau

git-svn-id: https://origsvn.digium.com/svn/asterisk/branches/12@428681 65c4cc65-6c06-0410-ace0-fbb531ad65f3

26 files changed:
UPGRADE.txt
apps/app_queue.c
channels/chan_dahdi.c
channels/chan_iax2.c
channels/chan_mgcp.c
channels/chan_sip.c
channels/chan_skinny.c
channels/sig_pri.c
configs/stasis.conf.sample [new file with mode: 0644]
include/asterisk/stasis.h
include/asterisk/stasis_internal.h
include/asterisk/stasis_message_router.h
main/endpoints.c
main/stasis.c
main/stasis_cache.c
main/stasis_channels.c
main/stasis_message_router.c
res/parking/parking_applications.c
res/parking/parking_bridge_features.c
res/res_jabber.c
res/res_pjsip_mwi.c
res/res_pjsip_pubsub.c
res/res_pjsip_refer.c
res/res_stasis_device_state.c
res/res_xmpp.c
tests/test_stasis.c

index 4a28e123911f6d7ed84acd4e3dde8ab6b12c5a5f..57ec625e900432e0be51973c5d3ef5e1f0836b92 100644 (file)
 ===
 ===========================================================
 
+From 12.7.0 to 12.8.0:
+
+Core:
+ - The core of Asterisk uses a message bus called "Stasis" to distribute
+   information to internal components. For performance reasons, the message
+   distribution was modified to make use of a thread pool instead of a
+   dedicated thread per consumer in certain cases. The initial settings for
+   the thread pool can now be configured using 'stasis.conf'. A sample
+   configuration file is provided in the samples directory.
+
 From 12.6.0 to 12.7.0:
 
 PJSIP:
index 52f856beb20f993f3c5f4847e9424073516bb0f2..2c3cc8f099b920bfb97ac1cbc07b119d89d67529 100644 (file)
@@ -5954,7 +5954,7 @@ static int setup_stasis_subs(struct queue_ent *qe, struct ast_channel *peer, str
                return -1;
        }
 
-       queue_data->bridge_router = stasis_message_router_create(ast_bridge_topic_all());
+       queue_data->bridge_router = stasis_message_router_create_pool(ast_bridge_topic_all());
        if (!queue_data->bridge_router) {
                ao2_ref(queue_data, -1);
                return -1;
@@ -5969,7 +5969,7 @@ static int setup_stasis_subs(struct queue_ent *qe, struct ast_channel *peer, str
        stasis_message_router_set_default(queue_data->bridge_router,
                        queue_bridge_cb, queue_data);
 
-       queue_data->channel_router = stasis_message_router_create(ast_channel_topic_all());
+       queue_data->channel_router = stasis_message_router_create_pool(ast_channel_topic_all());
        if (!queue_data->channel_router) {
                /* Unsubscribing from the bridge router will remove the only ref of queue_data,
                 * thus beginning the destruction process
index 62164104d255a1fce3317478b94724c8b6ac1edf..48e5da76dd3e892199547093aab94aa5ef5e958b 100644 (file)
@@ -12426,7 +12426,7 @@ static struct dahdi_pvt *mkintf(int channel, const struct dahdi_chan_conf *conf,
 
                        mailbox_specific_topic = ast_mwi_topic(tmp->mailbox);
                        if (mailbox_specific_topic) {
-                               tmp->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, NULL);
+                               tmp->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, NULL);
                        }
                }
 #ifdef HAVE_DAHDI_LINEREVERSE_VMWI
index ef0c0965634dbac11c1682da17e1168ed096967f..359160f687fe43b97a4e6cb210d0ef695ef8cbd9 100644 (file)
@@ -12966,7 +12966,7 @@ static struct iax2_peer *build_peer(const char *name, struct ast_variable *v, st
 
                mailbox_specific_topic = ast_mwi_topic(peer->mailbox);
                if (mailbox_specific_topic) {
-                       peer->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, NULL);
+                       peer->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, NULL);
                }
        }
 
index e1a73d8c1c84e210a83307e34b57671438304a73..a8459fad51aed57861e9cf83330a00a89bb568d8 100644 (file)
@@ -4199,7 +4199,7 @@ static struct mgcp_gateway *build_gateway(char *cat, struct ast_variable *v)
 
                                        mailbox_specific_topic = ast_mwi_topic(e->mailbox);
                                        if (mailbox_specific_topic) {
-                                               e->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, NULL);
+                                               e->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, NULL);
                                        }
                                }
                                snprintf(e->rqnt_ident, sizeof(e->rqnt_ident), "%08lx", (unsigned long)ast_random());
index 4049d1a4c0f6aa8495ff41c7528743137b8b98bd..a2312e7745b7b1523f9fad79b75fd40192663a6d 100644 (file)
@@ -27504,7 +27504,7 @@ static void add_peer_mwi_subs(struct sip_peer *peer)
                        if (!peer_name) {
                                return;
                        }
-                       mailbox->event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, peer_name);
+                       mailbox->event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, peer_name);
                }
        }
 }
index 8e98cbabe69615b2a2d5065a0d00be77e9c8b780..6f64b83ad199b22125fa4b3928ee13d0e69c20f5 100644 (file)
@@ -8318,7 +8318,7 @@ static struct skinny_line *config_line(const char *lname, struct ast_variable *v
 
                mailbox_specific_topic = ast_mwi_topic(l->mailbox);
                if (mailbox_specific_topic) {
-                       l->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, l);
+                       l->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, l);
                }
        }
 
index 0951ffd9892f2a9e9b4217aff88922571ae0f796..26dd1e9a3e42f9633f87f513854ebd879711fe3e 100644 (file)
@@ -9145,7 +9145,7 @@ int sig_pri_start_pri(struct sig_pri_span *pri)
 
                mailbox_specific_topic = ast_mwi_topic(mbox_id);
                if (mailbox_specific_topic) {
-                       pri->mbox[i].sub = stasis_subscribe(mailbox_specific_topic, sig_pri_mwi_event_cb, pri);
+                       pri->mbox[i].sub = stasis_subscribe_pool(mailbox_specific_topic, sig_pri_mwi_event_cb, pri);
                }
                if (!pri->mbox[i].sub) {
                        ast_log(LOG_ERROR, "%s span %d could not subscribe to MWI events for %s(%s).\n",
diff --git a/configs/stasis.conf.sample b/configs/stasis.conf.sample
new file mode 100644 (file)
index 0000000..282bc42
--- /dev/null
@@ -0,0 +1,9 @@
+[threadpool]
+;initial_size = 5          ; Initial size of the threadpool.
+;                          ; 0 means the threadpool has no threads initially
+;                          ; until a task needs a thread.
+;idle_timeout_sec = 20     ; Number of seconds a thread should be idle before
+;                          ; dying. 0 means threads never time out.
+;max_size = 50             ; Maximum number of threads in the Stasis threadpool.
+;                          ; 0 means no limit to the number of threads in the
+;                          ; threadpool.
\ No newline at end of file
index 55ebb45148ad7cdebf848a74d04bd0c327b5d9e1..3f8e2a6c5f6ca1ed25e27936337f68c257a45de2 100644 (file)
@@ -519,6 +519,31 @@ typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *s
 struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
        stasis_subscription_cb callback, void *data);
 
+/*!
+ * \brief Create a subscription whose callbacks occur on a thread pool
+ *
+ * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
+ * up this reference), the subscription must be explicitly unsubscribed from its
+ * topic using stasis_unsubscribe().
+ *
+ * The invocations of the callback are serialized, but will almost certainly not
+ * always happen on the same thread. The invocation order of different subscriptions
+ * is unspecified.
+ *
+ * Unlike \ref stasis_subscribe, this function will explicitly use a threadpool to
+ * dispatch items to its \c callback. This form of subscription should be used
+ * when many subscriptions may be made to the specified \c topic.
+ *
+ * \param topic Topic to subscribe to.
+ * \param callback Callback function for subscription messages.
+ * \param data Data to be passed to the callback, in addition to the message.
+ * \return New \ref stasis_subscription object.
+ * \return \c NULL on error.
+ * \since 12.8.0
+ */
+struct stasis_subscription *stasis_subscribe_pool(struct stasis_topic *topic,
+       stasis_subscription_cb callback, void *data);
+
 /*!
  * \brief Cancel a subscription.
  *
index bb7b6cc0a2afbeafeb097a22814f6417ab802ae1..bc6122c2b2038ac9fd4842d5bbde1cdbfb87838f 100644 (file)
  * \param callback Callback function for subscription messages.
  * \param data Data to be passed to the callback, in addition to the message.
  * \param needs_mailbox Determines whether or not the subscription requires a mailbox.
- *  Subscriptions with mailboxes will be delivered on a thread in the Stasis threadpool;
+ *  Subscriptions with mailboxes will be delivered on some non-publisher thread;
  *  subscriptions without mailboxes will be delivered on the publisher thread.
+ * \param use_thread_pool Use the thread pool for the subscription. This is only
+ *  relevant if \c needs_mailbox is non-zero.
  * \return New \ref stasis_subscription object.
  * \return \c NULL on error.
  * \since 12
@@ -62,6 +64,7 @@ struct stasis_subscription *internal_stasis_subscribe(
        struct stasis_topic *topic,
        stasis_subscription_cb callback,
        void *data,
-       int needs_mailbox);
+       int needs_mailbox,
+       int use_thread_pool);
 
 #endif /* STASIS_INTERNAL_H_ */
index 613a2bd7f0469a2a98f77084898ea3a2362eb514..89657a5ee5eddd10dfe653cd8c72162af18e1093 100644 (file)
@@ -58,6 +58,22 @@ struct stasis_message_router;
 struct stasis_message_router *stasis_message_router_create(
        struct stasis_topic *topic);
 
+/*!
+ * \brief Create a new message router object.
+ *
+ * The subscription created for this message router will dispatch
+ * callbacks on a thread pool.
+ *
+ * \param topic Topic to subscribe route to.
+ *
+ * \return New \ref stasis_message_router.
+ * \return \c NULL on error.
+ *
+ * \since 12.8.0
+ */
+struct stasis_message_router *stasis_message_router_create_pool(
+       struct stasis_topic *topic);
+
 /*!
  * \brief Unsubscribe the router from the upstream topic.
  *
index 07687eecc5066f515f6b1fe009870aa3d54b2968..ebf4fe762d15834a69812bb16ef2e4d58677a477 100644 (file)
@@ -306,7 +306,7 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha
        }
 
        if (!ast_strlen_zero(resource)) {
-               endpoint->router = stasis_message_router_create(ast_endpoint_topic(endpoint));
+               endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint));
                if (!endpoint->router) {
                        return NULL;
                }
index 594ec5e99c9641ea0e29269f4ea419219cf3a5a7..77ef7d91b408f34dc8a5cbcff7486e633216842e 100644 (file)
@@ -35,12 +35,14 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
 #include "asterisk/stasis_internal.h"
 #include "asterisk/stasis.h"
 #include "asterisk/taskprocessor.h"
+#include "asterisk/threadpool.h"
 #include "asterisk/utils.h"
 #include "asterisk/uuid.h"
 #include "asterisk/vector.h"
 #include "asterisk/stasis_channels.h"
 #include "asterisk/stasis_bridges.h"
 #include "asterisk/stasis_endpoints.h"
+#include "asterisk/config_options.h"
 
 /*** DOCUMENTATION
        <managerEvent language="en_US" name="UserEvent">
@@ -60,6 +62,22 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
                        </see-also>
                </managerEventInstance>
        </managerEvent>
+       <configInfo name="stasis" language="en_US">
+               <configFile name="stasis.conf">
+                       <configObject name="threadpool">
+                               <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
+                               <configOption name="initial_size" default="5">
+                                       <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
+                               </configOption>
+                               <configOption name="idle_timeout_sec" default="20">
+                                       <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
+                               </configOption>
+                               <configOption name="max_size" default="50">
+                                       <synopsis>Maximum number of threads in the threadpool.</synopsis>
+                               </configOption>
+                       </configObject>
+               </configFile>
+       </configInfo>
 ***/
 
 /*!
@@ -157,6 +175,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
 /*! The number of buckets to use for topic pools */
 #define TOPIC_POOL_BUCKETS 57
 
+/*! Thread pool for topics that don't want a dedicated taskprocessor */
+static struct ast_threadpool *pool;
+
 STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
 
 /*! \internal */
@@ -302,7 +323,8 @@ struct stasis_subscription *internal_stasis_subscribe(
        struct stasis_topic *topic,
        stasis_subscription_cb callback,
        void *data,
-       int needs_mailbox)
+       int needs_mailbox,
+       int use_thread_pool)
 {
        RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
 
@@ -315,19 +337,19 @@ struct stasis_subscription *internal_stasis_subscribe(
        if (!sub) {
                return NULL;
        }
-
        ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
 
        if (needs_mailbox) {
                /* With a small number of subscribers, a thread-per-sub is
-                * acceptable. If our usage changes so that we have larger
-                * numbers of subscribers, we'll probably want to consider
-                * a threadpool. We had that originally, but with so few
-                * subscribers it was actually a performance loss instead of
-                * a gain.
+                * acceptable. For larger number of subscribers, a thread
+                * pool should be used.
                 */
-               sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
-                       TPS_REF_DEFAULT);
+               if (use_thread_pool) {
+                       sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
+               } else {
+                       sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
+                               TPS_REF_DEFAULT);
+               }
                if (!sub->mailbox) {
                        return NULL;
                }
@@ -356,7 +378,15 @@ struct stasis_subscription *stasis_subscribe(
        stasis_subscription_cb callback,
        void *data)
 {
-       return internal_stasis_subscribe(topic, callback, data, 1);
+       return internal_stasis_subscribe(topic, callback, data, 1, 0);
+}
+
+struct stasis_subscription *stasis_subscribe_pool(
+       struct stasis_topic *topic,
+       stasis_subscription_cb callback,
+       void *data)
+{
+       return internal_stasis_subscribe(topic, callback, data, 1, 1);
 }
 
 static int sub_cleanup(void *data)
@@ -1215,6 +1245,68 @@ static struct ast_manager_event_blob *multi_user_event_to_ami(
                ast_str_buffer(body));
 }
 
+/*! \brief Threadpool configuration options */
+struct stasis_threadpool_conf {
+       /*! Initial size of the thread pool */
+       int initial_size;
+       /*! Time, in seconds, before we expire a thread */
+       int idle_timeout_sec;
+       /*! Maximum number of thread to allow */
+       int max_size;
+};
+
+/*! \brief Configuration for stasis */
+struct stasis_config {
+       /*! Thread pool configuration options */
+       struct stasis_threadpool_conf *threadpool_options;
+};
+
+static struct aco_type threadpool_option = {
+       .type = ACO_GLOBAL,
+       .name = "threadpool",
+       .item_offset = offsetof(struct stasis_config, threadpool_options),
+       .category = "^threadpool$",
+       .category_match = ACO_WHITELIST,
+};
+
+static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
+
+struct aco_file stasis_conf = {
+       .filename = "stasis.conf",
+       .types = ACO_TYPES(&threadpool_option),
+};
+
+static void stasis_config_destructor(void *obj)
+{
+       struct stasis_config *cfg = obj;
+
+       ast_free(cfg->threadpool_options);
+}
+
+static void *stasis_config_alloc(void)
+{
+       struct stasis_config *cfg;
+
+       cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor);
+       if (!cfg) {
+               return NULL;
+       }
+
+       cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
+       if (!cfg->threadpool_options) {
+               ao2_ref(cfg, -1);
+               return NULL;
+       }
+
+       return cfg;
+}
+
+static AO2_GLOBAL_OBJ_STATIC(globals);
+
+/*! \brief Register information about the configs being processed by this module */
+CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,
+        .files = ACO_FILES(&stasis_conf),
+);
 
 /*!
  * @{ \brief Define multi user event message type(s).
@@ -1227,19 +1319,83 @@ STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
 
 /*! @} */
 
+/*! \brief Shutdown function */
+static void stasis_exit(void)
+{
+       ast_threadpool_shutdown(pool);
+       pool = NULL;
+}
+
 /*! \brief Cleanup function for graceful shutdowns */
 static void stasis_cleanup(void)
 {
        STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_multi_user_event_type);
+       aco_info_destroy(&cfg_info);
+       ao2_global_obj_release(globals);
 }
 
 int stasis_init(void)
 {
+       RAII_VAR(struct stasis_config *, cfg, NULL, ao2_cleanup);
        int cache_init;
+       struct ast_threadpool_options threadpool_opts = { 0, };
 
        /* Be sure the types are cleaned up after the message bus */
        ast_register_cleanup(stasis_cleanup);
+       ast_register_atexit(stasis_exit);
+
+       if (aco_info_init(&cfg_info)) {
+               return -1;
+       }
+
+       aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
+               threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
+               FLDSET(struct stasis_threadpool_conf, initial_size), 0,
+               INT_MAX);
+       aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
+               threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
+               FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
+               INT_MAX);
+       aco_option_register(&cfg_info, "max_size", ACO_EXACT,
+               threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
+               FLDSET(struct stasis_threadpool_conf, max_size), 0,
+               INT_MAX);
+
+       if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
+               struct stasis_config *default_cfg = stasis_config_alloc();
+
+               if (!default_cfg) {
+                       return -1;
+               }
+
+               if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
+                       ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
+                       ao2_ref(default_cfg, -1);
+                       return -1;
+               }
+
+               ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
+               ao2_global_obj_replace_unref(globals, default_cfg);
+               cfg = default_cfg;
+       } else {
+               cfg = ao2_global_obj_ref(globals);
+               if (!cfg) {
+                       ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
+                       return -1;
+               }
+       }
+
+       threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
+       threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
+       threadpool_opts.auto_increment = 1;
+       threadpool_opts.max_size = cfg->threadpool_options->max_size;
+       threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
+       pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts);
+       if (!pool) {
+               ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
+               return -1;
+       }
 
        cache_init = stasis_cache_init();
        if (cache_init != 0) {
index 8b4304e5f4f3f1a8b61627820c09e161750db198..e74688943a6ff15c51c7a0f6943bb7db7a907f5b 100644 (file)
@@ -881,7 +881,7 @@ struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *or
        ao2_ref(cache, +1);
        caching_topic->cache = cache;
 
-       sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0);
+       sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0);
        if (sub == NULL) {
                return NULL;
        }
index 615f5ea912d8917bd969ff7a6be896f3b52905ca..eda5163b4f4c062cd39c579cb0aa9426f5d4f971 100644 (file)
@@ -361,7 +361,7 @@ static void ast_channel_publish_dial_internal(struct ast_channel *caller,
        }
 
        if (forwarded) {
-               struct stasis_subscription *subscription = stasis_subscribe(ast_channel_topic(peer), dummy_event_cb, NULL);
+               struct stasis_subscription *subscription = stasis_subscribe_pool(ast_channel_topic(peer), dummy_event_cb, NULL);
 
                stasis_publish(ast_channel_topic(peer), msg);
                stasis_unsubscribe_and_join(subscription);
index da288e864cee214513eaf48e49a2688e4b38db44..a9e458456fcd3c27119bc0ccda677d9877534e02 100644 (file)
@@ -206,8 +206,8 @@ static void router_dispatch(void *data,
        }
 }
 
-struct stasis_message_router *stasis_message_router_create(
-       struct stasis_topic *topic)
+static struct stasis_message_router *stasis_message_router_create_internal(
+       struct stasis_topic *topic, int use_thread_pool)
 {
        int res;
        RAII_VAR(struct stasis_message_router *, router, NULL, ao2_cleanup);
@@ -224,7 +224,11 @@ struct stasis_message_router *stasis_message_router_create(
                return NULL;
        }
 
-       router->subscription = stasis_subscribe(topic, router_dispatch, router);
+       if (use_thread_pool) {
+               router->subscription = stasis_subscribe_pool(topic, router_dispatch, router);
+       } else {
+               router->subscription = stasis_subscribe(topic, router_dispatch, router);
+       }
        if (!router->subscription) {
                return NULL;
        }
@@ -233,6 +237,18 @@ struct stasis_message_router *stasis_message_router_create(
        return router;
 }
 
+struct stasis_message_router *stasis_message_router_create(
+       struct stasis_topic *topic)
+{
+       return stasis_message_router_create_internal(topic, 0);
+}
+
+struct stasis_message_router *stasis_message_router_create_pool(
+       struct stasis_topic *topic)
+{
+       return stasis_message_router_create_internal(topic, 1);
+}
+
 void stasis_message_router_unsubscribe(struct stasis_message_router *router)
 {
        if (!router) {
index 5500e3316475a0379133c0fdb205fd1f6a0ad408..6bb447ec74d46d4425fcccffb350dd6e8819d5f2 100644 (file)
@@ -832,7 +832,7 @@ static int park_and_announce_app_exec(struct ast_channel *chan, const char *data
                return -1;
        }
 
-       if (!(parking_subscription = stasis_subscribe(ast_parking_topic(), park_announce_update_cb, pa_data))) {
+       if (!(parking_subscription = stasis_subscribe_pool(ast_parking_topic(), park_announce_update_cb, pa_data))) {
                /* Failed to create subscription */
                park_announce_subscription_data_destroy(pa_data);
                return -1;
index f411d041642c33a1f2add32dc6fdf07627ae5c84..fefa6ee2d73d9d4eaf99b72e2207af71d31f51b7 100644 (file)
@@ -192,7 +192,7 @@ static int create_parked_subscription_full(struct ast_channel *chan, const char
        strcpy(subscription_data->parkee_uuid, parkee_uuid);
        strcpy(subscription_data->parker_uuid, parker_uuid);
 
-       if (!(parked_datastore->parked_subscription = stasis_subscribe(ast_parking_topic(), parker_update_cb, subscription_data))) {
+       if (!(parked_datastore->parked_subscription = stasis_subscribe_pool(ast_parking_topic(), parker_update_cb, subscription_data))) {
                return -1;
        }
 
index 39ef62e0156952a681cc11143c2cfed4130a3629..d9edcc3b16086156051b7276ef6d2b2a553502a9 100644 (file)
@@ -3300,7 +3300,7 @@ static int cached_devstate_cb(void *obj, void *arg, int flags)
 static void aji_init_event_distribution(struct aji_client *client)
 {
        if (!mwi_sub) {
-               mwi_sub = stasis_subscribe(ast_mwi_topic_all(), aji_mwi_cb, client);
+               mwi_sub = stasis_subscribe_pool(ast_mwi_topic_all(), aji_mwi_cb, client);
        }
        if (!device_state_sub) {
                RAII_VAR(struct ao2_container *, cached, NULL, ao2_cleanup);
index b1ae6ee16328beff0e17fc38e30fc769f464047a..c600a96e9d0cbc3d195a62519605e0812b6caf17 100644 (file)
@@ -142,7 +142,7 @@ static struct mwi_stasis_subscription *mwi_stasis_subscription_alloc(const char
        strcpy(mwi_stasis_sub->mailbox, mailbox);
        ao2_ref(mwi_sub, +1);
        ast_debug(3, "Creating stasis MWI subscription to mailbox %s for endpoint %s\n", mailbox, mwi_sub->id);
-       mwi_stasis_sub->stasis_sub = stasis_subscribe(topic, mwi_stasis_cb, mwi_sub);
+       mwi_stasis_sub->stasis_sub = stasis_subscribe_pool(topic, mwi_stasis_cb, mwi_sub);
        return mwi_stasis_sub;
 }
 
index 445268334942eb2fa2e208c58702e7fded8df2fe..a851ef0719caf4d90d1c89803d1306ea93569976 100644 (file)
@@ -1934,7 +1934,7 @@ static int load_module(void)
        if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
                ast_sip_push_task(NULL, subscription_persistence_load, NULL);
        } else {
-               stasis_subscribe(ast_manager_get_topic(), subscription_persistence_event_cb, NULL);
+               stasis_subscribe_pool(ast_manager_get_topic(), subscription_persistence_event_cb, NULL);
        }
 
        ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM,
index 995276cb1bf5bff1682ac0a229e7ab2510a07871..992245d1b814c07ceec765ba351f5a40e1098f6b 100644 (file)
@@ -550,7 +550,7 @@ static void refer_blind_callback(struct ast_channel *chan, struct transfer_chann
                /* We also will need to detect if the transferee enters a bridge. This is currently the only reliable way to
                 * detect if the transfer target has answered the call
                 */
-               refer->progress->bridge_sub = stasis_subscribe(ast_bridge_topic_all(), refer_progress_bridge, refer->progress);
+               refer->progress->bridge_sub = stasis_subscribe_pool(ast_bridge_topic_all(), refer_progress_bridge, refer->progress);
                if (!refer->progress->bridge_sub) {
                        struct refer_progress_notification *notification = refer_progress_notification_alloc(refer->progress, 200,
                                PJSIP_EVSUB_STATE_TERMINATED);
index 0130909ddc45efc8feed2a4bc04ff493c842a77d..3a99afa62c1504203ce187d278952497d2d39d63 100644 (file)
@@ -330,7 +330,7 @@ static int subscribe_device_state(struct stasis_app *app, void *obj)
                return 0;
        }
 
-       if (!(sub->sub = stasis_subscribe(
+       if (!(sub->sub = stasis_subscribe_pool(
                        ast_device_state_topic(sub->device_name),
                        device_state_cb, sub))) {
                ast_log(LOG_ERROR, "Unable to subscribe to device %s\n",
index 651c7ed85e2879c5cce88f612dc346ab5d1ac17f..4b9ef6c681da1c78bc6b0e893bc57a7edf01eb8f 100644 (file)
@@ -1606,7 +1606,7 @@ static void xmpp_init_event_distribution(struct ast_xmpp_client *client)
        xmpp_pubsub_unsubscribe(client, "device_state");
        xmpp_pubsub_unsubscribe(client, "message_waiting");
 
-       if (!(client->mwi_sub = stasis_subscribe(ast_mwi_topic_all(), xmpp_pubsub_mwi_cb, client))) {
+       if (!(client->mwi_sub = stasis_subscribe_pool(ast_mwi_topic_all(), xmpp_pubsub_mwi_cb, client))) {
                return;
        }
 
index 4c042c05be47df4cf0064fb03498910a14a2f80b..ccd98af41c41367a4abde3a67d77fa5ee570dbc4 100644 (file)
@@ -361,6 +361,61 @@ AST_TEST_DEFINE(subscription_messages)
        return AST_TEST_PASS;
 }
 
+AST_TEST_DEFINE(subscription_pool_messages)
+{
+       RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
+       RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
+       RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+       RAII_VAR(char *, expected_uniqueid, NULL, ast_free);
+       int complete;
+       struct stasis_subscription_change *change;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = test_category;
+               info->summary = "Test subscribe/unsubscribe messages using a threadpool subscription";
+               info->description = "Test subscribe/unsubscribe messages using a threadpool subscription";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       topic = stasis_topic_create("TestTopic");
+       ast_test_validate(test, NULL != topic);
+
+       consumer = consumer_create(0);
+       ast_test_validate(test, NULL != consumer);
+
+       uut = stasis_subscribe_pool(topic, consumer_exec, consumer);
+       ast_test_validate(test, NULL != uut);
+       ao2_ref(consumer, +1);
+       expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut));
+
+       uut = stasis_unsubscribe(uut);
+       complete = consumer_wait_for_completion(consumer);
+       ast_test_validate(test, 1 == complete);
+
+       ast_test_validate(test, 2 == consumer->messages_rxed_len);
+       ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[0]));
+       ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[1]));
+
+       change = stasis_message_data(consumer->messages_rxed[0]);
+       ast_test_validate(test, topic == change->topic);
+       ast_test_validate(test, 0 == strcmp("Subscribe", change->description));
+       ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
+
+       change = stasis_message_data(consumer->messages_rxed[1]);
+       ast_test_validate(test, topic == change->topic);
+       ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description));
+       ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
+
+       return AST_TEST_PASS;
+}
+
 AST_TEST_DEFINE(publish)
 {
        RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
@@ -455,6 +510,55 @@ AST_TEST_DEFINE(publish_sync)
        return AST_TEST_PASS;
 }
 
+AST_TEST_DEFINE(publish_pool)
+{
+       RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
+       RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
+       RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+       int actual_len;
+       const char *actual;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = test_category;
+               info->summary = "Test publishing with a threadpool";
+               info->description = "Test publishing to a subscriber whose\n"
+                       "subscription dictates messages are received through a\n"
+                       "threadpool.";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       topic = stasis_topic_create("TestTopic");
+       ast_test_validate(test, NULL != topic);
+
+       consumer = consumer_create(1);
+       ast_test_validate(test, NULL != consumer);
+
+       uut = stasis_subscribe_pool(topic, consumer_exec, consumer);
+       ast_test_validate(test, NULL != uut);
+       ao2_ref(consumer, +1);
+
+       test_data = ao2_alloc(1, NULL);
+       ast_test_validate(test, NULL != test_data);
+       test_message_type = stasis_message_type_create("TestMessage", NULL);
+       test_message = stasis_message_create(test_message_type, test_data);
+
+       stasis_publish(topic, test_message);
+
+       actual_len = consumer_wait_for(consumer, 1);
+       ast_test_validate(test, 1 == actual_len);
+       actual = stasis_message_data(consumer->messages_rxed[0]);
+       ast_test_validate(test, test_data == actual);
+
+       return AST_TEST_PASS;
+}
+
 AST_TEST_DEFINE(unsubscribe_stops_messages)
 {
        RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
@@ -650,6 +754,106 @@ AST_TEST_DEFINE(interleaving)
        return AST_TEST_PASS;
 }
 
+AST_TEST_DEFINE(subscription_interleaving)
+{
+       RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
+
+       RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+
+       RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+
+       RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
+
+       RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel);
+       RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel);
+       RAII_VAR(struct stasis_subscription *, sub1, NULL, stasis_unsubscribe);
+       RAII_VAR(struct stasis_subscription *, sub2, NULL, stasis_unsubscribe);
+
+       RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
+       RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
+
+       int actual_len;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = test_category;
+               info->summary = "Test sending interleaved events to a parent topic with different subscribers";
+               info->description = "Test sending events to a parent topic.\n"
+                       "This test creates three topics (one parent, two children)\n"
+                       "and publishes messages alternately between the children.\n"
+                       "It verifies that the messages are received in the expected\n"
+                       "order, for different subscription types: one with a dedicated\n"
+                       "thread, the other on the Stasis threadpool.\n";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       test_message_type = stasis_message_type_create("test", NULL);
+       ast_test_validate(test, NULL != test_message_type);
+
+       test_data = ao2_alloc(1, NULL);
+       ast_test_validate(test, NULL != test_data);
+
+       test_message1 = stasis_message_create(test_message_type, test_data);
+       ast_test_validate(test, NULL != test_message1);
+       test_message2 = stasis_message_create(test_message_type, test_data);
+       ast_test_validate(test, NULL != test_message2);
+       test_message3 = stasis_message_create(test_message_type, test_data);
+       ast_test_validate(test, NULL != test_message3);
+
+       parent_topic = stasis_topic_create("ParentTestTopic");
+       ast_test_validate(test, NULL != parent_topic);
+       topic1 = stasis_topic_create("Topic1");
+       ast_test_validate(test, NULL != topic1);
+       topic2 = stasis_topic_create("Topic2");
+       ast_test_validate(test, NULL != topic2);
+
+       forward_sub1 = stasis_forward_all(topic1, parent_topic);
+       ast_test_validate(test, NULL != forward_sub1);
+       forward_sub2 = stasis_forward_all(topic2, parent_topic);
+       ast_test_validate(test, NULL != forward_sub2);
+
+       consumer1 = consumer_create(1);
+       ast_test_validate(test, NULL != consumer1);
+
+       consumer2 = consumer_create(1);
+       ast_test_validate(test, NULL != consumer2);
+
+       sub1 = stasis_subscribe(parent_topic, consumer_exec, consumer1);
+       ast_test_validate(test, NULL != sub1);
+       ao2_ref(consumer1, +1);
+
+       sub2 = stasis_subscribe_pool(parent_topic, consumer_exec, consumer2);
+       ast_test_validate(test, NULL != sub2);
+       ao2_ref(consumer2, +1);
+
+       stasis_publish(topic1, test_message1);
+       stasis_publish(topic2, test_message2);
+       stasis_publish(topic1, test_message3);
+
+       actual_len = consumer_wait_for(consumer1, 3);
+       ast_test_validate(test, 3 == actual_len);
+
+       actual_len = consumer_wait_for(consumer2, 3);
+       ast_test_validate(test, 3 == actual_len);
+
+       ast_test_validate(test, test_message1 == consumer1->messages_rxed[0]);
+       ast_test_validate(test, test_message2 == consumer1->messages_rxed[1]);
+       ast_test_validate(test, test_message3 == consumer1->messages_rxed[2]);
+
+       ast_test_validate(test, test_message1 == consumer2->messages_rxed[0]);
+       ast_test_validate(test, test_message2 == consumer2->messages_rxed[1]);
+       ast_test_validate(test, test_message3 == consumer2->messages_rxed[2]);
+
+       return AST_TEST_PASS;
+}
+
 struct cache_test_data {
        char *id;
        char *value;
@@ -1389,6 +1593,104 @@ AST_TEST_DEFINE(router)
        return AST_TEST_PASS;
 }
 
+AST_TEST_DEFINE(router_pool)
+{
+       RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
+       RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
+       RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
+       RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
+       RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
+       int actual_len, ret;
+       struct stasis_message *actual;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = test_category;
+               info->summary = "Test message routing via threadpool";
+               info->description = "Test simple message routing when\n"
+                       "the subscriptions dictate usage of the Stasis\n"
+                       "threadpool.\n";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       topic = stasis_topic_create("TestTopic");
+       ast_test_validate(test, NULL != topic);
+
+       consumer1 = consumer_create(1);
+       ast_test_validate(test, NULL != consumer1);
+       consumer2 = consumer_create(1);
+       ast_test_validate(test, NULL != consumer2);
+       consumer3 = consumer_create(1);
+       ast_test_validate(test, NULL != consumer3);
+
+       test_message_type1 = stasis_message_type_create("TestMessage1", NULL);
+       ast_test_validate(test, NULL != test_message_type1);
+       test_message_type2 = stasis_message_type_create("TestMessage2", NULL);
+       ast_test_validate(test, NULL != test_message_type2);
+       test_message_type3 = stasis_message_type_create("TestMessage3", NULL);
+       ast_test_validate(test, NULL != test_message_type3);
+
+       uut = stasis_message_router_create_pool(topic);
+       ast_test_validate(test, NULL != uut);
+
+       ret = stasis_message_router_add(
+               uut, test_message_type1, consumer_exec, consumer1);
+       ast_test_validate(test, 0 == ret);
+       ao2_ref(consumer1, +1);
+       ret = stasis_message_router_add(
+               uut, test_message_type2, consumer_exec, consumer2);
+       ast_test_validate(test, 0 == ret);
+       ao2_ref(consumer2, +1);
+       ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
+       ast_test_validate(test, 0 == ret);
+       ao2_ref(consumer3, +1);
+
+       test_data = ao2_alloc(1, NULL);
+       ast_test_validate(test, NULL != test_data);
+       test_message1 = stasis_message_create(test_message_type1, test_data);
+       ast_test_validate(test, NULL != test_message1);
+       test_message2 = stasis_message_create(test_message_type2, test_data);
+       ast_test_validate(test, NULL != test_message2);
+       test_message3 = stasis_message_create(test_message_type3, test_data);
+       ast_test_validate(test, NULL != test_message3);
+
+       stasis_publish(topic, test_message1);
+       stasis_publish(topic, test_message2);
+       stasis_publish(topic, test_message3);
+
+       actual_len = consumer_wait_for(consumer1, 1);
+       ast_test_validate(test, 1 == actual_len);
+       actual_len = consumer_wait_for(consumer2, 1);
+       ast_test_validate(test, 1 == actual_len);
+       actual_len = consumer_wait_for(consumer3, 1);
+       ast_test_validate(test, 1 == actual_len);
+
+       actual = consumer1->messages_rxed[0];
+       ast_test_validate(test, test_message1 == actual);
+
+       actual = consumer2->messages_rxed[0];
+       ast_test_validate(test, test_message2 == actual);
+
+       actual = consumer3->messages_rxed[0];
+       ast_test_validate(test, test_message3 == actual);
+
+       /* consumer1 and consumer2 do not get the final message. */
+       ao2_cleanup(consumer1);
+       ao2_cleanup(consumer2);
+
+       return AST_TEST_PASS;
+}
+
 static const char *cache_simple(struct stasis_message *message)
 {
        const char *type_name =
@@ -1748,8 +2050,10 @@ static int unload_module(void)
        AST_TEST_UNREGISTER(message_type);
        AST_TEST_UNREGISTER(message);
        AST_TEST_UNREGISTER(subscription_messages);
+       AST_TEST_UNREGISTER(subscription_pool_messages);
        AST_TEST_UNREGISTER(publish);
        AST_TEST_UNREGISTER(publish_sync);
+       AST_TEST_UNREGISTER(publish_pool);
        AST_TEST_UNREGISTER(unsubscribe_stops_messages);
        AST_TEST_UNREGISTER(forward);
        AST_TEST_UNREGISTER(cache_filter);
@@ -1757,8 +2061,10 @@ static int unload_module(void)
        AST_TEST_UNREGISTER(cache_dump);
        AST_TEST_UNREGISTER(cache_eid_aggregate);
        AST_TEST_UNREGISTER(router);
+       AST_TEST_UNREGISTER(router_pool);
        AST_TEST_UNREGISTER(router_cache_updates);
        AST_TEST_UNREGISTER(interleaving);
+       AST_TEST_UNREGISTER(subscription_interleaving);
        AST_TEST_UNREGISTER(no_to_json);
        AST_TEST_UNREGISTER(to_json);
        AST_TEST_UNREGISTER(no_to_ami);
@@ -1773,8 +2079,10 @@ static int load_module(void)
        AST_TEST_REGISTER(message_type);
        AST_TEST_REGISTER(message);
        AST_TEST_REGISTER(subscription_messages);
+       AST_TEST_REGISTER(subscription_pool_messages);
        AST_TEST_REGISTER(publish);
        AST_TEST_REGISTER(publish_sync);
+       AST_TEST_REGISTER(publish_pool);
        AST_TEST_REGISTER(unsubscribe_stops_messages);
        AST_TEST_REGISTER(forward);
        AST_TEST_REGISTER(cache_filter);
@@ -1782,8 +2090,10 @@ static int load_module(void)
        AST_TEST_REGISTER(cache_dump);
        AST_TEST_REGISTER(cache_eid_aggregate);
        AST_TEST_REGISTER(router);
+       AST_TEST_REGISTER(router_pool);
        AST_TEST_REGISTER(router_cache_updates);
        AST_TEST_REGISTER(interleaving);
+       AST_TEST_REGISTER(subscription_interleaving);
        AST_TEST_REGISTER(no_to_json);
        AST_TEST_REGISTER(to_json);
        AST_TEST_REGISTER(no_to_ami);