time_t linger_timeout;
struct listener *next;
switch_pollfd_t *pollfd;
+ uint8_t lock_acquired;
+ uint8_t finished;
};
typedef struct listener listener_t;
SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_pass, prefs.password);
static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj);
-static void launch_listener_thread(listener_t *listener);
+static switch_status_t launch_listener_thread(listener_t *listener);
static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_level_t level)
{
if (switch_test_flag(listener, LFLAG_ASYNC)) {
const char *var;
- launch_listener_thread(listener);
+ if (launch_listener_thread(listener) != SWITCH_STATUS_SUCCESS) {
+ switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Failed to start listener\n");
+ return;
+ }
- while (switch_channel_ready(channel) && !switch_test_flag(listener, LFLAG_CONNECTED)) {
+ /* Wait until listener_thread acquires session read lock */
+ while (!listener->lock_acquired && !listener->finished) {
+ switch_cond_next();
+ }
+
+ while (switch_channel_ready(channel) && !listener->finished && !switch_test_flag(listener, LFLAG_CONNECTED)) {
switch_cond_next();
}
if ((session = listener->session)) {
if (switch_core_session_read_lock(session) != SWITCH_STATUS_SUCCESS) {
- switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Unable to lock session!\n");
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unable to lock session!\n");
locked = 0;
+ session = NULL;
goto done;
}
+
+ listener->lock_acquired = 1;
}
if (!listener->sock) {
}
switch_mutex_unlock(listener->filter_mutex);
- if (listener->session) {
+ if (listener->session && locked) {
channel = switch_core_session_get_channel(listener->session);
}
}
if (listener->session) {
- switch_channel_clear_flag(switch_core_session_get_channel(listener->session), CF_CONTROLLED);
+ if (locked) {
+ switch_channel_clear_flag(switch_core_session_get_channel(listener->session), CF_CONTROLLED);
+ }
switch_clear_flag_locked(listener, LFLAG_SESSION);
if (locked) {
switch_core_session_rwunlock(listener->session);
prefs.threads--;
switch_mutex_unlock(globals.listener_mutex);
+ listener->finished = 1;
+
return NULL;
}
/* Create a thread for the socket and launch it */
-static void launch_listener_thread(listener_t *listener)
+static switch_status_t launch_listener_thread(listener_t *listener)
{
switch_thread_t *thread;
switch_threadattr_t *thd_attr = NULL;
switch_threadattr_create(&thd_attr, listener->pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
- switch_thread_create(&thread, thd_attr, listener_run, listener, listener->pool);
+ return switch_thread_create(&thread, thd_attr, listener_run, listener, listener->pool);
}
static int config(void)
if (switch_socket_addr_get(&listener->sa, SWITCH_TRUE, listener->sock) == SWITCH_STATUS_SUCCESS && listener->sa) {
switch_get_addr(listener->remote_ip, sizeof(listener->remote_ip), listener->sa);
if ((listener->remote_port = switch_sockaddr_get_port(listener->sa))) {
- launch_listener_thread(listener);
- continue;
+ if (launch_listener_thread(listener) == SWITCH_STATUS_SUCCESS)
+ continue;
}
}