]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
[mod_verto] HEPv3 support. verto_hep
authorDmitry Verenitsin <morbit85@gmail.com>
Mon, 9 Jun 2025 10:09:41 +0000 (15:09 +0500)
committerDmitry Verenitsin <morbit85@gmail.com>
Wed, 25 Jun 2025 15:34:27 +0000 (20:34 +0500)
src/mod/endpoints/mod_verto/mod_verto.c

index 36aba6db5e29d0dbb7f43310217119739fe0ab7f..4dd10a0aef7e2b968af88f6c7b77af954bcbd640 100644 (file)
@@ -68,12 +68,36 @@ SWITCH_MODULE_DEFINITION(mod_verto, mod_verto_load, mod_verto_shutdown, mod_vert
 
 static struct globals_s verto_globals;
 
+static const uint8_t KS_HEPV3_PROTO_VERTO = 0x3d;
 
 static struct {
        switch_mutex_t *store_mutex;
        switch_hash_t *store_hash;
 } json_GLOBALS;
 
+static struct {
+       ks_socket_t socket;
+       switch_bool_t start_on_load;
+       char server[256];
+       ks_port_t port;
+       uint32_t node_id;
+       switch_bool_t use_tls;
+
+       switch_memory_pool_t *pool;
+       switch_mutex_t *mutex;
+       int enabled;
+       int running;          /* Thread is active, but connection may not be established. */
+       int accept_packets;   /* Connection is active so we can fill send buffer. */
+       int reconnect;        /* Signal capture thread to reinit the connection. */
+       int terminating;
+       switch_queue_t *queue;
+       switch_thread_t *thread;
+} capture_globals;
+
+typedef struct {
+       char *buffer;
+       ks_size_t buffer_size;
+} capture_queue_item_t;
 
 const char json_sql[] =
        "create table json_store (\n"
@@ -580,6 +604,220 @@ static void jrpc_add_result(cJSON *obj, cJSON *result)
        }
 }
 
