]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
app-layer: improve/fix updates logic
authorVictor Julien <vjulien@oisf.net>
Thu, 8 Jun 2023 16:37:44 +0000 (18:37 +0200)
committerVictor Julien <vjulien@oisf.net>
Mon, 12 Jun 2023 12:16:59 +0000 (14:16 +0200)
In 23323a961fac ("app-layer: reduce app cleanup and output-tx calls"), flag
was set per packet updating the app-layer state. However this was missing a
common pattern: in IDS mode most updates are done in the opposite direction
of the traffic due to updates getting triggered by ACK's. This meant that
file store processing might not happen for a long time, or at all. Also,
app layer cleanup might not be called, which includes file pruning.

This patch sets per flow set of flags to indicate app layer is (potentially)
updated. It sets this per direction, based on how the parsers were invoked.
If an ACK triggers an app update, the flow is tagged for the opposite
direction and the next packet in that direction triggers output and cleanup.

Fixes: 23323a961fac ("app-layer: reduce app cleanup and output-tx calls")
Bug: #6120.

src/app-layer.c
src/app-layer.h
src/decode.h
src/flow-worker.c
src/flow.h
src/output-tx.c
src/packet.c
src/stream-tcp-reassemble.c
src/stream-tcp-reassemble.h

index 6d4ac2ea8f795fa5aa5874b3c2538af6807eb9a4..67b8807314434b5f80f977ee9e7ac0d26a572cb2 100644 (file)
@@ -338,10 +338,9 @@ extern enum ExceptionPolicy g_applayerparser_error_policy;
  *  \retval int -1 error
  *  \retval int 0 ok
  */
