From: Jason Ish Date: Wed, 28 Mar 2012 21:21:00 +0000 (-0600) Subject: Update the ERF file runmodes to support autofp and single. X-Git-Tag: suricata-1.3beta1~3 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=90548837e3b1a82c7ec997870cbd03a3e63b1047;p=thirdparty%2Fsuricata.git Update the ERF file runmodes to support autofp and single. --- diff --git a/src/runmode-erf-file.c b/src/runmode-erf-file.c index 0b3c8420a9..b3ce2248a2 100644 --- a/src/runmode-erf-file.c +++ b/src/runmode-erf-file.c @@ -44,96 +44,100 @@ const char *RunModeErfFileGetDefaultMode(void) void RunModeErfFileRegister(void) { - default_mode = "auto"; - RunModeRegisterNewRunMode(RUNMODE_ERF_FILE, "auto", - "Multi threaded Erf File mode", - RunModeErfFileAuto); + default_mode = "autofp"; + + RunModeRegisterNewRunMode(RUNMODE_ERF_FILE, "single", + "Single threaded ERF file mode", + RunModeErfFileSingle); + + RunModeRegisterNewRunMode(RUNMODE_ERF_FILE, "autofp", + "Multi threaded ERF file mode. Packets from " + "each flow are assigned to a single detect thread", + RunModeErfFileAutoFp); return; } -int RunModeErfFileAuto(DetectEngineCtx *de_ctx) +int RunModeErfFileSingle(DetectEngineCtx *de_ctx) { - SCEnter(); - char tname[12]; - uint16_t cpu = 0; + int ret; + char *file; - /* Available cpus */ - uint16_t ncpus = UtilCpuGetNumProcessorsOnline(); - - RunModeInitialize(); + SCEnter(); - char *file = NULL; if (ConfGet("erf-file.file", &file) == 0) { - SCLogError(SC_ERR_RUNMODE, "Failed retrieving erf-file.file " - "from Conf"); + SCLogError(SC_ERR_RUNMODE, "Failed to get erf-file.file from config."); exit(EXIT_FAILURE); } - SCLogDebug("file %s", file); + + RunModeInitialize(); + TimeModeSetOffline(); - /* create the threads */ - ThreadVars *tv_receiveerf = - TmThreadCreatePacketHandler("ReceiveErfFile", - "packetpool", "packetpool", - "pickup-queue", "simple", - "1slot"); - if (tv_receiveerf == NULL) { + /* Basically the same setup as PCAP files. */ + + ThreadVars *tv = TmThreadCreatePacketHandler("ErfFile", + "packetpool", "packetpool", + "packetpool", "packetpool", + "pktacqloop"); + if (tv == NULL) { printf("ERROR: TmThreadsCreate failed\n"); exit(EXIT_FAILURE); } + TmModule *tm_module = TmModuleGetByName("ReceiveErfFile"); if (tm_module == NULL) { printf("ERROR: TmModuleGetByName failed for ReceiveErfFile\n"); exit(EXIT_FAILURE); } - TmSlotSetFuncAppend(tv_receiveerf, tm_module, file); + TmSlotSetFuncAppend(tv, tm_module, file); - if (threading_set_cpu_affinity) { - TmThreadSetCPUAffinity(tv_receiveerf, 0); - if (ncpus > 1) - TmThreadSetThreadPriority(tv_receiveerf, PRIO_MEDIUM); - } - - if (TmThreadSpawn(tv_receiveerf) != TM_ECODE_OK) { - printf("ERROR: TmThreadSpawn failed\n"); - exit(EXIT_FAILURE); - } - - ThreadVars *tv_decode1 = - TmThreadCreatePacketHandler("Decode & Stream", - "pickup-queue", "simple", - "stream-queue1", "simple", - "varslot"); - if (tv_decode1 == NULL) { - printf("ERROR: TmThreadsCreate failed for Decode1\n"); - exit(EXIT_FAILURE); - } tm_module = TmModuleGetByName("DecodeErfFile"); if (tm_module == NULL) { printf("ERROR: TmModuleGetByName DecodeErfFile failed\n"); exit(EXIT_FAILURE); } - TmSlotSetFuncAppend(tv_decode1, tm_module, NULL); + TmSlotSetFuncAppend(tv, tm_module, NULL); tm_module = TmModuleGetByName("StreamTcp"); if (tm_module == NULL) { printf("ERROR: TmModuleGetByName StreamTcp failed\n"); exit(EXIT_FAILURE); } - TmSlotSetFuncAppend(tv_decode1, tm_module, NULL); + TmSlotSetFuncAppend(tv, tm_module, NULL); - if (threading_set_cpu_affinity) { - TmThreadSetCPUAffinity(tv_decode1, 0); - if (ncpus > 1) - TmThreadSetThreadPriority(tv_decode1, PRIO_MEDIUM); + tm_module = TmModuleGetByName("Detect"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName Detect failed\n"); + exit(EXIT_FAILURE); } + TmSlotSetFuncAppend(tv, tm_module, (void *)de_ctx); - if (TmThreadSpawn(tv_decode1) != TM_ECODE_OK) { + SetupOutputs(tv); + + if (TmThreadSpawn(tv) != TM_ECODE_OK) { printf("ERROR: TmThreadSpawn failed\n"); exit(EXIT_FAILURE); } + SCLogInfo("RunModeErfFileSingle initialised"); + + SCReturnInt(0); +} + +int RunModeErfFileAutoFp(DetectEngineCtx *de_ctx) +{ + SCEnter(); + char tname[12]; + char qname[12]; + uint16_t cpu = 0; + char queues[2048] = ""; + + RunModeInitialize(); + + /* Available cpus */ + uint16_t ncpus = UtilCpuGetNumProcessorsOnline(); + /* start with cpu 1 so that if we're creating an odd number of detect * threads we're not creating the most on CPU0. */ if (ncpus > 0) @@ -147,21 +151,84 @@ int RunModeErfFileAuto(DetectEngineCtx *de_ctx) thread_max = 1; int thread; + for (thread = 0; thread < thread_max; thread++) { + if (strlen(queues) > 0) + strlcat(queues, ",", sizeof(queues)); + + snprintf(qname, sizeof(qname), "pickup%"PRIu16, thread+1); + strlcat(queues, qname, sizeof(queues)); + } + SCLogDebug("queues %s", queues); + + char *file = NULL; + if (ConfGet("erf-file.file", &file) == 0) { + SCLogError(SC_ERR_RUNMODE, + "Failed retrieving erf-file.file from config"); + exit(EXIT_FAILURE); + } + + TimeModeSetOffline(); + + /* create the threads */ + ThreadVars *tv = + TmThreadCreatePacketHandler("ReceiveErfFile", + "packetpool", "packetpool", + queues, "flow", + "pktacqloop"); + if (tv == NULL) { + printf("ERROR: TmThreadsCreate failed\n"); + exit(EXIT_FAILURE); + } + TmModule *tm_module = TmModuleGetByName("ReceiveErfFile"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName failed for ReceiveErfFile\n"); + exit(EXIT_FAILURE); + } + TmSlotSetFuncAppend(tv, tm_module, file); + + tm_module = TmModuleGetByName("DecodeErfFile"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName DecodeErfFile failed\n"); + exit(EXIT_FAILURE); + } + TmSlotSetFuncAppend(tv, tm_module, NULL); + + if (threading_set_cpu_affinity) { + TmThreadSetCPUAffinity(tv, 0); + if (ncpus > 1) + TmThreadSetThreadPriority(tv, PRIO_MEDIUM); + } + + if (TmThreadSpawn(tv) != TM_ECODE_OK) { + printf("ERROR: TmThreadSpawn failed\n"); + exit(EXIT_FAILURE); + } + for (thread = 0; thread < thread_max; thread++) { snprintf(tname, sizeof(tname), "Detect%"PRIu16, thread+1); + snprintf(qname, sizeof(qname), "pickup%"PRIu16, thread+1); + + SCLogDebug("tname %s, qname %s", tname, qname); char *thread_name = SCStrdup(tname); SCLogDebug("Assigning %s affinity to cpu %u", thread_name, cpu); ThreadVars *tv_detect_ncpu = TmThreadCreatePacketHandler(thread_name, - "stream-queue1", "simple", - "alert-queue1", "simple", - "1slot"); + qname, "flow", + "packetpool", "packetpool", + "varslot"); if (tv_detect_ncpu == NULL) { printf("ERROR: TmThreadsCreate failed\n"); exit(EXIT_FAILURE); } + tm_module = TmModuleGetByName("StreamTcp"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName StreamTcp failed\n"); + exit(EXIT_FAILURE); + } + TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL); + tm_module = TmModuleGetByName("Detect"); if (tm_module == NULL) { printf("ERROR: TmModuleGetByName Detect failed\n"); @@ -189,6 +256,9 @@ int RunModeErfFileAuto(DetectEngineCtx *de_ctx) } tv_detect_ncpu->thread_group_name = thread_group_name; + /* add outputs as well */ + SetupOutputs(tv_detect_ncpu); + if (TmThreadSpawn(tv_detect_ncpu) != TM_ECODE_OK) { printf("ERROR: TmThreadSpawn failed\n"); exit(EXIT_FAILURE); @@ -200,28 +270,7 @@ int RunModeErfFileAuto(DetectEngineCtx *de_ctx) cpu++; } - ThreadVars *tv_outputs = - TmThreadCreatePacketHandler("Outputs", - "alert-queue1", "simple", - "packetpool", "packetpool", - "varslot"); - if (tv_outputs == NULL) { - printf("ERROR: TmThreadCreatePacketHandler for Outputs failed\n"); - exit(EXIT_FAILURE); - } - - SetupOutputs(tv_outputs); - - if (threading_set_cpu_affinity) { - TmThreadSetCPUAffinity(tv_outputs, 0); - if (ncpus > 1) - TmThreadSetThreadPriority(tv_outputs, PRIO_MEDIUM); - } - - if (TmThreadSpawn(tv_outputs) != TM_ECODE_OK) { - printf("ERROR: TmThreadSpawn failed\n"); - exit(EXIT_FAILURE); - } + SCLogInfo("RunModeErfFileAutoFp initialised"); - return 0; + SCReturnInt(0); } diff --git a/src/runmode-erf-file.h b/src/runmode-erf-file.h index b2455395eb..0612e9fa43 100644 --- a/src/runmode-erf-file.h +++ b/src/runmode-erf-file.h @@ -23,7 +23,8 @@ #ifndef __RUNMODE_ERF_FILE_H__ #define __RUNMODE_ERF_FILE_H__ -int RunModeErfFileAuto(DetectEngineCtx *); +int RunModeErfFileSingle(DetectEngineCtx *); +int RunModeErfFileAutoFp(DetectEngineCtx *); void RunModeErfFileRegister(void); const char *RunModeErfFileGetDefaultMode(void); diff --git a/src/source-erf-file.c b/src/source-erf-file.c index 7bb0dd26c8..f93afdfde4 100644 --- a/src/source-erf-file.c +++ b/src/source-erf-file.c @@ -52,14 +52,17 @@ typedef struct DagRecord_ { } __attribute__((packed)) DagRecord; typedef struct ErfFileThreadVars_ { - FILE *erf; ThreadVars *tv; + TmSlot *slot; + + FILE *erf; uint32_t pkts; uint64_t bytes; } ErfFileThreadVars; -TmEcode ReceiveErfFile(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); +static inline TmEcode ReadErfRecord(ThreadVars *, Packet *, void *); +TmEcode ReceiveErfFileLoop(ThreadVars *, void *, void *); TmEcode ReceiveErfFileThreadInit(ThreadVars *, void *, void **); void ReceiveErfFileThreadExitStats(ThreadVars *, void *); TmEcode ReceiveErfFileThreadDeinit(ThreadVars *, void *); @@ -75,7 +78,8 @@ TmModuleReceiveErfFileRegister(void) { tmm_modules[TMM_RECEIVEERFFILE].name = "ReceiveErfFile"; tmm_modules[TMM_RECEIVEERFFILE].ThreadInit = ReceiveErfFileThreadInit; - tmm_modules[TMM_RECEIVEERFFILE].Func = ReceiveErfFile; + tmm_modules[TMM_RECEIVEERFFILE].Func = NULL; + tmm_modules[TMM_RECEIVEERFFILE].PktAcqLoop = ReceiveErfFileLoop; tmm_modules[TMM_RECEIVEERFFILE].ThreadExitPrintStats = ReceiveErfFileThreadExitStats; tmm_modules[TMM_RECEIVEERFFILE].ThreadDeinit = NULL; @@ -100,13 +104,51 @@ TmModuleDecodeErfFileRegister(void) } /** - * \brief Thread entry function for ERF reading. - * - * Reads a new ERF record from the file and sets up the Packet for - * decoding. + * \brief ERF file reading loop. */ -TmEcode -ReceiveErfFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) +TmEcode ReceiveErfFileLoop(ThreadVars *tv, void *data, void *slot) +{ + ErfFileThreadVars *etv = (ErfFileThreadVars *)data; + etv->slot = ((TmSlot *)slot)->slot_next; + Packet *p; + uint16_t packet_q_len = 0; + + while (1) { + if (suricata_ctl_flags & SURICATA_STOP || + suricata_ctl_flags & SURICATA_KILL) { + SCReturnInt(TM_ECODE_OK); + } + + /* Make sure we have at least one packet in the packet pool, + * to prevent us from alloc'ing packets at line rate. */ + do { + packet_q_len = PacketPoolSize(); + if (unlikely(packet_q_len == 0)) { + PacketPoolWait(); + } + } while (packet_q_len == 0); + + p = PacketGetFromQueueOrAlloc(); + if (unlikely(p == NULL)) { + SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate a packet."); + EngineStop(); + SCReturnInt(TM_ECODE_FAILED); + } + + if (ReadErfRecord(tv, p, data) != TM_ECODE_OK) { + TmqhOutputPacketpool(etv->tv, p); + EngineStop(); + SCReturnInt(TM_ECODE_FAILED); + } + + if (TmThreadsSlotProcessPkt(etv->tv, etv->slot, p) != TM_ECODE_OK) { + EngineStop(); + SCReturnInt(TM_ECODE_FAILED); + } + } +} + +static inline TmEcode ReadErfRecord(ThreadVars *tv, Packet *p, void *data) { SCEnter(); @@ -115,16 +157,24 @@ ReceiveErfFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQue int r = fread(&dr, sizeof(DagRecord), 1, etv->erf); if (r < 1) { - SCLogInfo("End of ERF file reached or an error occurred."); - EngineStop(); + if (feof(etv->erf)) { + SCLogInfo("End of ERF file reached"); + } + else { + SCLogInfo("Error reading ERF record"); + } SCReturnInt(TM_ECODE_FAILED); } int rlen = ntohs(dr.rlen); int wlen = ntohs(dr.wlen); r = fread(GET_PKT_DATA(p), rlen - sizeof(DagRecord), 1, etv->erf); if (r < 1) { - SCLogInfo("End of ERF file reached or an error occurred."); - EngineStop(); + if (feof(etv->erf)) { + SCLogInfo("End of ERF file reached"); + } + else { + SCLogInfo("Error reading ERF record"); + } SCReturnInt(TM_ECODE_FAILED); }