]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
Switch from a busy-wait loop to crazy conditional timedwait stuff for speed
authorAndrew Thompson <andrew@hijacked.us>
Fri, 6 Aug 2010 05:31:07 +0000 (01:31 -0400)
committerAndrew Thompson <andrew@hijacked.us>
Fri, 6 Aug 2010 05:32:28 +0000 (01:32 -0400)
src/mod/event_handlers/mod_erlang_event/handle_msg.c
src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c
src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h

index 4bceaaf4e8cd21e8e6174c05309a7ccb77316108..38c0e4e4eb57fcdc4e7cdf028c8a673794557a76 100644 (file)
@@ -878,9 +878,12 @@ static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg * msg,
 {
        erlang_ref ref;
        erlang_pid *pid;
-       void *p;
        char hash[100];
        int arity;
+       const void *key;
+       void *val;
+       session_elem_t *se;
+       switch_hash_index_t *iter;
 
        ei_decode_tuple_header(buf->buff, &buf->index, &arity);
 
@@ -906,32 +909,35 @@ static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg * msg,
 
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Hashed ref to %s\n", hash);
 
-       if ((p = switch_core_hash_find(listener->spawn_pid_hash, hash))) {
-               if (p == &globals.TIMEOUT) {
-                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Handler for %s timed out\n", hash);
-                       switch_core_hash_delete(listener->spawn_pid_hash, hash);
-                       ei_x_encode_tuple_header(rbuf, 2);
-                       ei_x_encode_atom(rbuf, "error");
-                       ei_x_encode_atom(rbuf, "timeout");
-               } else if (p == &globals.WAITING) {
-                       /* update the key to point at a pid */
-                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found waiting slot for %s\n", hash);
-                       switch_core_hash_delete(listener->spawn_pid_hash, hash);
-                       switch_core_hash_insert(listener->spawn_pid_hash, hash, pid);
-                       return SWITCH_STATUS_FALSE;     /*no reply */
-               } else {
-                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found filled slot for %s\n", hash);
-                       ei_x_encode_tuple_header(rbuf, 2);
-                       ei_x_encode_atom(rbuf, "error");
-                       ei_x_encode_atom(rbuf, "duplicate_response");
+       switch_thread_rwlock_rdlock(listener->session_rwlock);
+       for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) {
+               switch_hash_this(iter, &key, NULL, &val);
+               se = (session_elem_t*)val;
+               if (se->spawn_reply && !strncmp(se->spawn_reply->hash, hash, 100)) {
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "found matching session for %s : %s\n", hash, se->uuid_str);
+                       switch_mutex_lock(se->spawn_reply->mutex);
+                       if (se->spawn_reply->state == reply_not_ready) {
+                               switch_thread_cond_wait(se->spawn_reply->ready_or_found, se->spawn_reply->mutex);
+                       }
+
+                       if (se->spawn_reply->state == reply_waiting) {
+                               se->spawn_reply->pid = pid;
+                               switch_thread_cond_broadcast(se->spawn_reply->ready_or_found);
+                               ei_x_encode_atom(rbuf, "ok");
+                               switch_thread_rwlock_unlock(listener->session_rwlock);
+                               switch_mutex_unlock(se->spawn_reply->mutex);
+                               return SWITCH_STATUS_SUCCESS;
+                       }
+                       switch_mutex_unlock(se->spawn_reply->mutex);
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "get_pid came in too late for %s\n", hash);
+                       break;
                }
-       } else {
-               /* nothin in the hash */
-               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Empty slot for %s\n", hash);
-               ei_x_encode_tuple_header(rbuf, 2);
-               ei_x_encode_atom(rbuf, "error");
-               ei_x_encode_atom(rbuf, "invalid_ref");
        }
+       switch_thread_rwlock_unlock(listener->session_rwlock);
+
+       ei_x_encode_tuple_header(rbuf, 2);
+       ei_x_encode_atom(rbuf, "error");
+       ei_x_encode_atom(rbuf, "notfound");
 
        switch_safe_free(pid);          /* don't need it */
 
index 053f33033b69a81cccfd3869d2ce4edff0475440..60acc96517da9f3b4dba2b3e7e3e3fa3acc8f820 100644 (file)
@@ -1217,6 +1217,7 @@ session_elem_t *session_elem_create(listener_t *listener, switch_core_session_t
        switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, session_element->pool);
        switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, session_element->pool);
        switch_core_hash_init(&session_element->event_hash, session_element->pool);
+       session_element->spawn_reply = NULL;
 
        for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
                session_element->event_list[x] = 0;
@@ -1266,9 +1267,8 @@ session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *modul
        /* create a session list element */
        session_elem_t *session_element = session_elem_create(listener, session);
        char hash[100];
-       int i = 0;
-       void *p = NULL;
-       erlang_pid *pid;
+       //void *p = NULL;
+       spawn_reply_t *p;
        erlang_ref ref;
 
        switch_set_flag(session_element, LFLAG_WAITING_FOR_PID);
@@ -1279,13 +1279,25 @@ session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *modul
        ei_init_ref(listener->ec, &ref);
        ei_hash_ref(&ref, hash);
        /* insert the waiting marker */