-static int TCPProtoDetect(ThreadVars *tv,
-        TcpReassemblyThreadCtx *ra_ctx, AppLayerThreadCtx *app_tctx,
-        Packet *p, Flow *f, TcpSession *ssn, TcpStream **stream,
-        uint8_t *data, uint32_t data_len, uint8_t flags)
+static int TCPProtoDetect(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
+        AppLayerThreadCtx *app_tctx, Packet *p, Flow *f, TcpSession *ssn, TcpStream **stream,
+        uint8_t *data, uint32_t data_len, uint8_t flags, enum StreamUpdateDir dir)
 {
     AppProto *alproto;
     AppProto *alproto_otherdir;
@@ -507,7 +506,7 @@ static int TCPProtoDetect(ThreadVars *tv,
         int r = AppLayerParserParse(tv, app_tctx->alp_tctx, f, f->alproto,
                 flags, data, data_len);
         PACKET_PROFILING_APP_END(app_tctx, f->alproto);
-        p->flags |= PKT_APPLAYER_UPDATE;
+        p->app_update_direction = (uint8_t)dir;
         if (r != 1) {
             StreamTcpUpdateAppLayerProgress(ssn, direction, data_len);
         }
@@ -581,7 +580,7 @@ static int TCPProtoDetect(ThreadVars *tv,
                             f->alproto, flags,
                             data, data_len);
                     PACKET_PROFILING_APP_END(app_tctx, f->alproto);
-                    p->flags |= PKT_APPLAYER_UPDATE;
+                    p->app_update_direction = (uint8_t)dir;
                     if (r != 1) {
                         StreamTcpUpdateAppLayerProgress(ssn, direction, data_len);
                     }
@@ -641,11 +640,9 @@ detect_error:
  *  \param stream ptr-to-ptr to stream object. Might change if flow dir is
  *                reversed.
  */
-int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
-                          Packet *p, Flow *f,
-                          TcpSession *ssn, TcpStream **stream,
-                          uint8_t *data, uint32_t data_len,
-                          uint8_t flags)
+int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx, Packet *p, Flow *f,
+        TcpSession *ssn, TcpStream **stream, uint8_t *data, uint32_t data_len, uint8_t flags,
+        enum StreamUpdateDir dir)
 {
     SCEnter();
 
@@ -691,7 +688,7 @@ int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
         r = AppLayerParserParse(tv, app_tctx->alp_tctx, f, f->alproto,
                 flags, data, data_len);
         PACKET_PROFILING_APP_END(app_tctx, f->alproto);
-        p->flags |= PKT_APPLAYER_UPDATE;
+        p->app_update_direction = (uint8_t)dir;
         /* ignore parser result for gap */
         StreamTcpUpdateAppLayerProgress(ssn, direction, data_len);
         if (r < 0) {
@@ -709,8 +706,8 @@ int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
     if (alproto == ALPROTO_UNKNOWN && (flags & STREAM_START)) {
         DEBUG_VALIDATE_BUG_ON(FlowChangeProto(f));
         /* run protocol detection */
-        if (TCPProtoDetect(tv, ra_ctx, app_tctx, p, f, ssn, stream,
-                           data, data_len, flags) != 0) {
+        if (TCPProtoDetect(tv, ra_ctx, app_tctx, p, f, ssn, stream, data, data_len, flags, dir) !=
+                0) {
             goto failure;
         }
     } else if (alproto != ALPROTO_UNKNOWN && FlowChangeProto(f)) {
@@ -722,7 +719,8 @@ int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
         StreamTcpResetStreamFlagAppProtoDetectionCompleted(&ssn->client);
         StreamTcpResetStreamFlagAppProtoDetectionCompleted(&ssn->server);
         /* rerun protocol detection */
-        int rd = TCPProtoDetect(tv, ra_ctx, app_tctx, p, f, ssn, stream, data, data_len, flags);
+        int rd =
+                TCPProtoDetect(tv, ra_ctx, app_tctx, p, f, ssn, stream, data, data_len, flags, dir);
         if (f->alproto == ALPROTO_UNKNOWN) {
             DEBUG_VALIDATE_BUG_ON(alstate_orig != f->alstate);
             // not enough data, revert AppLayerProtoDetectReset to rerun detection
@@ -775,7 +773,7 @@ int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
             r = AppLayerParserParse(tv, app_tctx->alp_tctx, f, f->alproto,
                                     flags, data, data_len);
             PACKET_PROFILING_APP_END(app_tctx, f->alproto);
-            p->flags |= PKT_APPLAYER_UPDATE;
+            p->app_update_direction = (uint8_t)dir;
             if (r != 1) {
                 StreamTcpUpdateAppLayerProgress(ssn, direction, data_len);
                 if (r < 0) {
@@ -900,7 +898,7 @@ int AppLayerHandleUdp(ThreadVars *tv, AppLayerThreadCtx *tctx, Packet *p, Flow *
             r = AppLayerParserParse(tv, tctx->alp_tctx, f, f->alproto,
                                     flags, p->payload, p->payload_len);
             PACKET_PROFILING_APP_END(tctx, f->alproto);
-            p->flags |= PKT_APPLAYER_UPDATE;
+            p->app_update_direction = (uint8_t)UPDATE_DIR_PACKET;
         }
         PACKET_PROFILING_APP_STORE(tctx, p);
         /* we do only inspection in one direction, so flag both
@@ -917,7 +915,7 @@ int AppLayerHandleUdp(ThreadVars *tv, AppLayerThreadCtx *tctx, Packet *p, Flow *
                 flags, p->payload, p->payload_len);
         PACKET_PROFILING_APP_END(tctx, f->alproto);
         PACKET_PROFILING_APP_STORE(tctx, p);
-        p->flags |= PKT_APPLAYER_UPDATE;
+        p->app_update_direction = (uint8_t)UPDATE_DIR_PACKET;
     }
     if (r < 0) {
         ExceptionPolicyApply(p, g_applayerparser_error_policy, PKT_DROP_REASON_APPLAYER_ERROR);
index bb2464f77da3d9debdbb61343c14f12c7c973f28..d08d785d73d1560ebe53ad872a573656af1ccc83 100644 (file)
 /**
  * \brief Handles reassembled tcp stream.
  */
-int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
-                          Packet *p, Flow *f,
-                          TcpSession *ssn, TcpStream **stream,
-                          uint8_t *data, uint32_t data_len,
-                          uint8_t flags);
+int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx, Packet *p, Flow *f,
+        TcpSession *ssn, TcpStream **stream, uint8_t *data, uint32_t data_len, uint8_t flags,
+        enum StreamUpdateDir dir);
 
 /**
  * \brief Handles an udp chunk.
index b39502f54523ce6a245437eb8fed1d83e31fb3ae..c3c2a94c9be7d7436a3f310ba5d140cd5d4a050d 100644 (file)
@@ -459,6 +459,8 @@ typedef struct Packet_
     uint8_t flowflags;
     /* coccinelle: Packet:flowflags:FLOW_PKT_ */
 
+    uint8_t app_update_direction; // enum StreamUpdateDir
+
     /* Pkt Flags */
     uint32_t flags;
 
@@ -1056,9 +1058,6 @@ void DecodeUnregisterCounters(void);
 #define PKT_FIRST_ALERTS BIT_U32(29)
 #define PKT_FIRST_TAG    BIT_U32(30)
 
-/** Packet updated the app-layer. */
-#define PKT_APPLAYER_UPDATE BIT_U32(31)
-
 /** \brief return 1 if the packet is a pseudo packet */
 #define PKT_IS_PSEUDOPKT(p) \
     ((p)->flags & (PKT_PSEUDO_STREAM_END|PKT_PSEUDO_DETECTLOG_FLUSH))
index 99c45acda82e15058af1fbd121c6b6a7144acd0b..a20e053c59c9e5a38432104f3e4147626e35c246 100644 (file)
@@ -512,6 +512,44 @@ static inline void FlowWorkerProcessLocalFlows(ThreadVars *tv, FlowWorkerThreadD
     FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
 }
 
+/** \internal
+ *  \brief apply Packet::app_update_direction to the flow flags
+ */
+static void PacketAppUpdate2FlowFlags(Packet *p)
+{
+    switch ((enum StreamUpdateDir)p->app_update_direction) {
+        case UPDATE_DIR_NONE: // NONE implies pseudo packet
+            break;
+        case UPDATE_DIR_PACKET:
+            if (PKT_IS_TOSERVER(p)) {
+                p->flow->flags |= FLOW_TS_APP_UPDATED;
+                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED set", p->pcap_cnt);
+            } else {
+                p->flow->flags |= FLOW_TC_APP_UPDATED;
+                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED set", p->pcap_cnt);
+            }
+            break;
+        case UPDATE_DIR_BOTH:
+            if (PKT_IS_TOSERVER(p)) {
+                p->flow->flags |= FLOW_TS_APP_UPDATED;
+                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED set", p->pcap_cnt);
+            } else {
+                p->flow->flags |= FLOW_TC_APP_UPDATED;
+                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED set", p->pcap_cnt);
+            }
+            /* fall through */
+        case UPDATE_DIR_OPPOSING:
+            if (PKT_IS_TOSERVER(p)) {
+                p->flow->flags |= FLOW_TC_APP_UPDATED;
+                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED set", p->pcap_cnt);
+            } else {
+                p->flow->flags |= FLOW_TS_APP_UPDATED;
+                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED set", p->pcap_cnt);
+            }
+            break;
+    }
+}
+
 static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
 {
     FlowWorkerThreadData *fw = data;
@@ -567,12 +605,14 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
             }
 
             FlowWorkerStreamTCPUpdate(tv, fw, p, detect_thread, false);
+            PacketAppUpdate2FlowFlags(p);
 
             /* handle the app layer part of the UDP packet payload */
         } else if (p->proto == IPPROTO_UDP && !PacketCheckAction(p, ACTION_DROP)) {
             FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_APPLAYERUDP);
             AppLayerHandleUdp(tv, fw->stream_thread->ra_ctx->app_tctx, p, p->flow);
             FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_APPLAYERUDP);
+            PacketAppUpdate2FlowFlags(p);
         }
     }
 
@@ -609,13 +649,29 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
             FramesPrune(p->flow, p);
         }
 
