static THREAD_LOCAL bool is_main_thread = false;
typedef void (* sighandler_t)(int);
-static int add_signal(int sig, sighandler_t, int check_needed);
+static bool add_signal(int sig, sighandler_t, bool check_needed);
static bool exit_pronto = true;
if ( !is_main_thread )
Snort::capture_packet();
- add_signal(signal, SIG_DFL, 0);
+ add_signal(signal, SIG_DFL, false);
raise(signal);
}
// SIG_IGN nor SIG_DFL
// FIXIT-L convert sigaction, etc. to c++11
-static int add_signal(int sig, sighandler_t signal_handler, int check_needed)
+static bool add_signal(int sig, sighandler_t signal_handler, bool check_needed)
{
sighandler_t pre_handler;
#endif
if (SIG_ERR == pre_handler)
{
- ErrorMessage("Could not add handler for signal %d \n", sig);
- return 0;
+ ParseError("Could not add handler for signal %d \n", sig);
+ return false;
}
else if (check_needed && (SIG_IGN != pre_handler) && (SIG_DFL!= pre_handler))
{
ParseWarning(WARN_CONF, "handler is already installed for signal %d.\n", sig);
}
- return 1;
+ return true;
}
void init_signals()
* Windows doesn't like all of these signals, and will
* set errno for some. Ignore/reset this error so it
* doesn't interfere with later checks of errno value. */
- add_signal(SIGTERM, exit_handler, 1);
- add_signal(SIGINT, exit_handler, 1);
- add_signal(SIGQUIT, dirty_handler, 1);
+ add_signal(SIGTERM, exit_handler, true);
+ add_signal(SIGINT, exit_handler, true);
+ add_signal(SIGQUIT, dirty_handler, true);
- add_signal(SIGNAL_SNORT_DUMP_STATS, dump_stats_handler, 1);
- add_signal(SIGNAL_SNORT_ROTATE_STATS, rotate_stats_handler, 1);
- add_signal(SIGNAL_SNORT_RELOAD, reload_config_handler, 1);
- add_signal(SIGNAL_SNORT_READ_ATTR_TBL, reload_attrib_handler, 1);
+ add_signal(SIGNAL_SNORT_DUMP_STATS, dump_stats_handler, true);
+ add_signal(SIGNAL_SNORT_ROTATE_STATS, rotate_stats_handler, true);
+ add_signal(SIGNAL_SNORT_RELOAD, reload_config_handler, true);
+ add_signal(SIGNAL_SNORT_READ_ATTR_TBL, reload_attrib_handler, true);
- add_signal(SIGPIPE, ignore_handler, 1);
- add_signal(SIGABRT, oops_handler, 1);
- add_signal(SIGSEGV, oops_handler, 1);
- add_signal(SIGBUS, oops_handler, 1);
+ add_signal(SIGPIPE, ignore_handler, true);
+ add_signal(SIGABRT, oops_handler, true);
+ add_signal(SIGSEGV, oops_handler, true);
+ add_signal(SIGBUS, oops_handler, true);
errno = 0;
}
+void term_signals()
+{
+ add_signal(SIGTERM, SIG_DFL, false);
+ add_signal(SIGINT, SIG_DFL, false);
+ add_signal(SIGQUIT, SIG_DFL, false);
+
+ add_signal(SIGNAL_SNORT_DUMP_STATS, SIG_DFL, false);
+ add_signal(SIGNAL_SNORT_ROTATE_STATS, SIG_DFL, false);
+ add_signal(SIGNAL_SNORT_RELOAD, SIG_DFL, false);
+ add_signal(SIGNAL_SNORT_READ_ATTR_TBL, SIG_DFL, false);
+
+ add_signal(SIGPIPE, SIG_DFL, false);
+ add_signal(SIGABRT, SIG_DFL, false);
+ add_signal(SIGSEGV, SIG_DFL, false);
+ add_signal(SIGBUS, SIG_DFL, false);
+}
+
static void help_signal(unsigned n, const char* name, const char* h)
{
cout << Markup::item();
LogMessage("initializing daemon mode\n");
// register signal handler so that parent can trap signal
- add_signal(SIGNAL_SNORT_CHILD_READY, child_ready_handler, 1);
+ add_signal(SIGNAL_SNORT_CHILD_READY, child_ready_handler, true);
pid_t cpid = fork();
const char* get_signal_name(PigSignal);
void init_signals();
+void term_signals();
void help_signals();
void daemonize();
#include <netinet/in.h>
#endif
+#include <mutex>
#include <string>
#include <thread>
+#include "helpers/ring.h"
#include "log/messages.h"
#include "main/analyzer.h"
#include "main/shell.h"
std::mutex Swapper::mutex;
static Swapper* swapper = NULL;
-static int exit_requested = 0;
+static bool exit_requested = false;
static int main_exit_code = 0;
static bool paused = false;
static bool shell_enabled = false;
#endif
+static std::mutex poke_mutex;
+static Ring<unsigned>* pig_poke = nullptr;
+
const struct timespec main_sleep = { 0, 1000000 }; // 0.001 sec
static const char* prompt = "o\")~ ";
#endif
}
+void main_poke(unsigned id)
+{
+ std::lock_guard<std::mutex> lock(poke_mutex);
+ pig_poke->put(id);
+}
+
+int main_read()
+{
+ std::lock_guard<std::mutex> lock(poke_mutex);
+ return pig_poke->get(-1);
+}
+
//-------------------------------------------------------------------------
// swap foo
//-------------------------------------------------------------------------
Analyzer* analyzer;
bool awaiting_privilege_change = false;
- Pig() { analyzer = nullptr; athread = nullptr; idx = (unsigned) -1; }
+ Pig() { analyzer = nullptr; athread = nullptr; idx = (unsigned)-1; }
void set_index(unsigned index) { idx = index; }
void Pig::start()
{
- if (!athread)
- {
- Swapper* ps = new Swapper(snort_conf, SFAT_GetConfig());
- LogMessage("++ [%u] %s\n", idx, analyzer->get_source());
- athread = new std::thread(std::ref(*analyzer), ps);
- }
+ assert(!athread);
+ LogMessage("++ [%u] %s\n", idx, analyzer->get_source());
+
+ Swapper* ps = new Swapper(snort_conf, SFAT_GetConfig());
+ athread = new std::thread(std::ref(*analyzer), ps);
}
void Pig::stop()
{
- if (analyzer)
- {
- if (athread)
- {
- if (analyzer->get_state() != Analyzer::State::STOPPED)
- execute(AC_STOP);
+ assert(analyzer);
+ assert(athread);
- athread->join();
- delete athread;
- athread = nullptr;
+ athread->join();
+ delete athread;
+ athread = nullptr;
- LogMessage("-- [%u] %s\n", idx, analyzer->get_source());
- }
+ LogMessage("-- [%u] %s\n", idx, analyzer->get_source());
- delete analyzer;
- analyzer = nullptr;
- }
+ delete analyzer;
+ analyzer = nullptr;
}
bool Pig::attentive()
analyzer->execute(ac);
return true;
}
+ assert(false);
return false;
}
static Pig* pigs = nullptr;
static unsigned max_pigs = 0;
+static Pig* get_lazy_pig(unsigned max)
+{
+ for ( unsigned i = 0; i < max; ++i )
+ if ( !pigs[i].analyzer )
+ return pigs + i;
+
+ assert(false);
+ return nullptr;
+}
+
//-------------------------------------------------------------------------
// main commands
//-------------------------------------------------------------------------
-static void broadcast(AnalyzerCommand ac)
+static bool broadcast(AnalyzerCommand ac)
{
- // FIXIT-H X - Broadcast should either wait for all pigs to be attentive
- // or queue commands for later processing.
+ for (unsigned idx = 0; idx < max_pigs; ++idx)
+ {
+ if ( !pigs[idx].attentive() )
+ {
+ // FIXIT-L queue commands when busy
+ request.respond("== busy, try again later\n");
+ return false;
+ }
+ }
for (unsigned idx = 0; idx < max_pigs; ++idx)
pigs[idx].execute(ac);
+
+ return true;
}
int main_dump_stats(lua_State*)
int main_rotate_stats(lua_State*)
{
- request.respond("== rotating stats\n");
- broadcast(AC_ROTATE);
+ if ( broadcast(AC_ROTATE) )
+ request.respond("== rotating stats\n");
+
return 0;
}
+static void main_load(Swapper* ps)
+{
+ std::lock_guard<std::mutex> lock(Swapper::mutex);
+
+ for ( unsigned idx = 0; idx < max_pigs; ++idx )
+ pigs[idx].swap(ps);
+}
+
int main_reload_config(lua_State* L)
{
if ( swapper )
proc_stats.conf_reloads++;
swapper = new Swapper(old, sc);
- std::lock_guard<std::mutex> lock(Swapper::mutex);
-
- for ( unsigned idx = 0; idx < max_pigs; ++idx )
- pigs[idx].swap(swapper);
-
+ main_load(swapper);
return 0;
}
return 0;
}
swapper = new Swapper(old, tc);
- std::lock_guard<std::mutex> lock(Swapper::mutex);
-
- for ( unsigned idx = 0; idx < max_pigs; ++idx )
- pigs[idx].swap(swapper);
-
+ main_load(swapper);
return 0;
}
int main_pause(lua_State*)
{
- request.respond("== pausing\n");
- broadcast(AC_PAUSE);
- paused = true;
+ if ( broadcast(AC_PAUSE) )
+ {
+ request.respond("== pausing\n");
+ paused = true;
+ }
return 0;
}
int main_resume(lua_State*)
{
- request.respond("== resuming\n");
- broadcast(AC_RESUME);
- paused = false;
+ if ( broadcast(AC_RESUME) )
+ {
+ request.respond("== resuming\n");
+ paused = false;
+ }
return 0;
}
int main_quit(lua_State*)
{
- exit_requested = 1;
- request.respond("== stopping\n");
- broadcast(AC_STOP);
+ if ( broadcast(AC_STOP) )
+ {
+ request.respond("== stopping\n");
+ exit_requested = true;
+ }
return 0;
}
return true;
}
+static void handle(Pig& pig, unsigned& swine, unsigned& pending_privileges)
+{
+ switch (pig.analyzer->get_state())
+ {
+ case Analyzer::State::NEW:
+ pig.start();
+ break;
+
+ case Analyzer::State::INITIALIZED:
+ if (pig.analyzer->requires_privileged_start() && pending_privileges &&
+ !Snort::has_dropped_privileges())
+ {
+ if (!pig.awaiting_privilege_change)
+ {
+ pig.awaiting_privilege_change = true;
+ pending_privileges--;
+ }
+ if (pending_privileges)
+ break;
+ Snort::drop_privileges();
+ }
+ pig.execute(AC_START);
+ break;
+
+ case Analyzer::State::STARTED:
+ if (!pig.analyzer->requires_privileged_start() && pending_privileges &&
+ !Snort::has_dropped_privileges())
+ {
+ if (!pig.awaiting_privilege_change)
+ {
+ pig.awaiting_privilege_change = true;
+ pending_privileges--;
+ }
+ if (pending_privileges)
+ break;
+ Snort::drop_privileges();
+ }
+ pig.execute(AC_RUN);
+ break;
+
+ case Analyzer::State::STOPPED:
+ pig.stop();
+ --swine;
+ break;
+
+ default:
+ break;
+ }
+}
+
static void main_loop()
{
- unsigned idx = max_pigs, swine = 0, pending_privileges = 0;
+ unsigned swine = 0, pending_privileges = 0;
if (SnortConfig::change_privileges())
pending_privileges = max_pigs;
if (!SnortConfig::read_mode())
{
for (swine = 0; swine < max_pigs; swine++)
- pigs[swine].prep(SFDAQ::get_input_spec(snort_conf, idx));
+ pigs[swine].prep(SFDAQ::get_input_spec(snort_conf, swine));
}
// Iterate over the drove, spawn them as allowed, and handle their deaths.
// FIXIT-L X - If an exit has been requested, we might want to have some mechanism
- // for forcing inconsiderate pigs to die in timely fashion.
- while (swine || (!exit_requested && (paused || Trough::has_next())))
+ // for forcing inconsiderate pigs to die in timely fashion.
+ while ( swine or paused or Trough::has_next() )
{
- if ( ++idx >= max_pigs )
- idx = 0;
+ const char* src;
+ int idx = paused ? -1 : main_read();
- if (!paused)
+ if ( idx >= 0 )
{
Pig& pig = pigs[idx];
- if (pig.analyzer)
- {
- switch (pig.analyzer->get_state())
- {
- case Analyzer::State::NEW:
- pig.start();
- break;
-
- case Analyzer::State::INITIALIZED:
- if (pig.analyzer->requires_privileged_start() && pending_privileges &&
- !Snort::has_dropped_privileges())
- {
- if (!pig.awaiting_privilege_change)
- {
- pig.awaiting_privilege_change = true;
- pending_privileges--;
- }
- if (pending_privileges)
- break;
- Snort::drop_privileges();
- }
- pig.execute(AC_START);
- break;
-
- case Analyzer::State::STARTED:
- if (!pig.analyzer->requires_privileged_start() && pending_privileges &&
- !Snort::has_dropped_privileges())
- {
- if (!pig.awaiting_privilege_change)
- {
- pig.awaiting_privilege_change = true;
- pending_privileges--;
- }
- if (pending_privileges)
- break;
- Snort::drop_privileges();
- }
- pig.execute(AC_RUN);
- break;
-
- case Analyzer::State::STOPPED:
- pig.stop();
- --swine;
- break;
-
- default:
- break;
- }
- }
- else if (const char* src = Trough::get_next())
- {
- pig.prep(src);
- ++swine;
- continue;
- }
- else if (pending_privileges)
+ if ( pig.analyzer )
+ handle(pig, swine, pending_privileges);
+
+ else if ( pending_privileges )
pending_privileges--;
+
+ if ( !swine and exit_requested )
+ break;
+
+ continue;
+ }
+ if ( !exit_requested and !paused and (swine < max_pigs) and (src = Trough::get_next()) )
+ {
+ Pig* pig = get_lazy_pig(max_pigs);
+ pig->prep(src);
+ ++swine;
+ continue;
}
service_check();
}
max_pigs = ThreadConfig::get_instance_max();
assert(max_pigs > 0);
+ pig_poke = new Ring<unsigned>(max_pigs+2);
pigs = new Pig[max_pigs];
for (unsigned idx = 0; idx < max_pigs; idx++)
main_loop();
- for (unsigned idx = 0; idx < max_pigs; idx++)
- {
- Pig& pig = pigs[idx];
- if ( pig.analyzer )
- pig.stop();
- }
+ delete pig_poke;
delete[] pigs;
pigs = nullptr;
int main_detach(lua_State* = nullptr);
#endif
+void main_poke(unsigned);
+
#endif
#include <chrono>
#include <thread>
+#include "main.h"
#include "snort.h"
#include "snort_debug.h"
#include "thread.h"
// analyzer
//-------------------------------------------------------------------------
+void Analyzer::set_state(State s)
+{
+ state = s;
+ main_poke(id);
+}
+
const char* Analyzer::get_state_string()
{
State s = get_state(); // can't use atomic in switch with optimization
Analyzer::Analyzer(unsigned i, const char* s)
{
- state = State::NEW;
- count = 0;
id = i;
source = s;
command = AC_NONE;
swap = nullptr;
daq_instance = nullptr;
privileged_start = false;
+ set_state(State::NEW);
}
void Analyzer::operator()(Swapper* ps)
{
set_thread_type(STHREAD_TYPE_PACKET);
-
set_instance_id(id);
+
ps->apply();
+ delete ps;
if (Snort::thread_init_privileged(source))
{
daq_instance = SFDAQ::get_local_instance();
privileged_start = daq_instance->can_start_unprivileged();
- state = State::INITIALIZED;
+ set_state(State::INITIALIZED);
analyze();
Snort::thread_term();
}
- delete ps;
- state = State::STOPPED;
+ set_state(State::STOPPED);
}
/* Note: This will be called from the main thread. Everything it does must be
switch ( ac )
{
case AC_START:
- if (state != State::INITIALIZED)
- {
- if (state != State::STARTED)
- ErrorMessage("Analyzer: Received START command while in state %s\n",
- get_state_string());
- command = AC_NONE;
- return false;
- }
+ assert(state == State::INITIALIZED);
+
if (!daq_instance->start())
{
ErrorMessage("Analyzer: Failed to start DAQ instance\n");
- command = AC_NONE;
return false;
}
- state = State::STARTED;
- DebugMessage(DEBUG_ANALYZER, "Handled START command\n");
command = AC_NONE;
+ set_state(State::STARTED);
+ DebugMessage(DEBUG_ANALYZER, "Handled START command\n");
break;
case AC_RUN:
- if (state != State::STARTED)
- {
- if (state != State::RUNNING)
- ErrorMessage("Analyzer: Received RUN command while in state %s\n",
- get_state_string());
- command = AC_NONE;
- return false;
- }
+ assert(state == State::STARTED);
Snort::thread_init_unprivileged();
- state = State::RUNNING;
- DebugMessage(DEBUG_ANALYZER, "Handled RUN command\n");
command = AC_NONE;
+ set_state(State::RUNNING);
+ DebugMessage(DEBUG_ANALYZER, "Handled RUN command\n");
break;
case AC_STOP:
- DebugMessage(DEBUG_ANALYZER, "Handled STOP command\n");
command = AC_NONE;
+ DebugMessage(DEBUG_ANALYZER, "Handled STOP command\n");
return false;
case AC_PAUSE:
+ command = AC_NONE;
if (state == State::RUNNING)
- state = State::PAUSED;
+ set_state(State::PAUSED);
else
ErrorMessage("Analyzer: Received PAUSE command while in state %s\n",
get_state_string());
- command = AC_NONE;
break;
case AC_RESUME:
+ command = AC_NONE;
if (state == State::PAUSED)
- state = State::RUNNING;
+ set_state(State::RUNNING);
else
ErrorMessage("Analyzer: Received RESUME command while in state %s\n",
get_state_string());
- command = AC_NONE;
break;
case AC_ROTATE:
case AC_SWAP:
if (swap)
- {
swap->apply();
- // do not clear swap in this thread; causes race cond
- }
+
+ // clear cmd only; swap ptr cleared by main thread
command = AC_NONE;
break;
- default:
- command = AC_NONE;
+ case AC_NONE:
break;
}
return true;
State get_state() { return state; }
const char* get_state_string();
- uint64_t get_count() { return count; }
const char* get_source() { return source; }
// FIXIT-M add asynchronous response too
private:
void analyze();
bool handle_command();
+ void set_state(State);
private:
std::atomic<State> state;
std::atomic<AnalyzerCommand> command;
std::atomic<bool> privileged_start;
- uint64_t count;
unsigned id;
const char* source;
void Snort::init(int argc, char** argv)
{
init_signals();
-
ThreadConfig::init();
#if defined(NOCOREFILE)
* double-freeing any memory. Not guaranteed to be
* thread-safe, but it will prevent the simple cases.
*/
- static int already_exiting = 0;
- if ( already_exiting != 0 )
- {
+ static bool already_exiting = false;
+ if ( already_exiting )
return;
- }
- already_exiting = 1;
- initializing = false; /* just in case we cut out early */
+ already_exiting = true;
+ initializing = false; // just in case we cut out early
+
+ term_signals();
IpsManager::global_term(snort_conf);
SFAT_Cleanup();
host_cache.clear();
{
init = new bool[ThreadConfig::get_instance_max()];
term = new bool[ThreadConfig::get_instance_max()];
+
for ( unsigned i = 0; i < ThreadConfig::get_instance_max(); ++i )
init[i] = term[i] = true;
}
if (err && err != DAQ_READFILE_EOF)
LogMessage("Can't acquire (%d) - %s\n", err, daq_get_error(daq_mod, daq_hand));
- if (s_error != DAQ_SUCCESS)
+ if (s_error != DAQ_SUCCESS) // FIXIT-L tsan race read pkt thread
{
err = s_error;
s_error = DAQ_SUCCESS;
bool SFDAQInstance::break_loop(int error)
{
- s_error = error;
+ s_error = error; // FIXIT-L tsan race write main thread
return (daq_breakloop(daq_mod, daq_hand) == DAQ_SUCCESS);
}