]> git.ipfire.org Git - thirdparty/asterisk.git/commitdiff
stasis: Add internal filtering of messages.
authorJoshua Colp <jcolp@digium.com>
Sun, 23 Sep 2018 20:50:01 +0000 (17:50 -0300)
committerJoshua Colp <jcolp@digium.com>
Sun, 18 Nov 2018 20:08:07 +0000 (15:08 -0500)
This change adds the ability for subscriptions to indicate
which message types they are interested in accepting. By
doing so the filtering is done before being dispatched
to the subscriber, reducing the amount of work that has
to be done.

This is optional and if a subscriber does not add
message types they wish to accept and set the subscription
to selective filtering the previous behavior is preserved
and they receive all messages.

There is also the ability to explicitly force the reception
of all messages for cases such as AMI or ARI where a large
number of messages are expected that are then generically
converted into a different format.

ASTERISK-28103

Change-Id: I99bee23895baa0a117985d51683f7963b77aa190

33 files changed:
apps/app_queue.c
channels/chan_dahdi.c
channels/chan_iax2.c
channels/chan_mgcp.c
channels/chan_sip.c
channels/chan_skinny.c
channels/sig_pri.c
include/asterisk/stasis.h
include/asterisk/stasis_cache_pattern.h
include/asterisk/stasis_message_router.h
main/ccss.c
main/devicestate.c
main/endpoints.c
main/manager.c
main/pbx.c
main/presencestate.c
main/stasis.c
main/stasis_cache.c
main/stasis_cache_pattern.c
main/stasis_message.c
main/stasis_message_router.c
res/parking/parking_applications.c
res/parking/parking_bridge_features.c
res/parking/parking_manager.c
res/res_hep_rtcp.c
res/res_pjsip_mwi.c
res/res_pjsip_outbound_registration.c
res/res_pjsip_publish_asterisk.c
res/res_pjsip_pubsub.c
res/res_pjsip_refer.c
res/res_security_log.c
res/res_stasis_device_state.c
res/res_xmpp.c

index 80c253f2247f1475ebcdccd58862df41ab1bbd56..b29988961f5edbed420116c04d4b9ea88ae6f8b2 100644 (file)
@@ -11336,6 +11336,8 @@ static int load_module(void)
        if (!device_state_sub) {
                err = -1;
        }
+       stasis_subscription_accept_message_type(device_state_sub, ast_device_state_message_type());
+       stasis_subscription_set_filter(device_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        manager_topic = ast_manager_get_topic();
        queue_topic = ast_queue_topic_all();
index f4f651487def05e920dbbe159df765fe0364eb40..1eb618bd4525f20eb55cae88da7f84747665e1ab 100644 (file)
@@ -12594,6 +12594,8 @@ static struct dahdi_pvt *mkintf(int channel, const struct dahdi_chan_conf *conf,
                                 * knows that we care about it.  Then, chan_dahdi will get the MWI from the
                                 * event cache instead of checking the mailbox directly. */
                                tmp->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL);
+                               stasis_subscription_accept_message_type(tmp->mwi_event_sub, ast_mwi_state_type());
+                               stasis_subscription_set_filter(tmp->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
                        }
                }
 #ifdef HAVE_DAHDI_LINEREVERSE_VMWI
index 01d42b57fa20db5e4554b0cef4e7641d680a6814..0ca4234d7d15234546babcb0f9f4cc039c1caba8 100644 (file)
@@ -1456,6 +1456,8 @@ static void network_change_stasis_subscribe(void)
        if (!network_change_sub) {
                network_change_sub = stasis_subscribe(ast_system_topic(),
                        network_change_stasis_cb, NULL);
+               stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type());
+               stasis_subscription_set_filter(network_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
        }
 }
 
@@ -1469,6 +1471,8 @@ static void acl_change_stasis_subscribe(void)
        if (!acl_change_sub) {
                acl_change_sub = stasis_subscribe(ast_security_topic(),
                        acl_change_stasis_cb, NULL);
+               stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type());
+               stasis_subscription_set_filter(acl_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
        }
 }
 
@@ -13072,6 +13076,8 @@ static struct iax2_peer *build_peer(const char *name, struct ast_variable *v, st
                         * mailboxes.  However, we just grab the events out of the cache when it
                         * is time to send MWI, since it is only sent with a REGACK. */
                        peer->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL);
+                       stasis_subscription_accept_message_type(peer->mwi_event_sub, ast_mwi_state_type());
+                       stasis_subscription_set_filter(peer->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
                }
        }
 
index 2ac7690a6e52841bee3d3a65996106b5e56f2b32..46342ce31240e27b525d342403d13fcdf5645d16 100644 (file)
@@ -4242,6 +4242,8 @@ static struct mgcp_gateway *build_gateway(char *cat, struct ast_variable *v)
                                                 * knows that we care about it.  Then, chan_mgcp will get the MWI from the
                                                 * event cache instead of checking the mailbox directly. */
                                                e->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL);
+                                               stasis_subscription_accept_message_type(e->mwi_event_sub, ast_mwi_state_type());
+                                               stasis_subscription_set_filter(e->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
                                        }
                                }
                                snprintf(e->rqnt_ident, sizeof(e->rqnt_ident), "%08lx", (unsigned long)ast_random());
index 2693e5f65549eac4e7427537b40c22ce2f7c6be5..9089b5e10044ea6e6d9ada4405b25d78ec922b42 100644 (file)
@@ -17494,6 +17494,8 @@ static void network_change_stasis_subscribe(void)
        if (!network_change_sub) {
                network_change_sub = stasis_subscribe(ast_system_topic(),
                        network_change_stasis_cb, NULL);
+               stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type());
+               stasis_subscription_set_filter(network_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
        }
 }
 
