]> git.ipfire.org Git - thirdparty/asterisk.git/commitdiff
stasis: Add internal filtering of messages.
authorJoshua Colp <jcolp@digium.com>
Sun, 23 Sep 2018 20:50:01 +0000 (17:50 -0300)
committerJoshua Colp <jcolp@digium.com>
Sun, 18 Nov 2018 20:07:56 +0000 (14:07 -0600)
This change adds the ability for subscriptions to indicate
which message types they are interested in accepting. By
doing so the filtering is done before being dispatched
to the subscriber, reducing the amount of work that has
to be done.

This is optional and if a subscriber does not add
message types they wish to accept and set the subscription
to selective filtering the previous behavior is preserved
and they receive all messages.

There is also the ability to explicitly force the reception
of all messages for cases such as AMI or ARI where a large
number of messages are expected that are then generically
converted into a different format.

ASTERISK-28103

Change-Id: I99bee23895baa0a117985d51683f7963b77aa190

33 files changed:
apps/app_queue.c
channels/chan_dahdi.c
channels/chan_iax2.c
channels/chan_mgcp.c
channels/chan_sip.c
channels/chan_skinny.c
channels/sig_pri.c
include/asterisk/stasis.h
include/asterisk/stasis_cache_pattern.h
include/asterisk/stasis_message_router.h
main/ccss.c
main/devicestate.c
main/endpoints.c
main/manager.c
main/pbx.c
main/presencestate.c
main/stasis.c
main/stasis_cache.c
main/stasis_cache_pattern.c
main/stasis_message.c
main/stasis_message_router.c
res/parking/parking_applications.c
res/parking/parking_bridge_features.c
res/parking/parking_manager.c
res/res_hep_rtcp.c
res/res_pjsip_mwi.c
res/res_pjsip_outbound_registration.c
res/res_pjsip_publish_asterisk.c
res/res_pjsip_pubsub.c
res/res_pjsip_refer.c
res/res_security_log.c
res/res_stasis_device_state.c
res/res_xmpp.c

index ddbd1bdfdd170c7a0079c0880aaad2031a7b4ba6..ce00b5e67630c7981d12afab55de247f0fc1331b 100644 (file)
@@ -11007,6 +11007,8 @@ static int load_module(void)
        if (!device_state_sub) {
                err = -1;
        }
+       stasis_subscription_accept_message_type(device_state_sub, ast_device_state_message_type());
+       stasis_subscription_set_filter(device_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        manager_topic = ast_manager_get_topic();
        queue_topic = ast_queue_topic_all();
index 3b05d721709679e454e14901b9159952f0543aa9..c40e084c91a0e254b99a7dd1e6b53ecce7b233cb 100644 (file)
@@ -12690,6 +12690,8 @@ static struct dahdi_pvt *mkintf(int channel, const struct dahdi_chan_conf *conf,
                                 * knows that we care about it.  Then, chan_dahdi will get the MWI from the
                                 * event cache instead of checking the mailbox directly. */
                                tmp->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL);
+                               stasis_subscription_accept_message_type(tmp->mwi_event_sub, ast_mwi_state_type());
+                               stasis_subscription_set_filter(tmp->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
                        }
                }
 #ifdef HAVE_DAHDI_LINEREVERSE_VMWI
index 7a369e1a749dd19ef157700c14d43a2ca60bbadb..d534c0003ffe45a3d2488ddf548acd6c688b073a 100644 (file)
@@ -1463,6 +1463,8 @@ static void network_change_stasis_subscribe(void)
        if (!network_change_sub) {
                network_change_sub = stasis_subscribe(ast_system_topic(),
                        network_change_stasis_cb, NULL);
+               stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type());
+               stasis_subscription_set_filter(network_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
        }
 }
 
@@ -1476,6 +1478,8 @@ static void acl_change_stasis_subscribe(void)
        if (!acl_change_sub) {
                acl_change_sub = stasis_subscribe(ast_security_topic(),
                        acl_change_stasis_cb, NULL);
+               stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type());
+               stasis_subscription_set_filter(acl_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
        }
 }
 
@@ -13100,6 +13104,8 @@ static struct iax2_peer *build_peer(const char *name, struct ast_variable *v, st
                         * mailboxes.  However, we just grab the events out of the cache when it
                         * is time to send MWI, since it is only sent with a REGACK. */
                        peer->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL);
+                       stasis_subscription_accept_message_type(peer->mwi_event_sub, ast_mwi_state_type());
+                       stasis_subscription_set_filter(peer->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
                }
        }
 
index c9ed1e2f5981bcaa58303e484a817ba172ea9185..5b3089b75fb994cff91c9d7636dab9a390185ecb 100644 (file)
@@ -4234,6 +4234,8 @@ static struct mgcp_gateway *build_gateway(char *cat, struct ast_variable *v)
                                                 * knows that we care about it.  Then, chan_mgcp will get the MWI from the
                                                 * event cache instead of checking the mailbox directly. */
                                                e->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL);
+                                               stasis_subscription_accept_message_type(e->mwi_event_sub, ast_mwi_state_type());
+                                               stasis_subscription_set_filter(e->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
                                        }
                                }
                                snprintf(e->rqnt_ident, sizeof(e->rqnt_ident), "%08lx", (unsigned long)ast_random());
