]> git.ipfire.org Git - thirdparty/snort3.git/commitdiff
Pull request #4462: Connectors API update
authorVitalii Serhiiovych Horbatov -X (vhorbato - SOFTSERVE INC at Cisco) <vhorbato@cisco.com>
Fri, 18 Oct 2024 18:46:39 +0000 (18:46 +0000)
committerOleksii Shumeiko -X (oshumeik - SOFTSERVE INC at Cisco) <oshumeik@cisco.com>
Fri, 18 Oct 2024 18:46:39 +0000 (18:46 +0000)
Merge in SNORT/snort3 from ~VHORBATO/snort3:connector_upd to master

Squashed commit of the following:

commit 9acdf2c1e2657ff86cdd96ebcadd4af28ac30107
Author: Oleksii Shumeiko <oshumeik@cisco.com>
Date:   Tue Oct 8 16:21:02 2024 +0300

    connectors: make config object as reference

commit 0e8976d37222ad400feb5529ec336679b52d8b2f
Author: vhorbato <vhorbato@cisco.com>
Date:   Wed Oct 2 16:30:52 2024 +0300

    managers: update connector manager

    - use instance_id instead of a thread_id to access thread connectors
    - fix vector corruption in case of a thread restart

commit c8c1851b046b49f681b1a9f5cff240c18555cef0
Author: vhorbato <vhorbato@cisco.com>
Date:   Mon Oct 7 14:00:49 2024 +0300

    connectors: fix tsan warning in tcp conector

commit 6904058a46cb899949b6aee45cce0b3335f1de6b
Author: vhorbato <vhorbato@cisco.com>
Date:   Wed Oct 2 16:31:56 2024 +0300

    connectors: add connector reinitialization functionality

commit 0019e46ea3ecd3fa19129ca45437bedf8a5cb5de
Author: vhorbato <vhorbato@cisco.com>
Date:   Thu Sep 26 19:15:26 2024 +0300

    framework: update Connector interface

    * make ConnectorMsg fields const and accessible thru getters
    * make ConnectorMsg data lifetime control configurable
    * connectors: move message text formatting to SideChannel
    * connectors: change transmit_message overloads

commit ac8bd110f70287ce9c1f03cf84a4ab8f972c2e4b
Author: vhorbato <vhorbato@cisco.com>
Date:   Thu Sep 26 19:24:24 2024 +0300

    connectors: fill tcp_connector port number from user-configured list

    : fill port number from user-configured list

commit c6352512967c81d2f31c52da80a7708612079b3b
Author: vhorbato <vhorbato@cisco.com>
Date:   Thu Sep 26 19:23:19 2024 +0300

    main: move Connectors initialization from SideChannel

commit 8e6923fcf6c00a3204cf8d40f929b9bb9c0fdea1
Author: vhorbato <vhorbato@cisco.com>
Date:   Thu Sep 26 19:06:53 2024 +0300

    connectors: remove MessageHandle abstraction

34 files changed:
CMakeLists.txt
doc/user/connectors.txt
doc/user/side_channel.txt
src/connectors/file_connector/dev_notes.txt
src/connectors/file_connector/file_connector.cc
src/connectors/file_connector/file_connector.h
src/connectors/file_connector/file_connector_module.cc
src/connectors/file_connector/test/file_connector_module_test.cc
src/connectors/file_connector/test/file_connector_test.cc
src/connectors/tcp_connector/dev_notes.txt
src/connectors/tcp_connector/tcp_connector.cc
src/connectors/tcp_connector/tcp_connector.h
src/connectors/tcp_connector/tcp_connector_config.h
src/connectors/tcp_connector/tcp_connector_module.cc
src/connectors/tcp_connector/test/tcp_connector_module_test.cc
src/connectors/tcp_connector/test/tcp_connector_test.cc
src/flow/ha.cc
src/flow/test/ha_test.cc
src/framework/connector.h
src/main/analyzer.cc
src/main/test/distill_verdict_stubs.h
src/managers/connector_manager.cc
src/managers/connector_manager.h
src/side_channel/CMakeLists.txt
src/side_channel/dev_notes.txt
src/side_channel/side_channel.cc
src/side_channel/side_channel.h
src/side_channel/side_channel_format.cc [new file with mode: 0644]
src/side_channel/side_channel_format.h [new file with mode: 0644]
src/side_channel/side_channel_module.cc
src/side_channel/side_channel_module.h
src/side_channel/test/CMakeLists.txt
src/side_channel/test/side_channel_module_test.cc
src/side_channel/test/side_channel_test.cc

index e2e7f0704082db0d60b243fbd5c36549d0a1a431..65589e806a0fb4c649e6e8230c7551dc0cfbe95f 100644 (file)
@@ -2,7 +2,7 @@ cmake_minimum_required (VERSION 3.4.3)
 project (snort CXX C)
 
 set (VERSION_MAJOR 3)
-set (VERSION_MINOR 4)
+set (VERSION_MINOR 5)
 set (VERSION_PATCH 0)
 set (VERSION_SUBLEVEL 0)
 set (VERSION "${VERSION_MAJOR}.${VERSION_MINOR}.${VERSION_PATCH}.${VERSION_SUBLEVEL}")
index 8b07364cf73325db56c098a1c6c7e70d342c0caa..3be255e0d7a1ba9e5c0d22e677de95414caac2c8 100644 (file)
@@ -14,8 +14,8 @@ The TcpConnector is duplex while the FileConnector is simplex.
 
 All subtypes of Connector have a 'direction' configuration element and a
 'connector' element.  The 'connector' string is the key used to identify the
-element for sidechannel configuration.  The 'direction' element may have a
-default value, for instance TcpConnector's are 'duplex'.
+element for client module configuration.  The 'direction' element may have a
+default value, for instance TcpConnector is 'duplex'.
 
 
 There are currently two implementations of Connectors:
@@ -38,8 +38,11 @@ TcpConnector adds a few session setup configuration elements:
 
 * address = '<addr>' - used for 'call' setup to specify the partner
 
-* base_port = port - used to construct the actual port number for 'call' and
-        'answer' modes.  Actual port used is (base_port + instance_id).
+* ports = "port port ..." - used to pick a port number for 'call' and
+        'answer' modes.  If the 'ports' list contains more than one port,
+        the "per-thread" destination mode will be assumed. In this mode, each thread
+        will connect to a corresponding destination port by selecting a port number
+        from the list based on the instance_id.
 
 An example segment of TcpConnector configuration:
 
@@ -49,7 +52,7 @@ An example segment of TcpConnector configuration:
             connector = 'tcp_1',
             address = '127.0.0.1',
             setup = 'call',
-            base_port = 11000
+            ports = "11000 11001 11002 11003",
         },
     }
 
@@ -64,7 +67,7 @@ FileConnector configuration adds two additional element:
 
 * name = string - used as part of the message file name
 
-* format = 'text' or 'binary' - FileConnector supports two file types
+* text_format = bool - FileConnector works in binary mode by default, the option switches it to text mode
 
 The configured 'name' string is used to construct the actual names as in:
 
@@ -76,8 +79,6 @@ In the case of a receive FileConnector, all messages are read from the file
 prior to the start of packet processing.  This allows the messages to
 establish state information for all processed packets.
 
-Connectors are used solely by SideChannel
-
 An example segment of FileConnector configuration:
 
     file_connector =
@@ -85,13 +86,13 @@ An example segment of FileConnector configuration:
         {
             connector = 'file_tx_1',
             direction = 'transmit',
-            format = 'text',
+            text_format = true,
             name = 'HA'
         },
         {
             connector = 'file_rx_1',
             direction = 'receive',
-            format = 'text',
+            text_format = true,
             name = 'HA'
         },
     }
index 74b9f824997cc9d62f386b82cfcc0d3fe0589296..8a250d21512f9c9e05c7bd0c8b3cb571626097bd 100644 (file)
@@ -11,6 +11,8 @@ SideChannel adds functionality onto the Connector as:
 * application receive processing - handler for received messages on a
     specific port.
 
+* message formatting - convert data to text format if configured to do so
+
 SideChannel's are always implement a duplex (bidirectional) messaging model
 and can map to separate transmit and receive Connectors.
 
@@ -42,7 +44,8 @@ configuration:
                     connector = 'file_tx_1',
                 }
             },
-        },  
+            format = "text"
+        },
     }
 
     file_connector =
@@ -50,13 +53,13 @@ configuration:
         {
             connector = 'file_tx_1',
             direction = 'transmit',
-            format = 'text',
+            text_format = true,
             name = 'HA'
         },
         {
             connector = 'file_rx_1',
             direction = 'receive',
-            format = 'text',
+            text_format = true,
             name = 'HA'
         },
     }
index dce585a484ae5ef06e5c48c60a1feb29b8e788cd..cefa8ff7b645e773b8a860cb9f928ff4252f2c1f 100644 (file)
@@ -1,17 +1,17 @@
-Implement a connector plugin that reads/writes side channel messages from/to
-files.
+Implement a connector plugin that reads/writes messages from/to files.
 
 Each connector implements a simplex channel, either transmit or receive.  In-
-turn, each SideChannel owns a transmit and/or a receive connector object.
+turn, each client module owns a connector object.
 
-The "format = 'text'" option in the configuration sets text file input/output
-else binary file format is used.
+In binary files, an additional FileConnector message header is prepended to
+each message written to the output file.  This header specifies
+the file format version and the length of the message.  This length
+does not include the file connector message header, but does include the
+user's message header.
 
-In binary files, an additional FileConnector message header is pre-pended to
-each side channel message written to the output file.  This header specifies
-the file format version and the length of the side channel message.  This length
-does not include the file connector message header, but does include the side
-channel message header.
+The "text_format" option in the configuration sets text file input/output
+mode. In text mode, no header is added, but a newline symbol is appended
+to each message instead.
 
 The utility 'get_instance_file()' is used to uniquely name the files.  The
 complete file name convention is:
index 944e2485aa00c5f375f26af56ad0a9e17935425d..7d448fc2a572453b23efd2d83187b50c3021483a 100644 (file)
@@ -36,17 +36,6 @@ using namespace snort;
 THREAD_LOCAL SimpleStats file_connector_stats;
 THREAD_LOCAL ProfileStats file_connector_perfstats;
 
-FileConnectorMsgHandle::FileConnectorMsgHandle(const uint32_t length)
-{
-    connector_msg.length = length;
-    connector_msg.data = new uint8_t[length];
-}
-
-FileConnectorMsgHandle::~FileConnectorMsgHandle()
-{
-    delete[] connector_msg.data;
-}
-
 FileConnectorCommon::FileConnectorCommon(FileConnectorConfig::FileConnectorConfigSet* conf)
 {
     config_set = (ConnectorConfig::ConfigSet*)conf;
@@ -64,161 +53,85 @@ FileConnectorCommon::~FileConnectorCommon()
     delete config_set;
 }
 
-FileConnector::FileConnector(FileConnectorConfig* file_connector_config)
+bool FileConnector::internal_transmit_message(const ConnectorMsg& msg)
 {
-    config = file_connector_config;
-}
-
-ConnectorMsgHandle* FileConnector::alloc_message(const uint32_t length, const uint8_t** data)
-{
-    FileConnectorMsgHandle* msg = new FileConnectorMsgHandle(length);
+    if ( !msg.get_data() or msg.get_length() == 0 )
+        return false;
 
-    *data = (uint8_t*)msg->connector_msg.data;
-
-    return msg;
-}
-
-void FileConnector::discard_message(ConnectorMsgHandle* msg)
-{
-    FileConnectorMsgHandle* fmsg = (FileConnectorMsgHandle*)msg;
-    delete fmsg;
-}
-
-bool FileConnector::transmit_message(ConnectorMsgHandle* msg)
-{
-    FileConnectorMsgHandle* fmsg = (FileConnectorMsgHandle*)msg;
-    const FileConnectorConfig* cfg = (const FileConnectorConfig*)config;
-
-    if ( cfg->text_format )
+    if ( cfg.text_format )
     {
-        unsigned char* message = (unsigned char*)(fmsg->connector_msg.data + sizeof(SCMsgHdr));
-        SCMsgHdr* hdr = (SCMsgHdr*)(fmsg->connector_msg.data);
-
-        file << hdr->port << ":" << hdr->time_seconds << "." << hdr->time_u_seconds;
-
-        for ( int i = 0; i<(int)(fmsg->connector_msg.length - sizeof(SCMsgHdr)); i++ )
-        {
-            char hex_string[4];
-            snprintf(hex_string, 4, ",%02X", *message++);
-            file << hex_string;
-        }
-
+        file.write((const char*)msg.get_data(), msg.get_length());
         file << "\n";
     }
     else
     {
-        FileConnectorMsgHdr fc_hdr(fmsg->connector_msg.length);
+        FileConnectorMsgHdr fc_hdr(msg.get_length());
 
-        file.write( (const char*)&fc_hdr, sizeof(fc_hdr) );
-        file.write( (const char*)fmsg->connector_msg.data, fmsg->connector_msg.length);
+        file.write((const char*)&fc_hdr, sizeof(fc_hdr) );
+        file.write((const char*)msg.get_data(), msg.get_length());
     }
 
-    delete fmsg;
-
-    return true;
+    return file.good();
 }
 
-ConnectorMsgHandle* FileConnector::receive_message_binary()
-{
-    uint8_t* buffer = new uint8_t[MAXIMUM_SC_MESSAGE_CONTENT+sizeof(SCMsgHdr)+
-        sizeof(FileConnectorMsgHdr)];
-    // The FileConnectorMsgHdr is at the beginning of the buffer
-    FileConnectorMsgHdr* fc_hdr = (FileConnectorMsgHdr*)buffer;
+bool FileConnector::transmit_message(const ConnectorMsg& msg)
+{ return internal_transmit_message(msg); }
 
-    // Read the FileConnect and SC headers
-    file.read((char*)buffer, (sizeof(FileConnectorMsgHdr)+sizeof(SCMsgHdr)));
+bool FileConnector::transmit_message(const ConnectorMsg&& msg)
+{ return internal_transmit_message(msg); }
 
-    // If not present, then no message exists
-    if ( (unsigned)file.gcount() < (sizeof(FileConnectorMsgHdr)+sizeof(SCMsgHdr)) )
-    {
-        delete[] buffer;
-        return nullptr;
-    }
+ConnectorMsg FileConnector::receive_message_binary()
+{
+    uint8_t* fc_hdr_buf = new uint8_t[sizeof(FileConnectorMsgHdr)];
 
-    // Now read the SC message content
-    file.read((char*)(buffer+sizeof(FileConnectorMsgHdr)+sizeof(SCMsgHdr)),
-        (fc_hdr->connector_msg_length - sizeof(SCMsgHdr)));
+    FileConnectorMsgHdr* fc_hdr = (FileConnectorMsgHdr*)fc_hdr_buf;
+    file.read((char*)fc_hdr_buf, sizeof(FileConnectorMsgHdr));
 
-    // If not present, then no valid message exists
-    if ( (unsigned)file.gcount() < (fc_hdr->connector_msg_length - sizeof(SCMsgHdr)) )
+    if ( (unsigned)file.gcount() < sizeof(FileConnectorMsgHdr) or
+        fc_hdr->connector_msg_length == 0 or fc_hdr->version != FILE_FORMAT_VERSION )
     {
-        delete[] buffer;
-        return nullptr;
+        delete[] fc_hdr_buf;
+        return ConnectorMsg();
     }
 
-    // The message is valid, make a ConnectorMsg to contain it.
-    FileConnectorMsgHandle* handle = new FileConnectorMsgHandle(fc_hdr->connector_msg_length);
-
-    // Copy the connector message into the new ConnectorMsg
-    memcpy(handle->connector_msg.data, (buffer+sizeof(FileConnectorMsgHdr)),
-        fc_hdr->connector_msg_length);
-    delete[] buffer;
+    uint8_t* data = new uint8_t[fc_hdr->connector_msg_length];
+    file.read((char*)data, fc_hdr->connector_msg_length);
 
-    return handle;
-}
-
-ConnectorMsgHandle* FileConnector::receive_message_text()
-{
-    char line_buffer[4*MAXIMUM_SC_MESSAGE_CONTENT];
-    char message[MAXIMUM_SC_MESSAGE_CONTENT];
-    char* current = line_buffer;
-    uint64_t time_seconds;
-    uint32_t time_u_seconds;
-    uint16_t port;
-    int length = 0;
-
-    // Read the record
-    file.getline(line_buffer, sizeof(line_buffer));
-
-    // If not present, then no message exists
-    //   (1 for type, 1 for colon, 1 for time, 1 for null)
-    if ( (unsigned)file.gcount() < 4 )
+    if ( (unsigned)file.gcount() < fc_hdr->connector_msg_length )
     {
-        return nullptr;
+        delete[] fc_hdr_buf;
+        delete[] data;
+        return ConnectorMsg();
     }
 
-    // FIXIT-L Add sanity/retval checking for sscanfs below
-    sscanf(line_buffer, "%hu:%" SCNu64 ".%" SCNu32, &port, &time_seconds, &time_u_seconds);
+    ConnectorMsg msg(data, fc_hdr->connector_msg_length, true);
+    delete[] fc_hdr_buf;
 
-    while ( (current = strchr(current,(int)',')) != nullptr )
-    {
-        current += 1;   // step to the character after the comma
-        sscanf(current,"%hhx",(unsigned char*)&(message[length++]));
-    }
-
-    // The message is valid, make a ConnectorMsg to contain it.
-    FileConnectorMsgHandle* handle = new FileConnectorMsgHandle(length+sizeof(SCMsgHdr));
-
-    // Populate the new message header
-    SCMsgHdr* hdr = (SCMsgHdr*) handle->connector_msg.data;
-    hdr->port = port;
-    hdr->sequence = 0;
-    hdr->time_seconds = time_seconds;
-    hdr->time_u_seconds = time_u_seconds;
-    // Copy the connector message into the new ConnectorMsg
-    memcpy((handle->connector_msg.data+sizeof(SCMsgHdr)), message, length);
-
-    return handle;
+    return msg;
 }
 
 // Reading messages from files can never block.  Either a message exists
 //  or it does not.
-ConnectorMsgHandle* FileConnector::receive_message(bool)
+ConnectorMsg FileConnector::receive_message(bool)
 {
     if ( !file.is_open() )
-        return nullptr;
-    else
+        return ConnectorMsg();
+
+    if ( cfg.text_format )
     {
-        const FileConnectorConfig* cfg = (const FileConnectorConfig*)config;
-        if ( cfg->text_format )
-        {
-            return( receive_message_text() );
-        }
-        else
-        {
-            return( receive_message_binary() );
-        }
+        std::string line;
+        std::getline(file, line, file.widen('\n'));
+
+        if ( line.empty() )
+            return ConnectorMsg();
+
+        uint8_t* data = new uint8_t[line.size()];
+        memcpy(data, line.c_str(), line.size());
+
+        return ConnectorMsg(data, line.size(), true);
     }
+
+    return receive_message_binary();
 }
 
 //-------------------------------------------------------------------------
@@ -235,8 +148,7 @@ static void mod_dtor(Module* m)
     delete m;
 }
 
