]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
Implement switch_queue_pop_timeout() and refactor sofia_profile_worker_thread_run...
authorMathieu Rene <mrene@avgs.ca>
Fri, 19 Nov 2010 02:00:21 +0000 (21:00 -0500)
committerMathieu Rene <mrene@avgs.ca>
Fri, 19 Nov 2010 02:00:35 +0000 (21:00 -0500)
libs/apr-util/.update
libs/apr-util/include/apr_queue.h
libs/apr-util/misc/apr_queue.c
src/include/switch_apr.h
src/mod/endpoints/mod_sofia/sofia.c
src/switch_apr.c

index f430e64b3bbe6c3ee1a06375ccb1a65542dcba5e..a573c805041b54b12d9a949f04cb5e9823739a3b 100644 (file)
@@ -1 +1 @@
-Mon Dec 28 14:55:57 EST 2009
+Thu 18 Nov 2010 20:56:38 EST
index 5a0181b29e0d36eef8ac67533bfeed508a9ad8b9..dcf0c137ed7d96cfbeab185e3608f294c8747750 100644 (file)
@@ -78,6 +78,22 @@ APU_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data);
  */
 APU_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data);
 
+/**
+ * pop/get an object from the queue, blocking if the queue is already empty
+ *
+ * @param queue the queue
+ * @param data the data
+ * @param timeout The amount of time in microseconds to wait. This is 
+ *        a maximum, not a minimum. If the condition is signaled, we 
+ *        will wake up before this time, otherwise the error APR_TIMEUP
+ *        is returned.
+ * @returns APR_TIMEUP the request timed out
+ * @returns APR_EINTR the blocking was interrupted (try again)
+ * @returns APR_EOF if the queue has been terminated
+ * @returns APR_SUCCESS on a successfull pop
+ */
+APU_DECLARE(apr_status_t) apr_queue_pop_timeout(apr_queue_t *queue, void **data, apr_interval_time_t timeout);
+
 /**
  * push/add a object to the queue, returning immediatly if the queue is full
  *
index 28d79afcb5fcf158467e0fbe158e15edd7453a08..e905a53ebba4a6c97a2211d6f790d86b47bc6a84 100644 (file)
@@ -313,6 +313,71 @@ APU_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data)
     return rv;
 }
 
+/**
+ * Retrieves the next item from the queue. If there are no
+ * items available, it will block until one becomes available, or 
+ * until timeout is elapsed. Once retrieved, the item is placed into
+ * the address specified by'data'.
+ */
+APU_DECLARE(apr_status_t) apr_queue_pop_timeout(apr_queue_t *queue, void **data, apr_interval_time_t timeout)
+{
+    apr_status_t rv;
+
+    if (queue->terminated) {
+        return APR_EOF; /* no more elements ever again */
+    }
+
+    rv = apr_thread_mutex_lock(queue->one_big_mutex);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+
+    /* Keep waiting until we wake up and find that the queue is not empty. */
+    if (apr_queue_empty(queue)) {
+        if (!queue->terminated) {
+            queue->empty_waiters++;
+            rv = apr_thread_cond_timedwait(queue->not_empty, queue->one_big_mutex, timeout);
+            queue->empty_waiters--;
+                       /* In the event of a timemout, APR_TIMEUP will be returned */
+            if (rv != APR_SUCCESS) {
+                apr_thread_mutex_unlock(queue->one_big_mutex);
+                return rv;
+            }
+        }
+        /* If we wake up and it's still empty, then we were interrupted */
+        if (apr_queue_empty(queue)) {
+            Q_DBG("queue empty (intr)", queue);
+            rv = apr_thread_mutex_unlock(queue->one_big_mutex);
+            if (rv != APR_SUCCESS) {
+                return rv;
+            }
+            if (queue->terminated) {
+                return APR_EOF; /* no more elements ever again */
+            }
+            else {
+                return APR_EINTR;
+            }
+        }
+    } 
+
+    *data = queue->data[queue->out];
+    queue->nelts--;
+
+    queue->out = (queue->out + 1) % queue->bounds;
+    if (queue->full_waiters) {
+        Q_DBG("signal !full", queue);
+        rv = apr_thread_cond_signal(queue->not_full);
+        if (rv != APR_SUCCESS) {
+            apr_thread_mutex_unlock(queue->one_big_mutex);
+            return rv;
+        }
+    }
+
+    rv = apr_thread_mutex_unlock(queue->one_big_mutex);
+    return rv;
+}
+
+
 /**
  * Retrieves the next item from the queue. If there are no
  * items available, return APR_EAGAIN.  Once retrieved,
index f82b81a7f8d746ee8de736d110910331252671bf..dc81d1a522e89ffdefb8989d5894b0dc41fce33e 100644 (file)
@@ -592,6 +592,22 @@ SWITCH_DECLARE(switch_status_t) switch_queue_create(switch_queue_t ** queue, uns
  */
 SWITCH_DECLARE(switch_status_t) switch_queue_pop(switch_queue_t *queue, void **data);
 
