]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
fix some contention issues under really high load...That doesn't mean you need to...
authorAnthony Minessale <anthm@freeswitch.org>
Wed, 7 Nov 2012 18:10:50 +0000 (12:10 -0600)
committerAnthony Minessale <anthm@freeswitch.org>
Wed, 7 Nov 2012 18:10:50 +0000 (12:10 -0600)
src/mod/endpoints/mod_sofia/mod_sofia.h
src/mod/endpoints/mod_sofia/sofia.c
src/mod/endpoints/mod_sofia/sofia_glue.c
src/mod/endpoints/mod_sofia/sofia_presence.c
src/mod/endpoints/mod_sofia/sofia_reg.c
src/mod/event_handlers/mod_event_socket/mod_event_socket.c
src/switch_event.c

index 864d3a1e6f5bb23f4713ae56cc3e91d8f8caeeac..909516d63eff997be214ff7fe9a3778586be12a7 100644 (file)
@@ -389,6 +389,7 @@ struct mod_sofia_globals {
        int tracelevel;
        char *capture_server;   
        int rewrite_multicasted_fs_path;
+       int presence_flush;
 };
 extern struct mod_sofia_globals mod_sofia_globals;
 
@@ -694,6 +695,7 @@ struct sofia_profile {
        int ireg_seconds;
        sofia_paid_type_t paid_type;
        uint32_t rtp_digit_delay;
+       switch_queue_t *event_queue;
 };
 
 struct private_object {
@@ -1206,6 +1208,8 @@ int sofia_recover_callback(switch_core_session_t *session);
 void sofia_glue_set_name(private_object_t *tech_pvt, const char *channame);
 private_object_t *sofia_glue_new_pvt(switch_core_session_t *session);
 switch_status_t sofia_init(void);
+void sofia_glue_fire_events(sofia_profile_t *profile);
+void sofia_event_fire(sofia_profile_t *profile, switch_event_t **event);
 
 /* For Emacs:
  * Local Variables:
index 1c14c473d782c6009f90c4a5107055196fe44ff0..6fcccabf53d60b7633d2f8b438e3ab7cb2ec55aa 100644 (file)
@@ -1559,7 +1559,7 @@ void sofia_process_dispatch_event_in_thread(sofia_dispatch_event_t **dep)
 {
        sofia_dispatch_event_t *de = *dep; 
        switch_memory_pool_t *pool;
-       sofia_profile_t *profile = (*dep)->profile;
+       //sofia_profile_t *profile = (*dep)->profile;
        switch_thread_data_t *td;
 
        switch_core_new_memory_pool(&pool);
@@ -1571,44 +1571,9 @@ void sofia_process_dispatch_event_in_thread(sofia_dispatch_event_t **dep)
        td->func = sofia_msg_thread_run_once;
        td->obj = de;
 
-       switch_mutex_lock(profile->ireg_mutex);
        switch_thread_pool_launch_thread(&td);
-       switch_mutex_unlock(profile->ireg_mutex);
-}
-
-#if 0
-void sofia_process_dispatch_event_in_thread(sofia_dispatch_event_t **dep)
-{
-       sofia_dispatch_event_t *de = *dep; 
-       switch_threadattr_t *thd_attr = NULL;
-       switch_memory_pool_t *pool;
-       switch_thread_t *thread;
-       sofia_profile_t *profile = (*dep)->profile;
-       switch_status_t status;
-
-       switch_core_new_memory_pool(&pool);
-
 
-       *dep = NULL;
-       de->pool = pool;
-
-       switch_mutex_lock(profile->ireg_mutex);
-       switch_threadattr_create(&thd_attr, de->pool);
-       switch_threadattr_detach_set(thd_attr, 1);
-       switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
-       status = switch_thread_create(&thread, 
-                                                                 thd_attr, 
-                                                                 sofia_msg_thread_run_once, 
-                                                                 de,
-                                                                 de->pool);
-       switch_mutex_unlock(profile->ireg_mutex);
-       
-       if (status != SWITCH_STATUS_SUCCESS) {
-               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Cannot create threads!\n");
-               sofia_process_dispatch_event(&de);
-       }
 }
-#endif
 
 void sofia_process_dispatch_event(sofia_dispatch_event_t **dep)
 {
@@ -1992,6 +1957,7 @@ void sofia_event_callback(nua_event_t event,
        sofia_queue_message(de);
 
  end:
+       //switch_cond_next();
 
        return;
 }
@@ -2133,7 +2099,7 @@ void event_handler(switch_event_t *event)
                        contact_str = fixed_contact_str;
                }
 
-               switch_mutex_lock(profile->ireg_mutex);
+
                sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE);
 
                switch_find_local_ip(guess_ip4, sizeof(guess_ip4), NULL, AF_INET);
@@ -2150,7 +2116,7 @@ void event_handler(switch_event_t *event)
                        sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE);
                        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Propagating registration for %s@%s->%s\n", from_user, from_host, contact_str);
                }
-               switch_mutex_unlock(profile->ireg_mutex);
+
 
                if (profile) {
                        sofia_glue_release_profile(profile);
@@ -2557,6 +2523,8 @@ void *SWITCH_THREAD_FUNC sofia_profile_thread_run(switch_thread_t *thread, void
 
        switch_mutex_init(&profile->ireg_mutex, SWITCH_MUTEX_NESTED, profile->pool);
        switch_mutex_init(&profile->gateway_mutex, SWITCH_MUTEX_NESTED, profile->pool);
+       switch_queue_create(&profile->event_queue, SOFIA_QUEUE_SIZE, profile->pool);
+
 
        switch_snprintf(qname, sizeof(qname), "sofia:%s", profile->name);
        switch_sql_queue_manager_init_name(qname,
index bbaa9ae34764586728b08f68ff13bef7e12a677e..7e55407962a776bbf662013cb054ea609ac01686 100644 (file)
@@ -6498,6 +6498,9 @@ switch_bool_t sofia_glue_execute_sql_callback(sofia_profile_t *profile,
 
        switch_cache_db_release_db_handle(&dbh);
 
+
+       sofia_glue_fire_events(profile);
+
        return ret;
 }
 
@@ -7139,6 +7142,23 @@ char *sofia_glue_get_host(const char *str, switch_memory_pool_t *pool)
        return s;
 }
 
+void sofia_glue_fire_events(sofia_profile_t *profile)
+{
+       void *pop = NULL;
+
+       while (profile->event_queue && switch_queue_trypop(profile->event_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) {
+               switch_event_t *event = (switch_event_t *) pop;
+               switch_event_fire(&event);
+       }
+
+}
+
+void sofia_event_fire(sofia_profile_t *profile, switch_event_t **event)
+{
+       switch_queue_push(profile->event_queue, *event);
+       *event = NULL;
+}
+
 
 /* For Emacs:
  * Local Variables:
index 17ab6b6efa79dc02307a3c725aa2c808a4377877..ed156c5f05d6a9517400052c0713d2f939584998 100644 (file)
@@ -1027,7 +1027,7 @@ static void conference_data_event_handler(switch_event_t *event)
        switch_safe_free(dup_domain);
 }
 
-static void actual_sofia_presence_event_handler(switch_event_t *event)
+static switch_event_t *actual_sofia_presence_event_handler(switch_event_t *event)
 {
        sofia_profile_t *profile = NULL;
        char *from = switch_event_get_header(event, "from");
@@ -1047,10 +1047,10 @@ static void actual_sofia_presence_event_handler(switch_event_t *event)
        switch_console_callback_match_t *matches;
        struct presence_helper helper = { 0 };                  
        int hup = 0;
-
+       switch_event_t *s_event = NULL;
 
        if (!mod_sofia_globals.running) {
-               return;
+               goto done;
        }
 
        if (zstr(proto) || !strcasecmp(proto, "any")) {
@@ -1091,7 +1091,7 @@ static void actual_sofia_presence_event_handler(switch_event_t *event)
 
                
                                                if (!mod_sofia_globals.profile_hash) {
-                                                       return;
+                                                       goto done;
                                                }
                                                
                                                if (from) {
@@ -1171,7 +1171,7 @@ static void actual_sofia_presence_event_handler(switch_event_t *event)
                }
                
                switch_safe_free(sql);
-               return;
+               goto done;
        }
 
        if (zstr(event_type)) {
@@ -1195,7 +1195,7 @@ static void actual_sofia_presence_event_handler(switch_event_t *event)
                        }
                } else {
                        switch_safe_free(user);
-                       return;
+                       goto done;
                }
                if ((euser = strchr(user, '+'))) {
                        euser++;
@@ -1203,7 +1203,7 @@ static void actual_sofia_presence_event_handler(switch_event_t *event)
                        euser = user;
                }
        } else {
-               return;
+               goto done;
        }
 
        switch (event->event_id) {
@@ -1462,8 +1462,7 @@ static void actual_sofia_presence_event_handler(switch_event_t *event)
 
                                if (hup && dh.hits < 1) { 
                                        /* so many phones get confused when whe hangup we have to reprobe to get them all to reset to absolute states so the lights stay correct */
-                                       switch_event_t *s_event;
-
+                                       
                                        if (switch_event_create(&s_event, SWITCH_EVENT_PRESENCE_PROBE) == SWITCH_STATUS_SUCCESS) {
                                                switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "proto", SOFIA_CHAT_PROTO);
                                                switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "login", profile->name);
