]> git.ipfire.org Git - thirdparty/asterisk.git/commitdiff
Stasis performance improvements
authorDavid M. Lee <dlee@digium.com>
Mon, 30 Sep 2013 15:24:00 +0000 (15:24 +0000)
committerDavid M. Lee <dlee@digium.com>
Mon, 30 Sep 2013 15:24:00 +0000 (15:24 +0000)
This patch addresses several performance problems that were found in
the initial performance testing of Asterisk 12.

The Stasis dispatch object was allocated as an AO2 object, even though
it has a very confined lifecycle. This was replaced with a straight
ast_malloc().

The Stasis message router was spending an inordinate amount of time
searching hash tables. In this case, most of our routers had 6 or
fewer routes in them to begin with. This was replaced with an array
that's searched linearly for the route.

We more heavily rely on AO2 objects in Asterisk 12, and the memset()
in ao2_ref() actually became noticeable on the profile. This was
#ifdef'ed to only run when AO2_DEBUG was enabled.

After being misled by an erroneous comment in taskprocessor.c during
profiling, the wrong comment was removed.

Review: https://reviewboard.asterisk.org/r/2873/

git-svn-id: https://origsvn.digium.com/svn/asterisk/branches/12@400138 65c4cc65-6c06-0410-ace0-fbb531ad65f3

include/asterisk/stasis_message_router.h
main/astobj2.c
main/stasis.c
main/stasis_message_router.c
main/taskprocessor.c
res/res_pjsip/include/res_pjsip_private.h
tests/test_stasis.c

index b14868b4a5ff6a54e9bf4122852e78dfd555e53d..3209adb16778de3768a9c4082d226192b964ed44 100644 (file)
@@ -100,6 +100,9 @@ int stasis_message_router_is_done(struct stasis_message_router *router);
  * updates for types not handled by routes added with
  * stasis_message_router_add_cache_update().
  *
+ * Adding multiple routes for the same message type results in undefined
+ * behavior.
+ *
  * \param router Router to add the route to.
  * \param message_type Type of message to route.
  * \param callback Callback to forard messages of \a message_type to.
