/*!
* \brief Add a route to a message router.
*
+ * A particular \a message_type may have at most one route per \a router. If
+ * you route \ref stasis_cache_update messages, the callback will only receive
+ * updates for types not handled by routes added with
+ * stasis_message_router_add_cache_update().
+ *
* \param router Router to add the route to.
* \param message_type Type of message to route.
* \param callback Callback to forard messages of \a message_type to.
* \since 12
*/
int stasis_message_router_add(struct stasis_message_router *router,
- struct stasis_message_type *message_type,
- stasis_subscription_cb callback,
- void *data);
+ struct stasis_message_type *message_type,
+ stasis_subscription_cb callback, void *data);
+
+/*!
+ * \brief Add a route for \ref stasis_cache_update messages to a message router.
+ *
+ * A particular \a message_type may have at most one cache route per \a router.
+ * These are distinct from regular routes, so one could have both a regular
+ * route and a cache route for the same \a message_type.
+ *
+ * \param router Router to add the route to.
+ * \param message_type Subtype of cache update to route.
+ * \param callback Callback to forard messages of \a message_type to.
+ * \param data Data pointer to pass to \a callback.
+ *
+ * \retval 0 on success
+ * \retval -1 on failure
+ *
+ * \since 12
+ */
+int stasis_message_router_add_cache_update(struct stasis_message_router *router,
+ struct stasis_message_type *message_type,
+ stasis_subscription_cb callback, void *data);
/*!
* \brief Remove a route from a message router.
* \since 12
*/
void stasis_message_router_remove(struct stasis_message_router *router,
- struct stasis_message_type *message_type);
+ struct stasis_message_type *message_type);
+
+/*!
+ * \brief Remove a cache route from a message router.
+ *
+ * \param router Router to remove the route from.
+ * \param message_type Type of message to route.
+ *
+ * \since 12
+ */
+void stasis_message_router_remove_cache_update(
+ struct stasis_message_router *router,
+ struct stasis_message_type *message_type);
/*!
* \brief Sets the default route of a router.
struct cdr_object *it_cdr;
ast_assert(update != NULL);
- if (ast_channel_snapshot_type() != update->type) {
- return;
- }
+ ast_assert(ast_channel_snapshot_type() == update->type);
old_snapshot = stasis_message_data(update->old_snapshot);
new_snapshot = stasis_message_data(update->new_snapshot);
if (!stasis_router) {
return -1;
}
- stasis_message_router_add(stasis_router, stasis_cache_update_type(), handle_channel_cache_message, NULL);
+ stasis_message_router_add_cache_update(stasis_router, ast_channel_snapshot_type(), handle_channel_cache_message, NULL);
stasis_message_router_add(stasis_router, ast_channel_dial_type(), handle_dial_message, NULL);
stasis_message_router_add(stasis_router, ast_channel_entered_bridge_type(), handle_bridge_enter_message, NULL);
stasis_message_router_add(stasis_router, ast_channel_left_bridge_type(), handle_bridge_leave_message, NULL);
update = stasis_message_data(message);
- if (ast_bridge_snapshot_type() != update->type) {
- return;
- }
+ ast_assert(ast_bridge_snapshot_type() == update->type);
old_snapshot = stasis_message_data(update->old_snapshot);
new_snapshot = stasis_message_data(update->new_snapshot);
return -1;
}
- /* BUGBUG - This should really route off of the manager_router, but
- * can't b/c manager_channels is already routing the
- * stasis_cache_update_type() messages. Having a separate router can
- * cause some message ordering issues with bridge and channel messages.
- */
- bridge_state_router = stasis_message_router_create(bridge_topic);
+ bridge_state_router = ast_manager_get_message_router();
if (!bridge_state_router) {
return -1;
}
- ret |= stasis_message_router_add(bridge_state_router,
- stasis_cache_update_type(),
- bridge_snapshot_update,
- NULL);
+ ret |= stasis_message_router_add_cache_update(bridge_state_router,
+ ast_bridge_snapshot_type(), bridge_snapshot_update, NULL);
ret |= stasis_message_router_add(bridge_state_router,
- ast_bridge_merge_message_type(),
- bridge_merge_cb,
- NULL);
+ ast_bridge_merge_message_type(), bridge_merge_cb, NULL);
ret |= stasis_message_router_add(bridge_state_router,
- ast_channel_entered_bridge_type(),
- channel_enter_cb,
- NULL);
+ ast_channel_entered_bridge_type(), channel_enter_cb, NULL);
ret |= stasis_message_router_add(bridge_state_router,
- ast_channel_left_bridge_type(),
- channel_leave_cb,
- NULL);
+ ast_channel_left_bridge_type(), channel_leave_cb, NULL);
ret |= ast_manager_register_xml_core("BridgeList", 0, manager_bridges_list);
ret |= ast_manager_register_xml_core("BridgeInfo", 0, manager_bridge_info);
update = stasis_message_data(message);
- if (ast_channel_snapshot_type() != update->type) {
- return;
- }
+ ast_assert(ast_channel_snapshot_type() == update->type);
old_snapshot = stasis_message_data(update->old_snapshot);
new_snapshot = stasis_message_data(update->new_snapshot);
ast_register_atexit(manager_channels_shutdown);
- ret |= stasis_message_router_add(message_router,
- stasis_cache_update_type(),
- channel_snapshot_update,
- NULL);
+ ret |= stasis_message_router_add_cache_update(message_router,
+ ast_channel_snapshot_type(), channel_snapshot_update, NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_user_event_type(),
- channel_user_event_cb,
- NULL);
+ ast_channel_user_event_type(), channel_user_event_cb, NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_dtmf_begin_type(),
- channel_dtmf_begin_cb,
- NULL);
+ ast_channel_dtmf_begin_type(), channel_dtmf_begin_cb, NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_dtmf_end_type(),
- channel_dtmf_end_cb,
- NULL);
+ ast_channel_dtmf_end_type(), channel_dtmf_end_cb, NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_hangup_request_type(),
- channel_hangup_request_cb,
- NULL);
+ ast_channel_hangup_request_type(), channel_hangup_request_cb,
+ NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_dial_type(),
- channel_dial_cb,
- NULL);
+ ast_channel_dial_type(), channel_dial_cb, NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_hold_type(),
- channel_hold_cb,
- NULL);
+ ast_channel_hold_type(), channel_hold_cb, NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_unhold_type(),
- channel_unhold_cb,
- NULL);
+ ast_channel_unhold_type(), channel_unhold_cb, NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_fax_type(),
- channel_fax_cb,
- NULL);
+ ast_channel_fax_type(), channel_fax_cb, NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_chanspy_start_type(),
- channel_chanspy_start_cb,
- NULL);
+ ast_channel_chanspy_start_type(), channel_chanspy_start_cb,
+ NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_chanspy_stop_type(),
- channel_chanspy_stop_cb,
- NULL);
+ ast_channel_chanspy_stop_type(), channel_chanspy_stop_cb, NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_hangup_handler_type(),
- channel_hangup_handler_cb,
- NULL);
+ ast_channel_hangup_handler_type(), channel_hangup_handler_cb,
+ NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_moh_start_type(),
- channel_moh_start_cb,
- NULL);
+ ast_channel_moh_start_type(), channel_moh_start_cb, NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_moh_stop_type(),
- channel_moh_stop_cb,
- NULL);
+ ast_channel_moh_stop_type(), channel_moh_stop_cb, NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_monitor_start_type(),
- channel_monitor_start_cb,
- NULL);
+ ast_channel_monitor_start_type(), channel_monitor_start_cb,
+ NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_monitor_stop_type(),
- channel_monitor_stop_cb,
- NULL);
+ ast_channel_monitor_stop_type(), channel_monitor_stop_cb, NULL);
/* If somehow we failed to add any routes, just shut down the whole
* thing and fail it.
#include "asterisk/astobj2.h"
#include "asterisk/stasis_message_router.h"
+/*! Number of hash buckets for the route table. Keep it prime! */
+#define ROUTE_TABLE_BUCKETS 7
+
/*! \internal */
struct stasis_message_route {
/*! Message type handle by this route. */
struct stasis_subscription *subscription;
/*! Subscribed routes */
struct ao2_container *routes;
+ /*! Subscribed routes for \ref stasi_cache_update messages */
+ struct ao2_container *cache_routes;
/*! Route of last resort */
struct stasis_message_route *default_route;
};
ao2_cleanup(router->routes);
router->routes = NULL;
+ ao2_cleanup(router->cache_routes);
+ router->cache_routes = NULL;
+
ao2_cleanup(router->default_route);
router->default_route = NULL;
}
-static struct stasis_message_route *find_route(struct stasis_message_router *router, struct stasis_message_type *message_type)
+static struct stasis_message_route *find_route(
+ struct stasis_message_router *router,
+ struct stasis_message *message)
{
- return ao2_find(router->routes, message_type, OBJ_KEY);
+ RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
+ struct stasis_message_type *type = stasis_message_type(message);
+ SCOPED_AO2LOCK(lock, router);
+
+ if (type == stasis_cache_update_type()) {
+ /* Find a cache route */
+ struct stasis_cache_update *update =
+ stasis_message_data(message);
+ route = ao2_find(router->cache_routes, update->type, OBJ_KEY);
+ }
+
+ if (route == NULL) {
+ /* Find a regular route */
+ route = ao2_find(router->routes, type, OBJ_KEY);
+ }
+
+ if (route == NULL) {
+ /* Maybe the default route, then? */
+ if ((route = router->default_route)) {
+ ao2_ref(route, +1);
+ }
+ }
+
+ if (route == NULL) {
+ return NULL;
+ }
+
+ ao2_ref(route, +1);
+ return route;
}
static void router_dispatch(void *data,
struct stasis_message *message)
{
struct stasis_message_router *router = data;
- RAII_VAR(struct stasis_message_router *, router_needs_cleanup, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
- struct stasis_message_type *type = stasis_message_type(message);
- {
- SCOPED_AO2LOCK(lock, router);
-
- if (!(route = find_route(router, type))) {
- if ((route = router->default_route)) {
- ao2_ref(route, +1);
- }
- }
- }
+ route = find_route(router, message);
if (route) {
route->callback(route->data, sub, topic, message);
}
+
if (stasis_subscription_final_message(sub, message)) {
- router_needs_cleanup = router;
- return;
+ ao2_cleanup(router);
}
-
}
struct stasis_message_router *stasis_message_router_create(
return NULL;
}
- if (!(router->routes = ao2_container_alloc(7, route_hash, route_cmp))) {
+ router->routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS, route_hash,
+ route_cmp);
+ if (!router->routes) {
+ return NULL;
+ }
+
+ router->cache_routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS,
+ route_hash, route_cmp);
+ if (!router->cache_routes) {
return NULL;
}
RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, router);
- if ((existing_route = find_route(router, route->message_type))) {
+ existing_route = ao2_find(router->routes, route->message_type, OBJ_KEY);
+
+ if (existing_route) {
+ ast_log(LOG_ERROR, "Cannot add route; route exists\n");
return -1;
}
return 0;
}
+static int add_cache_route(struct stasis_message_router *router,
+ struct stasis_message_route *route)
+{
+ RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup);
+ SCOPED_AO2LOCK(lock, router);
+
+ existing_route = ao2_find(router->cache_routes, route->message_type,
+ OBJ_KEY);
+
+ if (existing_route) {
+ ast_log(LOG_ERROR, "Cannot add route; route exists\n");
+ return -1;
+ }
+
+ ao2_link(router->cache_routes, route);
+ return 0;
+}
+
int stasis_message_router_add(struct stasis_message_router *router,
- struct stasis_message_type *message_type,
- stasis_subscription_cb callback,
- void *data)
+ struct stasis_message_type *message_type,
+ stasis_subscription_cb callback, void *data)
{
RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
return add_route(router, route);
}
+int stasis_message_router_add_cache_update(struct stasis_message_router *router,
+ struct stasis_message_type *message_type,
+ stasis_subscription_cb callback, void *data)
+{
+ RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
+
+ route = route_create(message_type, callback, data);
+ if (!route) {
+ return -1;
+ }
+
+ return add_cache_route(router, route);
+}
+
void stasis_message_router_remove(struct stasis_message_router *router,
- struct stasis_message_type *message_type)
+ struct stasis_message_type *message_type)
+{
+ SCOPED_AO2LOCK(lock, router);
+
+ ao2_find(router->routes, message_type,
+ OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
+}
+
+void stasis_message_router_remove_cache_update(
+ struct stasis_message_router *router,
+ struct stasis_message_type *message_type)
{
SCOPED_AO2LOCK(lock, router);
- ao2_find(router->routes, message_type, OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
+ ao2_find(router->cache_routes, message_type,
+ OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
}
int stasis_message_router_set_default(struct stasis_message_router *router,
{
struct timeval start = ast_tvnow();
struct timespec end = {
- .tv_sec = start.tv_sec + 30,
+ .tv_sec = start.tv_sec + 3,
.tv_nsec = start.tv_usec * 1000
};
AST_TEST_DEFINE(route_conflicts)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe);
+ RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
AST_TEST_DEFINE(router)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe);
+ RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
RAII_VAR(char *, test_data, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
return AST_TEST_PASS;
}
+static const char *cache_simple(struct stasis_message *message) {
+ const char *type_name =
+ stasis_message_type_name(stasis_message_type(message));
+ if (!ast_begins_with(type_name, "Cache")) {
+ return NULL;
+ }
+
+ return "cached";
+}
+
+AST_TEST_DEFINE(router_cache_updates)
+{
+ RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
+ RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
+ RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, message1, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, message2, NULL, ao2_cleanup);
+ struct stasis_cache_update *update;
+ int actual_len, ret;
+ struct stasis_message *actual;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test special handling cache_update messages";
+ info->description = "Test special handling cache_update messages";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ topic = stasis_topic_create("TestTopic");
+ ast_test_validate(test, NULL != topic);
+
+ caching_topic = stasis_caching_topic_create(topic, cache_simple);
+ ast_test_validate(test, NULL != caching_topic);
+
+ consumer1 = consumer_create(1);
+ ast_test_validate(test, NULL != consumer1);
+ consumer2 = consumer_create(1);
+ ast_test_validate(test, NULL != consumer2);
+ consumer3 = consumer_create(1);
+ ast_test_validate(test, NULL != consumer3);
+
+ test_message_type1 = stasis_message_type_create("Cache1", NULL);
+ ast_test_validate(test, NULL != test_message_type1);
+ test_message_type2 = stasis_message_type_create("Cache2", NULL);
+ ast_test_validate(test, NULL != test_message_type2);
+ test_message_type3 = stasis_message_type_create("NonCache", NULL);
+ ast_test_validate(test, NULL != test_message_type3);
+
+ uut = stasis_message_router_create(
+ stasis_caching_get_topic(caching_topic));
+ ast_test_validate(test, NULL != uut);
+
+ ret = stasis_message_router_add_cache_update(
+ uut, test_message_type1, consumer_exec, consumer1);
+ ast_test_validate(test, 0 == ret);
+ ao2_ref(consumer1, +1);
+ ret = stasis_message_router_add(
+ uut, stasis_cache_update_type(), consumer_exec, consumer2);
+ ast_test_validate(test, 0 == ret);
+ ao2_ref(consumer2, +1);
+ ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
+ ast_test_validate(test, 0 == ret);
+ ao2_ref(consumer3, +1);
+
+ test_data = ao2_alloc(1, NULL);
+ ast_test_validate(test, NULL != test_data);
+ test_message1 = stasis_message_create(test_message_type1, test_data);
+ ast_test_validate(test, NULL != test_message1);
+ test_message2 = stasis_message_create(test_message_type2, test_data);
+ ast_test_validate(test, NULL != test_message2);
+ test_message3 = stasis_message_create(test_message_type3, test_data);
+ ast_test_validate(test, NULL != test_message3);
+
+ stasis_publish(topic, test_message1);
+ stasis_publish(topic, test_message2);
+ stasis_publish(topic, test_message3);
+
+ actual_len = consumer_wait_for(consumer1, 1);
+ ast_test_validate(test, 1 == actual_len);
+ actual_len = consumer_wait_for(consumer2, 1);
+ ast_test_validate(test, 1 == actual_len);
+ actual_len = consumer_wait_for(consumer3, 1);
+ ast_test_validate(test, 1 == actual_len);
+
+ actual = consumer1->messages_rxed[0];
+ ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
+ update = stasis_message_data(actual);
+ ast_test_validate(test, test_message_type1 == update->type);
+ ast_test_validate(test, test_message1 == update->new_snapshot);
+
+ actual = consumer2->messages_rxed[0];
+ ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
+ update = stasis_message_data(actual);
+ ast_test_validate(test, test_message_type2 == update->type);
+ ast_test_validate(test, test_message2 == update->new_snapshot);
+
+ actual = consumer3->messages_rxed[0];
+ ast_test_validate(test, test_message3 == actual);
+
+ /* consumer1 and consumer2 do not get the final message. */
+ ao2_cleanup(consumer1);
+ ao2_cleanup(consumer2);
+
+ return AST_TEST_PASS;
+}
+
AST_TEST_DEFINE(no_to_json)
{
RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
AST_TEST_UNREGISTER(cache_dump);
AST_TEST_UNREGISTER(route_conflicts);
AST_TEST_UNREGISTER(router);
+ AST_TEST_UNREGISTER(router_cache_updates);
AST_TEST_UNREGISTER(interleaving);
AST_TEST_UNREGISTER(no_to_json);
AST_TEST_UNREGISTER(to_json);
AST_TEST_REGISTER(cache_dump);
AST_TEST_REGISTER(route_conflicts);
AST_TEST_REGISTER(router);
+ AST_TEST_REGISTER(router_cache_updates);
AST_TEST_REGISTER(interleaving);
AST_TEST_REGISTER(no_to_json);
AST_TEST_REGISTER(to_json);