@@ -1471,10 +1470,9 @@ static void actual_sofia_presence_event_handler(switch_event_t *event)
                                                switch_event_add_header(s_event, SWITCH_STACK_BOTTOM, "to", "%s@%s", euser, host);
                                                switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "event_type", "presence");
                                                switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "alt_event_type", "dialog");
-                                               switch_event_fire(&s_event);
                                        }
                                }
-                       
+                               
                
                                if (!zstr((char *) helper.stream.data)) {
                                        char *this_sql = (char *) helper.stream.data;
@@ -1509,11 +1507,24 @@ static void actual_sofia_presence_event_handler(switch_event_t *event)
 
        switch_safe_free(sql);
        switch_safe_free(user);
+
+       return s_event;
 }
 
 static int EVENT_THREAD_RUNNING = 0;
 static int EVENT_THREAD_STARTED = 0;
 
+static void do_flush(void)
+{
+       void *pop = NULL;
+
+       while (mod_sofia_globals.presence_queue && switch_queue_trypop(mod_sofia_globals.presence_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) {
+               switch_event_t *event = (switch_event_t *) pop;
+               switch_event_destroy(&event);
+       }
+
+}
+
 void *SWITCH_THREAD_FUNC sofia_presence_event_thread_run(switch_thread_t *thread, void *obj)
 {
        void *pop;
@@ -1544,6 +1555,15 @@ void *SWITCH_THREAD_FUNC sofia_presence_event_thread_run(switch_thread_t *thread
                                break;
                        }
 
+                       if (mod_sofia_globals.presence_flush) {
+                               switch_mutex_lock(mod_sofia_globals.mutex);
+                               if (mod_sofia_globals.presence_flush) {
+                                       do_flush();
+                                       mod_sofia_globals.presence_flush = 0;
+                               }
+                               switch_mutex_unlock(mod_sofia_globals.mutex);
+                       }
+
                        switch(event->event_id) {
                        case SWITCH_EVENT_MESSAGE_WAITING:
                                actual_sofia_presence_mwi_event_handler(event);
@@ -1552,7 +1572,11 @@ void *SWITCH_THREAD_FUNC sofia_presence_event_thread_run(switch_thread_t *thread
                                conference_data_event_handler(event);
                                break;
                        default:
-                               actual_sofia_presence_event_handler(event);
+                               do {
+                                       switch_event_t *ievent = event;
+                                       event = actual_sofia_presence_event_handler(ievent);
+                                       switch_event_destroy(&ievent);
+                               } while (event);
                                break;
                        }
 
@@ -1561,10 +1585,7 @@ void *SWITCH_THREAD_FUNC sofia_presence_event_thread_run(switch_thread_t *thread
                }
        }
 
-       while (switch_queue_trypop(mod_sofia_globals.presence_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) {
-               switch_event_t *event = (switch_event_t *) pop;
-               switch_event_destroy(&event);
-       }
+       do_flush();
 
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Event Thread Ended\n");
 
@@ -1606,13 +1627,23 @@ void sofia_presence_event_handler(switch_event_t *event)
 {
        switch_event_t *cloned_event;
 
+       if (!EVENT_THREAD_STARTED) {
+               sofia_presence_event_thread_start();
+               switch_yield(500000);
+       }
+
        switch_event_dup(&cloned_event, event);
        switch_assert(cloned_event);
-       switch_queue_push(mod_sofia_globals.presence_queue, cloned_event);
 
-       if (!EVENT_THREAD_STARTED) {
-               sofia_presence_event_thread_start();
+       if (switch_queue_trypush(mod_sofia_globals.presence_queue, cloned_event) != SWITCH_STATUS_SUCCESS) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Presence queue overloaded.... Flushing queue\n");
+               switch_mutex_lock(mod_sofia_globals.mutex);
+               mod_sofia_globals.presence_flush = 1;
+               switch_mutex_unlock(mod_sofia_globals.mutex);
+               switch_event_destroy(&cloned_event);
        }
+
+
 }
 
 
@@ -1640,7 +1671,7 @@ static int sofia_presence_sub_reg_callback(void *pArg, int argc, char **argv, ch
                        }
 
 
-                       switch_event_fire(&event);
+                       sofia_event_fire(profile, &event);
                }
                return 0;
        }
@@ -1653,7 +1684,7 @@ static int sofia_presence_sub_reg_callback(void *pArg, int argc, char **argv, ch
                switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "event_subtype", "probe");
                switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "proto-specific-event-name", event_name);
                switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "expires", expires);
