]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
streaming: buffer API
authorVictor Julien <victor@inliniac.net>
Sun, 8 Nov 2015 17:30:05 +0000 (18:30 +0100)
committerVictor Julien <victor@inliniac.net>
Fri, 20 May 2016 10:32:39 +0000 (12:32 +0200)
Add a new API to store data from streaming sources, like HTTP body
processing or TCP data.

Currently most of the code uses a pattern of list of data chunks
(e.g. TcpSegment) that is reassembled into a large buffer on-demand.

The Streaming Buffer API changes the logic to store the data in
reassembled form from the start, with the segments/chunks pointing
to the reassembled data.

The main buffer storing the data slides forward, automatically or
manually. The *NoTrack calls allows for a segmentless mode of
operation.

This approach has two main advantages:

1. accessing the reassembled data is virtually cost-free
2. reduction of allocations and memory management

src/Makefile.am
src/output-streaming.c
src/runmode-unittests.c
src/util-streaming-buffer.c [new file with mode: 0644]
src/util-streaming-buffer.h [new file with mode: 0644]

index b729388ee337f0cdfa0b06b73d7850c1999977a6..65f175e8c512a9b7a568ce05ec2da188ab4ba77a 100644 (file)
@@ -404,6 +404,7 @@ util-spm-bs.c util-spm-bs.h \
 util-spm-hs.c util-spm-hs.h \
 util-spm.c util-spm.h util-clock.h \
 util-storage.c util-storage.h \
+util-streaming-buffer.c util-streaming-buffer.h \
 util-strlcatu.c \
 util-strlcpyu.c \
 util-syslog.c util-syslog.h \
index 73aa115b40305c88d454a225925bee4da8b67402..4ae7c58c2e5e18e5957294226cb50dad0cd8ce83 100644 (file)
@@ -98,7 +98,7 @@ typedef struct StreamerCallbackData_ {
     enum OutputStreamingType type;
 } StreamerCallbackData;
 
