StatsSetupPrivate(tv);
- TmThreadsSetFlag(tv, THV_INIT_DONE);
+ // Each 'worker' thread uses this func to process/decode the packet read.
+ // Each decode method is different to receive methods in that they do not
+ // enter infinite loops. They use this as the core loop. As a result, at this
+ // point the worker threads can be considered both initialized and running.
+ TmThreadsSetFlag(tv, THV_INIT_DONE | THV_RUNNING);
s = (TmSlot *)tv->tm_slots;
tv->id = TmThreadsRegisterThread(tv, tv->type);
}
-
return tv;
}
void TmThreadContinue(ThreadVars *tv)
{
TmThreadsUnsetFlag(tv, THV_PAUSE);
-
return;
}
+/**
+ * \brief Waits for all threads to be in a running state
+ *
+ * \retval TM_ECODE_OK if all are running or error if a thread failed
+ */
+TmEcode TmThreadWaitOnThreadRunning(void)
+{
+ uint16_t RX_num = 0;
+ uint16_t W_num = 0;
+ uint16_t FM_num = 0;
+ uint16_t FR_num = 0;
+ uint16_t TX_num = 0;
+
+ struct timeval start_ts;
+ struct timeval cur_ts;
+ gettimeofday(&start_ts, NULL);
+
+again:
+ SCMutexLock(&tv_root_lock);
+ for (int i = 0; i < TVT_MAX; i++) {
+ ThreadVars *tv = tv_root[i];
+ while (tv != NULL) {
+ if (TmThreadsCheckFlag(tv, (THV_FAILED | THV_CLOSED | THV_DEAD))) {
+ SCMutexUnlock(&tv_root_lock);
+
+ SCLogError(SC_ERR_THREAD_INIT,
+ "thread \"%s\" failed to "
+ "start: flags %04x",
+ tv->name, SC_ATOMIC_GET(tv->flags));
+ return TM_ECODE_FAILED;
+ }
+
+ if (!(TmThreadsCheckFlag(tv, THV_RUNNING | THV_RUNNING_DONE))) {
+ SCMutexUnlock(&tv_root_lock);
+
+ /* 60 seconds provided for the thread to transition from
+ * THV_INIT_DONE to THV_RUNNING */
+ gettimeofday(&cur_ts, NULL);
+ if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
+ SCLogError(SC_ERR_THREAD_INIT,
+ "thread \"%s\" failed to "
+ "start in time: flags %04x",
+ tv->name, SC_ATOMIC_GET(tv->flags));
+ return TM_ECODE_FAILED;
+ }
+
+ /* sleep a little to give the thread some
+ * time to start running */
+ SleepUsec(100);
+ goto again;
+ }
+
+ if (strncmp(thread_name_autofp, tv->name, strlen(thread_name_autofp)) == 0)
+ RX_num++;
+ else if (strncmp(thread_name_workers, tv->name, strlen(thread_name_workers)) == 0)
+ W_num++;
+ else if (strncmp(thread_name_verdict, tv->name, strlen(thread_name_verdict)) == 0)
+ TX_num++;
+ else if (strncmp(thread_name_flow_mgr, tv->name, strlen(thread_name_flow_mgr)) == 0)
+ FM_num++;
+ else if (strncmp(thread_name_flow_rec, tv->name, strlen(thread_name_flow_rec)) == 0)
+ FR_num++;
+
+ tv = tv->next;
+ }
+ }
+ SCMutexUnlock(&tv_root_lock);
+
+ /* Construct a welcome string displaying
+ * initialized thread types and counts */
+ uint16_t app_len = 32;
+ uint16_t buf_len = 256;
+
+ char append_str[app_len];
+ char thread_counts[buf_len];
+
+ strlcpy(thread_counts, "Threads created -> ", strlen("Threads created -> ") + 1);
+ if (RX_num > 0) {
+ snprintf(append_str, app_len, "RX: %u ", RX_num);
+ strlcat(thread_counts, append_str, buf_len);
+ }
+ if (W_num > 0) {
+ snprintf(append_str, app_len, "W: %u ", W_num);
+ strlcat(thread_counts, append_str, buf_len);
+ }
+ if (TX_num > 0) {
+ snprintf(append_str, app_len, "TX: %u ", TX_num);
+ strlcat(thread_counts, append_str, buf_len);
+ }
+ if (FM_num > 0) {
+ snprintf(append_str, app_len, "FM: %u ", FM_num);
+ strlcat(thread_counts, append_str, buf_len);
+ }
+ if (FR_num > 0) {
+ snprintf(append_str, app_len, "FR: %u ", FR_num);
+ strlcat(thread_counts, append_str, buf_len);
+ }
+ snprintf(append_str, app_len, " Engine started.");
+ strlcat(thread_counts, append_str, buf_len);
+ SCLogNotice("%s", thread_counts);
+
+ return TM_ECODE_OK;
+}
+
/**
* \brief Unpauses all threads present in tv_root
*/
*/
TmEcode TmThreadWaitOnThreadInit(void)
{
- uint16_t RX_num = 0;
- uint16_t W_num = 0;
- uint16_t FM_num = 0;
- uint16_t FR_num = 0;
- uint16_t TX_num = 0;
-
struct timeval start_ts;
struct timeval cur_ts;
gettimeofday(&start_ts, NULL);
return TM_ECODE_FAILED;
}
- if (strncmp(thread_name_autofp, tv->name, strlen(thread_name_autofp)) == 0)
- RX_num++;
- else if (strncmp(thread_name_workers, tv->name, strlen(thread_name_workers)) == 0)
- W_num++;
- else if (strncmp(thread_name_verdict, tv->name, strlen(thread_name_verdict)) == 0)
- TX_num++;
- else if (strncmp(thread_name_flow_mgr, tv->name, strlen(thread_name_flow_mgr)) == 0)
- FM_num++;
- else if (strncmp(thread_name_flow_rec, tv->name, strlen(thread_name_flow_rec)) == 0)
- FR_num++;
-
tv = tv->next;
}
}
SCMutexUnlock(&tv_root_lock);
- /* Construct a welcome string displaying
- * initialized thread types and counts */
- uint16_t app_len = 32;
- uint16_t buf_len = 256;
-
- char append_str[app_len];
- char thread_counts[buf_len];
-
- strlcpy(thread_counts, "Threads created -> ", strlen("Threads created -> ") + 1);
- if (RX_num > 0) {
- snprintf(append_str, app_len, "RX: %u ", RX_num);
- strlcat(thread_counts, append_str, buf_len);
- }
- if (W_num > 0) {
- snprintf(append_str, app_len, "W: %u ", W_num);
- strlcat(thread_counts, append_str, buf_len);
- }
- if (TX_num > 0) {
- snprintf(append_str, app_len, "TX: %u ", TX_num);
- strlcat(thread_counts, append_str, buf_len);
- }
- if (FM_num > 0) {
- snprintf(append_str, app_len, "FM: %u ", FM_num);
- strlcat(thread_counts, append_str, buf_len);
- }
- if (FR_num > 0) {
- snprintf(append_str, app_len, "FR: %u ", FR_num);
- strlcat(thread_counts, append_str, buf_len);
- }
- snprintf(append_str, app_len, " Engine started.");
- strlcat(thread_counts, append_str, buf_len);
- SCLogNotice("%s", thread_counts);
-
return TM_ECODE_OK;
}