* If the protocol is yet unknown, the proto detection code is run first.
*
* \param dp_ctx Thread app layer detect context
- * \param f unlocked flow
+ * \param f *locked* flow
* \param p UDP packet
*
* \retval 0 ok
int r = 0;
- FLOWLOCK_WRLOCK(f);
-
uint8_t flags = 0;
if (p->flowflags & FLOW_PKT_TOSERVER) {
flags |= STREAM_TOSERVER;
}
}
- FLOWLOCK_UNLOCK(f);
PACKET_PROFILING_APP_STORE(tctx, p);
SCReturnInt(r);
{
/* ICMP ICMP_DEST_UNREACH influence TCP/UDP flows */
if (ICMPV4_DEST_UNREACH_IS_VALID(p)) {
- FlowHandlePacket(tv, dtv, p);
+ FlowSetupPacket(p);
}
}
}
SCLogDebug("Unknown Type, ICMPV6_UNKNOWN_TYPE");
#endif
- /* Flow is an integral part of us */
- FlowHandlePacket(tv, dtv, p);
+ FlowSetupPacket(p);
return TM_ECODE_OK;
}
0x80, 0x00, 0xb1, 0xa3, 0x00, 0x00
};
- Flow *f = NULL;
Packet *p = PacketGetFromAlloc();
if (unlikely(p == NULL))
return 0;
result = 0;
goto end;
}
- if (p->flow == NULL) {
+ if (!(p->flags & PKT_WANTS_FLOW)) {
printf("packet flow shouldn't be NULL\n");
result = 0;
goto end;
}
- f = p->flow;
PACKET_RECYCLE(p);
PacketCopyData(p, pkt1, sizeof(pkt1));
result = 0;
goto end;
}
- if (tp->flow == NULL) {
+ if (!(tp->flags & PKT_WANTS_FLOW)) {
result = 0;
goto end;
}
- if (tp->flow != f) {
+ if (tp->flow_hash != p->flow_hash) {
result = 0;
goto end;
}
SCTP_GET_SRC_PORT(p), SCTP_GET_DST_PORT(p));
#endif
- /* Flow is an integral part of us */
- FlowHandlePacket(tv, dtv, p);
+ FlowSetupPacket(p);
return TM_ECODE_OK;
}
TCP_HAS_MSS(p) ? "MSS " : "");
#endif
- /* Flow is an integral part of us */
- FlowHandlePacket(tv, dtv, p);
+ FlowSetupPacket(p);
return TM_ECODE_OK;
}
if (unlikely(DecodeTeredo(tv, dtv, p, p->payload, p->payload_len, pq) == TM_ECODE_OK)) {
/* Here we have a Teredo packet and don't need to handle app
* layer */
- FlowHandlePacket(tv, dtv, p);
+ FlowSetupPacket(p);
return TM_ECODE_OK;
}
- /* Flow is an integral part of us */
- FlowHandlePacket(tv, dtv, p);
-
- /* handle the app layer part of the UDP packet payload */
- if (unlikely(p->flow != NULL)) {
- AppLayerHandleUdp(tv, dtv->app_tctx, p, p->flow);
- }
+ FlowSetupPacket(p);
return TM_ECODE_OK;
}
struct Flow_ *flow;
+ /* raw hash value for looking up the flow, will need to modulated to the
+ * hash size still */
+ uint32_t flow_hash;
+
struct timeval ts;
union {
#define PKT_IS_INVALID (1<<20)
#define PKT_PROFILE (1<<21)
+/** indication by decoder that it feels the packet should be handled by
+ * flow engine: Packet::flow_hash will be set */
+#define PKT_WANTS_FLOW (1<<22)
+
/** \brief return 1 if the packet is a pseudo packet */
#define PKT_IS_PSEUDOPKT(p) ((p)->flags & PKT_PSEUDO_STREAM_END)
SC_ATOMIC_EXTERN(unsigned int, flow_flags);
static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv);
-static int handle_tcp_reuse = 1;
-
-void FlowDisableTcpReuseHandling(void)
-{
- handle_tcp_reuse = 0;
-}
/** \brief compare two raw ipv6 addrs
*
return 0;
}
+void FlowSetupPacket(Packet *p)
+{
+ p->flags |= PKT_WANTS_FLOW;
+ p->flow_hash = FlowGetHash(p);
+}
+
int TcpSessionPacketSsnReuse(const Packet *p, const Flow *f, void *tcp_ssn);
static inline int FlowCompare(Flow *f, const Packet *p)
if (f->flags & FLOW_TCP_REUSED)
return 0;
- if (handle_tcp_reuse == 1) {
- /* lets see if we need to consider the existing session reuse */
- if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx) == 1)) {
- /* okay, we need to setup a new flow for this packet.
- * Flag the flow that it's been replaced by a new one */
- f->flags |= FLOW_TCP_REUSED;
- SCLogDebug("flow obsolete: TCP reuse will use a new flow "
- "starting with packet %"PRIu64, p->pcap_cnt);
- return 0;
- }
- }
return 1;
} else {
return CMP_FLOW(f, p);
return f;
}
-Flow *FlowGetFlowFromHashByPacket(const Packet *p, Flow **dest)
+static Flow *TcpReuseReplace(ThreadVars *tv, DecodeThreadVars *dtv,
+ FlowBucket *fb, Flow *old_f,
+ const uint32_t hash, const Packet *p)
{
- Flow *f = NULL;
-
- /* get the key to our bucket */
- uint32_t hash = FlowGetHash(p);
- /* get our hash bucket and lock it */
- FlowBucket *fb = &flow_hash[hash % flow_config.hash_size];
- FBLOCK_LOCK(fb);
+ /* tag flow as reused so future lookups won't find it */
+ old_f->flags |= FLOW_TCP_REUSED;
+ /* get some settings that we move over to the new flow */
+ FlowThreadId thread_id = old_f->thread_id;
+ int16_t autofp_tmqh_flow_qid = SC_ATOMIC_GET(old_f->autofp_tmqh_flow_qid);
- SCLogDebug("fb %p fb->head %p", fb, fb->head);
-
- f = FlowGetNew(NULL, NULL, p);
- if (f != NULL) {
- /* flow is locked */
- if (fb->head == NULL) {
- fb->head = f;
- fb->tail = f;
- } else {
- f->hprev = fb->tail;
- fb->tail->hnext = f;
- fb->tail = f;
- }
-
- /* got one, now lock, initialize and return */
- FlowInit(f, p);
- f->flow_hash = hash;
- f->fb = fb;
- /* update the last seen timestamp of this flow */
- COPY_TIMESTAMP(&p->ts,&f->lastts);
- FlowReference(dest, f);
+ /* since fb lock is still held this flow won't be found until we are done */
+ FLOWLOCK_UNLOCK(old_f);
- }
- FBLOCK_UNLOCK(fb);
- return f;
-}
-
-/** \brief Lookup flow based on packet
- *
- * Find the flow belonging to this packet. If not found, no new flow
- * is set up.
- *
- * \param p packet to lookup the flow for
- *
- * \retval f flow or NULL if not found
- */
-Flow *FlowLookupFlowFromHash(const Packet *p, Flow **dest)
-{
- Flow *f = NULL;
-
- /* get the key to our bucket */
- uint32_t hash = FlowGetHash(p);
- /* get our hash bucket and lock it */
- FlowBucket *fb = &flow_hash[hash % flow_config.hash_size];
- FBLOCK_LOCK(fb);
-
- SCLogDebug("fb %p fb->head %p", fb, fb->head);
-
- /* see if the bucket already has a flow */
- if (fb->head == NULL) {
- FBLOCK_UNLOCK(fb);
+ /* Get a new flow. It will be either a locked flow or NULL */
+ Flow *f = FlowGetNew(tv, dtv, p);
+ if (f == NULL) {
return NULL;
}
- /* ok, we have a flow in the bucket. Let's find out if it is our flow */
- f = fb->head;
-
- /* see if this is the flow we are looking for */
- if (FlowCompare(f, p) == 0) {
- while (f) {
- f = f->hnext;
-
- if (f == NULL) {
- FBLOCK_UNLOCK(fb);
- return NULL;
- }
+ /* flow is locked */
- if (FlowCompare(f, p) != 0) {
- /* we found our flow, lets put it on top of the
- * hash list -- this rewards active flows */
- if (f->hnext) {
- f->hnext->hprev = f->hprev;
- }
- if (f->hprev) {
- f->hprev->hnext = f->hnext;
- }
- if (f == fb->tail) {
- fb->tail = f->hprev;
- }
+ /* put at the start of the list */
+ f->hnext = fb->head;
+ fb->head->hprev = f;
+ fb->head = f;
- f->hnext = fb->head;
- f->hprev = NULL;
- fb->head->hprev = f;
- fb->head = f;
+ /* initialize and return */
+ FlowInit(f, p);
+ f->flow_hash = hash;
+ f->fb = fb;
- /* found our flow, lock & return */
- FLOWLOCK_WRLOCK(f);
- /* update the last seen timestamp of this flow */
- COPY_TIMESTAMP(&p->ts,&f->lastts);
- FlowReference(dest, f);
-
- FBLOCK_UNLOCK(fb);
- return f;
- }
- }
+ f->thread_id = thread_id;
+ if (autofp_tmqh_flow_qid != -1) {
+ SC_ATOMIC_SET(f->autofp_tmqh_flow_qid, autofp_tmqh_flow_qid);
}
-
- /* lock & return */
- FLOWLOCK_WRLOCK(f);
- /* update the last seen timestamp of this flow */
- COPY_TIMESTAMP(&p->ts,&f->lastts);
- FlowReference(dest, f);
-
- FBLOCK_UNLOCK(fb);
return f;
}
{
Flow *f = NULL;
- /* get the key to our bucket */
- uint32_t hash = FlowGetHash(p);
/* get our hash bucket and lock it */
- FlowBucket *fb = &flow_hash[hash % flow_config.hash_size];
+ const uint32_t hash = p->flow_hash;
+ FlowBucket *fb = &flow_hash[hash % flow_config.hash_size];
FBLOCK_LOCK(fb);
SCLogDebug("fb %p fb->head %p", fb, fb->head);
/* found our flow, lock & return */
FLOWLOCK_WRLOCK(f);
+ if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx) == 1)) {
+ f = TcpReuseReplace(tv, dtv, fb, f, hash, p);
+ if (f == NULL) {
+ FBLOCK_UNLOCK(fb);
+ return NULL;
+ }
+ }
+
/* update the last seen timestamp of this flow */
COPY_TIMESTAMP(&p->ts,&f->lastts);
FlowReference(dest, f);
/* lock & return */
FLOWLOCK_WRLOCK(f);
+ if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx) == 1)) {
+ f = TcpReuseReplace(tv, dtv, fb, f, hash, p);
+ if (f == NULL) {
+ FBLOCK_UNLOCK(fb);
+ return NULL;
+ }
+ }
+
/* update the last seen timestamp of this flow */
COPY_TIMESTAMP(&p->ts,&f->lastts);
FlowReference(dest, f);
return 1;
}
-/**
- *
- * Remove packet from flow. This assumes this happens *before* the packet
- * is added to the stream engine and other higher state.
- *
- * \todo we can't restore the lastts
- */
-void FlowHandlePacketUpdateRemove(Flow *f, Packet *p)
-{
- if (p->flowflags & FLOW_PKT_TOSERVER) {
- f->todstpktcnt--;
- f->todstbytecnt -= GET_PKT_LEN(p);
- p->flowflags &= ~(FLOW_PKT_TOSERVER|FLOW_PKT_TOSERVER_FIRST);
- } else {
- f->tosrcpktcnt--;
- f->tosrcbytecnt -= GET_PKT_LEN(p);
- p->flowflags &= ~(FLOW_PKT_TOCLIENT|FLOW_PKT_TOCLIENT_FIRST);
- }
- p->flowflags &= ~FLOW_PKT_ESTABLISHED;
-
- /*set the detection bypass flags*/
- if (f->flags & FLOW_NOPACKET_INSPECTION) {
- SCLogDebug("unsetting FLOW_NOPACKET_INSPECTION flag on flow %p", f);
- DecodeUnsetNoPacketInspectionFlag(p);
- }
- if (f->flags & FLOW_NOPAYLOAD_INSPECTION) {
- SCLogDebug("unsetting FLOW_NOPAYLOAD_INSPECTION flag on flow %p", f);
- DecodeUnsetNoPayloadInspectionFlag(p);
- }
-}
-
/** \brief Update Packet and Flow
*
* Updates packet and flow based on the new packet.
if (f == NULL)
return;
- FlowHandlePacketUpdate(f, p);
-
- FLOWLOCK_UNLOCK(f);
-
/* set the flow in the packet */
p->flags |= PKT_HAS_FLOW;
return;
void (*Freefunc)(void *);
} FlowProto;
+/** \brief prepare packet for a life with flow
+ * Set PKT_WANTS_FLOW flag to incidate workers should do a flow lookup
+ * and calc the hash value to be used in the lookup and autofp flow
+ * balancing. */
+void FlowSetupPacket(Packet *p);
void FlowHandlePacket (ThreadVars *, DecodeThreadVars *, Packet *);
void FlowInitConfig (char);
void FlowPrintQueueInfo (void);
void *FlowGetAppState(const Flow *f);
uint8_t FlowGetDisruptionFlags(const Flow *f, uint8_t flags);
-void FlowHandlePacketUpdateRemove(Flow *f, Packet *p);
void FlowHandlePacketUpdate(Flow *f, Packet *p);
-Flow *FlowGetFlowFromHashByPacket(const Packet *p, Flow **dest);
-Flow *FlowLookupFlowFromHash(const Packet *p, Flow **dest);
-
#endif /* __FLOW_H__ */
int thread;
RunModeInitialize();
- RunmodeSetFlowStreamAsync();
char *file = NULL;
if (ConfGet("erf-file.file", &file) == 0) {
int thread;
RunModeInitialize();
- RunmodeSetFlowStreamAsync();
char *file = NULL;
if (ConfGet("pcap-file.file", &file) == 0) {
return 0;
}
-/** \brief Handle TCP reuse of tuple
- *
- * Logic:
- * 1. see if packet could trigger a new session
- * 2. see if the flow/ssn is in a state where we want to support the reuse
- * 3. disconnect packet from the old flow
- * -> at this point new packets can still find the old flow
- * -> as the flow's reference count != 0, it can't disappear
- * 4. setup a new flow unconditionally
- * 5. attach packet to new flow
- * 6. tag old flow as FLOW_TCP_REUSED
- * -> NEW packets won't find it
- * -> existing packets in our queues may still reference it
- * 7. dereference the old flow (reference cnt *may* now be 0,
- * if no other packets reference it)
- *
- * The packets that still hold a reference to the old flow are updated
- * by HandleFlowReuseApplyToPacket()
- */
-static void TcpSessionReuseHandle(Packet *p) {
- if (likely(TcpSessionPacketIsStreamStarter(p) == 0))
- return;
-
- int reuse = 0;
- FLOWLOCK_RDLOCK(p->flow);
- reuse = TcpSessionReuseDoneEnough(p, p->flow, p->flow->protoctx);
- if (!reuse) {
- SCLogDebug("steam starter packet %"PRIu64", but state not "
- "ready to be reused", p->pcap_cnt);
- FLOWLOCK_UNLOCK(p->flow);
- return;
- }
-
- SCLogDebug("steam starter packet %"PRIu64", and state "
- "ready to be reused", p->pcap_cnt);
-
- /* ok, this packet needs a new flow */
-
- /* first, get a reference to the old flow */
- Flow *old_f = NULL;
- FlowReference(&old_f, p->flow);
-
- /* get some settings that we move over to the new flow */
- FlowThreadId thread_id = old_f->thread_id;
- int16_t autofp_tmqh_flow_qid = SC_ATOMIC_GET(old_f->autofp_tmqh_flow_qid);
-
- /* disconnect the packet from the old flow */
- FlowHandlePacketUpdateRemove(p->flow, p);
- FLOWLOCK_UNLOCK(p->flow);
- FlowDeReference(&p->flow); // < can't disappear while usecnt >0
-
- /* Can't tag flow as reused yet, would be a race condition:
- * new packets will not get old flow because of FLOW_TCP_REUSED,
- * so new flow may be created. This new flow could be handled in
- * a different thread. */
-
- /* Get a flow. It will be either a locked flow or NULL */
- Flow *new_f = FlowGetFlowFromHashByPacket(p, &p->flow);
- if (new_f == NULL) {
- FlowDeReference(&old_f); // < can't disappear while usecnt >0
- return;
- }
-
- /* update flow and packet */
- FlowHandlePacketUpdate(new_f, p);
- BUG_ON(new_f != p->flow);
-
- /* copy flow balancing settings */
- new_f->thread_id = thread_id;
- SC_ATOMIC_SET(new_f->autofp_tmqh_flow_qid, autofp_tmqh_flow_qid);
-
- FLOWLOCK_UNLOCK(new_f);
-
- /* tag original flow that it's now unused */
- FLOWLOCK_WRLOCK(old_f);
- SCLogDebug("old flow %p tagged with FLOW_TCP_REUSED by packet %"PRIu64"!", old_f, p->pcap_cnt);
- old_f->flags |= FLOW_TCP_REUSED;
- FLOWLOCK_UNLOCK(old_f);
- FlowDeReference(&old_f); // < can't disappear while usecnt >0
-
- SCLogDebug("new flow %p set up for packet %"PRIu64"!", p->flow, p->pcap_cnt);
-}
-
-/** \brief Handle packets that reference the wrong flow because of TCP reuse
- *
- * In the case of TCP reuse we can have many packets that were assigned
- * a flow by the capture/decode threads before the stream engine decided
- * that a new flow was needed for these packets.
- * When HandleFlowReuse creates a new flow, the packets already processed
- * by the flow engine will still reference the old flow.
- *
- * This function detects this case and replaces the flow for those packets.
- * It's a fairly expensive operation, but it should be rare as it's only
- * done for packets that were already in the engine when the TCP reuse
- * case was handled. New packets are assigned the correct flow by the
- * flow engine.
- */
-static void TcpSessionReuseHandleApplyToPacket(Packet *p)
+void FlowUpdate(ThreadVars *tv, StreamTcpThread *stt, Packet *p)
{
- int need_flow_replace = 0;
-
- FLOWLOCK_WRLOCK(p->flow);
- if (p->flow->flags & FLOW_TCP_REUSED) {
- SCLogDebug("packet %"PRIu64" attached to outdated flow and ssn", p->pcap_cnt);
- need_flow_replace = 1;
- }
-
- if (likely(need_flow_replace == 0)) {
- /* Work around a race condition: if HandleFlowReuse has inserted a new flow,
- * it will not have seen both sides of the session yet. The packet we have here
- * may be the first that got the flow directly from the hash right after the
- * flow was added. In this case it won't have FLOW_PKT_ESTABLISHED flag set. */
- if ((p->flow->flags & FLOW_TO_DST_SEEN) && (p->flow->flags & FLOW_TO_SRC_SEEN)) {
- p->flowflags |= FLOW_PKT_ESTABLISHED;
- SCLogDebug("packet %"PRIu64" / flow %p: p->flowflags |= FLOW_PKT_ESTABLISHED (%u/%u)", p->pcap_cnt, p->flow, p->flow->todstpktcnt, p->flow->tosrcpktcnt);
- } else {
- SCLogDebug("packet %"PRIu64" / flow %p: p->flowflags NOT FLOW_PKT_ESTABLISHED (%u/%u)", p->pcap_cnt, p->flow, p->flow->todstpktcnt, p->flow->tosrcpktcnt);
- }
- SCLogDebug("packet %"PRIu64" attached to regular flow %p and ssn", p->pcap_cnt, p->flow);
- FLOWLOCK_UNLOCK(p->flow);
- return;
- }
-
- /* disconnect packet from old flow */
- FlowHandlePacketUpdateRemove(p->flow, p);
- FLOWLOCK_UNLOCK(p->flow);
- FlowDeReference(&p->flow); // < can't disappear while usecnt >0
+ FlowHandlePacketUpdate(p->flow, p);
- /* find the new flow that does belong to this packet */
- Flow *new_f = FlowLookupFlowFromHash(p, &p->flow);
- if (new_f == NULL) {
- // TODO reset packet flag wrt flow: direction, HAS_FLOW etc
- p->flags &= ~PKT_HAS_FLOW;
- return;
+ /* handle the app layer part of the UDP packet payload */
+ if (p->proto == IPPROTO_UDP) {
+ AppLayerHandleUdp(tv, stt->ra_ctx->app_tctx, p, p->flow);
}
- FlowHandlePacketUpdate(new_f, p);
- BUG_ON(new_f != p->flow);
- FLOWLOCK_UNLOCK(new_f);
- SCLogDebug("packet %"PRIu64" switched over to new flow %p!", p->pcap_cnt, p->flow);
}
TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
StreamTcpThread *stt = (StreamTcpThread *)data;
TmEcode ret = TM_ECODE_OK;
- if (!(PKT_IS_TCP(p)))
+ SCLogDebug("p->pcap_cnt %"PRIu64, p->pcap_cnt);
+
+ if (p->flow && p->flags & PKT_PSEUDO_STREAM_END) {
+ FLOWLOCK_WRLOCK(p->flow);
+ AppLayerProfilingReset(stt->ra_ctx->app_tctx);
+ (void)StreamTcpPacket(tv, p, stt, pq);
+ p->flags |= PKT_IGNORE_CHECKSUM;
+ stt->pkts++;
+ FLOWLOCK_UNLOCK(p->flow);
return TM_ECODE_OK;
+ }
+
+ if (!(p->flags & PKT_WANTS_FLOW)) {
+ return TM_ECODE_OK;
+ }
+
+ FlowHandlePacket(tv, NULL, p); //TODO what to do about decoder thread vars
+ if (likely(p->flow != NULL)) {
+ FlowUpdate(tv, stt, p);
+ }
+
+ if (!(PKT_IS_TCP(p))) {
+ goto unlock;
+ }
if (p->flow == NULL) {
StatsIncr(tv, stt->counter_tcp_no_flow);
- return TM_ECODE_OK;
+ goto unlock;
}
+ /* only TCP packets with a flow from here */
+
if (stream_config.flags & STREAMTCP_INIT_FLAG_CHECKSUM_VALIDATION) {
if (StreamTcpValidateChecksum(p) == 0) {
StatsIncr(tv, stt->counter_tcp_invalid_checksum);
- return TM_ECODE_OK;
+ goto unlock;
}
} else {
p->flags |= PKT_IGNORE_CHECKSUM;
}
-
- if (stt->runmode_flow_stream_async) {
- /* "autofp" handling of TCP session/flow reuse */
- if (!(p->flags & PKT_PSEUDO_STREAM_END)) {
- /* apply previous reuses to this packet */
- TcpSessionReuseHandleApplyToPacket(p);
- if (p->flow == NULL)
- return ret;
-
- if (!(p->flowflags & FLOW_PKT_TOSERVER_FIRST)) {
- /* after that, check for 'new' reuse */
- TcpSessionReuseHandle(p);
- if (p->flow == NULL)
- return ret;
- }
- }
- }
AppLayerProfilingReset(stt->ra_ctx->app_tctx);
- FLOWLOCK_WRLOCK(p->flow);
ret = StreamTcpPacket(tv, p, stt, pq);
- FLOWLOCK_UNLOCK(p->flow);
//if (ret)
// return TM_ECODE_FAILED;
stt->pkts++;
+
+ unlock:
+ if (p->flow) {
+ FLOWLOCK_UNLOCK(p->flow);
+ }
return ret;
}
if (stt->ssn_pool_id < 0 || ssn_pool == NULL)
SCReturnInt(TM_ECODE_FAILED);
- /* see if need to enable the TCP reuse handling in the stream engine */
- stt->runmode_flow_stream_async = RunmodeGetFlowStreamAsync();
- SCLogDebug("Flow and Stream engine run %s",
- stt->runmode_flow_stream_async ? "asynchronous" : "synchronous");
-
SCReturnInt(TM_ECODE_OK);
}
typedef struct StreamTcpThread_ {
int ssn_pool_id;
- /** if set to true, we activate the TCP tuple reuse code in the
- * stream engine. */
- int runmode_flow_stream_async;
-
uint64_t pkts;
/** queue for pseudo packet(s) that were created in the stream
if (strcasecmp(scheduler, "round-robin") == 0) {
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin;
} else if (strcasecmp(scheduler, "active-packets") == 0) {
- tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
+ //tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
+ SCLogNotice("FIXME: using flow hash instead of active packets");
+ tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
} else if (strcasecmp(scheduler, "hash") == 0) {
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
} else if (strcasecmp(scheduler, "ippair") == 0) {
exit(EXIT_FAILURE);
}
} else {
- tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
+ //tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
+ tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
}
return;
* \param tv thread vars.
* \param p packet.
*/
-void TmqhOutputFlowHash(ThreadVars *tv, Packet *p)
+void TmqhOutputFlowHash2(ThreadVars *tv, Packet *p)
{
int16_t qid = 0;
return;
}
+void TmqhOutputFlowHash(ThreadVars *tv, Packet *p)
+{
+ int16_t qid = 0;
+
+ TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
+
+ if (p->flags & PKT_WANTS_FLOW) {
+ uint32_t hash = p->flow_hash;
+ qid = hash % ctx->size;
+ } else {
+ qid = ctx->last++;
+
+ if (ctx->last == ctx->size)
+ ctx->last = 0;
+ }
+ (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
+
+ PacketQueue *q = ctx->queues[qid].q;
+ SCMutexLock(&q->mutex_q);
+ PacketEnqueue(q, p);
+ SCCondSignal(&q->cond_q);
+ SCMutexUnlock(&q->mutex_q);
+
+ return;
+}
/**
* \brief select the queue to output based on IP address pair.
*
#include "flow-hash.h"
-/** set to true if flow engine and stream engine run in different
- * threads. */
-static int runmode_flow_stream_async = 0;
-
-void RunmodeSetFlowStreamAsync(void)
-{
- runmode_flow_stream_async = 1;
- FlowDisableTcpReuseHandling();
-}
-
-int RunmodeGetFlowStreamAsync(void)
-{
- return runmode_flow_stream_async;
-}
-
/** \brief create a queue string for autofp to pass to
* the flow queue handler.
*
if (thread_max < 1)
thread_max = 1;
- RunmodeSetFlowStreamAsync();
-
queues = RunmodeAutoFpCreatePickupQueuesString(thread_max);
if (queues == NULL) {
SCLogError(SC_ERR_RUNMODE, "RunmodeAutoFpCreatePickupQueuesString failed");
if (thread_max < 1)
thread_max = 1;
- RunmodeSetFlowStreamAsync();
-
queues = RunmodeAutoFpCreatePickupQueuesString(thread_max);
if (queues == NULL) {
SCLogError(SC_ERR_RUNMODE, "RunmodeAutoFpCreatePickupQueuesString failed");
p->dst.addr_data32[0] = i;
}
FlowHandlePacket(NULL, NULL, p);
- if (p->flow != NULL)
+ if (p->flow != NULL) {
SC_ATOMIC_RESET(p->flow->use_cnt);
+ FLOWLOCK_UNLOCK(p->flow);
+ }
/* Now the queues shoul be updated */
UTHFreePacket(p);