-static Connector* file_connector_tinit_transmit(std::string filename,
-    FileConnectorConfig* cfg)
+static Connector* file_connector_tinit_transmit(std::string& filename, const FileConnectorConfig& cfg)
 {
     FileConnector* file_conn = new FileConnector(cfg);
     std::string pathname;
@@ -244,13 +156,12 @@ static Connector* file_connector_tinit_transmit(std::string filename,
     filename += "_transmit";
     (void)get_instance_file(pathname, filename.c_str());
     file_conn->file.open(pathname,
-        (std::ios::out | (cfg->text_format ? (std::ios::openmode)0 : std::ios::binary)) );
+        (std::ios::out | (cfg.text_format ? (std::ios::openmode)0 : std::ios::binary)) );
 
     return file_conn;
 }
 
-static Connector* file_connector_tinit_receive(std::string filename,
-    FileConnectorConfig* cfg)
+static Connector* file_connector_tinit_receive(std::string& filename, const FileConnectorConfig& cfg)
 {
     FileConnector* file_conn = new FileConnector(cfg);
     std::string pathname;
@@ -263,19 +174,19 @@ static Connector* file_connector_tinit_receive(std::string filename,
 }
 
 // Create a per-thread object
-static Connector* file_connector_tinit(ConnectorConfig* config)
+static Connector* file_connector_tinit(const ConnectorConfig& config)
 {
-    FileConnectorConfig* cfg = (FileConnectorConfig*)config;
+    const FileConnectorConfig& fconf = (const FileConnectorConfig&)config;
 
     std::string filename = FILE_CONNECTOR_NAME;
     filename += "_";
-    filename += cfg->name;
+    filename += fconf.name;
 
-    if ( cfg->direction == Connector::CONN_TRANSMIT )
-        return file_connector_tinit_transmit(filename,cfg);
+    if ( fconf.direction == Connector::CONN_TRANSMIT )
+        return file_connector_tinit_transmit(filename, fconf);
 
-    else if ( cfg->direction == Connector::CONN_RECEIVE )
-        return file_connector_tinit_receive(filename,cfg);
+    else if ( fconf.direction == Connector::CONN_RECEIVE )
+        return file_connector_tinit_receive(filename, fconf);
 
     return nullptr;
 }
@@ -309,7 +220,7 @@ const ConnectorApi file_connector_api =
         PT_CONNECTOR,
         sizeof(ConnectorApi),
         CONNECTOR_API_VERSION,
-        0,
+        1,
         API_RESERVED,
         API_OPTIONS,
         FILE_CONNECTOR_NAME,
index 0a0df25f5333d28e4fedb896578443dc40b47af6..f6cae75e1abb55957a717f6b2f71a2dcb4e43fe1 100644 (file)
@@ -43,14 +43,6 @@ public:
     uint32_t connector_msg_length;
 };
 
-class FileConnectorMsgHandle : public snort::ConnectorMsgHandle
-{
-public:
-    FileConnectorMsgHandle(const uint32_t length);
-    ~FileConnectorMsgHandle();
-    snort::ConnectorMsg connector_msg;
-};
-
 class FileConnectorCommon : public snort::ConnectorCommon
 {
 public:
@@ -61,23 +53,23 @@ public:
 class FileConnector : public snort::Connector
 {
 public:
-    FileConnector(FileConnectorConfig*);
-    snort::ConnectorMsgHandle* alloc_message(const uint32_t, const uint8_t**) override;
-    void discard_message(snort::ConnectorMsgHandle*) override;
-    bool transmit_message(snort::ConnectorMsgHandle*) override;
-    snort::ConnectorMsgHandle* receive_message(bool) override;
+    FileConnector(const FileConnectorConfig& conf) : Connector(conf), cfg(conf) {}
 
-    snort::ConnectorMsg* get_connector_msg(snort::ConnectorMsgHandle* handle) override
-    { return( &((FileConnectorMsgHandle*)handle)->connector_msg ); }
-    Direction get_connector_direction() override
-    { return( ((const FileConnectorConfig*)config)->direction ); }
+    bool transmit_message(const snort::ConnectorMsg&) override;
+    bool transmit_message(const snort::ConnectorMsg&&) override;
+
+    snort::ConnectorMsg receive_message(bool) override;
+
+    bool flush() override
+    { file.flush(); return file.good(); }
 
     std::fstream file;
 
 private:
-    snort::ConnectorMsgHandle* receive_message_binary();
-    snort::ConnectorMsgHandle* receive_message_text();
+    bool internal_transmit_message(const snort::ConnectorMsg&);
+    snort::ConnectorMsg receive_message_binary();
+
+    const FileConnectorConfig& cfg;
 };
 
 #endif
-
index 0154743e68993b43d1829a0b631c95af704a6389..490cfdb8b1d73c86e5dff19bec0384c886ce3c9a 100644 (file)
@@ -34,8 +34,8 @@ static const Parameter file_connector_params[] =
     { "name", Parameter::PT_STRING, nullptr, nullptr,
       "channel name" },
 
-    { "format", Parameter::PT_ENUM, "binary | text", nullptr,
-      "file format" },
+    { "text_format", Parameter::PT_BOOL, nullptr, "false",
+      "skip header and add newline at the end of the message" },
 
     { "direction", Parameter::PT_ENUM, "receive | transmit | duplex", nullptr,
       "usage" },
@@ -79,8 +79,8 @@ bool FileConnectorModule::set(const char*, Value& v, SnortConfig*)
     else if ( v.is("name") )
         config->name = v.get_string();
 
-    else if ( v.is("format") )
-        config->text_format = ( v.get_uint8() == 1 );
+    else if ( v.is("text_format") )
+        config->text_format = v.get_bool();
 
     else if ( v.is("direction") )
     {
index 7bcf432dcf7e960a17c2bf503594fb91b436d804..1c26908c6c151308bd4701c5aaf0a3f20f253ad3 100644 (file)
@@ -56,12 +56,12 @@ TEST(file_connector_module, test)
 {
     Value connector_val("rx");
     Value name_val("rx");
-    Value format_val("binary");
+    Value format_val(true);
     Value direction_val("receive");
     Parameter direction_param =
         {"direction", Parameter::PT_ENUM, "receive | transmit | duplex", nullptr, "direction"};
     Parameter format_param =
-        {"format", Parameter::PT_ENUM, "binary | text", nullptr, "format"};
+        {"text_format", Parameter::PT_BOOL, nullptr, "false", "format"};
     Parameter connector_param =
         {"connector", Parameter::PT_STRING, nullptr, nullptr, "connector"};
     Parameter name_param =
@@ -79,7 +79,7 @@ TEST(file_connector_module, test)
     module.set("file_connector.name", name_val, nullptr);
     module.set("file_connector.direction", direction_val, nullptr);
     module.set("file_connector.connector", connector_val, nullptr);
-    module.set("file_connector.format", format_val, nullptr);
+    module.set("file_connector.text_format", format_val, nullptr);
     module.end("file_connector", 1, nullptr);
     module.end("file_connector", 0, nullptr);
 
@@ -93,7 +93,7 @@ TEST(file_connector_module, test)
     CHECK("rx" == config.name);
     CHECK("rx" == config.connector_name);
     CHECK(Connector::CONN_RECEIVE == config.direction);
-    CHECK(false == config.text_format);
+    CHECK(true == config.text_format);
 
     for ( auto conf : *config_set )
         delete conf;
index f950401f40a9e93d03b7fee6ec849273f714195e..aa320b4bc7decf5695d2f2ecfafce6635aadc0c7 100644 (file)
@@ -149,10 +149,10 @@ TEST_GROUP(file_connector_tinit_tterm)
         CHECK(mod != nullptr);
         connector_common = fc_api->ctor(mod);
         CHECK(connector_common != nullptr);
-        connector_tt = fc_api->tinit(&connector_tx_text_config);
-        connector_rt = fc_api->tinit(&connector_rx_text_config);
-        connector_tb = fc_api->tinit(&connector_tx_binary_config);
-        connector_rb = fc_api->tinit(&connector_rx_binary_config);
+        connector_tt = fc_api->tinit(connector_tx_text_config);
+        connector_rt = fc_api->tinit(connector_rx_text_config);
+        connector_tb = fc_api->tinit(connector_tx_binary_config);
+        connector_rb = fc_api->tinit(connector_rx_binary_config);
         CHECK(connector_tt != nullptr);
         CHECK(connector_rt != nullptr);
         CHECK(connector_tb != nullptr);
@@ -170,18 +170,6 @@ TEST_GROUP(file_connector_tinit_tterm)
     }
 };
 
-TEST(file_connector_tinit_tterm, alloc_discard)
-{
-    const uint8_t* data = nullptr;
-    FileConnector* fc_rt = (FileConnector*)connector_rt;
-
-    FileConnectorMsgHandle* handle = (FileConnectorMsgHandle*)(fc_rt->alloc_message(40,&data));
-    CHECK(data != nullptr);
-    CHECK(handle->connector_msg.length == 40);
-    CHECK(handle->connector_msg.data == data);
-    fc_rt->discard_message(handle);
-}
-
 TEST_GROUP(file_connector_text)
 {
     void setup() override
@@ -199,46 +187,61 @@ TEST_GROUP(file_connector_text)
     }
 };
 
-TEST(file_connector_text, alloc_transmit_rename_receive_discard)
+TEST(file_connector_text, transmit_rename_receive)
 {
-    mod = file_connector->mod_ctor();
-    CHECK(mod != nullptr);
-    connector_common = fc_api->ctor(mod);
-    CHECK(connector_common != nullptr);
-    connector_tt = fc_api->tinit(&connector_tx_text_config);
-    CHECK(connector_tt != nullptr);
-    FileConnector* fc_tt = (FileConnector*)connector_tt;
-
-    const uint8_t* data = nullptr;
-    FileConnectorMsgHandle* t_handle = (FileConnectorMsgHandle*)(fc_tt->alloc_message(40,&data));
-    CHECK(data != nullptr);
-    CHECK(t_handle->connector_msg.length == 40);
-    CHECK(t_handle->connector_msg.data == data);
-    CHECK(fc_tt->transmit_message(t_handle) == true);
-
-    fc_api->tterm(connector_tt);
-    fc_api->dtor(connector_common);
-    file_connector->mod_dtor(mod);
+    uint32_t len = sizeof("foobar") - 1;
+
+    {
+        mod = file_connector->mod_ctor();
+        CHECK(mod != nullptr);
+        connector_common = fc_api->ctor(mod);
+        CHECK(connector_common != nullptr);
+        connector_tt = fc_api->tinit(connector_tx_text_config);
+        CHECK(connector_tt != nullptr);
+        FileConnector* fc_tt = (FileConnector*)connector_tt;
+
+        uint8_t* data = new uint8_t[len];
+        memcpy(data, "foobar", len);
+        ConnectorMsg t_msg(data, len, true);
+
+        CHECK(fc_tt->transmit_message(t_msg));
+
+        fc_api->tterm(connector_tt);
+        fc_api->dtor(connector_common);
+        file_connector->mod_dtor(mod);
+    }
+
+    {
+        std::ifstream transmit_res_file("file_connector_tx_t_transmit");
+        CHECK(transmit_res_file.is_open());
+
+        std::stringstream buffer;
+        buffer << transmit_res_file.rdbuf();
+
+        CHECK(buffer.str().size() == len + 1);
+        CHECK(buffer.str() == "foobar\n");
+    }
 
     std::rename("file_connector_tx_t_transmit", "file_connector_rx_t_receive");
 
-    mod = file_connector->mod_ctor();
-    CHECK(mod != nullptr);
-    connector_common = fc_api->ctor(mod);
-    CHECK(connector_common != nullptr);
-    connector_rt = fc_api->tinit(&connector_rx_text_config);
-    CHECK(connector_rt != nullptr);
-    FileConnector* fc_rt = (FileConnector*)connector_rt;
+    {
+        mod = file_connector->mod_ctor();
+        CHECK(mod != nullptr);
+        connector_common = fc_api->ctor(mod);
+        CHECK(connector_common != nullptr);
+        connector_rt = fc_api->tinit(connector_rx_text_config);
+        CHECK(connector_rt != nullptr);
+        FileConnector* fc_rt = (FileConnector*)connector_rt;
 
-    FileConnectorMsgHandle* r_handle = (FileConnectorMsgHandle*)(fc_rt->receive_message(true));
-    CHECK(r_handle != nullptr);
-    CHECK(r_handle->connector_msg.length == 40);
+        ConnectorMsg t_msg = fc_rt->receive_message(true);
 
-    fc_rt->discard_message(r_handle);
+        CHECK(t_msg.get_length() == len);
+        CHECK(strncmp((const char*)t_msg.get_data(), "foobar", len) == 0);
 
-    fc_api->tterm(connector_rt);
-    fc_api->dtor(connector_common);
-    file_connector->mod_dtor(mod);
+        fc_api->tterm(connector_rt);
+        fc_api->dtor(connector_common);
+        file_connector->mod_dtor(mod);
+    }
 }
 
 TEST_GROUP(file_connector_binary)
@@ -258,57 +261,59 @@ TEST_GROUP(file_connector_binary)
     }
 };
 
-TEST(file_connector_binary, alloc_transmit_rename_receive_discard)
+TEST(file_connector_binary, transmit_rename_receive)
 {
-    mod = file_connector->mod_ctor();
-    CHECK(mod != nullptr);
-    connector_common = fc_api->ctor(mod);
-    CHECK(connector_common != nullptr);
-    connector_tb = fc_api->tinit(&connector_tx_binary_config);
-    CHECK(connector_tb != nullptr);
-    FileConnector* fc_tb = (FileConnector*)connector_tb;
-
-    const uint8_t* data = nullptr;
-    FileConnectorMsgHandle* t_handle = (FileConnectorMsgHandle*)(fc_tb->alloc_message(40,&data));
-    CHECK(data != nullptr);
-    CHECK(t_handle->connector_msg.length == 40);
-    CHECK(t_handle->connector_msg.data == data);
-    CHECK(fc_tb->transmit_message(t_handle) == true);
-
-    fc_api->tterm(connector_tb);
-    fc_api->dtor(connector_common);
-    file_connector->mod_dtor(mod);
+    const uint32_t len = sizeof("foobar") - 1;
+    {
+        mod = file_connector->mod_ctor();
+        CHECK(mod != nullptr);
+        connector_common = fc_api->ctor(mod);
+        CHECK(connector_common != nullptr);
+        connector_tb = fc_api->tinit(connector_tx_binary_config);
+        CHECK(connector_tb != nullptr);
+        FileConnector* fc_tb = (FileConnector*)connector_tb;
 
-    std::rename("file_connector_tx_b_transmit", "file_connector_rx_b_receive");
+        uint8_t* data = new uint8_t[len];
+        memcpy(data, "foobar", len);
+        ConnectorMsg t_msg(data, len, true);
 
-    mod = file_connector->mod_ctor();
-    CHECK(mod != nullptr);
-    connector_common = fc_api->ctor(mod);
-    CHECK(connector_common != nullptr);
-    connector_rb = fc_api->tinit(&connector_rx_binary_config);
-    CHECK(connector_rb != nullptr);
-    FileConnector* fc_rb = (FileConnector*)connector_rb;
+        CHECK(fc_tb->transmit_message(std::move(t_msg)));
 
-    FileConnectorMsgHandle* r_handle = (FileConnectorMsgHandle*)(fc_rb->receive_message(true));
-    CHECK(r_handle != nullptr);
-    CHECK(r_handle->connector_msg.length == 40);
+        fc_api->tterm(connector_tb);
+        fc_api->dtor(connector_common);
+        file_connector->mod_dtor(mod);
+    }
 
-    fc_rb->discard_message(r_handle);
+    {
+        std::ifstream transmit_res_file("file_connector_tx_b_transmit");
+        CHECK(transmit_res_file.is_open());
 
-    fc_api->tterm(connector_rb);
-    fc_api->dtor(connector_common);
-    file_connector->mod_dtor(mod);
-}
+        std::stringstream transmit_res;
+        transmit_res << transmit_res_file.rdbuf();
 
-TEST_GROUP(file_connector_msg_handle)
-{
-};
+        CHECK(transmit_res.str().size() == len + sizeof(FileConnectorMsgHdr));
+        CHECK(transmit_res.str().substr(sizeof(FileConnectorMsgHdr)) == ("foobar"));
+    }
 
-TEST(file_connector_msg_handle, test)
-{
-    FileConnectorMsgHandle handle(12);
-    CHECK(handle.connector_msg.length == 12);
-    CHECK(handle.connector_msg.data != nullptr);
+    std::rename("file_connector_tx_b_transmit", "file_connector_rx_b_receive");
+
+    {
+        mod = file_connector->mod_ctor();
+        CHECK(mod != nullptr);
+        connector_common = fc_api->ctor(mod);
+        CHECK(connector_common != nullptr);
+        connector_rb = fc_api->tinit(connector_rx_binary_config);
+        CHECK(connector_rb != nullptr);
+        FileConnector* fc_rt = (FileConnector*)connector_rb;
+
+        ConnectorMsg t_msg = fc_rt->receive_message(true);
+        CHECK(t_msg.get_length() == len);
+        CHECK(strncmp((const char*)t_msg.get_data(), "foobar", len) == 0);
+
+        fc_api->tterm(connector_rb);
+        fc_api->dtor(connector_common);
+        file_connector->mod_dtor(mod);
+    }
 }
 
 int main(int argc, char** argv)
index 10e2d1870202ec04635ed68e7f4afb6bb7d8bc47..cb0d37433ae85d9e2f69e258d576da2525bde121 100644 (file)
@@ -1,14 +1,13 @@
-Implement a connector plugin that reads and writes side channel messages across
-a TCP stream channel.
+Implements a connector plugin that reads and writes messages across a TCP stream channel.
 
 Each connector implements a duplex channel, both transmit and receive. When used
 by a side_channel object, a single TcpConnector object is used for both the
 transmit and receive connectors.
 
-An additional TcpConnector message header is pre-pended to each side channel
-message transmitted.  This header specifies the protocol format version and the
-length of the side channel message.  This length does not include the tcp
-connector message header, but does include the side channel message header.
+An additional TcpConnector message header is pre-pended to each message transmitted.
+This header specifies the protocol format version and the length of the message.
+This length does not include the tcp connector message header, but does include the
+user's message header.
 
 The tcp_connector Connector configuration results in ONE ConnectorCommon
 object which is used to contain a list of all Connectors being configured.
