uint32_t flags;
ei_node_t *node;
short event_stream_framing;
+ switch_interval_time_t queue_timeout;
struct ei_event_stream_s *next;
};
uint32_t flags;
int legacy;
short event_stream_framing;
+ switch_interval_time_t event_stream_queue_timeout;
+ switch_interval_time_t receiver_queue_timeout;
+ switch_interval_time_t sender_queue_timeout;
struct ei_node_s *next;
};
int send_all_headers;
int send_all_private_headers;
int connection_timeout;
- int receive_timeout;
+ int ei_receive_timeout;
+ switch_interval_time_t node_sender_queue_timeout;
+ switch_interval_time_t node_receiver_queue_timeout;
int receive_msg_preallocate;
int event_stream_preallocate;
int send_msg_batch;
short event_stream_framing;
+ switch_interval_time_t event_stream_queue_timeout;
switch_port_t port;
int config_fetched;
int io_fault_tolerance;
switch_hash_t *create_default_filter();
void kz_erl_init();
void kz_erl_shutdown();
+SWITCH_DECLARE(switch_status_t) ei_queue_pop(switch_queue_t *queue, void **data, switch_interval_time_t timeout);
void fetch_config();
kazoo_globals.send_all_headers = 0;
kazoo_globals.send_all_private_headers = 1;
kazoo_globals.connection_timeout = 500;
- kazoo_globals.receive_timeout = 200;
+ kazoo_globals.ei_receive_timeout = 200;
kazoo_globals.receive_msg_preallocate = 2000;
kazoo_globals.event_stream_preallocate = KZ_DEFAULT_STREAM_PRE_ALLOCATE;
kazoo_globals.send_msg_batch = 10;
kazoo_globals.event_stream_framing = 2;
+ kazoo_globals.event_stream_queue_timeout = 200000;
+ kazoo_globals.node_receiver_queue_timeout = 100000;
+ kazoo_globals.node_sender_queue_timeout = 0;
kazoo_globals.port = 0;
kazoo_globals.io_fault_tolerance = 10;
kazoo_globals.json_encoding = ERLANG_TUPLE;
kazoo_globals.connection_timeout = atoi(val);
} else if (!strcmp(var, "receive-timeout")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set receive-timeout: %s\n", val);
- kazoo_globals.receive_timeout = atoi(val);
+ kazoo_globals.ei_receive_timeout = atoi(val);
} else if (!strcmp(var, "receive-msg-preallocate")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set receive-msg-preallocate: %s\n", val);
kazoo_globals.receive_msg_preallocate = atoi(val);
} else if (!strcmp(var, "expand-headers-on-fetch")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set expand-headers-on-fetch: %s\n", val);
kazoo_globals.expand_headers_on_fetch = switch_true(val);
+ } else if (!strcmp(var, "node-receiver-queue-timeout")) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set %s : %s\n", var, val);
+ kazoo_globals.node_receiver_queue_timeout = atoi(val);
+ } else if (!strcmp(var, "node-sender-queue-timeout")) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set %s : %s\n", var, val);
+ kazoo_globals.node_sender_queue_timeout = atoi(val);
+ } else if (!strcmp(var, "event-stream-queue-timeout")) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set %s : %s\n", var, val);
+ kazoo_globals.event_stream_queue_timeout = atoi(val);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "unknown config option %s : %s\n", var, val);
}
return SWITCH_STATUS_TERM;
}
+SWITCH_DECLARE(switch_status_t) ei_queue_pop(switch_queue_t *queue, void **data, switch_interval_time_t timeout)
+{
+ if (timeout == 0) {
+ return switch_queue_trypop(queue, data);
+ } else {
+ return switch_queue_pop_timeout(queue, data, timeout);
+ }
+}
/* For Emacs:
* Local Variables:
* mode:c
}
/* if there was an event waiting in our queue send it to the client */
- if (switch_queue_pop_timeout(event_stream->queue, &pop, 200000) == SWITCH_STATUS_SUCCESS) {
+ if (ei_queue_pop(event_stream->queue, &pop, event_stream->queue_timeout) == SWITCH_STATUS_SUCCESS) {
ei_x_buff *ebuf = (ei_x_buff *) pop;
if (event_stream->socket) {
event_stream->connected = SWITCH_FALSE;
event_stream->node = ei_node;
event_stream->event_stream_framing = ei_node->event_stream_framing;
+ event_stream->queue_timeout = ei_node->event_stream_queue_timeout;
memcpy(&event_stream->pid, from, sizeof(erlang_pid));
switch_queue_create(&event_stream->queue, MAX_QUEUE_LEN, pool);
while (switch_test_flag(ei_node, LFLAG_RUNNING) && switch_test_flag(&kazoo_globals, LFLAG_RUNNING)) {
void *pop = NULL;
- if (switch_queue_pop_timeout(ei_node->received_msgs, &pop, 100000) == SWITCH_STATUS_SUCCESS) {
+ if (ei_queue_pop(ei_node->received_msgs, &pop, ei_node->receiver_queue_timeout) == SWITCH_STATUS_SUCCESS) {
ei_received_msg_t *received_msg = (ei_received_msg_t *) pop;
handle_erl_msg(ei_node, &received_msg->msg, &received_msg->buf);
ei_x_free(&received_msg->buf);
}
while (++send_msg_count <= kazoo_globals.send_msg_batch
- && switch_queue_pop_timeout(ei_node->send_msgs, &pop, 20000) == SWITCH_STATUS_SUCCESS) {
+ && ei_queue_pop(ei_node->send_msgs, &pop, ei_node->sender_queue_timeout) == SWITCH_STATUS_SUCCESS) {
ei_send_msg_t *send_msg = (ei_send_msg_t *) pop;
ei_helper_send(ei_node, &send_msg->pid, &send_msg->buf);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Sent erlang message to %s <%d.%d.%d>\n"
}
/* wait for a erlang message, or timeout to check if the module is still running */
- status = ei_xreceive_msg_tmo(ei_node->nodefd, &received_msg->msg, &received_msg->buf, kazoo_globals.receive_timeout);
+ status = ei_xreceive_msg_tmo(ei_node->nodefd, &received_msg->msg, &received_msg->buf, kazoo_globals.ei_receive_timeout);
switch (status) {
case ERL_TICK:
ei_node->created_time = switch_micro_time_now();
ei_node->legacy = kazoo_globals.legacy_events;
ei_node->event_stream_framing = kazoo_globals.event_stream_framing;
+ ei_node->event_stream_queue_timeout = kazoo_globals.event_stream_queue_timeout;
+ ei_node->receiver_queue_timeout = kazoo_globals.node_receiver_queue_timeout;
+ ei_node->sender_queue_timeout = kazoo_globals.node_sender_queue_timeout;
/* store the IP and node name we are talking with */
switch_os_sock_put(&ei_node->socket, (switch_os_socket_t *)&nodefd, pool);