SWITCH_DECLARE(char *)switch_html_strip(const char *str);
+/**
+ * switch_thread_ctl is the right thing if you need a thread to dispatch, buffer or queue
+ * Use it (and its helper functions) to safety control the thread states.
+ * Initiate switch_thread_ctl before starting your thread
+ * Use switch_thread_ctl_thread_set_running() and switch_thread_ctl_thread_set_stopped() inside your thread.
+ * Check your thread states from the outside with switch_thread_ctl_thread_is_initiated() and switch_thread_ctl_thread_is_running()
+ * See more in the header file.
+ **/
+struct switch_thread_ctl {
+ volatile int8_t thread_initiated;
+ volatile int8_t thread_running;
+ switch_thread_rwlock_t *rwlock;
+};
+
+/**
+ * Allocates and initializes switch_thread_ctl structure
+ * \thread_ctl [in] thread_ctl
+ * \pool [in] memory pool
+ * \return void
+ **/
+static inline void switch_thread_ctl_init(switch_thread_ctl_t **thread_ctl, switch_memory_pool_t *pool) {
+ *thread_ctl = (switch_thread_ctl_t *)switch_core_alloc(pool, sizeof(switch_thread_ctl_t));
+ switch_thread_rwlock_create(&(*thread_ctl)->rwlock, pool);
+}
+
+/**
+ * Checks if the newly created switch_thread_ctl thread was finally initiated to run.
+ * Usually called from the controlled thread itself to demonstrate that the thread turned into the operational state.
+ * Does not mean the thread is still running, because it may stop right away after by its own reasons.
+ * \thread_ctl [in] thread_ctl
+ * \return 1 - initiated, 0 - was not initiated yet.
+**/
+static inline int8_t switch_thread_ctl_thread_is_initiated(switch_thread_ctl_t *thread_ctl)
+{
+ int8_t result;
+ switch_thread_rwlock_rdlock(thread_ctl->rwlock);
+ result = (thread_ctl->thread_initiated == 1);
+ switch_thread_rwlock_unlock(thread_ctl->rwlock);
+ return result;
+}
+
+/**
+ * Checks if switch_thread_ctl thread is running
+ * \thread_ctl [in] thread_ctl
+ * \return 1 - running, 0 - stopping or stopped.
+**/
+static inline int8_t switch_thread_ctl_thread_is_running(switch_thread_ctl_t *thread_ctl)
+{
+ int8_t result;
+ switch_thread_rwlock_rdlock(thread_ctl->rwlock);
+ result = (thread_ctl->thread_running == 1);
+ switch_thread_rwlock_unlock(thread_ctl->rwlock);
+ return result;
+}
+
+/**
+ * Asks switch_thread_ctl thread to begin stopping and give other threads ability to wait for the actual stop.
+ * \thread_ctl [in] thread_ctl
+ * \return void
+**/
+static inline void switch_thread_ctl_thread_request_stop(switch_thread_ctl_t *thread_ctl)
+{
+ switch_thread_rwlock_wrlock(thread_ctl->rwlock);
+ thread_ctl->thread_running = -1;
+ switch_thread_rwlock_unlock(thread_ctl->rwlock);
+}
+
+/**
+ * Checks if switch_thread_ctl thread is still stopping and not finally stopped.
+ * \thread_ctl [in] thread_ctl
+ * \return 1 - stopping, 0 - not stopping (0 does not mean stopped if wrongly used).
+**/
+static inline int8_t switch_thread_ctl_thread_is_stopping(switch_thread_ctl_t *thread_ctl)
+{
+ int8_t result;
+
+ switch_thread_rwlock_rdlock(thread_ctl->rwlock);
+ result = (thread_ctl->thread_running == -1);
+ switch_thread_rwlock_unlock(thread_ctl->rwlock);
+
+ return result;
+}
+
+/**
+ * Puts switch_thread_ctl thread into the running and initializated state (initializated state will be never changed later)
+ * \thread_ctl [in] thread_ctl
+ * \return void
+ **/
+static inline void switch_thread_ctl_thread_set_running(switch_thread_ctl_t *thread_ctl)
+{
+ switch_thread_rwlock_wrlock(thread_ctl->rwlock);
+ thread_ctl->thread_initiated = 1;
+ thread_ctl->thread_running = 1;
+ switch_thread_rwlock_unlock(thread_ctl->rwlock);
+}
+
+/**
+ * Puts switch_thread_ctl into the stopped state.
+ * Usually called by the controled thread itself right before the exiting.
+ * NOTE: Does not flip the thread_initiated flag!
+ * \thread_ctl [in] thread_ctl
+ * \return void
+ **/
+static inline void switch_thread_ctl_thread_set_stopped(switch_thread_ctl_t *thread_ctl)
+{
+ switch_thread_rwlock_wrlock(thread_ctl->rwlock);
+ thread_ctl->thread_running = 0;
+ switch_thread_rwlock_unlock(thread_ctl->rwlock);
+}
+
SWITCH_END_EXTERN_C
#endif
/* For Emacs:
switch_cache_db_handle_type_t type;
switch_cache_db_native_handle_t native_handle;
time_t last_used;
- switch_mutex_t *mutex;
+ switch_mutex_t *mutex;
switch_mutex_t *io_mutex;
+ switch_mutex_t *usage_mutex;
switch_memory_pool_t *pool;
int32_t flags;
unsigned long hash;
static struct {
switch_memory_pool_t *memory_pool;
switch_thread_t *db_thread;
- int db_thread_running;
switch_bool_t manage;
switch_mutex_t *io_mutex;
switch_mutex_t *dbh_mutex;
- switch_mutex_t *ctl_mutex;
switch_cache_db_handle_t *handle_pool;
uint32_t total_handles;
uint32_t total_used_handles;
switch_cache_db_handle_t *dbh;
switch_sql_queue_manager_t *qm;
int paused;
+ switch_thread_ctl_t *thread_ctl;
} sql_manager;
new_dbh->pool = pool;
new_dbh->type = type;
switch_mutex_init(&new_dbh->mutex, SWITCH_MUTEX_NESTED, new_dbh->pool);
+ switch_mutex_init(&new_dbh->usage_mutex, SWITCH_MUTEX_NESTED, new_dbh->pool);
return new_dbh;
}
switch_ssize_t hlen = -1;
switch_mutex_lock(sql_manager.dbh_mutex);
+ switch_mutex_lock(dbh->mutex);
switch_set_string(dbh->creator, db_callsite_str);
sql_manager.handle_pool = dbh;
sql_manager.total_handles++;
- switch_mutex_lock(dbh->mutex);
+
switch_mutex_unlock(sql_manager.dbh_mutex);
}
{
switch_cache_db_handle_t *dbh_ptr, *last = NULL;
- switch_mutex_lock(sql_manager.dbh_mutex);
for (dbh_ptr = sql_manager.handle_pool; dbh_ptr; dbh_ptr = dbh_ptr->next) {
if (dbh_ptr == dbh) {
if (last) {
last = dbh_ptr;
}
- switch_mutex_unlock(sql_manager.dbh_mutex);
}
SWITCH_DECLARE(void) switch_cache_db_database_interface_flush_handles(switch_database_interface_t *database_interface)
break;
}
- switch_mutex_lock(sql_manager.dbh_mutex);
(*dbh)->last_used = switch_epoch_time_now(NULL);
+ switch_mutex_unlock((*dbh)->mutex);
+
+ switch_mutex_lock(sql_manager.dbh_mutex);
(*dbh)->io_mutex = NULL;
(*dbh)->thread_hash = 1;
}
}
- switch_mutex_unlock((*dbh)->mutex);
+
sql_manager.total_used_handles--;
- *dbh = NULL;
switch_mutex_unlock(sql_manager.dbh_mutex);
+
+ *dbh = NULL;
}
}
{
int sec = 0, reg_sec = 0;;
- sql_manager.db_thread_running = 1;
+ switch_thread_ctl_thread_set_running(sql_manager.thread_ctl);
- while (sql_manager.db_thread_running == 1) {
+ while (switch_thread_ctl_thread_is_running(sql_manager.thread_ctl)) {
if (++sec == SQL_CACHE_TIMEOUT) {
sql_close(switch_epoch_time_now(NULL));
sec = 0;
uint32_t numq;
char *dsn;
switch_thread_t *thread;
- int thread_running;
switch_thread_cond_t *cond;
switch_mutex_t *cond_mutex;
switch_mutex_t *cond2_mutex;
uint32_t max_trans;
uint32_t confirm;
uint8_t paused;
+ switch_thread_ctl_t *thread_ctl;
};
static int qm_wake(switch_sql_queue_manager_t *qm)
uint32_t ttl = 0;
uint32_t i;
+ switch_mutex_lock(qm->mutex);
for (i = 0; i < qm->numq; i++) {
ttl += switch_queue_size(qm->sql_queue[i]);
}
+ switch_mutex_unlock(qm->mutex);
return ttl;
}
switch_status_t status = SWITCH_STATUS_FALSE;
uint32_t i, sanity = 100;
- if (qm->thread_running == 1) {
- qm->thread_running = -1;
+ if (switch_thread_ctl_thread_is_running(qm->thread_ctl)) {
+ switch_thread_ctl_thread_request_stop(qm->thread_ctl);
- while(--sanity && qm->thread_running == -1) {
+ while(--sanity && switch_thread_ctl_thread_is_stopping(qm->thread_ctl)) {
+ switch_mutex_lock(qm->mutex);
for(i = 0; i < qm->numq; i++) {
switch_queue_push(qm->sql_queue[i], NULL);
switch_queue_interrupt_all(qm->sql_queue[i]);
}
+ switch_mutex_unlock(qm->mutex);
+
qm_wake(qm);
- if (qm->thread_running == -1) {
+ if (switch_thread_ctl_thread_is_stopping(qm->thread_ctl)) {
switch_yield(100000);
}
}
{
switch_threadattr_t *thd_attr;
- if (!qm->thread_running) {
+ if (!switch_thread_ctl_thread_is_running(qm->thread_ctl)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s Starting SQL thread.\n", qm->name);
switch_threadattr_create(&thd_attr, qm->pool);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_status_t status;
int x = 0;
- if (sql_manager.paused || qm->thread_running != 1) {
+ if (sql_manager.paused || !switch_thread_ctl_thread_is_running(qm->thread_ctl)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "DROP [%s]\n", sql);
if (!dup) free((char *)sql);
qm_wake(qm);
return SWITCH_STATUS_SUCCESS;
}
- if (qm->thread_running != 1) {
+ if (!switch_thread_ctl_thread_is_running(qm->thread_ctl)) {
if (!dup) free((char *)sql);
return SWITCH_STATUS_FALSE;
}
#ifdef EXEC_NOW
switch_cache_db_handle_t *dbh;
- if (sql_manager.paused || qm->thread_running != 1) {
+ if (sql_manager.paused || !switch_thread_ctl_thread_is_running(qm->thread_ctl)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "DROP [%s]\n", sql);
if (!dup) free((char *)sql);
qm_wake(qm);
qm->name = switch_core_strdup(qm->pool, name);
qm->max_trans = max_trans;
+ switch_thread_ctl_init(&qm->thread_ctl, qm->pool);
+
switch_mutex_init(&qm->cond_mutex, SWITCH_MUTEX_NESTED, qm->pool);
switch_mutex_init(&qm->cond2_mutex, SWITCH_MUTEX_NESTED, qm->pool);
switch_mutex_init(&qm->mutex, SWITCH_MUTEX_NESTED, qm->pool);
return NULL;
}
- qm->thread_running = 1;
+ switch_thread_ctl_thread_set_running(qm->thread_ctl);
switch_mutex_lock(qm->cond_mutex);
break;
}
-
- while (qm->thread_running == 1) {
+ while (switch_thread_ctl_thread_is_running(qm->thread_ctl)) {
uint32_t i, lc;
uint32_t written = 0, iterations = 0;
check:
if ((lc = qm_ttl(qm)) == 0) {
+ /* Avoid deadlock by unlocking cond_mutex */
+ switch_mutex_unlock(qm->cond_mutex);
+
switch_mutex_lock(qm->cond2_mutex);
+
+ /* switch_thread_cond_wait() requires mutex (cond_mutex in that case) to be locked before calling */
+ switch_mutex_lock(qm->cond_mutex);
switch_thread_cond_wait(qm->cond, qm->cond_mutex);
+
switch_mutex_unlock(qm->cond2_mutex);
}
while (--i > 0 && (lc = qm_ttl(qm)) < 500) {
switch_yield(5000);
}
-
-
}
switch_mutex_unlock(qm->cond_mutex);
switch_cache_db_release_db_handle(&qm->event_db);
- qm->thread_running = 0;
+ switch_thread_ctl_thread_set_stopped(qm->thread_ctl);
return NULL;
}
if (sql_idx) {
int i = 0;
-
+ switch_thread_rwlock_rdlock(sql_manager.thread_ctl->rwlock);
for (i = 0; i < sql_idx; i++) {
- if (switch_stristr("update channels", sql[i]) || switch_stristr("delete from channels", sql[i])) {
- switch_sql_queue_manager_push(sql_manager.qm, sql[i], 1, SWITCH_FALSE);
- } else {
- switch_sql_queue_manager_push(sql_manager.qm, sql[i], 0, SWITCH_FALSE);
+ if (!sql_manager.qm) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "DROP [%s]\n", sql[i]);
+ }
+ else {
+ if (switch_stristr("update channels", sql[i]) || switch_stristr("delete from channels", sql[i])) {
+ switch_sql_queue_manager_push(sql_manager.qm, sql[i], 1, SWITCH_FALSE);
+ }
+ else {
+ switch_sql_queue_manager_push(sql_manager.qm, sql[i], 0, SWITCH_FALSE);
+ }
}
sql[i] = NULL;
}
+ switch_thread_rwlock_unlock(sql_manager.thread_ctl->rwlock);
}
}
{
switch_cache_db_handle_type_t type = SCDB_TYPE_CORE_DB;
- switch_mutex_lock(sql_manager.ctl_mutex);
+ switch_thread_rwlock_rdlock(sql_manager.thread_ctl->rwlock);
if (sql_manager.qm && sql_manager.qm->event_db) {
type = sql_manager.qm->event_db->type;
}
- switch_mutex_unlock(sql_manager.ctl_mutex);
+ switch_thread_rwlock_unlock(sql_manager.thread_ctl->rwlock);
return type;
}
switch_mutex_init(&sql_manager.dbh_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
switch_mutex_init(&sql_manager.io_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
- switch_mutex_init(&sql_manager.ctl_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
+ switch_thread_ctl_init(&sql_manager.thread_ctl, sql_manager.memory_pool);
if (!sql_manager.manage) goto skip;
static void switch_core_sqldb_stop_thread(void)
{
- switch_mutex_lock(sql_manager.ctl_mutex);
+ switch_thread_rwlock_wrlock(sql_manager.thread_ctl->rwlock);
if (sql_manager.manage) {
if (sql_manager.qm) {
switch_sql_queue_manager_destroy(&sql_manager.qm);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL is not enabled\n");
}
-
- switch_mutex_unlock(sql_manager.ctl_mutex);
+ switch_thread_rwlock_unlock(sql_manager.thread_ctl->rwlock);
}
static void switch_core_sqldb_start_thread(void)
{
-
- switch_mutex_lock(sql_manager.ctl_mutex);
+ switch_thread_rwlock_wrlock(sql_manager.thread_ctl->rwlock);
if (sql_manager.manage) {
if (!sql_manager.qm) {
char *dbname = runtime.odbc_dsn;
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL is not enabled\n");
}
- switch_mutex_unlock(sql_manager.ctl_mutex);
+ switch_thread_rwlock_unlock(sql_manager.thread_ctl->rwlock);
}
void switch_core_sqldb_stop(void)
switch_event_unbind_callback(core_event_handler);
- if (sql_manager.db_thread && sql_manager.db_thread_running) {
- sql_manager.db_thread_running = -1;
+ if (sql_manager.db_thread && switch_thread_ctl_thread_is_running(sql_manager.thread_ctl)) {
+ switch_thread_ctl_thread_request_stop(sql_manager.thread_ctl);
switch_thread_join(&st, sql_manager.db_thread);
}