From: Matthew Jordan Date: Mon, 1 Dec 2014 15:53:02 +0000 (+0000) Subject: main/stasis: Allow subscriptions to use a threadpool for message delivery X-Git-Tag: 12.8.0-rc1~18 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=4588a0bc3c51f8b135f38fe69685055197072cc1;p=thirdparty%2Fasterisk.git main/stasis: Allow subscriptions to use a threadpool for message delivery Prior to this patch, all Stasis subscriptions would receive a dedicated thread for servicing published messages. In contrast, prior to r400178 (see review https://reviewboard.asterisk.org/r/2881/), the subscriptions shared a thread pool. It was discovered during some initial work on Stasis that, for a low subscription count with high message throughput, the threadpool was not as performant as simply having a dedicated thread per subscriber. For situations where a subscriber receives a substantial number of messages and is always present, the model of having a dedicated thread per subscriber makes sense. While we still have plenty of subscriptions that would follow this model, e.g., AMI, CDRs, CEL, etc., there are plenty that also fall into the following two categories: * Large number of subscriptions, specifically those tied to endpoints/peers. * Low number of messages. Some subscriptions exist specifically to coordinate a single message - the subscription is created, a message is published, the delivery is synchronized, and the subscription is destroyed. In both of the latter two cases, creating a dedicated thread is wasteful (and in the case of a large number of peers/endpoints, harmful). In those cases, having shared delivery threads is far more performant. This patch adds the ability of a subscriber to Stasis to choose whether or not their messages are dispatched on a dedicated thread or on a threadpool. The threadpool is configurable through stasis.conf. Review: https://reviewboard.asterisk.org/r/4193 ASTERISK-24533 #close Reported by: xrobau Tested by: xrobau git-svn-id: https://origsvn.digium.com/svn/asterisk/branches/12@428681 65c4cc65-6c06-0410-ace0-fbb531ad65f3 --- diff --git a/UPGRADE.txt b/UPGRADE.txt index 4a28e12391..57ec625e90 100644 --- a/UPGRADE.txt +++ b/UPGRADE.txt @@ -21,6 +21,16 @@ === =========================================================== +From 12.7.0 to 12.8.0: + +Core: + - The core of Asterisk uses a message bus called "Stasis" to distribute + information to internal components. For performance reasons, the message + distribution was modified to make use of a thread pool instead of a + dedicated thread per consumer in certain cases. The initial settings for + the thread pool can now be configured using 'stasis.conf'. A sample + configuration file is provided in the samples directory. + From 12.6.0 to 12.7.0: PJSIP: diff --git a/apps/app_queue.c b/apps/app_queue.c index 52f856beb2..2c3cc8f099 100644 --- a/apps/app_queue.c +++ b/apps/app_queue.c @@ -5954,7 +5954,7 @@ static int setup_stasis_subs(struct queue_ent *qe, struct ast_channel *peer, str return -1; } - queue_data->bridge_router = stasis_message_router_create(ast_bridge_topic_all()); + queue_data->bridge_router = stasis_message_router_create_pool(ast_bridge_topic_all()); if (!queue_data->bridge_router) { ao2_ref(queue_data, -1); return -1; @@ -5969,7 +5969,7 @@ static int setup_stasis_subs(struct queue_ent *qe, struct ast_channel *peer, str stasis_message_router_set_default(queue_data->bridge_router, queue_bridge_cb, queue_data); - queue_data->channel_router = stasis_message_router_create(ast_channel_topic_all()); + queue_data->channel_router = stasis_message_router_create_pool(ast_channel_topic_all()); if (!queue_data->channel_router) { /* Unsubscribing from the bridge router will remove the only ref of queue_data, * thus beginning the destruction process diff --git a/channels/chan_dahdi.c b/channels/chan_dahdi.c index 62164104d2..48e5da76dd 100644 --- a/channels/chan_dahdi.c +++ b/channels/chan_dahdi.c @@ -12426,7 +12426,7 @@ static struct dahdi_pvt *mkintf(int channel, const struct dahdi_chan_conf *conf, mailbox_specific_topic = ast_mwi_topic(tmp->mailbox); if (mailbox_specific_topic) { - tmp->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, NULL); + tmp->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, NULL); } } #ifdef HAVE_DAHDI_LINEREVERSE_VMWI diff --git a/channels/chan_iax2.c b/channels/chan_iax2.c index ef0c096563..359160f687 100644 --- a/channels/chan_iax2.c +++ b/channels/chan_iax2.c @@ -12966,7 +12966,7 @@ static struct iax2_peer *build_peer(const char *name, struct ast_variable *v, st mailbox_specific_topic = ast_mwi_topic(peer->mailbox); if (mailbox_specific_topic) { - peer->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, NULL); + peer->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, NULL); } } diff --git a/channels/chan_mgcp.c b/channels/chan_mgcp.c index e1a73d8c1c..a8459fad51 100644 --- a/channels/chan_mgcp.c +++ b/channels/chan_mgcp.c @@ -4199,7 +4199,7 @@ static struct mgcp_gateway *build_gateway(char *cat, struct ast_variable *v) mailbox_specific_topic = ast_mwi_topic(e->mailbox); if (mailbox_specific_topic) { - e->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, NULL); + e->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, NULL); } } snprintf(e->rqnt_ident, sizeof(e->rqnt_ident), "%08lx", (unsigned long)ast_random()); diff --git a/channels/chan_sip.c b/channels/chan_sip.c index 4049d1a4c0..a2312e7745 100644 --- a/channels/chan_sip.c +++ b/channels/chan_sip.c @@ -27504,7 +27504,7 @@ static void add_peer_mwi_subs(struct sip_peer *peer) if (!peer_name) { return; } - mailbox->event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, peer_name); + mailbox->event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, peer_name); } } } diff --git a/channels/chan_skinny.c b/channels/chan_skinny.c index 8e98cbabe6..6f64b83ad1 100644 --- a/channels/chan_skinny.c +++ b/channels/chan_skinny.c @@ -8318,7 +8318,7 @@ static struct skinny_line *config_line(const char *lname, struct ast_variable *v mailbox_specific_topic = ast_mwi_topic(l->mailbox); if (mailbox_specific_topic) { - l->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, l); + l->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, l); } } diff --git a/channels/sig_pri.c b/channels/sig_pri.c index 0951ffd989..26dd1e9a3e 100644 --- a/channels/sig_pri.c +++ b/channels/sig_pri.c @@ -9145,7 +9145,7 @@ int sig_pri_start_pri(struct sig_pri_span *pri) mailbox_specific_topic = ast_mwi_topic(mbox_id); if (mailbox_specific_topic) { - pri->mbox[i].sub = stasis_subscribe(mailbox_specific_topic, sig_pri_mwi_event_cb, pri); + pri->mbox[i].sub = stasis_subscribe_pool(mailbox_specific_topic, sig_pri_mwi_event_cb, pri); } if (!pri->mbox[i].sub) { ast_log(LOG_ERROR, "%s span %d could not subscribe to MWI events for %s(%s).\n", diff --git a/configs/stasis.conf.sample b/configs/stasis.conf.sample new file mode 100644 index 0000000000..282bc42edc --- /dev/null +++ b/configs/stasis.conf.sample @@ -0,0 +1,9 @@ +[threadpool] +;initial_size = 5 ; Initial size of the threadpool. +; ; 0 means the threadpool has no threads initially +; ; until a task needs a thread. +;idle_timeout_sec = 20 ; Number of seconds a thread should be idle before +; ; dying. 0 means threads never time out. +;max_size = 50 ; Maximum number of threads in the Stasis threadpool. +; ; 0 means no limit to the number of threads in the +; ; threadpool. \ No newline at end of file diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h index 55ebb45148..3f8e2a6c5f 100644 --- a/include/asterisk/stasis.h +++ b/include/asterisk/stasis.h @@ -519,6 +519,31 @@ typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *s struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data); +/*! + * \brief Create a subscription whose callbacks occur on a thread pool + * + * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free + * up this reference), the subscription must be explicitly unsubscribed from its + * topic using stasis_unsubscribe(). + * + * The invocations of the callback are serialized, but will almost certainly not + * always happen on the same thread. The invocation order of different subscriptions + * is unspecified. + * + * Unlike \ref stasis_subscribe, this function will explicitly use a threadpool to + * dispatch items to its \c callback. This form of subscription should be used + * when many subscriptions may be made to the specified \c topic. + * + * \param topic Topic to subscribe to. + * \param callback Callback function for subscription messages. + * \param data Data to be passed to the callback, in addition to the message. + * \return New \ref stasis_subscription object. + * \return \c NULL on error. + * \since 12.8.0 + */ +struct stasis_subscription *stasis_subscribe_pool(struct stasis_topic *topic, + stasis_subscription_cb callback, void *data); + /*! * \brief Cancel a subscription. * diff --git a/include/asterisk/stasis_internal.h b/include/asterisk/stasis_internal.h index bb7b6cc0a2..bc6122c2b2 100644 --- a/include/asterisk/stasis_internal.h +++ b/include/asterisk/stasis_internal.h @@ -52,8 +52,10 @@ * \param callback Callback function for subscription messages. * \param data Data to be passed to the callback, in addition to the message. * \param needs_mailbox Determines whether or not the subscription requires a mailbox. - * Subscriptions with mailboxes will be delivered on a thread in the Stasis threadpool; + * Subscriptions with mailboxes will be delivered on some non-publisher thread; * subscriptions without mailboxes will be delivered on the publisher thread. + * \param use_thread_pool Use the thread pool for the subscription. This is only + * relevant if \c needs_mailbox is non-zero. * \return New \ref stasis_subscription object. * \return \c NULL on error. * \since 12 @@ -62,6 +64,7 @@ struct stasis_subscription *internal_stasis_subscribe( struct stasis_topic *topic, stasis_subscription_cb callback, void *data, - int needs_mailbox); + int needs_mailbox, + int use_thread_pool); #endif /* STASIS_INTERNAL_H_ */ diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h index 613a2bd7f0..89657a5ee5 100644 --- a/include/asterisk/stasis_message_router.h +++ b/include/asterisk/stasis_message_router.h @@ -58,6 +58,22 @@ struct stasis_message_router; struct stasis_message_router *stasis_message_router_create( struct stasis_topic *topic); +/*! + * \brief Create a new message router object. + * + * The subscription created for this message router will dispatch + * callbacks on a thread pool. + * + * \param topic Topic to subscribe route to. + * + * \return New \ref stasis_message_router. + * \return \c NULL on error. + * + * \since 12.8.0 + */ +struct stasis_message_router *stasis_message_router_create_pool( + struct stasis_topic *topic); + /*! * \brief Unsubscribe the router from the upstream topic. * diff --git a/main/endpoints.c b/main/endpoints.c index 07687eecc5..ebf4fe762d 100644 --- a/main/endpoints.c +++ b/main/endpoints.c @@ -306,7 +306,7 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha } if (!ast_strlen_zero(resource)) { - endpoint->router = stasis_message_router_create(ast_endpoint_topic(endpoint)); + endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint)); if (!endpoint->router) { return NULL; } diff --git a/main/stasis.c b/main/stasis.c index 594ec5e99c..77ef7d91b4 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -35,12 +35,14 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$"); #include "asterisk/stasis_internal.h" #include "asterisk/stasis.h" #include "asterisk/taskprocessor.h" +#include "asterisk/threadpool.h" #include "asterisk/utils.h" #include "asterisk/uuid.h" #include "asterisk/vector.h" #include "asterisk/stasis_channels.h" #include "asterisk/stasis_bridges.h" #include "asterisk/stasis_endpoints.h" +#include "asterisk/config_options.h" /*** DOCUMENTATION @@ -60,6 +62,22 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$"); + + + + Settings that configure the threadpool Stasis uses to deliver some messages. + + Initial number of threads in the message bus threadpool. + + + Number of seconds before an idle thread is disposed of. + + + Maximum number of threads in the threadpool. + + + + ***/ /*! @@ -157,6 +175,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$"); /*! The number of buckets to use for topic pools */ #define TOPIC_POOL_BUCKETS 57 +/*! Thread pool for topics that don't want a dedicated taskprocessor */ +static struct ast_threadpool *pool; + STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type); /*! \internal */ @@ -302,7 +323,8 @@ struct stasis_subscription *internal_stasis_subscribe( struct stasis_topic *topic, stasis_subscription_cb callback, void *data, - int needs_mailbox) + int needs_mailbox, + int use_thread_pool) { RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup); @@ -315,19 +337,19 @@ struct stasis_subscription *internal_stasis_subscribe( if (!sub) { return NULL; } - ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid)); if (needs_mailbox) { /* With a small number of subscribers, a thread-per-sub is - * acceptable. If our usage changes so that we have larger - * numbers of subscribers, we'll probably want to consider - * a threadpool. We had that originally, but with so few - * subscribers it was actually a performance loss instead of - * a gain. + * acceptable. For larger number of subscribers, a thread + * pool should be used. */ - sub->mailbox = ast_taskprocessor_get(sub->uniqueid, - TPS_REF_DEFAULT); + if (use_thread_pool) { + sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool); + } else { + sub->mailbox = ast_taskprocessor_get(sub->uniqueid, + TPS_REF_DEFAULT); + } if (!sub->mailbox) { return NULL; } @@ -356,7 +378,15 @@ struct stasis_subscription *stasis_subscribe( stasis_subscription_cb callback, void *data) { - return internal_stasis_subscribe(topic, callback, data, 1); + return internal_stasis_subscribe(topic, callback, data, 1, 0); +} + +struct stasis_subscription *stasis_subscribe_pool( + struct stasis_topic *topic, + stasis_subscription_cb callback, + void *data) +{ + return internal_stasis_subscribe(topic, callback, data, 1, 1); } static int sub_cleanup(void *data) @@ -1215,6 +1245,68 @@ static struct ast_manager_event_blob *multi_user_event_to_ami( ast_str_buffer(body)); } +/*! \brief Threadpool configuration options */ +struct stasis_threadpool_conf { + /*! Initial size of the thread pool */ + int initial_size; + /*! Time, in seconds, before we expire a thread */ + int idle_timeout_sec; + /*! Maximum number of thread to allow */ + int max_size; +}; + +/*! \brief Configuration for stasis */ +struct stasis_config { + /*! Thread pool configuration options */ + struct stasis_threadpool_conf *threadpool_options; +}; + +static struct aco_type threadpool_option = { + .type = ACO_GLOBAL, + .name = "threadpool", + .item_offset = offsetof(struct stasis_config, threadpool_options), + .category = "^threadpool$", + .category_match = ACO_WHITELIST, +}; + +static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option); + +struct aco_file stasis_conf = { + .filename = "stasis.conf", + .types = ACO_TYPES(&threadpool_option), +}; + +static void stasis_config_destructor(void *obj) +{ + struct stasis_config *cfg = obj; + + ast_free(cfg->threadpool_options); +} + +static void *stasis_config_alloc(void) +{ + struct stasis_config *cfg; + + cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor); + if (!cfg) { + return NULL; + } + + cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options)); + if (!cfg->threadpool_options) { + ao2_ref(cfg, -1); + return NULL; + } + + return cfg; +} + +static AO2_GLOBAL_OBJ_STATIC(globals); + +/*! \brief Register information about the configs being processed by this module */ +CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc, + .files = ACO_FILES(&stasis_conf), +); /*! * @{ \brief Define multi user event message type(s). @@ -1227,19 +1319,83 @@ STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type, /*! @} */ +/*! \brief Shutdown function */ +static void stasis_exit(void) +{ + ast_threadpool_shutdown(pool); + pool = NULL; +} + /*! \brief Cleanup function for graceful shutdowns */ static void stasis_cleanup(void) { STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_multi_user_event_type); + aco_info_destroy(&cfg_info); + ao2_global_obj_release(globals); } int stasis_init(void) { + RAII_VAR(struct stasis_config *, cfg, NULL, ao2_cleanup); int cache_init; + struct ast_threadpool_options threadpool_opts = { 0, }; /* Be sure the types are cleaned up after the message bus */ ast_register_cleanup(stasis_cleanup); + ast_register_atexit(stasis_exit); + + if (aco_info_init(&cfg_info)) { + return -1; + } + + aco_option_register(&cfg_info, "initial_size", ACO_EXACT, + threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE, + FLDSET(struct stasis_threadpool_conf, initial_size), 0, + INT_MAX); + aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT, + threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE, + FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0, + INT_MAX); + aco_option_register(&cfg_info, "max_size", ACO_EXACT, + threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE, + FLDSET(struct stasis_threadpool_conf, max_size), 0, + INT_MAX); + + if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) { + struct stasis_config *default_cfg = stasis_config_alloc(); + + if (!default_cfg) { + return -1; + } + + if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) { + ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n"); + ao2_ref(default_cfg, -1); + return -1; + } + + ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n"); + ao2_global_obj_replace_unref(globals, default_cfg); + cfg = default_cfg; + } else { + cfg = ao2_global_obj_ref(globals); + if (!cfg) { + ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n"); + return -1; + } + } + + threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION; + threadpool_opts.initial_size = cfg->threadpool_options->initial_size; + threadpool_opts.auto_increment = 1; + threadpool_opts.max_size = cfg->threadpool_options->max_size; + threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec; + pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts); + if (!pool) { + ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n"); + return -1; + } cache_init = stasis_cache_init(); if (cache_init != 0) { diff --git a/main/stasis_cache.c b/main/stasis_cache.c index 8b4304e5f4..e74688943a 100644 --- a/main/stasis_cache.c +++ b/main/stasis_cache.c @@ -881,7 +881,7 @@ struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *or ao2_ref(cache, +1); caching_topic->cache = cache; - sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0); + sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0); if (sub == NULL) { return NULL; } diff --git a/main/stasis_channels.c b/main/stasis_channels.c index 615f5ea912..eda5163b4f 100644 --- a/main/stasis_channels.c +++ b/main/stasis_channels.c @@ -361,7 +361,7 @@ static void ast_channel_publish_dial_internal(struct ast_channel *caller, } if (forwarded) { - struct stasis_subscription *subscription = stasis_subscribe(ast_channel_topic(peer), dummy_event_cb, NULL); + struct stasis_subscription *subscription = stasis_subscribe_pool(ast_channel_topic(peer), dummy_event_cb, NULL); stasis_publish(ast_channel_topic(peer), msg); stasis_unsubscribe_and_join(subscription); diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c index da288e864c..a9e458456f 100644 --- a/main/stasis_message_router.c +++ b/main/stasis_message_router.c @@ -206,8 +206,8 @@ static void router_dispatch(void *data, } } -struct stasis_message_router *stasis_message_router_create( - struct stasis_topic *topic) +static struct stasis_message_router *stasis_message_router_create_internal( + struct stasis_topic *topic, int use_thread_pool) { int res; RAII_VAR(struct stasis_message_router *, router, NULL, ao2_cleanup); @@ -224,7 +224,11 @@ struct stasis_message_router *stasis_message_router_create( return NULL; } - router->subscription = stasis_subscribe(topic, router_dispatch, router); + if (use_thread_pool) { + router->subscription = stasis_subscribe_pool(topic, router_dispatch, router); + } else { + router->subscription = stasis_subscribe(topic, router_dispatch, router); + } if (!router->subscription) { return NULL; } @@ -233,6 +237,18 @@ struct stasis_message_router *stasis_message_router_create( return router; } +struct stasis_message_router *stasis_message_router_create( + struct stasis_topic *topic) +{ + return stasis_message_router_create_internal(topic, 0); +} + +struct stasis_message_router *stasis_message_router_create_pool( + struct stasis_topic *topic) +{ + return stasis_message_router_create_internal(topic, 1); +} + void stasis_message_router_unsubscribe(struct stasis_message_router *router) { if (!router) { diff --git a/res/parking/parking_applications.c b/res/parking/parking_applications.c index 5500e33164..6bb447ec74 100644 --- a/res/parking/parking_applications.c +++ b/res/parking/parking_applications.c @@ -832,7 +832,7 @@ static int park_and_announce_app_exec(struct ast_channel *chan, const char *data return -1; } - if (!(parking_subscription = stasis_subscribe(ast_parking_topic(), park_announce_update_cb, pa_data))) { + if (!(parking_subscription = stasis_subscribe_pool(ast_parking_topic(), park_announce_update_cb, pa_data))) { /* Failed to create subscription */ park_announce_subscription_data_destroy(pa_data); return -1; diff --git a/res/parking/parking_bridge_features.c b/res/parking/parking_bridge_features.c index f411d04164..fefa6ee2d7 100644 --- a/res/parking/parking_bridge_features.c +++ b/res/parking/parking_bridge_features.c @@ -192,7 +192,7 @@ static int create_parked_subscription_full(struct ast_channel *chan, const char strcpy(subscription_data->parkee_uuid, parkee_uuid); strcpy(subscription_data->parker_uuid, parker_uuid); - if (!(parked_datastore->parked_subscription = stasis_subscribe(ast_parking_topic(), parker_update_cb, subscription_data))) { + if (!(parked_datastore->parked_subscription = stasis_subscribe_pool(ast_parking_topic(), parker_update_cb, subscription_data))) { return -1; } diff --git a/res/res_jabber.c b/res/res_jabber.c index 39ef62e015..d9edcc3b16 100644 --- a/res/res_jabber.c +++ b/res/res_jabber.c @@ -3300,7 +3300,7 @@ static int cached_devstate_cb(void *obj, void *arg, int flags) static void aji_init_event_distribution(struct aji_client *client) { if (!mwi_sub) { - mwi_sub = stasis_subscribe(ast_mwi_topic_all(), aji_mwi_cb, client); + mwi_sub = stasis_subscribe_pool(ast_mwi_topic_all(), aji_mwi_cb, client); } if (!device_state_sub) { RAII_VAR(struct ao2_container *, cached, NULL, ao2_cleanup); diff --git a/res/res_pjsip_mwi.c b/res/res_pjsip_mwi.c index b1ae6ee163..c600a96e9d 100644 --- a/res/res_pjsip_mwi.c +++ b/res/res_pjsip_mwi.c @@ -142,7 +142,7 @@ static struct mwi_stasis_subscription *mwi_stasis_subscription_alloc(const char strcpy(mwi_stasis_sub->mailbox, mailbox); ao2_ref(mwi_sub, +1); ast_debug(3, "Creating stasis MWI subscription to mailbox %s for endpoint %s\n", mailbox, mwi_sub->id); - mwi_stasis_sub->stasis_sub = stasis_subscribe(topic, mwi_stasis_cb, mwi_sub); + mwi_stasis_sub->stasis_sub = stasis_subscribe_pool(topic, mwi_stasis_cb, mwi_sub); return mwi_stasis_sub; } diff --git a/res/res_pjsip_pubsub.c b/res/res_pjsip_pubsub.c index 4452683349..a851ef0719 100644 --- a/res/res_pjsip_pubsub.c +++ b/res/res_pjsip_pubsub.c @@ -1934,7 +1934,7 @@ static int load_module(void) if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) { ast_sip_push_task(NULL, subscription_persistence_load, NULL); } else { - stasis_subscribe(ast_manager_get_topic(), subscription_persistence_event_cb, NULL); + stasis_subscribe_pool(ast_manager_get_topic(), subscription_persistence_event_cb, NULL); } ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM, diff --git a/res/res_pjsip_refer.c b/res/res_pjsip_refer.c index 995276cb1b..992245d1b8 100644 --- a/res/res_pjsip_refer.c +++ b/res/res_pjsip_refer.c @@ -550,7 +550,7 @@ static void refer_blind_callback(struct ast_channel *chan, struct transfer_chann /* We also will need to detect if the transferee enters a bridge. This is currently the only reliable way to * detect if the transfer target has answered the call */ - refer->progress->bridge_sub = stasis_subscribe(ast_bridge_topic_all(), refer_progress_bridge, refer->progress); + refer->progress->bridge_sub = stasis_subscribe_pool(ast_bridge_topic_all(), refer_progress_bridge, refer->progress); if (!refer->progress->bridge_sub) { struct refer_progress_notification *notification = refer_progress_notification_alloc(refer->progress, 200, PJSIP_EVSUB_STATE_TERMINATED); diff --git a/res/res_stasis_device_state.c b/res/res_stasis_device_state.c index 0130909ddc..3a99afa62c 100644 --- a/res/res_stasis_device_state.c +++ b/res/res_stasis_device_state.c @@ -330,7 +330,7 @@ static int subscribe_device_state(struct stasis_app *app, void *obj) return 0; } - if (!(sub->sub = stasis_subscribe( + if (!(sub->sub = stasis_subscribe_pool( ast_device_state_topic(sub->device_name), device_state_cb, sub))) { ast_log(LOG_ERROR, "Unable to subscribe to device %s\n", diff --git a/res/res_xmpp.c b/res/res_xmpp.c index 651c7ed85e..4b9ef6c681 100644 --- a/res/res_xmpp.c +++ b/res/res_xmpp.c @@ -1606,7 +1606,7 @@ static void xmpp_init_event_distribution(struct ast_xmpp_client *client) xmpp_pubsub_unsubscribe(client, "device_state"); xmpp_pubsub_unsubscribe(client, "message_waiting"); - if (!(client->mwi_sub = stasis_subscribe(ast_mwi_topic_all(), xmpp_pubsub_mwi_cb, client))) { + if (!(client->mwi_sub = stasis_subscribe_pool(ast_mwi_topic_all(), xmpp_pubsub_mwi_cb, client))) { return; } diff --git a/tests/test_stasis.c b/tests/test_stasis.c index 4c042c05be..ccd98af41c 100644 --- a/tests/test_stasis.c +++ b/tests/test_stasis.c @@ -361,6 +361,61 @@ AST_TEST_DEFINE(subscription_messages) return AST_TEST_PASS; } +AST_TEST_DEFINE(subscription_pool_messages) +{ + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe); + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); + RAII_VAR(char *, expected_uniqueid, NULL, ast_free); + int complete; + struct stasis_subscription_change *change; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test subscribe/unsubscribe messages using a threadpool subscription"; + info->description = "Test subscribe/unsubscribe messages using a threadpool subscription"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + topic = stasis_topic_create("TestTopic"); + ast_test_validate(test, NULL != topic); + + consumer = consumer_create(0); + ast_test_validate(test, NULL != consumer); + + uut = stasis_subscribe_pool(topic, consumer_exec, consumer); + ast_test_validate(test, NULL != uut); + ao2_ref(consumer, +1); + expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut)); + + uut = stasis_unsubscribe(uut); + complete = consumer_wait_for_completion(consumer); + ast_test_validate(test, 1 == complete); + + ast_test_validate(test, 2 == consumer->messages_rxed_len); + ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[0])); + ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[1])); + + change = stasis_message_data(consumer->messages_rxed[0]); + ast_test_validate(test, topic == change->topic); + ast_test_validate(test, 0 == strcmp("Subscribe", change->description)); + ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid)); + + change = stasis_message_data(consumer->messages_rxed[1]); + ast_test_validate(test, topic == change->topic); + ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description)); + ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid)); + + return AST_TEST_PASS; +} + AST_TEST_DEFINE(publish) { RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); @@ -455,6 +510,55 @@ AST_TEST_DEFINE(publish_sync) return AST_TEST_PASS; } +AST_TEST_DEFINE(publish_pool) +{ + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe); + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); + int actual_len; + const char *actual; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test publishing with a threadpool"; + info->description = "Test publishing to a subscriber whose\n" + "subscription dictates messages are received through a\n" + "threadpool."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + topic = stasis_topic_create("TestTopic"); + ast_test_validate(test, NULL != topic); + + consumer = consumer_create(1); + ast_test_validate(test, NULL != consumer); + + uut = stasis_subscribe_pool(topic, consumer_exec, consumer); + ast_test_validate(test, NULL != uut); + ao2_ref(consumer, +1); + + test_data = ao2_alloc(1, NULL); + ast_test_validate(test, NULL != test_data); + test_message_type = stasis_message_type_create("TestMessage", NULL); + test_message = stasis_message_create(test_message_type, test_data); + + stasis_publish(topic, test_message); + + actual_len = consumer_wait_for(consumer, 1); + ast_test_validate(test, 1 == actual_len); + actual = stasis_message_data(consumer->messages_rxed[0]); + ast_test_validate(test, test_data == actual); + + return AST_TEST_PASS; +} + AST_TEST_DEFINE(unsubscribe_stops_messages) { RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); @@ -650,6 +754,106 @@ AST_TEST_DEFINE(interleaving) return AST_TEST_PASS; } +AST_TEST_DEFINE(subscription_interleaving) +{ + RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup); + + RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup); + + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + + RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup); + + RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel); + RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel); + RAII_VAR(struct stasis_subscription *, sub1, NULL, stasis_unsubscribe); + RAII_VAR(struct stasis_subscription *, sub2, NULL, stasis_unsubscribe); + + RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup); + + int actual_len; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test sending interleaved events to a parent topic with different subscribers"; + info->description = "Test sending events to a parent topic.\n" + "This test creates three topics (one parent, two children)\n" + "and publishes messages alternately between the children.\n" + "It verifies that the messages are received in the expected\n" + "order, for different subscription types: one with a dedicated\n" + "thread, the other on the Stasis threadpool.\n"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + test_message_type = stasis_message_type_create("test", NULL); + ast_test_validate(test, NULL != test_message_type); + + test_data = ao2_alloc(1, NULL); + ast_test_validate(test, NULL != test_data); + + test_message1 = stasis_message_create(test_message_type, test_data); + ast_test_validate(test, NULL != test_message1); + test_message2 = stasis_message_create(test_message_type, test_data); + ast_test_validate(test, NULL != test_message2); + test_message3 = stasis_message_create(test_message_type, test_data); + ast_test_validate(test, NULL != test_message3); + + parent_topic = stasis_topic_create("ParentTestTopic"); + ast_test_validate(test, NULL != parent_topic); + topic1 = stasis_topic_create("Topic1"); + ast_test_validate(test, NULL != topic1); + topic2 = stasis_topic_create("Topic2"); + ast_test_validate(test, NULL != topic2); + + forward_sub1 = stasis_forward_all(topic1, parent_topic); + ast_test_validate(test, NULL != forward_sub1); + forward_sub2 = stasis_forward_all(topic2, parent_topic); + ast_test_validate(test, NULL != forward_sub2); + + consumer1 = consumer_create(1); + ast_test_validate(test, NULL != consumer1); + + consumer2 = consumer_create(1); + ast_test_validate(test, NULL != consumer2); + + sub1 = stasis_subscribe(parent_topic, consumer_exec, consumer1); + ast_test_validate(test, NULL != sub1); + ao2_ref(consumer1, +1); + + sub2 = stasis_subscribe_pool(parent_topic, consumer_exec, consumer2); + ast_test_validate(test, NULL != sub2); + ao2_ref(consumer2, +1); + + stasis_publish(topic1, test_message1); + stasis_publish(topic2, test_message2); + stasis_publish(topic1, test_message3); + + actual_len = consumer_wait_for(consumer1, 3); + ast_test_validate(test, 3 == actual_len); + + actual_len = consumer_wait_for(consumer2, 3); + ast_test_validate(test, 3 == actual_len); + + ast_test_validate(test, test_message1 == consumer1->messages_rxed[0]); + ast_test_validate(test, test_message2 == consumer1->messages_rxed[1]); + ast_test_validate(test, test_message3 == consumer1->messages_rxed[2]); + + ast_test_validate(test, test_message1 == consumer2->messages_rxed[0]); + ast_test_validate(test, test_message2 == consumer2->messages_rxed[1]); + ast_test_validate(test, test_message3 == consumer2->messages_rxed[2]); + + return AST_TEST_PASS; +} + struct cache_test_data { char *id; char *value; @@ -1389,6 +1593,104 @@ AST_TEST_DEFINE(router) return AST_TEST_PASS; } +AST_TEST_DEFINE(router_pool) +{ + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join); + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup); + int actual_len, ret; + struct stasis_message *actual; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test message routing via threadpool"; + info->description = "Test simple message routing when\n" + "the subscriptions dictate usage of the Stasis\n" + "threadpool.\n"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + topic = stasis_topic_create("TestTopic"); + ast_test_validate(test, NULL != topic); + + consumer1 = consumer_create(1); + ast_test_validate(test, NULL != consumer1); + consumer2 = consumer_create(1); + ast_test_validate(test, NULL != consumer2); + consumer3 = consumer_create(1); + ast_test_validate(test, NULL != consumer3); + + test_message_type1 = stasis_message_type_create("TestMessage1", NULL); + ast_test_validate(test, NULL != test_message_type1); + test_message_type2 = stasis_message_type_create("TestMessage2", NULL); + ast_test_validate(test, NULL != test_message_type2); + test_message_type3 = stasis_message_type_create("TestMessage3", NULL); + ast_test_validate(test, NULL != test_message_type3); + + uut = stasis_message_router_create_pool(topic); + ast_test_validate(test, NULL != uut); + + ret = stasis_message_router_add( + uut, test_message_type1, consumer_exec, consumer1); + ast_test_validate(test, 0 == ret); + ao2_ref(consumer1, +1); + ret = stasis_message_router_add( + uut, test_message_type2, consumer_exec, consumer2); + ast_test_validate(test, 0 == ret); + ao2_ref(consumer2, +1); + ret = stasis_message_router_set_default(uut, consumer_exec, consumer3); + ast_test_validate(test, 0 == ret); + ao2_ref(consumer3, +1); + + test_data = ao2_alloc(1, NULL); + ast_test_validate(test, NULL != test_data); + test_message1 = stasis_message_create(test_message_type1, test_data); + ast_test_validate(test, NULL != test_message1); + test_message2 = stasis_message_create(test_message_type2, test_data); + ast_test_validate(test, NULL != test_message2); + test_message3 = stasis_message_create(test_message_type3, test_data); + ast_test_validate(test, NULL != test_message3); + + stasis_publish(topic, test_message1); + stasis_publish(topic, test_message2); + stasis_publish(topic, test_message3); + + actual_len = consumer_wait_for(consumer1, 1); + ast_test_validate(test, 1 == actual_len); + actual_len = consumer_wait_for(consumer2, 1); + ast_test_validate(test, 1 == actual_len); + actual_len = consumer_wait_for(consumer3, 1); + ast_test_validate(test, 1 == actual_len); + + actual = consumer1->messages_rxed[0]; + ast_test_validate(test, test_message1 == actual); + + actual = consumer2->messages_rxed[0]; + ast_test_validate(test, test_message2 == actual); + + actual = consumer3->messages_rxed[0]; + ast_test_validate(test, test_message3 == actual); + + /* consumer1 and consumer2 do not get the final message. */ + ao2_cleanup(consumer1); + ao2_cleanup(consumer2); + + return AST_TEST_PASS; +} + static const char *cache_simple(struct stasis_message *message) { const char *type_name = @@ -1748,8 +2050,10 @@ static int unload_module(void) AST_TEST_UNREGISTER(message_type); AST_TEST_UNREGISTER(message); AST_TEST_UNREGISTER(subscription_messages); + AST_TEST_UNREGISTER(subscription_pool_messages); AST_TEST_UNREGISTER(publish); AST_TEST_UNREGISTER(publish_sync); + AST_TEST_UNREGISTER(publish_pool); AST_TEST_UNREGISTER(unsubscribe_stops_messages); AST_TEST_UNREGISTER(forward); AST_TEST_UNREGISTER(cache_filter); @@ -1757,8 +2061,10 @@ static int unload_module(void) AST_TEST_UNREGISTER(cache_dump); AST_TEST_UNREGISTER(cache_eid_aggregate); AST_TEST_UNREGISTER(router); + AST_TEST_UNREGISTER(router_pool); AST_TEST_UNREGISTER(router_cache_updates); AST_TEST_UNREGISTER(interleaving); + AST_TEST_UNREGISTER(subscription_interleaving); AST_TEST_UNREGISTER(no_to_json); AST_TEST_UNREGISTER(to_json); AST_TEST_UNREGISTER(no_to_ami); @@ -1773,8 +2079,10 @@ static int load_module(void) AST_TEST_REGISTER(message_type); AST_TEST_REGISTER(message); AST_TEST_REGISTER(subscription_messages); + AST_TEST_REGISTER(subscription_pool_messages); AST_TEST_REGISTER(publish); AST_TEST_REGISTER(publish_sync); + AST_TEST_REGISTER(publish_pool); AST_TEST_REGISTER(unsubscribe_stops_messages); AST_TEST_REGISTER(forward); AST_TEST_REGISTER(cache_filter); @@ -1782,8 +2090,10 @@ static int load_module(void) AST_TEST_REGISTER(cache_dump); AST_TEST_REGISTER(cache_eid_aggregate); AST_TEST_REGISTER(router); + AST_TEST_REGISTER(router_pool); AST_TEST_REGISTER(router_cache_updates); AST_TEST_REGISTER(interleaving); + AST_TEST_REGISTER(subscription_interleaving); AST_TEST_REGISTER(no_to_json); AST_TEST_REGISTER(to_json); AST_TEST_REGISTER(no_to_ami);