void RunModeErfFileRegister(void)
{
- default_mode = "auto";
- RunModeRegisterNewRunMode(RUNMODE_ERF_FILE, "auto",
- "Multi threaded Erf File mode",
- RunModeErfFileAuto);
+ default_mode = "autofp";
+
+ RunModeRegisterNewRunMode(RUNMODE_ERF_FILE, "single",
+ "Single threaded ERF file mode",
+ RunModeErfFileSingle);
+
+ RunModeRegisterNewRunMode(RUNMODE_ERF_FILE, "autofp",
+ "Multi threaded ERF file mode. Packets from "
+ "each flow are assigned to a single detect thread",
+ RunModeErfFileAutoFp);
return;
}
-int RunModeErfFileAuto(DetectEngineCtx *de_ctx)
+int RunModeErfFileSingle(DetectEngineCtx *de_ctx)
{
- SCEnter();
- char tname[12];
- uint16_t cpu = 0;
+ int ret;
+ char *file;
- /* Available cpus */
- uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
-
- RunModeInitialize();
+ SCEnter();
- char *file = NULL;
if (ConfGet("erf-file.file", &file) == 0) {
- SCLogError(SC_ERR_RUNMODE, "Failed retrieving erf-file.file "
- "from Conf");
+ SCLogError(SC_ERR_RUNMODE, "Failed to get erf-file.file from config.");
exit(EXIT_FAILURE);
}
- SCLogDebug("file %s", file);
+
+ RunModeInitialize();
+
TimeModeSetOffline();
- /* create the threads */
- ThreadVars *tv_receiveerf =
- TmThreadCreatePacketHandler("ReceiveErfFile",
- "packetpool", "packetpool",
- "pickup-queue", "simple",
- "1slot");
- if (tv_receiveerf == NULL) {
+ /* Basically the same setup as PCAP files. */
+
+ ThreadVars *tv = TmThreadCreatePacketHandler("ErfFile",
+ "packetpool", "packetpool",
+ "packetpool", "packetpool",
+ "pktacqloop");
+ if (tv == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
}
+
TmModule *tm_module = TmModuleGetByName("ReceiveErfFile");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName failed for ReceiveErfFile\n");
exit(EXIT_FAILURE);
}
- TmSlotSetFuncAppend(tv_receiveerf, tm_module, file);
+ TmSlotSetFuncAppend(tv, tm_module, file);
- if (threading_set_cpu_affinity) {
- TmThreadSetCPUAffinity(tv_receiveerf, 0);
- if (ncpus > 1)
- TmThreadSetThreadPriority(tv_receiveerf, PRIO_MEDIUM);
- }
-
- if (TmThreadSpawn(tv_receiveerf) != TM_ECODE_OK) {
- printf("ERROR: TmThreadSpawn failed\n");
- exit(EXIT_FAILURE);
- }
-
- ThreadVars *tv_decode1 =
- TmThreadCreatePacketHandler("Decode & Stream",
- "pickup-queue", "simple",
- "stream-queue1", "simple",
- "varslot");
- if (tv_decode1 == NULL) {
- printf("ERROR: TmThreadsCreate failed for Decode1\n");
- exit(EXIT_FAILURE);
- }
tm_module = TmModuleGetByName("DecodeErfFile");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName DecodeErfFile failed\n");
exit(EXIT_FAILURE);
}
- TmSlotSetFuncAppend(tv_decode1, tm_module, NULL);
+ TmSlotSetFuncAppend(tv, tm_module, NULL);
tm_module = TmModuleGetByName("StreamTcp");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName StreamTcp failed\n");
exit(EXIT_FAILURE);
}
- TmSlotSetFuncAppend(tv_decode1, tm_module, NULL);
+ TmSlotSetFuncAppend(tv, tm_module, NULL);
- if (threading_set_cpu_affinity) {
- TmThreadSetCPUAffinity(tv_decode1, 0);
- if (ncpus > 1)
- TmThreadSetThreadPriority(tv_decode1, PRIO_MEDIUM);
+ tm_module = TmModuleGetByName("Detect");
+ if (tm_module == NULL) {
+ printf("ERROR: TmModuleGetByName Detect failed\n");
+ exit(EXIT_FAILURE);
}
+ TmSlotSetFuncAppend(tv, tm_module, (void *)de_ctx);
- if (TmThreadSpawn(tv_decode1) != TM_ECODE_OK) {
+ SetupOutputs(tv);
+
+ if (TmThreadSpawn(tv) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
+ SCLogInfo("RunModeErfFileSingle initialised");
+
+ SCReturnInt(0);
+}
+
+int RunModeErfFileAutoFp(DetectEngineCtx *de_ctx)
+{
+ SCEnter();
+ char tname[12];
+ char qname[12];
+ uint16_t cpu = 0;
+ char queues[2048] = "";
+
+ RunModeInitialize();
+
+ /* Available cpus */
+ uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
+
/* start with cpu 1 so that if we're creating an odd number of detect
* threads we're not creating the most on CPU0. */
if (ncpus > 0)
thread_max = 1;
int thread;
+ for (thread = 0; thread < thread_max; thread++) {
+ if (strlen(queues) > 0)
+ strlcat(queues, ",", sizeof(queues));
+
+ snprintf(qname, sizeof(qname), "pickup%"PRIu16, thread+1);
+ strlcat(queues, qname, sizeof(queues));
+ }
+ SCLogDebug("queues %s", queues);
+
+ char *file = NULL;
+ if (ConfGet("erf-file.file", &file) == 0) {
+ SCLogError(SC_ERR_RUNMODE,
+ "Failed retrieving erf-file.file from config");
+ exit(EXIT_FAILURE);
+ }
+
+ TimeModeSetOffline();
+
+ /* create the threads */
+ ThreadVars *tv =
+ TmThreadCreatePacketHandler("ReceiveErfFile",
+ "packetpool", "packetpool",
+ queues, "flow",
+ "pktacqloop");
+ if (tv == NULL) {
+ printf("ERROR: TmThreadsCreate failed\n");
+ exit(EXIT_FAILURE);
+ }
+ TmModule *tm_module = TmModuleGetByName("ReceiveErfFile");
+ if (tm_module == NULL) {
+ printf("ERROR: TmModuleGetByName failed for ReceiveErfFile\n");
+ exit(EXIT_FAILURE);
+ }
+ TmSlotSetFuncAppend(tv, tm_module, file);
+
+ tm_module = TmModuleGetByName("DecodeErfFile");
+ if (tm_module == NULL) {
+ printf("ERROR: TmModuleGetByName DecodeErfFile failed\n");
+ exit(EXIT_FAILURE);
+ }
+ TmSlotSetFuncAppend(tv, tm_module, NULL);
+
+ if (threading_set_cpu_affinity) {
+ TmThreadSetCPUAffinity(tv, 0);
+ if (ncpus > 1)
+ TmThreadSetThreadPriority(tv, PRIO_MEDIUM);
+ }
+
+ if (TmThreadSpawn(tv) != TM_ECODE_OK) {
+ printf("ERROR: TmThreadSpawn failed\n");
+ exit(EXIT_FAILURE);
+ }
+
for (thread = 0; thread < thread_max; thread++) {
snprintf(tname, sizeof(tname), "Detect%"PRIu16, thread+1);
+ snprintf(qname, sizeof(qname), "pickup%"PRIu16, thread+1);
+
+ SCLogDebug("tname %s, qname %s", tname, qname);
char *thread_name = SCStrdup(tname);
SCLogDebug("Assigning %s affinity to cpu %u", thread_name, cpu);
ThreadVars *tv_detect_ncpu =
TmThreadCreatePacketHandler(thread_name,
- "stream-queue1", "simple",
- "alert-queue1", "simple",
- "1slot");
+ qname, "flow",
+ "packetpool", "packetpool",
+ "varslot");
if (tv_detect_ncpu == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
}
+ tm_module = TmModuleGetByName("StreamTcp");
+ if (tm_module == NULL) {
+ printf("ERROR: TmModuleGetByName StreamTcp failed\n");
+ exit(EXIT_FAILURE);
+ }
+ TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL);
+
tm_module = TmModuleGetByName("Detect");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName Detect failed\n");
}
tv_detect_ncpu->thread_group_name = thread_group_name;
+ /* add outputs as well */
+ SetupOutputs(tv_detect_ncpu);
+
if (TmThreadSpawn(tv_detect_ncpu) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
cpu++;
}
- ThreadVars *tv_outputs =
- TmThreadCreatePacketHandler("Outputs",
- "alert-queue1", "simple",
- "packetpool", "packetpool",
- "varslot");
- if (tv_outputs == NULL) {
- printf("ERROR: TmThreadCreatePacketHandler for Outputs failed\n");
- exit(EXIT_FAILURE);
- }
-
- SetupOutputs(tv_outputs);
-
- if (threading_set_cpu_affinity) {
- TmThreadSetCPUAffinity(tv_outputs, 0);
- if (ncpus > 1)
- TmThreadSetThreadPriority(tv_outputs, PRIO_MEDIUM);
- }
-
- if (TmThreadSpawn(tv_outputs) != TM_ECODE_OK) {
- printf("ERROR: TmThreadSpawn failed\n");
- exit(EXIT_FAILURE);
- }
+ SCLogInfo("RunModeErfFileAutoFp initialised");
- return 0;
+ SCReturnInt(0);
}
} __attribute__((packed)) DagRecord;
typedef struct ErfFileThreadVars_ {
- FILE *erf;
ThreadVars *tv;
+ TmSlot *slot;
+
+ FILE *erf;
uint32_t pkts;
uint64_t bytes;
} ErfFileThreadVars;
-TmEcode ReceiveErfFile(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
+static inline TmEcode ReadErfRecord(ThreadVars *, Packet *, void *);
+TmEcode ReceiveErfFileLoop(ThreadVars *, void *, void *);
TmEcode ReceiveErfFileThreadInit(ThreadVars *, void *, void **);
void ReceiveErfFileThreadExitStats(ThreadVars *, void *);
TmEcode ReceiveErfFileThreadDeinit(ThreadVars *, void *);
{
tmm_modules[TMM_RECEIVEERFFILE].name = "ReceiveErfFile";
tmm_modules[TMM_RECEIVEERFFILE].ThreadInit = ReceiveErfFileThreadInit;
- tmm_modules[TMM_RECEIVEERFFILE].Func = ReceiveErfFile;
+ tmm_modules[TMM_RECEIVEERFFILE].Func = NULL;
+ tmm_modules[TMM_RECEIVEERFFILE].PktAcqLoop = ReceiveErfFileLoop;
tmm_modules[TMM_RECEIVEERFFILE].ThreadExitPrintStats =
ReceiveErfFileThreadExitStats;
tmm_modules[TMM_RECEIVEERFFILE].ThreadDeinit = NULL;
}
/**
- * \brief Thread entry function for ERF reading.
- *
- * Reads a new ERF record from the file and sets up the Packet for
- * decoding.
+ * \brief ERF file reading loop.
*/
-TmEcode
-ReceiveErfFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
+TmEcode ReceiveErfFileLoop(ThreadVars *tv, void *data, void *slot)
+{
+ ErfFileThreadVars *etv = (ErfFileThreadVars *)data;
+ etv->slot = ((TmSlot *)slot)->slot_next;
+ Packet *p;
+ uint16_t packet_q_len = 0;
+
+ while (1) {
+ if (suricata_ctl_flags & SURICATA_STOP ||
+ suricata_ctl_flags & SURICATA_KILL) {
+ SCReturnInt(TM_ECODE_OK);
+ }
+
+ /* Make sure we have at least one packet in the packet pool,
+ * to prevent us from alloc'ing packets at line rate. */
+ do {
+ packet_q_len = PacketPoolSize();
+ if (unlikely(packet_q_len == 0)) {
+ PacketPoolWait();
+ }
+ } while (packet_q_len == 0);
+
+ p = PacketGetFromQueueOrAlloc();
+ if (unlikely(p == NULL)) {
+ SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate a packet.");
+ EngineStop();
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ if (ReadErfRecord(tv, p, data) != TM_ECODE_OK) {
+ TmqhOutputPacketpool(etv->tv, p);
+ EngineStop();
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ if (TmThreadsSlotProcessPkt(etv->tv, etv->slot, p) != TM_ECODE_OK) {
+ EngineStop();
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+ }
+}
+
+static inline TmEcode ReadErfRecord(ThreadVars *tv, Packet *p, void *data)
{
SCEnter();
int r = fread(&dr, sizeof(DagRecord), 1, etv->erf);
if (r < 1) {
- SCLogInfo("End of ERF file reached or an error occurred.");
- EngineStop();
+ if (feof(etv->erf)) {
+ SCLogInfo("End of ERF file reached");
+ }
+ else {
+ SCLogInfo("Error reading ERF record");
+ }
SCReturnInt(TM_ECODE_FAILED);
}
int rlen = ntohs(dr.rlen);
int wlen = ntohs(dr.wlen);
r = fread(GET_PKT_DATA(p), rlen - sizeof(DagRecord), 1, etv->erf);
if (r < 1) {
- SCLogInfo("End of ERF file reached or an error occurred.");
- EngineStop();
+ if (feof(etv->erf)) {
+ SCLogInfo("End of ERF file reached");
+ }
+ else {
+ SCLogInfo("Error reading ERF record");
+ }
SCReturnInt(TM_ECODE_FAILED);
}