]> git.ipfire.org Git - people/ms/suricata.git/commitdiff
Add multi packet reading for pcap live mode. Add a partly lock free multi writer...
authorVictor Julien <victor@inliniac.net>
Tue, 1 Jun 2010 10:43:33 +0000 (12:43 +0200)
committerVictor Julien <victor@inliniac.net>
Tue, 1 Jun 2010 10:43:33 +0000 (12:43 +0200)
src/runmodes.c
src/source-pcap-file.c
src/source-pcap.c
src/threadvars.h
src/tm-queuehandlers.h
src/tm-threads.c
src/tmqh-ringbuffer.c
src/tmqh-simple.c
src/util-ringbuffer.c
src/util-ringbuffer.h

index 5a9a923d9c11dbf9af2abf94615c112df2cbbc4d..75372f0801167394bdfe57dd2d3595a329ecfbd8 100644 (file)
@@ -1900,7 +1900,7 @@ int RunModeIdsPcapAuto(DetectEngineCtx *de_ctx, char *iface) {
 
     TimeModeSetLive();
     /* create the threads */
-    ThreadVars *tv_receivepcap = TmThreadCreatePacketHandler("ReceivePcap","packetpool","packetpool","pickup-queue","simple","1slot_noinout");
+    ThreadVars *tv_receivepcap = TmThreadCreatePacketHandler("ReceivePcap","packetpool","packetpool","pickup-queue","simple","1slot");
     if (tv_receivepcap == NULL) {
         printf("ERROR: TmThreadsCreate failed\n");
         exit(EXIT_FAILURE);
@@ -2116,28 +2116,7 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx, char *file) {
         printf("ERROR: TmThreadSpawn failed\n");
         exit(EXIT_FAILURE);
     }
-/*
-    ThreadVars *tv_stream1 = TmThreadCreatePacketHandler("Stream1","decode-queue1","simple","stream-queue1","simple","1slot");
-    if (tv_stream1 == NULL) {
-        printf("ERROR: TmThreadsCreate failed for Stream1\n");
-        exit(EXIT_FAILURE);
-    }
-    tm_module = TmModuleGetByName("StreamTcp");
-    if (tm_module == NULL) {
-        printf("ERROR: TmModuleGetByName StreamTcp failed\n");
-        exit(EXIT_FAILURE);
-    }
-    Tm1SlotSetFunc(tv_stream1,tm_module,NULL);
-
-    TmThreadSetCPUAffinity(tv_stream1, 0);
-    if (ncpus > 1)
-        TmThreadSetThreadPriority(tv_stream1, PRIO_MEDIUM);
 
-    if (TmThreadSpawn(tv_stream1) != TM_ECODE_OK) {
-        printf("ERROR: TmThreadSpawn failed\n");
-        exit(EXIT_FAILURE);
-    }
-*/
     for (cpu = 0; cpu < ncpus; cpu++) {
         snprintf(tname, sizeof(tname),"Detect%"PRIu16, cpu+1);
         if (tname == NULL)
@@ -2146,7 +2125,7 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx, char *file) {
         char *thread_name = SCStrdup(tname);
         SCLogDebug("Assigning %s affinity to cpu %u", thread_name, cpu);
 
-        ThreadVars *tv_detect_ncpu = TmThreadCreatePacketHandler(thread_name,"stream-queue1","ringbuffer","alert-queue1","simple","1slot");
+        ThreadVars *tv_detect_ncpu = TmThreadCreatePacketHandler(thread_name,"stream-queue1","ringbuffer","alert-queue1","ringbuffer","1slot");
         if (tv_detect_ncpu == NULL) {
             printf("ERROR: TmThreadsCreate failed\n");
             exit(EXIT_FAILURE);
@@ -2176,7 +2155,7 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx, char *file) {
     }
 
     ThreadVars *tv_outputs = TmThreadCreatePacketHandler("Outputs",
-        "alert-queue1", "simple", "packetpool", "packetpool", "varslot");
+        "alert-queue1", "ringbuffer", "packetpool", "packetpool", "varslot");
     SetupOutputs(tv_outputs);
 
     TmThreadSetCPUAffinity(tv_outputs, 0);
index 11ebece35b5c50da55d5e2493fbb4d9e40d10f06..ac777d832534149c157f51ecdf24d9814b5bc9b7 100644 (file)
@@ -45,6 +45,7 @@
 #include "util-privs.h"
 
 extern int max_pending_packets;
+
 static int pcap_max_read_packets = 0;
 
 typedef struct PcapFileGlobalVars_ {
@@ -148,6 +149,7 @@ void PcapFileCallback(char *user, struct pcap_pkthdr *h, u_char *pkt) {
  */
 TmEcode ReceivePcapFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) {
     SCEnter();
+    uint16_t packet_q_len = 0;
 
     PcapFileThreadVars *ptv = (PcapFileThreadVars *)data;
 
@@ -155,11 +157,24 @@ TmEcode ReceivePcapFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq,
         SCReturnInt(TM_ECODE_FAILED);
     }
 
+    /* make sure we have at least one packet in the packet pool, to prevent
+     * us from alloc'ing packets at line rate */
+    SCMutexLock(&packet_q.mutex_q);
+    packet_q_len = packet_q.len;
+    if (packet_q.len == 0) {
+        SCondWait(&packet_q.cond_q, &packet_q.mutex_q);
+    }
+    packet_q_len = packet_q.len;
+    SCMutexUnlock(&packet_q.mutex_q);
+
+    if (postpq == NULL)
+        pcap_max_read_packets = 1;
+
     ptv->array_idx = 0;
     ptv->in_p = p;
 
     /* Right now we just support reading packets one at a time. */
-    int r = pcap_dispatch(pcap_g.pcap_handle, pcap_max_read_packets,
+    int r = pcap_dispatch(pcap_g.pcap_handle, (pcap_max_read_packets < packet_q_len) ? pcap_max_read_packets : packet_q_len,
             (pcap_handler)PcapFileCallback, (u_char *)ptv);
 
     uint16_t cnt = 0;
@@ -172,7 +187,7 @@ TmEcode ReceivePcapFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq,
             pp->pcap_cnt = pcap_g.cnt;
             PacketEnqueue(postpq, pp);
         } else {
-            p->pcap_cnt = pcap_g.cnt;
+            pp->pcap_cnt = pcap_g.cnt;
         }
     }
 
index 42ec468adf5e0b19a1512dfabd8c1cc033266288..6fc3e0b70496179d4ba19bf1f8a895c14b3a656b 100644 (file)
 #include "util-error.h"
 #include "util-privs.h"
 
+extern uint8_t suricata_ctl_flags;
 extern int max_pending_packets;
 
+static int pcap_max_read_packets = 0;
+
+/** max packets < 65536 */
+#define PCAP_FILE_MAX_PKTS 256
+
 /**
  * \brief Structure to hold thread specific variables.
  */
@@ -66,6 +72,11 @@ typedef struct PcapThreadVars_
     uint32_t errs;
 
     ThreadVars *tv;
+
+    Packet *in_p;
+
+    Packet *array[PCAP_FILE_MAX_PKTS];
+    uint16_t array_idx;
 } PcapThreadVars;
 
 TmEcode ReceivePcap(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
@@ -117,9 +128,19 @@ void TmModuleDecodePcapRegister (void) {
 void PcapCallback(char *user, struct pcap_pkthdr *h, u_char *pkt) {
     SCLogDebug("user %p, h %p, pkt %p", user, h, pkt);
     PcapThreadVars *ptv = (PcapThreadVars *)user;
-    ThreadVars *tv = ptv->tv;
+    //ThreadVars *tv = ptv->tv;
+
+    Packet *p = NULL;
+    if (ptv->array_idx == 0) {
+        p = ptv->in_p;
+    } else {
+        p = PacketGetFromQueueOrAlloc();
+    }
+
+    if (p == NULL) {
+        return;
+    }
 
-    Packet *p = tv->tmqh_in(tv);
     p->ts.tv_sec = h->ts.tv_sec;
     p->ts.tv_usec = h->ts.tv_usec;
 
@@ -131,8 +152,9 @@ void PcapCallback(char *user, struct pcap_pkthdr *h, u_char *pkt) {
     memcpy(p->pkt, pkt, p->pktlen);
     SCLogDebug("p->pktlen: %" PRIu32 " (pkt %02x, p->pkt %02x)", p->pktlen, *pkt, *p->pkt);
 
-    /* pass on... */
-    tv->tmqh_out(tv, p);
+    /* store the packet in our array */
+    ptv->array[ptv->array_idx] = p;
+    ptv->array_idx++;
 }
 
 /**
@@ -148,29 +170,63 @@ void PcapCallback(char *user, struct pcap_pkthdr *h, u_char *pkt) {
  */
 TmEcode ReceivePcap(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) {
     SCEnter();
+    uint16_t packet_q_len = 0;
+
     PcapThreadVars *ptv = (PcapThreadVars *)data;
 
-    /* Just read one packet at a time for now. */
+    /* make sure we have at least one packet in the packet pool, to prevent
+     * us from alloc'ing packets at line rate */
+    SCMutexLock(&packet_q.mutex_q);
+    packet_q_len = packet_q.len;
+    if (packet_q.len == 0) {
+        SCondWait(&packet_q.cond_q, &packet_q.mutex_q);
+    }
+    packet_q_len = packet_q.len;
+    SCMutexUnlock(&packet_q.mutex_q);
+
+    if (postpq == NULL)
+        pcap_max_read_packets = 1;
+
+    ptv->array_idx = 0;
+    ptv->in_p = p;
+
     int r = 0;
     while (r == 0) {
-        r = pcap_dispatch(ptv->pcap_handle, 1, (pcap_handler)PcapCallback, (u_char *)ptv);
-        if (r < 0) {
-            SCLogError(SC_ERR_PCAP_DISPATCH, "error code %"PRId32" %s",
-                r, pcap_geterr(ptv->pcap_handle));
+        r = pcap_dispatch(ptv->pcap_handle, (pcap_max_read_packets < packet_q_len) ? pcap_max_read_packets : packet_q_len,
+            (pcap_handler)PcapCallback, (u_char *)ptv);
+        if (suricata_ctl_flags != 0) {
             break;
         }
+    }
+
+    uint16_t cnt = 0;
+    for (cnt = 0; cnt < ptv->array_idx; cnt++) {
+        Packet *pp = ptv->array[cnt];
 
-        if (TmThreadsCheckFlag(tv, THV_KILL) || TmThreadsCheckFlag(tv, THV_PAUSE)) {
-            SCLogInfo("pcap packet reading interrupted");
-            SCReturnInt(TM_ECODE_OK);
+        /* enqueue all but the first in the postpq, the first
+         * pkt is handled by the tv "out handler" */
+        if (cnt > 0) {
+            PacketEnqueue(postpq, pp);
         }
     }
 
+    if (r < 0) {
+        SCLogError(SC_ERR_PCAP_DISPATCH, "error code %" PRId32 " %s",
+                r, pcap_geterr(ptv->pcap_handle));
+
+        EngineStop();
+        SCReturnInt(TM_ECODE_FAILED);
+    }
+
+    if (suricata_ctl_flags != 0) {
+        SCReturnInt(TM_ECODE_FAILED);
+    }
+
     SCReturnInt(TM_ECODE_OK);
 }
 
 /**
- * \brief Init function for RecievePcap.
+ * \brief Init function for ReceivePcap.
  *
  * This is a setup function for recieving packets
  * via libpcap. There are two versions of this function
@@ -189,6 +245,11 @@ TmEcode ReceivePcapThreadInit(ThreadVars *tv, void *initdata, void **data) {
     SCEnter();
     char *tmpbpfstring;
 
+    /* use max_pending_packets as pcap read size unless it's bigger than
+     * our size limit */
+    pcap_max_read_packets = (PCAP_FILE_MAX_PKTS < max_pending_packets) ?
+        PCAP_FILE_MAX_PKTS : max_pending_packets;
+
     if (initdata == NULL) {
         SCLogError(SC_ERR_INVALID_ARGUMENT, "initdata == NULL");
         SCReturnInt(TM_ECODE_FAILED);
@@ -276,6 +337,11 @@ TmEcode ReceivePcapThreadInit(ThreadVars *tv, void *initdata, void **data) {
 
     char *tmpbpfstring;
 
+    /* use max_pending_packets as pcap read size unless it's bigger than
+     * our size limit */
+    pcap_max_read_packets = (PCAP_FILE_MAX_PKTS < max_pending_packets) ?
+        PCAP_FILE_MAX_PKTS : max_pending_packets;
+
     if (initdata == NULL) {
         SCLogError(SC_ERR_INVALID_ARGUMENT, "initdata == NULL");
         SCReturnInt(TM_ECODE_FAILED);
index d3b0cfa9996cb5a0d6106c96f0199e4552a8fb91..52e82ed6c6def5786aaf57e50b05cdb5546fe817 100644 (file)
@@ -70,6 +70,7 @@ typedef struct ThreadVars_ {
 
     /** queue handlers */
     struct Packet_ * (*tmqh_in)(struct ThreadVars_ *);
+    void (*InShutdownHandler)(struct ThreadVars_ *);
     void (*tmqh_out)(struct ThreadVars_ *, struct Packet_ *);
 
     /** slot functions */
index 23532dfeec8aaf0f830fb32d88fe64a2e99eceae..a5b98e308b69c05f254003262c28889e00b8f874 100644 (file)
@@ -37,6 +37,7 @@ enum {
 typedef struct Tmqh_ {
     char *name;
     Packet *(*InHandler)(ThreadVars *);
+    void (*InShutdownHandler)(ThreadVars *);
     void (*OutHandler)(ThreadVars *, Packet *);
     void *(*OutHandlerCtxSetup)(char *);
     void (*OutHandlerCtxFree)(void *);
index 67378c797fd23825eb9ff29685773f08730b0346..5effde7c4f6c3176a05a450e37fc85bc0650e442 100644 (file)
@@ -859,6 +859,7 @@ ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name,
         if (tmqh == NULL) goto error;
 
         tv->tmqh_in = tmqh->InHandler;
+        tv->InShutdownHandler = tmqh->InShutdownHandler;
         SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
     }
 
@@ -1051,6 +1052,9 @@ void TmThreadKillThread(ThreadVars *tv)
 
     if (tv->inq != NULL) {
         /* signal the queue for the number of users */
+                if (tv->InShutdownHandler != NULL) {
+                    tv->InShutdownHandler(tv);
+                }
         for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++)
             SCCondSignal(&trans_q[tv->inq->id].cond_q);
 
@@ -1060,6 +1064,9 @@ void TmThreadKillThread(ThreadVars *tv)
                 break;
             }
 
+                if (tv->InShutdownHandler != NULL) {
+                    tv->InShutdownHandler(tv);
+                }
             for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++)
                 SCCondSignal(&trans_q[tv->inq->id].cond_q);
 
@@ -1107,6 +1114,9 @@ void TmThreadKillThreads(void) {
 
                 /* signal the queue for the number of users */
 
+                if (tv->InShutdownHandler != NULL) {
+                    tv->InShutdownHandler(tv);
+                }
                 for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++)
                     SCCondSignal(&trans_q[tv->inq->id].cond_q);
 
@@ -1120,6 +1130,10 @@ void TmThreadKillThreads(void) {
 
                     cnt++;
 
+                    if (tv->InShutdownHandler != NULL) {
+                        tv->InShutdownHandler(tv);
+                    }
+
                     for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++)
                         SCCondSignal(&trans_q[tv->inq->id].cond_q);
 
index 9ab274fff03c1a1bf1b78ec13cd51027ee3acd61..6e2c80dc0c005331749ba5c2a0d9870a49e9566e 100644 (file)
 
 #include "util-ringbuffer.h"
 
-static RingBufferMrSw *ringbuffers[256];
+static RingBufferMrMw8 *ringbuffers[256];
 
 Packet *TmqhInputRingBuffer(ThreadVars *t);
 void TmqhOutputRingBuffer(ThreadVars *t, Packet *p);
+void TmqhInputRingBufferShutdownHandler(ThreadVars *);
 
 void TmqhRingBufferRegister (void) {
     tmqh_table[TMQH_RINGBUFFER].name = "ringbuffer";
     tmqh_table[TMQH_RINGBUFFER].InHandler = TmqhInputRingBuffer;
+    tmqh_table[TMQH_RINGBUFFER].InShutdownHandler = TmqhInputRingBufferShutdownHandler;
     tmqh_table[TMQH_RINGBUFFER].OutHandler = TmqhOutputRingBuffer;
 
+    memset(ringbuffers, 0, sizeof(ringbuffers));
+
     int i = 0;
     for (i = 0; i < 256; i++) {
-        ringbuffers[i] = RingBufferMrSwInit();
+        ringbuffers[i] = RingBufferMrMw8Init();
     }
 }
 
 Packet *TmqhInputRingBuffer(ThreadVars *t)
 {
-    RingBufferMrSw *rb = ringbuffers[t->inq->id];
+    RingBufferMrMw8 *rb = ringbuffers[t->inq->id];
 
-    Packet *p = (Packet *)RingBufferMrSwGet(rb);
+    Packet *p = (Packet *)RingBufferMrMw8Get(rb);
     return p;
 }
 
+void TmqhInputRingBufferShutdownHandler(ThreadVars *tv) {
+    if (tv == NULL || tv->inq == NULL) {
+        return;
+    }
+
+    RingBufferMrMw8 *rb = ringbuffers[tv->inq->id];
+    if (rb == NULL) {
+        return;
+    }
+
+    rb->shutdown = 1;
+}
+
 void TmqhOutputRingBuffer(ThreadVars *t, Packet *p)
 {
-    RingBufferMrSw *rb = ringbuffers[t->outq->id];
-    RingBufferMrSwPut(rb, (void *)p);
+    RingBufferMrMw8 *rb = ringbuffers[t->outq->id];
+    RingBufferMrMw8Put(rb, (void *)p);
 }
 
index 57836440ce5d890af08cebf1e12a92b177f53bc4..0089a36560299f15f4dcc24cbf820f191767c9b8 100644 (file)
 
 Packet *TmqhInputSimple(ThreadVars *t);
 void TmqhOutputSimple(ThreadVars *t, Packet *p);
+void TmqhInputSimpleShutdownHandler(ThreadVars *);
 
 void TmqhSimpleRegister (void) {
     tmqh_table[TMQH_SIMPLE].name = "simple";
     tmqh_table[TMQH_SIMPLE].InHandler = TmqhInputSimple;
+    tmqh_table[TMQH_SIMPLE].InShutdownHandler = TmqhInputSimpleShutdownHandler;
     tmqh_table[TMQH_SIMPLE].OutHandler = TmqhOutputSimple;
 }
 
@@ -65,6 +67,17 @@ Packet *TmqhInputSimple(ThreadVars *t)
     }
 }
 
+void TmqhInputSimpleShutdownHandler(ThreadVars *tv) {
+    int i;
+
+    if (tv == NULL || tv->inq == NULL) {
+        return;
+    }
+
+    for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++)
+        SCCondSignal(&trans_q[tv->inq->id].cond_q);
+}
+
 void TmqhOutputSimple(ThreadVars *t, Packet *p)
 {
     SCLogDebug("Packet %p, p->root %p, alloced %s", p, p->root, p->flags & PKT_ALLOC ? "true":"false");
index 5b3ca5bda5135bc1e7fe8e6bf761be7d36016544..8bae394dd75f1a73cbc399a6051b009f27528c3e 100644 (file)
@@ -2,8 +2,84 @@
 #include "suricata.h"
 #include "util-ringbuffer.h"
 
-/* suricata engine control flags */
-extern uint8_t suricata_ctl_flags;
+/* Multi Reader, Single Writer, 8 bits */
+
+RingBufferMrSw8 *RingBufferMrSw8Init(void) {
+    RingBufferMrSw8 *rb = SCMalloc(sizeof(RingBufferMrSw8));
+    if (rb == NULL) {
+        return NULL;
+    }
+
+    memset(rb, 0x00, sizeof(RingBufferMrSw8));
+    return rb;
+}
+
+void RingBufferMrSw8Destroy(RingBufferMrSw8 *rb) {
+    if (rb != NULL) {
+        SCFree(rb);
+    }
+}
+
+/**
+ *  \brief get the next ptr from the ring buffer
+ *
+ *  Because we allow for multiple readers we take great care in making sure
+ *  that the threads don't interfere with one another.
+ *
+ */
+void *RingBufferMrSw8Get(RingBufferMrSw8 *rb) {
+    void *ptr;
+    /** local pointer for data races. If __sync_bool_compare_and_swap (CAS)
+     *  fails we increase our local array idx to try the next array member
+     *  until we succeed. Or when the buffer is empty again we jump back
+     *  to the waiting loop. */
+    unsigned char readp;
+
+    /* buffer is empty, wait... */
+retry:
+    while (rb->read == rb->write) {
+        /* break out if the engine wants to shutdown */
+        if (rb->shutdown != 0)
+            return NULL;
+
+        usleep(1);
+    }
+
+    /* atomically update rb->read */
+    readp = rb->read - 1;
+    do {
+        /* with multiple readers we can get in the situation that we exitted
+         * from the wait loop but the rb is empty again once we get here. */
+        if (rb->read == rb->write)
+            goto retry;
+
+        readp++;
+        ptr = rb->array[readp];
+    } while (!(__sync_bool_compare_and_swap(&rb->read, readp, (readp + 1))));
+
+    SCLogDebug("ptr %p", ptr);
+    return ptr;
+}
+
+/**
+ *  \brief put a ptr in the RingBuffer
+ */
+int RingBufferMrSw8Put(RingBufferMrSw8 *rb, void *ptr) {
+    SCLogDebug("ptr %p", ptr);
+
+    /* buffer is full, wait... */
+    while ((rb->write + 1) == rb->read) {
+        /* break out if the engine wants to shutdown */
+        if (rb->shutdown != 0)
+            return -1;
+
+        usleep(1);
+    }
+
+    rb->array[rb->write] = ptr;
+    __sync_fetch_and_add(&rb->write, 1);
+    return 0;
+}
 
 /* Multi Reader, Single Writer */
 
@@ -42,7 +118,7 @@ void *RingBufferMrSwGet(RingBufferMrSw *rb) {
 retry:
     while (rb->read == rb->write) {
         /* break out if the engine wants to shutdown */
-        if (suricata_ctl_flags != 0)
+        if (rb->shutdown != 0)
             return NULL;
 
         usleep(1);
@@ -73,7 +149,7 @@ int RingBufferMrSwPut(RingBufferMrSw *rb, void *ptr) {
     /* buffer is full, wait... */
     while ((rb->write + 1) == rb->read) {
         /* break out if the engine wants to shutdown */
-        if (suricata_ctl_flags != 0)
+        if (rb->shutdown != 0)
             return -1;
 
         usleep(1);
@@ -109,7 +185,7 @@ void *RingBufferSrSwGet(RingBufferSrSw *rb) {
     /* buffer is empty, wait... */
     while (rb->read == rb->write) {
         /* break out if the engine wants to shutdown */
-        if (suricata_ctl_flags != 0)
+        if (rb->shutdown != 0)
             return NULL;
 
         usleep(1);
@@ -125,7 +201,7 @@ int RingBufferSrSwPut(RingBufferSrSw *rb, void *ptr) {
     /* buffer is full, wait... */
     while ((rb->write + 1) == rb->read) {
         /* break out if the engine wants to shutdown */
-        if (suricata_ctl_flags != 0)
+        if (rb->shutdown != 0)
             return -1;
 
         usleep(1);
@@ -136,3 +212,225 @@ int RingBufferSrSwPut(RingBufferSrSw *rb, void *ptr) {
     return 0;
 }
 
+/* Multi Reader, Multi Writer, 8 bits */
+
+RingBufferMrMw8 *RingBufferMrMw8Init(void) {
+    RingBufferMrMw8 *rb = SCMalloc(sizeof(RingBufferMrMw8));
+    if (rb == NULL) {
+        return NULL;
+    }
+
+    memset(rb, 0x00, sizeof(RingBufferMrMw8));
+
+    SCSpinInit(&rb->spin, 0);
+    return rb;
+}
+
+void RingBufferMrMw8Destroy(RingBufferMrMw8 *rb) {
+    if (rb != NULL) {
+        SCSpinDestroy(&rb->spin);
+        SCFree(rb);
+    }
+}
+
+/**
+ *  \brief get the next ptr from the ring buffer
+ *
+ *  Because we allow for multiple readers we take great care in making sure
+ *  that the threads don't interfere with one another.
+ *
+ */
+void *RingBufferMrMw8Get(RingBufferMrMw8 *rb) {
+    void *ptr;
+    /** local pointer for data races. If __sync_bool_compare_and_swap (CAS)
+     *  fails we increase our local array idx to try the next array member
+     *  until we succeed. Or when the buffer is empty again we jump back
+     *  to the waiting loop. */
+    unsigned char readp;
+
+    /* buffer is empty, wait... */
+retry:
+    while (rb->read == rb->write) {
+        /* break out if the engine wants to shutdown */
+        if (rb->shutdown != 0)
+            return NULL;
+
+        usleep(1);
+    }
+
+    /* atomically update rb->read */
+    readp = rb->read - 1;
+    do {
+        /* with multiple readers we can get in the situation that we exitted
+         * from the wait loop but the rb is empty again once we get here. */
+        if (rb->read == rb->write)
+            goto retry;
+
+        readp++;
+        ptr = rb->array[readp];
+    } while (!(__sync_bool_compare_and_swap(&rb->read, readp, (readp + 1))));
+
+    SCLogDebug("ptr %p", ptr);
+    return ptr;
+}
+
+/**
+ *  \brief put a ptr in the RingBuffer.
+ *
+ *  As we support multiple writers we need to protect 2 things:
+ *   1. writing the ptr to the array
+ *   2. incrementing the rb->write idx
+ *
+ *  We can't do both at the same time in one atomic operation, so
+ *  we need to (spin) lock it. We do increment rb->write atomically
+ *  after that, so that we don't need to use the lock in our *Get
+ *  function.
+ *
+ *  \param rb the ringbuffer
+ *  \param ptr ptr to store
+ *
+ *  \retval 0 ok
+ *  \retval -1 wait loop interrupted because of engine flags
+ */
+int RingBufferMrMw8Put(RingBufferMrMw8 *rb, void *ptr) {
+    SCLogDebug("ptr %p", ptr);
+
+    /* buffer is full, wait... */
+retry:
+    while ((rb->write + 1) == rb->read) {
+        /* break out if the engine wants to shutdown */
+        if (rb->shutdown != 0)
+            return -1;
+
+        usleep(1);
+    }
+
+    /* get our lock */
+    SCSpinLock(&rb->spin);
+    /* if while we got our lock the buffer changed, we need to retry */
+    if ((rb->write + 1) == rb->read) {
+        SCSpinUnlock(&rb->spin);
+        goto retry;
+    }
+
+    SCLogDebug("rb->write %u, ptr %p", rb->write, ptr);
+
+    /* update the ring buffer */
+    rb->array[rb->write] = ptr;
+    __sync_fetch_and_add(&rb->write, 1);
+    SCSpinUnlock(&rb->spin);
+    SCLogDebug("ptr %p, done", ptr);
+    return 0;
+}
+
+/* Multi Reader, Multi Writer, 16 bits */
+
+RingBufferMrMw *RingBufferMrMwInit(void) {
+    RingBufferMrMw *rb = SCMalloc(sizeof(RingBufferMrMw));
+    if (rb == NULL) {
+        return NULL;
+    }
+
+    memset(rb, 0x00, sizeof(RingBufferMrMw));
+
+    SCSpinInit(&rb->spin, 0);
+    return rb;
+}
+
+void RingBufferMrMwDestroy(RingBufferMrMw *rb) {
+    if (rb != NULL) {
+        SCSpinDestroy(&rb->spin);
+        SCFree(rb);
+    }
+}
+
+/**
+ *  \brief get the next ptr from the ring buffer
+ *
+ *  Because we allow for multiple readers we take great care in making sure
+ *  that the threads don't interfere with one another.
+ *
+ */
+void *RingBufferMrMwGet(RingBufferMrMw *rb) {
+    void *ptr;
+    /** local pointer for data races. If __sync_bool_compare_and_swap (CAS)
+     *  fails we increase our local array idx to try the next array member
+     *  until we succeed. Or when the buffer is empty again we jump back
+     *  to the waiting loop. */
+    unsigned short readp;
+
+    /* buffer is empty, wait... */
+retry:
+    while (rb->read == rb->write) {
+        /* break out if the engine wants to shutdown */
+        if (rb->shutdown != 0)
+            return NULL;
+
+        usleep(1);
+    }
+
+    /* atomically update rb->read */
+    readp = rb->read - 1;
+    do {
+        /* with multiple readers we can get in the situation that we exitted
+         * from the wait loop but the rb is empty again once we get here. */
+        if (rb->read == rb->write)
+            goto retry;
+
+        readp++;
+        ptr = rb->array[readp];
+    } while (!(__sync_bool_compare_and_swap(&rb->read, readp, (readp + 1))));
+
+    SCLogDebug("ptr %p", ptr);
+    return ptr;
+}
+
+/**
+ *  \brief put a ptr in the RingBuffer.
+ *
+ *  As we support multiple writers we need to protect 2 things:
+ *   1. writing the ptr to the array
+ *   2. incrementing the rb->write idx
+ *
+ *  We can't do both at the same time in one atomic operation, so
+ *  we need to (spin) lock it. We do increment rb->write atomically
+ *  after that, so that we don't need to use the lock in our *Get
+ *  function.
+ *
+ *  \param rb the ringbuffer
+ *  \param ptr ptr to store
+ *
+ *  \retval 0 ok
+ *  \retval -1 wait loop interrupted because of engine flags
+ */
+int RingBufferMrMwPut(RingBufferMrMw *rb, void *ptr) {
+    SCLogDebug("ptr %p", ptr);
+
+    /* buffer is full, wait... */
+retry:
+    while ((rb->write + 1) == rb->read) {
+        /* break out if the engine wants to shutdown */
+        if (rb->shutdown != 0)
+            return -1;
+
+        usleep(1);
+    }
+
+    /* get our lock */
+    SCSpinLock(&rb->spin);
+    /* if while we got our lock the buffer changed, we need to retry */
+    if ((rb->write + 1) == rb->read) {
+        SCSpinUnlock(&rb->spin);
+        goto retry;
+    }
+
+    SCLogDebug("rb->write %u, ptr %p", rb->write, ptr);
+
+    /* update the ring buffer */
+    rb->array[rb->write] = ptr;
+    __sync_fetch_and_add(&rb->write, 1);
+    SCSpinUnlock(&rb->spin);
+    SCLogDebug("ptr %p, done", ptr);
+    return 0;
+}
+
index aeacc43b418872f2ddfb32a97dd079de339f9f0d..15b3b62b3d99a630fb0fe084130e99b3e1d12c13 100644 (file)
@@ -6,6 +6,23 @@
  *  read and write pointer. Only the read ptr needs atomic updating.
  */
 
+#define RING_BUFFER_MRSW_8_SIZE 256
+
+/** Multiple Reader, Single Writer ring buffer, fixed at
+ *  256 items so we can use unsigned char's that just
+ *  wrap around */
+typedef struct RingBufferMrSw8_ {
+    unsigned char write;  /**< idx where we put data */
+    unsigned char read;   /**< idx where we read data */
+    uint8_t shutdown;
+    void *array[RING_BUFFER_MRSW_8_SIZE];
+} RingBufferMrSw8;
+
+void *RingBufferMrSw8Get(RingBufferMrSw8 *);
+int RingBufferMrSw8Put(RingBufferMrSw8 *, void *);
+RingBufferMrSw8 *RingBufferMrSw8Init(void);
+void RingBufferMrSw8Destroy(RingBufferMrSw8 *);
+
 #define RING_BUFFER_MRSW_SIZE 65536
 
 /** Multiple Reader, Single Writer ring buffer, fixed at
@@ -14,6 +31,7 @@
 typedef struct RingBufferMrSw_ {
     unsigned short write;  /**< idx where we put data */
     unsigned short read;   /**< idx where we read data */
+    uint8_t shutdown;
     void *array[RING_BUFFER_MRSW_SIZE];
 } RingBufferMrSw;
 
@@ -30,6 +48,7 @@ void RingBufferMrSwDestroy(RingBufferMrSw *);
 typedef struct RingBufferSrSw_ {
     unsigned short write;  /**< idx where we put data */
     unsigned short read;   /**< idx where we read data */
+    uint8_t shutdown;
     void *array[RING_BUFFER_SRSW_SIZE];
 } RingBufferSrSw;
 
@@ -38,5 +57,41 @@ int RingBufferSrSwPut(RingBufferSrSw *, void *);
 RingBufferSrSw *RingBufferSrSwInit(void);
 void RingBufferSrSwDestroy(RingBufferSrSw *);
 
+#define RING_BUFFER_MRMW_8_SIZE 256
+
+/** Multiple Reader, Multi Writer ring buffer, fixed at
+ *  256 items so we can use unsigned char's that just
+ *  wrap around */
+typedef struct RingBufferMrMw8_ {
+    unsigned char write;  /**< idx where we put data */
+    unsigned char read;   /**< idx where we read data */
+    uint8_t shutdown;
+    SCSpinlock spin;      /**< lock protecting writes */
+    void *array[RING_BUFFER_MRMW_8_SIZE];
+} RingBufferMrMw8;
+
+void *RingBufferMrMw8Get(RingBufferMrMw8 *);
+int RingBufferMrMw8Put(RingBufferMrMw8 *, void *);
+RingBufferMrMw8 *RingBufferMrMw8Init(void);
+void RingBufferMrMw8Destroy(RingBufferMrMw8 *);
+
+#define RING_BUFFER_MRMW_SIZE 65536
+
+/** Multiple Reader, Multi Writer ring buffer, fixed at
+ *  65536 items so we can use unsigned char's that just
+ *  wrap around */
+typedef struct RingBufferMrMw_ {
+    unsigned short write;  /**< idx where we put data */
+    unsigned short read;   /**< idx where we read data */
+    uint8_t shutdown;
+    SCSpinlock spin;      /**< lock protecting writes */
+    void *array[RING_BUFFER_MRMW_SIZE];
+} RingBufferMrMw;
+
+void *RingBufferMrMwGet(RingBufferMrMw *);
+int RingBufferMrMwPut(RingBufferMrMw *, void *);
+RingBufferMrMw *RingBufferMrMwInit(void);
+void RingBufferMrMwDestroy(RingBufferMrMw *);
+
 #endif /* __UTIL_RINGBUFFER_H__ */