index 97ce93c554425196b7ced17d151bd291da3603e2..49d5f64a5b261f85465a553ad8454a06a3866eb2 100644 (file)
@@ -17384,6 +17384,8 @@ static void network_change_stasis_subscribe(void)
        if (!network_change_sub) {
                network_change_sub = stasis_subscribe(ast_system_topic(),
                        network_change_stasis_cb, NULL);
+               stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type());
+               stasis_subscription_set_filter(network_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
        }
 }
 
@@ -17397,6 +17399,8 @@ static void acl_change_stasis_subscribe(void)
        if (!acl_change_sub) {
                acl_change_sub = stasis_subscribe(ast_security_topic(),
                        acl_change_stasis_cb, NULL);
+               stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type());
+               stasis_subscription_set_filter(acl_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
        }
 
 }
@@ -28163,6 +28167,9 @@ static void add_peer_mwi_subs(struct sip_peer *peer)
                mailbox_specific_topic = ast_mwi_topic(mailbox->id);
                if (mailbox_specific_topic) {
                        mailbox->event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, peer);
+                       stasis_subscription_accept_message_type(mailbox->event_sub, ast_mwi_state_type());
+                       stasis_subscription_accept_message_type(mailbox->event_sub, stasis_subscription_change_type());
+                       stasis_subscription_set_filter(mailbox->event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
                }
        }
 }
index 43cc32a2be4916a4857b08797ad3e9e8c0249b26..9c63da9027a27e564913875c6d36a5d523c9b4e7 100644 (file)
@@ -8330,6 +8330,8 @@ static struct skinny_line *config_line(const char *lname, struct ast_variable *v
                mailbox_specific_topic = ast_mwi_topic(l->mailbox);
                if (mailbox_specific_topic) {
                        l->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, l);
+                       stasis_subscription_accept_message_type(l->mwi_event_sub, ast_mwi_state_type());
+                       stasis_subscription_set_filter(l->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
                }
        }
 
