From 1f801d316c4cc951135c4afcda45b942ac14e10f Mon Sep 17 00:00:00 2001 From: Jason Ish Date: Thu, 29 Mar 2012 13:41:37 -0600 Subject: [PATCH] Apply changes recommended by Stephen Donnely of Endace: - Skip pad records. - Don't log error on EGAIN, just try again. - Skip over extension headers. - Check we have the full packet (skip partial packets) - Remove obsolete rlen check. Also remove max_pending_packets to process more packets per iteration. --- src/source-erf-dag.c | 210 +++++++++++++++++++++---------------------- 1 file changed, 102 insertions(+), 108 deletions(-) diff --git a/src/source-erf-dag.c b/src/source-erf-dag.c index 98b1f2d359..a6c04c2440 100644 --- a/src/source-erf-dag.c +++ b/src/source-erf-dag.c @@ -68,8 +68,6 @@ TmEcode NoErfDagSupportExit(ThreadVars *tv, void *initdata, void **data) #else /* Implied we do have DAG support */ -#define DAG_MAX_READ_PKTS 256 - #include "source-erf-dag.h" #include @@ -83,32 +81,26 @@ typedef struct ErfDagThreadVars_ { int dagfd; int dagstream; char dagname[DAGNAME_BUFSIZE]; - uint32_t dag_max_read_packets; struct timeval maxwait, poll; /* Could possibly be made static */ uint32_t pkts; uint64_t bytes; - /* Track current location in the DAG stream input buffer + /* Current location in the DAG stream input buffer. */ - uint8_t* top; /* We track top as well so we don't have to - call dag_advance_stream again if there - are still pkts to process. - - JNM: Currently not used. - */ - uint8_t* btm; + uint8_t *top; + uint8_t *btm; } ErfDagThreadVars; +static inline TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, uint8_t *top, + uint32_t *pkts_read); +static inline TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec); TmEcode ReceiveErfDagLoop(ThreadVars *, void *data, void *slot); TmEcode ReceiveErfDagThreadInit(ThreadVars *, void *, void **); void ReceiveErfDagThreadExitStats(ThreadVars *, void *); TmEcode ReceiveErfDagThreadDeinit(ThreadVars *, void *); -TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, uint8_t* top, - uint32_t *pkts_read); -TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p); TmEcode DecodeErfDagThreadInit(ThreadVars *, void *, void **); TmEcode DecodeErfDag(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); @@ -182,13 +174,6 @@ ReceiveErfDagThreadInit(ThreadVars *tv, void *initdata, void **data) memset(ewtn, 0, sizeof(*ewtn)); - /* Use max_pending_packets as our maximum number of packets read - from the DAG buffer. - */ - ewtn->dag_max_read_packets = (DAG_MAX_READ_PKTS < max_pending_packets) ? - DAG_MAX_READ_PKTS : max_pending_packets; - - /* dag_parse_name will return a DAG device name and stream number * to open for this thread. */ @@ -277,9 +262,9 @@ ReceiveErfDagThreadInit(ThreadVars *tv, void *initdata, void **data) * Initialise DAG Polling parameters. */ timerclear(&ewtn->maxwait); - ewtn->maxwait.tv_usec = 100 * 1000; /* 100ms timeout */ + ewtn->maxwait.tv_usec = 20 * 1000; /* 20ms timeout */ timerclear(&ewtn->poll); - ewtn->poll.tv_usec = 10 * 1000; /* 10ms poll interval */ + ewtn->poll.tv_usec = 1 * 1000; /* 1ms poll interval */ /* 32kB minimum data to return -- we still restrict the number of * pkts that are processed to a maximum of dag_max_read_packets. @@ -317,15 +302,13 @@ TmEcode ReceiveErfDagLoop(ThreadVars *tv, void *data, void *slot) ErfDagThreadVars *dtv = (ErfDagThreadVars *)data; TmSlot *s = (TmSlot *)slot; dtv->slot = s->slot_next; - - SCEnter(); - - uint16_t packet_q_len = 0; uint32_t diff = 0; int err; uint8_t *top = NULL; uint32_t pkts_read = 0; + SCEnter(); + while (1) { if (suricata_ctl_flags & SURICATA_STOP || @@ -333,31 +316,13 @@ TmEcode ReceiveErfDagLoop(ThreadVars *tv, void *data, void *slot) SCReturnInt(TM_ECODE_FAILED); } - /* Make sure we have at least one packet in the packet pool, - * to prevent us from alloc'ing packets at line rate. */ - do { - packet_q_len = PacketPoolSize(); - if (unlikely(packet_q_len == 0)) { - PacketPoolWait(); - } - } while (packet_q_len == 0); - - /* NOTE/JNM: This might not work well if we start restricting the - * number of ERF records processed per call to a small number as - * the over head required here could exceed the time it takes to - * process a small number of ERF records. - * - * XXX/JNM: Possibly process the DAG stream buffer first if there - * are ERF packets or else call dag_advance_stream and then process - * the DAG stream buffer. - */ top = dag_advance_stream(dtv->dagfd, dtv->dagstream, &(dtv->btm)); - - if (NULL == top) - { - if ((dtv->dagstream & 0x1) && (errno == EAGAIN)) { - usleep(10 * 1000); - dtv->btm = dtv->top; + if (top == NULL) { + if (errno == EAGAIN) { + if (dtv->dagstream & 0x1) { + usleep(10 * 1000); + dtv->btm = dtv->top; + } continue; } else { @@ -369,8 +334,7 @@ TmEcode ReceiveErfDagLoop(ThreadVars *tv, void *data, void *slot) } diff = top - dtv->btm; - if (diff == 0) - { + if (diff == 0) { continue; } @@ -385,10 +349,10 @@ TmEcode ReceiveErfDagLoop(ThreadVars *tv, void *data, void *slot) ReceiveErfDagCloseStream(dtv->dagfd, dtv->dagstream); SCReturnInt(err); } - } - SCLogDebug("Read %d records from stream: %d, DAG: %s", - pkts_read, dtv->dagstream, dtv->dagname); + SCLogDebug("Read %d records from stream: %d, DAG: %s", + pkts_read, dtv->dagstream, dtv->dagname); + } if (suricata_ctl_flags != 0) { SCReturnInt(TM_ECODE_FAILED); @@ -403,61 +367,66 @@ TmEcode ReceiveErfDagLoop(ThreadVars *tv, void *data, void *slot) * This function takes a pointer to buffer read from the DAG interface * and processes it individual records. */ -TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, - uint8_t* top, - uint32_t *pkts_read) +static inline TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, uint8_t *top, + uint32_t *pkts_read) { SCEnter(); - Packet *p; - int err = 0; - dag_record_t* dr = NULL; - char *prec = NULL; - int rlen; + int err = 0; + dag_record_t *dr = NULL; + char *prec = NULL; + int rlen; + char hdr_type = 0; + int processed = 0; + int packet_q_len = 0; *pkts_read = 0; - while(((top-(ewtn->btm))>=dag_record_size) && - ((*pkts_read)<(ewtn->dag_max_read_packets))) - { - prec = (char*)ewtn->btm; - dr = (dag_record_t*)prec; + while (((top - ewtn->btm) >= dag_record_size) && + ((processed + dag_record_size) < 4*1024*1024)) { - rlen = ntohs(dr->rlen); + /* Make sure we have at least one packet in the packet pool, + * to prevent us from alloc'ing packets at line rate. */ + do { + packet_q_len = PacketPoolSize(); + if (unlikely(packet_q_len == 0)) { + PacketPoolWait(); + } + } while (packet_q_len == 0); - if (rlen == 20) { - rlen = 28; - SCLogWarning(SC_WARN_ERF_DAG_REC_LEN_CHANGED, - "Warning, adjusted the length of ERF from 20 to 28 on stream: %d, DAG: %s", - ewtn->dagstream, ewtn->dagname); - } + prec = (char *)ewtn->btm; + dr = (dag_record_t*)prec; + rlen = ntohs(dr->rlen); + hdr_type = dr->type; /* If we don't have enough data to finsih processing this ERF record * return and maybe next time we will. */ - if ((top-(ewtn->btm)) < rlen) + if ((top - ewtn->btm) < rlen) SCReturnInt(TM_ECODE_OK); - p = PacketGetFromQueueOrAlloc(); - if (p == NULL) { - SCLogError(SC_ERR_MEM_ALLOC, - "Failed to allocate a Packet on stream: %d, DAG: %s", - ewtn->dagstream, ewtn->dagname); - SCReturnInt(TM_ECODE_FAILED); - } - - err = ProcessErfDagRecord(ewtn, prec, p); + ewtn->btm += rlen; + processed += rlen; - if (err != TM_ECODE_OK) { - TmqhOutputPacketpool(ewtn->tv, p); - SCReturnInt(err); + /* Only support ethernet at this time. */ + switch (hdr_type & 0x7f) { + case TYPE_PAD: + /* Skip. */ + continue; + case TYPE_ETH: + case TYPE_DSM_COLOR_ETH: + case TYPE_COLOR_ETH: + case TYPE_COLOR_HASH_ETH: + break; + default: + SCLogError(SC_ERR_UNIMPLEMENTED, + "Processing of DAG record type: %d not implemented.", dr->type); + SCReturnInt(TM_ECODE_FAILED); } - ewtn->btm += rlen; - - err = TmThreadsSlotProcessPkt(ewtn->tv, ewtn->slot, p); + err = ProcessErfDagRecord(ewtn, prec); if (err != TM_ECODE_OK) { - return err; + SCReturnInt(TM_ECODE_FAILED); } (*pkts_read)++; @@ -471,32 +440,49 @@ TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, * \param prec pointer to a DAG record. * \param */ -TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p) +static inline TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec) { SCEnter(); int wlen = 0; + int rlen = 0; + int hdr_num = 0; + char hdr_type = 0; dag_record_t *dr = (dag_record_t*)prec; erf_payload_t *pload; + Packet *p; - assert(prec); - assert(p); + hdr_type = dr->type; + wlen = ntohs(dr->wlen); + rlen = ntohs(dr->rlen); - if (p == NULL) SCReturnInt(TM_ECODE_OK); + /* count extension headers */ + while (hdr_type & 0x80) { + if (rlen < (dag_record_size + (hdr_num * 8))) { + SCLogError(SC_ERR_UNIMPLEMENTED, + "Insufficient captured packet length."); + SCReturnInt(TM_ECODE_FAILED); + } + hdr_type = prec[(dag_record_size + (hdr_num * 8))]; + hdr_num++; + } - /* Only support ethernet at this time. */ - if (dr->type != TYPE_ETH && - dr->type != TYPE_DSM_COLOR_ETH && - dr->type != TYPE_COLOR_ETH && - dr->type != TYPE_COLOR_HASH_ETH) { - SCLogError(SC_ERR_UNIMPLEMENTED, - "Processing of DAG record type: %d not implemented.", dr->type); - SCReturnInt(TM_ECODE_FAILED); + /* Check that the whole frame was captured */ + if (rlen < (dag_record_size + (8 * hdr_num) + 2 + wlen)) { + SCLogInfo("Incomplete frame captured."); + SCReturnInt(TM_ECODE_OK); } - wlen = ntohs(dr->wlen); + /* skip over extension headers */ + pload = (erf_payload_t *)(prec + dag_record_size + (8 * hdr_num)); - pload = &(dr->rec); + p = PacketGetFromQueueOrAlloc(); + if (p == NULL) { + SCLogError(SC_ERR_MEM_ALLOC, + "Failed to allocate a Packet on stream: %d, DAG: %s", + ewtn->dagstream, ewtn->dagname); + SCReturnInt(TM_ECODE_FAILED); + } SET_PKT_LEN(p, wlen - 4); /* Trim the FCS... */ p->datalink = LINKTYPE_ETHERNET; @@ -504,7 +490,10 @@ TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p) /* Take into account for link type Ethernet ETH frame starts * after ther ERF header + pad. */ - PacketCopyData(p, pload->eth.dst, GET_PKT_LEN(p)); + if (unlikely(PacketCopyData(p, pload->eth.dst, GET_PKT_LEN(p)))) { + TmqhOutputPacketpool(ewtn->tv, p); + SCReturnInt(TM_ECODE_FAILED); + } /* Convert ERF time to timeval - from libpcap. */ uint64_t ts = dr->ts; @@ -520,6 +509,11 @@ TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p) ewtn->pkts++; ewtn->bytes += wlen; + if (TmThreadsSlotProcessPkt(ewtn->tv, ewtn->slot, p) != TM_ECODE_OK) { + TmqhOutputPacketpool(ewtn->tv, p); + SCReturnInt(TM_ECODE_FAILED); + } + SCReturnInt(TM_ECODE_OK); } -- 2.47.2