SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup);
SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_destroy(switch_sql_queue_manager_t **qmp);
SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_init_name(const char *name,
- switch_sql_queue_manager_t **qmp,
- uint32_t numq, const char *dsn,
- const char *pre_trans_execute,
- const char *post_trans_execute,
- const char *inner_pre_trans_execute,
- const char *inner_post_trans_execute);
-
-#define switch_sql_queue_manager_init(_q, _n, _d, _p1, _p2, _ip1, _ip2) switch_sql_queue_manager_init_name(__FILE__, _q, _n, _d, _p1, _p2, _ip1, _ip2)
+ switch_sql_queue_manager_t **qmp,
+ uint32_t numq, const char *dsn, uint32_t max_trans,
+ const char *pre_trans_execute,
+ const char *post_trans_execute,
+ const char *inner_pre_trans_execute,
+ const char *inner_post_trans_execute);
+
+#define switch_sql_queue_manager_init(_q, _n, _d, _m, _p1, _p2, _ip1, _ip2) switch_sql_queue_manager_init_name(__FILE__, _q, _n, _d, _m, _p1, _p2, _ip1, _ip2)
SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_start(switch_sql_queue_manager_t *qm);
SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_stop(switch_sql_queue_manager_t *qm);
if (pop) {
sofia_dispatch_event_t *de = (sofia_dispatch_event_t *) pop;
sofia_process_dispatch_event(&de);
- switch_os_yield();
+ switch_cond_next();
} else {
break;
}
end:
- if (profile->pres_type) {
- switch_cond_next();
- } else {
- switch_os_yield();
- }
-
+ switch_cond_next();
return;
}
switch_snprintf(qname, sizeof(qname), "sofia:%s", profile->name);
switch_sql_queue_manager_init_name(qname,
- &profile->qm,
- 1,
- profile->odbc_dsn ? profile->odbc_dsn : profile->dbname,
- profile->pre_trans_execute,
- profile->post_trans_execute,
- profile->inner_pre_trans_execute,
- profile->inner_post_trans_execute);
+ &profile->qm,
+ 1,
+ profile->odbc_dsn ? profile->odbc_dsn : profile->dbname,
+ SWITCH_MAX_TRANS,
+ profile->pre_trans_execute,
+ profile->post_trans_execute,
+ profile->inner_pre_trans_execute,
+ profile->inner_post_trans_execute);
switch_sql_queue_manager_start(profile->qm);
if (switch_event_create(&s_event, SWITCH_EVENT_PUBLISH) == SWITCH_STATUS_SUCCESS) {
char *inner_pre_trans_execute;
char *inner_post_trans_execute;
switch_memory_pool_t *pool;
+ uint32_t max_trans;
};
static int qm_wake(switch_sql_queue_manager_t *qm)
SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_init_name(const char *name,
- switch_sql_queue_manager_t **qmp,
- uint32_t numq, const char *dsn,
- const char *pre_trans_execute,
- const char *post_trans_execute,
- const char *inner_pre_trans_execute,
- const char *inner_post_trans_execute)
+ switch_sql_queue_manager_t **qmp,
+ uint32_t numq, const char *dsn, uint32_t max_trans,
+ const char *pre_trans_execute,
+ const char *post_trans_execute,
+ const char *inner_pre_trans_execute,
+ const char *inner_post_trans_execute)
{
switch_memory_pool_t *pool;
switch_sql_queue_manager_t *qm;
qm->numq = numq;
qm->dsn = switch_core_strdup(qm->pool, dsn);
qm->name = switch_core_strdup(qm->pool, name);
+ qm->max_trans = max_trans;
switch_mutex_init(&qm->cond_mutex, SWITCH_MUTEX_NESTED, qm->pool);
switch_mutex_init(&qm->mutex, SWITCH_MUTEX_NESTED, qm->pool);
static uint32_t do_trans(switch_cache_db_handle_t *dbh,
switch_queue_t *q,
switch_mutex_t *mutex,
+ uint32_t max,
const char *pre_trans_execute,
const char *post_trans_execute,
const char *inner_pre_trans_execute,
void *pop;
switch_status_t status;
uint32_t ttl = 0;
+ switch_mutex_t *io_mutex = dbh->io_mutex;
if (!switch_queue_size(q)) {
return 0;
}
+ if (io_mutex) switch_mutex_lock(io_mutex);
+
+ if (!zstr(pre_trans_execute)) {
+ switch_cache_db_execute_sql_real(dbh, pre_trans_execute, &errmsg);
+ if (errmsg) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL PRE TRANS EXEC %s [%s]\n", pre_trans_execute, errmsg);
+ free(errmsg);
+ }
+ }
+
switch(dbh->type) {
case SCDB_TYPE_CORE_DB:
{
}
- for(;;) {
+ if (!zstr(inner_pre_trans_execute)) {
+ switch_cache_db_execute_sql_real(dbh, inner_pre_trans_execute, &errmsg);
+ if (errmsg) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL PRE TRANS EXEC %s [%s]\n", inner_pre_trans_execute, errmsg);
+ free(errmsg);
+ }
+ }
+
+ while(max == 0 || ttl <= max) {
if (mutex) switch_mutex_lock(mutex);
status = switch_queue_trypop(q, &pop);
if (mutex) switch_mutex_unlock(mutex);
if (status != SWITCH_STATUS_SUCCESS) break;
}
+ if (!zstr(inner_post_trans_execute)) {
+ switch_cache_db_execute_sql_real(dbh, inner_post_trans_execute, &errmsg);
+ if (errmsg) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL POST TRANS EXEC %s [%s]\n", inner_post_trans_execute, errmsg);
+ free(errmsg);
+ }
+ }
+
+
end:
switch(dbh->type) {
}
+ if (!zstr(post_trans_execute)) {
+ switch_cache_db_execute_sql_real(dbh, post_trans_execute, &errmsg);
+ if (errmsg) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL POST TRANS EXEC %s [%s]\n", post_trans_execute, errmsg);
+ free(errmsg);
+ }
+ }
+
+ if (io_mutex) switch_mutex_unlock(io_mutex);
return ttl;
}
uint32_t sanity = 120;
switch_sql_queue_manager_t *qm = (switch_sql_queue_manager_t *) obj;
- uint32_t i;
+ uint32_t i, countdown = 0;
while (!qm->event_db) {
if (switch_cache_db_get_db_handle_dsn(&qm->event_db, qm->dsn) == SWITCH_STATUS_SUCCESS && qm->event_db)
}
for (i = 0; i < qm->numq; i++) {
- uint32_t written = do_trans(qm->event_db, qm->sql_queue[i], qm->mutex,
- qm->pre_trans_execute,
- qm->post_trans_execute,
- qm->inner_pre_trans_execute,
- qm->inner_post_trans_execute);
-
- iterations += written;
+ while(switch_queue_size(qm->sql_queue[i])) {
+ uint32_t written = do_trans(qm->event_db, qm->sql_queue[i], qm->mutex, qm->max_trans,
+ qm->pre_trans_execute,
+ qm->post_trans_execute,
+ qm->inner_pre_trans_execute,
+ qm->inner_post_trans_execute);
+
+ iterations += written;
- switch_mutex_lock(qm->mutex);
- qm->written[i] += written;
- switch_mutex_unlock(qm->mutex);
+ switch_mutex_lock(qm->mutex);
+ qm->written[i] += written;
+ switch_mutex_unlock(qm->mutex);
+
+ if (written < qm->max_trans) {
+ break;
+ }
+ }
}
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
check:
- lc = qm_ttl(qm);
+ countdown = 40;
- if (!lc) {
- switch_thread_cond_wait(qm->cond, qm->cond_mutex);
- } else if (lc < 2000) {
- switch_yield(200000);
+ while (--countdown && (lc = qm_ttl(qm)) < qm->max_trans / 4) {
+ if (lc == 0) {
+ switch_thread_cond_wait(qm->cond, qm->cond_mutex);
+ break;
+ }
+ switch_yield(5000);
}
}
&sql_manager.qm,
4,
dbname,
+ SWITCH_MAX_TRANS,
runtime.core_db_pre_trans_execute,
runtime.core_db_post_trans_execute,
runtime.core_db_inner_pre_trans_execute,