]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
let mod_fifo use sql_queue_manager
authorAnthony Minessale <anthm@freeswitch.org>
Thu, 29 Nov 2012 05:11:31 +0000 (23:11 -0600)
committerAnthony Minessale <anthm@freeswitch.org>
Thu, 29 Nov 2012 05:11:31 +0000 (23:11 -0600)
src/mod/applications/mod_fifo/mod_fifo.c

index 78e8950b402ecbb3a467afb88a5a3c91c91ed362..8970803f44dc25870a7b4579dc460da8c6dd3f24 100644 (file)
@@ -594,6 +594,11 @@ static struct {
        switch_thread_t *node_thread;
        int debug;
        struct fifo_node *nodes;
+       char *pre_trans_execute;
+       char *post_trans_execute;
+       char *inner_pre_trans_execute;
+       char *inner_post_trans_execute; 
+       switch_sql_queue_manager_t *qm;
 } globals;
 
 
@@ -742,7 +747,29 @@ switch_cache_db_handle_t *fifo_get_db_handle(void)
        return dbh;
 }
 
+static switch_status_t fifo_execute_sql_queued(char **sqlp, switch_bool_t sql_already_dynamic, switch_bool_t block)
+{
+       int index = 1;
+       char *sql;
+
+       switch_assert(sqlp && *sqlp);
+       sql = *sqlp;    
+
+
+       if (switch_stristr("insert", sql)) {
+               index = 0;
+       }
+
+       switch_sql_queue_manager_push(globals.qm, sql, index, !sql_already_dynamic);
+
+       if (sql_already_dynamic) {
+               *sqlp = NULL;
+       }
+
+       return SWITCH_STATUS_SUCCESS;
 
+}
+#if 0
 static switch_status_t fifo_execute_sql(char *sql, switch_mutex_t *mutex)
 {
        switch_cache_db_handle_t *dbh = NULL;
@@ -771,6 +798,7 @@ static switch_status_t fifo_execute_sql(char *sql, switch_mutex_t *mutex)
 
        return status;
 }
+#endif
 
 static switch_bool_t fifo_execute_sql_callback(switch_mutex_t *mutex, char *sql, switch_core_db_callback_func_t callback, void *pdata)
 {
@@ -937,9 +965,7 @@ static void do_unbridge(switch_core_session_t *consumer_session, switch_core_ses
                switch_strftime_nocheck(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm);
 
                sql = switch_mprintf("delete from fifo_bridge where consumer_uuid='%q'", switch_core_session_get_uuid(consumer_session));
-               fifo_execute_sql(sql, globals.sql_mutex);
-               switch_safe_free(sql);
-
+               fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
 
 
                switch_channel_set_variable(consumer_channel, "fifo_status", "WAITING");
@@ -1025,8 +1051,7 @@ static switch_status_t messagehook (switch_core_session_t *session, switch_core_
                                                         switch_str_nil(msg->string_array_arg[0]),
                                                         switch_str_nil(msg->string_array_arg[1]),
                                                         switch_core_session_get_uuid(session));
-               fifo_execute_sql(sql, globals.sql_mutex);
-               switch_safe_free(sql);
+               fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
                goto end;
        default:
                goto end;
@@ -1124,8 +1149,7 @@ static switch_status_t messagehook (switch_core_session_t *session, switch_core_
                                                                         );
                        }
 
-                       fifo_execute_sql(sql, globals.sql_mutex);
-                       switch_safe_free(sql);
+                       fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
 
 
                        epoch_start = (long)switch_epoch_time_now(NULL);
@@ -1381,8 +1405,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
                struct call_helper *h = cbh->rows[i];
                char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count+1 where uuid='%s'", h->uuid);
 
-               fifo_execute_sql(sql, globals.sql_mutex);
-               switch_safe_free(sql);
+               fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
 
        }
 
@@ -1424,8 +1447,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
                                        struct call_helper *h = cbh->rows[i];
                                        char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1 "
                                                                                           "where uuid='%q' and ring_count > 0", h->uuid);
-                                       fifo_execute_sql(sql, globals.sql_mutex);
-                                       switch_safe_free(sql);
+                                       fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
                                }
 
                        }
@@ -1439,8 +1461,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
                                                                                           "outbound_fail_total_count = outbound_fail_total_count+1, "
                                                                                           "next_avail=%ld + lag + 1 where uuid='%q' and ring_count > 0",
                                                                                           (long) switch_epoch_time_now(NULL), h->uuid);
-                                       fifo_execute_sql(sql, globals.sql_mutex);
-                                       switch_safe_free(sql);
+                                       fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
 
                                }
                        }
@@ -1497,8 +1518,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
        for (i = 0; i < cbh->rowcount; i++) {
                struct call_helper *h = cbh->rows[i];
                char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1 where uuid='%q' and ring_count > 0",  h->uuid);
-               fifo_execute_sql(sql, globals.sql_mutex);
-               switch_safe_free(sql);
+               fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
        }
 
   end:
