]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-2775 Rewrite XML fetch conditional wait to be more sane (Reported by James Aimonetti)
authorAndrew Thompson <andrew@hijacked.us>
Mon, 15 Nov 2010 17:39:54 +0000 (12:39 -0500)
committerAndrew Thompson <andrew@hijacked.us>
Mon, 15 Nov 2010 17:39:54 +0000 (12:39 -0500)
src/mod/event_handlers/mod_erlang_event/handle_msg.c
src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c
src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h

index 1a8b64fdb4d8bbd638e2603d0ee462d495921b10..bb37cddc59c13fe135a558a5f9d7a4cd84eaf092 100644 (file)
@@ -173,68 +173,74 @@ static switch_status_t handle_msg_fetch_reply(listener_t *listener, ei_x_buff *
                ei_x_encode_atom(rbuf, "error");
                ei_x_encode_atom(rbuf, "badarg");
        } else {
-               ei_x_buff *nbuf = malloc(sizeof(nbuf));
-               nbuf->buff = malloc(buf->buffsz);
-               memcpy(nbuf->buff, buf->buff, buf->buffsz);
-               nbuf->index = buf->index;
-               nbuf->buffsz = buf->buffsz;
-
-               switch_mutex_lock(globals.fetch_reply_mutex);
-               if ((p = switch_core_hash_find(globals.fetch_reply_hash, uuid_str))) {
-                       /* Get the status and release the lock ASAP. */
-                       enum { is_timeout, is_waiting, is_filled } status;
-                       if (p->state == reply_not_ready) {
-                               switch_thread_cond_wait(p->ready_or_found, globals.fetch_reply_mutex);
-                       }
-
-                       if (p->state == reply_waiting) {
-                               /* update the key with a reply */
-                               status = is_waiting;
-                               p->reply = nbuf;
-                               p->state = reply_found;
-                               strncpy(p->winner, listener->peer_nodename, MAXNODELEN);
-                               switch_thread_cond_broadcast(p->ready_or_found);
-                       } else if (p->state == reply_timeout) {
-                               status = is_timeout;
-                       } else {
-                               status = is_filled;
-                       }
-
-                       put_reply_unlock(p, uuid_str);
+               /* TODO - maybe use a rwlock instead */
+               if ((p = switch_core_hash_find_locked(globals.fetch_reply_hash, uuid_str, globals.fetch_reply_mutex))) {
+                       /* try to lock the mutex, so no other responder can */
+                       if (switch_mutex_trylock(p->mutex) == SWITCH_STATUS_SUCCESS) {
+                               if (p->state == reply_waiting) {
+                                       /* alright, we've got the lock and we're the first to reply */
+
+                                       /* clone the reply so it doesn't get destroyed on us */
+                                       ei_x_buff *nbuf = malloc(sizeof(nbuf));
+                                       nbuf->buff = malloc(buf->buffsz);
+                                       memcpy(nbuf->buff, buf->buff, buf->buffsz);
+                                       nbuf->index = buf->index;
+                                       nbuf->buffsz = buf->buffsz;
+
+                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Got reply for %s\n", uuid_str);
+
+                                       /* copy info into the reply struct */
+                                       p->state = reply_found;
+                                       p->reply = nbuf;
+                                       strncpy(p->winner, listener->peer_nodename, MAXNODELEN);
+
+                                       /* signal waiting thread that its time to wake up */
+                                       switch_thread_cond_signal(p->ready_or_found);
+
+                                       /* reply OK */
+                                       ei_x_encode_tuple_header(rbuf, 2);
+                                       ei_x_encode_atom(rbuf, "ok");
+                                       _ei_x_encode_string(rbuf, uuid_str);
 
-                       /* Relay the status back to the fetch responder. */
-                       if (status == is_waiting) {
-                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found waiting slot for %s\n", uuid_str);
-                               ei_x_encode_tuple_header(rbuf, 2);
-                               ei_x_encode_atom(rbuf, "ok");
-                               _ei_x_encode_string(rbuf, uuid_str);
-                               /* Return here to avoid freeing the reply. */
-                               return SWITCH_STATUS_SUCCESS;
-                       } else if (status == is_timeout) {
-                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Handler for %s timed out\n", uuid_str);
-                               ei_x_encode_tuple_header(rbuf, 3);
-                               ei_x_encode_atom(rbuf, "error");
-                               _ei_x_encode_string(rbuf, uuid_str);
-                               ei_x_encode_atom(rbuf, "timeout");
+                                       /* unlock */
+                                       switch_mutex_unlock(p->mutex);
+                               } else {
+                                       if (p->state == reply_found) {
+                                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Reply for already complete request %s\n", uuid_str);
+                                               ei_x_encode_tuple_header(rbuf, 3);
+                                               ei_x_encode_atom(rbuf, "error");
+                                               _ei_x_encode_string(rbuf, uuid_str);
+                                               ei_x_encode_atom(rbuf, "duplicate_response");
+                                       } else if (p->state == reply_timeout) {
+                                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Reply for timed out request %s\n", uuid_str);
+                                               ei_x_encode_tuple_header(rbuf, 3);
+                                               ei_x_encode_atom(rbuf, "error");
+                                               _ei_x_encode_string(rbuf, uuid_str);
+                                               ei_x_encode_atom(rbuf, "timeout");
+                                       } else if (p->state == reply_not_ready) {
+                                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Request %s is not ready?!\n", uuid_str);
+                                               ei_x_encode_tuple_header(rbuf, 3);
+                                               ei_x_encode_atom(rbuf, "error");
+                                               _ei_x_encode_string(rbuf, uuid_str);
+                                               ei_x_encode_atom(rbuf, "not_ready");
+                                       }
+                                       switch_mutex_unlock(p->mutex);
+                               }
                        } else {
-                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found filled slot for %s\n", uuid_str);
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Could not lock mutex for reply %s\n", uuid_str);
                                ei_x_encode_tuple_header(rbuf, 3);
                                ei_x_encode_atom(rbuf, "error");
                                _ei_x_encode_string(rbuf, uuid_str);
                                ei_x_encode_atom(rbuf, "duplicate_response");
                        }
                } else {
-                       /* nothing in the hash */
-                       switch_mutex_unlock(globals.fetch_reply_mutex);
-                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Empty slot for %s\n", uuid_str);
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Could not find request for reply %s\n", uuid_str);
                        ei_x_encode_tuple_header(rbuf, 2);
                        ei_x_encode_atom(rbuf, "error");
                        ei_x_encode_atom(rbuf, "invalid_uuid");
                }
-
-               switch_safe_free(nbuf->buff);
-               switch_safe_free(nbuf);
        }
+
        return SWITCH_STATUS_SUCCESS;
 }
 
@@ -1052,7 +1058,7 @@ static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg * msg,
 
                        if (se->spawn_reply->state == reply_waiting) {
                                se->spawn_reply->pid = pid;
-                               switch_thread_cond_broadcast(se->spawn_reply->ready_or_found);
+                               switch_thread_cond_signal(se->spawn_reply->ready_or_found);
                                ei_x_encode_atom(rbuf, "ok");
                                switch_thread_rwlock_unlock(listener->session_rwlock);
                                switch_mutex_unlock(se->spawn_reply->mutex);
index 8565e1afa77985e20e4003e02006198d58d563b0..e375515f4f951bf0e73df24fbaeeb7c92767ad34 100644 (file)
@@ -366,13 +366,21 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c
        ei_x_buff buf;
        ei_x_new_with_version(&buf);
 
+       switch_uuid_get(&uuid);
+       switch_uuid_format(uuid_str, &uuid);
+
+       ei_x_encode_tuple_header(&buf, 7);
+       ei_x_encode_atom(&buf, "fetch");
+       ei_x_encode_atom(&buf, sectionstr);
+       _ei_x_encode_string(&buf, tag_name ? tag_name : "undefined");
+       _ei_x_encode_string(&buf, key_name ? key_name : "undefined");
+       _ei_x_encode_string(&buf, key_value ? key_value : "undefined");
+       _ei_x_encode_string(&buf, uuid_str);
+
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "looking for bindings\n");
 
        section = switch_xml_parse_section_string((char *) sectionstr);
 
-       switch_uuid_get(&uuid);
-       switch_uuid_format(uuid_str, &uuid);
-
        for (ptr = bindings.head; ptr; ptr = ptr->next) {
                if (ptr->section != section)
                        continue;
@@ -384,13 +392,6 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c
 
                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "binding for %s in section %s with key %s and value %s requested from node %s\n", tag_name, sectionstr, key_name, key_value, ptr->process.pid.node);
 
-               ei_x_encode_tuple_header(&buf, 7);
-               ei_x_encode_atom(&buf, "fetch");
-               ei_x_encode_atom(&buf, sectionstr);
-               _ei_x_encode_string(&buf, tag_name ? tag_name : "undefined");
-               _ei_x_encode_string(&buf, key_name ? key_name : "undefined");
-               _ei_x_encode_string(&buf, key_value ? key_value : "undefined");
-               _ei_x_encode_string(&buf, uuid_str);
                if (params) {
                        ei_encode_switch_event_headers(&buf, params);
                } else {
@@ -401,41 +402,42 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c
                        /* Create a new fetch object. */
                        p = malloc(sizeof(*p));
                        switch_thread_cond_create(&p->ready_or_found, module_pool);
-                       p->usecount = 1;
+                       switch_mutex_init(&p->mutex, SWITCH_MUTEX_UNNESTED, module_pool);
                        p->state = reply_not_ready;
                        p->reply = NULL;
                        switch_core_hash_insert_locked(globals.fetch_reply_hash, uuid_str, p, globals.fetch_reply_mutex);
+                       p->state = reply_waiting;
                        now = switch_micro_time_now();
                }
                /* We don't need to lock here because everybody is waiting
                   on our condition before the action starts. */
-               p->usecount ++;
 
                switch_mutex_lock(ptr->listener->sock_mutex);
                ei_sendto(ptr->listener->ec, ptr->listener->sockfd, &ptr->process, &buf);
                switch_mutex_unlock(ptr->listener->sock_mutex);
        }
 