-int Streamer(void *cbdata, Flow *f, uint8_t *data, uint32_t data_len, uint64_t tx_id, uint8_t flags)
+int Streamer(void *cbdata, Flow *f, const uint8_t *data, uint32_t data_len, uint64_t tx_id, uint8_t flags)
 {
     StreamerCallbackData *streamer_cbdata = (StreamerCallbackData *)cbdata;
     BUG_ON(streamer_cbdata == NULL);
index 523d81afddfaad3fd962137aefcacbc7d09e464b..6f572d3fce1be594608a41b5338452942ecb5bb2 100644 (file)
 #include "defrag.h"
 #include "detect-engine-siggroup.h"
 
+#include "util-streaming-buffer.h"
+
 #endif /* UNITTESTS */
 
 void RegisterAllModules();
@@ -278,6 +280,8 @@ void RunUnittests(int list_unittests, char *regex_arg)
 #endif
     AppLayerUnittestsRegister();
     MimeDecRegisterTests();
+    StreamingBufferRegisterTests();
+
     if (list_unittests) {
         UtListTests(regex_arg);
     } else {
diff --git a/src/util-streaming-buffer.c b/src/util-streaming-buffer.c
new file mode 100644 (file)
index 0000000..d0df5f6
--- /dev/null
@@ -0,0 +1,725 @@
+/* Copyright (C) 2015-2016 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+#include "suricata-common.h"
+#include "util-streaming-buffer.h"
+#include "util-unittest.h"
+#include "util-print.h"
+
+/**
+ * \file
+ *
+ * \author Victor Julien <victor@inliniac.net>
+ *
+ *  \brief Streaming Buffer API
+ */
+
+/* memory handling wrappers. If config doesn't define it's own set of
+ * functions, use the defaults */
+#define MALLOC(cfg, s) \
+    (cfg)->Malloc ? (cfg)->Malloc((s)) : SCMalloc((s))
+#define CALLOC(cfg, n, s) \
+    (cfg)->Calloc ? (cfg)->Calloc((n), (s)) : SCCalloc((n), (s))
+#define REALLOC(cfg, ptr, orig_s, s) \
+    (cfg)->Realloc ? (cfg)->Realloc((ptr), (orig_s), (s)) : SCRealloc((ptr), (s))
+#define FREE(cfg, ptr, s) \
+    (cfg)->Free ? (cfg)->Free((ptr), (s)) : SCFree((ptr))
+
+static inline int InitBuffer(StreamingBuffer *sb)
+{
+    sb->buf = CALLOC(sb->cfg, 1, sb->cfg->buf_size);
+    if (sb->buf == NULL) {
+        return -1;
+    }
+    sb->buf_size = sb->cfg->buf_size;
+    return 0;
+}
+
+StreamingBuffer *StreamingBufferInit(const StreamingBufferConfig *cfg)
+{
+    StreamingBuffer *sb = CALLOC(cfg, 1, sizeof(StreamingBuffer));
+    if (sb != NULL) {
+        sb->buf_size = cfg->buf_size;
+        sb->cfg = cfg;
+
+        if (cfg->buf_size > 0) {
+            if (InitBuffer(sb) == 0) {
+                return sb;
+            }
+            FREE(cfg, sb, sizeof(StreamingBuffer));
+        /* implied buf_size == 0 */
+        } else {
+            return sb;
+        }
+    }
+    return NULL;
+}
+
+void StreamingBufferClear(StreamingBuffer *sb)
+{
+    if (sb != NULL) {
+        SCLogDebug("sb->buf_size %u max %u", sb->buf_size, sb->buf_size_max);
+
+        if (sb->buf != NULL) {
+            FREE(sb->cfg, sb->buf, sb->buf_size);
+            sb->buf = NULL;
+        }
+    }
+}
+
+void StreamingBufferFree(StreamingBuffer *sb)
+{
+    if (sb != NULL) {
+        StreamingBufferClear(sb);
+        FREE(sb->cfg, sb, sizeof(StreamingBuffer));
+    }
+}
+
+/**
+ * \internal
+ * \brief move buffer forward by 'slide'
+ */
+static void AutoSlide(StreamingBuffer *sb)
+{
+    uint32_t size = sb->cfg->buf_slide;
+    uint32_t slide = sb->buf_offset - size;
+    SCLogDebug("sliding %u forward, size of original buffer left after slide %u", slide, size);
+    memmove(sb->buf, sb->buf+slide, size);
+    sb->stream_offset += slide;
+    sb->buf_offset = size;
+}
+
+static void GrowToSize(StreamingBuffer *sb, uint32_t size)
+{
+    /* try to grow in multiples of sb->cfg->buf_size */
+    uint32_t x = sb->cfg->buf_size ? size % sb->cfg->buf_size : 0;
+    uint32_t base = size - x;
+    uint32_t grow = base + sb->cfg->buf_size;
+
+    void *ptr = REALLOC(sb->cfg, sb->buf, sb->buf_size, grow);
+    if (ptr != NULL) {
+        /* for safe printing and general caution, lets memset the
+         * new data to 0 */
+        size_t diff = grow - sb->buf_size;
+        void *new_mem = ((char *)ptr) + sb->buf_size;
+        memset(new_mem, 0, diff);
+
+        sb->buf = ptr;
+        sb->buf_size = grow;
+        SCLogDebug("grown buffer to %u", grow);
+#ifdef DEBUG
+        if (sb->buf_size > sb->buf_size_max) {
+            sb->buf_size_max = sb->buf_size;
+        }
+#endif
+    }
+}
+
+static void Grow(StreamingBuffer *sb)
+{
+    uint32_t grow = sb->buf_size * 2;
+    void *ptr = REALLOC(sb->cfg, sb->buf, sb->buf_size, grow);
+    if (ptr != NULL) {
+        /* for safe printing and general caution, lets memset the
+         * new data to 0 */
+        size_t diff = grow - sb->buf_size;
+        void *new_mem = ((char *)ptr) + sb->buf_size;
+        memset(new_mem, 0, diff);
+
+        sb->buf = ptr;
+        sb->buf_size = grow;
+        SCLogDebug("grown buffer to %u", grow);
+#ifdef DEBUG
+        if (sb->buf_size > sb->buf_size_max) {
+            sb->buf_size_max = sb->buf_size;
+        }
+#endif
+    }
+}
+
+/**
+ *  \brief slide to absolute offset
+ *  \todo if sliding beyond window, we could perhaps reset?
+ */
+void StreamingBufferSlideToOffset(StreamingBuffer *sb, uint64_t offset)
+{
+    if (offset > sb->stream_offset &&
+        offset <= sb->stream_offset + sb->buf_offset)
+    {
+        uint32_t slide = offset - sb->stream_offset;
+        uint32_t size = sb->buf_offset - slide;
+        SCLogDebug("sliding %u forward, size of original buffer left after slide %u", slide, size);
+        memmove(sb->buf, sb->buf+slide, size);
+        sb->stream_offset += slide;
+        sb->buf_offset = size;
+    }
+}
+
+void StreamingBufferSlide(StreamingBuffer *sb, uint32_t slide)
+{
+    uint32_t size = sb->buf_offset - slide;
+    SCLogDebug("sliding %u forward, size of original buffer left after slide %u", slide, size);
+    memmove(sb->buf, sb->buf+slide, size);
+    sb->stream_offset += slide;
+    sb->buf_offset = size;
+}
+
+#define DATA_FITS(sb, len) \
+    ((sb)->buf_offset + (len) <= (sb)->buf_size)
+
+StreamingBufferSegment *StreamingBufferAppendRaw(StreamingBuffer *sb, const uint8_t *data, uint32_t data_len)
+{
+    if (sb->buf == NULL) {
+        if (InitBuffer(sb) == -1)
+            return NULL;
+    }
+
+    StreamingBufferSegment *seg = CALLOC(sb->cfg, 1, sizeof(StreamingBufferSegment));
+    if (seg != NULL) {
+        if (!DATA_FITS(sb, data_len)) {
+            if (sb->cfg->flags & STREAMING_BUFFER_AUTOSLIDE)
+                AutoSlide(sb);
+            if (sb->buf_size == 0) {
+                GrowToSize(sb, data_len);
+            } else {
+                while (!DATA_FITS(sb, data_len)) {
+                    Grow(sb);
+                }
+            }
+        }
+        if (!DATA_FITS(sb, data_len)) {
+            FREE(sb->cfg, seg, sizeof(StreamingBufferSegment));
+            return NULL;
+        }
+
+        memcpy(sb->buf + sb->buf_offset, data, data_len);
+        seg->stream_offset = sb->stream_offset + sb->buf_offset;
+        seg->segment_len = data_len;
+        sb->buf_offset += data_len;
+        return seg;
+    }
+    return NULL;
+}
+
+void StreamingBufferAppend(StreamingBuffer *sb, StreamingBufferSegment *seg,
+                            const uint8_t *data, uint32_t data_len)
+{
+    BUG_ON(seg == NULL);
+
+    if (sb->buf == NULL) {
+        if (InitBuffer(sb) == -1)
+            return;
+    }
+
+    if (!DATA_FITS(sb, data_len)) {
+        if (sb->cfg->flags & STREAMING_BUFFER_AUTOSLIDE)
+            AutoSlide(sb);
+        if (sb->buf_size == 0) {
+            GrowToSize(sb, data_len);
+        } else {
+            while (!DATA_FITS(sb, data_len)) {
+                Grow(sb);
+            }
+        }
+    }
+    if (!DATA_FITS(sb, data_len)) {
+        return;
+    }
+
+    memcpy(sb->buf + sb->buf_offset, data, data_len);
+    seg->stream_offset = sb->stream_offset + sb->buf_offset;
+    seg->segment_len = data_len;
+    sb->buf_offset += data_len;
+}
+
+/**
+ *  \brief add data w/o tracking a segment
+ */
+void StreamingBufferAppendNoTrack(StreamingBuffer *sb,
+                                  const uint8_t *data, uint32_t data_len)
+{
+    if (sb->buf == NULL) {
+        if (InitBuffer(sb) == -1)
+            return;
+    }
+
+    if (!DATA_FITS(sb, data_len)) {
+        if (sb->cfg->flags & STREAMING_BUFFER_AUTOSLIDE)
+            AutoSlide(sb);
+        if (sb->buf_size == 0) {
+            GrowToSize(sb, data_len);
+        } else {
+            while (!DATA_FITS(sb, data_len)) {
+                Grow(sb);
+            }
+        }
+    }
+    if (!DATA_FITS(sb, data_len)) {
+        return;
+    }
+
+    memcpy(sb->buf + sb->buf_offset, data, data_len);
+    sb->buf_offset += data_len;
+}
+
+#define DATA_FITS_AT_OFFSET(sb, len, offset) \
+    ((offset) + (len) <= (sb)->buf_size)
+
+/**
+ *  \param offset offset relative to StreamingBuffer::stream_offset
+ */
+void StreamingBufferInsertAt(StreamingBuffer *sb, StreamingBufferSegment *seg,
+                             const uint8_t *data, uint32_t data_len,
+                             uint64_t offset)
+{
+    BUG_ON(seg == NULL);
+
+    if (offset < sb->stream_offset)
+        return;
+
+    if (sb->buf == NULL) {
+        if (InitBuffer(sb) == -1)
+            return;
+    }
+
+    uint32_t rel_offset = offset - sb->stream_offset;
+    if (!DATA_FITS_AT_OFFSET(sb, data_len, rel_offset)) {
+        if (sb->cfg->flags & STREAMING_BUFFER_AUTOSLIDE) {
+            AutoSlide(sb);
+            rel_offset = offset - sb->stream_offset;
+        }
+        if (!DATA_FITS_AT_OFFSET(sb, data_len, rel_offset)) {
+            GrowToSize(sb, (rel_offset + data_len));
+        }
+    }
+    BUG_ON(!DATA_FITS_AT_OFFSET(sb, data_len, rel_offset));
+
+    memcpy(sb->buf + rel_offset, data, data_len);
+    seg->stream_offset = offset;
+    seg->segment_len = data_len;
+    if (rel_offset + data_len > sb->buf_offset)
+        sb->buf_offset = rel_offset + data_len;
+
+
+}
+
+int StreamingBufferSegmentIsBeforeWindow(const StreamingBuffer *sb,
+                                         const StreamingBufferSegment *seg)
+{
+    if (seg->stream_offset < sb->stream_offset) {
+        if (seg->stream_offset + seg->segment_len <= sb->stream_offset) {
+            return 1;
+        }
+    }
+    return 0;
+}
+
+void StreamingBufferSegmentGetData(const StreamingBuffer *sb,
+                                   const StreamingBufferSegment *seg,
+                                   const uint8_t **data, uint32_t *data_len)
+{
+    if (seg->stream_offset >= sb->stream_offset) {
+        uint64_t offset = seg->stream_offset - sb->stream_offset;
+        *data = sb->buf + offset;
+        if (offset + seg->segment_len > sb->buf_size)
+            *data_len = sb->buf_size - offset;
+        else
+            *data_len = seg->segment_len;
+        return;
+    } else {
+        uint64_t offset = sb->stream_offset - seg->stream_offset;
+        if (offset < seg->segment_len) {
+            *data = sb->buf;
+            *data_len = seg->segment_len - offset;
+            return;
+        }
+    }
+    *data = NULL;
+    *data_len = 0;
+    return;
+}
+
+/**
+ *  \retval 1 data is the same
+ *  \retval 0 data is different
+ */
+int StreamingBufferSegmentCompareRawData(const StreamingBuffer *sb,
+                                         const StreamingBufferSegment *seg,
+                                         const uint8_t *rawdata, uint32_t rawdata_len)
+{
+    const uint8_t *segdata = NULL;
+    uint32_t segdata_len = 0;
+    StreamingBufferSegmentGetData(sb, seg, &segdata, &segdata_len);
+    if (segdata && segdata_len &&
+        segdata_len == rawdata_len &&
+        memcmp(segdata, rawdata, segdata_len) == 0)
+    {
+        return 1;
+    }
+    return 0;
+}
+
+int StreamingBufferGetData(const StreamingBuffer *sb,
+        const uint8_t **data, uint32_t *data_len,
+        uint64_t *stream_offset)
+{
+    if (sb != NULL && sb->buf != NULL) {
+        *data = sb->buf;
+        *data_len = sb->buf_offset;
+        *stream_offset = sb->stream_offset;
+        return 1;
+    } else {
+        *data = NULL;
+        *data_len = 0;
+        *stream_offset = 0;
+        return 0;
+    }
+}
+
+int StreamingBufferGetDataAtOffset (const StreamingBuffer *sb,
+        const uint8_t **data, uint32_t *data_len,
+        uint64_t offset)
+{
+    if (sb != NULL && sb->buf != NULL &&
+            offset >= sb->stream_offset &&
+            offset < (sb->stream_offset + sb->buf_offset))
+    {
+        uint32_t skip = offset - sb->stream_offset;
+        *data = sb->buf + skip;
+        *data_len = sb->buf_offset - skip;
+        return 1;
+    } else {
+        *data = NULL;
+        *data_len = 0;
+        return 0;
+    }
+}
+
+/**
+ *  \retval 1 data is the same
+ *  \retval 0 data is different
+ */
+int StreamingBufferCompareRawData(const StreamingBuffer *sb,
+                                  const uint8_t *rawdata, uint32_t rawdata_len)
+{
+    const uint8_t *sbdata = NULL;
+    uint32_t sbdata_len = 0;
+    uint64_t offset = 0;
+    StreamingBufferGetData(sb, &sbdata, &sbdata_len, &offset);
+    if (offset == 0 &&
+        sbdata && sbdata_len &&
+        sbdata_len == rawdata_len &&
+        memcmp(sbdata, rawdata, sbdata_len) == 0)
+    {
+        return 1;
+    }
+    SCLogInfo("sbdata_len %u, offset %u", sbdata_len, (uint)offset);
+    PrintRawDataFp(stdout, sbdata,sbdata_len);
+    return 0;
+}
+
+void Dump(StreamingBuffer *sb)
+{
+    PrintRawDataFp(stdout, sb->buf, sb->buf_offset);
+}
+
+void DumpSegment(StreamingBuffer *sb, StreamingBufferSegment *seg)
+{
+    const uint8_t *data = NULL;
+    uint32_t data_len = 0;
+    StreamingBufferSegmentGetData(sb, seg, &data, &data_len);
+    if (data && data_len) {
+        PrintRawDataFp(stdout, data, data_len);
+    }
+}
+
+#ifdef UNITTESTS
+static int StreamingBufferTest01(void)
+{
+    StreamingBufferConfig cfg = { STREAMING_BUFFER_AUTOSLIDE, 8, 16, NULL, NULL, NULL, NULL };
+    StreamingBuffer *sb = StreamingBufferInit(&cfg);
+    FAIL_IF(sb == NULL);
+
+    StreamingBufferSegment *seg1 = StreamingBufferAppendRaw(sb, (const uint8_t *)"ABCDEFGH", 8);
+    StreamingBufferSegment *seg2 = StreamingBufferAppendRaw(sb, (const uint8_t *)"01234567", 8);
+    FAIL_IF(sb->stream_offset != 0);
+    FAIL_IF(sb->buf_offset != 16);
+    FAIL_IF(seg1->stream_offset != 0);
+    FAIL_IF(seg2->stream_offset != 8);
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg1));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg2));
+    FAIL_IF(!StreamingBufferSegmentCompareRawData(sb,seg1,(const uint8_t *)"ABCDEFGH", 8));
+    FAIL_IF(!StreamingBufferSegmentCompareRawData(sb,seg2,(const uint8_t *)"01234567", 8));
+    Dump(sb);
+
+    StreamingBufferSegment *seg3 = StreamingBufferAppendRaw(sb, (const uint8_t *)"QWERTY", 6);
+    FAIL_IF(sb->stream_offset != 8);
+    FAIL_IF(sb->buf_offset != 14);
+    FAIL_IF(seg3->stream_offset != 16);
+    FAIL_IF(!StreamingBufferSegmentIsBeforeWindow(sb,seg1));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg2));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg3));
+    FAIL_IF(!StreamingBufferSegmentCompareRawData(sb,seg3,(const uint8_t *)"QWERTY", 6));
+    Dump(sb);
+
+    StreamingBufferSegment *seg4 = StreamingBufferAppendRaw(sb, (const uint8_t *)"KLM", 3);
+    FAIL_IF(sb->stream_offset != 14);
+    FAIL_IF(sb->buf_offset != 11);
+    FAIL_IF(seg4->stream_offset != 22);
+    FAIL_IF(!StreamingBufferSegmentIsBeforeWindow(sb,seg1));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg2));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg3));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg4));
+    FAIL_IF(!StreamingBufferSegmentCompareRawData(sb,seg4,(const uint8_t *)"KLM", 3));
+    Dump(sb);
+
+    StreamingBufferSegment *seg5 = StreamingBufferAppendRaw(sb, (const uint8_t *)"!@#$%^&*()_+<>?/,.;:'[]{}-=", 27);
+    FAIL_IF(sb->stream_offset != 17);
+    FAIL_IF(sb->buf_offset != 35);
+    FAIL_IF(seg5->stream_offset != 25);
+    FAIL_IF(!StreamingBufferSegmentIsBeforeWindow(sb,seg1));
+    FAIL_IF(!StreamingBufferSegmentIsBeforeWindow(sb,seg2));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg3));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg4));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg5));
+    FAIL_IF(!StreamingBufferSegmentCompareRawData(sb,seg5,(const uint8_t *)"!@#$%^&*()_+<>?/,.;:'[]{}-=", 27));
+    Dump(sb);
+
+    StreamingBufferSegment *seg6 = StreamingBufferAppendRaw(sb, (const uint8_t *)"UVWXYZ", 6);
+    FAIL_IF(sb->stream_offset != 17);
+    FAIL_IF(sb->buf_offset != 41);
+    FAIL_IF(seg6->stream_offset != 52);
+    FAIL_IF(!StreamingBufferSegmentIsBeforeWindow(sb,seg1));
+    FAIL_IF(!StreamingBufferSegmentIsBeforeWindow(sb,seg2));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg3));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg4));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg5));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg6));
+    FAIL_IF(!StreamingBufferSegmentCompareRawData(sb,seg6,(const uint8_t *)"UVWXYZ", 6));
+    Dump(sb);
+
+    SCFree(seg1);
+    SCFree(seg2);
+    SCFree(seg3);
+    SCFree(seg4);
+    SCFree(seg5);
+    SCFree(seg6);
+    StreamingBufferFree(sb);
+    PASS;
+}
+
+static int StreamingBufferTest02(void)
+{
+    StreamingBufferConfig cfg = { 0, 8, 24, NULL, NULL, NULL, NULL };
+    StreamingBuffer *sb = StreamingBufferInit(&cfg);
+    FAIL_IF(sb == NULL);
+
+    StreamingBufferSegment seg1;
+    StreamingBufferAppend(sb, &seg1, (const uint8_t *)"ABCDEFGH", 8);
+    StreamingBufferSegment seg2;
+    StreamingBufferAppend(sb, &seg2, (const uint8_t *)"01234567", 8);
+    FAIL_IF(sb->stream_offset != 0);
+    FAIL_IF(sb->buf_offset != 16);
+    FAIL_IF(seg1.stream_offset != 0);
+    FAIL_IF(seg2.stream_offset != 8);
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg1));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg2));
+    Dump(sb);
+    DumpSegment(sb, &seg1);
+    DumpSegment(sb, &seg2);
+
+    StreamingBufferSlide(sb, 6);
+
+    StreamingBufferSegment seg3;
+    StreamingBufferAppend(sb, &seg3, (const uint8_t *)"QWERTY", 6);
+    FAIL_IF(sb->stream_offset != 6);
+    FAIL_IF(sb->buf_offset != 16);
+    FAIL_IF(seg3.stream_offset != 16);
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg1));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg2));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg3));
+    Dump(sb);
+    DumpSegment(sb, &seg1);
+    DumpSegment(sb, &seg2);
+    DumpSegment(sb, &seg3);
+
+    StreamingBufferSlide(sb, 6);
+    FAIL_IF(!StreamingBufferSegmentIsBeforeWindow(sb,&seg1));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg2));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg3));
+    Dump(sb);
+    DumpSegment(sb, &seg1);
+    DumpSegment(sb, &seg2);
+    DumpSegment(sb, &seg3);
+
+    StreamingBufferFree(sb);
+    PASS;
+}
+
+static int StreamingBufferTest03(void)
+{
+    StreamingBufferConfig cfg = { 0, 8, 24, NULL, NULL, NULL, NULL };
+    StreamingBuffer *sb = StreamingBufferInit(&cfg);
+    FAIL_IF(sb == NULL);
+
+    StreamingBufferSegment seg1;
+    StreamingBufferAppend(sb, &seg1, (const uint8_t *)"ABCDEFGH", 8);
+    StreamingBufferSegment seg2;
+    StreamingBufferInsertAt(sb, &seg2, (const uint8_t *)"01234567", 8, 14);
+    FAIL_IF(sb->stream_offset != 0);
+    FAIL_IF(sb->buf_offset != 22);
+    FAIL_IF(seg1.stream_offset != 0);
+    FAIL_IF(seg2.stream_offset != 14);
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg1));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg2));
+    Dump(sb);
+    DumpSegment(sb, &seg1);
+    DumpSegment(sb, &seg2);
+
+    StreamingBufferSegment seg3;
+    StreamingBufferInsertAt(sb, &seg3, (const uint8_t *)"QWERTY", 6, 8);
+    FAIL_IF(sb->stream_offset != 0);
+    FAIL_IF(sb->buf_offset != 22);
+    FAIL_IF(seg3.stream_offset != 8);
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg1));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg2));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg3));
+    Dump(sb);
+    DumpSegment(sb, &seg1);
+    DumpSegment(sb, &seg2);
+    DumpSegment(sb, &seg3);
+
+    StreamingBufferSlide(sb, 10);
+    FAIL_IF(!StreamingBufferSegmentIsBeforeWindow(sb,&seg1));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg2));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg3));
+    Dump(sb);
+    DumpSegment(sb, &seg1);
+    DumpSegment(sb, &seg2);
+    DumpSegment(sb, &seg3);
+
+    StreamingBufferFree(sb);
+    PASS;
+}
+
+static int StreamingBufferTest04(void)
+{
+    StreamingBufferConfig cfg = { 0, 8, 16, NULL, NULL, NULL, NULL };
+    StreamingBuffer *sb = StreamingBufferInit(&cfg);
+    FAIL_IF(sb == NULL);
+
+    StreamingBufferSegment seg1;
+    StreamingBufferAppend(sb, &seg1, (const uint8_t *)"ABCDEFGH", 8);
+    StreamingBufferSegment seg2;
+    StreamingBufferInsertAt(sb, &seg2, (const uint8_t *)"01234567", 8, 14);
+    FAIL_IF(sb->stream_offset != 0);
+    FAIL_IF(sb->buf_offset != 22);
+    FAIL_IF(seg1.stream_offset != 0);
+    FAIL_IF(seg2.stream_offset != 14);
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg1));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg2));
+    Dump(sb);
+    DumpSegment(sb, &seg1);
+    DumpSegment(sb, &seg2);
+
+    StreamingBufferSegment seg3;
+    StreamingBufferInsertAt(sb, &seg3, (const uint8_t *)"QWERTY", 6, 8);
+    FAIL_IF(sb->stream_offset != 0);
+    FAIL_IF(sb->buf_offset != 22);
+    FAIL_IF(seg3.stream_offset != 8);
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg1));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg2));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg3));
+    Dump(sb);
+    DumpSegment(sb, &seg1);
+    DumpSegment(sb, &seg2);
+    DumpSegment(sb, &seg3);
+
+    /* far ahead of curve: */
+    StreamingBufferSegment seg4;
+    StreamingBufferInsertAt(sb, &seg4, (const uint8_t *)"XYZ", 3, 124);
+    FAIL_IF(sb->stream_offset != 0);
+    FAIL_IF(sb->buf_offset != 127);
+    FAIL_IF(sb->buf_size != 128);
+    FAIL_IF(seg4.stream_offset != 124);
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg1));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg2));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg3));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg4));
+    Dump(sb);
+    DumpSegment(sb, &seg1);
+    DumpSegment(sb, &seg2);
+    DumpSegment(sb, &seg3);
+    DumpSegment(sb, &seg4);
+
+    FAIL_IF(!StreamingBufferSegmentCompareRawData(sb,&seg1,(const uint8_t *)"ABCDEFGH", 8));
+    FAIL_IF(!StreamingBufferSegmentCompareRawData(sb,&seg2,(const uint8_t *)"01234567", 8));
+    FAIL_IF(!StreamingBufferSegmentCompareRawData(sb,&seg3,(const uint8_t *)"QWERTY", 6));
+    FAIL_IF(!StreamingBufferSegmentCompareRawData(sb,&seg4,(const uint8_t *)"XYZ", 3));
+
+    StreamingBufferFree(sb);
+    PASS;
+}
+
+static int StreamingBufferTest05(void)
+{
+    StreamingBufferConfig cfg = { STREAMING_BUFFER_AUTOSLIDE, 8, 32, NULL, NULL, NULL, NULL };
+    StreamingBuffer sb = STREAMING_BUFFER_INITIALIZER(&cfg);
+
+    StreamingBufferSegment *seg1 = StreamingBufferAppendRaw(&sb, (const uint8_t *)"AAAAAAAA", 8);
+    StreamingBufferSegment *seg2 = StreamingBufferAppendRaw(&sb, (const uint8_t *)"BBBBBBBB", 8);
+    StreamingBufferSegment *seg3 = StreamingBufferAppendRaw(&sb, (const uint8_t *)"CCCCCCCC", 8);
+    StreamingBufferSegment *seg4 = StreamingBufferAppendRaw(&sb, (const uint8_t *)"DDDDDDDD", 8);
+    FAIL_IF(sb.stream_offset != 0);
+    FAIL_IF(sb.buf_offset != 32);
+    FAIL_IF(seg1->stream_offset != 0);
+    FAIL_IF(seg2->stream_offset != 8);
+    FAIL_IF(seg3->stream_offset != 16);
+    FAIL_IF(seg4->stream_offset != 24);
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(&sb,seg1));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(&sb,seg2));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(&sb,seg3));
+    FAIL_IF(StreamingBufferSegmentIsBeforeWindow(&sb,seg4));
+    FAIL_IF(!StreamingBufferSegmentCompareRawData(&sb,seg1,(const uint8_t *)"AAAAAAAA", 8));
+    FAIL_IF(!StreamingBufferSegmentCompareRawData(&sb,seg2,(const uint8_t *)"BBBBBBBB", 8));
+    FAIL_IF(!StreamingBufferSegmentCompareRawData(&sb,seg3,(const uint8_t *)"CCCCCCCC", 8));
+    FAIL_IF(!StreamingBufferSegmentCompareRawData(&sb,seg4,(const uint8_t *)"DDDDDDDD", 8));
+    Dump(&sb);
+    StreamingBufferSegment *seg5 = StreamingBufferAppendRaw(&sb, (const uint8_t *)"EEEEEEEE", 8);
+    FAIL_IF(!StreamingBufferSegmentCompareRawData(&sb,seg5,(const uint8_t *)"EEEEEEEE", 8));
+    Dump(&sb);
+
+    SCFree(seg1);
+    SCFree(seg2);
+    SCFree(seg3);
+    SCFree(seg4);
+    SCFree(seg5);
+    StreamingBufferClear(&sb);
+    PASS;
+}
+#endif
+
+void StreamingBufferRegisterTests(void)
+{
+#ifdef UNITTESTS
+    UtRegisterTest("StreamingBufferTest01", StreamingBufferTest01);
+    UtRegisterTest("StreamingBufferTest02", StreamingBufferTest02);
+    UtRegisterTest("StreamingBufferTest03", StreamingBufferTest03);
+    UtRegisterTest("StreamingBufferTest04", StreamingBufferTest04);
+    UtRegisterTest("StreamingBufferTest05", StreamingBufferTest05);
+#endif
+}
diff --git a/src/util-streaming-buffer.h b/src/util-streaming-buffer.h
new file mode 100644 (file)
index 0000000..d35e1aa
--- /dev/null
@@ -0,0 +1,140 @@
+/* Copyright (C) 2015-2016 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/*
+ * This API is meant to be used with streaming data. A single memory
+ * block is used to store the data. StreamingBufferSegment points to
+ * chunk of data in the single StreamingBuffer. It points by offset
+ * and length, so no pointers. The buffer is resized on demand and
+ * slides forward, either automatically or manually.
+ *
+ * When a segment needs it's data it uses StreamingBufferSegmentGetData
+ * which takes care of checking if the segment still has a valid offset
+ * and length.
+ *
+ * The StreamingBuffer::stream_offset is an absolute offset since the
+ * start of the data streaming.
+ *
+ * Similarly, StreamingBufferSegment::stream_offset is also an absolute
+ * offset.
+ *
+ * Using the segments is optional.
+ *
+ *
+ * stream_offset            buf_offset          stream_offset + buf_size
+ * ^                        ^                   ^
+ * |                        |                   |
+ * |                        |                   |
+ * +--------------------------------------------+
+ * |         data           |     empty         |
+ * |      xxxxxxxxxx        |                   |
+ * +------^--------^--------+-------------------+
+ *        |        |
+ *        |        |
+ *        |        |
+ *        |        |
+ *        |        |
+ * +------+--------+-------+
+ * | StreamingBufferSegment|
+ * +-----------+-----------+
+ * | offset    | len       |
+ * +-----------+-----------+
+ */
+
+
+#ifndef __UTIL_STREAMING_BUFFER_H__
+#define __UTIL_STREAMING_BUFFER_H__
+
+#define STREAMING_BUFFER_NOFLAGS     0
+#define STREAMING_BUFFER_AUTOSLIDE  (1<<0)
+
+typedef struct StreamingBufferConfig_ {
+    uint32_t flags;
+    uint32_t buf_slide;
+    uint32_t buf_size;
+    void *(*Malloc)(size_t size);
+    void *(*Calloc)(size_t n, size_t size);
+    void *(*Realloc)(void *ptr, size_t orig_size, size_t size);
+    void (*Free)(void *ptr, size_t size);
+} StreamingBufferConfig;
+
+#define STREAMING_BUFFER_CONFIG_INITIALIZER { 0, 0, 0, NULL, NULL, NULL, NULL, }
+
+typedef struct StreamingBuffer_ {
+    const StreamingBufferConfig *cfg;
+    uint64_t stream_offset; /**< offset of the start of the memory block */
+
+    uint8_t *buf;           /**< memory block for reassembly */
+    uint32_t buf_size;      /**< size of memory block */
+    uint32_t buf_offset;    /**< how far we are in buf_size */
+#ifdef DEBUG
+    uint32_t buf_size_max;
+#endif
+} StreamingBuffer;
+
+#ifndef DEBUG
+#define STREAMING_BUFFER_INITIALIZER(cfg) { (cfg), 0, NULL, 0, 0, };
+#else
+#define STREAMING_BUFFER_INITIALIZER(cfg) { (cfg), 0, NULL, 0, 0, 0 };
+#endif
+
+typedef struct StreamingBufferSegment_ {
+    uint64_t stream_offset;
+    uint32_t segment_len;
+} __attribute__((__packed__)) StreamingBufferSegment;
+
+StreamingBuffer *StreamingBufferInit(const StreamingBufferConfig *cfg);
+void StreamingBufferClear(StreamingBuffer *sb);
+void StreamingBufferFree(StreamingBuffer *sb);
+
+void StreamingBufferSlide(StreamingBuffer *sb, uint32_t slide);
+void StreamingBufferSlideToOffset(StreamingBuffer *sb, uint64_t offset);
+
+StreamingBufferSegment *StreamingBufferAppendRaw(StreamingBuffer *sb,
+        const uint8_t *data, uint32_t data_len);
+void StreamingBufferAppend(StreamingBuffer *sb, StreamingBufferSegment *seg,
+        const uint8_t *data, uint32_t data_len);
+void StreamingBufferAppendNoTrack(StreamingBuffer *sb,
+        const uint8_t *data, uint32_t data_len);
+void StreamingBufferInsertAt(StreamingBuffer *sb, StreamingBufferSegment *seg,
+                             const uint8_t *data, uint32_t data_len,
+                             uint64_t offset);
+
+void StreamingBufferSegmentGetData(const StreamingBuffer *sb,
+                                   const StreamingBufferSegment *seg,
+                                   const uint8_t **data, uint32_t *data_len);
+
+int StreamingBufferSegmentCompareRawData(const StreamingBuffer *sb,
+                                         const StreamingBufferSegment *seg,
+                                         const uint8_t *rawdata, uint32_t rawdata_len);
+int StreamingBufferCompareRawData(const StreamingBuffer *sb,
+                                  const uint8_t *rawdata, uint32_t rawdata_len);
+
+int StreamingBufferGetData(const StreamingBuffer *sb,
+        const uint8_t **data, uint32_t *data_len,
+        uint64_t *stream_offset);
+
+int StreamingBufferGetDataAtOffset (const StreamingBuffer *sb,
+        const uint8_t **data, uint32_t *data_len,
+        uint64_t offset);
+
+int StreamingBufferSegmentIsBeforeWindow(const StreamingBuffer *sb,
+                                         const StreamingBufferSegment *seg);
+
+void StreamingBufferRegisterTests(void);
+
+#endif /* __UTIL_STREAMING_BUFFER_H__ */