]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
add nitrus boost to sql thread
authorAnthony Minessale <anthm@freeswitch.org>
Tue, 14 Sep 2010 21:19:03 +0000 (16:19 -0500)
committerAnthony Minessale <anthm@freeswitch.org>
Tue, 14 Sep 2010 21:19:13 +0000 (16:19 -0500)
src/include/private/switch_core_pvt.h
src/mod/endpoints/mod_sofia/sofia.c
src/switch_core.c
src/switch_core_sqldb.c

index 50dde95dc74e556a4d4fe9cc48c6235abae8dcfb..e77d5f74efb895dfc1c6e0314100a583efb948e4 100644 (file)
@@ -233,6 +233,8 @@ struct switch_runtime {
        switch_profile_timer_t *profile_timer;
        double profile_time;
        double min_idle_time;
+       int sql_buffer_len;
+       int max_sql_buffer_len;
 };
 
 extern struct switch_runtime runtime;
index 4d9ce9eea53166074d45310c7f301f6759288afe..2ae5048b0697b68dc3a939bed51c982eca2b70ae 100644 (file)
@@ -1225,7 +1225,7 @@ static void sofia_perform_profile_start_failure(sofia_profile_t *profile, char *
 #define sofia_profile_start_failure(p, xp) sofia_perform_profile_start_failure(p, xp, __FILE__, __LINE__)
 
 
-#define SQLLEN 1024 * 32
+#define SQLLEN 1024 * 1024
 void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread, void *obj)
 {
        sofia_profile_t *profile = (sofia_profile_t *) obj;
@@ -1235,10 +1235,10 @@ void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread
        uint32_t qsize;
        void *pop;
        int loop_count = 0;
-       switch_size_t sql_len = SQLLEN;
-       char *sqlbuf = NULL;
+       switch_size_t sql_len = 1024 * 32;
+       char *tmp, *sqlbuf = NULL;
        char *sql = NULL;
-
+       
        if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) {
                sqlbuf = (char *) malloc(sql_len);
        }
