if (!device_state_sub) {
err = -1;
}
+ stasis_subscription_accept_message_type(device_state_sub, ast_device_state_message_type());
+ stasis_subscription_set_filter(device_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
manager_topic = ast_manager_get_topic();
queue_topic = ast_queue_topic_all();
* knows that we care about it. Then, chan_dahdi will get the MWI from the
* event cache instead of checking the mailbox directly. */
tmp->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL);
+ stasis_subscription_accept_message_type(tmp->mwi_event_sub, ast_mwi_state_type());
+ stasis_subscription_set_filter(tmp->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
}
#ifdef HAVE_DAHDI_LINEREVERSE_VMWI
if (!network_change_sub) {
network_change_sub = stasis_subscribe(ast_system_topic(),
network_change_stasis_cb, NULL);
+ stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type());
+ stasis_subscription_set_filter(network_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
}
if (!acl_change_sub) {
acl_change_sub = stasis_subscribe(ast_security_topic(),
acl_change_stasis_cb, NULL);
+ stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type());
+ stasis_subscription_set_filter(acl_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
}
* mailboxes. However, we just grab the events out of the cache when it
* is time to send MWI, since it is only sent with a REGACK. */
peer->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL);
+ stasis_subscription_accept_message_type(peer->mwi_event_sub, ast_mwi_state_type());
+ stasis_subscription_set_filter(peer->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
}
* knows that we care about it. Then, chan_mgcp will get the MWI from the
* event cache instead of checking the mailbox directly. */
e->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL);
+ stasis_subscription_accept_message_type(e->mwi_event_sub, ast_mwi_state_type());
+ stasis_subscription_set_filter(e->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
}
snprintf(e->rqnt_ident, sizeof(e->rqnt_ident), "%08lx", (unsigned long)ast_random());
if (!network_change_sub) {
network_change_sub = stasis_subscribe(ast_system_topic(),
network_change_stasis_cb, NULL);
+ stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type());
+ stasis_subscription_set_filter(network_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
}
if (!acl_change_sub) {
acl_change_sub = stasis_subscribe(ast_security_topic(),
acl_change_stasis_cb, NULL);
+ stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type());
+ stasis_subscription_set_filter(acl_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
}
mailbox_specific_topic = ast_mwi_topic(mailbox->id);
if (mailbox_specific_topic) {
mailbox->event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, peer);
+ stasis_subscription_accept_message_type(mailbox->event_sub, ast_mwi_state_type());
+ stasis_subscription_accept_message_type(mailbox->event_sub, stasis_subscription_change_type());
+ stasis_subscription_set_filter(mailbox->event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
}
}
mailbox_specific_topic = ast_mwi_topic(l->mailbox);
if (mailbox_specific_topic) {
l->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, l);
+ stasis_subscription_accept_message_type(l->mwi_event_sub, ast_mwi_state_type());
+ stasis_subscription_set_filter(l->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
}
if (!pri->mbox[i].sub) {
ast_log(LOG_ERROR, "%s span %d could not subscribe to MWI events for %s(%s).\n",
sig_pri_cc_type_name, pri->span, pri->mbox[i].vm_box, mbox_id);
+ } else {
+ stasis_subscription_accept_message_type(pri->mbox[i].sub, ast_mwi_state_type());
+ stasis_subscription_set_filter(pri->mbox[i].sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
#if defined(HAVE_PRI_MWI_V2)
if (ast_strlen_zero(pri->mbox[i].vm_number)) {
STASIS_MESSAGE_TYPE_DECLINED, /*!< Message type was not created due to configuration */
};
+/*!
+ * \brief Stasis subscription message filters
+ */
+enum stasis_subscription_message_filter {
+ STASIS_SUBSCRIPTION_FILTER_NONE = 0, /*!< No filter is in place, all messages are raised */
+ STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, /*!< No filter is in place or can be set, all messages are raised */
+ STASIS_SUBSCRIPTION_FILTER_SELECTIVE, /*!< Only messages of allowed message types are raised */
+};
+
/*!
* \brief Create a new message type.
*
*/
unsigned int stasis_message_type_hash(const struct stasis_message_type *type);
+/*!
+ * \brief Gets the id of a given message type
+ * \param type The type to get the id of.
+ * \return The id
+ * \since 17.0.0
+ */
+int stasis_message_type_id(const struct stasis_message_type *type);
+
/*!
* \brief Check whether a message type is declined
*
*/
const char *stasis_topic_name(const struct stasis_topic *topic);
+/*!
+ * \brief Return the number of subscribers of a topic.
+ * \param topic Topic.
+ * \return Number of subscribers of the topic.
+ * \since 17.0.0
+ */
+size_t stasis_topic_subscribers(const struct stasis_topic *topic);
+
/*!
* \brief Publish a message to a topic's subscribers.
* \param topic Topic.
* \return New \ref stasis_subscription object.
* \return \c NULL on error.
* \since 12
+ *
+ * \note This callback will receive a callback with a message indicating it
+ * has been subscribed. This occurs immediately before accepted message
+ * types can be set and the callback must expect to receive it.
*/
struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
stasis_subscription_cb callback, void *data);
* \return New \ref stasis_subscription object.
* \return \c NULL on error.
* \since 12.8.0
+ *
+ * \note This callback will receive a callback with a message indicating it
+ * has been subscribed. This occurs immediately before accepted message
+ * types can be set and the callback must expect to receive it.
*/
struct stasis_subscription *stasis_subscribe_pool(struct stasis_topic *topic,
stasis_subscription_cb callback, void *data);
+/*!
+ * \brief Indicate to a subscription that we are interested in a message type.
+ *
+ * This will cause the subscription to allow the given message type to be
+ * raised to our subscription callback. This enables internal filtering in
+ * the stasis message bus to reduce messages.
+ *
+ * \param subscription Subscription to add message type to.
+ * \param type The message type we wish to receive.
+ * \retval 0 on success
+ * \retval -1 failure
+ *
+ * \since 17.0.0
+ *
+ * \note If you are wanting to use stasis_final_message you will need to accept
+ * \ref stasis_subscription_change_type as a message type.
+ *
+ * \note Until the subscription is set to selective filtering it is possible for it
+ * to receive messages of message types that would not normally be accepted.
+ */
+int stasis_subscription_accept_message_type(struct stasis_subscription *subscription,
+ const struct stasis_message_type *type);
+
+/*!
+ * \brief Indicate to a subscription that we are not interested in a message type.
+ *
+ * \param subscription Subscription to remove message type from.
+ * \param type The message type we don't wish to receive.
+ * \retval 0 on success
+ * \retval -1 failure
+ *
+ * \since 17.0.0
+ */
+int stasis_subscription_decline_message_type(struct stasis_subscription *subscription,
+ const struct stasis_message_type *type);
+
+/*!
+ * \brief Set the message type filtering level on a subscription
+ *
+ * This will cause the subscription to filter messages according to the
+ * provided filter level. For example if selective is used then only
+ * messages matching those provided to \ref stasis_subscription_accept_message_type
+ * will be raised to the subscription callback.
+ *
+ * \param subscription Subscription that should receive all messages.
+ * \param filter What filter to use
+ * \retval 0 on success
+ * \retval -1 failure
+ *
+ * \since 17.0.0
+ */
+int stasis_subscription_set_filter(struct stasis_subscription *subscription,
+ enum stasis_subscription_message_filter filter);
+
/*!
* \brief Cancel a subscription.
*
struct stasis_topic *stasis_caching_get_topic(
struct stasis_caching_topic *caching_topic);
+/*!
+ * \brief Indicate to a caching topic that we are interested in a message type.
+ *
+ * This will cause the caching topic to receive messages of the given message
+ * type. This enables internal filtering in the stasis message bus to reduce
+ * messages.
+ *
+ * \param caching_topic The caching topic.
+ * \param type The message type we wish to receive.
+ * \retval 0 on success
+ * \retval -1 failure
+ *
+ * \since 17.0.0
+ */
+int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic,
+ struct stasis_message_type *type);
+
+/*!
+ * \brief Set the message type filtering level on a cache
+ *
+ * This will cause the underlying subscription to filter messages according to the
+ * provided filter level. For example if selective is used then only
+ * messages matching those provided to \ref stasis_subscription_accept_message_type
+ * will be raised to the subscription callback.
+ *
+ * \param caching_topic The caching topic.
+ * \param filter What filter to use
+ * \retval 0 on success
+ * \retval -1 failure
+ *
+ * \since 17.0.0
+ */
+int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic,
+ enum stasis_subscription_message_filter filter);
+
/*!
* \brief A message which instructs the caching topic to remove an entry from
* its cache.
struct stasis_topic *stasis_cp_single_topic_cached(
struct stasis_cp_single *one);
+/*!
+ * \brief Indicate to an instance that we are interested in a message type.
+ *
+ * This will cause the caching topic to receive messages of the given message
+ * type. This enables internal filtering in the stasis message bus to reduce
+ * messages.
+ *
+ * \param one One side of the cache pattern.
+ * \param type The message type we wish to receive.
+ * \retval 0 on success
+ * \retval -1 failure
+ *
+ * \since 17.0.0
+ */
+int stasis_cp_single_accept_message_type(struct stasis_cp_single *one,
+ struct stasis_message_type *type);
+
+/*!
+ * \brief Set the message type filtering level on a cache
+ *
+ * This will cause the underlying subscription to filter messages according to the
+ * provided filter level. For example if selective is used then only
+ * messages matching those provided to \ref stasis_subscription_accept_message_type
+ * will be raised to the subscription callback.
+ *
+ * \param one One side of the cache pattern.
+ * \param filter What filter to use
+ * \retval 0 on success
+ * \retval -1 failure
+ *
+ * \since 17.0.0
+ */
+int stasis_cp_single_set_filter(struct stasis_cp_single *one,
+ enum stasis_subscription_message_filter filter);
+
#endif /* _ASTERISK_STASIS_CACHE_PATTERN_H */
* \retval -1 on failure
*
* \since 12
+ *
+ * \note Setting a default callback will automatically cause the underlying
+ * subscription to receive all messages and not be filtered. If filtering is
+ * desired then a specific route for each message type should be provided.
*/
int stasis_message_router_set_default(struct stasis_message_router *router,
stasis_subscription_cb callback,
cc_unref(generic_list, "Failed to subscribe to device state");
return NULL;
}
+ stasis_subscription_accept_message_type(generic_list->sub, ast_device_state_message_type());
+ stasis_subscription_set_filter(generic_list->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
generic_list->current_state = ast_device_state(monitor->interface->device_name);
ao2_t_link(generic_monitors, generic_list, "linking new generic monitor instance list");
return generic_list;
if (!(generic_pvt->sub = stasis_subscribe(device_specific_topic, generic_agent_devstate_cb, agent))) {
return -1;
}
+ stasis_subscription_accept_message_type(generic_pvt->sub, ast_device_state_message_type());
+ stasis_subscription_accept_message_type(generic_pvt->sub, stasis_subscription_change_type());
+ stasis_subscription_set_filter(generic_pvt->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
cc_ref(agent, "Ref agent for subscription");
return 0;
}
if (!device_state_topic_cached) {
return -1;
}
+ stasis_caching_accept_message_type(device_state_topic_cached, ast_device_state_message_type());
+ stasis_caching_set_filter(device_state_topic_cached, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
devstate_message_sub = stasis_subscribe(ast_device_state_topic_all(),
devstate_change_cb, NULL);
ast_log(LOG_ERROR, "Failed to create subscription creating uncached device state aggregate events.\n");
return -1;
}
+ stasis_subscription_accept_message_type(devstate_message_sub, ast_device_state_message_type());
+ stasis_subscription_set_filter(devstate_message_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
return 0;
}
endpoint_publish_snapshot(endpoint);
}
-static void endpoint_default(void *data,
+static void endpoint_subscription_change(void *data,
struct stasis_subscription *sub,
struct stasis_message *message)
{
if (!endpoint->topics) {
return NULL;
}
+ stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type());
+ stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint));
if (!endpoint->router) {
r |= stasis_message_router_add(endpoint->router,
stasis_cache_clear_type(), endpoint_cache_clear,
endpoint);
- r |= stasis_message_router_set_default(endpoint->router,
- endpoint_default, endpoint);
+ r |= stasis_message_router_add(endpoint->router,
+ stasis_subscription_change_type(), endpoint_subscription_change,
+ endpoint);
if (r) {
return NULL;
}
if (!endpoint->topics) {
return NULL;
}
+ stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type());
+ stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
ao2_link(tech_endpoints, endpoint);
}
if (!acl_change_sub) {
acl_change_sub = stasis_subscribe(ast_security_topic(),
acl_change_stasis_cb, NULL);
+ stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type());
+ stasis_subscription_set_filter(acl_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
}
if (!(device_state_sub = stasis_subscribe(ast_device_state_topic_all(), device_state_cb, NULL))) {
return -1;
}
+ stasis_subscription_accept_message_type(device_state_sub, ast_device_state_message_type());
+ stasis_subscription_accept_message_type(device_state_sub, hint_change_message_type());
+ stasis_subscription_set_filter(device_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
if (!(presence_state_sub = stasis_subscribe(ast_presence_state_topic_all(), presence_state_cb, NULL))) {
return -1;
}
+ stasis_subscription_accept_message_type(presence_state_sub, ast_presence_state_message_type());
+ stasis_subscription_set_filter(presence_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
return 0;
}
if (!presence_state_topic_cached) {
return -1;
}
+ stasis_caching_accept_message_type(presence_state_topic_cached, ast_presence_state_message_type());
+ stasis_caching_set_filter(presence_state_topic_cached, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
return 0;
}
return topic->name;
}
+size_t stasis_topic_subscribers(const struct stasis_topic *topic)
+{
+ return AST_VECTOR_SIZE(&topic->subscribers);
+}
+
/*! \internal */
struct stasis_subscription {
/*! Unique ID for this subscription */
/*! Flag set when final message for sub has been processed.
* Be sure join_lock is held before reading/setting. */
int final_message_processed;
+
+ /*! The message types this subscription is accepting */
+ AST_VECTOR(, char) accepted_message_types;
+ /*! The message filter currently in use */
+ enum stasis_subscription_message_filter filter;
};
static void subscription_dtor(void *obj)
ast_taskprocessor_unreference(sub->mailbox);
sub->mailbox = NULL;
ast_cond_destroy(&sub->join_cond);
+
+ AST_VECTOR_FREE(&sub->accepted_message_types);
}
/*!
static void subscription_invoke(struct stasis_subscription *sub,
struct stasis_message *message)
{
+ unsigned int final = stasis_subscription_final_message(sub, message);
+ int message_type_id = stasis_message_type_id(stasis_subscription_change_type());
+
/* Notify that the final message has been received */
- if (stasis_subscription_final_message(sub, message)) {
+ if (final) {
ao2_lock(sub);
sub->final_message_rxed = 1;
ast_cond_signal(&sub->join_cond);
ao2_unlock(sub);
}
- /* Since sub is mostly immutable, no need to lock sub */
- sub->callback(sub->data, sub, message);
+ if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
+ (message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
+ /* Since sub is mostly immutable, no need to lock sub */
+ sub->callback(sub->data, sub, message);
+ }
/* Notify that the final message has been processed */
- if (stasis_subscription_final_message(sub, message)) {
+ if (final) {
ao2_lock(sub);
sub->final_message_processed = 1;
ast_cond_signal(&sub->join_cond);
sub->callback = callback;
sub->data = data;
ast_cond_init(&sub->join_cond, NULL);
+ sub->filter = STASIS_SUBSCRIPTION_FILTER_NONE;
+ AST_VECTOR_INIT(&sub->accepted_message_types, 0);
if (topic_add_subscription(topic, sub) != 0) {
ao2_ref(sub, -1);
return res;
}
+int stasis_subscription_accept_message_type(struct stasis_subscription *subscription,
+ const struct stasis_message_type *type)
+{
+ if (!subscription) {
+ return -1;
+ }
+
+ ast_assert(type != NULL);
+ ast_assert(stasis_message_type_name(type) != NULL);
+
+ if (!type || !stasis_message_type_name(type)) {
+ /* Filtering is unreliable as this message type is not yet initialized
+ * so force all messages through.
+ */
+ subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
+ return 0;
+ }
+
+ ao2_lock(subscription->topic);
+ if (AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 1)) {
+ /* We do this for the same reason as above. The subscription can still operate, so allow
+ * it to do so by forcing all messages through.
+ */
+ subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
+ }
+ ao2_unlock(subscription->topic);
+
+ return 0;
+}
+
+int stasis_subscription_decline_message_type(struct stasis_subscription *subscription,
+ const struct stasis_message_type *type)
+{
+ if (!subscription) {
+ return -1;
+ }
+
+ ast_assert(type != NULL);
+ ast_assert(stasis_message_type_name(type) != NULL);
+
+ if (!type || !stasis_message_type_name(type)) {
+ return 0;
+ }
+
+ ao2_lock(subscription->topic);
+ if (stasis_message_type_id(type) < AST_VECTOR_SIZE(&subscription->accepted_message_types)) {
+ /* The memory is already allocated so this can't fail */
+ AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 0);
+ }
+ ao2_unlock(subscription->topic);
+
+ return 0;
+}
+
+int stasis_subscription_set_filter(struct stasis_subscription *subscription,
+ enum stasis_subscription_message_filter filter)
+{
+ if (!subscription) {
+ return -1;
+ }
+
+ ao2_lock(subscription->topic);
+ if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {
+ subscription->filter = filter;
+ }
+ ao2_unlock(subscription->topic);
+
+ return 0;
+}
+
void stasis_subscription_join(struct stasis_subscription *subscription)
{
if (subscription) {
struct stasis_message *message,
int synchronous)
{
+ /* Determine if this subscription is interested in this message. Note that final
+ * messages are special and are always invoked on the subscription.
+ */
+ if (sub->filter == STASIS_SUBSCRIPTION_FILTER_SELECTIVE) {
+ int message_type_id = stasis_message_type_id(stasis_message_type(message));
+ if ((message_type_id >= AST_VECTOR_SIZE(&sub->accepted_message_types) ||
+ !AST_VECTOR_GET(&sub->accepted_message_types, message_type_id)) &&
+ !stasis_subscription_final_message(sub, message)) {
+ return;
+ }
+ }
+
if (!sub->mailbox) {
/* Dispatch directly */
subscription_invoke(sub, message);
ast_assert(topic != NULL);
ast_assert(message != NULL);
+ /* If there are no subscribers don't bother */
+ if (!stasis_topic_subscribers(topic)) {
+ return;
+ }
+
/*
* The topic may be unref'ed by the subscription invocation.
* Make sure we hold onto a reference while dispatching.
return caching_topic->topic;
}
+int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic,
+ struct stasis_message_type *type)
+{
+ int res;
+
+ if (!caching_topic) {
+ return -1;
+ }
+
+ /* We wait to accept the stasis specific message types until now so that by default everything
+ * will flow to us.
+ */
+ res = stasis_subscription_accept_message_type(caching_topic->sub, stasis_cache_clear_type());
+ res |= stasis_subscription_accept_message_type(caching_topic->sub, stasis_subscription_change_type());
+ res |= stasis_subscription_accept_message_type(caching_topic->sub, type);
+
+ return res;
+}
+
+int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic,
+ enum stasis_subscription_message_filter filter)
+{
+ if (!caching_topic) {
+ return -1;
+ }
+ return stasis_subscription_set_filter(caching_topic->sub, filter);
+}
+
+
struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
{
if (!caching_topic) {
/* Update the cache */
snapshots = cache_put(caching_topic->cache, msg_type, msg_id, msg_eid, msg_put);
if (snapshots.old || msg_put) {
- update = update_create(snapshots.old, msg_put);
- if (update) {
- stasis_publish(caching_topic->topic, update);
+ if (stasis_topic_subscribers(caching_topic->topic)) {
+ update = update_create(snapshots.old, msg_put);
+ if (update) {
+ stasis_publish(caching_topic->topic, update);
+ ao2_ref(update, -1);
+ }
}
- ao2_cleanup(update);
} else {
ast_debug(1,
"Attempting to remove an item from the %s cache that isn't there: %s %s\n",
caching_topic->cache->aggregate_publish_fn(caching_topic->original_topic,
snapshots.aggregate_new);
}
- update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
- if (update) {
- stasis_publish(caching_topic->topic, update);
+ if (stasis_topic_subscribers(caching_topic->topic)) {
+ update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
+ if (update) {
+ stasis_publish(caching_topic->topic, update);
+ ao2_ref(update, -1);
+ }
}
- ao2_cleanup(update);
}
ao2_cleanup(snapshots.old);
}
return stasis_caching_get_topic(one->topic_cached);
}
+
+int stasis_cp_single_accept_message_type(struct stasis_cp_single *one,
+ struct stasis_message_type *type)
+{
+ if (!one) {
+ return -1;
+ }
+ return stasis_caching_accept_message_type(one->topic_cached, type);
+}
+
+int stasis_cp_single_set_filter(struct stasis_cp_single *one,
+ enum stasis_subscription_message_filter filter)
+{
+ if (!one) {
+ return -1;
+ }
+ return stasis_caching_set_filter(one->topic_cached, filter);
+}
struct stasis_message_vtable *vtable;
char *name;
unsigned int hash;
+ int id;
};
static struct stasis_message_vtable null_vtable = {};
+static int message_type_id;
static void message_type_dtor(void *obj)
{
}
type->hash = ast_hashtab_hash_string(name);
type->vtable = vtable;
+ type->id = ast_atomic_fetchadd_int(&message_type_id, +1);
*result = type;
return STASIS_MESSAGE_TYPE_SUCCESS;
return type->hash;
}
+int stasis_message_type_id(const struct stasis_message_type *type)
+{
+ return type->id;
+}
+
/*! \internal */
struct stasis_message {
/*! Time the message was created */
return NULL;
}
+ /* We need to receive subscription change messages so we know when our subscription goes away */
+ stasis_subscription_accept_message_type(router->subscription, stasis_subscription_change_type());
+
return router;
}
}
ao2_lock(router);
res = route_table_add(&router->routes, message_type, callback, data);
+ if (!res) {
+ stasis_subscription_accept_message_type(router->subscription, message_type);
+ /* Until a specific message type was added we would already drop the message, so being
+ * selective now doesn't harm us. If we have a default route then we are already forced
+ * to filter nothing and messages will come in regardless.
+ */
+ stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
+ }
ao2_unlock(router);
return res;
}
}
ao2_lock(router);
res = route_table_add(&router->cache_routes, message_type, callback, data);
+ if (!res) {
+ stasis_subscription_accept_message_type(router->subscription, stasis_cache_update_type());
+ stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
+ }
ao2_unlock(router);
return res;
}
router->default_route.callback = callback;
router->default_route.data = data;
ao2_unlock(router);
+
+ stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_FORCED_NONE);
+
/* While this implementation can never fail, it used to be able to */
return 0;
}
return;
}
+ if (ast_parked_call_type() != stasis_message_type(message)) {
+ return;
+ }
+
if (payload->event_type != PARKED_CALL) {
/* We are only concerned with calls parked */
return;
return -1;
}
+ stasis_subscription_accept_message_type(parking_subscription, ast_parked_call_type());
+ stasis_subscription_accept_message_type(parking_subscription, stasis_subscription_change_type());
+ stasis_subscription_set_filter(parking_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
+
/* Now for the fun part... park it! */
ast_bridge_join(parking_bridge, chan, NULL, &chan_features, NULL, 0);
if (!(parked_datastore->parked_subscription = stasis_subscribe_pool(ast_parking_topic(), parker_update_cb, subscription_data))) {
return -1;
}
+ stasis_subscription_accept_message_type(parked_datastore->parked_subscription, ast_parked_call_type());
+ stasis_subscription_accept_message_type(parked_datastore->parked_subscription, stasis_subscription_change_type());
+ stasis_subscription_set_filter(parked_datastore->parked_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
datastore->data = parked_datastore;
{
if (!parking_sub) {
parking_sub = stasis_subscribe(ast_parking_topic(), parking_event_cb, NULL);
+ stasis_subscription_accept_message_type(parking_sub, ast_parked_call_type());
+ stasis_subscription_set_filter(parking_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
}
if (!stasis_rtp_subscription) {
return AST_MODULE_LOAD_DECLINE;
}
+ stasis_subscription_accept_message_type(stasis_rtp_subscription, ast_rtp_rtcp_sent_type());
+ stasis_subscription_accept_message_type(stasis_rtp_subscription, ast_rtp_rtcp_received_type());
+ stasis_subscription_set_filter(stasis_rtp_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
return AST_MODULE_LOAD_SUCCESS;
}
ao2_ref(mwi_sub, -1);
mwi_stasis_sub = NULL;
}
+ stasis_subscription_accept_message_type(mwi_stasis_sub->stasis_sub, ast_mwi_state_type());
+ stasis_subscription_accept_message_type(mwi_stasis_sub->stasis_sub, stasis_subscription_change_type());
+ stasis_subscription_set_filter(mwi_stasis_sub->stasis_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
return mwi_stasis_sub;
}
if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
ast_sip_push_task(NULL, send_initial_notify_all, NULL);
} else {
- stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL);
+ struct stasis_subscription *sub;
+
+ sub = stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL);
+ stasis_subscription_accept_message_type(sub, ast_manager_get_generic_type());
+ stasis_subscription_set_filter(sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
}
network_change_sub = stasis_subscribe(ast_system_topic(),
network_change_stasis_cb, NULL);
+ stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type());
+ stasis_subscription_set_filter(network_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
return AST_MODULE_LOAD_SUCCESS;
}
ao2_ref(datastore, -1);
return -1;
}
+ stasis_subscription_accept_message_type(publisher_state->device_state_subscription, ast_device_state_message_type());
+ stasis_subscription_accept_message_type(publisher_state->device_state_subscription, stasis_subscription_change_type());
+ stasis_subscription_set_filter(publisher_state->device_state_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
cached = stasis_cache_dump(ast_device_state_cache(), NULL);
ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, datastore);
ao2_ref(datastore, -1);
return -1;
}
+ stasis_subscription_accept_message_type(publisher_state->mailbox_state_subscription, ast_mwi_state_type());
+ stasis_subscription_accept_message_type(publisher_state->mailbox_state_subscription, stasis_subscription_change_type());
+ stasis_subscription_set_filter(publisher_state->mailbox_state_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
cached = stasis_cache_dump(ast_mwi_state_cache(), NULL);
ao2_callback(cached, OBJ_NODATA, cached_mwistate_cb, datastore);
if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
ast_sip_push_task(NULL, subscription_persistence_load, NULL);
} else {
- stasis_subscribe_pool(ast_manager_get_topic(), subscription_persistence_event_cb, NULL);
+ struct stasis_subscription *sub;
+
+ sub = stasis_subscribe_pool(ast_manager_get_topic(), subscription_persistence_event_cb, NULL);
+ stasis_subscription_accept_message_type(sub, ast_manager_get_generic_type());
+ stasis_subscription_set_filter(sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM,
ast_channel_unlock(chan);
ao2_cleanup(refer->progress);
+ } else {
+ stasis_subscription_accept_message_type(refer->progress->bridge_sub, ast_channel_entered_bridge_type());
+ stasis_subscription_accept_message_type(refer->progress->bridge_sub, stasis_subscription_change_type());
+ stasis_subscription_set_filter(refer->progress->bridge_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
}
LOG_SECURITY = -1;
return AST_MODULE_LOAD_DECLINE;
}
+ stasis_subscription_accept_message_type(security_stasis_sub, ast_security_event_type());
+ stasis_subscription_set_filter(security_stasis_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
ast_verb(3, "Security Logging Enabled\n");
ao2_ref(sub, -1);
return -1;
}
+ stasis_subscription_accept_message_type(sub->sub, ast_device_state_message_type());
+ stasis_subscription_accept_message_type(sub->sub, stasis_subscription_change_type());
+ stasis_subscription_set_filter(sub->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
ao2_link_flags(device_state_subscriptions, sub, OBJ_NOLOCK);
ao2_unlock(device_state_subscriptions);
if (!(client->mwi_sub = stasis_subscribe_pool(ast_mwi_topic_all(), xmpp_pubsub_mwi_cb, client))) {
return;
}
+ stasis_subscription_accept_message_type(client->mwi_sub, ast_mwi_state_type());
+ stasis_subscription_set_filter(client->mwi_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
if (!(client->device_state_sub = stasis_subscribe(ast_device_state_topic_all(), xmpp_pubsub_devstate_cb, client))) {
client->mwi_sub = stasis_unsubscribe(client->mwi_sub);
return;
}
+ stasis_subscription_accept_message_type(client->device_state_sub, ast_device_state_message_type());
+ stasis_subscription_set_filter(client->device_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
cached = stasis_cache_dump(ast_device_state_cache(), NULL);
ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, client);