-               switch_event_fire(&event);
+               sofia_event_fire(profile, &event);
        }
 
        return 0;
@@ -1777,7 +1808,7 @@ static int sofia_presence_resub_callback(void *pArg, int argc, char **argv, char
                        }
                }
 
-               switch_event_fire(&event);
+               sofia_event_fire(profile, &event);
        }
 
        switch_safe_free(free_me);
index 3e0e3965fec110e7fd4044e1153bec17761d2db8..4efa9fabe6c53cc2d4be0b62092226f2c8ea2762 100644 (file)
@@ -635,7 +635,7 @@ int sofia_reg_del_callback(void *pArg, int argc, char **argv, char **columnNames
                        switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "contact", argv[3]);
                        switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "expires", argv[6]);
                        switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "user-agent", argv[7]);
-                       switch_event_fire(&s_event);
+                       sofia_event_fire(profile, &s_event);
                }
 
                if (switch_event_create(&s_event, SWITCH_EVENT_PRESENCE_IN) == SWITCH_STATUS_SUCCESS) {
@@ -653,7 +653,7 @@ int sofia_reg_del_callback(void *pArg, int argc, char **argv, char **columnNames
 
                        switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "status", "Unregistered");
                        switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "event_type", "presence");
-                       switch_event_fire(&s_event);
+                       sofia_event_fire(profile, &s_event);
                }
 
        }
