From: Dmitry Verenitsin Date: Mon, 9 Jun 2025 10:09:41 +0000 (+0500) Subject: [mod_verto] HEPv3 support. X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=refs%2Fheads%2Fverto_hep;p=thirdparty%2Ffreeswitch.git [mod_verto] HEPv3 support. --- diff --git a/src/mod/endpoints/mod_verto/mod_verto.c b/src/mod/endpoints/mod_verto/mod_verto.c index 36aba6db5e..4dd10a0aef 100644 --- a/src/mod/endpoints/mod_verto/mod_verto.c +++ b/src/mod/endpoints/mod_verto/mod_verto.c @@ -68,12 +68,36 @@ SWITCH_MODULE_DEFINITION(mod_verto, mod_verto_load, mod_verto_shutdown, mod_vert static struct globals_s verto_globals; +static const uint8_t KS_HEPV3_PROTO_VERTO = 0x3d; static struct { switch_mutex_t *store_mutex; switch_hash_t *store_hash; } json_GLOBALS; +static struct { + ks_socket_t socket; + switch_bool_t start_on_load; + char server[256]; + ks_port_t port; + uint32_t node_id; + switch_bool_t use_tls; + + switch_memory_pool_t *pool; + switch_mutex_t *mutex; + int enabled; + int running; /* Thread is active, but connection may not be established. */ + int accept_packets; /* Connection is active so we can fill send buffer. */ + int reconnect; /* Signal capture thread to reinit the connection. */ + int terminating; + switch_queue_t *queue; + switch_thread_t *thread; +} capture_globals; + +typedef struct { + char *buffer; + ks_size_t buffer_size; +} capture_queue_item_t; const char json_sql[] = "create table json_store (\n" @@ -580,6 +604,220 @@ static void jrpc_add_result(cJSON *obj, cJSON *result) } } +static void verto_hepv3_capture_send(jsock_t *jsock, char *verto_payload, ks_size_t verto_payload_size, ks_hepv3_direction_t direction) +{ + char *hep_data = NULL; + ks_hepv3_capture_params_t hep_params = {0}; + size_t hep_size = 0; + + hep_params.direction = direction; + hep_params.ip_family = jsock->family; + hep_params.remote_ip = jsock->remote_host; + hep_params.remote_port = jsock->remote_port; + hep_params.local_ip = jsock->profile->ip->local_ip; + hep_params.local_port = jsock->profile->ip->local_port; + hep_params.capture_id = capture_globals.node_id; + hep_params.protocol_type_id = KS_HEPV3_PROTO_VERTO; + hep_params.payload_size = verto_payload_size; + hep_params.payload = verto_payload; + + hep_size = ks_hepv3_capture_create(&hep_params, &hep_data); + + if (hep_size > 0) { + capture_queue_item_t *capture_item = NULL; + + capture_item = malloc(sizeof(capture_queue_item_t)); + capture_item->buffer = hep_data; + capture_item->buffer_size = hep_size; + + switch_queue_trypush(capture_globals.queue, capture_item); + } +} + +/* Manages the connection to capture server. Sends queued packets.*/ +static void *SWITCH_THREAD_FUNC verto_hepv3_capture_runtime(switch_thread_t *thread, void *obj) +{ + static const switch_interval_time_t queue_pop_timeout_us = 100 * 1000; + static const switch_interval_time_t reconnect_loop_interval_us = 100 * 1000; + static const uint32_t reconnect_loop_max = 50; + /* Actual reconnect interval = (reconnect_loop_interval_us * reconnect_loop_max) */ + static const uint32_t verto_hepv3_reconnect_s = (uint32_t)((reconnect_loop_interval_us * reconnect_loop_max) / (1000 * 1000)); + + uint32_t reconnect_loop = reconnect_loop_max; + ks_hepv3_socket_t *capture_socket = NULL; + ks_pool_t *pool = NULL; + + capture_globals.running = 1; + + ks_pool_open(&pool); + + while (capture_globals.running) { + capture_queue_item_t *capture_item = NULL; + + /* Disconnect if signalled */ + if (capture_globals.reconnect) { + capture_globals.reconnect = 0; + if (capture_socket) { + ks_hepv3_socket_destroy(&capture_socket); + } + } + + /* Reconnect on init and errors. */ + if (capture_socket == NULL) { + ks_hepv3_socket_params_t hep_socket_params = {0}; + ks_status_t init_status; + + /* Interrupt delay between reconnections to check "running" state more frequently. */ + if (++reconnect_loop < 50) { + switch_yield(reconnect_loop_interval_us); + continue; + } + + reconnect_loop = 0; + + switch_mutex_lock(capture_globals.mutex); + + hep_socket_params.server = capture_globals.server; + hep_socket_params.port = capture_globals.port; + hep_socket_params.use_tls = (ks_bool_t)capture_globals.use_tls; + hep_socket_params.pool = pool; + + init_status = ks_hepv3_socket_init(&hep_socket_params, &capture_socket); + + switch_mutex_unlock(capture_globals.mutex); + + if (init_status != KS_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Can't establish the capture connection. Retry in [%d] sec.\n", verto_hepv3_reconnect_s); + continue; + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Capture connection established.\n"); + } + } + + capture_globals.accept_packets = 1; + + /* Send queued packages. */ + if (switch_queue_pop_timeout(capture_globals.queue, (void **)&capture_item, queue_pop_timeout_us) == SWITCH_STATUS_SUCCESS) { + if (capture_item && capture_item->buffer) { + ks_size_t size_to_send = capture_item->buffer_size; + + if ((capture_item->buffer_size) && ks_hepv3_socket_write(capture_socket, capture_item->buffer, &capture_item->buffer_size) != KS_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Closing capture socket due to connection error...\n"); + ks_hepv3_socket_destroy(&capture_socket); + } else if (capture_item->buffer_size != size_to_send) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Capture attempted/sent mismatch [%" SWITCH_SIZE_T_FMT "]/[%" SWITCH_SIZE_T_FMT "]\n", size_to_send, capture_item->buffer_size); + } + + free(capture_item->buffer); + } + switch_safe_free(capture_item); + } + switch_yield(10); + } + + capture_globals.accept_packets = 0; + + if (capture_socket != NULL) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Closing capture socket...\n"); + ks_hepv3_socket_destroy(&capture_socket); + } + + ks_pool_close(&pool); + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Capture thread stopped.\n"); + + return NULL; +} + +static void verto_hepv3_capture_thread_run(void) +{ + switch_threadattr_t *thd_attr = NULL; + + if (capture_globals.thread) { + return; + } + + switch_threadattr_create(&thd_attr, capture_globals.pool); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + switch_thread_create(&capture_globals.thread, thd_attr, verto_hepv3_capture_runtime, NULL, capture_globals.pool); +} + +static void verto_hepv3_capture_start(void) +{ + if (capture_globals.enabled || capture_globals.terminating) { + return; + } + + if (zstr(capture_globals.server)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Can't start capture. Server is not configured.\n"); + return; + } + + capture_globals.enabled = SWITCH_TRUE; + verto_hepv3_capture_thread_run(); +} + +static void verto_hepv3_capture_stop(void) +{ + capture_queue_item_t *capture_item = NULL; + uint32_t count = 0; + switch_status_t thread_status; + + if (!capture_globals.enabled) { + return; + } + + capture_globals.enabled = 0; + + /* Signal runtime thread to stop. */ + capture_globals.running = 0; + + if (capture_globals.thread) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for capture thread to stop...\n"); + switch_thread_join(&thread_status, capture_globals.thread); + capture_globals.thread = NULL; + } + + /* Cleanup queue */ + while (switch_queue_trypop(capture_globals.queue, (void **)&capture_item) == SWITCH_STATUS_SUCCESS) { + count++; + if (capture_item && capture_item->buffer) { + switch_safe_free(capture_item->buffer); + } + switch_safe_free(capture_item); + } + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Cleaned up [%d] queued capture packets.\n", count); +} + +static void verto_hepv3_capture_terminate(void) +{ + capture_globals.terminating = 1; + + if (capture_globals.enabled) { + verto_hepv3_capture_stop(); + } + + switch_queue_term(capture_globals.queue); + switch_mutex_destroy(capture_globals.mutex); + switch_core_destroy_memory_pool(&capture_globals.pool); +} + +/* The separate init for data that we need even if the capture thread isn't active */ +static void verto_hepv3_capture_init(void) +{ + memset(&capture_globals, 0, sizeof(capture_globals)); + capture_globals.start_on_load = SWITCH_FALSE; + capture_globals.node_id = KS_HEPV3_DEFAULT_NODE_ID; + capture_globals.port = KS_HEPV3_DEFAULT_PORT; + capture_globals.use_tls = SWITCH_FALSE; + + switch_core_new_memory_pool(&capture_globals.pool); + + switch_mutex_init(&capture_globals.mutex, SWITCH_MUTEX_DEFAULT, capture_globals.pool); + + switch_queue_create(&capture_globals.queue, 1000, capture_globals.pool); +} + static switch_ssize_t ws_write_json(jsock_t *jsock, cJSON **json, switch_bool_t destroy) { char *json_text; @@ -601,14 +839,21 @@ static switch_ssize_t ws_write_json(jsock_t *jsock, cJSON **json, switch_bool_t } if ((json_text = cJSON_PrintUnformatted(*json))) { + ks_size_t json_size = strlen(json_text); + if (jsock->profile->debug || verto_globals.debug) { //char *log_text = cJSON_Prin(*json); switch_log_printf(SWITCH_CHANNEL_LOG, verto_globals.debug_level, "WRITE %s [%s]\n", jsock->name, json_text); //free(log_text); } switch_mutex_lock(jsock->write_mutex); - r = kws_write_frame(jsock->ws, WSOC_TEXT, json_text, strlen(json_text)); + r = kws_write_frame(jsock->ws, WSOC_TEXT, json_text, json_size); switch_mutex_unlock(jsock->write_mutex); + + if (capture_globals.accept_packets) { + verto_hepv3_capture_send(jsock, json_text, json_size, KS_HEPV3_DIR_SEND); + } + switch_safe_free(json_text); } @@ -1533,9 +1778,17 @@ static switch_status_t process_input(jsock_t *jsock, uint8_t *data, switch_ssize if (json) { - if (jsock->profile->debug || verto_globals.debug) { + if (jsock->profile->debug || verto_globals.debug || capture_globals.accept_packets) { char *log_text = cJSON_PrintUnformatted(json); - switch_log_printf(SWITCH_CHANNEL_LOG, verto_globals.debug_level, "READ %s [%s]\n", jsock->name, log_text); + + if (jsock->profile->debug || verto_globals.debug) { + switch_log_printf(SWITCH_CHANNEL_LOG, verto_globals.debug_level, "READ %s [%s]\n", jsock->name, log_text); + } + + if (capture_globals.accept_packets) { + verto_hepv3_capture_send(jsock, log_text, strlen(log_text), KS_HEPV3_DIR_RECV); + } + free(log_text); } @@ -5035,6 +5288,8 @@ static void do_shutdown(void) unsub_all_jsock(); + verto_hepv3_capture_terminate(); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Done\n"); } @@ -5461,6 +5716,22 @@ static switch_status_t parse_config(const char *cf) if (val) { verto_globals.kslog_on = switch_true(val); } + } else if (!strcasecmp(var, "capture-server") && !zstr(val)) { + switch_snprintf(capture_globals.server, sizeof(capture_globals.server), "%s", val); + } else if (!strcasecmp(var, "capture-port") && val) { + int tmp = atoi(val); + if (tmp > 0) { + capture_globals.port = tmp; + } + } else if (!strcasecmp(var, "capture-id") && val) { + int tmp = atoi(val); + if (tmp > 0) { + capture_globals.node_id = tmp; + } + } else if (!strcasecmp(var, "capture-use-tls") && val) { + capture_globals.use_tls = switch_true(val); + } else if (!strcasecmp(var, "capture-enable") && val) { + capture_globals.start_on_load = switch_true(val); } } } @@ -5694,6 +5965,84 @@ static switch_status_t cmd_json_status(char **argv, int argc, switch_stream_hand return SWITCH_STATUS_SUCCESS; } +static switch_status_t verto_hepv3_capture_set_server(const char *server, const char *str_port) +{ + if (zstr(server)) { + return SWITCH_STATUS_FALSE; + } + + switch_mutex_lock(capture_globals.mutex); + + switch_snprintf(capture_globals.server, sizeof(capture_globals.server), "%s", server); + + if (str_port) { + int tmp = atoi(str_port); + + if (tmp > 0) { + capture_globals.port = (ks_port_t)tmp; + } + } + + capture_globals.reconnect = 1; + + switch_mutex_unlock(capture_globals.mutex); + + return SWITCH_STATUS_SUCCESS; +} + +static switch_status_t verto_hepv3_capture_set_id(const char *str_id) +{ + if (str_id) { + int tmp = atoi(str_id); + + if (tmp > 0) { + capture_globals.node_id = tmp; + return SWITCH_STATUS_SUCCESS; + } + } + + return SWITCH_STATUS_FALSE; +} + +static switch_status_t cmd_capture(char **argv, int argc, switch_stream_handle_t *stream) +{ + switch_status_t status = SWITCH_STATUS_SUCCESS; + + if (zstr(argv[0])) { + switch_mutex_lock(capture_globals.mutex); + + stream->write_function(stream, "Server: %s\n", capture_globals.server); + stream->write_function(stream, "Port: %d\n", capture_globals.port); + stream->write_function(stream, "Node Id: %d\n", capture_globals.node_id); + stream->write_function(stream, "Use TLS: %s\n", capture_globals.use_tls ? "yes" : "no" ); + stream->write_function(stream, "Enabled: %s\n", capture_globals.enabled ? "yes" : "no" ); + stream->write_function(stream, "Queue: %d\n", switch_queue_size(capture_globals.queue)); + + switch_mutex_unlock(capture_globals.mutex); + return SWITCH_STATUS_SUCCESS; + } + + if (!strcasecmp(argv[0], "id")) { + status = verto_hepv3_capture_set_id(argv[1]); + } else if (!strcasecmp(argv[0], "server")) { + status = verto_hepv3_capture_set_server(argv[1], argv[2]); + } else if (!strcasecmp(argv[0], "on")) { + verto_hepv3_capture_start(); + } else if (!strcasecmp(argv[0], "off")) { + verto_hepv3_capture_stop(); + } else { + status = SWITCH_STATUS_FALSE; + } + + if (status == SWITCH_STATUS_SUCCESS) { + stream->write_function(stream, "+OK"); + } else { + stream->write_function(stream, "-ERR"); + } + + return status; +} + SWITCH_STANDARD_API(verto_function) { char *argv[1024] = { 0 }; @@ -5709,6 +6058,9 @@ SWITCH_STANDARD_API(verto_function) "verto debug [0-10]\n" "verto perm \n" "verto noperm \n" + "verto capture [on|off]\n" + "verto capture server [port]\n" + "verto capture id \n" "--------------------------------------------------------------------------------\n"; if (zstr(cmd)) { @@ -5771,6 +6123,8 @@ SWITCH_STANDARD_API(verto_function) } stream->write_function(stream, "Debug Level: %s\n", switch_log_level2str(verto_globals.debug_level)); goto done; + } else if (!strcasecmp(argv[0], "capture")) { + func = cmd_capture; } if (func) { @@ -6814,7 +7168,9 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_verto_load) verto_globals.enable_presence = SWITCH_TRUE; verto_globals.enable_fs_events = SWITCH_FALSE; verto_globals.debug_level = SWITCH_LOG_INFO; - + + verto_hepv3_capture_init(); + switch_mutex_init(&verto_globals.mutex, SWITCH_MUTEX_NESTED, verto_globals.pool); switch_mutex_init(&verto_globals.method_mutex, SWITCH_MUTEX_NESTED, verto_globals.pool); @@ -6880,6 +7236,11 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_verto_load) switch_console_set_complete("add verto debug-level"); switch_console_set_complete("add verto status"); switch_console_set_complete("add verto xmlstatus"); + switch_console_set_complete("add verto capture"); + switch_console_set_complete("add verto capture on"); + switch_console_set_complete("add verto capture off"); + switch_console_set_complete("add verto capture server"); + switch_console_set_complete("add verto capture id"); SWITCH_ADD_JSON_API(json_api_interface, "store", "JSON store", json_store_function, ""); @@ -6910,6 +7271,11 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_verto_load) run_profiles(); + + if (capture_globals.start_on_load) { + verto_hepv3_capture_start(); + } + /* indicate that the module should continue to be loaded */ return SWITCH_STATUS_SUCCESS; }