@@ -17507,6 +17509,8 @@ static void acl_change_stasis_subscribe(void)
        if (!acl_change_sub) {
                acl_change_sub = stasis_subscribe(ast_security_topic(),
                        acl_change_stasis_cb, NULL);
+               stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type());
+               stasis_subscription_set_filter(acl_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
        }
 
 }
@@ -28385,6 +28389,9 @@ static void add_peer_mwi_subs(struct sip_peer *peer)
                mailbox_specific_topic = ast_mwi_topic(mailbox->id);
                if (mailbox_specific_topic) {
                        mailbox->event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, peer);
+                       stasis_subscription_accept_message_type(mailbox->event_sub, ast_mwi_state_type());
+                       stasis_subscription_accept_message_type(mailbox->event_sub, stasis_subscription_change_type());
+                       stasis_subscription_set_filter(mailbox->event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
                }
        }
 }
index 2b13e5eaaab9459950b7eee391e9363c361046e7..910b7b811763255da10faa984bfaee0cf83896e6 100644 (file)
@@ -8334,6 +8334,8 @@ static struct skinny_line *config_line(const char *lname, struct ast_variable *v
                mailbox_specific_topic = ast_mwi_topic(l->mailbox);
                if (mailbox_specific_topic) {
                        l->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, l);
+                       stasis_subscription_accept_message_type(l->mwi_event_sub, ast_mwi_state_type());
+                       stasis_subscription_set_filter(l->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
                }
        }
 
