#define API_COMMAND_OPTION 4
#define API_NODE_OPTION_FRAMING 0
-#define API_NODE_OPTION_LEGACY 1
+#define API_NODE_OPTION_KEEPALIVE 1
+#define API_NODE_OPTION_LEGACY 2
#define API_NODE_OPTION_MAX 99
static const char *node_runtime_options[] = {
"event-stream-framing",
+ "event-stream-keepalive",
"enable-legacy",
NULL
};
stream->write_function(stream, "+OK %i", ei_node->event_stream_framing);
break;
+ case API_NODE_OPTION_KEEPALIVE:
+ stream->write_function(stream, "+OK %i", ei_node->event_stream_keepalive);
+ break;
+
case API_NODE_OPTION_LEGACY:
stream->write_function(stream, "+OK %s", ei_node->legacy ? "true" : "false");
break;
}
break;
+ case API_NODE_OPTION_KEEPALIVE:
+ val = switch_true(value);
+ stream->write_function(stream, "+OK %i", val);
+ ei_node->event_stream_keepalive = val;
+ break;
+
case API_NODE_OPTION_LEGACY:
ei_node->legacy = switch_true(value);
stream->write_function(stream, "+OK %s", ei_node->legacy ? "true" : "false");
stream->write_function(stream, "%s:%d -> disconnected\n"
,ip_addr, port);
} else {
- stream->write_function(stream, "%s:%d -> %s:%d\n"
+ unsigned int year, day, hour, min, sec, delta;
+
+ delta = (switch_micro_time_now() - event_stream->connected_time) / 1000000;
+ sec = delta % 60;
+ min = delta / 60 % 60;
+ hour = delta / 3600 % 24;
+ day = delta / 86400 % 7;
+ year = delta / 31556926 % 12;
+
+ stream->write_function(stream, "%s:%d -> %s:%d for %d years, %d days, %d hours, %d minutes, %d seconds\n"
,event_stream->local_ip, event_stream->local_port
- ,event_stream->remote_ip, event_stream->remote_port);
+ ,event_stream->remote_ip, event_stream->remote_port
+ ,year, day, hour, min, sec);
}
binding = event_stream->bindings;
break;
}
- switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH duration %ld\n", fh.duration);
+ switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH duration %" SWITCH_INT64_T_FMT "\n", fh.duration);
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH offset_pos %d\n", fh.offset_pos);
- switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH pos %ld\n", fh.pos);
- switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH sample_count %ld\n", fh.sample_count);
+ switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH pos %" SWITCH_INT64_T_FMT "\n", fh.pos);
+ switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH sample_count %" SWITCH_SIZE_T_FMT "\n", fh.sample_count);
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH samples %d\n", fh.samples);
switch_safe_free(my_data);
switch_socket_t *socket;
switch_mutex_t *socket_mutex;
switch_bool_t connected;
+ switch_time_t connected_time;
char remote_ip[48];
uint16_t remote_port;
char local_ip[48];
uint32_t flags;
ei_node_t *node;
short event_stream_framing;
+ short event_stream_keepalive;
switch_interval_time_t queue_timeout;
struct ei_event_stream_s *next;
};
uint32_t flags;
int legacy;
short event_stream_framing;
+ short event_stream_keepalive;
switch_interval_time_t event_stream_queue_timeout;
switch_interval_time_t receiver_queue_timeout;
switch_interval_time_t sender_queue_timeout;
int event_stream_preallocate;
int send_msg_batch;
short event_stream_framing;
+ short event_stream_keepalive;
switch_interval_time_t event_stream_queue_timeout;
switch_port_t port;
int config_fetched;
kazoo_globals.event_stream_preallocate = KZ_DEFAULT_STREAM_PRE_ALLOCATE;
kazoo_globals.send_msg_batch = 10;
kazoo_globals.event_stream_framing = 2;
+ kazoo_globals.event_stream_keepalive = 1;
kazoo_globals.event_stream_queue_timeout = 200000;
kazoo_globals.node_receiver_queue_timeout = 100000;
kazoo_globals.node_sender_queue_timeout = 0;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set event-stream-framing: %s\n", val);
kazoo_globals.event_stream_framing = atoi(val);
+ } else if (!strcmp(var, "event-stream-keep-alive")) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set event-stream-keep-alive: %s\n", val);
+ kazoo_globals.event_stream_keepalive = switch_true(val);
+
} else if (!strcmp(var, "io-fault-tolerance")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set io-fault-tolerance: %s\n", val);
kazoo_globals.io_fault_tolerance = atoi(val);
const char *ip_addr;
void *pop;
short event_stream_framing;
+ short event_stream_keepalive;
short ok = 1;
switch_atomic_inc(&kazoo_globals.threads);
switch_assert(event_stream != NULL);
event_stream_framing = event_stream->event_stream_framing;
+ event_stream_keepalive = event_stream->event_stream_keepalive;
/* figure out what socket we just opened */
switch_socket_addr_get(&sa, SWITCH_FALSE, event_stream->acceptor);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't set socket as non-blocking\n");
}
+ if (event_stream_keepalive) {
+ if (switch_socket_opt_set(newsocket, SWITCH_SO_KEEPALIVE, TRUE)) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't set socket keep-alive\n");
+ }
+ }
+
if (switch_socket_opt_set(newsocket, SWITCH_SO_TCP_NODELAY, 1)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't disable Nagle.\n");
}
switch_get_addr(event_stream->local_ip, sizeof (event_stream->local_ip), sa);
event_stream->connected = SWITCH_TRUE;
+ event_stream->connected_time = switch_micro_time_now();
+
switch_mutex_unlock(event_stream->socket_mutex);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Erlang event stream %p client %s:%u\n", (void *)event_stream, event_stream->remote_ip, event_stream->remote_port);
event_stream->connected = SWITCH_FALSE;
event_stream->node = ei_node;
event_stream->event_stream_framing = ei_node->event_stream_framing;
+ event_stream->event_stream_keepalive = ei_node->event_stream_keepalive;
event_stream->queue_timeout = ei_node->event_stream_queue_timeout;
memcpy(&event_stream->pid, from, sizeof(erlang_pid));
switch_queue_create(&event_stream->queue, MAX_QUEUE_LEN, pool);
ei_node->created_time = switch_micro_time_now();
ei_node->legacy = kazoo_globals.legacy_events;
ei_node->event_stream_framing = kazoo_globals.event_stream_framing;
+ ei_node->event_stream_keepalive = kazoo_globals.event_stream_keepalive;
ei_node->event_stream_queue_timeout = kazoo_globals.event_stream_queue_timeout;
ei_node->receiver_queue_timeout = kazoo_globals.node_receiver_queue_timeout;
ei_node->sender_queue_timeout = kazoo_globals.node_sender_queue_timeout;