git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-7557 mod_mongo add limit backend - still testing
author    Chris Rienzo <chris.rienzo@grasshopper.com>    Thu, 14 May 2015 21:19:42 +0000 (17:19 -0400)
committer Chris Rienzo <chris.rienzo@grasshopper.com>    Thu, 14 May 2015 21:19:42 +0000 (17:19 -0400)
src/mod/applications/mod_mongo/conf/autoload_configs/mongo.conf.xml
src/mod/applications/mod_mongo/mod_mongo.c

diff --git a/src/mod/applications/mod_mongo/conf/autoload_configs/mongo.conf.xml b/src/mod/applications/mod_mongo/conf/autoload_configs/mongo.conf.xml
index 57f9f6566f2b44f458d01dee4f1f7160b2e4d40a..041432a23258fb27ec39a98eaecf22649d26af93 100644
--- a/src/mod/applications/mod_mongo/conf/autoload_configs/mongo.conf.xml
+++ b/src/mod/applications/mod_mongo/conf/autoload_configs/mongo.conf.xml
@@ -1,11 +1,17 @@
 <configuration name="mongo.conf">
   <settings>
-    <!-- 
+    <!--
       connection-string handles different ways to connect to mongo
       mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]
     -->
-    <param name="connection-string" value="mongodb://127.0.0.1:27017/?connectTimeoutMS=5000"/>
-
+    <!-- connection for queries and limit backend -->
+    <param name="connection-string" value="mongodb://127.0.0.1:27017/?connectTimeoutMS=5000"/>
+    <!-- connection for limit backend, if different from connection-string (default is the value of connection-string) -->
+    <!--param name="limit-connection-string" value="mongodb://127.0.0.1:27019/?connecttimeoutms=5000"/-->
+    <!-- database name for limit backend (default is limit) -->
+    <!--param name="limit-database" value="limit"/-->
+    <!-- collection name for limit backend (default is mod_mongo) -->
+    <!--param name="limit-collection" value="mod_mongo"/-->
 <!--
     <param name="map" value="function() { emit(this.a, 1); }"/>
     <param name="reduce" value="function(key, values) { return Array.sum(values); }"/>
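For context, once the module registers its limit interface under the backend name "mongo" (see the SWITCH_ADD_LIMIT call in mod_mongo_load further down), the new backend would be driven through the stock limit dialplan application. A minimal sketch, assuming a hypothetical realm "acme", resource "outbound", and an illustrative cap of 10 concurrent acquisitions:

  <extension name="mongo_limit_example">
    <condition field="destination_number" expression="^(\d+)$">
      <!-- hypothetical realm/resource; counts are kept in the limit-database/limit-collection configured above -->
      <action application="limit" data="mongo acme outbound 10"/>
      <action application="bridge" data="sofia/gateway/example/$1"/>
    </condition>
  </extension>

The core limit framework normally releases the acquisition when the channel hangs up, or it can be released explicitly through mod_mongo_limit_release below.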
diff --git a/src/mod/applications/mod_mongo/mod_mongo.c b/src/mod/applications/mod_mongo/mod_mongo.c
index 0de63fd0b6f01d3b328c7dd96ffed8afd0067aa8..a14958dab61609422a06d19e99b9f2751993e281 100644
--- a/src/mod/applications/mod_mongo/mod_mongo.c
+++ b/src/mod/applications/mod_mongo/mod_mongo.c
@@ -26,7 +26,7 @@
  * Tamas Cseke <cstomi.levlist@gmail.com>
  * Christopher Rienzo <crienzo@grasshopper.com>
  *
- * mod_mongo.c -- API for MongoDB 
+ * mod_mongo.c -- API for MongoDB
  *
  */
 #include <switch.h>
 
 SWITCH_MODULE_LOAD_FUNCTION(mod_mongo_load);
 SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_mongo_shutdown);
-SWITCH_MODULE_DEFINITION(mod_mongo, mod_mongo_load, mod_mongo_shutdown, NULL);
+SWITCH_MODULE_RUNTIME_FUNCTION(mod_mongo_runtime);
+SWITCH_MODULE_DEFINITION(mod_mongo, mod_mongo_load, mod_mongo_shutdown, mod_mongo_runtime);
 
 static struct {
+       int shutdown;
        const char *map;
        const char *reduce;
        const char *finalize;
        const char *conn_str;
-       mongoc_uri_t *uri;
        mongoc_client_pool_t *client_pool;
+       const char *limit_database;
+       const char *limit_collection;
+       const char *limit_conn_str;
+       int limit_cleanup_interval_sec;
+       mongoc_client_pool_t *limit_client_pool;
+       switch_mutex_t *mod_mongo_private_mutex;
+       switch_thread_rwlock_t *limit_database_rwlock;
+       switch_thread_rwlock_t *shutdown_rwlock;
 } globals;
 
+/**
+ * resources acquired by this session
+ */
+struct mod_mongo_private {
+       switch_hash_t *resources;
+       switch_mutex_t *mutex;
+};
+
 /**
  * @param query_options_str
  * @return query options
@@ -94,11 +111,11 @@ static int parse_query_options(char *query_options_str)
 /**
  * @return a new connection to mongodb or NULL if error
  */
-static mongoc_client_t *get_connection(void)
+static mongoc_client_t *get_connection(mongoc_client_pool_t *client_pool, const char *conn_str)
 {
-       mongoc_client_t *client = mongoc_client_pool_pop(globals.client_pool);
+       mongoc_client_t *client = mongoc_client_pool_pop(client_pool);
        if (!client) {
-               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Failed to get connection to: %s\n", globals.conn_str);
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Failed to get connection to: %s\n", conn_str);
                return NULL;
        }
        /* TODO auth */
@@ -108,12 +125,12 @@ static mongoc_client_t *get_connection(void)
 /**
  * Mark connection as finished
  */
-static void connection_done(mongoc_client_t *conn)
+static void connection_done(mongoc_client_pool_t *client_pool, mongoc_client_t *conn)
 {
-       mongoc_client_pool_push(globals.client_pool, conn);
+       mongoc_client_pool_push(client_pool, conn);
 }
 
-SWITCH_STANDARD_API(mongo_mapreduce_function)
+SWITCH_STANDARD_API(mod_mongo_mapreduce_function)
 {
        switch_status_t status = SWITCH_STATUS_SUCCESS;
        char *db = NULL, *collection = NULL, *json_query = NULL;
@@ -129,7 +146,7 @@ SWITCH_STANDARD_API(mongo_mapreduce_function)
        }
 
        if (!zstr(db) && !zstr(collection) && !zstr(json_query)) {
-               mongoc_client_t *conn = get_connection();
+               mongoc_client_t *conn = get_connection(globals.client_pool, globals.conn_str);
                if (conn) {
                        bson_error_t error;
                        bson_t *query = bson_new_from_json((uint8_t *)json_query, strlen(json_query), &error);
@@ -172,7 +189,7 @@ SWITCH_STANDARD_API(mongo_mapreduce_function)
                        } else {
                                stream->write_function(stream, "-ERR\nfailed to parse query!\n");
                        }
-                       connection_done(conn);
+                       connection_done(globals.client_pool, conn);
                } else {
                        stream->write_function(stream, "-ERR\nfailed to get connection!\n");
                }
@@ -185,7 +202,7 @@ SWITCH_STANDARD_API(mongo_mapreduce_function)
        return status;
 }
 
-SWITCH_STANDARD_API(mongo_find_n_function)
+SWITCH_STANDARD_API(mod_mongo_find_n_function)
 {
        switch_status_t status = SWITCH_STATUS_SUCCESS;
        char *db = NULL, *collection = NULL, *json_query = NULL, *json_fields = NULL, *query_options_str = NULL;
@@ -223,7 +240,7 @@ SWITCH_STANDARD_API(mongo_find_n_function)
 
        if (!zstr(db) && !zstr(collection) && !zstr(json_query) && !zstr(json_fields)) {
                bson_error_t error;
-               mongoc_client_t *conn = get_connection();
+               mongoc_client_t *conn = get_connection(globals.client_pool, globals.conn_str);
                if (conn) {
                        mongoc_collection_t *col = mongoc_client_get_collection(conn, db, collection);
                        if (col) {
@@ -268,7 +285,7 @@ SWITCH_STANDARD_API(mongo_find_n_function)
                        } else {
                                stream->write_function(stream, "-ERR\nunknown collection: %s\n", collection);
                        }
-                       connection_done(conn);
+                       connection_done(globals.client_pool, conn);
                } else {
                        stream->write_function(stream, "-ERR\nfailed to get connection!\n");
                }
@@ -281,7 +298,7 @@ SWITCH_STANDARD_API(mongo_find_n_function)
        return status;
 }
 
-SWITCH_STANDARD_API(mongo_find_one_function)
+SWITCH_STANDARD_API(mod_mongo_find_one_function)
 {
        switch_status_t status = SWITCH_STATUS_SUCCESS;
        char *db = NULL, *collection = NULL, *json_query = NULL, *json_fields = NULL, *query_options_str = NULL;
@@ -308,7 +325,7 @@ SWITCH_STANDARD_API(mongo_find_one_function)
 
        if (!zstr(db) && !zstr(collection) && !zstr(json_query) && !zstr(json_fields)) {
                bson_error_t error;
-               mongoc_client_t *conn = get_connection();
+               mongoc_client_t *conn = get_connection(globals.client_pool, globals.conn_str);
                if (conn) {
                        mongoc_collection_t *col = mongoc_client_get_collection(conn, db, collection);
                        if (col) {
@@ -348,7 +365,7 @@ SWITCH_STANDARD_API(mongo_find_one_function)
                        } else {
                                stream->write_function(stream, "-ERR\nunknown collection: %s\n", collection);
                        }
-                       connection_done(conn);
+                       connection_done(globals.client_pool, conn);
                } else {
                        stream->write_function(stream, "-ERR\nfailed to get connection!\n");
                }
@@ -361,7 +378,362 @@ SWITCH_STANDARD_API(mongo_find_one_function)
        return status;
 }
 
-static switch_status_t config(switch_memory_pool_t *pool)
+/**
+ * Extract the resource count and, optionally, the resource name (_id) from a BSON document
+ */
+static switch_status_t mod_mongo_get_count(switch_core_session_t *session, const char *key, const bson_t *b, int *new_val_ret, char **resource_ret)
+{
+       switch_status_t status = SWITCH_STATUS_SUCCESS;
+       bson_iter_t iter;
+       if (new_val_ret) {
+               if (bson_iter_init_find(&iter, b, key) && BSON_ITER_HOLDS_INT32(&iter)) {
+                       *new_val_ret = bson_iter_int32(&iter);
+               } else {
+                       switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Failed to get resource count\n");
+                       status = SWITCH_STATUS_GENERR;
+               }
+       }
+       if (resource_ret) {
+               if (bson_iter_init_find(&iter, b, "_id") && BSON_ITER_HOLDS_UTF8(&iter)) {
+                       uint32_t len;
+                       const char *resource = bson_iter_utf8(&iter, &len);
+                       if (!zstr(resource)) {
+                               if (bson_utf8_validate(resource, len, 0)) {
+                                       *resource_ret = strdup(resource);
+                               } else {
+                                       switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Resource name is not valid utf8\n");
+                                       status = SWITCH_STATUS_GENERR;
+                               }
+                       } else {
+                               switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Resource name is empty string\n");
+                               status = SWITCH_STATUS_GENERR;
+                       }
+               } else {
+                       switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Resource name not found\n");
+                       status = SWITCH_STATUS_GENERR;
+               }
+       }
+       return status;
+}
+
+/**
+ * Increment a resource by val
+ * @param session
+ * @param resource name of resource being incremented
+ * @param val number to increment resource by
+ * @param max maximum value of resource allowed
+ * @param new_val_ret new value of resource after increment completed
+ */
+static switch_status_t mod_mongo_increment(switch_core_session_t *session, const char *resource, int val, int max, int *new_val_ret)
+{
+       switch_status_t status = SWITCH_STATUS_GENERR;
+       mongoc_client_t *conn = get_connection(globals.limit_client_pool, globals.limit_conn_str);
+       if (conn) {
+               mongoc_collection_t *col = mongoc_client_get_collection(conn, globals.limit_database, globals.limit_collection);
+               if (col) {
+                       int upsert;
+                       bson_t *query, *update, reply;
+                       bson_error_t error;
+
+                       /* construct update query - the counts are stored as:
+                       { _id: "realm_resource", total: 29, "fs-1": 5, "fs-2": 10, "fs-3": 3, "fs-4": 11 }
+                       */
+                       if (val >= 0) {
+                               if (max > 0) {
+                                       /* increment if < max */
+                                       query = BCON_NEW("_id", resource,
+                                               "total", "{", "$lt", BCON_INT32(max), "}");
+                                       upsert = 1; /* the upsert fails with a duplicate key error if the total condition is not satisfied */
+                               } else {
+                                       /* increment, no restrictions */
+                                       query = BCON_NEW("_id", resource);
+                                       upsert = 1;
+                               }
+                       } else {
+                               /* don't allow decrement below 0, don't add fields that don't exist */
+                               query = BCON_NEW("_id", resource,
+                                       "total", "{", "$gte", BCON_INT32(-val), "}",
+                                       switch_core_get_switchname(), "{", "$gte", BCON_INT32(-val), "}");
+                               upsert = 0;
+                       }
+                       update = BCON_NEW("$inc", "{", "total", BCON_INT32(val), switch_core_get_switchname(), BCON_INT32(val), "}");
+
+                       if (!mongoc_collection_find_and_modify(col, query, NULL, update, NULL, false, upsert, true, &reply, &error)) {
+                               if (max > 0 && error.code == 11000) {
+                                       /* duplicate key error - limit exceeded */
+                                       switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "Usage for %s exceeds maximum rate of %d\n", resource, max);
+                                       status = SWITCH_STATUS_FALSE;
+                               } else {
+                                       switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Increment %s by %d failed: %s\n", resource, val, error.message);
+                                       status = SWITCH_STATUS_GENERR;
+                               }
+                       } else if (new_val_ret) {
+                               status = mod_mongo_get_count(session, "total", &reply, new_val_ret, NULL);
+                       } else {
+                               status = SWITCH_STATUS_SUCCESS;
+                       }
+
+                       bson_destroy(query);
+                       bson_destroy(update);
+                       bson_destroy(&reply);
+                       mongoc_collection_destroy(col);
+               } else {
+                       switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Increment %s by %d failed: unable to get collection %s from database %s\n", resource, val, globals.limit_collection, globals.limit_database);
+               }
+               connection_done(globals.limit_client_pool, conn);
+       }
+       return status;
+}
+
+/**
+ * Get resource usage
+ */
+static switch_status_t mod_mongo_get_usage(const char *resource, int *usage)
+{
+       switch_status_t status = SWITCH_STATUS_GENERR;
+       mongoc_client_t *conn = get_connection(globals.limit_client_pool, globals.limit_conn_str);
+       if (conn) {
+               mongoc_collection_t *col = mongoc_client_get_collection(conn, globals.limit_database, globals.limit_collection);
+               if (col) {
+                       bson_t *query = BCON_NEW("_id", resource);
+                       bson_t *fields = BCON_NEW("total", BCON_INT32(1));
+                       bson_error_t error;
+                       mongoc_cursor_t *cursor = mongoc_collection_find(col, 0, 0, 1, 0, query, fields, NULL);
+                       if (cursor) {
+                               if (!mongoc_cursor_error(cursor, &error)) {
+                                       /* get result from cursor */
+                                       const bson_t *result;
+                                       if (mongoc_cursor_more(cursor) && mongoc_cursor_next(cursor, &result)) {
+                                               status = mod_mongo_get_count(NULL, "total", result, usage, NULL);
+                                       }
+                               }
+                               mongoc_cursor_destroy(cursor);
+                       }
+                       bson_destroy(query);
+                       bson_destroy(fields);
+                       mongoc_collection_destroy(col);
+               } else {
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Get usage failed: unable to get collection %s from database %s\n", globals.limit_collection, globals.limit_database);
+               }
+               connection_done(globals.limit_client_pool, conn);
+       }
+       return status;
+}
+
+/**
+ * Clear all limits on this server
+ */
+static switch_status_t mod_mongo_reset(void)
+{
+       switch_status_t status = SWITCH_STATUS_GENERR;
+       mongoc_client_t *conn = get_connection(globals.limit_client_pool, globals.limit_conn_str);
+       if (conn) {
+               mongoc_collection_t *col = mongoc_client_get_collection(conn, globals.limit_database, globals.limit_collection);
+               if (col) {
+                       bson_t *query;
+                       //bson_t *fields;
+                       mongoc_cursor_t *cursor;
+                       bson_error_t error;
+                       query = BCON_NEW(switch_core_get_switchname(), "{", "$gt", BCON_INT32(0), "}");
+                       //fields = BCON_NEW(switch_core_get_switchname(), "1");
+
+                       /* find all docs w/ this server and clear its counts */
+                       switch_thread_rwlock_wrlock(globals.limit_database_rwlock); /* prevent increments on this server */
+                       cursor = mongoc_collection_find(col, 0, 0, 0, 0, query, NULL, NULL);
+                       if (cursor) {
+                               if (!mongoc_cursor_error(cursor, &error)) {
+                                       /* get result from cursor */
+                                       const bson_t *result;
+                                       char *resource = NULL;
+                                       while (mongoc_cursor_more(cursor) && mongoc_cursor_next(cursor, &result)) {
+                                               int count = 0;
+                                               if ((status = mod_mongo_get_count(NULL, switch_core_get_switchname(), result, &count, &resource)) == SWITCH_STATUS_SUCCESS) {
+                                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Reset %s, -%d\n", resource, count);
+                                                       if (count > 0 && !zstr(resource)) {
+                                                               /* decrement server counts from mongo */
+                                                               if ((status = mod_mongo_increment(NULL, resource, -count, 0, NULL)) == SWITCH_STATUS_GENERR) {
+                                                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Reset failed - increment error\n");
+                                                                       break;
+                                                               }
+                                                       }
+                                                       switch_safe_free(resource);
+                                                       resource = NULL;
+                                               } else {
+                                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Reset failed - get count\n");
+                                                       break;
+                                               }
+                                       }
+                                       switch_safe_free(resource);
+                               } else {
+                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Reset failed: %s\n", error.message);
+                               }
+                               mongoc_cursor_destroy(cursor);
+                       } else {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Reset failed: NULL cursor returned\n");
+                       }
+                       switch_thread_rwlock_unlock(globals.limit_database_rwlock);
+
+                       bson_destroy(query);
+                       //bson_destroy(fields);
+                       mongoc_collection_destroy(col);
+               } else {
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Reset failed: unable to get collection %s from database %s\n", globals.limit_collection, globals.limit_database);
+               }
+               connection_done(globals.limit_client_pool, conn);
+       }
+       return status;
+}
+
+/**
+ * Clean up all entries w/ resource count of 0
+ */
+static switch_status_t mod_mongo_cleanup(void)
+{
+       switch_status_t status = SWITCH_STATUS_GENERR;
+       mongoc_client_t *conn = get_connection(globals.limit_client_pool, globals.limit_conn_str);
+       if (conn) {
+               mongoc_collection_t *col = mongoc_client_get_collection(conn, globals.limit_database, globals.limit_collection);
+               if (col) {
+                       bson_t *selector = BCON_NEW("total", BCON_INT32(0));
+                       bson_error_t error;
+                       if (mongoc_collection_remove(col, 0, selector, NULL, &error)) {
+                               status = SWITCH_STATUS_SUCCESS;
+                       } else {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Cleanup failed: %s\n", error.message);
+                       }
+                       bson_destroy(selector);
+                       mongoc_collection_destroy(col);
+               } else {
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Cleanup failed: unable to get collection %s from database %s\n", globals.limit_collection, globals.limit_database);
+               }
+               connection_done(globals.limit_client_pool, conn);
+       }
+       return status;
+}
+
+/**
+ * @brief Enforces limit_mongo restrictions
+ * @param session current session
+ * @param realm limit realm
+ * @param id limit id
+ * @param max maximum count
+ * @param interval interval for rate limiting
+ * @return SWITCH_STATUS_SUCCESS if the resource was acquired, SWITCH_STATUS_FALSE if the limit was exceeded
+ */
+SWITCH_LIMIT_INCR(mod_mongo_limit_incr)
+{
+       switch_status_t status = SWITCH_STATUS_FALSE;
+       switch_channel_t *channel = switch_core_session_get_channel(session);
+       const char *limit_id = switch_core_session_sprintf(session, "%s_%s", realm, resource);
+
+       /* get session's resource tracking information */
+       struct mod_mongo_private *pvt = switch_channel_get_private(channel, "limit_mongo");
+       if (!pvt) {
+               switch_mutex_lock(globals.mod_mongo_private_mutex); /* prevents concurrent alloc of mod_mongo_private */
+               pvt = switch_channel_get_private(channel, "limit_mongo");
+               if (!pvt) {
+                       pvt = (struct mod_mongo_private *) switch_core_session_alloc(session, sizeof(*pvt));
+                       switch_core_hash_init(&pvt->resources);
+                       switch_mutex_init(&pvt->mutex, SWITCH_MUTEX_UNNESTED, switch_core_session_get_pool(session));
+                       switch_channel_set_private(channel, "limit_mongo", pvt);
+               }
+               switch_mutex_unlock(globals.mod_mongo_private_mutex);
+       }
+
+       switch_mutex_lock(pvt->mutex); /* prevents concurrent increment in session */
+       switch_thread_rwlock_rdlock(globals.limit_database_rwlock); /* prevent reset operation on this server */
+
+       /* check if resource is already incremented on this session */
+       if (!switch_core_hash_find(pvt->resources, limit_id)) {
+               /* increment resource usage */
+               if ((status = mod_mongo_increment(session, limit_id, 1, max, NULL)) == SWITCH_STATUS_SUCCESS) {
+                       /* remember this resource was incremented */
+                       switch_core_hash_insert(pvt->resources, limit_id, limit_id);
+               }
+       } else {
+               switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "%s already acquired\n", limit_id);
+       }
+
+       switch_thread_rwlock_unlock(globals.limit_database_rwlock);
+       switch_mutex_unlock(pvt->mutex);
+
+       return status;
+}
+
+/**
+ * @brief Releases usage of a limit_mongo-controlled resource
+ */
+SWITCH_LIMIT_RELEASE(mod_mongo_limit_release)
+{
+       switch_channel_t *channel = switch_core_session_get_channel(session);
+       struct mod_mongo_private *pvt = switch_channel_get_private(channel, "limit_mongo");
+       switch_status_t status = SWITCH_STATUS_SUCCESS;
+
+       if (!pvt) {
+               switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "No limit tracking data for channel\n");
+               return SWITCH_STATUS_SUCCESS;
+       }
+
+       switch_mutex_lock(pvt->mutex); /* prevents concurrent decrement in session */
+       switch_thread_rwlock_rdlock(globals.limit_database_rwlock); /* prevent reset operation on this server */
+
+       /* no realm / resource = clear all resources */
+       if (realm == NULL && resource == NULL) {
+               /* clear all resources */
+               switch_hash_index_t *hi = NULL;
+               while ((hi = switch_core_hash_first_iter(pvt->resources, hi))) {
+                       void *p_val = NULL;
+                       const void *p_key;
+                       switch_ssize_t keylen;
+                       switch_core_hash_this(hi, &p_key, &keylen, &p_val);
+                       if (mod_mongo_increment(session, (const char *)p_key, -1, 0, NULL) != SWITCH_STATUS_SUCCESS) {
+                               switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Couldn't decrement %s\n", (const char *)p_key);
+                               status = SWITCH_STATUS_FALSE;
+                               break;
+                       } else {
+                               switch_core_hash_delete(pvt->resources, (const char *) p_key);
+                       }
+               }
+       } else if (!zstr(realm) && !zstr(resource)) {
+               /* clear specific resource */
+               const char *limit_id = switch_core_session_sprintf(session, "%s_%s", realm, resource);
+               if (switch_core_hash_find(pvt->resources, limit_id)) {
+                       if (mod_mongo_increment(session, limit_id, -1, 0, NULL) != SWITCH_STATUS_SUCCESS) {
+                               switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Couldn't decrement %s\n", limit_id);
+                       } else {
+                               switch_core_hash_delete(pvt->resources, limit_id);
+                       }
+               }
+       } else {
+               switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Missing either realm or resource to release\n");
+       }
+
+       switch_thread_rwlock_unlock(globals.limit_database_rwlock);
+       switch_mutex_unlock(pvt->mutex);
+
+       return status;
+}
+
+SWITCH_LIMIT_USAGE(mod_mongo_limit_usage)
+{
+       char *limit_id = switch_mprintf("%s_%s", realm, resource);
+       int usage = 0;
+       mod_mongo_get_usage(limit_id, &usage);
+       switch_safe_free(limit_id);
+       return usage;
+}
+
+SWITCH_LIMIT_RESET(mod_mongo_limit_reset)
+{
+       return mod_mongo_reset();
+}
+
+SWITCH_LIMIT_STATUS(mod_mongo_limit_status)
+{
+       return strdup("-ERR not supported");
+}
+
+static switch_status_t do_config(switch_memory_pool_t *pool)
 {
        const char *cf = "mongo.conf";
        switch_xml_t cfg, xml, settings, param;
@@ -372,8 +744,12 @@ static switch_status_t config(switch_memory_pool_t *pool)
        globals.reduce = "";
        globals.finalize = "";
        globals.conn_str = "";
-       globals.uri = NULL;
        globals.client_pool = NULL;
+       globals.limit_database = "limit";
+       globals.limit_collection = "mod_mongo";
+       globals.limit_conn_str = "";
+       globals.limit_client_pool = NULL;
+       globals.limit_cleanup_interval_sec = 300;
 
        if (!(xml = switch_xml_open_cfg(cf, &cfg, NULL))) {
                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Open of %s failed\n", cf);
@@ -391,21 +767,75 @@ static switch_status_t config(switch_memory_pool_t *pool)
                                        status = SWITCH_STATUS_GENERR;
                                        goto done;
                                } else {
+                                       mongoc_uri_t *uri;
                                        globals.conn_str = switch_core_strdup(pool, val);
-                                       globals.uri = mongoc_uri_new(globals.conn_str);
-                                       if (globals.uri) {
-                                               globals.client_pool = mongoc_client_pool_new(globals.uri);
+                                       uri = mongoc_uri_new(globals.conn_str);
+                                       if (uri) {
+                                               globals.client_pool = mongoc_client_pool_new(uri);
+                                               mongoc_uri_destroy(uri);
                                                if (!globals.client_pool) {
                                                        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Failed to create client pool for connection-string: %s\n", globals.conn_str);
                                                        status = SWITCH_STATUS_GENERR;
                                                        goto done;
                                                }
+                                               if (!globals.limit_client_pool) {
+                                                       /* use connection-string for limit backend unless overridden by limit-connection-string */
+                                                       globals.limit_client_pool = globals.client_pool;
+                                                       globals.limit_conn_str = globals.conn_str;
+                                               }
                                        } else {
+                                               mongoc_uri_destroy(uri);
                                                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Invalid connection-string: %s\n", globals.conn_str);
                                                status = SWITCH_STATUS_GENERR;
                                                goto done;
                                        }
                                }
+                       } else if (!strcmp(var, "limit-connection-string")) {
+                               if (zstr(val)) {
+                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "missing limit-connection-string - using connection-string instead\n");
+                                       continue;
+                               } else {
+                                       mongoc_uri_t *uri;
+                                       globals.limit_conn_str = switch_core_strdup(pool, val);
+                                       uri = mongoc_uri_new(globals.limit_conn_str);
+                                       if (uri) {
+                                               globals.limit_client_pool = mongoc_client_pool_new(uri);
+                                               mongoc_uri_destroy(uri);
+                                               if (!globals.limit_client_pool) {
+                                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Failed to create client pool for limit-connection-string: %s\n", globals.limit_conn_str);
+                                                       status = SWITCH_STATUS_GENERR;
+                                                       goto done;
+                                               }
+                                       } else {
+                                               mongoc_uri_destroy(uri);
+                                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Invalid limit-connection-string: %s\n", globals.limit_conn_str);
+                                               status = SWITCH_STATUS_GENERR;
+                                               goto done;
+                                       }
+                               }
+                       } else if (!strcmp(var, "limit-database")) {
+                               if (zstr(val)) {
+                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "missing limit-database - using '%s'\n", globals.limit_database);
+                               } else {
+                                       globals.limit_database = switch_core_strdup(pool, val);
+                               }
+                       } else if (!strcmp(var, "limit-collection")) {
+                               if (zstr(val)) {
+                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "missing limit-collection - using '%s'\n", globals.limit_collection);
+                               } else {
+                                       globals.limit_collection = switch_core_strdup(pool, val);
+                               }
+                       } else if (!strcmp(var, "limit-cleanup-interval-sec")) {
+                               if (zstr(val) || !switch_is_number(val)) {
+                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "bad value of limit-cleanup-interval-sec\n");
+                               } else {
+                                       int new_interval = atoi(val);
+                                       if (new_interval >= 0) {
+                                               globals.limit_cleanup_interval_sec = new_interval;
+                                       } else {
+                                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "limit-cleanup-interval-sec must be >= 0\n");
+                                       }
+                               }
                        } else if (!strcmp(var, "map")) {
                                if (!zstr(val)) {
                                        globals.map = switch_core_strdup(pool, val);
@@ -435,29 +865,69 @@ done:
 
 SWITCH_MODULE_LOAD_FUNCTION(mod_mongo_load)
 {
-       switch_api_interface_t *api_interface;
+       switch_api_interface_t *api_interface = NULL;
+       switch_limit_interface_t *limit_interface = NULL;
 
        *module_interface = switch_loadable_module_create_module_interface(pool, modname);
 
        memset(&globals, 0, sizeof(globals));
 
-       if (config(pool) != SWITCH_STATUS_SUCCESS) {
+       if (do_config(pool) != SWITCH_STATUS_SUCCESS) {
                return SWITCH_STATUS_TERM;
        }
 
-       SWITCH_ADD_API(api_interface, "mongo_find_one", "findOne", mongo_find_one_function, FIND_ONE_SYNTAX);
-       SWITCH_ADD_API(api_interface, "mongo_find_n", "find", mongo_find_n_function, FIND_N_SYNTAX);
-       SWITCH_ADD_API(api_interface, "mongo_mapreduce", "Map/Reduce", mongo_mapreduce_function, MAPREDUCE_SYNTAX);
+       switch_mutex_init(&globals.mod_mongo_private_mutex, SWITCH_MUTEX_UNNESTED, pool);
+       switch_thread_rwlock_create(&globals.limit_database_rwlock, pool);
+       switch_thread_rwlock_create(&globals.shutdown_rwlock, pool);
+
+       /* clear any counts left over by this server from a previous run */
+       mod_mongo_reset();
+
+       SWITCH_ADD_API(api_interface, "mongo_find_one", "findOne", mod_mongo_find_one_function, FIND_ONE_SYNTAX);
+       SWITCH_ADD_API(api_interface, "mongo_find_n", "find", mod_mongo_find_n_function, FIND_N_SYNTAX);
+       SWITCH_ADD_API(api_interface, "mongo_mapreduce", "Map/Reduce", mod_mongo_mapreduce_function, MAPREDUCE_SYNTAX);
+
+       SWITCH_ADD_LIMIT(limit_interface, "mongo", mod_mongo_limit_incr, mod_mongo_limit_release, mod_mongo_limit_usage, mod_mongo_limit_reset, mod_mongo_limit_status, NULL);
 
        return SWITCH_STATUS_SUCCESS;
 }
 
+/**
+ * Periodically cleanup mongo limit counters
+ */
+SWITCH_MODULE_RUNTIME_FUNCTION(mod_mongo_runtime)
+{
+       switch_interval_time_t cleanup_time = switch_micro_time_now() + (globals.limit_cleanup_interval_sec * 1000 * 1000);
+       switch_thread_rwlock_rdlock(globals.shutdown_rwlock);
+       while (!globals.shutdown && globals.limit_cleanup_interval_sec) {
+               switch_micro_sleep(1 * 1000 * 1000);
+               if (!globals.shutdown && switch_micro_time_now() > cleanup_time) {
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Cleanup\n");
+                       mod_mongo_cleanup();
+                       cleanup_time = switch_micro_time_now() + (globals.limit_cleanup_interval_sec * 1000 * 1000);
+               }
+       }
+       switch_thread_rwlock_unlock(globals.shutdown_rwlock);
+       return SWITCH_STATUS_TERM;
+}
+
 SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_mongo_shutdown)
 {
+       globals.shutdown = 1;
+       switch_thread_rwlock_wrlock(globals.shutdown_rwlock);
+       switch_thread_rwlock_unlock(globals.shutdown_rwlock);
+       if (globals.limit_client_pool && globals.limit_client_pool != globals.client_pool) {
+               mongoc_client_pool_destroy(globals.limit_client_pool);
+               globals.limit_client_pool = NULL;
+       }
        if (globals.client_pool) {
                mongoc_client_pool_destroy(globals.client_pool);
                globals.client_pool = NULL;
        }
+       if (globals.mod_mongo_private_mutex) {
+               switch_mutex_destroy(globals.mod_mongo_private_mutex);
+               globals.mod_mongo_private_mutex = NULL;
+       }
        return SWITCH_STATUS_SUCCESS;
 }