index fbc4e40f05667f7c67bba158508247252b1c2b69..ec6d666aea6c2360cb88df1be4ee30b93e1b8434 100644 (file)
@@ -9130,6 +9130,9 @@ int sig_pri_start_pri(struct sig_pri_span *pri)
                if (!pri->mbox[i].sub) {
                        ast_log(LOG_ERROR, "%s span %d could not subscribe to MWI events for %s(%s).\n",
                                sig_pri_cc_type_name, pri->span, pri->mbox[i].vm_box, mbox_id);
+               } else {
+                       stasis_subscription_accept_message_type(pri->mbox[i].sub, ast_mwi_state_type());
+                       stasis_subscription_set_filter(pri->mbox[i].sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
                }
 #if defined(HAVE_PRI_MWI_V2)
                if (ast_strlen_zero(pri->mbox[i].vm_number)) {
index 2b56b53f81283514d91cd0615b5ec737735f5c7d..ebd00ee23d19901ef95847f09ab40300fa9ab0c9 100644 (file)
@@ -291,6 +291,15 @@ enum stasis_message_type_result {
        STASIS_MESSAGE_TYPE_DECLINED,   /*!< Message type was not created due to configuration */
 };
 
+/*!
+ * \brief Stasis subscription message filters
+ */
+enum stasis_subscription_message_filter {
+       STASIS_SUBSCRIPTION_FILTER_NONE = 0,    /*!< No filter is in place, all messages are raised */
+       STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, /*!< No filter is in place or can be set, all messages are raised */
+       STASIS_SUBSCRIPTION_FILTER_SELECTIVE,   /*!< Only messages of allowed message types are raised */
+};
+
 /*!
  * \brief Create a new message type.
  *
@@ -326,6 +335,14 @@ const char *stasis_message_type_name(const struct stasis_message_type *type);
  */
 unsigned int stasis_message_type_hash(const struct stasis_message_type *type);
 
+/*!
+ * \brief Gets the id of a given message type
+ * \param type The type to get the id of.
+ * \return The id
+ * \since 17.0.0
+ */
+int stasis_message_type_id(const struct stasis_message_type *type);
+
 /*!
  * \brief Check whether a message type is declined
  *
@@ -494,6 +511,14 @@ struct stasis_topic *stasis_topic_create(const char *name);
  */
 const char *stasis_topic_name(const struct stasis_topic *topic);
 
+/*!
+ * \brief Return the number of subscribers of a topic.
+ * \param topic Topic.
+ * \return Number of subscribers of the topic.
+ * \since 17.0.0
+ */
+size_t stasis_topic_subscribers(const struct stasis_topic *topic);
+
 /*!
  * \brief Publish a message to a topic's subscribers.
  * \param topic Topic.
@@ -559,6 +584,10 @@ void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, st
  * \return New \ref stasis_subscription object.
  * \return \c NULL on error.
  * \since 12
+ *
+ * \note This callback will receive a callback with a message indicating it
+ * has been subscribed. This occurs immediately before accepted message
+ * types can be set and the callback must expect to receive it.
  */
 struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
        stasis_subscription_cb callback, void *data);
@@ -584,10 +613,68 @@ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
  * \return New \ref stasis_subscription object.
  * \return \c NULL on error.
  * \since 12.8.0
+ *
+ * \note This callback will receive a callback with a message indicating it
+ * has been subscribed. This occurs immediately before accepted message
+ * types can be set and the callback must expect to receive it.
  */
 struct stasis_subscription *stasis_subscribe_pool(struct stasis_topic *topic,
        stasis_subscription_cb callback, void *data);
 
+/*!
+ * \brief Indicate to a subscription that we are interested in a message type.
+ *
+ * This will cause the subscription to allow the given message type to be
+ * raised to our subscription callback. This enables internal filtering in
+ * the stasis message bus to reduce messages.
+ *
+ * \param subscription Subscription to add message type to.
+ * \param type The message type we wish to receive.
+ * \retval 0 on success
+ * \retval -1 failure
+ *
+ * \since 17.0.0
+ *
+ * \note If you are wanting to use stasis_final_message you will need to accept
+ * \ref stasis_subscription_change_type as a message type.
+ *
+ * \note Until the subscription is set to selective filtering it is possible for it
+ * to receive messages of message types that would not normally be accepted.
+ */
+int stasis_subscription_accept_message_type(struct stasis_subscription *subscription,
+       const struct stasis_message_type *type);
+
+/*!
+ * \brief Indicate to a subscription that we are not interested in a message type.
+ *
+ * \param subscription Subscription to remove message type from.
+ * \param type The message type we don't wish to receive.
+ * \retval 0 on success
+ * \retval -1 failure
+ *
+ * \since 17.0.0
+ */
+int stasis_subscription_decline_message_type(struct stasis_subscription *subscription,
+       const struct stasis_message_type *type);
+
+/*!
+ * \brief Set the message type filtering level on a subscription
+ *
+ * This will cause the subscription to filter messages according to the
+ * provided filter level. For example if selective is used then only
+ * messages matching those provided to \ref stasis_subscription_accept_message_type
+ * will be raised to the subscription callback.
+ *
+ * \param subscription Subscription that should receive all messages.
+ * \param filter What filter to use
+ * \retval 0 on success
+ * \retval -1 failure
+ *
+ * \since 17.0.0
+ */
+int stasis_subscription_set_filter(struct stasis_subscription *subscription,
+       enum stasis_subscription_message_filter filter);
+
 /*!
  * \brief Cancel a subscription.
  *
@@ -1036,6 +1123,41 @@ struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(
 struct stasis_topic *stasis_caching_get_topic(
        struct stasis_caching_topic *caching_topic);
 
+/*!
+ * \brief Indicate to a caching topic that we are interested in a message type.
+ *
+ * This will cause the caching topic to receive messages of the given message
+ * type. This enables internal filtering in the stasis message bus to reduce
+ * messages.
+ *
+ * \param caching_topic The caching topic.
+ * \param type The message type we wish to receive.
+ * \retval 0 on success
+ * \retval -1 failure
+ *
+ * \since 17.0.0
+ */
+int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic,
+       struct stasis_message_type *type);
+
+/*!
+ * \brief Set the message type filtering level on a cache
+ *
+ * This will cause the underlying subscription to filter messages according to the
+ * provided filter level. For example if selective is used then only
+ * messages matching those provided to \ref stasis_subscription_accept_message_type
+ * will be raised to the subscription callback.
+ *
+ * \param caching_topic The caching topic.
+ * \param filter What filter to use
+ * \retval 0 on success
+ * \retval -1 failure
+ *
+ * \since 17.0.0
+ */
+int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic,
+       enum stasis_subscription_message_filter filter);
+
 /*!
  * \brief A message which instructs the caching topic to remove an entry from
  * its cache.
index e61d3e931cfdc52fcebbfdf63f8f233869e000bc..514d62e695a509a92e9984770c463761e2f603f7 100644 (file)
@@ -169,4 +169,39 @@ struct stasis_topic *stasis_cp_single_topic(struct stasis_cp_single *one);
 struct stasis_topic *stasis_cp_single_topic_cached(
        struct stasis_cp_single *one);
 
+/*!
+ * \brief Indicate to an instance that we are interested in a message type.
+ *
+ * This will cause the caching topic to receive messages of the given message
+ * type. This enables internal filtering in the stasis message bus to reduce
+ * messages.
+ *
+ * \param one One side of the cache pattern.
+ * \param type The message type we wish to receive.
+ * \retval 0 on success
+ * \retval -1 failure
+ *
+ * \since 17.0.0
+ */
+int stasis_cp_single_accept_message_type(struct stasis_cp_single *one,
+       struct stasis_message_type *type);
+
+/*!
+ * \brief Set the message type filtering level on a cache
+ *
+ * This will cause the underlying subscription to filter messages according to the
+ * provided filter level. For example if selective is used then only
+ * messages matching those provided to \ref stasis_subscription_accept_message_type
+ * will be raised to the subscription callback.
+ *
+ * \param one One side of the cache pattern.
+ * \param filter What filter to use
+ * \retval 0 on success
+ * \retval -1 failure
+ *
+ * \since 17.0.0
+ */
+int stasis_cp_single_set_filter(struct stasis_cp_single *one,
+       enum stasis_subscription_message_filter filter);
+
 #endif /* _ASTERISK_STASIS_CACHE_PATTERN_H */
