uint32_t numq;
char *dsn;
switch_thread_t *thread;
+ int thread_initiated;
int thread_running;
switch_thread_cond_t *cond;
switch_mutex_t *cond_mutex;
switch_threadattr_create(&thd_attr, qm->pool);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_threadattr_priority_set(thd_attr, SWITCH_PRI_NORMAL);
- switch_thread_create(&qm->thread, thd_attr, switch_user_sql_thread, qm, qm->pool);
- return SWITCH_STATUS_SUCCESS;
+ if (switch_thread_create(&qm->thread, thd_attr, switch_user_sql_thread, qm, qm->pool) == SWITCH_STATUS_SUCCESS) {
+ while (!qm->thread_initiated) {
+ switch_cond_next();
+ }
+
+ if (qm->event_db) {
+ return SWITCH_STATUS_SUCCESS;
+ }
+ }
}
return SWITCH_STATUS_FALSE;
if (!qm->event_db) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s Error getting db handle\n", qm->name);
+ qm->thread_initiated = 1;
return NULL;
}
- qm->thread_running = 1;
-
switch_mutex_lock(qm->cond_mutex);
switch (qm->event_db->type) {
break;
}
+ qm->thread_initiated = 1;
+ qm->thread_running = 1;
while (qm->thread_running == 1) {
uint32_t i, lc;
#include <test/switch_test.h>
+int max_rows = 150;
+
+int status = 0;
+
+int table_count_func(void *pArg, int argc, char **argv, char **columnNames){
+ if (argc > 0) {
+ status = atoi(argv[0]);
+ }
+
+ return -1;
+}
+
FST_CORE_DB_BEGIN("./conf")
{
FST_SUITE_BEGIN(switch_core_db)
fst_check_string_equals(res2, "");
}
FST_TEST_END()
+
+ FST_TEST_BEGIN(test_switch_cache_db_queue_manager_race)
+ {
+ int i;
+ switch_sql_queue_manager_t *qm = NULL;
+
+ switch_sql_queue_manager_init_name("TEST",
+ &qm,
+ 4,
+ "test_switch_cache_db_queue_manager_race",
+ SWITCH_MAX_TRANS,
+ NULL, NULL, NULL, NULL);
+
+ switch_sql_queue_manager_start(qm);
+
+ switch_sql_queue_manager_push_confirm(qm, "DROP TABLE IF EXISTS t;", 0, SWITCH_TRUE);
+ switch_sql_queue_manager_push_confirm(qm, "CREATE TABLE t (col1 INT);", 0, SWITCH_TRUE);
+
+ for (i = 0; i < max_rows; i++) {
+ switch_sql_queue_manager_push(qm, "INSERT INTO t (col1) VALUES (1);", 0, SWITCH_TRUE);
+ }
+
+ switch_sleep(1 * 1000 * 1000);
+
+ switch_sql_queue_manager_execute_sql_callback(qm, "SELECT COUNT(col1) FROM t;", table_count_func, NULL);
+
+ while (switch_sql_queue_manager_size(qm, 0)) {
+ switch_cond_next();
+ }
+
+ switch_sql_queue_manager_stop(qm);
+ switch_sql_queue_manager_destroy(&qm);
+
+ fst_check_int_equals(status, max_rows);
+ }
+ FST_TEST_END()
+
+
}
FST_SUITE_END()
}