@@ -121,6 +124,9 @@ int stasis_message_router_add(struct stasis_message_router *router,
  * These are distinct from regular routes, so one could have both a regular
  * route and a cache route for the same \a message_type.
  *
+ * Adding multiple routes for the same message type results in undefined
+ * behavior.
+ *
  * \param router Router to add the route to.
  * \param message_type Subtype of cache update to route.
  * \param callback Callback to forard messages of \a message_type to.
@@ -138,6 +144,11 @@ int stasis_message_router_add_cache_update(struct stasis_message_router *router,
 /*!
  * \brief Remove a route from a message router.
  *
+ * If a route is removed from another thread, there is no notification that
+ * all messages using this route have been processed. This typically means that
+ * the associated \c data pointer for this route must be kept until the
+ * route itself is disposed of.
+ *
  * \param router Router to remove the route from.
  * \param message_type Type of message to route.
  *
@@ -149,6 +160,11 @@ void stasis_message_router_remove(struct stasis_message_router *router,
 /*!
  * \brief Remove a cache route from a message router.
  *
+ * If a route is removed from another thread, there is no notification that
+ * all messages using this route have been processed. This typically means that
+ * the associated \c data pointer for this route must be kept until the
+ * route itself is disposed of.
+ *
  * \param router Router to remove the route from.
  * \param message_type Type of message to route.
  *
index 88a6a6a234226d765f2fc986e6781db932249512..88801bd2febcb8ad4ec7d278e65fafd52dfdfbcb 100644 (file)
@@ -478,38 +478,23 @@ static int internal_ao2_ref(void *user_data, int delta, const char *file, int li
        ast_atomic_fetchadd_int(&ao2.total_objects, -1);
 #endif
 
+       /* In case someone uses an object after it's been freed */
+       obj->priv_data.magic = 0;
+
        switch (obj->priv_data.options & AO2_ALLOC_OPT_LOCK_MASK) {
        case AO2_ALLOC_OPT_LOCK_MUTEX:
                obj_mutex = INTERNAL_OBJ_MUTEX(user_data);
                ast_mutex_destroy(&obj_mutex->mutex.lock);
 
-               /*
-                * For safety, zero-out the astobj2_lock header and also the
-                * first word of the user-data, which we make sure is always
-                * allocated.
-                */
-               memset(obj_mutex, '\0', sizeof(*obj_mutex) + sizeof(void *) );
                ast_free(obj_mutex);
                break;
        case AO2_ALLOC_OPT_LOCK_RWLOCK:
                obj_rwlock = INTERNAL_OBJ_RWLOCK(user_data);
                ast_rwlock_destroy(&obj_rwlock->rwlock.lock);
 
-               /*
-                * For safety, zero-out the astobj2_rwlock header and also the
-                * first word of the user-data, which we make sure is always
-                * allocated.
-                */
-               memset(obj_rwlock, '\0', sizeof(*obj_rwlock) + sizeof(void *) );
                ast_free(obj_rwlock);
                break;
        case AO2_ALLOC_OPT_LOCK_NOLOCK:
-               /*
-                * For safety, zero-out the astobj2 header and also the first
-                * word of the user-data, which we make sure is always
-                * allocated.
-                */
-               memset(obj, '\0', sizeof(*obj) + sizeof(void *) );
                ast_free(obj);
                break;
        default:
@@ -575,14 +560,6 @@ static void *internal_ao2_alloc(size_t data_size, ao2_destructor_fn destructor_f
        struct astobj2_lock *obj_mutex;
        struct astobj2_rwlock *obj_rwlock;
 
-       if (data_size < sizeof(void *)) {
-               /*
-                * We always alloc at least the size of a void *,
-                * for debugging purposes.
-                */
-               data_size = sizeof(void *);
-       }
-
        switch (options & AO2_ALLOC_OPT_LOCK_MASK) {
        case AO2_ALLOC_OPT_LOCK_MUTEX:
 #if defined(__AST_DEBUG_MALLOC)
index 1a03bb3d44a370e7d6aff2e5832f20d35acc37b1..7c8c3492098e039fb218e2a311aadd9420e9cc4b 100644 (file)
@@ -467,23 +467,24 @@ struct dispatch {
        struct stasis_subscription *sub;
 };
 
-static void dispatch_dtor(void *data)
+static void dispatch_dtor(struct dispatch *dispatch)
 {
-       struct dispatch *dispatch = data;
        ao2_cleanup(dispatch->topic);
        ao2_cleanup(dispatch->message);
        ao2_cleanup(dispatch->sub);
+
+       ast_free(dispatch);
 }
 
 static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub)
 {
-       RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
+       struct dispatch *dispatch;
 
        ast_assert(topic != NULL);
        ast_assert(message != NULL);
        ast_assert(sub != NULL);
 
-       dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor);
+       dispatch = ast_malloc(sizeof(*dispatch));
        if (!dispatch) {
                return NULL;
        }
@@ -497,7 +498,6 @@ static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasi
        dispatch->sub = sub;
        ao2_ref(sub, +1);
 
-       ao2_ref(dispatch, +1);
        return dispatch;
 }
 
@@ -508,9 +508,10 @@ static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasi
  */
 static int dispatch_exec(void *data)
 {
-       RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup);
+       struct dispatch *dispatch = data;
 
        subscription_invoke(dispatch->sub, dispatch->topic, dispatch->message);
+       dispatch_dtor(dispatch);
 
        return 0;
 }
@@ -534,20 +535,19 @@ void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *pu
                ast_assert(sub != NULL);
 
                if (sub->mailbox) {
-                       RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
+                       struct dispatch *dispatch;
 
                        dispatch = dispatch_create(publisher_topic, message, sub);
                        if (!dispatch) {
-                               ast_log(LOG_DEBUG, "Dropping dispatch\n");
+                               ast_log(LOG_ERROR, "Dropping dispatch\n");
                                break;
                        }
 
-                       if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
-                               /* Ownership transferred to mailbox.
-                                * Don't increment ref, b/c the task processor
-                                * may have already gotten rid of the object.
+                       if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) != 0) {
+                               /* Push failed; just delete the dispatch.
                                 */
-                               dispatch = NULL;
+                               ast_log(LOG_ERROR, "Dropping dispatch\n");
+                               dispatch_dtor(dispatch);
                        }
                } else {
                        /* Dispatch directly */
index 26d2f2c0c73c8141d7503d5dd775b83c252ea34c..864cf42c4791f87cf0514556962835bcc8562c46 100644 (file)
@@ -34,9 +34,6 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "asterisk/astobj2.h"
 #include "asterisk/stasis_message_router.h"
 
-/*! Number of hash buckets for the route table. Keep it prime! */
-#define ROUTE_TABLE_BUCKETS 7
-
 /*! \internal */
 struct stasis_message_route {
        /*! Message type handle by this route. */
@@ -47,29 +44,79 @@ struct stasis_message_route {
        void *data;
 };
 
-static void route_dtor(void *obj)
+struct route_table {
+       /*! Current number of entries in the route table */
+       size_t current_size;
+       /*! Allocated number of entires in the route table */
+       size_t max_size;
+       /*! The route table itself */
+       struct stasis_message_route routes[];
+};
+
+static struct stasis_message_route *table_find_route(struct route_table *table,
+       struct stasis_message_type *message_type)
 {
-       struct stasis_message_route *route = obj;
+       size_t idx;
+
+       /* While a linear search for routes may seem very inefficient, most
+        * route tables have six routes or less. For such small data, it's
+        * hard to beat a linear search. If we start having larger route
+        * tables, then we can look into containers with more efficient
+        * lookups.
+        */
+       for (idx = 0; idx < table->current_size; ++idx) {
+               if (table->routes[idx].message_type == message_type) {
+                       return &table->routes[idx];
+               }
+       }
 
-       ao2_cleanup(route->message_type);
-       route->message_type = NULL;
+       return NULL;
 }
 
-static int route_hash(const void *obj, const int flags)
+static int table_add_route(struct route_table **table_ptr,
+       struct stasis_message_type *message_type,
+       stasis_subscription_cb callback, void *data)
 {
-       const struct stasis_message_route *route = obj;
-       const struct stasis_message_type *message_type = (flags & OBJ_KEY) ? obj : route->message_type;
+       struct route_table *table = *table_ptr;
+       struct stasis_message_route *route;
+
+       ast_assert(table_find_route(table, message_type) == NULL);
+
+       if (table->current_size + 1 > table->max_size) {
+               size_t new_max_size = table->max_size ? table->max_size * 2 : 1;
+               struct route_table *new_table = ast_realloc(table,
+                       sizeof(*new_table) +
+                       sizeof(new_table->routes[0]) * new_max_size);
+               if (!new_table) {
+                       return -1;
+               }
+               *table_ptr = table = new_table;
+               table->max_size = new_max_size;
+       }
 
-       return ast_str_hash(stasis_message_type_name(message_type));
+       route = &table->routes[table->current_size++];
+
+       route->message_type = ao2_bump(message_type);
+       route->callback = callback;
+       route->data = data;
+
+       return 0;
 }
 
-static int route_cmp(void *obj, void *arg, int flags)
+static int table_remove_route(struct route_table *table,
+       struct stasis_message_type *message_type)
 {
-       const struct stasis_message_route *left = obj;
-       const struct stasis_message_route *right = arg;
-       const struct stasis_message_type *message_type = (flags & OBJ_KEY) ? arg : right->message_type;
-
-       return (left->message_type == message_type) ? CMP_MATCH | CMP_STOP : 0;
+       size_t idx;
+
+       for (idx = 0; idx < table->current_size; ++idx) {
+               if (table->routes[idx].message_type == message_type) {
+                       ao2_cleanup(message_type);
+                       table->routes[idx] =
+                               table->routes[--table->current_size];
+                       return 0;
+               }
+       }
+       return -1;
 }
 
 /*! \internal */
@@ -77,11 +124,11 @@ struct stasis_message_router {
        /*! Subscription to the upstream topic */
        struct stasis_subscription *subscription;
        /*! Subscribed routes */
-       struct ao2_container *routes;
-       /*! Subscribed routes for \ref stasi_cache_update messages */
-       struct ao2_container *cache_routes;
+       struct route_table *routes;
+       /*! Subscribed routes for \ref stasis_cache_update messages */
+       struct route_table *cache_routes;
        /*! Route of last resort */
-       struct stasis_message_route *default_route;
+       struct stasis_message_route default_route;
 };
 
 static void router_dtor(void *obj)
@@ -92,49 +139,47 @@ static void router_dtor(void *obj)
        ast_assert(stasis_subscription_is_done(router->subscription));
        router->subscription = NULL;
 
-       ao2_cleanup(router->routes);
+       ast_free(router->routes);
        router->routes = NULL;
 
-       ao2_cleanup(router->cache_routes);
+       ast_free(router->cache_routes);
        router->cache_routes = NULL;
-
-       ao2_cleanup(router->default_route);
-       router->default_route = NULL;
 }
 
-static struct stasis_message_route *find_route(
+static int find_route(
        struct stasis_message_router *router,
-       struct stasis_message *message)
+       struct stasis_message *message,
+       struct stasis_message_route *route_out)
 {
-       RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
+       struct stasis_message_route *route = NULL;
        struct stasis_message_type *type = stasis_message_type(message);
        SCOPED_AO2LOCK(lock, router);
 
+       ast_assert(route_out != NULL);
+
        if (type == stasis_cache_update_type()) {
                /* Find a cache route */
                struct stasis_cache_update *update =
                        stasis_message_data(message);
-               route = ao2_find(router->cache_routes, update->type, OBJ_KEY);
+               route = table_find_route(router->cache_routes, update->type);
        }
 
        if (route == NULL) {
                /* Find a regular route */
-               route = ao2_find(router->routes, type, OBJ_KEY);
+               route = table_find_route(router->routes, type);
        }
 
-       if (route == NULL) {
+       if (route == NULL && router->default_route.callback) {
                /* Maybe the default route, then? */
-               if ((route = router->default_route)) {
-                       ao2_ref(route, +1);
-               }
+               route = &router->default_route;
        }
 
-       if (route == NULL) {
-               return NULL;
+       if (!route) {
+               return -1;
        }
 
-       ao2_ref(route, +1);
-       return route;
+       *route_out = *route;
+       return 0;
 }
 
 static void router_dispatch(void *data,
@@ -143,15 +188,12 @@ static void router_dispatch(void *data,
                            struct stasis_message *message)
 {
        struct stasis_message_router *router = data;
-       RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
+       struct stasis_message_route route;
 
-       route = find_route(router, message);
-
-       if (route) {
-               route->callback(route->data, sub, topic, message);
+       if (find_route(router, message, &route) == 0) {
+               route.callback(route.data, sub, topic, message);
        }
 
-
        if (stasis_subscription_final_message(sub, message)) {
                ao2_cleanup(router);
        }
@@ -167,14 +209,12 @@ struct stasis_message_router *stasis_message_router_create(
                return NULL;
        }
 
-       router->routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS, route_hash,
-               route_cmp);
+       router->routes = ast_calloc(1, sizeof(*router->routes));
        if (!router->routes) {
                return NULL;
        }
 
-       router->cache_routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS,
-               route_hash, route_cmp);
+       router->cache_routes = ast_calloc(1, sizeof(*router->cache_routes));
        if (!router->cache_routes) {
                return NULL;
        }
@@ -216,100 +256,27 @@ int stasis_message_router_is_done(struct stasis_message_router *router)
        return stasis_subscription_is_done(router->subscription);
 }
 
-
-static struct stasis_message_route *route_create(
-       struct stasis_message_type *message_type,
-       stasis_subscription_cb callback,
-       void *data)
-{
-       RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
-
-       route = ao2_alloc(sizeof(*route), route_dtor);
-       if (!route) {
-               return NULL;
-       }
-
-       if (message_type) {
-               ao2_ref(message_type, +1);
-       }
-       route->message_type = message_type;
-       route->callback = callback;
-       route->data = data;
-
-       ao2_ref(route, +1);
-       return route;
-}
-
-static int add_route(struct stasis_message_router *router,
-                    struct stasis_message_route *route)
-{
-       RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup);
-       SCOPED_AO2LOCK(lock, router);
-
-       existing_route = ao2_find(router->routes, route->message_type, OBJ_KEY);
-
-       if (existing_route) {
-               ast_log(LOG_ERROR, "Cannot add route; route exists\n");
-               return -1;
-       }
-
-       ao2_link(router->routes, route);
-       return 0;
-}
-
-static int add_cache_route(struct stasis_message_router *router,
-                    struct stasis_message_route *route)
-{
-       RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup);
-       SCOPED_AO2LOCK(lock, router);
-
-       existing_route = ao2_find(router->cache_routes, route->message_type,
-               OBJ_KEY);
-
-       if (existing_route) {
-               ast_log(LOG_ERROR, "Cannot add route; route exists\n");
-               return -1;
-       }
-
-       ao2_link(router->cache_routes, route);
-       return 0;
-}
-
 int stasis_message_router_add(struct stasis_message_router *router,
        struct stasis_message_type *message_type,
        stasis_subscription_cb callback, void *data)
 {
-       RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
-
-       route = route_create(message_type, callback, data);
-       if (!route) {
-               return -1;
-       }
-
-       return add_route(router, route);
+       SCOPED_AO2LOCK(lock, router);
+       return table_add_route(&router->routes, message_type, callback, data);
 }
 
 int stasis_message_router_add_cache_update(struct stasis_message_router *router,
        struct stasis_message_type *message_type,
        stasis_subscription_cb callback, void *data)
 {
-       RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
-
-       route = route_create(message_type, callback, data);
-       if (!route) {
-               return -1;
-       }
-
-       return add_cache_route(router, route);
+       SCOPED_AO2LOCK(lock, router);
+       return table_add_route(&router->cache_routes, message_type, callback, data);
 }
 
 void stasis_message_router_remove(struct stasis_message_router *router,
        struct stasis_message_type *message_type)
 {
        SCOPED_AO2LOCK(lock, router);
-
-       ao2_find(router->routes, message_type,
-               OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
+       table_remove_route(router->routes, message_type);
 }
 
 void stasis_message_router_remove_cache_update(
@@ -317,9 +284,7 @@ void stasis_message_router_remove_cache_update(
        struct stasis_message_type *message_type)
 {
        SCOPED_AO2LOCK(lock, router);
-
-       ao2_find(router->cache_routes, message_type,
-               OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
+       table_remove_route(router->cache_routes, message_type);
 }
 
 int stasis_message_router_set_default(struct stasis_message_router *router,
@@ -327,7 +292,8 @@ int stasis_message_router_set_default(struct stasis_message_router *router,
                                      void *data)
 {
        SCOPED_AO2LOCK(lock, router);
-       ao2_cleanup(router->default_route);
-       router->default_route = route_create(NULL, callback, data);
-       return router->default_route ? 0 : -1;
+       router->default_route.callback = callback;
+       router->default_route.data = data;
+       /* While this implementation can never fail, it used to be able to */
+       return 0;
 }
index a8d1c80f97cd0aacb0feaca913fe2b9e515172ea..92bd64c328d82fcc44f9accab46e83111dbeb4dc 100644 (file)
@@ -431,10 +431,6 @@ static void tps_taskprocessor_destroy(void *tps)
        }
        ast_free((char *) t->name);
        if (t->listener) {
-               /* This code should not be reached since the listener
-                * should have been destroyed before the taskprocessor could
-                * be destroyed
-                */
                ao2_ref(t->listener, -1);
                t->listener = NULL;
        }
index 2b7a82b3fe13d13bded36fd4c8721733c17247ff..c3e6c21924b6eaf422404f9ad1168b7c45c6fcdd 100644 (file)
@@ -9,6 +9,7 @@
 #define RES_PJSIP_PRIVATE_H_
 
 struct ao2_container;
+struct ast_threadpool_options;
 
 /*!
  * \brief Initialize the configuration for res_pjsip
index da9c50874e340e3304420fc479e7a7a58bd32f26..498df94402c873e5a952848ce8fd7b14e728a5c1 100644 (file)
@@ -867,52 +867,6 @@ AST_TEST_DEFINE(cache_dump)
        return AST_TEST_PASS;
 }
 
-AST_TEST_DEFINE(route_conflicts)
-{
-       RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
-       RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
-       RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
-       RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
-       RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
-       int ret;
-
-       switch (cmd) {
-       case TEST_INIT:
-               info->name = __func__;
-               info->category = test_category;
-               info->summary =
-                       "Multiple routes to the same message_type should fail";
-               info->description =
-                       "Multiple routes to the same message_type should fail";
-               return AST_TEST_NOT_RUN;
-       case TEST_EXECUTE:
-               break;
-       }
-
-       topic = stasis_topic_create("TestTopic");
-       ast_test_validate(test, NULL != topic);
-
-       consumer1 = consumer_create(1);
-       ast_test_validate(test, NULL != consumer1);
-       consumer2 = consumer_create(1);
-       ast_test_validate(test, NULL != consumer2);
-
-       test_message_type = stasis_message_type_create("TestMessage", NULL);
-       ast_test_validate(test, NULL != test_message_type);
-
-       uut = stasis_message_router_create(topic);
-       ast_test_validate(test, NULL != uut);
-
-       ret = stasis_message_router_add(
-               uut, test_message_type, consumer_exec, consumer1);
-       ast_test_validate(test, 0 == ret);
-       ret = stasis_message_router_add(
-               uut, test_message_type, consumer_exec, consumer2);
-       ast_test_validate(test, 0 != ret);
-
-       return AST_TEST_PASS;
-}
-
 AST_TEST_DEFINE(router)
 {
        RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
@@ -1373,7 +1327,6 @@ static int unload_module(void)
        AST_TEST_UNREGISTER(cache_filter);
        AST_TEST_UNREGISTER(cache);
        AST_TEST_UNREGISTER(cache_dump);
-       AST_TEST_UNREGISTER(route_conflicts);
        AST_TEST_UNREGISTER(router);
        AST_TEST_UNREGISTER(router_cache_updates);
        AST_TEST_UNREGISTER(interleaving);
@@ -1397,7 +1350,6 @@ static int load_module(void)
        AST_TEST_REGISTER(cache_filter);
        AST_TEST_REGISTER(cache);
        AST_TEST_REGISTER(cache_dump);
-       AST_TEST_REGISTER(route_conflicts);
        AST_TEST_REGISTER(router);
        AST_TEST_REGISTER(router_cache_updates);
        AST_TEST_REGISTER(interleaving);