]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
fifo tweaks
authorAnthony Minessale <anthm@freeswitch.org>
Thu, 22 Jul 2010 00:40:13 +0000 (19:40 -0500)
committerAnthony Minessale <anthm@freeswitch.org>
Thu, 22 Jul 2010 00:40:13 +0000 (19:40 -0500)
src/mod/applications/mod_fifo/mod_fifo.c

index e5771b5a11316090ec73e8c5502c27a282d3b714..d4e572030a70ee37ab2f6f926ffc7e4352cc9a93 100644 (file)
@@ -1149,10 +1149,10 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
 
 
        if (node) {
-               switch_mutex_lock(node->mutex);
-               //node->busy = switch_epoch_time_now(NULL) + 600;
+               switch_thread_rwlock_wrlock(node->rwlock);
+               node->busy = 0;
                node->ring_consumer_count = 1;
-               switch_mutex_unlock(node->mutex);
+               switch_thread_rwlock_unlock(node->rwlock);
        } else {
                goto end;
        }
@@ -1319,7 +1319,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
                                        char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1, "
                                                                                           "outbound_fail_count=outbound_fail_count+1, "
                                                                                           "outbound_fail_total_count = outbound_fail_total_count+1, "
-                                                                                          "next_avail=%ld + lag where uuid='%q' and ring_count > 0",
+                                                                                          "next_avail=%ld + lag + 1 where uuid='%q' and ring_count > 0",
                                                                                           (long) switch_epoch_time_now(NULL), h->uuid);
                                        fifo_execute_sql(sql, globals.sql_mutex);
                                        switch_safe_free(sql);
@@ -1389,6 +1389,14 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
 
        cbh->ready = 1;
 
+       if (node) {
+               switch_thread_rwlock_wrlock(node->rwlock);
+               node->ring_consumer_count = 0;
+               node->busy = 0;
+               switch_thread_rwlock_unlock(node->rwlock);
+       }
+
+
        for (i = 0; i < cbh->rowcount; i++) {
                struct call_helper *h = cbh->rows[i];
                del_consumer_outbound_call(h->uuid);
@@ -1404,14 +1412,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
        if (pop_dup) {
                switch_event_destroy(&pop_dup);
        }
-
-       if (node) {
-               switch_mutex_lock(node->mutex);
-               node->ring_consumer_count = 0;
-               //node->busy = switch_epoch_time_now(NULL) + connected;
-               switch_mutex_unlock(node->mutex);
-       }
-
+       
        switch_core_destroy_memory_pool(&cbh->pool);
 
        return NULL;
@@ -1439,10 +1440,10 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
        switch_mutex_unlock(globals.mutex);
 
        if (node) {
-               switch_mutex_lock(node->mutex);
+               switch_thread_rwlock_wrlock(node->rwlock);
                node->ring_consumer_count++;
-               //node->busy = switch_epoch_time_now(NULL) + 600;
-               switch_mutex_unlock(node->mutex);
+               node->busy = 0;
+               switch_thread_rwlock_unlock(node->rwlock);
        }
 
        switch_event_create(&ovars, SWITCH_EVENT_REQUEST_PARAMS);
@@ -1485,7 +1486,7 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
        if (status != SWITCH_STATUS_SUCCESS) {
 
                sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1, "
-                                                        "outbound_fail_count=outbound_fail_count+1, next_avail=%ld + lag where uuid='%q' and use_count > 0",
+                                                        "outbound_fail_count=outbound_fail_count+1, next_avail=%ld + lag + 1 where uuid='%q' and use_count > 0",
                                                         (long) switch_epoch_time_now(NULL), h->uuid);
                fifo_execute_sql(sql, globals.sql_mutex);
                switch_safe_free(sql);
@@ -1539,12 +1540,12 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
 
        switch_event_destroy(&ovars);
        if (node) {
-               switch_mutex_lock(node->mutex);
+               switch_thread_rwlock_wrlock(node->rwlock);
                if (node->ring_consumer_count-- < 0) {
                        node->ring_consumer_count = 0;
                }
-               //node->busy = switch_epoch_time_now(NULL) + connected;
-               switch_mutex_unlock(node->mutex);
+               node->busy = 0;
+               switch_thread_rwlock_unlock(node->rwlock);
        }
        switch_core_destroy_memory_pool(&h->pool);
 
@@ -1652,15 +1653,10 @@ static void find_consumers(fifo_node_t *node)
                        fifo_execute_sql_callback(globals.sql_mutex, sql, place_call_ringall_callback, cbh);
 
                        if (cbh->rowcount) {
-                               int sanity = 40;
-
                                switch_threadattr_create(&thd_attr, cbh->pool);
                                switch_threadattr_detach_set(thd_attr, 1);
                                switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
                                switch_thread_create(&thread, thd_attr, ringall_thread_run, cbh, cbh->pool);
-                               while(--sanity > 0 && !cbh->ready) {
-                                       switch_yield(100000);
-                               }
                        }
 
                }