@@ -16,16 +15,17 @@ A vector<> in the ConnectorCommon object holds individual Connector config
 objects.  The ConnectorManager then uses this vector<> to instantiate the
 set of desired Connectors.
 
-TCP connector configuration includes a partner address, base port numbers, and
-connection setup direction.  The actual port number used is the base port 
-number added to the thread instance value.
+TCP connector configuration includes a partner address, port numbers and a
+connection setup direction. If the 'ports' list contains more than one port,
+the "per-thread" destination mode will be assumed. In this mode, each thread
+will connect to a corresponding destination port by selecting a port number
+from the list based on the instance_id.
 
 A TCP connector can be either the active partner and initiate the TCP connection
 or can be the passive partner and expect to be called by the active side.  This
 is controlled by the 'setup' configuration element.
 
 Receive messages are managed via separate thread and ring buffer queue structure.
-The thread's purpose is to read whole side channel messages from the stream and
-insert them into the queue.  Then the packet processing thread is able to read
-whole side messages from the queue.
+The thread's purpose is to read messages from the stream and insert them into the queue.
+Then the packet processing thread is able to read a whole message from the queue.
 
index aaaaf192909a21297eae138079832b45c8d6d4b9..f1f7a908971267e678af3ce78008c37db074d15d 100644 (file)
@@ -41,17 +41,6 @@ using namespace snort;
 THREAD_LOCAL SimpleStats tcp_connector_stats;
 THREAD_LOCAL ProfileStats tcp_connector_perfstats;
 
-TcpConnectorMsgHandle::TcpConnectorMsgHandle(const uint32_t length)
-{
-    connector_msg.length = length;
-    connector_msg.data = new uint8_t[length];
-}
-
-TcpConnectorMsgHandle::~TcpConnectorMsgHandle()
-{
-    delete[] connector_msg.data;
-}
-
 TcpConnectorCommon::TcpConnectorCommon(TcpConnectorConfig::TcpConnectorConfigSet* conf)
 {
     config_set = (ConnectorConfig::ConfigSet*)conf;
@@ -116,7 +105,7 @@ static ReadDataOutcome read_message_data(int sockfd, uint16_t length, uint8_t *d
 }
 
 
-static TcpConnectorMsgHandle* read_message(int sock_fd)
+ConnectorMsg* TcpConnector::read_message()
 {
     TcpConnectorMsgHdr hdr;
     ReadDataOutcome outcome;
@@ -137,19 +126,19 @@ static TcpConnectorMsgHandle* read_message(int sock_fd)
         return nullptr;
     }
 
-    TcpConnectorMsgHandle* handle = new TcpConnectorMsgHandle(hdr.connector_msg_length);
+    uint8_t* data = new uint8_t[hdr.connector_msg_length];
 
-    if ((outcome = read_message_data(sock_fd, hdr.connector_msg_length, handle->connector_msg.data)) != SUCCESS)
+    if ((outcome = read_message_data(sock_fd, hdr.connector_msg_length, data)) != SUCCESS)
     {
         if (outcome == CLOSED)
             LogMessage("TcpC Input Thread: Connection closed while reading message data");
         else
             ErrorMessage("TcpC Input Thread: Unable to receive local message data: %d\n", (int)outcome);
-        delete handle;
+        delete[] data;
         return nullptr;
     }
 
-    return handle;
+    return new ConnectorMsg(data, hdr.connector_msg_length, true);
 }
 
 void TcpConnector::process_receive()
@@ -181,18 +170,18 @@ void TcpConnector::process_receive()
     }
     else if (rval > 0 && pfds[0].revents & POLLIN)
     {
-        TcpConnectorMsgHandle* handle = read_message(sock_fd);
-        if (handle && !receive_ring->put(handle))
+        ConnectorMsg* connector_msg = read_message();
+        if (connector_msg && !receive_ring->put(connector_msg))
         {
             ErrorMessage("TcpC Input Thread: overrun\n");
-            delete handle;
+            delete connector_msg;
         }
     }
 }
 
 void TcpConnector::receive_processing_thread()
 {
-    while (run_thread)
+    while (run_thread.load(std::memory_order_relaxed))
     {
         process_receive();
     }
@@ -200,7 +189,7 @@ void TcpConnector::receive_processing_thread()
 
 void TcpConnector::start_receive_thread()
 {
-    run_thread = true;
+    run_thread.store(true, std::memory_order_relaxed);
     receive_thread = new std::thread(&TcpConnector::receive_processing_thread, this);
 }
 
@@ -208,19 +197,17 @@ void TcpConnector::stop_receive_thread()
 {
     if ( receive_thread != nullptr )
     {
-        run_thread = false;
+        run_thread.store(false, std::memory_order_relaxed);
         receive_thread->join();
         delete receive_thread;
     }
 }
 
-TcpConnector::TcpConnector(TcpConnectorConfig* tcp_connector_config, int sfd)
+TcpConnector::TcpConnector(const TcpConnectorConfig& tcp_connector_config, int sfd) :
+    Connector(tcp_connector_config), sock_fd(sfd), run_thread(false), receive_thread(nullptr),
+    receive_ring(new ReceiveRing(50))
 {
-    receive_thread = nullptr;
-    config = tcp_connector_config;
-    receive_ring = new ReceiveRing(50);
-    sock_fd = sfd;
-    if ( tcp_connector_config->async_receive )
+    if ( tcp_connector_config.async_receive )
         start_receive_thread();
 }
 
@@ -231,60 +218,52 @@ TcpConnector::~TcpConnector()
     close(sock_fd);
 }
 
-ConnectorMsgHandle* TcpConnector::alloc_message(const uint32_t length, const uint8_t** data)
-{
-    TcpConnectorMsgHandle* msg = new TcpConnectorMsgHandle(length);
-
-    *data = (uint8_t*)msg->connector_msg.data;
-
-    return msg;
-}
-
-void TcpConnector::discard_message(ConnectorMsgHandle* msg)
-{
-    TcpConnectorMsgHandle* tmsg = (TcpConnectorMsgHandle*)msg;
-    delete tmsg;
-}
-
-bool TcpConnector::transmit_message(ConnectorMsgHandle* msg)
+bool TcpConnector::internal_transmit_message(const ConnectorMsg& msg)
 {
-    TcpConnectorMsgHandle* tmsg = (TcpConnectorMsgHandle*)msg;
+    if ( !msg.get_data() or msg.get_length() == 0 )
+        return false;
 
     if ( sock_fd < 0 )
     {
         ErrorMessage("TcpConnector: transmitting to a closed socket\n");
-        delete tmsg;
         return false;
     }
 
-    TcpConnectorMsgHdr tcpc_hdr(tmsg->connector_msg.length);
+    TcpConnectorMsgHdr tcpc_hdr(msg.get_length());
 
     if ( send( sock_fd, (const char*)&tcpc_hdr, sizeof(tcpc_hdr), 0 ) != sizeof(tcpc_hdr) )
     {
         ErrorMessage("TcpConnector: failed to transmit header\n");
-        delete tmsg;
         return false;
     }
 
-    if ( send( sock_fd, (const char*)tmsg->connector_msg.data, tmsg->connector_msg.length, 0 ) !=
-        tmsg->connector_msg.length )
-    {
-        delete tmsg;
+    if ( send( sock_fd, (const char*)msg.get_data(), msg.get_length(), 0 ) != msg.get_length() )
         return false;
-    }
-
-    delete tmsg;
 
     return true;
 }
 
-ConnectorMsgHandle* TcpConnector::receive_message(bool)
+bool TcpConnector::transmit_message(const ConnectorMsg& msg)
+{ return internal_transmit_message(msg); }
+
+bool TcpConnector::transmit_message(const ConnectorMsg&& msg)
+{ return internal_transmit_message(msg); }
+
+ConnectorMsg TcpConnector::receive_message(bool)
 {
     // If socket isn't open, return 'no message'
     if ( sock_fd < 0 )
-        return nullptr;
+        return ConnectorMsg();
 
-    return receive_ring->get(nullptr);
+    ConnectorMsg* received_msg = receive_ring->get(nullptr);
+
+    if ( !received_msg )
+        return ConnectorMsg();
+
+    ConnectorMsg ret_msg(std::move(*received_msg));
+    delete received_msg;
+
+    return ret_msg;
 }
 
 //-------------------------------------------------------------------------
@@ -301,7 +280,7 @@ static void mod_dtor(Module* m)
     delete m;
 }
 