index 50270a788b4c89537d5ce9ad3e26f863ff1408d5..8dcdfcc913ec6b9e98898cd9e4566a8e57239123 100644 (file)
@@ -233,6 +233,10 @@ void stasis_message_router_remove_cache_update(
  * \retval -1 on failure
  *
  * \since 12
+ *
+ * \note Setting a default callback will automatically cause the underlying
+ * subscription to receive all messages and not be filtered. If filtering is
+ * desired then a specific route for each message type should be provided.
  */
 int stasis_message_router_set_default(struct stasis_message_router *router,
                                      stasis_subscription_cb callback,
index 5758574f6639baaa6c9b8b17b88eba320e4d1e7b..52ec58647bd269764cccb35488ce27e4bbc467a5 100644 (file)
@@ -1433,6 +1433,8 @@ static struct generic_monitor_instance_list *create_new_generic_list(struct ast_
                cc_unref(generic_list, "Failed to subscribe to device state");
                return NULL;
        }
+       stasis_subscription_accept_message_type(generic_list->sub, ast_device_state_message_type());
+       stasis_subscription_set_filter(generic_list->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
        generic_list->current_state = ast_device_state(monitor->interface->device_name);
        ao2_t_link(generic_monitors, generic_list, "linking new generic monitor instance list");
        return generic_list;
@@ -2804,6 +2806,9 @@ static int cc_generic_agent_start_monitoring(struct ast_cc_agent *agent)
        if (!(generic_pvt->sub = stasis_subscribe(device_specific_topic, generic_agent_devstate_cb, agent))) {
                return -1;
        }
+       stasis_subscription_accept_message_type(generic_pvt->sub, ast_device_state_message_type());
+       stasis_subscription_accept_message_type(generic_pvt->sub, stasis_subscription_change_type());
+       stasis_subscription_set_filter(generic_pvt->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
        cc_ref(agent, "Ref agent for subscription");
        return 0;
 }
index 7dcbe829005e02a027485c95d1058ac34a46a717..b6c740ce26f995c2fcf30575b8db086531ebe105 100644 (file)
@@ -920,6 +920,8 @@ int devstate_init(void)
        if (!device_state_topic_cached) {
                return -1;
        }
+       stasis_caching_accept_message_type(device_state_topic_cached, ast_device_state_message_type());
+       stasis_caching_set_filter(device_state_topic_cached, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        devstate_message_sub = stasis_subscribe(ast_device_state_topic_all(),
                devstate_change_cb, NULL);
@@ -927,6 +929,8 @@ int devstate_init(void)
                ast_log(LOG_ERROR, "Failed to create subscription creating uncached device state aggregate events.\n");
                return -1;
        }
+       stasis_subscription_accept_message_type(devstate_message_sub, ast_device_state_message_type());
+       stasis_subscription_set_filter(devstate_message_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        return 0;
 }
index f1608f3a0018f511c58dca057f7c2737cc9f5ec3..3129fb49f7c8852c25092972121cead1394c9adf 100644 (file)
@@ -202,7 +202,7 @@ static void endpoint_cache_clear(void *data,
        endpoint_publish_snapshot(endpoint);
 }
 
-static void endpoint_default(void *data,
+static void endpoint_subscription_change(void *data,
        struct stasis_subscription *sub,
        struct stasis_message *message)
 {
@@ -263,6 +263,8 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha
                if (!endpoint->topics) {
                        return NULL;
                }
+               stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type());
+               stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
                endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint));
                if (!endpoint->router) {
@@ -271,8 +273,9 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha
                r |= stasis_message_router_add(endpoint->router,
                        stasis_cache_clear_type(), endpoint_cache_clear,
                        endpoint);
-               r |= stasis_message_router_set_default(endpoint->router,
-                       endpoint_default, endpoint);
+               r |= stasis_message_router_add(endpoint->router,
+                       stasis_subscription_change_type(), endpoint_subscription_change,
+                       endpoint);
                if (r) {
                        return NULL;
                }
@@ -288,6 +291,8 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha
                if (!endpoint->topics) {
                        return NULL;
                }
+               stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type());
+               stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
                ao2_link(tech_endpoints, endpoint);
        }
index 7accaa15f7f88aa4f60d3114f576fccd0a6c307e..0da023a518949808a2e21d494314b87ed0ed4323 100644 (file)
@@ -1527,6 +1527,8 @@ static void acl_change_stasis_subscribe(void)
        if (!acl_change_sub) {
                acl_change_sub = stasis_subscribe(ast_security_topic(),
                        acl_change_stasis_cb, NULL);
+               stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type());
+               stasis_subscription_set_filter(acl_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
        }
 }
 
index f9612957e54a8d28cc31df2ad60687407e0336a5..0a23735c05f12de6a892a8aac99ac746dc828ae3 100644 (file)
@@ -8416,10 +8416,15 @@ int load_pbx(void)
        if (!(device_state_sub = stasis_subscribe(ast_device_state_topic_all(), device_state_cb, NULL))) {
                return -1;
        }
