#include <switch.h>
#include "private/switch_core_pvt.h"
+//*#define DEBUG_SQL 1
static struct {
switch_cache_db_handle_t *event_db;
switch_memory_pool_t *memory_pool;
switch_event_node_t *event_node;
switch_thread_t *thread;
+ switch_thread_t *db_thread;
int thread_running;
+ int db_thread_running;
switch_bool_t manage;
switch_mutex_t *io_mutex;
switch_mutex_t *dbh_mutex;
}
+static void *SWITCH_THREAD_FUNC switch_core_sql_db_thread(switch_thread_t *thread, void *obj)
+{
+ int sec = 0;
+ sql_manager.db_thread_running = 1;
+
+ while (sql_manager.db_thread_running == 1) {
+ if (++sec == SQL_CACHE_TIMEOUT) {
+ sql_close(switch_epoch_time_now(NULL));
+ wake_thread(1);
+ sec = 0;
+ }
+ switch_yield(1000);
+ }
+
+
+ return NULL;
+}
static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, void *obj)
{
char *sql = NULL;
switch_size_t newlen;
int lc = 0;
- uint32_t loops = 0, sec = 0;
- uint32_t l1 = 1000;
uint32_t sanity = 120;
switch_assert(sqlbuf);
- if (!sql_manager.manage) {
- l1 = 10;
- }
-
while (!sql_manager.event_db) {
if (switch_core_db_handle(&sql_manager.event_db) == SWITCH_STATUS_SUCCESS && sql_manager.event_db)
break;
return NULL;
}
-
sql_manager.thread_running = 1;
while (sql_manager.thread_running == 1) {
switch_mutex_lock(sql_manager.cond_mutex);
- if (++loops == l1) {
- if (++sec == SQL_CACHE_TIMEOUT) {
- sql_close(switch_epoch_time_now(NULL));
- sec = 0;
- }
- loops = 0;
- }
-
- if (!sql_manager.manage) {
- switch_yield(100000);
- continue;
- }
-
if (sql || switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS ||
switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) {
if (new_mlen < runtime.max_sql_buffer_len) {
sql_len = new_mlen;
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10,
+#ifdef DEBUG_SQL
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
"REALLOC %ld %d %d\n", (long int)sql_len, switch_queue_size(sql_manager.sql_queue[0]),
switch_queue_size(sql_manager.sql_queue[1]));
+#endif
if (!(tmp = realloc(sqlbuf, sql_len))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread ending on mem err\n");
abort();
}
sqlbuf = tmp;
} else {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10,
+#ifdef DEBUG_SQL
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
"SAVE %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1]));
+#endif
goto skip;
}
}
lc = sql ? 1 : 0 + switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]);
if (trans && iterations && (iterations > target || !lc)) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10,
+#ifdef DEBUG_SQL
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
"RUN %d %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1]), iterations);
+#endif
if (switch_cache_db_persistant_execute_trans(sql_manager.event_db, sqlbuf, 1) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n");
}
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "DONE\n");
+#ifdef DEBUG_SQL
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "DONE\n");
+#endif
iterations = 0;
trans = 0;
len = 0;
switch_yield(400000);
}
+ lc = sql ? 1 : 0 + switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]);
+
if (!lc) {
switch_thread_cond_wait(sql_manager.cond, sql_manager.cond_mutex);
}
switch_queue_push(sql_manager.sql_queue[0], sql[i]);
}
sql[i] = NULL;
- wake_thread(0);
+ wake_thread(1);
}
}
}
switch_threadattr_create(&thd_attr, sql_manager.memory_pool);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
- switch_thread_create(&sql_manager.thread, thd_attr, switch_core_sql_thread, NULL, sql_manager.memory_pool);
+ if (sql_manager.manage) {
+ switch_thread_create(&sql_manager.thread, thd_attr, switch_core_sql_thread, NULL, sql_manager.memory_pool);
+ }
+ switch_thread_create(&sql_manager.db_thread, thd_attr, switch_core_sql_db_thread, NULL, sql_manager.memory_pool);
while (!sql_manager.thread_running) {
switch_yield(10000);
switch_thread_join(&st, sql_manager.thread);
}
+
+ if (sql_manager.thread && sql_manager.db_thread_running) {
+ sql_manager.db_thread_running = -1;
+ switch_thread_join(&st, sql_manager.db_thread);
+ }
+
switch_cache_db_flush_handles();
sql_close(0);