TimeModeSetLive();
/* create the threads */
- ThreadVars *tv_receivepcap = TmThreadCreatePacketHandler("ReceivePcap","packetpool","packetpool","pickup-queue","simple","1slot_noinout");
+ ThreadVars *tv_receivepcap = TmThreadCreatePacketHandler("ReceivePcap","packetpool","packetpool","pickup-queue","simple","1slot");
if (tv_receivepcap == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
-/*
- ThreadVars *tv_stream1 = TmThreadCreatePacketHandler("Stream1","decode-queue1","simple","stream-queue1","simple","1slot");
- if (tv_stream1 == NULL) {
- printf("ERROR: TmThreadsCreate failed for Stream1\n");
- exit(EXIT_FAILURE);
- }
- tm_module = TmModuleGetByName("StreamTcp");
- if (tm_module == NULL) {
- printf("ERROR: TmModuleGetByName StreamTcp failed\n");
- exit(EXIT_FAILURE);
- }
- Tm1SlotSetFunc(tv_stream1,tm_module,NULL);
-
- TmThreadSetCPUAffinity(tv_stream1, 0);
- if (ncpus > 1)
- TmThreadSetThreadPriority(tv_stream1, PRIO_MEDIUM);
- if (TmThreadSpawn(tv_stream1) != TM_ECODE_OK) {
- printf("ERROR: TmThreadSpawn failed\n");
- exit(EXIT_FAILURE);
- }
-*/
for (cpu = 0; cpu < ncpus; cpu++) {
snprintf(tname, sizeof(tname),"Detect%"PRIu16, cpu+1);
if (tname == NULL)
char *thread_name = SCStrdup(tname);
SCLogDebug("Assigning %s affinity to cpu %u", thread_name, cpu);
- ThreadVars *tv_detect_ncpu = TmThreadCreatePacketHandler(thread_name,"stream-queue1","ringbuffer","alert-queue1","simple","1slot");
+ ThreadVars *tv_detect_ncpu = TmThreadCreatePacketHandler(thread_name,"stream-queue1","ringbuffer","alert-queue1","ringbuffer","1slot");
if (tv_detect_ncpu == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_outputs = TmThreadCreatePacketHandler("Outputs",
- "alert-queue1", "simple", "packetpool", "packetpool", "varslot");
+ "alert-queue1", "ringbuffer", "packetpool", "packetpool", "varslot");
SetupOutputs(tv_outputs);
TmThreadSetCPUAffinity(tv_outputs, 0);
#include "util-privs.h"
extern int max_pending_packets;
+
static int pcap_max_read_packets = 0;
typedef struct PcapFileGlobalVars_ {
*/
TmEcode ReceivePcapFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) {
SCEnter();
+ uint16_t packet_q_len = 0;
PcapFileThreadVars *ptv = (PcapFileThreadVars *)data;
SCReturnInt(TM_ECODE_FAILED);
}
+ /* make sure we have at least one packet in the packet pool, to prevent
+ * us from alloc'ing packets at line rate */
+ SCMutexLock(&packet_q.mutex_q);
+ packet_q_len = packet_q.len;
+ if (packet_q.len == 0) {
+ SCondWait(&packet_q.cond_q, &packet_q.mutex_q);
+ }
+ packet_q_len = packet_q.len;
+ SCMutexUnlock(&packet_q.mutex_q);
+
+ if (postpq == NULL)
+ pcap_max_read_packets = 1;
+
ptv->array_idx = 0;
ptv->in_p = p;
/* Right now we just support reading packets one at a time. */
- int r = pcap_dispatch(pcap_g.pcap_handle, pcap_max_read_packets,
+ int r = pcap_dispatch(pcap_g.pcap_handle, (pcap_max_read_packets < packet_q_len) ? pcap_max_read_packets : packet_q_len,
(pcap_handler)PcapFileCallback, (u_char *)ptv);
uint16_t cnt = 0;
pp->pcap_cnt = pcap_g.cnt;
PacketEnqueue(postpq, pp);
} else {
- p->pcap_cnt = pcap_g.cnt;
+ pp->pcap_cnt = pcap_g.cnt;
}
}
#include "util-error.h"
#include "util-privs.h"
+extern uint8_t suricata_ctl_flags;
extern int max_pending_packets;
+static int pcap_max_read_packets = 0;
+
+/** max packets < 65536 */
+#define PCAP_FILE_MAX_PKTS 256
+
/**
* \brief Structure to hold thread specific variables.
*/
uint32_t errs;
ThreadVars *tv;
+
+ Packet *in_p;
+
+ Packet *array[PCAP_FILE_MAX_PKTS];
+ uint16_t array_idx;
} PcapThreadVars;
TmEcode ReceivePcap(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
void PcapCallback(char *user, struct pcap_pkthdr *h, u_char *pkt) {
SCLogDebug("user %p, h %p, pkt %p", user, h, pkt);
PcapThreadVars *ptv = (PcapThreadVars *)user;
- ThreadVars *tv = ptv->tv;
+ //ThreadVars *tv = ptv->tv;
+
+ Packet *p = NULL;
+ if (ptv->array_idx == 0) {
+ p = ptv->in_p;
+ } else {
+ p = PacketGetFromQueueOrAlloc();
+ }
+
+ if (p == NULL) {
+ return;
+ }
- Packet *p = tv->tmqh_in(tv);
p->ts.tv_sec = h->ts.tv_sec;
p->ts.tv_usec = h->ts.tv_usec;
memcpy(p->pkt, pkt, p->pktlen);
SCLogDebug("p->pktlen: %" PRIu32 " (pkt %02x, p->pkt %02x)", p->pktlen, *pkt, *p->pkt);
- /* pass on... */
- tv->tmqh_out(tv, p);
+ /* store the packet in our array */
+ ptv->array[ptv->array_idx] = p;
+ ptv->array_idx++;
}
/**
*/
TmEcode ReceivePcap(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) {
SCEnter();
+ uint16_t packet_q_len = 0;
+
PcapThreadVars *ptv = (PcapThreadVars *)data;
- /* Just read one packet at a time for now. */
+ /* make sure we have at least one packet in the packet pool, to prevent
+ * us from alloc'ing packets at line rate */
+ SCMutexLock(&packet_q.mutex_q);
+ packet_q_len = packet_q.len;
+ if (packet_q.len == 0) {
+ SCondWait(&packet_q.cond_q, &packet_q.mutex_q);
+ }
+ packet_q_len = packet_q.len;
+ SCMutexUnlock(&packet_q.mutex_q);
+
+ if (postpq == NULL)
+ pcap_max_read_packets = 1;
+
+ ptv->array_idx = 0;
+ ptv->in_p = p;
+
int r = 0;
while (r == 0) {
- r = pcap_dispatch(ptv->pcap_handle, 1, (pcap_handler)PcapCallback, (u_char *)ptv);
- if (r < 0) {
- SCLogError(SC_ERR_PCAP_DISPATCH, "error code %"PRId32" %s",
- r, pcap_geterr(ptv->pcap_handle));
+ r = pcap_dispatch(ptv->pcap_handle, (pcap_max_read_packets < packet_q_len) ? pcap_max_read_packets : packet_q_len,
+ (pcap_handler)PcapCallback, (u_char *)ptv);
+ if (suricata_ctl_flags != 0) {
break;
}
+ }
+
+ uint16_t cnt = 0;
+ for (cnt = 0; cnt < ptv->array_idx; cnt++) {
+ Packet *pp = ptv->array[cnt];
- if (TmThreadsCheckFlag(tv, THV_KILL) || TmThreadsCheckFlag(tv, THV_PAUSE)) {
- SCLogInfo("pcap packet reading interrupted");
- SCReturnInt(TM_ECODE_OK);
+ /* enqueue all but the first in the postpq, the first
+ * pkt is handled by the tv "out handler" */
+ if (cnt > 0) {
+ PacketEnqueue(postpq, pp);
}
}
+ if (r < 0) {
+ SCLogError(SC_ERR_PCAP_DISPATCH, "error code %" PRId32 " %s",
+ r, pcap_geterr(ptv->pcap_handle));
+
+ EngineStop();
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ if (suricata_ctl_flags != 0) {
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
SCReturnInt(TM_ECODE_OK);
}
/**
- * \brief Init function for RecievePcap.
+ * \brief Init function for ReceivePcap.
*
* This is a setup function for recieving packets
* via libpcap. There are two versions of this function
SCEnter();
char *tmpbpfstring;
+ /* use max_pending_packets as pcap read size unless it's bigger than
+ * our size limit */
+ pcap_max_read_packets = (PCAP_FILE_MAX_PKTS < max_pending_packets) ?
+ PCAP_FILE_MAX_PKTS : max_pending_packets;
+
if (initdata == NULL) {
SCLogError(SC_ERR_INVALID_ARGUMENT, "initdata == NULL");
SCReturnInt(TM_ECODE_FAILED);
char *tmpbpfstring;
+ /* use max_pending_packets as pcap read size unless it's bigger than
+ * our size limit */
+ pcap_max_read_packets = (PCAP_FILE_MAX_PKTS < max_pending_packets) ?
+ PCAP_FILE_MAX_PKTS : max_pending_packets;
+
if (initdata == NULL) {
SCLogError(SC_ERR_INVALID_ARGUMENT, "initdata == NULL");
SCReturnInt(TM_ECODE_FAILED);
/** queue handlers */
struct Packet_ * (*tmqh_in)(struct ThreadVars_ *);
+ void (*InShutdownHandler)(struct ThreadVars_ *);
void (*tmqh_out)(struct ThreadVars_ *, struct Packet_ *);
/** slot functions */
typedef struct Tmqh_ {
char *name;
Packet *(*InHandler)(ThreadVars *);
+ void (*InShutdownHandler)(ThreadVars *);
void (*OutHandler)(ThreadVars *, Packet *);
void *(*OutHandlerCtxSetup)(char *);
void (*OutHandlerCtxFree)(void *);
if (tmqh == NULL) goto error;
tv->tmqh_in = tmqh->InHandler;
+ tv->InShutdownHandler = tmqh->InShutdownHandler;
SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
}
if (tv->inq != NULL) {
/* signal the queue for the number of users */
+ if (tv->InShutdownHandler != NULL) {
+ tv->InShutdownHandler(tv);
+ }
for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++)
SCCondSignal(&trans_q[tv->inq->id].cond_q);
break;
}
+ if (tv->InShutdownHandler != NULL) {
+ tv->InShutdownHandler(tv);
+ }
for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++)
SCCondSignal(&trans_q[tv->inq->id].cond_q);
/* signal the queue for the number of users */
+ if (tv->InShutdownHandler != NULL) {
+ tv->InShutdownHandler(tv);
+ }
for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++)
SCCondSignal(&trans_q[tv->inq->id].cond_q);
cnt++;
+ if (tv->InShutdownHandler != NULL) {
+ tv->InShutdownHandler(tv);
+ }
+
for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++)
SCCondSignal(&trans_q[tv->inq->id].cond_q);
#include "util-ringbuffer.h"
-static RingBufferMrSw *ringbuffers[256];
+static RingBufferMrMw8 *ringbuffers[256];
Packet *TmqhInputRingBuffer(ThreadVars *t);
void TmqhOutputRingBuffer(ThreadVars *t, Packet *p);
+void TmqhInputRingBufferShutdownHandler(ThreadVars *);
void TmqhRingBufferRegister (void) {
tmqh_table[TMQH_RINGBUFFER].name = "ringbuffer";
tmqh_table[TMQH_RINGBUFFER].InHandler = TmqhInputRingBuffer;
+ tmqh_table[TMQH_RINGBUFFER].InShutdownHandler = TmqhInputRingBufferShutdownHandler;
tmqh_table[TMQH_RINGBUFFER].OutHandler = TmqhOutputRingBuffer;
+ memset(ringbuffers, 0, sizeof(ringbuffers));
+
int i = 0;
for (i = 0; i < 256; i++) {
- ringbuffers[i] = RingBufferMrSwInit();
+ ringbuffers[i] = RingBufferMrMw8Init();
}
}
Packet *TmqhInputRingBuffer(ThreadVars *t)
{
- RingBufferMrSw *rb = ringbuffers[t->inq->id];
+ RingBufferMrMw8 *rb = ringbuffers[t->inq->id];
- Packet *p = (Packet *)RingBufferMrSwGet(rb);
+ Packet *p = (Packet *)RingBufferMrMw8Get(rb);
return p;
}
+/**
+ * \brief InShutdownHandler for the ringbuffer queue handler.
+ *
+ * Sets the shutdown flag on the ring buffer tied to this thread's
+ * input queue, so a reader blocked in RingBufferMrMw8Get()'s wait
+ * loop can break out and return NULL.
+ *
+ * \param tv thread vars of the thread whose input queue is shut down
+ */
+void TmqhInputRingBufferShutdownHandler(ThreadVars *tv) {
+ if (tv == NULL || tv->inq == NULL) {
+ return;
+ }
+
+ RingBufferMrMw8 *rb = ringbuffers[tv->inq->id];
+ if (rb == NULL) {
+ return;
+ }
+
+ rb->shutdown = 1;
+}
+
void TmqhOutputRingBuffer(ThreadVars *t, Packet *p)
{
- RingBufferMrSw *rb = ringbuffers[t->outq->id];
- RingBufferMrSwPut(rb, (void *)p);
+ RingBufferMrMw8 *rb = ringbuffers[t->outq->id];
+ RingBufferMrMw8Put(rb, (void *)p);
}
Packet *TmqhInputSimple(ThreadVars *t);
void TmqhOutputSimple(ThreadVars *t, Packet *p);
+void TmqhInputSimpleShutdownHandler(ThreadVars *);
void TmqhSimpleRegister (void) {
tmqh_table[TMQH_SIMPLE].name = "simple";
tmqh_table[TMQH_SIMPLE].InHandler = TmqhInputSimple;
+ tmqh_table[TMQH_SIMPLE].InShutdownHandler = TmqhInputSimpleShutdownHandler;
tmqh_table[TMQH_SIMPLE].OutHandler = TmqhOutputSimple;
}
}
}
+/**
+ * \brief InShutdownHandler for the simple queue handler.
+ *
+ * Signals this thread's input queue condition once per queue user
+ * (readers plus writers), so no thread stays blocked waiting on an
+ * empty queue during shutdown.
+ *
+ * \param tv thread vars of the thread whose input queue is shut down
+ */
+void TmqhInputSimpleShutdownHandler(ThreadVars *tv) {
+ int i;
+
+ if (tv == NULL || tv->inq == NULL) {
+ return;
+ }
+
+ for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++)
+ SCCondSignal(&trans_q[tv->inq->id].cond_q);
+}
+
void TmqhOutputSimple(ThreadVars *t, Packet *p)
{
SCLogDebug("Packet %p, p->root %p, alloced %s", p, p->root, p->flags & PKT_ALLOC ? "true":"false");
#include "suricata.h"
#include "util-ringbuffer.h"
-/* suricata engine control flags */
-extern uint8_t suricata_ctl_flags;
+/* Multi Reader, Single Writer, 8 bits */
+
+/**
+ * \brief Allocate and zero-initialize a RingBufferMrSw8.
+ *
+ * \retval rb ptr to the new buffer, or NULL if the allocation failed
+ */
+RingBufferMrSw8 *RingBufferMrSw8Init(void) {
+ RingBufferMrSw8 *buf = SCMalloc(sizeof *buf);
+ if (buf != NULL) {
+ memset(buf, 0, sizeof *buf);
+ }
+ return buf;
+}
+
+/**
+ * \brief Free a RingBufferMrSw8. NULL is accepted and ignored.
+ */
+void RingBufferMrSw8Destroy(RingBufferMrSw8 *rb) {
+ if (rb == NULL)
+ return;
+
+ SCFree(rb);
+}
+
+/**
+ * \brief get the next ptr from the ring buffer
+ *
+ * Because we allow for multiple readers we take great care in making sure
+ * that the threads don't interfere with one another.
+ *
+ * \param rb the ring buffer to read from
+ * \retval ptr the stored pointer, or NULL when the wait loop was broken
+ *             by the shutdown flag
+ */
+void *RingBufferMrSw8Get(RingBufferMrSw8 *rb) {
+ void *ptr;
+ /** local pointer for data races. If __sync_bool_compare_and_swap (CAS)
+ * fails we increase our local array idx to try the next array member
+ * until we succeed. Or when the buffer is empty again we jump back
+ * to the waiting loop. */
+ unsigned char readp;
+
+ /* buffer is empty, wait... */
+retry:
+ while (rb->read == rb->write) {
+ /* break out if the engine wants to shutdown */
+ if (rb->shutdown != 0)
+ return NULL;
+
+ usleep(1);
+ }
+
+ /* atomically update rb->read: start one slot back so the first
+ * readp++ below lands on the current read idx */
+ readp = rb->read - 1;
+ do {
+ /* with multiple readers we can get in the situation that we exited
+ * from the wait loop but the rb is empty again once we get here. */
+ if (rb->read == rb->write)
+ goto retry;
+
+ readp++;
+ ptr = rb->array[readp];
+ } while (!(__sync_bool_compare_and_swap(&rb->read, readp, (readp + 1))));
+
+ SCLogDebug("ptr %p", ptr);
+ return ptr;
+}
+
+/**
+ * \brief put a ptr in the RingBuffer
+ *
+ * There is only a single writer, so the slot can be filled with a
+ * plain store; the __sync atomic add that publishes the new write
+ * idx implies a full barrier between the two.
+ *
+ * \param rb the ring buffer
+ * \param ptr ptr to store
+ *
+ * \retval 0 ok
+ * \retval -1 wait loop interrupted because of the shutdown flag
+ */
+int RingBufferMrSw8Put(RingBufferMrSw8 *rb, void *ptr) {
+ SCLogDebug("ptr %p", ptr);
+
+ /* buffer is full, wait... */
+ while ((rb->write + 1) == rb->read) {
+ /* break out if the engine wants to shutdown */
+ if (rb->shutdown != 0)
+ return -1;
+
+ usleep(1);
+ }
+
+ rb->array[rb->write] = ptr;
+ __sync_fetch_and_add(&rb->write, 1);
+ return 0;
+}
/* Multi Reader, Single Writer */
retry:
while (rb->read == rb->write) {
/* break out if the engine wants to shutdown */
- if (suricata_ctl_flags != 0)
+ if (rb->shutdown != 0)
return NULL;
usleep(1);
/* buffer is full, wait... */
while ((rb->write + 1) == rb->read) {
/* break out if the engine wants to shutdown */
- if (suricata_ctl_flags != 0)
+ if (rb->shutdown != 0)
return -1;
usleep(1);
/* buffer is empty, wait... */
while (rb->read == rb->write) {
/* break out if the engine wants to shutdown */
- if (suricata_ctl_flags != 0)
+ if (rb->shutdown != 0)
return NULL;
usleep(1);
/* buffer is full, wait... */
while ((rb->write + 1) == rb->read) {
/* break out if the engine wants to shutdown */
- if (suricata_ctl_flags != 0)
+ if (rb->shutdown != 0)
return -1;
usleep(1);
return 0;
}
+/* Multi Reader, Multi Writer, 8 bits */
+
+/**
+ * \brief Allocate and zero-initialize a RingBufferMrMw8 and set up
+ * the spin lock that protects the writers.
+ *
+ * \retval rb ptr to the new buffer, or NULL if the allocation failed
+ */
+RingBufferMrMw8 *RingBufferMrMw8Init(void) {
+ RingBufferMrMw8 *buf = SCMalloc(sizeof *buf);
+ if (buf != NULL) {
+ memset(buf, 0, sizeof *buf);
+ SCSpinInit(&buf->spin, 0);
+ }
+ return buf;
+}
+
+/**
+ * \brief Destroy a RingBufferMrMw8: tear down its spin lock and free
+ * the structure. NULL is accepted and ignored.
+ */
+void RingBufferMrMw8Destroy(RingBufferMrMw8 *rb) {
+ if (rb == NULL)
+ return;
+
+ SCSpinDestroy(&rb->spin);
+ SCFree(rb);
+}
+
+/**
+ * \brief get the next ptr from the ring buffer
+ *
+ * Because we allow for multiple readers we take great care in making sure
+ * that the threads don't interfere with one another.
+ *
+ * \param rb the ring buffer to read from
+ * \retval ptr the stored pointer, or NULL when the wait loop was broken
+ *             by the shutdown flag
+ */
+void *RingBufferMrMw8Get(RingBufferMrMw8 *rb) {
+ void *ptr;
+ /** local pointer for data races. If __sync_bool_compare_and_swap (CAS)
+ * fails we increase our local array idx to try the next array member
+ * until we succeed. Or when the buffer is empty again we jump back
+ * to the waiting loop. */
+ unsigned char readp;
+
+ /* buffer is empty, wait... */
+retry:
+ while (rb->read == rb->write) {
+ /* break out if the engine wants to shutdown */
+ if (rb->shutdown != 0)
+ return NULL;
+
+ usleep(1);
+ }
+
+ /* atomically update rb->read: start one slot back so the first
+ * readp++ below lands on the current read idx */
+ readp = rb->read - 1;
+ do {
+ /* with multiple readers we can get in the situation that we exited
+ * from the wait loop but the rb is empty again once we get here. */
+ if (rb->read == rb->write)
+ goto retry;
+
+ readp++;
+ ptr = rb->array[readp];
+ } while (!(__sync_bool_compare_and_swap(&rb->read, readp, (readp + 1))));
+
+ SCLogDebug("ptr %p", ptr);
+ return ptr;
+}
+
+/**
+ * \brief put a ptr in the RingBuffer.
+ *
+ * As we support multiple writers we need to protect 2 things:
+ *  1. writing the ptr to the array
+ *  2. incrementing the rb->write idx
+ *
+ * We can't do both at the same time in one atomic operation, so
+ * we need to (spin) lock it. We do increment rb->write atomically
+ * after that, so that we don't need to use the lock in our *Get
+ * function.
+ *
+ * \param rb the ringbuffer
+ * \param ptr ptr to store
+ *
+ * \retval 0 ok
+ * \retval -1 wait loop interrupted because of engine flags
+ */
+int RingBufferMrMw8Put(RingBufferMrMw8 *rb, void *ptr) {
+ SCLogDebug("ptr %p", ptr);
+
+ /* buffer is full, wait... */
+retry:
+ while ((rb->write + 1) == rb->read) {
+ /* break out if the engine wants to shutdown */
+ if (rb->shutdown != 0)
+ return -1;
+
+ usleep(1);
+ }
+
+ /* get our lock */
+ SCSpinLock(&rb->spin);
+ /* if while we got our lock the buffer changed, we need to retry */
+ if ((rb->write + 1) == rb->read) {
+ SCSpinUnlock(&rb->spin);
+ goto retry;
+ }
+
+ SCLogDebug("rb->write %u, ptr %p", rb->write, ptr);
+
+ /* update the ring buffer: fill the slot first, then atomically
+ * bump rb->write so a reader never sees an unfilled slot */
+ rb->array[rb->write] = ptr;
+ __sync_fetch_and_add(&rb->write, 1);
+ SCSpinUnlock(&rb->spin);
+ SCLogDebug("ptr %p, done", ptr);
+ return 0;
+}
+
+/* Multi Reader, Multi Writer, 16 bits */
+
+/**
+ * \brief Allocate and zero-initialize a RingBufferMrMw and set up
+ * the spin lock that protects the writers.
+ *
+ * \retval rb ptr to the new buffer, or NULL if the allocation failed
+ */
+RingBufferMrMw *RingBufferMrMwInit(void) {
+ RingBufferMrMw *buf = SCMalloc(sizeof *buf);
+ if (buf != NULL) {
+ memset(buf, 0, sizeof *buf);
+ SCSpinInit(&buf->spin, 0);
+ }
+ return buf;
+}
+
+/**
+ * \brief Destroy a RingBufferMrMw: tear down its spin lock and free
+ * the structure. NULL is accepted and ignored.
+ */
+void RingBufferMrMwDestroy(RingBufferMrMw *rb) {
+ if (rb == NULL)
+ return;
+
+ SCSpinDestroy(&rb->spin);
+ SCFree(rb);
+}
+
+/**
+ * \brief get the next ptr from the ring buffer
+ *
+ * Because we allow for multiple readers we take great care in making sure
+ * that the threads don't interfere with one another.
+ *
+ * \param rb the ring buffer to read from
+ * \retval ptr the stored pointer, or NULL when the wait loop was broken
+ *             by the shutdown flag
+ */
+void *RingBufferMrMwGet(RingBufferMrMw *rb) {
+ void *ptr;
+ /** local pointer for data races. If __sync_bool_compare_and_swap (CAS)
+ * fails we increase our local array idx to try the next array member
+ * until we succeed. Or when the buffer is empty again we jump back
+ * to the waiting loop. */
+ unsigned short readp;
+
+ /* buffer is empty, wait... */
+retry:
+ while (rb->read == rb->write) {
+ /* break out if the engine wants to shutdown */
+ if (rb->shutdown != 0)
+ return NULL;
+
+ usleep(1);
+ }
+
+ /* atomically update rb->read: start one slot back so the first
+ * readp++ below lands on the current read idx */
+ readp = rb->read - 1;
+ do {
+ /* with multiple readers we can get in the situation that we exited
+ * from the wait loop but the rb is empty again once we get here. */
+ if (rb->read == rb->write)
+ goto retry;
+
+ readp++;
+ ptr = rb->array[readp];
+ } while (!(__sync_bool_compare_and_swap(&rb->read, readp, (readp + 1))));
+
+ SCLogDebug("ptr %p", ptr);
+ return ptr;
+}
+
+/**
+ * \brief put a ptr in the RingBuffer.
+ *
+ * As we support multiple writers we need to protect 2 things:
+ *  1. writing the ptr to the array
+ *  2. incrementing the rb->write idx
+ *
+ * We can't do both at the same time in one atomic operation, so
+ * we need to (spin) lock it. We do increment rb->write atomically
+ * after that, so that we don't need to use the lock in our *Get
+ * function.
+ *
+ * \param rb the ringbuffer
+ * \param ptr ptr to store
+ *
+ * \retval 0 ok
+ * \retval -1 wait loop interrupted because of engine flags
+ */
+int RingBufferMrMwPut(RingBufferMrMw *rb, void *ptr) {
+ SCLogDebug("ptr %p", ptr);
+
+ /* buffer is full, wait... */
+retry:
+ while ((rb->write + 1) == rb->read) {
+ /* break out if the engine wants to shutdown */
+ if (rb->shutdown != 0)
+ return -1;
+
+ usleep(1);
+ }
+
+ /* get our lock */
+ SCSpinLock(&rb->spin);
+ /* if while we got our lock the buffer changed, we need to retry */
+ if ((rb->write + 1) == rb->read) {
+ SCSpinUnlock(&rb->spin);
+ goto retry;
+ }
+
+ SCLogDebug("rb->write %u, ptr %p", rb->write, ptr);
+
+ /* update the ring buffer: fill the slot first, then atomically
+ * bump rb->write so a reader never sees an unfilled slot */
+ rb->array[rb->write] = ptr;
+ __sync_fetch_and_add(&rb->write, 1);
+ SCSpinUnlock(&rb->spin);
+ SCLogDebug("ptr %p, done", ptr);
+ return 0;
+}
+
* read and write pointer. Only the read ptr needs atomic updating.
*/
+#define RING_BUFFER_MRSW_8_SIZE 256
+
+/** Multiple Reader, Single Writer ring buffer, fixed at
+ * 256 items so we can use unsigned char's that just
+ * wrap around */
+typedef struct RingBufferMrSw8_ {
+ unsigned char write; /**< idx where we put data */
+ unsigned char read; /**< idx where we read data */
+ uint8_t shutdown; /**< non-zero breaks Get/Put out of their wait loops */
+ void *array[RING_BUFFER_MRSW_8_SIZE];
+} RingBufferMrSw8;
+
+void *RingBufferMrSw8Get(RingBufferMrSw8 *);
+int RingBufferMrSw8Put(RingBufferMrSw8 *, void *);
+RingBufferMrSw8 *RingBufferMrSw8Init(void);
+void RingBufferMrSw8Destroy(RingBufferMrSw8 *);
+
#define RING_BUFFER_MRSW_SIZE 65536
/** Multiple Reader, Single Writer ring buffer, fixed at
typedef struct RingBufferMrSw_ {
unsigned short write; /**< idx where we put data */
unsigned short read; /**< idx where we read data */
+ uint8_t shutdown;
void *array[RING_BUFFER_MRSW_SIZE];
} RingBufferMrSw;
typedef struct RingBufferSrSw_ {
unsigned short write; /**< idx where we put data */
unsigned short read; /**< idx where we read data */
+ uint8_t shutdown;
void *array[RING_BUFFER_SRSW_SIZE];
} RingBufferSrSw;
RingBufferSrSw *RingBufferSrSwInit(void);
void RingBufferSrSwDestroy(RingBufferSrSw *);
+#define RING_BUFFER_MRMW_8_SIZE 256
+
+/** Multiple Reader, Multi Writer ring buffer, fixed at
+ * 256 items so we can use unsigned char's that just
+ * wrap around */
+typedef struct RingBufferMrMw8_ {
+ unsigned char write; /**< idx where we put data */
+ unsigned char read; /**< idx where we read data */
+ uint8_t shutdown; /**< non-zero breaks Get/Put out of their wait loops */
+ SCSpinlock spin; /**< lock protecting writes */
+ void *array[RING_BUFFER_MRMW_8_SIZE];
+} RingBufferMrMw8;
+
+void *RingBufferMrMw8Get(RingBufferMrMw8 *);
+int RingBufferMrMw8Put(RingBufferMrMw8 *, void *);
+RingBufferMrMw8 *RingBufferMrMw8Init(void);
+void RingBufferMrMw8Destroy(RingBufferMrMw8 *);
+
+#define RING_BUFFER_MRMW_SIZE 65536
+
+/** Multiple Reader, Multi Writer ring buffer, fixed at
+ * 65536 items so we can use unsigned short's that just
+ * wrap around */
+typedef struct RingBufferMrMw_ {
+ unsigned short write; /**< idx where we put data */
+ unsigned short read; /**< idx where we read data */
+ uint8_t shutdown; /**< non-zero breaks Get/Put out of their wait loops */
+ SCSpinlock spin; /**< lock protecting writes */
+ void *array[RING_BUFFER_MRMW_SIZE];
+} RingBufferMrMw;
+
+void *RingBufferMrMwGet(RingBufferMrMw *);
+int RingBufferMrMwPut(RingBufferMrMw *, void *);
+RingBufferMrMw *RingBufferMrMwInit(void);
+void RingBufferMrMwDestroy(RingBufferMrMw *);
+
#endif /* __UTIL_RINGBUFFER_H__ */