+       stasis_subscription_accept_message_type(device_state_sub, ast_device_state_message_type());
+       stasis_subscription_accept_message_type(device_state_sub, hint_change_message_type());
+       stasis_subscription_set_filter(device_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        if (!(presence_state_sub = stasis_subscribe(ast_presence_state_topic_all(), presence_state_cb, NULL))) {
                return -1;
        }
+       stasis_subscription_accept_message_type(presence_state_sub, ast_presence_state_message_type());
+       stasis_subscription_set_filter(presence_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        return 0;
 }
index 4121bf5b69a15a07696e2a49739c06f61b55995d..65b7f69270cbafa4a7dfe5a6211d79f4337cce14 100644 (file)
@@ -514,6 +514,8 @@ int ast_presence_state_engine_init(void)
        if (!presence_state_topic_cached) {
                return -1;
        }
+       stasis_caching_accept_message_type(presence_state_topic_cached, ast_presence_state_message_type());
+       stasis_caching_set_filter(presence_state_topic_cached, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        AST_TEST_REGISTER(test_presence_chan);
 
index ed838733b99b286b56c8a476b7f1f4465dae75d3..93112d98ebb9334c9a48771b05243853eb47052a 100644 (file)
@@ -370,6 +370,11 @@ const char *stasis_topic_name(const struct stasis_topic *topic)
        return topic->name;
 }
 
+size_t stasis_topic_subscribers(const struct stasis_topic *topic)
+{
+       return AST_VECTOR_SIZE(&topic->subscribers);
+}
+
 /*! \internal */
 struct stasis_subscription {
        /*! Unique ID for this subscription */
@@ -391,6 +396,11 @@ struct stasis_subscription {
        /*! Flag set when final message for sub has been processed.
         *  Be sure join_lock is held before reading/setting. */
        int final_message_processed;
+
+       /*! The message types this subscription is accepting */
+       AST_VECTOR(, char) accepted_message_types;
+       /*! The message filter currently in use */
+       enum stasis_subscription_message_filter filter;
 };
 
 static void subscription_dtor(void *obj)
@@ -409,6 +419,8 @@ static void subscription_dtor(void *obj)
        ast_taskprocessor_unreference(sub->mailbox);
        sub->mailbox = NULL;
        ast_cond_destroy(&sub->join_cond);
+
+       AST_VECTOR_FREE(&sub->accepted_message_types);
 }
 
 /*!
@@ -420,19 +432,25 @@ static void subscription_dtor(void *obj)
 static void subscription_invoke(struct stasis_subscription *sub,
                                  struct stasis_message *message)
 {
+       unsigned int final = stasis_subscription_final_message(sub, message);
+       int message_type_id = stasis_message_type_id(stasis_subscription_change_type());
+
        /* Notify that the final message has been received */
-       if (stasis_subscription_final_message(sub, message)) {
+       if (final) {
                ao2_lock(sub);
                sub->final_message_rxed = 1;
                ast_cond_signal(&sub->join_cond);
                ao2_unlock(sub);
        }
 
-       /* Since sub is mostly immutable, no need to lock sub */
-       sub->callback(sub->data, sub, message);
+       if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
+               (message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
+               /* Since sub is mostly immutable, no need to lock sub */
+               sub->callback(sub->data, sub, message);
+       }
 
        /* Notify that the final message has been processed */
-       if (stasis_subscription_final_message(sub, message)) {
+       if (final) {
                ao2_lock(sub);
                sub->final_message_processed = 1;
                ast_cond_signal(&sub->join_cond);
@@ -500,6 +518,8 @@ struct stasis_subscription *internal_stasis_subscribe(
        sub->callback = callback;
        sub->data = data;
        ast_cond_init(&sub->join_cond, NULL);
+       sub->filter = STASIS_SUBSCRIPTION_FILTER_NONE;
+       AST_VECTOR_INIT(&sub->accepted_message_types, 0);
 
        if (topic_add_subscription(topic, sub) != 0) {
                ao2_ref(sub, -1);
@@ -586,6 +606,76 @@ int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscr
        return res;
 }
 
+int stasis_subscription_accept_message_type(struct stasis_subscription *subscription,
+       const struct stasis_message_type *type)
+{
+       if (!subscription) {
+               return -1;
+       }
+
+       ast_assert(type != NULL);
+       ast_assert(stasis_message_type_name(type) != NULL);
+
+       if (!type || !stasis_message_type_name(type)) {
+               /* Filtering is unreliable as this message type is not yet initialized
+                * so force all messages through.
+                */
+               subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
+               return 0;
+       }
+
+       ao2_lock(subscription->topic);
+       if (AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 1)) {
+               /* We do this for the same reason as above. The subscription can still operate, so allow
+                * it to do so by forcing all messages through.
+                */
+               subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
+       }
+       ao2_unlock(subscription->topic);
+
+       return 0;
+}
+
+int stasis_subscription_decline_message_type(struct stasis_subscription *subscription,
+       const struct stasis_message_type *type)
+{
+       if (!subscription) {
+               return -1;
+       }
+
+       ast_assert(type != NULL);
+       ast_assert(stasis_message_type_name(type) != NULL);
+
+       if (!type || !stasis_message_type_name(type)) {
+               return 0;
+       }
+
+       ao2_lock(subscription->topic);
+       if (stasis_message_type_id(type) < AST_VECTOR_SIZE(&subscription->accepted_message_types)) {
+               /* The memory is already allocated so this can't fail */
+               AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 0);
+       }
+       ao2_unlock(subscription->topic);
+
+       return 0;
+}
+
+int stasis_subscription_set_filter(struct stasis_subscription *subscription,
+       enum stasis_subscription_message_filter filter)
+{
+       if (!subscription) {
+               return -1;
+       }
+
+       ao2_lock(subscription->topic);
+       if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {
+               subscription->filter = filter;
+       }
+       ao2_unlock(subscription->topic);
+
+       return 0;
+}
+
 void stasis_subscription_join(struct stasis_subscription *subscription)
 {
        if (subscription) {
@@ -781,6 +871,18 @@ static void dispatch_message(struct stasis_subscription *sub,
        struct stasis_message *message,
        int synchronous)
 {
+       /* Determine if this subscription is interested in this message. Note that final
+        * messages are special and are always invoked on the subscription.
+        */
+       if (sub->filter == STASIS_SUBSCRIPTION_FILTER_SELECTIVE) {
+               int message_type_id = stasis_message_type_id(stasis_message_type(message));
+               if ((message_type_id >= AST_VECTOR_SIZE(&sub->accepted_message_types) ||
+                       !AST_VECTOR_GET(&sub->accepted_message_types, message_type_id)) &&
+                       !stasis_subscription_final_message(sub, message)) {
+                       return;
+               }
+       }
+
        if (!sub->mailbox) {
                /* Dispatch directly */
                subscription_invoke(sub, message);
@@ -840,6 +942,11 @@ static void publish_msg(struct stasis_topic *topic,
        ast_assert(topic != NULL);
        ast_assert(message != NULL);
 
+       /* If there are no subscribers don't bother */
+       if (!stasis_topic_subscribers(topic)) {
+               return;
+       }
+
        /*
         * The topic may be unref'ed by the subscription invocation.
         * Make sure we hold onto a reference while dispatching.
index 3d353b311e2bf10813eb0e63385c1d437ae7caa5..bc975fd3da2d0471e57486002bfa599e13bf6c46 100644 (file)
@@ -87,6 +87,35 @@ struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *cachi
        return caching_topic->topic;
 }
 
+int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic,
+       struct stasis_message_type *type)
+{
+       int res;
+
+       if (!caching_topic) {
+               return -1;
+       }
+
+       /* We wait to accept the stasis specific message types until now so that by default everything
+        * will flow to us.
+        */
+       res = stasis_subscription_accept_message_type(caching_topic->sub, stasis_cache_clear_type());
+       res |= stasis_subscription_accept_message_type(caching_topic->sub, stasis_subscription_change_type());
+       res |= stasis_subscription_accept_message_type(caching_topic->sub, type);
+
+       return res;
+}
+
+int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic,
+       enum stasis_subscription_message_filter filter)
+{
+       if (!caching_topic) {
+               return -1;
+       }
+       return stasis_subscription_set_filter(caching_topic->sub, filter);
+}
+
+
 struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
 {
        if (!caching_topic) {
@@ -856,11 +885,13 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub,
                /* Update the cache */
                snapshots = cache_put(caching_topic->cache, msg_type, msg_id, msg_eid, msg_put);
                if (snapshots.old || msg_put) {
-                       update = update_create(snapshots.old, msg_put);
-                       if (update) {
-                               stasis_publish(caching_topic->topic, update);
+                       if (stasis_topic_subscribers(caching_topic->topic)) {
+                               update = update_create(snapshots.old, msg_put);
+                               if (update) {
+                                       stasis_publish(caching_topic->topic, update);
+                                       ao2_ref(update, -1);
+                               }
                        }
-                       ao2_cleanup(update);
                } else {
                        ast_debug(1,
                                "Attempting to remove an item from the %s cache that isn't there: %s %s\n",
@@ -873,11 +904,13 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub,
                                caching_topic->cache->aggregate_publish_fn(caching_topic->original_topic,
                                        snapshots.aggregate_new);
                        }
-                       update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
-                       if (update) {
-                               stasis_publish(caching_topic->topic, update);
+                       if (stasis_topic_subscribers(caching_topic->topic)) {
+                               update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
+                               if (update) {
+                                       stasis_publish(caching_topic->topic, update);
+                                       ao2_ref(update, -1);
+                               }
                        }
-                       ao2_cleanup(update);
                }
 
                ao2_cleanup(snapshots.old);
index f0e34b9558873c0e230016181fd7e237e98a2a5c..04d816463ce84c8baaf27c4073d581b9dce189c7 100644 (file)
@@ -217,3 +217,21 @@ struct stasis_topic *stasis_cp_single_topic_cached(
        }
        return stasis_caching_get_topic(one->topic_cached);
 }
+
+int stasis_cp_single_accept_message_type(struct stasis_cp_single *one,
+       struct stasis_message_type *type)
+{
+       if (!one) {
+               return -1;
+       }
+       return stasis_caching_accept_message_type(one->topic_cached, type);
+}
+
+int stasis_cp_single_set_filter(struct stasis_cp_single *one,
+       enum stasis_subscription_message_filter filter)
+{
+       if (!one) {
+               return -1;
+       }
+       return stasis_caching_set_filter(one->topic_cached, filter);
+}
index 19f4a928fdf1a559628916d076b3462b63eddac5..1fdbe858e3be8e43dcc2aaefcfc86064775d982c 100644 (file)
@@ -39,9 +39,11 @@ struct stasis_message_type {
        struct stasis_message_vtable *vtable;
        char *name;
        unsigned int hash;
+       int id;
 };
 
 static struct stasis_message_vtable null_vtable = {};
+static int message_type_id;
 
 static void message_type_dtor(void *obj)
 {
@@ -78,6 +80,7 @@ int stasis_message_type_create(const char *name,
        }
        type->hash = ast_hashtab_hash_string(name);
        type->vtable = vtable;
+       type->id = ast_atomic_fetchadd_int(&message_type_id, +1);
        *result = type;
 
        return STASIS_MESSAGE_TYPE_SUCCESS;
@@ -93,6 +96,11 @@ unsigned int stasis_message_type_hash(const struct stasis_message_type *type)
        return type->hash;
 }
 
+int stasis_message_type_id(const struct stasis_message_type *type)
+{
+       return type->id;
+}
+
 /*! \internal */
 struct stasis_message {
        /*! Time the message was created */
index 41d426beca577e98abf377ee43b0d938f090731b..41ebc7ea8a9ed99134f695c043c6ae2eb5b8975d 100644 (file)
@@ -235,6 +235,9 @@ static struct stasis_message_router *stasis_message_router_create_internal(
                return NULL;
        }
 
+       /* We need to receive subscription change messages so we know when our subscription goes away */
+       stasis_subscription_accept_message_type(router->subscription, stasis_subscription_change_type());
+
        return router;
 }
 
@@ -316,6 +319,14 @@ int stasis_message_router_add(struct stasis_message_router *router,
        }
        ao2_lock(router);
        res = route_table_add(&router->routes, message_type, callback, data);
+       if (!res) {
+               stasis_subscription_accept_message_type(router->subscription, message_type);
+               /* Until a specific message type was added we would already drop the message, so being
+                * selective now doesn't harm us. If we have a default route then we are already forced
+                * to filter nothing and messages will come in regardless.
+                */
+               stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
+       }
        ao2_unlock(router);
        return res;
 }
@@ -334,6 +345,10 @@ int stasis_message_router_add_cache_update(struct stasis_message_router *router,
        }
        ao2_lock(router);
        res = route_table_add(&router->cache_routes, message_type, callback, data);
+       if (!res) {
+               stasis_subscription_accept_message_type(router->subscription, stasis_cache_update_type());
+               stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
+       }
        ao2_unlock(router);
        return res;
 }
@@ -378,6 +393,9 @@ int stasis_message_router_set_default(struct stasis_message_router *router,
        router->default_route.callback = callback;
        router->default_route.data = data;
        ao2_unlock(router);
+
+       stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_FORCED_NONE);
+
        /* While this implementation can never fail, it used to be able to */
        return 0;
 }
index dd2fb75879e2c180f35e26f4913bf88543a1cab9..f9b3e85d26a19e9132c4d0a8159c5b2e2c3168d4 100644 (file)
@@ -868,6 +868,10 @@ static void park_announce_update_cb(void *data, struct stasis_subscription *sub,
                return;
        }
 
+       if (ast_parked_call_type() != stasis_message_type(message)) {
+               return;
+       }
+
        if (payload->event_type != PARKED_CALL) {
                /* We are only concerned with calls parked */
                return;
@@ -954,6 +958,10 @@ static int park_and_announce_app_exec(struct ast_channel *chan, const char *data
                return -1;
        }
 
+       stasis_subscription_accept_message_type(parking_subscription, ast_parked_call_type());
+       stasis_subscription_accept_message_type(parking_subscription, stasis_subscription_change_type());
+       stasis_subscription_set_filter(parking_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
+
        /* Now for the fun part... park it! */
        ast_bridge_join(parking_bridge, chan, NULL, &chan_features, NULL, 0);
 
index cbc23fac1b2e34590eba1e75942ea3595f582ee5..ee2b5a163bde98b0b13109c544dd6f8094b388a1 100644 (file)
@@ -213,6 +213,9 @@ static int create_parked_subscription_full(struct ast_channel *chan, const char
        if (!(parked_datastore->parked_subscription = stasis_subscribe_pool(ast_parking_topic(), parker_update_cb, subscription_data))) {
                return -1;
        }
+       stasis_subscription_accept_message_type(parked_datastore->parked_subscription, ast_parked_call_type());
+       stasis_subscription_accept_message_type(parked_datastore->parked_subscription, stasis_subscription_change_type());
+       stasis_subscription_set_filter(parked_datastore->parked_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        datastore->data = parked_datastore;
 
index 6d0a4c06cbf34a285cab37f055fb4d996f69c49a..83558ba74ec7b5031f84c9b7ab1795e88f02127d 100644 (file)
@@ -686,6 +686,8 @@ static void parking_manager_enable_stasis(void)
 {
        if (!parking_sub) {
                parking_sub = stasis_subscribe(ast_parking_topic(), parking_event_cb, NULL);
+               stasis_subscription_accept_message_type(parking_sub, ast_parked_call_type());
+               stasis_subscription_set_filter(parking_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
        }
 }
 
index c3abbc164b96213e9832f3471b402988d1422d07..f73cd44e4e521092e61c088233242e61c4ef9a96 100644 (file)
@@ -167,6 +167,9 @@ static int load_module(void)
        if (!stasis_rtp_subscription) {
                return AST_MODULE_LOAD_DECLINE;
        }
+       stasis_subscription_accept_message_type(stasis_rtp_subscription, ast_rtp_rtcp_sent_type());
+       stasis_subscription_accept_message_type(stasis_rtp_subscription, ast_rtp_rtcp_received_type());
+       stasis_subscription_set_filter(stasis_rtp_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        return AST_MODULE_LOAD_SUCCESS;
 }
index 4cd892c05bd5bbaee136f6fb2a3774ee9d8f70b8..83bff8893c07619202f45746c9f243aa8e1158ba 100644 (file)
@@ -269,6 +269,9 @@ static struct mwi_stasis_subscription *mwi_stasis_subscription_alloc(const char
                ao2_ref(mwi_sub, -1);
                mwi_stasis_sub = NULL;
        }
+       stasis_subscription_accept_message_type(mwi_stasis_sub->stasis_sub, ast_mwi_state_type());
+       stasis_subscription_accept_message_type(mwi_stasis_sub->stasis_sub, stasis_subscription_change_type());
+       stasis_subscription_set_filter(mwi_stasis_sub->stasis_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
        return mwi_stasis_sub;
 }
 
@@ -1364,7 +1367,11 @@ static int load_module(void)
                if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
                        ast_sip_push_task(NULL, send_initial_notify_all, NULL);
                } else {
-                       stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL);
+                       struct stasis_subscription *sub;
+
+                       sub = stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL);
+                       stasis_subscription_accept_message_type(sub, ast_manager_get_generic_type());
+                       stasis_subscription_set_filter(sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
                }
        }
 
index 0d815ad3966276afcd81b64ba7af8e78d0839864..7aba73441db66d4b589fe1843722aca0d837c86c 100644 (file)
@@ -2282,6 +2282,8 @@ static int load_module(void)
 
        network_change_sub = stasis_subscribe(ast_system_topic(),
                network_change_stasis_cb, NULL);
+       stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type());
+       stasis_subscription_set_filter(network_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        return AST_MODULE_LOAD_SUCCESS;
 }
index 220ba0bc5467b9a51517fad36c02985ee9f5e78a..692f9a747906413a27c16a644dbd12292ff9ed12 100644 (file)
@@ -360,6 +360,9 @@ static int asterisk_start_devicestate_publishing(struct ast_sip_outbound_publish
                ao2_ref(datastore, -1);
                return -1;
        }
+       stasis_subscription_accept_message_type(publisher_state->device_state_subscription, ast_device_state_message_type());
+       stasis_subscription_accept_message_type(publisher_state->device_state_subscription, stasis_subscription_change_type());
+       stasis_subscription_set_filter(publisher_state->device_state_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        cached = stasis_cache_dump(ast_device_state_cache(), NULL);
        ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, datastore);
@@ -435,6 +438,9 @@ static int asterisk_start_mwi_publishing(struct ast_sip_outbound_publish *config
                ao2_ref(datastore, -1);
                return -1;
        }
+       stasis_subscription_accept_message_type(publisher_state->mailbox_state_subscription, ast_mwi_state_type());
+       stasis_subscription_accept_message_type(publisher_state->mailbox_state_subscription, stasis_subscription_change_type());
+       stasis_subscription_set_filter(publisher_state->mailbox_state_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        cached = stasis_cache_dump(ast_mwi_state_cache(), NULL);
        ao2_callback(cached, OBJ_NODATA, cached_mwistate_cb, datastore);
index b5ee15923d15c971de0b35a3668a9942c1428e15..9e8a32bf7961add946c349a70e29cc861cec51a9 100644 (file)
@@ -5567,7 +5567,11 @@ static int load_module(void)
        if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
                ast_sip_push_task(NULL, subscription_persistence_load, NULL);
        } else {
-               stasis_subscribe_pool(ast_manager_get_topic(), subscription_persistence_event_cb, NULL);
+               struct stasis_subscription *sub;
+
+               sub = stasis_subscribe_pool(ast_manager_get_topic(), subscription_persistence_event_cb, NULL);
+               stasis_subscription_accept_message_type(sub, ast_manager_get_generic_type());
+               stasis_subscription_set_filter(sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
        }
 
        ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM,
index 1e6ca7f46eaaeb9323dfb101fe100b110417d90f..3dfaabc445ab8b4dad6d8389aef4a09a5f1888f7 100644 (file)
@@ -686,6 +686,10 @@ static void refer_blind_callback(struct ast_channel *chan, struct transfer_chann
                        ast_channel_unlock(chan);
 
                        ao2_cleanup(refer->progress);
+               } else {
+                       stasis_subscription_accept_message_type(refer->progress->bridge_sub, ast_channel_entered_bridge_type());
+                       stasis_subscription_accept_message_type(refer->progress->bridge_sub, stasis_subscription_change_type());
+                       stasis_subscription_set_filter(refer->progress->bridge_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
                }
        }
 
index 555ba23c1843d6cb17f495de718ae922964c623a..95429cad3f81f320b2044bd9251572c1bf60dc1e 100644 (file)
@@ -141,6 +141,8 @@ static int load_module(void)
                LOG_SECURITY = -1;
                return AST_MODULE_LOAD_DECLINE;
        }
+       stasis_subscription_accept_message_type(security_stasis_sub, ast_security_event_type());
+       stasis_subscription_set_filter(security_stasis_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        ast_verb(3, "Security Logging Enabled\n");
 
index be09b15ad09c9f6e5308c7ef4da5148f8e30189c..1c80f9efaa221dbe26fec06621039b3692a7f100 100644 (file)
@@ -394,6 +394,9 @@ static int subscribe_device_state(struct stasis_app *app, void *obj)
                ao2_ref(sub, -1);
                return -1;
        }
+       stasis_subscription_accept_message_type(sub->sub, ast_device_state_message_type());
+       stasis_subscription_accept_message_type(sub->sub, stasis_subscription_change_type());
+       stasis_subscription_set_filter(sub->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        ao2_link_flags(device_state_subscriptions, sub, OBJ_NOLOCK);
        ao2_unlock(device_state_subscriptions);
index b72581fa5434d46eff7446fe1fa1f0d73fe537f5..7d032ad5d534d517ae04102380c26efcd92fe132 100644 (file)
@@ -1626,11 +1626,15 @@ static void xmpp_init_event_distribution(struct ast_xmpp_client *client)
        if (!(client->mwi_sub = stasis_subscribe_pool(ast_mwi_topic_all(), xmpp_pubsub_mwi_cb, client))) {
                return;
        }
+       stasis_subscription_accept_message_type(client->mwi_sub, ast_mwi_state_type());
+       stasis_subscription_set_filter(client->mwi_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        if (!(client->device_state_sub = stasis_subscribe(ast_device_state_topic_all(), xmpp_pubsub_devstate_cb, client))) {
                client->mwi_sub = stasis_unsubscribe(client->mwi_sub);
                return;
        }
+       stasis_subscription_accept_message_type(client->device_state_sub, ast_device_state_message_type());
+       stasis_subscription_set_filter(client->device_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        cached = stasis_cache_dump(ast_device_state_cache(), NULL);
        ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, client);