}
-static uint32_t do_trans(switch_cache_db_handle_t *dbh,
- switch_queue_t *q,
- switch_mutex_t *mutex,
- uint32_t max,
- const char *pre_trans_execute,
- const char *post_trans_execute,
- const char *inner_pre_trans_execute,
- const char *inner_post_trans_execute)
+static uint32_t do_trans(switch_sql_queue_manager_t *qm)
{
char *errmsg = NULL;
void *pop;
switch_status_t status;
uint32_t ttl = 0;
- switch_mutex_t *io_mutex = dbh->io_mutex;
-
- if (!switch_queue_size(q)) {
- return 0;
- }
+ switch_mutex_t *io_mutex = qm->event_db->io_mutex;
+ int i;
if (io_mutex) switch_mutex_lock(io_mutex);
- if (!zstr(pre_trans_execute)) {
- switch_cache_db_execute_sql_real(dbh, pre_trans_execute, &errmsg);
+ if (!zstr(qm->pre_trans_execute)) {
+ switch_cache_db_execute_sql_real(qm->event_db, qm->pre_trans_execute, &errmsg);
if (errmsg) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL PRE TRANS EXEC %s [%s]\n", pre_trans_execute, errmsg);
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL PRE TRANS EXEC %s [%s]\n", qm->pre_trans_execute, errmsg);
free(errmsg);
}
}
- switch(dbh->type) {
+ switch(qm->event_db->type) {
case SCDB_TYPE_CORE_DB:
{
- switch_cache_db_execute_sql_real(dbh, "BEGIN", &errmsg);
+ switch_cache_db_execute_sql_real(qm->event_db, "BEGIN", &errmsg);
}
break;
case SCDB_TYPE_ODBC:
{
switch_odbc_status_t result;
- if ((result = switch_odbc_SQLSetAutoCommitAttr(dbh->native_handle.odbc_dbh, 0)) != SWITCH_ODBC_SUCCESS) {
+ if ((result = switch_odbc_SQLSetAutoCommitAttr(qm->event_db->native_handle.odbc_dbh, 0)) != SWITCH_ODBC_SUCCESS) {
char tmp[100];
switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to Set AutoCommit Off", result);
errmsg = strdup(tmp);
{
switch_pgsql_status_t result;
- if ((result = switch_pgsql_SQLSetAutoCommitAttr(dbh->native_handle.pgsql_dbh, 0)) != SWITCH_PGSQL_SUCCESS) {
+ if ((result = switch_pgsql_SQLSetAutoCommitAttr(qm->event_db->native_handle.pgsql_dbh, 0)) != SWITCH_PGSQL_SUCCESS) {
char tmp[100];
switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to Set AutoCommit Off", result);
errmsg = strdup(tmp);
}
- if (!zstr(inner_pre_trans_execute)) {
- switch_cache_db_execute_sql_real(dbh, inner_pre_trans_execute, &errmsg);
+ if (!zstr(qm->inner_pre_trans_execute)) {
+ switch_cache_db_execute_sql_real(qm->event_db, qm->inner_pre_trans_execute, &errmsg);
if (errmsg) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL PRE TRANS EXEC %s [%s]\n", inner_pre_trans_execute, errmsg);
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL PRE TRANS EXEC %s [%s]\n", qm->inner_pre_trans_execute, errmsg);
free(errmsg);
}
}
- while(max == 0 || ttl <= max) {
- if (mutex) switch_mutex_lock(mutex);
- status = switch_queue_trypop(q, &pop);
- if (mutex) switch_mutex_unlock(mutex);
-
- if (status != SWITCH_STATUS_SUCCESS || !pop) break;
- if ((status = switch_cache_db_execute_sql(dbh, (char *) pop, NULL)) == SWITCH_STATUS_SUCCESS) {
- ttl++;
+ while(qm->max_trans == 0 || ttl <= qm->max_trans) {
+ pop = NULL;
+
+ for (i = 0; (qm->max_trans == 0 || ttl <= qm->max_trans) && (i < qm->numq); i++) {
+ switch_mutex_lock(qm->mutex);
+ switch_queue_trypop(qm->sql_queue[i], &pop);
+ switch_mutex_unlock(qm->mutex);
+ if (pop) break;
}
- free(pop);
- if (status != SWITCH_STATUS_SUCCESS) break;
+ if (pop) {
+ if ((status = switch_cache_db_execute_sql(qm->event_db, (char *) pop, NULL)) == SWITCH_STATUS_SUCCESS) {
+ switch_mutex_lock(qm->mutex);
+ qm->written[i]++;
+ switch_mutex_unlock(qm->mutex);
+ ttl++;
+ }
+ free(pop);
+ pop = NULL;
+ if (status != SWITCH_STATUS_SUCCESS) break;
+ } else {
+ break;
+ }
}
- if (!zstr(inner_post_trans_execute)) {
- switch_cache_db_execute_sql_real(dbh, inner_post_trans_execute, &errmsg);
+ if (!zstr(qm->inner_post_trans_execute)) {
+ switch_cache_db_execute_sql_real(qm->event_db, qm->inner_post_trans_execute, &errmsg);
if (errmsg) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL POST TRANS EXEC %s [%s]\n", inner_post_trans_execute, errmsg);
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL POST TRANS EXEC %s [%s]\n", qm->inner_post_trans_execute, errmsg);
free(errmsg);
}
}
end:
- switch(dbh->type) {
+ switch(qm->event_db->type) {
case SCDB_TYPE_CORE_DB:
{
- switch_cache_db_execute_sql_real(dbh, "COMMIT", NULL);
+ switch_cache_db_execute_sql_real(qm->event_db, "COMMIT", NULL);
}
break;
case SCDB_TYPE_ODBC:
{
- switch_odbc_SQLEndTran(dbh->native_handle.odbc_dbh, 1);
- switch_odbc_SQLSetAutoCommitAttr(dbh->native_handle.odbc_dbh, 1);
+ switch_odbc_SQLEndTran(qm->event_db->native_handle.odbc_dbh, 1);
+ switch_odbc_SQLSetAutoCommitAttr(qm->event_db->native_handle.odbc_dbh, 1);
}
break;
case SCDB_TYPE_PGSQL:
{
- switch_pgsql_SQLEndTran(dbh->native_handle.pgsql_dbh, 1);
- switch_pgsql_SQLSetAutoCommitAttr(dbh->native_handle.pgsql_dbh, 1);
- switch_pgsql_finish_results(dbh->native_handle.pgsql_dbh);
+ switch_pgsql_SQLEndTran(qm->event_db->native_handle.pgsql_dbh, 1);
+ switch_pgsql_SQLSetAutoCommitAttr(qm->event_db->native_handle.pgsql_dbh, 1);
+ switch_pgsql_finish_results(qm->event_db->native_handle.pgsql_dbh);
}
break;
}
- if (!zstr(post_trans_execute)) {
- switch_cache_db_execute_sql_real(dbh, post_trans_execute, &errmsg);
+ if (!zstr(qm->post_trans_execute)) {
+ switch_cache_db_execute_sql_real(qm->event_db, qm->post_trans_execute, &errmsg);
if (errmsg) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL POST TRANS EXEC %s [%s]\n", post_trans_execute, errmsg);
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL POST TRANS EXEC %s [%s]\n", qm->post_trans_execute, errmsg);
free(errmsg);
}
}
while (qm->thread_running == 1) {
int lc;
uint32_t i;
- uint32_t iterations = 0;
+ uint32_t written, iterations = 0;
if (sql_manager.paused) {
for (i = 0; i < qm->numq; i++) {
}
goto check;
}
-
- for (i = 0; i < qm->numq; i++) {
- while(switch_queue_size(qm->sql_queue[i])) {
- uint32_t written = do_trans(qm->event_db, qm->sql_queue[i], qm->mutex, qm->max_trans,
- qm->pre_trans_execute,
- qm->post_trans_execute,
- qm->inner_pre_trans_execute,
- qm->inner_post_trans_execute);
-
- iterations += written;
-
- switch_mutex_lock(qm->mutex);
- qm->written[i] += written;
- switch_mutex_unlock(qm->mutex);
- if (written < qm->max_trans) {
- break;
- }
- }
- }
+ do {
+ written = do_trans(qm);
+ iterations += written;
+ } while(written == qm->max_trans);
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
char line[128] = "";