]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
mod_hash: remote sync in working state
authorMathieu Rene <mrene@avgs.ca>
Sat, 17 Jul 2010 05:14:11 +0000 (01:14 -0400)
committerMathieu Rene <mrene@avgs.ca>
Sat, 17 Jul 2010 05:14:11 +0000 (01:14 -0400)
src/mod/applications/mod_hash/mod_hash.c

index 782c980085d2e4f220194c2d72fbd8bc16a25df4..5e21e331084173131249e37ef3dbcd3c39062246 100644 (file)
@@ -54,10 +54,11 @@ static struct {
 } globals;
 
 typedef struct {
-       uint32_t total_usage;
-       uint32_t rate_usage;
-       time_t last_check;
-       uint32_t interval;
+       uint32_t total_usage;   /* < Total */
+       uint32_t rate_usage;    /* < Current rate usage */
+       time_t last_check;              /* < Last rate check */
+       uint32_t interval;              /* < Interval used on last rate check */
+       uint32_t last_update;   /* < Last updated timestamp (rate or total) */
 } limit_hash_item_t;
 
 struct callback {
@@ -74,9 +75,9 @@ typedef struct {
 } limit_hash_private_t;
 
 typedef enum {
-       REMOTE_OFF = 0, /* Thread not running */
-       REMOTE_DOWN,    /* Cannot connect to remote instance */
-       REMOTE_UP               /* All good */
+       REMOTE_OFF = 0, /* Thread not running */
+       REMOTE_DOWN,    /* <C annot connect to remote instance */
+       REMOTE_UP               /* All good */
 } limit_remote_state_t;
 
 typedef struct {
@@ -100,6 +101,8 @@ typedef struct {
        limit_remote_state_t state;
 } limit_remote_t;
 
+static limit_hash_item_t get_remote_usage(const char *key);
+
 /* \brief Enforces limit_hash restrictions
  * \param session current session
  * \param realm limit realm
@@ -117,6 +120,7 @@ SWITCH_LIMIT_INCR(limit_incr_hash)
        time_t now = switch_epoch_time_now(NULL);
        limit_hash_private_t *pvt = NULL;
        uint8_t increment = 1;
+       limit_hash_item_t remote_usage;
 
        hashkey = switch_core_session_sprintf(session, "%s_%s", realm, resource);
 
@@ -146,6 +150,8 @@ SWITCH_LIMIT_INCR(limit_incr_hash)
                switch_channel_set_private(channel, "limit_hash", pvt);
        }
 
+       remote_usage = get_remote_usage(hashkey);
+
        if (interval > 0) {
                item->interval = interval;
                if (item->last_check <= (now - interval)) {
@@ -164,7 +170,7 @@ SWITCH_LIMIT_INCR(limit_incr_hash)
                                goto end;
                        }
                }
-       } else if ((max >= 0) && (item->total_usage + increment > (uint32_t) max)) {
+       } else if ((max >= 0) && (item->total_usage + increment + remote_usage.total_usage > (uint32_t) max)) {
                switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "Usage for %s is already at max value (%d)\n", hashkey, item->total_usage);
                status = SWITCH_STATUS_GENERR;
                goto end;
@@ -225,6 +231,19 @@ SWITCH_HASH_DELETE_FUNC(limit_hash_cleanup_delete_callback) {
        return SWITCH_FALSE;
 }
 
+SWITCH_HASH_DELETE_FUNC(limit_hash_remote_cleanup_callback) 
+{
+       limit_hash_item_t *item = (limit_hash_item_t *) val;
+       switch_time_t now = (switch_time_t)(intptr_t)pData;
+       
+       if (item->last_update != now) {
+               free(item);
+               return SWITCH_TRUE;
+       }
+       
+       return SWITCH_FALSE;
+}
+
 /* !\brief Periodically checks for unused limit entries and frees them */
 SWITCH_STANDARD_SCHED_FUNC(limit_hash_cleanup_callback)
 {
@@ -300,14 +319,19 @@ SWITCH_LIMIT_USAGE(limit_usage_hash)
        char *hash_key = NULL;
        limit_hash_item_t *item = NULL;
        int count = 0;
+       limit_hash_item_t remote_usage;
 
        switch_thread_rwlock_rdlock(globals.limit_hash_rwlock);
 
        hash_key = switch_mprintf("%s_%s", realm, resource);
+       remote_usage = get_remote_usage(hash_key);
+       
+       count = remote_usage.total_usage;
+       *rcount = remote_usage.rate_usage;
 
        if ((item = switch_core_hash_find(globals.limit_hash, hash_key))) {
-               count = item->total_usage;
-               *rcount = item->rate_usage;
+               count += item->total_usage;
+               *rcount += item->rate_usage;
        }
 
        switch_safe_free(hash_key);
@@ -576,7 +600,7 @@ void limit_remote_destroy(limit_remote_t **r)
                switch_thread_rwlock_wrlock((*r)->rwlock);
 
                /* Free hashtable data */
-               for (hi = switch_hash_first(NULL, globals.remote_hash); hi; hi = switch_hash_next(hi)) {
+               for (hi = switch_hash_first(NULL, (*r)->index); hi; hi = switch_hash_next(hi)) {
                        void *val;      
                        const void *key;
                        switch_ssize_t keylen;
@@ -593,6 +617,41 @@ void limit_remote_destroy(limit_remote_t **r)
        }
 }
 
+/* Compute the usage sum of a resource on remote boxes */
+static limit_hash_item_t get_remote_usage(const char *key) {
+       limit_hash_item_t usage = { 0 };
+       switch_hash_index_t *hi;
+       
+       switch_thread_rwlock_rdlock(globals.remote_hash_rwlock);
+       for (hi = switch_hash_first(NULL, globals.remote_hash); hi; hi = switch_hash_next(hi)) {
+               void *val;      
+               const void *hashkey;
+               switch_ssize_t keylen;
+               limit_remote_t *remote;
+               limit_hash_item_t *item;
+               switch_hash_this(hi, &hashkey, &keylen, &val);
+                                                       
+               remote = (limit_remote_t *)val;
+               if (remote->state != REMOTE_UP) {
+                       continue;
+               }
+               
+               switch_thread_rwlock_rdlock(remote->rwlock);
+               if ((item = switch_core_hash_find(remote->index, key))) {
+                       usage.total_usage += item->total_usage;
+                       usage.rate_usage += item->rate_usage;
+                       if (!usage.last_check) {
+                               usage.last_check = item->last_check;
+                       }
+               }
+               switch_thread_rwlock_unlock(remote->rwlock);
+       }
+       
+       switch_thread_rwlock_unlock(globals.remote_hash_rwlock);
+       
+       return usage;
+}
+
 static void *SWITCH_THREAD_FUNC limit_remote_thread(switch_thread_t *thread, void *obj)
 {
        limit_remote_t *remote = (limit_remote_t*)obj;
@@ -614,9 +673,58 @@ static void *SWITCH_THREAD_FUNC limit_remote_thread(switch_thread_t *thread, voi
                                        remote->host, remote->port);
                                memset(&remote->handle, 0, sizeof(remote->handle));
                                remote->state = REMOTE_DOWN;
+                               /* Delete all remote tracking entries */
+                               switch_thread_rwlock_wrlock(remote->rwlock);
+                               switch_core_hash_delete_multi(remote->index, limit_hash_remote_cleanup_callback, NULL);
+                               switch_thread_rwlock_unlock(remote->rwlock);
                        } else {
-                               const char *data = esl_event_get_header(remote->handle.last_sr_event, "reply-text");
-                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "RECV: %s\n", data);
+                               if (!zstr(remote->handle.last_sr_event->body)) {
+                                       char *data = strdup(remote->handle.last_sr_event->body);
+                                       char *p = data, *p2;
+                                       switch_time_t now = switch_epoch_time_now(NULL);
+                                       while (p && *p) {
+                                               /* We are getting the limit data as:
+                                                       L/a_c/1/0/0/0 
+                                               */
+                                               if ((p2 = strchr(p, '\n'))) {
+                                                       *p2++ = '\0';
+                                               }
+                                               
+                                               /* Now p points at the beginning of the current line, 
+                                               p2 at the start of the next one */
+                                               if (*p == 'L') { /* Limit data */
+                                                       char *argv[5];
+                                                       int argc = switch_split(p+2, '/', argv);
+                                                       
+                                                       if (argc < 5) {
+                                                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Protocol error: missing argument in line: %s\n", p);
+                                                       } else {
+                                                               limit_hash_item_t *item;
+                                                               switch_thread_rwlock_wrlock(remote->rwlock);
+                                                               if (!(item = switch_core_hash_find(remote->index, argv[0]))) {
+                                                                       item = malloc(sizeof(*item));
+                                                                       switch_core_hash_insert(remote->index, argv[0], item);
+                                                               }
+                                                               item->total_usage = atoi(argv[1]);
+                                                               item->rate_usage = atoi(argv[2]);
+                                                               item->interval = atoi(argv[3]);
+                                                               item->last_check = atoi(argv[4]);
+                                                               item->last_update = now;
+                                                               switch_thread_rwlock_unlock(remote->rwlock);
+                                                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Imported key %s %d %d/%d (%d - %d)\n",
+                                                                       argv[0], item->total_usage, item->rate_usage, item->interval, (int)item->last_check, (int)item->last_update);
+                                                       }
+                                               }
+                                               
+                                               p = p2;
+                                       }
+                                       free(data);
+                                       
+                                       /* Now free up anything that wasnt in this update since it means their usage is 0 */
+                                       switch_thread_rwlock_wrlock(remote->rwlock);
+                                       switch_core_hash_delete_multi(remote->index, limit_hash_remote_cleanup_callback, (void*)(intptr_t)now);
+                                       switch_thread_rwlock_unlock(remote->rwlock);
+                               }
                        }
                }
                
