snapshot_get_id id_fn;
cache_aggregate_calc_fn aggregate_calc_fn;
cache_aggregate_publish_fn aggregate_publish_fn;
+ int registered;
};
/*! \internal */
* be bad. */
ast_assert(stasis_subscription_is_done(caching_topic->sub));
+ ao2_container_unregister(stasis_topic_name(caching_topic->topic));
+
ao2_cleanup(caching_topic->sub);
caching_topic->sub = NULL;
ao2_cleanup(caching_topic->cache);
}
msg_type = stasis_message_type(message);
- if (stasis_cache_clear_type() == msg_type) {
+
+ if (stasis_subscription_change_type() == msg_type) {
+ struct stasis_subscription_change *change = stasis_message_data(message);
+
+ /*
+ * If this change type is an unsubscribe, we need to find the original
+ * subscribe and remove it from the cache otherwise the cache will
+ * continue to grow unabated.
+ */
+ if (strcmp(change->description, "Unsubscribe") == 0) {
+ struct stasis_cache_entry *sub;
+
+ ao2_wrlock(caching_topic->cache->entries);
+ sub = cache_find(caching_topic->cache->entries, stasis_subscription_change_type(), change->uniqueid);
+ if (sub) {
+ cache_remove(caching_topic->cache->entries, sub, stasis_message_eid(message));
+ ao2_cleanup(sub);
+ }
+ ao2_unlock(caching_topic->cache->entries);
+ ao2_cleanup(caching_topic_needs_unref);
+ return;
+ }
+ msg_put = message;
+ msg = message;
+ } else if (stasis_cache_clear_type() == msg_type) {
/* Cache clear event. */
msg_put = NULL;
msg = stasis_message_data(message);
ao2_cleanup(caching_topic_needs_unref);
}
+static void print_cache_entry(void *v_obj, void *where, ao2_prnt_fn *prnt)
+{
+ struct stasis_cache_entry *entry = v_obj;
+
+ if (!entry) {
+ return;
+ }
+ prnt(where, "Type: %s ID: %s Hash: %u", stasis_message_type_name(entry->key.type),
+ entry->key.id, entry->key.hash);
+}
+
struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
{
struct stasis_caching_topic *caching_topic;
}
caching_topic->topic = stasis_topic_create(new_name);
- ast_free(new_name);
if (caching_topic->topic == NULL) {
ao2_ref(caching_topic, -1);
+ ast_free(new_name);
return NULL;
}
ao2_ref(cache, +1);
caching_topic->cache = cache;
+ if (!cache->registered) {
+ if (ao2_container_register(new_name, cache->entries, print_cache_entry)) {
+ ast_log(LOG_ERROR, "Stasis cache container '%p' for '%s' did not register\n",
+ cache->entries, new_name);
+ } else {
+ cache->registered = 1;
+ }
+ }
+ ast_free(new_name);
caching_topic->sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0);
if (caching_topic->sub == NULL) {