]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
flow: prepare flow forced reuse logging
authorVictor Julien <victor@inliniac.net>
Fri, 9 May 2014 12:37:07 +0000 (14:37 +0200)
committerVictor Julien <victor@inliniac.net>
Mon, 28 Jul 2014 13:47:44 +0000 (15:47 +0200)
Most flows are marked for clean up by the flow manager, which then
passes them to the recycler. The recycler logs and cleans up. However,
under resource stress conditions, the packet threads can recycle
existing flow directly. So here the recycler has no role to play, as
the flow is immediately used.

For this reason, the packet threads need to be able to invoke the
flow logger directly.

The flow logging thread ctx will stored in the DecodeThreadVars
stucture. Therefore, this patch makes the DecodeThreadVars an argument
to FlowHandlePacket.

src/decode-icmpv4.c
src/decode-icmpv6.c
src/decode-sctp.c
src/decode-tcp.c
src/decode-udp.c
src/decode.h
src/flow-hash.c
src/flow-hash.h
src/flow.c
src/flow.h
src/util-unittest-helper.c

index 8b4402e6ff6957f78f1ed77f711057164011f27c..6493550d99eec9a9a21fe6497eb624a67dd56ded 100644 (file)
@@ -192,7 +192,7 @@ int DecodeICMPV4(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt,
 
                     /* ICMP ICMP_DEST_UNREACH influence TCP/UDP flows */
                     if (ICMPV4_DEST_UNREACH_IS_VALID(p)) {
-                        FlowHandlePacket(tv, p);
+                        FlowHandlePacket(tv, dtv, p);
                     }
                 }
             }
index 7819f254982d67d0f04c1116986541dbcd045195..0a58db54889aea85776117f75ee8c68a34706536 100644 (file)
@@ -337,7 +337,7 @@ int DecodeICMPV6(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p,
 #endif
 
     /* Flow is an integral part of us */
-    FlowHandlePacket(tv, p);
+    FlowHandlePacket(tv, dtv, p);
 
     return TM_ECODE_OK;
 }
index 178ed8bd699d8c19a45be7db95f8d51353a101be..6fd8be8d616d74c69c0fcf1b9713044d5541ffcf 100644 (file)
@@ -74,7 +74,7 @@ int DecodeSCTP(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt, u
 #endif
 
     /* Flow is an integral part of us */
-    FlowHandlePacket(tv, p);
+    FlowHandlePacket(tv, dtv, p);
 
     return TM_ECODE_OK;
 }
index 11ba600f33fdf93c1fba93c57e0f1efdf5d8c5be..47c066160d0bebe19236431ece51f7110bf783f5 100644 (file)
@@ -203,7 +203,7 @@ int DecodeTCP(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt, ui
 #endif
 
     /* Flow is an integral part of us */
-    FlowHandlePacket(tv, p);
+    FlowHandlePacket(tv, dtv, p);
 
     return TM_ECODE_OK;
 }
