]> git.ipfire.org Git - thirdparty/asterisk.git/commitdiff
Remove unnecessary waits from stasis.
authorMark Michelson <mmichelson@digium.com>
Wed, 2 Oct 2013 22:08:49 +0000 (22:08 +0000)
committerMark Michelson <mmichelson@digium.com>
Wed, 2 Oct 2013 22:08:49 +0000 (22:08 +0000)
Since caches are updated on publisher threads, there is no need
to wait for the cache updates to occur after a stasis message
is published.

In the case of chan_pjsip device state changes, this set of
changes caused an improvement to performance.

Review: https://reviewboard.asterisk.org/r/2890

git-svn-id: https://origsvn.digium.com/svn/asterisk/branches/12@400318 65c4cc65-6c06-0410-ace0-fbb531ad65f3

channels/chan_pjsip.c
include/asterisk/stasis.h
include/asterisk/stasis_endpoints.h
main/stasis.c
main/stasis_endpoints.c
main/stasis_wait.c [deleted file]
res/ari/resource_endpoints.c
tests/test_cel.c

index fcca25c8d1cbdcb40d2de36bfe56fe5c0ba2203f..e505751c3a1cd557ab6331b42f376950150191a8 100644 (file)
@@ -896,7 +896,11 @@ static int chan_pjsip_devicestate(const char *data)
        }
 
        endpoint_snapshot = ast_endpoint_latest_snapshot(ast_endpoint_get_tech(endpoint->persistent),
-               ast_endpoint_get_resource(endpoint->persistent), 1);
+               ast_endpoint_get_resource(endpoint->persistent));
+
+       if (!endpoint_snapshot) {
+               return AST_DEVICE_INVALID;
+       }
 
        if (endpoint_snapshot->state == AST_ENDPOINT_OFFLINE) {
                state = AST_DEVICE_UNAVAILABLE;
@@ -916,7 +920,6 @@ static int chan_pjsip_devicestate(const char *data)
                RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
                struct ast_channel_snapshot *snapshot;
 
-               stasis_topic_wait(ast_channel_topic_all_cached());
                msg = stasis_cache_get(cache, ast_channel_snapshot_type(),
                        endpoint_snapshot->channel_ids[num]);
 
index 529aa12bbdd1715bc993fc1d8584f470bf85ece2..943e0c72cfb9692a0cfda1d88d5e178d89a7df62 100644 (file)
@@ -347,15 +347,6 @@ const char *stasis_topic_name(const struct stasis_topic *topic);
  */
 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message);
 
-/*!
- * \brief Wait for all pending messages on a given topic to be processed.
- * \param topic Topic to await pending messages on.
- * \return 0 on success.
- * \return Non-zero on error.
- * \since 12
- */
-int stasis_topic_wait(struct stasis_topic *topic);
-
 /*! @} */
 
 /*! @{ */
@@ -868,11 +859,6 @@ int stasis_cache_init(void);
  */
 int stasis_config_init(void);
 
-/*!
- * \internal
- */
-int stasis_wait_init(void);
-
 /*! @} */
 
 /*!
index 4a35e958730c84713316a4253b0f36869513c645..0ba233bf872abd861a0cb1bc64eedd29d6dea6f4 100644 (file)
@@ -194,14 +194,12 @@ struct stasis_cache *ast_endpoint_cache(void);
  *
  * \param tech Name of the endpoint's technology.
  * \param resource Resource name of the endpoint.
- * \param guaranteed Whether to require all pending messages to have been processed or not.
  * \return Snapshot of the endpoint with the given name.
  * \return \c NULL if endpoint is not found, or on error.
  * \since 12
  */
 struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech,
-       const char *resource,
-       unsigned int guaranteed
+       const char *resource
 );
 
 /*! @} */
index 42c90176996c3a50a8bb1291da385536ab3c5512..eabdfdc1c544e9422b5524ebbb9936c63a440634 100644 (file)
@@ -814,11 +814,6 @@ int stasis_init(void)
        /* Be sure the types are cleaned up after the message bus */
        ast_register_cleanup(stasis_cleanup);
 
-       if (stasis_wait_init() != 0) {
-               ast_log(LOG_ERROR, "Stasis initialization failed\n");
-               return -1;
-       }
-
        cache_init = stasis_cache_init();
        if (cache_init != 0) {
                return -1;
index 4a183784c092a452c09dd23ae60892171da73e6c..096770a3d0c2957b8114c565d659e6f3b2c10fef 100644 (file)
@@ -187,7 +187,7 @@ void ast_endpoint_blob_publish(struct ast_endpoint *endpoint, struct stasis_mess
 }
 
 struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech,
-       const char *name, unsigned int guaranteed)
+       const char *name)
 {
        RAII_VAR(char *, id, NULL, ast_free);
        RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
@@ -198,10 +198,6 @@ struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech,
                return NULL;
        }
 