+/**
+ * pop/get an object from the queue, blocking if the queue is already empty
+ *
+ * @param queue the queue
+ * @param data the data
+ * @param timeout The amount of time in microseconds to wait. This is
+ *        a maximum, not a minimum. If the condition is signaled, we
+ *        will wake up before this time, otherwise the error APR_TIMEUP
+ *        is returned.
+ * @returns APR_TIMEUP the request timed out
+ * @returns APR_EINTR the blocking was interrupted (try again)
+ * @returns APR_EOF if the queue has been terminated
+ * @returns APR_SUCCESS on a successfull pop
+ */
+SWITCH_DECLARE(switch_status_t) switch_queue_pop_timeout(switch_queue_t *queue, void **data, switch_interval_time_t timeout);
+
 /**
  * push/add a object to the queue, blocking if the queue is already full
  *
index 5bb2ff0d66be712570c19f958a70368d67ee3960..9fadced17f9b228d753f94af217838b1abacdad5 100644 (file)
@@ -1230,43 +1230,42 @@ static void sofia_perform_profile_start_failure(sofia_profile_t *profile, char *
 void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread, void *obj)
 {
        sofia_profile_t *profile = (sofia_profile_t *) obj;
-       uint32_t ireg_loops = 0;
-       uint32_t gateway_loops = 0;
-       int loops = 0;
-       uint32_t qsize;
-       void *pop = NULL;
-       int loop_count = 0;
-       switch_size_t sql_len = 1024 * 32;
-       char *tmp, *sqlbuf = NULL;
-       char *sql = NULL;
+       uint32_t ireg_loops = IREG_SECONDS;                                     /* Number of loop iterations done when we haven't checked for registrations */
+       uint32_t gateway_loops = GATEWAY_SECONDS;                       /* Number of loop iterations done when we haven't checked for gateways */
+       void *pop = NULL;                                       /* queue_pop placeholder */
+       switch_size_t sql_len = 1024 * 32;      /* length of sqlbuf */
+       char *tmp, *sqlbuf = NULL;                      /* Buffer for SQL statements */
+       char *sql = NULL;                                       /* Current SQL statement */
+       switch_time_t last_commit;                      /* Last time we committed stuff to the DB */
+       switch_time_t last_check;                       /* Last time we did the second-resolution loop that checks various stuff */
+       switch_size_t len = 0;                          /* Current length of sqlbuf */
+       uint32_t statements = 0;                        /* Number of statements in the current sql buffer */
+       
+       last_commit = last_check = switch_micro_time_now();
        
        if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) {
                sqlbuf = (char *) malloc(sql_len);
        }
 
-       ireg_loops = IREG_SECONDS;
-       gateway_loops = GATEWAY_SECONDS;
-
        sofia_set_pflag_locked(profile, PFLAG_WORKER_RUNNING);
 
        switch_queue_create(&profile->sql_queue, SOFIA_QUEUE_SIZE, profile->pool);
 
