continue;
}
+ /* before grabbing the flow lock, make sure we have at least
+ * 3 packets in the pool */
+ PacketPoolWaitForN(3);
+
FLOWLOCK_WRLOCK(f);
Flow *next_flow = f->hprev;
for (idx = hash_min; idx < hash_max; idx++) {
FlowBucket *fb = &flow_hash[idx];
+ /* before grabbing the row lock, make sure we have at least
+ * 9 packets in the pool */
+ PacketPoolWaitForN(9);
+
if (FBLOCK_TRYLOCK(fb) != 0)
continue;
static int FlowMgrTest05 (void)
{
int result = 0;
+ extern intmax_t max_pending_packets;
+ max_pending_packets = 128;
+ PacketPoolInit();
FlowInitConfig(FLOW_QUIET);
FlowConfig backup;
memcpy(&backup, &flow_config, sizeof(FlowConfig));
memcpy(&flow_config, &backup, sizeof(FlowConfig));
FlowShutdown();
-
+ PacketPoolDestroy();
return result;
}
#endif /* UNITTESTS */
for (idx = 0; idx < flow_config.hash_size; idx++) {
FlowBucket *fb = &flow_hash[idx];
+ PacketPoolWaitForN(9);
FBLOCK_LOCK(fb);
/* get the topmost flow from the QUEUE */
/* we need to loop through all the flows in the queue */
while (f != NULL) {
+ PacketPoolWaitForN(3);
+
FLOWLOCK_WRLOCK(f);
/* Get the tcp session for the flow */
cc_barrier();
}
+/** \brief Wait until we have the requested ammount of packets in the pool
+ *
+ * In some cases waiting for packets is undesirable. Especially when
+ * a wait would happen under a lock of some kind, other parts of the
+ * engine could have to wait.
+ *
+ * This function only returns when at least N packets are in our pool.
+ *
+ * \param n number of packets needed
+ */
+void PacketPoolWaitForN(int n)
+{
+ PktPool *my_pool = GetThreadPacketPool();
+ Packet *p = NULL;
+
+ while (1) {
+ int i = 0;
+ PacketPoolWait();
+
+ /* count packets in our stack */
+ p = my_pool->head;
+ while (p != NULL) {
+ if (++i == n)
+ return;
+
+ p = p->next;
+ }
+
+ /* continue counting in the return stack */
+ if (my_pool->return_stack.head != NULL) {
+ SCMutexLock(&my_pool->return_stack.mutex);
+ p = my_pool->return_stack.head;
+ while (p != NULL) {
+ if (++i == n) {
+ SCMutexUnlock(&my_pool->return_stack.mutex);
+ return;
+ }
+ p = p->next;
+ }
+ SCMutexUnlock(&my_pool->return_stack.mutex);
+
+ /* or signal that we need packets and wait */
+ } else {
+ SCMutexLock(&my_pool->return_stack.mutex);
+ SC_ATOMIC_ADD(my_pool->return_stack.sync_now, 1);
+ SCCondWait(&my_pool->return_stack.cond, &my_pool->return_stack.mutex);
+ SCMutexUnlock(&my_pool->return_stack.mutex);
+ }
+ }
+}
+
/** \brief a initialized packet
*
* \warning Use *only* at init, not at packet runtime
void TmqhPacketpoolRegister(void);
Packet *PacketPoolGetPacket(void);
void PacketPoolWait(void);
+void PacketPoolWaitForN(int n);
void PacketPoolReturnPacket(Packet *p);
void PacketPoolInit(void);
void PacketPoolInitEmpty(void);