]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
[mod_kazoo] add configuration for queue timeouts
authorlazedo <luis.azedo@factorlusitano.com>
Fri, 24 Jan 2020 16:39:51 +0000 (16:39 +0000)
committerlazedo <luis.azedo@factorlusitano.com>
Fri, 24 Jan 2020 16:39:51 +0000 (16:39 +0000)
* implements ei_queue_pop that calls
  switch_queue_trypop or switch_queue_pop_timeout
  depending on timeout value

* default for sender_timeout is set to 0
  to bring back previous behaviour of
  using switch_queue_trypop when fetching
  messages to be sent to erlang node

src/mod/event_handlers/mod_kazoo/kazoo_ei.h
src/mod/event_handlers/mod_kazoo/kazoo_ei_config.c
src/mod/event_handlers/mod_kazoo/kazoo_ei_utils.c
src/mod/event_handlers/mod_kazoo/kazoo_event_stream.c
src/mod/event_handlers/mod_kazoo/kazoo_node.c

index 6add449b2b4464d2606b144bf9f8aa30f462dd61..a621115b06b42639bc82afc46e6b1a1242187ec7 100644 (file)
@@ -81,6 +81,7 @@ struct ei_event_stream_s {
        uint32_t flags;
        ei_node_t *node;
        short event_stream_framing;
+       switch_interval_time_t queue_timeout;
        struct ei_event_stream_s *next;
 };
 
@@ -103,6 +104,9 @@ struct ei_node_s {
        uint32_t flags;
        int legacy;
        short event_stream_framing;
+       switch_interval_time_t event_stream_queue_timeout;
+       switch_interval_time_t receiver_queue_timeout;
+       switch_interval_time_t sender_queue_timeout;
        struct ei_node_s *next;
 };
 
