return rv;
}
+/**
+ * Retrieves the next item from the queue. If there are no
+ * items available, it will block until one becomes available, or
+ * until timeout is elapsed. Once retrieved, the item is placed into
+ * the address specified by'data'.
+ */
+APU_DECLARE(apr_status_t) apr_queue_pop_timeout(apr_queue_t *queue, void **data, apr_interval_time_t timeout)
+{
+ apr_status_t rv;
+
+ if (queue->terminated) {
+ return APR_EOF; /* no more elements ever again */
+ }
+
+ rv = apr_thread_mutex_lock(queue->one_big_mutex);
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+
+ /* Keep waiting until we wake up and find that the queue is not empty. */
+ if (apr_queue_empty(queue)) {
+ if (!queue->terminated) {
+ queue->empty_waiters++;
+ rv = apr_thread_cond_timedwait(queue->not_empty, queue->one_big_mutex, timeout);
+ queue->empty_waiters--;
+ /* In the event of a timemout, APR_TIMEUP will be returned */
+ if (rv != APR_SUCCESS) {
+ apr_thread_mutex_unlock(queue->one_big_mutex);
+ return rv;
+ }
+ }
+ /* If we wake up and it's still empty, then we were interrupted */
+ if (apr_queue_empty(queue)) {
+ Q_DBG("queue empty (intr)", queue);
+ rv = apr_thread_mutex_unlock(queue->one_big_mutex);
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+ if (queue->terminated) {
+ return APR_EOF; /* no more elements ever again */
+ }
+ else {
+ return APR_EINTR;
+ }
+ }
+ }
+
+ *data = queue->data[queue->out];
+ queue->nelts--;
+
+ queue->out = (queue->out + 1) % queue->bounds;
+ if (queue->full_waiters) {
+ Q_DBG("signal !full", queue);
+ rv = apr_thread_cond_signal(queue->not_full);
+ if (rv != APR_SUCCESS) {
+ apr_thread_mutex_unlock(queue->one_big_mutex);
+ return rv;
+ }
+ }
+
+ rv = apr_thread_mutex_unlock(queue->one_big_mutex);
+ return rv;
+}
+
+
/**
* Retrieves the next item from the queue. If there are no
* items available, return APR_EAGAIN. Once retrieved,
*/
SWITCH_DECLARE(switch_status_t) switch_queue_pop(switch_queue_t *queue, void **data);
+/**
+ * pop/get an object from the queue, blocking if the queue is already empty
+ *
+ * @param queue the queue
+ * @param data the data
+ * @param timeout The amount of time in microseconds to wait. This is
+ * a maximum, not a minimum. If the condition is signaled, we
+ * will wake up before this time, otherwise the error APR_TIMEUP
+ * is returned.
+ * @returns APR_TIMEUP the request timed out
+ * @returns APR_EINTR the blocking was interrupted (try again)
+ * @returns APR_EOF if the queue has been terminated
+ * @returns APR_SUCCESS on a successfull pop
+ */
+SWITCH_DECLARE(switch_status_t) switch_queue_pop_timeout(switch_queue_t *queue, void **data, switch_interval_time_t timeout);
+
/**
* push/add a object to the queue, blocking if the queue is already full
*
void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread, void *obj)
{
sofia_profile_t *profile = (sofia_profile_t *) obj;
- uint32_t ireg_loops = 0;
- uint32_t gateway_loops = 0;
- int loops = 0;
- uint32_t qsize;
- void *pop = NULL;
- int loop_count = 0;
- switch_size_t sql_len = 1024 * 32;
- char *tmp, *sqlbuf = NULL;
- char *sql = NULL;
+ uint32_t ireg_loops = IREG_SECONDS; /* Number of loop iterations done when we haven't checked for registrations */
+ uint32_t gateway_loops = GATEWAY_SECONDS; /* Number of loop iterations done when we haven't checked for gateways */
+ void *pop = NULL; /* queue_pop placeholder */
+ switch_size_t sql_len = 1024 * 32; /* length of sqlbuf */
+ char *tmp, *sqlbuf = NULL; /* Buffer for SQL statements */
+ char *sql = NULL; /* Current SQL statement */
+ switch_time_t last_commit; /* Last time we committed stuff to the DB */
+ switch_time_t last_check; /* Last time we did the second-resolution loop that checks various stuff */
+ switch_size_t len = 0; /* Current length of sqlbuf */
+ uint32_t statements = 0; /* Number of statements in the current sql buffer */
+
+ last_commit = last_check = switch_micro_time_now();
if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) {
sqlbuf = (char *) malloc(sql_len);
}
- ireg_loops = IREG_SECONDS;
- gateway_loops = GATEWAY_SECONDS;
-
sofia_set_pflag_locked(profile, PFLAG_WORKER_RUNNING);
switch_queue_create(&profile->sql_queue, SOFIA_QUEUE_SIZE, profile->pool);
- qsize = switch_queue_size(profile->sql_queue);
-
- while ((mod_sofia_globals.running == 1 && sofia_test_pflag(profile, PFLAG_RUNNING)) || qsize) {
+ /* While we're running, or there is a pending sql statment that we haven't appended to sqlbuf yet, because of a lack of buffer space */
+ while ((mod_sofia_globals.running == 1 && sofia_test_pflag(profile, PFLAG_RUNNING)) || sql) {
if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) {
- if (qsize > 0 && (qsize >= 1024 || ++loop_count >= (int)profile->trans_timeout)) {
- switch_size_t newlen;
- uint32_t iterations = 0;
- switch_size_t len = 0;
-
- switch_mutex_lock(profile->ireg_mutex);
+ /* Do we have enough statements or is the timeout expired */
+ while (sql || (sofia_test_pflag(profile, PFLAG_RUNNING) && mod_sofia_globals.running == 1 &&
+ (statements == 0 || (statements <= 1024 && (switch_micro_time_now() - last_commit)/1000 < profile->trans_timeout)))) {
+
+ switch_interval_time_t sleepy_time = !statements ? 1000000 : switch_micro_time_now() - last_commit - profile->trans_timeout*1000;
- while (sql || (switch_queue_trypop(profile->sql_queue, &pop) == SWITCH_STATUS_SUCCESS && pop)) {
+ if (sql || (switch_queue_pop_timeout(profile->sql_queue, &pop, sleepy_time) == SWITCH_STATUS_SUCCESS && pop)) {
+ switch_size_t newlen;
+
if (!sql) sql = (char *) pop;
- newlen = strlen(sql) + 2;
- iterations++;
+ newlen = strlen(sql) + 2 /* strlen(";\n") */ ;
if (len + newlen + 10 > sql_len) {
switch_size_t new_mlen = len + newlen + 10 + 10240;
}
sqlbuf = tmp;
} else {
- goto skip;
+ break;
}
}
len += newlen;
free(sql);
sql = NULL;
+
+ statements++;
}
-
- skip:
-
+ }
+
+ /* Execute here */
+ last_commit = switch_micro_time_now();
+
+ if (len) {
//printf("TRANS:\n%s\n", sqlbuf);
+ switch_mutex_lock(profile->ireg_mutex);
sofia_glue_actually_execute_sql_trans(profile, sqlbuf, NULL);
//sofia_glue_actually_execute_sql(profile, "commit;\n", NULL);
switch_mutex_unlock(profile->ireg_mutex);
- loop_count = 0;
+ statements = 0;
+ len = 0;
}
+
} else {
- if (qsize) {
- //switch_mutex_lock(profile->ireg_mutex);
- while (switch_queue_trypop(profile->sql_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) {
- sofia_glue_actually_execute_sql(profile, (char *) pop, profile->ireg_mutex);
- free(pop);
- }
- //switch_mutex_unlock(profile->ireg_mutex);
+ if (switch_queue_pop_timeout(profile->sql_queue, &pop, 1000000) == SWITCH_STATUS_SUCCESS && pop) {
+ sofia_glue_actually_execute_sql(profile, (char *) pop, profile->ireg_mutex);
+ free(pop);
}
}
- if (++loops >= 1000) {
-
-
-
+ if (switch_micro_time_now() - last_check >= 1000000) {
if (profile->watchdog_enabled) {
uint32_t event_diff = 0, step_diff = 0, event_fail = 0, step_fail = 0;
if (event_fail || step_fail) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile %s: SIP STACK FAILURE DETECTED!\n"
"GOODBYE CRUEL WORLD, I'M LEAVING YOU TODAY....GOODBYE, GOODBYE, GOOD BYE\n", profile->name);
- switch_yield(2000);
+ switch_yield(2000000);
abort();
}
}
sofia_reg_check_gateway(profile, switch_epoch_time_now(NULL));
gateway_loops = 0;
}
+
sofia_sub_check_gateway(profile, time(NULL));
- loops = 0;
+
+ last_check = switch_micro_time_now();
}
-
- switch_cond_next();
- qsize = switch_queue_size(profile->sql_queue);
}
switch_mutex_lock(profile->ireg_mutex);