]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
Update the ERF file runmodes to support autofp and single.
authorJason Ish <jason.ish@endace.com>
Wed, 28 Mar 2012 21:21:00 +0000 (15:21 -0600)
committerVictor Julien <victor@inliniac.net>
Wed, 4 Apr 2012 07:44:48 +0000 (09:44 +0200)
src/runmode-erf-file.c
src/runmode-erf-file.h
src/source-erf-file.c

index 0b3c8420a9fcb19ae5152657a76e2911bc7855eb..b3ce2248a25a6730eb5885dbe6ea40c68fddaca9 100644 (file)
@@ -44,96 +44,100 @@ const char *RunModeErfFileGetDefaultMode(void)
 
 void RunModeErfFileRegister(void)
 {
-    default_mode = "auto";
-    RunModeRegisterNewRunMode(RUNMODE_ERF_FILE, "auto",
-                              "Multi threaded Erf File mode",
-                              RunModeErfFileAuto);
+    default_mode = "autofp";
+
+    RunModeRegisterNewRunMode(RUNMODE_ERF_FILE, "single",
+        "Single threaded ERF file mode",
+        RunModeErfFileSingle);
+
+    RunModeRegisterNewRunMode(RUNMODE_ERF_FILE, "autofp",
+        "Multi threaded ERF file mode.  Packets from "
+        "each flow are assigned to a single detect thread",
+        RunModeErfFileAutoFp);
 
     return;
 }
 