-       if (guaranteed) {
-               stasis_topic_wait(ast_endpoint_topic_all_cached());
-       }
-
        msg = stasis_cache_get(ast_endpoint_cache(),
                ast_endpoint_snapshot_type(), id);
        if (!msg) {
diff --git a/main/stasis_wait.c b/main/stasis_wait.c
deleted file mode 100644 (file)
index 32b5971..0000000
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Asterisk -- An open source telephony toolkit.
- *
- * Copyright (C) 2013, Digium, Inc.
- *
- * Joshua Colp <jcolp@digium.com>
- *
- * See http://www.asterisk.org for more information about
- * the Asterisk project. Please do not directly contact
- * any of the maintainers of this project for assistance;
- * the project provides a web site, mailing lists and IRC
- * channels for your use.
- *
- * This program is free software, distributed under the terms of
- * the GNU General Public License Version 2. See the LICENSE file
- * at the top of the source tree.
- */
-
-/*! \file
- *
- * \brief Wait support for Stasis topics.
- *
- * \author Joshua Colp <jcolp@digium.com>
- */
-
-/*** MODULEINFO
-       <support_level>core</support_level>
- ***/
-
-#include "asterisk.h"
-
-ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
-
-#include "asterisk/astobj2.h"
-#include "asterisk/stasis.h"
-
-static struct stasis_message_type *cache_guarantee_type(void);
-STASIS_MESSAGE_TYPE_DEFN(cache_guarantee_type);
-
-/*! \internal */
-struct caching_guarantee {
-       ast_mutex_t lock;
-       ast_cond_t cond;
-       unsigned int done:1;
-};
-
-static void caching_guarantee_dtor(void *obj)
-{
-       struct caching_guarantee *guarantee = obj;
-
-       ast_assert(guarantee->done == 1);
-
-       ast_mutex_destroy(&guarantee->lock);
-       ast_cond_destroy(&guarantee->cond);
-}
-
-static void guarantee_handler(void *data, struct stasis_subscription *sub,
-       struct stasis_message *message)
-{
-       /* Wait for our particular message */
-       if (data == message) {
-               struct caching_guarantee *guarantee;
-               ast_assert(cache_guarantee_type() == stasis_message_type(message));
-               guarantee = stasis_message_data(message);
-
-               ast_mutex_lock(&guarantee->lock);
-               guarantee->done = 1;
-               ast_cond_signal(&guarantee->cond);
-               ast_mutex_unlock(&guarantee->lock);
-       }
-}
-
-static struct stasis_message *caching_guarantee_create(void)
-{
-       RAII_VAR(struct caching_guarantee *, guarantee, NULL, ao2_cleanup);
-       RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
-
-       if (!(guarantee = ao2_alloc(sizeof(*guarantee), caching_guarantee_dtor))) {
-               return NULL;
-       }
-
-       ast_mutex_init(&guarantee->lock);
-       ast_cond_init(&guarantee->cond, NULL);
-
-       if (!(msg = stasis_message_create(cache_guarantee_type(), guarantee))) {
-               return NULL;
-       }
-
-       ao2_ref(msg, +1);
-       return msg;
-}
-
-int stasis_topic_wait(struct stasis_topic *topic)
-{
-       RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
-       RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
-       struct caching_guarantee *guarantee;
-
-       msg = caching_guarantee_create();
-       if (!msg) {
-               return -1;
-       }
-
-       sub = stasis_subscribe(topic, guarantee_handler, msg);
-       if (!sub) {
-               return -1;
-       }
-
-       guarantee = stasis_message_data(msg);
-
-       ast_mutex_lock(&guarantee->lock);
-       stasis_publish(topic, msg);
-       while (!guarantee->done) {
-               ast_cond_wait(&guarantee->cond, &guarantee->lock);
-       }
-       ast_mutex_unlock(&guarantee->lock);
-       return 0;
-}
-
-static void wait_cleanup(void)
-{
-       STASIS_MESSAGE_TYPE_CLEANUP(cache_guarantee_type);
-}
-
-int stasis_wait_init(void)
-{
-       ast_register_cleanup(wait_cleanup);
-
-       if (STASIS_MESSAGE_TYPE_INIT(cache_guarantee_type) != 0) {
-               return -1;
-       }
-       return 0;
-}
index 35d8a45cc528b4d65c79726277b6468a2393f1ce..caf20cb28b882d3a1a8a87dae0bb70db535a75cd 100644 (file)
@@ -140,7 +140,7 @@ void ast_ari_get_endpoint(struct ast_variable *headers,
        RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
        RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
 
-       snapshot = ast_endpoint_latest_snapshot(args->tech, args->resource, 0);
+       snapshot = ast_endpoint_latest_snapshot(args->tech, args->resource);
        if (!snapshot) {
                ast_ari_response_error(response, 404, "Not Found",
                        "Endpoint not found");
index 673b8f9fc53bcff2461ea85cd306e82d522d7eb0..2ae5abb306121990bcc5c685ff27a9415185ec0b 100644 (file)
@@ -254,7 +254,6 @@ static void do_sleep(void)
        ast_hangup((channel)); \
        HANGUP_EVENT(channel, cause, dialstatus); \
        APPEND_EVENT(channel, AST_CEL_CHANNEL_END, NULL, NULL); \
-       stasis_topic_wait(ast_channel_topic_all_cached()); \
        ao2_cleanup(stasis_cache_get(ast_channel_cache(), \
                ast_channel_snapshot_type(), ast_channel_uniqueid(channel))); \
        ao2_cleanup(channel); \