LIBSURICATA_CONFIG ?= @CONFIGURE_PREFIX@/bin/libsuricata-config
-SURICATA_LIBS = `$(LIBSURICATA_CONFIG) --libs`
+SURICATA_LIBS = `$(LIBSURICATA_CONFIG) --libs --static`
SURICATA_CFLAGS := `$(LIBSURICATA_CONFIG) --cflags`
+# Currently the Suricata logging system requires this to be even for
+# plugins.
+CPPFLAGS += "-D__SCFILENAME__=\"$(*F)\""
+
all: simple
simple: main.c
- $(CC) -o $@ $^ $(CFLAGS) $(SURICATA_CFLAGS) $(SURICATA_LIBS)
+ $(CC) -o $@ $^ $(CPPFLAGS) $(CFLAGS) $(SURICATA_CFLAGS) $(SURICATA_LIBS)
clean:
rm -f simple
*/
#include "suricata.h"
+#include "conf.h"
+#include "pcap.h"
+#include "runmode-lib.h"
+#include "source-lib.h"
+#include "threadvars.h"
+
+/* Suricata worker thread in library mode.
+ The functions should be wrapped in an API layer. */
+static void *SimpleWorker(void *arg)
+{
+ char *pcap_file = (char *)arg;
+
+ /* Create worker. */
+ ThreadVars *tv = RunModeCreateWorker();
+ if (!tv) {
+ pthread_exit(NULL);
+ }
+
+ /* Start worker. */
+ if (RunModeSpawnWorker(tv) != 0) {
+ pthread_exit(NULL);
+ }
+
+ /* Replay pcap. */
+ pcap_t *fp = pcap_open_offline(pcap_file, NULL);
+ if (fp == NULL) {
+ pthread_exit(NULL);
+ }
+
+ int datalink = pcap_datalink(fp);
+ struct pcap_pkthdr pkthdr;
+ const u_char *packet;
+ while ((packet = pcap_next(fp, &pkthdr)) != NULL) {
+ if (TmModuleLibHandlePacket(tv, packet, datalink, pkthdr.ts, pkthdr.len, 0, 0, NULL) != 0) {
+ pthread_exit(NULL);
+ }
+ }
+ pcap_close(fp);
+
+ /* Cleanup. */
+ RunModeDestroyWorker(tv);
+ pthread_exit(NULL);
+}
int main(int argc, char **argv)
{
SuricataPreInit(argv[0]);
/* Parse command line options. This is optional, you could
- * directly configure Suricata through the Conf API. */
- SCParseCommandLine(argc, argv);
+ * directly configure Suricata through the Conf API.
+ The last argument is the PCAP file to replay. */
+ SCParseCommandLine(argc - 1, argv);
+
+ /* Set lib runmode. There is currently no way to set it via
+ the Conf API. */
+ SuricataSetLibRunmode();
/* Validate/finalize the runmode. */
if (SCFinalizeRunMode() != TM_ECODE_OK) {
exit(EXIT_FAILURE);
}
+ /* Set "offline" runmode to replay a pcap in library mode. */
+ if (!ConfSetFromString("runmode=offline", 1)) {
+ exit(EXIT_FAILURE);
+ }
+
SuricataInit();
- SuricataPostInit();
- /* Suricata is now running, but we enter a loop to keep it running
- * until it shouldn't be running anymore. */
- SuricataMainLoop();
+ /* Create and start worker on its own thread, passing the PCAP file
+ as argument. This needs to be done in between SuricataInit and
+ SuricataPostInit. */
+ pthread_t worker;
+ if (pthread_create(&worker, NULL, SimpleWorker, argv[argc - 1]) != 0) {
+ exit(EXIT_FAILURE);
+ }
+
+ /* Need to introduce a little sleep to allow the worker thread to
+ initialize before SuricataPostInit invokes TmThreadContinueThreads().
+ This should be handle at the API level. */
+ usleep(100);
+
+ SuricataPostInit();
/* Shutdown engine. */
SuricataShutdown();
runmode-erf-dag.h \
runmode-erf-file.h \
runmode-ipfw.h \
+ runmode-lib.h \
runmode-netmap.h \
runmode-nflog.h \
runmode-nfq.h \
source-erf-dag.h \
source-erf-file.h \
source-ipfw.h \
+ source-lib.h \
source-netmap.h \
source-nflog.h \
source-nfq.h \
runmode-erf-dag.c \
runmode-erf-file.c \
runmode-ipfw.c \
+ runmode-lib.c \
runmode-netmap.c \
runmode-nflog.c \
runmode-nfq.c \
source-erf-dag.c \
source-erf-file.c \
source-ipfw.c \
+ source-lib.c \
source-netmap.c \
source-nflog.c \
source-nfq.c \
--- /dev/null
+/* Copyright (C) 2023-2024 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/** \file
+ *
+ * \author Angelo Mirabella <angelo.mirabella@broadcom.com>
+ *
+ * Library runmode.
+ */
+#include "suricata-common.h"
+#include "runmode-lib.h"
+#include "runmodes.h"
+#include "tm-threads.h"
+#include "util-device.h"
+
+static int g_thread_id = 0;
+
+/** \brief register runmodes for suricata as a library */
+void RunModeIdsLibRegister(void)
+{
+ RunModeRegisterNewRunMode(RUNMODE_LIB, "offline", "Library offline mode (pcap replaying)",
+ RunModeIdsLibOffline, NULL);
+ RunModeRegisterNewRunMode(RUNMODE_LIB, "live", "Library live mode", RunModeIdsLibLive, NULL);
+ return;
+}
+
+/** \brief runmode for offline packet processing (pcap files) */
+int RunModeIdsLibOffline(void)
+{
+ TimeModeSetOffline();
+
+ return 0;
+}
+
+/** \brief runmode for live packet processing */
+int RunModeIdsLibLive(void)
+{
+ TimeModeSetLive();
+
+ return 0;
+}
+
+const char *RunModeLibGetDefaultMode(void)
+{
+ return "live";
+}
+
+/** \brief create a "fake" worker thread in charge of processing the packets.
+ *
+ * This method just creates a context representing the worker, which is handled from the library
+ * client. No actual thread (pthread_t) is created.
+ *
+ * \return Pointer to ThreadVars structure representing the worker thread */
+void *RunModeCreateWorker(void)
+{
+ char tname[TM_THREAD_NAME_MAX];
+ TmModule *tm_module = NULL;
+ snprintf(tname, sizeof(tname), "%s#%02d", thread_name_workers, ++g_thread_id);
+
+ ThreadVars *tv = TmThreadCreatePacketHandler(
+ tname, "packetpool", "packetpool", "packetpool", "packetpool", "lib");
+ if (tv == NULL) {
+ SCLogError("TmThreadsCreate failed");
+ return NULL;
+ }
+
+ tm_module = TmModuleGetByName("DecodeLib");
+ if (tm_module == NULL) {
+ SCLogError("TmModuleGetByName DecodeLib failed");
+ return NULL;
+ }
+ TmSlotSetFuncAppend(tv, tm_module, NULL);
+
+ tm_module = TmModuleGetByName("FlowWorker");
+ if (tm_module == NULL) {
+ SCLogError("TmModuleGetByName for FlowWorker failed");
+ return NULL;
+ }
+ TmSlotSetFuncAppend(tv, tm_module, NULL);
+
+ TmThreadAppend(tv, tv->type);
+
+ return tv;
+}
+
+/** \brief start the "fake" worker.
+ *
+ * This method performs all the initialization tasks.
+ */
+int RunModeSpawnWorker(void *td)
+{
+ ThreadVars *tv = (ThreadVars *)td;
+
+ if (TmThreadLibSpawn(tv) != TM_ECODE_OK) {
+ SCLogError("TmThreadLibSpawn failed");
+ return -1;
+ }
+
+ TmThreadsSetFlag(tv, THV_RUNNING);
+ return 0;
+}
+
+/** \brief destroy a worker thread */
+void RunModeDestroyWorker(void *td)
+{
+ ThreadVars *tv = (ThreadVars *)td;
+ TmSlot *s = tv->tm_slots;
+ TmEcode r;
+ TmSlot *slot = NULL;
+
+ StatsSyncCounters(tv);
+
+ TmThreadsSetFlag(tv, THV_FLOW_LOOP);
+
+ /* process all pseudo packets the flow timeout may throw at us */
+ TmThreadTimeoutLoop(tv, s);
+
+ TmThreadsSetFlag(tv, THV_RUNNING_DONE);
+ TmThreadWaitForFlag(tv, THV_DEINIT);
+
+ PacketPoolDestroy();
+
+ for (slot = s; slot != NULL; slot = slot->slot_next) {
+ if (slot->SlotThreadExitPrintStats != NULL) {
+ slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
+ }
+
+ if (slot->SlotThreadDeinit != NULL) {
+ r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
+ if (r != TM_ECODE_OK) {
+ break;
+ }
+ }
+ }
+
+ tv->stream_pq = NULL;
+ --g_thread_id;
+ SCLogDebug("%s ending", tv->name);
+ TmThreadsSetFlag(tv, THV_CLOSED);
+}
--- /dev/null
+/* Copyright (C) 2023-2024 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/** \file
+ *
+ * \author Angelo Mirabella <angelo.mirabella@broadcom.com>
+ *
+ * Library runmode.
+ */
+
+#ifndef SURICATA_RUNMODE_LIB_H
+#define SURICATA_RUNMODE_LIB_H
+
+/** \brief register runmodes for suricata as a library */
+void RunModeIdsLibRegister(void);
+
+/** \brief runmode for live packet processing */
+int RunModeIdsLibLive(void);
+
+/** \brief runmode for offline packet processing (pcap files) */
+int RunModeIdsLibOffline(void);
+
+/** \brief runmode default mode (live) */
+const char *RunModeLibGetDefaultMode(void);
+
+/** \brief create a "fake" worker thread in charge of processing the packets.
+ *
+ * This method just creates a context representing the worker, which is handled from the library
+ * client. No actual thread (pthread_t) is created.
+ *
+ * \return Pointer to ThreadVars structure representing the worker thread */
+void *RunModeCreateWorker(void);
+
+/** \brief start the "fake" worker.
+ *
+ * This method performs all the initialization tasks.
+ */
+int RunModeSpawnWorker(void *);
+
+/** \brief destroy a worker thread */
+void RunModeDestroyWorker(void *);
+
+#endif /* SURICATA_RUNMODE_LIB_H */
#include "runmode-erf-dag.h"
#include "runmode-erf-file.h"
#include "runmode-ipfw.h"
+#include "runmode-lib.h"
#include "runmode-netmap.h"
#include "runmode-nflog.h"
#include "runmode-nfq.h"
#else
return "DPDK(DISABLED)";
#endif
+ case RUNMODE_LIB:
+ return "LIB";
default:
FatalError("Unknown runtime mode. Aborting");
RunModeUnixSocketRegister();
RunModeIpsWinDivertRegister();
RunModeDpdkRegister();
+ RunModeIdsLibRegister();
#ifdef UNITTESTS
UtRunModeRegister();
#endif
custom_mode = RunModeDpdkGetDefaultMode();
break;
#endif
+ case RUNMODE_LIB:
+ custom_mode = RunModeLibGetDefaultMode();
+ break;
default:
return NULL;
}
RUNMODE_AFXDP_DEV,
RUNMODE_NETMAP,
RUNMODE_DPDK,
+ RUNMODE_LIB,
RUNMODE_UNITTEST,
RUNMODE_UNIX_SOCKET,
RUNMODE_WINDIVERT,
--- /dev/null
+/* Copyright (C) 2023-2024 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/** \file
+ *
+ * \author Angelo Mirabella <angelo.mirabella@broadcom.com>
+ *
+ * LIB packet and stream decoding support
+ *
+ */
+
+#include "suricata-common.h"
+#include "source-lib.h"
+#include "util-device.h"
+
+static TmEcode DecodeLibThreadInit(ThreadVars *tv, const void *initdata, void **data);
+static TmEcode DecodeLibThreadDeinit(ThreadVars *tv, void *data);
+static TmEcode DecodeLib(ThreadVars *tv, Packet *p, void *data);
+
+/* Set time to the first packet timestamp when replaying a PCAP. */
+static bool time_set = false;
+
+/** \brief register a "Decode" module for suricata as a library.
+ *
+ * The "Decode" module is the first module invoked when processing a packet */
+void TmModuleDecodeLibRegister(void)
+{
+ tmm_modules[TMM_DECODELIB].name = "DecodeLib";
+ tmm_modules[TMM_DECODELIB].ThreadInit = DecodeLibThreadInit;
+ tmm_modules[TMM_DECODELIB].Func = DecodeLib;
+ tmm_modules[TMM_DECODELIB].ThreadExitPrintStats = NULL;
+ tmm_modules[TMM_DECODELIB].ThreadDeinit = DecodeLibThreadDeinit;
+ tmm_modules[TMM_DECODELIB].cap_flags = 0;
+ tmm_modules[TMM_DECODELIB].flags = TM_FLAG_DECODE_TM;
+}
+
+/** \brief initialize the "Decode" module.
+ *
+ * \param tv Pointer to the per-thread structure.
+ * \param initdata Pointer to initialization context.
+ * \param data Pointer to the initialized context.
+ * \return Error code.
+ */
+TmEcode DecodeLibThreadInit(ThreadVars *tv, const void *initdata, void **data)
+{
+ SCEnter();
+ DecodeThreadVars *dtv = NULL;
+
+ dtv = DecodeThreadVarsAlloc(tv);
+
+ if (dtv == NULL)
+ SCReturnInt(TM_ECODE_FAILED);
+
+ DecodeRegisterPerfCounters(dtv, tv);
+
+ *data = (void *)dtv;
+
+ SCReturnInt(TM_ECODE_OK);
+}
+
+/** \brief deinitialize the "Decode" module.
+ *
+ * \param tv Pointer to the per-thread structure.
+ * \param data Pointer to the context.
+ * \return Error code.
+ */
+TmEcode DecodeLibThreadDeinit(ThreadVars *tv, void *data)
+{
+ if (data != NULL)
+ DecodeThreadVarsFree(tv, data);
+
+ time_set = false;
+ SCReturnInt(TM_ECODE_OK);
+}
+
+/** \brief main decoding function.
+ *
+ * This method receives a packet and tries to identify layer 2 to 4 layers.
+ *
+ * \param tv Pointer to the per-thread structure.
+ * \param p Pointer to the packet.
+ * \param data Pointer to the context.
+ * \return Error code.
+ */
+TmEcode DecodeLib(ThreadVars *tv, Packet *p, void *data)
+{
+ SCEnter();
+ DecodeThreadVars *dtv = (DecodeThreadVars *)data;
+
+ BUG_ON(PKT_IS_PSEUDOPKT(p));
+
+ /* update counters */
+ DecodeUpdatePacketCounters(tv, dtv, p);
+
+ /* If suri has set vlan during reading, we increase vlan counter */
+ if (p->vlan_idx) {
+ StatsIncr(tv, dtv->counter_vlan);
+ }
+
+ /* call the decoder */
+ DecodeLinkLayer(tv, dtv, p->datalink, p, GET_PKT_DATA(p), GET_PKT_LEN(p));
+
+ PacketDecodeFinalize(tv, dtv, p);
+
+ SCReturnInt(TM_ECODE_OK);
+}
+
+/** \brief process a single packet.
+ *
+ * \param tv Pointer to the per-thread structure.
+ * \param data Pointer to the raw packet.
+ * \param datalink Datalink type.
+ * \param ts Timeval structure.
+ * \param len Packet length.
+ * \param tenant_id Tenant id of the detection engine to use.
+ * \param flags Packet flags (packet checksum, rule profiling...).
+ * \param iface Sniffing interface this packet comes from (can be NULL).
+ * \return Error code.
+ */
+int TmModuleLibHandlePacket(ThreadVars *tv, const uint8_t *data, int datalink, struct timeval ts,
+ uint32_t len, uint32_t tenant_id, uint32_t flags, const char *iface)
+{
+
+ /* If the packet is NULL, consider it as a read timeout. */
+ if (data == NULL) {
+ TmThreadsSetFlag(tv, THV_CAPTURE_INJECT_PKT);
+ TmThreadsCaptureHandleTimeout(tv, NULL);
+ SCReturnInt(TM_ECODE_OK);
+ }
+
+ Packet *p = PacketGetFromQueueOrAlloc();
+ if (unlikely(p == NULL)) {
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ /* If we are processing a PCAP and it is the first packet we need to set the timestamp. */
+ SCTime_t timestamp = SCTIME_FROM_TIMEVAL(&ts);
+ if (!time_set && !TimeModeIsLive()) {
+ TmThreadsInitThreadsTimestamp(timestamp);
+ time_set = true;
+ }
+
+ PKT_SET_SRC(p, PKT_SRC_WIRE);
+ p->ts = timestamp;
+ p->datalink = datalink;
+ p->tenant_id = tenant_id;
+ p->flags |= flags;
+
+ /* Set the sniffing interface. */
+ if (iface) {
+ p->livedev = LiveGetDevice(iface);
+ }
+
+ if (PacketSetData(p, data, len) == -1) {
+ TmqhOutputPacketpool(tv, p);
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ SCLogDebug("pktlen: %" PRIu32 " (pkt %p, pkt data %p)", GET_PKT_LEN(p), p, GET_PKT_DATA(p));
+
+ if (TmThreadsSlotProcessPkt(tv, tv->tm_slots, p) != TM_ECODE_OK) {
+ SCReturnInt(TM_ECODE_FAILED);
+ }
+
+ SCReturnInt(TM_ECODE_OK);
+}
--- /dev/null
+/* Copyright (C) 2023-2024 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/** \file
+ *
+ * \author Angelo Mirabella <angelo.mirabella@broadcom.com>
+ *
+ * LIB packet and stream decoding support
+ *
+ */
+
+#ifndef SURICATA_SOURCE_LIB_H
+#define SURICATA_SOURCE_LIB_H
+
+#include "tm-threads.h"
+
+/** \brief register a "Decode" module for suricata as a library.
+ *
+ * The "Decode" module is the first module invoked when processing a packet */
+void TmModuleDecodeLibRegister(void);
+
+/** \brief process a single packet.
+ *
+ * \param tv Pointer to the per-thread structure.
+ * \param data Pointer to the raw packet.
+ * \param datalink Datalink type.
+ * \param ts Timeval structure.
+ * \param len Packet length.
+ * \param tenant_id Tenant id of the detection engine to use.
+ * \param flags Packet flags (packet checksum, rule profiling...).
+ * \param iface Sniffing interface this packet comes from (can be NULL).
+ * \return Error code.
+ */
+int TmModuleLibHandlePacket(ThreadVars *tv, const uint8_t *data, int datalink, struct timeval ts,
+ uint32_t len, uint32_t tenant_id, uint32_t flags, const char *iface);
+
+#endif /* SURICATA_SOURCE_LIB_H */
#include "source-nfq-prototypes.h"
#include "source-nflog.h"
#include "source-ipfw.h"
+#include "source-lib.h"
#include "source-pcap.h"
#include "source-pcap-file.h"
#include "source-pcap-file-helper.h"
/* Dpdk */
TmModuleReceiveDPDKRegister();
TmModuleDecodeDPDKRegister();
+
+ /* Library */
+ TmModuleDecodeLibRegister();
}
TmEcode SCLoadYamlConfig(void)
}
SCPledge();
}
+
+void SuricataSetLibRunmode(void)
+{
+ suricata.run_mode = RUNMODE_LIB;
+}
const char *GetProgramVersion(void);
+/* Library only methods. */
+void SuricataSetLibRunmode(void);
+
#endif /* SURICATA_SURICATA_H */
CASE_CODE(TMM_DECODEPCAPFILE);
CASE_CODE(TMM_RECEIVEDPDK);
CASE_CODE(TMM_DECODEDPDK);
+ CASE_CODE(TMM_DECODELIB);
CASE_CODE (TMM_RECEIVEPLUGIN);
CASE_CODE (TMM_DECODEPLUGIN);
CASE_CODE (TMM_RESPONDREJECT);
TMM_RECEIVEWINDIVERT,
TMM_VERDICTWINDIVERT,
TMM_DECODEWINDIVERT,
+ TMM_DECODELIB,
TMM_FLOWMANAGER,
TMM_FLOWRECYCLER,
* is run until the flow engine kills the thread and the queue is
* empty.
*/
-static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
+int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
{
TmSlot *fw_slot = tv->tm_flowworker;
int r = TM_ECODE_OK;
return true;
}
+static void *TmThreadsLib(void *td)
+{
+ ThreadVars *tv = (ThreadVars *)td;
+ TmSlot *s = tv->tm_slots;
+ TmEcode r = TM_ECODE_OK;
+ TmSlot *slot = NULL;
+
+ /* Set the thread name */
+ SCSetThreadName(tv->name);
+
+ if (tv->thread_setup_flags != 0)
+ TmThreadSetupOptions(tv);
+
+ /* Drop the capabilities for this thread */
+ SCDropCaps(tv);
+
+ PacketPoolInit();
+
+ /* check if we are setup properly */
+ if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
+ SCLogError("TmSlot or ThreadVars badly setup: s=%p, tmqh_in=%p,"
+ " tmqh_out=%p",
+ s, tv->tmqh_in, tv->tmqh_out);
+ TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
+ return NULL;
+ }
+
+ for (slot = s; slot != NULL; slot = slot->slot_next) {
+ if (slot->SlotThreadInit != NULL) {
+ void *slot_data = NULL;
+ r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
+ if (r != TM_ECODE_OK) {
+ if (r == TM_ECODE_DONE) {
+ EngineDone();
+ TmThreadsSetFlag(tv, THV_CLOSED | THV_INIT_DONE | THV_RUNNING_DONE);
+ goto error;
+ } else {
+ TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
+ goto error;
+ }
+ }
+ (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
+ }
+
+ /* if the flowworker module is the first, get the threads input queue */
+ if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
+ tv->stream_pq = tv->inq->pq;
+ tv->tm_flowworker = slot;
+ SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
+ tv->flow_queue = FlowQueueNew();
+ if (tv->flow_queue == NULL) {
+ TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
+ return NULL;
+ }
+ /* setup a queue */
+ } else if (slot->tm_id == TMM_FLOWWORKER) {
+ tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
+ if (tv->stream_pq_local == NULL)
+ FatalError("failed to alloc PacketQueue");
+ SCMutexInit(&tv->stream_pq_local->mutex_q, NULL);
+ tv->stream_pq = tv->stream_pq_local;
+ tv->tm_flowworker = slot;
+ SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
+ tv->flow_queue = FlowQueueNew();
+ if (tv->flow_queue == NULL) {
+ TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
+ return NULL;
+ }
+ }
+ }
+ StatsSetupPrivate(tv);
+
+ TmThreadsSetFlag(tv, THV_INIT_DONE);
+
+ TmThreadsWaitForUnpause(tv);
+
+ return NULL;
+
+error:
+ tv->stream_pq = NULL;
+ return NULL;
+}
+
static void *TmThreadsSlotVar(void *td)
{
ThreadVars *tv = (ThreadVars *)td;
tv->tm_func = TmThreadsManagement;
} else if (strcmp(name, "command") == 0) {
tv->tm_func = TmThreadsManagement;
+ } else if (strcmp(name, "lib") == 0) {
+ tv->tm_func = TmThreadsLib;
} else if (strcmp(name, "custom") == 0) {
if (fn_p == NULL)
goto error;
return TM_ECODE_OK;
}
+/**
+ * \brief Spawns a "fake" lib thread associated with the ThreadVars instance tv
+ *
+ * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
+ */
+TmEcode TmThreadLibSpawn(ThreadVars *tv)
+{
+ if (tv->tm_func == NULL) {
+ printf("ERROR: no thread function set\n");
+ return TM_ECODE_FAILED;
+ }
+
+ tv->tm_func((void *)tv);
+
+ TmThreadWaitForFlag(tv, THV_INIT_DONE | THV_RUNNING_DONE);
+
+ return TM_ECODE_OK;
+}
+
/**
* \brief Initializes the mutex and condition variables for this TV
*
ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
int mucond);
TmEcode TmThreadSpawn(ThreadVars *);
+TmEcode TmThreadLibSpawn(ThreadVars *);
+int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s);
void TmThreadKillThreadsFamily(int family);
void TmThreadKillThreads(void);
void TmThreadClearThreadsFamily(int family);