};
static struct {
- switch_cache_db_handle_t *event_db;
- switch_queue_t *sql_queue[4];
switch_memory_pool_t *memory_pool;
- switch_thread_t *thread;
switch_thread_t *db_thread;
- int thread_running;
int db_thread_running;
switch_bool_t manage;
switch_mutex_t *io_mutex;
switch_mutex_t *dbh_mutex;
switch_mutex_t *ctl_mutex;
switch_cache_db_handle_t *handle_pool;
- switch_thread_cond_t *cond;
- switch_mutex_t *cond_mutex;
uint32_t total_handles;
uint32_t total_used_handles;
switch_cache_db_handle_t *dbh;
+ switch_sql_queue_manager_t *qm;
+ int paused;
} sql_manager;
+static void switch_core_sqldb_start_thread(void);
+static void switch_core_sqldb_stop_thread(void);
+
static switch_cache_db_handle_t *create_handle(switch_cache_db_handle_type_t type)
{
switch_cache_db_handle_t *new_dbh = NULL;
return status;
}
-static void wake_thread(int force)
-{
- if (force) {
- switch_thread_cond_signal(sql_manager.cond);
- return;
- }
-
- if (switch_mutex_trylock(sql_manager.cond_mutex) == SWITCH_STATUS_SUCCESS) {
- switch_thread_cond_signal(sql_manager.cond);
- switch_mutex_unlock(sql_manager.cond_mutex);
- }
-}
-
/**
OMFG you cruel bastards. Who chooses 64k as a max buffer len for a sql statement, have you ever heard of transactions?
**/
while (sql_manager.db_thread_running == 1) {
if (++sec == SQL_CACHE_TIMEOUT) {
sql_close(switch_epoch_time_now(NULL));
- wake_thread(0);
sec = 0;
}
const char *name;
switch_cache_db_handle_t *event_db;
switch_queue_t **sql_queue;
- uint32_t *pre_written;
uint32_t *written;
uint32_t numq;
char *dsn;
}
if (qm->thread) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s Stopping SQL thread.\n", qm->name);
switch_thread_join(&status, qm->thread);
qm->thread = NULL;
status = SWITCH_STATUS_SUCCESS;
switch_threadattr_t *thd_attr;
if (!qm->thread_running) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Starting SQL thread.\n");
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s Starting SQL thread.\n", qm->name);
switch_threadattr_create(&thd_attr, qm->pool);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_threadattr_priority_set(thd_attr, SWITCH_PRI_NORMAL);
}
+static void do_flush(switch_queue_t *q, switch_cache_db_handle_t *dbh)
+{
+ void *pop = NULL;
+
+ while (switch_queue_trypop(q, &pop) == SWITCH_STATUS_SUCCESS) {
+ if (pop) {
+ if (dbh) {
+ switch_cache_db_execute_sql(dbh, (char *) pop, NULL);
+ }
+ free(pop);
+ }
+ }
+
+}
+
SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_destroy(switch_sql_queue_manager_t **qmp)
{
switch_sql_queue_manager_t *qm;
switch_status_t status = SWITCH_STATUS_SUCCESS;
switch_memory_pool_t *pool;
- void *pop;
uint32_t i;
switch_assert(qmp);
qm = *qmp;
*qmp = NULL;
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s Destroying SQL queue.\n", qm->name);
+
switch_sql_queue_manager_stop(qm);
+
+
for(i = 0; i < qm->numq; i++) {
- while (switch_queue_trypop(qm->sql_queue[i], &pop) == SWITCH_STATUS_SUCCESS) {
- switch_safe_free(pop);
- }
+ do_flush(qm->sql_queue[i], NULL);
}
pool = qm->pool;
SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup)
{
- if (!qm->thread_running) {
- return SWITCH_STATUS_FALSE;
+ if (sql_manager.paused) {
+ if (!dup) free((char *)sql);
+ qm_wake(qm);
+ return SWITCH_STATUS_SUCCESS;
}
- if (sql_manager.thread_running != 1) {
+ if (!qm->thread_running) {
+ if (!dup) free((char *)sql);
return SWITCH_STATUS_FALSE;
}
-
+
if (pos > qm->numq - 1) {
pos = 0;
}
int size, x = 0, sanity = 0;
uint32_t written, want;
- if (!qm->thread_running) {
- return SWITCH_STATUS_FALSE;
+ if (sql_manager.paused) {
+ if (!dup) free((char *)sql);
+ qm_wake(qm);
+ return SWITCH_STATUS_SUCCESS;
}
- if (sql_manager.thread_running != 1) {
+ if (!qm->thread_running) {
+ if (!dup) free((char *)sql);
return SWITCH_STATUS_FALSE;
}
-
+
if (pos > qm->numq - 1) {
pos = 0;
}
qm->sql_queue = switch_core_alloc(qm->pool, sizeof(switch_queue_t *) * numq);
qm->written = switch_core_alloc(qm->pool, sizeof(uint32_t) * numq);
- qm->pre_written = switch_core_alloc(qm->pool, sizeof(uint32_t) * numq);
for (i = 0; i < qm->numq; i++) {
switch_queue_create(&qm->sql_queue[i], SWITCH_SQL_QUEUE_LEN, qm->pool);
return SWITCH_STATUS_SUCCESS;
}
-static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, void *obj)
-{
- void *pop = NULL;
- uint32_t iterations = 0;
- uint8_t trans = 0;
- uint32_t target = 20000;
- switch_size_t len = 0, sql_len = runtime.sql_buffer_len;
- char *tmp, *sqlbuf = (char *) malloc(sql_len);
- char *sql = NULL, *save_sql = NULL;
- switch_size_t newlen;
- int lc = 0, wrote = 0, do_sleep = 1;
- uint32_t sanity = 120;
- int auto_pause = 0;
- switch_sql_queue_manager_t *qm = (switch_sql_queue_manager_t *) obj;
- uint32_t i;
-
- switch_assert(sqlbuf);
- while (!qm->event_db) {
- if (switch_cache_db_get_db_handle_dsn(&qm->event_db, qm->dsn) == SWITCH_STATUS_SUCCESS && qm->event_db)
- break;
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s Error getting db handle, Retrying\n", qm->name);
- switch_yield(500000);
- sanity--;
- }
+static uint32_t do_trans(switch_cache_db_handle_t *dbh,
+ switch_queue_t *q,
+ switch_mutex_t *mutex,
+ const char *pre_trans_execute,
+ const char *post_trans_execute,
+ const char *inner_pre_trans_execute,
+ const char *inner_post_trans_execute)
+{
+ char *errmsg = NULL;
+ void *pop;
+ switch_status_t status;
+ uint32_t ttl = 0;
- if (!qm->event_db) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s Error getting db handle\n", qm->name);
- return NULL;
+ if (!switch_queue_size(q)) {
+ return 0;
}
- qm->thread_running = 1;
-
- switch_mutex_lock(qm->cond_mutex);
-
- switch (qm->event_db->type) {
- case SCDB_TYPE_PGSQL:
- break;
- case SCDB_TYPE_ODBC:
- break;
+ switch(dbh->type) {
case SCDB_TYPE_CORE_DB:
{
- switch_cache_db_execute_sql(qm->event_db, "PRAGMA synchronous=OFF;", NULL);
- switch_cache_db_execute_sql(qm->event_db, "PRAGMA count_changes=OFF;", NULL);
- switch_cache_db_execute_sql(qm->event_db, "PRAGMA temp_store=MEMORY;", NULL);
- switch_cache_db_execute_sql(qm->event_db, "PRAGMA journal_mode=OFF;", NULL);
+ switch_cache_db_execute_sql_real(dbh, "BEGIN", &errmsg);
}
break;
- }
-
- while (qm->thread_running == 1) {
- int proceed = !!save_sql;
- int pindex = -1;
-
- if (!proceed) {
- for (i = 0; i < qm->numq; i++) {
- switch_status_t status;
-
- switch_mutex_lock(qm->mutex);
- status = switch_queue_trypop(qm->sql_queue[i], &pop);
- switch_mutex_unlock(qm->mutex);
-
- if (status == SWITCH_STATUS_SUCCESS) {
- if (sql_manager.thread_running != 1) {
- if (pop) {
- switch_cache_db_execute_sql(qm->event_db, (char *) pop, NULL);
- free(pop);
- pop = NULL;
- }
- } else {
- pindex = i;
- proceed = 1;
- break;
- }
- }
- }
- }
-
- if (proceed) {
-
- if (save_sql) {
- sql = save_sql;
- save_sql = NULL;
- } else if ((sql = (char *) pop)) {
- pop = NULL;
- }
+ case SCDB_TYPE_ODBC:
+ {
+ switch_odbc_status_t result;
- if (sql) {
- newlen = strlen(sql) + 2;
-
- if (iterations == 0) {
- trans = 1;
- }
-
- if (len + newlen + 1 > sql_len) {
- int new_mlen = len + newlen + 10240;
-
- if (new_mlen < runtime.max_sql_buffer_len) {
- sql_len = new_mlen;
- if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
- for (i = 0; i < qm->numq; i++) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
- "%s REALLOC QUEUE %ld %d %d\n",
- qm->name,
- (long int)sql_len,
- i,
- switch_queue_size(qm->sql_queue[i]));
-
- }
- }
- if (!(tmp = realloc(sqlbuf, sql_len))) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s SQL thread ending on mem err\n", qm->name);
- abort();
- break;
- }
- sqlbuf = tmp;
- } else {
- if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
- for (i = 0; i < qm->numq; i++) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
- "%s SAVE QUEUE %d %d\n",
- qm->name,
- i,
- switch_queue_size(qm->sql_queue[i]));
-
- }
- }
- save_sql = sql;
- sql = NULL;
- lc = 0;
- goto skip;
- }
- }
-
- switch_mutex_lock(qm->mutex);
- qm->pre_written[pindex]++;
- switch_mutex_unlock(qm->mutex);
-
- iterations++;
- sprintf(sqlbuf + len, "%s;\n", sql);
- len += newlen;
- free(sql);
- sql = NULL;
- } else {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s, SQL thread ending\n", qm->name);
- break;
+ if ((result = switch_odbc_SQLSetAutoCommitAttr(dbh->native_handle.odbc_dbh, 0)) != SWITCH_ODBC_SUCCESS) {
+ char tmp[100];
+ switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to Set AutoCommit Off", result);
+ errmsg = strdup(tmp);
}
}
-
- lc = qm_ttl(qm);
-
-
- if (lc > SWITCH_SQL_QUEUE_PAUSE_LEN) {
- if (!auto_pause) {
- auto_pause = 1;
- switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause);
- auto_pause = 1;
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s, SQL Queue overflowing [%d], Pausing calls.\n", qm->name, lc);
- }
- } else {
- if (auto_pause && lc < 1000) {
- auto_pause = 0;
- switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause);
- auto_pause = 0;
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s, SQL Queue back to normal size, resuming..\n", qm->name);
+ break;
+ case SCDB_TYPE_PGSQL:
+ {
+ switch_pgsql_status_t result;
+
+ if ((result = switch_pgsql_SQLSetAutoCommitAttr(dbh->native_handle.pgsql_dbh, 0)) != SWITCH_PGSQL_SUCCESS) {
+ char tmp[100];
+ switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to Set AutoCommit Off", result);
+ errmsg = strdup(tmp);
}
}
-
- skip:
-
- wrote = 0;
-
- if (trans && iterations && (iterations > target || !lc)) {
+ break;
+ }
- if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
- char line[128] = "";
- int l;
-
- switch_snprintf(line, sizeof(line), "%s RUN QUEUE ", qm->name);
-
- for (i = 0; i < qm->numq; i++) {
- l = strlen(line);
- switch_snprintf(line + l, sizeof(line) - l, "%d:%d ", i, switch_queue_size(qm->sql_queue[i]));
- }
-
- l = strlen(line);
- switch_snprintf(line + l, sizeof(line) - l, "%d\n", iterations);
+ if (errmsg) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "ERROR [%s]\n", errmsg);
+ free(errmsg);
+ goto end;
+ }
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s", line);
- }
- if (switch_cache_db_persistant_execute_trans_full(qm->event_db, sqlbuf, 1,
- qm->pre_trans_execute,
- qm->post_trans_execute,
- qm->inner_pre_trans_execute,
- qm->inner_post_trans_execute
- ) != SWITCH_STATUS_SUCCESS) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s SQL thread unable to commit transaction, records lost!\n", qm->name);
- }
- if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s DONE\n", qm->name);
- }
+ for(;;) {
+ if (mutex) switch_mutex_lock(mutex);
+ status = switch_queue_trypop(q, &pop);
+ if (mutex) switch_mutex_unlock(mutex);
+
+ if (status != SWITCH_STATUS_SUCCESS || !pop) break;
- iterations = 0;
- trans = 0;
- len = 0;
- *sqlbuf = '\0';
- lc = 0;
- if (do_sleep) {
- switch_yield(200000);
- } else {
- switch_yield(1000);
- }
- wrote = 1;
+ if ((status = switch_cache_db_execute_sql(dbh, (char *) pop, NULL)) == SWITCH_STATUS_SUCCESS) {
+ ttl++;
}
+ free(pop);
- lc = qm_ttl(qm);
-
- switch_mutex_lock(qm->mutex);
- for (i = 0; i < qm->numq; i++) {
- qm->written[i] += qm->pre_written[i];
- qm->pre_written[i] = 0;
- }
- switch_mutex_unlock(qm->mutex);
-
- if (!lc) {
- switch_thread_cond_wait(qm->cond, qm->cond_mutex);
- } else if (wrote) {
- if (lc > 2000) {
- do_sleep = 0;
- } else {
- do_sleep = 1;
- }
- }
+ if (status != SWITCH_STATUS_SUCCESS) break;
}
- switch_mutex_unlock(qm->cond_mutex);
+ end:
- for(i = 0; i < qm->numq; i++) {
- while (switch_queue_trypop(qm->sql_queue[i], &pop) == SWITCH_STATUS_SUCCESS) {
- if (pop) {
- switch_cache_db_execute_sql(qm->event_db, (char *) pop, NULL);
- free(pop);
- }
+ switch(dbh->type) {
+ case SCDB_TYPE_CORE_DB:
+ {
+ switch_cache_db_execute_sql_real(dbh, "COMMIT", NULL);
}
+ break;
+ case SCDB_TYPE_ODBC:
+ {
+ switch_odbc_SQLEndTran(dbh->native_handle.odbc_dbh, 1);
+ switch_odbc_SQLSetAutoCommitAttr(dbh->native_handle.odbc_dbh, 1);
+ }
+ break;
+ case SCDB_TYPE_PGSQL:
+ {
+ switch_pgsql_SQLEndTran(dbh->native_handle.pgsql_dbh, 1);
+ switch_pgsql_SQLSetAutoCommitAttr(dbh->native_handle.pgsql_dbh, 1);
+ switch_pgsql_finish_results(dbh->native_handle.pgsql_dbh);
+ }
+ break;
}
- free(sqlbuf);
- qm->thread_running = 0;
-
- switch_cache_db_release_db_handle(&qm->event_db);
- return NULL;
+ return ttl;
}
-static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, void *obj)
+static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, void *obj)
{
- void *pop = NULL;
- uint32_t iterations = 0;
- uint8_t trans = 0;
- uint32_t target = 20000;
- switch_size_t len = 0, sql_len = runtime.sql_buffer_len;
- char *tmp, *sqlbuf = (char *) malloc(sql_len);
- char *sql = NULL, *save_sql = NULL;
- switch_size_t newlen;
- int lc = 0, wrote = 0, do_sleep = 1;
- uint32_t sanity = 120;
- int auto_pause = 0;
- switch_assert(sqlbuf);
+ uint32_t sanity = 120;
+ switch_sql_queue_manager_t *qm = (switch_sql_queue_manager_t *) obj;
+ uint32_t i;
- while (!sql_manager.event_db) {
- if (switch_core_db_handle(&sql_manager.event_db) == SWITCH_STATUS_SUCCESS && sql_manager.event_db)
+ while (!qm->event_db) {
+ if (switch_cache_db_get_db_handle_dsn(&qm->event_db, qm->dsn) == SWITCH_STATUS_SUCCESS && qm->event_db)
break;
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Error getting core db, Retrying\n");
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s Error getting db handle, Retrying\n", qm->name);
switch_yield(500000);
sanity--;
}
- if (!sql_manager.event_db) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Error getting core db Disabling core sql functionality\n");
+ if (!qm->event_db) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s Error getting db handle\n", qm->name);
return NULL;
}
- sql_manager.thread_running = 1;
+ qm->thread_running = 1;
- switch_mutex_lock(sql_manager.cond_mutex);
+ switch_mutex_lock(qm->cond_mutex);
- switch (sql_manager.event_db->type) {
+ switch (qm->event_db->type) {
case SCDB_TYPE_PGSQL:
break;
case SCDB_TYPE_ODBC:
break;
case SCDB_TYPE_CORE_DB:
{
- switch_cache_db_execute_sql(sql_manager.event_db, "PRAGMA synchronous=OFF;", NULL);
- switch_cache_db_execute_sql(sql_manager.event_db, "PRAGMA count_changes=OFF;", NULL);
- switch_cache_db_execute_sql(sql_manager.event_db, "PRAGMA temp_store=MEMORY;", NULL);
- switch_cache_db_execute_sql(sql_manager.event_db, "PRAGMA journal_mode=OFF;", NULL);
+ switch_cache_db_execute_sql(qm->event_db, "PRAGMA synchronous=OFF;", NULL);
+ switch_cache_db_execute_sql(qm->event_db, "PRAGMA count_changes=OFF;", NULL);
+ switch_cache_db_execute_sql(qm->event_db, "PRAGMA temp_store=MEMORY;", NULL);
+ switch_cache_db_execute_sql(qm->event_db, "PRAGMA journal_mode=OFF;", NULL);
}
break;
}
- while (sql_manager.thread_running == 1) {
- if (save_sql ||
- switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS ||
- switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS ||
- switch_queue_trypop(sql_manager.sql_queue[2], &pop) == SWITCH_STATUS_SUCCESS ||
- switch_queue_trypop(sql_manager.sql_queue[3], &pop) == SWITCH_STATUS_SUCCESS
- ) {
-
- if (save_sql) {
- sql = save_sql;
- save_sql = NULL;
- } else if ((sql = (char *) pop)) {
- pop = NULL;
- }
-
- if (sql) {
- newlen = strlen(sql) + 2;
-
- if (iterations == 0) {
- trans = 1;
- }
-
- if (len + newlen + 1 > sql_len) {
- int new_mlen = len + newlen + 10240;
-
- if (new_mlen < runtime.max_sql_buffer_len) {
- sql_len = new_mlen;
- if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
- "REALLOC %ld %d %d\n", (long int)sql_len, switch_queue_size(sql_manager.sql_queue[0]),
- switch_queue_size(sql_manager.sql_queue[1]));
- }
- if (!(tmp = realloc(sqlbuf, sql_len))) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread ending on mem err\n");
- abort();
- break;
- }
- sqlbuf = tmp;
- } else {
- if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
- "SAVE %d %d %d %d\n",
- switch_queue_size(sql_manager.sql_queue[0]),
- switch_queue_size(sql_manager.sql_queue[1]),
- switch_queue_size(sql_manager.sql_queue[2]),
- switch_queue_size(sql_manager.sql_queue[3])
- );
- }
- save_sql = sql;
- sql = NULL;
- lc = 0;
- goto skip;
- }
- }
-
- iterations++;
- sprintf(sqlbuf + len, "%s;\n", sql);
- len += newlen;
- free(sql);
- sql = NULL;
- } else {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "SQL thread ending\n");
- break;
- }
- }
-
- lc = switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]) +
- switch_queue_size(sql_manager.sql_queue[2]) + switch_queue_size(sql_manager.sql_queue[3]);
+ while (qm->thread_running == 1) {
+ int lc;
+ int i;
+ uint32_t iterations = 0;
- if (lc > SWITCH_SQL_QUEUE_PAUSE_LEN) {
- if (!auto_pause) {
- auto_pause = 1;
- switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause);
- auto_pause = 1;
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL Queue overflowing [%d], Pausing calls.\n", lc);
- }
- } else {
- if (auto_pause && lc < 1000) {
- auto_pause = 0;
- switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause);
- auto_pause = 0;
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL Queue back to normal size, resuming..\n");
+ if (sql_manager.paused) {
+ for (i = 0; i < qm->numq; i++) {
+ do_flush(qm->sql_queue[i], NULL);
}
+ goto check;
}
-
- skip:
- wrote = 0;
-
- if (trans && iterations && (iterations > target || !lc)) {
- if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
- "RUN %d %d %d %d %d\n",
- switch_queue_size(sql_manager.sql_queue[0]),
- switch_queue_size(sql_manager.sql_queue[1]),
- switch_queue_size(sql_manager.sql_queue[2]),
- switch_queue_size(sql_manager.sql_queue[3]),
- iterations);
- }
- if (switch_cache_db_persistant_execute_trans_full(sql_manager.event_db, sqlbuf, 1,
- runtime.core_db_pre_trans_execute,
- runtime.core_db_post_trans_execute,
- runtime.core_db_inner_pre_trans_execute,
- runtime.core_db_inner_post_trans_execute
- ) != SWITCH_STATUS_SUCCESS) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n");
- }
- if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "DONE\n");
- }
-
+ for (i = 0; i < qm->numq; i++) {
+ uint32_t written = do_trans(qm->event_db, qm->sql_queue[i], NULL,
+ qm->pre_trans_execute,
+ qm->post_trans_execute,
+ qm->inner_pre_trans_execute,
+ qm->inner_post_trans_execute);
- iterations = 0;
- trans = 0;
- len = 0;
- *sqlbuf = '\0';
- lc = 0;
- if (do_sleep) {
- switch_yield(200000);
- } else {
- switch_yield(1000);
- }
- wrote = 1;
+ iterations += written;
+
+ switch_mutex_lock(qm->mutex);
+ qm->written[i] += written;
+ switch_mutex_unlock(qm->mutex);
}
-
- lc = switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]) +
- switch_queue_size(sql_manager.sql_queue[2]) + switch_queue_size(sql_manager.sql_queue[3]);
- if (!lc) {
- switch_thread_cond_wait(sql_manager.cond, sql_manager.cond_mutex);
- } else if (wrote) {
- if (lc > 2000) {
- do_sleep = 0;
- } else {
- do_sleep = 1;
+ if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
+ char line[128] = "";
+ int l;
+
+ switch_snprintf(line, sizeof(line), "%s RUN QUEUE [", qm->name);
+
+ for (i = 0; i < qm->numq; i++) {
+ l = strlen(line);
+ switch_snprintf(line + l, sizeof(line) - l, "%d%s", switch_queue_size(qm->sql_queue[i]), i == qm->numq - 1 ? "" : "|");
}
+
+ l = strlen(line);
+ switch_snprintf(line + l, sizeof(line) - l, "]--[%d]\n", iterations);
+
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s", line);
+
}
-
-
- }
- switch_mutex_unlock(sql_manager.cond_mutex);
+ check:
- while (switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS) {
- free(pop);
- }
+ lc = qm_ttl(qm);
- while (switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) {
- free(pop);
+ if (!lc) {
+ switch_thread_cond_wait(qm->cond, qm->cond_mutex);
+ } else if (lc < 2000) {
+ switch_yield(200000);
+ }
}
- while (switch_queue_trypop(sql_manager.sql_queue[2], &pop) == SWITCH_STATUS_SUCCESS) {
- free(pop);
- }
+ switch_mutex_unlock(qm->cond_mutex);
- while (switch_queue_trypop(sql_manager.sql_queue[3], &pop) == SWITCH_STATUS_SUCCESS) {
- free(pop);
+ for(i = 0; i < qm->numq; i++) {
+ do_flush(qm->sql_queue[i], qm->event_db);
}
- free(sqlbuf);
-
- sql_manager.thread_running = 0;
-
- switch_cache_db_release_db_handle(&sql_manager.event_db);
+ qm->thread_running = 0;
+ switch_cache_db_release_db_handle(&qm->event_db);
+
return NULL;
}
+
static char *parse_presence_data_cols(switch_event_t *event)
{
char *cols[128] = { 0 };
for (i = 0; i < sql_idx; i++) {
if (switch_stristr("update channels", sql[i]) || switch_stristr("delete from channels", sql[i])) {
- switch_queue_push(sql_manager.sql_queue[1], sql[i]);
+ switch_sql_queue_manager_push(sql_manager.qm, sql[i], 1, SWITCH_FALSE);
} else {
- switch_queue_push(sql_manager.sql_queue[0], sql[i]);
+ switch_sql_queue_manager_push(sql_manager.qm, sql[i], 0, SWITCH_FALSE);
}
sql[i] = NULL;
- wake_thread(0);
}
}
}
switch_cache_db_handle_t *dbh;
int r = 0;
+ if (!sql_manager.manage) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "DATABASE NOT AVAIALBLE, REVCOVERY NOT POSSIBLE\n");
+ return 0;
+ }
+
if (switch_core_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
return 0;
SWITCH_DECLARE(switch_cache_db_handle_type_t) switch_core_dbtype(void)
{
- return sql_manager.event_db->type;
+ return sql_manager.qm ? sql_manager.qm->event_db->type : SCDB_TYPE_CORE_DB;
}
SWITCH_DECLARE(void) switch_core_sql_exec(const char *sql)
{
+ if (!sql_manager.manage) {
+ return;
+ }
+
if (!switch_test_flag((&runtime), SCF_USE_SQL)) {
return;
}
- switch_queue_push(sql_manager.sql_queue[3], strdup(sql));
+
+ switch_sql_queue_manager_push(sql_manager.qm, sql, 3, SWITCH_TRUE);
}
SWITCH_DECLARE(void) switch_core_recovery_untrack(switch_core_session_t *session, switch_bool_t force)
char *sql = NULL;
switch_channel_t *channel = switch_core_session_get_channel(session);
+ if (!sql_manager.manage) {
+ return;
+ }
+
if (!switch_channel_test_flag(channel, CF_ANSWERED) || switch_channel_get_state(channel) < CS_SOFT_EXECUTE) {
return;
}
switch_core_get_uuid(), switch_core_session_get_uuid(session));
}
- switch_queue_push(sql_manager.sql_queue[3], sql);
+ switch_sql_queue_manager_push(sql_manager.qm, sql, 3, SWITCH_FALSE);
switch_channel_clear_flag(channel, CF_TRACKED);
}
const char *profile_name;
const char *technology;
+ if (!sql_manager.manage) {
+ return;
+ }
if (!switch_channel_test_flag(channel, CF_ANSWERED) || switch_channel_get_state(channel) < CS_SOFT_EXECUTE) {
return;
switch_str_nil(profile_name), switch_core_get_hostname(), switch_core_session_get_uuid(session), xml_cdr_text);
}
- switch_queue_push(sql_manager.sql_queue[2], sql);
+ switch_sql_queue_manager_push(sql_manager.qm, sql, 2, SWITCH_FALSE);
free(xml_cdr_text);
switch_channel_set_flag(channel, CF_TRACKED);
user, realm, switch_core_get_switchname());
}
- switch_queue_push(sql_manager.sql_queue[0], sql);
+ switch_sql_queue_manager_push(sql_manager.qm, sql, 0, SWITCH_FALSE);
if ( !zstr(metadata) ) {
sql = switch_mprintf("insert into registrations (reg_user,realm,token,url,expires,network_ip,network_port,network_proto,hostname,metadata) "
}
- switch_queue_push(sql_manager.sql_queue[0], sql);
+ switch_sql_queue_manager_push(sql_manager.qm, sql, 0, SWITCH_FALSE);
return SWITCH_STATUS_SUCCESS;
}
sql = switch_mprintf("delete from registrations where reg_user='%q' and realm='%q' and hostname='%q'", user, realm, switch_core_get_switchname());
}
- switch_queue_push(sql_manager.sql_queue[0], sql);
+ switch_sql_queue_manager_push(sql_manager.qm, sql, 0, SWITCH_FALSE);
+
return SWITCH_STATUS_SUCCESS;
}
sql = switch_mprintf("delete from registrations where expires > 0 and expires <= %ld and hostname='%q'", now, switch_core_get_switchname());
}
- switch_queue_push(sql_manager.sql_queue[0], sql);
+ switch_sql_queue_manager_push(sql_manager.qm, sql, 0, SWITCH_FALSE);
return SWITCH_STATUS_SUCCESS;
switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_t manage)
{
switch_threadattr_t *thd_attr;
- uint32_t sanity = 400;
sql_manager.memory_pool = pool;
sql_manager.manage = manage;
switch_mutex_init(&sql_manager.dbh_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
switch_mutex_init(&sql_manager.io_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
- switch_mutex_init(&sql_manager.cond_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
switch_mutex_init(&sql_manager.ctl_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
- switch_thread_cond_create(&sql_manager.cond, sql_manager.memory_pool);
-
-
-
if (!sql_manager.manage) goto skip;
top:
switch_cache_db_test_reactive(sql_manager.dbh, "select metadata from registrations", NULL, "ALTER TABLE registrations ADD COLUMN metadata VARCHAR(256)");
+ switch_cache_db_test_reactive(sql_manager.dbh, "select hostname from recovery", "DROP TABLE recovery", recovery_sql);
+ switch_cache_db_execute_sql(sql_manager.dbh, "create index recovery1 on recovery(technology)", NULL);
+ switch_cache_db_execute_sql(sql_manager.dbh, "create index recovery2 on recovery(profile_name)", NULL);
+ switch_cache_db_execute_sql(sql_manager.dbh, "create index recovery3 on recovery(uuid)", NULL);
+ switch_cache_db_execute_sql(sql_manager.dbh, "create index recovery3 on recovery(runtime_uuid)", NULL);
+
+
+
+
switch (sql_manager.dbh->type) {
case SCDB_TYPE_PGSQL:
case SCDB_TYPE_ODBC:
switch_event_bind("core_db", SWITCH_EVENT_NAT, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
#endif
- switch_queue_create(&sql_manager.sql_queue[0], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool);
- switch_queue_create(&sql_manager.sql_queue[1], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool);
- switch_queue_create(&sql_manager.sql_queue[2], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool);
- switch_queue_create(&sql_manager.sql_queue[3], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool);
-
switch_threadattr_create(&thd_attr, sql_manager.memory_pool);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_threadattr_priority_set(thd_attr, SWITCH_PRI_REALTIME);
switch_core_sqldb_start_thread();
switch_thread_create(&sql_manager.db_thread, thd_attr, switch_core_sql_db_thread, NULL, sql_manager.memory_pool);
- while (sql_manager.manage && !sql_manager.thread_running && --sanity) {
- switch_yield(10000);
- }
}
return SWITCH_STATUS_SUCCESS;
}
-
-SWITCH_DECLARE(void) switch_core_sqldb_stop_thread(void)
+SWITCH_DECLARE(void) switch_core_sqldb_pause(void)
{
- switch_mutex_lock(sql_manager.ctl_mutex);
- if (sql_manager.thread && sql_manager.thread_running) {
- switch_status_t st;
-
- if (sql_manager.manage) {
- switch_queue_push(sql_manager.sql_queue[0], NULL);
- switch_queue_push(sql_manager.sql_queue[1], NULL);
- switch_queue_push(sql_manager.sql_queue[2], NULL);
- switch_queue_push(sql_manager.sql_queue[3], NULL);
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Waiting for unfinished SQL transactions\n");
- wake_thread(0);
- sql_manager.thread_running = -1;
- switch_thread_join(&st, sql_manager.thread);
- sql_manager.thread = NULL;
- switch_cache_db_release_db_handle(&sql_manager.dbh);
- sql_manager.dbh = NULL;
- } else {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL is not enabled\n");
- }
- } else {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL thread is not running\n");
+ if (sql_manager.paused) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL is already paused.\n");
}
- switch_mutex_unlock(sql_manager.ctl_mutex);
+ sql_manager.paused = 1;
}
-SWITCH_DECLARE(void) switch_core_sqldb_start_thread(void)
+SWITCH_DECLARE(void) switch_core_sqldb_resume(void)
{
- switch_cache_db_handle_t *dbh;
+ if (!sql_manager.paused) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL is already running.\n");
+ }
+ sql_manager.paused = 0;
+}
- switch_mutex_lock(sql_manager.ctl_mutex);
- if (switch_core_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
-
- if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) {
- int arg = 1;
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure! ODBC OR PGSQL IS REQUIRED!\n");
- switch_core_session_ctl(SCSC_SHUTDOWN_NOW, &arg);
+static void switch_core_sqldb_stop_thread(void)
+{
+ switch_mutex_lock(sql_manager.ctl_mutex);
+ if (sql_manager.manage) {
+ if (sql_manager.qm) {
+ switch_sql_queue_manager_destroy(&sql_manager.qm);
}
-
-
} else {
- switch_cache_db_test_reactive(dbh, "select hostname from recovery", "DROP TABLE recovery", recovery_sql);
- switch_cache_db_execute_sql(dbh, "create index recovery1 on recovery(technology)", NULL);
- switch_cache_db_execute_sql(dbh, "create index recovery2 on recovery(profile_name)", NULL);
- switch_cache_db_execute_sql(dbh, "create index recovery3 on recovery(uuid)", NULL);
- switch_cache_db_execute_sql(dbh, "create index recovery3 on recovery(runtime_uuid)", NULL);
- switch_cache_db_release_db_handle(&dbh);
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL is not enabled\n");
}
+
+ switch_mutex_unlock(sql_manager.ctl_mutex);
+}
+static void switch_core_sqldb_start_thread(void)
+{
+ switch_mutex_lock(sql_manager.ctl_mutex);
if (sql_manager.manage) {
+ if (!sql_manager.qm) {
+ char *dbname = runtime.odbc_dsn;
- top:
-
- if (!sql_manager.dbh) {
- /* Activate SQL database */
- if (switch_core_db_handle(&sql_manager.dbh) != SWITCH_STATUS_SUCCESS) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
-
- if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure! ODBC OR PGSQL IS REQUIRED!\n");
- goto end;
- }
-
- if (runtime.odbc_dsn) {
- runtime.odbc_dsn = NULL;
- runtime.odbc_dbtype = DBTYPE_DEFAULT;
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Falling back to core_db.\n");
- sql_manager.dbh = NULL;
- goto top;
+ if (zstr(dbname)) {
+ dbname = runtime.dbname;
+ if (zstr(dbname)) {
+ dbname = "core";
}
-
-
- switch_clear_flag((&runtime), SCF_USE_SQL);
- goto end;
}
- switch_cache_db_execute_sql(sql_manager.dbh, "delete from channels", NULL);
- switch_cache_db_execute_sql(sql_manager.dbh, "delete from calls", NULL);
- }
-
+ switch_sql_queue_manager_init_name("CORE",
+ &sql_manager.qm,
+ 4,
+ dbname,
+ runtime.core_db_pre_trans_execute,
+ runtime.core_db_post_trans_execute,
+ runtime.core_db_inner_pre_trans_execute,
+ runtime.core_db_inner_post_trans_execute);
- if (!sql_manager.thread) {
- switch_threadattr_t *thd_attr;
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Starting SQL thread.\n");
- switch_threadattr_create(&thd_attr, sql_manager.memory_pool);
- switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
- switch_threadattr_priority_set(thd_attr, SWITCH_PRI_REALTIME);
- switch_thread_create(&sql_manager.thread, thd_attr, switch_core_sql_thread, NULL, sql_manager.memory_pool);
- } else {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL thread is already running\n");
}
+ switch_sql_queue_manager_start(sql_manager.qm);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL is not enabled\n");
}
-
- end:
-
switch_mutex_unlock(sql_manager.ctl_mutex);
}