From: Richard McConnell Date: Tue, 4 Oct 2022 16:13:01 +0000 (+0100) Subject: source: add THV_RUNNING flag to notify of running state X-Git-Tag: suricata-7.0.0-rc1~458 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=13beba141c98debc4d7e29081c91a799362f19fb;p=thirdparty%2Fsuricata.git source: add THV_RUNNING flag to notify of running state Each module (thread) updates its status to indicate that it is running. The main thread waits for all threads to be in a running state before continuing the initialisation process. Implements feature 5384 (https://redmine.openinfosecfoundation.org/issues/5384) --- diff --git a/src/counters.c b/src/counters.c index d323bb2d5e..18d7e19509 100644 --- a/src/counters.c +++ b/src/counters.c @@ -411,7 +411,7 @@ static void *StatsMgmtThread(void *arg) } SCLogDebug("stats_thread_data %p", &stats_thread_data); - TmThreadsSetFlag(tv_local, THV_INIT_DONE); + TmThreadsSetFlag(tv_local, THV_INIT_DONE | THV_RUNNING); while (1) { if (TmThreadsCheckFlag(tv_local, THV_PAUSE)) { TmThreadsSetFlag(tv_local, THV_PAUSED); @@ -480,7 +480,8 @@ static void *StatsWakeupThread(void *arg) return NULL; } - TmThreadsSetFlag(tv_local, THV_INIT_DONE); + TmThreadsSetFlag(tv_local, THV_INIT_DONE | THV_RUNNING); + while (1) { if (TmThreadsCheckFlag(tv_local, THV_PAUSE)) { TmThreadsSetFlag(tv_local, THV_PAUSED); diff --git a/src/flow-manager.c b/src/flow-manager.c index b26542706b..298c8be64a 100644 --- a/src/flow-manager.c +++ b/src/flow-manager.c @@ -799,6 +799,8 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data) GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec); StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec); + TmThreadsSetFlag(th_v, THV_RUNNING); + while (1) { if (TmThreadsCheckFlag(th_v, THV_PAUSE)) { @@ -1063,6 +1065,8 @@ static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data) struct timeval ts; memset(&ts, 0, sizeof(ts)); + TmThreadsSetFlag(th_v, THV_RUNNING); + while (1) { if 
(TmThreadsCheckFlag(th_v, THV_PAUSE)) { diff --git a/src/source-af-packet.c b/src/source-af-packet.c index 0340d505c6..c94b4a6b83 100644 --- a/src/source-af-packet.c +++ b/src/source-af-packet.c @@ -560,7 +560,6 @@ static void AFPPeersListReachedInc(void) return; if ((SC_ATOMIC_ADD(peerslist.reached, 1) + 1) == peerslist.turn) { - SCLogInfo("All AFP capture threads are running."); (void)SC_ATOMIC_SET(peerslist.reached, 0); /* Set turn to 0 to skip synchronization when ReceiveAFPLoop is * restarted. @@ -1339,6 +1338,10 @@ TmEcode ReceiveAFPLoop(ThreadVars *tv, void *data, void *slot) fds.fd = ptv->socket; fds.events = POLLIN; + // Indicate that the thread is actually running its application level code (i.e., it can poll + // packets) + TmThreadsSetFlag(tv, THV_RUNNING); + while (1) { /* Start by checking the state of our interface */ if (unlikely(ptv->afp_state == AFP_STATE_DOWN)) { diff --git a/src/source-erf-dag.c b/src/source-erf-dag.c index d0925cc7a2..3f6365a850 100644 --- a/src/source-erf-dag.c +++ b/src/source-erf-dag.c @@ -338,6 +338,10 @@ ReceiveErfDagLoop(ThreadVars *tv, void *data, void *slot) dtv->slot = s->slot_next; + // Indicate that the thread is actually running its application level code (i.e., it can poll + // packets) + TmThreadsSetFlag(tv, THV_RUNNING); + while (1) { if (suricata_ctl_flags & SURICATA_STOP) { SCReturnInt(TM_ECODE_OK); diff --git a/src/source-erf-file.c b/src/source-erf-file.c index bfeae1fbff..eaa750dbc8 100644 --- a/src/source-erf-file.c +++ b/src/source-erf-file.c @@ -116,6 +116,10 @@ TmEcode ReceiveErfFileLoop(ThreadVars *tv, void *data, void *slot) etv->slot = ((TmSlot *)slot)->slot_next; + // Indicate that the thread is actually running its application level code (i.e., it can poll + // packets) + TmThreadsSetFlag(tv, THV_RUNNING); + while (1) { if (suricata_ctl_flags & SURICATA_STOP) { SCReturnInt(TM_ECODE_OK); diff --git a/src/source-ipfw.c b/src/source-ipfw.c index f92d37fcba..92c96caa33 100644 --- a/src/source-ipfw.c +++ 
b/src/source-ipfw.c @@ -240,6 +240,11 @@ TmEcode ReceiveIPFWLoop(ThreadVars *tv, void *data, void *slot) SCLogInfo("Thread '%s' will run on port %d (item %d)", tv->name, nq->port_num, ptv->ipfw_index); + + // Indicate that the thread is actually running its application level code (i.e., it can poll + // packets) + TmThreadsSetFlag(tv, THV_RUNNING); + while (1) { if (unlikely(suricata_ctl_flags != 0)) { SCReturnInt(TM_ECODE_OK); diff --git a/src/source-napatech.c b/src/source-napatech.c index 3a9160ec01..8d2ae69e2c 100644 --- a/src/source-napatech.c +++ b/src/source-napatech.c @@ -911,6 +911,10 @@ TmEcode NapatechPacketLoop(ThreadVars *tv, void *data, void *slot) TmSlot *s = (TmSlot *) slot; ntv->slot = s->slot_next; + // Indicate that the thread is actually running its application level code (i.e., it can poll + // packets) + TmThreadsSetFlag(tv, THV_RUNNING); + while (!(suricata_ctl_flags & SURICATA_STOP)) { /* make sure we have at least one packet in the packet pool, to prevent * us from alloc'ing packets at line rate */ diff --git a/src/source-netmap.c b/src/source-netmap.c index 603c58b77f..54e79b93cf 100644 --- a/src/source-netmap.c +++ b/src/source-netmap.c @@ -787,6 +787,11 @@ static TmEcode ReceiveNetmapLoop(ThreadVars *tv, void *data, void *slot) fds.events = POLLIN; SCLogDebug("thread %s polling on %d", tv->name, fds.fd); + + // Indicate that the thread is actually running its application level code (i.e., it can poll + // packets) + TmThreadsSetFlag(tv, THV_RUNNING); + for(;;) { if (unlikely(suricata_ctl_flags != 0)) { break; diff --git a/src/source-nflog.c b/src/source-nflog.c index 2d83bc432b..e9d035e145 100644 --- a/src/source-nflog.c +++ b/src/source-nflog.c @@ -430,6 +430,10 @@ TmEcode ReceiveNFLOGLoop(ThreadVars *tv, void *data, void *slot) SCReturnInt(TM_ECODE_FAILED); } + // Indicate that the thread is actually running its application level code (i.e., it can poll + // packets) + TmThreadsSetFlag(tv, THV_RUNNING); + while (1) { if 
(suricata_ctl_flags != 0) break; diff --git a/src/source-nfq.c b/src/source-nfq.c index 0e8b1bdce6..d1a916f976 100644 --- a/src/source-nfq.c +++ b/src/source-nfq.c @@ -1009,6 +1009,10 @@ TmEcode ReceiveNFQLoop(ThreadVars *tv, void *data, void *slot) ntv->slot = ((TmSlot *) slot)->slot_next; + // Indicate that the thread is actually running its application level code (i.e., it can poll + // packets) + TmThreadsSetFlag(tv, THV_RUNNING); + while(1) { if (unlikely(suricata_ctl_flags != 0)) { NFQDestroyQueue(nq); diff --git a/src/source-pcap-file.c b/src/source-pcap-file.c index eb26833789..417196556d 100644 --- a/src/source-pcap-file.c +++ b/src/source-pcap-file.c @@ -171,6 +171,10 @@ TmEcode ReceivePcapFileLoop(ThreadVars *tv, void *data, void *slot) ptv->shared.slot = s->slot_next; ptv->shared.cb_result = TM_ECODE_OK; + // Indicate that the thread is actually running its application level code (i.e., it can poll + // packets) + TmThreadsSetFlag(tv, THV_RUNNING); + if(ptv->is_directory == 0) { SCLogInfo("Starting file run for %s", ptv->behavior.file->filename); status = PcapFileDispatch(ptv->behavior.file); diff --git a/src/source-pcap.c b/src/source-pcap.c index 5e7ebc5f88..8df56da60c 100644 --- a/src/source-pcap.c +++ b/src/source-pcap.c @@ -312,6 +312,10 @@ static TmEcode ReceivePcapLoop(ThreadVars *tv, void *data, void *slot) ptv->slot = s->slot_next; ptv->cb_result = TM_ECODE_OK; + // Indicate that the thread is actually running its application level code (i.e., it can poll + // packets) + TmThreadsSetFlag(tv, THV_RUNNING); + while (1) { if (suricata_ctl_flags & SURICATA_STOP) { SCReturnInt(TM_ECODE_OK); diff --git a/src/source-pfring.c b/src/source-pfring.c index c75b275c9e..acd3ab8ef8 100644 --- a/src/source-pfring.c +++ b/src/source-pfring.c @@ -361,6 +361,10 @@ TmEcode ReceivePfringLoop(ThreadVars *tv, void *data, void *slot) SCReturnInt(TM_ECODE_FAILED); } + // Indicate that the thread is actually running its application level code (i.e., it can poll + // 
packets) + TmThreadsSetFlag(tv, THV_RUNNING); + while(1) { if (suricata_ctl_flags & SURICATA_STOP) { SCReturnInt(TM_ECODE_OK); diff --git a/src/source-windivert.c b/src/source-windivert.c index 4e7b7ac255..1cade18373 100644 --- a/src/source-windivert.c +++ b/src/source-windivert.c @@ -410,6 +410,10 @@ TmEcode ReceiveWinDivertLoop(ThreadVars *tv, void *data, void *slot) WinDivertThreadVars *wd_tv = (WinDivertThreadVars *)data; wd_tv->slot = ((TmSlot *)slot)->slot_next; + // Indicate that the thread is actually running its application level code (i.e., it can poll + // packets) + TmThreadsSetFlag(tv, THV_RUNNING); + while (true) { if (suricata_ctl_flags & SURICATA_STOP) { SCReturnInt(TM_ECODE_OK); diff --git a/src/suricata.c b/src/suricata.c index 81c2e6802d..335d6e301f 100644 --- a/src/suricata.c +++ b/src/suricata.c @@ -31,6 +31,10 @@ #include #endif +#if HAVE_LIBSYSTEMD +#include <systemd/sd-daemon.h> +#endif + #include "suricata.h" #include "conf.h" @@ -393,6 +397,23 @@ static void GlobalsDestroy(SCInstance *suri) suri->pid_filename = NULL; } +/** + * \brief Used to send OS specific notification of running threads + * + * Returns nothing; a failed notification is only logged as a warning. + */ +static void OnNotifyRunning(void) +{ +#if HAVE_LIBSYSTEMD + if (sd_notify(0, "READY=1") < 0) { + SCLogWarning(SC_ERR_SYSCALL, "failed to notify systemd"); + /* Please refer to: + * https://www.freedesktop.org/software/systemd/man/sd_notify.html#Return%20Value + * for discussion on why failure should not be considered an error */ + } +#endif +} + /** \brief make sure threads can stop the engine by calling this * function. Purpose: pcap file mode needs to be able to tell the * engine the file eof is reached. 
*/ @@ -2888,6 +2909,14 @@ int SuricataMain(int argc, char **argv) /* Un-pause all the paused threads */ TmThreadContinueThreads(); + /* Must ensure all threads are fully operational before continuing with init process */ + if (TmThreadWaitOnThreadRunning() != TM_ECODE_OK) { + exit(EXIT_FAILURE); + } + + /* Send OS specific notification that all threads are in a running state */ + OnNotifyRunning(); + PostRunStartedDetectSetup(&suricata); SCPledge(); diff --git a/src/threadvars.h b/src/threadvars.h index 1d866073c9..5c2efaf3b0 100644 --- a/src/threadvars.h +++ b/src/threadvars.h @@ -52,6 +52,7 @@ struct TmSlot_; * rule reloads even if no packets are read by the capture method. */ #define THV_CAPTURE_INJECT_PKT BIT_U32(11) #define THV_DEAD BIT_U32(12) /**< thread has been joined with pthread_join() */ +#define THV_RUNNING BIT_U32(13) /**< thread is running */ /** \brief Per thread variable structure */ typedef struct ThreadVars_ { diff --git a/src/tm-threads.c b/src/tm-threads.c index 4ae4aade2b..7a0f195826 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -428,7 +428,11 @@ static void *TmThreadsSlotVar(void *td) StatsSetupPrivate(tv); - TmThreadsSetFlag(tv, THV_INIT_DONE); + // Each 'worker' thread uses this function to process/decode the packets read. + // Decode methods differ from receive methods in that they do not enter + // infinite loops; they use this function as the core loop. As a result, at this + // point the worker threads can be considered both initialized and running. 
+ TmThreadsSetFlag(tv, THV_INIT_DONE | THV_RUNNING); s = (TmSlot *)tv->tm_slots; @@ -1033,7 +1037,6 @@ ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name, tv->id = TmThreadsRegisterThread(tv, tv->type); } - return tv; } @@ -1773,10 +1776,113 @@ void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags) void TmThreadContinue(ThreadVars *tv) { TmThreadsUnsetFlag(tv, THV_PAUSE); - return; } +/** + * \brief Waits for all threads to be in a running state + * + * \retval TM_ECODE_OK if all are running or error if a thread failed + */ +TmEcode TmThreadWaitOnThreadRunning(void) +{ + uint16_t RX_num = 0; + uint16_t W_num = 0; + uint16_t FM_num = 0; + uint16_t FR_num = 0; + uint16_t TX_num = 0; + + struct timeval start_ts; + struct timeval cur_ts; + gettimeofday(&start_ts, NULL); + +again: + SCMutexLock(&tv_root_lock); + for (int i = 0; i < TVT_MAX; i++) { + ThreadVars *tv = tv_root[i]; + while (tv != NULL) { + if (TmThreadsCheckFlag(tv, (THV_FAILED | THV_CLOSED | THV_DEAD))) { + SCMutexUnlock(&tv_root_lock); + + SCLogError(SC_ERR_THREAD_INIT, + "thread \"%s\" failed to " + "start: flags %04x", + tv->name, SC_ATOMIC_GET(tv->flags)); + return TM_ECODE_FAILED; + } + + if (!(TmThreadsCheckFlag(tv, THV_RUNNING | THV_RUNNING_DONE))) { + SCMutexUnlock(&tv_root_lock); + + /* 60 seconds provided for the thread to transition from + * THV_INIT_DONE to THV_RUNNING */ + gettimeofday(&cur_ts, NULL); + if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) { + SCLogError(SC_ERR_THREAD_INIT, + "thread \"%s\" failed to " + "start in time: flags %04x", + tv->name, SC_ATOMIC_GET(tv->flags)); + return TM_ECODE_FAILED; + } + + /* sleep a little to give the thread some + * time to start running */ + SleepUsec(100); + goto again; + } + + if (strncmp(thread_name_autofp, tv->name, strlen(thread_name_autofp)) == 0) + RX_num++; + else if (strncmp(thread_name_workers, tv->name, strlen(thread_name_workers)) == 0) + W_num++; + else if (strncmp(thread_name_verdict, tv->name, 
strlen(thread_name_verdict)) == 0) + TX_num++; + else if (strncmp(thread_name_flow_mgr, tv->name, strlen(thread_name_flow_mgr)) == 0) + FM_num++; + else if (strncmp(thread_name_flow_rec, tv->name, strlen(thread_name_flow_rec)) == 0) + FR_num++; + + tv = tv->next; + } + } + SCMutexUnlock(&tv_root_lock); + + /* Construct a welcome string displaying + * initialized thread types and counts */ + uint16_t app_len = 32; + uint16_t buf_len = 256; + + char append_str[app_len]; + char thread_counts[buf_len]; + + strlcpy(thread_counts, "Threads created -> ", strlen("Threads created -> ") + 1); + if (RX_num > 0) { + snprintf(append_str, app_len, "RX: %u ", RX_num); + strlcat(thread_counts, append_str, buf_len); + } + if (W_num > 0) { + snprintf(append_str, app_len, "W: %u ", W_num); + strlcat(thread_counts, append_str, buf_len); + } + if (TX_num > 0) { + snprintf(append_str, app_len, "TX: %u ", TX_num); + strlcat(thread_counts, append_str, buf_len); + } + if (FM_num > 0) { + snprintf(append_str, app_len, "FM: %u ", FM_num); + strlcat(thread_counts, append_str, buf_len); + } + if (FR_num > 0) { + snprintf(append_str, app_len, "FR: %u ", FR_num); + strlcat(thread_counts, append_str, buf_len); + } + snprintf(append_str, app_len, " Engine started."); + strlcat(thread_counts, append_str, buf_len); + SCLogNotice("%s", thread_counts); + + return TM_ECODE_OK; +} + /** * \brief Unpauses all threads present in tv_root */ @@ -1823,12 +1929,6 @@ void TmThreadCheckThreadState(void) */ TmEcode TmThreadWaitOnThreadInit(void) { - uint16_t RX_num = 0; - uint16_t W_num = 0; - uint16_t FM_num = 0; - uint16_t FR_num = 0; - uint16_t TX_num = 0; - struct timeval start_ts; struct timeval cur_ts; gettimeofday(&start_ts, NULL); @@ -1877,55 +1977,11 @@ again: return TM_ECODE_FAILED; } - if (strncmp(thread_name_autofp, tv->name, strlen(thread_name_autofp)) == 0) - RX_num++; - else if (strncmp(thread_name_workers, tv->name, strlen(thread_name_workers)) == 0) - W_num++; - else if 
(strncmp(thread_name_verdict, tv->name, strlen(thread_name_verdict)) == 0) - TX_num++; - else if (strncmp(thread_name_flow_mgr, tv->name, strlen(thread_name_flow_mgr)) == 0) - FM_num++; - else if (strncmp(thread_name_flow_rec, tv->name, strlen(thread_name_flow_rec)) == 0) - FR_num++; - tv = tv->next; } } SCMutexUnlock(&tv_root_lock); - /* Construct a welcome string displaying - * initialized thread types and counts */ - uint16_t app_len = 32; - uint16_t buf_len = 256; - - char append_str[app_len]; - char thread_counts[buf_len]; - - strlcpy(thread_counts, "Threads created -> ", strlen("Threads created -> ") + 1); - if (RX_num > 0) { - snprintf(append_str, app_len, "RX: %u ", RX_num); - strlcat(thread_counts, append_str, buf_len); - } - if (W_num > 0) { - snprintf(append_str, app_len, "W: %u ", W_num); - strlcat(thread_counts, append_str, buf_len); - } - if (TX_num > 0) { - snprintf(append_str, app_len, "TX: %u ", TX_num); - strlcat(thread_counts, append_str, buf_len); - } - if (FM_num > 0) { - snprintf(append_str, app_len, "FM: %u ", FM_num); - strlcat(thread_counts, append_str, buf_len); - } - if (FR_num > 0) { - snprintf(append_str, app_len, "FR: %u ", FR_num); - strlcat(thread_counts, append_str, buf_len); - } - snprintf(append_str, app_len, " Engine started."); - strlcat(thread_counts, append_str, buf_len); - SCLogNotice("%s", thread_counts); - return TM_ECODE_OK; } diff --git a/src/tm-threads.h b/src/tm-threads.h index 6249305611..2ba6ddf306 100644 --- a/src/tm-threads.h +++ b/src/tm-threads.h @@ -123,6 +123,8 @@ void TmThreadDisableReceiveThreads(void); uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags); +TmEcode TmThreadWaitOnThreadRunning(void); + static inline void TmThreadsCleanDecodePQ(PacketQueueNoLock *pq) { while (1) { diff --git a/src/unix-manager.c b/src/unix-manager.c index 95a0a8c14c..09488a2bbf 100644 --- a/src/unix-manager.c +++ b/src/unix-manager.c @@ -1130,7 +1130,8 @@ static TmEcode UnixManager(ThreadVars *th_v, void *thread_data) 
th_v->cap_flags = 0; SCDropCaps(th_v); - TmThreadsSetFlag(th_v, THV_INIT_DONE); + TmThreadsSetFlag(th_v, THV_INIT_DONE | THV_RUNNING); + while (1) { ret = UnixMain(&command); if (ret == 0) {