-static TcpConnector* tcp_connector_tinit_call(TcpConnectorConfig* cfg, const char* port)
+static TcpConnector* tcp_connector_tinit_call(const TcpConnectorConfig& cfg, const char* port)
 {
     struct addrinfo hints;
     struct addrinfo *result, *rp;
@@ -313,7 +292,7 @@ static TcpConnector* tcp_connector_tinit_call(TcpConnectorConfig* cfg, const cha
     hints.ai_flags = 0;
     hints.ai_protocol = 0;          /* Any protocol */
 
-    if ( (s = getaddrinfo(cfg->address.c_str(), port, &hints, &result)) != 0)
+    if ( (s = getaddrinfo(cfg.address.c_str(), port, &hints, &result)) != 0)
     {
         ErrorMessage("getaddrinfo: %s\n", gai_strerror(s));
         return nullptr;
@@ -348,7 +327,7 @@ static TcpConnector* tcp_connector_tinit_call(TcpConnectorConfig* cfg, const cha
     return tcp_conn;
 }
 
-static TcpConnector* tcp_connector_tinit_answer(TcpConnectorConfig* cfg, const char* port)
+static TcpConnector* tcp_connector_tinit_answer(const TcpConnectorConfig& cfg, const char* port)
 {
     struct addrinfo hints;
     struct addrinfo *result, *rp;
@@ -419,27 +398,23 @@ static TcpConnector* tcp_connector_tinit_answer(TcpConnectorConfig* cfg, const c
 }
 
 // Create a per-thread object
-static Connector* tcp_connector_tinit(ConnectorConfig* config)
+static Connector* tcp_connector_tinit(const ConnectorConfig& config)
 {
-    TcpConnectorConfig* cfg = (TcpConnectorConfig*)config;
-
-    uint16_t instance = (uint16_t)get_instance_id();
-    char port_string[6];  // size based on decimal representation of an uint16_t
+    const TcpConnectorConfig& cfg = (const TcpConnectorConfig&)config;
+    const auto& ports = cfg.ports;
+    auto idx = 0;
 
-    if ( ((uint32_t)cfg->base_port + (uint32_t)instance) > (uint32_t)UINT16_MAX )
-    {
-        ErrorMessage("tcp_connector with improper base_port: %d\n",cfg->base_port);
-        return nullptr;
-    }
+    if ( ports.size() > 1 )
+        idx = get_instance_id() % ports.size();
 
-    snprintf(port_string, sizeof(port_string), "%5hu", static_cast<uint16_t>(cfg->base_port + instance));
+    const char* port = ports[idx].c_str();
 
     TcpConnector* tcp_conn;
 
-    if ( cfg->setup == TcpConnectorConfig::Setup::CALL )
-        tcp_conn = tcp_connector_tinit_call(cfg, port_string);
-    else if ( cfg->setup == TcpConnectorConfig::Setup::ANSWER )
-        tcp_conn = tcp_connector_tinit_answer(cfg, port_string);
+    if ( cfg.setup == TcpConnectorConfig::Setup::CALL )
+        tcp_conn = tcp_connector_tinit_call(cfg, port);
+    else if ( cfg.setup == TcpConnectorConfig::Setup::ANSWER )
+        tcp_conn = tcp_connector_tinit_answer(cfg, port);
     else
         tcp_conn = nullptr;
 
@@ -474,7 +449,7 @@ const ConnectorApi tcp_connector_api =
         PT_CONNECTOR,
         sizeof(ConnectorApi),
         CONNECTOR_API_VERSION,
-        0,
+        1,
         API_RESERVED,
         API_OPTIONS,
         TCP_CONNECTOR_NAME,
index a1b0739ca4b29e47dee47db12889fd495012b159..36051ce0bf0d0d73908721016a5c05c934424f23 100644 (file)
@@ -21,6 +21,7 @@
 #ifndef TCP_CONNECTOR_H
 #define TCP_CONNECTOR_H
 
+#include <atomic>
 #include <thread>
 
 #include "framework/connector.h"
@@ -46,14 +47,6 @@ public:
     uint16_t connector_msg_length;
 };
 
-class TcpConnectorMsgHandle : public snort::ConnectorMsgHandle
-{
-public:
-    TcpConnectorMsgHandle(const uint32_t length);
-    ~TcpConnectorMsgHandle();
-    snort::ConnectorMsg connector_msg;
-};
-
 class TcpConnectorCommon : public snort::ConnectorCommon
 {
 public:
@@ -64,29 +57,29 @@ public:
 class TcpConnector : public snort::Connector
 {
 public:
-    typedef Ring<TcpConnectorMsgHandle*> ReceiveRing;
-
-    TcpConnector(TcpConnectorConfig*, int sock_fd);
+    TcpConnector(const TcpConnectorConfig&, int sock_fd);
     ~TcpConnector() override;
-    snort::ConnectorMsgHandle* alloc_message(const uint32_t, const uint8_t**) override;
-    void discard_message(snort::ConnectorMsgHandle*) override;
-    bool transmit_message(snort::ConnectorMsgHandle*) override;
-    snort::ConnectorMsgHandle* receive_message(bool) override;
-
-    snort::ConnectorMsg* get_connector_msg(snort::ConnectorMsgHandle* handle) override
-    { return( &((TcpConnectorMsgHandle*)handle)->connector_msg ); }
-    Direction get_connector_direction() override
-    { return Connector::CONN_DUPLEX; }
+
+    bool transmit_message(const snort::ConnectorMsg&) override;
+    bool transmit_message(const snort::ConnectorMsg&&) override;
+
+    snort::ConnectorMsg receive_message(bool) override;
+
     void process_receive();
 
     int sock_fd;
 
 private:
-    bool run_thread = false;
-    std::thread* receive_thread;
+    typedef Ring<snort::ConnectorMsg*> ReceiveRing;
+
     void start_receive_thread();
     void stop_receive_thread();
     void receive_processing_thread();
+    snort::ConnectorMsg* read_message();
+    bool internal_transmit_message(const snort::ConnectorMsg&);
+
+    std::atomic<bool> run_thread;
+    std::thread* receive_thread;
     ReceiveRing* receive_ring;
 };
 
index d6684ef1b63d06fb02a3180a7c16eea26d05fe6c..59381ee48409c0e246cd2dc95574223ef4149539 100644 (file)
@@ -21,6 +21,7 @@
 #ifndef TCP_CONNECTOR_CONFIG_H
 #define TCP_CONNECTOR_CONFIG_H
 
+#include <string>
 #include <vector>
 
 #include "framework/connector.h"
@@ -29,10 +30,11 @@ class TcpConnectorConfig : public snort::ConnectorConfig
 {
 public:
     enum Setup { CALL, ANSWER };
+
     TcpConnectorConfig()
     { direction = snort::Connector::CONN_DUPLEX; async_receive = true; }
 
-    uint16_t base_port = 0;
+    std::vector<std::string> ports;
     std::string address;
     Setup setup = {};
     bool async_receive;
index 284682f6f0b0bd30ae9fb91fbcb55f5d5f9e9b5a..3f34db8e3314e73f3e60d121c7824120a1393a76 100644 (file)
 
 #include "tcp_connector_module.h"
 
+#include <sstream>
+#include <string>
+
+#include "log/messages.h"
+#include "main/thread_config.h"
+
 using namespace snort;
+using namespace std;
 
 static const Parameter tcp_connector_params[] =
 {
     { "connector", Parameter::PT_STRING, nullptr, nullptr,
       "connector name" },
 
-    { "address", Parameter::PT_STRING, nullptr, nullptr,
-      "address" },
+    { "address", Parameter::PT_ADDR, nullptr, nullptr,
+      "address of the remote end-point" },
 
-    { "base_port", Parameter::PT_PORT, nullptr, nullptr,
-      "base port number" },
+    { "ports", Parameter::PT_INT_LIST, "65535", nullptr,
+      "list of ports of the remote end-point" },
 
     { "setup", Parameter::PT_ENUM, "call | answer", nullptr,
       "stream establishment" },
@@ -64,15 +71,22 @@ TcpConnectorModule::TcpConnectorModule() :
 
 TcpConnectorModule::~TcpConnectorModule()
 {
-    if ( config )
-        delete config;
-    if ( config_set )
-        delete config_set;
+    delete config;
+    delete config_set;
 }
 
 ProfileStats* TcpConnectorModule::get_profile() const
 { return &tcp_connector_perfstats; }
 
+static void fill_ports(vector<string>& ports, const string& s)
+{
+    string port;
+    stringstream ss(s);
+
+    while ( ss >> port )
+        ports.push_back(port);
+}
+
 bool TcpConnectorModule::set(const char*, Value& v, SnortConfig*)
 {
     if ( v.is("connector") )
@@ -81,8 +95,8 @@ bool TcpConnectorModule::set(const char*, Value& v, SnortConfig*)
     else if ( v.is("address") )
         config->address = v.get_string();
 
-    else if ( v.is("base_port") )
-        config->base_port = v.get_uint16();
+    else if ( v.is("ports") )
+        fill_ports(config->ports, v.get_string());
 
     else if ( v.is("setup") )
     {
@@ -98,6 +112,7 @@ bool TcpConnectorModule::set(const char*, Value& v, SnortConfig*)
             return false;
         }
     }
+
     return true;
 }
 
@@ -115,7 +130,9 @@ bool TcpConnectorModule::begin(const char*, int, SnortConfig*)
     if ( !config )
     {
         config = new TcpConnectorConfig;
+        config->direction = Connector::CONN_DUPLEX;
     }
+
     return true;
 }
 
@@ -123,6 +140,13 @@ bool TcpConnectorModule::end(const char*, int idx, SnortConfig*)
 {
     if (idx != 0)
     {
+        if ( config->ports.size() > 1 and config->ports.size() < ThreadConfig::get_instance_max() )
+        {
+            ParseError("The number of ports specified is insufficient to cover all threads. "
+                "Number of threads: %d.", ThreadConfig::get_instance_max());
+            return false;
+        }
+
         config_set->emplace_back(config);
         config = nullptr;
     }
index 16342d9529b7fdd772104af9f4c6dfa3eafcbc9e..50a0a7e39ef3263daf33bff980f44e7c84b3a714 100644 (file)
@@ -39,6 +39,7 @@ THREAD_LOCAL ProfileStats tcp_connector_perfstats;
 void show_stats(PegCount*, const PegInfo*, unsigned, const char*) { }
 void show_stats(PegCount*, const PegInfo*, const std::vector<unsigned>&, const char*, FILE*) { }
 
+unsigned instance_max = 1;
 namespace snort
 {
 char* snort_strdup(const char* s)
@@ -46,7 +47,11 @@ char* snort_strdup(const char* s)
 
 unsigned get_instance_id()
 { return 0; }
-unsigned ThreadConfig::get_instance_max() { return 1; }
+
+unsigned ThreadConfig::get_instance_max()
+{ return instance_max; }
+
+void ParseError(const char*, ...) { }
 }
 
 TEST_GROUP(tcp_connector_module)
@@ -57,32 +62,38 @@ TEST(tcp_connector_module, test_call)
 {
     Value connector_val("tcp-c");
     Value address_val("127.0.0.1");
-    Value base_port_val((double)10000);
+    Value base_port_val("10000 20000");
     Value setup_val("call");
     Parameter address_param =
         {"address", Parameter::PT_STRING, nullptr, nullptr, "address"};
     Parameter connector_param =
         {"connector", Parameter::PT_STRING, nullptr, nullptr, "connector"};
     Parameter base_port_param =
-        {"base_port", Parameter::PT_PORT, nullptr, nullptr, "base_port"};
+        {"ports", Parameter::PT_INT_LIST, "65535", nullptr, "list of ports"};
     Parameter setup_param =
         {"setup", Parameter::PT_ENUM, "call | answer", nullptr, "establishment"};
 
     TcpConnectorModule module;
 
     address_val.set(&address_param);
+    CHECK(true == address_param.validate(address_val));
     base_port_val.set(&base_port_param);
+    CHECK(true == base_port_param.validate(base_port_val));
     setup_val.set(&setup_param);
+    CHECK(true == setup_param.validate(setup_val));
     connector_val.set(&connector_param);
+    CHECK(true == connector_param.validate(connector_val));
+
+    instance_max = 2;
 
-    module.begin("tcp_connector", 0, nullptr);
-    module.begin("tcp_connector", 1, nullptr);
-    module.set("tcp_connector.base_port", base_port_val, nullptr);
-    module.set("tcp_connector.address", address_val, nullptr);
-    module.set("tcp_connector.connector", connector_val, nullptr);
-    module.set("tcp_connector.setup", setup_val, nullptr);
-    module.end("tcp_connector", 1, nullptr);
-    module.end("tcp_connector", 0, nullptr);
+    CHECK(module.begin("tcp_connector", 0, nullptr));
+    CHECK(module.begin("tcp_connector", 1, nullptr));
+    CHECK(module.set("tcp_connector.ports", base_port_val, nullptr));
+    CHECK(module.set("tcp_connector.address", address_val, nullptr));
+    CHECK(module.set("tcp_connector.connector", connector_val, nullptr));
+    CHECK(module.set("tcp_connector.setup", setup_val, nullptr));
+    CHECK(module.end("tcp_connector", 1, nullptr));
+    CHECK(module.end("tcp_connector", 0, nullptr));
 
     TcpConnectorConfig::TcpConnectorConfigSet* config_set = module.get_and_clear_config();
 
@@ -91,7 +102,8 @@ TEST(tcp_connector_module, test_call)
     CHECK(1 == config_set->size());
 
     TcpConnectorConfig config = *(config_set->front());
-    CHECK(10000 == config.base_port);
+    CHECK(10000 == std::stoi(config.ports[0]));
+    CHECK(20000 == std::stoi(config.ports[1]));
     CHECK("127.0.0.1" == config.address);
     CHECK(TcpConnectorConfig::Setup::CALL == config.setup);
     CHECK("tcp-c" == config.connector_name);
@@ -106,36 +118,37 @@ TEST(tcp_connector_module, test_call)
 
     config_set->clear();
     delete config_set;
+    instance_max = 1;
 }
 
-TEST(tcp_connector_module, test_answer)
+TEST(tcp_connector_module, test_ports_count_failure)
 {
     Value connector_val("tcp-a");
-    Value base_port_val((double)20000);
+    Value base_port_val("20000");
     Value setup_val("answer");
     Parameter connector_param =
         {"connector", Parameter::PT_STRING, nullptr, nullptr, "connector"};
     Parameter base_port_param =
-        {"base_port", Parameter::PT_PORT, nullptr, nullptr, "base_port"};
+        {"ports", Parameter::PT_INT_LIST, "65535", nullptr, "list of ports"};
     Parameter setup_param =
         {"setup", Parameter::PT_ENUM, "call | answer", nullptr, "establishment"};
 
     TcpConnectorModule module;
 
     base_port_val.set(&base_port_param);
-    CHECK( true == base_port_param.validate(base_port_val) );
+    CHECK(true == base_port_param.validate(base_port_val));
     setup_val.set(&setup_param);
-    CHECK( true == setup_param.validate(setup_val) );
+    CHECK(true == setup_param.validate(setup_val));
     connector_val.set(&connector_param);
-    CHECK( true == connector_param.validate(connector_val) );
+    CHECK(true == connector_param.validate(connector_val));
 
-    module.begin("tcp_connector", 0, nullptr);
-    module.begin("tcp_connector", 1, nullptr);
-    module.set("tcp_connector.base_port", base_port_val, nullptr);
-    module.set("tcp_connector.connector", connector_val, nullptr);
-    module.set("tcp_connector.setup", setup_val, nullptr);
-    module.end("tcp_connector", 1, nullptr);
-    module.end("tcp_connector", 0, nullptr);
+    CHECK(module.begin("tcp_connector", 0, nullptr));
+    CHECK(module.begin("tcp_connector", 1, nullptr));
+    CHECK(module.set("tcp_connector.ports", base_port_val, nullptr));
+    CHECK(module.set("tcp_connector.connector", connector_val, nullptr));
+    CHECK(module.set("tcp_connector.setup", setup_val, nullptr));
+    CHECK(module.end("tcp_connector", 1, nullptr));
+    CHECK(module.end("tcp_connector", 0, nullptr));
 
     TcpConnectorConfig::TcpConnectorConfigSet* config_set = module.get_and_clear_config();
 
@@ -144,8 +157,8 @@ TEST(tcp_connector_module, test_answer)
     CHECK(1 == config_set->size());
 
     TcpConnectorConfig config = *(config_set->front());
-    CHECK(20000 == config.base_port);
-//    CHECK(config.setup == TcpConnectorConfig::Setup::ANSWER);
+    CHECK(20000 == stoi(config.ports[0]));
+    CHECK(config.setup == TcpConnectorConfig::Setup::ANSWER);
     CHECK("tcp-a" == config.connector_name);
     CHECK(Connector::CONN_DUPLEX == config.direction);
 
@@ -156,6 +169,37 @@ TEST(tcp_connector_module, test_answer)
     delete config_set;
 }
 
+TEST(tcp_connector_module, test_answer)
+{
+    Value connector_val("tcp-a");
+    Value base_port_val("20000 30000");
+    Value setup_val("answer");
+    Parameter connector_param =
+        {"connector", Parameter::PT_STRING, nullptr, nullptr, "connector"};
+    Parameter base_port_param =
+        {"ports", Parameter::PT_INT_LIST, "65535", nullptr, "list of ports"};
+    Parameter setup_param =
+        {"setup", Parameter::PT_ENUM, "call | answer", nullptr, "establishment"};
+
+    TcpConnectorModule module;
+
+    base_port_val.set(&base_port_param);
+    CHECK(true == base_port_param.validate(base_port_val));
+    setup_val.set(&setup_param);
+    CHECK(true == setup_param.validate(setup_val));
+    connector_val.set(&connector_param);
+    CHECK(true == connector_param.validate(connector_val));
+
+    instance_max = 3;
+    CHECK(module.begin("tcp_connector", 0, nullptr));
+    CHECK(module.begin("tcp_connector", 1, nullptr));
+    CHECK(module.set("tcp_connector.ports", base_port_val, nullptr));
+    CHECK(module.set("tcp_connector.connector", connector_val, nullptr));
+    CHECK(module.set("tcp_connector.setup", setup_val, nullptr));
+    CHECK(module.end("tcp_connector", 1, nullptr) == false);
+    CHECK(module.end("tcp_connector", 0, nullptr));
+}
+
 int main(int argc, char** argv)
 {
     return CommandLineTestRunner::RunAllTests(argc, argv);
index 76297a268fd5f3462365d85d6b62d4c465069350..62b8213202d50049fe888175838122ca1e5bf970 100644 (file)
@@ -199,10 +199,13 @@ TEST_GROUP(tcp_connector)
         connector_config.direction = Connector::CONN_DUPLEX;
         connector_config.connector_name = "tcp";
         connector_config.address = "127.0.0.1";
-        connector_config.base_port = 10000;
+        connector_config.ports.push_back("10000");
         connector_config.setup = TcpConnectorConfig::Setup::CALL;
         connector_config.async_receive = false;
     }
+
+    void teardown() override
+    { connector_config.ports = std::move(std::vector<std::string>()); }
 };
 
 TEST(tcp_connector, mod_ctor_dtor)
@@ -233,7 +236,7 @@ TEST_GROUP(tcp_connector_call_error)
         connector_config.direction = Connector::CONN_DUPLEX;
         connector_config.connector_name = "tcp";
         connector_config.address = "127.0.0.1";
-        connector_config.base_port = 10000;
+        connector_config.ports.push_back("10000");
         connector_config.setup = TcpConnectorConfig::Setup::CALL;
         connector_config.async_receive = false;
         CHECK(tcp_connector != nullptr);
@@ -245,15 +248,18 @@ TEST_GROUP(tcp_connector_call_error)
 
     void teardown() override
     {
-        connector = tcpc_api->tinit(&connector_config);
+        connector = tcpc_api->tinit(connector_config);
         CHECK(connector == nullptr);
         tcpc_api->dtor(connector_common);
         tcp_connector->mod_dtor(mod);
+        connector_config.ports = std::move(std::vector<std::string>());
     }
 };
 
 TEST_GROUP(tcp_connector_call_other)
 {
+    void teardown()
+    { connector_config.ports = std::move(std::vector<std::string>()); }
 };
 
 TEST_GROUP(tcp_connector_answer_error)
@@ -264,7 +270,7 @@ TEST_GROUP(tcp_connector_answer_error)
         set_normal_status();
         connector_config.direction = Connector::CONN_DUPLEX;
         connector_config.connector_name = "tcp-a";
-        connector_config.base_port = 20000;
+        connector_config.ports.push_back("20000");
         connector_config.setup = TcpConnectorConfig::Setup::ANSWER;
         connector_config.async_receive = false;
         CHECK(tcp_connector != nullptr);
@@ -276,19 +282,14 @@ TEST_GROUP(tcp_connector_answer_error)
 
     void teardown() override
     {
-        connector = tcpc_api->tinit(&connector_config);
+        connector = tcpc_api->tinit(connector_config);
         CHECK(connector == nullptr);
-        tcpc_api->tterm(connector);
         tcpc_api->dtor(connector_common);
         tcp_connector->mod_dtor(mod);
+        connector_config.ports = std::move(std::vector<std::string>());
     }
 };
 
-TEST(tcp_connector_call_error, bad_port)
-{
-    s_instance = 65000;
-}
-
 TEST(tcp_connector_call_error, bad_socket)
 {
     s_socket_return = -1;
@@ -327,7 +328,7 @@ TEST(tcp_connector_call_other, bad_setup)
     connector_config.direction = Connector::CONN_DUPLEX;
     connector_config.connector_name = "tcp";
     connector_config.address = "127.0.0.1";
-    connector_config.base_port = 10000;
+    connector_config.ports.push_back("10000");
     connector_config.setup = (TcpConnectorConfig::Setup)(-1);
     connector_config.async_receive = false;
     CHECK(tcp_connector != nullptr);
@@ -335,7 +336,7 @@ TEST(tcp_connector_call_other, bad_setup)
     CHECK(mod != nullptr);
     connector_common = tcpc_api->ctor(mod);
     CHECK(connector_common != nullptr);
-    connector = tcpc_api->tinit(&connector_config);
+    connector = tcpc_api->tinit(connector_config);
     CHECK(connector == nullptr);
     tcpc_api->dtor(connector_common);
     tcp_connector->mod_dtor(mod);
@@ -351,7 +352,7 @@ TEST_GROUP(tcp_connector_tinit_tterm_thread_call)
         connector_config.direction = Connector::CONN_DUPLEX;
         connector_config.connector_name = "tcp";
         connector_config.address = "127.0.0.1";
-        connector_config.base_port = 10000;
+        connector_config.ports.push_back("10000");
         connector_config.setup = TcpConnectorConfig::Setup::CALL;
         connector_config.async_receive = true;
         CHECK(tcp_connector != nullptr);
@@ -359,7 +360,7 @@ TEST_GROUP(tcp_connector_tinit_tterm_thread_call)
         CHECK(mod != nullptr);
         connector_common = tcpc_api->ctor(mod);
         CHECK(connector_common != nullptr);
-        connector = tcpc_api->tinit(&connector_config);
+        connector = tcpc_api->tinit(connector_config);
         CHECK(connector != nullptr);
         CHECK(connector->get_connector_direction() == Connector::CONN_DUPLEX);
     }
@@ -369,6 +370,7 @@ TEST_GROUP(tcp_connector_tinit_tterm_thread_call)
         tcpc_api->tterm(connector);
         tcpc_api->dtor(connector_common);
         tcp_connector->mod_dtor(mod);
+        connector_config.ports = std::move(std::vector<std::string>());
     }
 };
 
@@ -382,7 +384,7 @@ TEST_GROUP(tcp_connector_tinit_tterm_call)
         connector_config.direction = Connector::CONN_DUPLEX;
         connector_config.connector_name = "tcp";
         connector_config.address = "127.0.0.1";
-        connector_config.base_port = 10000;
+        connector_config.ports.push_back("10000");
         connector_config.setup = TcpConnectorConfig::Setup::CALL;
         connector_config.async_receive = false;
         CHECK(tcp_connector != nullptr);
@@ -390,7 +392,7 @@ TEST_GROUP(tcp_connector_tinit_tterm_call)
         CHECK(mod != nullptr);
         connector_common = tcpc_api->ctor(mod);
         CHECK(connector_common != nullptr);
-        connector = tcpc_api->tinit(&connector_config);
+        connector = tcpc_api->tinit(connector_config);
         CHECK(connector != nullptr);
         CHECK(connector->get_connector_direction() == Connector::CONN_DUPLEX);
     }
@@ -400,6 +402,7 @@ TEST_GROUP(tcp_connector_tinit_tterm_call)
         tcpc_api->tterm(connector);
         tcpc_api->dtor(connector_common);
         tcp_connector->mod_dtor(mod);
+        connector_config.ports = std::move(std::vector<std::string>());
     }
 };
 
@@ -413,7 +416,7 @@ TEST_GROUP(tcp_connector_no_tinit_tterm_call)
         connector_config.direction = Connector::CONN_DUPLEX;
         connector_config.connector_name = "tcp";
         connector_config.address = "127.0.0.1";
-        connector_config.base_port = 10000;
+        connector_config.ports.push_back("10000");
         connector_config.setup = TcpConnectorConfig::Setup::CALL;
         connector_config.async_receive = false;
         CHECK(tcp_connector != nullptr);
@@ -428,13 +431,14 @@ TEST_GROUP(tcp_connector_no_tinit_tterm_call)
         tcpc_api->tterm(connector);
         tcpc_api->dtor(connector_common);
         tcp_connector->mod_dtor(mod);
+        connector_config.ports = std::move(std::vector<std::string>());
     }
 };
 
 TEST(tcp_connector_no_tinit_tterm_call, poll_undesirable)
 {
     s_poll_undesirable = true;
-    connector = tcpc_api->tinit(&connector_config);
+    connector = tcpc_api->tinit(connector_config);
     CHECK(connector != nullptr);
     size_t size = sizeof(TcpConnectorMsgHdr) + 10;
     uint8_t* message = new uint8_t[size];
@@ -449,15 +453,18 @@ TEST(tcp_connector_no_tinit_tterm_call, poll_undesirable)
     tcpc->process_receive();
     tcpc->process_receive();
     tcpc->process_receive();
-    TcpConnectorMsgHandle* handle = (TcpConnectorMsgHandle*)tcpc->receive_message(false);
-    CHECK(handle == nullptr);
+
+    const ConnectorMsg msg = tcpc->receive_message(false);
+    CHECK(msg.get_data() == nullptr);
+    CHECK(msg.get_length() == 0);
+
     delete[] message;
 }
 
 TEST(tcp_connector_no_tinit_tterm_call, poll_error)
 {
     s_poll_error = true;
-    connector = tcpc_api->tinit(&connector_config);
+    connector = tcpc_api->tinit(connector_config);
     CHECK(connector != nullptr);
     size_t size = sizeof(TcpConnectorMsgHdr) + 10;
     uint8_t* message = new uint8_t[size];
@@ -472,8 +479,11 @@ TEST(tcp_connector_no_tinit_tterm_call, poll_error)
     tcpc->process_receive();
     tcpc->process_receive();
     tcpc->process_receive();
-    TcpConnectorMsgHandle* handle = (TcpConnectorMsgHandle*)tcpc->receive_message(false);
-    CHECK(handle == nullptr);
+
+    const ConnectorMsg msg = tcpc->receive_message(false);
+    CHECK(msg.get_data() == nullptr);
+    CHECK(msg.get_length() == 0);
+
     delete[] message;
 }
 
@@ -486,7 +496,7 @@ TEST_GROUP(tcp_connector_tinit_tterm_answer)
         tcpc_api = (const ConnectorApi*) tcp_connector;
         connector_config.direction = Connector::CONN_DUPLEX;
         connector_config.connector_name = "tcp-a";
-        connector_config.base_port = 20000;
+        connector_config.ports.push_back("20000");
         connector_config.setup = TcpConnectorConfig::Setup::ANSWER;
         connector_config.async_receive = false;
         CHECK(tcp_connector != nullptr);
@@ -494,7 +504,7 @@ TEST_GROUP(tcp_connector_tinit_tterm_answer)
         CHECK(mod != nullptr);
         connector_common = tcpc_api->ctor(mod);
         CHECK(connector_common != nullptr);
-        connector = tcpc_api->tinit(&connector_config);
+        connector = tcpc_api->tinit(connector_config);
         CHECK(connector->get_connector_direction() == Connector::CONN_DUPLEX);
         CHECK(connector != nullptr);
     }
@@ -504,242 +514,273 @@ TEST_GROUP(tcp_connector_tinit_tterm_answer)
         tcpc_api->tterm(connector);
         tcpc_api->dtor(connector_common);
         tcp_connector->mod_dtor(mod);
+        connector_config.ports = std::move(std::vector<std::string>());
     }
 };
 