-        if ((PKT_IS_PSEUDOPKT(p)) || ((p->flags & PKT_APPLAYER_UPDATE) != 0)) {
-            SCLogDebug("pseudo or app update: run cleanup");
-            /* run tx cleanup last */
-            AppLayerParserTransactionsCleanup(p->flow, STREAM_FLAGS_FOR_PACKET(p));
+        if ((PKT_IS_PSEUDOPKT(p)) ||
+                (p->flow->flags & (FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATED))) {
+            if (PKT_IS_TOSERVER(p)) {
+                if (PKT_IS_PSEUDOPKT(p) || (p->flow->flags & (FLOW_TS_APP_UPDATED))) {
+                    AppLayerParserTransactionsCleanup(p->flow, STREAM_TOSERVER);
+                    p->flow->flags &= ~FLOW_TS_APP_UPDATED;
+                }
+            } else {
+                if (PKT_IS_PSEUDOPKT(p) || (p->flow->flags & (FLOW_TC_APP_UPDATED))) {
+                    AppLayerParserTransactionsCleanup(p->flow, STREAM_TOCLIENT);
+                    p->flow->flags &= ~FLOW_TC_APP_UPDATED;
+                }
+            }
+
         } else {
             SCLogDebug("not pseudo, no app update: skip");
         }
+
+        if (p->flow->flags & FLOW_ACTION_DROP) {
+            SCLogDebug("flow drop in place: remove app update flags");
+            p->flow->flags &= ~(FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATED);
+        }
+
         Flow *f = p->flow;
         FlowDeReference(&p->flow);
         FLOWLOCK_UNLOCK(f);
index f6c2eb47fda9896efe8915ade136396d905295f8..0a730e0ea3b8d59254d543a36dadb695203a4626 100644 (file)
@@ -112,6 +112,9 @@ typedef struct AppLayerParserState_ AppLayerParserState;
 /** All packets in this flow should be passed */
 #define FLOW_ACTION_PASS BIT_U32(28)
 
+#define FLOW_TS_APP_UPDATED BIT_U32(29)
+#define FLOW_TC_APP_UPDATED BIT_U32(30)
+
 /* File flags */
 
 #define FLOWFILE_INIT                   0
index d932b9493fbebb9ed20feacde61dfccb3d7bb65d..18a34e78a7342cb404b3945569faccdeff51bd1a 100644 (file)
@@ -339,7 +339,7 @@ static TmEcode OutputTxLog(ThreadVars *tv, Packet *p, void *thread_data)
     DEBUG_VALIDATE_BUG_ON(thread_data == NULL);
     if (p->flow == NULL)
         return TM_ECODE_OK;
-    if (!((PKT_IS_PSEUDOPKT(p)) || (p->flags & PKT_APPLAYER_UPDATE) != 0)) {
+    if (!((PKT_IS_PSEUDOPKT(p)) || p->flow->flags & (FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATED))) {
         SCLogDebug("not pseudo, no app update: skip");
         return TM_ECODE_OK;
     }
index 4bc2c4d89893628116ae2af7b5e0298564585f03..40f3bdfcf385a2277bdc7ea387d4a6739dd04f35 100644 (file)
@@ -96,6 +96,7 @@ void PacketReinit(Packet *p)
     p->proto = 0;
     p->recursion_level = 0;
     PACKET_FREE_EXTDATA(p);
+    p->app_update_direction = 0;
     p->flags = 0;
     p->flowflags = 0;
     p->pkt_src = 0;
index ee8d0680d1333e3c2d17d8b33345748fb3575abd..21d74dd1c3ab542ef6cf407e821f41c80fec7bd0 100644 (file)
@@ -767,7 +767,7 @@ int StreamTcpReassembleHandleSegmentHandleData(ThreadVars *tv, TcpReassemblyThre
         StreamTcpSetEvent(p, STREAM_REASSEMBLY_DEPTH_REACHED);
         /* increment stream depth counter */
         StatsIncr(tv, ra_ctx->counter_tcp_stream_depth);
-        p->flags |= PKT_APPLAYER_UPDATE;
+        p->app_update_direction = UPDATE_DIR_PACKET;
     }
     if (size == 0) {
         SCLogDebug("ssn %p: depth reached, not reassembling", ssn);
@@ -1246,9 +1246,8 @@ static int ReassembleUpdateAppLayer (ThreadVars *tv,
         if (mydata == NULL && mydata_len > 0 && CheckGap(ssn, *stream, p)) {
             SCLogDebug("sending GAP to app-layer (size: %u)", mydata_len);
 
-            int r = AppLayerHandleTCPData(tv, ra_ctx, p, p->flow, ssn, stream,
-                    NULL, mydata_len,
-                    StreamGetAppLayerFlags(ssn, *stream, p)|STREAM_GAP);
+            int r = AppLayerHandleTCPData(tv, ra_ctx, p, p->flow, ssn, stream, NULL, mydata_len,
+                    StreamGetAppLayerFlags(ssn, *stream, p) | STREAM_GAP, dir);
             AppLayerProfilingStore(ra_ctx->app_tctx, p);
 
             StreamTcpSetEvent(p, STREAM_REASSEMBLY_SEQ_GAP);
@@ -1320,8 +1319,8 @@ static int ReassembleUpdateAppLayer (ThreadVars *tv,
 
         SCLogDebug("parser");
         /* update the app-layer */
-        (void)AppLayerHandleTCPData(tv, ra_ctx, p, p->flow, ssn, stream,
-                (uint8_t *)mydata, mydata_len, flags);
+        (void)AppLayerHandleTCPData(
+                tv, ra_ctx, p, p->flow, ssn, stream, (uint8_t *)mydata, mydata_len, flags, dir);
         AppLayerProfilingStore(ra_ctx->app_tctx, p);
         AppLayerFrameDump(p->flow);
         uint64_t new_app_progress = STREAM_APP_PROGRESS(*stream);
@@ -1374,9 +1373,8 @@ int StreamTcpReassembleAppLayer (ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
         if (ssn->state >= TCP_CLOSING || (p->flags & PKT_PSEUDO_STREAM_END)) {
             SCLogDebug("sending empty eof message");
             /* send EOF to app layer */
-            AppLayerHandleTCPData(tv, ra_ctx, p, p->flow, ssn, &stream,
-                                  NULL, 0,
-                                  StreamGetAppLayerFlags(ssn, stream, p));
+            AppLayerHandleTCPData(tv, ra_ctx, p, p->flow, ssn, &stream, NULL, 0,
+                    StreamGetAppLayerFlags(ssn, stream, p), dir);
             AppLayerProfilingStore(ra_ctx->app_tctx, p);
 
             SCReturnInt(0);
index 0f1e0fcc9434dcf4a753f18c809385a88a433e4a..6f761fc0b4e4aaa73238097d34314c68278819b5 100644 (file)
@@ -51,6 +51,7 @@ enum
 };
 
 enum StreamUpdateDir {
+    UPDATE_DIR_NONE = 0,
     UPDATE_DIR_PACKET,
     UPDATE_DIR_OPPOSING,
     UPDATE_DIR_BOTH,