]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
add async select feature to sql queue manager api
authorAnthony Minessale <anthm@freeswitch.org>
Thu, 31 Jan 2013 14:59:21 +0000 (08:59 -0600)
committerAnthony Minessale <anthm@freeswitch.org>
Thu, 31 Jan 2013 14:59:57 +0000 (08:59 -0600)
src/include/switch_core.h
src/switch_core_sqldb.c

index 575e4601aee8b8b0fbc1cb32f9d69b4801c072e6..c53446fb3ee8d3c06dae9f494b258dc8c55532e2 100644 (file)
@@ -2226,7 +2226,7 @@ SWITCH_DECLARE(void) switch_core_sqldb_resume(void);
   \}
 */
 
-typedef int (*switch_db_event_callback_func_t) (void *pArg, switch_event_t *event);
+typedef int (*switch_core_db_event_callback_func_t) (void *pArg, switch_event_t *event);
 
 #define CACHE_DB_LEN 256
 typedef enum {
@@ -2461,7 +2461,7 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_init_name(const char *n
 SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_start(switch_sql_queue_manager_t *qm);
 SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_stop(switch_sql_queue_manager_t *qm);
 SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_event_callback(switch_cache_db_handle_t *dbh,
-                                                                                                                                                  const char *sql, switch_db_event_callback_func_t callback, void *pdata, char **err);
+                                                                                                                                                  const char *sql, switch_core_db_event_callback_func_t callback, void *pdata, char **err);
                                                        
 SWITCH_DECLARE(pid_t) switch_fork(void);
 
index 64e72c1bbb1ebf9dbf3b53b381beb1d04a350c79..a4d16a3f155085189cf9886e480c58aac23d8800 100644 (file)
@@ -1017,7 +1017,7 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute_trans_full(sw
 }
 
 struct helper {
-       switch_db_event_callback_func_t callback;
+       switch_core_db_event_callback_func_t callback;
        void *pdata;
 };
 
@@ -1037,7 +1037,7 @@ static int helper_callback(void *pArg, int argc, char **argv, char **columnNames
 }
 
 SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_event_callback(switch_cache_db_handle_t *dbh,
-                                                                                                                                        const char *sql, switch_db_event_callback_func_t callback, void *pdata, char **err)
+                                                                                                                                        const char *sql, switch_core_db_event_callback_func_t callback, void *pdata, char **err)
 {
        switch_status_t status = SWITCH_STATUS_FALSE;
        char *errmsg = NULL;
@@ -1287,6 +1287,99 @@ static uint32_t qm_ttl(switch_sql_queue_manager_t *qm)
        return ttl;
 }
 
+struct db_job {
+       switch_sql_queue_manager_t *qm;
+       char *sql;
+       switch_core_db_callback_func_t callback;
+       switch_core_db_event_callback_func_t event_callback;
+       void *pdata;
+       int event;
+       switch_memory_pool_t *pool;
+};
+
+static void *SWITCH_THREAD_FUNC sql_in_thread (switch_thread_t *thread, void *obj)
+{
+       struct db_job *job = (struct db_job *) obj;
+       switch_memory_pool_t *pool = job->pool;
+       char *err = NULL;
+       switch_cache_db_handle_t *dbh;
+
+
+       if (switch_cache_db_get_db_handle_dsn(&dbh, job->qm->dsn) != SWITCH_STATUS_SUCCESS) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot connect DSN %s\n", job->qm->dsn);
+               return NULL;
+       }
+
+       if (job->callback) {
+               switch_cache_db_execute_sql_callback(dbh, job->sql, job->callback, job->pdata, &err);
+       } else if (job->event_callback) {
+               switch_cache_db_execute_sql_event_callback(dbh, job->sql, job->event_callback, job->pdata, &err);
+       }
+
+       if (err) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", job->sql, err);
+               free(err);
+       }
+       
+       switch_cache_db_release_db_handle(&dbh);
+       
+       if (pool) {
+               switch_core_destroy_memory_pool(&pool);
+       }
+
+       return NULL;
+}
+
+static switch_thread_data_t *new_job(switch_sql_queue_manager_t *qm, const char *sql, 
+                                                                        switch_core_db_callback_func_t callback, switch_core_db_event_callback_func_t event_callback, void *pdata)
+{
+       switch_memory_pool_t *pool;
+       switch_thread_data_t *td;
+       struct db_job *job;
+       switch_core_new_memory_pool(&pool);
+
+       td = switch_core_alloc(pool, sizeof(*td));
+       job = switch_core_alloc(pool, sizeof(*job));
+
+       td->func = sql_in_thread;
+       td->obj = job;
+
+       job->sql = switch_core_strdup(pool, sql);
+       job->qm = qm;
+
+       if (callback) {
+               job->callback = callback;
+       } else if (event_callback) {
+               job->event_callback = event_callback;
+       }
+
+       job->pdata = pdata;
+       job->pool = pool;
+
+       return td;
+}
+
+
+SWITCH_DECLARE(void) switch_sql_queue_manger_execute_sql_callback(switch_sql_queue_manager_t *qm, 
+                                                                                                                                  const char *sql, switch_core_db_callback_func_t callback, void *pdata)
+{
+       
+       switch_thread_data_t *td;
+       if ((td = new_job(qm, sql, callback, NULL, pdata))) {
+               switch_thread_pool_launch_thread(&td);
+       }
+}
+
+SWITCH_DECLARE(void) switch_sql_queue_manger_execute_sql_event_callback(switch_sql_queue_manager_t *qm, 
+                                                                                                                                               const char *sql, switch_core_db_event_callback_func_t callback, void *pdata)
+{
+       
+       switch_thread_data_t *td;
+       if ((td = new_job(qm, sql, NULL, callback, pdata))) {
+               switch_thread_pool_launch_thread(&td);
+       }
+}
+
 SWITCH_DECLARE(int) switch_sql_queue_manager_size(switch_sql_queue_manager_t *qm, uint32_t index)
 {
        int size = 0;