From: Victor Julien Date: Thu, 8 Jun 2023 16:37:44 +0000 (+0200) Subject: app-layer: improve/fix updates logic X-Git-Tag: suricata-7.0.0-rc2~34 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c90f67ac55bd2bae6efabb2dcde7677336fa1773;p=thirdparty%2Fsuricata.git app-layer: improve/fix updates logic In 23323a961fac ("app-layer: reduce app cleanup and output-tx calls"), flag was set per packet updating the app-layer state. However this was missing a common pattern: in IDS mode most updates are done in the opposite direction of the traffic due to updates getting triggered by ACK's. This meant that file store processing might not happen for a long time, or at all. Also, app layer cleanup might not be called, which includes file pruning. This patch sets per flow set of flags to indicate app layer is (potentially) updated. It sets this per direction, based on how the parsers were invoked. If an ACK triggers an app update, the flow is tagged for the opposite direction and the next packet in that direction triggers output and cleanup. Fixes: 23323a961fac ("app-layer: reduce app cleanup and output-tx calls") Bug: #6120. --- diff --git a/src/app-layer.c b/src/app-layer.c index 6d4ac2ea8f..67b8807314 100644 --- a/src/app-layer.c +++ b/src/app-layer.c @@ -338,10 +338,9 @@ extern enum ExceptionPolicy g_applayerparser_error_policy; * \retval int -1 error * \retval int 0 ok */ -static int TCPProtoDetect(ThreadVars *tv, - TcpReassemblyThreadCtx *ra_ctx, AppLayerThreadCtx *app_tctx, - Packet *p, Flow *f, TcpSession *ssn, TcpStream **stream, - uint8_t *data, uint32_t data_len, uint8_t flags) +static int TCPProtoDetect(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx, + AppLayerThreadCtx *app_tctx, Packet *p, Flow *f, TcpSession *ssn, TcpStream **stream, + uint8_t *data, uint32_t data_len, uint8_t flags, enum StreamUpdateDir dir) { AppProto *alproto; AppProto *alproto_otherdir; @@ -507,7 +506,7 @@ static int TCPProtoDetect(ThreadVars *tv, int r = AppLayerParserParse(tv, app_tctx->alp_tctx, f, f->alproto, flags, data, data_len); PACKET_PROFILING_APP_END(app_tctx, f->alproto); - p->flags |= PKT_APPLAYER_UPDATE; + p->app_update_direction = (uint8_t)dir; if (r != 1) { StreamTcpUpdateAppLayerProgress(ssn, direction, data_len); } @@ -581,7 +580,7 @@ static int TCPProtoDetect(ThreadVars *tv, f->alproto, flags, data, data_len); PACKET_PROFILING_APP_END(app_tctx, f->alproto); - p->flags |= PKT_APPLAYER_UPDATE; + p->app_update_direction = (uint8_t)dir; if (r != 1) { StreamTcpUpdateAppLayerProgress(ssn, direction, data_len); } @@ -641,11 +640,9 @@ detect_error: * \param stream ptr-to-ptr to stream object. Might change if flow dir is * reversed. */ -int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx, - Packet *p, Flow *f, - TcpSession *ssn, TcpStream **stream, - uint8_t *data, uint32_t data_len, - uint8_t flags) +int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx, Packet *p, Flow *f, + TcpSession *ssn, TcpStream **stream, uint8_t *data, uint32_t data_len, uint8_t flags, + enum StreamUpdateDir dir) { SCEnter(); @@ -691,7 +688,7 @@ int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx, r = AppLayerParserParse(tv, app_tctx->alp_tctx, f, f->alproto, flags, data, data_len); PACKET_PROFILING_APP_END(app_tctx, f->alproto); - p->flags |= PKT_APPLAYER_UPDATE; + p->app_update_direction = (uint8_t)dir; /* ignore parser result for gap */ StreamTcpUpdateAppLayerProgress(ssn, direction, data_len); if (r < 0) { @@ -709,8 +706,8 @@ int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx, if (alproto == ALPROTO_UNKNOWN && (flags & STREAM_START)) { DEBUG_VALIDATE_BUG_ON(FlowChangeProto(f)); /* run protocol detection */ - if (TCPProtoDetect(tv, ra_ctx, app_tctx, p, f, ssn, stream, - data, data_len, flags) != 0) { + if (TCPProtoDetect(tv, ra_ctx, app_tctx, p, f, ssn, stream, data, data_len, flags, dir) != + 0) { goto failure; } } else if (alproto != ALPROTO_UNKNOWN && FlowChangeProto(f)) { @@ -722,7 +719,8 @@ int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx, StreamTcpResetStreamFlagAppProtoDetectionCompleted(&ssn->client); StreamTcpResetStreamFlagAppProtoDetectionCompleted(&ssn->server); /* rerun protocol detection */ - int rd = TCPProtoDetect(tv, ra_ctx, app_tctx, p, f, ssn, stream, data, data_len, flags); + int rd = + TCPProtoDetect(tv, ra_ctx, app_tctx, p, f, ssn, stream, data, data_len, flags, dir); if (f->alproto == ALPROTO_UNKNOWN) { DEBUG_VALIDATE_BUG_ON(alstate_orig != f->alstate); // not enough data, revert AppLayerProtoDetectReset to rerun detection @@ -775,7 +773,7 @@ int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx, r = AppLayerParserParse(tv, app_tctx->alp_tctx, f, f->alproto, flags, data, data_len); PACKET_PROFILING_APP_END(app_tctx, f->alproto); - p->flags |= PKT_APPLAYER_UPDATE; + p->app_update_direction = (uint8_t)dir; if (r != 1) { StreamTcpUpdateAppLayerProgress(ssn, direction, data_len); if (r < 0) { @@ -900,7 +898,7 @@ int AppLayerHandleUdp(ThreadVars *tv, AppLayerThreadCtx *tctx, Packet *p, Flow * r = AppLayerParserParse(tv, tctx->alp_tctx, f, f->alproto, flags, p->payload, p->payload_len); PACKET_PROFILING_APP_END(tctx, f->alproto); - p->flags |= PKT_APPLAYER_UPDATE; + p->app_update_direction = (uint8_t)UPDATE_DIR_PACKET; } PACKET_PROFILING_APP_STORE(tctx, p); /* we do only inspection in one direction, so flag both @@ -917,7 +915,7 @@ int AppLayerHandleUdp(ThreadVars *tv, AppLayerThreadCtx *tctx, Packet *p, Flow * flags, p->payload, p->payload_len); PACKET_PROFILING_APP_END(tctx, f->alproto); PACKET_PROFILING_APP_STORE(tctx, p); - p->flags |= PKT_APPLAYER_UPDATE; + p->app_update_direction = (uint8_t)UPDATE_DIR_PACKET; } if (r < 0) { ExceptionPolicyApply(p, g_applayerparser_error_policy, PKT_DROP_REASON_APPLAYER_ERROR); diff --git a/src/app-layer.h b/src/app-layer.h index bb2464f77d..d08d785d73 100644 --- a/src/app-layer.h +++ b/src/app-layer.h @@ -41,11 +41,9 @@ /** * \brief Handles reassembled tcp stream. */ -int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx, - Packet *p, Flow *f, - TcpSession *ssn, TcpStream **stream, - uint8_t *data, uint32_t data_len, - uint8_t flags); +int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx, Packet *p, Flow *f, + TcpSession *ssn, TcpStream **stream, uint8_t *data, uint32_t data_len, uint8_t flags, + enum StreamUpdateDir dir); /** * \brief Handles an udp chunk. diff --git a/src/decode.h b/src/decode.h index b39502f545..c3c2a94c9b 100644 --- a/src/decode.h +++ b/src/decode.h @@ -459,6 +459,8 @@ typedef struct Packet_ uint8_t flowflags; /* coccinelle: Packet:flowflags:FLOW_PKT_ */ + uint8_t app_update_direction; // enum StreamUpdateDir + /* Pkt Flags */ uint32_t flags; @@ -1056,9 +1058,6 @@ void DecodeUnregisterCounters(void); #define PKT_FIRST_ALERTS BIT_U32(29) #define PKT_FIRST_TAG BIT_U32(30) -/** Packet updated the app-layer. */ -#define PKT_APPLAYER_UPDATE BIT_U32(31) - /** \brief return 1 if the packet is a pseudo packet */ #define PKT_IS_PSEUDOPKT(p) \ ((p)->flags & (PKT_PSEUDO_STREAM_END|PKT_PSEUDO_DETECTLOG_FLUSH)) diff --git a/src/flow-worker.c b/src/flow-worker.c index 99c45acda8..a20e053c59 100644 --- a/src/flow-worker.c +++ b/src/flow-worker.c @@ -512,6 +512,44 @@ static inline void FlowWorkerProcessLocalFlows(ThreadVars *tv, FlowWorkerThreadD FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_EVICTED); } +/** \internal + * \brief apply Packet::app_update_direction to the flow flags + */ +static void PacketAppUpdate2FlowFlags(Packet *p) +{ + switch ((enum StreamUpdateDir)p->app_update_direction) { + case UPDATE_DIR_NONE: // NONE implies pseudo packet + break; + case UPDATE_DIR_PACKET: + if (PKT_IS_TOSERVER(p)) { + p->flow->flags |= FLOW_TS_APP_UPDATED; + SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED set", p->pcap_cnt); + } else { + p->flow->flags |= FLOW_TC_APP_UPDATED; + SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED set", p->pcap_cnt); + } + break; + case UPDATE_DIR_BOTH: + if (PKT_IS_TOSERVER(p)) { + p->flow->flags |= FLOW_TS_APP_UPDATED; + SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED set", p->pcap_cnt); + } else { + p->flow->flags |= FLOW_TC_APP_UPDATED; + SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED set", p->pcap_cnt); + } + /* fall through */ + case UPDATE_DIR_OPPOSING: + if (PKT_IS_TOSERVER(p)) { + p->flow->flags |= FLOW_TC_APP_UPDATED; + SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED set", p->pcap_cnt); + } else { + p->flow->flags |= FLOW_TS_APP_UPDATED; + SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED set", p->pcap_cnt); + } + break; + } +} + static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data) { FlowWorkerThreadData *fw = data; @@ -567,12 +605,14 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data) } FlowWorkerStreamTCPUpdate(tv, fw, p, detect_thread, false); + PacketAppUpdate2FlowFlags(p); /* handle the app layer part of the UDP packet payload */ } else if (p->proto == IPPROTO_UDP && !PacketCheckAction(p, ACTION_DROP)) { FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_APPLAYERUDP); AppLayerHandleUdp(tv, fw->stream_thread->ra_ctx->app_tctx, p, p->flow); FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_APPLAYERUDP); + PacketAppUpdate2FlowFlags(p); } } @@ -609,13 +649,29 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data) FramesPrune(p->flow, p); } - if ((PKT_IS_PSEUDOPKT(p)) || ((p->flags & PKT_APPLAYER_UPDATE) != 0)) { - SCLogDebug("pseudo or app update: run cleanup"); - /* run tx cleanup last */ - AppLayerParserTransactionsCleanup(p->flow, STREAM_FLAGS_FOR_PACKET(p)); + if ((PKT_IS_PSEUDOPKT(p)) || + (p->flow->flags & (FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATED))) { + if (PKT_IS_TOSERVER(p)) { + if (PKT_IS_PSEUDOPKT(p) || (p->flow->flags & (FLOW_TS_APP_UPDATED))) { + AppLayerParserTransactionsCleanup(p->flow, STREAM_TOSERVER); + p->flow->flags &= ~FLOW_TS_APP_UPDATED; + } + } else { + if (PKT_IS_PSEUDOPKT(p) || (p->flow->flags & (FLOW_TC_APP_UPDATED))) { + AppLayerParserTransactionsCleanup(p->flow, STREAM_TOCLIENT); + p->flow->flags &= ~FLOW_TC_APP_UPDATED; + } + } + } else { SCLogDebug("not pseudo, no app update: skip"); } + + if (p->flow->flags & FLOW_ACTION_DROP) { + SCLogDebug("flow drop in place: remove app update flags"); + p->flow->flags &= ~(FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATED); + } + Flow *f = p->flow; FlowDeReference(&p->flow); FLOWLOCK_UNLOCK(f); diff --git a/src/flow.h b/src/flow.h index f6c2eb47fd..0a730e0ea3 100644 --- a/src/flow.h +++ b/src/flow.h @@ -112,6 +112,9 @@ typedef struct AppLayerParserState_ AppLayerParserState; /** All packets in this flow should be passed */ #define FLOW_ACTION_PASS BIT_U32(28) +#define FLOW_TS_APP_UPDATED BIT_U32(29) +#define FLOW_TC_APP_UPDATED BIT_U32(30) + /* File flags */ #define FLOWFILE_INIT 0 diff --git a/src/output-tx.c b/src/output-tx.c index d932b9493f..18a34e78a7 100644 --- a/src/output-tx.c +++ b/src/output-tx.c @@ -339,7 +339,7 @@ static TmEcode OutputTxLog(ThreadVars *tv, Packet *p, void *thread_data) DEBUG_VALIDATE_BUG_ON(thread_data == NULL); if (p->flow == NULL) return TM_ECODE_OK; - if (!((PKT_IS_PSEUDOPKT(p)) || (p->flags & PKT_APPLAYER_UPDATE) != 0)) { + if (!((PKT_IS_PSEUDOPKT(p)) || p->flow->flags & (FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATED))) { SCLogDebug("not pseudo, no app update: skip"); return TM_ECODE_OK; } diff --git a/src/packet.c b/src/packet.c index 4bc2c4d898..40f3bdfcf3 100644 --- a/src/packet.c +++ b/src/packet.c @@ -96,6 +96,7 @@ void PacketReinit(Packet *p) p->proto = 0; p->recursion_level = 0; PACKET_FREE_EXTDATA(p); + p->app_update_direction = 0; p->flags = 0; p->flowflags = 0; p->pkt_src = 0; diff --git a/src/stream-tcp-reassemble.c b/src/stream-tcp-reassemble.c index ee8d0680d1..21d74dd1c3 100644 --- a/src/stream-tcp-reassemble.c +++ b/src/stream-tcp-reassemble.c @@ -767,7 +767,7 @@ int StreamTcpReassembleHandleSegmentHandleData(ThreadVars *tv, TcpReassemblyThre StreamTcpSetEvent(p, STREAM_REASSEMBLY_DEPTH_REACHED); /* increment stream depth counter */ StatsIncr(tv, ra_ctx->counter_tcp_stream_depth); - p->flags |= PKT_APPLAYER_UPDATE; + p->app_update_direction = UPDATE_DIR_PACKET; } if (size == 0) { SCLogDebug("ssn %p: depth reached, not reassembling", ssn); @@ -1246,9 +1246,8 @@ static int ReassembleUpdateAppLayer (ThreadVars *tv, if (mydata == NULL && mydata_len > 0 && CheckGap(ssn, *stream, p)) { SCLogDebug("sending GAP to app-layer (size: %u)", mydata_len); - int r = AppLayerHandleTCPData(tv, ra_ctx, p, p->flow, ssn, stream, - NULL, mydata_len, - StreamGetAppLayerFlags(ssn, *stream, p)|STREAM_GAP); + int r = AppLayerHandleTCPData(tv, ra_ctx, p, p->flow, ssn, stream, NULL, mydata_len, + StreamGetAppLayerFlags(ssn, *stream, p) | STREAM_GAP, dir); AppLayerProfilingStore(ra_ctx->app_tctx, p); StreamTcpSetEvent(p, STREAM_REASSEMBLY_SEQ_GAP); @@ -1320,8 +1319,8 @@ static int ReassembleUpdateAppLayer (ThreadVars *tv, SCLogDebug("parser"); /* update the app-layer */ - (void)AppLayerHandleTCPData(tv, ra_ctx, p, p->flow, ssn, stream, - (uint8_t *)mydata, mydata_len, flags); + (void)AppLayerHandleTCPData( + tv, ra_ctx, p, p->flow, ssn, stream, (uint8_t *)mydata, mydata_len, flags, dir); AppLayerProfilingStore(ra_ctx->app_tctx, p); AppLayerFrameDump(p->flow); uint64_t new_app_progress = STREAM_APP_PROGRESS(*stream); @@ -1374,9 +1373,8 @@ int StreamTcpReassembleAppLayer (ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx, if (ssn->state >= TCP_CLOSING || (p->flags & PKT_PSEUDO_STREAM_END)) { SCLogDebug("sending empty eof message"); /* send EOF to app layer */ - AppLayerHandleTCPData(tv, ra_ctx, p, p->flow, ssn, &stream, - NULL, 0, - StreamGetAppLayerFlags(ssn, stream, p)); + AppLayerHandleTCPData(tv, ra_ctx, p, p->flow, ssn, &stream, NULL, 0, + StreamGetAppLayerFlags(ssn, stream, p), dir); AppLayerProfilingStore(ra_ctx->app_tctx, p); SCReturnInt(0); diff --git a/src/stream-tcp-reassemble.h b/src/stream-tcp-reassemble.h index 0f1e0fcc94..6f761fc0b4 100644 --- a/src/stream-tcp-reassemble.h +++ b/src/stream-tcp-reassemble.h @@ -51,6 +51,7 @@ enum }; enum StreamUpdateDir { + UPDATE_DIR_NONE = 0, UPDATE_DIR_PACKET, UPDATE_DIR_OPPOSING, UPDATE_DIR_BOTH,