index f371fbf3678d7fd66bf1206da27676cd616050cb..682abf37a4bb406d4f4b72227bd03f856dbc8bfe 100644 (file)
@@ -9143,6 +9143,9 @@ int sig_pri_start_pri(struct sig_pri_span *pri)
                if (!pri->mbox[i].sub) {
                        ast_log(LOG_ERROR, "%s span %d could not subscribe to MWI events for %s(%s).\n",
                                sig_pri_cc_type_name, pri->span, pri->mbox[i].vm_box, mbox_id);
+               } else {
+                       stasis_subscription_accept_message_type(pri->mbox[i].sub, ast_mwi_state_type());
+                       stasis_subscription_set_filter(pri->mbox[i].sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
                }
 #if defined(HAVE_PRI_MWI_V2)
                if (ast_strlen_zero(pri->mbox[i].vm_number)) {
index 25faa467b17d685dda5cc33af6d5506af8db94bc..a9d5a74c8cb9f6ba05bab1af5023a782a6e9ba5f 100644 (file)
@@ -293,6 +293,15 @@ enum stasis_message_type_result {
        STASIS_MESSAGE_TYPE_DECLINED,   /*!< Message type was not created due to configuration */
 };
 
+/*!
+ * \brief Stasis subscription message filters
+ */
+enum stasis_subscription_message_filter {
+       STASIS_SUBSCRIPTION_FILTER_NONE = 0,    /*!< No filter is in place, all messages are raised */
+       STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, /*!< No filter is in place or can be set, all messages are raised */
+       STASIS_SUBSCRIPTION_FILTER_SELECTIVE,   /*!< Only messages of allowed message types are raised */
+};
+
 /*!
  * \brief Create a new message type.
  *
@@ -328,6 +337,14 @@ const char *stasis_message_type_name(const struct stasis_message_type *type);
  */
 unsigned int stasis_message_type_hash(const struct stasis_message_type *type);
 
+/*!
+ * \brief Gets the id of a given message type
+ * \param type The type to get the id of.
+ * \return The id
+ * \since 17.0.0
+ */
+int stasis_message_type_id(const struct stasis_message_type *type);
+
 /*!
  * \brief Check whether a message type is declined
  *
@@ -500,6 +517,14 @@ struct stasis_topic *stasis_topic_create(const char *name);
  */
 const char *stasis_topic_name(const struct stasis_topic *topic);
 
+/*!
+ * \brief Return the number of subscribers of a topic.
+ * \param topic Topic.
+ * \return Number of subscribers of the topic.
+ * \since 17.0.0
+ */
+size_t stasis_topic_subscribers(const struct stasis_topic *topic);
+
 /*!
  * \brief Publish a message to a topic's subscribers.
  * \param topic Topic.
@@ -569,6 +594,10 @@ void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, st
  * \return New \ref stasis_subscription object.
  * \return \c NULL on error.
  * \since 12
+ *
+ * \note This callback will receive a callback with a message indicating it
+ * has been subscribed. This occurs immediately before accepted message
+ * types can be set and the callback must expect to receive it.
  */
 struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
        stasis_subscription_cb callback, void *data);
@@ -594,10 +623,68 @@ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
  * \return New \ref stasis_subscription object.
  * \return \c NULL on error.
  * \since 12.8.0
+ *
+ * \note This callback will receive a callback with a message indicating it
+ * has been subscribed. This occurs immediately before accepted message
+ * types can be set and the callback must expect to receive it.
  */
 struct stasis_subscription *stasis_subscribe_pool(struct stasis_topic *topic,
        stasis_subscription_cb callback, void *data);
 
+/*!
+ * \brief Indicate to a subscription that we are interested in a message type.
+ *
+ * This will cause the subscription to allow the given message type to be
+ * raised to our subscription callback. This enables internal filtering in
+ * the stasis message bus to reduce messages.
+ *
+ * \param subscription Subscription to add message type to.
+ * \param type The message type we wish to receive.
+ * \retval 0 on success
+ * \retval -1 failure
+ *
+ * \since 17.0.0
+ *
+ * \note If you are wanting to use stasis_final_message you will need to accept
+ * \ref stasis_subscription_change_type as a message type.
+ *
+ * \note Until the subscription is set to selective filtering it is possible for it
+ * to receive messages of message types that would not normally be accepted.
+ */
+int stasis_subscription_accept_message_type(struct stasis_subscription *subscription,
+       const struct stasis_message_type *type);
+
+/*!
+ * \brief Indicate to a subscription that we are not interested in a message type.
+ *
+ * \param subscription Subscription to remove message type from.
+ * \param type The message type we don't wish to receive.
+ * \retval 0 on success
+ * \retval -1 failure
+ *
+ * \since 17.0.0
+ */
+int stasis_subscription_decline_message_type(struct stasis_subscription *subscription,
+       const struct stasis_message_type *type);
+
+/*!
+ * \brief Set the message type filtering level on a subscription
+ *
+ * This will cause the subscription to filter messages according to the
+ * provided filter level. For example if selective is used then only
+ * messages matching those provided to \ref stasis_subscription_accept_message_type
+ * will be raised to the subscription callback.
+ *
+ * \param subscription Subscription that should receive all messages.
+ * \param filter What filter to use
+ * \retval 0 on success
+ * \retval -1 failure
+ *
+ * \since 17.0.0
+ */
+int stasis_subscription_set_filter(struct stasis_subscription *subscription,
+       enum stasis_subscription_message_filter filter);
+
 /*!
  * \brief Cancel a subscription.
  *
@@ -1052,6 +1139,41 @@ struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(
 struct stasis_topic *stasis_caching_get_topic(
        struct stasis_caching_topic *caching_topic);
 
+/*!
+ * \brief Indicate to a caching topic that we are interested in a message type.
+ *
+ * This will cause the caching topic to receive messages of the given message
+ * type. This enables internal filtering in the stasis message bus to reduce
+ * messages.
+ *
+ * \param caching_topic The caching topic.
+ * \param type The message type we wish to receive.
+ * \retval 0 on success
+ * \retval -1 failure
+ *
+ * \since 17.0.0
+ */
+int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic,
+       struct stasis_message_type *type);
+
+/*!
+ * \brief Set the message type filtering level on a cache
+ *
+ * This will cause the underlying subscription to filter messages according to the
+ * provided filter level. For example if selective is used then only
+ * messages matching those provided to \ref stasis_subscription_accept_message_type
+ * will be raised to the subscription callback.
+ *
+ * \param caching_topic The caching topic.
+ * \param filter What filter to use
+ * \retval 0 on success
+ * \retval -1 failure
+ *
+ * \since 17.0.0
+ */
+int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic,
+       enum stasis_subscription_message_filter filter);
+
 /*!
  * \brief A message which instructs the caching topic to remove an entry from
  * its cache.
index e61d3e931cfdc52fcebbfdf63f8f233869e000bc..514d62e695a509a92e9984770c463761e2f603f7 100644 (file)
@@ -169,4 +169,39 @@ struct stasis_topic *stasis_cp_single_topic(struct stasis_cp_single *one);
 struct stasis_topic *stasis_cp_single_topic_cached(
        struct stasis_cp_single *one);
 
+/*!
+ * \brief Indicate to an instance that we are interested in a message type.
+ *
+ * This will cause the caching topic to receive messages of the given message
+ * type. This enables internal filtering in the stasis message bus to reduce
+ * messages.
+ *
+ * \param one One side of the cache pattern.
+ * \param type The message type we wish to receive.
+ * \retval 0 on success
+ * \retval -1 failure
+ *
+ * \since 17.0.0
+ */
+int stasis_cp_single_accept_message_type(struct stasis_cp_single *one,
+       struct stasis_message_type *type);
+
+/*!
+ * \brief Set the message type filtering level on a cache
+ *
+ * This will cause the underlying subscription to filter messages according to the
+ * provided filter level. For example if selective is used then only
+ * messages matching those provided to \ref stasis_subscription_accept_message_type
+ * will be raised to the subscription callback.
+ *
+ * \param one One side of the cache pattern.
+ * \param filter What filter to use
+ * \retval 0 on success
+ * \retval -1 failure
+ *
+ * \since 17.0.0
+ */
+int stasis_cp_single_set_filter(struct stasis_cp_single *one,
+       enum stasis_subscription_message_filter filter);
+
 #endif /* _ASTERISK_STASIS_CACHE_PATTERN_H */
