]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
deliver events with the core thread pool set events-use-dispatch=true in switch.conf...
authorAnthony Minessale <anthm@freeswitch.org>
Wed, 4 Sep 2013 22:42:35 +0000 (03:42 +0500)
committerAnthony Minessale <anthm@freeswitch.org>
Wed, 4 Sep 2013 22:42:40 +0000 (03:42 +0500)
src/include/private/switch_core_pvt.h
src/include/switch_core.h
src/switch_core.c
src/switch_core_session.c
src/switch_event.c

index dd83e7052d2dbbd232ae171152c3818aa0aaac49..fffd51abac266a2e2cd0e87b3a4a9f37fc94f157 100644 (file)
@@ -275,6 +275,7 @@ struct switch_runtime {
        char *core_db_post_trans_execute;
        char *core_db_inner_pre_trans_execute;
        char *core_db_inner_post_trans_execute;
+       int events_use_dispatch;
 };
 
 extern struct switch_runtime runtime;
index 1a12da7129a9db2d30808fb950d106ef0dcc97bf..8f029872811bfbe49733ea75faea0b4261cedefc 100644 (file)
@@ -65,6 +65,7 @@ typedef struct switch_thread_data_s {
        switch_thread_start_t func;
        void *obj;
        int alloc;
+       switch_memory_pool_t *pool;
 } switch_thread_data_t;
 
 typedef struct switch_hold_record_s {
index 652916668616aac9905b520c62f0ec10cfae616d..7782265dc311f0f244990b50173698c2dffae17a 100644 (file)
@@ -1963,9 +1963,18 @@ static void switch_load_core_config(const char *file)
                                        switch_core_min_idle_cpu(atof(val));
                                } else if (!strcasecmp(var, "tipping-point") && !zstr(val)) {
                                        runtime.tipping_point = atoi(val);
+                               } else if (!strcasecmp(var, "events-use-dispatch") && !zstr(val)) {
+                                       runtime.events_use_dispatch = 1;
                                } else if (!strcasecmp(var, "initial-event-threads") && !zstr(val)) {
-                                       int tmp = atoi(val);
+                                       int tmp;
+
+                                       if (!runtime.events_use_dispatch) {
+                                               runtime.events_use_dispatch = 1;
+                                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, 
+                                                                                 "Implicitly setting events-use-dispatch based on usage of this initial-event-threads parameter.\n");
+                                       }
 
+                                       tmp = atoi(val);
 
                                        if (tmp > runtime.cpu_count / 2) {
                                                tmp = runtime.cpu_count / 2;
index 88519d4c63a03a65197d1b38654af85b042f8594..b322873d3a49ce332266328f0c2a57f0f9a9e59b 100644 (file)
@@ -1622,10 +1622,14 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_worker(switch_th
 
                        td->func(thread, td->obj);
 
-                       if (td->alloc) {
+                       if (td->pool) {
+                               switch_memory_pool_t *pool = td->pool;
+                               td = NULL;
+                               switch_core_destroy_memory_pool(&pool);
+                       } else if (td->alloc) {
                                free(td);
                        }
-                       
+
                        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Done Processing\n", (long) thread);
                        
                        switch_mutex_lock(session_manager.mutex);
index bdb9bad9a7c5a6d25413c3773f871288fabfce68..b87d1d4d4be51ebee73160605b78efc4529eefcd 100644 (file)
@@ -36,6 +36,7 @@
 #include <switch.h>
 #include <switch_event.h>
 #include "tpl.h"
+#include "private/switch_core_pvt.h"
 
 //#define SWITCH_EVENT_RECYCLE
 #define DISPATCH_QUEUE_LEN 10000
@@ -244,6 +245,34 @@ static int switch_events_match(switch_event_t *event, switch_event_node_t *node)
        return match;
 }
 
+
+static void *SWITCH_THREAD_FUNC switch_event_deliver_thread(switch_thread_t *thread, void *obj)
+{
+       switch_event_t *event = (switch_event_t *) obj;
+
+       switch_event_deliver(&event);
+
+       return NULL;
+}
+
+static void switch_event_deliver_thread_pool(switch_event_t **event)
+{
+       switch_thread_data_t *td;
+       
+       td = malloc(sizeof(*td));
+       switch_assert(td);
+
+       td->alloc = 1;
+       td->func = switch_event_deliver_thread;
+       td->obj = *event;
+       td->pool = NULL;
+
+       *event = NULL;
+
+       switch_thread_pool_launch_thread(&td);
+
+}
+
 static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *thread, void *obj)
 {
        switch_queue_t *queue = (switch_queue_t *) obj;
@@ -489,19 +518,22 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void)
        SYSTEM_RUNNING = 0;
        switch_mutex_unlock(EVENT_QUEUE_MUTEX);
 
-       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch queues\n");
+       if (runtime.events_use_dispatch) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch queues\n");
 
-       for(x = 0; x < (uint32_t)DISPATCH_THREAD_COUNT; x++) {
-               switch_queue_trypush(EVENT_DISPATCH_QUEUE, NULL);
-       }
-
-       switch_queue_interrupt_all(EVENT_DISPATCH_QUEUE);
-
-       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch threads\n");
+               for(x = 0; x < (uint32_t)DISPATCH_THREAD_COUNT; x++) {
+                       switch_queue_trypush(EVENT_DISPATCH_QUEUE, NULL);
+               }
+               
 
-       for(x = 0; x < (uint32_t)DISPATCH_THREAD_COUNT; x++) {
-               switch_status_t st;
-               switch_thread_join(&st, EVENT_DISPATCH_QUEUE_THREADS[x]);
+               switch_queue_interrupt_all(EVENT_DISPATCH_QUEUE);
+               
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch threads\n");
+               
+               for(x = 0; x < (uint32_t)DISPATCH_THREAD_COUNT; x++) {
+                       switch_status_t st;
+                       switch_thread_join(&st, EVENT_DISPATCH_QUEUE_THREADS[x]);
+               }
        }
 
        x = 0;
@@ -513,7 +545,7 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void)
                last = THREAD_COUNT;
        }
 