@@ -1608,8 +1628,7 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
 
 
        sql = switch_mprintf("update fifo_outbound set ring_count=ring_count+1 where uuid='%s'", h->uuid);
-       fifo_execute_sql(sql, globals.sql_mutex);
-       switch_safe_free(sql);
+       fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
 
        status = switch_ivr_originate(NULL, &session, &cause, originate_string, h->timeout, NULL, NULL, NULL, NULL, ovars, SOF_NONE, NULL);
        free(originate_string);
@@ -1619,8 +1638,7 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
                sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1, "
                                                         "outbound_fail_count=outbound_fail_count+1, next_avail=%ld + lag + 1 where uuid='%q'",
                                                         (long) switch_epoch_time_now(NULL), h->uuid);
-               fifo_execute_sql(sql, globals.sql_mutex);
-               switch_safe_free(sql);
+               fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
 
                if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
                        switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", node->name);
@@ -2123,15 +2141,12 @@ static void dec_use_count(switch_core_session_t *session, switch_bool_t send_eve
 
 
                sql = switch_mprintf("delete from fifo_bridge where consumer_uuid='%q'", switch_core_session_get_uuid(session));
-               fifo_execute_sql(sql, globals.sql_mutex);
-               switch_safe_free(sql);
+               fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
 
                del_bridge_call(outbound_id);
                sql = switch_mprintf("update fifo_outbound set use_count=use_count-1, stop_time=%ld, next_avail=%ld + lag + 1 where use_count > 0 and uuid='%q'",
                                                         now, now, outbound_id);
-
-               fifo_execute_sql(sql, globals.sql_mutex);
-               switch_safe_free(sql);
+               fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
        }
 
        if (send_event) {
@@ -2198,9 +2213,7 @@ SWITCH_STANDARD_APP(fifo_track_call_function)
 
        sql = switch_mprintf("update fifo_outbound set stop_time=0,start_time=%ld,outbound_fail_count=0,use_count=use_count+1,%s=%s+1,%s=%s+1 where uuid='%q'",
                                                 (long) switch_epoch_time_now(NULL), col1, col1, col2, col2, data);
-       fifo_execute_sql(sql, globals.sql_mutex);
-
-       switch_safe_free(sql);
+       fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
 
 
        if (switch_channel_direction(channel) == SWITCH_CALL_DIRECTION_INBOUND) {
@@ -2235,8 +2248,7 @@ static void fifo_caller_add(fifo_node_t *node, switch_core_session_t *session)
                                                 switch_str_nil(switch_channel_get_variable(channel, "caller_id_number")),
                                                 switch_epoch_time_now(NULL));
 
-       fifo_execute_sql(sql, globals.sql_mutex);
-       switch_safe_free(sql);
+       fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
 }
 
 static void fifo_caller_del(const char *uuid)
@@ -2249,8 +2261,7 @@ static void fifo_caller_del(const char *uuid)
                sql = switch_mprintf("delete from fifo_callers");
        }
 
-       fifo_execute_sql(sql, globals.sql_mutex);
-       switch_safe_free(sql);
+       fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
 
 }
 
@@ -3018,8 +3029,7 @@ SWITCH_STANDARD_APP(fifo_function)
                                                                                 switch_epoch_time_now(NULL), outbound_id);
 
 
-                                       fifo_execute_sql(sql, globals.sql_mutex);
-                                       switch_safe_free(sql);
+                                       fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
                                }
 
                                add_bridge_call(switch_core_session_get_uuid(other_session));
@@ -3038,8 +3048,7 @@ SWITCH_STANDARD_APP(fifo_function)
                                                                         );
 
 
-                               fifo_execute_sql(sql, globals.sql_mutex);
-                               switch_safe_free(sql);
+                               fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
 
 
                                switch_channel_set_variable(channel, SWITCH_SIGNAL_BOND_VARIABLE, switch_core_session_get_uuid(other_session));
@@ -3055,8 +3064,7 @@ SWITCH_STANDARD_APP(fifo_function)
                                                                                 "outbound_call_count=outbound_call_count+1, next_avail=%ld + lag + 1 where uuid='%s' and use_count > 0",
                                                                                 now, now, outbound_id);
 
-                                       fifo_execute_sql(sql, globals.sql_mutex);
-                                       switch_safe_free(sql);
+                                       fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
 
                                        del_bridge_call(outbound_id);
 
@@ -3088,8 +3096,7 @@ SWITCH_STANDARD_APP(fifo_function)
                                switch_channel_set_variable_printf(other_channel, "fifo_bridge_seconds", "%d", epoch_end - epoch_start);
 
                                sql = switch_mprintf("delete from fifo_bridge where consumer_uuid='%q'", switch_core_session_get_uuid(session));
-                               fifo_execute_sql(sql, globals.sql_mutex);
-                               switch_safe_free(sql);
+                               fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
 
 
                                switch_core_media_bug_pause(session);