index 50270a788b4c89537d5ce9ad3e26f863ff1408d5..8dcdfcc913ec6b9e98898cd9e4566a8e57239123 100644 (file)
@@ -233,6 +233,10 @@ void stasis_message_router_remove_cache_update(
  * \retval -1 on failure
  *
  * \since 12
+ *
+ * \note Setting a default callback will automatically cause the underlying
+ * subscription to receive all messages and not be filtered. If filtering is
+ * desired then a specific route for each message type should be provided.
  */
 int stasis_message_router_set_default(struct stasis_message_router *router,
                                      stasis_subscription_cb callback,
index 7ff77fdab255b3f8d051e474b835f42aaf046d32..205dc1b063854a69a2e992f4ef59b3da294e5409 100644 (file)
@@ -1439,6 +1439,8 @@ static struct generic_monitor_instance_list *create_new_generic_list(struct ast_
                cc_unref(generic_list, "Failed to subscribe to device state");
                return NULL;
        }
+       stasis_subscription_accept_message_type(generic_list->sub, ast_device_state_message_type());
+       stasis_subscription_set_filter(generic_list->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
        generic_list->current_state = ast_device_state(monitor->interface->device_name);
        ao2_t_link(generic_monitors, generic_list, "linking new generic monitor instance list");
        return generic_list;
@@ -2810,6 +2812,9 @@ static int cc_generic_agent_start_monitoring(struct ast_cc_agent *agent)
        if (!(generic_pvt->sub = stasis_subscribe(device_specific_topic, generic_agent_devstate_cb, agent))) {
                return -1;
        }
+       stasis_subscription_accept_message_type(generic_pvt->sub, ast_device_state_message_type());
+       stasis_subscription_accept_message_type(generic_pvt->sub, stasis_subscription_change_type());
+       stasis_subscription_set_filter(generic_pvt->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
        cc_ref(agent, "Ref agent for subscription");
        return 0;
 }
index b6650457a8f05ed90a7794a667e97fb07fc1a23b..6706725e5e377fcfa183521a554a61c6071d73a4 100644 (file)
@@ -945,6 +945,8 @@ int devstate_init(void)
        if (!device_state_topic_cached) {
                return -1;
        }
+       stasis_caching_accept_message_type(device_state_topic_cached, ast_device_state_message_type());
+       stasis_caching_set_filter(device_state_topic_cached, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        devstate_message_sub = stasis_subscribe(ast_device_state_topic_all(),
                devstate_change_cb, NULL);
@@ -952,6 +954,8 @@ int devstate_init(void)
                ast_log(LOG_ERROR, "Failed to create subscription creating uncached device state aggregate events.\n");
                return -1;
        }
+       stasis_subscription_accept_message_type(devstate_message_sub, ast_device_state_message_type());
+       stasis_subscription_set_filter(devstate_message_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        return 0;
 }
index 88506a4c84af15dc9df698ed0acce41e4a5a85a8..69d022fb0a68e23e6711104e08350413c5d995cc 100644 (file)
@@ -204,7 +204,7 @@ static void endpoint_cache_clear(void *data,
        endpoint_publish_snapshot(endpoint);
 }
 
-static void endpoint_default(void *data,
+static void endpoint_subscription_change(void *data,
        struct stasis_subscription *sub,
        struct stasis_message *message)
 {
@@ -265,6 +265,8 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha
                if (!endpoint->topics) {
                        return NULL;
                }
+               stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type());
+               stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
                endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint));
                if (!endpoint->router) {
@@ -273,8 +275,9 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha
                r |= stasis_message_router_add(endpoint->router,
                        stasis_cache_clear_type(), endpoint_cache_clear,
                        endpoint);
-               r |= stasis_message_router_set_default(endpoint->router,
-                       endpoint_default, endpoint);
+               r |= stasis_message_router_add(endpoint->router,
+                       stasis_subscription_change_type(), endpoint_subscription_change,
+                       endpoint);
                if (r) {
                        return NULL;
                }
@@ -290,6 +293,8 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha
                if (!endpoint->topics) {
                        return NULL;
                }
+               stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type());
+               stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
                ao2_link(tech_endpoints, endpoint);
        }
index e5ca57122ba42dc49bf9ac7caa286418d7e099d3..5b4cc3af34bbd9b5d2fcb9a010faa6ef7ebaf444 100644 (file)
@@ -1521,6 +1521,8 @@ static void acl_change_stasis_subscribe(void)
        if (!acl_change_sub) {
                acl_change_sub = stasis_subscribe(ast_security_topic(),
                        acl_change_stasis_cb, NULL);
+               stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type());
+               stasis_subscription_set_filter(acl_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
        }
 }
 
index c87496b7307b761590085c65054edac4419f7f1c..434173d678bf1695bef170e320673eb8b04ccbae 100644 (file)
@@ -8339,10 +8339,15 @@ int load_pbx(void)
        if (!(device_state_sub = stasis_subscribe(ast_device_state_topic_all(), device_state_cb, NULL))) {
                return -1;
        }
+       stasis_subscription_accept_message_type(device_state_sub, ast_device_state_message_type());
+       stasis_subscription_accept_message_type(device_state_sub, hint_change_message_type());
+       stasis_subscription_set_filter(device_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        if (!(presence_state_sub = stasis_subscribe(ast_presence_state_topic_all(), presence_state_cb, NULL))) {
                return -1;
        }
+       stasis_subscription_accept_message_type(presence_state_sub, ast_presence_state_message_type());
+       stasis_subscription_set_filter(presence_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        return 0;
 }
index 56c903cf288e675867376b324a6702a51e257850..ff4934ade132d0c6f9e59b38ef50ac5fa8ca4aab 100644 (file)
@@ -389,6 +389,8 @@ int ast_presence_state_engine_init(void)
        if (!presence_state_topic_cached) {
                return -1;
        }
+       stasis_caching_accept_message_type(presence_state_topic_cached, ast_presence_state_message_type());
+       stasis_caching_set_filter(presence_state_topic_cached, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        return 0;
 }
index 26e404c836517d1e016791037af898d3e50a951f..d054897a9c1e1ed4d23a7925ccd5edadc9f3d19a 100644 (file)
@@ -372,6 +372,11 @@ const char *stasis_topic_name(const struct stasis_topic *topic)
        return topic->name;
 }
 