+       ei_x_free(&buf);
+
        if (!p) {
                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "no binding for %s\n", sectionstr);
                goto cleanup;
        }
 
        /* Tell the threads to be ready, and wait five seconds for a reply. */
-       switch_mutex_lock(globals.fetch_reply_mutex);
-       p->state = reply_waiting;
-       switch_thread_cond_broadcast(p->ready_or_found);
+       switch_mutex_lock(p->mutex);
+       //p->state = reply_waiting;
        switch_thread_cond_timedwait(p->ready_or_found,
-                       globals.fetch_reply_mutex, 5000000);
+                       p->mutex, 5000000);
        if (!p->reply) {
                p->state = reply_timeout;
-               switch_mutex_unlock(globals.fetch_reply_mutex);
+               switch_mutex_unlock(p->mutex);
                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Timed out after %d milliseconds when waiting for XML fetch response for %s\n", (int) (switch_micro_time_now() - now) / 1000, uuid_str);
                goto cleanup;
        }
 
        rep = p->reply;
-       switch_mutex_unlock(globals.fetch_reply_mutex);
+       switch_mutex_unlock(p->mutex);
 
        ei_get_type(rep->buff, &rep->index, &type, &size);
 
@@ -450,7 +452,6 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c
                goto cleanup;
        }
 
