static struct globals_s verto_globals;
+static const uint8_t KS_HEPV3_PROTO_VERTO = 0x3d;
static struct {
switch_mutex_t *store_mutex;
switch_hash_t *store_hash;
} json_GLOBALS;
+static struct {
+ ks_socket_t socket;
+ switch_bool_t start_on_load;
+ char server[256];
+ ks_port_t port;
+ uint32_t node_id;
+ switch_bool_t use_tls;
+
+ switch_memory_pool_t *pool;
+ switch_mutex_t *mutex;
+ int enabled;
+ int running; /* Thread is active, but connection may not be established. */
+ int accept_packets; /* Connection is active so we can fill send buffer. */
+ int reconnect; /* Signal capture thread to reinit the connection. */
+ int terminating;
+ switch_queue_t *queue;
+ switch_thread_t *thread;
+} capture_globals;
+
+typedef struct {
+ char *buffer;
+ ks_size_t buffer_size;
+} capture_queue_item_t;
const char json_sql[] =
"create table json_store (\n"
}
}
+static void verto_hepv3_capture_send(jsock_t *jsock, char *verto_payload, ks_size_t verto_payload_size, ks_hepv3_direction_t direction)
+{
+ char *hep_data = NULL;
+ ks_hepv3_capture_params_t hep_params = {0};
+ size_t hep_size = 0;
+
+ hep_params.direction = direction;
+ hep_params.ip_family = jsock->family;
+ hep_params.remote_ip = jsock->remote_host;
+ hep_params.remote_port = jsock->remote_port;
+ hep_params.local_ip = jsock->profile->ip->local_ip;
+ hep_params.local_port = jsock->profile->ip->local_port;
+ hep_params.capture_id = capture_globals.node_id;
+ hep_params.protocol_type_id = KS_HEPV3_PROTO_VERTO;
+ hep_params.payload_size = verto_payload_size;
+ hep_params.payload = verto_payload;
+
+ hep_size = ks_hepv3_capture_create(&hep_params, &hep_data);
+
+ if (hep_size > 0) {
+ capture_queue_item_t *capture_item = NULL;
+
+ capture_item = malloc(sizeof(capture_queue_item_t));
+ capture_item->buffer = hep_data;
+ capture_item->buffer_size = hep_size;
+
+ switch_queue_trypush(capture_globals.queue, capture_item);
+ }
+}
+
+/* Manages the connection to capture server. Sends queued packets.*/
+static void *SWITCH_THREAD_FUNC verto_hepv3_capture_runtime(switch_thread_t *thread, void *obj)
+{
+ static const switch_interval_time_t queue_pop_timeout_us = 100 * 1000;
+ static const switch_interval_time_t reconnect_loop_interval_us = 100 * 1000;
+ static const uint32_t reconnect_loop_max = 50;
+ /* Actual reconnect interval = (reconnect_loop_interval_us * reconnect_loop_max) */
+ static const uint32_t verto_hepv3_reconnect_s = (uint32_t)((reconnect_loop_interval_us * reconnect_loop_max) / (1000 * 1000));
+
+ uint32_t reconnect_loop = reconnect_loop_max;
+ ks_hepv3_socket_t *capture_socket = NULL;
+ ks_pool_t *pool = NULL;
+
+ capture_globals.running = 1;
+
+ ks_pool_open(&pool);
+
+ while (capture_globals.running) {
+ capture_queue_item_t *capture_item = NULL;
+
+ /* Disconnect if signalled */
+ if (capture_globals.reconnect) {
+ capture_globals.reconnect = 0;
+ if (capture_socket) {
+ ks_hepv3_socket_destroy(&capture_socket);
+ }
+ }
+
+ /* Reconnect on init and errors. */
+ if (capture_socket == NULL) {
+ ks_hepv3_socket_params_t hep_socket_params = {0};
+ ks_status_t init_status;
+
+ /* Interrupt delay between reconnections to check "running" state more frequently. */
+ if (++reconnect_loop < 50) {
+ switch_yield(reconnect_loop_interval_us);
+ continue;
+ }
+
+ reconnect_loop = 0;
+
+ switch_mutex_lock(capture_globals.mutex);
+
+ hep_socket_params.server = capture_globals.server;
+ hep_socket_params.port = capture_globals.port;
+ hep_socket_params.use_tls = (ks_bool_t)capture_globals.use_tls;
+ hep_socket_params.pool = pool;
+
+ init_status = ks_hepv3_socket_init(&hep_socket_params, &capture_socket);
+
+ switch_mutex_unlock(capture_globals.mutex);
+
+ if (init_status != KS_STATUS_SUCCESS) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Can't establish the capture connection. Retry in [%d] sec.\n", verto_hepv3_reconnect_s);
+ continue;
+ } else {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Capture connection established.\n");
+ }
+ }
+
+ capture_globals.accept_packets = 1;
+
+ /* Send queued packages. */
+ if (switch_queue_pop_timeout(capture_globals.queue, (void **)&capture_item, queue_pop_timeout_us) == SWITCH_STATUS_SUCCESS) {
+ if (capture_item && capture_item->buffer) {
+ ks_size_t size_to_send = capture_item->buffer_size;
+
+ if ((capture_item->buffer_size) && ks_hepv3_socket_write(capture_socket, capture_item->buffer, &capture_item->buffer_size) != KS_STATUS_SUCCESS) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Closing capture socket due to connection error...\n");
+ ks_hepv3_socket_destroy(&capture_socket);
+ } else if (capture_item->buffer_size != size_to_send) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Capture attempted/sent mismatch [%" SWITCH_SIZE_T_FMT "]/[%" SWITCH_SIZE_T_FMT "]\n", size_to_send, capture_item->buffer_size);
+ }
+
+ free(capture_item->buffer);
+ }
+ switch_safe_free(capture_item);
+ }
+ switch_yield(10);
+ }
+
+ capture_globals.accept_packets = 0;
+
+ if (capture_socket != NULL) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Closing capture socket...\n");
+ ks_hepv3_socket_destroy(&capture_socket);
+ }
+
+ ks_pool_close(&pool);
+
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Capture thread stopped.\n");
+
+ return NULL;
+}
+
+static void verto_hepv3_capture_thread_run(void)
+{
+ switch_threadattr_t *thd_attr = NULL;
+
+ if (capture_globals.thread) {
+ return;
+ }
+
+ switch_threadattr_create(&thd_attr, capture_globals.pool);
+ switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
+ switch_thread_create(&capture_globals.thread, thd_attr, verto_hepv3_capture_runtime, NULL, capture_globals.pool);
+}
+
+static void verto_hepv3_capture_start(void)
+{
+ if (capture_globals.enabled || capture_globals.terminating) {
+ return;
+ }
+
+ if (zstr(capture_globals.server)) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Can't start capture. Server is not configured.\n");
+ return;
+ }
+
+ capture_globals.enabled = SWITCH_TRUE;
+ verto_hepv3_capture_thread_run();
+}
+
+static void verto_hepv3_capture_stop(void)
+{
+ capture_queue_item_t *capture_item = NULL;
+ uint32_t count = 0;
+ switch_status_t thread_status;
+
+ if (!capture_globals.enabled) {
+ return;
+ }
+
+ capture_globals.enabled = 0;
+
+ /* Signal runtime thread to stop. */
+ capture_globals.running = 0;
+
+ if (capture_globals.thread) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for capture thread to stop...\n");
+ switch_thread_join(&thread_status, capture_globals.thread);
+ capture_globals.thread = NULL;
+ }
+
+ /* Cleanup queue */
+ while (switch_queue_trypop(capture_globals.queue, (void **)&capture_item) == SWITCH_STATUS_SUCCESS) {
+ count++;
+ if (capture_item && capture_item->buffer) {
+ switch_safe_free(capture_item->buffer);
+ }
+ switch_safe_free(capture_item);
+ }
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Cleaned up [%d] queued capture packets.\n", count);
+}
+
+static void verto_hepv3_capture_terminate(void)
+{
+ capture_globals.terminating = 1;
+
+ if (capture_globals.enabled) {
+ verto_hepv3_capture_stop();
+ }
+
+ switch_queue_term(capture_globals.queue);
+ switch_mutex_destroy(capture_globals.mutex);
+ switch_core_destroy_memory_pool(&capture_globals.pool);
+}
+
+/* The separate init for data that we need even if the capture thread isn't active */
+static void verto_hepv3_capture_init(void)
+{
+ memset(&capture_globals, 0, sizeof(capture_globals));
+ capture_globals.start_on_load = SWITCH_FALSE;
+ capture_globals.node_id = KS_HEPV3_DEFAULT_NODE_ID;
+ capture_globals.port = KS_HEPV3_DEFAULT_PORT;
+ capture_globals.use_tls = SWITCH_FALSE;
+
+ switch_core_new_memory_pool(&capture_globals.pool);
+
+ switch_mutex_init(&capture_globals.mutex, SWITCH_MUTEX_DEFAULT, capture_globals.pool);
+
+ switch_queue_create(&capture_globals.queue, 1000, capture_globals.pool);
+}
+
static switch_ssize_t ws_write_json(jsock_t *jsock, cJSON **json, switch_bool_t destroy)
{
char *json_text;
}
if ((json_text = cJSON_PrintUnformatted(*json))) {
+ ks_size_t json_size = strlen(json_text);
+
if (jsock->profile->debug || verto_globals.debug) {
//char *log_text = cJSON_Prin(*json);
switch_log_printf(SWITCH_CHANNEL_LOG, verto_globals.debug_level, "WRITE %s [%s]\n", jsock->name, json_text);
//free(log_text);
}
switch_mutex_lock(jsock->write_mutex);
- r = kws_write_frame(jsock->ws, WSOC_TEXT, json_text, strlen(json_text));
+ r = kws_write_frame(jsock->ws, WSOC_TEXT, json_text, json_size);
switch_mutex_unlock(jsock->write_mutex);
+
+ if (capture_globals.accept_packets) {
+ verto_hepv3_capture_send(jsock, json_text, json_size, KS_HEPV3_DIR_SEND);
+ }
+
switch_safe_free(json_text);
}
if (json) {
- if (jsock->profile->debug || verto_globals.debug) {
+ if (jsock->profile->debug || verto_globals.debug || capture_globals.accept_packets) {
char *log_text = cJSON_PrintUnformatted(json);
- switch_log_printf(SWITCH_CHANNEL_LOG, verto_globals.debug_level, "READ %s [%s]\n", jsock->name, log_text);
+
+ if (jsock->profile->debug || verto_globals.debug) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, verto_globals.debug_level, "READ %s [%s]\n", jsock->name, log_text);
+ }
+
+ if (capture_globals.accept_packets) {
+ verto_hepv3_capture_send(jsock, log_text, strlen(log_text), KS_HEPV3_DIR_RECV);
+ }
+
free(log_text);
}
unsub_all_jsock();
+ verto_hepv3_capture_terminate();
+
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Done\n");
}
if (val) {
verto_globals.kslog_on = switch_true(val);
}
+ } else if (!strcasecmp(var, "capture-server") && !zstr(val)) {
+ switch_snprintf(capture_globals.server, sizeof(capture_globals.server), "%s", val);
+ } else if (!strcasecmp(var, "capture-port") && val) {
+ int tmp = atoi(val);
+ if (tmp > 0) {
+ capture_globals.port = tmp;
+ }
+ } else if (!strcasecmp(var, "capture-id") && val) {
+ int tmp = atoi(val);
+ if (tmp > 0) {
+ capture_globals.node_id = tmp;
+ }
+ } else if (!strcasecmp(var, "capture-use-tls") && val) {
+ capture_globals.use_tls = switch_true(val);
+ } else if (!strcasecmp(var, "capture-enable") && val) {
+ capture_globals.start_on_load = switch_true(val);
}
}
}
return SWITCH_STATUS_SUCCESS;
}
+static switch_status_t verto_hepv3_capture_set_server(const char *server, const char *str_port)
+{
+ if (zstr(server)) {
+ return SWITCH_STATUS_FALSE;
+ }
+
+ switch_mutex_lock(capture_globals.mutex);
+
+ switch_snprintf(capture_globals.server, sizeof(capture_globals.server), "%s", server);
+
+ if (str_port) {
+ int tmp = atoi(str_port);
+
+ if (tmp > 0) {
+ capture_globals.port = (ks_port_t)tmp;
+ }
+ }
+
+ capture_globals.reconnect = 1;
+
+ switch_mutex_unlock(capture_globals.mutex);
+
+ return SWITCH_STATUS_SUCCESS;
+}
+
+static switch_status_t verto_hepv3_capture_set_id(const char *str_id)
+{
+ if (str_id) {
+ int tmp = atoi(str_id);
+
+ if (tmp > 0) {
+ capture_globals.node_id = tmp;
+ return SWITCH_STATUS_SUCCESS;
+ }
+ }
+
+ return SWITCH_STATUS_FALSE;
+}
+
+static switch_status_t cmd_capture(char **argv, int argc, switch_stream_handle_t *stream)
+{
+ switch_status_t status = SWITCH_STATUS_SUCCESS;
+
+ if (zstr(argv[0])) {
+ switch_mutex_lock(capture_globals.mutex);
+
+ stream->write_function(stream, "Server: %s\n", capture_globals.server);
+ stream->write_function(stream, "Port: %d\n", capture_globals.port);
+ stream->write_function(stream, "Node Id: %d\n", capture_globals.node_id);
+ stream->write_function(stream, "Use TLS: %s\n", capture_globals.use_tls ? "yes" : "no" );
+ stream->write_function(stream, "Enabled: %s\n", capture_globals.enabled ? "yes" : "no" );
+ stream->write_function(stream, "Queue: %d\n", switch_queue_size(capture_globals.queue));
+
+ switch_mutex_unlock(capture_globals.mutex);
+ return SWITCH_STATUS_SUCCESS;
+ }
+
+ if (!strcasecmp(argv[0], "id")) {
+ status = verto_hepv3_capture_set_id(argv[1]);
+ } else if (!strcasecmp(argv[0], "server")) {
+ status = verto_hepv3_capture_set_server(argv[1], argv[2]);
+ } else if (!strcasecmp(argv[0], "on")) {
+ verto_hepv3_capture_start();
+ } else if (!strcasecmp(argv[0], "off")) {
+ verto_hepv3_capture_stop();
+ } else {
+ status = SWITCH_STATUS_FALSE;
+ }
+
+ if (status == SWITCH_STATUS_SUCCESS) {
+ stream->write_function(stream, "+OK");
+ } else {
+ stream->write_function(stream, "-ERR");
+ }
+
+ return status;
+}
+
SWITCH_STANDARD_API(verto_function)
{
char *argv[1024] = { 0 };
"verto debug [0-10]\n"
"verto perm <sessid> <type> <value>\n"
"verto noperm <sessid> <type> <value>\n"
+ "verto capture [on|off]\n"
+ "verto capture server <server> [port]\n"
+ "verto capture id <id>\n"
"--------------------------------------------------------------------------------\n";
if (zstr(cmd)) {
}
stream->write_function(stream, "Debug Level: %s\n", switch_log_level2str(verto_globals.debug_level));
goto done;
+ } else if (!strcasecmp(argv[0], "capture")) {
+ func = cmd_capture;
}
if (func) {
verto_globals.enable_presence = SWITCH_TRUE;
verto_globals.enable_fs_events = SWITCH_FALSE;
verto_globals.debug_level = SWITCH_LOG_INFO;
-
+
+ verto_hepv3_capture_init();
+
switch_mutex_init(&verto_globals.mutex, SWITCH_MUTEX_NESTED, verto_globals.pool);
switch_mutex_init(&verto_globals.method_mutex, SWITCH_MUTEX_NESTED, verto_globals.pool);
switch_console_set_complete("add verto debug-level");
switch_console_set_complete("add verto status");
switch_console_set_complete("add verto xmlstatus");
+ switch_console_set_complete("add verto capture");
+ switch_console_set_complete("add verto capture on");
+ switch_console_set_complete("add verto capture off");
+ switch_console_set_complete("add verto capture server");
+ switch_console_set_complete("add verto capture id");
SWITCH_ADD_JSON_API(json_api_interface, "store", "JSON store", json_store_function, "");
run_profiles();
+
+ if (capture_globals.start_on_load) {
+ verto_hepv3_capture_start();
+ }
+
/* indicate that the module should continue to be loaded */
return SWITCH_STATUS_SUCCESS;
}