+size_t stasis_topic_subscribers(const struct stasis_topic *topic)
+{
+       return AST_VECTOR_SIZE(&topic->subscribers);
+}
+
 /*! \internal */
 struct stasis_subscription {
        /*! Unique ID for this subscription */
@@ -393,6 +398,11 @@ struct stasis_subscription {
        /*! Flag set when final message for sub has been processed.
         *  Be sure join_lock is held before reading/setting. */
        int final_message_processed;
+
+       /*! The message types this subscription is accepting */
+       AST_VECTOR(, char) accepted_message_types;
+       /*! The message filter currently in use */
+       enum stasis_subscription_message_filter filter;
 };
 
 static void subscription_dtor(void *obj)
@@ -411,6 +421,8 @@ static void subscription_dtor(void *obj)
        ast_taskprocessor_unreference(sub->mailbox);
        sub->mailbox = NULL;
        ast_cond_destroy(&sub->join_cond);
+
+       AST_VECTOR_FREE(&sub->accepted_message_types);
 }
 
 /*!
@@ -422,19 +434,25 @@ static void subscription_dtor(void *obj)
 static void subscription_invoke(struct stasis_subscription *sub,
                                  struct stasis_message *message)
 {
+       unsigned int final = stasis_subscription_final_message(sub, message);
+       int message_type_id = stasis_message_type_id(stasis_subscription_change_type());
+
        /* Notify that the final message has been received */