@@ -171,11 +175,14 @@ struct kz_globals_s {
        int send_all_headers;
        int send_all_private_headers;
        int connection_timeout;
-       int receive_timeout;
+       int ei_receive_timeout;
+       switch_interval_time_t node_sender_queue_timeout;
+       switch_interval_time_t node_receiver_queue_timeout;
        int receive_msg_preallocate;
        int event_stream_preallocate;
        int send_msg_batch;
        short event_stream_framing;
+       switch_interval_time_t event_stream_queue_timeout;
        switch_port_t port;
        int config_fetched;
        int io_fault_tolerance;
@@ -235,6 +242,7 @@ switch_status_t create_acceptor();
 switch_hash_t *create_default_filter();
 void kz_erl_init();
 void kz_erl_shutdown();
+SWITCH_DECLARE(switch_status_t) ei_queue_pop(switch_queue_t *queue, void **data, switch_interval_time_t timeout);
 
 void fetch_config();
 
index e6bb4fa5ef1c27a7070a333decdbfee9bcf7fa0f..53ed9841547a540685ec050367fb6af1f51aed17 100644 (file)
@@ -118,11 +118,14 @@ switch_status_t kazoo_ei_config(switch_xml_t cfg) {
        kazoo_globals.send_all_headers = 0;
        kazoo_globals.send_all_private_headers = 1;
        kazoo_globals.connection_timeout = 500;
-       kazoo_globals.receive_timeout = 200;
+       kazoo_globals.ei_receive_timeout = 200;
        kazoo_globals.receive_msg_preallocate = 2000;
        kazoo_globals.event_stream_preallocate = KZ_DEFAULT_STREAM_PRE_ALLOCATE;
        kazoo_globals.send_msg_batch = 10;
        kazoo_globals.event_stream_framing = 2;
+       kazoo_globals.event_stream_queue_timeout = 200000;
+       kazoo_globals.node_receiver_queue_timeout = 100000;
+       kazoo_globals.node_sender_queue_timeout = 0;
        kazoo_globals.port = 0;
        kazoo_globals.io_fault_tolerance = 10;
        kazoo_globals.json_encoding = ERLANG_TUPLE;
@@ -189,7 +192,7 @@ switch_status_t kazoo_ei_config(switch_xml_t cfg) {
                                kazoo_globals.connection_timeout = atoi(val);
                        } else if (!strcmp(var, "receive-timeout")) {
                                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set receive-timeout: %s\n", val);
-                               kazoo_globals.receive_timeout = atoi(val);
+                               kazoo_globals.ei_receive_timeout = atoi(val);
                        } else if (!strcmp(var, "receive-msg-preallocate")) {
                                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set receive-msg-preallocate: %s\n", val);
                                kazoo_globals.receive_msg_preallocate = atoi(val);
@@ -219,6 +222,15 @@ switch_status_t kazoo_ei_config(switch_xml_t cfg) {
                        } else if (!strcmp(var, "expand-headers-on-fetch")) {
                                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set expand-headers-on-fetch: %s\n", val);
                                kazoo_globals.expand_headers_on_fetch = switch_true(val);
+                       } else if (!strcmp(var, "node-receiver-queue-timeout")) {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set %s : %s\n", var, val);
+                               kazoo_globals.node_receiver_queue_timeout = atoi(val);
+                       } else if (!strcmp(var, "node-sender-queue-timeout")) {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set %s : %s\n", var, val);
+                               kazoo_globals.node_sender_queue_timeout = atoi(val);
+                       } else if (!strcmp(var, "event-stream-queue-timeout")) {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set %s : %s\n", var, val);
+                               kazoo_globals.event_stream_queue_timeout = atoi(val);
                        } else {
                                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "unknown config option %s : %s\n", var, val);
                        }
index 96e2c7de66f63f915dfd125c1b76504f1d050780..fb655e88609d00583906eddbd88f3f853b61898a 100644 (file)
@@ -1038,6 +1038,14 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_kazoo_runtime) {
        return SWITCH_STATUS_TERM;
 }
 
+SWITCH_DECLARE(switch_status_t) ei_queue_pop(switch_queue_t *queue, void **data, switch_interval_time_t timeout)
+{
+       if (timeout == 0) {
+               return switch_queue_trypop(queue, data);
+       } else {
+               return switch_queue_pop_timeout(queue, data, timeout);
+       }
+}
 /* For Emacs:
  * Local Variables:
  * mode:c
index 5e5fb8dff4a8ce04568795e37db62b46b1375523..c2a53933ea6b6daaa5879eb3b4bb7171d3d7b309 100644 (file)
@@ -302,7 +302,7 @@ static void *SWITCH_THREAD_FUNC event_stream_loop(switch_thread_t *thread, void
                }
 
                /* if there was an event waiting in our queue send it to the client */
-               if (switch_queue_pop_timeout(event_stream->queue, &pop, 200000) == SWITCH_STATUS_SUCCESS) {
+               if (ei_queue_pop(event_stream->queue, &pop, event_stream->queue_timeout) == SWITCH_STATUS_SUCCESS) {
                        ei_x_buff *ebuf = (ei_x_buff *) pop;
 
                        if (event_stream->socket) {
@@ -401,6 +401,7 @@ ei_event_stream_t *new_event_stream(ei_node_t *ei_node, const erlang_pid *from)
        event_stream->connected = SWITCH_FALSE;
        event_stream->node = ei_node;
        event_stream->event_stream_framing = ei_node->event_stream_framing;
+       event_stream->queue_timeout = ei_node->event_stream_queue_timeout;
        memcpy(&event_stream->pid, from, sizeof(erlang_pid));
        switch_queue_create(&event_stream->queue, MAX_QUEUE_LEN, pool);
 
index 273fcbaa7f719d10cf61733cbe0ac4f82a5adea5..e11a48d6ac719841494272ebd5ea7c71a51f689a 100644 (file)
@@ -1452,7 +1452,7 @@ static void *SWITCH_THREAD_FUNC receive_handler(switch_thread_t *thread, void *o
        while (switch_test_flag(ei_node, LFLAG_RUNNING) && switch_test_flag(&kazoo_globals, LFLAG_RUNNING)) {
                void *pop = NULL;
 
-               if (switch_queue_pop_timeout(ei_node->received_msgs, &pop, 100000) == SWITCH_STATUS_SUCCESS) {
+               if (ei_queue_pop(ei_node->received_msgs, &pop, ei_node->receiver_queue_timeout) == SWITCH_STATUS_SUCCESS) {
                        ei_received_msg_t *received_msg = (ei_received_msg_t *) pop;
                        handle_erl_msg(ei_node, &received_msg->msg, &received_msg->buf);
                        ei_x_free(&received_msg->buf);
@@ -1505,7 +1505,7 @@ static void *SWITCH_THREAD_FUNC handle_node(switch_thread_t *thread, void *obj)
                }
 
                while (++send_msg_count <= kazoo_globals.send_msg_batch
-                          && switch_queue_pop_timeout(ei_node->send_msgs, &pop, 20000) == SWITCH_STATUS_SUCCESS) {
+                          && ei_queue_pop(ei_node->send_msgs, &pop, ei_node->sender_queue_timeout) == SWITCH_STATUS_SUCCESS) {
                        ei_send_msg_t *send_msg = (ei_send_msg_t *) pop;
                        ei_helper_send(ei_node, &send_msg->pid, &send_msg->buf);
                        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Sent erlang message to %s <%d.%d.%d>\n"
@@ -1518,7 +1518,7 @@ static void *SWITCH_THREAD_FUNC handle_node(switch_thread_t *thread, void *obj)
                }
 
                /* wait for a erlang message, or timeout to check if the module is still running */
-               status = ei_xreceive_msg_tmo(ei_node->nodefd, &received_msg->msg, &received_msg->buf, kazoo_globals.receive_timeout);
+               status = ei_xreceive_msg_tmo(ei_node->nodefd, &received_msg->msg, &received_msg->buf, kazoo_globals.ei_receive_timeout);
 
                switch (status) {
                case ERL_TICK:
@@ -1629,6 +1629,9 @@ switch_status_t new_kazoo_node(int nodefd, ErlConnect *conn) {
        ei_node->created_time = switch_micro_time_now();
        ei_node->legacy = kazoo_globals.legacy_events;
        ei_node->event_stream_framing = kazoo_globals.event_stream_framing;
+       ei_node->event_stream_queue_timeout = kazoo_globals.event_stream_queue_timeout;
+       ei_node->receiver_queue_timeout = kazoo_globals.node_receiver_queue_timeout;
+       ei_node->sender_queue_timeout = kazoo_globals.node_sender_queue_timeout;
 
        /* store the IP and node name we are talking with */
        switch_os_sock_put(&ei_node->socket, (switch_os_socket_t *)&nodefd, pool);