-TEST(tcp_connector_tinit_tterm_call, alloc_discard)
-{
-    const uint8_t* data = nullptr;
-    TcpConnector* tcpc = (TcpConnector*)connector;
-
-    TcpConnectorMsgHandle* handle = (TcpConnectorMsgHandle*)(tcpc->alloc_message(40,&data));
-    CHECK(data != nullptr);
-    CHECK(handle->connector_msg.length == 40);
-    CHECK(handle->connector_msg.data == data);
-    tcpc->discard_message(handle);
-}
-
 TEST(tcp_connector_tinit_tterm_call, alloc_transmit)
 {
-    const uint8_t* data = nullptr;
+    const uint32_t len = 40;
+    const uint8_t* data = new uint8_t[len];
     TcpConnector* tcpc = (TcpConnector*)connector;
     set_normal_status();
 
-    TcpConnectorMsgHandle* handle = (TcpConnectorMsgHandle*)(tcpc->alloc_message(40,&data));
-    CHECK(data != nullptr);
-    CHECK(handle->connector_msg.length == 40);
-    s_send_ret_other = 40;
-    CHECK(handle->connector_msg.data == data);
-    CHECK(tcpc->transmit_message(handle) == true);
+    ConnectorMsg msg(data, len, true);
+
+    CHECK(msg.get_length() == len);
+    CHECK(msg.get_data() == data);
+
+    s_send_ret_other = len;
+    CHECK(tcpc->transmit_message(msg) == true);
+    CHECK(tcpc->transmit_message(std::move(msg)) == true);
 }
 
 TEST(tcp_connector_tinit_tterm_call, alloc_transmit_header_fail)
 {
-    const uint8_t* data = nullptr;
+    const uint32_t len = 40;
+    const uint8_t* data = new uint8_t[len];
     TcpConnector* tcpc = (TcpConnector*)connector;
     set_normal_status();
 
-    TcpConnectorMsgHandle* handle = (TcpConnectorMsgHandle*)(tcpc->alloc_message(40,&data));
-    CHECK(data != nullptr);
-    CHECK(handle->connector_msg.length == 40);
+    ConnectorMsg msg(data, len, true);
+
+    CHECK(msg.get_length() == len);
+    CHECK(msg.get_data() == data);
+
     s_send_ret_header = sizeof(TcpConnectorMsgHdr)-1;
-    s_send_ret_other = 40;
-    CHECK(handle->connector_msg.data == data);
-    CHECK(tcpc->transmit_message(handle) == false);
+    s_send_ret_other = len;
+    CHECK(tcpc->transmit_message(msg) == false);
+    CHECK(tcpc->transmit_message(std::move(msg)) == false);
 }
 
 TEST(tcp_connector_tinit_tterm_call, alloc_transmit_body_fail)
 {
-    const uint8_t* data = nullptr;
+    const uint32_t len = 40;
+    const uint8_t* data = new uint8_t[len];
     TcpConnector* tcpc = (TcpConnector*)connector;
     set_normal_status();
 
-    TcpConnectorMsgHandle* handle = (TcpConnectorMsgHandle*)(tcpc->alloc_message(40,&data));
-    CHECK(data != nullptr);
-    CHECK(handle->connector_msg.length == 40);
+    ConnectorMsg msg(data, len, true);
+
+    CHECK(msg.get_length() == len);
+    CHECK(msg.get_data() == data);
+
     s_send_ret_other = 30;
-    CHECK(handle->connector_msg.data == data);
-    CHECK(tcpc->transmit_message(handle) == false);
+    CHECK(tcpc->transmit_message(msg) == false);
+    CHECK(tcpc->transmit_message(std::move(msg)) == false);
 }
 
 TEST(tcp_connector_tinit_tterm_call, alloc_transmit_no_sock)
 {
-    const uint8_t* data = nullptr;
+    const uint32_t len = 40;
+    const uint8_t* data = new uint8_t[len];
     TcpConnector* tcpc = (TcpConnector*)connector;
 
-    TcpConnectorMsgHandle* handle = (TcpConnectorMsgHandle*)(tcpc->alloc_message(40,&data));
+    ConnectorMsg msg(data, len, true);
+
+    CHECK(msg.get_length() == len);
+    CHECK(msg.get_data() == data);
+
     tcpc->sock_fd = -1;
-    CHECK(data != nullptr);
-    CHECK(handle->connector_msg.length == 40);
-    CHECK(handle->connector_msg.data == data);
-    CHECK(tcpc->transmit_message(handle) == false);
+    CHECK(tcpc->transmit_message(msg) == false);
+    CHECK(tcpc->transmit_message(std::move(msg)) == false);
 }
 
 TEST(tcp_connector_tinit_tterm_call, receive_no_sock)
 {
     TcpConnector* tcpc = (TcpConnector*)connector;
     tcpc->sock_fd = -1;
-    TcpConnectorMsgHandle* handle = (TcpConnectorMsgHandle*)tcpc->receive_message(false);
-    CHECK(handle == nullptr);
+    const ConnectorMsg msg = tcpc->receive_message(false);
+    CHECK(msg.get_data() == nullptr);
+    CHECK(msg.get_length() == 0);
 }
 
 TEST(tcp_connector_tinit_tterm_call, receive)
 {
+    const uint32_t cmsg_len = 10;
     TcpConnector* tcpc = (TcpConnector*)connector;
-    size_t size = sizeof(TcpConnectorMsgHdr) + 10;
+    size_t size = sizeof(TcpConnectorMsgHdr) + cmsg_len;
     uint8_t* message = new uint8_t[size];
+
     for (int i = sizeof(TcpConnectorMsgHdr); i < (int)size; i++ )
         message[i] = i;
+
     TcpConnectorMsgHdr* hdr = (TcpConnectorMsgHdr*)message;
     hdr->version = TCP_FORMAT_VERSION;
-    hdr->connector_msg_length = 10;
+    hdr->connector_msg_length = cmsg_len;
     s_rec_message = message;
     s_rec_message_size = size; // also trigger the read action
     s_poll_data_available = true;
+
     tcpc->process_receive();
     tcpc->process_receive();
     tcpc->process_receive();
-    TcpConnectorMsgHandle* handle = (TcpConnectorMsgHandle*)tcpc->receive_message(false);
-    ConnectorMsg* conn_msg = tcpc->get_connector_msg(handle);
+    ConnectorMsg conn_msg = tcpc->receive_message(false);
+
+    CHECK(conn_msg.get_length() == cmsg_len);
+    CHECK(memcmp(conn_msg.get_data(), (message+sizeof(TcpConnectorMsgHdr)), cmsg_len) == 0);
 
-    CHECK(handle != nullptr);
-    CHECK(conn_msg->length == 10);
-    CHECK(memcmp( handle->connector_msg.data, (message+sizeof(TcpConnectorMsgHdr)), 10) == 0);
-    tcpc->discard_message(handle);
     delete[] message;
-    handle = (TcpConnectorMsgHandle*)tcpc->receive_message(false);
-    CHECK(handle == nullptr);
+
+    conn_msg = std::move(tcpc->receive_message(false));
+    CHECK(conn_msg.get_data() == nullptr);
+    CHECK(conn_msg.get_length() == 0);
 }
 
 TEST(tcp_connector_no_tinit_tterm_call, receive_wrong_version)
 {
-    size_t size = sizeof(TcpConnectorMsgHdr) + 10;
+    const uint32_t cmsg_len = 10;
+    size_t size = sizeof(TcpConnectorMsgHdr) + cmsg_len;
     uint8_t* message = new uint8_t[size];
+
     for (int i = sizeof(TcpConnectorMsgHdr); i < (int)size; i++ )
         message[i] = i;
+
     TcpConnectorMsgHdr* hdr = (TcpConnectorMsgHdr*)message;
     hdr->version = TCP_FORMAT_VERSION+1;
-    hdr->connector_msg_length = 10;
+    hdr->connector_msg_length = cmsg_len;
     s_rec_message = message;
     s_rec_message_size = size; // also trigger the read action
     s_poll_data_available = true;
-    connector = tcpc_api->tinit(&connector_config);
+    connector = tcpc_api->tinit(connector_config);
     CHECK(connector != nullptr);
     TcpConnector* tcpc = (TcpConnector*)connector;
+
     tcpc->process_receive();
     tcpc->process_receive();
     tcpc->process_receive();
-    TcpConnectorMsgHandle* handle = (TcpConnectorMsgHandle*)tcpc->receive_message(false);
-    CHECK(handle == nullptr);
+    const ConnectorMsg conn_msg = tcpc->receive_message(false);
+
+    CHECK(conn_msg.get_data() == nullptr);
+    CHECK(conn_msg.get_length() == 0);
     delete[] message;
 }
 
 TEST(tcp_connector_no_tinit_tterm_call, receive_recv_error_EAGAIN)
 {
-    size_t size = sizeof(TcpConnectorMsgHdr) + 10;
+    const uint32_t cmsg_len = 10;
+    size_t size = sizeof(TcpConnectorMsgHdr) + cmsg_len;
     uint8_t* message = new uint8_t[size];
+
     for (int i = sizeof(TcpConnectorMsgHdr); i < (int)size; i++ )
         message[i] = i;
+
     TcpConnectorMsgHdr* hdr = (TcpConnectorMsgHdr*)message;
     hdr->version = TCP_FORMAT_VERSION;
-    hdr->connector_msg_length = 10;
+    hdr->connector_msg_length = cmsg_len;
     s_rec_message = message;
     s_rec_message_size = size; // also trigger the read action
     s_poll_data_available = true;
     s_rec_error = EAGAIN;
-    connector = tcpc_api->tinit(&connector_config);
+
+    connector = tcpc_api->tinit(connector_config);
     CHECK(connector != nullptr);
     TcpConnector* tcpc = (TcpConnector*)connector;
+
     tcpc->process_receive();
-    TcpConnectorMsgHandle* handle = (TcpConnectorMsgHandle*)tcpc->receive_message(false);
-    CHECK(handle != nullptr);
-    tcpc->discard_message(handle);
+    const ConnectorMsg conn_msg = tcpc->receive_message(false);
+
+    CHECK(conn_msg.get_length() == cmsg_len);
+    CHECK(memcmp(conn_msg.get_data(), (message+sizeof(TcpConnectorMsgHdr)), cmsg_len) == 0);
+
     delete[] message;
 }
 
 TEST(tcp_connector_no_tinit_tterm_call, receive_recv_error_EBADF)
 {
-    size_t size = sizeof(TcpConnectorMsgHdr) + 10;
+    const uint32_t cmsg_len = 10;
+    size_t size = sizeof(TcpConnectorMsgHdr) + cmsg_len;
     uint8_t* message = new uint8_t[size];
+
     for (int i = sizeof(TcpConnectorMsgHdr); i < (int)size; i++ )
         message[i] = i;
+
     TcpConnectorMsgHdr* hdr = (TcpConnectorMsgHdr*)message;
     hdr->version = TCP_FORMAT_VERSION;
-    hdr->connector_msg_length = 10;
+    hdr->connector_msg_length = cmsg_len;
     s_rec_message = message;
     s_rec_message_size = size; // also trigger the read action
     s_poll_data_available = true;
     s_rec_error = EBADF;
-    connector = tcpc_api->tinit(&connector_config);
+
+    connector = tcpc_api->tinit(connector_config);
     CHECK(connector != nullptr);
     TcpConnector* tcpc = (TcpConnector*)connector;
+
     tcpc->process_receive();
     tcpc->process_receive();
     tcpc->process_receive();
-    TcpConnectorMsgHandle* handle = (TcpConnectorMsgHandle*)tcpc->receive_message(false);
-    CHECK(handle != nullptr);
-    tcpc->discard_message(handle);
+    const ConnectorMsg conn_msg = tcpc->receive_message(false);
+
+    CHECK(conn_msg.get_length() == cmsg_len);
+    CHECK(memcmp(conn_msg.get_data(), (message+sizeof(TcpConnectorMsgHdr)), cmsg_len) == 0);
+
     delete[] message;
 }
 
 TEST(tcp_connector_no_tinit_tterm_call, receive_recv_closed)
 {
-    size_t size = sizeof(TcpConnectorMsgHdr) + 10;
+    const uint32_t cmsg_len = 10;
+    size_t size = sizeof(TcpConnectorMsgHdr) + cmsg_len;
     uint8_t* message = new uint8_t[size];
+
     for (int i = sizeof(TcpConnectorMsgHdr); i < (int)size; i++ )
         message[i] = i;
+
     TcpConnectorMsgHdr* hdr = (TcpConnectorMsgHdr*)message;
     hdr->version = TCP_FORMAT_VERSION;
-    hdr->connector_msg_length = 10;
+    hdr->connector_msg_length = cmsg_len;
     s_rec_message = message;
     s_rec_message_size = size; // also trigger the read action
     s_poll_data_available = true;
     s_rec_return_zero = true;
-    connector = tcpc_api->tinit(&connector_config);
+
+    connector = tcpc_api->tinit(connector_config);
     CHECK(connector != nullptr);
     TcpConnector* tcpc = (TcpConnector*)connector;
+
     tcpc->process_receive();
     tcpc->process_receive();
     tcpc->process_receive();
-    TcpConnectorMsgHandle* handle = (TcpConnectorMsgHandle*)tcpc->receive_message(false);
-    CHECK(handle == nullptr);
+    const ConnectorMsg conn_msg = tcpc->receive_message(false);
+
+    CHECK(conn_msg.get_data() == nullptr);
+    CHECK(conn_msg.get_length() == 0);
+
     delete[] message;
 }
 
 TEST(tcp_connector_no_tinit_tterm_call, receive_recv_body_closed)
 {
-    size_t size = sizeof(TcpConnectorMsgHdr) + 10;
+    const uint32_t cmsg_len = 10;
+    size_t size = sizeof(TcpConnectorMsgHdr) + cmsg_len;
     uint8_t* message = new uint8_t[size];
+
     for (int i = sizeof(TcpConnectorMsgHdr); i < (int)size; i++ )
         message[i] = i;
+
     TcpConnectorMsgHdr* hdr = (TcpConnectorMsgHdr*)message;
     hdr->version = TCP_FORMAT_VERSION;
-    hdr->connector_msg_length = 10;
-    s_rec_error_size = 10;  // only indicate the error on the 10 byte recv()
+    hdr->connector_msg_length = cmsg_len;
+    s_rec_error_size = cmsg_len;  // only indicate the error on the cmsg_len byte recv()
     s_rec_message = message;
     s_rec_message_size = size; // also trigger the read action
     s_poll_data_available = true;
     s_rec_return_zero = true;
-    connector = tcpc_api->tinit(&connector_config);
+
+    connector = tcpc_api->tinit(connector_config);
     CHECK(connector != nullptr);
     TcpConnector* tcpc = (TcpConnector*)connector;
+
     tcpc->process_receive();
     tcpc->process_receive();
     tcpc->process_receive();
-    TcpConnectorMsgHandle* handle = (TcpConnectorMsgHandle*)tcpc->receive_message(false);
-    CHECK(handle == nullptr);
-    delete[] message;
-}
+    const ConnectorMsg conn_msg = tcpc->receive_message(false);
 
-TEST_GROUP(tcp_connector_msg_handle)
-{
-};
+    CHECK(conn_msg.get_data() == nullptr);
+    CHECK(conn_msg.get_length() == 0);
 
-TEST(tcp_connector_msg_handle, test)
-{
-    TcpConnectorMsgHandle handle(12);
-    CHECK(handle.connector_msg.length == 12);
-    CHECK(handle.connector_msg.data != nullptr);
+    delete[] message;
 }
 
 int main(int argc, char** argv)
index f8a99131c0f97a6c9b79510e13d6687c86a7b489..6aad22003c66eebbd770c86566cfbde5638aafdb 100644 (file)
@@ -581,11 +581,13 @@ static void send_sc_update_message(Flow& flow, SideChannel& sc)
 
     SCMessage* sc_msg = sc.alloc_transmit_message((uint32_t) (header_len + content_len));
     assert(sc_msg);
+
     HAMessage ha_msg(sc_msg->content, sc_msg->content_length);
 
-    write_msg_header(flow, HA_UPDATE_EVENT, header_len + content_len, ha_msg);
+    write_msg_header(flow, HA_UPDATE_EVENT, sc_msg->content_length, ha_msg);
     write_update_msg_content(flow, ha_msg, false);
     update_msg_header_length(ha_msg);
+
     sc.transmit_message(sc_msg);
 }
 