-       if (stasis_subscription_final_message(sub, message)) {
+       if (final) {
                ao2_lock(sub);
                sub->final_message_rxed = 1;
                ast_cond_signal(&sub->join_cond);
                ao2_unlock(sub);
        }
 
-       /* Since sub is mostly immutable, no need to lock sub */
-       sub->callback(sub->data, sub, message);
+       if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
+               (message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
+               /* Since sub is mostly immutable, no need to lock sub */
+               sub->callback(sub->data, sub, message);
+       }
 
        /* Notify that the final message has been processed */
-       if (stasis_subscription_final_message(sub, message)) {
+       if (final) {
                ao2_lock(sub);
                sub->final_message_processed = 1;
                ast_cond_signal(&sub->join_cond);
@@ -502,6 +520,8 @@ struct stasis_subscription *internal_stasis_subscribe(
        sub->callback = callback;
        sub->data = data;
        ast_cond_init(&sub->join_cond, NULL);
+       sub->filter = STASIS_SUBSCRIPTION_FILTER_NONE;
+       AST_VECTOR_INIT(&sub->accepted_message_types, 0);
 
        if (topic_add_subscription(topic, sub) != 0) {
                ao2_ref(sub, -1);
@@ -588,6 +608,76 @@ int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscr
        return res;
 }
 
+int stasis_subscription_accept_message_type(struct stasis_subscription *subscription,
+       const struct stasis_message_type *type)
+{
+       if (!subscription) {
+               return -1;
+       }
+
+       ast_assert(type != NULL);
+       ast_assert(stasis_message_type_name(type) != NULL);
+
+       if (!type || !stasis_message_type_name(type)) {
+               /* Filtering is unreliable as this message type is not yet initialized
+                * so force all messages through.
+                */
+               subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
+               return 0;
+       }
+
+       ao2_lock(subscription->topic);
+       if (AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 1)) {
+               /* We do this for the same reason as above. The subscription can still operate, so allow
+                * it to do so by forcing all messages through.
+                */
+               subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
+       }
+       ao2_unlock(subscription->topic);
+
+       return 0;
+}
+
+int stasis_subscription_decline_message_type(struct stasis_subscription *subscription,
+       const struct stasis_message_type *type)
+{
+       if (!subscription) {
+               return -1;
+       }
+
+       ast_assert(type != NULL);
+       ast_assert(stasis_message_type_name(type) != NULL);
+
+       if (!type || !stasis_message_type_name(type)) {
+               return 0;
+       }
+
+       ao2_lock(subscription->topic);
+       if (stasis_message_type_id(type) < AST_VECTOR_SIZE(&subscription->accepted_message_types)) {
+               /* The memory is already allocated so this can't fail */
+               AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 0);
+       }
+       ao2_unlock(subscription->topic);
+
+       return 0;
+}
+
+int stasis_subscription_set_filter(struct stasis_subscription *subscription,
+       enum stasis_subscription_message_filter filter)
+{
+       if (!subscription) {
+               return -1;
+       }
+
+       ao2_lock(subscription->topic);
+       if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {
+               subscription->filter = filter;
+       }
+       ao2_unlock(subscription->topic);
+
+       return 0;
+}
+
 void stasis_subscription_join(struct stasis_subscription *subscription)
 {
        if (subscription) {
@@ -783,6 +873,18 @@ static void dispatch_message(struct stasis_subscription *sub,
        struct stasis_message *message,
        int synchronous)
 {
+       /* Determine if this subscription is interested in this message. Note that final
+        * messages are special and are always invoked on the subscription.
+        */
+       if (sub->filter == STASIS_SUBSCRIPTION_FILTER_SELECTIVE) {
+               int message_type_id = stasis_message_type_id(stasis_message_type(message));
+               if ((message_type_id >= AST_VECTOR_SIZE(&sub->accepted_message_types) ||
+                       !AST_VECTOR_GET(&sub->accepted_message_types, message_type_id)) &&
+                       !stasis_subscription_final_message(sub, message)) {
+                       return;
+               }
+       }
+
        if (!sub->mailbox) {
                /* Dispatch directly */
                subscription_invoke(sub, message);
@@ -842,6 +944,11 @@ static void publish_msg(struct stasis_topic *topic,
        ast_assert(topic != NULL);
        ast_assert(message != NULL);
 
+       /* If there are no subscribers don't bother */
+       if (!stasis_topic_subscribers(topic)) {
+               return;
+       }
+
        /*
         * The topic may be unref'ed by the subscription invocation.
         * Make sure we hold onto a reference while dispatching.
index 1adfc0ee5f9624d3173bfb39fe2c2068e80f8e0f..fd560b00f48f55d0fa4fce598a8c1e585207b6f0 100644 (file)
@@ -89,6 +89,35 @@ struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *cachi
        return caching_topic->topic;
 }
 
+int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic,
+       struct stasis_message_type *type)
+{
+       int res;
+
+       if (!caching_topic) {
+               return -1;
+       }
+
+       /* We wait to accept the stasis specific message types until now so that by default everything
+        * will flow to us.
+        */
+       res = stasis_subscription_accept_message_type(caching_topic->sub, stasis_cache_clear_type());
+       res |= stasis_subscription_accept_message_type(caching_topic->sub, stasis_subscription_change_type());
+       res |= stasis_subscription_accept_message_type(caching_topic->sub, type);
+
+       return res;
+}
+
+int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic,
+       enum stasis_subscription_message_filter filter)
+{
+       if (!caching_topic) {
+               return -1;
+       }
+       return stasis_subscription_set_filter(caching_topic->sub, filter);
+}
+
+
 struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
 {
        if (!caching_topic) {
@@ -858,11 +887,13 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub,
                /* Update the cache */
                snapshots = cache_put(caching_topic->cache, msg_type, msg_id, msg_eid, msg_put);
                if (snapshots.old || msg_put) {
-                       update = update_create(snapshots.old, msg_put);
-                       if (update) {
-                               stasis_publish(caching_topic->topic, update);
+                       if (stasis_topic_subscribers(caching_topic->topic)) {
+                               update = update_create(snapshots.old, msg_put);
+                               if (update) {
+                                       stasis_publish(caching_topic->topic, update);
+                                       ao2_ref(update, -1);
+                               }
                        }
-                       ao2_cleanup(update);
                } else {
                        ast_debug(1,
                                "Attempting to remove an item from the %s cache that isn't there: %s %s\n",
@@ -875,11 +906,13 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub,
                                caching_topic->cache->aggregate_publish_fn(caching_topic->original_topic,
                                        snapshots.aggregate_new);
                        }
-                       update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
-                       if (update) {
-                               stasis_publish(caching_topic->topic, update);
+                       if (stasis_topic_subscribers(caching_topic->topic)) {
+                               update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
+                               if (update) {
+                                       stasis_publish(caching_topic->topic, update);
+                                       ao2_ref(update, -1);
+                               }
                        }
-                       ao2_cleanup(update);
                }
 
                ao2_cleanup(snapshots.old);
index 2a2ea44e5efda3cc9b3b4e12a13fa871800b3ffc..c2ea76622063512948e9fe8dca944f26b3e80590 100644 (file)
@@ -219,3 +219,21 @@ struct stasis_topic *stasis_cp_single_topic_cached(
        }
        return stasis_caching_get_topic(one->topic_cached);
 }
+
+int stasis_cp_single_accept_message_type(struct stasis_cp_single *one,
+       struct stasis_message_type *type)
+{
+       if (!one) {
+               return -1;
+       }
+       return stasis_caching_accept_message_type(one->topic_cached, type);
+}
+
+int stasis_cp_single_set_filter(struct stasis_cp_single *one,
+       enum stasis_subscription_message_filter filter)
+{
+       if (!one) {
+               return -1;
+       }
+       return stasis_caching_set_filter(one->topic_cached, filter);
+}
index ef03d1389a6117cc4287e1f0bf78ec103e2f2166..2685a43f4430da511ebc0ab3729c54b234b24171 100644 (file)
@@ -41,9 +41,11 @@ struct stasis_message_type {
        struct stasis_message_vtable *vtable;
        char *name;
        unsigned int hash;
+       int id;
 };
 
 static struct stasis_message_vtable null_vtable = {};
+static int message_type_id;
 
 static void message_type_dtor(void *obj)
 {
@@ -80,6 +82,7 @@ int stasis_message_type_create(const char *name,
        }
        type->hash = ast_hashtab_hash_string(name);
        type->vtable = vtable;
+       type->id = ast_atomic_fetchadd_int(&message_type_id, +1);
        *result = type;
 
        return STASIS_MESSAGE_TYPE_SUCCESS;
@@ -95,6 +98,11 @@ unsigned int stasis_message_type_hash(const struct stasis_message_type *type)
        return type->hash;
 }
 
+int stasis_message_type_id(const struct stasis_message_type *type)
+{
+       return type->id;
+}
+
 /*! \internal */
 struct stasis_message {
        /*! Time the message was created */
index 498ddd6c2f9f2ea51ff1c096efdd0b20076ad8c2..e9aebe8a6027b5911daea7b3d7b51e7acd4ad96d 100644 (file)
@@ -237,6 +237,9 @@ static struct stasis_message_router *stasis_message_router_create_internal(
                return NULL;
        }
 
+       /* We need to receive subscription change messages so we know when our subscription goes away */
+       stasis_subscription_accept_message_type(router->subscription, stasis_subscription_change_type());
+
        return router;
 }
 
@@ -318,6 +321,14 @@ int stasis_message_router_add(struct stasis_message_router *router,
        }
        ao2_lock(router);
        res = route_table_add(&router->routes, message_type, callback, data);
+       if (!res) {
+               stasis_subscription_accept_message_type(router->subscription, message_type);
+               /* Until a specific message type was added we would already drop the message, so being
+                * selective now doesn't harm us. If we have a default route then we are already forced
+                * to filter nothing and messages will come in regardless.
+                */
+               stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
+       }
        ao2_unlock(router);
        return res;
 }
@@ -336,6 +347,10 @@ int stasis_message_router_add_cache_update(struct stasis_message_router *router,
        }
        ao2_lock(router);
        res = route_table_add(&router->cache_routes, message_type, callback, data);
+       if (!res) {
+               stasis_subscription_accept_message_type(router->subscription, stasis_cache_update_type());
+               stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
+       }
        ao2_unlock(router);
        return res;
 }
@@ -380,6 +395,9 @@ int stasis_message_router_set_default(struct stasis_message_router *router,
        router->default_route.callback = callback;
        router->default_route.data = data;
        ao2_unlock(router);
+
+       stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_FORCED_NONE);
+
        /* While this implementation can never fail, it used to be able to */
        return 0;
 }
index 99999c9e2d675d15f8c7020b69cf85917896e141..fea598cba93b1f06856e94d6e3450ae0e36d6440 100644 (file)
@@ -870,6 +870,10 @@ static void park_announce_update_cb(void *data, struct stasis_subscription *sub,
                return;
        }
 
+       if (ast_parked_call_type() != stasis_message_type(message)) {
+               return;
+       }
+
        if (payload->event_type != PARKED_CALL) {
                /* We are only concerned with calls parked */
                return;
@@ -956,6 +960,10 @@ static int park_and_announce_app_exec(struct ast_channel *chan, const char *data
                return -1;
        }
 
+       stasis_subscription_accept_message_type(parking_subscription, ast_parked_call_type());
+       stasis_subscription_accept_message_type(parking_subscription, stasis_subscription_change_type());
+       stasis_subscription_set_filter(parking_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
+
        /* Now for the fun part... park it! */
        ast_bridge_join(parking_bridge, chan, NULL, &chan_features, NULL, 0);
 
index b4884dbfb637b016993d6610eae5bec6f541e759..cf5cc721db0470ea776907906b27f9575496cdb2 100644 (file)
@@ -195,6 +195,9 @@ static int create_parked_subscription_full(struct ast_channel *chan, const char
        if (!(parked_datastore->parked_subscription = stasis_subscribe_pool(ast_parking_topic(), parker_update_cb, subscription_data))) {
                return -1;
        }
+       stasis_subscription_accept_message_type(parked_datastore->parked_subscription, ast_parked_call_type());
+       stasis_subscription_accept_message_type(parked_datastore->parked_subscription, stasis_subscription_change_type());
+       stasis_subscription_set_filter(parked_datastore->parked_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        datastore->data = parked_datastore;
 
index ed28164feb652c4a8787a5fae7b29bc4a59341cd..5919b995d9465526c2977e4ec77ab742cbb1d31c 100644 (file)
@@ -688,6 +688,8 @@ static void parking_manager_enable_stasis(void)
 {
        if (!parking_sub) {
                parking_sub = stasis_subscribe(ast_parking_topic(), parking_event_cb, NULL);
+               stasis_subscription_accept_message_type(parking_sub, ast_parked_call_type());
+               stasis_subscription_set_filter(parking_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
        }
 }
 
index d799b46201b1ef3c6f0fdb2a648c2c582f870bd6..418b63e00b3081003822c9e3603bc260af709d3c 100644 (file)
@@ -169,6 +169,9 @@ static int load_module(void)
        if (!stasis_rtp_subscription) {
                return AST_MODULE_LOAD_DECLINE;
        }
+       stasis_subscription_accept_message_type(stasis_rtp_subscription, ast_rtp_rtcp_sent_type());
+       stasis_subscription_accept_message_type(stasis_rtp_subscription, ast_rtp_rtcp_received_type());
+       stasis_subscription_set_filter(stasis_rtp_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        return AST_MODULE_LOAD_SUCCESS;
 }
index f2ddf576e261c19b381d0b1c8d46767ab6de1134..f8c2392ec384efbed6a2f2ce087fa0e03a370d80 100644 (file)
@@ -269,6 +269,9 @@ static struct mwi_stasis_subscription *mwi_stasis_subscription_alloc(const char
                ao2_ref(mwi_sub, -1);
                mwi_stasis_sub = NULL;
        }
+       stasis_subscription_accept_message_type(mwi_stasis_sub->stasis_sub, ast_mwi_state_type());
+       stasis_subscription_accept_message_type(mwi_stasis_sub->stasis_sub, stasis_subscription_change_type());
+       stasis_subscription_set_filter(mwi_stasis_sub->stasis_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
        return mwi_stasis_sub;
 }
 
@@ -1366,7 +1369,11 @@ static int load_module(void)
                if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
                        ast_sip_push_task(NULL, send_initial_notify_all, NULL);
                } else {
-                       stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL);
+                       struct stasis_subscription *sub;
+
+                       sub = stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL);
+                       stasis_subscription_accept_message_type(sub, ast_manager_get_generic_type());
+                       stasis_subscription_set_filter(sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
                }
        }
 
index 4d80437aa6803751e1d43d15b0988b6f4c5d075b..c42f59e50da30f33aed231c270e2a2f75218ef84 100644 (file)
@@ -2285,6 +2285,8 @@ static int load_module(void)
 
        network_change_sub = stasis_subscribe(ast_system_topic(),
                network_change_stasis_cb, NULL);
+       stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type());
+       stasis_subscription_set_filter(network_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        return AST_MODULE_LOAD_SUCCESS;
 }
index 53ee60fe4a3135a44d84d382269d6b36e5546845..2271d8bddc3ae666b436143469491b3c6b01789b 100644 (file)
@@ -360,6 +360,9 @@ static int asterisk_start_devicestate_publishing(struct ast_sip_outbound_publish
                ao2_ref(datastore, -1);
                return -1;
        }
+       stasis_subscription_accept_message_type(publisher_state->device_state_subscription, ast_device_state_message_type());
+       stasis_subscription_accept_message_type(publisher_state->device_state_subscription, stasis_subscription_change_type());
+       stasis_subscription_set_filter(publisher_state->device_state_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        cached = stasis_cache_dump(ast_device_state_cache(), NULL);
        ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, datastore);
@@ -435,6 +438,9 @@ static int asterisk_start_mwi_publishing(struct ast_sip_outbound_publish *config
                ao2_ref(datastore, -1);
                return -1;
        }
+       stasis_subscription_accept_message_type(publisher_state->mailbox_state_subscription, ast_mwi_state_type());
+       stasis_subscription_accept_message_type(publisher_state->mailbox_state_subscription, stasis_subscription_change_type());
+       stasis_subscription_set_filter(publisher_state->mailbox_state_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        cached = stasis_cache_dump(ast_mwi_state_cache(), NULL);
        ao2_callback(cached, OBJ_NODATA, cached_mwistate_cb, datastore);
index 1c0145cff943678b75517971915f05d0e62db6aa..eb4545b1133bf51d1099f6b6df936a5b1a082a54 100644 (file)
@@ -5634,7 +5634,11 @@ static int load_module(void)
        if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
                ast_sip_push_task(NULL, subscription_persistence_load, NULL);
        } else {
-               stasis_subscribe_pool(ast_manager_get_topic(), subscription_persistence_event_cb, NULL);
+               struct stasis_subscription *sub;
+
+               sub = stasis_subscribe_pool(ast_manager_get_topic(), subscription_persistence_event_cb, NULL);
+               stasis_subscription_accept_message_type(sub, ast_manager_get_generic_type());
+               stasis_subscription_set_filter(sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
        }
 
        ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM,
index b45b5a5883ae417c40009020e2dff5868dd20696..9b35c6aabab3f92f830f38698676e9b38786483d 100644 (file)
@@ -686,6 +686,10 @@ static void refer_blind_callback(struct ast_channel *chan, struct transfer_chann
                        ast_channel_unlock(chan);
 
                        ao2_cleanup(refer->progress);
+               } else {
+                       stasis_subscription_accept_message_type(refer->progress->bridge_sub, ast_channel_entered_bridge_type());
+                       stasis_subscription_accept_message_type(refer->progress->bridge_sub, stasis_subscription_change_type());
+                       stasis_subscription_set_filter(refer->progress->bridge_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
                }
        }
 
index c3fb3cfd2de38640786a17d1df48bf3dcc7c7e15..760d1558329214c150273813b876f577922fd891 100644 (file)
@@ -143,6 +143,8 @@ static int load_module(void)
                LOG_SECURITY = -1;
                return AST_MODULE_LOAD_DECLINE;
        }
+       stasis_subscription_accept_message_type(security_stasis_sub, ast_security_event_type());
+       stasis_subscription_set_filter(security_stasis_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        ast_verb(3, "Security Logging Enabled\n");
 
index 34c77d9f095401d451527e21abb07f33a98198f0..fbdfb3d52505a16c668684684975ca4c8327d3c3 100644 (file)
@@ -396,6 +396,9 @@ static int subscribe_device_state(struct stasis_app *app, void *obj)
                ao2_ref(sub, -1);
                return -1;
        }
+       stasis_subscription_accept_message_type(sub->sub, ast_device_state_message_type());
+       stasis_subscription_accept_message_type(sub->sub, stasis_subscription_change_type());
+       stasis_subscription_set_filter(sub->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        ao2_link_flags(device_state_subscriptions, sub, OBJ_NOLOCK);
        ao2_unlock(device_state_subscriptions);
index 41f89961ce63dd581cdee4ae69971aa77b9156b4..a85892fc850ccc7ada6b4180f6d74c35fba0a80c 100644 (file)
@@ -1628,11 +1628,15 @@ static void xmpp_init_event_distribution(struct ast_xmpp_client *client)
        if (!(client->mwi_sub = stasis_subscribe_pool(ast_mwi_topic_all(), xmpp_pubsub_mwi_cb, client))) {
                return;
        }
+       stasis_subscription_accept_message_type(client->mwi_sub, ast_mwi_state_type());
+       stasis_subscription_set_filter(client->mwi_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        if (!(client->device_state_sub = stasis_subscribe(ast_device_state_topic_all(), xmpp_pubsub_devstate_cb, client))) {
                client->mwi_sub = stasis_unsubscribe(client->mwi_sub);
                return;
        }
+       stasis_subscription_accept_message_type(client->device_state_sub, ast_device_state_message_type());
+       stasis_subscription_set_filter(client->device_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
 
        cached = stasis_cache_dump(ast_device_state_cache(), NULL);
        ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, client);