+static void verto_hepv3_capture_send(jsock_t *jsock, char *verto_payload, ks_size_t verto_payload_size, ks_hepv3_direction_t direction)
+{
+       char *hep_data = NULL;
+       ks_hepv3_capture_params_t hep_params = {0};
+       size_t hep_size = 0;
+
+       hep_params.direction = direction;
+       hep_params.ip_family = jsock->family;
+       hep_params.remote_ip = jsock->remote_host;
+       hep_params.remote_port = jsock->remote_port;
+       hep_params.local_ip = jsock->profile->ip->local_ip;
+       hep_params.local_port = jsock->profile->ip->local_port;
+       hep_params.capture_id = capture_globals.node_id;
+       hep_params.protocol_type_id = KS_HEPV3_PROTO_VERTO;
+       hep_params.payload_size = verto_payload_size;
+       hep_params.payload = verto_payload;
+
+       hep_size = ks_hepv3_capture_create(&hep_params, &hep_data);
+
+       if (hep_size > 0) {
+               capture_queue_item_t *capture_item = NULL;
+
+               capture_item = malloc(sizeof(capture_queue_item_t));
+               capture_item->buffer = hep_data;
+               capture_item->buffer_size = hep_size;
+
+               switch_queue_trypush(capture_globals.queue, capture_item);
+       }
+}
+
+/* Manages the connection to capture server. Sends queued packets.*/
+static void *SWITCH_THREAD_FUNC verto_hepv3_capture_runtime(switch_thread_t *thread, void *obj)
+{
+       static const switch_interval_time_t queue_pop_timeout_us = 100 * 1000;
+       static const switch_interval_time_t reconnect_loop_interval_us = 100 * 1000;
+       static const uint32_t reconnect_loop_max = 50;
+       /* Actual reconnect interval = (reconnect_loop_interval_us * reconnect_loop_max) */
+       static const uint32_t verto_hepv3_reconnect_s = (uint32_t)((reconnect_loop_interval_us * reconnect_loop_max) / (1000 * 1000));
+
+       uint32_t reconnect_loop = reconnect_loop_max;
+       ks_hepv3_socket_t *capture_socket = NULL;
+       ks_pool_t *pool = NULL;
+
+       capture_globals.running = 1;
+
+       ks_pool_open(&pool);
+
+       while (capture_globals.running) {
+               capture_queue_item_t *capture_item = NULL;
+
+               /* Disconnect if signalled */
+               if (capture_globals.reconnect) {
+                       capture_globals.reconnect = 0;
+                       if (capture_socket) {
+                               ks_hepv3_socket_destroy(&capture_socket);
+                       }
+               }
+
+               /* Reconnect on init and errors. */
+               if (capture_socket == NULL) {
+                       ks_hepv3_socket_params_t hep_socket_params = {0};
+                       ks_status_t init_status;
+
+                       /* Interrupt delay between reconnections to check "running" state more frequently. */
+                       if (++reconnect_loop < 50) {
+                               switch_yield(reconnect_loop_interval_us);
+                               continue;
+                       }
+
+                       reconnect_loop = 0;
+
+                       switch_mutex_lock(capture_globals.mutex);
+
+                       hep_socket_params.server = capture_globals.server;
+                       hep_socket_params.port = capture_globals.port;
+                       hep_socket_params.use_tls = (ks_bool_t)capture_globals.use_tls;
+                       hep_socket_params.pool = pool;
+
+                       init_status = ks_hepv3_socket_init(&hep_socket_params, &capture_socket);
+
+                       switch_mutex_unlock(capture_globals.mutex);
+
+                       if (init_status != KS_STATUS_SUCCESS) {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Can't establish the capture connection. Retry in [%d] sec.\n", verto_hepv3_reconnect_s);
+                               continue;
+                       } else {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Capture connection established.\n");
+                       }
+               }
+
+               capture_globals.accept_packets = 1;
+
+               /* Send queued packages. */
+               if (switch_queue_pop_timeout(capture_globals.queue, (void **)&capture_item, queue_pop_timeout_us) == SWITCH_STATUS_SUCCESS) {
+                       if (capture_item && capture_item->buffer) {
+                               ks_size_t size_to_send = capture_item->buffer_size;
+
+                               if ((capture_item->buffer_size) && ks_hepv3_socket_write(capture_socket, capture_item->buffer, &capture_item->buffer_size) != KS_STATUS_SUCCESS) {
+                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Closing capture socket due to connection error...\n");
+                                       ks_hepv3_socket_destroy(&capture_socket);
+                               } else if (capture_item->buffer_size != size_to_send) {
+                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Capture attempted/sent mismatch [%" SWITCH_SIZE_T_FMT "]/[%" SWITCH_SIZE_T_FMT "]\n", size_to_send, capture_item->buffer_size);
+                               }
+
+                               free(capture_item->buffer);
+                       }
+                       switch_safe_free(capture_item);
+               }
+               switch_yield(10);
+       }
+
+       capture_globals.accept_packets = 0;
+
+       if (capture_socket != NULL) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Closing capture socket...\n");
+               ks_hepv3_socket_destroy(&capture_socket);
+       }
+
+       ks_pool_close(&pool);
+
+       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Capture thread stopped.\n");
+
+       return NULL;
+}
+
+static void verto_hepv3_capture_thread_run(void)
+{
+       switch_threadattr_t *thd_attr = NULL;
+
+       if (capture_globals.thread) {
+               return;
+       }
+
+       switch_threadattr_create(&thd_attr, capture_globals.pool);
+       switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
+       switch_thread_create(&capture_globals.thread, thd_attr, verto_hepv3_capture_runtime, NULL, capture_globals.pool);
+}
+
+static void verto_hepv3_capture_start(void)
+{
+       if (capture_globals.enabled || capture_globals.terminating) {
+               return;
+       }
+
+       if (zstr(capture_globals.server)) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Can't start capture. Server is not configured.\n");
+               return;
+       }
+
+       capture_globals.enabled = SWITCH_TRUE;
+       verto_hepv3_capture_thread_run();
+}
+
+static void verto_hepv3_capture_stop(void)
+{
+       capture_queue_item_t *capture_item = NULL;
+       uint32_t count = 0;
+       switch_status_t thread_status;
+
+       if (!capture_globals.enabled) {
+               return;
+       }
+
+       capture_globals.enabled = 0;
+
+       /* Signal runtime thread to stop. */
+       capture_globals.running = 0;
+
+       if (capture_globals.thread) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for capture thread to stop...\n");
+               switch_thread_join(&thread_status, capture_globals.thread);
+               capture_globals.thread = NULL;
+       }
+
+       /* Cleanup queue */
+       while (switch_queue_trypop(capture_globals.queue, (void **)&capture_item) == SWITCH_STATUS_SUCCESS) {
+               count++;
+               if (capture_item && capture_item->buffer) {
+                       switch_safe_free(capture_item->buffer);
+               }
+               switch_safe_free(capture_item);
+       }
+       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Cleaned up [%d] queued capture packets.\n", count);
+}
+
+static void verto_hepv3_capture_terminate(void)
+{
+       capture_globals.terminating = 1;
+
+       if (capture_globals.enabled) {
+               verto_hepv3_capture_stop();
+       }
+
+       switch_queue_term(capture_globals.queue);
+       switch_mutex_destroy(capture_globals.mutex);
+       switch_core_destroy_memory_pool(&capture_globals.pool);
+}
+
+/* The separate init for data that we need even if the capture thread isn't active */
+static void verto_hepv3_capture_init(void)
+{
+       memset(&capture_globals, 0, sizeof(capture_globals));
+       capture_globals.start_on_load = SWITCH_FALSE;
+       capture_globals.node_id = KS_HEPV3_DEFAULT_NODE_ID;
+       capture_globals.port = KS_HEPV3_DEFAULT_PORT;
+       capture_globals.use_tls = SWITCH_FALSE;
+
+       switch_core_new_memory_pool(&capture_globals.pool);
+
+       switch_mutex_init(&capture_globals.mutex, SWITCH_MUTEX_DEFAULT, capture_globals.pool);
+
+       switch_queue_create(&capture_globals.queue, 1000, capture_globals.pool);
+}
+
 static switch_ssize_t ws_write_json(jsock_t *jsock, cJSON **json, switch_bool_t destroy)
 {
        char *json_text;
@@ -601,14 +839,21 @@ static switch_ssize_t ws_write_json(jsock_t *jsock, cJSON **json, switch_bool_t
        }
 
        if ((json_text = cJSON_PrintUnformatted(*json))) {
+               ks_size_t json_size = strlen(json_text);
+
                if (jsock->profile->debug || verto_globals.debug) {
                        //char *log_text = cJSON_Prin(*json);
                        switch_log_printf(SWITCH_CHANNEL_LOG, verto_globals.debug_level, "WRITE %s [%s]\n", jsock->name, json_text);
                        //free(log_text);
                }
                switch_mutex_lock(jsock->write_mutex);
-               r = kws_write_frame(jsock->ws, WSOC_TEXT, json_text, strlen(json_text));
+               r = kws_write_frame(jsock->ws, WSOC_TEXT, json_text, json_size);
                switch_mutex_unlock(jsock->write_mutex);
+
+               if (capture_globals.accept_packets) {
+                       verto_hepv3_capture_send(jsock, json_text, json_size, KS_HEPV3_DIR_SEND);
+               }
+
                switch_safe_free(json_text);
        }
 
@@ -1533,9 +1778,17 @@ static switch_status_t process_input(jsock_t *jsock, uint8_t *data, switch_ssize
 
        if (json) {
 
-               if (jsock->profile->debug || verto_globals.debug) {
+               if (jsock->profile->debug || verto_globals.debug || capture_globals.accept_packets) {
                        char *log_text = cJSON_PrintUnformatted(json);
-                       switch_log_printf(SWITCH_CHANNEL_LOG, verto_globals.debug_level, "READ %s [%s]\n", jsock->name, log_text);
+
+                       if (jsock->profile->debug || verto_globals.debug) {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, verto_globals.debug_level, "READ %s [%s]\n", jsock->name, log_text);
+                       }
+
+                       if (capture_globals.accept_packets) {
+                               verto_hepv3_capture_send(jsock, log_text, strlen(log_text), KS_HEPV3_DIR_RECV);
+                       }
+
                        free(log_text);
                }
 
@@ -5035,6 +5288,8 @@ static void do_shutdown(void)
 
        unsub_all_jsock();
 
+       verto_hepv3_capture_terminate();
+
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Done\n");
 }
 
@@ -5461,6 +5716,22 @@ static switch_status_t parse_config(const char *cf)
                                if (val) {
                                        verto_globals.kslog_on = switch_true(val);
                                }
+                       } else if (!strcasecmp(var, "capture-server") && !zstr(val)) {
+                                switch_snprintf(capture_globals.server, sizeof(capture_globals.server), "%s", val);
+                       } else if (!strcasecmp(var, "capture-port") && val) {
+                               int tmp = atoi(val);
+                               if (tmp > 0) {
+                                       capture_globals.port = tmp;
+                               }
+                       } else if (!strcasecmp(var, "capture-id") && val) {
+                               int tmp = atoi(val);
+                               if (tmp > 0) {
+                                       capture_globals.node_id = tmp;
+                               }
+                       } else if (!strcasecmp(var, "capture-use-tls") && val) {
+                               capture_globals.use_tls = switch_true(val);
+                       } else if (!strcasecmp(var, "capture-enable") && val) {
+                               capture_globals.start_on_load = switch_true(val);
                        }
                }
        }