-int RunModeErfFileAuto(DetectEngineCtx *de_ctx)
+int RunModeErfFileSingle(DetectEngineCtx *de_ctx)
 {
-    SCEnter();
-    char tname[12];
-    uint16_t cpu = 0;
+    int ret;
+    char *file;
 
-    /* Available cpus */
-    uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
-
-    RunModeInitialize();
+    SCEnter();
 
-    char *file = NULL;
     if (ConfGet("erf-file.file", &file) == 0) {
-        SCLogError(SC_ERR_RUNMODE, "Failed retrieving erf-file.file "
-                   "from Conf");
+        SCLogError(SC_ERR_RUNMODE, "Failed to get erf-file.file from config.");
         exit(EXIT_FAILURE);
     }
-    SCLogDebug("file %s", file);
+
+    RunModeInitialize();
+
     TimeModeSetOffline();
 
-    /* create the threads */
-    ThreadVars *tv_receiveerf =
-        TmThreadCreatePacketHandler("ReceiveErfFile",
-                                    "packetpool", "packetpool",
-                                    "pickup-queue", "simple",
-                                    "1slot");
-    if (tv_receiveerf == NULL) {
+    /* Basically the same setup as PCAP files. */
+
+    ThreadVars *tv = TmThreadCreatePacketHandler("ErfFile",
+        "packetpool", "packetpool",
+        "packetpool", "packetpool",
+        "pktacqloop");
+    if (tv == NULL) {
         printf("ERROR: TmThreadsCreate failed\n");
         exit(EXIT_FAILURE);
     }
+
     TmModule *tm_module = TmModuleGetByName("ReceiveErfFile");
     if (tm_module == NULL) {
         printf("ERROR: TmModuleGetByName failed for ReceiveErfFile\n");
         exit(EXIT_FAILURE);
     }
-    TmSlotSetFuncAppend(tv_receiveerf, tm_module, file);
+    TmSlotSetFuncAppend(tv, tm_module, file);
 
-    if (threading_set_cpu_affinity) {
-        TmThreadSetCPUAffinity(tv_receiveerf, 0);
-        if (ncpus > 1)
-            TmThreadSetThreadPriority(tv_receiveerf, PRIO_MEDIUM);
-    }
-
-    if (TmThreadSpawn(tv_receiveerf) != TM_ECODE_OK) {
-        printf("ERROR: TmThreadSpawn failed\n");
-        exit(EXIT_FAILURE);
-    }
-
-    ThreadVars *tv_decode1 =
-        TmThreadCreatePacketHandler("Decode & Stream",
-                                    "pickup-queue", "simple",
-                                    "stream-queue1", "simple",
-                                    "varslot");
-    if (tv_decode1 == NULL) {
-        printf("ERROR: TmThreadsCreate failed for Decode1\n");
-        exit(EXIT_FAILURE);
-    }
     tm_module = TmModuleGetByName("DecodeErfFile");
     if (tm_module == NULL) {
         printf("ERROR: TmModuleGetByName DecodeErfFile failed\n");
         exit(EXIT_FAILURE);
     }
-    TmSlotSetFuncAppend(tv_decode1, tm_module, NULL);
+    TmSlotSetFuncAppend(tv, tm_module, NULL);
 
     tm_module = TmModuleGetByName("StreamTcp");
     if (tm_module == NULL) {
         printf("ERROR: TmModuleGetByName StreamTcp failed\n");
         exit(EXIT_FAILURE);
     }
-    TmSlotSetFuncAppend(tv_decode1, tm_module, NULL);
+    TmSlotSetFuncAppend(tv, tm_module, NULL);
 
-    if (threading_set_cpu_affinity) {
-        TmThreadSetCPUAffinity(tv_decode1, 0);
-        if (ncpus > 1)
-            TmThreadSetThreadPriority(tv_decode1, PRIO_MEDIUM);
+    tm_module = TmModuleGetByName("Detect");
+    if (tm_module == NULL) {
+        printf("ERROR: TmModuleGetByName Detect failed\n");
+        exit(EXIT_FAILURE);
     }
+    TmSlotSetFuncAppend(tv, tm_module, (void *)de_ctx);
 
-    if (TmThreadSpawn(tv_decode1) != TM_ECODE_OK) {
+    SetupOutputs(tv);
+
+    if (TmThreadSpawn(tv) != TM_ECODE_OK) {
         printf("ERROR: TmThreadSpawn failed\n");
         exit(EXIT_FAILURE);
     }
 
+    SCLogInfo("RunModeErfFileSingle initialised");
+
+    SCReturnInt(0);
+}
+
+int RunModeErfFileAutoFp(DetectEngineCtx *de_ctx)
+{
+    SCEnter();
+    char tname[12];
+    char qname[12];
+    uint16_t cpu = 0;
+    char queues[2048] = "";
+
+    RunModeInitialize();
+
+    /* Available cpus */
+    uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
+
     /* start with cpu 1 so that if we're creating an odd number of detect
      * threads we're not creating the most on CPU0. */
     if (ncpus > 0)
@@ -147,21 +151,84 @@ int RunModeErfFileAuto(DetectEngineCtx *de_ctx)
         thread_max = 1;
 
     int thread;
+    for (thread = 0; thread < thread_max; thread++) {
+        if (strlen(queues) > 0)
+            strlcat(queues, ",", sizeof(queues));
+
+        snprintf(qname, sizeof(qname), "pickup%"PRIu16, thread+1);
+        strlcat(queues, qname, sizeof(queues));
+    }
+    SCLogDebug("queues %s", queues);
+
+    char *file = NULL;
+    if (ConfGet("erf-file.file", &file) == 0) {
+        SCLogError(SC_ERR_RUNMODE,
+            "Failed retrieving erf-file.file from config");
+        exit(EXIT_FAILURE);
+    }
+
+    TimeModeSetOffline();
+
+    /* create the threads */
+    ThreadVars *tv =
+        TmThreadCreatePacketHandler("ReceiveErfFile",
+                                    "packetpool", "packetpool",
+                                    queues, "flow",
+                                    "pktacqloop");
+    if (tv == NULL) {
+        printf("ERROR: TmThreadsCreate failed\n");
+        exit(EXIT_FAILURE);
+    }
+    TmModule *tm_module = TmModuleGetByName("ReceiveErfFile");
+    if (tm_module == NULL) {
+        printf("ERROR: TmModuleGetByName failed for ReceiveErfFile\n");
+        exit(EXIT_FAILURE);
+    }
+    TmSlotSetFuncAppend(tv, tm_module, file);
+
+    tm_module = TmModuleGetByName("DecodeErfFile");
+    if (tm_module == NULL) {
+        printf("ERROR: TmModuleGetByName DecodeErfFile failed\n");
+        exit(EXIT_FAILURE);
+    }
+    TmSlotSetFuncAppend(tv, tm_module, NULL);
+
+    if (threading_set_cpu_affinity) {
+        TmThreadSetCPUAffinity(tv, 0);
+        if (ncpus > 1)
+            TmThreadSetThreadPriority(tv, PRIO_MEDIUM);
+    }
+
+    if (TmThreadSpawn(tv) != TM_ECODE_OK) {
+        printf("ERROR: TmThreadSpawn failed\n");
+        exit(EXIT_FAILURE);
+    }
+
     for (thread = 0; thread < thread_max; thread++) {
         snprintf(tname, sizeof(tname), "Detect%"PRIu16, thread+1);
+        snprintf(qname, sizeof(qname), "pickup%"PRIu16, thread+1);
+
+        SCLogDebug("tname %s, qname %s", tname, qname);
 
         char *thread_name = SCStrdup(tname);
         SCLogDebug("Assigning %s affinity to cpu %u", thread_name, cpu);
 
         ThreadVars *tv_detect_ncpu =
             TmThreadCreatePacketHandler(thread_name,
-                                        "stream-queue1", "simple",
-                                        "alert-queue1", "simple",
-                                        "1slot");
+                                        qname, "flow",
+                                        "packetpool", "packetpool",
+                                        "varslot");
         if (tv_detect_ncpu == NULL) {
             printf("ERROR: TmThreadsCreate failed\n");
             exit(EXIT_FAILURE);
         }
+        tm_module = TmModuleGetByName("StreamTcp");
+        if (tm_module == NULL) {
+            printf("ERROR: TmModuleGetByName StreamTcp failed\n");
+            exit(EXIT_FAILURE);
+        }
+        TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL);
+
         tm_module = TmModuleGetByName("Detect");
         if (tm_module == NULL) {
             printf("ERROR: TmModuleGetByName Detect failed\n");
@@ -189,6 +256,9 @@ int RunModeErfFileAuto(DetectEngineCtx *de_ctx)
         }
         tv_detect_ncpu->thread_group_name = thread_group_name;
 
+        /* add outputs as well */
+        SetupOutputs(tv_detect_ncpu);
+
         if (TmThreadSpawn(tv_detect_ncpu) != TM_ECODE_OK) {
             printf("ERROR: TmThreadSpawn failed\n");
             exit(EXIT_FAILURE);
@@ -200,28 +270,7 @@ int RunModeErfFileAuto(DetectEngineCtx *de_ctx)
             cpu++;
     }
 
-    ThreadVars *tv_outputs =
-        TmThreadCreatePacketHandler("Outputs",
-                                    "alert-queue1", "simple",
-                                    "packetpool", "packetpool",
-                                    "varslot");
-    if (tv_outputs == NULL) {
-        printf("ERROR: TmThreadCreatePacketHandler for Outputs failed\n");
-        exit(EXIT_FAILURE);
-    }
-
-    SetupOutputs(tv_outputs);
-
-    if (threading_set_cpu_affinity) {
-        TmThreadSetCPUAffinity(tv_outputs, 0);
-        if (ncpus > 1)
-            TmThreadSetThreadPriority(tv_outputs, PRIO_MEDIUM);
-    }
-
-    if (TmThreadSpawn(tv_outputs) != TM_ECODE_OK) {
-        printf("ERROR: TmThreadSpawn failed\n");
-        exit(EXIT_FAILURE);
-    }
+    SCLogInfo("RunModeErfFileAutoFp initialised");
 
-    return 0;
+    SCReturnInt(0);
 }
index b2455395ebb6ddfcf2e8baa00fee22c90a7af48c..0612e9fa436cbe169095b8b00a704af704aea94d 100644 (file)
@@ -23,7 +23,8 @@
 #ifndef __RUNMODE_ERF_FILE_H__
 #define __RUNMODE_ERF_FILE_H__
 
-int RunModeErfFileAuto(DetectEngineCtx *);
+int RunModeErfFileSingle(DetectEngineCtx *);
+int RunModeErfFileAutoFp(DetectEngineCtx *);
 void RunModeErfFileRegister(void);
 const char *RunModeErfFileGetDefaultMode(void);
 
index 7bb0dd26c80e4e0d6fcb5970ff6d0087e0cdabdc..f93afdfde4687a2e3ab9162ee6167e9081edaa11 100644 (file)
@@ -52,14 +52,17 @@ typedef struct DagRecord_ {
 } __attribute__((packed)) DagRecord;
 
 typedef struct ErfFileThreadVars_ {
-    FILE *erf;
     ThreadVars *tv;
+    TmSlot *slot;
+
+    FILE *erf;
 
     uint32_t pkts;
     uint64_t bytes;
 } ErfFileThreadVars;
 
-TmEcode ReceiveErfFile(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
+static inline TmEcode ReadErfRecord(ThreadVars *, Packet *, void *);
+TmEcode ReceiveErfFileLoop(ThreadVars *, void *, void *);
 TmEcode ReceiveErfFileThreadInit(ThreadVars *, void *, void **);
 void ReceiveErfFileThreadExitStats(ThreadVars *, void *);
 TmEcode ReceiveErfFileThreadDeinit(ThreadVars *, void *);
@@ -75,7 +78,8 @@ TmModuleReceiveErfFileRegister(void)
 {
     tmm_modules[TMM_RECEIVEERFFILE].name = "ReceiveErfFile";
     tmm_modules[TMM_RECEIVEERFFILE].ThreadInit = ReceiveErfFileThreadInit;
-    tmm_modules[TMM_RECEIVEERFFILE].Func = ReceiveErfFile;
+    tmm_modules[TMM_RECEIVEERFFILE].Func = NULL;
+    tmm_modules[TMM_RECEIVEERFFILE].PktAcqLoop = ReceiveErfFileLoop;
     tmm_modules[TMM_RECEIVEERFFILE].ThreadExitPrintStats =
         ReceiveErfFileThreadExitStats;
     tmm_modules[TMM_RECEIVEERFFILE].ThreadDeinit = NULL;
@@ -100,13 +104,51 @@ TmModuleDecodeErfFileRegister(void)
 }
 
 /**
- * \brief Thread entry function for ERF reading.
- *
- * Reads a new ERF record from the file and sets up the Packet for
- * decoding.
+ * \brief ERF file reading loop.
  */
-TmEcode
-ReceiveErfFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
+TmEcode ReceiveErfFileLoop(ThreadVars *tv, void *data, void *slot)
+{
+    ErfFileThreadVars *etv = (ErfFileThreadVars *)data;
+    etv->slot = ((TmSlot *)slot)->slot_next;
+    Packet *p;
+    uint16_t packet_q_len = 0;
+
+    while (1) {
+        if (suricata_ctl_flags & SURICATA_STOP ||
+            suricata_ctl_flags & SURICATA_KILL) {
+            SCReturnInt(TM_ECODE_OK);
+        }
+
+        /* Make sure we have at least one packet in the packet pool,
+         * to prevent us from alloc'ing packets at line rate. */
+        do {
+            packet_q_len = PacketPoolSize();
+            if (unlikely(packet_q_len == 0)) {
+                PacketPoolWait();
+            }
+        } while (packet_q_len == 0);
+
+        p = PacketGetFromQueueOrAlloc();
+        if (unlikely(p == NULL)) {
+            SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate a packet.");
+            EngineStop();
+            SCReturnInt(TM_ECODE_FAILED);
+        }
+
+        if (ReadErfRecord(tv, p, data) != TM_ECODE_OK) {
+            TmqhOutputPacketpool(etv->tv, p);
+            EngineStop();
+            SCReturnInt(TM_ECODE_FAILED);
+        }
+
+        if (TmThreadsSlotProcessPkt(etv->tv, etv->slot, p) != TM_ECODE_OK) {
+            EngineStop();
+            SCReturnInt(TM_ECODE_FAILED);
+        }
+    }
+}
+
+static inline TmEcode ReadErfRecord(ThreadVars *tv, Packet *p, void *data)
 {
     SCEnter();
 
@@ -115,16 +157,24 @@ ReceiveErfFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQue
 
     int r = fread(&dr, sizeof(DagRecord), 1, etv->erf);
     if (r < 1) {
-        SCLogInfo("End of ERF file reached or an error occurred.");
-        EngineStop();
+        if (feof(etv->erf)) {
+            SCLogInfo("End of ERF file reached");
+        }
+        else {
+            SCLogInfo("Error reading ERF record");
+        }
         SCReturnInt(TM_ECODE_FAILED);
     }
     int rlen = ntohs(dr.rlen);
     int wlen = ntohs(dr.wlen);
     r = fread(GET_PKT_DATA(p), rlen - sizeof(DagRecord), 1, etv->erf);
     if (r < 1) {
-        SCLogInfo("End of ERF file reached or an error occurred.");
-        EngineStop();
+        if (feof(etv->erf)) {
+            SCLogInfo("End of ERF file reached");
+        }
+        else {
+            SCLogInfo("Error reading ERF record");
+        }
         SCReturnInt(TM_ECODE_FAILED);
     }