return r;
}
-/*
-
- pcap/nfq
-
- pkt read
- callback
- process_pkt
-
- pkt read
- process_pkt
-
- slot:
- setup
-
- pkt_ack_loop(tv, slot_data)
-
- deinit
-
- process_pkt:
- while(s)
- run s;
- queue;
-
- */
-
-static void *TmThreadsSlotPktAcqLoop(void *td)
+static bool TmThreadsSlotPktAcqLoopInit(ThreadVars *tv)
{
- ThreadVars *tv = (ThreadVars *)td;
TmSlot *s = tv->tm_slots;
- TmEcode r = TM_ECODE_OK;
- TmSlot *slot = NULL;
SCSetThreadName(tv->name);
CaptureStatsSetup(tv);
PacketPoolInit();
- /* check if we are setup properly */
- if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
- SCLogError("TmSlot or ThreadVars badly setup: s=%p,"
- " PktAcqLoop=%p, tmqh_in=%p,"
- " tmqh_out=%p",
- s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
- TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
- pthread_exit(NULL);
- return NULL;
- }
-
- for (slot = s; slot != NULL; slot = slot->slot_next) {
+ for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
if (slot->SlotThreadInit != NULL) {
void *slot_data = NULL;
- r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
+ TmEcode r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
if (r != TM_ECODE_OK) {
if (r == TM_ECODE_DONE) {
EngineDone();
tv->flow_queue = FlowQueueNew();
if (tv->flow_queue == NULL) {
TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
- pthread_exit(NULL);
- return NULL;
+ goto error;
}
/* setup a queue */
} else if (slot->tm_id == TMM_FLOWWORKER) {
tv->flow_queue = FlowQueueNew();
if (tv->flow_queue == NULL) {
TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
- pthread_exit(NULL);
- return NULL;
+ goto error;
}
}
}
StatsSetupPrivate(tv);
TmThreadsSetFlag(tv, THV_INIT_DONE);
- bool run = TmThreadsWaitForUnpause(tv);
- while (run) {
- r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
+ return true;
+
+error:
+ return false;
+}
+
+bool SCTmThreadsSlotPktAcqLoopFinish(ThreadVars *tv)
+{
+ TmSlot *s = tv->tm_slots;
+ bool rc = true;
- if (r == TM_ECODE_FAILED) {
- TmThreadsSetFlag(tv, THV_FAILED);
- run = false;
- }
- if (TmThreadsCheckFlag(tv, THV_KILL_PKTACQ) || suricata_ctl_flags) {
- run = false;
- }
- if (r == TM_ECODE_DONE) {
- run = false;
- }
- }
StatsSyncCounters(tv);
TmThreadsSetFlag(tv, THV_FLOW_LOOP);
PacketPoolDestroy();
- for (slot = s; slot != NULL; slot = slot->slot_next) {
+ for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
if (slot->SlotThreadExitPrintStats != NULL) {
slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
}
if (slot->SlotThreadDeinit != NULL) {
- r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
+ TmEcode r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
if (r != TM_ECODE_OK) {
TmThreadsSetFlag(tv, THV_CLOSED);
- goto error;
+ rc = false;
+ break;
}
}
}
tv->stream_pq = NULL;
- SCLogDebug("%s ending", tv->name);
TmThreadsSetFlag(tv, THV_CLOSED);
+ return rc;
+}
+
+static void *TmThreadsSlotPktAcqLoop(void *td)
+{
+ ThreadVars *tv = (ThreadVars *)td;
+ TmSlot *s = tv->tm_slots;
+ TmEcode r = TM_ECODE_OK;
+
+ /* check if we are setup properly */
+ if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
+ SCLogError("TmSlot or ThreadVars badly setup: s=%p,"
+ " PktAcqLoop=%p, tmqh_in=%p,"
+ " tmqh_out=%p",
+ s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
+ TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
+ pthread_exit(NULL);
+ return NULL;
+ }
+
+ if (!TmThreadsSlotPktAcqLoopInit(td)) {
+ goto error;
+ }
+
+ bool run = TmThreadsWaitForUnpause(tv);
+
+ while (run) {
+ r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
+
+ if (r == TM_ECODE_FAILED) {
+ TmThreadsSetFlag(tv, THV_FAILED);
+ run = false;
+ }
+ if (TmThreadsCheckFlag(tv, THV_KILL_PKTACQ) || suricata_ctl_flags) {
+ run = false;
+ }
+ if (r == TM_ECODE_DONE) {
+ run = false;
+ }
+ }
+ if (!SCTmThreadsSlotPktAcqLoopFinish(tv)) {
+ goto error;
+ }
+
+ SCLogDebug("%s ending", tv->name);
pthread_exit((void *) 0);
return NULL;
error:
- tv->stream_pq = NULL;
pthread_exit(NULL);
return NULL;
}
{
ThreadVars *tv = (ThreadVars *)td;
TmSlot *s = tv->tm_slots;
- TmEcode r = TM_ECODE_OK;
- TmSlot *slot = NULL;
-
- /* Set the thread name */
- SCSetThreadName(tv->name);
-
- if (tv->thread_setup_flags != 0)
- TmThreadSetupOptions(tv);
-
- /* Drop the capabilities for this thread */
- SCDropCaps(tv);
-
- PacketPoolInit();
/* check if we are setup properly */
if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
return NULL;
}
- for (slot = s; slot != NULL; slot = slot->slot_next) {
- if (slot->SlotThreadInit != NULL) {
- void *slot_data = NULL;
- r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
- if (r != TM_ECODE_OK) {
- if (r == TM_ECODE_DONE) {
- EngineDone();
- TmThreadsSetFlag(tv, THV_CLOSED | THV_INIT_DONE | THV_RUNNING_DONE);
- goto error;
- } else {
- TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
- goto error;
- }
- }
- (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
- }
-
- /* if the flowworker module is the first, get the threads input queue */
- if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
- tv->stream_pq = tv->inq->pq;
- tv->tm_flowworker = slot;
- SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
- tv->flow_queue = FlowQueueNew();
- if (tv->flow_queue == NULL) {
- TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
- return NULL;
- }
- /* setup a queue */
- } else if (slot->tm_id == TMM_FLOWWORKER) {
- tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
- if (tv->stream_pq_local == NULL)
- FatalError("failed to alloc PacketQueue");
- SCMutexInit(&tv->stream_pq_local->mutex_q, NULL);
- tv->stream_pq = tv->stream_pq_local;
- tv->tm_flowworker = slot;
- SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
- tv->flow_queue = FlowQueueNew();
- if (tv->flow_queue == NULL) {
- TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
- return NULL;
- }
- }
+ if (!TmThreadsSlotPktAcqLoopInit(tv)) {
+ goto error;
}
- StatsSetupPrivate(tv);
-
- TmThreadsSetFlag(tv, THV_INIT_DONE);
TmThreadsWaitForUnpause(tv);