@@ -1689,8 +1685,7 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o
                for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) {
                        switch_hash_this(hi, &var, NULL, &val);
                        if ((node = (fifo_node_t *) val)) {
-                               switch_mutex_lock(node->mutex);
-                               if (node->has_outbound && node->ready) {// && switch_epoch_time_now(NULL) > node->busy) {
+                               if (node->has_outbound && node->ready && !node->busy) {
                                        ppl_waiting = node_consumer_wait_count(node);
                                        consumer_total = node->consumer_count;
                                        idle_consumers = node_idle_consumers(node);
@@ -1704,7 +1699,6 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o
                                                switch_yield(1000000);
                                        }
                                }
-                               switch_mutex_unlock(node->mutex);
                        }
                }
                switch_mutex_unlock(globals.mutex);
@@ -1922,8 +1916,9 @@ static void dec_use_count(switch_channel_t *channel)
 
        if ((outbound_id = switch_channel_get_variable(channel, "fifo_outbound_uuid"))) {
                del_bridge_call(outbound_id);
-               sql = switch_mprintf("update fifo_outbound set use_count=use_count-1, stop_time=%ld, next_avail=%ld + lag where use_count > 0 and uuid='%q'", 
+               sql = switch_mprintf("update fifo_outbound set use_count=use_count-1, stop_time=%ld, next_avail=%ld + lag + 1 where use_count > 0 and uuid='%q'", 
                                                         now, now, outbound_id);
+               
                fifo_execute_sql(sql, globals.sql_mutex);
                switch_safe_free(sql);
        }
@@ -1963,9 +1958,15 @@ SWITCH_STANDARD_APP(fifo_track_call_function)
                return;
        }
 
+       if (switch_true(switch_channel_get_variable(channel, "fifo_track_call"))) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s trying to double-track call!\n", switch_channel_get_name(channel));
+               return;
+       }
+
        add_bridge_call(data);
 
        switch_channel_set_variable(channel, "fifo_outbound_uuid", data);
+       switch_channel_set_variable(channel, "fifo_track_call", "true");
        
        if (switch_channel_direction(channel) == SWITCH_CALL_DIRECTION_OUTBOUND) {
                col1 = "manual_calls_in_count";
@@ -1978,6 +1979,7 @@ SWITCH_STANDARD_APP(fifo_track_call_function)
        sql = switch_mprintf("update fifo_outbound set stop_time=0,start_time=%ld,outbound_fail_count=0,use_count=use_count+1,%s=%s+1,%s=%s+1 where uuid='%q'", 
                                                 (long) switch_epoch_time_now(NULL), col1, col1, col2, col2, data);
        fifo_execute_sql(sql, globals.sql_mutex);
+
        switch_safe_free(sql);
 
        
@@ -2218,7 +2220,7 @@ SWITCH_STANDARD_APP(fifo_function)
 
                switch_channel_answer(channel);
 
-               switch_mutex_lock(node->mutex);
+               switch_thread_rwlock_wrlock(node->rwlock);
                node->caller_count++;
 
                if ((pri = switch_channel_get_variable(channel, "fifo_priority"))) {
@@ -2257,7 +2259,7 @@ SWITCH_STANDARD_APP(fifo_function)
                        switch_channel_set_variable(channel, "fifo_priority", tmp);
                }
 
-               switch_mutex_unlock(node->mutex);
+               switch_thread_rwlock_unlock(node->rwlock);
 
                ts = switch_micro_time_now();
                switch_time_exp_lt(&tm, ts);
@@ -2342,6 +2344,8 @@ SWITCH_STANDARD_APP(fifo_function)
                switch_channel_clear_app_flag(channel, CF_APP_TAGGED);
 
          abort:
+               
+               fifo_caller_del(switch_core_session_get_uuid(session));
 
                if (!aborted && switch_channel_ready(channel)) {
                        switch_channel_set_state(channel, CS_HIBERNATE);
@@ -2361,10 +2365,10 @@ SWITCH_STANDARD_APP(fifo_function)
                        }
 
                        switch_mutex_lock(globals.mutex);
-                       switch_mutex_lock(node->mutex);
+                       switch_thread_rwlock_wrlock(node->rwlock);
                        node_remove_uuid(node, uuid);
                        node->caller_count--;
-                       switch_mutex_unlock(node->mutex);
+                       switch_thread_rwlock_unlock(node->rwlock);
                        send_presence(node);
                        check_cancel(node);
                        switch_mutex_unlock(globals.mutex);
@@ -2588,15 +2592,10 @@ SWITCH_STANDARD_APP(fifo_function)
                                        }
                                }
 
-                               if (pop) {
-                                       fifo_caller_del(switch_str_nil(switch_event_get_header(pop, "unique-id")));
-                               }
-
-
                                if (pop && !node_consumer_wait_count(node)) {
-                                       switch_mutex_lock(node->mutex);
+                                       switch_thread_rwlock_wrlock(node->rwlock);
                                        node->start_waiting = 0;
-                                       switch_mutex_unlock(node->mutex);
+                                       switch_thread_rwlock_unlock(node->rwlock);
                                }
                        }
 
