]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
break event parsing for mod_voicemail into its own internal queue (by Moc)
authorAnthony Minessale <anthm@freeswitch.org>
Wed, 17 Aug 2011 16:27:20 +0000 (11:27 -0500)
committerAnthony Minessale <anthm@freeswitch.org>
Wed, 17 Aug 2011 16:27:20 +0000 (11:27 -0500)
src/mod/applications/mod_voicemail/mod_voicemail.c

index b66262068240e60e5dc2bdc24f6183e88057b8c6..3d0e48981e2516961a08081b6bbcbf0ecd99fdfb 100644 (file)
@@ -45,6 +45,7 @@ SWITCH_MODULE_DEFINITION(mod_voicemail, mod_voicemail_load, mod_voicemail_shutdo
 #define VM_EVENT_MAINT "vm::maintenance"
 
 #define VM_MAX_GREETINGS 9
+#define VM_EVENT_QUEUE_SIZE 50000
 
 static switch_status_t voicemail_inject(const char *data, switch_core_session_t *session);
 
@@ -53,6 +54,9 @@ static struct {
        switch_hash_t *profile_hash;
        int debug;
        int message_query_exact_match;
+       int32_t threads;
+       int32_t running;
+       switch_queue_t *event_queue;
        switch_mutex_t *mutex;
        switch_memory_pool_t *pool;
 } globals;
@@ -3655,7 +3659,7 @@ SWITCH_STANDARD_API(prefs_api_function)
        }
 
 
-static void message_query_handler(switch_event_t *event)
+static void actual_message_query_handler(switch_event_t *event)
 {
        char *account = switch_event_get_header(event, "message-account");
        int created = 0;
@@ -3727,6 +3731,101 @@ static void message_query_handler(switch_event_t *event)
 
 }
 
+static int EVENT_THREAD_RUNNING = 0;
+static int EVENT_THREAD_STARTED = 0;
+
+void *SWITCH_THREAD_FUNC vm_event_thread_run(switch_thread_t *thread, void *obj)
+{
+       void *pop;
+       int done = 0;
+
+       switch_mutex_lock(globals.mutex);
+       if (!EVENT_THREAD_RUNNING) {
+               EVENT_THREAD_RUNNING++;
+               globals.threads++;
+       } else {
+               done = 1;
+       }
+       switch_mutex_unlock(globals.mutex);
+
+       if (done) {
+               return NULL;
+       }
+
+       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Event Thread Started\n");
+
+       while (globals.running == 1) {
+               int count = 0;
+
+               if (switch_queue_trypop(globals.event_queue, &pop) == SWITCH_STATUS_SUCCESS) {
+                       switch_event_t *event = (switch_event_t *) pop;
+
+                       if (!pop) {
+                               break;
+                       }
+                       actual_message_query_handler(event);
+                       switch_event_destroy(&event);
+                       count++;
+               }
+
+               if (!count) {
+                       switch_yield(100000);
+               }
+       }
+
+       while (switch_queue_trypop(globals.event_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) {
+               switch_event_t *event = (switch_event_t *) pop;
+               switch_event_destroy(&event);
+       }
+
+
+       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Event Thread Ended\n");
+
+       switch_mutex_lock(globals.mutex);
+       globals.threads--;
+       EVENT_THREAD_RUNNING = EVENT_THREAD_STARTED = 0;
+       switch_mutex_unlock(globals.mutex);
+
+       return NULL;
+}
+
+void vm_event_thread_start(void)
+{
+       switch_thread_t *thread;
+       switch_threadattr_t *thd_attr = NULL;
+       int done = 0;
+
+       switch_mutex_lock(globals.mutex);
+       if (!EVENT_THREAD_STARTED) {
+               EVENT_THREAD_STARTED++;
+       } else {
+               done = 1;
+       }
+       switch_mutex_unlock(globals.mutex);
+
+       if (done) {
+               return;
+       }
+
+       switch_threadattr_create(&thd_attr, globals.pool);
+       switch_threadattr_detach_set(thd_attr, 1);
+       switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
+       switch_threadattr_priority_increase(thd_attr);
+       switch_thread_create(&thread, thd_attr, vm_event_thread_run, NULL, globals.pool);
+}
+
+void vm_event_handler(switch_event_t *event)
+{
+       switch_event_t *cloned_event;
+
+       switch_event_dup(&cloned_event, event);
+       switch_assert(cloned_event);
+       switch_queue_push(globals.event_queue, cloned_event);
+
+       if (!EVENT_THREAD_STARTED) {
+               vm_event_thread_start();
+       }
+}
 
 struct holder {
        vm_profile_t *profile;
@@ -5502,14 +5601,20 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_voicemail_load)
        switch_core_hash_init(&globals.profile_hash, globals.pool);
        switch_mutex_init(&globals.mutex, SWITCH_MUTEX_NESTED, globals.pool);
 
+       switch_mutex_lock(globals.mutex);
+       globals.running = 1;
+       switch_mutex_unlock(globals.mutex);
+
+       switch_queue_create(&globals.event_queue, VM_EVENT_QUEUE_SIZE, globals.pool);
 
        if ((status = load_config()) != SWITCH_STATUS_SUCCESS) {
+               globals.running = 0;
                return status;
        }
        /* connect my internal structure to the blank pointer passed to me */
        *module_interface = switch_loadable_module_create_module_interface(pool, modname);
 
-       if (switch_event_bind(modname, SWITCH_EVENT_MESSAGE_QUERY, SWITCH_EVENT_SUBCLASS_ANY, message_query_handler, NULL)
+       if (switch_event_bind(modname, SWITCH_EVENT_MESSAGE_QUERY, SWITCH_EVENT_SUBCLASS_ANY, vm_event_handler, NULL)
                != SWITCH_STATUS_SUCCESS) {
                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind!\n");
                return SWITCH_STATUS_GENERR;
@@ -5554,9 +5659,23 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_voicemail_shutdown)
        void *val = NULL;
        const void *key;
        switch_ssize_t keylen;
+       int sanity = 0;
+
+       switch_mutex_lock(globals.mutex);
+       if (globals.running == 1) {
+               globals.running = 0;
+       }
+       switch_mutex_unlock(globals.mutex);
 
        switch_event_free_subclass(VM_EVENT_MAINT);
-       switch_event_unbind_callback(message_query_handler);
+       switch_event_unbind_callback(vm_event_handler);
+
+       while (globals.threads) {
+               switch_cond_next();
+               if (++sanity >= 60000) {
+                       break;
+               }
+       }
 
        switch_mutex_lock(globals.mutex);
        while ((hi = switch_hash_first(NULL, globals.profile_hash))) {