]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
optimize sofia sql by using new core transaction processor we will no longer support...
authorAnthony Minessale <anthm@freeswitch.org>
Thu, 25 Oct 2012 16:31:42 +0000 (11:31 -0500)
committerAnthony Minessale <anthm@freeswitch.org>
Thu, 25 Oct 2012 16:31:47 +0000 (11:31 -0500)
src/include/switch_core.h
src/mod/endpoints/mod_sofia/mod_sofia.c
src/mod/endpoints/mod_sofia/mod_sofia.h
src/mod/endpoints/mod_sofia/sofia.c
src/mod/endpoints/mod_sofia/sofia_glue.c
src/mod/endpoints/mod_sofia/sofia_presence.c
src/mod/endpoints/mod_sofia/sofia_reg.c
src/switch_core_session.c
src/switch_core_sqldb.c

index a3898d9d64c8911fb185c653014a0082511fcf93..f647066c3819fa43e4bb396adeb6b29378379b9c 100644 (file)
@@ -70,6 +70,12 @@ typedef struct switch_hold_record_s {
 } switch_hold_record_t;
 
 
+typedef struct switch_thread_data_s {
+       switch_thread_start_t func;
+       void *obj;
+       int alloc;
+} switch_thread_data_t;
+
 
 #define MESSAGE_STAMP_FFL(_m) _m->_file = __FILE__; _m->_func = __SWITCH_FUNC__; _m->_line = __LINE__
 
@@ -703,6 +709,7 @@ SWITCH_DECLARE(switch_core_session_t *) switch_core_session_request_by_name(_In_
 SWITCH_DECLARE(switch_status_t) switch_core_session_thread_launch(_In_ switch_core_session_t *session);
 
 
+SWITCH_DECLARE(switch_status_t) switch_thread_pool_launch_thread(switch_thread_data_t **tdp);
 SWITCH_DECLARE(switch_status_t) switch_core_session_thread_pool_launch(switch_core_session_t *session);
 
 /*! 
@@ -2418,14 +2425,18 @@ SWITCH_DECLARE(void) switch_core_recovery_untrack(switch_core_session_t *session
 SWITCH_DECLARE(void) switch_core_recovery_track(switch_core_session_t *session);
 SWITCH_DECLARE(void) switch_core_recovery_flush(const char *technology, const char *profile_name);
 
+SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_push_confirm(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup);
 SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_push(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup);
 SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_destroy(switch_sql_queue_manager_t **qmp);
-SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_init(switch_sql_queue_manager_t **qmp, 
-                                                                                                                                        uint32_t numq, const char *dsn,
-                                                                                                                                        const char *pre_trans_execute,
-                                                                                                                                        const char *post_trans_execute,
-                                                                                                                                        const char *inner_pre_trans_execute,
-                                                                                                                                        const char *inner_post_trans_execute);
+SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_init_name(const char *name,
+                                                                                                                                                 switch_sql_queue_manager_t **qmp, 
+                                                                                                                                                 uint32_t numq, const char *dsn,
+                                                                                                                                                 const char *pre_trans_execute,
+                                                                                                                                                 const char *post_trans_execute,
+                                                                                                                                                 const char *inner_pre_trans_execute,
+                                                                                                                                                 const char *inner_post_trans_execute);
+
+#define switch_switch_sql_queue_manager_init(_q, _d, _p1, _p2, _ip1, _ip2) switch_switch_sql_queue_manager_init_name(__FILE__, _q, _d, _p1, _p2, _ip1, _ip2)
 
 SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_start(switch_sql_queue_manager_t *qm);
 SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_stop(switch_sql_queue_manager_t *qm);
index 14d501489c1d55bfa08eec2c0066822d97b85a61..898b5e49ed6caf7f16357593b4abdb9ed80df00e 100644 (file)
@@ -4900,8 +4900,7 @@ static switch_call_cause_t sofia_outgoing_channel(switch_core_session_t *session
                sql = switch_mprintf("insert into sip_dialogs (uuid,presence_id,presence_data,profile_name,hostname,rcd,call_info_state) "
                                                         "values ('%q', '%q', '%q', '%q', '%q', %ld, '')", switch_core_session_get_uuid(nsession),
                                                         switch_str_nil(presence_id), switch_str_nil(presence_data), profile->name, mod_sofia_globals.hostname, (long) now);
-               sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
-               switch_safe_free(sql);
+               sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
        }
 #endif
 
index ecb2d1c149da21bb0571a10f3eb04f774d5405f0..f01a121608efd0d0d01ec6528abc9047c2e3e753 100644 (file)
@@ -241,7 +241,7 @@ typedef enum {
        PFLAG_DISABLE_HOLD,
        PFLAG_AUTO_NAT,
        PFLAG_SIPCOMPACT,
-       PFLAG_SQL_IN_TRANS,
+       PFLAG_USE_ME,
        PFLAG_PRESENCE_PRIVACY,
        PFLAG_PASS_CALLEE_ID,
        PFLAG_LOG_AUTH_FAIL,
@@ -273,6 +273,7 @@ typedef enum {
        PFLAG_MWI_USE_REG_CALLID,
        PFLAG_FIRE_MESSAGE_EVENTS,
        PFLAG_SEND_DISPLAY_UPDATE,
+       PFLAG_RUNNING_TRANS,
        /* No new flags below this line */
        PFLAG_MAX
 } PFLAGS;
@@ -632,7 +633,7 @@ struct sofia_profile {
        char *post_trans_execute;
        char *inner_pre_trans_execute;
        char *inner_post_trans_execute; 
-       switch_queue_t *sql_queue;
+       switch_sql_queue_manager_t *qm;
        char *acl[SOFIA_MAX_ACL];
        char *acl_pass_context[SOFIA_MAX_ACL];
        char *acl_fail_context[SOFIA_MAX_ACL];
index 979e185781bc55ce14ce2b64e3a73e82bd516737..d463345f7c3c3761d021b6cf3b7a3554c1051d78 100644 (file)
@@ -1520,6 +1520,28 @@ void *SWITCH_THREAD_FUNC sofia_msg_thread_run_once(switch_thread_t *thread, void
        return NULL;
 }
 
+void sofia_process_dispatch_event_in_thread(sofia_dispatch_event_t **dep)
+{
+       sofia_dispatch_event_t *de = *dep; 
+       switch_memory_pool_t *pool;
+       sofia_profile_t *profile = (*dep)->profile;
+       switch_thread_data_t *td;
+
+       switch_core_new_memory_pool(&pool);
+
+       *dep = NULL;
+       de->pool = pool;
+
+       td = switch_core_alloc(pool, sizeof(*td));
+       td->func = sofia_msg_thread_run_once;
+       td->obj = de;
+
+       switch_mutex_lock(profile->ireg_mutex);
+       switch_thread_pool_launch_thread(&td);
+       switch_mutex_unlock(profile->ireg_mutex);
+}
+
+#if 0
 void sofia_process_dispatch_event_in_thread(sofia_dispatch_event_t **dep)
 {
        sofia_dispatch_event_t *de = *dep; 
@@ -1551,6 +1573,7 @@ void sofia_process_dispatch_event_in_thread(sofia_dispatch_event_t **dep)
                sofia_process_dispatch_event(&de);
        }
 }
