From: Victor Julien Date: Sun, 8 Nov 2015 17:30:05 +0000 (+0100) Subject: streaming: buffer API X-Git-Tag: suricata-3.1RC1~121 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=81b2984c4e4b244afd87502bec94209a3d0d8094;p=thirdparty%2Fsuricata.git streaming: buffer API Add a new API to store data from streaming sources, like HTTP body processing or TCP data. Currently most of the code uses a pattern of list of data chunks (e.g. TcpSegment) that is reassembled into a large buffer on-demand. The Streaming Buffer API changes the logic to store the data in reassembled form from the start, with the segments/chunks pointing to the reassembled data. The main buffer storing the data slides forward, automatically or manually. The *NoTrack calls allows for a segmentless mode of operation. This approach has two main advantages: 1. accessing the reassembled data is virtually cost-free 2. reduction of allocations and memory management --- diff --git a/src/Makefile.am b/src/Makefile.am index b729388ee3..65f175e8c5 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -404,6 +404,7 @@ util-spm-bs.c util-spm-bs.h \ util-spm-hs.c util-spm-hs.h \ util-spm.c util-spm.h util-clock.h \ util-storage.c util-storage.h \ +util-streaming-buffer.c util-streaming-buffer.h \ util-strlcatu.c \ util-strlcpyu.c \ util-syslog.c util-syslog.h \ diff --git a/src/output-streaming.c b/src/output-streaming.c index 73aa115b40..4ae7c58c2e 100644 --- a/src/output-streaming.c +++ b/src/output-streaming.c @@ -98,7 +98,7 @@ typedef struct StreamerCallbackData_ { enum OutputStreamingType type; } StreamerCallbackData; -int Streamer(void *cbdata, Flow *f, uint8_t *data, uint32_t data_len, uint64_t tx_id, uint8_t flags) +int Streamer(void *cbdata, Flow *f, const uint8_t *data, uint32_t data_len, uint64_t tx_id, uint8_t flags) { StreamerCallbackData *streamer_cbdata = (StreamerCallbackData *)cbdata; BUG_ON(streamer_cbdata == NULL); diff --git a/src/runmode-unittests.c b/src/runmode-unittests.c index 523d81afdd..6f572d3fce 100644 --- a/src/runmode-unittests.c +++ b/src/runmode-unittests.c @@ -118,6 +118,8 @@ #include "defrag.h" #include "detect-engine-siggroup.h" +#include "util-streaming-buffer.h" + #endif /* UNITTESTS */ void RegisterAllModules(); @@ -278,6 +280,8 @@ void RunUnittests(int list_unittests, char *regex_arg) #endif AppLayerUnittestsRegister(); MimeDecRegisterTests(); + StreamingBufferRegisterTests(); + if (list_unittests) { UtListTests(regex_arg); } else { diff --git a/src/util-streaming-buffer.c b/src/util-streaming-buffer.c new file mode 100644 index 0000000000..d0df5f687d --- /dev/null +++ b/src/util-streaming-buffer.c @@ -0,0 +1,725 @@ +/* Copyright (C) 2015-2016 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +#include "suricata-common.h" +#include "util-streaming-buffer.h" +#include "util-unittest.h" +#include "util-print.h" + +/** + * \file + * + * \author Victor Julien + * + * \brief Streaming Buffer API + */ + +/* memory handling wrappers. If config doesn't define it's own set of + * functions, use the defaults */ +#define MALLOC(cfg, s) \ + (cfg)->Malloc ? (cfg)->Malloc((s)) : SCMalloc((s)) +#define CALLOC(cfg, n, s) \ + (cfg)->Calloc ? (cfg)->Calloc((n), (s)) : SCCalloc((n), (s)) +#define REALLOC(cfg, ptr, orig_s, s) \ + (cfg)->Realloc ? (cfg)->Realloc((ptr), (orig_s), (s)) : SCRealloc((ptr), (s)) +#define FREE(cfg, ptr, s) \ + (cfg)->Free ? (cfg)->Free((ptr), (s)) : SCFree((ptr)) + +static inline int InitBuffer(StreamingBuffer *sb) +{ + sb->buf = CALLOC(sb->cfg, 1, sb->cfg->buf_size); + if (sb->buf == NULL) { + return -1; + } + sb->buf_size = sb->cfg->buf_size; + return 0; +} + +StreamingBuffer *StreamingBufferInit(const StreamingBufferConfig *cfg) +{ + StreamingBuffer *sb = CALLOC(cfg, 1, sizeof(StreamingBuffer)); + if (sb != NULL) { + sb->buf_size = cfg->buf_size; + sb->cfg = cfg; + + if (cfg->buf_size > 0) { + if (InitBuffer(sb) == 0) { + return sb; + } + FREE(cfg, sb, sizeof(StreamingBuffer)); + /* implied buf_size == 0 */ + } else { + return sb; + } + } + return NULL; +} + +void StreamingBufferClear(StreamingBuffer *sb) +{ + if (sb != NULL) { + SCLogDebug("sb->buf_size %u max %u", sb->buf_size, sb->buf_size_max); + + if (sb->buf != NULL) { + FREE(sb->cfg, sb->buf, sb->buf_size); + sb->buf = NULL; + } + } +} + +void StreamingBufferFree(StreamingBuffer *sb) +{ + if (sb != NULL) { + StreamingBufferClear(sb); + FREE(sb->cfg, sb, sizeof(StreamingBuffer)); + } +} + +/** + * \internal + * \brief move buffer forward by 'slide' + */ +static void AutoSlide(StreamingBuffer *sb) +{ + uint32_t size = sb->cfg->buf_slide; + uint32_t slide = sb->buf_offset - size; + SCLogDebug("sliding %u forward, size of original buffer left after slide %u", slide, size); + memmove(sb->buf, sb->buf+slide, size); + sb->stream_offset += slide; + sb->buf_offset = size; +} + +static void GrowToSize(StreamingBuffer *sb, uint32_t size) +{ + /* try to grow in multiples of sb->cfg->buf_size */ + uint32_t x = sb->cfg->buf_size ? size % sb->cfg->buf_size : 0; + uint32_t base = size - x; + uint32_t grow = base + sb->cfg->buf_size; + + void *ptr = REALLOC(sb->cfg, sb->buf, sb->buf_size, grow); + if (ptr != NULL) { + /* for safe printing and general caution, lets memset the + * new data to 0 */ + size_t diff = grow - sb->buf_size; + void *new_mem = ((char *)ptr) + sb->buf_size; + memset(new_mem, 0, diff); + + sb->buf = ptr; + sb->buf_size = grow; + SCLogDebug("grown buffer to %u", grow); +#ifdef DEBUG + if (sb->buf_size > sb->buf_size_max) { + sb->buf_size_max = sb->buf_size; + } +#endif + } +} + +static void Grow(StreamingBuffer *sb) +{ + uint32_t grow = sb->buf_size * 2; + void *ptr = REALLOC(sb->cfg, sb->buf, sb->buf_size, grow); + if (ptr != NULL) { + /* for safe printing and general caution, lets memset the + * new data to 0 */ + size_t diff = grow - sb->buf_size; + void *new_mem = ((char *)ptr) + sb->buf_size; + memset(new_mem, 0, diff); + + sb->buf = ptr; + sb->buf_size = grow; + SCLogDebug("grown buffer to %u", grow); +#ifdef DEBUG + if (sb->buf_size > sb->buf_size_max) { + sb->buf_size_max = sb->buf_size; + } +#endif + } +} + +/** + * \brief slide to absolute offset + * \todo if sliding beyond window, we could perhaps reset? + */ +void StreamingBufferSlideToOffset(StreamingBuffer *sb, uint64_t offset) +{ + if (offset > sb->stream_offset && + offset <= sb->stream_offset + sb->buf_offset) + { + uint32_t slide = offset - sb->stream_offset; + uint32_t size = sb->buf_offset - slide; + SCLogDebug("sliding %u forward, size of original buffer left after slide %u", slide, size); + memmove(sb->buf, sb->buf+slide, size); + sb->stream_offset += slide; + sb->buf_offset = size; + } +} + +void StreamingBufferSlide(StreamingBuffer *sb, uint32_t slide) +{ + uint32_t size = sb->buf_offset - slide; + SCLogDebug("sliding %u forward, size of original buffer left after slide %u", slide, size); + memmove(sb->buf, sb->buf+slide, size); + sb->stream_offset += slide; + sb->buf_offset = size; +} + +#define DATA_FITS(sb, len) \ + ((sb)->buf_offset + (len) <= (sb)->buf_size) + +StreamingBufferSegment *StreamingBufferAppendRaw(StreamingBuffer *sb, const uint8_t *data, uint32_t data_len) +{ + if (sb->buf == NULL) { + if (InitBuffer(sb) == -1) + return NULL; + } + + StreamingBufferSegment *seg = CALLOC(sb->cfg, 1, sizeof(StreamingBufferSegment)); + if (seg != NULL) { + if (!DATA_FITS(sb, data_len)) { + if (sb->cfg->flags & STREAMING_BUFFER_AUTOSLIDE) + AutoSlide(sb); + if (sb->buf_size == 0) { + GrowToSize(sb, data_len); + } else { + while (!DATA_FITS(sb, data_len)) { + Grow(sb); + } + } + } + if (!DATA_FITS(sb, data_len)) { + FREE(sb->cfg, seg, sizeof(StreamingBufferSegment)); + return NULL; + } + + memcpy(sb->buf + sb->buf_offset, data, data_len); + seg->stream_offset = sb->stream_offset + sb->buf_offset; + seg->segment_len = data_len; + sb->buf_offset += data_len; + return seg; + } + return NULL; +} + +void StreamingBufferAppend(StreamingBuffer *sb, StreamingBufferSegment *seg, + const uint8_t *data, uint32_t data_len) +{ + BUG_ON(seg == NULL); + + if (sb->buf == NULL) { + if (InitBuffer(sb) == -1) + return; + } + + if (!DATA_FITS(sb, data_len)) { + if (sb->cfg->flags & STREAMING_BUFFER_AUTOSLIDE) + AutoSlide(sb); + if (sb->buf_size == 0) { + GrowToSize(sb, data_len); + } else { + while (!DATA_FITS(sb, data_len)) { + Grow(sb); + } + } + } + if (!DATA_FITS(sb, data_len)) { + return; + } + + memcpy(sb->buf + sb->buf_offset, data, data_len); + seg->stream_offset = sb->stream_offset + sb->buf_offset; + seg->segment_len = data_len; + sb->buf_offset += data_len; +} + +/** + * \brief add data w/o tracking a segment + */ +void StreamingBufferAppendNoTrack(StreamingBuffer *sb, + const uint8_t *data, uint32_t data_len) +{ + if (sb->buf == NULL) { + if (InitBuffer(sb) == -1) + return; + } + + if (!DATA_FITS(sb, data_len)) { + if (sb->cfg->flags & STREAMING_BUFFER_AUTOSLIDE) + AutoSlide(sb); + if (sb->buf_size == 0) { + GrowToSize(sb, data_len); + } else { + while (!DATA_FITS(sb, data_len)) { + Grow(sb); + } + } + } + if (!DATA_FITS(sb, data_len)) { + return; + } + + memcpy(sb->buf + sb->buf_offset, data, data_len); + sb->buf_offset += data_len; +} + +#define DATA_FITS_AT_OFFSET(sb, len, offset) \ + ((offset) + (len) <= (sb)->buf_size) + +/** + * \param offset offset relative to StreamingBuffer::stream_offset + */ +void StreamingBufferInsertAt(StreamingBuffer *sb, StreamingBufferSegment *seg, + const uint8_t *data, uint32_t data_len, + uint64_t offset) +{ + BUG_ON(seg == NULL); + + if (offset < sb->stream_offset) + return; + + if (sb->buf == NULL) { + if (InitBuffer(sb) == -1) + return; + } + + uint32_t rel_offset = offset - sb->stream_offset; + if (!DATA_FITS_AT_OFFSET(sb, data_len, rel_offset)) { + if (sb->cfg->flags & STREAMING_BUFFER_AUTOSLIDE) { + AutoSlide(sb); + rel_offset = offset - sb->stream_offset; + } + if (!DATA_FITS_AT_OFFSET(sb, data_len, rel_offset)) { + GrowToSize(sb, (rel_offset + data_len)); + } + } + BUG_ON(!DATA_FITS_AT_OFFSET(sb, data_len, rel_offset)); + + memcpy(sb->buf + rel_offset, data, data_len); + seg->stream_offset = offset; + seg->segment_len = data_len; + if (rel_offset + data_len > sb->buf_offset) + sb->buf_offset = rel_offset + data_len; + + +} + +int StreamingBufferSegmentIsBeforeWindow(const StreamingBuffer *sb, + const StreamingBufferSegment *seg) +{ + if (seg->stream_offset < sb->stream_offset) { + if (seg->stream_offset + seg->segment_len <= sb->stream_offset) { + return 1; + } + } + return 0; +} + +void StreamingBufferSegmentGetData(const StreamingBuffer *sb, + const StreamingBufferSegment *seg, + const uint8_t **data, uint32_t *data_len) +{ + if (seg->stream_offset >= sb->stream_offset) { + uint64_t offset = seg->stream_offset - sb->stream_offset; + *data = sb->buf + offset; + if (offset + seg->segment_len > sb->buf_size) + *data_len = sb->buf_size - offset; + else + *data_len = seg->segment_len; + return; + } else { + uint64_t offset = sb->stream_offset - seg->stream_offset; + if (offset < seg->segment_len) { + *data = sb->buf; + *data_len = seg->segment_len - offset; + return; + } + } + *data = NULL; + *data_len = 0; + return; +} + +/** + * \retval 1 data is the same + * \retval 0 data is different + */ +int StreamingBufferSegmentCompareRawData(const StreamingBuffer *sb, + const StreamingBufferSegment *seg, + const uint8_t *rawdata, uint32_t rawdata_len) +{ + const uint8_t *segdata = NULL; + uint32_t segdata_len = 0; + StreamingBufferSegmentGetData(sb, seg, &segdata, &segdata_len); + if (segdata && segdata_len && + segdata_len == rawdata_len && + memcmp(segdata, rawdata, segdata_len) == 0) + { + return 1; + } + return 0; +} + +int StreamingBufferGetData(const StreamingBuffer *sb, + const uint8_t **data, uint32_t *data_len, + uint64_t *stream_offset) +{ + if (sb != NULL && sb->buf != NULL) { + *data = sb->buf; + *data_len = sb->buf_offset; + *stream_offset = sb->stream_offset; + return 1; + } else { + *data = NULL; + *data_len = 0; + *stream_offset = 0; + return 0; + } +} + +int StreamingBufferGetDataAtOffset (const StreamingBuffer *sb, + const uint8_t **data, uint32_t *data_len, + uint64_t offset) +{ + if (sb != NULL && sb->buf != NULL && + offset >= sb->stream_offset && + offset < (sb->stream_offset + sb->buf_offset)) + { + uint32_t skip = offset - sb->stream_offset; + *data = sb->buf + skip; + *data_len = sb->buf_offset - skip; + return 1; + } else { + *data = NULL; + *data_len = 0; + return 0; + } +} + +/** + * \retval 1 data is the same + * \retval 0 data is different + */ +int StreamingBufferCompareRawData(const StreamingBuffer *sb, + const uint8_t *rawdata, uint32_t rawdata_len) +{ + const uint8_t *sbdata = NULL; + uint32_t sbdata_len = 0; + uint64_t offset = 0; + StreamingBufferGetData(sb, &sbdata, &sbdata_len, &offset); + if (offset == 0 && + sbdata && sbdata_len && + sbdata_len == rawdata_len && + memcmp(sbdata, rawdata, sbdata_len) == 0) + { + return 1; + } + SCLogInfo("sbdata_len %u, offset %u", sbdata_len, (uint)offset); + PrintRawDataFp(stdout, sbdata,sbdata_len); + return 0; +} + +void Dump(StreamingBuffer *sb) +{ + PrintRawDataFp(stdout, sb->buf, sb->buf_offset); +} + +void DumpSegment(StreamingBuffer *sb, StreamingBufferSegment *seg) +{ + const uint8_t *data = NULL; + uint32_t data_len = 0; + StreamingBufferSegmentGetData(sb, seg, &data, &data_len); + if (data && data_len) { + PrintRawDataFp(stdout, data, data_len); + } +} + +#ifdef UNITTESTS +static int StreamingBufferTest01(void) +{ + StreamingBufferConfig cfg = { STREAMING_BUFFER_AUTOSLIDE, 8, 16, NULL, NULL, NULL, NULL }; + StreamingBuffer *sb = StreamingBufferInit(&cfg); + FAIL_IF(sb == NULL); + + StreamingBufferSegment *seg1 = StreamingBufferAppendRaw(sb, (const uint8_t *)"ABCDEFGH", 8); + StreamingBufferSegment *seg2 = StreamingBufferAppendRaw(sb, (const uint8_t *)"01234567", 8); + FAIL_IF(sb->stream_offset != 0); + FAIL_IF(sb->buf_offset != 16); + FAIL_IF(seg1->stream_offset != 0); + FAIL_IF(seg2->stream_offset != 8); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg1)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg2)); + FAIL_IF(!StreamingBufferSegmentCompareRawData(sb,seg1,(const uint8_t *)"ABCDEFGH", 8)); + FAIL_IF(!StreamingBufferSegmentCompareRawData(sb,seg2,(const uint8_t *)"01234567", 8)); + Dump(sb); + + StreamingBufferSegment *seg3 = StreamingBufferAppendRaw(sb, (const uint8_t *)"QWERTY", 6); + FAIL_IF(sb->stream_offset != 8); + FAIL_IF(sb->buf_offset != 14); + FAIL_IF(seg3->stream_offset != 16); + FAIL_IF(!StreamingBufferSegmentIsBeforeWindow(sb,seg1)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg2)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg3)); + FAIL_IF(!StreamingBufferSegmentCompareRawData(sb,seg3,(const uint8_t *)"QWERTY", 6)); + Dump(sb); + + StreamingBufferSegment *seg4 = StreamingBufferAppendRaw(sb, (const uint8_t *)"KLM", 3); + FAIL_IF(sb->stream_offset != 14); + FAIL_IF(sb->buf_offset != 11); + FAIL_IF(seg4->stream_offset != 22); + FAIL_IF(!StreamingBufferSegmentIsBeforeWindow(sb,seg1)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg2)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg3)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg4)); + FAIL_IF(!StreamingBufferSegmentCompareRawData(sb,seg4,(const uint8_t *)"KLM", 3)); + Dump(sb); + + StreamingBufferSegment *seg5 = StreamingBufferAppendRaw(sb, (const uint8_t *)"!@#$%^&*()_+<>?/,.;:'[]{}-=", 27); + FAIL_IF(sb->stream_offset != 17); + FAIL_IF(sb->buf_offset != 35); + FAIL_IF(seg5->stream_offset != 25); + FAIL_IF(!StreamingBufferSegmentIsBeforeWindow(sb,seg1)); + FAIL_IF(!StreamingBufferSegmentIsBeforeWindow(sb,seg2)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg3)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg4)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg5)); + FAIL_IF(!StreamingBufferSegmentCompareRawData(sb,seg5,(const uint8_t *)"!@#$%^&*()_+<>?/,.;:'[]{}-=", 27)); + Dump(sb); + + StreamingBufferSegment *seg6 = StreamingBufferAppendRaw(sb, (const uint8_t *)"UVWXYZ", 6); + FAIL_IF(sb->stream_offset != 17); + FAIL_IF(sb->buf_offset != 41); + FAIL_IF(seg6->stream_offset != 52); + FAIL_IF(!StreamingBufferSegmentIsBeforeWindow(sb,seg1)); + FAIL_IF(!StreamingBufferSegmentIsBeforeWindow(sb,seg2)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg3)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg4)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg5)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,seg6)); + FAIL_IF(!StreamingBufferSegmentCompareRawData(sb,seg6,(const uint8_t *)"UVWXYZ", 6)); + Dump(sb); + + SCFree(seg1); + SCFree(seg2); + SCFree(seg3); + SCFree(seg4); + SCFree(seg5); + SCFree(seg6); + StreamingBufferFree(sb); + PASS; +} + +static int StreamingBufferTest02(void) +{ + StreamingBufferConfig cfg = { 0, 8, 24, NULL, NULL, NULL, NULL }; + StreamingBuffer *sb = StreamingBufferInit(&cfg); + FAIL_IF(sb == NULL); + + StreamingBufferSegment seg1; + StreamingBufferAppend(sb, &seg1, (const uint8_t *)"ABCDEFGH", 8); + StreamingBufferSegment seg2; + StreamingBufferAppend(sb, &seg2, (const uint8_t *)"01234567", 8); + FAIL_IF(sb->stream_offset != 0); + FAIL_IF(sb->buf_offset != 16); + FAIL_IF(seg1.stream_offset != 0); + FAIL_IF(seg2.stream_offset != 8); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg1)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg2)); + Dump(sb); + DumpSegment(sb, &seg1); + DumpSegment(sb, &seg2); + + StreamingBufferSlide(sb, 6); + + StreamingBufferSegment seg3; + StreamingBufferAppend(sb, &seg3, (const uint8_t *)"QWERTY", 6); + FAIL_IF(sb->stream_offset != 6); + FAIL_IF(sb->buf_offset != 16); + FAIL_IF(seg3.stream_offset != 16); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg1)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg2)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg3)); + Dump(sb); + DumpSegment(sb, &seg1); + DumpSegment(sb, &seg2); + DumpSegment(sb, &seg3); + + StreamingBufferSlide(sb, 6); + FAIL_IF(!StreamingBufferSegmentIsBeforeWindow(sb,&seg1)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg2)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg3)); + Dump(sb); + DumpSegment(sb, &seg1); + DumpSegment(sb, &seg2); + DumpSegment(sb, &seg3); + + StreamingBufferFree(sb); + PASS; +} + +static int StreamingBufferTest03(void) +{ + StreamingBufferConfig cfg = { 0, 8, 24, NULL, NULL, NULL, NULL }; + StreamingBuffer *sb = StreamingBufferInit(&cfg); + FAIL_IF(sb == NULL); + + StreamingBufferSegment seg1; + StreamingBufferAppend(sb, &seg1, (const uint8_t *)"ABCDEFGH", 8); + StreamingBufferSegment seg2; + StreamingBufferInsertAt(sb, &seg2, (const uint8_t *)"01234567", 8, 14); + FAIL_IF(sb->stream_offset != 0); + FAIL_IF(sb->buf_offset != 22); + FAIL_IF(seg1.stream_offset != 0); + FAIL_IF(seg2.stream_offset != 14); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg1)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg2)); + Dump(sb); + DumpSegment(sb, &seg1); + DumpSegment(sb, &seg2); + + StreamingBufferSegment seg3; + StreamingBufferInsertAt(sb, &seg3, (const uint8_t *)"QWERTY", 6, 8); + FAIL_IF(sb->stream_offset != 0); + FAIL_IF(sb->buf_offset != 22); + FAIL_IF(seg3.stream_offset != 8); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg1)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg2)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg3)); + Dump(sb); + DumpSegment(sb, &seg1); + DumpSegment(sb, &seg2); + DumpSegment(sb, &seg3); + + StreamingBufferSlide(sb, 10); + FAIL_IF(!StreamingBufferSegmentIsBeforeWindow(sb,&seg1)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg2)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg3)); + Dump(sb); + DumpSegment(sb, &seg1); + DumpSegment(sb, &seg2); + DumpSegment(sb, &seg3); + + StreamingBufferFree(sb); + PASS; +} + +static int StreamingBufferTest04(void) +{ + StreamingBufferConfig cfg = { 0, 8, 16, NULL, NULL, NULL, NULL }; + StreamingBuffer *sb = StreamingBufferInit(&cfg); + FAIL_IF(sb == NULL); + + StreamingBufferSegment seg1; + StreamingBufferAppend(sb, &seg1, (const uint8_t *)"ABCDEFGH", 8); + StreamingBufferSegment seg2; + StreamingBufferInsertAt(sb, &seg2, (const uint8_t *)"01234567", 8, 14); + FAIL_IF(sb->stream_offset != 0); + FAIL_IF(sb->buf_offset != 22); + FAIL_IF(seg1.stream_offset != 0); + FAIL_IF(seg2.stream_offset != 14); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg1)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg2)); + Dump(sb); + DumpSegment(sb, &seg1); + DumpSegment(sb, &seg2); + + StreamingBufferSegment seg3; + StreamingBufferInsertAt(sb, &seg3, (const uint8_t *)"QWERTY", 6, 8); + FAIL_IF(sb->stream_offset != 0); + FAIL_IF(sb->buf_offset != 22); + FAIL_IF(seg3.stream_offset != 8); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg1)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg2)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg3)); + Dump(sb); + DumpSegment(sb, &seg1); + DumpSegment(sb, &seg2); + DumpSegment(sb, &seg3); + + /* far ahead of curve: */ + StreamingBufferSegment seg4; + StreamingBufferInsertAt(sb, &seg4, (const uint8_t *)"XYZ", 3, 124); + FAIL_IF(sb->stream_offset != 0); + FAIL_IF(sb->buf_offset != 127); + FAIL_IF(sb->buf_size != 128); + FAIL_IF(seg4.stream_offset != 124); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg1)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg2)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg3)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(sb,&seg4)); + Dump(sb); + DumpSegment(sb, &seg1); + DumpSegment(sb, &seg2); + DumpSegment(sb, &seg3); + DumpSegment(sb, &seg4); + + FAIL_IF(!StreamingBufferSegmentCompareRawData(sb,&seg1,(const uint8_t *)"ABCDEFGH", 8)); + FAIL_IF(!StreamingBufferSegmentCompareRawData(sb,&seg2,(const uint8_t *)"01234567", 8)); + FAIL_IF(!StreamingBufferSegmentCompareRawData(sb,&seg3,(const uint8_t *)"QWERTY", 6)); + FAIL_IF(!StreamingBufferSegmentCompareRawData(sb,&seg4,(const uint8_t *)"XYZ", 3)); + + StreamingBufferFree(sb); + PASS; +} + +static int StreamingBufferTest05(void) +{ + StreamingBufferConfig cfg = { STREAMING_BUFFER_AUTOSLIDE, 8, 32, NULL, NULL, NULL, NULL }; + StreamingBuffer sb = STREAMING_BUFFER_INITIALIZER(&cfg); + + StreamingBufferSegment *seg1 = StreamingBufferAppendRaw(&sb, (const uint8_t *)"AAAAAAAA", 8); + StreamingBufferSegment *seg2 = StreamingBufferAppendRaw(&sb, (const uint8_t *)"BBBBBBBB", 8); + StreamingBufferSegment *seg3 = StreamingBufferAppendRaw(&sb, (const uint8_t *)"CCCCCCCC", 8); + StreamingBufferSegment *seg4 = StreamingBufferAppendRaw(&sb, (const uint8_t *)"DDDDDDDD", 8); + FAIL_IF(sb.stream_offset != 0); + FAIL_IF(sb.buf_offset != 32); + FAIL_IF(seg1->stream_offset != 0); + FAIL_IF(seg2->stream_offset != 8); + FAIL_IF(seg3->stream_offset != 16); + FAIL_IF(seg4->stream_offset != 24); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(&sb,seg1)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(&sb,seg2)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(&sb,seg3)); + FAIL_IF(StreamingBufferSegmentIsBeforeWindow(&sb,seg4)); + FAIL_IF(!StreamingBufferSegmentCompareRawData(&sb,seg1,(const uint8_t *)"AAAAAAAA", 8)); + FAIL_IF(!StreamingBufferSegmentCompareRawData(&sb,seg2,(const uint8_t *)"BBBBBBBB", 8)); + FAIL_IF(!StreamingBufferSegmentCompareRawData(&sb,seg3,(const uint8_t *)"CCCCCCCC", 8)); + FAIL_IF(!StreamingBufferSegmentCompareRawData(&sb,seg4,(const uint8_t *)"DDDDDDDD", 8)); + Dump(&sb); + StreamingBufferSegment *seg5 = StreamingBufferAppendRaw(&sb, (const uint8_t *)"EEEEEEEE", 8); + FAIL_IF(!StreamingBufferSegmentCompareRawData(&sb,seg5,(const uint8_t *)"EEEEEEEE", 8)); + Dump(&sb); + + SCFree(seg1); + SCFree(seg2); + SCFree(seg3); + SCFree(seg4); + SCFree(seg5); + StreamingBufferClear(&sb); + PASS; +} +#endif + +void StreamingBufferRegisterTests(void) +{ +#ifdef UNITTESTS + UtRegisterTest("StreamingBufferTest01", StreamingBufferTest01); + UtRegisterTest("StreamingBufferTest02", StreamingBufferTest02); + UtRegisterTest("StreamingBufferTest03", StreamingBufferTest03); + UtRegisterTest("StreamingBufferTest04", StreamingBufferTest04); + UtRegisterTest("StreamingBufferTest05", StreamingBufferTest05); +#endif +} diff --git a/src/util-streaming-buffer.h b/src/util-streaming-buffer.h new file mode 100644 index 0000000000..d35e1aaca4 --- /dev/null +++ b/src/util-streaming-buffer.h @@ -0,0 +1,140 @@ +/* Copyright (C) 2015-2016 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +/* + * This API is meant to be used with streaming data. A single memory + * block is used to store the data. StreamingBufferSegment points to + * chunk of data in the single StreamingBuffer. It points by offset + * and length, so no pointers. The buffer is resized on demand and + * slides forward, either automatically or manually. + * + * When a segment needs it's data it uses StreamingBufferSegmentGetData + * which takes care of checking if the segment still has a valid offset + * and length. + * + * The StreamingBuffer::stream_offset is an absolute offset since the + * start of the data streaming. + * + * Similarly, StreamingBufferSegment::stream_offset is also an absolute + * offset. + * + * Using the segments is optional. + * + * + * stream_offset buf_offset stream_offset + buf_size + * ^ ^ ^ + * | | | + * | | | + * +--------------------------------------------+ + * | data | empty | + * | xxxxxxxxxx | | + * +------^--------^--------+-------------------+ + * | | + * | | + * | | + * | | + * | | + * +------+--------+-------+ + * | StreamingBufferSegment| + * +-----------+-----------+ + * | offset | len | + * +-----------+-----------+ + */ + + +#ifndef __UTIL_STREAMING_BUFFER_H__ +#define __UTIL_STREAMING_BUFFER_H__ + +#define STREAMING_BUFFER_NOFLAGS 0 +#define STREAMING_BUFFER_AUTOSLIDE (1<<0) + +typedef struct StreamingBufferConfig_ { + uint32_t flags; + uint32_t buf_slide; + uint32_t buf_size; + void *(*Malloc)(size_t size); + void *(*Calloc)(size_t n, size_t size); + void *(*Realloc)(void *ptr, size_t orig_size, size_t size); + void (*Free)(void *ptr, size_t size); +} StreamingBufferConfig; + +#define STREAMING_BUFFER_CONFIG_INITIALIZER { 0, 0, 0, NULL, NULL, NULL, NULL, } + +typedef struct StreamingBuffer_ { + const StreamingBufferConfig *cfg; + uint64_t stream_offset; /**< offset of the start of the memory block */ + + uint8_t *buf; /**< memory block for reassembly */ + uint32_t buf_size; /**< size of memory block */ + uint32_t buf_offset; /**< how far we are in buf_size */ +#ifdef DEBUG + uint32_t buf_size_max; +#endif +} StreamingBuffer; + +#ifndef DEBUG +#define STREAMING_BUFFER_INITIALIZER(cfg) { (cfg), 0, NULL, 0, 0, }; +#else +#define STREAMING_BUFFER_INITIALIZER(cfg) { (cfg), 0, NULL, 0, 0, 0 }; +#endif + +typedef struct StreamingBufferSegment_ { + uint64_t stream_offset; + uint32_t segment_len; +} __attribute__((__packed__)) StreamingBufferSegment; + +StreamingBuffer *StreamingBufferInit(const StreamingBufferConfig *cfg); +void StreamingBufferClear(StreamingBuffer *sb); +void StreamingBufferFree(StreamingBuffer *sb); + +void StreamingBufferSlide(StreamingBuffer *sb, uint32_t slide); +void StreamingBufferSlideToOffset(StreamingBuffer *sb, uint64_t offset); + +StreamingBufferSegment *StreamingBufferAppendRaw(StreamingBuffer *sb, + const uint8_t *data, uint32_t data_len); +void StreamingBufferAppend(StreamingBuffer *sb, StreamingBufferSegment *seg, + const uint8_t *data, uint32_t data_len); +void StreamingBufferAppendNoTrack(StreamingBuffer *sb, + const uint8_t *data, uint32_t data_len); +void StreamingBufferInsertAt(StreamingBuffer *sb, StreamingBufferSegment *seg, + const uint8_t *data, uint32_t data_len, + uint64_t offset); + +void StreamingBufferSegmentGetData(const StreamingBuffer *sb, + const StreamingBufferSegment *seg, + const uint8_t **data, uint32_t *data_len); + +int StreamingBufferSegmentCompareRawData(const StreamingBuffer *sb, + const StreamingBufferSegment *seg, + const uint8_t *rawdata, uint32_t rawdata_len); +int StreamingBufferCompareRawData(const StreamingBuffer *sb, + const uint8_t *rawdata, uint32_t rawdata_len); + +int StreamingBufferGetData(const StreamingBuffer *sb, + const uint8_t **data, uint32_t *data_len, + uint64_t *stream_offset); + +int StreamingBufferGetDataAtOffset (const StreamingBuffer *sb, + const uint8_t **data, uint32_t *data_len, + uint64_t offset); + +int StreamingBufferSegmentIsBeforeWindow(const StreamingBuffer *sb, + const StreamingBufferSegment *seg); + +void StreamingBufferRegisterTests(void); + +#endif /* __UTIL_STREAMING_BUFFER_H__ */