@@ -637,7 +639,10 @@ void HighAvailability::process_update(Flow* flow, Packet* p)
 static void send_sc_deletion_message(Flow& flow, SideChannel& sc)
 {
     const uint32_t msg_len = calculate_msg_header_length(flow);
+
     SCMessage* sc_msg = sc.alloc_transmit_message(msg_len);
+    assert(sc_msg);
+
     HAMessage ha_msg(sc_msg->content, sc_msg->content_length);
 
     // No content, only header+key
index 3eb21e125071b9771b26ba47d70a4052a6cc9be7..d3d3672bd15c3a5d6c34c8be4bffc0667b75d4f1 100644 (file)
@@ -217,6 +217,10 @@ int SFDAQInstance::ioctl(DAQ_IoctlCmd, void*, size_t) { return DAQ_SUCCESS; }
 
 FlowStash::~FlowStash() = default;
 
+
+SideChannel::SideChannel(ScMsgFormat)
+{ }
+
 SideChannel* SideChannelManager::get_side_channel(SCPort)
 {
     return (SideChannel*)mock().getData("s_side_channel").getObjectPointer();
@@ -236,21 +240,19 @@ void SideChannel::register_receive_handler(const SCProcessMsgFunc& handler)
 
 void SideChannel::unregister_receive_handler() { }
 
-bool SideChannel::discard_message(SCMessage*)
+bool SideChannel::discard_message(SCMessage*) const
 { return true; }
 
-static SCMsgHdr s_sc_header = { 0, 1, 0, 0 };
 bool SideChannel::process(int)
 {
     SCMessage* msg = (SCMessage*)mock().getData("message_content").getObjectPointer();
     SCProcessMsgFunc* s_handler = (SCProcessMsgFunc*)mock().getData("s_handler").getObjectPointer();
     if (s_handler && nullptr != *s_handler && msg && msg->content && msg->content_length != 0)
     {
-        SCMessage s_rec_sc_message = {};
+        SCMessage s_rec_sc_message((SideChannel*)mock().getData("s_side_channel").getObjectPointer(),
+            msg->connector, ConnectorMsg());
         s_rec_sc_message.content = msg->content;
         s_rec_sc_message.content_length = msg->content_length;
-        s_rec_sc_message.hdr = &s_sc_header;
-        s_rec_sc_message.sc = (SideChannel*)mock().getData("s_side_channel").getObjectPointer();;
         (*s_handler)(&s_rec_sc_message);
         return true;
     }
@@ -258,7 +260,7 @@ bool SideChannel::process(int)
         return false;
 }
 
-bool SideChannel::transmit_message(SCMessage* msg)
+bool SideChannel::transmit_message(SCMessage* msg) const
 {
     mock().actualCall("transmit_message");
     mock().setDataObject("message", "SCMessage", msg);
@@ -400,14 +402,14 @@ TEST_GROUP(high_availability_test)
     StreamHAClient* s_ha_client;
     FlowHAClient* s_other_ha_client;
     uint8_t s_message[MSG_SIZE];
-    SCMessage s_sc_message;
+    SCMessage s_sc_message = SCMessage(nullptr, nullptr, ConnectorMsg());
     Packet s_pkt;
     struct timeval s_packet_time;
     HighAvailabilityConfig hac;
     FlowHAState* ha_state;
     FlowKey flow_key;
     SCProcessMsgFunc handler;
-    SideChannel side_channel;
+    SideChannel side_channel = SideChannel(ScMsgFormat::BINARY);
     FlowKey s_flow_key;
 
     void setup() override
@@ -426,7 +428,6 @@ TEST_GROUP(high_availability_test)
         mock().setDataObject("flow", "Flow", &s_flow);
         active = {};
         memset(s_message, 0, sizeof(s_message));
-        s_sc_message = {};
         s_sc_message.content = s_message;
         mock().setDataObject("message_content", "SCMessage", &s_sc_message);
         s_pkt.active = &active;
index 3a03c388f6e975c79905b2227943834ded5ffbff..118f2414b2e213f5a1b1ea65e7aa980c058d40f8 100644 (file)
@@ -35,7 +35,7 @@
 namespace snort
 {
 // this is the current version of the api
-#define CONNECTOR_API_VERSION ((BASE_API_VERSION << 16) | 1)
+#define CONNECTOR_API_VERSION ((BASE_API_VERSION << 16) | 2)
 
 //-------------------------------------------------------------------------
 // api for class
@@ -45,14 +45,49 @@ namespace snort
 
 class ConnectorConfig;
 
-struct ConnectorMsg
+class ConnectorMsg
 {
-    uint32_t length;
-    uint8_t* data;
-};
+public:
+    ConnectorMsg() = default;
 
-class ConnectorMsgHandle
-{
+    ConnectorMsg(const uint8_t* data, uint32_t length, bool pass_ownership = false) :
+        data(data), length(length), owns(pass_ownership)
+    { }
+
+    ~ConnectorMsg()
+    { if (owns) delete[] data; }
+
+    ConnectorMsg(ConnectorMsg&) = delete;
+    ConnectorMsg& operator=(ConnectorMsg& other) = delete;
+
+    ConnectorMsg(ConnectorMsg&& other) :
+        data(other.data), length(other.length), owns(other.owns)
+    { other.owns = false; }
+
+    ConnectorMsg& operator=(ConnectorMsg&& other)
+    {
+        if ( owns )
+            delete[] data;
+
+        data = other.data;
+        length = other.length;
+        owns = other.owns;
+
+        other.owns = false;
+
+        return *this;
+    }
+
+    const uint8_t* get_data() const
+    { return data; }
+
+    uint32_t get_length() const
+    { return length; }
+
+private:
+    const uint8_t* data = nullptr;
+    uint32_t length = 0;
+    bool owns = false;
 };
 
 class SO_PUBLIC Connector
@@ -66,20 +101,25 @@ public:
         CONN_DUPLEX
     };
 
+    Connector(const ConnectorConfig& config) : config(config) { }
     virtual ~Connector() = default;
 
-    virtual ConnectorMsgHandle* alloc_message(const uint32_t, const uint8_t**) = 0;
-    virtual void discard_message(ConnectorMsgHandle*) = 0;
-    virtual bool transmit_message(ConnectorMsgHandle*) = 0;
-    virtual ConnectorMsgHandle* receive_message(bool block) = 0;
-    virtual ConnectorMsg* get_connector_msg(ConnectorMsgHandle*) = 0;
-    virtual Direction get_connector_direction() = 0;
+    virtual bool transmit_message(const ConnectorMsg&) = 0;
+    virtual bool transmit_message(const ConnectorMsg&&) = 0;
+
+    virtual ConnectorMsg receive_message(bool block) = 0;
 
-    const std::string connector_name;
-    const ConnectorConfig* config = nullptr;
+    virtual bool flush()
+    { return true; }
+
+    virtual void reinit()
+    { }
+
+    inline Direction get_connector_direction() const;
+    inline const std::string& get_connector_name() const;
 
 protected:
-    Connector() = default;
+    const ConnectorConfig& config;
 };
 
 class ConnectorConfig
@@ -92,6 +132,12 @@ public:
     virtual ~ConnectorConfig() = default;
 };
 
+Connector::Direction Connector::get_connector_direction() const
+{ return config.direction; }
+
+const std::string& Connector::get_connector_name() const
+{ return config.connector_name; }
+
 class SO_PUBLIC ConnectorCommon
 {
 public:
@@ -100,7 +146,7 @@ public:
 
 typedef ConnectorCommon* (* ConnectorNewFunc)(Module*);
 typedef void (* ConnectorDelFunc)(ConnectorCommon*);
-typedef Connector* (* ConnectorThreadInitFunc)(ConnectorConfig*);
+typedef Connector* (* ConnectorThreadInitFunc)(const ConnectorConfig&);
 typedef void (* ConnectorThreadTermFunc)(Connector*);
 typedef void (* ConnectorFunc)();
 
index e67657428bf7fad90e14aa2e1e16a59fe948fa3c..05f173eac35697b74c835c03db03f88ab38ea763 100644 (file)
@@ -52,6 +52,7 @@
 #include "main/swapper.h"
 #include "main.h"
 #include "managers/action_manager.h"
+#include "managers/connector_manager.h"
 #include "managers/codec_manager.h"
 #include "managers/inspector_manager.h"
 #include "managers/ips_manager.h"
@@ -642,6 +643,7 @@ void Analyzer::init_unprivileged()
     IpsManager::setup_options(sc);
     ActionManager::thread_init(sc);
     FileService::thread_init();
+    ConnectorManager::thread_init();
     SideChannelManager::thread_init();
     HighAvailabilityManager::thread_init(); // must be before InspectorManager::thread_init();
     InspectorManager::thread_init(sc);
@@ -667,6 +669,7 @@ void Analyzer::reinit(const SnortConfig* sc)
     ActionManager::thread_reinit(sc);
     TraceApi::thread_reinit(sc->trace_config);
     EventManager::reload_outputs();
+    ConnectorManager::thread_reinit();
 }
 
 void Analyzer::stop_removed(const SnortConfig* sc)
@@ -704,6 +707,7 @@ void Analyzer::term()
     CodecManager::thread_term();
     HighAvailabilityManager::thread_term();
     SideChannelManager::thread_term();
+    ConnectorManager::thread_term();
 
     oops_handler->set_current_message(nullptr, nullptr);
 
index e7aa89717b359bbc361675c66eb394ef28ea4356..8b2ef50a306894dd22adaaac3f0530b73cd396b7 100644 (file)
@@ -34,6 +34,7 @@
 #include "log/messages.h"
 #include "managers/action_manager.h"
 #include "managers/codec_manager.h"
+#include "managers/connector_manager.h"
 #include "managers/event_manager.h"
 #include "managers/inspector_manager.h"
 #include "managers/ips_manager.h"
@@ -99,6 +100,9 @@ void RuleLatency::tterm() { }
 void rule_latency::set_force_enable(bool) { }
 void PacketLatency::tterm() { }
 void packet_latency::set_force_enable(bool) { }
+void ConnectorManager::thread_init() { }
+void ConnectorManager::thread_reinit() { }
+void ConnectorManager::thread_term() { }
 void SideChannelManager::thread_init() { }
 void SideChannelManager::thread_term() { }
 void CodecManager::thread_init() { }
index c5b6b53054898303be726637d1577786c01a0a11..7774807ec8a05ec9c55a4f7b4514518731075dde 100644 (file)
@@ -30,6 +30,8 @@
 
 #include "framework/connector.h"
 #include "log/messages.h"
+#include "main/thread.h"
+#include "main/thread_config.h"
 #include "utils/util.h"
 
 using namespace snort;
@@ -39,8 +41,11 @@ using namespace snort;
 // One ConnectorElem for each Connector within the ConnectorCommon configuration
 struct ConnectorElem
 {
+    ConnectorElem() : config(nullptr), thread_connectors(ThreadConfig::get_instance_max(), nullptr)
+    { }
+
     ConnectorConfig* config;
-    std::unordered_map<pid_t, Connector*> thread_connectors;
+    std::vector<Connector*> thread_connectors;
 };
 
 // One ConnectorCommonElem created for each ConnectorCommon configured
@@ -99,12 +104,12 @@ Connector* ConnectorManager::get_connector(const std::string& connector_name)
 {
     for ( auto& sc : s_connector_commons )
     {
-        pid_t tid = gettid();
+        unsigned instance = get_instance_id();
         if ( sc.connectors.count(connector_name) > 0 )
         {
             ConnectorElem* map = sc.connectors[connector_name];
-            if ( map->thread_connectors.count(tid) == 1 )
-                return ( map->thread_connectors[tid] );
+            if ( map->thread_connectors[instance] )
+                return ( map->thread_connectors[instance] );
         }
     }
     return ( nullptr );
@@ -112,27 +117,42 @@ Connector* ConnectorManager::get_connector(const std::string& connector_name)
 
 void ConnectorManager::thread_init()
 {
-    pid_t tid = gettid();
+    unsigned instance = get_instance_id();
 
     for ( auto& sc : s_connector_commons )
     {
         if ( sc.api->tinit )
         {
-            for ( auto& conn : sc.connectors )
+            for ( const auto& conn : sc.connectors )
             {
-                /* There must NOT be a connector for this thread present. */
-                assert(conn.second->thread_connectors.count(tid) == 0);
+                assert(!conn.second->thread_connectors[instance]);
 
-                Connector* connector = sc.api->tinit(conn.second->config);
-                conn.second->thread_connectors.emplace(tid, std::move(connector));
+                Connector* connector = sc.api->tinit(*conn.second->config);
+                assert(connector);
+                conn.second->thread_connectors[instance] = std::move(connector);
             }
         }
     }
 }
 
+void ConnectorManager::thread_reinit()
+{
+    unsigned instance = get_instance_id();
+
+    for ( auto& sc : s_connector_commons )
+    {
+        for ( auto& conn : sc.connectors )
+        {
+            assert(conn.second->thread_connectors[instance]);
+
+            conn.second->thread_connectors[instance]->reinit();
+        }
+    }
+}
+
 void ConnectorManager::thread_term()
 {
-    pid_t tid = gettid();
+    unsigned instance = get_instance_id();
 
     for ( auto& sc : s_connector_commons )
     {
@@ -140,12 +160,10 @@ void ConnectorManager::thread_term()
         {
             for ( auto& conn : sc.connectors )
             {
-                /* There must be a connector for this thread present. */
-                assert(conn.second->thread_connectors.count(tid) != 0);
-
-                sc.api->tterm(conn.second->thread_connectors[tid]);
+                assert(conn.second->thread_connectors[instance]);
 
-                conn.second->thread_connectors.clear();
+                sc.api->tterm(conn.second->thread_connectors[instance]);
+                conn.second->thread_connectors[instance] = nullptr;
             }
         }
     }
index 6657b65bddb2a3302a4884bc9a24fef64a9384e8..adb805b2cad3b62b3436841cd4d36dba8fba472d 100644 (file)
@@ -44,6 +44,7 @@ public:
     static void instantiate(const snort::ConnectorApi*, snort::Module*, snort::SnortConfig*);
 
     static void thread_init();
+    static void thread_reinit();
     static void thread_term();
 
     /* get_connector() returns the thread-specific object. */
index 6190c87f3f478b7e01de0c27106d44602a601596..80ed01225c8bfb9f56aedc909fd9fc9ec6f06aaf 100644 (file)
@@ -4,6 +4,8 @@ add_library( side_channel OBJECT
     side_channel.h
     side_channel_module.cc
     side_channel_module.h
+    side_channel_format.cc
+    side_channel_format.h
 )
 
 add_subdirectory(test)
index 490f7130bd7d5fd3bf0c30b14f9164d24d8571e8..fefa661671c7b77ef9107c3eae6ef1e9b300163a 100644 (file)
@@ -67,5 +67,12 @@ The dynamic message allocation architecture is flexible and asynchronous.  Upon
 transmitting the message, the client transfers ownership to the connector.  The
 connector then discards the message upon transmission.
 
+Side Channel can format messages in text and binary modes. In text mode
+messages are prepended with header in format "<port>:<time_sec>.<time_usec>"
+and content is printed with hex values separated with a comma. By using
+alloc_transmit_message() and discard_message() user ensures proper translation,
+since in text mode we need to allocate a buffer that will accommodate a
+converted message.
+
 A side channel can be bidirectional, but does not implement a request/reply
 paradigm.  Rather it should be viewed as two simplex channels.
index a197ecb541aeb30b091483a8bb9310d72f323bb6..e2c83703131e6e95a26a02dbb6680fbb3734181e 100644 (file)
 
 #include <algorithm>
 #include <cassert>
+#include <cstring>
 
 #include "framework/counts.h"
 #include "managers/connector_manager.h"
 #include "profiler/profiler_defs.h"
 
+#include "side_channel_format.h"
+
 using namespace snort;
 
 /* Globals ****************************************************************/
@@ -45,6 +48,7 @@ struct SideChannelMapping
     SideChannel* sc;
     std::vector<std::string> connectors;
     PortBitSet ports;
+    ScMsgFormat format;
 };
 
 typedef std::vector<SideChannelMapping*> SCMaps;
@@ -68,25 +72,19 @@ SideChannel* SideChannelManager::get_side_channel(SCPort port)
     return nullptr;
 }
 
-void SideChannel::set_message_port(SCMessage* msg, SCPort port)
-{
-    assert ( msg );
-    assert ( msg->hdr );
-    msg->hdr->port = port;
-}
-
 void SideChannel::set_default_port(SCPort port)
 {
     default_port = port;
 }
 
-void SideChannelManager::instantiate(const SCConnectors* connectors, const PortBitSet* ports)
+void SideChannelManager::instantiate(const SCConnectors* connectors, const PortBitSet* ports, ScMsgFormat fmt)
 {
     SideChannelMapping* scm = new SideChannelMapping;
 
     scm->sc = nullptr;
     scm->connectors = *connectors;
     scm->ports = *ports;
+    scm->format = fmt;
 
     s_maps.emplace_back(scm);
 }
@@ -100,10 +98,6 @@ void SideChannelManager::pre_config_init()
 // Within each thread, instantiate the connectors, etc.
 void SideChannelManager::thread_init()
 {
-
-    // First startup the connectors
-    ConnectorManager::thread_init();
-
     /* New SideChannel map vector just for this thread */
     SCMaps* map_list = new SCMaps;
 
@@ -112,11 +106,11 @@ void SideChannelManager::thread_init()
     {
         /* New SideChannel just for this thread */
         SideChannelMapping* map = new SideChannelMapping;
-        SideChannel* sc = new SideChannel;
+        SideChannel* sc = new SideChannel(scm->format);
         map->sc = sc;
         map->ports = scm->ports;
 
-        for ( auto& conn_name : scm->connectors )
+        for ( const auto& conn_name : scm->connectors )
         {
             Connector* connector = ConnectorManager::get_connector(conn_name);
             if ( connector == nullptr )
@@ -150,13 +144,9 @@ void SideChannelManager::thread_init()
 // Within each thread, shutdown the sidechannel
 void SideChannelManager::thread_term()
 {
-
-    // First shutdown the connectors
-    ConnectorManager::thread_term();
-
     if (tls_maps)
     {
-        for ( auto& map : *tls_maps )
+        for ( const auto& map : *tls_maps )
         {
             delete map->sc;
             delete map;
@@ -175,6 +165,9 @@ void SideChannelManager::term()
     s_maps.shrink_to_fit();
 }
 
+SideChannel::SideChannel(ScMsgFormat fmt) : msg_format(fmt)
+{ }
+
 // receive at most max_messages.  Zero indicates unlimited.
 // return true iff we received any messages.
 bool SideChannel::process(int max_messages)
@@ -184,35 +177,32 @@ bool SideChannel::process(int max_messages)
     while (true)
     {
         // get message if one is available.
-        ConnectorMsgHandle* handle = connector_receive->receive_message(false);
+        ConnectorMsg connector_msg = connector_receive->receive_message(false);
+
+        if ( connector_msg.get_length() > 0 and msg_format == ScMsgFormat::TEXT )
+        {
+            connector_msg = from_text((const char*)connector_msg.get_data(), connector_msg.get_length());
+        }
 
-        // if none, we are complete
-        if ( !handle )
+        // if none or invalid, we are complete
+        if ( connector_msg.get_length() == 0 )
             break;
 
-        else if ( receive_handler )
+        if ( receive_handler )
         {
-            SCMessage* msg = new SCMessage;
+            SCMessage* msg = new SCMessage(this, connector_receive, std::move(connector_msg));
 
-            // get the ConnectorMsg from the (at this point) abstract class
-            ConnectorMsg* connector_msg = connector_receive->get_connector_msg(handle);
-
-            msg->content = connector_msg->data;
-            msg->content_length = connector_msg->length;
+            msg->content = const_cast<uint8_t*>(msg->cmsg.get_data());
+            msg->content_length = msg->cmsg.get_length();
 
             // if the message is longer than the header, assume we have a header
-            if ( connector_msg->length >= sizeof(SCMsgHdr) )
+            if ( msg->cmsg.get_length() > sizeof(SCMsgHdr) )
             {
-                msg->sc = this;
-                msg->connector = connector_receive;
-                msg->hdr = (SCMsgHdr*)connector_msg->data;
                 msg->content += sizeof(SCMsgHdr);
                 msg->content_length -= sizeof( SCMsgHdr );
             }
 
-            msg->handle = handle;   // link back to the underlying SCC message
             received_message = true;
-
             receive_handler(msg);
         }
 
@@ -232,48 +222,101 @@ void SideChannel::unregister_receive_handler()
     receive_handler = nullptr;
 }
 
+SCMsgHdr SideChannel::get_header()
+{
+    struct timeval tm;
+    (void)gettimeofday(&tm, nullptr);
+
+    SCMsgHdr hdr;
+    hdr.port = default_port;
+    hdr.time_seconds = (uint64_t)tm.tv_sec;
+    hdr.time_u_seconds = (uint32_t)tm.tv_usec;
+    hdr.sequence = sequence++;
+
+    return hdr;
+}
+
 SCMessage* SideChannel::alloc_transmit_message(uint32_t content_length)
 {
-    SCMessage* msg = new SCMessage;
-    msg->handle = connector_transmit->alloc_message((content_length + sizeof(SCMsgHdr)),
-        const_cast<const uint8_t**>(reinterpret_cast<uint8_t**>(&msg->hdr)));
-    assert(msg->handle);
+    SCMessage* msg = nullptr;
+    const SCMsgHdr sc_hdr = get_header();
+
+    switch (msg_format)
+    {
+    case ScMsgFormat::BINARY:
+    {
+        uint8_t* msg_data = new uint8_t[sizeof(SCMsgHdr) + content_length];
+
+        memcpy(msg_data, &sc_hdr, sizeof(SCMsgHdr));
 
-    msg->sc = this;
-    msg->connector = connector_transmit;
-    msg->content_length = content_length;
-    msg->content = (uint8_t*)msg->hdr + sizeof(SCMsgHdr);
-    msg->hdr->port = default_port;
+        ConnectorMsg bin_cmsg(msg_data, sizeof(SCMsgHdr) + content_length, true);
+
+        msg = new SCMessage(this, connector_transmit, std::move(bin_cmsg));
+        msg->content = msg_data + sizeof(SCMsgHdr);
+        msg->content_length = content_length;
+
+        break;
+    }
+
+    case ScMsgFormat::TEXT:
+    {
+        std::string hdr_text = sc_msg_hdr_to_text(&sc_hdr);
+
+        if (hdr_text.empty())
+            break;
+
+        const uint32_t msg_len = hdr_text.size() + (content_length * TXT_UNIT_LEN);
+        uint8_t* msg_data = new uint8_t[msg_len];
+
+        memcpy(msg_data, hdr_text.c_str(), hdr_text.size());
+
+        ConnectorMsg text_cmsg(msg_data, msg_len, true);
+
+        msg = new SCMessage(this, connector_transmit, std::move(text_cmsg));
+        msg->content = msg_data + hdr_text.size();
+        msg->content_length = content_length;
+
+        break;
+    }
+
+    default:
+        break;
+    }
 
     return msg;
 }
 
-bool SideChannel::discard_message(SCMessage* msg)
+bool SideChannel::discard_message(SCMessage* msg) const
 {
     assert(msg);
-    assert(msg->handle);
 
-    msg->connector->discard_message (msg->handle);
     delete msg;
+
     return true;
 }
 
-bool SideChannel::transmit_message(SCMessage* msg)
+bool SideChannel::transmit_message(SCMessage* msg) const
 {
-    bool return_value = false;
+    if ( !connector_transmit or !msg )
+        return false;
 
-    if ( connector_transmit && msg->handle )
+    if ( msg_format == ScMsgFormat::TEXT )
     {
-        struct timeval tm;
-        (void)gettimeofday(&tm,nullptr);
-        msg->hdr->time_seconds = (uint64_t)tm.tv_sec;
-        msg->hdr->time_u_seconds = (uint32_t)tm.tv_usec;
-        msg->hdr->sequence = sequence++;
-
-        return_value = connector_transmit->transmit_message(msg->handle);
-        delete msg;
+        std::string text = sc_msg_data_to_text(msg->content, msg->content_length);
+
+        if ( text.size() != msg->cmsg.get_length() - (uint32_t)(msg->content - msg->cmsg.get_data()) )
+        {
+            delete msg;
+            return false;
+        }
+
+        memcpy(msg->content, text.c_str(), text.size());
     }
 
+    bool return_value = connector_transmit->transmit_message(msg->cmsg);
+
+    delete msg;
+
     return return_value;
 }
 
index c73b4326114d4e3d098f97b8d06bbf50e8e255f8..f7cc9267310861190dc705cd0d014233968a35f2 100644 (file)
@@ -24,7 +24,6 @@
 #include "framework/connector.h"
 #include "utils/bits.h"
 
-#define MAXIMUM_SC_MESSAGE_CONTENT 1024
 #define DISPATCH_ALL_RECEIVE 0
 
 class SideChannel;
@@ -36,20 +35,29 @@ typedef std::vector<std::string> SCConnectors;
 
 struct __attribute__((__packed__)) SCMsgHdr
 {
-    uint16_t port;
-    uint16_t sequence;
-    uint32_t time_u_seconds;
-    uint64_t time_seconds;
+    uint16_t port = 0;
+    uint16_t sequence = 0;
+    uint32_t time_u_seconds = 0;
+    uint64_t time_seconds = 0;
+};
+
+enum ScMsgFormat : uint8_t
+{
+    BINARY,
+    TEXT
 };
 
 struct SCMessage
 {
-    SideChannel* sc;
-    snort::Connector* connector;
-    snort::ConnectorMsgHandle* handle;
-    SCMsgHdr* hdr;
-    uint8_t* content;
-    uint32_t content_length;
+    SCMessage(const SideChannel* sc, const snort::Connector* conn, snort::ConnectorMsg&& cmsg) :
+        sc(sc), connector(conn), cmsg(std::move(cmsg))
+    {}
+
+    const SideChannel* sc;
+    const snort::Connector* connector;
+    const snort::ConnectorMsg cmsg;
+    uint8_t* content = nullptr;
+    uint32_t content_length = 0;
 };
 
 typedef std::function<void(SCMessage*)> SCProcessMsgFunc;
@@ -58,16 +66,15 @@ typedef std::function<void(SCMessage*)> SCProcessMsgFunc;
 class SideChannel
 {
 public:
-    SideChannel() = default;
+    SideChannel(ScMsgFormat);
 
     void register_receive_handler(const SCProcessMsgFunc& handler);
     void unregister_receive_handler();
 
     bool process(int max_messages);
     SCMessage* alloc_transmit_message(uint32_t content_length);
-    bool discard_message(SCMessage* msg);
-    bool transmit_message(SCMessage* msg);
-    void set_message_port(SCMessage* msg, SCPort port);
+    bool discard_message(SCMessage* msg) const;
+    bool transmit_message(SCMessage* msg) const;
     void set_default_port(SCPort port);
     snort::Connector::Direction get_direction();
 
@@ -75,9 +82,12 @@ public:
     snort::Connector* connector_transmit = nullptr;
 
 private:
+    SCMsgHdr get_header();
+
     SCSequence sequence = 0;
     SCPort default_port = 0;
     SCProcessMsgFunc receive_handler = nullptr;
+    ScMsgFormat msg_format;
 };
 
 // SideChannelManager is primary interface with Snort.
@@ -85,7 +95,7 @@ class SideChannelManager
 {
 public:
     // Instantiate new SideChannel configuration
-    static void instantiate(const SCConnectors* connectors, const PortBitSet* ports);
+    static void instantiate(const SCConnectors* connectors, const PortBitSet* ports, ScMsgFormat fmt);
 
     // Main thread, pre-config init
     static void pre_config_init();
diff --git a/src/side_channel/side_channel_format.cc b/src/side_channel/side_channel_format.cc
new file mode 100644 (file)
index 0000000..3c55f43
--- /dev/null
@@ -0,0 +1,311 @@
+//--------------------------------------------------------------------------
+// Copyright (C) 2015-2024 Cisco and/or its affiliates. All rights reserved.
+//
+// This program is free software; you can redistribute it and/or modify it
+// under the terms of the GNU General Public License Version 2 as published
+// by the Free Software Foundation.  You may not use, modify or distribute
+// this program under any other version of the GNU General Public License.
+//
+// This program is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, write to the Free Software Foundation, Inc.,
+// 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+//--------------------------------------------------------------------------
+
+// side_channel_format.cc author Vitalii Horbatov <vhorbato@cisco.com>
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "side_channel_format.h"
+
+#include <cmath>
+#include <cstring>
+#include <iomanip>
+#include <sstream>
+
+using namespace snort;
+
+std::string sc_msg_hdr_to_text(const SCMsgHdr* sc_hdr)
+{
+    if ( !sc_hdr )
+        return std::string();
+
+    std::stringstream ss;
+
+    ss << sc_hdr->port << ':' << sc_hdr->time_seconds << '.' << sc_hdr->time_u_seconds;
+
+    if ( !ss.good() )
+        return std::string();
+
+    return ss.str();
+}
+
+std::string sc_msg_data_to_text(const uint8_t* data, uint32_t length)
+{
+    if ( !data or length == 0 )
+        return std::string();
+
+    std::stringstream ss;
+
+    const uint8_t* data_ptr = data;
+
+    for ( uint32_t i = 0; i < length; i++ )
+        ss << ',' << std::setfill('0') << std::setw(2) << std::hex << std::uppercase
+            << static_cast<int>(*data_ptr++);
+
+    if ( !ss.good() )
+        return std::string();
+
+    return ss.str();
+}
+
+ConnectorMsg from_text(const char* str_ptr, uint32_t size)
+{
+    if ( !str_ptr or size == 0 )
+        return ConnectorMsg();
+
+    const char* txt_data_ptr = strchr(str_ptr, ',');
+
+    if ( !txt_data_ptr )
+        return ConnectorMsg();
+
+    uint32_t hdr_len = (uint32_t)(txt_data_ptr - str_ptr);
+
+    if ( hdr_len < sizeof("1:1.1") - 1 )
+        return ConnectorMsg();
+
+    uint8_t* new_data = new uint8_t[sizeof(SCMsgHdr) + (uint32_t)ceil((double)(size - hdr_len) / TXT_UNIT_LEN)];
+    SCMsgHdr* sc_hdr = (SCMsgHdr*)new_data;
+
+    uint16_t port;
+    uint64_t time_seconds;
+    uint32_t time_u_seconds;
+
+    if ( sscanf(str_ptr, "%hu:%" SCNu64 ".%" SCNu32, &port, &time_seconds, &time_u_seconds) != 3 )
+    {
+        delete[] new_data;
+        return ConnectorMsg();
+    }
+
+    sc_hdr->port = port;
+    sc_hdr->time_seconds = time_seconds;
+    sc_hdr->time_u_seconds = time_u_seconds;
+    sc_hdr->sequence = 0;
+
+    uint32_t data_pos = sizeof(SCMsgHdr);
+    const char* txt_data_end = str_ptr + size;
+
+    do
+    {
+        int bytes_consumed = 0;
+
+        txt_data_ptr += 1;   // step to the character after the comma
+
+        if ( sscanf(txt_data_ptr, "%hhx%n", (unsigned char*)&(new_data[data_pos++]), &bytes_consumed) != 1 )
+        {
+            delete[] new_data;
+            return ConnectorMsg();
+        }
+
+        txt_data_ptr += bytes_consumed;
+    } while ( txt_data_ptr < txt_data_end and (txt_data_ptr = strchr(txt_data_ptr, (int)',')) != nullptr );
+
+    if ( data_pos <= sizeof(SCMsgHdr) )
+    {
+        delete[] new_data;
+        return ConnectorMsg();
+    }
+
+    return ConnectorMsg(new_data, data_pos, true);
+}
+
+
+//-------------------------------------------------------------------------
+// UNIT TESTS
+//-------------------------------------------------------------------------
+#ifdef UNIT_TEST
+#include "catch/snort_catch.h"
+
+#define CHECK_CMSG(cmsg, expected_hdr, expected_msg)                                                 \
+    REQUIRE(cmsg.get_data());                                                                       \
+    REQUIRE(cmsg.get_length() == sizeof(expected_hdr) + sizeof(expected_msg));                      \
+    CHECK(memcmp(cmsg.get_data(), &expected_hdr, sizeof(expected_hdr)) == 0);                       \
+    CHECK(memcmp(cmsg.get_data() + sizeof(expected_hdr), expected_msg, sizeof(expected_msg)) == 0); \
+
+#define CHECK_NO_CMSG(cmsg)                                                                          \
+    REQUIRE(cmsg.get_data() == nullptr);                                                            \
+    REQUIRE(cmsg.get_length() == 0);                                                                \
+
+TEST_CASE("hdr_to_text", "[side_channel]")
+{
+    SECTION("basic")
+    {
+        SCMsgHdr data = {1, 2, 3, 4};
+        std::string expected_txt("1:4.3");
+
+        std::string txt = sc_msg_hdr_to_text(&data);
+
+        CHECK(txt.size() == expected_txt.size());
+        CHECK(txt == expected_txt);
+    }
+    SECTION("max_values")
+    {
+        SCMsgHdr data = {UINT16_MAX, UINT16_MAX, UINT32_MAX, UINT64_MAX};
+        std::string expected_txt("65535:18446744073709551615.4294967295");
+
+        std::string txt = sc_msg_hdr_to_text(&data);
+
+        CHECK(txt.size() == expected_txt.size());
+        CHECK(txt == expected_txt);
+    }
+    SECTION("zeroes")
+    {
+        SCMsgHdr data = {0, 0, 0, 0};
+        std::string expected_txt("0:0.0");
+
+        std::string txt = sc_msg_hdr_to_text(&data);
+
+        CHECK(txt.size() == expected_txt.size());
+        CHECK(txt == expected_txt);
+    }
+}
+
+TEST_CASE("content_to_text", "[side_channel]")
+{
+    SECTION("basic")
+    {
+        uint8_t data_len = 6;
+        uint8_t* data = new uint8_t[data_len]{0x00, 0x01, 0x0a, 0xab, 0xbb, 0xff};
+
+        std::string expected_txt(",00,01,0A,AB,BB,FF");
+
+        std::string txt = sc_msg_data_to_text(data, data_len);
+
+        CHECK(txt.size() == expected_txt.size());
+        CHECK(txt == expected_txt);
+
+        delete[] data;
+    }
+    SECTION("single_value")
+    {
+        uint8_t data_len = 1;
+        uint8_t* data = new uint8_t[data_len]{0x00};
+
+        std::string expected_txt(",00");
+
+        std::string txt = sc_msg_data_to_text(data, data_len);
+
+        CHECK(txt.size() == expected_txt.size());
+        CHECK(txt == expected_txt);
+
+        delete[] data;
+    }
+    SECTION("data - nullptr")
+    {
+        std::string txt = sc_msg_data_to_text(nullptr, 10);
+
+        CHECK(txt.empty());
+    }
+    SECTION("zero length")
+    {
+        uint8_t data_len = 1;
+        uint8_t* data = new uint8_t[data_len]{0x00};
+
+        std::string txt = sc_msg_data_to_text(data, 0);
+
+        CHECK(txt.empty());
+
+        delete[] data;
+    }
+}
+
+TEST_CASE("from_text", "[side_channel]")
+{
+    SECTION("positive")
+    {
+        SECTION("basic")
+        {
+            std::string txt_msg("1:2.3,00,01,0A,AB,BB,FF");
+
+            SCMsgHdr expected_hdr = {1, 0, 3, 2};
+            uint8_t expected_data[] = {0x00, 0x01, 0x0a, 0xab, 0xbb, 0xff};
+
+            ConnectorMsg cmsg = from_text(txt_msg.c_str(), txt_msg.size());
+            CHECK_CMSG(cmsg, expected_hdr, expected_data);
+        }
+        SECTION("single_char_msg")
+        {
+            std::string txt_msg("1:2.3,00");
+
+            SCMsgHdr expected_hdr = {1, 0, 3, 2};
+            uint8_t expected_data[] = {0x00};
+
+            ConnectorMsg cmsg = from_text(txt_msg.c_str(), txt_msg.size());
+            CHECK_CMSG(cmsg, expected_hdr, expected_data);
+        }
+        SECTION("header_zero")
+        {
+            std::string txt_msg("0:0.0,00");
+
+            SCMsgHdr expected_hdr = {0, 0, 0, 0};
+            uint8_t expected_data[] = {0x00};
+
+            ConnectorMsg cmsg = from_text(txt_msg.c_str(), txt_msg.size());
+            CHECK_CMSG(cmsg, expected_hdr, expected_data);
+        }
+        SECTION("hdr_max_values")
+        {
+            std::string txt_msg("65535:18446744073709551615.4294967295,00");
+
+            SCMsgHdr expected_hdr = {UINT16_MAX, 0, UINT32_MAX, UINT64_MAX};
+            uint8_t expected_data[] = {0x00};
+
+            ConnectorMsg cmsg = from_text(txt_msg.c_str(), txt_msg.size());
+            CHECK_CMSG(cmsg, expected_hdr, expected_data);
+        }
+    }
+    SECTION("negative")
+    {
+        SECTION("empty_string")
+        {
+            const ConnectorMsg cmsg = from_text("", 0);
+            CHECK_NO_CMSG(cmsg);
+        }
+        SECTION("too_short")
+        {
+            std::string txt_msg("65535");
+
+            const ConnectorMsg cmsg = from_text(txt_msg.c_str(), txt_msg.size());
+            CHECK_NO_CMSG(cmsg);
+        }
+        SECTION("invalid_hdr_format")
+        {
+            std::string txt_msg("11111,00");
+
+            const ConnectorMsg cmsg = from_text(txt_msg.c_str(), txt_msg.size());
+            CHECK_NO_CMSG(cmsg);
+        }
+        SECTION("no_msg")
+        {
+            std::string txt_msg("1:2.3");
+
+            const ConnectorMsg cmsg = from_text(txt_msg.c_str(), txt_msg.size());
+            CHECK_NO_CMSG(cmsg);
+        }
+        SECTION("invalid_msg_format")
+        {
+            std::string txt_msg("1:2.3.foobar");
+
+            const ConnectorMsg cmsg = from_text(txt_msg.c_str(), txt_msg.size());
+            CHECK_NO_CMSG(cmsg);
+        }
+    }
+}
+
+#endif
diff --git a/src/side_channel/side_channel_format.h b/src/side_channel/side_channel_format.h
new file mode 100644 (file)
index 0000000..ee2e614
--- /dev/null
@@ -0,0 +1,33 @@
+//--------------------------------------------------------------------------
+// Copyright (C) 2015-2024 Cisco and/or its affiliates. All rights reserved.
+//
+// This program is free software; you can redistribute it and/or modify it
+// under the terms of the GNU General Public License Version 2 as published
+// by the Free Software Foundation.  You may not use, modify or distribute
+// this program under any other version of the GNU General Public License.
+//
+// This program is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program; if not, write to the Free Software Foundation, Inc.,
+// 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+//--------------------------------------------------------------------------
+
+#ifndef SIDE_CHANNEL_FORMAT_H
+#define SIDE_CHANNEL_FORMAT_H
+
+#include <string>
+
+#include "framework/connector.h"
+#include "side_channel/side_channel.h"
+
+#define TXT_UNIT_LEN                3
+
+std::string sc_msg_hdr_to_text(const SCMsgHdr* hdr);
+std::string sc_msg_data_to_text(const uint8_t* data, uint32_t length);
+snort::ConnectorMsg from_text(const char* str_ptr, uint32_t size);
+
+#endif
index af37cda9438df39b3f3146b34974e72cca49e686..95223159b3f88edf69d3a38559386b771ea4ca87 100644 (file)
@@ -55,6 +55,9 @@ static const Parameter sc_params[] =
     { "connector", Parameter::PT_STRING, nullptr, nullptr,
       "connector handle" },
 
+    { "format", Parameter::PT_ENUM, "binary | text", nullptr,
+      "data output format" },
+
     { nullptr, Parameter::PT_MAX, nullptr, nullptr, nullptr }
 };
 
@@ -97,7 +100,10 @@ bool SideChannelModule::set(const char*, Value& v, SnortConfig*)
 {
     assert(config);
 
-    if ( v.is("connector") )
+    if ( v.is("format") )
+        config->format = (ScMsgFormat)v.get_uint8();
+
+    else if ( v.is("connector") )
         config->connectors.emplace_back(std::move(v.get_string()));
 
     else if ( v.is("ports") )
@@ -134,7 +140,7 @@ bool SideChannelModule::end(const char* fqn, int idx, SnortConfig*)
 
     // Instantiate the side channel.  The name links
     // to the side channel connector(s).
-    SideChannelManager::instantiate(&config->connectors, config->ports);
+    SideChannelManager::instantiate(&config->connectors, config->ports, config->format);
 
     delete config->ports;
     delete config;
index ffa80c44352c2b5ea1b810b2592ed02c2b7c0e90..097cb6068a80a3af51fd9c7cbfa671e45475eac2 100644 (file)
@@ -31,6 +31,7 @@ struct SideChannelConfig
 {
     PortBitSet* ports = nullptr;
     SCConnectors connectors;
+    ScMsgFormat format;
 };
 
 extern THREAD_LOCAL SimpleStats sc_stats;
index f09d3c530661e443904964db4f87bba9ea01e577..dcfd60d1a195ff8bc39666f6aa35c7316389020d 100644 (file)
@@ -1,5 +1,7 @@
 add_cpputest( side_channel_test
-    SOURCES ../side_channel.cc
+    SOURCES
+        ../side_channel.cc
+        $<TARGET_OBJECTS:catch_tests>
 )
 
 add_cpputest( side_channel_module_test
index fcd1d0e8f6bfc99e59b47b8770bd18d11624cd2f..1766b0103c236fbc8dd75162f56aacd91a156d60 100644 (file)
@@ -39,6 +39,7 @@ THREAD_LOCAL SimpleStats sc_stats;
 THREAD_LOCAL ProfileStats sc_perf_stats;
 
 static bool port_1_set = false;
+static bool text_fmt_set = false;
 
 static char* make_bit_string(int bit)
 {
@@ -51,9 +52,10 @@ static char* make_bit_string(int bit)
     return bit_string;
 }
 
-void SideChannelManager::instantiate(const SCConnectors*, const PortBitSet* ports)
+void SideChannelManager::instantiate(const SCConnectors*, const PortBitSet* ports, ScMsgFormat fmt)
 {
     port_1_set = ports->test(1);
+    text_fmt_set = fmt == ScMsgFormat::TEXT;
 }
 
 void show_stats(PegCount*, const PegInfo*, unsigned, const char*) { }
@@ -78,14 +80,17 @@ TEST(side_channel_module, test_connector_module_valid)
     Value ports_val(make_bit_string(1));
     Value connector_t_val("transmit");
     Value connector_r_val("receive");
+    Value fmt_val((uint64_t)1);
     Parameter ports_param = {"ports", Parameter::PT_BIT_LIST, "65535", nullptr, "ports"};
     Parameter connector_param = {"connector", Parameter::PT_STRING, nullptr, nullptr, "connector"};
+    Parameter format_param = {"format", Parameter::PT_ENUM, "binary | text", nullptr, "format"};
 
     SideChannelModule module;
 
     ports_val.set(&ports_param);
     connector_t_val.set(&connector_param);
     connector_r_val.set(&connector_param);
+    fmt_val.set(&format_param);
 
     module.begin("side_channel", 0, nullptr);
     module.begin("side_channel", 1, nullptr);
@@ -98,10 +103,12 @@ TEST(side_channel_module, test_connector_module_valid)
     module.end("side_channel.connectors", 2, nullptr);
     module.end("side_channel.connectors", 0, nullptr);
     module.set("side_channel.ports", ports_val, nullptr);
+    module.set("side_channel.format", fmt_val, nullptr);
     module.end("side_channel", 1, nullptr);
     module.end("side_channel", 0, nullptr);
 
     CHECK(port_1_set == true);
+    CHECK(text_fmt_set == true);
 }
 
 int main(int argc, char** argv)
index 5471606f0e261fcc847ed1203ff952650adc5a4f..415e6db715d102d1d59492668f1a21eae19d8f6a 100644 (file)
@@ -24,6 +24,7 @@
 #endif
 
 #include "side_channel/side_channel.h"
+#include "side_channel/side_channel_format.h"
 
 #include "log/messages.h"
 #include "managers/connector_manager.h"
 
 using namespace snort;
 
-class TestConnectorMsgHandle : public ConnectorMsgHandle
+ConnectorConfig duplex_conf;
+
+class DuplexConnector : public Connector
 {
 public:
-    TestConnectorMsgHandle(const uint32_t length)
-    {
-        connector_msg.length = length;
-        connector_msg.data = new uint8_t[length];
-    }
+    DuplexConnector() : Connector(duplex_conf) { }
 
-    ~TestConnectorMsgHandle()
+    bool transmit_message(const ConnectorMsg&) override
+    { return true; }
+    bool transmit_message(const ConnectorMsg&&) override
+    { return true; }
+    ConnectorMsg receive_message(bool) override
     {
-        delete[] connector_msg.data;
-    }
-
-    ConnectorMsg connector_msg;
-};
+        const uint32_t length = 30;
+        uint8_t* data = new uint8_t[sizeof(SCMsgHdr) + length];
 
-class DuplexConnector : public Connector
-{
-    ConnectorMsgHandle* alloc_message(const uint32_t length, const uint8_t** data) override
-    {
-        TestConnectorMsgHandle* msg = new TestConnectorMsgHandle(length);
-        *data = (uint8_t*)msg->connector_msg.data;
-        return msg;
+        return ConnectorMsg(data, length, true);
     }
-    void discard_message(ConnectorMsgHandle* msg) override
-    { delete (TestConnectorMsgHandle*)msg; }
-    bool transmit_message(ConnectorMsgHandle* msg) override
-    { delete (TestConnectorMsgHandle*)msg; return true; }
-    ConnectorMsgHandle* receive_message(bool) override
-    { return new TestConnectorMsgHandle(30); }
-    ConnectorMsg* get_connector_msg(ConnectorMsgHandle* handle) override
-    { return &((TestConnectorMsgHandle*)handle)->connector_msg; }
-    Direction get_connector_direction() override
-    { return CONN_DUPLEX; }
 };
 
+ConnectorConfig receive_conf;
+
 class ReceiveConnector : public Connector
 {
-    ConnectorMsgHandle* alloc_message(const uint32_t length, const uint8_t** data) override
+public:
+    ReceiveConnector() : Connector(receive_conf) { }
+
+    bool transmit_message(const ConnectorMsg&) override
+    { return false; }
+    bool transmit_message(const ConnectorMsg&&) override
+    { return false; }
+    ConnectorMsg receive_message(bool) override
     {
-        TestConnectorMsgHandle* msg = new TestConnectorMsgHandle(length);
-        *data = (uint8_t*)msg->connector_msg.data;
-        return msg;
+        const uint32_t length = 30;
+        uint8_t* data = new uint8_t[sizeof(SCMsgHdr) + length];
+
+        return ConnectorMsg(data, length, true);
     }
-    void discard_message(ConnectorMsgHandle* msg) override
-    { delete (TestConnectorMsgHandle*)msg; }
-    bool transmit_message(ConnectorMsgHandle* msg) override
-    { delete (TestConnectorMsgHandle*)msg; return true; }
-    ConnectorMsgHandle* receive_message(bool) override
-    { return new TestConnectorMsgHandle(30); }
-    ConnectorMsg* get_connector_msg(ConnectorMsgHandle* handle) override
-    { return &((TestConnectorMsgHandle*)handle)->connector_msg; }
-    Direction get_connector_direction() override
-    { return CONN_RECEIVE; }
 };
 
+ConnectorConfig transmit_conf;
+
 class TransmitConnector : public Connector
 {
-    ConnectorMsgHandle* alloc_message(const uint32_t length, const uint8_t** data) override
-    {
-        TestConnectorMsgHandle* msg = new TestConnectorMsgHandle(length);
-        *data = (uint8_t*)msg->connector_msg.data;
-        return msg;
-    }
-    void discard_message(ConnectorMsgHandle* msg) override
-    { delete (TestConnectorMsgHandle*)msg; }
-    bool transmit_message(ConnectorMsgHandle* msg) override
-    { delete (TestConnectorMsgHandle*)msg; return true; }
-    ConnectorMsgHandle* receive_message(bool) override
-    { return nullptr; }
-    ConnectorMsg* get_connector_msg(ConnectorMsgHandle*) override
-    { return nullptr; }
-    Direction get_connector_direction() override
-    { return CONN_TRANSMIT; }
+public:
+    TransmitConnector() : Connector(transmit_conf) { }
+
+    bool transmit_message(const ConnectorMsg&) override
+    { return true; }
+    bool transmit_message(const ConnectorMsg&&) override
+    { return true; }
+    ConnectorMsg receive_message(bool) override
+    { return ConnectorMsg(); }
 };
 
 Connector* receive_connector = nullptr;
@@ -116,8 +95,11 @@ Connector* duplex_connector = nullptr;
 
 void ConnectorManager::thread_init()
 {
+    receive_conf.direction = Connector::Direction::CONN_RECEIVE;
     receive_connector = new ReceiveConnector;
+    transmit_conf.direction = Connector::Direction::CONN_TRANSMIT;
     transmit_connector = new TransmitConnector;
+    duplex_conf.direction = Connector::Direction::CONN_DUPLEX;
     duplex_connector = new DuplexConnector;
 }
 
@@ -142,6 +124,19 @@ Connector* ConnectorManager::get_connector(const std::string& connector_name)
 
 void ParseWarning(WarningGroup, const char*, ...) { }
 
+std::string text_test_hdr;
+std::string sc_msg_hdr_to_text(const SCMsgHdr*)
+{ return text_test_hdr; }
+
+std::string text_test_data;
+std::string sc_msg_data_to_text(const uint8_t*, uint32_t)
+{ return text_test_data; }
+
+ConnectorMsg converted_test_msg;
+ConnectorMsg from_text(const char*, uint32_t)
+{ return std::move(converted_test_msg); }
+
+
 TEST_GROUP(side_channel)
 {
     void setup() override
@@ -154,7 +149,7 @@ TEST_GROUP(side_channel)
         test_connectors.emplace_back("T");
         test_ports.set(1);
 
-        SideChannelManager::instantiate(&test_connectors, &test_ports);
+        SideChannelManager::instantiate(&test_connectors, &test_ports, ScMsgFormat::BINARY);
 
         test_connectors.clear();
         test_ports.reset(1);
@@ -162,7 +157,7 @@ TEST_GROUP(side_channel)
         test_connectors.emplace_back("R");
         test_ports.set(2);
 
-        SideChannelManager::instantiate(&test_connectors, &test_ports);
+        SideChannelManager::instantiate(&test_connectors, &test_ports, ScMsgFormat::BINARY);
 
         test_connectors.clear();
         test_ports.reset(2);
@@ -170,14 +165,14 @@ TEST_GROUP(side_channel)
         test_connectors.emplace_back("T");
         test_ports.set(3);
 
-        SideChannelManager::instantiate(&test_connectors, &test_ports);
+        SideChannelManager::instantiate(&test_connectors, &test_ports, ScMsgFormat::BINARY);
 
         test_connectors.clear();
         test_ports.reset(3);
 
         test_ports.set(4);
 
-        SideChannelManager::instantiate(&test_connectors, &test_ports);
+        SideChannelManager::instantiate(&test_connectors, &test_ports, ScMsgFormat::BINARY);
 
         test_connectors.clear();
         test_ports.reset(4);
@@ -185,28 +180,45 @@ TEST_GROUP(side_channel)
         test_connectors.emplace_back("D");
         test_ports.set(5);
 
-        SideChannelManager::instantiate(&test_connectors, &test_ports);
+        SideChannelManager::instantiate(&test_connectors, &test_ports, ScMsgFormat::BINARY);
 
         test_connectors.clear();
         test_ports.reset(5);
 
+        test_connectors.emplace_back("R");
+        test_ports.set(6);
+
+        SideChannelManager::instantiate(&test_connectors, &test_ports, ScMsgFormat::TEXT);
+
+        test_connectors.clear();
+        test_ports.reset(6);
+
+        test_connectors.emplace_back("T");
+        test_ports.set(7);
+
+        SideChannelManager::instantiate(&test_connectors, &test_ports, ScMsgFormat::TEXT);
+
+        test_connectors.clear();
+        test_ports.reset(7);
+
+        ConnectorManager::thread_init();
         SideChannelManager::thread_init();
     }
 
     void teardown() override
     {
+        ConnectorManager::thread_term();
         SideChannelManager::thread_term();
         SideChannelManager::term();
+
+        text_test_hdr.clear();
+        text_test_data.clear();
     }
 };
 
-TEST(side_channel, test_connector_null)
-{
-}
-
 TEST(side_channel, test_connector_get_none)
 {
-    SideChannel* sc = SideChannelManager::get_side_channel(6);
+    const SideChannel* sc = SideChannelManager::get_side_channel(8);
     CHECK(sc == nullptr);
 }
 
@@ -233,7 +245,7 @@ TEST(side_channel, test_connector_directions)
     CHECK(sc->get_direction() == Connector::CONN_DUPLEX);
 }
 
-TEST(side_channel, test_connector_alloc_discard)
+TEST(side_channel, test_connector_alloc_discard_binary)
 {
     SideChannel* sc = SideChannelManager::get_side_channel(1);
     CHECK(sc != nullptr);
@@ -244,11 +256,37 @@ TEST(side_channel, test_connector_alloc_discard)
     SCMessage* msg = sc->alloc_transmit_message(30);
     CHECK(msg != nullptr);
 
+    CHECK(sc->discard_message(msg) == true);
+}
+
+TEST(side_channel, test_connector_alloc_discard_text)
+{
+    SideChannel* sc = SideChannelManager::get_side_channel(7);
+    CHECK(sc != nullptr);
+
+    sc->set_default_port(1);
+    text_test_hdr = "1:2.3";
+
+    SCMessage* msg = sc->alloc_transmit_message(30);
+    CHECK(msg != nullptr);
+
     bool success = sc->discard_message(msg);
     CHECK(success == true);
 }
 
-TEST(side_channel, test_connector_alloc_transmit)
+TEST(side_channel, test_connector_alloc_text_invalid_hdr)
+{
+    SideChannel* sc = SideChannelManager::get_side_channel(7);
+    CHECK(sc != nullptr);
+
+    sc->set_default_port(1);
+    text_test_hdr = "";
+
+    const SCMessage* msg = sc->alloc_transmit_message(30);
+    CHECK(msg == nullptr);
+}
+
+TEST(side_channel, test_connector_alloc_transmit_binary)
 {
     SideChannel* sc = SideChannelManager::get_side_channel(1);
     CHECK(sc != nullptr);
@@ -259,30 +297,92 @@ TEST(side_channel, test_connector_alloc_transmit)
     CHECK(msg != nullptr);
     CHECK(msg->content_length == 30);
     CHECK(msg->content != nullptr);
-    CHECK(msg->hdr->port == 2);
 
-    sc->set_message_port(msg,1);
-    CHECK(msg->hdr->port == 1);
+    CHECK(msg->cmsg.get_length() == 30 + sizeof(SCMsgHdr));
+    CHECK(msg->cmsg.get_data() != nullptr);
 
-    bool success = sc->transmit_message(msg);
-    CHECK(success == true);
+    CHECK(((SCMsgHdr*)msg->cmsg.get_data())->port == 2);
+;
+    CHECK(sc->transmit_message(msg) == true);
+}
+
+TEST(side_channel, test_connector_alloc_transmit_text)
+{
+    SideChannel* sc = SideChannelManager::get_side_channel(7);
+    CHECK(sc != nullptr);
+
+    sc->set_default_port(2);
+    text_test_hdr = "1:2.3";
+    text_test_data = ",01,02,03";
+
+    SCMessage* msg = sc->alloc_transmit_message(3);
+    CHECK(msg != nullptr);
+    CHECK(msg->content_length == 3);
+    CHECK(msg->content != nullptr);
+
+    CHECK(msg->cmsg.get_length() == 3 * TXT_UNIT_LEN + sc_msg_hdr_to_text(nullptr).length());
+    CHECK(msg->cmsg.get_data() != nullptr);
+
+    CHECK(sc->transmit_message(msg) == true);
+}
+
+TEST(side_channel, test_connector_alloc_transmit_text_invalid_data)
+{
+    SideChannel* sc = SideChannelManager::get_side_channel(7);
+    CHECK(sc != nullptr);
+
+    sc->set_default_port(2);
+    text_test_hdr = "1:2.3";
+    text_test_data = "";
+
+    SCMessage* msg = sc->alloc_transmit_message(3);
+    CHECK(msg != nullptr);
+
+    CHECK(sc->transmit_message(msg) == false);
 }
 
 static void receive_handler(SCMessage* sc_msg)
 {
     CHECK(sc_msg != nullptr);
+    CHECK(sc_msg->cmsg.get_data() != nullptr);
+    CHECK(sc_msg->cmsg.get_length() != 0);
     CHECK(sc_msg->sc != nullptr);
+    CHECK(sc_msg->connector != nullptr);
     sc_msg->sc->discard_message(sc_msg);
 }
 
-TEST(side_channel, test_connector_receive_process_dispatch_discard)
+TEST(side_channel, test_connector_receive_process_dispatch_discard_binary)
 {
     SideChannel* sc = SideChannelManager::get_side_channel(1);
     CHECK(sc != nullptr);
     sc->register_receive_handler(receive_handler);
 
-    bool success = sc->process(1);
-    CHECK(success == true);
+    CHECK(sc->process(1) == true);
+
+    sc->unregister_receive_handler();
+}
+
+TEST(side_channel, test_connector_receive_process_dispatch_discard_text)
+{
+    SideChannel* sc = SideChannelManager::get_side_channel(6);
+    CHECK(sc != nullptr);
+    sc->register_receive_handler(receive_handler);
+
+    converted_test_msg = ConnectorMsg((uint8_t*)"test", sizeof("test"), false);
+    CHECK(sc->process(1) == true);
+
+    sc->unregister_receive_handler();
+}
+
+TEST(side_channel, test_connector_receive_process_dispatch_discard_text_invalid)
+{
+    SideChannel* sc = SideChannelManager::get_side_channel(6);
+    CHECK(sc != nullptr);
+    sc->register_receive_handler(receive_handler);
+
+    converted_test_msg = ConnectorMsg();
+
+    CHECK(sc->process(1) == false);
 
     sc->unregister_receive_handler();
 }