@@ -717,9 +825,34 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_hash_load)
 SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_hash_shutdown)
 {
        switch_hash_index_t *hi;
+       switch_bool_t remote_clean = SWITCH_TRUE;
        
        switch_scheduler_del_task_group("mod_hash");
 
+       /* Kill remote connections, destroy needs a wrlock so we unlock after finding a pointer */
+       while(remote_clean) {
+               void *val;      
+               const void *key;
+               switch_ssize_t keylen;
+               limit_remote_t *item = NULL;
+               
+               switch_thread_rwlock_rdlock(globals.remote_hash_rwlock);
+               if ((hi = switch_hash_first(NULL, globals.remote_hash))) {
+                       switch_hash_this(hi, &key, &keylen, &val);
+                       item = (limit_remote_t *)val;
+               }
+               switch_thread_rwlock_unlock(globals.remote_hash_rwlock);
+               
+               if (!item) { 
+                       remote_clean = SWITCH_FALSE; 
+               } else {
+                       limit_remote_destroy(&item);
+                       switch_thread_rwlock_wrlock(globals.remote_hash_rwlock);
+                       switch_core_hash_delete(globals.remote_hash, key);
+                       switch_thread_rwlock_unlock(globals.remote_hash_rwlock);
+               }
+       }
+
        switch_thread_rwlock_wrlock(globals.limit_hash_rwlock);
        switch_thread_rwlock_wrlock(globals.db_hash_rwlock);