switch_thread_t *node_thread;
int debug;
struct fifo_node *nodes;
+ char *pre_trans_execute;
+ char *post_trans_execute;
+ char *inner_pre_trans_execute;
+ char *inner_post_trans_execute;
+ switch_sql_queue_manager_t *qm;
} globals;
return dbh;
}
+static switch_status_t fifo_execute_sql_queued(char **sqlp, switch_bool_t sql_already_dynamic, switch_bool_t block)
+{
+ int index = 1;
+ char *sql;
+
+ switch_assert(sqlp && *sqlp);
+ sql = *sqlp;
+
+
+ if (switch_stristr("insert", sql)) {
+ index = 0;
+ }
+
+ switch_sql_queue_manager_push(globals.qm, sql, index, !sql_already_dynamic);
+
+ if (sql_already_dynamic) {
+ *sqlp = NULL;
+ }
+
+ return SWITCH_STATUS_SUCCESS;
+}
+#if 0
static switch_status_t fifo_execute_sql(char *sql, switch_mutex_t *mutex)
{
switch_cache_db_handle_t *dbh = NULL;
return status;
}
+#endif
static switch_bool_t fifo_execute_sql_callback(switch_mutex_t *mutex, char *sql, switch_core_db_callback_func_t callback, void *pdata)
{
switch_strftime_nocheck(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm);
sql = switch_mprintf("delete from fifo_bridge where consumer_uuid='%q'", switch_core_session_get_uuid(consumer_session));
- fifo_execute_sql(sql, globals.sql_mutex);
- switch_safe_free(sql);
-
+ fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
switch_channel_set_variable(consumer_channel, "fifo_status", "WAITING");
switch_str_nil(msg->string_array_arg[0]),
switch_str_nil(msg->string_array_arg[1]),
switch_core_session_get_uuid(session));
- fifo_execute_sql(sql, globals.sql_mutex);
- switch_safe_free(sql);
+ fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
goto end;
default:
goto end;
);
}
- fifo_execute_sql(sql, globals.sql_mutex);
- switch_safe_free(sql);
+ fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
epoch_start = (long)switch_epoch_time_now(NULL);
struct call_helper *h = cbh->rows[i];
char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count+1 where uuid='%s'", h->uuid);
- fifo_execute_sql(sql, globals.sql_mutex);
- switch_safe_free(sql);
+ fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
}
struct call_helper *h = cbh->rows[i];
char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1 "
"where uuid='%q' and ring_count > 0", h->uuid);
- fifo_execute_sql(sql, globals.sql_mutex);
- switch_safe_free(sql);
+ fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
}
}
"outbound_fail_total_count = outbound_fail_total_count+1, "
"next_avail=%ld + lag + 1 where uuid='%q' and ring_count > 0",
(long) switch_epoch_time_now(NULL), h->uuid);
- fifo_execute_sql(sql, globals.sql_mutex);
- switch_safe_free(sql);
+ fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
}
}
for (i = 0; i < cbh->rowcount; i++) {
struct call_helper *h = cbh->rows[i];
char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1 where uuid='%q' and ring_count > 0", h->uuid);
- fifo_execute_sql(sql, globals.sql_mutex);
- switch_safe_free(sql);
+ fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
}
end:
sql = switch_mprintf("update fifo_outbound set ring_count=ring_count+1 where uuid='%s'", h->uuid);
- fifo_execute_sql(sql, globals.sql_mutex);
- switch_safe_free(sql);
+ fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
status = switch_ivr_originate(NULL, &session, &cause, originate_string, h->timeout, NULL, NULL, NULL, NULL, ovars, SOF_NONE, NULL);
free(originate_string);
sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1, "
"outbound_fail_count=outbound_fail_count+1, next_avail=%ld + lag + 1 where uuid='%q'",
(long) switch_epoch_time_now(NULL), h->uuid);
- fifo_execute_sql(sql, globals.sql_mutex);
- switch_safe_free(sql);
+ fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", node->name);
sql = switch_mprintf("delete from fifo_bridge where consumer_uuid='%q'", switch_core_session_get_uuid(session));
- fifo_execute_sql(sql, globals.sql_mutex);
- switch_safe_free(sql);
+ fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
del_bridge_call(outbound_id);
sql = switch_mprintf("update fifo_outbound set use_count=use_count-1, stop_time=%ld, next_avail=%ld + lag + 1 where use_count > 0 and uuid='%q'",
now, now, outbound_id);
-
- fifo_execute_sql(sql, globals.sql_mutex);
- switch_safe_free(sql);
+ fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
}
if (send_event) {
sql = switch_mprintf("update fifo_outbound set stop_time=0,start_time=%ld,outbound_fail_count=0,use_count=use_count+1,%s=%s+1,%s=%s+1 where uuid='%q'",
(long) switch_epoch_time_now(NULL), col1, col1, col2, col2, data);
- fifo_execute_sql(sql, globals.sql_mutex);
-
- switch_safe_free(sql);
+ fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
if (switch_channel_direction(channel) == SWITCH_CALL_DIRECTION_INBOUND) {
switch_str_nil(switch_channel_get_variable(channel, "caller_id_number")),
switch_epoch_time_now(NULL));
- fifo_execute_sql(sql, globals.sql_mutex);
- switch_safe_free(sql);
+ fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
}
static void fifo_caller_del(const char *uuid)
sql = switch_mprintf("delete from fifo_callers");
}
- fifo_execute_sql(sql, globals.sql_mutex);
- switch_safe_free(sql);
+ fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
}
switch_epoch_time_now(NULL), outbound_id);
- fifo_execute_sql(sql, globals.sql_mutex);
- switch_safe_free(sql);
+ fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
}
add_bridge_call(switch_core_session_get_uuid(other_session));
);
- fifo_execute_sql(sql, globals.sql_mutex);
- switch_safe_free(sql);
+ fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
switch_channel_set_variable(channel, SWITCH_SIGNAL_BOND_VARIABLE, switch_core_session_get_uuid(other_session));
"outbound_call_count=outbound_call_count+1, next_avail=%ld + lag + 1 where uuid='%s' and use_count > 0",
now, now, outbound_id);
- fifo_execute_sql(sql, globals.sql_mutex);
- switch_safe_free(sql);
+ fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
del_bridge_call(outbound_id);
switch_channel_set_variable_printf(other_channel, "fifo_bridge_seconds", "%d", epoch_end - epoch_start);
sql = switch_mprintf("delete from fifo_bridge where consumer_uuid='%q'", switch_core_session_get_uuid(session));
- fifo_execute_sql(sql, globals.sql_mutex);
- switch_safe_free(sql);
+ fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
switch_core_media_bug_pause(session);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "ODBC IS NOT AVAILABLE!\n");
}
+ } else if (!strcasecmp(var, "db-pre-trans-execute") && !zstr(val)) {
+ globals.pre_trans_execute = switch_core_strdup(globals.pool, val);
+ } else if (!strcasecmp(var, "db-post-trans-execute") && !zstr(val)) {
+ globals.post_trans_execute = switch_core_strdup(globals.pool, val);
+ } else if (!strcasecmp(var, "db-inner-pre-trans-execute") && !zstr(val)) {
+ globals.inner_pre_trans_execute = switch_core_strdup(globals.pool, val);
+ } else if (!strcasecmp(var, "db-inner-post-trans-execute") && !zstr(val)) {
+ globals.inner_post_trans_execute = switch_core_strdup(globals.pool, val);
} else if (!strcasecmp(var, "delete-all-outbound-member-on-startup")) {
delete_all_outbound_member_on_startup = switch_true(val);
}
globals.dbname = "fifo";
}
+ switch_sql_queue_manager_init_name("fifo",
+ &globals.qm,
+ 2,
+ globals.odbc_dsn ? globals.odbc_dsn : globals.dbname,
+ SWITCH_MAX_TRANS,
+ globals.pre_trans_execute,
+ globals.post_trans_execute,
+ globals.inner_pre_trans_execute,
+ globals.inner_post_trans_execute);
+
+ switch_sql_queue_manager_start(globals.qm);
+
if (!(dbh = fifo_get_db_handle())) {
switch_cache_db_release_db_handle(&dbh);
if (!reload) {
- fifo_execute_sql("update fifo_outbound set start_time=0,stop_time=0,ring_count=0,use_count=0,"
- "outbound_call_count=0,outbound_fail_count=0 where static=0", globals.sql_mutex);
+ char *sql= "update fifo_outbound set start_time=0,stop_time=0,ring_count=0,use_count=0,outbound_call_count=0,outbound_fail_count=0 where static=0";
+ fifo_execute_sql_queued(&sql, SWITCH_FALSE, SWITCH_TRUE);
}
if (reload) {
sql = switch_mprintf("delete from fifo_outbound where static=1 and hostname='%q'", globals.hostname);
}
- fifo_execute_sql(sql, globals.sql_mutex);
- switch_safe_free(sql);
+ fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
if (!(node = switch_core_hash_find(globals.fifo_hash, MANUAL_QUEUE_NAME))) {
node = create_node(MANUAL_QUEUE_NAME, 0, globals.sql_mutex);
(long) switch_epoch_time_now(NULL));
switch_assert(sql);
- fifo_execute_sql(sql, globals.sql_mutex);
- free(sql);
+ fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
free(name_dup);
node->has_outbound = 1;
node->member_count++;
sql = switch_mprintf("delete from fifo_outbound where fifo_name='%q' and uuid = '%q'", fifo_name, digest);
switch_assert(sql);
- fifo_execute_sql(sql, globals.sql_mutex);
- free(sql);
+ fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
switch_mutex_lock(globals.mutex);
digest, fifo_name, originate_string, simo_count, 0, timeout, lag, 0, (long) expires, globals.hostname, taking_calls,
(long)switch_epoch_time_now(NULL));
switch_assert(sql);
- fifo_execute_sql(sql, globals.sql_mutex);
- free(sql);
+ fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
free(name_dup);
cbt.buf = outbound_count;
sql = switch_mprintf("delete from fifo_outbound where fifo_name='%q' and uuid = '%q' and hostname='%q'", fifo_name, digest, globals.hostname);
switch_assert(sql);
- fifo_execute_sql(sql, globals.sql_mutex);
- free(sql);
+ fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
switch_mutex_lock(globals.mutex);
if (!(node = switch_core_hash_find(globals.fifo_hash, fifo_name))) {