@@ -2710,9 +2709,9 @@ SWITCH_STANDARD_APP(fifo_function)
                                        const char *arg = switch_channel_get_variable(other_channel, "current_application_data");
                                        switch_caller_extension_t *extension = NULL;
 
-                                       switch_mutex_lock(node->mutex);
+                                       switch_thread_rwlock_wrlock(node->rwlock);
                                        node->caller_count--;
-                                       switch_mutex_unlock(node->mutex);
+                                       switch_thread_rwlock_unlock(node->rwlock);
                                        send_presence(node);
                                        check_cancel(node);
 
@@ -2786,6 +2785,8 @@ SWITCH_STANDARD_APP(fifo_function)
 
                                        sql = switch_mprintf("update fifo_outbound set stop_time=0,start_time=%ld,use_count=use_count+1,outbound_fail_count=0 where uuid='%s'", 
                                                                                 switch_epoch_time_now(NULL), outbound_id);
+
+
                                        fifo_execute_sql(sql, globals.sql_mutex);
                                        switch_safe_free(sql);
                                }
@@ -2817,7 +2818,7 @@ SWITCH_STANDARD_APP(fifo_function)
                                        
                                        sql = switch_mprintf("update fifo_outbound set stop_time=%ld, use_count=use_count-1, "
                                                                                 "outbound_call_total_count=outbound_call_total_count+1, "
-                                                                                "outbound_call_count=outbound_call_count+1, next_avail=%ld + lag where uuid='%s' and use_count > 0", 
+                                                                                "outbound_call_count=outbound_call_count+1, next_avail=%ld + lag + 1 where uuid='%s' and use_count > 0", 
                                                                                 now, now, outbound_id);
 
                                        fifo_execute_sql(sql, globals.sql_mutex);
@@ -2876,9 +2877,9 @@ SWITCH_STANDARD_APP(fifo_function)
                                switch_channel_set_variable(other_channel, "fifo_status", "DONE");
                                switch_channel_set_variable(other_channel, "fifo_timestamp", date);
 
-                               switch_mutex_lock(node->mutex);
+                               switch_thread_rwlock_wrlock(node->rwlock);
                                node->caller_count--;
-                               switch_mutex_unlock(node->mutex);
+                               switch_thread_rwlock_unlock(node->rwlock);
                                send_presence(node);
                                check_cancel(node);
                                switch_core_session_rwunlock(other_session);
@@ -3523,9 +3524,9 @@ SWITCH_STANDARD_API(fifo_api_function)
                                switch_hash_this(hi, &var, NULL, &val);
                                node = (fifo_node_t *) val;
                                len = node_consumer_wait_count(node);
-                               switch_mutex_lock(node->mutex);
+                               switch_thread_rwlock_wrlock(node->rwlock);
                                stream->write_function(stream, "%s:%d:%d:%d\n", (char *) var, node->consumer_count, node->caller_count, len);
-                               switch_mutex_unlock(node->mutex);
+                               switch_thread_rwlock_unlock(node->rwlock);
                                x++;
                        }
 
@@ -3534,9 +3535,9 @@ SWITCH_STANDARD_API(fifo_api_function)
                        }
                } else if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) {
                        len = node_consumer_wait_count(node);
-                       switch_mutex_lock(node->mutex);
+                       switch_thread_rwlock_wrlock(node->rwlock);
                        stream->write_function(stream, "%s:%d:%d:%d\n", argv[1], node->consumer_count, node->caller_count, len);
-                       switch_mutex_unlock(node->mutex);
+                       switch_thread_rwlock_unlock(node->rwlock);
                } else {
                        stream->write_function(stream, "none\n");
                }
@@ -3546,9 +3547,9 @@ SWITCH_STANDARD_API(fifo_api_function)
                                switch_hash_this(hi, &var, NULL, &val);
                                node = (fifo_node_t *) val;
                                len = node_consumer_wait_count(node);
-                               switch_mutex_lock(node->mutex);
+                               switch_thread_rwlock_wrlock(node->rwlock);
                                stream->write_function(stream, "%s:%d\n", (char *) var, node->has_outbound);
-                               switch_mutex_unlock(node->mutex);
+                               switch_thread_rwlock_unlock(node->rwlock);
                                x++;
                        }
 
@@ -3557,9 +3558,9 @@ SWITCH_STANDARD_API(fifo_api_function)
                        }
                } else if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) {
                        len = node_consumer_wait_count(node);
-                       switch_mutex_lock(node->mutex);
+                       switch_thread_rwlock_wrlock(node->rwlock);
                        stream->write_function(stream, "%s:%d\n", argv[1], node->has_outbound);
-                       switch_mutex_unlock(node->mutex);
+                       switch_thread_rwlock_unlock(node->rwlock);
                } else {
                        stream->write_function(stream, "none\n");
                }