}
}
-
-/* nudge capture loops to wake up */
-static void BreakCapture(void)
-{
- SCMutexLock(&tv_root_lock);
- for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
- if ((tv->tmm_flags & TM_FLAG_RECEIVE_TM) == 0) {
- continue;
- }
- /* find the correct slot */
- for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
- if (suricata_ctl_flags != 0) {
- SCMutexUnlock(&tv_root_lock);
- return;
- }
-
- TmModule *tm = TmModuleGetById(s->tm_id);
- if (!(tm->flags & TM_FLAG_RECEIVE_TM)) {
- continue;
- }
-
- /* signal capture method that we need a packet. */
- TmThreadsSetFlag(tv, THV_CAPTURE_INJECT_PKT);
- /* if the method supports it, BreakLoop. Otherwise we rely on
- * the capture method's recv timeout */
- if (tm->PktAcqLoop && tm->PktAcqBreakLoop) {
- tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(s->slot_data));
- }
- break;
- }
- }
- SCMutexUnlock(&tv_root_lock);
-}
-
/** \internal
* \brief inject a pseudo packet into each detect thread that doesn't use the
* new det_ctx yet
InjectPackets(detect_tvs, new_det_ctx, no_of_detect_tvs);
+ /* loop waiting for detect threads to switch to the new det_ctx. Try to
+ * wake up capture if needed (break loop). */
+ uint32_t threads_done = 0;
+retry:
for (i = 0; i < no_of_detect_tvs; i++) {
- int break_out = 0;
+ if (suricata_ctl_flags != 0) {
+ threads_done = no_of_detect_tvs;
+ break;
+ }
usleep(1000);
- while (SC_ATOMIC_GET(new_det_ctx[i]->so_far_used_by_detect) != 1) {
- if (suricata_ctl_flags != 0) {
- break_out = 1;
- break;
- }
-
- BreakCapture();
- usleep(1000);
+ if (SC_ATOMIC_GET(new_det_ctx[i]->so_far_used_by_detect) == 1) {
+ SCLogDebug("new_det_ctx - %p used by detect engine", new_det_ctx[i]);
+ threads_done++;
+ } else if (detect_tvs[i]->break_loop) {
+ TmThreadsCaptureBreakLoop(detect_tvs[i]);
}
- if (break_out)
- break;
- SCLogDebug("new_det_ctx - %p used by detect engine", new_det_ctx[i]);
+ }
+ if (threads_done < no_of_detect_tvs) {
+ threads_done = 0;
+ SleepMsec(250);
+ goto retry;
}
/* this is to make sure that if someone initiated shutdown during a live
return thread_max;
}
-static inline void ThreadBreakLoop(ThreadVars *tv)
-{
- if ((tv->tmm_flags & TM_FLAG_RECEIVE_TM) == 0) {
- return;
- }
- /* find the correct slot */
- TmSlot *s = tv->tm_slots;
- TmModule *tm = TmModuleGetById(s->tm_id);
- if (tm->flags & TM_FLAG_RECEIVE_TM) {
- /* if the method supports it, BreakLoop. Otherwise we rely on
- * the capture method's recv timeout */
- if (tm->PktAcqLoop && tm->PktAcqBreakLoop) {
- tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(s->slot_data));
- }
- }
-}
-
/**
* \retval r 1 if packet was accepted, 0 otherwise
* \note if packet was not accepted, it's still the responsibility
if (tv->inq != NULL) {
SCCondSignal(&tv->inq->pq->cond_q);
} else if (tv->break_loop) {
- ThreadBreakLoop(tv);
+ TmThreadsCaptureBreakLoop(tv);
}
return 1;
}
if (tv->inq != NULL) {
SCCondSignal(&tv->inq->pq->cond_q);
} else if (tv->break_loop) {
- ThreadBreakLoop(tv);
+ TmThreadsCaptureBreakLoop(tv);
}
}
tv->tmqh_out(tv, p);
}
+static inline void TmThreadsCaptureBreakLoop(ThreadVars *tv)
+{
+ if (unlikely(!tv->break_loop))
+ return;
+
+ if ((tv->tmm_flags & TM_FLAG_RECEIVE_TM) == 0) {
+ return;
+ }
+ /* find the correct slot */
+ TmSlot *s = tv->tm_slots;
+ TmModule *tm = TmModuleGetById(s->tm_id);
+ if (tm->flags & TM_FLAG_RECEIVE_TM) {
+ /* if the method supports it, BreakLoop. Otherwise we rely on
+ * the capture method's recv timeout */
+ if (tm->PktAcqLoop && tm->PktAcqBreakLoop) {
+ tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(s->slot_data));
+ }
+ TmThreadsSetFlag(tv, THV_CAPTURE_INJECT_PKT);
+ }
+}
+
void TmThreadsListThreads(void);
int TmThreadsRegisterThread(ThreadVars *tv, const int type);
void TmThreadsUnregisterThread(const int id);