]> git.ipfire.org Git - thirdparty/asterisk.git/commitdiff
Rework stasis cache clear events
authorKinsey Moore <kmoore@digium.com>
Fri, 7 Jun 2013 12:56:56 +0000 (12:56 +0000)
committerKinsey Moore <kmoore@digium.com>
Fri, 7 Jun 2013 12:56:56 +0000 (12:56 +0000)
Stasis cache clear message payloads now consist of a stasis_message
representative of the message to be cleared from the cache. This allows
multiple parallel caches to coexist and be cleared properly by the same
cache clear message even when keyed on different fields.

This change fixes a bug where multiple cache clears could be posted for
channels. The cache clear is now produced in the destructor instead of
ast_hangup.

Additionally, dummy channels are no longer capable of producing channel
snapshots.

Review: https://reviewboard.asterisk.org/r/2596

git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@390830 65c4cc65-6c06-0410-ace0-fbb531ad65f3

include/asterisk/stasis.h
main/bridging.c
main/channel.c
main/endpoints.c
main/stasis_cache.c
main/stasis_channels.c
tests/test_stasis.c

index fdc629324db7b5623fcb3a4d092b2ef722d2bf35..6a55d0926781bcfa9ddd43d5cdfb3fefafb19fb3 100644 (file)
@@ -502,31 +502,11 @@ struct stasis_cache_update {
 };
 
 /*!
- * \brief Cache clear message.
- */
-struct stasis_cache_clear {
-       /*! Type of object being cleared from the cache */
-       struct stasis_message_type *type;
-       /*! Id of the object being cleared from the cache */
-       char id[];
-};
-
-/*!
- * \brief Message type for \ref stasis_cache_clear.
+ * \brief Message type for clearing a message from a stasis cache.
  * \since 12
  */
 struct stasis_message_type *stasis_cache_clear_type(void);
 
-/*!
- * \brief A message which instructs the caching topic to remove an entry from its cache.
- * \param type Message type.
- * \param id Unique id of the snapshot to clear.
- * \return Message which, when sent to the \a topic, will clear the item from the cache.
- * \return \c NULL on error.
- * \since 12
- */
-struct stasis_message *stasis_cache_clear_create(struct stasis_message_type *type, const char *id);
-
 /*! @} */
 
 /*! @{ */
@@ -537,6 +517,18 @@ struct stasis_message *stasis_cache_clear_create(struct stasis_message_type *typ
  */
 struct stasis_caching_topic;
 
+/*!
+ * \brief A message which instructs the caching topic to remove an entry from its cache.
+ *
+ * \param message Message representative of the cache entry that should be cleared.
+ *     This will become the data held in the stasis_cache_clear message.
+ *
+ * \return Message which, when sent to the \a topic, will clear the item from the cache.
+ * \return \c NULL on error.
+ * \since 12
+ */
+struct stasis_message *stasis_cache_clear_create(struct stasis_message *message);
+
 /*!
  * \brief Callback extract a unique identity from a snapshot message.
  *
index d5d17ae9eed3b17bbd758304162455a35a5e0962..6a21c0b64f938a569186d65bc92f24592221e789 100644 (file)
@@ -1283,17 +1283,32 @@ static void bridge_handle_actions(struct ast_bridge *bridge)
        }
 }
 
+static struct stasis_message *create_bridge_snapshot_message(struct ast_bridge *bridge)
+{
+       RAII_VAR(struct ast_bridge_snapshot *, snapshot, NULL, ao2_cleanup);
+       snapshot = ast_bridge_snapshot_create(bridge);
+       if (!snapshot) {
+               return NULL;
+       }
+
+       return stasis_message_create(ast_bridge_snapshot_type(), snapshot);
+}
+
 static void destroy_bridge(void *obj)
 {
        struct ast_bridge *bridge = obj;
-       RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, clear_msg, NULL, ao2_cleanup);
 
        ast_debug(1, "Bridge %s: actually destroying %s bridge, nobody wants it anymore\n",
                bridge->uniqueid, bridge->v_table->name);
 
-       msg = stasis_cache_clear_create(ast_bridge_snapshot_type(), bridge->uniqueid);
-       if (msg) {
-               stasis_publish(ast_bridge_topic(bridge), msg);
+       clear_msg = create_bridge_snapshot_message(bridge);
+       if (clear_msg) {
+               RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+               msg = stasis_cache_clear_create(clear_msg);
+               if (msg) {
+                       stasis_publish(ast_bridge_topic(bridge), msg);
+               }
        }
 
        /* Do any pending actions in the context of destruction. */
index 4adff953cf8825825b91155dfb0e1ab4b7a967d6..f3aabd04a652d6354b13c02b40fd36153ea86fa8 100644 (file)
@@ -817,11 +817,28 @@ int ast_str2cause(const char *name)
        return -1;
 }
 
