4.1. file_connector
4.2. std_connector
4.3. tcp_connector
+ 4.4. unixdomain_connector
5. Inspector Modules
* tcp_connector.messages: total messages (sum)
+4.4. unixdomain_connector
+
+--------------
+
+Help: implement the unix domain stream connector
+
+Type: connector
+
+Usage: global
+
+Configuration:
+
+ * string unixdomain_connector[].connector: connector name
+ * str_list unixdomain_connector[].paths: list of paths to the remote
+ end-points
+ * bool unixdomain_connector[].conn_retries: retries to establish connection
+ enabled or not
+ * enum unixdomain_connector[].setup: stream establishment { call | answer}
+ * int unixdomain_connector[].retry_interval: retry interval in seconds
+ * int unixdomain_connector[].max_retries: maximum number of retries
+
+Peg counts:
+
+ * unixdomain_connector.messages: total messages (sum)
+
---------------------------------------------------------------------
5. Inspector Modules
$<TARGET_OBJECTS:stream_paf>
$<TARGET_OBJECTS:target_based>
$<TARGET_OBJECTS:tcp_connector>
+ $<TARGET_OBJECTS:unixdomain_connector>
$<TARGET_OBJECTS:time>
$<TARGET_OBJECTS:trace>
$<TARGET_OBJECTS:utils>
-
add_subdirectory(file_connector)
add_subdirectory(tcp_connector)
add_subdirectory(std_connector)
+add_subdirectory(unixdomain_connector)
add_library( connectors OBJECT
connectors.cc
extern const BaseApi* file_connector[];
extern const BaseApi* tcp_connector[];
extern const BaseApi* std_connector[];
+extern const BaseApi* unixdomain_connector[];
void load_connectors()
{
PluginManager::load_plugins(file_connector);
PluginManager::load_plugins(tcp_connector);
PluginManager::load_plugins(std_connector);
+ PluginManager::load_plugins(unixdomain_connector);
}
--- /dev/null
+
+add_library( unixdomain_connector OBJECT
+ unixdomain_connector.cc
+ unixdomain_connector.h
+ unixdomain_connector_config.h
+ unixdomain_connector_module.cc
+ unixdomain_connector_module.h
+)
+
+add_subdirectory(test)
--- /dev/null
+Implements a connector plugin that reads and writes messages across a IPC
+stream channel using Unix Domain sockets.
+
+Each connector implements a duplex channel, both transmit and receive. When used
+by a side_channel object, a single UnixDomainConnector object is used for both the
+transmit and receive connectors.
+
+An additional UnixDomainConnector message header is pre-pended to each message transmitted.
+This header specifies the protocol format version and the length of the message.
+This length does not include the unixdomain connector message header, but does include the
+user's message header.
+
+The unixdomain_connector Connector configuration results in ONE ConnectorCommon
+object which is used to contain a list of all Connectors being configured.
+A vector<> in the ConnectorCommon object holds individual Connector config
+objects. The ConnectorManager then uses this vector<> to instantiate the
+set of desired Connectors.
+
+UnixDomain connector configuration includes a partner path, connection retries flag,
+retry interval, maximum number of retries, connection setup direction. If the 'paths' list
+contains more than one path,
+the "per-thread" destination mode will be assumed. In this mode, each thread
+will connect to a corresponding destination path by selecting a path
+from the list based on the instance_id.
+
+The call setup depends on the conn_retries flag. If this flag is set to true, the system will
+repeatedly attempt to connect to the remote endpoint at intervals specified by retry_interval,
+which defaults to 4 seconds, until a specified maximum number of retry
+attempts has been reached. The maximum number of retry attempts is determined by the
+max_retries configuration parameter, which defaults to 5.
+
+A UnixDomain connector can be either the active partner and initiate the stream channel connection
+or can be the passive partner and expect to be called by the active side. This
+is controlled by the 'setup' configuration element.
+
+Receive messages are managed via separate thread and ring buffer queue structure.
+The thread's purpose is to read messages from the stream and insert them into the queue.
+Then the packet processing thread is able to read a whole message from the queue.
+
--- /dev/null
+
+add_cpputest( unixdomain_connector_test
+ SOURCES
+ ../unixdomain_connector.cc
+ ../../../framework/module.cc
+ ../../../managers/connector_manager.cc
+ LIBS
+ ${CMAKE_THREAD_LIBS_INIT}
+)
+
+add_cpputest( unixdomain_connector_module_test
+ SOURCES
+ ../unixdomain_connector_module.cc
+ ../../../framework/module.cc
+ ../../../framework/parameter.cc
+ ../../../framework/value.cc
+ ../../../sfip/sf_ip.cc
+ $<TARGET_OBJECTS:catch_tests>
+ LIBS
+ ${DNET_LIBRARIES}
+)
+
--- /dev/null
+//--------------------------------------------------------------------------
+// Copyright (C) 2015-2025 Cisco and/or its affiliates. All rights reserved.
+//
+// This program is free software; you can redistribute it and/or modify it
+// under the terms of the GNU General Public License Version 2 as published
+// by the Free Software Foundation. You may not use, modify or distribute
+// this program under any other version of the GNU General Public License.
+//
+// This program is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, write to the Free Software Foundation, Inc.,
+// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+//--------------------------------------------------------------------------
+
+// unixdomain_connector_module_test.cc author Umang Sharma <umasharm@cisco.com>
+// unit test main
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "connectors/unixdomain_connector/unixdomain_connector_module.h"
+#include "main/thread_config.h"
+#include "profiler/profiler.h"
+
+#include <CppUTest/CommandLineTestRunner.h>
+#include <CppUTest/TestHarness.h>
+
+using namespace snort;
+
+
+THREAD_LOCAL SimpleStats unixdomain_connector_stats;
+THREAD_LOCAL ProfileStats unixdomain_connector_perfstats;
+
+void show_stats(PegCount*, const PegInfo*, unsigned, const char*) { }
+void show_stats(PegCount*, const PegInfo*, const std::vector<unsigned>&, const char*, FILE*) { }
+
+unsigned instance_max = 1;
+namespace snort
+{
+char* snort_strdup(const char* s)
+{ return strdup(s); }
+
+unsigned get_instance_id()
+{ return 0; }
+
+unsigned ThreadConfig::get_instance_max()
+{ return instance_max; }
+
+void ParseError(const char*, ...) { }
+}
+
+TEST_GROUP(unixdomain_connector_module)
+{
+};
+
+TEST(unixdomain_connector_module, test_call)
+{
+ Value connector_val("unixdomain-c");
+ Value base_path_val("/tmp/pub_sub1 /tmp/pub_sub2");
+ Value setup_val("call");
+
+ Parameter connector_param =
+ {"connector", Parameter::PT_STRING, nullptr, nullptr, "connector"};
+ Parameter base_path_param =
+ {"paths", Parameter::PT_STR_LIST, nullptr, nullptr, "list of paths"};
+ Parameter setup_param =
+ {"setup", Parameter::PT_ENUM, "call | answer", nullptr, "establishment"};
+
+ UnixDomainConnectorModule module;
+
+ base_path_val.set(&base_path_param);
+ CHECK(true == base_path_param.validate(base_path_val));
+ setup_val.set(&setup_param);
+ CHECK(true == setup_param.validate(setup_val));
+ connector_val.set(&connector_param);
+ CHECK(true == connector_param.validate(connector_val));
+
+ instance_max = 2;
+
+ CHECK(module.begin("unixdomain_connector", 0, nullptr));
+ CHECK(module.begin("unixdomain_connector", 1, nullptr));
+ CHECK(module.set("unixdomain_connector.paths", base_path_val, nullptr));
+ CHECK(module.set("unixdomain_connector.connector", connector_val, nullptr));
+ CHECK(module.set("unixdomain_connector.setup", setup_val, nullptr));
+ CHECK(module.end("unixdomain_connector", 1, nullptr));
+ CHECK(module.end("unixdomain_connector", 0, nullptr));
+
+ ConnectorConfig::ConfigSet config_set = module.get_and_clear_config();
+
+ CHECK(1 == config_set.size());
+
+ const UnixDomainConnectorConfig& config = static_cast<const UnixDomainConnectorConfig&>(*config_set.front());
+ CHECK("/tmp/pub_sub1" == config.paths[0]);
+ CHECK("/tmp/pub_sub2" == config.paths[1]);
+ CHECK(UnixDomainConnectorConfig::Setup::CALL == config.setup);
+ CHECK("unixdomain-c" == config.connector_name);
+ CHECK(Connector::CONN_DUPLEX == config.direction);
+
+ CHECK(nullptr != module.get_pegs());
+ CHECK(nullptr != module.get_counts());
+ CHECK(nullptr != module.get_profile());
+}
+
+TEST(unixdomain_connector_module, test_paths_count_failure)
+{
+ Value connector_val("unixdomain-a");
+ Value base_path_val("/tmp/pub_sub1");
+ Value setup_val("answer");
+ Parameter connector_param =
+ {"connector", Parameter::PT_STRING, nullptr, nullptr, "connector"};
+ Parameter base_path_param =
+ {"paths", Parameter::PT_STR_LIST, nullptr, nullptr, "list of paths"};
+ Parameter setup_param =
+ {"setup", Parameter::PT_ENUM, "call | answer", nullptr, "establishment"};
+
+ UnixDomainConnectorModule module;
+
+ base_path_val.set(&base_path_param);
+ CHECK(true == base_path_param.validate(base_path_val));
+ setup_val.set(&setup_param);
+ CHECK(true == setup_param.validate(setup_val));
+ connector_val.set(&connector_param);
+ CHECK(true == connector_param.validate(connector_val));
+
+ CHECK(module.begin("unixdomain_connector", 0, nullptr));
+ CHECK(module.begin("unixdomain_connector", 1, nullptr));
+ CHECK(module.set("unixdomain_connector.paths", base_path_val, nullptr));
+ CHECK(module.set("unixdomain_connector.connector", connector_val, nullptr));
+ CHECK(module.set("unixdomain_connector.setup", setup_val, nullptr));
+ CHECK(module.end("unixdomain_connector", 1, nullptr));
+ CHECK(module.end("unixdomain_connector", 0, nullptr));
+
+ ConnectorConfig::ConfigSet config_set = module.get_and_clear_config();
+
+ CHECK(1 == config_set.size());
+
+ const UnixDomainConnectorConfig& config = static_cast<const UnixDomainConnectorConfig&>(*config_set.front());
+ CHECK("/tmp/pub_sub1" == config.paths[0]);
+ CHECK(config.setup == UnixDomainConnectorConfig::Setup::ANSWER);
+ CHECK("unixdomain-a" == config.connector_name);
+ CHECK(Connector::CONN_DUPLEX == config.direction);
+}
+
+TEST(unixdomain_connector_module, test_answer)
+{
+ Value connector_val("unixdomain-a");
+ Value base_path_val("/tmp/pub_sub1 /tmp/pub_sub2");
+ Value setup_val("answer");
+ Parameter connector_param =
+ {"connector", Parameter::PT_STRING, nullptr, nullptr, "connector"};
+ Parameter base_path_param =
+ {"paths", Parameter::PT_STR_LIST, nullptr, nullptr, "list of paths"};
+ Parameter setup_param =
+ {"setup", Parameter::PT_ENUM, "call | answer", nullptr, "establishment"};
+
+ UnixDomainConnectorModule module;
+
+ base_path_val.set(&base_path_param);
+ CHECK(true == base_path_param.validate(base_path_val));
+ setup_val.set(&setup_param);
+ CHECK(true == setup_param.validate(setup_val));
+ connector_val.set(&connector_param);
+ CHECK(true == connector_param.validate(connector_val));
+
+ instance_max = 3;
+ CHECK(module.begin("unixdomain_connector", 0, nullptr));
+ CHECK(module.begin("unixdomain_connector", 1, nullptr));
+ CHECK(module.set("unixdomain_connector.paths", base_path_val, nullptr));
+ CHECK(module.set("unixdomain_connector.connector", connector_val, nullptr));
+ CHECK(module.set("unixdomain_connector.setup", setup_val, nullptr));
+ CHECK(module.end("unixdomain_connector", 1, nullptr) == false);
+ CHECK(module.end("unixdomain_connector", 0, nullptr));
+}
+
+int main(int argc, char** argv)
+{
+ return CommandLineTestRunner::RunAllTests(argc, argv);
+}
+
--- /dev/null
+//--------------------------------------------------------------------------
+// Copyright (C) 2015-2025 Cisco and/or its affiliates. All rights reserved.
+//
+// This program is free software; you can redistribute it and/or modify it
+// under the terms of the GNU General Public License Version 2 as published
+// by the Free Software Foundation. You may not use, modify or distribute
+// this program under any other version of the GNU General Public License.
+//
+// This program is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, write to the Free Software Foundation, Inc.,
+// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+//--------------------------------------------------------------------------
+
+// unixdomain_connector_test.cc author Umang Sharma <umasharm@cisco.com>
+// unit test main
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "connectors/unixdomain_connector/unixdomain_connector.h"
+#include "connectors/unixdomain_connector/unixdomain_connector_module.h"
+#include "managers/connector_manager.h"
+
+#include <netdb.h>
+#include <poll.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+#include "main/thread_config.h"
+
+#include <CppUTest/CommandLineTestRunner.h>
+#include <CppUTest/TestHarness.h>
+
+using namespace snort;
+
+extern const BaseApi* unixdomain_connector;
+const ConnectorApi* unixdomainc_api = nullptr;
+
+static unsigned s_instance = 0;
+static unsigned char* s_rec_message = nullptr;
+static size_t s_rec_message_size = 0;
+static int s_socket_return = 1;
+static int s_bind_return = 0;
+static int s_listen_return = 0;
+static int s_accept_return = 2;
+static int s_connect_return = 1;
+static bool s_poll_error = false;
+static bool s_poll_undesirable = false;
+static bool s_poll_data_available = false;
+static int s_rec_error = 0;
+static int s_rec_error_size = -1;
+static bool s_rec_return_zero = false;
+
+static int s_send_ret_header = sizeof(UnixDomainConnectorMsgHdr);
+static int s_send_ret_other = 0;
+
+UnixDomainConnectorConfig connector_config;
+
+Module* mod;
+
+ConnectorCommon* connector_common;
+
+Connector* connector;
+
+void show_stats(PegCount*, const PegInfo*, unsigned, const char*) { }
+void show_stats(PegCount*, const PegInfo*, const std::vector<unsigned>&, const char*, FILE*) { }
+
+namespace snort
+{
+unsigned get_instance_id()
+{ return s_instance; }
+unsigned ThreadConfig::get_instance_max() { return 1; }
+
+void ErrorMessage(const char*, ...) { }
+void LogMessage(const char*, ...) { }
+void ParseError(const char*, ...) { }
+
+}
+
+int connect (int, const struct sockaddr*, socklen_t) { return s_connect_return; }
+ssize_t send (int, const void*, size_t n, int)
+{
+ if ( n == sizeof(UnixDomainConnectorMsgHdr) )
+ return s_send_ret_header;
+ else
+ return s_send_ret_other;
+}
+
+int poll (struct pollfd* fds, nfds_t nfds, int)
+{
+ if ( s_poll_error )
+ return -1;
+
+ fds[0].revents = 0;
+ if ( s_poll_undesirable )
+ {
+ fds[0].revents |= POLLHUP;
+ return 1;
+ }
+
+ if ( (nfds > 0) && s_poll_data_available )
+ {
+ fds[0].revents |= POLLIN;
+ return 1;
+ }
+ else
+ return 0;
+}
+
+ssize_t recv (int, void *buf, size_t n, int)
+{
+ if ( (s_rec_error_size == -1) ||
+ (s_rec_error_size == (int)n) )
+ {
+ if ( s_rec_return_zero )
+ return 0;
+
+ if ( (errno = s_rec_error) != 0 )
+ {
+ s_rec_error = 0;
+ return -1;
+ }
+ }
+
+ if ( (s_rec_message != nullptr) && (s_rec_message_size >= n) )
+ {
+ memcpy( buf, s_rec_message, n);
+ s_rec_message_size -= n;
+ s_rec_message += n;
+ return (ssize_t)n;
+ }
+ else
+ return 0;
+}
+
+#ifdef __GLIBC__
+int socket (int, int, int) __THROW { return s_socket_return; }
+int bind (int, const struct sockaddr*, socklen_t) __THROW { return s_bind_return; }
+int listen (int, int) __THROW { return s_listen_return; }
+#else
+int socket (int, int, int) { return s_socket_return; }
+int bind (int, const struct sockaddr*, socklen_t) { return s_bind_return; }
+int listen (int, int) { return s_listen_return; }
+#endif
+
+int accept (int, struct sockaddr*, socklen_t*) { return s_accept_return; }
+int close (int) { return 0; }
+
+static void set_normal_status()
+{
+ s_instance = 0;
+ s_rec_message = nullptr;
+ s_rec_message_size = 0;
+ s_socket_return = 1;
+ s_bind_return = 0;
+ s_listen_return = 0;
+ s_accept_return = 2;
+ s_send_ret_header = sizeof(UnixDomainConnectorMsgHdr);
+ s_send_ret_other = 0;
+ s_connect_return = 1;
+ s_send_ret_header = sizeof(UnixDomainConnectorMsgHdr);
+ s_send_ret_other = 0;
+ s_poll_error = false;
+ s_poll_undesirable = false;
+ s_poll_data_available = false;
+ s_rec_error = 0;
+ s_rec_error_size = -1;
+ s_rec_return_zero = false;
+}
+
+UnixDomainConnectorModule::UnixDomainConnectorModule() :
+ Module("UnixDomainC", "UnixDomainC Help", nullptr)
+{ }
+
+ConnectorConfig::ConfigSet UnixDomainConnectorModule::get_and_clear_config()
+{
+ return ConnectorConfig::ConfigSet();
+}
+
+ProfileStats* UnixDomainConnectorModule::get_profile() const { return nullptr; }
+
+bool UnixDomainConnectorModule::set(const char*, Value&, SnortConfig*) { return true; }
+bool UnixDomainConnectorModule::begin(const char*, int, SnortConfig*) { return true; }
+bool UnixDomainConnectorModule::end(const char*, int, SnortConfig*) { return true; }
+
+const PegInfo* UnixDomainConnectorModule::get_pegs() const { return nullptr; }
+PegCount* UnixDomainConnectorModule::get_counts() const { return nullptr; }
+
+TEST_GROUP(unixdomain_connector)
+{
+ void setup() override
+ {
+ unixdomainc_api = (const ConnectorApi*) unixdomain_connector;
+ connector_config.direction = Connector::CONN_DUPLEX;
+ connector_config.connector_name = "unixdomain";
+ connector_config.paths.push_back("/tmp/pub_sub1");
+ connector_config.setup = UnixDomainConnectorConfig::Setup::CALL;
+ connector_config.async_receive = false;
+ }
+
+ void teardown() override
+ { connector_config.paths = std::move(std::vector<std::string>()); }
+};
+
+TEST(unixdomain_connector, mod_ctor_dtor)
+{
+ CHECK(unixdomain_connector != nullptr);
+ mod = unixdomain_connector->mod_ctor();
+ CHECK(mod != nullptr);
+ unixdomain_connector->mod_dtor(mod);
+}
+
+TEST(unixdomain_connector, mod_instance_ctor_dtor)
+{
+ CHECK(unixdomain_connector != nullptr);
+ mod = unixdomain_connector->mod_ctor();
+ CHECK(mod != nullptr);
+ connector_common = unixdomainc_api->ctor(mod);
+ CHECK(connector_common != nullptr);
+ unixdomainc_api->dtor(connector_common);
+ unixdomain_connector->mod_dtor(mod);
+}
+
+TEST_GROUP(unixdomain_connector_call_error)
+{
+ void setup() override
+ {
+ unixdomainc_api = (const ConnectorApi*) unixdomain_connector;
+ set_normal_status();
+ connector_config.direction = Connector::CONN_DUPLEX;
+ connector_config.connector_name = "unixdomain";
+ connector_config.paths.push_back("/tmp/pub_sub1");
+ connector_config.setup = UnixDomainConnectorConfig::Setup::CALL;
+ connector_config.async_receive = false;
+ CHECK(unixdomain_connector != nullptr);
+ mod = unixdomain_connector->mod_ctor();
+ CHECK(mod != nullptr);
+ connector_common = unixdomainc_api->ctor(mod);
+ CHECK(connector_common != nullptr);
+ }
+
+ void teardown() override
+ {
+ connector = unixdomainc_api->tinit(connector_config);
+ CHECK(connector == nullptr);
+ unixdomainc_api->dtor(connector_common);
+ unixdomain_connector->mod_dtor(mod);
+ connector_config.paths = std::move(std::vector<std::string>());
+ }
+};
+
+TEST_GROUP(unixdomain_connector_call_other)
+{
+ void teardown()
+ { connector_config.paths = std::move(std::vector<std::string>()); }
+};
+
+TEST_GROUP(unixdomain_connector_answer_error)
+{
+ void setup() override
+ {
+ unixdomainc_api = (const ConnectorApi*) unixdomain_connector;
+ set_normal_status();
+ connector_config.direction = Connector::CONN_DUPLEX;
+ connector_config.connector_name = "unixdomain-a";
+ connector_config.paths.push_back("/tmp/pub_sub1");
+ connector_config.setup = UnixDomainConnectorConfig::Setup::ANSWER;
+ connector_config.async_receive = false;
+ CHECK(unixdomain_connector != nullptr);
+ mod = unixdomain_connector->mod_ctor();
+ CHECK(mod != nullptr);
+ connector_common = unixdomainc_api->ctor(mod);
+ CHECK(connector_common != nullptr);
+ }
+
+ void teardown() override
+ {
+ connector = unixdomainc_api->tinit(connector_config);
+ CHECK(connector == nullptr);
+ unixdomainc_api->dtor(connector_common);
+ unixdomain_connector->mod_dtor(mod);
+ connector_config.paths = std::move(std::vector<std::string>());
+ }
+};
+
+TEST(unixdomain_connector_call_error, bad_socket)
+{
+ s_socket_return = -1;
+}
+
+TEST(unixdomain_connector_call_error, bad_connect)
+{
+ s_connect_return = -1;
+}
+
+TEST(unixdomain_connector_answer_error, bad_socket)
+{
+ s_socket_return = -1;
+}
+
+TEST(unixdomain_connector_answer_error, bad_bind)
+{
+ s_bind_return = -1;
+}
+
+TEST(unixdomain_connector_answer_error, bad_listen)
+{
+ s_listen_return = -1;
+}
+
+TEST(unixdomain_connector_answer_error, bad_accept)
+{
+ s_accept_return = -1;
+}
+
+TEST(unixdomain_connector_call_other, bad_setup)
+{
+ unixdomainc_api = (const ConnectorApi*) unixdomain_connector;
+ s_instance = 0;
+ set_normal_status();
+ connector_config.direction = Connector::CONN_DUPLEX;
+ connector_config.connector_name = "unixdomain";
+ connector_config.paths.push_back("/tmp/pub_sub1");
+ connector_config.setup = (UnixDomainConnectorConfig::Setup)(-1);
+ connector_config.async_receive = false;
+ CHECK(unixdomain_connector != nullptr);
+ mod = unixdomain_connector->mod_ctor();
+ CHECK(mod != nullptr);
+ connector_common = unixdomainc_api->ctor(mod);
+ CHECK(connector_common != nullptr);
+ connector = unixdomainc_api->tinit(connector_config);
+ CHECK(connector == nullptr);
+ unixdomainc_api->dtor(connector_common);
+ unixdomain_connector->mod_dtor(mod);
+}
+
+TEST_GROUP(unixdomain_connector_tinit_tterm_thread_call)
+{
+ void setup() override
+ {
+ unixdomainc_api = (const ConnectorApi*) unixdomain_connector;
+ s_instance = 0;
+ set_normal_status();
+ connector_config.direction = Connector::CONN_DUPLEX;
+ connector_config.connector_name = "unixdomain";
+ connector_config.paths.push_back("/tmp/pub_sub1");
+ connector_config.setup = UnixDomainConnectorConfig::Setup::CALL;
+ connector_config.async_receive = true;
+ CHECK(unixdomain_connector != nullptr);
+ mod = unixdomain_connector->mod_ctor();
+ CHECK(mod != nullptr);
+ connector_common = unixdomainc_api->ctor(mod);
+ CHECK(connector_common != nullptr);
+ connector = unixdomainc_api->tinit(connector_config);
+ CHECK(connector != nullptr);
+ CHECK(connector->get_connector_direction() == Connector::CONN_DUPLEX);
+ }
+
+ void teardown() override
+ {
+ unixdomainc_api->tterm(connector);
+ unixdomainc_api->dtor(connector_common);
+ unixdomain_connector->mod_dtor(mod);
+ connector_config.paths = std::move(std::vector<std::string>());
+ }
+};
+
+TEST_GROUP(unixdomain_connector_tinit_tterm_call)
+{
+ void setup() override
+ {
+ unixdomainc_api = (const ConnectorApi*) unixdomain_connector;
+ s_instance = 0;
+ set_normal_status();
+ connector_config.direction = Connector::CONN_DUPLEX;
+ connector_config.connector_name = "unixdomain";
+ connector_config.paths.push_back("/tmp/pub_sub1");
+ connector_config.setup = UnixDomainConnectorConfig::Setup::CALL;
+ connector_config.async_receive = false;
+ CHECK(unixdomain_connector != nullptr);
+ mod = unixdomain_connector->mod_ctor();
+ CHECK(mod != nullptr);
+ connector_common = unixdomainc_api->ctor(mod);
+ CHECK(connector_common != nullptr);
+ connector = unixdomainc_api->tinit(connector_config);
+ CHECK(connector != nullptr);
+ CHECK(connector->get_connector_direction() == Connector::CONN_DUPLEX);
+ }
+
+ void teardown() override
+ {
+ unixdomainc_api->tterm(connector);
+ unixdomainc_api->dtor(connector_common);
+ unixdomain_connector->mod_dtor(mod);
+ connector_config.paths = std::move(std::vector<std::string>());
+ }
+};
+
+TEST_GROUP(unixdomain_connector_no_tinit_tterm_call)
+{
+ void setup() override
+ {
+ unixdomainc_api = (const ConnectorApi*) unixdomain_connector;
+ s_instance = 0;
+ set_normal_status();
+ connector_config.direction = Connector::CONN_DUPLEX;
+ connector_config.connector_name = "unixdomain";
+ connector_config.paths.push_back("/tmp/pub_sub1");
+ connector_config.setup = UnixDomainConnectorConfig::Setup::CALL;
+ connector_config.async_receive = false;
+ CHECK(unixdomain_connector != nullptr);
+ mod = unixdomain_connector->mod_ctor();
+ CHECK(mod != nullptr);
+ connector_common = unixdomainc_api->ctor(mod);
+ CHECK(connector_common != nullptr);
+ }
+
+ void teardown() override
+ {
+ unixdomainc_api->tterm(connector);
+ unixdomainc_api->dtor(connector_common);
+ unixdomain_connector->mod_dtor(mod);
+ connector_config.paths = std::move(std::vector<std::string>());
+ }
+};
+
+TEST(unixdomain_connector_no_tinit_tterm_call, poll_error)
+{
+ s_poll_error = true;
+ connector = unixdomainc_api->tinit(connector_config);
+ CHECK(connector != nullptr);
+ size_t size = sizeof(UnixDomainConnectorMsgHdr) + 10;
+ uint8_t* message = new uint8_t[size];
+ for (int i = sizeof(UnixDomainConnectorMsgHdr); i < (int)size; i++ )
+ message[i] = i;
+ UnixDomainConnectorMsgHdr* hdr = (UnixDomainConnectorMsgHdr*)message;
+ hdr->version = UNIXDOMAIN_FORMAT_VERSION;
+ hdr->connector_msg_length = 10;
+ s_rec_message = message;
+ s_rec_message_size = size; // also trigger the read action
+ UnixDomainConnector* unixdomainc = (UnixDomainConnector*)connector;
+
+ unixdomainc->process_receive();
+ unixdomainc->process_receive();
+ unixdomainc->process_receive();
+
+ const ConnectorMsg msg = unixdomainc->receive_message(false);
+ CHECK(msg.get_data() == nullptr);
+ CHECK(msg.get_length() == 0);
+
+ delete[] message;
+}
+
+TEST_GROUP(unixdomain_connector_tinit_tterm_answer)
+{
+ void setup() override
+ {
+ s_instance = 0;
+ set_normal_status();
+ unixdomainc_api = (const ConnectorApi*) unixdomain_connector;
+ connector_config.direction = Connector::CONN_DUPLEX;
+ connector_config.connector_name = "unixdomain-a";
+ connector_config.paths.push_back("/tmp/pub_sub2");
+ connector_config.setup = UnixDomainConnectorConfig::Setup::ANSWER;
+ connector_config.async_receive = false;
+ CHECK(unixdomain_connector != nullptr);
+ mod = unixdomain_connector->mod_ctor();
+ CHECK(mod != nullptr);
+ connector_common = unixdomainc_api->ctor(mod);
+ CHECK(connector_common != nullptr);
+ connector = unixdomainc_api->tinit(connector_config);
+ CHECK(connector->get_connector_direction() == Connector::CONN_DUPLEX);
+ CHECK(connector != nullptr);
+ }
+
+ void teardown() override
+ {
+ unixdomainc_api->tterm(connector);
+ unixdomainc_api->dtor(connector_common);
+ unixdomain_connector->mod_dtor(mod);
+ connector_config.paths = std::move(std::vector<std::string>());
+ }
+};
+
+TEST(unixdomain_connector_tinit_tterm_call, alloc_transmit)
+{
+ const uint32_t len = 40;
+ const uint8_t* data = new uint8_t[len];
+ UnixDomainConnector* unixdomainc = (UnixDomainConnector*)connector;
+ set_normal_status();
+
+ ConnectorMsg msg(data, len, true);
+
+ CHECK(msg.get_length() == len);
+ CHECK(msg.get_data() == data);
+
+ s_send_ret_other = len;
+ CHECK(unixdomainc->transmit_message(msg) == true);
+ CHECK(unixdomainc->transmit_message(std::move(msg)) == true);
+}
+
+TEST(unixdomain_connector_tinit_tterm_call, alloc_transmit_header_fail)
+{
+ const uint32_t len = 40;
+ const uint8_t* data = new uint8_t[len];
+ UnixDomainConnector* unixdomainc = (UnixDomainConnector*)connector;
+ set_normal_status();
+
+ ConnectorMsg msg(data, len, true);
+
+ CHECK(msg.get_length() == len);
+ CHECK(msg.get_data() == data);
+
+ s_send_ret_header = sizeof(UnixDomainConnectorMsgHdr)-1;
+ s_send_ret_other = len;
+ CHECK(unixdomainc->transmit_message(msg) == false);
+ CHECK(unixdomainc->transmit_message(std::move(msg)) == false);
+}
+
+TEST(unixdomain_connector_tinit_tterm_call, alloc_transmit_body_fail)
+{
+ const uint32_t len = 40;
+ const uint8_t* data = new uint8_t[len];
+ UnixDomainConnector* unixdomainc = (UnixDomainConnector*)connector;
+ set_normal_status();
+
+ ConnectorMsg msg(data, len, true);
+
+ CHECK(msg.get_length() == len);
+ CHECK(msg.get_data() == data);
+
+ s_send_ret_other = 30;
+ CHECK(unixdomainc->transmit_message(msg) == false);
+ CHECK(unixdomainc->transmit_message(std::move(msg)) == false);
+}
+
+TEST(unixdomain_connector_tinit_tterm_call, alloc_transmit_no_sock)
+{
+ const uint32_t len = 40;
+ const uint8_t* data = new uint8_t[len];
+ UnixDomainConnector* unixdomainc = (UnixDomainConnector*)connector;
+
+ ConnectorMsg msg(data, len, true);
+
+ CHECK(msg.get_length() == len);
+ CHECK(msg.get_data() == data);
+
+ unixdomainc->sock_fd = -1;
+ CHECK(unixdomainc->transmit_message(msg) == false);
+ CHECK(unixdomainc->transmit_message(std::move(msg)) == false);
+}
+
+TEST(unixdomain_connector_tinit_tterm_call, receive_no_sock)
+{
+ UnixDomainConnector* unixdomainc = (UnixDomainConnector*)connector;
+ unixdomainc->sock_fd = -1;
+ const ConnectorMsg msg = unixdomainc->receive_message(false);
+ CHECK(msg.get_data() == nullptr);
+ CHECK(msg.get_length() == 0);
+}
+
+TEST(unixdomain_connector_tinit_tterm_call, receive)
+{
+ const uint32_t cmsg_len = 10;
+ UnixDomainConnector* unixdomainc = (UnixDomainConnector*)connector;
+ size_t size = sizeof(UnixDomainConnectorMsgHdr) + cmsg_len;
+ uint8_t* message = new uint8_t[size];
+
+ for (int i = sizeof(UnixDomainConnectorMsgHdr); i < (int)size; i++ )
+ message[i] = i;
+
+ UnixDomainConnectorMsgHdr* hdr = (UnixDomainConnectorMsgHdr*)message;
+ hdr->version = UNIXDOMAIN_FORMAT_VERSION;
+ hdr->connector_msg_length = cmsg_len;
+ s_rec_message = message;
+ s_rec_message_size = size; // also trigger the read action
+ s_poll_data_available = true;
+
+ unixdomainc->process_receive();
+ unixdomainc->process_receive();
+ unixdomainc->process_receive();
+ ConnectorMsg conn_msg = unixdomainc->receive_message(false);
+
+ CHECK(conn_msg.get_length() == cmsg_len);
+ CHECK(memcmp(conn_msg.get_data(), (message+sizeof(UnixDomainConnectorMsgHdr)), cmsg_len) == 0);
+
+ delete[] message;
+
+ conn_msg = std::move(unixdomainc->receive_message(false));
+ CHECK(conn_msg.get_data() == nullptr);
+ CHECK(conn_msg.get_length() == 0);
+}
+
+TEST(unixdomain_connector_no_tinit_tterm_call, receive_wrong_version)
+{
+ const uint32_t cmsg_len = 10;
+ size_t size = sizeof(UnixDomainConnectorMsgHdr) + cmsg_len;
+ uint8_t* message = new uint8_t[size];
+
+ for (int i = sizeof(UnixDomainConnectorMsgHdr); i < (int)size; i++ )
+ message[i] = i;
+
+ UnixDomainConnectorMsgHdr* hdr = (UnixDomainConnectorMsgHdr*)message;
+ hdr->version = UNIXDOMAIN_FORMAT_VERSION+1;
+ hdr->connector_msg_length = cmsg_len;
+ s_rec_message = message;
+ s_rec_message_size = size; // also trigger the read action
+ s_poll_data_available = true;
+ connector = unixdomainc_api->tinit(connector_config);
+ CHECK(connector != nullptr);
+ UnixDomainConnector* unixdomainc = (UnixDomainConnector*)connector;
+
+ unixdomainc->process_receive();
+ unixdomainc->process_receive();
+ unixdomainc->process_receive();
+ const ConnectorMsg conn_msg = unixdomainc->receive_message(false);
+
+ CHECK(conn_msg.get_data() == nullptr);
+ CHECK(conn_msg.get_length() == 0);
+ delete[] message;
+}
+
+TEST(unixdomain_connector_no_tinit_tterm_call, receive_recv_error_EAGAIN)
+{
+ const uint32_t cmsg_len = 10;
+ size_t size = sizeof(UnixDomainConnectorMsgHdr) + cmsg_len;
+ uint8_t* message = new uint8_t[size];
+
+ for (int i = sizeof(UnixDomainConnectorMsgHdr); i < (int)size; i++ )
+ message[i] = i;
+
+ UnixDomainConnectorMsgHdr* hdr = (UnixDomainConnectorMsgHdr*)message;
+ hdr->version = UNIXDOMAIN_FORMAT_VERSION;
+ hdr->connector_msg_length = cmsg_len;
+ s_rec_message = message;
+ s_rec_message_size = size; // also trigger the read action
+ s_poll_data_available = true;
+ s_rec_error = EAGAIN;
+
+ connector = unixdomainc_api->tinit(connector_config);
+ CHECK(connector != nullptr);
+ UnixDomainConnector* unixdomainc = (UnixDomainConnector*)connector;
+
+ unixdomainc->process_receive();
+ const ConnectorMsg conn_msg = unixdomainc->receive_message(false);
+
+ CHECK(conn_msg.get_length() == cmsg_len);
+ CHECK(memcmp(conn_msg.get_data(), (message+sizeof(UnixDomainConnectorMsgHdr)), cmsg_len) == 0);
+
+ delete[] message;
+}
+
+TEST(unixdomain_connector_no_tinit_tterm_call, receive_recv_error_EBADF)
+{
+ const uint32_t cmsg_len = 10;
+ size_t size = sizeof(UnixDomainConnectorMsgHdr) + cmsg_len;
+ uint8_t* message = new uint8_t[size];
+
+ for (int i = sizeof(UnixDomainConnectorMsgHdr); i < (int)size; i++ )
+ message[i] = i;
+
+ UnixDomainConnectorMsgHdr* hdr = (UnixDomainConnectorMsgHdr*)message;
+ hdr->version = UNIXDOMAIN_FORMAT_VERSION;
+ hdr->connector_msg_length = cmsg_len;
+ s_rec_message = message;
+ s_rec_message_size = size; // also trigger the read action
+ s_poll_data_available = true;
+ s_rec_error = EBADF;
+
+ connector = unixdomainc_api->tinit(connector_config);
+ CHECK(connector != nullptr);
+ UnixDomainConnector* unixdomainc = (UnixDomainConnector*)connector;
+
+ unixdomainc->process_receive();
+ unixdomainc->process_receive();
+ unixdomainc->process_receive();
+ const ConnectorMsg conn_msg = unixdomainc->receive_message(false);
+
+ CHECK(conn_msg.get_length() == cmsg_len);
+ CHECK(memcmp(conn_msg.get_data(), (message+sizeof(UnixDomainConnectorMsgHdr)), cmsg_len) == 0);
+
+ delete[] message;
+}
+
+TEST(unixdomain_connector_no_tinit_tterm_call, receive_recv_closed)
+{
+ const uint32_t cmsg_len = 10;
+ size_t size = sizeof(UnixDomainConnectorMsgHdr) + cmsg_len;
+ uint8_t* message = new uint8_t[size];
+
+ for (int i = sizeof(UnixDomainConnectorMsgHdr); i < (int)size; i++ )
+ message[i] = i;
+
+ UnixDomainConnectorMsgHdr* hdr = (UnixDomainConnectorMsgHdr*)message;
+ hdr->version = UNIXDOMAIN_FORMAT_VERSION;
+ hdr->connector_msg_length = cmsg_len;
+ s_rec_message = message;
+ s_rec_message_size = size; // also trigger the read action
+ s_poll_data_available = true;
+ s_rec_return_zero = true;
+
+ connector = unixdomainc_api->tinit(connector_config);
+ CHECK(connector != nullptr);
+ UnixDomainConnector* unixdomainc = (UnixDomainConnector*)connector;
+
+ unixdomainc->process_receive();
+ unixdomainc->process_receive();
+ unixdomainc->process_receive();
+ const ConnectorMsg conn_msg = unixdomainc->receive_message(false);
+
+ CHECK(conn_msg.get_data() == nullptr);
+ CHECK(conn_msg.get_length() == 0);
+
+ delete[] message;
+}
+
+TEST(unixdomain_connector_no_tinit_tterm_call, receive_recv_body_closed)
+{
+ const uint32_t cmsg_len = 10;
+ size_t size = sizeof(UnixDomainConnectorMsgHdr) + cmsg_len;
+ uint8_t* message = new uint8_t[size];
+
+ for (int i = sizeof(UnixDomainConnectorMsgHdr); i < (int)size; i++ )
+ message[i] = i;
+
+ UnixDomainConnectorMsgHdr* hdr = (UnixDomainConnectorMsgHdr*)message;
+ hdr->version = UNIXDOMAIN_FORMAT_VERSION;
+ hdr->connector_msg_length = cmsg_len;
+ s_rec_error_size = cmsg_len; // only indicate the error on the cmsg_len byte recv()
+ s_rec_message = message;
+ s_rec_message_size = size; // also trigger the read action
+ s_poll_data_available = true;
+ s_rec_return_zero = true;
+
+ connector = unixdomainc_api->tinit(connector_config);
+ CHECK(connector != nullptr);
+ UnixDomainConnector* unixdomainc = (UnixDomainConnector*)connector;
+
+ unixdomainc->process_receive();
+ unixdomainc->process_receive();
+ unixdomainc->process_receive();
+ const ConnectorMsg conn_msg = unixdomainc->receive_message(false);
+
+ CHECK(conn_msg.get_data() == nullptr);
+ CHECK(conn_msg.get_length() == 0);
+
+ delete[] message;
+}
+
+int main(int argc, char** argv)
+{
+ int return_value = CommandLineTestRunner::RunAllTests(argc, argv);
+ return return_value;
+}
--- /dev/null
+//--------------------------------------------------------------------------
+// Copyright (C) 2015-2025 Cisco and/or its affiliates. All rights reserved.
+//
+// This program is free software; you can redistribute it and/or modify it
+// under the terms of the GNU General Public License Version 2 as published
+// by the Free Software Foundation. You may not use, modify or distribute
+// this program under any other version of the GNU General Public License.
+//
+// This program is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, write to the Free Software Foundation, Inc.,
+// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+//--------------------------------------------------------------------------
+
+// unixdomain_connector.cc author Umang Sharma <umasharm@cisco.com>
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+
+#include "unixdomain_connector.h"
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <unistd.h>
+#include <poll.h>
+#include <cstring>
+#include <iostream>
+#include <fcntl.h>
+#include <unordered_map>
+
+#include "log/messages.h"
+#include "profiler/profiler_defs.h"
+
+#include "unixdomain_connector_module.h"
+
+using namespace snort;
+/* Globals ****************************************************************/
+
+THREAD_LOCAL SimpleStats unixdomain_connector_stats;
+THREAD_LOCAL ProfileStats unixdomain_connector_perfstats;
+
+/* Module *****************************************************************/
+
+static bool attempt_connection(int& sfd, const char* path) {
+ sfd = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (sfd == -1) {
+ ErrorMessage("UnixDomainC: socket error: %s \n", strerror(errno));
+ return false;
+ }
+
+ // Set the socket to non-blocking mode
+ int flags = fcntl(sfd, F_GETFL, 0);
+ if (flags == -1) {
+ ErrorMessage("UnixDomainC: fcntl(F_GETFL) error: %s \n", strerror(errno));
+ close(sfd);
+ return false;
+ }
+
+ if (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) == -1) {
+ ErrorMessage("UnixDomainC: fcntl(F_SETFL) error: %s \n", strerror(errno));
+ close(sfd);
+ return false;
+ }
+
+ struct sockaddr_un addr;
+ memset(&addr, 0, sizeof(struct sockaddr_un));
+ addr.sun_family = AF_UNIX;
+ strncpy(addr.sun_path, path, sizeof(addr.sun_path) - 1);
+
+ if (connect(sfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_un)) == -1) {
+ if (errno != EINPROGRESS) {
+ ErrorMessage("UnixDomainC: connect error: %s \n", strerror(errno));
+ close(sfd);
+ return false;
+ }
+ }
+ return true;
+}
+
+// Function to handle connection retries
+static void connection_retry_handler(const UnixDomainConnectorConfig& cfg, size_t idx) {
+ ConnectorManager::update_thread_connector(cfg.connector_name, idx, nullptr);
+
+ if ( cfg.setup == UnixDomainConnectorConfig::Setup::CALL and cfg.conn_retries) {
+
+ const auto& paths = cfg.paths;
+
+ if (idx >= paths.size())
+ return;
+
+ uint32_t retry_count = 0;
+ const char* path = paths[idx].c_str();
+
+ while (retry_count < cfg.max_retries) {
+ int sfd;
+ if (attempt_connection(sfd, path)) {
+ // Connection successful
+ UnixDomainConnector* unixdomain_conn = new UnixDomainConnector(cfg, sfd, idx);
+ LogMessage("UnixDomainC: Connected to %s", path);
+ ConnectorManager::update_thread_connector(cfg.connector_name, idx, unixdomain_conn);
+ break;
+ }
+
+ std::this_thread::sleep_for(std::chrono::seconds(cfg.retry_interval));
+ retry_count++;
+ }
+ }
+}
+
+static void start_retry_thread(const UnixDomainConnectorConfig& cfg, size_t idx) {
+ std::thread retry_thread(connection_retry_handler, cfg, idx);
+ retry_thread.detach();
+}
+
+UnixDomainConnector::UnixDomainConnector(const UnixDomainConnectorConfig& unixdomain_connector_config, int sfd, size_t idx)
+ : Connector(unixdomain_connector_config), sock_fd(sfd), run_thread(false), receive_thread(nullptr),
+ receive_ring(new ReceiveRing(50)), instance_id(idx), cfg(unixdomain_connector_config) {
+ if (unixdomain_connector_config.async_receive) {
+ start_receive_thread();
+ }
+}
+
+UnixDomainConnector::~UnixDomainConnector() {
+ stop_receive_thread();
+ delete receive_ring;
+ if (fcntl(sock_fd, F_GETFD) == -1) {
+ if (errno == EBADF) {
+ LogMessage("UnixDomainC: Socket %d already closed \n", sock_fd);
+ return;
+ }
+ }
+
+ close(sock_fd);
+}
+
+enum ReadDataOutcome { SUCCESS = 0, TRUNCATED, ERROR, CLOSED, PARTIAL, AGAIN };
+
+static ReadDataOutcome read_data(int sockfd, uint8_t *data, uint16_t length, ssize_t& read_offset)
+{
+ ssize_t bytes_read, offset;
+
+ offset = read_offset;
+ bytes_read = recv(sockfd, data + offset, length - offset, 0);
+ if (bytes_read == 0)
+ {
+ if ( offset != 0 )
+ return TRUNCATED;
+ return CLOSED;
+ }
+ if ( bytes_read == -1 )
+ {
+ if (errno == EAGAIN || errno == EINTR)
+ {
+ if (offset > 0)
+ return PARTIAL;
+ return AGAIN;
+ }
+ return ERROR;
+ }
+ read_offset = offset + bytes_read;
+ if ((offset + bytes_read) < length)
+ return PARTIAL;
+
+ return SUCCESS;
+}
+
+static ReadDataOutcome read_message_data(int sockfd, uint16_t length, uint8_t *data)
+{
+ if ( length > 0 )
+ {
+ ReadDataOutcome rval;
+ do
+ {
+ ssize_t offset = 0;
+ rval = read_data(sockfd, data, length, offset);
+ } while (rval == PARTIAL || rval == AGAIN);
+
+ if (rval != SUCCESS)
+ return rval;
+ }
+
+ return SUCCESS;
+}
+
+
+ConnectorMsg* UnixDomainConnector::read_message()
+{
+ UnixDomainConnectorMsgHdr hdr;
+ ReadDataOutcome outcome;
+
+ outcome = read_message_data(sock_fd, sizeof(hdr), (uint8_t*)&hdr);
+ if (outcome != SUCCESS)
+ {
+ if (outcome == CLOSED)
+ LogMessage("UnixDomainC Input Thread: Connection closed\n");
+ else
+ ErrorMessage("UnixDomainC Input Thread: Unable to receive message header: %d\n", (int)outcome);
+ return nullptr;
+ }
+
+ if (hdr.version != UNIXDOMAIN_FORMAT_VERSION)
+ {
+ ErrorMessage("UnixDomainC Input Thread: Received header with invalid version 0x%d\n", (int)hdr.version);
+ return nullptr;
+ }
+
+ uint8_t* data = new uint8_t[hdr.connector_msg_length];
+
+ if ((outcome = read_message_data(sock_fd, hdr.connector_msg_length, data)) != SUCCESS)
+ {
+ if (outcome == CLOSED)
+ LogMessage("UnixDomainC Input Thread: Connection closed while reading message data \n");
+ else
+ ErrorMessage("UnixDomainC Input Thread: Unable to receive local message data: %d\n", (int)outcome);
+ delete[] data;
+ return nullptr;
+ }
+
+ return new ConnectorMsg(data, hdr.connector_msg_length, true);
+}
+
+void UnixDomainConnector::process_receive() {
+ struct pollfd pfds[1];
+ int rval;
+
+ pfds[0].events = POLLIN;
+ pfds[0].fd = sock_fd;
+ rval = poll(pfds, 1, 1000);
+ if (rval == -1) {
+ if (errno != EINTR) {
+ char error_msg[1024] = { '\0' };
+ if (strerror_r(errno, error_msg, sizeof(error_msg)) == 0)
+ ErrorMessage("UnixDomainC Input Thread: Error polling on socket %d: %s\n", pfds[0].fd, error_msg);
+ else
+ ErrorMessage("UnixDomainC Input Thread: Error polling on socket %d: (%d)\n", pfds[0].fd, errno);
+ }
+ return;
+ }
+ else if ((pfds[0].revents & (POLLHUP | POLLERR | POLLNVAL)) != 0)
+ {
+ ErrorMessage("UnixDomainC Input Thread: Undesirable return event while polling on socket %d: 0x%x\n",
+ pfds[0].fd, pfds[0].revents);
+
+ run_thread.store(false, std::memory_order_relaxed);
+
+ if (sock_fd != -1)
+ {
+ close(sock_fd);
+ sock_fd = -1;
+ }
+
+ start_retry_thread(cfg, instance_id);
+ return;
+ }
+ else if (rval > 0 && pfds[0].revents & POLLIN) {
+ ConnectorMsg* connector_msg = read_message();
+ if (connector_msg && !receive_ring->put(connector_msg)) {
+ ErrorMessage("UnixDomainC: Input Thread: overrun\n");
+ delete connector_msg;
+ }
+ }
+}
+
+void UnixDomainConnector::receive_processing_thread() {
+ while (run_thread.load(std::memory_order_relaxed)) {
+ process_receive();
+ }
+}
+
+void UnixDomainConnector::start_receive_thread() {
+ run_thread.store(true, std::memory_order_relaxed);
+ receive_thread = new std::thread(&UnixDomainConnector::receive_processing_thread, this);
+}
+
+void UnixDomainConnector::stop_receive_thread() {
+
+ if (receive_thread != nullptr) {
+ run_thread.store(false, std::memory_order_relaxed);
+ if (receive_thread->joinable()) {
+ receive_thread->join();
+ }
+ delete receive_thread;
+ receive_thread = nullptr;
+ }
+}
+
+bool UnixDomainConnector::internal_transmit_message(const ConnectorMsg& msg) {
+ if (!msg.get_data() || msg.get_length() == 0)
+ return false;
+
+ if (sock_fd < 0) {
+ ErrorMessage("UnixDomainC: transmitting to a closed socket\n");
+ return false;
+ }
+
+ UnixDomainConnectorMsgHdr unixdomainc_hdr(msg.get_length());
+
+ if ( send( sock_fd, (const char*)&unixdomainc_hdr, sizeof(unixdomainc_hdr), 0 ) != sizeof(unixdomainc_hdr) )
+ {
+ ErrorMessage("UnixDomainC: failed to transmit header\n");
+ return false;
+ }
+
+ if (send(sock_fd, msg.get_data(), msg.get_length(), 0) != msg.get_length())
+ return false;
+
+ return true;
+}
+
+bool UnixDomainConnector::transmit_message(const ConnectorMsg& msg, const ID&) {
+ return internal_transmit_message(msg);
+}
+
+bool UnixDomainConnector::transmit_message(const ConnectorMsg&& msg, const ID&) {
+ return internal_transmit_message(msg);
+}
+
+ConnectorMsg UnixDomainConnector::receive_message(bool) {
+ if (sock_fd < 0)
+ return ConnectorMsg();
+
+ ConnectorMsg* received_msg = receive_ring->get(nullptr);
+
+ if (!received_msg)
+ return ConnectorMsg();
+
+ ConnectorMsg ret_msg(std::move(*received_msg));
+ delete received_msg;
+
+ return ret_msg;
+}
+
+//-------------------------------------------------------------------------
+// api stuff
+//-------------------------------------------------------------------------
+
+static Module* mod_ctor() {
+ return new UnixDomainConnectorModule;
+}
+
+static void mod_dtor(Module* m) {
+ delete m;
+}
+
+static UnixDomainConnector* unixdomain_connector_tinit_call(const UnixDomainConnectorConfig& cfg, const char* path, size_t idx) {
+ int sfd;
+ if (!attempt_connection(sfd, path)) {
+ if (cfg.conn_retries) {
+ // Spawn a new thread to handle connection retries
+ start_retry_thread(cfg, idx);
+
+ return nullptr; // Return nullptr as the connection is not yet established
+ } else {
+ close(sfd);
+ return nullptr;
+ }
+ }
+ LogMessage("UnixDomainC: Connected to %s", path);
+ UnixDomainConnector* unixdomain_conn = new UnixDomainConnector(cfg, sfd, idx);
+ return unixdomain_conn;
+}
+
+static UnixDomainConnector* unixdomain_connector_tinit_answer(const UnixDomainConnectorConfig& cfg, const char* path, size_t idx) {
+ int sfd = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (sfd == -1) {
+ ErrorMessage("UnixDomainC: socket error: %s", strerror(errno));
+ return nullptr;
+ }
+
+ struct sockaddr_un addr;
+ memset(&addr, 0, sizeof(struct sockaddr_un));
+ addr.sun_family = AF_UNIX;
+ strncpy(addr.sun_path, path, sizeof(addr.sun_path) - 1);
+
+ unlink(path);
+
+ if (bind(sfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_un)) == -1) {
+ ErrorMessage("UnixDomainC: bind error: %s \n", strerror(errno));
+ close(sfd);
+ return nullptr;
+ }
+
+ if (listen(sfd, 10) == -1) {
+ ErrorMessage("UnixDomainC: listen error: %s \n", strerror(errno));
+ close(sfd);
+ return nullptr;
+ }
+
+ int peer_sfd = accept(sfd, nullptr, nullptr);
+ if (peer_sfd == -1) {
+ ErrorMessage("UnixDomainC: accept error: %s \n", strerror(errno));
+ close(sfd);
+ return nullptr;
+ }
+
+ LogMessage("UnixDomainC: Accepted connection from %s \n", path);
+ return new UnixDomainConnector(cfg, peer_sfd, idx);
+}
+
+static bool is_valid_path(const std::string& path) {
+ if (path.empty()) {
+ return false;
+ }
+
+ for (char c : path) {
+ if (!isalnum(c) && c != '_' && c != '.' && c != '/' && c != '-') {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+// Create a per-thread object
+static Connector* unixdomain_connector_tinit(const ConnectorConfig& config) {
+ const UnixDomainConnectorConfig& cfg = static_cast<const UnixDomainConnectorConfig&>(config);
+ const auto& paths = cfg.paths;
+ auto idx = 0;
+
+ if (get_instance_id() >= paths.size())
+ return nullptr;
+
+ idx = get_instance_id();
+ const char* path = paths[idx].c_str();
+
+ if (!is_valid_path(path)) {
+ ErrorMessage("UnixDomainC: Invalid path: %s", path);
+ return nullptr;
+ }
+
+ UnixDomainConnector* unix_conn;
+
+ if (cfg.setup == UnixDomainConnectorConfig::Setup::CALL)
+ unix_conn = unixdomain_connector_tinit_call(cfg, path, idx);
+ else if (cfg.setup == UnixDomainConnectorConfig::Setup::ANSWER)
+ unix_conn = unixdomain_connector_tinit_answer(cfg, path, idx);
+ else
+ unix_conn = nullptr;
+
+ return unix_conn;
+}
+
+static void unixdomain_connector_tterm(Connector* connector) {
+ UnixDomainConnector* unix_conn = (UnixDomainConnector*)connector;
+ delete unix_conn;
+}
+
+static ConnectorCommon* unixdomain_connector_ctor(Module* m) {
+ UnixDomainConnectorModule* mod = (UnixDomainConnectorModule*)m;
+ ConnectorCommon* unix_connector_common = new ConnectorCommon(mod->get_and_clear_config());
+ return unix_connector_common;
+}
+
+static void unixdomain_connector_dtor(ConnectorCommon* c) {
+ delete c;
+}
+
+const ConnectorApi unixdomain_connector_api = {
+ {
+ PT_CONNECTOR,
+ sizeof(ConnectorApi),
+ CONNECTOR_API_VERSION,
+ 2,
+ API_RESERVED,
+ API_OPTIONS,
+ UNIXDOMAIN_CONNECTOR_NAME,
+ UNIXDOMAIN_CONNECTOR_HELP,
+ mod_ctor,
+ mod_dtor
+ },
+ 0,
+ nullptr,
+ nullptr,
+ unixdomain_connector_tinit,
+ unixdomain_connector_tterm,
+ unixdomain_connector_ctor,
+ unixdomain_connector_dtor
+};
+
+#ifdef BUILDING_SO
+SO_PUBLIC const BaseApi* snort_plugins[] =
+#else
+const BaseApi* unixdomain_connector[] =
+#endif
+{
+ &unixdomain_connector_api.base,
+ nullptr
+};
--- /dev/null
+//--------------------------------------------------------------------------
+// Copyright (C) 2015-2025 Cisco and/or its affiliates. All rights reserved.
+//
+// This program is free software; you can redistribute it and/or modify it
+// under the terms of the GNU General Public License Version 2 as published
+// by the Free Software Foundation. You may not use, modify or distribute
+// this program under any other version of the GNU General Public License.
+//
+// This program is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, write to the Free Software Foundation, Inc.,
+// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+//--------------------------------------------------------------------------
+
+// unixdomain_connector.h author Umang Sharma <umasharm@cisco.com>
+
+#ifndef UNIXDOMAIN_CONNECTOR_H
+#define UNIXDOMAIN_CONNECTOR_H
+
+#include <atomic>
+#include <thread>
+
+#include "framework/connector.h"
+#include "managers/connector_manager.h"
+#include "helpers/ring.h"
+
+#include "unixdomain_connector_config.h"
+
+#define UNIXDOMAIN_FORMAT_VERSION (1)
+
+//-------------------------------------------------------------------------
+// class stuff
+//-------------------------------------------------------------------------
+
+class __attribute__((__packed__)) UnixDomainConnectorMsgHdr
+{
+public:
+ UnixDomainConnectorMsgHdr() : version(0), connector_msg_length(0)
+ { }
+ UnixDomainConnectorMsgHdr(uint32_t length)
+ { version = UNIXDOMAIN_FORMAT_VERSION; connector_msg_length = length; }
+
+ uint8_t version;
+ uint16_t connector_msg_length;
+};
+
+class UnixDomainConnector : public snort::Connector
+{
+public:
+ UnixDomainConnector(const UnixDomainConnectorConfig& config, int sfd, size_t idx);
+ ~UnixDomainConnector() override;
+
+ bool transmit_message(const snort::ConnectorMsg&, const ID& = null) override;
+ bool transmit_message(const snort::ConnectorMsg&&, const ID& = null) override;
+
+ snort::ConnectorMsg receive_message(bool) override;
+ void process_receive();
+
+ int sock_fd;
+
+private:
+ typedef Ring<snort::ConnectorMsg*> ReceiveRing;
+
+ void start_receive_thread();
+ void stop_receive_thread();
+ void receive_processing_thread();
+
+ snort::ConnectorMsg* read_message();
+ bool internal_transmit_message(const snort::ConnectorMsg& msg);
+
+ std::atomic<bool> run_thread;
+ std::thread* receive_thread;
+ ReceiveRing* receive_ring;
+ size_t instance_id;
+ UnixDomainConnectorConfig cfg;
+};
+
+#endif // UNIXDOMAIN_CONNECTOR_H
+
--- /dev/null
+// This program is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, write to the Free Software Foundation, Inc.,
+// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+//--------------------------------------------------------------------------
+
+// unixdomain_connector_config.h author Umang Sharma <umasharm@cisco.com>
+
+#ifndef UNIXDOMAIN_CONNECTOR_CONFIG_H
+#define UNIXDOMAIN_CONNECTOR_CONFIG_H
+
+#include <string>
+#include <vector>
+
+#include "framework/connector.h"
+#include "managers/plugin_manager.h"
+
+class UnixDomainConnectorConfig : public snort::ConnectorConfig
+{
+public:
+ enum Setup { CALL, ANSWER };
+
+ UnixDomainConnectorConfig()
+ { direction = snort::Connector::CONN_DUPLEX; async_receive = true; }
+
+ std::vector<std::string> paths;
+ Setup setup = {};
+ bool conn_retries = false;
+ uint32_t retry_interval = 4;
+ uint32_t max_retries = 5;
+ bool async_receive;
+};
+
+#endif
+
--- /dev/null
+//--------------------------------------------------------------------------
+// Copyright (C) 2015-2025 Cisco and/or its affiliates. All rights reserved.
+//
+// This program is free software; you can redistribute it and/or modify it
+// under the terms of the GNU General Public License Version 2 as published
+// by the Free Software Foundation. You may not use, modify or distribute
+// this program under any other version of the GNU General Public License.
+//
+// This program is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, write to the Free Software Foundation, Inc.,
+// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+//--------------------------------------------------------------------------
+
+// unixdomain_connector_module.cc author Umang Sharma <umasharm@cisco.com>
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "unixdomain_connector_module.h"
+
+#include <sstream>
+#include <string>
+
+#include "log/messages.h"
+#include "main/thread_config.h"
+
+using namespace snort;
+using namespace std;
+
+static const Parameter unixdomain_connector_params[] =
+{
+ { "connector", Parameter::PT_STRING, nullptr, nullptr,
+ "connector name" },
+
+ { "paths", Parameter::PT_STR_LIST, nullptr, nullptr,
+ "list of paths to remote end-point" },
+
+ { "conn_retries", Parameter::PT_BOOL, nullptr, "false",
+ "retries to establish connection enabled or not" },
+
+ { "setup", Parameter::PT_ENUM, "call | answer", nullptr,
+ "stream establishment" },
+
+ { "retry_interval", Parameter::PT_INT, "1:50", "4",
+ "retry interval in seconds" },
+
+ { "max_retries", Parameter::PT_INT, "1:50", "5",
+ "maximum number of retries" },
+
+ { nullptr, Parameter::PT_MAX, nullptr, nullptr, nullptr }
+};
+
+static const PegInfo unixdomain_connector_pegs[] =
+{
+ { CountType::SUM, "messages", "total messages" },
+ { CountType::END, nullptr, nullptr }
+};
+
+extern THREAD_LOCAL SimpleStats unixdomain_connector_stats;
+extern THREAD_LOCAL ProfileStats unixdomain_connector_perfstats;
+
+//-------------------------------------------------------------------------
+// unixdomain_connector module
+//-------------------------------------------------------------------------
+
+UnixDomainConnectorModule::UnixDomainConnectorModule() :
+ Module(UNIXDOMAIN_CONNECTOR_NAME, UNIXDOMAIN_CONNECTOR_HELP, unixdomain_connector_params, true)
+{ }
+
+ProfileStats* UnixDomainConnectorModule::get_profile() const
+{ return &unixdomain_connector_perfstats; }
+
+static void fill_paths(vector<string>& paths, const string& s)
+{
+ string path;
+ stringstream ss(s);
+
+ while (ss >> path)
+ paths.push_back(path);
+}
+
+bool UnixDomainConnectorModule::set(const char*, Value& v, SnortConfig*)
+{
+ if ( v.is("connector") )
+ config->connector_name = v.get_string();
+
+ else if ( v.is("paths") )
+ fill_paths(config->paths, v.get_string());
+
+ else if ( v.is("setup") )
+ {
+ switch ( v.get_uint8() )
+ {
+ case 0:
+ config->setup = UnixDomainConnectorConfig::CALL;
+ break;
+ case 1:
+ config->setup = UnixDomainConnectorConfig::ANSWER;
+ break;
+ default:
+ return false;
+ }
+ }
+
+ else if ( v.is("conn_retries") )
+ config->conn_retries = v.get_bool();
+
+ else if ( v.is("retry_interval") )
+ config->retry_interval = v.get_uint32();
+
+ else if ( v.is("max_retries") )
+ config->max_retries = v.get_uint32();
+
+ else
+ return false;
+
+ return true;
+}
+
+ConnectorConfig::ConfigSet UnixDomainConnectorModule::get_and_clear_config()
+{
+ return std::move(config_set);
+}
+
+bool UnixDomainConnectorModule::begin(const char*, int, SnortConfig*)
+{
+ if ( !config )
+ {
+ config = std::make_unique<UnixDomainConnectorConfig>();
+ config->direction = Connector::CONN_DUPLEX;
+ }
+
+ return true;
+}
+
+bool UnixDomainConnectorModule::end(const char*, int idx, SnortConfig*)
+{
+ if (idx != 0)
+ {
+ if ( config->paths.size() > 1 and config->paths.size() < ThreadConfig::get_instance_max() )
+ {
+ ParseError("The number of paths specified is insufficient to cover all threads. "
+ "Number of threads: %d.", ThreadConfig::get_instance_max());
+ return false;
+ }
+
+ config_set.emplace_back(std::move(config));
+ }
+
+ return true;
+}
+
+const PegInfo* UnixDomainConnectorModule::get_pegs() const
+{ return unixdomain_connector_pegs; }
+
+PegCount* UnixDomainConnectorModule::get_counts() const
+{ return (PegCount*)&unixdomain_connector_stats; }
+
--- /dev/null
+//--------------------------------------------------------------------------
+// Copyright (C) 2015-2025 Cisco and/or its affiliates. All rights reserved.
+//
+// This program is free software; you can redistribute it and/or modify it
+// under the terms of the GNU General Public License Version 2 as published
+// by the Free Software Foundation. You may not use, modify or distribute
+// this program under any other version of the GNU General Public License.
+//
+// This program is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, write to the Free Software Foundation, Inc.,
+// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+//--------------------------------------------------------------------------
+
+// unixdomain_connector_module.h author Umang Sharma <umasharm@cisco.com>
+
+#ifndef UNIXDOMAIN_CONNECTOR_MODULE_H
+#define UNIXDOMAIN_CONNECTOR_MODULE_H
+
+#include "framework/connector.h"
+#include "framework/module.h"
+
+#include <memory>
+
+#include "unixdomain_connector_config.h"
+
+#define UNIXDOMAIN_CONNECTOR_NAME "unixdomain_connector"
+#define UNIXDOMAIN_CONNECTOR_HELP "implement the unix domain stream connector"
+
+class UnixDomainConnectorModule : public snort::Module {
+public:
+ UnixDomainConnectorModule();
+
+ bool set(const char*, snort::Value&, snort::SnortConfig*) override;
+ bool begin(const char*, int, snort::SnortConfig*) override;
+ bool end(const char*, int, snort::SnortConfig*) override;
+
+ snort::ConnectorConfig::ConfigSet get_and_clear_config();
+
+ const PegInfo* get_pegs() const override;
+ PegCount* get_counts() const override;
+
+ snort::ProfileStats* get_profile() const override;
+
+ Usage get_usage() const override
+ { return GLOBAL; }
+
+private:
+ std::unique_ptr<UnixDomainConnectorConfig> config;
+ snort::ConnectorConfig::ConfigSet config_set;
+};
+
+#endif // UNIXDOMAIN_CONNECTOR_MODULE_H
return true;
}
+
+
+static bool valid_str_list(Value& v, const char* r)
+{
+ if ( v.get_type() != Value::VT_STR )
+ return false;
+
+ string pl = v.get_string();
+ vector<string> list;
+ split(pl, list);
+
+ for (const auto& p : list)
+ {
+ Value val(p.c_str());
+ if (!valid_string(val, r)) // valid_str
+ return false;
+ }
+ return true;
+}
+
//--------------------------------------------------------------------------
// Parameter methods
//--------------------------------------------------------------------------
case PT_IMPLIED:
return true;
+ case PT_STR_LIST:
+ return valid_str_list(v, (const char*)range);
+
default:
break;
}
"bool", "int", "interval", "real", "port",
"string", "select", "multi", "enum",
"mac", "ip4", "addr",
- "bit_list", "int_list", "addr_list", "implied"
+ "bit_list", "int_list", "addr_list", "implied",
+ "str_list"
};
const char* Parameter::get_type() const
{ false, valid_int_list, "1", "0" },
{ true, valid_int_list, "0", "0" },
+ { true, valid_str_list, "/tmp/file_path1", nullptr },
+ { true, valid_str_list, "/tmp/file_path1 /tmp/file_path2", "100" },
+ { false, valid_str_list, "green", "4" },
+ { false, valid_str_list, "/file_path3", "1" },
+
{ false, nullptr, nullptr, nullptr }
// __STRDUMP_ENABLE__
};
PT_INT_LIST, // string that contains ints
PT_ADDR_LIST, // Snort 2 ip list in [ ]
PT_IMPLIED, // rule option args w/o values eg relative
+ PT_STR_LIST, // string that contains strings
PT_MAX
};
const char* name;
return ( nullptr );
}
+void ConnectorManager::update_thread_connector(const std::string& connector_name, int instance_id, snort::Connector* connector)
+{
+ for (auto& sc : s_connector_commons)
+ {
+ auto connector_ptr = sc.connectors.find(connector_name);
+
+ if (connector_ptr != sc.connectors.end())
+ {
+ if (connector_ptr->second.thread_connectors[instance_id]) {
+ if (connector != connector_ptr->second.thread_connectors[instance_id])
+ sc.api->tterm(connector_ptr->second.thread_connectors[instance_id]);
+ }
+
+ connector_ptr->second.thread_connectors[instance_id] = connector;
+ break;
+ }
+ }
+}
+
void ConnectorManager::thread_init()
{
unsigned instance = get_instance_id();
static void instantiate(const snort::ConnectorApi*, snort::Module*, snort::SnortConfig*);
static snort::Connector::Direction is_instantiated(const std::string& name);
+ static void update_thread_connector(const std::string& connector_name, int instance_id, snort::Connector* connector);
+
static void thread_init();
static void thread_reinit();