@@ -4009,6 +4016,14 @@ static switch_status_t load_config(int reload, int del_all)
                                } else {
                                        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "ODBC IS NOT AVAILABLE!\n");
                                }
+                       } else if (!strcasecmp(var, "db-pre-trans-execute") && !zstr(val)) {
+                               globals.pre_trans_execute = switch_core_strdup(globals.pool, val);
+                       } else if (!strcasecmp(var, "db-post-trans-execute") && !zstr(val)) {
+                               globals.post_trans_execute = switch_core_strdup(globals.pool, val);
+                       } else if (!strcasecmp(var, "db-inner-pre-trans-execute") && !zstr(val)) {
+                               globals.inner_pre_trans_execute = switch_core_strdup(globals.pool, val);
+                       } else if (!strcasecmp(var, "db-inner-post-trans-execute") && !zstr(val)) {
+                               globals.inner_post_trans_execute = switch_core_strdup(globals.pool, val);
                        } else if (!strcasecmp(var, "delete-all-outbound-member-on-startup")) {
                                delete_all_outbound_member_on_startup = switch_true(val);
                        }
@@ -4019,6 +4034,18 @@ static switch_status_t load_config(int reload, int del_all)
                globals.dbname = "fifo";
        }
 
+       switch_sql_queue_manager_init_name("fifo",
+                                                                          &globals.qm,
+                                                                          2,
+                                                                          globals.odbc_dsn ? globals.odbc_dsn : globals.dbname,
+                                                                          SWITCH_MAX_TRANS,
+                                                                          globals.pre_trans_execute,
+                                                                          globals.post_trans_execute,
+                                                                          globals.inner_pre_trans_execute,
+                                                                          globals.inner_post_trans_execute);
+
+       switch_sql_queue_manager_start(globals.qm);
+
 
 
        if (!(dbh = fifo_get_db_handle())) {
@@ -4036,8 +4063,8 @@ static switch_status_t load_config(int reload, int del_all)
        switch_cache_db_release_db_handle(&dbh);
 
        if (!reload) {
-               fifo_execute_sql("update fifo_outbound set start_time=0,stop_time=0,ring_count=0,use_count=0,"
-                                                "outbound_call_count=0,outbound_fail_count=0 where static=0", globals.sql_mutex);
+               char *sql= "update fifo_outbound set start_time=0,stop_time=0,ring_count=0,use_count=0,outbound_call_count=0,outbound_fail_count=0 where static=0";
+               fifo_execute_sql_queued(&sql, SWITCH_FALSE, SWITCH_TRUE);
        }
 
        if (reload) {
@@ -4060,8 +4087,7 @@ static switch_status_t load_config(int reload, int del_all)
                sql = switch_mprintf("delete from fifo_outbound where static=1 and hostname='%q'", globals.hostname);
        }
 
-       fifo_execute_sql(sql, globals.sql_mutex);
-       switch_safe_free(sql);
+       fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
 
        if (!(node = switch_core_hash_find(globals.fifo_hash, MANUAL_QUEUE_NAME))) {
                node = create_node(MANUAL_QUEUE_NAME, 0, globals.sql_mutex);
@@ -4214,8 +4240,7 @@ static switch_status_t load_config(int reload, int del_all)
                                                                         (long) switch_epoch_time_now(NULL));
 
                                switch_assert(sql);
-                               fifo_execute_sql(sql, globals.sql_mutex);
-                               free(sql);
+                               fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
                                free(name_dup);
                                node->has_outbound = 1;
                                node->member_count++;
@@ -4270,8 +4295,7 @@ static void fifo_member_add(char *fifo_name, char *originate_string, int simo_co
 
        sql = switch_mprintf("delete from fifo_outbound where fifo_name='%q' and uuid = '%q'", fifo_name, digest);
        switch_assert(sql);
-       fifo_execute_sql(sql, globals.sql_mutex);
-       free(sql);
+       fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
 
 
        switch_mutex_lock(globals.mutex);
@@ -4293,8 +4317,7 @@ static void fifo_member_add(char *fifo_name, char *originate_string, int simo_co
                                                 digest, fifo_name, originate_string, simo_count, 0, timeout, lag, 0, (long) expires, globals.hostname, taking_calls,
                                                 (long)switch_epoch_time_now(NULL));
        switch_assert(sql);
-       fifo_execute_sql(sql, globals.sql_mutex);
-       free(sql);
+       fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
        free(name_dup);
 
     cbt.buf = outbound_count; 
@@ -4329,8 +4352,7 @@ static void fifo_member_del(char *fifo_name, char *originate_string)
 
        sql = switch_mprintf("delete from fifo_outbound where fifo_name='%q' and uuid = '%q' and hostname='%q'", fifo_name, digest, globals.hostname);
        switch_assert(sql);
-       fifo_execute_sql(sql, globals.sql_mutex);
-       free(sql);
+       fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
 
        switch_mutex_lock(globals.mutex);
        if (!(node = switch_core_hash_find(globals.fifo_hash, fifo_name))) {