]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
af-packet: synchronize reading start
authorEric Leblond <eric@regit.org>
Tue, 11 Mar 2014 08:48:34 +0000 (09:48 +0100)
committerEric Leblond <eric@regit.org>
Mon, 17 Mar 2014 23:08:46 +0000 (00:08 +0100)
This patch is updating af-packet to discard packets that have been
sent to a socket before all socket in a fanout group have been setup.
Without this, there is no way to assure that all packets for a single
flow will be treated by the same thread.

Tests have been done on a system with an ixgbe network card. When using
'cluster_flow' load balancing and disactivating receive hash on the iface:
 ethtool -K IFACE rxhash off
then suricata is behaving as expected and all packets for a single flow
are treated by the same thread.

For some unknown reason, this is not the case when using cluster_cpu. It
seems that in that case the load balancing is not perfect on the card side.

The rxhash offloading has a direct impact on the cluster_flow load balancing
because load balancing is done by using a generic hash key attached to
each skb. This hash can be computed by the network card or can be
computed by the kernel. In the xase of a ixgbe network card, it seems there
is some issue with the hash key for TCP. This explains why it is necessary to
remove the rxhash offloading to have a correct behavior. This could also
explain why cluster_cpu is currently failing because the card is using the
same hash key computation to do the RSS queues load balancing.

src/source-af-packet.c

index 51ed693e41f474ab2ebdfbd26d2e7ea4b5c2cd91..654145a6d706d070efa28e1ee895ffda11d7d7a8 100644 (file)
@@ -452,6 +452,11 @@ void AFPPeersListReachedInc()
     }
 }
 
+static int AFPPeersListStarted()
+{
+    return !peerslist.turn;
+}
+
 /**
  * \brief Clean the global peers list.
  */
@@ -940,6 +945,107 @@ void AFPSwitchState(AFPThreadVars *ptv, int state)
     }
 }
 
+static int AFPReadAndDiscard(AFPThreadVars *ptv, struct timeval *synctv)
+{
+    struct sockaddr_ll from;
+    struct iovec iov;
+    struct msghdr msg;
+    struct timeval ts;
+    union {
+        struct cmsghdr cmsg;
+        char buf[CMSG_SPACE(sizeof(struct tpacket_auxdata))];
+    } cmsg_buf;
+
+
+    if (unlikely(suricata_ctl_flags != 0)) {
+        return 1;
+    }
+
+    msg.msg_name = &from;
+    msg.msg_namelen = sizeof(from);
+    msg.msg_iov = &iov;
+    msg.msg_iovlen = 1;
+    msg.msg_control = &cmsg_buf;
+    msg.msg_controllen = sizeof(cmsg_buf);
+    msg.msg_flags = 0;
+
+    iov.iov_len = ptv->datalen;
+    iov.iov_base = ptv->data;
+
+    recvmsg(ptv->socket, &msg, MSG_TRUNC);
+
+    if (ioctl(ptv->socket, SIOCGSTAMP, &ts) == -1) {
+        /* FIXME */
+        return -1;
+    }
+
+    if ((ts.tv_sec > synctv->tv_sec) ||
+        (ts.tv_sec >= synctv->tv_sec &&
+         ts.tv_usec > synctv->tv_usec)) {
+        return 1;
+    }
+    return 0;
+}
+
+static int AFPReadAndDiscardFromRing(AFPThreadVars *ptv, struct timeval *synctv)
+{
+    union thdr h;
+
+    if (unlikely(suricata_ctl_flags != 0)) {
+        return 1;
+    }
+
+    /* Read packet from ring */
+    h.raw = (((union thdr **)ptv->frame_buf)[ptv->frame_offset]);
+    if (h.raw == NULL) {
+        return -1;
+    }
+
+    if (((time_t)h.h2->tp_sec > synctv->tv_sec) ||
+        ((time_t)h.h2->tp_sec == synctv->tv_sec &&
+        (suseconds_t) (h.h2->tp_nsec / 1000) > synctv->tv_usec)) {
+        return 1;
+    }
+
+    h.h2->tp_status = TP_STATUS_KERNEL;
+    if (++ptv->frame_offset >= ptv->req.tp_frame_nr) {
+        ptv->frame_offset = 0;
+    }
+
+
+    return 0;
+}
+
+static int AFPSynchronizeStart(AFPThreadVars *ptv)
+{
+    int r;
+    struct timeval synctv;
+
+    /* Set timeval to end of the world */
+    synctv.tv_sec = 0xffffffff;
+    synctv.tv_usec = 0xffffffff;
+
+    while (1) {
+        if (AFPPeersListStarted() && synctv.tv_sec == (time_t) 0xffffffff) {
+            gettimeofday(&synctv, NULL);
+        }
+        if (ptv->flags & AFP_RING_MODE) {
+            r = AFPReadAndDiscardFromRing(ptv, &synctv);
+        } else {
+            r = AFPReadAndDiscard(ptv, &synctv);
+        }
+        SCLogDebug("Discarding on %s", ptv->tv->name);
+        switch (r) {
+            case 1:
+                SCLogInfo("Starting to read on %s", ptv->tv->name);
+                return 1;
+            case -1:
+                return r;
+        }
+    }
+    return 1;
+}
+
 /**
  * \brief Try to reopen socket
  *
@@ -1001,6 +1107,7 @@ TmEcode ReceiveAFPLoop(ThreadVars *tv, void *data, void *slot)
     }
     if (ptv->afp_state == AFP_STATE_UP) {
         SCLogInfo("Thread %s using socket %d", tv->name, ptv->socket);
+        AFPSynchronizeStart(ptv);
     }
 
     fds.fd = ptv->socket;