@@ -859,7 +859,6 @@ void sofia_reg_check_sync(sofia_profile_t *profile)
        sql = switch_mprintf("delete from sip_registrations where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
        sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
 
-
        sql = switch_mprintf("delete from sip_presence where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
        sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
 
index f970a1c63e4ef4c9d56f2f4262fd60677d7fb6df..f1dc5874e83e15bbb1ccae9d75c89bc197bdbdd1 100644 (file)
@@ -175,13 +175,8 @@ static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_l
                        if (switch_queue_trypush(l->log_queue, dnode) == SWITCH_STATUS_SUCCESS) {
                                if (l->lost_logs) {
                                        int ll = l->lost_logs;
-                                       switch_event_t *event;
                                        l->lost_logs = 0;
                                        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Lost %d log lines!\n", ll);
-                                       if (switch_event_create(&event, SWITCH_EVENT_TRAP) == SWITCH_STATUS_SUCCESS) {
-                                               switch_event_add_header(event, SWITCH_STACK_BOTTOM, "info", "lost %d log lines", ll);
-                                               switch_event_fire(&event);
-                                       }
                                }
                        } else {
                                switch_log_node_free(&dnode);
@@ -378,11 +373,6 @@ static void event_handler(switch_event_t *event)
                                                int le = l->lost_events;
                                                l->lost_events = 0;
                                                switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(l->session), SWITCH_LOG_CRIT, "Lost %d events!\n", le);
-                                               clone = NULL;
-                                               if (switch_event_create(&clone, SWITCH_EVENT_TRAP) == SWITCH_STATUS_SUCCESS) {
-                                                       switch_event_add_header(clone, SWITCH_STACK_BOTTOM, "info", "lost %d events", le);
-                                                       switch_event_fire(&clone);
-                                               }
                                        }
                                } else {
                                        if (++l->lost_events > MAX_MISSED) {
index 566ccc82f836ffe13460be0c3d727e6f5a3f75f1..e589e02f43a72f27dacbf9cf19807030552a5010 100644 (file)
@@ -472,7 +472,6 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void)
 
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch queues\n");
 
-       
        for(x = 0; x < (uint32_t)DISPATCH_THREAD_COUNT; x++) {
                switch_queue_trypush(EVENT_DISPATCH_QUEUE, NULL);
        }
@@ -487,8 +486,8 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void)
        }
 
        x = 0;
-       while (x < 10000 && THREAD_COUNT) {
-               switch_cond_next();
+       while (x < 100 && THREAD_COUNT) {
+               switch_yield(100000);
                if (THREAD_COUNT == last) {
                        x++;
                }