const char* msg;
};
-class ReloadMemcapManager
-{
-public:
- virtual ~ReloadMemcapManager() = default;
-
- virtual bool tune_memcap() = 0;
- virtual bool tune_memcap_idle() = 0;
-protected:
- ReloadMemcapManager() = default;
-};
-
class SO_PUBLIC Module
{
public:
void enable_trace();
- const ReloadMemcapManager* get_reload_mcm() const
- { return reload_mcm; }
-
protected:
Module(const char* name, const char* help);
Module(const char* name, const char* help, const Parameter*,
int table_level = 0;
Trace* trace;
- ReloadMemcapManager* reload_mcm = nullptr;
void set_peg_count(int index, PegCount value)
{
}
}
-void ACShellCmd::execute(Analyzer& analyzer)
+bool ACShellCmd::execute(Analyzer& analyzer, void** state)
{
ControlConn* control_conn = ControlMgmt::find_control(control_fd);
if( control_conn )
control_conn->send_queued_response();
- ac->execute(analyzer);
+ return ac->execute(analyzer, state);
}
ACShellCmd::~ACShellCmd()
public:
ACShellCmd() = delete;
ACShellCmd(int fd, snort::AnalyzerCommand* ac_cmd);
- void execute(Analyzer&) override;
+ bool execute(Analyzer&, void**) override;
const char* stringify() override { return ac->stringify(); }
~ACShellCmd() override;
static MainHook_f main_hook = snort_ignore;
THREAD_LOCAL ProfileStats daqPerfStats;
-THREAD_LOCAL std::list<snort::ReloadMemcapManager *> *rel_managers;
-
static THREAD_LOCAL Analyzer* local_analyzer = nullptr;
//-------------------------------------------------------------------------
{
InspectorManager::thread_reinit(sc);
ActionManager::thread_reinit(sc);
- rel_managers = new std::list<snort::ReloadMemcapManager *>(sc->get_reload_memcap_managers());
}
void Analyzer::term()
Active::thread_term();
delete switcher;
- delete rel_managers;
-
sfthreshold_free();
RateFilter_Cleanup();
}
if (!ac)
return false;
- ac->execute(*this);
-
- add_command_to_completed_queue(ac);
+ void* ac_state = nullptr;
+ if ( ac->execute(*this, &ac_state) )
+ add_command_to_completed_queue(ac);
+ else
+ add_command_to_uncompleted_queue(ac, ac_state);
return true;
}
-void Analyzer::add_command_to_completed_queue(AnalyzerCommand *ac)
+void Analyzer::add_command_to_uncompleted_queue(AnalyzerCommand* aci, void* acs)
+{
+ UncompletedAnalyzerCommand* cac = new UncompletedAnalyzerCommand(aci, acs);
+
+ uncompleted_work_queue.push_back(cac);
+}
+
+void Analyzer::add_command_to_completed_queue(AnalyzerCommand* ac)
{
- if (ac->is_complete())
- {
completed_work_queue_mutex.lock();
completed_work_queue.push(ac);
completed_work_queue_mutex.unlock();
- } else
- cache_analyzer_command(ac);
}
void Analyzer::handle_commands()
;
}
+void Analyzer::handle_uncompleted_commands()
+{
+ std::list<UncompletedAnalyzerCommand*>::iterator it = uncompleted_work_queue.begin();
+ while (it != uncompleted_work_queue.end() )
+ {
+ UncompletedAnalyzerCommand* cac = *it;
+
+ if (cac->command->execute(*this, &cac->state) )
+ {
+ add_command_to_completed_queue(cac->command);
+ it = uncompleted_work_queue.erase(it);
+ delete cac;
+ }
+ else
+ ++it;
+ }
+}
+
DAQ_RecvStatus Analyzer::process_messages()
{
// Max receive becomes the minimum of the configured batch size, the remaining exit_after
process_daq_msg(msg, false);
DetectionEngine::onload();
process_retry_queue();
-
- if (rel_managers and rel_managers->size())
- {
- auto manager = rel_managers->front();
- if (manager->tune_memcap())
- {
- rel_managers->pop_front();
- }
- }
- else
- {
- if(ac)
- add_command_to_completed_queue(ac);
- }
-
+ handle_uncompleted_commands();
}
if (exit_after_cnt && (exit_after_cnt -= num_recv) == 0)
#include <daq_common.h>
#include <atomic>
+#include <list>
#include <mutex>
#include <queue>
#include <string>
-#include <list>
#include "thread.h"
namespace snort
{
class AnalyzerCommand;
+class ReloadResourceTuner;
class SFDAQInstance;
struct Packet;
struct SnortConfig;
struct ProfileStats;
-class ReloadMemcapManager;
}
typedef bool (* MainHook_f)(snort::Packet*);
+class UncompletedAnalyzerCommand
+{
+public:
+ UncompletedAnalyzerCommand(snort::AnalyzerCommand* ac, void* acs) : command(ac), state(acs)
+ { }
+
+ snort::AnalyzerCommand* command = nullptr;
+ void* state = nullptr;
+};
+
class Analyzer
{
public:
void analyze();
bool handle_command();
void handle_commands();
+ void handle_uncompleted_commands();
DAQ_RecvStatus process_messages();
void process_daq_msg(DAQ_Msg_h, bool retry);
void process_daq_pkt_msg(DAQ_Msg_h, bool retry);
void init_unprivileged();
void term();
void show_source();
- void cache_analyzer_command(snort::AnalyzerCommand* aci) { ac = aci; }
- void add_command_to_completed_queue(snort::AnalyzerCommand *ac);
- snort::AnalyzerCommand* get_analyzer_command() { return ac; }
+ void add_command_to_uncompleted_queue(snort::AnalyzerCommand*, void*);
+ void add_command_to_completed_queue(snort::AnalyzerCommand*);
+
public:
std::queue<snort::AnalyzerCommand*> completed_work_queue;
std::mutex completed_work_queue_mutex;
RetryQueue* retry_queue = nullptr;
OopsHandler* oops_handler = nullptr;
ContextSwitcher* switcher = nullptr;
- snort::AnalyzerCommand* ac = nullptr;
-
std::mutex pending_work_queue_mutex;
+ std::list<UncompletedAnalyzerCommand*> uncompleted_work_queue;
};
extern THREAD_LOCAL snort::ProfileStats daqPerfStats;
#include <cassert>
+#include "framework/module.h"
#include "log/messages.h"
#include "managers/module_manager.h"
#include "utils/stats.h"
#include "snort_config.h"
#include "swapper.h"
-void ACStart::execute(Analyzer& analyzer)
+bool ACStart::execute(Analyzer& analyzer, void**)
{
analyzer.start();
+ return true;
}
-void ACRun::execute(Analyzer& analyzer)
+bool ACRun::execute(Analyzer& analyzer, void**)
{
analyzer.run(paused);
paused = false;
+ return true;
}
-void ACStop::execute(Analyzer& analyzer)
+bool ACStop::execute(Analyzer& analyzer, void**)
{
analyzer.stop();
+ return true;
}
-void ACPause::execute(Analyzer& analyzer)
+bool ACPause::execute(Analyzer& analyzer, void**)
{
analyzer.pause();
+ return true;
}
-void ACResume::execute(Analyzer& analyzer)
+bool ACResume::execute(Analyzer& analyzer, void**)
{
analyzer.resume(msg_cnt);
+ return true;
}
-void ACRotate::execute(Analyzer& analyzer)
+bool ACRotate::execute(Analyzer& analyzer, void**)
{
analyzer.rotate();
+ return true;
}
-void ACGetStats::execute(Analyzer&)
+bool ACGetStats::execute(Analyzer&, void**)
{
// FIXIT-P This incurs locking on all threads to retrieve stats. It
// could be reimplemented to optimize for large thread counts by
// retrieving stats in the command and accumulating in the main thread.
snort::ModuleManager::accumulate(snort::SnortConfig::get_conf());
+ return true;
}
ACGetStats::~ACGetStats()
Swapper::set_reload_in_progress(true);
}
-void ACSwap::execute(Analyzer& analyzer)
+bool ACSwap::execute(Analyzer& analyzer, void** ac_state)
{
if (ps)
+ {
ps->apply(analyzer);
+
+ snort::SnortConfig* sc = ps->get_new_conf();
+ if ( sc )
+ {
+ std::list<snort::ReloadResourceTuner*>* reload_tuners;
+
+ if ( !*ac_state )
+ {
+ reload_tuners = new std::list<snort::ReloadResourceTuner*>(sc->get_reload_resource_tuners());
+ *ac_state = reload_tuners;
+ }
+ else
+ reload_tuners = (std::list<snort::ReloadResourceTuner*>*)*ac_state;
+
+ if ( !reload_tuners->empty() )
+ {
+ auto rrt = reload_tuners->front();
+ if (rrt->tune_resources())
+ reload_tuners->pop_front();
+ }
+
+ // check for empty again and free list instance if we are done
+ if ( reload_tuners->empty() )
+ {
+ delete reload_tuners;
+ return true;
+ }
+
+ return false;
+ }
+ }
+
+ return true;
}
ACSwap::~ACSwap()
{
+ if (ps)
+ {
+ snort::SnortConfig* sc = ps->get_new_conf();
+ if ( sc )
+ sc->clear_reload_resource_tuner_list();
+ }
delete ps;
Swapper::set_reload_in_progress(false);
snort::LogMessage("== reload complete\n");
request->respond("== reload complete\n", from_shell, true);
}
-void ACDAQSwap::execute(Analyzer& analyzer)
+bool ACDAQSwap::execute(Analyzer& analyzer, void**)
{
analyzer.reload_daq();
+ return true;
}
ACDAQSwap::~ACDAQSwap()
{
public:
virtual ~AnalyzerCommand() = default;
- virtual void execute(Analyzer&) = 0;
+ virtual bool execute(Analyzer&, void**) = 0;
virtual const char* stringify() = 0;
unsigned get() { return ++ref_count; }
unsigned put() { return --ref_count; }
- bool is_complete() { return completion_status; }
- void set_completion_status(bool status) { completion_status = status; }
SO_PUBLIC static snort::SFDAQInstance* get_daq_instance(Analyzer& analyzer);
+
private:
unsigned ref_count = 0;
- bool completion_status = true;
};
}
class ACGetStats : public snort::AnalyzerCommand
{
public:
- void execute(Analyzer&) override;
+ bool execute(Analyzer&, void**) override;
const char* stringify() override { return "GET_STATS"; }
~ACGetStats() override;
};
class ACPause : public snort::AnalyzerCommand
{
public:
- void execute(Analyzer&) override;
+ bool execute(Analyzer&, void**) override;
const char* stringify() override { return "PAUSE"; }
};
{
public:
ACResume(uint64_t msg_cnt): msg_cnt(msg_cnt) { }
- void execute(Analyzer&) override;
+ bool execute(Analyzer&, void**) override;
const char* stringify() override { return "RESUME"; }
private:
uint64_t msg_cnt;
class ACRotate : public snort::AnalyzerCommand
{
public:
- void execute(Analyzer&) override;
+ bool execute(Analyzer&, void**) override;
const char* stringify() override { return "ROTATE"; }
};
public:
ACRun() = delete;
ACRun(bool is_paused = false ) { paused = is_paused; }
- void execute(Analyzer&) override;
+ bool execute(Analyzer&, void**) override;
const char* stringify() override { return "RUN"; }
private:
bool paused = false;
class ACStart : public snort::AnalyzerCommand
{
public:
- void execute(Analyzer&) override;
+ bool execute(Analyzer&, void**) override;
const char* stringify() override { return "START"; }
};
class ACStop : public snort::AnalyzerCommand
{
public:
- void execute(Analyzer&) override;
+ bool execute(Analyzer&, void**) override;
const char* stringify() override { return "STOP"; }
};
public:
ACSwap() = delete;
ACSwap(Swapper* ps, Request* req, bool from_shell);
- void execute(Analyzer&) override;
+ bool execute(Analyzer&, void**) override;
const char* stringify() override { return "SWAP"; }
~ACSwap() override;
private:
class ACDAQSwap : public snort::AnalyzerCommand
{
public:
- void execute(Analyzer&) override;
+ bool execute(Analyzer&, void**) override;
const char* stringify() override { return "DAQ_SWAP"; }
~ACDAQSwap() override;
};
delete daq_config;
delete proto_ref;
- reload_managers.clear();
+ reload_tuners.clear();
trim_heap();
}
set_policies(sc, sh);
}
}
-
-SO_PUBLIC bool SnortConfig::register_reload_memcap_manager(ReloadMemcapManager *memcap_manager)
-{
- reload_managers.push_back(memcap_manager);
- return true;
-}
-
-std::list<ReloadMemcapManager *> SnortConfig::get_reload_memcap_managers()
-{
- return reload_managers;
-}
#include <sys/types.h>
+#include <list>
+
#include "events/event_queue.h"
#include "framework/bits.h"
#include "main/policy.h"
#include "main/thread.h"
#include "sfip/sf_cidr.h"
-#include <list>
-
#define DEFAULT_LOG_DIR "."
enum RunFlag
namespace snort
{
-struct ProfilerConfig;
class ProtocolReference;
+class ReloadResourceTuner;
+struct ProfilerConfig;
struct GHash;
struct XHash;
-
-class ReloadMemcapManager;
-
struct SnortConfig;
+
typedef void (* ScScratchFunc)(SnortConfig* sc);
+class ReloadResourceTuner
+{
+public:
+ static const unsigned RELOAD_MAX_WORK_PER_PACKET = 3;
+ static const unsigned RELOAD_MAX_WORK_WHEN_IDLE = 10;
+
+ virtual ~ReloadResourceTuner() = default;
+
+ virtual bool tune_resources() = 0;
+ virtual bool tune_resources_idle() = 0;
+
+protected:
+ ReloadResourceTuner() = default;
+
+ unsigned max_work = RELOAD_MAX_WORK_PER_PACKET;
+ unsigned max_work_idle = RELOAD_MAX_WORK_WHEN_IDLE;
+};
+
struct SnortConfig
{
private:
- std::list<ReloadMemcapManager *> reload_managers;
-
void init(const SnortConfig* const, ProtocolReference*);
bool verify_stream_inspectors();
SnortConfig(const SnortConfig&) = delete;
- SO_PUBLIC bool register_reload_memcap_manager(ReloadMemcapManager *);
- std::list<ReloadMemcapManager *> get_reload_memcap_managers();
-
void setup();
void post_setup();
bool verify();
bool cloned = false;
+private:
+ std::list<ReloadResourceTuner*> reload_tuners;
+
+public:
//------------------------------------------------------
// decoding related
uint8_t get_num_layers() const
static void set_conf(SnortConfig*);
SO_PUBLIC static SnortConfig* get_conf();
+
+ SO_PUBLIC void register_reload_resource_tuner(ReloadResourceTuner& rrt)
+ { reload_tuners.push_back(&rrt); }
+
+ const std::list<ReloadResourceTuner*>& get_reload_resource_tuners() const
+ { return reload_tuners; }
+
+ void clear_reload_resource_tuner_list()
+ { reload_tuners.clear(); }
};
}
~Swapper();
void apply(Analyzer&);
+ snort::SnortConfig* get_new_conf() { return new_conf; }
static bool get_reload_in_progress() { return reload_in_progress; }
static void set_reload_in_progress(bool rip) { reload_in_progress = rip; }
{
public:
AcAppIdDebug(AppIdDebugSessionConstraints* cs);
- void execute(Analyzer&) override;
+ bool execute(Analyzer&, void**) override;
const char* stringify() override { return "APPID_DEBUG"; }
private:
}
}
-void AcAppIdDebug::execute(Analyzer&)
+bool AcAppIdDebug::execute(Analyzer&, void**)
{
if (appidDebug)
{
appidDebug->set_constraints("appid", nullptr);
}
// FIXIT-L Add a warning if command was called without appid configured?
+
+ return true;
}
static int enable_debug(lua_State* L)
{
public:
PacketCaptureDebug(const char* f);
- void execute(Analyzer&) override;
+ bool execute(Analyzer&, void**) override;
const char* stringify() override { return "PACKET_CAPTURE_DEBUG"; }
private:
bool enable = false;
}
}
-void PacketCaptureDebug::execute(Analyzer&)
+bool PacketCaptureDebug::execute(Analyzer&, void**)
{
if (enable)
packet_capture_enable(filter);
else
packet_capture_disable();
+
+ return true;
}
CaptureModule::CaptureModule() :
class PacketTracerDebug : public AnalyzerCommand
{
public:
- PacketTracerDebug(PTSessionConstraints *cs);
- void execute(Analyzer &) override;
+ PacketTracerDebug(PTSessionConstraints* cs);
+ bool execute(Analyzer&, void**) override;
const char *stringify() override { return "PACKET_TRACER_DEBUG"; }
private:
bool enable = false;
};
-PacketTracerDebug::PacketTracerDebug(PTSessionConstraints *cs)
+PacketTracerDebug::PacketTracerDebug(PTSessionConstraints* cs)
{
if (cs)
{
}
}
-void PacketTracerDebug::execute(Analyzer &)
+bool PacketTracerDebug::execute(Analyzer&, void**)
{
if (enable)
PacketTracer::set_constraints(&constraints);
else
PacketTracer::set_constraints(nullptr);
+
+ return true;
}
static int enable(lua_State* L)
add_subdirectory(test)
set (STREAM_INCLUDES
- flush_bucket.h
paf.h
stream.h
stream_splitter.h