static struct {
switch_cache_db_handle_t *event_db;
- switch_queue_t *sql_queue[2];
+ switch_queue_t *sql_queue[4];
switch_memory_pool_t *memory_pool;
switch_thread_t *thread;
switch_thread_t *db_thread;
}
-#define SWITCH_CORE_RECOVERY_DB "core_recovery"
-SWITCH_DECLARE(switch_status_t) _switch_core_recovery_db_handle(switch_cache_db_handle_t **dbh, const char *file, const char *func, int line)
-{
- switch_cache_db_connection_options_t options = { {0} };
- switch_status_t r;
-
- if (!sql_manager.manage) {
- return SWITCH_STATUS_FALSE;
- }
-
- if (zstr(runtime.recovery_odbc_dsn)) {
- if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) {
- return SWITCH_STATUS_FALSE;
- }
-
- if (runtime.recovery_dbname) {
- options.core_db_options.db_path = runtime.recovery_dbname;
- } else {
- options.core_db_options.db_path = SWITCH_CORE_RECOVERY_DB;
- }
- r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_CORE_DB, &options, file, func, line);
-
- } else {
- char *dsn;
- if ((dsn = strstr(runtime.recovery_odbc_dsn, "pgsql;")) != NULL) {
- options.pgsql_options.dsn = (char*)(dsn + 6);
-
- r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_PGSQL, &options, file, func, line);
- } else {
- options.odbc_options.dsn = runtime.recovery_odbc_dsn;
- options.odbc_options.user = runtime.recovery_odbc_user;
- options.odbc_options.pass = runtime.recovery_odbc_pass;
-
- r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_ODBC, &options, file, func, line);
- }
- }
-
- /* I *think* we can do without this now, if not let me know
- if (r == SWITCH_STATUS_SUCCESS && !(*dbh)->io_mutex) {
- (*dbh)->io_mutex = sql_manager.io_mutex;
- }
- */
-
- return r;
-}
-
-
#define SQL_CACHE_TIMEOUT 30
#define SQL_REG_TIMEOUT 15
}
while (sql_manager.thread_running == 1) {
- if (save_sql || switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS ||
- switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) {
+ if (save_sql ||
+ switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS ||
+ switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS ||
+ switch_queue_trypop(sql_manager.sql_queue[2], &pop) == SWITCH_STATUS_SUCCESS ||
+ switch_queue_trypop(sql_manager.sql_queue[3], &pop) == SWITCH_STATUS_SUCCESS
+ ) {
if (save_sql) {
sql = save_sql;
} else {
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
- "SAVE %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1]));
+ "SAVE %d %d %d %d\n",
+ switch_queue_size(sql_manager.sql_queue[0]),
+ switch_queue_size(sql_manager.sql_queue[1]),
+ switch_queue_size(sql_manager.sql_queue[2]),
+ switch_queue_size(sql_manager.sql_queue[3])
+ );
}
save_sql = sql;
sql = NULL;
}
}
- lc = switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]);
+ lc = switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]) +
+ switch_queue_size(sql_manager.sql_queue[2]) + switch_queue_size(sql_manager.sql_queue[3]);
if (lc > SWITCH_SQL_QUEUE_PAUSE_LEN) {
if (trans && iterations && (iterations > target || !lc)) {
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
- "RUN %d %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1]), iterations);
+ "RUN %d %d %d %d %d\n",
+ switch_queue_size(sql_manager.sql_queue[0]),
+ switch_queue_size(sql_manager.sql_queue[1]),
+ switch_queue_size(sql_manager.sql_queue[2]),
+ switch_queue_size(sql_manager.sql_queue[3]),
+ iterations);
}
if (switch_cache_db_persistant_execute_trans(sql_manager.event_db, sqlbuf, 1) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n");
wrote = 1;
}
- lc = switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]);
+ lc = switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]) +
+ switch_queue_size(sql_manager.sql_queue[2]) + switch_queue_size(sql_manager.sql_queue[3]);
if (!lc) {
switch_thread_cond_wait(sql_manager.cond, sql_manager.cond_mutex);
free(pop);
}
+ while (switch_queue_trypop(sql_manager.sql_queue[2], &pop) == SWITCH_STATUS_SUCCESS) {
+ free(pop);
+ }
+
+ while (switch_queue_trypop(sql_manager.sql_queue[3], &pop) == SWITCH_STATUS_SUCCESS) {
+ free(pop);
+ }
+
free(sqlbuf);
sql_manager.thread_running = 0;
char *sql = NULL;
switch_cache_db_handle_t *dbh;
- if (switch_core_recovery_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
+ if (switch_core_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
return;
}
switch_cache_db_handle_t *dbh;
int r = 0;
- if (switch_core_recovery_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
+ if (switch_core_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
return 0;
}
SWITCH_DECLARE(void) switch_core_recovery_untrack(switch_core_session_t *session, switch_bool_t force)
{
char *sql = NULL;
- switch_cache_db_handle_t *dbh;
switch_channel_t *channel = switch_core_session_get_channel(session);
if (!switch_channel_test_flag(channel, CF_TRACKABLE)) {
return;
}
- if (switch_core_recovery_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
- return;
- }
-
if ((switch_channel_test_flag(channel, CF_RECOVERING))) {
return;
}
switch_core_get_uuid(), switch_core_session_get_uuid(session));
}
- switch_cache_db_execute_sql(dbh, sql, NULL);
+ switch_queue_push(sql_manager.sql_queue[3], sql);
switch_channel_clear_flag(channel, CF_TRACKED);
-
- switch_safe_free(sql);
}
- switch_cache_db_release_db_handle(&dbh);
-
}
SWITCH_DECLARE(void) switch_core_recovery_track(switch_core_session_t *session)
switch_xml_t cdr = NULL;
char *xml_cdr_text = NULL;
char *sql = NULL;
- switch_cache_db_handle_t *dbh;
switch_channel_t *channel = switch_core_session_get_channel(session);
const char *profile_name;
const char *technology;
return;
}
-
profile_name = switch_channel_get_variable_dup(channel, "recovery_profile_name", SWITCH_FALSE, -1);
technology = session->endpoint_interface->interface_name;
- if (switch_core_recovery_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
- return;
- }
-
-
if (switch_ivr_generate_xml_cdr(session, &cdr) == SWITCH_STATUS_SUCCESS) {
xml_cdr_text = switch_xml_toxml_nolock(cdr, SWITCH_FALSE);
switch_xml_free(cdr);
switch_str_nil(profile_name), switch_core_get_hostname(), switch_core_session_get_uuid(session), xml_cdr_text);
}
- switch_cache_db_execute_sql(dbh, sql, NULL);
- switch_safe_free(sql);
-
+ switch_queue_push(sql_manager.sql_queue[2], sql);
+
free(xml_cdr_text);
switch_channel_set_flag(channel, CF_TRACKED);
}
- switch_cache_db_release_db_handle(&dbh);
-
}
switch_queue_create(&sql_manager.sql_queue[0], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool);
switch_queue_create(&sql_manager.sql_queue[1], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool);
+ switch_queue_create(&sql_manager.sql_queue[2], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool);
+ switch_queue_create(&sql_manager.sql_queue[3], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool);
switch_threadattr_create(&thd_attr, sql_manager.memory_pool);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
if (sql_manager.manage) {
switch_queue_push(sql_manager.sql_queue[0], NULL);
switch_queue_push(sql_manager.sql_queue[1], NULL);
+ switch_queue_push(sql_manager.sql_queue[2], NULL);
+ switch_queue_push(sql_manager.sql_queue[3], NULL);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Waiting for unfinished SQL transactions\n");
wake_thread(0);
sql_manager.thread_running = -1;
switch_mutex_lock(sql_manager.ctl_mutex);
- if (switch_core_recovery_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
+ if (switch_core_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) {