} switch_hold_record_t;
+typedef struct switch_thread_data_s {
+ switch_thread_start_t func;
+ void *obj;
+ int alloc;
+} switch_thread_data_t;
+
#define MESSAGE_STAMP_FFL(_m) _m->_file = __FILE__; _m->_func = __SWITCH_FUNC__; _m->_line = __LINE__
SWITCH_DECLARE(switch_status_t) switch_core_session_thread_launch(_In_ switch_core_session_t *session);
+SWITCH_DECLARE(switch_status_t) switch_thread_pool_launch_thread(switch_thread_data_t **tdp);
SWITCH_DECLARE(switch_status_t) switch_core_session_thread_pool_launch(switch_core_session_t *session);
/*!
SWITCH_DECLARE(void) switch_core_recovery_track(switch_core_session_t *session);
SWITCH_DECLARE(void) switch_core_recovery_flush(const char *technology, const char *profile_name);
+SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_push_confirm(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup);
SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_push(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup);
SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_destroy(switch_sql_queue_manager_t **qmp);
-SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_init(switch_sql_queue_manager_t **qmp,
- uint32_t numq, const char *dsn,
- const char *pre_trans_execute,
- const char *post_trans_execute,
- const char *inner_pre_trans_execute,
- const char *inner_post_trans_execute);
+SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_init_name(const char *name,
+ switch_sql_queue_manager_t **qmp,
+ uint32_t numq, const char *dsn,
+ const char *pre_trans_execute,
+ const char *post_trans_execute,
+ const char *inner_pre_trans_execute,
+ const char *inner_post_trans_execute);
+
+#define switch_switch_sql_queue_manager_init(_q, _d, _p1, _p2, _ip1, _ip2) switch_switch_sql_queue_manager_init_name(__FILE__, _q, _d, _p1, _p2, _ip1, _ip2)
SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_start(switch_sql_queue_manager_t *qm);
SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_stop(switch_sql_queue_manager_t *qm);
sql = switch_mprintf("insert into sip_dialogs (uuid,presence_id,presence_data,profile_name,hostname,rcd,call_info_state) "
"values ('%q', '%q', '%q', '%q', '%q', %ld, '')", switch_core_session_get_uuid(nsession),
switch_str_nil(presence_id), switch_str_nil(presence_data), profile->name, mod_sofia_globals.hostname, (long) now);
- sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
- switch_safe_free(sql);
+ sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
}
#endif
PFLAG_DISABLE_HOLD,
PFLAG_AUTO_NAT,
PFLAG_SIPCOMPACT,
- PFLAG_SQL_IN_TRANS,
+ PFLAG_USE_ME,
PFLAG_PRESENCE_PRIVACY,
PFLAG_PASS_CALLEE_ID,
PFLAG_LOG_AUTH_FAIL,
PFLAG_MWI_USE_REG_CALLID,
PFLAG_FIRE_MESSAGE_EVENTS,
PFLAG_SEND_DISPLAY_UPDATE,
+ PFLAG_RUNNING_TRANS,
/* No new flags below this line */
PFLAG_MAX
} PFLAGS;
char *post_trans_execute;
char *inner_pre_trans_execute;
char *inner_post_trans_execute;
- switch_queue_t *sql_queue;
+ switch_sql_queue_manager_t *qm;
char *acl[SOFIA_MAX_ACL];
char *acl_pass_context[SOFIA_MAX_ACL];
char *acl_fail_context[SOFIA_MAX_ACL];
return NULL;
}
+void sofia_process_dispatch_event_in_thread(sofia_dispatch_event_t **dep)
+{
+ sofia_dispatch_event_t *de = *dep;
+ switch_memory_pool_t *pool;
+ sofia_profile_t *profile = (*dep)->profile;
+ switch_thread_data_t *td;
+
+ switch_core_new_memory_pool(&pool);
+
+ *dep = NULL;
+ de->pool = pool;
+
+ td = switch_core_alloc(pool, sizeof(*td));
+ td->func = sofia_msg_thread_run_once;
+ td->obj = de;
+
+ switch_mutex_lock(profile->ireg_mutex);
+ switch_thread_pool_launch_thread(&td);
+ switch_mutex_unlock(profile->ireg_mutex);
+}
+
+#if 0
void sofia_process_dispatch_event_in_thread(sofia_dispatch_event_t **dep)
{
sofia_dispatch_event_t *de = *dep;
sofia_process_dispatch_event(&de);
}
}
+#endif
void sofia_process_dispatch_event(sofia_dispatch_event_t **dep)
{
sofia_profile_t *profile = (sofia_profile_t *) obj;
uint32_t ireg_loops = profile->ireg_seconds; /* Number of loop iterations done when we haven't checked for registrations */
uint32_t gateway_loops = GATEWAY_SECONDS; /* Number of loop iterations done when we haven't checked for gateways */
- void *pop = NULL; /* queue_pop placeholder */
- switch_size_t sql_len = 1024 * 32; /* length of sqlbuf */
- char *tmp, *sqlbuf = NULL; /* Buffer for SQL statements */
- char *sql = NULL; /* Current SQL statement */
- switch_time_t last_commit; /* Last time we committed stuff to the DB */
- switch_time_t last_check; /* Last time we did the second-resolution loop that checks various stuff */
- switch_size_t len = 0; /* Current length of sqlbuf */
- uint32_t statements = 0; /* Number of statements in the current sql buffer */
-
- last_commit = last_check = switch_micro_time_now();
-
- if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) {
- sqlbuf = (char *) malloc(sql_len);
- }
sofia_set_pflag_locked(profile, PFLAG_WORKER_RUNNING);
- switch_queue_create(&profile->sql_queue, SOFIA_QUEUE_SIZE, profile->pool);
-
- /* While we're running, or there is a pending sql statment that we haven't appended to sqlbuf yet, because of a lack of buffer space */
- while ((mod_sofia_globals.running == 1 && sofia_test_pflag(profile, PFLAG_RUNNING)) || sql) {
-
- if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) {
- /* Do we have enough statements or is the timeout expired */
- while (sql || (sofia_test_pflag(profile, PFLAG_RUNNING) && mod_sofia_globals.running == 1 &&
- switch_micro_time_now() - last_check < 1000000 &&
- (statements == 0 || (statements <= 1024 && (switch_micro_time_now() - last_commit)/1000 < profile->trans_timeout)))) {
+ while ((mod_sofia_globals.running == 1 && sofia_test_pflag(profile, PFLAG_RUNNING))) {
+
+ if (profile->watchdog_enabled) {
+ uint32_t event_diff = 0, step_diff = 0, event_fail = 0, step_fail = 0;
+
+ if (profile->step_timeout) {
+ step_diff = (uint32_t) ((switch_time_now() - profile->last_root_step) / 1000);
- switch_interval_time_t sleepy_time = !statements ? 1000000 : switch_micro_time_now() - last_commit - profile->trans_timeout*1000;
-
- if (sleepy_time < 1000 || sleepy_time > 1000000) {
- sleepy_time = 1000;
+ if (step_diff > profile->step_timeout) {
+ step_fail = 1;
}
+ }
+
+ if (profile->event_timeout) {
+ event_diff = (uint32_t) ((switch_time_now() - profile->last_sip_event) / 1000);
- if (sql || (switch_queue_pop_timeout(profile->sql_queue, &pop, sleepy_time) == SWITCH_STATUS_SUCCESS && pop)) {
- switch_size_t newlen;
-
- if (!sql) sql = (char *) pop;
-
- newlen = strlen(sql) + 2 /* strlen(";\n") */ ;
-
- if (len + newlen + 10 > sql_len) {
- switch_size_t new_mlen = len + newlen + 10 + 10240;
-
- if (new_mlen < SQLLEN) {
- sql_len = new_mlen;
-
- if (!(tmp = realloc(sqlbuf, sql_len))) {
- abort();
- break;
- }
- sqlbuf = tmp;
- } else {
- break;
- }
- }
-
- sprintf(sqlbuf + len, "%s;\n", sql);
- len += newlen;
- free(sql);
- sql = NULL;
-
- statements++;
+ if (event_diff > profile->event_timeout) {
+ event_fail = 1;
}
}
- /* Execute here */
- last_commit = switch_micro_time_now();
-
- if (len) {
- //printf("TRANS:\n%s\n", sqlbuf);
- switch_mutex_lock(profile->ireg_mutex);
- sofia_glue_actually_execute_sql_trans(profile, sqlbuf, NULL);
- //sofia_glue_actually_execute_sql(profile, "commit;\n", NULL);
- switch_mutex_unlock(profile->ireg_mutex);
- statements = 0;
- len = 0;
+ if (step_fail && profile->event_timeout && !event_fail) {
+ step_fail = 0;
}
-
- } else {
- if (switch_queue_pop_timeout(profile->sql_queue, &pop, 1000000) == SWITCH_STATUS_SUCCESS && pop) {
- sofia_glue_actually_execute_sql(profile, (char *) pop, profile->ireg_mutex);
- free(pop);
+
+ if (event_fail || step_fail) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile %s: SIP STACK FAILURE DETECTED BY WATCHDOG!\n"
+ "GOODBYE CRUEL WORLD, I'M LEAVING YOU TODAY....GOODBYE, GOODBYE, GOOD BYE\n", profile->name);
+ switch_yield(2000000);
+ watchdog_triggered_abort();
}
}
- if (switch_micro_time_now() - last_check >= 1000000) {
- if (profile->watchdog_enabled) {
- uint32_t event_diff = 0, step_diff = 0, event_fail = 0, step_fail = 0;
-
- if (profile->step_timeout) {
- step_diff = (uint32_t) ((switch_time_now() - profile->last_root_step) / 1000);
-
- if (step_diff > profile->step_timeout) {
- step_fail = 1;
- }
- }
-
- if (profile->event_timeout) {
- event_diff = (uint32_t) ((switch_time_now() - profile->last_sip_event) / 1000);
- if (event_diff > profile->event_timeout) {
- event_fail = 1;
- }
- }
-
- if (step_fail && profile->event_timeout && !event_fail) {
- step_fail = 0;
- }
-
- if (event_fail || step_fail) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile %s: SIP STACK FAILURE DETECTED BY WATCHDOG!\n"
- "GOODBYE CRUEL WORLD, I'M LEAVING YOU TODAY....GOODBYE, GOODBYE, GOOD BYE\n", profile->name);
- switch_yield(2000000);
- watchdog_triggered_abort();
- }
+ if (!sofia_test_pflag(profile, PFLAG_STANDBY)) {
+ if (++ireg_loops >= IREG_SECONDS) {
+ time_t now = switch_epoch_time_now(NULL);
+ sofia_reg_check_expire(profile, now, 0);
+ ireg_loops = 0;
}
-
-
- if (!sofia_test_pflag(profile, PFLAG_STANDBY)) {
- if (++ireg_loops >= IREG_SECONDS) {
- time_t now = switch_epoch_time_now(NULL);
- sofia_reg_check_expire(profile, now, 0);
- ireg_loops = 0;
- }
-
- if (++gateway_loops >= GATEWAY_SECONDS) {
- sofia_reg_check_gateway(profile, switch_epoch_time_now(NULL));
- gateway_loops = 0;
- }
- sofia_sub_check_gateway(profile, time(NULL));
+ if (++gateway_loops >= GATEWAY_SECONDS) {
+ sofia_reg_check_gateway(profile, switch_epoch_time_now(NULL));
+ gateway_loops = 0;
}
- last_check = switch_micro_time_now();
+ sofia_sub_check_gateway(profile, time(NULL));
}
- }
- switch_mutex_lock(profile->ireg_mutex);
- while (switch_queue_trypop(profile->sql_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) {
- sofia_glue_actually_execute_sql(profile, (char *) pop, NULL);
- free(pop);
+ switch_yield(1000000);
+
}
- switch_mutex_unlock(profile->ireg_mutex);
sofia_clear_pflag_locked(profile, PFLAG_WORKER_RUNNING);
- switch_safe_free(sqlbuf);
return NULL;
}
int sanity;
switch_thread_t *worker_thread;
switch_status_t st;
+ char qname [128] = "";
switch_mutex_lock(mod_sofia_globals.mutex);
mod_sofia_globals.threads++;
switch_mutex_init(&profile->ireg_mutex, SWITCH_MUTEX_NESTED, profile->pool);
switch_mutex_init(&profile->gateway_mutex, SWITCH_MUTEX_NESTED, profile->pool);
+ switch_snprintf(qname, sizeof(qname), "sofia:%s", profile->name);
+ switch_switch_sql_queue_manager_init_name(qname,
+ &profile->qm,
+ 1,
+ profile->odbc_dsn ? profile->odbc_dsn : profile->dbname,
+ profile->pre_trans_execute,
+ profile->post_trans_execute,
+ profile->inner_pre_trans_execute,
+ profile->inner_post_trans_execute);
+ switch_switch_sql_queue_manager_start(profile->qm);
+
if (switch_event_create(&s_event, SWITCH_EVENT_PUBLISH) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header(s_event, SWITCH_STACK_BOTTOM, "service", "_sip._udp,_sip._tcp,_sip._sctp%s",
(sofia_test_pflag(profile, PFLAG_TLS)) ? ",_sips._tcp" : "");
switch_mutex_lock(profile->flag_mutex);
switch_mutex_unlock(profile->flag_mutex);
+ switch_switch_sql_queue_manager_stop(profile->qm);
+
if (switch_event_create(&s_event, SWITCH_EVENT_UNPUBLISH) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header(s_event, SWITCH_STACK_BOTTOM, "service", "_sip._udp,_sip._tcp,_sip._sctp%s",
(sofia_test_pflag(profile, PFLAG_TLS)) ? ",_sips._tcp" : "");
sofia_set_pflag(profile, PFLAG_SEND_DISPLAY_UPDATE);
sofia_set_pflag(profile, PFLAG_MESSAGE_QUERY_ON_FIRST_REGISTER);
//sofia_set_pflag(profile, PFLAG_PRESENCE_ON_FIRST_REGISTER);
- sofia_set_pflag(profile, PFLAG_SQL_IN_TRANS);
profile->shutdown_type = "false";
profile->local_network = "localnet.auto";
} else {
sofia_clear_pflag(profile, PFLAG_PASS_CALLEE_ID);
}
- } else if (!strcasecmp(var, "sql-in-transactions")) {
- int tmp = atoi(val);
-
- if (switch_true(val)) {
- tmp = 500;
- }
-
- if (tmp > 0) {
- profile->trans_timeout = tmp;
- sofia_set_pflag(profile, PFLAG_SQL_IN_TRANS);
- } else {
- sofia_clear_pflag(profile, PFLAG_SQL_IN_TRANS);
- }
-
} else if (!strcasecmp(var, "enable-soa")) {
if (switch_true(val)) {
sofia_set_flag(profile, TFLAG_ENABLE_SOA);
switch_str_nil(presence_id), switch_str_nil(presence_data), switch_str_nil(p), (long) now);
switch_assert(sql);
- sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
- switch_safe_free(sql);
+ sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
}
} else if (status == 200 && (profile->pres_type)) {
switch_assert(sql);
- sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
- switch_safe_free(sql);
-
+ sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
}
if (is_nat) {
};
switch_cache_db_handle_t *dbh = sofia_glue_get_db_handle(profile);
-
+ char *test2;
+
if (!dbh) {
return 0;
}
switch_cache_db_test_reactive(dbh, test_sql, "drop table sip_registrations", reg_sql);
-
-
- if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) {
- char *test2 = switch_mprintf("%s;%s", test_sql, test_sql);
+
+ test2 = switch_mprintf("%s;%s", test_sql, test_sql);
- if (switch_cache_db_execute_sql(dbh, test2, NULL) != SWITCH_STATUS_SUCCESS) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "GREAT SCOTT!!! Cannot execute batched statements!\n"
- "If you are using mysql, make sure you are using MYODBC 3.51.18 or higher and enable FLAG_MULTI_STATEMENTS\n");
- sofia_clear_pflag(profile, PFLAG_SQL_IN_TRANS);
-
- }
+ if (switch_cache_db_execute_sql(dbh, test2, NULL) != SWITCH_STATUS_SUCCESS) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "GREAT SCOTT!!! Cannot execute batched statements!\n"
+ "If you are using mysql, make sure you are using MYODBC 3.51.18 or higher and enable FLAG_MULTI_STATEMENTS\n");
+
+ switch_cache_db_release_db_handle(&dbh);
free(test2);
+ free(test_sql);
+ return 0;
}
+ free(test2);
+
+
free(test_sql);
test_sql = switch_mprintf("delete from sip_subscriptions where hostname='%q' and full_to='XXX'", mod_sofia_globals.hostname);
void sofia_glue_execute_sql(sofia_profile_t *profile, char **sqlp, switch_bool_t sql_already_dynamic)
{
- switch_status_t status = SWITCH_STATUS_FALSE;
- char *d_sql = NULL, *sql;
+ char *sql;
switch_assert(sqlp && *sqlp);
- sql = *sqlp;
+ sql = *sqlp;
- if (profile->sql_queue) {
- if (sql_already_dynamic) {
- d_sql = sql;
- } else {
- d_sql = strdup(sql);
- }
-
- switch_assert(d_sql);
- if ((status = switch_queue_trypush(profile->sql_queue, d_sql)) == SWITCH_STATUS_SUCCESS) {
- d_sql = NULL;
- }
- } else if (sql_already_dynamic) {
- d_sql = sql;
- }
-
- if (status != SWITCH_STATUS_SUCCESS) {
- sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
- }
-
- switch_safe_free(d_sql);
+ switch_switch_sql_queue_manager_push(profile->qm, sql, 0, !sql_already_dynamic);
if (sql_already_dynamic) {
*sqlp = NULL;
}
}
+
void sofia_glue_execute_sql_now(sofia_profile_t *profile, char **sqlp, switch_bool_t sql_already_dynamic)
{
- sofia_glue_actually_execute_sql(profile, *sqlp, profile->ireg_mutex);
+ char *sql;
+
+ switch_assert(sqlp && *sqlp);
+ sql = *sqlp;
+
+ switch_switch_sql_queue_manager_push_confirm(profile->qm, sql, 0, !sql_already_dynamic);
+
if (sql_already_dynamic) {
- switch_safe_free(*sqlp);
+ *sqlp = NULL;
}
- *sqlp = NULL;
}
}
switch_assert(sql != NULL);
- sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
- switch_safe_free(sql);
-
+ sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
sstr = switch_mprintf("terminated;reason=noresource");
} else {
"sub del sql: %s\n", sql);
}
- sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
- switch_safe_free(sql);
+ sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
}
}
switch_safe_free(sql);
sql = switch_mprintf("delete from sip_registrations where call_id='%q' %s", call_id, sqlextra);
- sofia_glue_execute_sql_now(profile, &sql, SWITCH_FALSE);
+ sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
switch_safe_free(sqlextra);
switch_safe_free(sql);
void sofia_reg_check_expire(sofia_profile_t *profile, time_t now, int reboot)
{
- char sql[1024];
-
-
+ char *sql;
if (now) {
- switch_snprintf(sql, sizeof(sql), "select call_id,sip_user,sip_host,contact,status,rpid,expires"
+ sql = switch_mprintf("select call_id,sip_user,sip_host,contact,status,rpid,expires"
",user_agent,server_user,server_host,profile_name,network_ip"
",%d from sip_registrations where expires > 0 and expires <= %ld", reboot, (long) now);
} else {
- switch_snprintf(sql, sizeof(sql), "select call_id,sip_user,sip_host,contact,status,rpid,expires"
+ sql = switch_mprintf("select call_id,sip_user,sip_host,contact,status,rpid,expires"
",user_agent,server_user,server_host,profile_name,network_ip" ",%d from sip_registrations where expires > 0", reboot);
}
sofia_glue_execute_sql_callback(profile, profile->ireg_mutex, sql, sofia_reg_del_callback, profile);
if (now) {
- switch_snprintfv(sql, sizeof(sql), "delete from sip_registrations where expires > 0 and expires <= %ld and hostname='%q'",
+ sql = switch_mprintf("delete from sip_registrations where expires > 0 and expires <= %ld and hostname='%q'",
(long) now, mod_sofia_globals.hostname);
} else {
- switch_snprintfv(sql, sizeof(sql), "delete from sip_registrations where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
+ sql = switch_mprintf("delete from sip_registrations where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
}
-
- sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
+ sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE);
+
if (now) {
- switch_snprintfv(sql, sizeof(sql), "select call_id from sip_shared_appearance_dialogs where hostname='%q' "
+ sql = switch_mprintf("select call_id from sip_shared_appearance_dialogs where hostname='%q' "
"and profile_name='%s' and expires <= %ld", mod_sofia_globals.hostname, profile->name, (long) now);
sofia_glue_execute_sql_callback(profile, profile->ireg_mutex, sql, sofia_sla_dialog_del_callback, profile);
- switch_snprintfv(sql, sizeof(sql), "delete from sip_shared_appearance_dialogs where expires > 0 and hostname='%q' and expires <= %ld",
+ sql = switch_mprintf("delete from sip_shared_appearance_dialogs where expires > 0 and hostname='%q' and expires <= %ld",
mod_sofia_globals.hostname, (long) now);
- sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
+ sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE);
}
if (now) {
- switch_snprintfv(sql, sizeof(sql), "delete from sip_presence where expires > 0 and expires <= %ld and hostname='%q'",
+ sql = switch_mprintf("delete from sip_presence where expires > 0 and expires <= %ld and hostname='%q'",
(long) now, mod_sofia_globals.hostname);
} else {
- switch_snprintfv(sql, sizeof(sql), "delete from sip_presence where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
+ sql = switch_mprintf("delete from sip_presence where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
}
- sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
+ sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE);
if (now) {
- switch_snprintfv(sql, sizeof(sql), "delete from sip_authentication where expires > 0 and expires <= %ld and hostname='%q'",
+ sql = switch_mprintf("delete from sip_authentication where expires > 0 and expires <= %ld and hostname='%q'",
(long) now, mod_sofia_globals.hostname);
} else {
- switch_snprintfv(sql, sizeof(sql), "delete from sip_authentication where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
+ sql = switch_mprintf("delete from sip_authentication where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
}
- sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
-
- sofia_presence_check_subscriptions(profile, now);
+ sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE);
if (now) {
- switch_snprintfv(sql, sizeof(sql), "delete from sip_dialogs where (expires = -1 or (expires > 0 and expires <= %ld)) and hostname='%q'",
+ sql = switch_mprintf("delete from sip_dialogs where (expires = -1 or (expires > 0 and expires <= %ld)) and hostname='%q'",
(long) now, mod_sofia_globals.hostname);
} else {
- switch_snprintfv(sql, sizeof(sql), "delete from sip_dialogs where expires >= -1 and hostname='%q'", mod_sofia_globals.hostname);
+ sql = switch_mprintf("delete from sip_dialogs where expires >= -1 and hostname='%q'", mod_sofia_globals.hostname);
}
- sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
+ sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE);
if (now) {
if (sofia_test_pflag(profile, PFLAG_ALL_REG_OPTIONS_PING)) {
- switch_snprintf(sql, sizeof(sql), "select call_id,sip_user,sip_host,contact,status,rpid,"
+ sql = switch_mprintf("select call_id,sip_user,sip_host,contact,status,rpid,"
"expires,user_agent,server_user,server_host,profile_name"
" from sip_registrations where hostname='%s' and "
"profile_name='%s'", mod_sofia_globals.hostname, profile->name);
sofia_glue_execute_sql_callback(profile, profile->ireg_mutex, sql, sofia_reg_nat_callback, profile);
} else if (sofia_test_pflag(profile, PFLAG_NAT_OPTIONS_PING)) {
- switch_snprintf(sql, sizeof(sql), "select call_id,sip_user,sip_host,contact,status,rpid,"
+ sql = switch_mprintf("select call_id,sip_user,sip_host,contact,status,rpid,"
"expires,user_agent,server_user,server_host,profile_name"
" from sip_registrations where (status like '%%NAT%%' "
"or contact like '%%fs_nat=yes%%') and hostname='%s' "
void sofia_reg_check_sync(sofia_profile_t *profile)
{
- char sql[1024];
-
+ char *sql;
- switch_snprintf(sql, sizeof(sql), "select call_id,sip_user,sip_host,contact,status,rpid,expires"
+ sql = switch_mprintf("select call_id,sip_user,sip_host,contact,status,rpid,expires"
",user_agent,server_user,server_host,profile_name,network_ip"
" from sip_registrations where expires > 0");
sofia_glue_execute_sql_callback(profile, profile->ireg_mutex, sql, sofia_reg_del_callback, profile);
- switch_snprintfv(sql, sizeof(sql), "delete from sip_registrations where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
- sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
+ sql = switch_mprintf("delete from sip_registrations where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
+ sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
- switch_snprintfv(sql, sizeof(sql), "delete from sip_presence where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
- sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
+ sql = switch_mprintf("delete from sip_presence where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
+ sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
- switch_snprintfv(sql, sizeof(sql), "delete from sip_authentication where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
- sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
+ sql = switch_mprintf("delete from sip_authentication where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
+ sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
- switch_snprintfv(sql, sizeof(sql), "delete from sip_subscriptions where expires >= -1 and hostname='%q'", mod_sofia_globals.hostname);
- sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
+ sql = switch_mprintf("delete from sip_subscriptions where expires >= -1 and hostname='%q'", mod_sofia_globals.hostname);
+ sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
- switch_snprintfv(sql, sizeof(sql), "delete from sip_dialogs where expires >= -1 and hostname='%q'", mod_sofia_globals.hostname);
- sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
+ sql = switch_mprintf("delete from sip_dialogs where expires >= -1 and hostname='%q'", mod_sofia_globals.hostname);
+ sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
}
char *sofia_reg_find_reg_url(sofia_profile_t *profile, const char *user, const char *host, char *val, switch_size_t len)
{
struct callback_t cbt = { 0 };
- char sql[512] = "";
+ char *sql;
if (!user) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Called with null user!\n");
cbt.len = len;
if (host) {
- switch_snprintfv(sql, sizeof(sql), "select contact from sip_registrations where sip_user='%q' and (sip_host='%q' or presence_hosts like '%%%q%%')",
+ sql = switch_mprintf("select contact from sip_registrations where sip_user='%q' and (sip_host='%q' or presence_hosts like '%%%q%%')",
user, host, host);
} else {
- switch_snprintfv(sql, sizeof(sql), "select contact from sip_registrations where sip_user='%q'", user);
+ sql = switch_mprintf("select contact from sip_registrations where sip_user='%q'", user);
}
switch_console_callback_match_t *sofia_reg_find_reg_url_multi(sofia_profile_t *profile, const char *user, const char *host)
{
struct callback_t cbt = { 0 };
- char sql[512] = "";
+ char *sql;
if (!user) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Called with null user!\n");
}
if (host) {
- switch_snprintfv(sql, sizeof(sql), "select contact from sip_registrations where sip_user='%q' and (sip_host='%q' or presence_hosts like '%%%q%%')",
+ sql = switch_mprintf("select contact from sip_registrations where sip_user='%q' and (sip_host='%q' or presence_hosts like '%%%q%%')",
user, host, host);
} else {
- switch_snprintfv(sql, sizeof(sql), "select contact from sip_registrations where sip_user='%q'", user);
+ sql = switch_mprintf("select contact from sip_registrations where sip_user='%q'", user);
}
switch_console_callback_match_t *sofia_reg_find_reg_url_with_positive_expires_multi(sofia_profile_t *profile, const char *user, const char *host)
{
struct callback_t cbt = { 0 };
- char sql[512] = "";
+ char *sql;
if (!user) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Called with null user!\n");
}
if (host) {
- switch_snprintfv(sql, sizeof(sql), "select contact,expires from sip_registrations where sip_user='%q' and (sip_host='%q' or presence_hosts like '%%%q%%')",
+ sql = switch_mprintf("select contact,expires from sip_registrations where sip_user='%q' and (sip_host='%q' or presence_hosts like '%%%q%%')",
user, host, host);
} else {
- switch_snprintfv(sql, sizeof(sql), "select contact,expires from sip_registrations where sip_user='%q'", user);
+ sql = switch_mprintf("select contact,expires from sip_registrations where sip_user='%q'", user);
}
sofia_glue_execute_sql_callback(profile, profile->ireg_mutex, sql, sofia_reg_find_reg_with_positive_expires_callback, &cbt);
(long) switch_epoch_time_now(NULL) + (profile->nonce_ttl ? profile->nonce_ttl : DEFAULT_NONCE_TTL),
profile->name, mod_sofia_globals.hostname);
switch_assert(sql != NULL);
- sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
- switch_safe_free(sql);
+ sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
auth_str = switch_mprintf("Digest realm=\"%q\", nonce=\"%q\",%s algorithm=MD5, qop=\"auth\"", realm, uuid_str, stale ? " stale=true," : "");
switch_epoch_time_now(NULL) + (profile->nonce_ttl ? profile->nonce_ttl : exptime + 10), ncl, nonce);
switch_assert(sql != NULL);
- sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
- switch_safe_free(sql);
+ sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
if (ret == AUTH_OK)
ret = AUTH_RENEWED;
}
if (check_status == SWITCH_STATUS_SUCCESS) {
- switch_core_session_t *session = (switch_core_session_t *) pop;
- switch_size_t id;
+ switch_thread_data_t *td = (switch_thread_data_t *) pop;
- if (!session) break;
+ if (!td) break;
- id = session->id;
-
switch_mutex_lock(session_manager.mutex);
session_manager.busy++;
switch_mutex_unlock(session_manager.mutex);
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Processing session %"SWITCH_SIZE_T_FMT" %s\n",
- (long) thread, id, switch_core_session_get_name(session));
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Processing\n", (long) thread);
- switch_core_session_thread(thread, (void *) session);
-
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Done Processing session %"SWITCH_SIZE_T_FMT"\n",
- (long) thread, id);
+ td->func(thread, td->obj);
+
+ if (td->alloc) {
+ free(td);
+ }
+
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Done Processing\n", (long) thread);
+
switch_mutex_lock(session_manager.mutex);
session_manager.busy--;
switch_mutex_unlock(session_manager.mutex);
return NULL;
}
+SWITCH_DECLARE(switch_status_t) switch_thread_pool_launch_thread(switch_thread_data_t **tdp)
+{
+ switch_status_t status = SWITCH_STATUS_SUCCESS;
+ switch_thread_data_t *td;
+
+ switch_assert(tdp);
+
+ td = *tdp;
+ *tdp = NULL;
+
+ switch_queue_push(session_manager.thread_queue, td);
+ check_queue();
+
+ return status;
+}
SWITCH_DECLARE(switch_status_t) switch_core_session_thread_pool_launch(switch_core_session_t *session)
{
switch_status_t status = SWITCH_STATUS_INUSE;
-
+ switch_thread_data_t *td;
+
switch_mutex_lock(session->mutex);
if (switch_test_flag(session, SSF_THREAD_RUNNING)) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_CRIT, "Cannot double-launch thread!\n");
status = SWITCH_STATUS_SUCCESS;
switch_set_flag(session, SSF_THREAD_RUNNING);
switch_set_flag(session, SSF_THREAD_STARTED);
- switch_queue_push(session_manager.thread_queue, session);
+ td = switch_core_session_alloc(session, sizeof(*td));
+ td->obj = session;
+ td->func = switch_core_session_thread;
+ switch_queue_push(session_manager.thread_queue, td);
check_queue();
}
switch_mutex_unlock(session->mutex);
static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, void *obj);
struct switch_sql_queue_manager {
+ const char *name;
switch_cache_db_handle_t *event_db;
switch_queue_t **sql_queue;
+ uint32_t *pre_written;
+ uint32_t *written;
+ int *sizes;
uint32_t numq;
char *dsn;
switch_thread_t *thread;
int thread_running;
switch_thread_cond_t *cond;
switch_mutex_t *cond_mutex;
+ switch_mutex_t *mutex;
char *pre_trans_execute;
char *post_trans_execute;
char *inner_pre_trans_execute;
switch_memory_pool_t *pool;
};
-static void qm_wake(switch_sql_queue_manager_t *qm)
+static int qm_wake(switch_sql_queue_manager_t *qm)
{
if (switch_mutex_trylock(qm->cond_mutex) == SWITCH_STATUS_SUCCESS) {
switch_thread_cond_signal(qm->cond);
switch_mutex_unlock(qm->cond_mutex);
+ return 1;
}
+
+ return 0;
}
static uint32_t qm_ttl(switch_sql_queue_manager_t *qm)
}
+SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_push_confirm(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup)
+{
+ int want, size, x = 0, sanity = 0;
+ uint32_t written;
+
+ if (!qm->thread_running) {
+ return SWITCH_STATUS_FALSE;
+ }
+
+ if (sql_manager.thread_running != 1) {
+ return SWITCH_STATUS_FALSE;
+ }
+
+ if (pos > qm->numq - 1) {
+ pos = 0;
+ }
+
+ switch_queue_push(qm->sql_queue[pos], dup ? strdup(sql) : (char *)sql);
+
+ switch_mutex_lock(qm->mutex);
+ written = qm->written[pos];
+ size = qm->sizes[pos];
+ want = written + size;
+ switch_mutex_unlock(qm->mutex);
+
+ qm_wake(qm);
+
+ while((qm->written[pos] < want) || (qm->written[pos] >= written && want < written && qm->written[pos] > want)) {
+ switch_yield(5000);
+
+ if (++x == 200) {
+ qm_wake(qm);
+ x = 0;
+ if (++sanity == 20) {
+ break;
+ }
+ }
+ }
+
+ return SWITCH_STATUS_SUCCESS;
+}
+
+
-SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_init(switch_sql_queue_manager_t **qmp,
- uint32_t numq, const char *dsn,
- const char *pre_trans_execute,
- const char *post_trans_execute,
- const char *inner_pre_trans_execute,
- const char *inner_post_trans_execute)
+SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_init_name(const char *name,
+ switch_sql_queue_manager_t **qmp,
+ uint32_t numq, const char *dsn,
+ const char *pre_trans_execute,
+ const char *post_trans_execute,
+ const char *inner_pre_trans_execute,
+ const char *inner_post_trans_execute)
{
switch_memory_pool_t *pool;
switch_sql_queue_manager_t *qm;
qm->pool = pool;
qm->numq = numq;
qm->dsn = switch_core_strdup(qm->pool, dsn);
+ qm->name = switch_core_strdup(qm->pool, name);
switch_mutex_init(&qm->cond_mutex, SWITCH_MUTEX_NESTED, qm->pool);
+ switch_mutex_init(&qm->mutex, SWITCH_MUTEX_NESTED, qm->pool);
switch_thread_cond_create(&qm->cond, qm->pool);
qm->sql_queue = switch_core_alloc(qm->pool, sizeof(switch_queue_t *) * numq);
+ qm->sizes = switch_core_alloc(qm->pool, sizeof(int) * numq);
+ qm->written = switch_core_alloc(qm->pool, sizeof(uint32_t) * numq);
+ qm->pre_written = switch_core_alloc(qm->pool, sizeof(uint32_t) * numq);
for (i = 0; i < qm->numq; i++) {
switch_queue_create(&qm->sql_queue[i], SWITCH_SQL_QUEUE_LEN, qm->pool);
while (!qm->event_db) {
if (switch_cache_db_get_db_handle_dsn(&qm->event_db, qm->dsn) == SWITCH_STATUS_SUCCESS && qm->event_db)
break;
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Error getting core db, Retrying\n");
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s Error getting db handle, Retrying\n", qm->name);
switch_yield(500000);
sanity--;
}
if (!qm->event_db) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Error getting core db\n");
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s Error getting db handle\n", qm->name);
return NULL;
}
while (qm->thread_running == 1) {
int proceed = !!save_sql;
+ int pindex = -1;
if (!proceed) {
for (i = 0; i < qm->numq; i++) {
if (switch_queue_trypop(qm->sql_queue[i], &pop) == SWITCH_STATUS_SUCCESS) {
if (sql_manager.thread_running != 1) {
- free(pop);
- pop = NULL;
+ if (pop) {
+ switch_cache_db_execute_sql(qm->event_db, (char *) pop, NULL);
+ free(pop);
+ pop = NULL;
+ }
} else {
+ pindex = i;
proceed = 1;
break;
}
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
for (i = 0; i < qm->numq; i++) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
- "REALLOC QUEUE %ld %d %d\n",
+ "%s REALLOC QUEUE %ld %d %d\n",
+ qm->name,
(long int)sql_len,
i,
switch_queue_size(qm->sql_queue[i]));
}
}
if (!(tmp = realloc(sqlbuf, sql_len))) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread ending on mem err\n");
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s SQL thread ending on mem err\n", qm->name);
abort();
break;
}
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
for (i = 0; i < qm->numq; i++) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
- "SAVE QUEUE %d %d\n",
+ "%s SAVE QUEUE %d %d\n",
+ qm->name,
i,
switch_queue_size(qm->sql_queue[i]));
goto skip;
}
}
+
+ switch_mutex_lock(qm->mutex);
+ qm->pre_written[pindex]++;
+ switch_mutex_unlock(qm->mutex);
iterations++;
sprintf(sqlbuf + len, "%s;\n", sql);
free(sql);
sql = NULL;
} else {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "SQL thread ending\n");
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s, SQL thread ending\n", qm->name);
break;
}
}
auto_pause = 1;
switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause);
auto_pause = 1;
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL Queue overflowing [%d], Pausing calls.\n", lc);
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s, SQL Queue overflowing [%d], Pausing calls.\n", qm->name, lc);
}
} else {
if (auto_pause && lc < 1000) {
auto_pause = 0;
switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause);
auto_pause = 0;
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL Queue back to normal size, resuming..\n");
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s, SQL Queue back to normal size, resuming..\n", qm->name);
}
}
wrote = 0;
if (trans && iterations && (iterations > target || !lc)) {
+
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
+ char line[128] = "";
+ int l;
+
+ switch_snprintf(line, sizeof(line), "%s RUN QUEUE ", qm->name);
+
for (i = 0; i < qm->numq; i++) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
- "RUN QUEUE %d %d %d\n",
- i,
- switch_queue_size(qm->sql_queue[i]),
- iterations);
+ l = strlen(line);
+ switch_snprintf(line + l, sizeof(line) - l, "%d:%d ", i, switch_queue_size(qm->sql_queue[i]));
}
+
+ l = strlen(line);
+ switch_snprintf(line + l, sizeof(line) - l, "%d\n", iterations);
+
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s", line);
+
}
if (switch_cache_db_persistant_execute_trans_full(qm->event_db, sqlbuf, 1,
qm->pre_trans_execute,
qm->inner_pre_trans_execute,
qm->inner_post_trans_execute
) != SWITCH_STATUS_SUCCESS) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n");
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s SQL thread unable to commit transaction, records lost!\n", qm->name);
}
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "DONE\n");
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s DONE\n", qm->name);
}
-
iterations = 0;
trans = 0;
len = 0;
lc = qm_ttl(qm);
+ switch_mutex_lock(qm->mutex);
+ for (i = 0; i < qm->numq; i++) {
+ qm->sizes[i] = switch_queue_size(qm->sql_queue[i]);
+ qm->written[i] += qm->pre_written[i];
+ qm->pre_written[i] = 0;
+ }
+ switch_mutex_unlock(qm->mutex);
+
if (!lc) {
switch_thread_cond_wait(qm->cond, qm->cond_mutex);
} else if (wrote) {
for(i = 0; i < qm->numq; i++) {
while (switch_queue_trypop(qm->sql_queue[i], &pop) == SWITCH_STATUS_SUCCESS) {
- switch_safe_free(pop);
+ if (pop) {
+ switch_cache_db_execute_sql(qm->event_db, (char *) pop, NULL);
+ free(pop);
+ }
}
}