-       switch_core_hash_insert(listener->spawn_pid_hash, hash, &globals.WAITING);
+
+       p = switch_core_alloc(session_element->pool, sizeof(*p));
+       switch_thread_cond_create(&p->ready_or_found, session_element->pool);
+       switch_mutex_init(&p->mutex, SWITCH_MUTEX_DEFAULT, session_element->pool);
+       p->state = reply_not_ready;
+       p->hash = hash;
+       p->pid = NULL;
+
+       session_element->spawn_reply = p;
+
+       switch_mutex_lock(p->mutex);
 
        if (!strcmp(function, "!")) {
                /* send a message to request a pid */
                ei_x_buff rbuf;
                ei_x_new_with_version(&rbuf);
 
+               switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "get_pid\n");
+
                ei_x_encode_tuple_header(&rbuf, 4);
                ei_x_encode_atom(&rbuf, "get_pid");
                _ei_x_encode_string(&rbuf, switch_core_session_get_uuid(session));
@@ -1307,33 +1319,31 @@ session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *modul
                 */
        }
 
-       /* loop until either we timeout or we get a value that's not the waiting marker */
-       while (!(p = switch_core_hash_find(listener->spawn_pid_hash, hash)) || p == &globals.WAITING) {
-               if (i > 500) {                  /* 5 second timeout */
-                       switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Timed out when waiting for outbound pid %s\n", hash);
-                       remove_session_elem_from_listener_locked(listener, session_element);
-                       switch_core_hash_insert(listener->spawn_pid_hash, hash, &globals.TIMEOUT);      /* TODO lock this? */
-                       destroy_session_elem(session_element);
-                       return NULL;
-               }
-               i++;
-               switch_yield(10000);    /* 10ms */
+       p->state = reply_waiting;
+       switch_thread_cond_broadcast(p->ready_or_found);
+       switch_thread_cond_timedwait(p->ready_or_found,
+                       p->mutex, 5000000);
+       switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "wtf\n");
+       if (!p->pid) {
+               p->state = reply_timeout;
+               switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Timed out when waiting for outbound pid %s\n", hash);
+               remove_session_elem_from_listener_locked(listener, session_element);
+               destroy_session_elem(session_element);
+               return NULL;
        }
 
-       switch_core_hash_delete(listener->spawn_pid_hash, hash);
-
-       pid = (erlang_pid *) p;
        switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "got pid! %s\n", hash);
 
        session_element->process.type = ERLANG_PID;
-       memcpy(&session_element->process.pid, pid, sizeof(erlang_pid));
+       memcpy(&session_element->process.pid, p->pid, sizeof(erlang_pid));
 
        switch_set_flag(session_element, LFLAG_SESSION_ALIVE);
        switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT);
        switch_clear_flag(session_element, LFLAG_WAITING_FOR_PID);
 
-       ei_link(listener, ei_self(listener->ec), pid);
-       switch_safe_free(pid);          /* malloced in handle_ref_tuple */
+       ei_link(listener, ei_self(listener->ec), &session_element->process.pid);
+
+       switch_safe_free(p->pid);
 
        return session_element;
 }
@@ -1425,7 +1435,6 @@ SWITCH_STANDARD_APP(erlang_outbound_function)
                }
 
                if (module && function) {
-                       switch_core_hash_init(&listener->spawn_pid_hash, listener->pool);
                        switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new spawned session for listener\n");
                        session_element = attach_call_to_spawned_process(listener, module, function, session);
                } else {
index a900f02dcec80750750a2ae6c69136a86a6257c9..03b66d3b73a97be15093fb4c9e36804499498185 100644 (file)
@@ -55,6 +55,28 @@ struct erlang_process {
        erlang_pid pid;
 };
 
+enum reply_state { reply_not_ready, reply_waiting, reply_found, reply_timeout };
+
+struct fetch_reply_struct
+{
+       switch_thread_cond_t *ready_or_found;
+       int usecount;
+       enum reply_state state;
+       ei_x_buff *reply;
+       char winner[MAXNODELEN + 1];
+};
+typedef struct fetch_reply_struct fetch_reply_t;
+
+struct spawn_reply_struct
+{
+       switch_thread_cond_t *ready_or_found;
+       switch_mutex_t *mutex;
+       enum reply_state state;
+       erlang_pid *pid;
+       char *hash;
+};
+typedef struct spawn_reply_struct spawn_reply_t;
+
 struct session_elem {
        char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1];
        switch_mutex_t *flag_mutex;
@@ -65,6 +87,7 @@ struct session_elem {
        switch_memory_pool_t *pool;
        uint8_t event_list[SWITCH_EVENT_ALL + 1];
        switch_hash_t *event_hash;
+       spawn_reply_t *spawn_reply;
        //struct session_elem *next;
 };
 
@@ -105,7 +128,6 @@ struct listener {
        switch_log_level_t level;
        uint8_t event_list[SWITCH_EVENT_ALL + 1];
        switch_hash_t *event_hash;
-       switch_hash_t *spawn_pid_hash;
        switch_thread_rwlock_t *rwlock;
        switch_thread_rwlock_t *session_rwlock;
        //session_elem_t *session_list;
@@ -153,16 +175,6 @@ struct globals_struct {
 };
 typedef struct globals_struct globals_t;
 
-struct fetch_reply_struct
-{
-       switch_thread_cond_t *ready_or_found;
-       int usecount;
-       enum { reply_not_ready, reply_waiting, reply_found, reply_timeout } state;
-       ei_x_buff *reply;
-       char winner[MAXNODELEN + 1];
-};
-typedef struct fetch_reply_struct fetch_reply_t;
-
 struct listen_list_struct {
 #ifdef WIN32
        SOCKET sockfd;