-
        if (!(xmlstr = malloc(size + 1))) {
                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Memory Error\n");
                goto cleanup;
@@ -471,26 +472,17 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c
        /* cleanup */
  cleanup:
        if (p) {
-               switch_mutex_lock(globals.fetch_reply_mutex);
-               put_reply_unlock(p, uuid_str);
-       }
-
-       return xml;
-}
-
-
-void put_reply_unlock(fetch_reply_t *p, char *uuid_str)
-{
-       if (-- p->usecount == 0) {
-               switch_core_hash_delete(globals.fetch_reply_hash, uuid_str);
+               /* lock so nothing can have it while we delete it */
+               switch_mutex_lock(p->mutex);
+               switch_core_hash_delete_locked(globals.fetch_reply_hash, uuid_str, globals.fetch_reply_mutex);
+               switch_mutex_unlock(p->mutex);
+               switch_mutex_destroy(p->mutex);
                switch_thread_cond_destroy(p->ready_or_found);
-               if (p->reply) {
-                       switch_safe_free(p->reply->buff);
-                       switch_safe_free(p->reply);
-               }
+               switch_safe_free(p->reply);
                switch_safe_free(p);
        }
-       switch_mutex_unlock(globals.fetch_reply_mutex);
+
+       return xml;
 }
 
 
index 248e66b056eb53fca5c42c50575ffb40393bf9d6..eb36612ce6fffcb6ee97bd11bc0cdf20aa4832b7 100644 (file)
@@ -60,7 +60,7 @@ enum reply_state { reply_not_ready, reply_waiting, reply_found, reply_timeout };
 struct fetch_reply_struct
 {
        switch_thread_cond_t *ready_or_found;
-       int usecount;
+       switch_mutex_t *mutex;
        enum reply_state state;
        ei_x_buff *reply;
        char winner[MAXNODELEN + 1];