]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
fix regression in core event system
authorAnthony Minessale <anthm@freeswitch.org>
Thu, 2 Sep 2010 17:30:26 +0000 (12:30 -0500)
committerAnthony Minessale <anthm@freeswitch.org>
Thu, 2 Sep 2010 17:30:26 +0000 (12:30 -0500)
src/switch_event.c

index 1301d924779e75b8e90c38d11c875840f53d4c93..cfd6a7a13951a76aebe3b0b594ad401f23d7186d 100644 (file)
@@ -79,6 +79,7 @@ static switch_memory_pool_t *THRUNTIME_POOL = NULL;
 static switch_thread_t *EVENT_QUEUE_THREADS[NUMBER_OF_QUEUES] = { 0 };
 static switch_queue_t *EVENT_QUEUE[NUMBER_OF_QUEUES] = { 0 };
 static switch_thread_t *EVENT_DISPATCH_QUEUE_THREADS[MAX_DISPATCH_VAL] = { 0 };
+static uint8_t EVENT_DISPATCH_QUEUE_RUNNING[MAX_DISPATCH_VAL] = { 0 };
 static switch_queue_t *EVENT_DISPATCH_QUEUE[MAX_DISPATCH_VAL] = { 0 };
 static int POOL_COUNT_MAX = SWITCH_CORE_QUEUE_LEN;
 static switch_mutex_t *EVENT_QUEUE_MUTEX = NULL;
@@ -238,9 +239,9 @@ static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *th
 {
        switch_queue_t *queue = (switch_queue_t *) obj;
        int my_id = 0;
+
        switch_mutex_lock(EVENT_QUEUE_MUTEX);
        THREAD_COUNT++;
-       switch_mutex_unlock(EVENT_QUEUE_MUTEX);
 
        for (my_id = 0; my_id < NUMBER_OF_QUEUES; my_id++) {
                if (EVENT_DISPATCH_QUEUE[my_id] == queue) {
@@ -248,6 +249,9 @@ static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *th
                }
        }
 
+       EVENT_DISPATCH_QUEUE_RUNNING[my_id] = 1;
+       switch_mutex_unlock(EVENT_QUEUE_MUTEX);
+
        for (;;) {
                void *pop = NULL;
                switch_event_t *event = NULL;
@@ -270,6 +274,7 @@ static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *th
 
 
        switch_mutex_lock(EVENT_QUEUE_MUTEX);
+       EVENT_DISPATCH_QUEUE_RUNNING[my_id] = 1;
        THREAD_COUNT--;
        switch_mutex_unlock(EVENT_QUEUE_MUTEX);
 
@@ -298,6 +303,7 @@ static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread_t *thread, voi
        for (;;) {
                void *pop = NULL;
                switch_event_t *event = NULL;
+               int loops = 0;
 
                if (switch_queue_pop(queue, &pop) != SWITCH_STATUS_SUCCESS) {
                        break;
@@ -314,13 +320,13 @@ static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread_t *thread, voi
                event = (switch_event_t *) pop;
 
                while (event) {
-                       int max;
 
-                       switch_mutex_lock(EVENT_QUEUE_MUTEX);
-                       max = SOFT_MAX_DISPATCH;
-                       switch_mutex_unlock(EVENT_QUEUE_MUTEX);
+                       if (++loops > 2) {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Event system overloading\n");
+                               switch_yield(1000000);
+                       }
 
-                       for (index = 0; (int)index < max; index++) {
+                       for (index = 0; index < SOFT_MAX_DISPATCH; index++) {
                                if (switch_queue_trypush(EVENT_DISPATCH_QUEUE[index], event) == SWITCH_STATUS_SUCCESS) {
                                        event = NULL;
                                        break;
@@ -328,19 +334,15 @@ static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread_t *thread, voi
                        }
 
                        if (event) {
-                               switch_mutex_lock(EVENT_QUEUE_MUTEX);
                                if (SOFT_MAX_DISPATCH + 1 < MAX_DISPATCH) {
-                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Adding a new event thread #%d\n", SOFT_MAX_DISPATCH + 1);
+                                       switch_mutex_lock(EVENT_QUEUE_MUTEX);
                                        launch_dispatch_threads(SOFT_MAX_DISPATCH + 1, DISPATCH_QUEUE_LEN, RUNTIME_POOL);
+                                       switch_mutex_unlock(EVENT_QUEUE_MUTEX);
                                } else {
-                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Event threads maxed out at %d.\n", SOFT_MAX_DISPATCH);
+                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Out of threads!\n");
                                        switch_yield(1000000);
                                }
-                               switch_mutex_unlock(EVENT_QUEUE_MUTEX);
                        }
-                       
-
-                       switch_cond_next();
                }
        }
 
@@ -566,6 +568,8 @@ static void launch_dispatch_threads(uint32_t max, int len, switch_memory_pool_t
 {
        switch_threadattr_t *thd_attr;
        uint32_t index = 0;
+       int launched = 0;
+       uint32_t sanity = 200;
 
        if (max > MAX_DISPATCH) {
                return;
@@ -584,8 +588,10 @@ static void launch_dispatch_threads(uint32_t max, int len, switch_memory_pool_t
                switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
                switch_threadattr_priority_increase(thd_attr);
                switch_thread_create(&EVENT_DISPATCH_QUEUE_THREADS[index], thd_attr, switch_event_dispatch_thread, EVENT_DISPATCH_QUEUE[index], pool);
-               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Create event dispatch thread %d\n", index);
-               switch_yield(100000);
+               while(--sanity && !EVENT_DISPATCH_QUEUE_RUNNING[index]) switch_yield(10000);
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Create event dispatch thread %d\n", index);
+               launched++;
+               break;
        }
 
        SOFT_MAX_DISPATCH = index;