+static struct stasis_message *create_channel_snapshot_message(struct ast_channel *channel)
+{
+       RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
+       snapshot = ast_channel_snapshot_create(channel);
+       if (!snapshot) {
+               return NULL;
+       }
+
+       return stasis_message_create(ast_channel_snapshot_type(), snapshot);
+}
+
 static void publish_cache_clear(struct ast_channel *chan)
 {
        RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, clear_msg, NULL, ao2_cleanup);
 
-       message = stasis_cache_clear_create(ast_channel_snapshot_type(), ast_channel_uniqueid(chan));
+       clear_msg = create_channel_snapshot_message(chan);
+       if (!clear_msg) {
+               return;
+       }
+
+       message = stasis_cache_clear_create(clear_msg);
        stasis_publish(ast_channel_topic(chan), message);
 }
 
@@ -1161,6 +1178,7 @@ __ast_channel_alloc_ap(int needqueue, int state, const char *cid_num, const char
        }
 
        ast_channel_internal_finalize(tmp);
+       ast_publish_channel_state(tmp);
        return tmp;
 }
 
@@ -2369,6 +2387,8 @@ static void ast_channel_destructor(void *obj)
        char device_name[AST_CHANNEL_NAME];
        struct ast_callid *callid;
 
+       publish_cache_clear(chan);
+
        if (ast_channel_internal_is_finalized(chan)) {
                ast_cel_report_event(chan, AST_CEL_CHANNEL_END, NULL, NULL, NULL);
                ast_cel_check_retire_linkedid(chan);
@@ -2884,7 +2904,6 @@ int ast_hangup(struct ast_channel *chan)
        ast_cc_offer(chan);
 
        ast_publish_channel_state(chan);
-       publish_cache_clear(chan);
 
        if (ast_channel_cdr(chan) && !ast_test_flag(ast_channel_cdr(chan), AST_CDR_FLAG_BRIDGED) &&
                !ast_test_flag(ast_channel_cdr(chan), AST_CDR_FLAG_POST_DISABLED) &&
index c2d0577f977613bf21313cd64e561a5220b5c7ac..a5d50cfde6324a5d89c8c1e257528737f111cc9b 100644 (file)
@@ -145,18 +145,25 @@ static void endpoint_channel_snapshot(void *data,
        }
 }
 
+/*! \brief Handler for channel snapshot cache clears */
 static void endpoint_cache_clear(void *data,
        struct stasis_subscription *sub, struct stasis_topic *topic,
        struct stasis_message *message)
 {
        struct ast_endpoint *endpoint = data;
-       struct stasis_cache_clear *clear = stasis_message_data(message);
+       struct stasis_message *clear_msg = stasis_message_data(message);
+       struct ast_channel_snapshot *clear_snapshot;
+
+       if (stasis_message_type(clear_msg) != ast_channel_snapshot_type()) {
+               return;
+       }
+
+       clear_snapshot = stasis_message_data(clear_msg);
 
        ast_assert(endpoint != NULL);
-       ast_assert(clear != NULL);
 
        ao2_lock(endpoint);
-       ao2_find(endpoint->channel_ids, clear->id, OBJ_POINTER | OBJ_NODATA | OBJ_UNLINK);
+       ast_str_container_remove(endpoint->channel_ids, clear_snapshot->uniqueid);
        ao2_unlock(endpoint);
        endpoint_publish_snapshot(endpoint);
 }
@@ -247,17 +254,32 @@ const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint)
        return endpoint->tech;
 }
 
+static struct stasis_message *create_endpoint_snapshot_message(struct ast_endpoint *endpoint)
+{
+       RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
+       snapshot = ast_endpoint_snapshot_create(endpoint);
+       if (!snapshot) {
+               return NULL;
+       }
+
+       return stasis_message_create(ast_endpoint_snapshot_type(), snapshot);
+}
+
 void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
 {
-       RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, clear_msg, NULL, ao2_cleanup);
 
        if (endpoint == NULL) {
                return;
        }
 
