From: Jason Ish Date: Mon, 26 Mar 2012 17:58:24 +0000 (-0400) Subject: Implement single, autofp and workers run modes for DAG interfaces. Includes multiple... X-Git-Tag: suricata-1.3beta1~36 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=105173939b09c115a353b0e811578b20d64fddb9;p=thirdparty%2Fsuricata.git Implement single, autofp and workers run modes for DAG interfaces. Includes multiple interface support. Remove auto mode due to bad performance. --- diff --git a/src/runmode-erf-dag.c b/src/runmode-erf-dag.c index 8f0ad528ba..8d87d5e099 100644 --- a/src/runmode-erf-dag.c +++ b/src/runmode-erf-dag.c @@ -34,9 +34,20 @@ #include "util-time.h" #include "util-cpu.h" #include "util-affinity.h" +#include "util-runmodes.h" static const char *default_mode; +static int DagConfigGetThreadCount(void *conf) +{ + return 1; +} + +static void *ParseDagConfig(const char *iface) +{ + return (void *)iface; +} + const char *RunModeErfDagGetDefaultMode(void) { return default_mode; @@ -44,184 +55,105 @@ const char *RunModeErfDagGetDefaultMode(void) void RunModeErfDagRegister(void) { - default_mode = "auto"; - RunModeRegisterNewRunMode(RUNMODE_DAG, "auto", - "Multi threaded Erf dag mode", - RunModeErfDagAuto); + default_mode = "autofp"; + + RunModeRegisterNewRunMode(RUNMODE_DAG, "autofp", + "Multi threaded DAG mode. Packets from " + "each flow are assigned to a single detect " + "thread, unlike \"dag_auto\" where packets " + "from the same flow can be processed by any " + "detect thread", + RunModeIdsErfDagAutoFp); + + RunModeRegisterNewRunMode(RUNMODE_DAG, "single", + "Singled threaded DAG mode", + RunModeIdsErfDagSingle); + + RunModeRegisterNewRunMode(RUNMODE_DAG, "workers", + "Workers DAG mode, each thread does all " + " tasks from acquisition to logging", + RunModeIdsErfDagWorkers); return; } -/** - * - * \brief Sets up support for reading from a DAG card. - * - * \param de_ctx - * \param file - * \notes Currently only supports a single interface. - */ -int RunModeErfDagAuto(DetectEngineCtx *de_ctx) +int RunModeIdsErfDagSingle(DetectEngineCtx *de_ctx) { - SCEnter(); - char tname[12]; - uint16_t cpu = 0; + int ret; - /* Available cpus */ - uint16_t ncpus = UtilCpuGetNumProcessorsOnline(); + SCEnter(); RunModeInitialize(); - char *iface = NULL; - if (ConfGet("erf-dag.iface", &iface) == 0) { - SCLogError(SC_ERR_RUNMODE, "Failed retrieving erf-dag.iface from Conf"); + TimeModeSetLive(); + + ret = RunModeSetLiveCaptureSingle(de_ctx, + ParseDagConfig, + DagConfigGetThreadCount, + "ReceiveErfDag", + "DecodeErfDag", + "RxDAG", + NULL); + if (ret != 0) { + SCLogError(SC_ERR_RUNMODE, "DAG single runmode failed to start"); exit(EXIT_FAILURE); } - SCLogDebug("iface %s", iface); - TimeModeSetOffline(); + SCLogInfo("RunModeIdsDagSingle initialised"); - /* @TODO/JNM: We need to create a separate processing pipeliine for each - * interface supported by the - */ - - ThreadVars *tv_receiveerf = - TmThreadCreatePacketHandler("ReceiveErfDag", - "packetpool","packetpool", - "pickup-queue","simple", - "1slot"); - if (tv_receiveerf == NULL) { - printf("ERROR: TmThreadsCreate failed\n"); - exit(EXIT_FAILURE); - } - TmModule *tm_module = TmModuleGetByName("ReceiveErfDag"); - if (tm_module == NULL) { - printf("ERROR: TmModuleGetByName failed for ReceiveErfDag\n"); - exit(EXIT_FAILURE); - } - TmSlotSetFuncAppend(tv_receiveerf, tm_module, iface); + SCReturnInt(0); +} - if (threading_set_cpu_affinity) { - TmThreadSetCPUAffinity(tv_receiveerf, 0); - if (ncpus > 1) - TmThreadSetThreadPriority(tv_receiveerf, PRIO_MEDIUM); - } +int RunModeIdsErfDagAutoFp(DetectEngineCtx *de_ctx) +{ + int ret; - if (TmThreadSpawn(tv_receiveerf) != TM_ECODE_OK) { - printf("ERROR: TmThreadSpawn failed\n"); - exit(EXIT_FAILURE); - } + SCEnter(); - ThreadVars *tv_decode1 = - TmThreadCreatePacketHandler("Decode & Stream", - "pickup-queue","simple", - "stream-queue1","simple", - "varslot"); - if (tv_decode1 == NULL) { - printf("ERROR: TmThreadsCreate failed for Decode1\n"); - exit(EXIT_FAILURE); - } - tm_module = TmModuleGetByName("DecodeErfDag"); - if (tm_module == NULL) { - printf("ERROR: TmModuleGetByName DecodeErfDag failed\n"); - exit(EXIT_FAILURE); - } - TmSlotSetFuncAppend(tv_decode1, tm_module, NULL); + RunModeInitialize(); - tm_module = TmModuleGetByName("StreamTcp"); - if (tm_module == NULL) { - printf("ERROR: TmModuleGetByName StreamTcp failed\n"); + TimeModeSetLive(); + + ret = RunModeSetLiveCaptureAutoFp(de_ctx, + ParseDagConfig, + DagConfigGetThreadCount, + "ReceiveErfDag", + "DecodeErfDag", + "RxDAG", + NULL); + if (ret != 0) { + SCLogError(SC_ERR_RUNMODE, "DAG autofp runmode failed to start"); exit(EXIT_FAILURE); } - TmSlotSetFuncAppend(tv_decode1, tm_module, NULL); - if (threading_set_cpu_affinity) { - TmThreadSetCPUAffinity(tv_decode1, 0); - if (ncpus > 1) - TmThreadSetThreadPriority(tv_decode1, PRIO_MEDIUM); - } + SCLogInfo("RunModeIdsDagAutoFp initialised"); - if (TmThreadSpawn(tv_decode1) != TM_ECODE_OK) { - printf("ERROR: TmThreadSpawn failed\n"); - exit(EXIT_FAILURE); - } - - /* start with cpu 1 so that if we're creating an odd number of detect - * threads we're not creating the most on CPU0. */ - if (ncpus > 0) - cpu = 1; - - /* always create at least one thread */ - int thread_max = TmThreadGetNbThreads(DETECT_CPU_SET); - if (thread_max == 0) - thread_max = ncpus * threading_detect_ratio; - if (thread_max < 1) - thread_max = 1; - - int thread; - for (thread = 0; thread < thread_max; thread++) { - snprintf(tname, sizeof(tname), "Detect%"PRIu16, thread+1); - - char *thread_name = SCStrdup(tname); - SCLogDebug("Assigning %s affinity to cpu %u", thread_name, cpu); - - ThreadVars *tv_detect_ncpu = - TmThreadCreatePacketHandler(thread_name, - "stream-queue1","simple", - "alert-queue1","simple", - "1slot"); - if (tv_detect_ncpu == NULL) { - printf("ERROR: TmThreadsCreate failed\n"); - exit(EXIT_FAILURE); - } - tm_module = TmModuleGetByName("Detect"); - if (tm_module == NULL) { - printf("ERROR: TmModuleGetByName Detect failed\n"); - exit(EXIT_FAILURE); - } - TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, (void *)de_ctx); - - if (threading_set_cpu_affinity) { - TmThreadSetCPUAffinity(tv_detect_ncpu, (int)cpu); - - /* If we have more than one core/cpu, the first Detect thread - * (at cpu 0) will have less priority (higher 'nice' value) - * In this case we will set the thread priority to +10 (default is 0) - */ - if (cpu == 0 && ncpus > 1) { - TmThreadSetThreadPriority(tv_detect_ncpu, PRIO_LOW); - } else if (ncpus > 1) { - TmThreadSetThreadPriority(tv_detect_ncpu, PRIO_MEDIUM); - } - } - - if (TmThreadSpawn(tv_detect_ncpu) != TM_ECODE_OK) { - printf("ERROR: TmThreadSpawn failed\n"); - exit(EXIT_FAILURE); - } - } + SCReturnInt(0); +} - ThreadVars *tv_outputs = - TmThreadCreatePacketHandler("Outputs", - "alert-queue1", "simple", - "packetpool", "packetpool", - "varslot"); - if (tv_outputs == NULL) { - printf("ERROR: TmThreadCreatePacketHandler for Outputs failed\n"); - exit(EXIT_FAILURE); - } +int RunModeIdsErfDagWorkers(DetectEngineCtx *de_ctx) +{ + int ret; - SetupOutputs(tv_outputs); + SCEnter(); - if (threading_set_cpu_affinity) { - TmThreadSetCPUAffinity(tv_outputs, 0); - if (ncpus > 1) - TmThreadSetThreadPriority(tv_outputs, PRIO_MEDIUM); - } + RunModeInitialize(); - if (TmThreadSpawn(tv_outputs) != TM_ECODE_OK) { - printf("ERROR: TmThreadSpawn failed\n"); + TimeModeSetLive(); + + ret = RunModeSetLiveCaptureWorkers(de_ctx, + ParseDagConfig, + DagConfigGetThreadCount, + "ReceiveErfDag", + "DecodeErfDag", + "RxDAG", + NULL); + if (ret != 0) { + SCLogError(SC_ERR_RUNMODE, "DAG workers runmode failed to start"); exit(EXIT_FAILURE); } - return 0; + SCLogInfo("RunModeIdsErfDagWorkers initialised"); + + SCReturnInt(0); } diff --git a/src/runmode-erf-dag.h b/src/runmode-erf-dag.h index 254b498ef1..2a4b20ea52 100644 --- a/src/runmode-erf-dag.h +++ b/src/runmode-erf-dag.h @@ -23,7 +23,9 @@ #ifndef __RUNMODE_ERF_DAG_H__ #define __RUNMODE_ERF_DAG_H__ -int RunModeErfDagAuto(DetectEngineCtx *); +int RunModeIdsErfDagAutoFp(DetectEngineCtx *); +int RunModeIdsErfDagSingle(DetectEngineCtx *); +int RunModeIdsErfDagWorkers(DetectEngineCtx *); void RunModeErfDagRegister(void); const char *RunModeErfDagGetDefaultMode(void); diff --git a/src/source-erf-dag.c b/src/source-erf-dag.c index 2b04cbacdd..98b1f2d359 100644 --- a/src/source-erf-dag.c +++ b/src/source-erf-dag.c @@ -19,7 +19,7 @@ * \file * * \author Endace Technology Limited. - * \author Jason MacLulich + * \author Jason MacLulich * * Support for reading ERF records from a DAG card. * @@ -78,6 +78,8 @@ extern uint8_t suricata_ctl_flags; typedef struct ErfDagThreadVars_ { ThreadVars *tv; + TmSlot *slot; + int dagfd; int dagstream; char dagname[DAGNAME_BUFSIZE]; @@ -100,12 +102,12 @@ typedef struct ErfDagThreadVars_ { } ErfDagThreadVars; -TmEcode ReceiveErfDag(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); +TmEcode ReceiveErfDagLoop(ThreadVars *, void *data, void *slot); TmEcode ReceiveErfDagThreadInit(ThreadVars *, void *, void **); void ReceiveErfDagThreadExitStats(ThreadVars *, void *); TmEcode ReceiveErfDagThreadDeinit(ThreadVars *, void *); -TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, Packet *p, uint8_t* top, - PacketQueue *postpq, uint32_t *pkts_read); +TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, uint8_t* top, + uint32_t *pkts_read); TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p); TmEcode DecodeErfDagThreadInit(ThreadVars *, void *, void **); @@ -120,7 +122,8 @@ TmModuleReceiveErfDagRegister(void) { tmm_modules[TMM_RECEIVEERFDAG].name = "ReceiveErfDag"; tmm_modules[TMM_RECEIVEERFDAG].ThreadInit = ReceiveErfDagThreadInit; - tmm_modules[TMM_RECEIVEERFDAG].Func = ReceiveErfDag; + tmm_modules[TMM_RECEIVEERFDAG].Func = NULL; + tmm_modules[TMM_RECEIVEERFDAG].PktAcqLoop = ReceiveErfDagLoop; tmm_modules[TMM_RECEIVEERFDAG].ThreadExitPrintStats = ReceiveErfDagThreadExitStats; tmm_modules[TMM_RECEIVEERFDAG].ThreadDeinit = NULL; @@ -300,37 +303,21 @@ ReceiveErfDagThreadInit(ThreadVars *tv, void *initdata, void **data) } /** - * \brief Thread entry function for reading ERF records from a DAG card. - * - * Reads a new ERF record the DAG input buffer and copies it to - * an internal Suricata packet buffer -- similar to the way the - * pcap packet handler works. + * \brief Receives packets from a DAG interface. * - * We create new packet structures using PacketGetFromQueueOrAlloc - * for each packet between the top and btm pointers except for - * the first packet for which a Packet buffer is provided - * from the packetpool. + * \param tv pointer to ThreadVars + * \param data pointer to ErfDagThreadVars + * \param slot slot containing task information * - * We always read up to dag_max_read_packets ERF packets from the - * DAG buffer, but we might read less. This differs from the - * ReceivePcap handler -- it will only read pkts up to a maximum - * of either the packetpool count or the pcap_max_read_packets. - * - * \param tv pointer to ThreadVars - * \param p data pointer - * \param data - * \param pq pointer to the PacketQueue (not used here) - * \param postpq - * \retval TM_ECODE_FAILED on failure and TM_ECODE_OK on success. - * \note We also use the packetpool hack first used in the source-pcap - * handler so we don't keep producing packets without any dying. - * This implies that if we are in this situation we run the risk - * of dropping packets at the interface. + * \retval TM_ECODE_OK on success + * \retval TM_ECODE_FAILED on failure */ -TmEcode -ReceiveErfDag(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, - PacketQueue *postpq) +TmEcode ReceiveErfDagLoop(ThreadVars *tv, void *data, void *slot) { + ErfDagThreadVars *dtv = (ErfDagThreadVars *)data; + TmSlot *s = (TmSlot *)slot; + dtv->slot = s->slot_next; + SCEnter(); uint16_t packet_q_len = 0; @@ -339,97 +326,90 @@ ReceiveErfDag(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, uint8_t *top = NULL; uint32_t pkts_read = 0; - assert(p); - assert(pq); - assert(postpq); - - ErfDagThreadVars *ewtn = (ErfDagThreadVars *)data; - - /* NOTE/JNM: Hack copied from source-pcap.c - * - * Make sure we have at least one packet in the packet pool, to - * prevent us from alloc'ing packets at line rate - */ - while (packet_q_len == 0) { - packet_q_len = PacketPoolSize(); - if (packet_q_len == 0) { - PacketPoolWait(); - } - } - - if (postpq == NULL) { - ewtn->dag_max_read_packets = 1; - } - - while(pkts_read == 0) + while (1) { - if (suricata_ctl_flags != 0) { - break; + if (suricata_ctl_flags & SURICATA_STOP || + suricata_ctl_flags & SURICATA_KILL) { + SCReturnInt(TM_ECODE_FAILED); } + /* Make sure we have at least one packet in the packet pool, + * to prevent us from alloc'ing packets at line rate. */ + do { + packet_q_len = PacketPoolSize(); + if (unlikely(packet_q_len == 0)) { + PacketPoolWait(); + } + } while (packet_q_len == 0); + /* NOTE/JNM: This might not work well if we start restricting the - * number of ERF records processed per call to a small number as - * the over head required here could exceed the time it takes to - * process a small number of ERF records. - * - * XXX/JNM: Possibly process the DAG stream buffer first if there - * are ERF packets or else call dag_advance_stream and then process - * the DAG stream buffer. - */ - top = dag_advance_stream(ewtn->dagfd, ewtn->dagstream, &(ewtn->btm)); - - if (NULL == top) - { - if((ewtn->dagstream & 0x1) && (errno == EAGAIN)) { - usleep(10 * 1000); - ewtn->btm = ewtn->top; + * number of ERF records processed per call to a small number as + * the over head required here could exceed the time it takes to + * process a small number of ERF records. + * + * XXX/JNM: Possibly process the DAG stream buffer first if there + * are ERF packets or else call dag_advance_stream and then process + * the DAG stream buffer. + */ + top = dag_advance_stream(dtv->dagfd, dtv->dagstream, &(dtv->btm)); + + if (NULL == top) + { + if ((dtv->dagstream & 0x1) && (errno == EAGAIN)) { + usleep(10 * 1000); + dtv->btm = dtv->top; continue; - } - else { - SCLogError(SC_ERR_ERF_DAG_STREAM_READ_FAILED, - "Failed to read from stream: %d, DAG: %s when using dag_advance_stream", - ewtn->dagstream, ewtn->dagname); - SCReturnInt(TM_ECODE_FAILED); - } - } + } + else { + SCLogError(SC_ERR_ERF_DAG_STREAM_READ_FAILED, + "Failed to read from stream: %d, DAG: %s when using dag_advance_stream", + dtv->dagstream, dtv->dagname); + SCReturnInt(TM_ECODE_FAILED); + } + } - diff = top - ewtn->btm; - if (diff == 0) - { - continue; - } + diff = top - dtv->btm; + if (diff == 0) + { + continue; + } - assert(diff >= dag_record_size); + assert(diff >= dag_record_size); - err = ProcessErfDagRecords(ewtn, p, top, postpq, &pkts_read); + err = ProcessErfDagRecords(dtv, top, &pkts_read); if (err == TM_ECODE_FAILED) { - SCLogError(SC_ERR_ERF_DAG_STREAM_READ_FAILED, - "Failed to read from stream: %d, DAG: %s", - ewtn->dagstream, ewtn->dagname); - ReceiveErfDagCloseStream(ewtn->dagfd, ewtn->dagstream); + SCLogError(SC_ERR_ERF_DAG_STREAM_READ_FAILED, + "Failed to read from stream: %d, DAG: %s", + dtv->dagstream, dtv->dagname); + ReceiveErfDagCloseStream(dtv->dagfd, dtv->dagstream); SCReturnInt(err); } } SCLogDebug("Read %d records from stream: %d, DAG: %s", - pkts_read, ewtn->dagstream, ewtn->dagname); + pkts_read, dtv->dagstream, dtv->dagname); if (suricata_ctl_flags != 0) { SCReturnInt(TM_ECODE_FAILED); } - SCReturnInt(err); + SCReturnInt(TM_ECODE_OK); } +/** + * \brief Process a chunk of records read from a DAG interface. + * + * This function takes a pointer to buffer read from the DAG interface + * and processes it individual records. + */ TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, - Packet *p, uint8_t* top, - PacketQueue *postpq, uint32_t *pkts_read) { SCEnter(); + Packet *p; int err = 0; dag_record_t* dr = NULL; char *prec = NULL; @@ -458,8 +438,7 @@ TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, if ((top-(ewtn->btm)) < rlen) SCReturnInt(TM_ECODE_OK); - p = p ? p : PacketGetFromQueueOrAlloc(); - + p = PacketGetFromQueueOrAlloc(); if (p == NULL) { SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate a Packet on stream: %d, DAG: %s", @@ -469,22 +448,19 @@ TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, err = ProcessErfDagRecord(ewtn, prec, p); - if (err != TM_ECODE_OK) + if (err != TM_ECODE_OK) { + TmqhOutputPacketpool(ewtn->tv, p); SCReturnInt(err); + } ewtn->btm += rlen; - /* XXX/JNM: Hack to get around the fact that the first Packet from - * Suricata is added explicitly by the Slot code and shouldn't go - * onto the post queue -- else it is added twice to the next queue. - */ - if (*pkts_read) { - PacketEnqueue(postpq, p); + err = TmThreadsSlotProcessPkt(ewtn->tv, ewtn->slot, p); + if (err != TM_ECODE_OK) { + return err; } (*pkts_read)++; - - p = NULL; } SCReturnInt(TM_ECODE_OK); @@ -530,9 +506,6 @@ TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p) */ PacketCopyData(p, pload->eth.dst, GET_PKT_LEN(p)); - SCLogDebug("pktlen: %" PRIu32 " (pkt %02x, pkt data %02x)", - GET_PKT_LEN(p), *p, *GET_PKT_DATA(p)); - /* Convert ERF time to timeval - from libpcap. */ uint64_t ts = dr->ts; p->ts.tv_sec = ts >> 32; diff --git a/src/suricata.c b/src/suricata.c index 2befc8849f..afbe20f2d4 100644 --- a/src/suricata.c +++ b/src/suricata.c @@ -481,7 +481,7 @@ void usage(const char *progname) #endif /* HAVE_LIBCAP_NG */ printf("\t--erf-in : process an ERF file\n"); #ifdef HAVE_DAG - printf("\t--dag : process ERF records from 0,1,...,n DAG input streams\n"); + printf("\t--dag : process ERF records from DAG interface X, stream Y\n"); #endif #ifdef HAVE_NAPATECH printf("\t--napatech : run Napatech feeds using \n"); @@ -944,17 +944,22 @@ int main(int argc, char **argv) exit(EXIT_FAILURE); } } - else if (strcmp((long_opts[option_index]).name, "dag") == 0) { + else if (strcmp((long_opts[option_index]).name, "dag") == 0) { #ifdef HAVE_DAG - run_mode = RUNMODE_DAG; - if (ConfSet("erf-dag.iface", optarg, 0) != 1) { - fprintf(stderr, "ERROR: Failed to set erf_dag.iface\n"); + if (run_mode == RUNMODE_UNKNOWN) { + run_mode = RUNMODE_DAG; + } + else if (run_mode != RUNMODE_DAG) { + SCLogError(SC_ERR_MULTIPLE_RUN_MODE, + "more than one run mode has been specified"); + usage(argv[0]); exit(EXIT_FAILURE); } + LiveRegisterDevice(optarg); #else - SCLogError(SC_ERR_DAG_REQUIRED, "libdag and a DAG card are required" + SCLogError(SC_ERR_DAG_REQUIRED, "libdag and a DAG card are required" " to receieve packets using --dag."); - exit(EXIT_FAILURE); + exit(EXIT_FAILURE); #endif /* HAVE_DAG */ } else if (strcmp((long_opts[option_index]).name, "napatech") == 0) {