SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_stop(switch_sql_queue_manager_t *qm)
{
switch_status_t status = SWITCH_STATUS_FALSE;
- uint32_t i;
+ uint32_t i, sanity = 100;
- if (qm->thread_running) {
- qm->thread_running = 0;
+ if (qm->thread_running == 1) {
+ qm->thread_running = -1;
- for(i = 0; i < qm->numq; i++) {
- switch_queue_push(qm->sql_queue[i], NULL);
- switch_queue_interrupt_all(qm->sql_queue[i]);
+ while(--sanity && qm->thread_running == -1) {
+ for(i = 0; i < qm->numq; i++) {
+ switch_queue_push(qm->sql_queue[i], NULL);
+ switch_queue_interrupt_all(qm->sql_queue[i]);
+ }
+ qm_wake(qm);
+
+ if (qm->thread_running == -1) {
+ switch_yield(100000);
+ }
}
- qm_wake(qm);
status = SWITCH_STATUS_SUCCESS;
}
SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup)
{
- if (sql_manager.paused) {
+ if (sql_manager.paused || qm->thread_running != 1) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "DROP [%s]\n", sql);
if (!dup) free((char *)sql);
qm_wake(qm);
return SWITCH_STATUS_SUCCESS;
}
- if (!qm->thread_running) {
+ if (qm->thread_running != 1) {
if (!dup) free((char *)sql);
return SWITCH_STATUS_FALSE;
}
#ifdef EXEC_NOW
switch_cache_db_handle_t *dbh;
+ if (sql_manager.paused || qm->thread_running != 1) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "DROP [%s]\n", sql);
+ if (!dup) free((char *)sql);
+ qm_wake(qm);
+ return SWITCH_STATUS_SUCCESS;
+ }
+
if (switch_cache_db_get_db_handle_dsn(&dbh, qm->dsn) == SWITCH_STATUS_SUCCESS) {
switch_cache_db_execute_sql(dbh, (char *)sql, NULL);
switch_cache_db_release_db_handle(&dbh);
return SWITCH_STATUS_SUCCESS;
}
- if (!qm->thread_running) {
+ if (qm->thread_running != 1) {
if (!dup) free((char *)sql);
return SWITCH_STATUS_FALSE;
}
do_flush(qm, i, qm->event_db);
}
- qm->thread_running = 0;
-
switch_cache_db_release_db_handle(&qm->event_db);
+
+ qm->thread_running = 0;
return NULL;
}