-       message = stasis_cache_clear_create(ast_endpoint_snapshot_type(), endpoint->id);
-       if (message) {
-               stasis_publish(endpoint->topic, message);
+       clear_msg = create_endpoint_snapshot_message(endpoint);
+       if (clear_msg) {
+               RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+               message = stasis_cache_clear_create(clear_msg);
+               if (message) {
+                       stasis_publish(endpoint->topic, message);
+               }
        }
 
        /* Bump refcount to hold on to the router */
index ac34959db393e7f6fedcb6a230c115170c10babf..546ad49980c7783d80a027111c8e89132292c53b 100644 (file)
@@ -262,30 +262,11 @@ struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_top
 STASIS_MESSAGE_TYPE_DEFN(stasis_cache_clear_type);
 STASIS_MESSAGE_TYPE_DEFN(stasis_cache_update_type);
 
-static void cache_clear_dtor(void *obj)
+struct stasis_message *stasis_cache_clear_create(struct stasis_message *id_message)
 {
-       struct stasis_cache_clear *ev = obj;
-       ao2_cleanup(ev->type);
-       ev->type = NULL;
-}
-
-struct stasis_message *stasis_cache_clear_create(struct stasis_message_type *type, const char *id)
-{
-       RAII_VAR(struct stasis_cache_clear *, ev, NULL, ao2_cleanup);
        RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
 
-       ev = ao2_alloc(sizeof(*ev) + strlen(id) + 1, cache_clear_dtor);
-       if (!ev) {
-               return NULL;
-       }
-
-       /* strcpy safe */
-       strcpy(ev->id, id);
-       ao2_ref(type, +1);
-       ev->type = type;
-
-       msg = stasis_message_create(stasis_cache_clear_type(), ev);
-
+       msg = stasis_message_create(stasis_cache_clear_type(), id_message);
        if (!msg) {
                return NULL;
        }
@@ -363,21 +344,25 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, stru
        if (stasis_cache_clear_type() == stasis_message_type(message)) {
                RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
                RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
-               struct stasis_cache_clear *clear = stasis_message_data(message);
-               ast_assert(clear->type != NULL);
-               ast_assert(clear->id != NULL);
-               old_snapshot = cache_put(caching_topic, clear->type, clear->id, NULL);
-               if (old_snapshot) {
-                       update = update_create(topic, old_snapshot, NULL);
-                       stasis_publish(caching_topic->topic, update);
-               } else {
-                       /* While this could be a problem, it's very likely to
-                        * happen with message forwarding */
-                       ast_debug(1,
-                               "Attempting to remove an item from the cache that isn't there: %s %s\n",
-                               stasis_message_type_name(clear->type), clear->id);
+               struct stasis_message *clear_msg = stasis_message_data(message);
+               const char *clear_id = caching_topic->id_fn(clear_msg);
+               struct stasis_message_type *clear_type = stasis_message_type(clear_msg);
+
+               ast_assert(clear_type != NULL);
+
+               if (clear_id) {
+                       old_snapshot = cache_put(caching_topic, clear_type, clear_id, NULL);
+                       if (old_snapshot) {
+                               update = update_create(topic, old_snapshot, NULL);
+                               stasis_publish(caching_topic->topic, update);
+                               return;
+                       }
+
+                       ast_log(LOG_ERROR,
+                               "Attempting to remove an item from the %s cache that isn't there: %s %s\n",
+                               stasis_topic_name(caching_topic->topic), stasis_message_type_name(clear_type), clear_id);
+                       return;
                }
-               return;
        }
 
        id = caching_topic->id_fn(message);
index 52b6ef4ce573a75dbac78090b718e530852143e1..65e9f917ddb0b863569218667b112120fcf10ad8 100644 (file)
@@ -131,6 +131,11 @@ struct ast_channel_snapshot *ast_channel_snapshot_create(struct ast_channel *cha
 {
        RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
 
+       /* no snapshots for dummy channels */
+       if (!ast_channel_tech(chan)) {
+               return NULL;
+       }
+
        snapshot = ao2_alloc(sizeof(*snapshot), channel_snapshot_dtor);
        if (!snapshot || ast_string_field_init(snapshot, 1024)) {
                return NULL;
index 8f81378ab1675f8652782dc8a7cd1dae6e76e1ad..915226d293213c6c39ff2a1ed51c03f1392db4a2 100644 (file)
@@ -716,7 +716,7 @@ AST_TEST_DEFINE(cache)
        ao2_ref(test_message2_2, -1);
 
        /* Clear snapshot 1 */
-       test_message1_clear = stasis_cache_clear_create(cache_type, "1");
+       test_message1_clear = stasis_cache_clear_create(test_message1_1);
        ast_test_validate(test, NULL != test_message1_clear);
        stasis_publish(topic, test_message1_clear);
 
@@ -811,7 +811,7 @@ AST_TEST_DEFINE(cache_dump)
        }
 
        /* Clear snapshot 1 */
-       test_message1_clear = stasis_cache_clear_create(cache_type, "1");
+       test_message1_clear = stasis_cache_clear_create(test_message1_1);
        ast_test_validate(test, NULL != test_message1_clear);
        stasis_publish(topic, test_message1_clear);