#else /* Implied we do have DAG support */
-#define DAG_MAX_READ_PKTS 256
-
#include "source-erf-dag.h"
#include <dagapi.h>
int dagfd;
int dagstream;
char dagname[DAGNAME_BUFSIZE];
- uint32_t dag_max_read_packets;
struct timeval maxwait, poll; /* Could possibly be made static */
uint32_t pkts;
uint64_t bytes;
- /* Track current location in the DAG stream input buffer
+ /* Current location in the DAG stream input buffer.
*/
- uint8_t* top; /* We track top as well so we don't have to
- call dag_advance_stream again if there
- are still pkts to process.
-
- JNM: Currently not used.
- */
- uint8_t* btm;
+ uint8_t *top;
+ uint8_t *btm;
} ErfDagThreadVars;
+static inline TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, uint8_t *top,
+ uint32_t *pkts_read);
+static inline TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec);
TmEcode ReceiveErfDagLoop(ThreadVars *, void *data, void *slot);
TmEcode ReceiveErfDagThreadInit(ThreadVars *, void *, void **);
void ReceiveErfDagThreadExitStats(ThreadVars *, void *);
TmEcode ReceiveErfDagThreadDeinit(ThreadVars *, void *);
-TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, uint8_t* top,
- uint32_t *pkts_read);
-TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p);
TmEcode DecodeErfDagThreadInit(ThreadVars *, void *, void **);
TmEcode DecodeErfDag(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
memset(ewtn, 0, sizeof(*ewtn));
- /* Use max_pending_packets as our maximum number of packets read
- from the DAG buffer.
- */
- ewtn->dag_max_read_packets = (DAG_MAX_READ_PKTS < max_pending_packets) ?
- DAG_MAX_READ_PKTS : max_pending_packets;
-
-
/* dag_parse_name will return a DAG device name and stream number
* to open for this thread.
*/
* Initialise DAG Polling parameters.
*/
timerclear(&ewtn->maxwait);
- ewtn->maxwait.tv_usec = 100 * 1000; /* 100ms timeout */
+ ewtn->maxwait.tv_usec = 20 * 1000; /* 20ms timeout */
timerclear(&ewtn->poll);
- ewtn->poll.tv_usec = 10 * 1000; /* 10ms poll interval */
+ ewtn->poll.tv_usec = 1 * 1000; /* 1ms poll interval */
/* 32kB minimum data to return -- we still restrict the number of
* pkts that are processed to a maximum of dag_max_read_packets.
ErfDagThreadVars *dtv = (ErfDagThreadVars *)data;
TmSlot *s = (TmSlot *)slot;
dtv->slot = s->slot_next;
-
- SCEnter();
-
- uint16_t packet_q_len = 0;
uint32_t diff = 0;
int err;
uint8_t *top = NULL;
uint32_t pkts_read = 0;
+ SCEnter();
+
while (1)
{
if (suricata_ctl_flags & SURICATA_STOP ||
SCReturnInt(TM_ECODE_FAILED);
}
- /* Make sure we have at least one packet in the packet pool,
- * to prevent us from alloc'ing packets at line rate. */
- do {
- packet_q_len = PacketPoolSize();
- if (unlikely(packet_q_len == 0)) {
- PacketPoolWait();
- }
- } while (packet_q_len == 0);
-
- /* NOTE/JNM: This might not work well if we start restricting the
- * number of ERF records processed per call to a small number as
- * the over head required here could exceed the time it takes to
- * process a small number of ERF records.
- *
- * XXX/JNM: Possibly process the DAG stream buffer first if there
- * are ERF packets or else call dag_advance_stream and then process
- * the DAG stream buffer.
- */
top = dag_advance_stream(dtv->dagfd, dtv->dagstream, &(dtv->btm));
-
- if (NULL == top)
- {
- if ((dtv->dagstream & 0x1) && (errno == EAGAIN)) {
- usleep(10 * 1000);
- dtv->btm = dtv->top;
+ if (top == NULL) {
+ if (errno == EAGAIN) {
+ if (dtv->dagstream & 0x1) {
+ usleep(10 * 1000);
+ dtv->btm = dtv->top;
+ }
continue;
}
else {
}
diff = top - dtv->btm;
- if (diff == 0)
- {
+ if (diff == 0) {
continue;
}
ReceiveErfDagCloseStream(dtv->dagfd, dtv->dagstream);
SCReturnInt(err);
}
- }
- SCLogDebug("Read %d records from stream: %d, DAG: %s",
- pkts_read, dtv->dagstream, dtv->dagname);
+ SCLogDebug("Read %d records from stream: %d, DAG: %s",
+ pkts_read, dtv->dagstream, dtv->dagname);
+ }
if (suricata_ctl_flags != 0) {
SCReturnInt(TM_ECODE_FAILED);
* This function takes a pointer to buffer read from the DAG interface
* and processes it individual records.
*/
-TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn,
- uint8_t* top,
- uint32_t *pkts_read)
+static inline TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, uint8_t *top,
+ uint32_t *pkts_read)
{
SCEnter();
- Packet *p;
- int err = 0;
- dag_record_t* dr = NULL;
- char *prec = NULL;
- int rlen;
+ int err = 0;
+ dag_record_t *dr = NULL;
+ char *prec = NULL;
+ int rlen;
+ char hdr_type = 0;
+ int processed = 0;
+ int packet_q_len = 0;
*pkts_read = 0;
- while(((top-(ewtn->btm))>=dag_record_size) &&
- ((*pkts_read)<(ewtn->dag_max_read_packets)))
- {
- prec = (char*)ewtn->btm;
- dr = (dag_record_t*)prec;
+ while (((top - ewtn->btm) >= dag_record_size) &&
+ ((processed + dag_record_size) < 4*1024*1024)) {
- rlen = ntohs(dr->rlen);
+ /* Make sure we have at least one packet in the packet pool,
+ * to prevent us from alloc'ing packets at line rate. */
+ do {
+ packet_q_len = PacketPoolSize();
+ if (unlikely(packet_q_len == 0)) {
+ PacketPoolWait();
+ }
+ } while (packet_q_len == 0);
- if (rlen == 20) {
- rlen = 28;
- SCLogWarning(SC_WARN_ERF_DAG_REC_LEN_CHANGED,
- "Warning, adjusted the length of ERF from 20 to 28 on stream: %d, DAG: %s",
- ewtn->dagstream, ewtn->dagname);
- }
+ prec = (char *)ewtn->btm;
+ dr = (dag_record_t*)prec;
+ rlen = ntohs(dr->rlen);
+ hdr_type = dr->type;
/* If we don't have enough data to finsih processing this ERF record
* return and maybe next time we will.
*/
- if ((top-(ewtn->btm)) < rlen)
+ if ((top - ewtn->btm) < rlen)
SCReturnInt(TM_ECODE_OK);
- p = PacketGetFromQueueOrAlloc();
- if (p == NULL) {
- SCLogError(SC_ERR_MEM_ALLOC,
- "Failed to allocate a Packet on stream: %d, DAG: %s",
- ewtn->dagstream, ewtn->dagname);
- SCReturnInt(TM_ECODE_FAILED);
- }
-
- err = ProcessErfDagRecord(ewtn, prec, p);
+ ewtn->btm += rlen;
+ processed += rlen;
- if (err != TM_ECODE_OK) {
- TmqhOutputPacketpool(ewtn->tv, p);
- SCReturnInt(err);
+ /* Only support ethernet at this time. */
+ switch (hdr_type & 0x7f) {
+ case TYPE_PAD:
+ /* Skip. */
+ continue;
+ case TYPE_ETH:
+ case TYPE_DSM_COLOR_ETH:
+ case TYPE_COLOR_ETH:
+ case TYPE_COLOR_HASH_ETH:
+ break;
+ default:
+ SCLogError(SC_ERR_UNIMPLEMENTED,
+ "Processing of DAG record type: %d not implemented.", dr->type);
+ SCReturnInt(TM_ECODE_FAILED);
}
- ewtn->btm += rlen;
-
- err = TmThreadsSlotProcessPkt(ewtn->tv, ewtn->slot, p);
+ err = ProcessErfDagRecord(ewtn, prec);
if (err != TM_ECODE_OK) {
- return err;
+ SCReturnInt(TM_ECODE_FAILED);
}
(*pkts_read)++;
* \param prec pointer to a DAG record.
* \param
*/
-TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p)
+static inline TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec)
{
SCEnter();
int wlen = 0;
+ int rlen = 0;
+ int hdr_num = 0;
+ char hdr_type = 0;
dag_record_t *dr = (dag_record_t*)prec;
erf_payload_t *pload;
+ Packet *p;
- assert(prec);
- assert(p);
+ hdr_type = dr->type;
+ wlen = ntohs(dr->wlen);
+ rlen = ntohs(dr->rlen);
- if (p == NULL) SCReturnInt(TM_ECODE_OK);
+ /* count extension headers */
+ while (hdr_type & 0x80) {
+ if (rlen < (dag_record_size + (hdr_num * 8))) {
+ SCLogError(SC_ERR_UNIMPLEMENTED,
+ "Insufficient captured packet length.");
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+ hdr_type = prec[(dag_record_size + (hdr_num * 8))];
+ hdr_num++;
+ }
- /* Only support ethernet at this time. */
- if (dr->type != TYPE_ETH &&
- dr->type != TYPE_DSM_COLOR_ETH &&
- dr->type != TYPE_COLOR_ETH &&
- dr->type != TYPE_COLOR_HASH_ETH) {
- SCLogError(SC_ERR_UNIMPLEMENTED,
- "Processing of DAG record type: %d not implemented.", dr->type);
- SCReturnInt(TM_ECODE_FAILED);
+ /* Check that the whole frame was captured */
+ if (rlen < (dag_record_size + (8 * hdr_num) + 2 + wlen)) {
+ SCLogInfo("Incomplete frame captured.");
+ SCReturnInt(TM_ECODE_OK);
}
- wlen = ntohs(dr->wlen);
+ /* skip over extension headers */
+ pload = (erf_payload_t *)(prec + dag_record_size + (8 * hdr_num));
- pload = &(dr->rec);
+ p = PacketGetFromQueueOrAlloc();
+ if (p == NULL) {
+ SCLogError(SC_ERR_MEM_ALLOC,
+ "Failed to allocate a Packet on stream: %d, DAG: %s",
+ ewtn->dagstream, ewtn->dagname);
+ SCReturnInt(TM_ECODE_FAILED);
+ }
SET_PKT_LEN(p, wlen - 4); /* Trim the FCS... */
p->datalink = LINKTYPE_ETHERNET;
/* Take into account for link type Ethernet ETH frame starts
* after ther ERF header + pad.
*/
- PacketCopyData(p, pload->eth.dst, GET_PKT_LEN(p));
+ if (unlikely(PacketCopyData(p, pload->eth.dst, GET_PKT_LEN(p)))) {
+ TmqhOutputPacketpool(ewtn->tv, p);
+ SCReturnInt(TM_ECODE_FAILED);
+ }
/* Convert ERF time to timeval - from libpcap. */
uint64_t ts = dr->ts;
ewtn->pkts++;
ewtn->bytes += wlen;
+ if (TmThreadsSlotProcessPkt(ewtn->tv, ewtn->slot, p) != TM_ECODE_OK) {
+ TmqhOutputPacketpool(ewtn->tv, p);
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
SCReturnInt(TM_ECODE_OK);
}