]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
Apply changes recommended by Stephen Donnely of Endace: - Skip pad records. - Don...
authorJason Ish <jason.ish@endace.com>
Thu, 29 Mar 2012 19:41:37 +0000 (13:41 -0600)
committerVictor Julien <victor@inliniac.net>
Wed, 4 Apr 2012 07:44:39 +0000 (09:44 +0200)
src/source-erf-dag.c

index 98b1f2d359bba60fadaa4a3bcb3442450d9eb70a..a6c04c2440239504d13ea35b223723afb1520f67 100644 (file)
@@ -68,8 +68,6 @@ TmEcode NoErfDagSupportExit(ThreadVars *tv, void *initdata, void **data)
 
 #else /* Implied we do have DAG support */
 
-#define DAG_MAX_READ_PKTS 256
-
 #include "source-erf-dag.h"
 #include <dagapi.h>
 
@@ -83,32 +81,26 @@ typedef struct ErfDagThreadVars_ {
     int dagfd;
     int dagstream;
     char dagname[DAGNAME_BUFSIZE];
-    uint32_t dag_max_read_packets;
 
     struct timeval maxwait, poll;   /* Could possibly be made static */
 
     uint32_t pkts;
     uint64_t bytes;
 
-    /* Track current location in the DAG stream input buffer
+    /* Current location in the DAG stream input buffer.
      */
-    uint8_t* top;                   /* We track top as well so we don't have to
-                                       call dag_advance_stream again if there
-                                       are still pkts to process.
-
-                                       JNM: Currently not used.
-                                     */
-    uint8_t* btm;
+    uint8_t *top;
+    uint8_t *btm;
 
 } ErfDagThreadVars;
 
+static inline TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, uint8_t *top,
+    uint32_t *pkts_read);
+static inline TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec);
 TmEcode ReceiveErfDagLoop(ThreadVars *, void *data, void *slot);
 TmEcode ReceiveErfDagThreadInit(ThreadVars *, void *, void **);
 void ReceiveErfDagThreadExitStats(ThreadVars *, void *);
 TmEcode ReceiveErfDagThreadDeinit(ThreadVars *, void *);
-TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, uint8_t* top,
-                             uint32_t *pkts_read);
-TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p);
 
 TmEcode DecodeErfDagThreadInit(ThreadVars *, void *, void **);
 TmEcode DecodeErfDag(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
@@ -182,13 +174,6 @@ ReceiveErfDagThreadInit(ThreadVars *tv, void *initdata, void **data)
 
     memset(ewtn, 0, sizeof(*ewtn));
 
-    /*  Use max_pending_packets as our maximum number of packets read
-     from the DAG buffer.
-     */
-    ewtn->dag_max_read_packets = (DAG_MAX_READ_PKTS < max_pending_packets) ?
-        DAG_MAX_READ_PKTS : max_pending_packets;
-
-
     /* dag_parse_name will return a DAG device name and stream number
      * to open for this thread.
      */
@@ -277,9 +262,9 @@ ReceiveErfDagThreadInit(ThreadVars *tv, void *initdata, void **data)
      * Initialise DAG Polling parameters.
      */
     timerclear(&ewtn->maxwait);
-    ewtn->maxwait.tv_usec = 100 * 1000; /* 100ms timeout */
+    ewtn->maxwait.tv_usec = 20 * 1000; /* 20ms timeout */
     timerclear(&ewtn->poll);
-    ewtn->poll.tv_usec = 10 * 1000; /* 10ms poll interval */
+    ewtn->poll.tv_usec = 1 * 1000; /* 1ms poll interval */
 
     /* 32kB minimum data to return -- we still restrict the number of
      * pkts that are processed to a maximum of dag_max_read_packets.
@@ -317,15 +302,13 @@ TmEcode ReceiveErfDagLoop(ThreadVars *tv, void *data, void *slot)
     ErfDagThreadVars *dtv = (ErfDagThreadVars *)data;
     TmSlot *s = (TmSlot *)slot;
     dtv->slot = s->slot_next;
-
-    SCEnter();
-
-    uint16_t packet_q_len = 0;
     uint32_t diff = 0;
     int      err;
     uint8_t  *top = NULL;
     uint32_t pkts_read = 0;
 
+    SCEnter();
+
     while (1)
     {
         if (suricata_ctl_flags & SURICATA_STOP ||
@@ -333,31 +316,13 @@ TmEcode ReceiveErfDagLoop(ThreadVars *tv, void *data, void *slot)
             SCReturnInt(TM_ECODE_FAILED);
         }
 
-        /* Make sure we have at least one packet in the packet pool,
-         * to prevent us from alloc'ing packets at line rate. */
-        do {
-            packet_q_len = PacketPoolSize();
-            if (unlikely(packet_q_len == 0)) {
-                PacketPoolWait();
-            }
-        } while (packet_q_len == 0);
-
-        /* NOTE/JNM: This might not work well if we start restricting the
-         * number of ERF records processed per call to a small number as
-         * the over head required here could exceed the time it takes to
-         * process a small number of ERF records.
-         *
-         * XXX/JNM: Possibly process the DAG stream buffer first if there
-         * are ERF packets or else call dag_advance_stream and then process
-         * the DAG stream buffer.
-         */
         top = dag_advance_stream(dtv->dagfd, dtv->dagstream, &(dtv->btm));
-
-        if (NULL == top)
-        {
-            if ((dtv->dagstream & 0x1) && (errno == EAGAIN)) {
-                usleep(10 * 1000);
-                dtv->btm = dtv->top;
+        if (top == NULL) {
+            if (errno == EAGAIN) {
+                if (dtv->dagstream & 0x1) {
+                    usleep(10 * 1000);
+                    dtv->btm = dtv->top;
+                }
                 continue;
             }
             else {
@@ -369,8 +334,7 @@ TmEcode ReceiveErfDagLoop(ThreadVars *tv, void *data, void *slot)
         }
 
         diff = top - dtv->btm;
-        if (diff == 0)
-        {
+        if (diff == 0) {
             continue;
         }
 
@@ -385,10 +349,10 @@ TmEcode ReceiveErfDagLoop(ThreadVars *tv, void *data, void *slot)
             ReceiveErfDagCloseStream(dtv->dagfd, dtv->dagstream);
             SCReturnInt(err);
         }
-    }
 
-    SCLogDebug("Read %d records from stream: %d, DAG: %s",
-        pkts_read, dtv->dagstream, dtv->dagname);
+        SCLogDebug("Read %d records from stream: %d, DAG: %s",
+            pkts_read, dtv->dagstream, dtv->dagname);
+    }
 
     if (suricata_ctl_flags != 0) {
         SCReturnInt(TM_ECODE_FAILED);
@@ -403,61 +367,66 @@ TmEcode ReceiveErfDagLoop(ThreadVars *tv, void *data, void *slot)
  * This function takes a pointer to buffer read from the DAG interface
  * and processes it individual records.
  */
-TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn,
-                             uint8_t* top,
-                             uint32_t *pkts_read)
+static inline TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, uint8_t *top,
+    uint32_t *pkts_read)
 {
     SCEnter();
 
-    Packet *p;
-    int     err = 0;
-    dag_record_t* dr = NULL;
-    char    *prec = NULL;
-    int     rlen;
+    int err = 0;
+    dag_record_t *dr = NULL;
+    char *prec = NULL;
+    int rlen;
+    char hdr_type = 0;
+    int processed = 0;
+    int packet_q_len = 0;
 
     *pkts_read = 0;
 
-    while(((top-(ewtn->btm))>=dag_record_size) &&
-          ((*pkts_read)<(ewtn->dag_max_read_packets)))
-    {
-        prec = (char*)ewtn->btm;
-        dr = (dag_record_t*)prec;
+    while (((top - ewtn->btm) >= dag_record_size) &&
+        ((processed + dag_record_size) < 4*1024*1024)) {
 
-        rlen = ntohs(dr->rlen);
+        /* Make sure we have at least one packet in the packet pool,
+         * to prevent us from alloc'ing packets at line rate. */
+        do {
+            packet_q_len = PacketPoolSize();
+            if (unlikely(packet_q_len == 0)) {
+                PacketPoolWait();
+            }
+        } while (packet_q_len == 0);
 
-        if (rlen == 20) {
-            rlen = 28;
-            SCLogWarning(SC_WARN_ERF_DAG_REC_LEN_CHANGED,
-                "Warning, adjusted the length of ERF from 20 to 28 on stream: %d, DAG: %s",
-                ewtn->dagstream, ewtn->dagname);
-        }
+        prec = (char *)ewtn->btm;
+        dr = (dag_record_t*)prec;
+        rlen = ntohs(dr->rlen);
+        hdr_type = dr->type;
 
         /* If we don't have enough data to finsih processing this ERF record
          * return and maybe next time we will.
          */
-        if ((top-(ewtn->btm)) < rlen)
+        if ((top - ewtn->btm) < rlen)
             SCReturnInt(TM_ECODE_OK);
 
-        p = PacketGetFromQueueOrAlloc();
-        if (p == NULL) {
-            SCLogError(SC_ERR_MEM_ALLOC,
-                       "Failed to allocate a Packet on stream: %d, DAG: %s",
-                       ewtn->dagstream, ewtn->dagname);
-            SCReturnInt(TM_ECODE_FAILED);
-        }
-
-        err = ProcessErfDagRecord(ewtn, prec, p);
+        ewtn->btm += rlen;
+        processed += rlen;
 
-        if (err != TM_ECODE_OK) {
-            TmqhOutputPacketpool(ewtn->tv, p);
-            SCReturnInt(err);
+        /* Only support ethernet at this time. */
+        switch (hdr_type & 0x7f) {
+        case TYPE_PAD:
+            /* Skip. */
+            continue;
+        case TYPE_ETH:
+        case TYPE_DSM_COLOR_ETH:
+        case TYPE_COLOR_ETH:
+        case TYPE_COLOR_HASH_ETH:
+            break;
+        default:
+            SCLogError(SC_ERR_UNIMPLEMENTED,
+                "Processing of DAG record type: %d not implemented.", dr->type);
+            SCReturnInt(TM_ECODE_FAILED);
         }
 
-        ewtn->btm += rlen;
-
-        err = TmThreadsSlotProcessPkt(ewtn->tv, ewtn->slot, p);
+        err = ProcessErfDagRecord(ewtn, prec);
         if (err != TM_ECODE_OK) {
-            return err;
+            SCReturnInt(TM_ECODE_FAILED);
         }
 
         (*pkts_read)++;
@@ -471,32 +440,49 @@ TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn,
  * \param   prec pointer to a DAG record.
  * \param
  */
-TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p)
+static inline TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec)
 {
     SCEnter();
 
     int wlen = 0;
+    int rlen = 0;
+    int hdr_num = 0;
+    char hdr_type = 0;
     dag_record_t  *dr = (dag_record_t*)prec;
     erf_payload_t *pload;
+    Packet *p;
 
-    assert(prec);
-    assert(p);
+    hdr_type = dr->type;
+    wlen = ntohs(dr->wlen);
+    rlen = ntohs(dr->rlen);
 
-    if (p == NULL) SCReturnInt(TM_ECODE_OK);
+    /* count extension headers */
+    while (hdr_type & 0x80) {
+        if (rlen < (dag_record_size + (hdr_num * 8))) {
+            SCLogError(SC_ERR_UNIMPLEMENTED,
+                "Insufficient captured packet length.");
+            SCReturnInt(TM_ECODE_FAILED);
+        }
+        hdr_type = prec[(dag_record_size + (hdr_num * 8))];
+        hdr_num++;
+    }
 
-    /* Only support ethernet at this time. */
-    if (dr->type != TYPE_ETH &&
-           dr->type != TYPE_DSM_COLOR_ETH &&
-           dr->type != TYPE_COLOR_ETH &&
-           dr->type != TYPE_COLOR_HASH_ETH) {
-        SCLogError(SC_ERR_UNIMPLEMENTED,
-                   "Processing of DAG record type: %d not implemented.", dr->type);
-        SCReturnInt(TM_ECODE_FAILED);
+    /* Check that the whole frame was captured */
+    if (rlen < (dag_record_size + (8 * hdr_num) + 2 + wlen)) {
+        SCLogInfo("Incomplete frame captured.");
+        SCReturnInt(TM_ECODE_OK);
     }
 
-    wlen = ntohs(dr->wlen);
+    /* skip over extension headers */
+    pload = (erf_payload_t *)(prec + dag_record_size + (8 * hdr_num));
 
-    pload = &(dr->rec);
+    p = PacketGetFromQueueOrAlloc();
+    if (p == NULL) {
+        SCLogError(SC_ERR_MEM_ALLOC,
+            "Failed to allocate a Packet on stream: %d, DAG: %s",
+            ewtn->dagstream, ewtn->dagname);
+        SCReturnInt(TM_ECODE_FAILED);
+    }
 
     SET_PKT_LEN(p, wlen - 4);   /* Trim the FCS... */
     p->datalink = LINKTYPE_ETHERNET;
@@ -504,7 +490,10 @@ TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p)
     /* Take into account for link type Ethernet ETH frame starts
      * after ther ERF header + pad.
      */
-    PacketCopyData(p, pload->eth.dst, GET_PKT_LEN(p));
+    if (unlikely(PacketCopyData(p, pload->eth.dst, GET_PKT_LEN(p)))) {
+        TmqhOutputPacketpool(ewtn->tv, p);
+        SCReturnInt(TM_ECODE_FAILED);
+    }
 
     /* Convert ERF time to timeval - from libpcap. */
     uint64_t ts = dr->ts;
@@ -520,6 +509,11 @@ TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p)
     ewtn->pkts++;
     ewtn->bytes += wlen;
 
+    if (TmThreadsSlotProcessPkt(ewtn->tv, ewtn->slot, p) != TM_ECODE_OK) {
+        TmqhOutputPacketpool(ewtn->tv, p);
+        SCReturnInt(TM_ECODE_FAILED);
+    }
+
     SCReturnInt(TM_ECODE_OK);
 }