@@ -1254,33 +1254,43 @@ void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread
 
        while ((mod_sofia_globals.running == 1 && sofia_test_pflag(profile, PFLAG_RUNNING)) || qsize) {
                if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) {
-                       if ((qsize > 0 && (qsize >= 1024 || ++loop_count >= profile->trans_timeout)) || sql) {
+                       if (qsize > 0 && (qsize >= 1024 || ++loop_count >= profile->trans_timeout)) {
                                switch_size_t newlen;
                                uint32_t itterations = 0;
                                switch_size_t len = 0;
 
                                switch_mutex_lock(profile->ireg_mutex);
                                
-                               //sofia_glue_actually_execute_sql(profile, "begin;\n", NULL);
-
                                while (sql || (switch_queue_trypop(profile->sql_queue, &pop) == SWITCH_STATUS_SUCCESS && pop)) {
-                                       
-                                       if (!sql) {
-                                               sql = (char *) pop;
-                                       }
-                                       
+                                       if (!sql) sql = (char *) pop;
+
                                        newlen = strlen(sql) + 2;
                                        itterations++;
-                                       
-                                       if (len + newlen + 10 < sql_len) {
-                                               sprintf(sqlbuf + len, "%s;\n", sql);
-                                               len += newlen;
-                                               switch_safe_free(sql);
-                                       } else {
-                                               break;
+
+                                       if (len + newlen + 10 > sql_len) {
+                                               int new_mlen = len + newlen + 10 + 10240;
+                                               
+                                               if (new_mlen < SQLLEN) {
+                                                       sql_len = new_mlen;
+                                                       
+                                                       if (!(tmp = realloc(sqlbuf, sql_len))) {
+                                                               abort();
+                                                               break;
+                                                       }
+                                                       sqlbuf = tmp;
+                                               } else {
+                                                       goto skip;
+                                               }
                                        }
+
+                                       sprintf(sqlbuf + len, "%s;\n", sql);
+                                       len += newlen;
+                                       free(sql);
+                                       sql = NULL;
                                }
-                               
+
+                       skip:
+
                                //printf("TRANS:\n%s\n", sqlbuf);
                                sofia_glue_actually_execute_sql_trans(profile, sqlbuf, NULL);
                                //sofia_glue_actually_execute_sql(profile, "commit;\n", NULL);
index 0a457238709d7eeed9cf709dcaa78e75e15cfb64..2c33b341de86efcb8198eae7266ee56ca7b7877b 100644 (file)
@@ -1222,7 +1222,8 @@ SWITCH_DECLARE(switch_status_t) switch_core_init(switch_core_flag_t flags, switc
        }
 
        runtime.runlevel++;
-
+       runtime.sql_buffer_len = 1024 * 32;
+       runtime.max_sql_buffer_len = 1024 * 1024;
        runtime.dummy_cng_frame.data = runtime.dummy_data;
        runtime.dummy_cng_frame.datalen = sizeof(runtime.dummy_data);
        runtime.dummy_cng_frame.buflen = sizeof(runtime.dummy_data);
@@ -1440,6 +1441,37 @@ static void switch_load_core_config(const char *file)
                                        if (tmp > -1 && tmp < 11) {
                                                switch_core_session_ctl(SCSC_DEBUG_LEVEL, &tmp);
                                        }
+                               } else if (!strcasecmp(var, "sql-buffer-len")) {
+                                       int tmp = atoi(val);
+
+                                       if (end_of(val) == 'k') {
+                                               tmp *= 1024;
+                                       } else if (end_of(val) == 'm') {
+                                               tmp *= (1024 * 1024);
+                                       }
+                                       
+                                       if (tmp >= 32000 && tmp < 10500000) {
+                                               runtime.sql_buffer_len = tmp;
+                                       } else {
+                                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "sql-buffer-len: Value is not within rage 32k to 10m\n");
+                                       }
+                               } else if (!strcasecmp(var, "max-sql-buffer-len")) {
+                                       int tmp = atoi(val);
+
+                                       if (end_of(val) == 'k') {
+                                               tmp *= 1024;
+                                       } else if (end_of(val) == 'm') {
+                                               tmp *= (1024 * 1024);
+                                       }
+
+                                       if (tmp < runtime.sql_buffer_len) {
+                                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Value is not larger than sql-buffer-len\n");
+                                       } else if (tmp >= 32000 && tmp < 10500000) {
+                                               runtime.sql_buffer_len = tmp;
+                                       } else {
+                                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "max-sql-buffer-len: Value is not within rage 32k to 10m\n");
+                                       }
+
                                } else if (!strcasecmp(var, "auto-create-schemas")) {
                                        if (switch_true(val)) {
                                                switch_set_flag((&runtime), SCF_AUTO_SCHEMAS);
index b1815b0f22a701390258ff3994ba30e4a4d912c4..dfb4134b9bac3f61817f8525024edf9e7dd7cf3a 100644 (file)
@@ -35,8 +35,6 @@
 #include <switch.h>
 #include "private/switch_core_pvt.h"
 
-#define SQLLEN 32768
-
 static struct {
        switch_cache_db_handle_t *event_db;
        switch_queue_t *sql_queue[2];
@@ -48,6 +46,8 @@ static struct {
        switch_mutex_t *io_mutex;
        switch_mutex_t *dbh_mutex;
        switch_hash_t *dbh_hash;
+       switch_thread_cond_t *cond;
+       switch_mutex_t *cond_mutex;
 } sql_manager;
 
 
@@ -539,7 +539,7 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql(switch_cache_db_hand
        switch (dbh->type) {
        default:
                {
-                       status = switch_cache_db_execute_sql_chunked(dbh, (char *) sql, SQLLEN, err);
+                       status = switch_cache_db_execute_sql_chunked(dbh, (char *) sql, 32768, err);
                }
                break;
        }
@@ -850,19 +850,18 @@ SWITCH_DECLARE(switch_bool_t) switch_cache_db_test_reactive(switch_cache_db_hand
 
 static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, void *obj)
 {
-       void *pop = NULL;
+       void *pop;
        uint32_t itterations = 0;
-       uint8_t trans = 0, nothing_in_queue = 0;
-       uint32_t target = 100000;
-       switch_size_t len = 0, sql_len = SQLLEN;
-       char *sqlbuf = (char *) malloc(sql_len);
+       uint8_t trans = 0;
+       uint32_t target = 20000;
+       switch_size_t len = 0, sql_len = runtime.sql_buffer_len;
+       char *tmp, *sqlbuf = (char *) malloc(sql_len);
        char *sql = NULL;
        switch_size_t newlen;
        int lc = 0;
        uint32_t loops = 0, sec = 0;
        uint32_t l1 = 1000;
        uint32_t sanity = 120;
-       int item_remained = 0;
 
        switch_assert(sqlbuf);
 
@@ -887,6 +886,9 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
        sql_manager.thread_running = 1;
 
        while (sql_manager.thread_running == 1) {
+
+               switch_mutex_lock(sql_manager.cond_mutex);
+
                if (++loops == l1) {
                        if (++sec == SQL_CACHE_TIMEOUT) {
                                sql_close(switch_epoch_time_now(NULL));
@@ -900,17 +902,11 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
                        continue;
                }
 
-               //printf("SIZE %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1]));
-
-               if (item_remained || switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS ||
+               if (sql || switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS ||
                        switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) {
 
-                       if (item_remained) {
-                               item_remained = 0;
-                       } else {
-                               sql = (char *) pop;
-                       }
-                       
+                       if (!sql) sql = (char *) pop;
+
                        if (sql) {
                                newlen = strlen(sql) + 2;
 
@@ -918,45 +914,59 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
                                        trans = 1;
                                }
 
-                               /* ignore abnormally large strings sql strings as potential buffer overflow */
-                               if (newlen < SQLLEN) {
-                                       itterations++;
+                               if (len + newlen > sql_len) {
+                                       int new_mlen = len + newlen + 10240;
                                        
-                                       if (len + newlen < sql_len) {
-                                               sprintf(sqlbuf + len, "%s;\n", sql);
-                                               len += newlen;
+                                       if (new_mlen < runtime.max_sql_buffer_len) {
+                                               sql_len = new_mlen;
+                                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, 
+                                                                                 "REALLOC %ld %d %d\n", sql_len, switch_queue_size(sql_manager.sql_queue[0]), 
+                                                                                 switch_queue_size(sql_manager.sql_queue[1]));
+                                               if (!(tmp = realloc(sqlbuf, sql_len))) {
+                                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread ending on mem err\n");
+                                                       abort();
+                                                       break;
+                                               }
+                                               sqlbuf = tmp;
                                        } else {
-                                               item_remained = 1;
+                                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, 
+                                                                                 "SAVE %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1]));
+                                               goto skip;
                                        }
                                }
-                               
-                               if (!item_remained) {
-                                       free(sql);
-                               }
+
+                               itterations++;                          
+                               sprintf(sqlbuf + len, "%s;\n", sql);
+                               len += newlen;
+                               free(sql);
+                               sql = NULL;
                        } else {
                                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "SQL thread ending\n");
                                break;
                        }
-               } else {
-                       nothing_in_queue = 1;
                }
 
-
-               if ((item_remained || (trans && ((itterations == target) || (nothing_in_queue && ++lc >= 500)))) &&
-                       (sql_manager.event_db->native_handle.core_db_dbh)) {
+       skip:
+               
+               lc = sql ? 1 : 0 + switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]);
+               
+               if (trans && itterations && (itterations > target || !lc)) {
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, 
+                                                         "RUN %d %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1]), itterations);
                        if (switch_cache_db_persistant_execute_trans(sql_manager.event_db, sqlbuf, 1) != SWITCH_STATUS_SUCCESS) {
                                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n");
                        }
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "DONE\n");
                        itterations = 0;
                        trans = 0;
-                       nothing_in_queue = 0;
                        len = 0;
                        *sqlbuf = '\0';
                        lc = 0;
+                       switch_yield(400000);
                }
 
-               if (nothing_in_queue) {
-                       switch_cond_next();
+               if (!lc) {
+                       switch_thread_cond_wait(sql_manager.cond, sql_manager.cond_mutex);
                }
        }
 
@@ -1389,6 +1399,7 @@ static void core_event_handler(switch_event_t *event)
                                switch_queue_push(sql_manager.sql_queue[0], sql[i]);
                        }
                        sql[i] = NULL;
+                       switch_thread_cond_broadcast(sql_manager.cond);
                }
        }
 }
@@ -1511,6 +1522,9 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_
 
        switch_mutex_init(&sql_manager.dbh_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
        switch_mutex_init(&sql_manager.io_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
+       switch_mutex_init(&sql_manager.cond_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
+
+       switch_thread_cond_create(&sql_manager.cond, sql_manager.memory_pool);
 
        switch_core_hash_init(&sql_manager.dbh_hash, sql_manager.memory_pool);