-       {
+       if (runtime.events_use_dispatch) {
                void *pop = NULL;
                switch_event_t *event = NULL;
 
@@ -622,19 +654,21 @@ SWITCH_DECLARE(switch_status_t) switch_event_init(switch_memory_pool_t *pool)
 
        //switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
 
-
-       switch_queue_create(&EVENT_DISPATCH_QUEUE, DISPATCH_QUEUE_LEN * MAX_DISPATCH, pool);
-       switch_event_launch_dispatch_threads(1);
+       if (runtime.events_use_dispatch) {
+               switch_queue_create(&EVENT_DISPATCH_QUEUE, DISPATCH_QUEUE_LEN * MAX_DISPATCH, pool);
+               switch_event_launch_dispatch_threads(1);
+       }
 
        //switch_thread_create(&EVENT_QUEUE_THREADS[0], thd_attr, switch_event_thread, EVENT_QUEUE[0], RUNTIME_POOL);
        //switch_thread_create(&EVENT_QUEUE_THREADS[1], thd_attr, switch_event_thread, EVENT_QUEUE[1], RUNTIME_POOL);
        //switch_thread_create(&EVENT_QUEUE_THREADS[2], thd_attr, switch_event_thread, EVENT_QUEUE[2], RUNTIME_POOL);
 
-       while (!THREAD_COUNT) {
-               switch_cond_next();
+       if (runtime.events_use_dispatch) {
+               while (!THREAD_COUNT) {
+                       switch_cond_next();
+               }
        }
 
-
        switch_mutex_lock(EVENT_QUEUE_MUTEX);
        SYSTEM_RUNNING = 1;
        switch_mutex_unlock(EVENT_QUEUE_MUTEX);
@@ -1884,9 +1918,15 @@ SWITCH_DECLARE(switch_status_t) switch_event_fire_detailed(const char *file, con
                (*event)->event_user_data = user_data;
        }
 
-       if (switch_event_queue_dispatch_event(event) != SWITCH_STATUS_SUCCESS) {
-               switch_event_destroy(event);
-               return SWITCH_STATUS_FALSE;
+
+
+       if (runtime.events_use_dispatch) {
+               if (switch_event_queue_dispatch_event(event) != SWITCH_STATUS_SUCCESS) {
+                       switch_event_destroy(event);
+                       return SWITCH_STATUS_FALSE;
+               }
+       } else {
+               switch_event_deliver_thread_pool(event);
        }
 
        return SWITCH_STATUS_SUCCESS;