static switch_memory_pool_t *module_pool = NULL;
static void remove_listener(listener_t *listener);
+static void destroy_listener(listener_t *listener);
static switch_status_t state_handler(switch_core_session_t *session);
SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_ip, prefs.ip);
switch_thread_rwlock_rdlock(globals.listener_rwlock);
for (l = listen_list.listeners; l; l = l->next) {
- /* TODO listener read lock */
+
if (switch_test_flag(l, LFLAG_LOG) && l->level >= node->level) {
switch_log_node_t *dnode = switch_log_node_dup(node);
l->lost_logs++;
}
}
+
}
switch_thread_rwlock_unlock(globals.listener_rwlock);
one of them should receive the event as well
*/
- /* TODO need read locking */
-
send_event_to_attached_sessions(l, event);
if (!switch_test_flag(l, LFLAG_EVENTS)) {
}
-/*static void add_listener(listener_t *listener)*/
-/*{*/
- /* add me to the listeners so I get events */
- /*switch_thread_rwlock_wrlock(globals.listener_rwlock);*/
- /*listener->next = listen_list.listeners;*/
- /*listen_list.listeners = listener;*/
- /*switch_thread_rwlock_unlock(globals.listener_rwlock);*/
-/*}*/
+static void add_listener(listener_t *listener)
+{
+ /* add me to the listeners so I get events */
+ switch_thread_rwlock_wrlock(globals.listener_rwlock);
+ listener->next = listen_list.listeners;
+ listen_list.listeners = listener;
+ switch_thread_rwlock_unlock(globals.listener_rwlock);
+}
-/* TODO lock */
static void remove_listener(listener_t *listener)
{
listener_t *l, *last = NULL;
+ switch_thread_rwlock_wrlock(globals.listener_rwlock);
for (l = listen_list.listeners; l; l = l->next) {
if (l == listener) {
if (last) {
}
last = l;
}
+ switch_thread_rwlock_unlock(globals.listener_rwlock);
}
-/* Search for a listener already talking to the specified node */
-static listener_t *find_listener(char *nodename)
+/* Search for a listener already talking to the specified node and lock for reading*/
+static listener_t *find_listener_locked(char *nodename)
{
listener_t *l = NULL;
switch_thread_rwlock_rdlock(globals.listener_rwlock);
for (l = listen_list.listeners; l; l = l->next) {
if (!strncmp(nodename, l->peer_nodename, MAXNODELEN)) {
- /* TODO listener rwlock */
+ switch_thread_rwlock_rdlock(l->rwlock);
break;
}
}
static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
{
listener_t *listener = (listener_t *) obj;
- session_elem_t *s = NULL;
- const void *key;
- void *value;
- switch_hash_index_t *iter;
switch_mutex_lock(globals.listener_count_mutex);
prefs.threads++;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Open from %s\n", listener->remote_ip); /*, listener->remote_port); */
}
- /*add_listener(listener);*/
+ add_listener(listener);
listener_main_loop(listener);
}
- /* clean up */
-
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Session complete, waiting for children\n");
-
- listener->dead = 1; /* mark it as dead */
-
- /* TODO - release write lock */
- switch_thread_rwlock_wrlock(globals.listener_rwlock);
- remove_listener(listener);
- switch_thread_rwlock_unlock(globals.listener_rwlock);
-
- switch_thread_rwlock_wrlock(listener->rwlock);
-
- if (listener->sockfd) {
- close_socket(&listener->sockfd);
- }
-
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Closed\n");
- /* TODO make listener destroy function and move there */
- switch_core_hash_destroy(&listener->event_hash);
- /* remove any bindings for this connection */
- remove_binding(listener, NULL);
-
- /* clean up all the attached sessions */
- switch_thread_rwlock_wrlock(listener->session_rwlock);
- for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) {
- switch_hash_this(iter, &key, NULL, &value);
- s = (session_elem_t*)value;
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Orphaning call %s\n", s->uuid_str);
- remove_session_elem_from_listener(listener, s);
- destroy_session_elem(s);
- }
- switch_thread_rwlock_unlock(listener->session_rwlock);
- switch_thread_rwlock_unlock(listener->rwlock);
-
- if (listener->pool) {
- switch_memory_pool_t *pool = listener->pool;
- switch_core_destroy_memory_pool(&pool);
- }
+ remove_listener(listener);
+ destroy_listener(listener);
switch_mutex_lock(globals.listener_count_mutex);
prefs.threads--;
switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, pool);
switch_queue_create(&listener->log_queue, SWITCH_CORE_QUEUE_LEN, pool);
- /* TODO remove */
- listener->dead = 0; /* born alive */
listener->sockfd = clientfd;
listener->pool = pool;
listener->ec = switch_core_alloc(listener->pool, sizeof(ei_cnode));
switch_core_hash_init(&listener->event_hash, listener->pool);
switch_core_hash_init(&listener->sessions, listener->pool);
- /* TODO listener rdlock */
- listener->next = listen_list.listeners;
- listen_list.listeners = listener;
-
return listener;
}
-/*TODO we don't need bottleneck*/
-static listener_t *new_listener_locked(struct ei_cnode_s *ec, int clientfd)
-{
- listener_t *res;
- switch_thread_rwlock_wrlock(globals.listener_rwlock);
- res = new_listener(ec, clientfd);
- switch_thread_rwlock_unlock(globals.listener_rwlock);
- return res;
-}
-/* TODO new session??? */
-static listener_t *new_outbound_listener(char *node, switch_bool_t *new_session)
+static listener_t *new_outbound_listener_locked(char *node)
{
listener_t *listener = NULL;
struct ei_cnode_s ec;
int clientfd;
- /* TODO find listener func */
- switch_thread_rwlock_wrlock(globals.listener_rwlock);
- for (listener = listen_list.listeners; listener; listener = listener->next) {
- if (!strncmp(node, listener->peer_nodename, MAXNODELEN)) {
- break;
- }
- }
-
- if (listener && listener->dead) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "found dead listener for %s\n", node);
- remove_listener(listener); /* remove the dead listener and continue adding one */
- } else if (listener) {
- switch_thread_rwlock_unlock(globals.listener_rwlock);
-
- *new_session = SWITCH_FALSE;
- return listener;
- }
-
-
if (SWITCH_STATUS_SUCCESS == initialise_ei(&ec)) {
#ifdef WIN32
WSASetLastError(0);
#endif
if ((clientfd = ei_connect(&ec, node)) < 0) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error connecting to node %s (erl_errno=%d, errno=%d)!\n", node, erl_errno, errno);
- switch_thread_rwlock_unlock(globals.listener_rwlock);
+
return NULL;
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "new listener for %s\n", node);
listener->peer_nodename = switch_core_strdup(listener->pool, node);
}
- switch_thread_rwlock_unlock(globals.listener_rwlock);
- *new_session = SWITCH_TRUE;
+ switch_thread_rwlock_rdlock(listener->rwlock);
return listener;
}
+void destroy_listener(listener_t * listener)
+{
+ session_elem_t *s = NULL;
+ const void *key;
+ void *value;
+ switch_hash_index_t *iter;
+
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Session complete, waiting for children\n");
+ switch_thread_rwlock_wrlock(listener->rwlock);
+
+ if (listener->sockfd) {
+ close_socket(&listener->sockfd);
+ }
+
+ switch_core_hash_destroy(&listener->event_hash);
+
+ /* remove any bindings for this connection */
+ remove_binding(listener, NULL);
+
+ /* clean up all the attached sessions */
+ switch_thread_rwlock_wrlock(listener->session_rwlock);
+ for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) {
+ switch_hash_this(iter, &key, NULL, &value);
+ s = (session_elem_t*)value;
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Orphaning call %s\n", s->uuid_str);
+ remove_session_elem_from_listener(listener, s);
+ destroy_session_elem(s);
+ }
+ switch_thread_rwlock_unlock(listener->session_rwlock);
+ switch_thread_rwlock_unlock(listener->rwlock);
+
+ if (listener->pool) {
+ switch_memory_pool_t *pool = listener->pool;
+ switch_core_destroy_memory_pool(&pool);
+ }
+
+}
+
static switch_status_t state_handler(switch_core_session_t *session)
{
switch_channel_t *channel = switch_core_session_get_channel(session);
char *argv[80] = { 0 }, *argv2[80] = { 0 };
char *mydata, *myarg;
char uuid[SWITCH_UUID_FORMATTED_LENGTH + 1];
- switch_bool_t new_session = SWITCH_FALSE;
session_elem_t *session_element = NULL;
/* process app arguments */
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "enter erlang_outbound_function %s %s\n", argv[0], node);
/* first work out if there is a listener already talking to the node we want to talk to */
- listener = find_listener(node);
+ listener = find_listener_locked(node);
/* if there is no listener, then create one */
if (!listener) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new listener for session\n");
- listener = new_outbound_listener(node, &new_session);
- /* XXX new_session isn't accurate now */
+ if ((listener = new_outbound_listener_locked(node))) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Launching new listener\n");
+ launch_listener_thread(listener);
+ }
} else {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Using existing listener for session\n");
- /* TODO don't we need to connect ? */
}
- /* TODO it's too late */
- switch_thread_rwlock_rdlock(globals.listener_rwlock);
-
- if (listener && !listener->dead) {
- /* prevent the listener_run thread from destroying the listener out from under us */
- /* get the listener lock */
- switch_thread_rwlock_rdlock(listener->rwlock);
- /* release the global listener lock, since the listener can't be freed without the listener lock */
- switch_thread_rwlock_unlock(globals.listener_rwlock);
-
- if (new_session == SWITCH_TRUE) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Launching new listener\n");
- launch_listener_thread(listener);
- }
+ if (listener) {
if (module && function) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new spawned session for listener\n");
session_element = attach_call_to_registered_process(listener, reg_name, session);
}
- /* should be safe now */
switch_thread_rwlock_unlock(listener->rwlock);
-
if (session_element) {
switch_ivr_park(session, NULL);
}
char *mydata;
ei_x_buff buf;
listener_t *listener;
- switch_bool_t new_session;
ei_x_new_with_version(&buf);
ei_x_encode_atom(&buf, "freeswitch_sendmsg");
_ei_x_encode_string(&buf, argv[2]);
- listener = find_listener(node);
+ listener = find_listener_locked(node);
if (!listener) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new listener for sendmsg %s\n", node);
- listener = new_outbound_listener(node, &new_session);
+ listener = new_outbound_listener_locked(node);
} else {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Using existing listener for sendmsg to %s\n", node);
}
- if (listener && !listener->dead) {
+ if (listener) {
ei_reg_send(listener->ec, listener->sockfd, reg_name, buf.buff, buf.index);
+
+ switch_thread_rwlock_unlock(listener->rwlock);
}
}
continue;
}
- listener = new_listener_locked(&ec, clientfd);
+ listener = new_listener(&ec, clientfd);
if (listener) {
/* store the IP and node name we are talking with */
switch_inet_ntop(AF_INET, conn.ipadr, listener->remote_ip, sizeof(listener->remote_ip));