]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
fix sql queue manager issues
authorAnthony Minessale <anthm@freeswitch.org>
Thu, 22 Nov 2012 03:15:31 +0000 (21:15 -0600)
committerAnthony Minessale <anthm@freeswitch.org>
Thu, 22 Nov 2012 03:15:36 +0000 (21:15 -0600)
src/mod/endpoints/mod_sofia/sofia_presence.c
src/switch_core_sqldb.c

index e11d96ff76a4d6c4bb716d67ff115bf543014cf9..53b524a2369071762baa7227db1eb17dba68cf97 100644 (file)
@@ -873,13 +873,13 @@ static void do_dialog_probe(switch_event_t *event)
                if (mod_sofia_globals.debug_presence > 1) {
                        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s DUMP DIALOG_PROBE set version sql:\n%s\n", profile->name, sql);
                }
-               sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
+               sofia_glue_execute_sql_soon(profile, &sql, SWITCH_TRUE);
                switch_safe_free(sql);
 
 
                // The dialog_probe_callback has built up the dialogs to be included in the NOTIFY.
                // Now send the "full" dialog event to the triggering subscription.
-               sql = switch_mprintf("select call_id,expires,sub_to_user,sub_to_host,event,version, "
+               sql = switch_mprintf("select call_id,expires,sub_to_user,sub_to_host,event,version+1, "
                                                         "'full',full_to,full_from,contact,network_ip,network_port "
                                                         "from sip_subscriptions "
                                                         "where hostname='%q' and profile_name='%q' and sub_to_user='%q' and sub_to_host='%q' and call_id='%q'",
@@ -4543,7 +4543,7 @@ void sofia_presence_check_subscriptions(sofia_profile_t *profile, time_t now)
                                                         "((expires > 0 and expires <= %ld)) and profile_name='%q' and hostname='%q'", 
                                                         (long) now, profile->name, mod_sofia_globals.hostname);
 
-               sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
+               sofia_glue_execute_sql_soon(profile, &sql, SWITCH_TRUE);
                switch_safe_free(sql);
 
                sql = switch_mprintf("select full_to, full_from, contact, -1, call_id, event, network_ip, network_port, "
index f396d18f2b592ce40fdaa3c9eeed391ee56dbe19..556306ec976af84e322dc4e909b93adf4f320f58 100644 (file)
@@ -1314,10 +1314,12 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_start(switch_sql_queue_
 }
 
 
-static void do_flush(switch_queue_t *q, switch_cache_db_handle_t *dbh)
+static void do_flush(switch_sql_queue_manager_t *qm, int i, switch_cache_db_handle_t *dbh)
 {
        void *pop = NULL;
-       
+       switch_queue_t *q = qm->sql_queue[i];
+
+       switch_mutex_lock(qm->mutex);
        while (switch_queue_trypop(q, &pop) == SWITCH_STATUS_SUCCESS) {
                if (pop) {
                        if (dbh) {
@@ -1326,6 +1328,7 @@ static void do_flush(switch_queue_t *q, switch_cache_db_handle_t *dbh)
                        free(pop);
                }
        }
+       switch_mutex_unlock(qm->mutex);
 
 }
 
@@ -1347,7 +1350,7 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_destroy(switch_sql_queu
 
 
        for(i = 0; i < qm->numq; i++) {
-               do_flush(qm->sql_queue[i], NULL);
+               do_flush(qm, i, NULL);
        }
 
        pool = qm->pool;
@@ -1408,7 +1411,7 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push_confirm(switch_sql
        switch_queue_push(qm->sql_queue[pos], dup ? strdup(sql) : (char *)sql);
        written = qm->written[pos];
        size = switch_sql_queue_manager_size(qm, pos);
-       want = written + size;
+       want = written + qm->pre_written[pos] + size;
        switch_mutex_unlock(qm->mutex);
 
        qm_wake(qm);
@@ -1563,7 +1566,9 @@ static uint32_t do_trans(switch_sql_queue_manager_t *qm)
 
                if (pop) {
                        if ((status = switch_cache_db_execute_sql(qm->event_db, (char *) pop, NULL)) == SWITCH_STATUS_SUCCESS) {
+                               switch_mutex_lock(qm->mutex);
                                qm->pre_written[i]++;
+                               switch_mutex_unlock(qm->mutex);
                                ttl++;
                        }
                        free(pop);
@@ -1633,7 +1638,7 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
 
        uint32_t sanity = 120;
        switch_sql_queue_manager_t *qm = (switch_sql_queue_manager_t *) obj;
-       uint32_t i, countdown = 0;
+       uint32_t i;
 
        while (!qm->event_db) {
                if (switch_cache_db_get_db_handle_dsn(&qm->event_db, qm->dsn) == SWITCH_STATUS_SUCCESS && qm->event_db)
@@ -1674,7 +1679,7 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
 
                if (sql_manager.paused) {
                        for (i = 0; i < qm->numq; i++) {
-                               do_flush(qm->sql_queue[i], NULL);
+                               do_flush(qm, i, NULL);
                        }
                        goto check;
                }
@@ -1707,21 +1712,19 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
 
        check:
 
-               countdown = 40;
-
-               while (--countdown && (lc = qm_ttl(qm)) < qm->max_trans / 4) {
-                       if (lc == 0) {
-                               switch_thread_cond_wait(qm->cond, qm->cond_mutex);                                      
-                               break;
-                       }
-                       switch_yield(5000);
+               if ((lc = qm_ttl(qm)) < qm->max_trans / 4) {
+                       switch_yield(500000);
+               } else if (lc == 0) {
+                       switch_thread_cond_wait(qm->cond, qm->cond_mutex);
+               } else {
+                       switch_cond_next();
                }
        }
 
        switch_mutex_unlock(qm->cond_mutex);
 
        for(i = 0; i < qm->numq; i++) {
-               do_flush(qm->sql_queue[i], qm->event_db);
+               do_flush(qm, i, qm->event_db);
        }
 
        qm->thread_running = 0;