+#endif
 
 void sofia_process_dispatch_event(sofia_dispatch_event_t **dep)
 {
@@ -2158,153 +2181,63 @@ void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread
        sofia_profile_t *profile = (sofia_profile_t *) obj;
        uint32_t ireg_loops = profile->ireg_seconds;                                    /* Number of loop iterations done when we haven't checked for registrations */
        uint32_t gateway_loops = GATEWAY_SECONDS;                       /* Number of loop iterations done when we haven't checked for gateways */
-       void *pop = NULL;                                       /* queue_pop placeholder */
-       switch_size_t sql_len = 1024 * 32;      /* length of sqlbuf */
-       char *tmp, *sqlbuf = NULL;                      /* Buffer for SQL statements */
-       char *sql = NULL;                                       /* Current SQL statement */
-       switch_time_t last_commit;                      /* Last time we committed stuff to the DB */
-       switch_time_t last_check;                       /* Last time we did the second-resolution loop that checks various stuff */
-       switch_size_t len = 0;                          /* Current length of sqlbuf */
-       uint32_t statements = 0;                        /* Number of statements in the current sql buffer */
-       
-       last_commit = last_check = switch_micro_time_now();
-       
-       if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) {
-               sqlbuf = (char *) malloc(sql_len);
-       }
 
        sofia_set_pflag_locked(profile, PFLAG_WORKER_RUNNING);
 
