\}
*/
-typedef int (*switch_db_event_callback_func_t) (void *pArg, switch_event_t *event);
+typedef int (*switch_core_db_event_callback_func_t) (void *pArg, switch_event_t *event);
#define CACHE_DB_LEN 256
typedef enum {
SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_start(switch_sql_queue_manager_t *qm);
SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_stop(switch_sql_queue_manager_t *qm);
SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_event_callback(switch_cache_db_handle_t *dbh,
- const char *sql, switch_db_event_callback_func_t callback, void *pdata, char **err);
+ const char *sql, switch_core_db_event_callback_func_t callback, void *pdata, char **err);
SWITCH_DECLARE(pid_t) switch_fork(void);
}
struct helper {
- switch_db_event_callback_func_t callback;
+ switch_core_db_event_callback_func_t callback;
void *pdata;
};
}
SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_event_callback(switch_cache_db_handle_t *dbh,
- const char *sql, switch_db_event_callback_func_t callback, void *pdata, char **err)
+ const char *sql, switch_core_db_event_callback_func_t callback, void *pdata, char **err)
{
switch_status_t status = SWITCH_STATUS_FALSE;
char *errmsg = NULL;
return ttl;
}
+struct db_job {
+ switch_sql_queue_manager_t *qm;
+ char *sql;
+ switch_core_db_callback_func_t callback;
+ switch_core_db_event_callback_func_t event_callback;
+ void *pdata;
+ int event;
+ switch_memory_pool_t *pool;
+};
+
+static void *SWITCH_THREAD_FUNC sql_in_thread (switch_thread_t *thread, void *obj)
+{
+ struct db_job *job = (struct db_job *) obj;
+ switch_memory_pool_t *pool = job->pool;
+ char *err = NULL;
+ switch_cache_db_handle_t *dbh;
+
+
+ if (switch_cache_db_get_db_handle_dsn(&dbh, job->qm->dsn) != SWITCH_STATUS_SUCCESS) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot connect DSN %s\n", job->qm->dsn);
+ return NULL;
+ }
+
+ if (job->callback) {
+ switch_cache_db_execute_sql_callback(dbh, job->sql, job->callback, job->pdata, &err);
+ } else if (job->event_callback) {
+ switch_cache_db_execute_sql_event_callback(dbh, job->sql, job->event_callback, job->pdata, &err);
+ }
+
+ if (err) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", job->sql, err);
+ free(err);
+ }
+
+ switch_cache_db_release_db_handle(&dbh);
+
+ if (pool) {
+ switch_core_destroy_memory_pool(&pool);
+ }
+
+ return NULL;
+}
+
+static switch_thread_data_t *new_job(switch_sql_queue_manager_t *qm, const char *sql,
+ switch_core_db_callback_func_t callback, switch_core_db_event_callback_func_t event_callback, void *pdata)
+{
+ switch_memory_pool_t *pool;
+ switch_thread_data_t *td;
+ struct db_job *job;
+ switch_core_new_memory_pool(&pool);
+
+ td = switch_core_alloc(pool, sizeof(*td));
+ job = switch_core_alloc(pool, sizeof(*job));
+
+ td->func = sql_in_thread;
+ td->obj = job;
+
+ job->sql = switch_core_strdup(pool, sql);
+ job->qm = qm;
+
+ if (callback) {
+ job->callback = callback;
+ } else if (event_callback) {
+ job->event_callback = event_callback;
+ }
+
+ job->pdata = pdata;
+ job->pool = pool;
+
+ return td;
+}
+
+
+SWITCH_DECLARE(void) switch_sql_queue_manger_execute_sql_callback(switch_sql_queue_manager_t *qm,
+ const char *sql, switch_core_db_callback_func_t callback, void *pdata)
+{
+
+ switch_thread_data_t *td;
+ if ((td = new_job(qm, sql, callback, NULL, pdata))) {
+ switch_thread_pool_launch_thread(&td);
+ }
+}
+
+SWITCH_DECLARE(void) switch_sql_queue_manger_execute_sql_event_callback(switch_sql_queue_manager_t *qm,
+ const char *sql, switch_core_db_event_callback_func_t callback, void *pdata)
+{
+
+ switch_thread_data_t *td;
+ if ((td = new_job(qm, sql, NULL, callback, pdata))) {
+ switch_thread_pool_launch_thread(&td);
+ }
+}
+
SWITCH_DECLARE(int) switch_sql_queue_manager_size(switch_sql_queue_manager_t *qm, uint32_t index)
{
int size = 0;