@@ -5694,6 +5965,84 @@ static switch_status_t cmd_json_status(char **argv, int argc, switch_stream_hand
        return SWITCH_STATUS_SUCCESS;
 }
 
+static switch_status_t verto_hepv3_capture_set_server(const char *server, const char *str_port)
+{
+       if (zstr(server)) {
+               return SWITCH_STATUS_FALSE;
+       }
+
+       switch_mutex_lock(capture_globals.mutex);
+
+       switch_snprintf(capture_globals.server, sizeof(capture_globals.server), "%s", server);
+
+       if (str_port) {
+               int tmp = atoi(str_port);
+
+               if (tmp > 0) {
+                       capture_globals.port = (ks_port_t)tmp;
+               }
+       }
+
+       capture_globals.reconnect = 1;
+
+       switch_mutex_unlock(capture_globals.mutex);
+
+       return SWITCH_STATUS_SUCCESS;
+}
+
+static switch_status_t verto_hepv3_capture_set_id(const char *str_id)
+{
+       if (str_id) {
+               int tmp = atoi(str_id);
+
+               if (tmp > 0) {
+                       capture_globals.node_id = tmp;
+                       return SWITCH_STATUS_SUCCESS;
+               }
+       }
+
+       return SWITCH_STATUS_FALSE;
+}
+
+static switch_status_t cmd_capture(char **argv, int argc, switch_stream_handle_t *stream)
+{
+       switch_status_t status = SWITCH_STATUS_SUCCESS;
+
+       if (zstr(argv[0])) {
+               switch_mutex_lock(capture_globals.mutex);
+
+               stream->write_function(stream, "Server: %s\n", capture_globals.server);
+               stream->write_function(stream, "Port: %d\n", capture_globals.port);
+               stream->write_function(stream, "Node Id: %d\n", capture_globals.node_id);
+               stream->write_function(stream, "Use TLS: %s\n", capture_globals.use_tls ? "yes" : "no" );
+               stream->write_function(stream, "Enabled: %s\n", capture_globals.enabled ? "yes" : "no" );
+               stream->write_function(stream, "Queue: %d\n", switch_queue_size(capture_globals.queue));
+
+               switch_mutex_unlock(capture_globals.mutex);
+               return SWITCH_STATUS_SUCCESS;
+       }
+
+       if (!strcasecmp(argv[0], "id")) {
+               status = verto_hepv3_capture_set_id(argv[1]);
+       } else if (!strcasecmp(argv[0], "server")) {
+               status = verto_hepv3_capture_set_server(argv[1], argv[2]);
+       } else if (!strcasecmp(argv[0], "on")) {
+               verto_hepv3_capture_start();
+       } else if (!strcasecmp(argv[0], "off")) {
+               verto_hepv3_capture_stop();
+       } else {
+               status = SWITCH_STATUS_FALSE;
+       }
+
+       if (status == SWITCH_STATUS_SUCCESS) {
+               stream->write_function(stream, "+OK");
+       } else {
+               stream->write_function(stream, "-ERR");
+       }
+
+       return status;
+}
+
 SWITCH_STANDARD_API(verto_function)
 {
        char *argv[1024] = { 0 };
@@ -5709,6 +6058,9 @@ SWITCH_STANDARD_API(verto_function)
                "verto debug [0-10]\n"
                "verto perm <sessid> <type> <value>\n"
                "verto noperm <sessid> <type> <value>\n"
+               "verto capture [on|off]\n"
+               "verto capture server <server> [port]\n"
+               "verto capture id <id>\n"
                "--------------------------------------------------------------------------------\n";
 
        if (zstr(cmd)) {
@@ -5771,6 +6123,8 @@ SWITCH_STANDARD_API(verto_function)
                }
                stream->write_function(stream, "Debug Level: %s\n", switch_log_level2str(verto_globals.debug_level));
                goto done;
+       } else if (!strcasecmp(argv[0], "capture")) {
+               func = cmd_capture;
        }
 
        if (func) {
@@ -6814,7 +7168,9 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_verto_load)
        verto_globals.enable_presence = SWITCH_TRUE;
        verto_globals.enable_fs_events = SWITCH_FALSE;
        verto_globals.debug_level = SWITCH_LOG_INFO;
-       
+
+       verto_hepv3_capture_init();
+
        switch_mutex_init(&verto_globals.mutex, SWITCH_MUTEX_NESTED, verto_globals.pool);
 
        switch_mutex_init(&verto_globals.method_mutex, SWITCH_MUTEX_NESTED, verto_globals.pool);
@@ -6880,6 +7236,11 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_verto_load)
        switch_console_set_complete("add verto debug-level");
        switch_console_set_complete("add verto status");
        switch_console_set_complete("add verto xmlstatus");
+       switch_console_set_complete("add verto capture");
+       switch_console_set_complete("add verto capture on");
+       switch_console_set_complete("add verto capture off");
+       switch_console_set_complete("add verto capture server");
+       switch_console_set_complete("add verto capture id");
 
        SWITCH_ADD_JSON_API(json_api_interface, "store", "JSON store", json_store_function, "");
 
@@ -6910,6 +7271,11 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_verto_load)
 
        run_profiles();
 
+
+       if (capture_globals.start_on_load) {
+               verto_hepv3_capture_start();
+       }
+
        /* indicate that the module should continue to be loaded */
        return SWITCH_STATUS_SUCCESS;
 }