return 0;
}
+int StreamIterator(Flow *f, TcpStream *stream, int close, void *cbdata, uint8_t iflags)
+{
+ int logged = 0;
+
+ /* optimization: don't iterate list if we've logged all,
+ * so check the last segment's flags */
+ if (stream->seg_list_tail != NULL &&
+ (!(stream->seg_list_tail->flags & SEGMENTTCP_FLAG_LOGAPI_PROCESSED)))
+ {
+ TcpSegment *seg = stream->seg_list;
+ while (seg) {
+ uint8_t flags = iflags;
+
+ if (seg->flags & SEGMENTTCP_FLAG_LOGAPI_PROCESSED) {
+ seg = seg->next;
+ continue;
+ }
+
+ if (SEQ_GT(seg->seq + seg->payload_len, stream->last_ack)) {
+ SCLogDebug("seg not (fully) acked yet");
+ break;
+ }
+
+ if (seg->seq == stream->isn + 1)
+ flags |= OUTPUT_STREAMING_FLAG_OPEN;
+ /* if we need to close and we're at the last segment in the list
+ * we add the 'close' flag so the logger can close up. */
+ if (close && seg->next == NULL)
+ flags |= OUTPUT_STREAMING_FLAG_CLOSE;
+
+ Streamer(cbdata, f, seg->payload, (uint32_t)seg->payload_len, flags);
+
+ seg->flags |= SEGMENTTCP_FLAG_LOGAPI_PROCESSED;
+
+ seg = seg->next;
+
+ logged = 1;
+ }
+ }
+
+ /* if we need to close we need to invoke the Streamer for sure. If we
+ * logged no segments, we call the Streamer with NULL data so it can
+ * close up. */
+ if (logged == 0 && close) {
+ Streamer(cbdata, f, NULL, 0, OUTPUT_STREAMING_FLAG_CLOSE);
+ }
+
+ return 0;
+}
+
static TmEcode OutputStreamingLog(ThreadVars *tv, Packet *p, void *thread_data, PacketQueue *pq, PacketQueue *postpq)
{
BUG_ON(thread_data == NULL);
OutputStreamingLogger *logger = list;
OutputLoggerThreadStore *store = op_thread_data->store;
-// StreamerCallbackData streamer_cbdata = { logger, store, tv, p };
+ StreamerCallbackData streamer_cbdata = { logger, store, tv, p };
BUG_ON(logger == NULL && store != NULL);
BUG_ON(logger != NULL && store == NULL);
else
flags |= OUTPUT_STREAMING_FLAG_TOSERVER;
-// int file_close = (p->flags & PKT_PSEUDO_STREAM_END) ? 1 : 0;
-// int file_trunc = 0;
-
FLOWLOCK_WRLOCK(f);
+ TcpSession *ssn = f->protoctx;
+ if (ssn) {
+ int close = (ssn->state >= TCP_CLOSED);
+ close |= ((p->flags & PKT_PSEUDO_STREAM_END) ? 1 : 0);
+ SCLogDebug("close ? %s", close ? "yes" : "no");
- logger = list;
- store = op_thread_data->store;
- while (logger && store) {
- BUG_ON(logger->LogFunc == NULL);
-
- SCLogDebug("logger %p", logger);
- PACKET_PROFILING_TMM_START(p, logger->module_id);
- //logger->LogFunc(tv, store->thread_data, (const Packet *)p, (const File *)ff,
- // (const FileData *)write_ffd, flags);
- PACKET_PROFILING_TMM_END(p, logger->module_id);
-
- logger = logger->next;
- store = store->next;
-
- BUG_ON(logger == NULL && store != NULL);
- BUG_ON(logger != NULL && store == NULL);
- }
+ TcpStream *stream = flags & OUTPUT_STREAMING_FLAG_TOSERVER ? &ssn->client : &ssn->server;
+ StreamIterator(p->flow, stream, close, (void *)&streamer_cbdata, flags);
+ }
FLOWLOCK_UNLOCK(f);
return TM_ECODE_OK;
}