index e1d52082e3f98f8067531ece515d9d61a754b1bb..5b83768f9e6f93976f35f62e7a536c89a6a75fcd 100644 (file)
@@ -85,12 +85,12 @@ int DecodeUDP(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt, ui
     if (unlikely(DecodeTeredo(tv, dtv, p, p->payload, p->payload_len, pq) == TM_ECODE_OK)) {
         /* Here we have a Teredo packet and don't need to handle app
          * layer */
-        FlowHandlePacket(tv, p);
+        FlowHandlePacket(tv, dtv, p);
         return TM_ECODE_OK;
     }
 
     /* Flow is an integral part of us */
-    FlowHandlePacket(tv, p);
+    FlowHandlePacket(tv, dtv, p);
 
     /* handle the app layer part of the UDP packet payload */
     if (unlikely(p->flow != NULL)) {
index fa48f4ac896ee163fdbfdb4f54d92851a68163e0..dd60e788761cc7188b128c755c5f9e2f7e90a9a8 100644 (file)
@@ -573,6 +573,9 @@ typedef struct DecodeThreadVars_
 
     int vlan_disabled;
 
+    /* thread data for flow logging api */
+    void *output_flow_thread_data;
+
     /** stats/counters */
     uint16_t counter_pkts;
     uint16_t counter_bytes;
index 52bbd8ebcd1485eae090508e92b0345cbc3797d8..607e00c5329b8895d64286b2b271983db4f6bf5c 100644 (file)
 
 #include "util-hash-lookup3.h"
 
+#include "conf.h"
+#include "output.h"
+#include "output-flow.h"
+
 #define FLOW_DEFAULT_FLOW_PRUNE 5
 
 SC_ATOMIC_EXTERN(unsigned int, flow_prune_idx);
 SC_ATOMIC_EXTERN(unsigned int, flow_flags);
 
-static Flow *FlowGetUsedFlow(void);
+static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv);
 
 #ifdef FLOW_DEBUG_STATS
 #define FLOW_DEBUG_STATS_PROTO_ALL      0
@@ -422,9 +426,12 @@ static inline int FlowCreateCheck(const Packet *p)
  *  Get a new flow. We're checking memcap first and will try to make room
  *  if the memcap is reached.
  *
+ *  \param tv thread vars
+ *  \param dtv decode thread vars (for flow log api thread data)
+ *
  *  \retval f *LOCKED* flow on succes, NULL on error.
  */
-static Flow *FlowGetNew(const Packet *p)
+static Flow *FlowGetNew(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p)
 {
     Flow *f = NULL;
 
@@ -447,7 +454,7 @@ static Flow *FlowGetNew(const Packet *p)
                 FlowWakeupFlowManagerThread();
             }
 
-            f = FlowGetUsedFlow();
+            f = FlowGetUsedFlow(tv, dtv);
             if (f == NULL) {
                 /* very rare, but we can fail. Just giving up */
                 return NULL;
@@ -473,7 +480,7 @@ static Flow *FlowGetNew(const Packet *p)
     return f;
 }
 
-/* FlowGetFlowFromHash
+/** \brief Get Flow for packet
  *
  * Hash retrieval function for flows. Looks up the hash bucket containing the
  * flow pointer. Then compares the packet with the found flow to see if it is
@@ -485,9 +492,12 @@ static Flow *FlowGetNew(const Packet *p)
  *
  * The p->flow pointer is updated to point to the flow.
  *
- * returns a *LOCKED* flow or NULL
+ *  \param tv thread vars
+ *  \param dtv decode thread vars (for flow log api thread data)
+ *
+ *  \retval f *LOCKED* flow or NULL
  */
-Flow *FlowGetFlowFromHash(const Packet *p)
+Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p)
 {
     Flow *f = NULL;
     FlowHashCountInit;
@@ -504,7 +514,7 @@ Flow *FlowGetFlowFromHash(const Packet *p)
 
     /* see if the bucket already has a flow */
     if (fb->head == NULL) {
-        f = FlowGetNew(p);
+        f = FlowGetNew(tv, dtv, p);
         if (f == NULL) {
             FBLOCK_UNLOCK(fb);
             FlowHashCountUpdate;
@@ -538,7 +548,7 @@ Flow *FlowGetFlowFromHash(const Packet *p)
             f = f->hnext;
 
             if (f == NULL) {
-                f = pf->hnext = FlowGetNew(p);
+                f = pf->hnext = FlowGetNew(tv, dtv, p);
                 if (f == NULL) {
                     FBLOCK_UNLOCK(fb);
                     FlowHashCountUpdate;
@@ -603,9 +613,12 @@ Flow *FlowGetFlowFromHash(const Packet *p)
  *  top each time since that would clear the top of the hash leading to longer
  *  and longer search times under high pressure (observed).
  *
+ *  \param tv thread vars
+ *  \param dtv decode thread vars (for flow log api thread data)
+ *
  *  \retval f flow or NULL
  */
-static Flow *FlowGetUsedFlow(void)
+static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv)
 {
     uint32_t idx = SC_ATOMIC_GET(flow_prune_idx) % flow_config.hash_size;
     uint32_t cnt = flow_config.hash_size;
@@ -653,6 +666,10 @@ static Flow *FlowGetUsedFlow(void)
         f->fb = NULL;
         FBLOCK_UNLOCK(fb);
 
+        /* invoke flow log api */
+        if (dtv && dtv->output_flow_thread_data)
+            (void)OutputFlowLog(tv, dtv->output_flow_thread_data, f);
+
         FlowClearMemory(f, f->protomap);
 
         FLOWLOCK_UNLOCK(f);
index 0de1a64346902d6954cd74d0f96258e2058cd7af..a5635b06ce19e64e7cd64dc0813582ab0313af9e 100644 (file)
@@ -68,7 +68,7 @@ typedef struct FlowBucket_ {
 
 /* prototypes */
 
-Flow *FlowGetFlowFromHash(const Packet *);
+Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *);
 
 /** enable to print stats on hash lookups in flow-debug.log */
 //#define FLOW_DEBUG_STATS
index 3ba14d7464f0d20126567ba26743785a05e352d6..516e684e946f1400a16d0d1751dea0980e49fc8b 100644 (file)
@@ -232,14 +232,15 @@ static inline int FlowUpdateSeenFlag(const Packet *p)
  * This is called for every packet.
  *
  *  \param tv threadvars
+ *  \param dtv decode thread vars (for flow output api thread data)
  *  \param p packet to handle flow for
  */
-void FlowHandlePacket(ThreadVars *tv, Packet *p)
+void FlowHandlePacket(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p)
 {
     /* Get this packet's flow from the hash. FlowHandlePacket() will setup
      * a new flow if nescesary. If we get NULL, we're out of flow memory.
      * The returned flow is locked. */
-    Flow *f = FlowGetFlowFromHash(p);
+    Flow *f = FlowGetFlowFromHash(tv, dtv, p);
     if (f == NULL)
         return;
 
index d1896d2019df9dec5202d484d2b459776ebbf601..1cec317603f02ad1d4d09d445e1e155e341161ad 100644 (file)
@@ -401,7 +401,7 @@ typedef struct FlowProto_ {
     int (*GetProtoState)(void *);
 } FlowProto;
 
-void FlowHandlePacket (ThreadVars *, Packet *);
+void FlowHandlePacket (ThreadVars *, DecodeThreadVars *, Packet *);
 void FlowInitConfig (char);
 void FlowPrintQueueInfo (void);
 void FlowShutdown(void);
index bac2083e21615891105960292c63d0e99ff83e35..f2c96b08e3fc4c20c20d04ea69211ef31f4f9962 100644 (file)
@@ -831,7 +831,7 @@ uint32_t UTHBuildPacketOfFlows(uint32_t start, uint32_t end, uint8_t dir) {
             p->src.addr_data32[0] = i + 1;
             p->dst.addr_data32[0] = i;
         }
-        FlowHandlePacket(NULL, p);
+        FlowHandlePacket(NULL, NULL, p);
         if (p->flow != NULL)
             SC_ATOMIC_RESET(p->flow->use_cnt);