]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
fold the last round of sql manager changes back into the core and use it for the...
authorAnthony Minessale <anthm@freeswitch.org>
Sat, 27 Oct 2012 01:51:51 +0000 (20:51 -0500)
committerAnthony Minessale <anthm@freeswitch.org>
Sat, 27 Oct 2012 01:52:09 +0000 (20:52 -0500)
src/include/private/switch_core_pvt.h
src/include/switch_core.h
src/switch_core.c
src/switch_core_sqldb.c

index b7d5f758684eb6044f12c78662dd1694a0de66a0..40f51ba61a01318777832d03c6b98a69fe542de2 100644 (file)
@@ -256,8 +256,6 @@ struct switch_runtime {
        switch_profile_timer_t *profile_timer;
        double profile_time;
        double min_idle_time;
-       int sql_buffer_len;
-       int max_sql_buffer_len;
        switch_dbtype_t odbc_dbtype;
        char hostname[256];
        char *switchname;
index 69a9b07576c1f2283512c77cfccec67b7f324926..73a8fc8f17e1aeb79271121439c833db5ebefea1 100644 (file)
@@ -2205,8 +2205,9 @@ SWITCH_DECLARE(switch_status_t) switch_core_chat_send(const char *dest_proto, sw
 SWITCH_DECLARE(switch_status_t) switch_core_chat_deliver(const char *dest_proto, switch_event_t **message_event);
 
 SWITCH_DECLARE(switch_status_t) switch_ivr_preprocess_session(switch_core_session_t *session, const char *cmds);
-SWITCH_DECLARE(void) switch_core_sqldb_stop_thread(void);
-SWITCH_DECLARE(void) switch_core_sqldb_start_thread(void);
+SWITCH_DECLARE(void) switch_core_sqldb_pause(void);
+SWITCH_DECLARE(void) switch_core_sqldb_resume(void);
+
 
 ///\}
 
index 431ed8d3a192e2be5f6e637ad577610cb99b6723..73d96b2869cdfd11485a888d1f7f5b5289c1c8c3 100644 (file)
@@ -1470,11 +1470,10 @@ SWITCH_DECLARE(switch_status_t) switch_core_init(switch_core_flag_t flags, switc
        runtime.db_handle_timeout = 5000000;
        
        runtime.runlevel++;
-       runtime.sql_buffer_len = 1024 * 32;
-       runtime.max_sql_buffer_len = 1024 * 1024;
        runtime.dummy_cng_frame.data = runtime.dummy_data;
        runtime.dummy_cng_frame.datalen = sizeof(runtime.dummy_data);
        runtime.dummy_cng_frame.buflen = sizeof(runtime.dummy_data);
+       runtime.dbname = "core";
        switch_set_flag((&runtime.dummy_cng_frame), SFF_CNG);
        switch_set_flag((&runtime), SCF_AUTO_SCHEMAS);
        switch_set_flag((&runtime), SCF_CLEAR_SQL);
@@ -1754,37 +1753,6 @@ static void switch_load_core_config(const char *file)
                                        
                                } else if (!strcasecmp(var, "multiple-registrations")) {
                                        runtime.multiple_registrations = switch_true(val);
-                               } else if (!strcasecmp(var, "sql-buffer-len")) {
-                                       int tmp = atoi(val);
-
-                                       if (end_of(val) == 'k') {
-                                               tmp *= 1024;
-                                       } else if (end_of(val) == 'm') {
-                                               tmp *= (1024 * 1024);
-                                       }
-
-                                       if (tmp >= 32000 && tmp < 10500000) {
-                                               runtime.sql_buffer_len = tmp;
-                                       } else {
-                                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "sql-buffer-len: Value is not within rage 32k to 10m\n");
-                                       }
-                               } else if (!strcasecmp(var, "max-sql-buffer-len")) {
-                                       int tmp = atoi(val);
-
-                                       if (end_of(val) == 'k') {
-                                               tmp *= 1024;
-                                       } else if (end_of(val) == 'm') {
-                                               tmp *= (1024 * 1024);
-                                       }
-
-                                       if (tmp < runtime.sql_buffer_len) {
-                                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Value is not larger than sql-buffer-len\n");
-                                       } else if (tmp >= 32000 && tmp < 10500000) {
-                                               runtime.max_sql_buffer_len = tmp;
-                                       } else {
-                                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "max-sql-buffer-len: Value is not within rage 32k to 10m\n");
-                                       }
-
                                } else if (!strcasecmp(var, "auto-create-schemas")) {
                                        if (switch_true(val)) {
                                                switch_set_flag((&runtime), SCF_AUTO_SCHEMAS);
@@ -2256,9 +2224,9 @@ SWITCH_DECLARE(int32_t) switch_core_session_ctl(switch_session_ctl_t cmd, void *
                break;
        case SCSC_SQL:
                if (oldintval) {
-                       switch_core_sqldb_start_thread();
+                       switch_core_sqldb_resume();
                } else {
-                       switch_core_sqldb_stop_thread();
+                       switch_core_sqldb_pause();
                }
                break;
        case SCSC_PAUSE_ALL:
index 45ea99ba290674dbbcfa2b9849ac3f0022955f4e..6e449b6f7b0bebc0fb51b172a7b4ebf520e507a5 100644 (file)
@@ -56,26 +56,25 @@ struct switch_cache_db_handle {
 };
 
 static struct {
-       switch_cache_db_handle_t *event_db;
-       switch_queue_t *sql_queue[4];
        switch_memory_pool_t *memory_pool;
-       switch_thread_t *thread;
        switch_thread_t *db_thread;
-       int thread_running;
        int db_thread_running;
        switch_bool_t manage;
        switch_mutex_t *io_mutex;
        switch_mutex_t *dbh_mutex;
        switch_mutex_t *ctl_mutex;
        switch_cache_db_handle_t *handle_pool;
-       switch_thread_cond_t *cond;
-       switch_mutex_t *cond_mutex;
        uint32_t total_handles;
        uint32_t total_used_handles;
        switch_cache_db_handle_t *dbh;
+       switch_sql_queue_manager_t *qm;
+       int paused;
 } sql_manager;
 
 
+static void switch_core_sqldb_start_thread(void);
+static void switch_core_sqldb_stop_thread(void);
+
 static switch_cache_db_handle_t *create_handle(switch_cache_db_handle_type_t type)
 {
        switch_cache_db_handle_t *new_dbh = NULL;
@@ -575,19 +574,6 @@ static switch_status_t switch_cache_db_execute_sql_real(switch_cache_db_handle_t
        return status;
 }
 
-static void wake_thread(int force)
-{
-       if (force) {
-               switch_thread_cond_signal(sql_manager.cond);
-               return;
-       }
-
-       if (switch_mutex_trylock(sql_manager.cond_mutex) == SWITCH_STATUS_SUCCESS) {
-               switch_thread_cond_signal(sql_manager.cond);
-               switch_mutex_unlock(sql_manager.cond_mutex);
-       }
-}
-
 /**
    OMFG you cruel bastards.  Who chooses 64k as a max buffer len for a sql statement, have you ever heard of transactions?
 **/
@@ -1195,7 +1181,6 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_db_thread(switch_thread_t *threa
        while (sql_manager.db_thread_running == 1) {
                if (++sec == SQL_CACHE_TIMEOUT) {
                        sql_close(switch_epoch_time_now(NULL));         
-                       wake_thread(0);
                        sec = 0;
                }
 
@@ -1217,7 +1202,6 @@ struct switch_sql_queue_manager {
        const char *name;
        switch_cache_db_handle_t *event_db;
        switch_queue_t **sql_queue;
-       uint32_t *pre_written;
        uint32_t *written;
        uint32_t numq;
        char *dsn;
@@ -1281,6 +1265,7 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_stop(switch_sql_queue_m
        }
 
        if (qm->thread) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s Stopping SQL thread.\n", qm->name);
                switch_thread_join(&status, qm->thread);
                qm->thread = NULL;
                status = SWITCH_STATUS_SUCCESS;
@@ -1294,7 +1279,7 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_start(switch_sql_queue_
        switch_threadattr_t *thd_attr;
 
        if (!qm->thread_running) {
-               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Starting SQL thread.\n");
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s Starting SQL thread.\n", qm->name);
                switch_threadattr_create(&thd_attr, qm->pool);
                switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
                switch_threadattr_priority_set(thd_attr, SWITCH_PRI_NORMAL);
@@ -1306,24 +1291,40 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_start(switch_sql_queue_
 }
 
 
+static void do_flush(switch_queue_t *q, switch_cache_db_handle_t *dbh)
+{
+       void *pop = NULL;
+       
+       while (switch_queue_trypop(q, &pop) == SWITCH_STATUS_SUCCESS) {
+               if (pop) {
+                       if (dbh) {
+                               switch_cache_db_execute_sql(dbh, (char *) pop, NULL);
+                       }
+                       free(pop);
+               }
+       }
+
+}
+
 SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_destroy(switch_sql_queue_manager_t **qmp)
 {
        switch_sql_queue_manager_t *qm;
        switch_status_t status = SWITCH_STATUS_SUCCESS;
        switch_memory_pool_t *pool;
-       void *pop;
        uint32_t i;
 
        switch_assert(qmp);
        qm = *qmp;
        *qmp = NULL;
 
+       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s Destroying SQL queue.\n", qm->name);
+
        switch_sql_queue_manager_stop(qm);
 
+
+
        for(i = 0; i < qm->numq; i++) {
-               while (switch_queue_trypop(qm->sql_queue[i], &pop) == SWITCH_STATUS_SUCCESS) {
-                       switch_safe_free(pop);
-               }
+               do_flush(qm->sql_queue[i], NULL);
        }
 
        pool = qm->pool;
@@ -1335,14 +1336,17 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_destroy(switch_sql_queu
 SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup)
 {
 
-       if (!qm->thread_running) {
-               return SWITCH_STATUS_FALSE;
+       if (sql_manager.paused) {
+               if (!dup) free((char *)sql);
+               qm_wake(qm);
+               return SWITCH_STATUS_SUCCESS;
        }
 
-       if (sql_manager.thread_running != 1) {
+       if (!qm->thread_running) {
+               if (!dup) free((char *)sql);
                return SWITCH_STATUS_FALSE;
        }
-       
+
        if (pos > qm->numq - 1) {
                pos = 0;
        }
@@ -1362,14 +1366,17 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push_confirm(switch_sql
        int size, x = 0, sanity = 0;
        uint32_t written, want;
 
-       if (!qm->thread_running) {
-               return SWITCH_STATUS_FALSE;
+       if (sql_manager.paused) {
+               if (!dup) free((char *)sql);
+               qm_wake(qm);
+               return SWITCH_STATUS_SUCCESS;
        }
 
-       if (sql_manager.thread_running != 1) {
+       if (!qm->thread_running) {
+               if (!dup) free((char *)sql);
                return SWITCH_STATUS_FALSE;
        }
-       
+
        if (pos > qm->numq - 1) {
                pos = 0;
        }
@@ -1430,7 +1437,6 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_init_name(const char *n
        
        qm->sql_queue = switch_core_alloc(qm->pool, sizeof(switch_queue_t *) * numq);
        qm->written = switch_core_alloc(qm->pool, sizeof(uint32_t) * numq);
-       qm->pre_written = switch_core_alloc(qm->pool, sizeof(uint32_t) * numq);
 
        for (i = 0; i < qm->numq; i++) {
                switch_queue_create(&qm->sql_queue[i], SWITCH_SQL_QUEUE_LEN, qm->pool);
@@ -1448,479 +1454,213 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_init_name(const char *n
        return SWITCH_STATUS_SUCCESS;
 
 }
-static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, void *obj)
-{
-       void *pop = NULL;
-       uint32_t iterations = 0;
-       uint8_t trans = 0;
-       uint32_t target = 20000;
-       switch_size_t len = 0, sql_len = runtime.sql_buffer_len;
-       char *tmp, *sqlbuf = (char *) malloc(sql_len);
-       char *sql = NULL, *save_sql = NULL;
-       switch_size_t newlen;
-       int lc = 0, wrote = 0, do_sleep = 1;
-       uint32_t sanity = 120;
-       int auto_pause = 0;
-       switch_sql_queue_manager_t *qm = (switch_sql_queue_manager_t *) obj;
-       uint32_t i;
-
-       switch_assert(sqlbuf);
 
-       while (!qm->event_db) {
-               if (switch_cache_db_get_db_handle_dsn(&qm->event_db, qm->dsn) == SWITCH_STATUS_SUCCESS && qm->event_db)
-                       break;
-               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s Error getting db handle, Retrying\n", qm->name);
-               switch_yield(500000);
-               sanity--;
-       }
+static uint32_t do_trans(switch_cache_db_handle_t *dbh, 
+                                                switch_queue_t *q,
+                                                switch_mutex_t *mutex,
+                                                const char *pre_trans_execute,
+                                                const char *post_trans_execute,
+                                                const char *inner_pre_trans_execute,
+                                                const char *inner_post_trans_execute)
+{
+       char *errmsg = NULL;
+       void *pop;
+       switch_status_t status;
+       uint32_t ttl = 0;
 
-       if (!qm->event_db) {
-               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s Error getting db handle\n", qm->name);
-               return NULL;
+       if (!switch_queue_size(q)) {
+               return 0;
        }
 
-       qm->thread_running = 1;
-
-       switch_mutex_lock(qm->cond_mutex);
-
-       switch (qm->event_db->type) {
-       case SCDB_TYPE_PGSQL:
-               break;
-       case SCDB_TYPE_ODBC:
-               break;
+       switch(dbh->type) {
        case SCDB_TYPE_CORE_DB:
                {
-                       switch_cache_db_execute_sql(qm->event_db, "PRAGMA synchronous=OFF;", NULL);
-                       switch_cache_db_execute_sql(qm->event_db, "PRAGMA count_changes=OFF;", NULL);
-                       switch_cache_db_execute_sql(qm->event_db, "PRAGMA temp_store=MEMORY;", NULL);
-                       switch_cache_db_execute_sql(qm->event_db, "PRAGMA journal_mode=OFF;", NULL);
+                       switch_cache_db_execute_sql_real(dbh, "BEGIN", &errmsg);
                }
                break;
-       }
-
-       while (qm->thread_running == 1) {
-               int proceed = !!save_sql;
-               int pindex = -1;
-
-               if (!proceed) {
-                       for (i = 0; i < qm->numq; i++) {
-                               switch_status_t status;
-
-                               switch_mutex_lock(qm->mutex);
-                               status = switch_queue_trypop(qm->sql_queue[i], &pop);
-                               switch_mutex_unlock(qm->mutex);
-
-                               if (status == SWITCH_STATUS_SUCCESS) {
-                                       if (sql_manager.thread_running != 1) {
-                                               if (pop) {
-                                                       switch_cache_db_execute_sql(qm->event_db, (char *) pop, NULL);
-                                                       free(pop);
-                                                       pop = NULL;
-                                               }
-                                       } else {
-                                               pindex = i;
-                                               proceed = 1;
-                                               break;
-                                       }
-                               }
-                       }
-               }
-
-               if (proceed) {
-
-                       if (save_sql) {
-                               sql = save_sql;
-                               save_sql = NULL;
-                       } else if ((sql = (char *) pop)) {
-                               pop = NULL;
-                       }
+       case SCDB_TYPE_ODBC:
+               {
+                       switch_odbc_status_t result;
                        
-                       if (sql) {
-                               newlen = strlen(sql) + 2;
-
-                               if (iterations == 0) {
-                                       trans = 1;
-                               }
-
-                               if (len + newlen + 1 > sql_len) {
-                                       int new_mlen = len + newlen + 10240;
-                                       
-                                       if (new_mlen < runtime.max_sql_buffer_len) {
-                                               sql_len = new_mlen;
-                                               if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
-                                                                                                               for (i = 0; i < qm->numq; i++) {
-                                                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, 
-                                                                                                 "%s REALLOC QUEUE %ld %d %d\n", 
-                                                                                                 qm->name,
-                                                                                                 (long int)sql_len,
-                                                                                                 i,
-                                                                                                 switch_queue_size(qm->sql_queue[i]));
-                                                               
-                                                       }
-                                               }
-                                               if (!(tmp = realloc(sqlbuf, sql_len))) {
-                                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s SQL thread ending on mem err\n", qm->name);
-                                                       abort();
-                                                       break;
-                                               }
-                                               sqlbuf = tmp;
-                                       } else {
-                                               if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
-                                                       for (i = 0; i < qm->numq; i++) {
-                                                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, 
-                                                                                                 "%s SAVE QUEUE %d %d\n", 
-                                                                                                 qm->name,
-                                                                                                 i,
-                                                                                                 switch_queue_size(qm->sql_queue[i]));
-                                                               
-                                                       }
-                                               }
-                                               save_sql = sql;
-                                               sql = NULL;
-                                               lc = 0;
-                                               goto skip;
-                                       }
-                               }
-                               
-                               switch_mutex_lock(qm->mutex);
-                               qm->pre_written[pindex]++;
-                               switch_mutex_unlock(qm->mutex);
-
-                               iterations++;                           
-                               sprintf(sqlbuf + len, "%s;\n", sql);
-                               len += newlen;
-                               free(sql);
-                               sql = NULL;
-                       } else {
-                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s, SQL thread ending\n", qm->name);
-                               break;
+                       if ((result = switch_odbc_SQLSetAutoCommitAttr(dbh->native_handle.odbc_dbh, 0)) != SWITCH_ODBC_SUCCESS) {
+                               char tmp[100];
+                               switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to Set AutoCommit Off", result);
+                               errmsg = strdup(tmp);
                        }
                }
-
-               lc = qm_ttl(qm);
-               
-
-               if (lc > SWITCH_SQL_QUEUE_PAUSE_LEN) {
-                       if (!auto_pause) {
-                               auto_pause = 1;
-                               switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause);
-                               auto_pause = 1;
-                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s, SQL Queue overflowing [%d], Pausing calls.\n", qm->name, lc);
-                       }
-               } else {
-                       if (auto_pause && lc < 1000) {
-                               auto_pause = 0;
-                               switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause);
-                               auto_pause = 0;
-                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s, SQL Queue back to normal size, resuming..\n", qm->name);
+               break;
+       case SCDB_TYPE_PGSQL:
+               {
+                       switch_pgsql_status_t result;
+                       
+                       if ((result = switch_pgsql_SQLSetAutoCommitAttr(dbh->native_handle.pgsql_dbh, 0)) != SWITCH_PGSQL_SUCCESS) {
+                               char tmp[100];
+                               switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to Set AutoCommit Off", result);
+                               errmsg = strdup(tmp);
                        }
                }
-       
-       skip:
-               
-               wrote = 0;
-
-               if (trans && iterations && (iterations > target || !lc)) {
+               break;
+       }
 
-                       if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
-                               char line[128] = "";
-                               int l;
-                               
-                               switch_snprintf(line, sizeof(line), "%s RUN QUEUE ", qm->name);
-                               
-                               for (i = 0; i < qm->numq; i++) {
-                                       l = strlen(line);
-                                       switch_snprintf(line + l, sizeof(line) - l, "%d:%d ", i, switch_queue_size(qm->sql_queue[i]));
-                               }
-                               
-                               l = strlen(line);
-                               switch_snprintf(line + l, sizeof(line) - l, "%d\n", iterations);
+       if (errmsg) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "ERROR [%s]\n", errmsg);
+               free(errmsg);
+               goto end;
+       }
 
-                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s", line);
 
-                       }
-                       if (switch_cache_db_persistant_execute_trans_full(qm->event_db, sqlbuf, 1,
-                                                                                                                         qm->pre_trans_execute,
-                                                                                                                         qm->post_trans_execute,
-                                                                                                                         qm->inner_pre_trans_execute,
-                                                                                                                         qm->inner_post_trans_execute
-                                                                                                                         ) != SWITCH_STATUS_SUCCESS) {
-                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s SQL thread unable to commit transaction, records lost!\n", qm->name);
-                       }
-                       if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) { 
-                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s DONE\n", qm->name);
-                       }
+       for(;;) {
+               if (mutex) switch_mutex_lock(mutex);
+               status = switch_queue_trypop(q, &pop);
+               if (mutex) switch_mutex_unlock(mutex);
+               
+               if (status != SWITCH_STATUS_SUCCESS || !pop) break;
 
-                       iterations = 0;
-                       trans = 0;
-                       len = 0;
-                       *sqlbuf = '\0';
-                       lc = 0;
-                       if (do_sleep) {
-                               switch_yield(200000);
-                       } else {
-                               switch_yield(1000);
-                       }
-                       wrote = 1;
+               if ((status = switch_cache_db_execute_sql(dbh, (char *) pop, NULL)) == SWITCH_STATUS_SUCCESS) {
+                       ttl++;
                }
+               free(pop);
 
-               lc = qm_ttl(qm);
-               
-               switch_mutex_lock(qm->mutex);
-               for (i = 0; i < qm->numq; i++) {
-                       qm->written[i] += qm->pre_written[i];
-                       qm->pre_written[i] = 0;
-               }
-               switch_mutex_unlock(qm->mutex);
-               
-               if (!lc) {
-                       switch_thread_cond_wait(qm->cond, qm->cond_mutex);
-               } else if (wrote) {
-                       if (lc > 2000) {
-                               do_sleep = 0;
-                       } else {
-                               do_sleep = 1;
-                       }
-               }
+               if (status != SWITCH_STATUS_SUCCESS) break;
        }
 
-       switch_mutex_unlock(qm->cond_mutex);
+ end:
 
-       for(i = 0; i < qm->numq; i++) {
-               while (switch_queue_trypop(qm->sql_queue[i], &pop) == SWITCH_STATUS_SUCCESS) {
-                       if (pop) {
-                               switch_cache_db_execute_sql(qm->event_db, (char *) pop, NULL);
-                               free(pop);
-                       }
+       switch(dbh->type) {
+       case SCDB_TYPE_CORE_DB:
+               {
+                       switch_cache_db_execute_sql_real(dbh, "COMMIT", NULL);
                }
+               break;
+       case SCDB_TYPE_ODBC:
+               {
+                       switch_odbc_SQLEndTran(dbh->native_handle.odbc_dbh, 1);
+                       switch_odbc_SQLSetAutoCommitAttr(dbh->native_handle.odbc_dbh, 1);
+               }
+               break;
+       case SCDB_TYPE_PGSQL:
+               {
+                       switch_pgsql_SQLEndTran(dbh->native_handle.pgsql_dbh, 1);
+                       switch_pgsql_SQLSetAutoCommitAttr(dbh->native_handle.pgsql_dbh, 1);
+                       switch_pgsql_finish_results(dbh->native_handle.pgsql_dbh);
+               }
+               break;
        }
 
-       free(sqlbuf);
 
-       qm->thread_running = 0;
-
-       switch_cache_db_release_db_handle(&qm->event_db);
 
-       return NULL;
+       return ttl;
 }
 
-static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, void *obj)
+static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, void *obj)
 {
-       void *pop = NULL;
-       uint32_t iterations = 0;
-       uint8_t trans = 0;
-       uint32_t target = 20000;
-       switch_size_t len = 0, sql_len = runtime.sql_buffer_len;
-       char *tmp, *sqlbuf = (char *) malloc(sql_len);
-       char *sql = NULL, *save_sql = NULL;
-       switch_size_t newlen;
-       int lc = 0, wrote = 0, do_sleep = 1;
-       uint32_t sanity = 120;
-       int auto_pause = 0;
 
-       switch_assert(sqlbuf);
+       uint32_t sanity = 120;
+       switch_sql_queue_manager_t *qm = (switch_sql_queue_manager_t *) obj;
+       uint32_t i;
 
-       while (!sql_manager.event_db) {
-               if (switch_core_db_handle(&sql_manager.event_db) == SWITCH_STATUS_SUCCESS && sql_manager.event_db)
+       while (!qm->event_db) {
+               if (switch_cache_db_get_db_handle_dsn(&qm->event_db, qm->dsn) == SWITCH_STATUS_SUCCESS && qm->event_db)
                        break;
-               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Error getting core db, Retrying\n");
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s Error getting db handle, Retrying\n", qm->name);
                switch_yield(500000);
                sanity--;
        }
 
-       if (!sql_manager.event_db) {
-               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Error getting core db Disabling core sql functionality\n");
+       if (!qm->event_db) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s Error getting db handle\n", qm->name);
                return NULL;
        }
 
-       sql_manager.thread_running = 1;
+       qm->thread_running = 1;
 
-       switch_mutex_lock(sql_manager.cond_mutex);
+       switch_mutex_lock(qm->cond_mutex);
 
-       switch (sql_manager.event_db->type) {
+       switch (qm->event_db->type) {
        case SCDB_TYPE_PGSQL:
                break;
        case SCDB_TYPE_ODBC:
                break;
        case SCDB_TYPE_CORE_DB:
                {
-                       switch_cache_db_execute_sql(sql_manager.event_db, "PRAGMA synchronous=OFF;", NULL);
-                       switch_cache_db_execute_sql(sql_manager.event_db, "PRAGMA count_changes=OFF;", NULL);
-                       switch_cache_db_execute_sql(sql_manager.event_db, "PRAGMA temp_store=MEMORY;", NULL);
-                       switch_cache_db_execute_sql(sql_manager.event_db, "PRAGMA journal_mode=OFF;", NULL);
+                       switch_cache_db_execute_sql(qm->event_db, "PRAGMA synchronous=OFF;", NULL);
+                       switch_cache_db_execute_sql(qm->event_db, "PRAGMA count_changes=OFF;", NULL);
+                       switch_cache_db_execute_sql(qm->event_db, "PRAGMA temp_store=MEMORY;", NULL);
+                       switch_cache_db_execute_sql(qm->event_db, "PRAGMA journal_mode=OFF;", NULL);
                }
                break;
        }
 
-       while (sql_manager.thread_running == 1) {
-               if (save_sql || 
-                       switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS ||
-                       switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS ||
-                       switch_queue_trypop(sql_manager.sql_queue[2], &pop) == SWITCH_STATUS_SUCCESS ||
-                       switch_queue_trypop(sql_manager.sql_queue[3], &pop) == SWITCH_STATUS_SUCCESS 
-                       ) {
-
-                       if (save_sql) {
-                               sql = save_sql;
-                               save_sql = NULL;
-                       } else if ((sql = (char *) pop)) {
-                               pop = NULL;
-                       }
-                       
-                       if (sql) {
-                               newlen = strlen(sql) + 2;
-
-                               if (iterations == 0) {
-                                       trans = 1;
-                               }
-
-                               if (len + newlen + 1 > sql_len) {
-                                       int new_mlen = len + newlen + 10240;
-                                       
-                                       if (new_mlen < runtime.max_sql_buffer_len) {
-                                               sql_len = new_mlen;
-                                               if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
-                                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, 
-                                                                                         "REALLOC %ld %d %d\n", (long int)sql_len, switch_queue_size(sql_manager.sql_queue[0]), 
-                                                                                         switch_queue_size(sql_manager.sql_queue[1]));
-                                               }
-                                               if (!(tmp = realloc(sqlbuf, sql_len))) {
-                                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread ending on mem err\n");
-                                                       abort();
-                                                       break;
-                                               }
-                                               sqlbuf = tmp;
-                                       } else {
-                                               if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
-                                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, 
-                                                                                         "SAVE %d %d %d %d\n", 
-                                                                                         switch_queue_size(sql_manager.sql_queue[0]), 
-                                                                                         switch_queue_size(sql_manager.sql_queue[1]),
-                                                                                         switch_queue_size(sql_manager.sql_queue[2]), 
-                                                                                         switch_queue_size(sql_manager.sql_queue[3])
-                                                                                         );
-                                               }
-                                               save_sql = sql;
-                                               sql = NULL;
-                                               lc = 0;
-                                               goto skip;
-                                       }
-                               }
-
-                               iterations++;                           
-                               sprintf(sqlbuf + len, "%s;\n", sql);
-                               len += newlen;
-                               free(sql);
-                               sql = NULL;
-                       } else {
-                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "SQL thread ending\n");
-                               break;
-                       }
-               }
-
-               lc = switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]) + 
-                       switch_queue_size(sql_manager.sql_queue[2]) + switch_queue_size(sql_manager.sql_queue[3]);
 
+       while (qm->thread_running == 1) {
+               int lc;
+               int i;
+               uint32_t iterations = 0;
 
-               if (lc > SWITCH_SQL_QUEUE_PAUSE_LEN) {
-                       if (!auto_pause) {
-                               auto_pause = 1;
-                               switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause);
-                               auto_pause = 1;
-                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL Queue overflowing [%d], Pausing calls.\n", lc);
-                       }
-               } else {
-                       if (auto_pause && lc < 1000) {
-                               auto_pause = 0;
-                               switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause);
-                               auto_pause = 0;
-                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL Queue back to normal size, resuming..\n");
+               if (sql_manager.paused) {
+                       for (i = 0; i < qm->numq; i++) {
+                               do_flush(qm->sql_queue[i], NULL);
                        }
+                       goto check;
                }
-
-       skip:
                
-               wrote = 0;
-
-               if (trans && iterations && (iterations > target || !lc)) {
-                       if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
-                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, 
-                                                                 "RUN %d %d %d %d %d\n", 
-                                                                 switch_queue_size(sql_manager.sql_queue[0]), 
-                                                                 switch_queue_size(sql_manager.sql_queue[1]), 
-                                                                 switch_queue_size(sql_manager.sql_queue[2]), 
-                                                                 switch_queue_size(sql_manager.sql_queue[3]), 
-                                                                 iterations);
-                       }
-                       if (switch_cache_db_persistant_execute_trans_full(sql_manager.event_db, sqlbuf, 1,
-                                                                                                                         runtime.core_db_pre_trans_execute,
-                                                                                                                         runtime.core_db_post_trans_execute,
-                                                                                                                         runtime.core_db_inner_pre_trans_execute,
-                                                                                                                         runtime.core_db_inner_post_trans_execute
-                                                                                                                         ) != SWITCH_STATUS_SUCCESS) {
-                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n");
-                       }
-                       if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) { 
-                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "DONE\n");
-                       }
-
+               for (i = 0; i < qm->numq; i++) {
+                       uint32_t written = do_trans(qm->event_db, qm->sql_queue[i], NULL,
+                                                                               qm->pre_trans_execute,
+                                                                               qm->post_trans_execute,
+                                                                               qm->inner_pre_trans_execute,
+                                                                               qm->inner_post_trans_execute);
 
-                       iterations = 0;
-                       trans = 0;
-                       len = 0;
-                       *sqlbuf = '\0';
-                       lc = 0;
-                       if (do_sleep) {
-                               switch_yield(200000);
-                       } else {
-                               switch_yield(1000);
-                       }
-                       wrote = 1;
+                       iterations += written;
+                       
+                       switch_mutex_lock(qm->mutex);
+                       qm->written[i] += written;
+                       switch_mutex_unlock(qm->mutex);
                }
-
-               lc = switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]) +
-                       switch_queue_size(sql_manager.sql_queue[2]) + switch_queue_size(sql_manager.sql_queue[3]);
                
-               if (!lc) {
-                       switch_thread_cond_wait(sql_manager.cond, sql_manager.cond_mutex);
-               } else if (wrote) {
-                       if (lc > 2000) {
-                               do_sleep = 0;
-                       } else {
-                               do_sleep = 1;
+               if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
+                       char line[128] = "";
+                       int l;
+                       
+                       switch_snprintf(line, sizeof(line), "%s RUN QUEUE [", qm->name);
+                       
+                       for (i = 0; i < qm->numq; i++) {
+                               l = strlen(line);
+                               switch_snprintf(line + l, sizeof(line) - l, "%d%s", switch_queue_size(qm->sql_queue[i]), i == qm->numq - 1 ? "" : "|");
                        }
+                       
+                       l = strlen(line);
+                       switch_snprintf(line + l, sizeof(line) - l, "]--[%d]\n", iterations);
+                       
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s", line);
+                       
                }
-               
-               
-       }
 
-       switch_mutex_unlock(sql_manager.cond_mutex);
+       check:
 
-       while (switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS) {
-               free(pop);
-       }
+               lc = qm_ttl(qm);                
 
-       while (switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) {
-               free(pop);
+               if (!lc) {
+                       switch_thread_cond_wait(qm->cond, qm->cond_mutex);
+               } else if (lc < 2000) {
+                       switch_yield(200000);
+               }
        }
 
-       while (switch_queue_trypop(sql_manager.sql_queue[2], &pop) == SWITCH_STATUS_SUCCESS) {
-               free(pop);
-       }
+       switch_mutex_unlock(qm->cond_mutex);
 
-       while (switch_queue_trypop(sql_manager.sql_queue[3], &pop) == SWITCH_STATUS_SUCCESS) {
-               free(pop);
+       for(i = 0; i < qm->numq; i++) {
+               do_flush(qm->sql_queue[i], qm->event_db);
        }
 
-       free(sqlbuf);
-
-       sql_manager.thread_running = 0;
-
-       switch_cache_db_release_db_handle(&sql_manager.event_db);
+       qm->thread_running = 0;
 
+       switch_cache_db_release_db_handle(&qm->event_db);
+       
        return NULL;
 }
 
+
 static char *parse_presence_data_cols(switch_event_t *event)
 {
        char *cols[128] = { 0 };
@@ -2388,12 +2128,11 @@ static void core_event_handler(switch_event_t *event)
 
                for (i = 0; i < sql_idx; i++) {
                        if (switch_stristr("update channels", sql[i]) || switch_stristr("delete from channels", sql[i])) {
-                               switch_queue_push(sql_manager.sql_queue[1], sql[i]);
+                               switch_sql_queue_manager_push(sql_manager.qm, sql[i], 1, SWITCH_FALSE);
                        } else {
-                               switch_queue_push(sql_manager.sql_queue[0], sql[i]);
+                               switch_sql_queue_manager_push(sql_manager.qm, sql[i], 0, SWITCH_FALSE);
                        }
                        sql[i] = NULL;
-                       wake_thread(0);
                }
        }
 }
@@ -2770,6 +2509,11 @@ SWITCH_DECLARE(int) switch_core_recovery_recover(const char *technology, const c
        switch_cache_db_handle_t *dbh;
        int r = 0;
 
+       if (!sql_manager.manage) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "DATABASE NOT AVAIALBLE, REVCOVERY NOT POSSIBLE\n");
+               return 0;
+       }
+
        if (switch_core_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
                return 0;
@@ -2839,16 +2583,21 @@ SWITCH_DECLARE(int) switch_core_recovery_recover(const char *technology, const c
 
 SWITCH_DECLARE(switch_cache_db_handle_type_t) switch_core_dbtype(void)
 {
-       return sql_manager.event_db->type;
+       return sql_manager.qm ?  sql_manager.qm->event_db->type : SCDB_TYPE_CORE_DB;
 }
 
 SWITCH_DECLARE(void) switch_core_sql_exec(const char *sql)
 {
+       if (!sql_manager.manage) {
+               return;
+       }
+
        if (!switch_test_flag((&runtime), SCF_USE_SQL)) {
                return;
        }
 
-       switch_queue_push(sql_manager.sql_queue[3], strdup(sql));
+
+       switch_sql_queue_manager_push(sql_manager.qm, sql, 3, SWITCH_TRUE);
 }
 
 SWITCH_DECLARE(void) switch_core_recovery_untrack(switch_core_session_t *session, switch_bool_t force)
@@ -2856,6 +2605,10 @@ SWITCH_DECLARE(void) switch_core_recovery_untrack(switch_core_session_t *session
        char *sql = NULL;
        switch_channel_t *channel = switch_core_session_get_channel(session);
 
+       if (!sql_manager.manage) {
+               return;
+       }
+
        if (!switch_channel_test_flag(channel, CF_ANSWERED) || switch_channel_get_state(channel) < CS_SOFT_EXECUTE) {
                return;
        }
@@ -2878,7 +2631,7 @@ SWITCH_DECLARE(void) switch_core_recovery_untrack(switch_core_session_t *session
                                                                 switch_core_get_uuid(), switch_core_session_get_uuid(session));
                }
 
-               switch_queue_push(sql_manager.sql_queue[3], sql);
+               switch_sql_queue_manager_push(sql_manager.qm, sql, 3, SWITCH_FALSE);
                
                switch_channel_clear_flag(channel, CF_TRACKED);
        }
@@ -2894,6 +2647,9 @@ SWITCH_DECLARE(void) switch_core_recovery_track(switch_core_session_t *session)
        const char *profile_name;
        const char *technology;
 
+       if (!sql_manager.manage) {
+               return;
+       }
 
        if (!switch_channel_test_flag(channel, CF_ANSWERED) || switch_channel_get_state(channel) < CS_SOFT_EXECUTE) {
                return;
@@ -2921,7 +2677,7 @@ SWITCH_DECLARE(void) switch_core_recovery_track(switch_core_session_t *session)
                                                                 switch_str_nil(profile_name), switch_core_get_hostname(), switch_core_session_get_uuid(session), xml_cdr_text);
                }
 
-               switch_queue_push(sql_manager.sql_queue[2], sql);
+               switch_sql_queue_manager_push(sql_manager.qm, sql, 2, SWITCH_FALSE);
 
                free(xml_cdr_text);
                switch_channel_set_flag(channel, CF_TRACKED);
@@ -2950,7 +2706,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_add_registration(const char *user, c
                                                         user, realm, switch_core_get_switchname());
        }
 
-       switch_queue_push(sql_manager.sql_queue[0], sql);
+       switch_sql_queue_manager_push(sql_manager.qm, sql, 0, SWITCH_FALSE);
 
        if ( !zstr(metadata) ) {
                sql = switch_mprintf("insert into registrations (reg_user,realm,token,url,expires,network_ip,network_port,network_proto,hostname,metadata) "
@@ -2982,7 +2738,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_add_registration(const char *user, c
        }
 
        
-       switch_queue_push(sql_manager.sql_queue[0], sql);
+       switch_sql_queue_manager_push(sql_manager.qm, sql, 0, SWITCH_FALSE);
        
        return SWITCH_STATUS_SUCCESS;
 }
@@ -3002,7 +2758,8 @@ SWITCH_DECLARE(switch_status_t) switch_core_del_registration(const char *user, c
                sql = switch_mprintf("delete from registrations where reg_user='%q' and realm='%q' and hostname='%q'", user, realm, switch_core_get_switchname());
        }
 
-       switch_queue_push(sql_manager.sql_queue[0], sql);
+       switch_sql_queue_manager_push(sql_manager.qm, sql, 0, SWITCH_FALSE);
+
 
        return SWITCH_STATUS_SUCCESS;
 }
@@ -3025,7 +2782,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_expire_registration(int force)
                sql = switch_mprintf("delete from registrations where expires > 0 and expires <= %ld and hostname='%q'", now, switch_core_get_switchname());
        }
 
-       switch_queue_push(sql_manager.sql_queue[0], sql);
+       switch_sql_queue_manager_push(sql_manager.qm, sql, 0, SWITCH_FALSE);
 
        return SWITCH_STATUS_SUCCESS;
 
@@ -3034,20 +2791,14 @@ SWITCH_DECLARE(switch_status_t) switch_core_expire_registration(int force)
 switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_t manage)
 {
        switch_threadattr_t *thd_attr;
-       uint32_t sanity = 400;
 
        sql_manager.memory_pool = pool;
        sql_manager.manage = manage;
 
        switch_mutex_init(&sql_manager.dbh_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
        switch_mutex_init(&sql_manager.io_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
-       switch_mutex_init(&sql_manager.cond_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
        switch_mutex_init(&sql_manager.ctl_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
 
-       switch_thread_cond_create(&sql_manager.cond, sql_manager.memory_pool);
-
-
-
        if (!sql_manager.manage) goto skip;
 
  top:  
@@ -3117,6 +2868,15 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_
        switch_cache_db_test_reactive(sql_manager.dbh, "select metadata from registrations", NULL, "ALTER TABLE registrations ADD COLUMN metadata VARCHAR(256)");
 
 
+       switch_cache_db_test_reactive(sql_manager.dbh, "select hostname from recovery", "DROP TABLE recovery", recovery_sql);
+       switch_cache_db_execute_sql(sql_manager.dbh, "create index recovery1 on recovery(technology)", NULL);
+       switch_cache_db_execute_sql(sql_manager.dbh, "create index recovery2 on recovery(profile_name)", NULL);
+       switch_cache_db_execute_sql(sql_manager.dbh, "create index recovery3 on recovery(uuid)", NULL);
+       switch_cache_db_execute_sql(sql_manager.dbh, "create index recovery3 on recovery(runtime_uuid)", NULL);
+
+
+
+
        switch (sql_manager.dbh->type) {
        case SCDB_TYPE_PGSQL:
        case SCDB_TYPE_ODBC:
@@ -3227,126 +2987,76 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_
                switch_event_bind("core_db", SWITCH_EVENT_NAT, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
 #endif 
 
-               switch_queue_create(&sql_manager.sql_queue[0], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool);
-               switch_queue_create(&sql_manager.sql_queue[1], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool);
-               switch_queue_create(&sql_manager.sql_queue[2], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool);
-               switch_queue_create(&sql_manager.sql_queue[3], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool);
-
                switch_threadattr_create(&thd_attr, sql_manager.memory_pool);
                switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
                switch_threadattr_priority_set(thd_attr, SWITCH_PRI_REALTIME);
                switch_core_sqldb_start_thread();
                switch_thread_create(&sql_manager.db_thread, thd_attr, switch_core_sql_db_thread, NULL, sql_manager.memory_pool);
 
-               while (sql_manager.manage && !sql_manager.thread_running && --sanity) {
-                       switch_yield(10000);
-               }
        }
        return SWITCH_STATUS_SUCCESS;
 }
 
-
-SWITCH_DECLARE(void) switch_core_sqldb_stop_thread(void)
+SWITCH_DECLARE(void) switch_core_sqldb_pause(void)
 {
-       switch_mutex_lock(sql_manager.ctl_mutex);
-       if (sql_manager.thread && sql_manager.thread_running) {
-               switch_status_t st;
-
-               if (sql_manager.manage) {
-                       switch_queue_push(sql_manager.sql_queue[0], NULL);
-                       switch_queue_push(sql_manager.sql_queue[1], NULL);
-                       switch_queue_push(sql_manager.sql_queue[2], NULL);
-                       switch_queue_push(sql_manager.sql_queue[3], NULL);
-                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Waiting for unfinished SQL transactions\n");
-                       wake_thread(0);
-                       sql_manager.thread_running = -1;
-                       switch_thread_join(&st, sql_manager.thread);
-                       sql_manager.thread = NULL;
-                       switch_cache_db_release_db_handle(&sql_manager.dbh);
-                       sql_manager.dbh = NULL;
-               } else {
-                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL is not enabled\n");
-               }
-       } else {
-               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL thread is not running\n");
+       if (sql_manager.paused) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL is already paused.\n");
        }
-       switch_mutex_unlock(sql_manager.ctl_mutex);
+       sql_manager.paused = 1;
 }
 
-SWITCH_DECLARE(void) switch_core_sqldb_start_thread(void)
+SWITCH_DECLARE(void) switch_core_sqldb_resume(void)
 {
-       switch_cache_db_handle_t *dbh;
+       if (!sql_manager.paused) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL is already running.\n");
+       }
+       sql_manager.paused = 0;
+}
 
-       switch_mutex_lock(sql_manager.ctl_mutex);
 
-       if (switch_core_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
-               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
-                       
-               if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) {
-                       int arg = 1;
-                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure! ODBC OR PGSQL IS REQUIRED!\n");
-                       switch_core_session_ctl(SCSC_SHUTDOWN_NOW, &arg);
+static void switch_core_sqldb_stop_thread(void)
+{
+       switch_mutex_lock(sql_manager.ctl_mutex);
+       if (sql_manager.manage) {
+               if (sql_manager.qm) {
+                       switch_sql_queue_manager_destroy(&sql_manager.qm);
                }
-                       
-               
        } else {
-               switch_cache_db_test_reactive(dbh, "select hostname from recovery", "DROP TABLE recovery", recovery_sql);
-               switch_cache_db_execute_sql(dbh, "create index recovery1 on recovery(technology)", NULL);
-               switch_cache_db_execute_sql(dbh, "create index recovery2 on recovery(profile_name)", NULL);
-               switch_cache_db_execute_sql(dbh, "create index recovery3 on recovery(uuid)", NULL);
-               switch_cache_db_execute_sql(dbh, "create index recovery3 on recovery(runtime_uuid)", NULL);
-               switch_cache_db_release_db_handle(&dbh);
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL is not enabled\n");
        }
+       
+       switch_mutex_unlock(sql_manager.ctl_mutex);
+}
 
+static void switch_core_sqldb_start_thread(void)
+{
 
+       switch_mutex_lock(sql_manager.ctl_mutex);
        if (sql_manager.manage) {
+               if (!sql_manager.qm) {
+                       char *dbname = runtime.odbc_dsn;
 
-       top:
-
-               if (!sql_manager.dbh) {
-                       /* Activate SQL database */
-                       if (switch_core_db_handle(&sql_manager.dbh) != SWITCH_STATUS_SUCCESS) {
-                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
-                               
-                               if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) {
-                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure! ODBC OR PGSQL IS REQUIRED!\n");
-                                       goto end;
-                               }
-                               
-                               if (runtime.odbc_dsn) {
-                                       runtime.odbc_dsn = NULL;
-                                       runtime.odbc_dbtype = DBTYPE_DEFAULT;
-                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Falling back to core_db.\n");
-                                       sql_manager.dbh = NULL;
-                                       goto top;
+                       if (zstr(dbname)) {
+                               dbname = runtime.dbname;
+                               if (zstr(dbname)) {
+                                       dbname = "core";
                                }
-
-
-                               switch_clear_flag((&runtime), SCF_USE_SQL);
-                               goto end;
                        }
 
-                       switch_cache_db_execute_sql(sql_manager.dbh, "delete from channels", NULL);
-                       switch_cache_db_execute_sql(sql_manager.dbh, "delete from calls", NULL);
-               }
-
+                       switch_sql_queue_manager_init_name("CORE",
+                                                                                          &sql_manager.qm,
+                                                                                          4,
+                                                                                          dbname,
+                                                                                          runtime.core_db_pre_trans_execute,
+                                                                                          runtime.core_db_post_trans_execute,
+                                                                                          runtime.core_db_inner_pre_trans_execute,
+                                                                                          runtime.core_db_inner_post_trans_execute);
 
-               if (!sql_manager.thread) {
-                       switch_threadattr_t *thd_attr;
-                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Starting SQL thread.\n");
-                       switch_threadattr_create(&thd_attr, sql_manager.memory_pool);
-                       switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
-                       switch_threadattr_priority_set(thd_attr, SWITCH_PRI_REALTIME);
-                       switch_thread_create(&sql_manager.thread, thd_attr, switch_core_sql_thread, NULL, sql_manager.memory_pool);
-               } else {
-                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL thread is already running\n");
                }
+               switch_sql_queue_manager_start(sql_manager.qm);
        } else {
                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL is not enabled\n");
        }
-
- end:
-
        switch_mutex_unlock(sql_manager.ctl_mutex);
 }