-       switch_queue_create(&profile->sql_queue, SOFIA_QUEUE_SIZE, profile->pool);
-
-       /* While we're running, or there is a pending sql statment that we haven't appended to sqlbuf yet, because of a lack of buffer space */
-       while ((mod_sofia_globals.running == 1 && sofia_test_pflag(profile, PFLAG_RUNNING)) || sql) {
-
-               if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) {
-                       /* Do we have enough statements or is the timeout expired */
-                       while (sql || (sofia_test_pflag(profile, PFLAG_RUNNING) && mod_sofia_globals.running == 1 &&
-                                               switch_micro_time_now() - last_check < 1000000 &&
-                                       (statements == 0 || (statements <= 1024 && (switch_micro_time_now() - last_commit)/1000 < profile->trans_timeout)))) {
+       while ((mod_sofia_globals.running == 1 && sofia_test_pflag(profile, PFLAG_RUNNING))) {
+               
+               if (profile->watchdog_enabled) {
+                       uint32_t event_diff = 0, step_diff = 0, event_fail = 0, step_fail = 0;
+                       
+                       if (profile->step_timeout) {
+                               step_diff = (uint32_t) ((switch_time_now() - profile->last_root_step) / 1000);
                                
-                               switch_interval_time_t sleepy_time = !statements ? 1000000 : switch_micro_time_now() - last_commit - profile->trans_timeout*1000;
-
-                               if (sleepy_time < 1000 || sleepy_time > 1000000) {
-                                       sleepy_time = 1000;
+                               if (step_diff > profile->step_timeout) {
+                                       step_fail = 1;
                                }
+                       }
+                       
+                       if (profile->event_timeout) {
+                               event_diff = (uint32_t) ((switch_time_now() - profile->last_sip_event) / 1000);
                                
-                               if (sql || (switch_queue_pop_timeout(profile->sql_queue, &pop, sleepy_time) == SWITCH_STATUS_SUCCESS && pop)) {
-                                       switch_size_t newlen;
-                                       
-                                       if (!sql) sql = (char *) pop;
-
-                                       newlen = strlen(sql) + 2 /* strlen(";\n") */ ;
-
-                                       if (len + newlen + 10 > sql_len) {
-                                               switch_size_t new_mlen = len + newlen + 10 + 10240;
-                                               
-                                               if (new_mlen < SQLLEN) {
-                                                       sql_len = new_mlen;
-                                                       
-                                                       if (!(tmp = realloc(sqlbuf, sql_len))) {
-                                                               abort();
-                                                               break;
-                                                       }
-                                                       sqlbuf = tmp;
-                                               } else {
-                                                       break;
-                                               }
-                                       }
-
-                                       sprintf(sqlbuf + len, "%s;\n", sql);
-                                       len += newlen;
-                                       free(sql);
-                                       sql = NULL;
-                                       
-                                       statements++;
+                               if (event_diff > profile->event_timeout) {
+                                       event_fail = 1;
                                }
                        }
                        
-                       /* Execute here */
-                       last_commit = switch_micro_time_now();
-                       
-                       if (len) {
-                               //printf("TRANS:\n%s\n", sqlbuf);
-                               switch_mutex_lock(profile->ireg_mutex);
-                               sofia_glue_actually_execute_sql_trans(profile, sqlbuf, NULL);
-                               //sofia_glue_actually_execute_sql(profile, "commit;\n", NULL);
-                               switch_mutex_unlock(profile->ireg_mutex);
-                               statements = 0;
-                               len = 0;
+                       if (step_fail && profile->event_timeout && !event_fail) {
+                               step_fail = 0;
                        }
-
-               } else {
-                       if (switch_queue_pop_timeout(profile->sql_queue, &pop, 1000000) == SWITCH_STATUS_SUCCESS && pop) {
-                               sofia_glue_actually_execute_sql(profile, (char *) pop, profile->ireg_mutex);
-                               free(pop);
+                       
+                       if (event_fail || step_fail) {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile %s: SIP STACK FAILURE DETECTED BY WATCHDOG!\n"
+                                                                 "GOODBYE CRUEL WORLD, I'M LEAVING YOU TODAY....GOODBYE, GOODBYE, GOOD BYE\n", profile->name);
+                               switch_yield(2000000);
+                               watchdog_triggered_abort();
                        }
                }
 
-               if (switch_micro_time_now() - last_check >= 1000000) {
-                       if (profile->watchdog_enabled) {
-                               uint32_t event_diff = 0, step_diff = 0, event_fail = 0, step_fail = 0;
-                               
-                               if (profile->step_timeout) {
-                                       step_diff = (uint32_t) ((switch_time_now() - profile->last_root_step) / 1000);
-
-                                       if (step_diff > profile->step_timeout) {
-                                               step_fail = 1;
-                                       }
-                               }
-
-                               if (profile->event_timeout) {
-                                       event_diff = (uint32_t) ((switch_time_now() - profile->last_sip_event) / 1000);
 
-                                       if (event_diff > profile->event_timeout) {
-                                               event_fail = 1;
-                                       }
-                               }
-
-                               if (step_fail && profile->event_timeout && !event_fail) {
-                                       step_fail = 0;
-                               }
-
-                               if (event_fail || step_fail) {
-                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile %s: SIP STACK FAILURE DETECTED BY WATCHDOG!\n"
-                                                                         "GOODBYE CRUEL WORLD, I'M LEAVING YOU TODAY....GOODBYE, GOODBYE, GOOD BYE\n", profile->name);
-                                       switch_yield(2000000);
-                                       watchdog_triggered_abort();
-                               }
+               if (!sofia_test_pflag(profile, PFLAG_STANDBY)) {
+                       if (++ireg_loops >= IREG_SECONDS) {
+                               time_t now = switch_epoch_time_now(NULL);
+                               sofia_reg_check_expire(profile, now, 0);
+                               ireg_loops = 0;
                        }
-
-
-                       if (!sofia_test_pflag(profile, PFLAG_STANDBY)) {
-                               if (++ireg_loops >= IREG_SECONDS) {
-                                       time_t now = switch_epoch_time_now(NULL);
-                                       sofia_reg_check_expire(profile, now, 0);
-                                       ireg_loops = 0;
-                               }
-
-                               if (++gateway_loops >= GATEWAY_SECONDS) {
-                                       sofia_reg_check_gateway(profile, switch_epoch_time_now(NULL));
-                                       gateway_loops = 0;
-                               }
                        
-                               sofia_sub_check_gateway(profile, time(NULL));
+                       if (++gateway_loops >= GATEWAY_SECONDS) {
+                               sofia_reg_check_gateway(profile, switch_epoch_time_now(NULL));
+                               gateway_loops = 0;
                        }
                        
-                       last_check = switch_micro_time_now();
+                       sofia_sub_check_gateway(profile, time(NULL));
                }
-       }
 
-       switch_mutex_lock(profile->ireg_mutex);
-       while (switch_queue_trypop(profile->sql_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) {
-               sofia_glue_actually_execute_sql(profile, (char *) pop, NULL);
-               free(pop);
+               switch_yield(1000000);
+               
        }
-       switch_mutex_unlock(profile->ireg_mutex);
 
        sofia_clear_pflag_locked(profile, PFLAG_WORKER_RUNNING);
-       switch_safe_free(sqlbuf);
 
        return NULL;
 }
@@ -2409,6 +2342,7 @@ void *SWITCH_THREAD_FUNC sofia_profile_thread_run(switch_thread_t *thread, void
        int sanity;
        switch_thread_t *worker_thread;
        switch_status_t st;
+       char qname [128] = "";
 
        switch_mutex_lock(mod_sofia_globals.mutex);
        mod_sofia_globals.threads++;
@@ -2596,6 +2530,17 @@ void *SWITCH_THREAD_FUNC sofia_profile_thread_run(switch_thread_t *thread, void
        switch_mutex_init(&profile->ireg_mutex, SWITCH_MUTEX_NESTED, profile->pool);
        switch_mutex_init(&profile->gateway_mutex, SWITCH_MUTEX_NESTED, profile->pool);
 
+       switch_snprintf(qname, sizeof(qname), "sofia:%s", profile->name);
+       switch_switch_sql_queue_manager_init_name(qname,
+                                                                                         &profile->qm,
+                                                                                         1,
+                                                                                         profile->odbc_dsn ? profile->odbc_dsn : profile->dbname,
+                                                                                         profile->pre_trans_execute,
+                                                                                         profile->post_trans_execute,
+                                                                                         profile->inner_pre_trans_execute,
+                                                                                         profile->inner_post_trans_execute);
+       switch_switch_sql_queue_manager_start(profile->qm);
+
        if (switch_event_create(&s_event, SWITCH_EVENT_PUBLISH) == SWITCH_STATUS_SUCCESS) {
                switch_event_add_header(s_event, SWITCH_STACK_BOTTOM, "service", "_sip._udp,_sip._tcp,_sip._sctp%s",
                                                                (sofia_test_pflag(profile, PFLAG_TLS)) ? ",_sips._tcp" : "");
@@ -2682,6 +2627,8 @@ void *SWITCH_THREAD_FUNC sofia_profile_thread_run(switch_thread_t *thread, void
        switch_mutex_lock(profile->flag_mutex);
        switch_mutex_unlock(profile->flag_mutex);
 
+       switch_switch_sql_queue_manager_stop(profile->qm);
+
        if (switch_event_create(&s_event, SWITCH_EVENT_UNPUBLISH) == SWITCH_STATUS_SUCCESS) {
                switch_event_add_header(s_event, SWITCH_STACK_BOTTOM, "service", "_sip._udp,_sip._tcp,_sip._sctp%s",
                                                                (sofia_test_pflag(profile, PFLAG_TLS)) ? ",_sips._tcp" : "");
@@ -4405,7 +4352,6 @@ switch_status_t config_sofia(int reload, char *profile_name)
                                sofia_set_pflag(profile, PFLAG_SEND_DISPLAY_UPDATE);
                                sofia_set_pflag(profile, PFLAG_MESSAGE_QUERY_ON_FIRST_REGISTER);
                                //sofia_set_pflag(profile, PFLAG_PRESENCE_ON_FIRST_REGISTER);           
-                               sofia_set_pflag(profile, PFLAG_SQL_IN_TRANS);
 
                                profile->shutdown_type = "false";
                                profile->local_network = "localnet.auto";
@@ -5107,20 +5053,6 @@ switch_status_t config_sofia(int reload, char *profile_name)
                                                } else {
                                                        sofia_clear_pflag(profile, PFLAG_PASS_CALLEE_ID);
                                                }
-                                       } else if (!strcasecmp(var, "sql-in-transactions")) {
-                                               int tmp = atoi(val);
-
-                                               if (switch_true(val)) {
-                                                       tmp = 500;
-                                               }
-
-                                               if (tmp > 0) {
-                                                       profile->trans_timeout = tmp;
-                                                       sofia_set_pflag(profile, PFLAG_SQL_IN_TRANS);
-                                               } else {
-                                                       sofia_clear_pflag(profile, PFLAG_SQL_IN_TRANS);
-                                               }
-
                                        } else if (!strcasecmp(var, "enable-soa")) {
                                                if (switch_true(val)) {
                                                        sofia_set_flag(profile, TFLAG_ENABLE_SOA);
@@ -6102,8 +6034,7 @@ static void sofia_handle_sip_r_invite(switch_core_session_t *session, int status
                                                                                 switch_str_nil(presence_id), switch_str_nil(presence_data), switch_str_nil(p), (long) now);
                                        switch_assert(sql);
 
-                                       sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
-                                       switch_safe_free(sql);
+                                       sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
 
                                }
                        } else if (status == 200 && (profile->pres_type)) {
@@ -9406,9 +9337,7 @@ void sofia_handle_sip_i_invite(switch_core_session_t *session, nua_t *nua, sofia
 
                switch_assert(sql);
 
-               sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
-               switch_safe_free(sql);
-
+               sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
        }
 
        if (is_nat) {
index 10ce4deba17129fb0bf50e1d24f34d96497cfa95..0117612f6ae5528664034abe30ca74a818d9c43f 100644 (file)
@@ -6269,7 +6269,8 @@ int sofia_glue_init_sql(sofia_profile_t *profile)
        };
                
        switch_cache_db_handle_t *dbh = sofia_glue_get_db_handle(profile);
-               
+       char *test2;
+
        if (!dbh) {
                return 0;
        }
@@ -6283,20 +6284,22 @@ int sofia_glue_init_sql(sofia_profile_t *profile)
 
 
        switch_cache_db_test_reactive(dbh, test_sql, "drop table sip_registrations", reg_sql);
-
-
-       if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) {
-               char *test2 = switch_mprintf("%s;%s", test_sql, test_sql);
+       
+       test2 = switch_mprintf("%s;%s", test_sql, test_sql);
                        
-               if (switch_cache_db_execute_sql(dbh, test2, NULL) != SWITCH_STATUS_SUCCESS) {
-                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "GREAT SCOTT!!! Cannot execute batched statements!\n"
-                                                         "If you are using mysql, make sure you are using MYODBC 3.51.18 or higher and enable FLAG_MULTI_STATEMENTS\n");
-                       sofia_clear_pflag(profile, PFLAG_SQL_IN_TRANS);
-
-               }
+       if (switch_cache_db_execute_sql(dbh, test2, NULL) != SWITCH_STATUS_SUCCESS) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "GREAT SCOTT!!! Cannot execute batched statements!\n"
+                                                 "If you are using mysql, make sure you are using MYODBC 3.51.18 or higher and enable FLAG_MULTI_STATEMENTS\n");
+               
+               switch_cache_db_release_db_handle(&dbh);
                free(test2);
+               free(test_sql);
+               return 0;
        }
 
+       free(test2);
+
+
        free(test_sql);
 
        test_sql = switch_mprintf("delete from sip_subscriptions where hostname='%q' and full_to='XXX'", mod_sofia_globals.hostname);
@@ -6346,45 +6349,31 @@ int sofia_glue_init_sql(sofia_profile_t *profile)
 
 void sofia_glue_execute_sql(sofia_profile_t *profile, char **sqlp, switch_bool_t sql_already_dynamic)
 {
-       switch_status_t status = SWITCH_STATUS_FALSE;
-       char *d_sql = NULL, *sql;
+       char *sql;
 
        switch_assert(sqlp && *sqlp);
-       sql = *sqlp;
+       sql = *sqlp;    
 
-       if (profile->sql_queue) {
-               if (sql_already_dynamic) {
-                       d_sql = sql;
-               } else {
-                       d_sql = strdup(sql);
-               }
-
-               switch_assert(d_sql);
-               if ((status = switch_queue_trypush(profile->sql_queue, d_sql)) == SWITCH_STATUS_SUCCESS) {
-                       d_sql = NULL;
-               }
-       } else if (sql_already_dynamic) {
-               d_sql = sql;
-       }
-
-       if (status != SWITCH_STATUS_SUCCESS) {
-               sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
-       }
-
-       switch_safe_free(d_sql);
+       switch_switch_sql_queue_manager_push(profile->qm, sql, 0, !sql_already_dynamic);
 
        if (sql_already_dynamic) {
                *sqlp = NULL;
        }
 }
 
+
 void sofia_glue_execute_sql_now(sofia_profile_t *profile, char **sqlp, switch_bool_t sql_already_dynamic)
 {
-       sofia_glue_actually_execute_sql(profile, *sqlp, profile->ireg_mutex);
+       char *sql;
+
+       switch_assert(sqlp && *sqlp);
+       sql = *sqlp;    
+
+       switch_switch_sql_queue_manager_push_confirm(profile->qm, sql, 0, !sql_already_dynamic);
+
        if (sql_already_dynamic) {
-               switch_safe_free(*sqlp);
+               *sqlp = NULL;
        }
-       *sqlp = NULL;
 }
 
 
index f03584f1c34fb136fd4d371f49dd7a16d6bdd38c..42fefe3af8ed9abd9a271d48e88888949deeed34 100644 (file)
@@ -3619,9 +3619,7 @@ void sofia_presence_handle_sip_i_subscribe(int status,
                        }
                        
                        switch_assert(sql != NULL);
-                       sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
-                       switch_safe_free(sql);
-                       
+                       sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
                        sstr = switch_mprintf("terminated;reason=noresource");
 
                } else {
@@ -4522,8 +4520,7 @@ void sofia_presence_check_subscriptions(sofia_profile_t *profile, time_t now)
                                                                  "sub del sql: %s\n", sql);            
                        }
 
-                       sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
-                       switch_safe_free(sql);
+                       sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
                }
        }
 
index ca2ff0f5c5f5e349c4a9f6a8af95815f28ae6146..2d54f31bc4ad81252c6a250adbf9b323ba0e35ae 100644 (file)
@@ -695,7 +695,7 @@ void sofia_reg_expire_call_id(sofia_profile_t *profile, const char *call_id, int
        switch_safe_free(sql);
 
        sql = switch_mprintf("delete from sip_registrations where call_id='%q' %s", call_id, sqlextra);
-       sofia_glue_execute_sql_now(profile, &sql, SWITCH_FALSE);
+       sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
 
        switch_safe_free(sqlextra);
        switch_safe_free(sql);
@@ -705,84 +705,80 @@ void sofia_reg_expire_call_id(sofia_profile_t *profile, const char *call_id, int
 
 void sofia_reg_check_expire(sofia_profile_t *profile, time_t now, int reboot)
 {
-       char sql[1024];
-
-
+       char *sql;
 
        if (now) {
-               switch_snprintf(sql, sizeof(sql), "select call_id,sip_user,sip_host,contact,status,rpid,expires"
+               sql = switch_mprintf("select call_id,sip_user,sip_host,contact,status,rpid,expires"
                                                ",user_agent,server_user,server_host,profile_name,network_ip"
                                                ",%d from sip_registrations where expires > 0 and expires <= %ld", reboot, (long) now);
        } else {
-               switch_snprintf(sql, sizeof(sql), "select call_id,sip_user,sip_host,contact,status,rpid,expires"
+               sql = switch_mprintf("select call_id,sip_user,sip_host,contact,status,rpid,expires"
                                                ",user_agent,server_user,server_host,profile_name,network_ip" ",%d from sip_registrations where expires > 0", reboot);
        }
 
        sofia_glue_execute_sql_callback(profile, profile->ireg_mutex, sql, sofia_reg_del_callback, profile);
        if (now) {
-               switch_snprintfv(sql, sizeof(sql), "delete from sip_registrations where expires > 0 and expires <= %ld and hostname='%q'",
+               sql = switch_mprintf("delete from sip_registrations where expires > 0 and expires <= %ld and hostname='%q'",
                                                (long) now, mod_sofia_globals.hostname);
        } else {
-               switch_snprintfv(sql, sizeof(sql), "delete from sip_registrations where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
+               sql = switch_mprintf("delete from sip_registrations where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
        }
-
-       sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
+       sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE);
+       
 
 
 
        if (now) {
-               switch_snprintfv(sql, sizeof(sql), "select call_id from sip_shared_appearance_dialogs where hostname='%q' "
+               sql = switch_mprintf("select call_id from sip_shared_appearance_dialogs where hostname='%q' "
                                                "and profile_name='%s' and expires <= %ld", mod_sofia_globals.hostname, profile->name, (long) now);
 
                sofia_glue_execute_sql_callback(profile, profile->ireg_mutex, sql, sofia_sla_dialog_del_callback, profile);
-               switch_snprintfv(sql, sizeof(sql), "delete from sip_shared_appearance_dialogs where expires > 0 and hostname='%q' and expires <= %ld",
+               sql = switch_mprintf("delete from sip_shared_appearance_dialogs where expires > 0 and hostname='%q' and expires <= %ld",
                                                mod_sofia_globals.hostname, (long) now);
 
 
-               sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
+               sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE);
        }
 
 
        if (now) {
-               switch_snprintfv(sql, sizeof(sql), "delete from sip_presence where expires > 0 and expires <= %ld and hostname='%q'",
+               sql = switch_mprintf("delete from sip_presence where expires > 0 and expires <= %ld and hostname='%q'",
                                                (long) now, mod_sofia_globals.hostname);
        } else {
-               switch_snprintfv(sql, sizeof(sql), "delete from sip_presence where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
+               sql = switch_mprintf("delete from sip_presence where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
        }
 
-       sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
+       sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE);
 
        if (now) {
-               switch_snprintfv(sql, sizeof(sql), "delete from sip_authentication where expires > 0 and expires <= %ld and hostname='%q'",
+               sql = switch_mprintf("delete from sip_authentication where expires > 0 and expires <= %ld and hostname='%q'",
                                                (long) now, mod_sofia_globals.hostname);
        } else {
-               switch_snprintfv(sql, sizeof(sql), "delete from sip_authentication where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
+               sql = switch_mprintf("delete from sip_authentication where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
        }
 
-       sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
-       
-       sofia_presence_check_subscriptions(profile, now);
+       sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE);
 
        if (now) {
-               switch_snprintfv(sql, sizeof(sql), "delete from sip_dialogs where (expires = -1 or (expires > 0 and expires <= %ld)) and hostname='%q'",
+               sql = switch_mprintf("delete from sip_dialogs where (expires = -1 or (expires > 0 and expires <= %ld)) and hostname='%q'",
                                                (long) now, mod_sofia_globals.hostname);
        } else {
-               switch_snprintfv(sql, sizeof(sql), "delete from sip_dialogs where expires >= -1 and hostname='%q'", mod_sofia_globals.hostname);
+               sql = switch_mprintf("delete from sip_dialogs where expires >= -1 and hostname='%q'", mod_sofia_globals.hostname);
        }
 
-       sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
+       sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE);
 
 
        if (now) {
                if (sofia_test_pflag(profile, PFLAG_ALL_REG_OPTIONS_PING)) {
-                       switch_snprintf(sql, sizeof(sql), "select call_id,sip_user,sip_host,contact,status,rpid,"
+                       sql = switch_mprintf("select call_id,sip_user,sip_host,contact,status,rpid,"
                                                        "expires,user_agent,server_user,server_host,profile_name"
  " from sip_registrations where hostname='%s' and " 
  "profile_name='%s'", mod_sofia_globals.hostname, profile->name); 
                        
                        sofia_glue_execute_sql_callback(profile, profile->ireg_mutex, sql, sofia_reg_nat_callback, profile);
                } else if (sofia_test_pflag(profile, PFLAG_NAT_OPTIONS_PING)) {
-                       switch_snprintf(sql, sizeof(sql), "select call_id,sip_user,sip_host,contact,status,rpid,"
+                       sql = switch_mprintf("select call_id,sip_user,sip_host,contact,status,rpid,"
                                                        "expires,user_agent,server_user,server_host,profile_name"
                                                        " from sip_registrations where (status like '%%NAT%%' "
  "or contact like '%%fs_nat=yes%%') and hostname='%s' " 
@@ -846,37 +842,36 @@ void sofia_reg_check_call_id(sofia_profile_t *profile, const char *call_id)
 
 void sofia_reg_check_sync(sofia_profile_t *profile)
 {
-       char sql[1024];
-
+       char *sql;
 
-       switch_snprintf(sql, sizeof(sql), "select call_id,sip_user,sip_host,contact,status,rpid,expires"
+       sql = switch_mprintf("select call_id,sip_user,sip_host,contact,status,rpid,expires"
                                        ",user_agent,server_user,server_host,profile_name,network_ip" 
                                        " from sip_registrations where expires > 0");
 
 
        sofia_glue_execute_sql_callback(profile, profile->ireg_mutex, sql, sofia_reg_del_callback, profile);
-       switch_snprintfv(sql, sizeof(sql), "delete from sip_registrations where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
-       sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
+       sql = switch_mprintf("delete from sip_registrations where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
+       sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
 
 
-       switch_snprintfv(sql, sizeof(sql), "delete from sip_presence where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
-       sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
+       sql = switch_mprintf("delete from sip_presence where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
+       sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
 
-       switch_snprintfv(sql, sizeof(sql), "delete from sip_authentication where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
-       sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
+       sql = switch_mprintf("delete from sip_authentication where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
+       sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
        
-       switch_snprintfv(sql, sizeof(sql), "delete from sip_subscriptions where expires >= -1 and hostname='%q'", mod_sofia_globals.hostname);
-       sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
+       sql = switch_mprintf("delete from sip_subscriptions where expires >= -1 and hostname='%q'", mod_sofia_globals.hostname);
+       sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
 
-       switch_snprintfv(sql, sizeof(sql), "delete from sip_dialogs where expires >= -1 and hostname='%q'", mod_sofia_globals.hostname);
-       sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
+       sql = switch_mprintf("delete from sip_dialogs where expires >= -1 and hostname='%q'", mod_sofia_globals.hostname);
+       sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
 
 }
 
 char *sofia_reg_find_reg_url(sofia_profile_t *profile, const char *user, const char *host, char *val, switch_size_t len)
 {
        struct callback_t cbt = { 0 };
-       char sql[512] = "";
+       char *sql;
 
        if (!user) {
                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Called with null user!\n");
@@ -887,10 +882,10 @@ char *sofia_reg_find_reg_url(sofia_profile_t *profile, const char *user, const c
        cbt.len = len;
 
        if (host) {
-               switch_snprintfv(sql, sizeof(sql), "select contact from sip_registrations where sip_user='%q' and (sip_host='%q' or presence_hosts like '%%%q%%')",
+               sql = switch_mprintf("select contact from sip_registrations where sip_user='%q' and (sip_host='%q' or presence_hosts like '%%%q%%')",
                                                user, host, host);
        } else {
-               switch_snprintfv(sql, sizeof(sql), "select contact from sip_registrations where sip_user='%q'", user);
+               sql = switch_mprintf("select contact from sip_registrations where sip_user='%q'", user);
        }
 
 
@@ -908,7 +903,7 @@ char *sofia_reg_find_reg_url(sofia_profile_t *profile, const char *user, const c
 switch_console_callback_match_t *sofia_reg_find_reg_url_multi(sofia_profile_t *profile, const char *user, const char *host)
 {
        struct callback_t cbt = { 0 };
-       char sql[512] = "";
+       char *sql;
 
        if (!user) {
                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Called with null user!\n");
@@ -916,10 +911,10 @@ switch_console_callback_match_t *sofia_reg_find_reg_url_multi(sofia_profile_t *p
        }
 
        if (host) {
-               switch_snprintfv(sql, sizeof(sql), "select contact from sip_registrations where sip_user='%q' and (sip_host='%q' or presence_hosts like '%%%q%%')",
+               sql = switch_mprintf("select contact from sip_registrations where sip_user='%q' and (sip_host='%q' or presence_hosts like '%%%q%%')",
                                                user, host, host);
        } else {
-               switch_snprintfv(sql, sizeof(sql), "select contact from sip_registrations where sip_user='%q'", user);
+               sql = switch_mprintf("select contact from sip_registrations where sip_user='%q'", user);
        }
 
 
@@ -932,7 +927,7 @@ switch_console_callback_match_t *sofia_reg_find_reg_url_multi(sofia_profile_t *p
 switch_console_callback_match_t *sofia_reg_find_reg_url_with_positive_expires_multi(sofia_profile_t *profile, const char *user, const char *host)
 {
        struct callback_t cbt = { 0 };
-       char sql[512] = "";
+       char *sql;
 
        if (!user) {
                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Called with null user!\n");
@@ -940,10 +935,10 @@ switch_console_callback_match_t *sofia_reg_find_reg_url_with_positive_expires_mu
        }
 
        if (host) {
-               switch_snprintfv(sql, sizeof(sql), "select contact,expires from sip_registrations where sip_user='%q' and (sip_host='%q' or presence_hosts like '%%%q%%')",
+               sql = switch_mprintf("select contact,expires from sip_registrations where sip_user='%q' and (sip_host='%q' or presence_hosts like '%%%q%%')",
                                                user, host, host);
        } else {
-               switch_snprintfv(sql, sizeof(sql), "select contact,expires from sip_registrations where sip_user='%q'", user);
+               sql = switch_mprintf("select contact,expires from sip_registrations where sip_user='%q'", user);
        }
 
        sofia_glue_execute_sql_callback(profile, profile->ireg_mutex, sql, sofia_reg_find_reg_with_positive_expires_callback, &cbt);
@@ -973,8 +968,7 @@ void sofia_reg_auth_challenge(sofia_profile_t *profile, nua_handle_t *nh, sofia_
                                                 (long) switch_epoch_time_now(NULL) + (profile->nonce_ttl ? profile->nonce_ttl : DEFAULT_NONCE_TTL),
                                                 profile->name, mod_sofia_globals.hostname);
        switch_assert(sql != NULL);
-       sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
-       switch_safe_free(sql);
+       sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
 
        auth_str = switch_mprintf("Digest realm=\"%q\", nonce=\"%q\",%s algorithm=MD5, qop=\"auth\"", realm, uuid_str, stale ? " stale=true," : "");
 
@@ -2802,8 +2796,7 @@ auth_res_t sofia_reg_parse_auth(sofia_profile_t *profile,
                                                         switch_epoch_time_now(NULL) + (profile->nonce_ttl ? profile->nonce_ttl : exptime + 10), ncl, nonce);
 
                switch_assert(sql != NULL);
-               sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
-               switch_safe_free(sql);
+               sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
 
                if (ret == AUTH_OK)
                        ret = AUTH_RENEWED;
index 4c16fadc5193e91a91d3c03662d72c6f41db9b8a..4148267da6245faa19f0f162b0e099e1a34468e8 100644 (file)
@@ -1530,25 +1530,25 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_worker(switch_th
                }
 
                if (check_status == SWITCH_STATUS_SUCCESS) {
-                       switch_core_session_t *session = (switch_core_session_t *) pop;
-                       switch_size_t id;
+                       switch_thread_data_t *td = (switch_thread_data_t *) pop;
                        
-                       if (!session) break;
+                       if (!td) break;
 
-                       id = session->id;
-                       
                        switch_mutex_lock(session_manager.mutex);
                        session_manager.busy++;
                        switch_mutex_unlock(session_manager.mutex);
                        
-                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Processing session %"SWITCH_SIZE_T_FMT" %s\n",
-                                                         (long) thread, id, switch_core_session_get_name(session));
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Processing\n", (long) thread);
 
-                       switch_core_session_thread(thread, (void *) session);
-                       
-                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Done Processing session %"SWITCH_SIZE_T_FMT"\n",
-                                                         (long) thread, id);
 
+                       td->func(thread, td->obj);
+
+                       if (td->alloc) {
+                               free(td);
+                       }
+                       
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Done Processing\n", (long) thread);
+                       
                        switch_mutex_lock(session_manager.mutex);
                        session_manager.busy--;
                        switch_mutex_unlock(session_manager.mutex);
@@ -1656,11 +1656,27 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_manager(switch_t
        return NULL;
 }
 
+SWITCH_DECLARE(switch_status_t) switch_thread_pool_launch_thread(switch_thread_data_t **tdp)
+{
+       switch_status_t status = SWITCH_STATUS_SUCCESS;
+       switch_thread_data_t *td;
+
+       switch_assert(tdp);
+
+       td = *tdp;
+       *tdp = NULL;
+
+       switch_queue_push(session_manager.thread_queue, td);
+       check_queue();
+
+       return status;  
+}
 
 SWITCH_DECLARE(switch_status_t) switch_core_session_thread_pool_launch(switch_core_session_t *session)
 {
        switch_status_t status = SWITCH_STATUS_INUSE;
-       
+       switch_thread_data_t *td;
+
        switch_mutex_lock(session->mutex);
        if (switch_test_flag(session, SSF_THREAD_RUNNING)) {
                switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_CRIT, "Cannot double-launch thread!\n");
@@ -1670,7 +1686,10 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_thread_pool_launch(switch_co
                status = SWITCH_STATUS_SUCCESS;
                switch_set_flag(session, SSF_THREAD_RUNNING);
                switch_set_flag(session, SSF_THREAD_STARTED);
-               switch_queue_push(session_manager.thread_queue, session);
+               td = switch_core_session_alloc(session, sizeof(*td));
+               td->obj = session;
+               td->func = switch_core_session_thread;
+               switch_queue_push(session_manager.thread_queue, td);
                check_queue();
        }
        switch_mutex_unlock(session->mutex);
index 80ab68c110a6a2417f88ad265ce9aec379e44406..2ebfd5b464e710e402859c33cefabb709e22da80 100644 (file)
@@ -1214,14 +1214,19 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_db_thread(switch_thread_t *threa
 static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, void *obj);
 
 struct switch_sql_queue_manager {
+       const char *name;
        switch_cache_db_handle_t *event_db;
        switch_queue_t **sql_queue;
+       uint32_t *pre_written;
+       uint32_t *written;
+       int *sizes;
        uint32_t numq;
        char *dsn;
        switch_thread_t *thread;
        int thread_running;
        switch_thread_cond_t *cond;
        switch_mutex_t *cond_mutex;
+       switch_mutex_t *mutex;
        char *pre_trans_execute;
        char *post_trans_execute;
        char *inner_pre_trans_execute;
@@ -1229,12 +1234,15 @@ struct switch_sql_queue_manager {
        switch_memory_pool_t *pool;
 };
 
-static void qm_wake(switch_sql_queue_manager_t *qm)
+static int qm_wake(switch_sql_queue_manager_t *qm)
 {
        if (switch_mutex_trylock(qm->cond_mutex) == SWITCH_STATUS_SUCCESS) {
                switch_thread_cond_signal(qm->cond);
                switch_mutex_unlock(qm->cond_mutex);
+               return 1;
        }
+
+       return 0;
 }
 
 static uint32_t qm_ttl(switch_sql_queue_manager_t *qm)
@@ -1335,15 +1343,59 @@ SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_push(switch_sql_
 }
 
 
+SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_push_confirm(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup)
+{
+       int want, size, x = 0, sanity = 0;
+       uint32_t written;
+
+       if (!qm->thread_running) {
+               return SWITCH_STATUS_FALSE;
+       }
+
+       if (sql_manager.thread_running != 1) {
+               return SWITCH_STATUS_FALSE;
+       }
+       
+       if (pos > qm->numq - 1) {
+               pos = 0;
+       }
+
+       switch_queue_push(qm->sql_queue[pos], dup ? strdup(sql) : (char *)sql);
+
+       switch_mutex_lock(qm->mutex);
+       written = qm->written[pos];
+       size = qm->sizes[pos];
+       want = written + size;
+       switch_mutex_unlock(qm->mutex);
+
+       qm_wake(qm);
+
+       while((qm->written[pos] < want) || (qm->written[pos] >= written && want < written && qm->written[pos] > want)) {
+               switch_yield(5000);
+
+               if (++x == 200) {
+                       qm_wake(qm);
+                       x = 0;
+                       if (++sanity == 20) {
+                               break;
+                       }
+               }
+       }
+
+       return SWITCH_STATUS_SUCCESS;
+}
+
+
 
 
 
-SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_init(switch_sql_queue_manager_t **qmp, 
-                                                                                                                                        uint32_t numq, const char *dsn,
-                                                                                                                                        const char *pre_trans_execute,
-                                                                                                                                        const char *post_trans_execute,
-                                                                                                                                        const char *inner_pre_trans_execute,
-                                                                                                                                        const char *inner_post_trans_execute)
+SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_init_name(const char *name,
+                                                                                                                                                 switch_sql_queue_manager_t **qmp, 
+                                                                                                                                                 uint32_t numq, const char *dsn,
+                                                                                                                                                 const char *pre_trans_execute,
+                                                                                                                                                 const char *post_trans_execute,
+                                                                                                                                                 const char *inner_pre_trans_execute,
+                                                                                                                                                 const char *inner_post_trans_execute)
 {
        switch_memory_pool_t *pool;
        switch_sql_queue_manager_t *qm;
@@ -1357,11 +1409,16 @@ SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_init(switch_sql_
        qm->pool = pool;
        qm->numq = numq;
        qm->dsn = switch_core_strdup(qm->pool, dsn);
+       qm->name = switch_core_strdup(qm->pool, name);
 
        switch_mutex_init(&qm->cond_mutex, SWITCH_MUTEX_NESTED, qm->pool);
+       switch_mutex_init(&qm->mutex, SWITCH_MUTEX_NESTED, qm->pool);
        switch_thread_cond_create(&qm->cond, qm->pool);
        
        qm->sql_queue = switch_core_alloc(qm->pool, sizeof(switch_queue_t *) * numq);
+       qm->sizes = switch_core_alloc(qm->pool, sizeof(int) * numq);
+       qm->written = switch_core_alloc(qm->pool, sizeof(uint32_t) * numq);
+       qm->pre_written = switch_core_alloc(qm->pool, sizeof(uint32_t) * numq);
 
        for (i = 0; i < qm->numq; i++) {
                switch_queue_create(&qm->sql_queue[i], SWITCH_SQL_QUEUE_LEN, qm->pool);
@@ -1400,13 +1457,13 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
        while (!qm->event_db) {
                if (switch_cache_db_get_db_handle_dsn(&qm->event_db, qm->dsn) == SWITCH_STATUS_SUCCESS && qm->event_db)
                        break;
-               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Error getting core db, Retrying\n");
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s Error getting db handle, Retrying\n", qm->name);
                switch_yield(500000);
                sanity--;
        }
 
        if (!qm->event_db) {
-               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Error getting core db\n");
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s Error getting db handle\n", qm->name);
                return NULL;
        }
 
@@ -1431,14 +1488,19 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
 
        while (qm->thread_running == 1) {
                int proceed = !!save_sql;
+               int pindex = -1;
 
                if (!proceed) {
                        for (i = 0; i < qm->numq; i++) {
                                if (switch_queue_trypop(qm->sql_queue[i], &pop) == SWITCH_STATUS_SUCCESS) {
                                        if (sql_manager.thread_running != 1) {
-                                               free(pop);
-                                               pop = NULL;
+                                               if (pop) {
+                                                       switch_cache_db_execute_sql(qm->event_db, (char *) pop, NULL);
+                                                       free(pop);
+                                                       pop = NULL;
+                                               }
                                        } else {
+                                               pindex = i;
                                                proceed = 1;
                                                break;
                                        }
@@ -1470,7 +1532,8 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
                                                if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
                                                                                                                for (i = 0; i < qm->numq; i++) {
                                                                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, 
-                                                                                                 "REALLOC QUEUE %ld %d %d\n", 
+                                                                                                 "%s REALLOC QUEUE %ld %d %d\n", 
+                                                                                                 qm->name,
                                                                                                  (long int)sql_len,
                                                                                                  i,
                                                                                                  switch_queue_size(qm->sql_queue[i]));
@@ -1478,7 +1541,7 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
                                                        }
                                                }
                                                if (!(tmp = realloc(sqlbuf, sql_len))) {
-                                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread ending on mem err\n");
+                                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s SQL thread ending on mem err\n", qm->name);
                                                        abort();
                                                        break;
                                                }
@@ -1487,7 +1550,8 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
                                                if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
                                                        for (i = 0; i < qm->numq; i++) {
                                                                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, 
-                                                                                                 "SAVE QUEUE %d %d\n", 
+                                                                                                 "%s SAVE QUEUE %d %d\n", 
+                                                                                                 qm->name,
                                                                                                  i,
                                                                                                  switch_queue_size(qm->sql_queue[i]));
                                                                
@@ -1499,6 +1563,10 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
                                                goto skip;
                                        }
                                }
+                               
+                               switch_mutex_lock(qm->mutex);
+                               qm->pre_written[pindex]++;
+                               switch_mutex_unlock(qm->mutex);
 
                                iterations++;                           
                                sprintf(sqlbuf + len, "%s;\n", sql);
@@ -1506,7 +1574,7 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
                                free(sql);
                                sql = NULL;
                        } else {
-                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "SQL thread ending\n");
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s, SQL thread ending\n", qm->name);
                                break;
                        }
                }
@@ -1519,14 +1587,14 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
                                auto_pause = 1;
                                switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause);
                                auto_pause = 1;
-                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL Queue overflowing [%d], Pausing calls.\n", lc);
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s, SQL Queue overflowing [%d], Pausing calls.\n", qm->name, lc);
                        }
                } else {
                        if (auto_pause && lc < 1000) {
                                auto_pause = 0;
                                switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause);
                                auto_pause = 0;
-                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL Queue back to normal size, resuming..\n");
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s, SQL Queue back to normal size, resuming..\n", qm->name);
                        }
                }
        
@@ -1535,14 +1603,23 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
                wrote = 0;
 
                if (trans && iterations && (iterations > target || !lc)) {
+
                        if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
+                               char line[128] = "";
+                               int l;
+                               
+                               switch_snprintf(line, sizeof(line), "%s RUN QUEUE ", qm->name);
+                               
                                for (i = 0; i < qm->numq; i++) {
-                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, 
-                                                                         "RUN QUEUE %d %d %d\n", 
-                                                                         i,
-                                                                         switch_queue_size(qm->sql_queue[i]), 
-                                                                         iterations);
+                                       l = strlen(line);
+                                       switch_snprintf(line + l, sizeof(line) - l, "%d:%d ", i, switch_queue_size(qm->sql_queue[i]));
                                }
+                               
+                               l = strlen(line);
+                               switch_snprintf(line + l, sizeof(line) - l, "%d\n", iterations);
+
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s", line);
+
                        }
                        if (switch_cache_db_persistant_execute_trans_full(qm->event_db, sqlbuf, 1,
                                                                                                                          qm->pre_trans_execute,
@@ -1550,13 +1627,12 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
                                                                                                                          qm->inner_pre_trans_execute,
                                                                                                                          qm->inner_post_trans_execute
                                                                                                                          ) != SWITCH_STATUS_SUCCESS) {
-                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n");
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s SQL thread unable to commit transaction, records lost!\n", qm->name);
                        }
                        if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) { 
-                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "DONE\n");
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s DONE\n", qm->name);
                        }
 
-
                        iterations = 0;
                        trans = 0;
                        len = 0;
@@ -1572,6 +1648,14 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
 
                lc = qm_ttl(qm);
                
+               switch_mutex_lock(qm->mutex);
+               for (i = 0; i < qm->numq; i++) {
+                       qm->sizes[i] = switch_queue_size(qm->sql_queue[i]);
+                       qm->written[i] += qm->pre_written[i];
+                       qm->pre_written[i] = 0;
+               }
+               switch_mutex_unlock(qm->mutex);
+               
                if (!lc) {
                        switch_thread_cond_wait(qm->cond, qm->cond_mutex);
                } else if (wrote) {
@@ -1587,7 +1671,10 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
 
        for(i = 0; i < qm->numq; i++) {
                while (switch_queue_trypop(qm->sql_queue[i], &pop) == SWITCH_STATUS_SUCCESS) {
-                       switch_safe_free(pop);
+                       if (pop) {
+                               switch_cache_db_execute_sql(qm->event_db, (char *) pop, NULL);
+                               free(pop);
+                       }
                }
        }