-       qsize = switch_queue_size(profile->sql_queue);
-
-       while ((mod_sofia_globals.running == 1 && sofia_test_pflag(profile, PFLAG_RUNNING)) || qsize) {
+       /* While we're running, or there is a pending sql statment that we haven't appended to sqlbuf yet, because of a lack of buffer space */
+       while ((mod_sofia_globals.running == 1 && sofia_test_pflag(profile, PFLAG_RUNNING)) || sql) {
                if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) {
-                       if (qsize > 0 && (qsize >= 1024 || ++loop_count >= (int)profile->trans_timeout)) {
-                               switch_size_t newlen;
-                               uint32_t iterations = 0;
-                               switch_size_t len = 0;
-
-                               switch_mutex_lock(profile->ireg_mutex);
+                       /* Do we have enough statements or is the timeout expired */
+                       while (sql || (sofia_test_pflag(profile, PFLAG_RUNNING) && mod_sofia_globals.running == 1 &&
+                                       (statements == 0 || (statements <= 1024 && (switch_micro_time_now() - last_commit)/1000 < profile->trans_timeout)))) {
+                               
+                               switch_interval_time_t sleepy_time = !statements ? 1000000 : switch_micro_time_now() - last_commit - profile->trans_timeout*1000;
                                
-                               while (sql || (switch_queue_trypop(profile->sql_queue, &pop) == SWITCH_STATUS_SUCCESS && pop)) {
+                               if (sql || (switch_queue_pop_timeout(profile->sql_queue, &pop, sleepy_time) == SWITCH_STATUS_SUCCESS && pop)) {
+                                       switch_size_t newlen;
+                                       
                                        if (!sql) sql = (char *) pop;
 
-                                       newlen = strlen(sql) + 2;
-                                       iterations++;
+                                       newlen = strlen(sql) + 2 /* strlen(";\n") */ ;
 
                                        if (len + newlen + 10 > sql_len) {
                                                switch_size_t new_mlen = len + newlen + 10 + 10240;
@@ -1280,7 +1279,7 @@ void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread
                                                        }
                                                        sqlbuf = tmp;
                                                } else {
-                                                       goto skip;
+                                                       break;
                                                }
                                        }
 
@@ -1288,31 +1287,32 @@ void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread
                                        len += newlen;
                                        free(sql);
                                        sql = NULL;
+                                       
+                                       statements++;
                                }
-
-                       skip:
-
+                       }
+                       
+                       /* Execute here */
+                       last_commit = switch_micro_time_now();
+                       
+                       if (len) {
                                //printf("TRANS:\n%s\n", sqlbuf);
+                               switch_mutex_lock(profile->ireg_mutex);
                                sofia_glue_actually_execute_sql_trans(profile, sqlbuf, NULL);
                                //sofia_glue_actually_execute_sql(profile, "commit;\n", NULL);
                                switch_mutex_unlock(profile->ireg_mutex);
-                               loop_count = 0;
+                               statements = 0;
+                               len = 0;
                        }
+
                } else {
-                       if (qsize) {
-                               //switch_mutex_lock(profile->ireg_mutex);
-                               while (switch_queue_trypop(profile->sql_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) {
-                                       sofia_glue_actually_execute_sql(profile, (char *) pop, profile->ireg_mutex);
-                                       free(pop);
-                               }
-                               //switch_mutex_unlock(profile->ireg_mutex);
+                       if (switch_queue_pop_timeout(profile->sql_queue, &pop, 1000000) == SWITCH_STATUS_SUCCESS && pop) {
+                               sofia_glue_actually_execute_sql(profile, (char *) pop, profile->ireg_mutex);
+                               free(pop);
                        }
                }
 
-               if (++loops >= 1000) {
-
-                       
-
+               if (switch_micro_time_now() - last_check >= 1000000) {
                        if (profile->watchdog_enabled) {
                                uint32_t event_diff = 0, step_diff = 0, event_fail = 0, step_fail = 0;
                                
@@ -1339,7 +1339,7 @@ void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread
                                if (event_fail || step_fail) {
                                        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile %s: SIP STACK FAILURE DETECTED!\n"
                                                                          "GOODBYE CRUEL WORLD, I'M LEAVING YOU TODAY....GOODBYE, GOODBYE, GOOD BYE\n", profile->name);
-                                       switch_yield(2000);
+                                       switch_yield(2000000);
                                        abort();
                                }
                        }
@@ -1354,12 +1354,11 @@ void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread
                                sofia_reg_check_gateway(profile, switch_epoch_time_now(NULL));
                                gateway_loops = 0;
                        }
+                       
                        sofia_sub_check_gateway(profile, time(NULL));
-                       loops = 0;
+                       
+                       last_check = switch_micro_time_now();
                }
-
-               switch_cond_next();
-               qsize = switch_queue_size(profile->sql_queue);
        }
 
        switch_mutex_lock(profile->ireg_mutex);
index bdff179b4a0782b6c66bbb27cfe6cc3772947d61..dccc37be9625fc20c7d99b7776d0791fbe70a83e 100644 (file)
@@ -986,6 +986,12 @@ SWITCH_DECLARE(switch_status_t) switch_queue_pop(switch_queue_t *queue, void **d
        return apr_queue_pop(queue, data);
 }
 
+SWITCH_DECLARE(switch_status_t) switch_queue_pop_timeout(switch_queue_t *queue, void **data, switch_interval_time_t timeout)
+{
+       return apr_queue_pop_timeout(queue, data, timeout);
+}
+
+
 SWITCH_DECLARE(switch_status_t) switch_queue_push(switch_queue_t *queue, void *data)
 {
        apr_status_t s;