]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#42,!103] Initial receiver thread and packet queuing
authorThomas Markwalder <tmark@isc.org>
Mon, 29 Oct 2018 15:27:12 +0000 (11:27 -0400)
committerThomas Markwalder <tmark@isc.org>
Mon, 29 Oct 2018 15:27:12 +0000 (11:27 -0400)
New files:
    src/lib/dhcp -
    packet_queue.h - defines packet queuing template classes
    socket_info.h - contains existing class extracted iface_mgr.h
    tests/packet_queue4_unittest.cc
    tests/packet_queue6_unittest.cc

src/lib/dhcp/iface_mgr.*
IfaceMgr:: - new functions
    - receiveDHCP<4/6>Packets() - thread worker function which
    monitors interface sockets, enqueues packets as they are read

    - receiveDHCP<4/6>Packet() - reads a single packet from a socket

    - startDHCPReceiver(const uint16_t family) - runs
    receiveDHCP<4/6?appropriate worker function in a thread

    - stopReceiver() - stops the receiver thread

    - setPacketQueue<4/6> - replaces the default packet queue instance

    receiveDHCP<4/6>() - modified to monitor receiver watch
    socekts rather than interface sockets.  Dequeue packets
    from packet queue.

src/lib/dhcp/tests/iface_mgr_unittest.cc
    TEST_F(IfaceMgrTest, packetQueue4)
    TEST_F(IfaceMgrTest, packetQueue6)

src/lib/dhcpsrv/cfg_iface.cc
    CfgIface::openSockets() - starts DHCP receiver
    CfgIface::closeSockets() - stops DHCP receiver

16 files changed:
src/lib/dhcp/Makefile.am
src/lib/dhcp/iface_mgr.cc
src/lib/dhcp/iface_mgr.h
src/lib/dhcp/packet_queue.h [new file with mode: 0644]
src/lib/dhcp/socket_info.h [new file with mode: 0644]
src/lib/dhcp/tests/Makefile.am
src/lib/dhcp/tests/iface_mgr_test_config.cc
src/lib/dhcp/tests/iface_mgr_test_config.h
src/lib/dhcp/tests/iface_mgr_unittest.cc
src/lib/dhcp/tests/packet_queue4_unittest.cc [new file with mode: 0644]
src/lib/dhcp/tests/packet_queue6_unittest.cc [new file with mode: 0644]
src/lib/dhcpsrv/cfg_iface.cc
src/lib/dhcpsrv/cfg_iface.h
src/lib/dhcpsrv/parsers/ifaces_config_parser.cc
src/lib/dhcpsrv/tests/database_connection_unittest.cc
src/lib/util/threads/thread.h

index bc354b45e245dda4dd83c51c1388ae470cc210ae..54dc2d4500efc8e80af2b2386743eedb06c85c05 100644 (file)
@@ -43,6 +43,7 @@ libkea_dhcp___la_SOURCES += option_space_container.h
 libkea_dhcp___la_SOURCES += option_string.cc option_string.h
 libkea_dhcp___la_SOURCES += option_vendor.cc option_vendor.h
 libkea_dhcp___la_SOURCES += option_vendor_class.cc option_vendor_class.h
+libkea_dhcp___la_SOURCES += packet_queue.h 
 libkea_dhcp___la_SOURCES += pkt.cc pkt.h
 libkea_dhcp___la_SOURCES += pkt4.cc pkt4.h
 libkea_dhcp___la_SOURCES += pkt4o6.cc pkt4o6.h
@@ -51,6 +52,7 @@ libkea_dhcp___la_SOURCES += pkt_filter.h pkt_filter.cc
 libkea_dhcp___la_SOURCES += pkt_filter6.h pkt_filter6.cc
 libkea_dhcp___la_SOURCES += pkt_filter_inet.cc pkt_filter_inet.h
 libkea_dhcp___la_SOURCES += pkt_filter_inet6.cc pkt_filter_inet6.h
+libkea_dhcp___la_SOURCES += socket_info.h 
 
 # Utilize Linux Packet Filtering on Linux.
 if OS_LINUX
index 7d06326c0cf610952a369e3fa64b76cbb20ed34a..b9309172f9b46684ba5ff8e0844c88691970969e 100644 (file)
@@ -19,6 +19,7 @@
 
 #include <boost/foreach.hpp>
 #include <boost/scoped_ptr.hpp>
+#include <boost/bind.hpp>
 
 #include <cstring>
 #include <errno.h>
 #include <arpa/inet.h>
 #include <netinet/in.h>
 #include <string.h>
+#include <sys/ioctl.h>
 #include <sys/select.h>
 
+#ifndef FD_COPY
+#define FD_COPY(orig, copy) \
+    do { \
+        memmove(copy, orig, sizeof(fd_set)); \
+    } while (0)
+#endif
+
 using namespace std;
 using namespace isc::asiolink;
 using namespace isc::util;
+using namespace isc::util::thread;
+using namespace isc::util::io;
 using namespace isc::util::io::internal;
 
 namespace isc {
@@ -173,16 +184,16 @@ IfaceMgr::IfaceMgr()
      packet_filter_(new PktFilterInet()),
      packet_filter6_(new PktFilterInet6()),
      test_mode_(false),
-     allow_loopback_(false)
-{
+     allow_loopback_(false),
+     receiver_error_("no error"),
+     packet_queue4_(new PacketQueueRing4()),
+     packet_queue6_(new PacketQueueRing6()) {
 
     try {
+
         // required for sending/receiving packets
         // let's keep it in front, just in case someone
         // wants to send anything during initialization
-
-        // control_buf_ = boost::scoped_array<char>();
-
         detectIfaces();
 
     } catch (const std::exception& ex) {
@@ -269,17 +280,33 @@ void IfaceMgr::closeSockets() {
     }
 }
 
-void
-IfaceMgr::closeSockets(const uint16_t family) {
-    BOOST_FOREACH(IfacePtr iface, ifaces_) {
-        iface->closeSockets(family);
+void IfaceMgr::stopReceiver() {
+    if (receiver_thread_) {
+        terminate_watch_.markReady();
+        receiver_thread_->wait();
+        receiver_thread_.reset();
+        error_watch_.clearReady();
     }
+    receiver_error_ = "no error";
+    if (packet_queue4_) {
+        packet_queue4_->clear();
+    }
+
+    if (packet_queue6_) {
+        packet_queue6_->clear();
+    }
+}
+
+void
+IfaceMgr::closeSockets(const uint16_t) {
+    isc_throw(NotImplemented, "closeSockets(family) is obsolete");
 }
 
 IfaceMgr::~IfaceMgr() {
     // control_buf_ is deleted automatically (scoped_ptr)
     control_buf_len_ = 0;
 
+    stopReceiver();
     closeSockets();
 }
 
@@ -636,6 +663,25 @@ IfaceMgr::openSockets6(const uint16_t port,
     return (count > 0);
 }
 
+void
+IfaceMgr::startDHCPReceiver(const uint16_t family) {
+    if (receiver_thread_) {
+        isc_throw(Unexpected, "a receiver thread already exits");
+    }
+
+    switch (family) {
+    case AF_INET:
+        receiver_thread_.reset(new Thread(boost::bind(&IfaceMgr::receiveDHCP4Packets, this)));
+        break;
+    case AF_INET6:
+        receiver_thread_.reset(new Thread(boost::bind(&IfaceMgr::receiveDHCP6Packets, this)));
+        break;
+    default:
+        isc_throw (BadValue, "startDHCPReceiver: invalid family: " << family);
+        break;
+    }
+}
+
 void
 IfaceMgr::printIfaces(std::ostream& out /*= std::cout*/) {
     BOOST_FOREACH(IfacePtr iface, ifaces_) {
@@ -893,31 +939,11 @@ Pkt4Ptr IfaceMgr::receive4(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */
         isc_throw(BadValue, "fractional timeout must be shorter than"
                   " one million microseconds");
     }
-    boost::scoped_ptr<SocketInfo> candidate;
-    IfacePtr iface;
     fd_set sockets;
     int maxfd = 0;
 
     FD_ZERO(&sockets);
 
-    /// @todo: marginal performance optimization. We could create the set once
-    /// and then use its copy for select(). Please note that select() modifies
-    /// provided set to indicated which sockets have something to read.
-    BOOST_FOREACH(iface, ifaces_) {
-        BOOST_FOREACH(SocketInfo s, iface->getSockets()) {
-
-            // Only deal with IPv4 addresses.
-            if (s.addr_.isV4()) {
-
-                // Add this socket to listening set
-                FD_SET(s.sockfd_, &sockets);
-                if (maxfd < s.sockfd_) {
-                    maxfd = s.sockfd_;
-                }
-            }
-        }
-    }
-
     // if there are any callbacks for external sockets registered...
     if (!callbacks_.empty()) {
         BOOST_FOREACH(SocketCallbackInfo s, callbacks_) {
@@ -928,16 +954,31 @@ Pkt4Ptr IfaceMgr::receive4(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */
         }
     }
 
+    // add watch sockets.
+    FD_SET(receive_watch_.getSelectFd(), &sockets);
+    if (maxfd < receive_watch_.getSelectFd()) {
+        maxfd = receive_watch_.getSelectFd();
+    }
+    FD_SET(error_watch_.getSelectFd(), &sockets);
+    if (maxfd < error_watch_.getSelectFd()) {
+        maxfd = error_watch_.getSelectFd();
+    }
+
     struct timeval select_timeout;
-    select_timeout.tv_sec = timeout_sec;
-    select_timeout.tv_usec = timeout_usec;
+    if (packet_queue4_->empty()) {
+        select_timeout.tv_sec = timeout_sec;
+        select_timeout.tv_usec = timeout_usec;
+    } else {
+        select_timeout.tv_sec = 0;
+        select_timeout.tv_usec = 0;
+    }
 
     // zero out the errno to be safe
     errno = 0;
 
     int result = select(maxfd + 1, &sockets, NULL, NULL, &select_timeout);
 
-    if (result == 0) {
+    if ((result == 0) && packet_queue4_->empty()) {
         // nothing received and timeout has been reached
         return (Pkt4Ptr()); // NULL
 
@@ -956,6 +997,13 @@ Pkt4Ptr IfaceMgr::receive4(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */
         }
     }
 
+    // Check errors.
+    if (FD_ISSET(error_watch_.getSelectFd(), &sockets)) {
+        string msg = receiver_error_;
+        error_watch_.clearReady();
+        isc_throw(SocketReadError, msg);
+    }
+
     // Let's find out which socket has the data
     BOOST_FOREACH(SocketCallbackInfo s, callbacks_) {
         if (!FD_ISSET(s.socket_, &sockets)) {
@@ -974,26 +1022,16 @@ Pkt4Ptr IfaceMgr::receive4(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */
         return (Pkt4Ptr());
     }
 
-    // Let's find out which interface/socket has the data
-    BOOST_FOREACH(iface, ifaces_) {
-        BOOST_FOREACH(SocketInfo s, iface->getSockets()) {
-            if (FD_ISSET(s.sockfd_, &sockets)) {
-                candidate.reset(new SocketInfo(s));
-                break;
-            }
-        }
-        if (candidate) {
-            break;
+    // Protected packet queue access.
+    {
+        Mutex::Locker lock(receiver_lock_);
+        Pkt4Ptr pkt = packet_queue4_->dequeuePacket();
+        if (!pkt) {
+            receive_watch_.clearReady();
         }
-    }
 
-    if (!candidate) {
-        isc_throw(SocketReadError, "received data over unknown socket");
+        return (pkt);
     }
-
-    // Now we have a socket, let's get some data from it!
-    // Assuming that packet filter is not NULL, because its modifier checks it.
-    return (packet_filter_->receive(*iface, *candidate));
 }
 
 Pkt6Ptr IfaceMgr::receive6(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */ ) {
@@ -1003,30 +1041,11 @@ Pkt6Ptr IfaceMgr::receive6(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */
                   " one million microseconds");
     }
 
-    boost::scoped_ptr<SocketInfo> candidate;
     fd_set sockets;
     int maxfd = 0;
 
     FD_ZERO(&sockets);
 
-    /// @todo: marginal performance optimization. We could create the set once
-    /// and then use its copy for select(). Please note that select() modifies
-    /// provided set to indicated which sockets have something to read.
-    BOOST_FOREACH(IfacePtr iface, ifaces_) {
-
-        BOOST_FOREACH(SocketInfo s, iface->getSockets()) {
-            // Only deal with IPv6 addresses.
-            if (s.addr_.isV6()) {
-
-                // Add this socket to listening set
-                FD_SET(s.sockfd_, &sockets);
-                if (maxfd < s.sockfd_) {
-                    maxfd = s.sockfd_;
-                }
-            }
-        }
-    }
-
     // if there are any callbacks for external sockets registered...
     if (!callbacks_.empty()) {
         BOOST_FOREACH(SocketCallbackInfo s, callbacks_) {
@@ -1038,16 +1057,31 @@ Pkt6Ptr IfaceMgr::receive6(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */
         }
     }
 
+    // add watch sockets.
+    FD_SET(receive_watch_.getSelectFd(), &sockets);
+    if (maxfd < receive_watch_.getSelectFd()) {
+        maxfd = receive_watch_.getSelectFd();
+    }
+    FD_SET(error_watch_.getSelectFd(), &sockets);
+    if (maxfd < error_watch_.getSelectFd()) {
+        maxfd = error_watch_.getSelectFd();
+    }
+
     struct timeval select_timeout;
-    select_timeout.tv_sec = timeout_sec;
-    select_timeout.tv_usec = timeout_usec;
+    if (packet_queue6_->empty()) {
+        select_timeout.tv_sec = timeout_sec;
+        select_timeout.tv_usec = timeout_usec;
+    } else {
+        select_timeout.tv_sec = 0;
+        select_timeout.tv_usec = 0;
+    }
 
     // zero out the errno to be safe
     errno = 0;
 
     int result = select(maxfd + 1, &sockets, NULL, NULL, &select_timeout);
 
-    if (result == 0) {
+    if ((result == 0) && packet_queue6_->empty()) {
         // nothing received and timeout has been reached
         return (Pkt6Ptr()); // NULL
 
@@ -1066,6 +1100,13 @@ Pkt6Ptr IfaceMgr::receive6(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */
         }
     }
 
+    // Check errors.
+    if (FD_ISSET(error_watch_.getSelectFd(), &sockets)) {
+        string msg = receiver_error_;
+        error_watch_.clearReady();
+        isc_throw(SocketReadError, msg);
+    }
+
     // Let's find out which socket has the data
     BOOST_FOREACH(SocketCallbackInfo s, callbacks_) {
         if (!FD_ISSET(s.socket_, &sockets)) {
@@ -1084,24 +1125,245 @@ Pkt6Ptr IfaceMgr::receive6(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */
         return (Pkt6Ptr());
     }
 
-    // Let's find out which interface/socket has the data
-    BOOST_FOREACH(IfacePtr iface, ifaces_) {
+    // Protected DHCP packet queue access.
+    {
+        Mutex::Locker lock(receiver_lock_);
+        Pkt6Ptr pkt = packet_queue6_->dequeuePacket();
+        if (!pkt) {
+            receive_watch_.clearReady();
+        }
+
+        return (pkt);
+    }
+}
+
+void IfaceMgr::receiveDHCP4Packets() {
+    IfacePtr iface;
+    fd_set sockets;
+    int maxfd = 0;
+
+    FD_ZERO(&sockets);
+
+    // Add terminate watch socket.
+    FD_SET(terminate_watch_.getSelectFd(), &sockets);
+    if (maxfd < terminate_watch_.getSelectFd()) {
+        maxfd = terminate_watch_.getSelectFd();
+    }
+
+    // Add Interface sockets.
+    BOOST_FOREACH(iface, ifaces_) {
+        BOOST_FOREACH(SocketInfo s, iface->getSockets()) {
+
+            // Only deal with IPv4 addresses.
+            if (s.addr_.isV4()) {
+                // Add this socket to listening set.
+                FD_SET(s.sockfd_, &sockets);
+                if (maxfd < s.sockfd_) {
+                    maxfd = s.sockfd_;
+                }
+            }
+        }
+    }
+
+    for (;;) {
+        // Check the watch socket.
+        if (terminate_watch_.isReady()) {
+            terminate_watch_.clearReady();
+            return;
+        }
+
+        fd_set rd_set;
+        FD_COPY(&sockets, &rd_set);
+
+        // zero out the errno to be safe.
+        errno = 0;
+
+        // Note we wait until something happen.
+        int result = select(maxfd + 1, &rd_set, 0, 0, 0);
+
+        // Re-check the watch socket.
+        if (terminate_watch_.isReady()) {
+            terminate_watch_.clearReady();
+            return;
+        }
+
+        if (result == 0) {
+            // nothing received?
+            continue;
+
+        } else if (result < 0) {
+            // This thread should not get signals?
+            if (errno != EINTR) {
+                // Signal the error to receive4.
+                receiver_error_ = strerror(errno);
+                error_watch_.markReady();
+                sleep(1);
+            }
+            continue;
+        }
+
+        // Let's find out which interface/socket has data.
+        BOOST_FOREACH(iface, ifaces_) {
+            BOOST_FOREACH(SocketInfo s, iface->getSockets()) {
+                if (FD_ISSET(s.sockfd_, &sockets)) {
+                    receiveDHCP4Packet(*iface, s);
+                    // Can take time so check one more time the watch socket.
+                    if (terminate_watch_.isReady()) {
+                        terminate_watch_.clearReady();
+                        return;
+                    }
+                }
+            }
+        }
+    }
+
+}
+
+void IfaceMgr::receiveDHCP6Packets() {
+    IfacePtr iface;
+    fd_set sockets;
+    int maxfd = 0;
+
+    FD_ZERO(&sockets);
+
+    // Add terminate watch socket.
+    FD_SET(terminate_watch_.getSelectFd(), &sockets);
+    if (maxfd < terminate_watch_.getSelectFd()) {
+        maxfd = terminate_watch_.getSelectFd();
+    }
+
+    // Add Interface sockets.
+    BOOST_FOREACH(iface, ifaces_) {
         BOOST_FOREACH(SocketInfo s, iface->getSockets()) {
-            if (FD_ISSET(s.sockfd_, &sockets)) {
-                candidate.reset(new SocketInfo(s));
-                break;
+
+            // Only deal with IPv6 addresses.
+            if (s.addr_.isV6()) {
+
+                // Add this socket to listening set.
+                FD_SET(s.sockfd_, &sockets);
+                if (maxfd < s.sockfd_) {
+                    maxfd = s.sockfd_;
+                }
             }
         }
-        if (candidate) {
-            break;
+    }
+
+    for (;;) {
+        // Check the watch socket.
+        if (terminate_watch_.isReady()) {
+            terminate_watch_.clearReady();
+            return;
         }
+
+        fd_set rd_set;
+        FD_COPY(&sockets, &rd_set);
+
+        // zero out the errno to be safe.
+        errno = 0;
+
+        // Note we wait until something happen.
+        int result = select(maxfd + 1, &rd_set, 0, 0, 0);
+
+        // Re-check the watch socket.
+        if (terminate_watch_.isReady()) {
+            terminate_watch_.clearReady();
+            return;
+        }
+
+        if (result == 0) {
+            // nothing received?
+            continue;
+        } else if (result < 0) {
+            // This thread should not get signals?
+            if (errno != EINTR) {
+                // Signal the error to receive6.
+                receiver_error_ = strerror(errno);
+                error_watch_.markReady();
+                sleep(1);
+            }
+            continue;
+        }
+
+        // Let's find out which interface/socket has data.
+        BOOST_FOREACH(iface, ifaces_) {
+            BOOST_FOREACH(SocketInfo s, iface->getSockets()) {
+                if (FD_ISSET(s.sockfd_, &sockets)) {
+                    receiveDHCP6Packet(s);
+                    // Can take time so check one more time the watch socket.
+                    if (terminate_watch_.isReady()) {
+                        terminate_watch_.clearReady();
+                        return;
+                    }
+                }
+            }
+        }
+    }
+}
+
+void IfaceMgr::receiveDHCP4Packet(Iface& iface, const SocketInfo& socket_info) {
+    int len;
+    int result = ioctl(socket_info.sockfd_, FIONREAD, &len);
+    if (result < 0) {
+        // Signal the error to receive4.
+        receiver_error_ = strerror(errno);
+        error_watch_.markReady();
+        return;
+    }
+    if (len == 0) {
+        // Nothing to read.
+        return;
+    }
+
+    Pkt4Ptr pkt;
+
+    try {
+        pkt = packet_filter_->receive(iface, socket_info);
+    } catch (const std::exception& ex) {
+        receiver_error_ = ex.what();
+        error_watch_.markReady();
+    } catch (...) {
+        receiver_error_ = "packet filter receive() failed";
+        error_watch_.markReady();
     }
 
-    if (!candidate) {
-        isc_throw(SocketReadError, "received data over unknown socket");
+    if (pkt) {
+        Mutex::Locker lock(receiver_lock_);
+        packet_queue4_->enqueuePacket(pkt, socket_info);
+        receive_watch_.markReady();
+    }
+}
+
+void IfaceMgr::receiveDHCP6Packet(const SocketInfo& socket_info) {
+    int len;
+    int result = ioctl(socket_info.sockfd_, FIONREAD, &len);
+    if (result < 0) {
+        // Signal the error to receive6.
+        receiver_error_ = strerror(errno);
+        error_watch_.markReady();
+        return;
+    }
+    if (len == 0) {
+        // Nothing to read.
+        return;
+    }
+
+    Pkt6Ptr pkt;
+
+    try {
+        pkt = packet_filter6_->receive(socket_info);
+    } catch (const std::exception& ex) {
+        receiver_error_ = ex.what();
+        error_watch_.markReady();
+    } catch (...) {
+        receiver_error_ = "packet filter receive() failed";
+        error_watch_.markReady();
+    }
+
+    if (pkt) {
+        Mutex::Locker lock(receiver_lock_);
+        packet_queue6_->enqueuePacket(pkt, socket_info);
+        receive_watch_.markReady();
     }
-    // Assuming that packet filter is not NULL, because its modifier checks it.
-    return (packet_filter6_->receive(*candidate));
 }
 
 uint16_t IfaceMgr::getSocket(const isc::dhcp::Pkt6& pkt) {
@@ -1193,5 +1455,51 @@ IfaceMgr::getSocket(isc::dhcp::Pkt4 const& pkt) {
     return (*candidate);
 }
 
+void 
+IfaceMgr::setPacketQueue4(PacketQueue4Ptr& packet_queue4) {
+    if (!packet_queue4) {
+        isc_throw(BadValue, "IfaceMgr::setPacketQueue4 "
+                  " queue pointer cannot be empty");
+    }
+
+    // On the off chance the existing impl doesn't clear on 
+    // destruction, we will as a safe guard.
+    packet_queue4_->clear();
+    packet_queue4_ = packet_queue4;
+}
+
+void 
+IfaceMgr::setPacketQueue6(PacketQueue6Ptr& packet_queue6) {
+    if (!packet_queue6) {
+        isc_throw(BadValue, "IfaceMgr::setPacketQueue6 "
+                  " queue pointer cannot be empty");
+        }
+
+    // On the off chance the existing impl doesn't clear on 
+    // destruction, we will as a safe guard.
+    packet_queue6_->clear();
+    packet_queue6_ = packet_queue6;
+}
+
+size_t 
+IfaceMgr::getPacketQueueCapacity4() const {
+    return (packet_queue4_->getCapacity());
+}
+
+void 
+IfaceMgr::setPacketQueueCapacity4(const size_t new_capacity) {
+    packet_queue4_->setCapacity(new_capacity);
+}
+
+size_t 
+IfaceMgr::getPacketQueueCapacity6() const {
+    return (packet_queue6_->getCapacity());
+}
+
+void 
+IfaceMgr::setPacketQueueCapacity6(const size_t new_capacity) {
+    packet_queue6_->setCapacity(new_capacity);
+}
+
 } // end of namespace isc::dhcp
 } // end of namespace isc
index 42b50376649236e285107efcfaf567e5776abcdc..2b235e120cd32a740d5cd9ff636feb1fe06e1b08 100644 (file)
 #include <dhcp/dhcp6.h>
 #include <dhcp/pkt4.h>
 #include <dhcp/pkt6.h>
+#include <dhcp/packet_queue.h>
 #include <dhcp/pkt_filter.h>
 #include <dhcp/pkt_filter6.h>
 #include <util/optional_value.h>
+#include <util/watch_socket.h>
+#include <util/threads/thread.h>
+#include <util/threads/sync.h>
 
 #include <boost/function.hpp>
 #include <boost/noncopyable.hpp>
 #include <boost/scoped_array.hpp>
 #include <boost/shared_ptr.hpp>
+#include <boost/circular_buffer.hpp>
 
 #include <list>
 #include <vector>
@@ -88,54 +93,6 @@ public:
         isc::Exception(file, line, what) { };
 };
 
-
-/// Holds information about socket.
-struct SocketInfo {
-
-    isc::asiolink::IOAddress addr_; /// bound address
-    uint16_t port_;   /// socket port
-    uint16_t family_; /// IPv4 or IPv6
-
-    /// @brief Socket descriptor (a.k.a. primary socket).
-    int sockfd_;
-
-    /// @brief Fallback socket descriptor.
-    ///
-    /// This socket descriptor holds the handle to the fallback socket.
-    /// The fallback socket is created when there is a need for the regular
-    /// datagram socket to be bound to an IP address and port, besides
-    /// primary socket (sockfd_) which is actually used to receive and process
-    /// the DHCP messages. The fallback socket (if exists) is always associated
-    /// with the primary socket. In particular, the need for the fallback socket
-    /// arises when raw socket is a primary one. When primary socket is open,
-    /// it is bound to an interface not the address and port. The implications
-    /// include the possibility that the other process (e.g. the other instance
-    /// of DHCP server) will bind to the same address and port through which the
-    /// raw socket receives the DHCP messages.Another implication is that the
-    /// kernel, being unaware of the DHCP server operating through the raw
-    /// socket, will respond with the ICMP "Destination port unreachable"
-    /// messages when DHCP messages are only received through the raw socket.
-    /// In order to workaround the issues mentioned here, the fallback socket
-    /// should be opened so as/ the kernel is aware that the certain address
-    /// and port is in use.
-    ///
-    /// The fallback description is supposed to be set to a negative value if
-    /// the fallback socket is closed (not open).
-    int fallbackfd_;
-
-    /// @brief SocketInfo constructor.
-    ///
-    /// @param addr An address the socket is bound to.
-    /// @param port A port the socket is bound to.
-    /// @param sockfd Socket descriptor.
-    /// @param fallbackfd A descriptor of the fallback socket.
-    SocketInfo(const isc::asiolink::IOAddress& addr, const uint16_t port,
-               const int sockfd, const int fallbackfd = -1)
-        : addr_(addr), port_(port), family_(addr.getFamily()),
-          sockfd_(sockfd), fallbackfd_(fallbackfd) { }
-
-};
-
 /// @brief Represents a single network interface
 ///
 /// Iface structure represents network interface with all useful
@@ -873,7 +830,7 @@ public:
     ///
     /// @warning This function does not check if there has been any sockets
     /// already open by the @c IfaceMgr. Therefore a caller should call
-    /// @c IfaceMgr::closeSockets(AF_INET6) before calling this function.
+    /// @c IfaceMgr::closeSockets() before calling this function.
     /// If there are any sockets open, the function may either throw an
     /// exception or invoke an error handler on attempt to bind the new socket
     /// to the same address and port.
@@ -939,7 +896,7 @@ public:
     ///
     /// @warning This function does not check if there has been any sockets
     /// already open by the @c IfaceMgr. Therefore a caller should call
-    /// @c IfaceMgr::closeSockets(AF_INET) before calling this function.
+    /// @c IfaceMgr::closeSockets() before calling this function.
     /// If there are any sockets open, the function may either throw an
     /// exception or invoke an error handler on attempt to bind the new socket
     /// to the same address and port.
@@ -963,23 +920,8 @@ public:
 
     /// @brief Closes all IPv4 or IPv6 sockets.
     ///
-    /// This function closes sockets of the specific 'type' and closes them.
-    /// The 'type' of the socket indicates whether it is used to send IPv4
-    /// or IPv6 packets. The allowed values of the parameter are AF_INET and
-    /// AF_INET6 for IPv4 and IPv6 packets respectively. It is important
-    /// to realize that the actual types of sockets may be different than
-    /// AF_INET for IPv4 packets. This is because, historically the IfaceMgr
-    /// always used AF_INET sockets for IPv4 traffic. This is no longer the
-    /// case when the Direct IPv4 traffic must be supported. In order to support
-    /// direct traffic, the IfaceMgr operates on raw sockets, e.g. AF_PACKET
-    /// family sockets on Linux.
-    ///
-    /// @todo Replace the AF_INET and AF_INET6 values with an enum
-    /// which will not be confused with the actual socket type.
-    ///
-    /// @param family type of the sockets to be closed (AF_INET or AF_INET6)
-    ///
-    /// @throw BadValue if family value is different than AF_INET or AF_INET6.
+    /// Obsolete!
+    /// @throw NotImplemented
     void closeSockets(const uint16_t family);
 
     /// @brief Returns number of detected interfaces.
@@ -1089,6 +1031,66 @@ public:
     /// @return true if there is a socket bound to the specified address.
     bool hasOpenSocket(const isc::asiolink::IOAddress& addr) const;
 
+    /// @brief Sets the DHCPv4 packet queue
+    ///
+    /// Replaces the existing DHCPv4 packet queue with the
+    /// given queue.  Any packets contained in the existing 
+    /// queue will be discarded.  This method is intended to 
+    /// be used by hook developers to install their own packet
+    /// queue implementation(s).
+    ///
+    /// @param packet_queue4 pointer to a PacketQueue4 instance
+    /// to use to manage inbound DHCPv4 packets.
+    ///
+    /// @throw BadValue if given an empty pointer
+    void setPacketQueue4(PacketQueue4Ptr& packet_queue4);
+
+    /// @brief Sets the DHCPv6 packet queue
+    ///
+    /// Replaces the existing DHCPv6 packet queue with the
+    /// given queue.  Any packets contained in the existing 
+    /// queue will be discarded.  This method is intended to 
+    /// be used by hook developers to install their own packet
+    /// queue implementation(s).
+    ///
+    /// @param packet_queue6 pointer to a PacketQueue6 instance
+    /// to use to manage inbound DHCPv6 packets.
+    ///
+    /// @throw BadValue if given an empty pointer
+    void setPacketQueue6(PacketQueue6Ptr& packet_queue6);
+
+    /// @brief Returns the current capacity of the DHCPv4 packet queue buffer.
+    size_t getPacketQueueCapacity4() const;
+
+    /// @brief Set the capacity of the DHCPv4 packet queue buffer.
+    ///
+    /// @param new_capacity New capacity of the buffer.
+    void setPacketQueueCapacity4(const size_t new_capacity);
+
+    /// @brief Returns the current capacity of the DHCPv6 packet queue buffer.
+    size_t getPacketQueueCapacity6() const;
+
+    /// @brief Set the capacity of the DHCPv6 packet queue buffer.
+    ///
+    /// @param new_capacity New capacity of the buffer.
+    void setPacketQueueCapacity6(const size_t new_capacity);
+
+    /// @brief Starts DHCP packet receiver.
+    ///
+    /// Starts the DHCP packet receiver thread for the given.
+    /// protocol, AF_NET or AF_INET6
+    ///
+    /// @param family indicates which receiver to start,
+    /// (AF_INET or AF_INET6)
+    ///
+    /// @throw Unexpected if the thread already exists.
+    void startDHCPReceiver(const uint16_t family);
+
+    /// @brief Stops the DHCP packet receiver.
+    ///
+    /// Stops the receiver and delete the dedicated thread.
+    void stopReceiver();
+
     // don't use private, we need derived classes in tests
 protected:
 
@@ -1138,8 +1140,7 @@ protected:
     /// and pretends to detect such interface. First interface name and
     /// link-local IPv6 address or IPv4 address is read from the
     /// interfaces.txt file.
-    void
-    stubDetectIfaces();
+    void stubDetectIfaces();
 
     // TODO: having 2 maps (ifindex->iface and ifname->iface would)
     //      probably be better for performance reasons
@@ -1205,6 +1206,52 @@ private:
                              const uint16_t port,
                              IfaceMgrErrorMsgCallback error_handler = 0);
 
+    /// @brief DHCPv4 receiver method.
+    ///
+    /// Loops forever reading DHCPv4 packets from the interface sockets
+    /// and adds them to the packet queue.  It monitors the "terminate"
+    /// watch socket, and exits if it is marked ready.  This is method
+    /// is used as the worker function in the thread created by @c 
+    /// startDHCP4Receiver().  It currently uses select() to monitor
+    /// socket readiness.  If the select errors out (other than EINTR),
+    /// it marks the "error" watch socket as ready.
+    void receiveDHCP4Packets();
+
+    /// @brief Receives a single DHCPv4 packet from an interface socket
+    ///
+    /// Called by @c receiveDHPC4Packets when a socket fd is flagged as
+    /// ready. It uses the DHCPv4 packet filter to receive a single packet 
+    /// from the given interface socket, adds it to the packet queue, and 
+    /// marks the "receive" watch socket ready. If an error occurs during
+    /// the read, the "error" watch socket is marked ready.
+    /// 
+    /// @param iface interface
+    /// @param socket_info structure holding socket information
+    void receiveDHCP4Packet(Iface& iface, const SocketInfo& socket_info);
+
+    /// @brief DHCPv6 receiver method.
+    ///
+    /// Loops forever reading DHCPv6 packets from the interface sockets
+    /// and adds them to the packet queue.  It monitors the "terminate"
+    /// watch socket, and exits if it is marked ready.  This is method
+    /// is used as the worker function in the thread created by @c 
+    /// startDHCP6Receiver().  It currently uses select() to monitor
+    /// socket readiness.  If the select errors out (other than EINTR),
+    /// it marks the "error" watch socket as ready.
+    void receiveDHCP6Packets();
+
+    /// @brief Receives a single DHCPv6 packet from an interface socket
+    ///
+    /// Called by @c receiveDHPC6Packets when a socket fd is flagged as
+    /// ready. It uses the DHCPv6 packet filter to receive a single packet 
+    /// from the given interface socket, adds it to the packet queue, and 
+    /// marks the "receive" watch socket ready. If an error occurs during
+    /// the read, the "error" watch socket is marked ready.
+    /// 
+    /// @param iface interface
+    /// @param socket_info structure holding socket information
+    void receiveDHCP6Packet(const SocketInfo& socket_info);
+
     /// Holds instance of a class derived from PktFilter, used by the
     /// IfaceMgr to open sockets and send/receive packets through these
     /// sockets. It is possible to supply custom object using
@@ -1229,6 +1276,43 @@ private:
 
     /// @brief Allows to use loopback
     bool allow_loopback_;
+
+    /// @brief Error message of the last DHCP packet receive error.
+    std::string receiver_error_;
+
+    /// @brief DHCPv4 receiver packet queue.
+    ///
+    /// Incoming packets are read by the receiver thread and
+    /// added to this queue. @c receive4() dequeues and 
+    /// returns them.
+    PacketQueue4Ptr packet_queue4_;
+
+    /// @brief DHCPv6 receiver packet queue.
+    ///
+    /// Incoming packets are read by the receiver thread and
+    /// added to this queue. @c receive6() dequeues and
+    /// returns them.
+    PacketQueue6Ptr packet_queue6_;
+
+    /// @brief DHCP packet receive error watch socket.
+    /// Marked as ready when the DHCP packet receiver experiences 
+    /// an I/O error.
+    isc::util::WatchSocket error_watch_;
+
+    /// @brief DHCP packet receive watch socket.
+    /// Marked as ready when the DHCP packet receiver adds a packet
+    /// to the packet queue.
+    isc::util::WatchSocket receive_watch_;
+
+    /// @brief Packet receiver terminate watch socket.
+    /// Marked as ready when the DHCP packet receiver thread should terminate.
+    isc::util::WatchSocket terminate_watch_;
+
+    /// DHCP packet receiver mutex.
+    isc::util::thread::Mutex receiver_lock_;
+
+    /// DHCP packet receiver thread.
+    isc::util::thread::ThreadPtr receiver_thread_;
 };
 
 }; // namespace isc::dhcp
diff --git a/src/lib/dhcp/packet_queue.h b/src/lib/dhcp/packet_queue.h
new file mode 100644 (file)
index 0000000..d0d80dc
--- /dev/null
@@ -0,0 +1,300 @@
+// Copyright (C) 2018 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef PACKET_QUEUE_H
+#define PACKET_QUEUE_H
+
+#include <dhcp/socket_info.h>
+#include <dhcp/pkt4.h>
+#include <dhcp/pkt6.h>
+
+#include <boost/function.hpp>
+#include <boost/circular_buffer.hpp>
+
+namespace isc {
+
+namespace dhcp {
+
+/// @brief Enumerates choices between the two ends of the queue.
+enum class QueueEnd {
+    FRONT,  // Typically the end packets are read from
+    BACK    // Typically the end packets are written to
+};
+
+/// @brief Interface for managing a queue of inbound DHCP packets
+template<typename PacketTypePtr> 
+class PacketQueue {
+public:
+
+    /// @brief Constructor
+    PacketQueue() {}
+
+    /// @brief virtual Destructor
+    virtual ~PacketQueue(){};
+
+    /// @brief Adds a packet to the queue
+    ///
+    /// Calls @c dropPacket to determine if the packet should be queued
+    /// or dropped.  If it should be queued it is added to the end of the
+    /// specified by the "to" parameter. 
+    ///
+    /// @param packet packet to enqueue 
+    /// @param source socket the packet came from - this can be
+    /// @param to end of the queue from which to remove packet(s).
+    /// Defaults to BACK.
+    ///
+    void enqueuePacket(PacketTypePtr packet, const SocketInfo& source,
+                       const QueueEnd& to=QueueEnd::BACK) {
+        if (!dropPacket(packet, source)) {
+            pushPacket(packet, to);
+        }
+    }
+
+    /// @brief Dequeues the next packet from the queue
+    ///
+    /// Calls @eatPackets to discard packets as needed, and then
+    /// dequeues the next packet (if any) and returns it.  Packets
+    /// are dequeued from the end of the queue specified by the "from"
+    /// parameter.
+    ///
+    /// @param from end of the queue from which to remove packet(s).
+    /// Defaults to FRONT.
+    ///
+    /// @return A pointer to dequeued packet, or an empty pointer
+    /// if the queue is empty.
+    PacketTypePtr dequeuePacket(const QueueEnd& from=QueueEnd::FRONT) {
+        eatPackets(from);
+        return(popPacket(from));
+    }
+
+    /// @brief Determines if a packet should be discarded.
+    ///
+    /// This function is called at the beginning of @enqueuePacket and
+    /// provides an opportunity to examine the packet and its source
+    /// and decide whether it should be dropped or added to the queue.
+    /// Derivations are expected to provide implementations based on 
+    /// their own requirements.  Bear in mind that the packet has NOT
+    /// been unpacked at this point. The default implementation simply
+    /// returns false.
+    ///
+    /// @param packet the packet under consideration
+    /// @param source the socket the packet came from
+    ///
+    /// @return true if the packet should be dropped, false if it should be
+    /// kept.
+    virtual bool dropPacket(PacketTypePtr /* packet */, 
+                            const SocketInfo& /* source */) {
+        return (false);
+    }
+
+    /// Discards packets from one end of the queue.
+    ///
+    /// This function is called at the beginning of @c dequeuPacket and
+    /// provides an opportunity to examine and discard packets from
+    /// the queue prior to dequeuing the next packet to be
+    /// processed.  Derivations are expected to provide implementations 
+    /// based on their own requirements.  The default implemenation is to 
+    /// to simply returns without skipping any packets.
+    ///
+    /// @param from end of the queue from which packets should discarded
+    /// This is passed in from @c dequeuePackets.
+    ///
+    /// @return The number of packets discarded.
+    virtual int eatPackets(const QueueEnd& /* from */) {
+        return (0); 
+    }
+
+    /// @brief Pushes a packet onto the queue
+    ///
+    ///  Adds a packet onto the end of queue specified.
+    virtual void pushPacket(PacketTypePtr& packet, const QueueEnd& to=QueueEnd::BACK) = 0;
+
+    /// @brief Pops a packet from the queue
+    ///
+    /// Removes a packet from the end of the queue specified and returns it.
+    ///
+    /// @return A pointer to dequeued packet, or an empty pointer
+    /// if the queue is empty.
+    virtual PacketTypePtr popPacket(const QueueEnd& from=QueueEnd::FRONT) = 0;
+
+    /// @brief Gets the packet currently at one end of the queue
+    ///
+    /// Returns a pointer the packet at the specified end of the
+    /// queue without dequeuing it.  
+    ///
+    /// @return A pointer to packet, or an empty pointer if the 
+    /// queue is empty.
+    virtual const PacketTypePtr peek(const QueueEnd& from=QueueEnd::FRONT) const = 0;
+
+    /// @brief return True if the queue is empty.
+    virtual bool empty() const = 0;
+
+    /// @brief Returns the maximum number of packets allowed in the buffer.
+    virtual size_t getCapacity() const = 0;
+
+    /// @brief Sets the maximum number of packets allowed in the buffer.
+    virtual void setCapacity(size_t capacity) = 0;
+
+    /// @brief Returns the current number of packets in the buffer.
+    virtual size_t getSize() const = 0;
+
+    /// @brief Discards all packets currently in the buffer.
+    virtual void clear() = 0;
+};
+
+/// @brief Defines pointer to the DHCPv4 queue interface used at the application level. 
+typedef boost::shared_ptr<PacketQueue<Pkt4Ptr>> PacketQueue4Ptr;
+
+/// @brief Defines pointer to the DHCPv6 queue interface used at the application level. 
+typedef boost::shared_ptr<PacketQueue<Pkt6Ptr>> PacketQueue6Ptr;
+
+/// @brief Provides an abstract ring-buffer implementation of the PacketQueue interface.
+template<typename PacketTypePtr> 
+class PacketQueueRing : public PacketQueue<PacketTypePtr> {
+public:
+    static const size_t MIN_RING_CAPACITY = 5;
+
+    /// @brief Constructor
+    ///
+    /// @param queue_capacity maximum number of packets the queue can hold
+    /// Defaults to DEFAULT_RING_CAPACITY.
+    PacketQueueRing(size_t capacity) { 
+        queue_.set_capacity(capacity);
+    }
+
+    /// @brief virtual Destructor
+    virtual ~PacketQueueRing(){};
+
+    /// @brief Pushes a packet onto the queue
+    ///
+    ///  Adds a packet onto the end of queue specified.
+    virtual void pushPacket(PacketTypePtr& packet, const QueueEnd& to=QueueEnd::BACK) {
+        if (to == QueueEnd::BACK) {
+            queue_.push_back(packet);
+        } else {
+            queue_.push_front(packet);
+        }
+    }
+
+    /// @brief Pops a packet from the queue
+    ///
+    /// Removes a packet from the end of the queue specified and returns it.
+    ///
+    /// @return A pointer to dequeued packet, or an empty pointer
+    /// if the queue is empty.
+    virtual PacketTypePtr popPacket(const QueueEnd& from = QueueEnd::FRONT) {
+        PacketTypePtr packet;
+        if (queue_.empty()) {
+            return (packet);
+        }
+
+        if (from == QueueEnd::FRONT) {
+            packet = queue_.front();
+            queue_.pop_front();
+        } else {
+            packet = queue_.back();
+            queue_.pop_back();
+        }
+
+        return (packet);
+    }
+
+
+    /// @brief Gets the packet currently at one end of the queue
+    ///
+    /// Returns a pointer the packet at the specified end of the
+    /// queue without dequeuing it.  
+    ///
+    /// @return A pointer to packet, or an empty pointer if the 
+    /// queue is empty.
+    virtual const PacketTypePtr peek(const QueueEnd& from=QueueEnd::FRONT) const {
+        PacketTypePtr packet;
+        if (!queue_.empty()) {
+            packet = (from == QueueEnd::FRONT ? queue_.front() : queue_.back());
+        }
+
+        return (packet);
+    }
+
+    /// @brief Returns True if the queue is empty.
+    virtual bool empty() const {
+        return(queue_.empty());
+    } 
+
+    /// @brief Returns the maximum number of packets allowed in the buffer.
+    virtual size_t getCapacity() const {
+        return (queue_.capacity());
+    }
+
+    /// @brief Sets the maximum number of packets allowed in the buffer.
+    ///
+    /// @throw BadValue if capacity is too low.
+    virtual void setCapacity(size_t capacity) {
+        if (capacity < MIN_RING_CAPACITY) {
+            isc_throw(BadValue, "Queue capacity of " << capacity 
+                      << " is invalid.  It must be at least " 
+                      << MIN_RING_CAPACITY);
+        }
+
+        /// @todo should probably throw if it's zero
+        queue_.set_capacity(capacity);
+    }
+
+    /// @brief Returns the current number of packets in the buffer.
+    virtual size_t getSize() const {
+        return (queue_.size());
+    }
+
+    /// @brief Discards all packets currently in the buffer.
+    virtual void clear()  {
+        queue_.clear();
+    }
+
+private:
+
+    /// @brief Packet queue
+    boost::circular_buffer<PacketTypePtr> queue_;
+};
+
+
+/// @brief Default DHCPv4 packet queue buffer implementation
+class PacketQueueRing4 : public PacketQueueRing<Pkt4Ptr> {
+public:
+    static const size_t DEFAULT_RING_CAPACITY = 500;
+
+    /// @brief Constructor
+    ///
+    /// @param capacity maximum number of packets the queue can hold
+    PacketQueueRing4(size_t capacity=DEFAULT_RING_CAPACITY) 
+        : PacketQueueRing(capacity) {
+    };
+
+    /// @brief virtual Destructor
+    virtual ~PacketQueueRing4(){};
+};
+
+/// @brief Default DHCPv6 packet queue buffer implementation
+class PacketQueueRing6 : public PacketQueueRing<Pkt6Ptr> {
+public:
+    static const size_t DEFAULT_RING_CAPACITY = 500;
+
+    /// @brief Constructor
+    ///
+    /// @param capacity maximum number of packets the queue can hold
+    PacketQueueRing6(size_t capacity=DEFAULT_RING_CAPACITY)
+        : PacketQueueRing(capacity) {
+    };
+
+    /// @brief virtual Destructor
+    virtual ~PacketQueueRing6(){};
+};
+
+
+
+}; // namespace isc::dhcp
+}; // namespace isc
+
+#endif // PACKET_QUEUE_H
diff --git a/src/lib/dhcp/socket_info.h b/src/lib/dhcp/socket_info.h
new file mode 100644 (file)
index 0000000..7914a4e
--- /dev/null
@@ -0,0 +1,66 @@
+// Copyright (C) 2018 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef DHCP_SOCKET_INFO_H
+#define DHCP_SOCKET_INFO_H
+
+#include <asiolink/io_address.h>
+
+namespace isc {
+
+namespace dhcp {
+
+/// Holds information about socket.
+struct SocketInfo {
+
+    isc::asiolink::IOAddress addr_; /// bound address
+    uint16_t port_;   /// socket port
+    uint16_t family_; /// IPv4 or IPv6
+
+    /// @brief Socket descriptor (a.k.a. primary socket).
+    int sockfd_;
+
+    /// @brief Fallback socket descriptor.
+    ///
+    /// This socket descriptor holds the handle to the fallback socket.
+    /// The fallback socket is created when there is a need for the regular
+    /// datagram socket to be bound to an IP address and port, besides
+    /// primary socket (sockfd_) which is actually used to receive and process
+    /// the DHCP messages. The fallback socket (if exists) is always associated
+    /// with the primary socket. In particular, the need for the fallback socket
+    /// arises when raw socket is a primary one. When primary socket is open,
+    /// it is bound to an interface not the address and port. The implications
+    /// include the possibility that the other process (e.g. the other instance
+    /// of DHCP server) will bind to the same address and port through which the
+    /// raw socket receives the DHCP messages.Another implication is that the
+    /// kernel, being unaware of the DHCP server operating through the raw
+    /// socket, will respond with the ICMP "Destination port unreachable"
+    /// messages when DHCP messages are only received through the raw socket.
+    /// In order to workaround the issues mentioned here, the fallback socket
+    /// should be opened so as/ the kernel is aware that the certain address
+    /// and port is in use.
+    ///
+    /// The fallback description is supposed to be set to a negative value if
+    /// the fallback socket is closed (not open).
+    int fallbackfd_;
+
+    /// @brief SocketInfo constructor.
+    ///
+    /// @param addr An address the socket is bound to.
+    /// @param port A port the socket is bound to.
+    /// @param sockfd Socket descriptor.
+    /// @param fallbackfd A descriptor of the fallback socket.
+    SocketInfo(const isc::asiolink::IOAddress& addr, const uint16_t port,
+               const int sockfd, const int fallbackfd = -1)
+        : addr_(addr), port_(port), family_(addr.getFamily()),
+          sockfd_(sockfd), fallbackfd_(fallbackfd) { }
+
+};
+
+}; // namespace isc::dhcp
+}; // namespace isc
+
+#endif // DHCP_SOCKET_INFO_H
index c5a0a943305dc3d0310b8d99db5b401c2258bc13..da596e436055f4eb70c4a6ff675daee32abb1281 100644 (file)
@@ -70,6 +70,8 @@ libdhcp___unittests_SOURCES += option_string_unittest.cc
 libdhcp___unittests_SOURCES += option_vendor_unittest.cc
 libdhcp___unittests_SOURCES += option_vendor_class_unittest.cc
 libdhcp___unittests_SOURCES  += pkt_captures4.cc pkt_captures6.cc pkt_captures.h
+libdhcp___unittests_SOURCES += packet_queue4_unittest.cc
+libdhcp___unittests_SOURCES += packet_queue6_unittest.cc
 libdhcp___unittests_SOURCES += pkt4_unittest.cc
 libdhcp___unittests_SOURCES += pkt6_unittest.cc
 libdhcp___unittests_SOURCES += pkt4o6_unittest.cc
index 87a67c55aff78a094a62029773fab74aa06c4477..60a2fee36627fb564fbe996a2fa035b6e75d0019 100644 (file)
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2015 Internet Systems Consortium, Inc. ("ISC")
+// Copyright (C) 2014-2018 Internet Systems Consortium, Inc. ("ISC")
 //
 // This Source Code Form is subject to the terms of the Mozilla Public
 // License, v. 2.0. If a copy of the MPL was not distributed with this
@@ -37,6 +37,7 @@ IfaceMgrTestConfig::IfaceMgrTestConfig(const bool default_config) {
 }
 
 IfaceMgrTestConfig::~IfaceMgrTestConfig() {
+    IfaceMgr::instance().stopReceiver();
     IfaceMgr::instance().closeSockets();
     IfaceMgr::instance().clearIfaces();
     IfaceMgr::instance().setPacketFilter(PktFilterPtr(new PktFilterInet()));
index ab435d59bc7db8387705ec72d0bb0fdd9fd982a6..95e704a46db6005ab0d3cf13abf3359afe97e39c 100644 (file)
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2017 Internet Systems Consortium, Inc. ("ISC")
+// Copyright (C) 2014-2018 Internet Systems Consortium, Inc. ("ISC")
 //
 // This Source Code Form is subject to the terms of the Mozilla Public
 // License, v. 2.0. If a copy of the MPL was not distributed with this
@@ -138,6 +138,7 @@ public:
     /// Closes all currently opened sockets, removes current interfaces and
     /// sets the default packet filtering classes. The default packet filtering
     /// classes are used for IO operations on real sockets/interfaces.
+    /// Receiver is stopped.
     ///
     /// Destructor also re-detects real interfaces.
     ~IfaceMgrTestConfig();
index 10f943336770561d4cf9cb814e0f0f1cdc81b270..a3771bcca41cece543f5b504c83593fa157d4b6a 100644 (file)
@@ -525,65 +525,6 @@ TEST_F(IfaceMgrTest, instance) {
     EXPECT_NO_THROW(IfaceMgr::instance());
 }
 
-// This test verifies that sockets can be closed selectively, i.e. all
-// IPv4 sockets can be closed first and all IPv6 sockets remain open.
-TEST_F(IfaceMgrTest, closeSockets) {
-    // Will be using local loopback addresses for this test.
-    IOAddress loaddr("127.0.0.1");
-    IOAddress loaddr6("::1");
-
-    // Create instance of IfaceMgr.
-    boost::scoped_ptr<NakedIfaceMgr> iface_mgr(new NakedIfaceMgr());
-    ASSERT_TRUE(iface_mgr);
-
-    // Out constructor does not detect interfaces by itself. We need
-    // to create one and add.
-    int ifindex = if_nametoindex(LOOPBACK);
-    ASSERT_GT(ifindex, 0);
-    IfacePtr lo_iface(new Iface(LOOPBACK, ifindex));
-    iface_mgr->getIfacesLst().push_back(lo_iface);
-
-    // Create set of V4 and V6 sockets on the loopback interface.
-    // They must differ by a port they are bound to.
-    for (unsigned i = 0; i < 6; ++i) {
-        // Every other socket will be IPv4.
-        if (i % 2) {
-            ASSERT_NO_THROW(
-                iface_mgr->openSocket(LOOPBACK, loaddr, 10000 + i)
-            );
-        } else {
-            ASSERT_NO_THROW(
-                iface_mgr->openSocket(LOOPBACK, loaddr6, 10000 + i)
-            );
-        }
-    }
-
-    // At the end we should have 3 IPv4 and 3 IPv6 sockets open.
-    IfacePtr iface = iface_mgr->getIface(LOOPBACK);
-    ASSERT_TRUE(iface != NULL);
-
-    int v4_sockets_count = getOpenSocketsCount(*iface, AF_INET);
-    ASSERT_EQ(3, v4_sockets_count);
-    int v6_sockets_count = getOpenSocketsCount(*iface, AF_INET6);
-    ASSERT_EQ(3, v6_sockets_count);
-
-    // Let's try to close only IPv4 sockets.
-    ASSERT_NO_THROW(iface_mgr->closeSockets(AF_INET));
-    v4_sockets_count = getOpenSocketsCount(*iface, AF_INET);
-    EXPECT_EQ(0, v4_sockets_count);
-    // The IPv6 sockets should remain open.
-    v6_sockets_count = getOpenSocketsCount(*iface, AF_INET6);
-    EXPECT_EQ(3, v6_sockets_count);
-
-    // Let's try to close IPv6 sockets.
-    ASSERT_NO_THROW(iface_mgr->closeSockets(AF_INET6));
-    v4_sockets_count = getOpenSocketsCount(*iface, AF_INET);
-    EXPECT_EQ(0, v4_sockets_count);
-    // They should have been closed now.
-    v6_sockets_count = getOpenSocketsCount(*iface, AF_INET6);
-    EXPECT_EQ(0, v6_sockets_count);
-}
-
 TEST_F(IfaceMgrTest, ifaceClass) {
     // Basic tests for Iface inner class
 
@@ -699,6 +640,60 @@ TEST_F(IfaceMgrTest, clearIfaces) {
     EXPECT_EQ(0, ifacemgr.countIfaces());
 }
 
+// Verify that we can manipulate the DHCPv4 packet queue.
+TEST_F(IfaceMgrTest, packetQueue4) {
+    NakedIfaceMgr ifacemgr;
+  
+    // Verify the default packet queue exists and has the default capacity.
+    size_t default_cap = PacketQueueRing4::DEFAULT_RING_CAPACITY;
+    EXPECT_EQ(default_cap, ifacemgr.getPacketQueueCapacity4());
+
+    PacketQueue4Ptr myQueue;
+    // Verify we cannot set the queue to an empty pointer.
+    ASSERT_THROW(ifacemgr.setPacketQueue4(myQueue), BadValue);
+
+    // Verify we can replace the default packet queue with our own.
+    myQueue.reset(new PacketQueueRing4(default_cap + 1));
+    ASSERT_NO_THROW(ifacemgr.setPacketQueue4(myQueue));
+
+    // Verify the new queue has the expected capacity.
+    EXPECT_EQ(default_cap + 1, ifacemgr.getPacketQueueCapacity4());
+
+    // Verify we can't set the capacity to an invalid value.
+    ASSERT_THROW(ifacemgr.setPacketQueueCapacity4(0), BadValue);
+
+    // Verify we can set the capacity to an invalid value.
+    ASSERT_NO_THROW(ifacemgr.setPacketQueueCapacity4(default_cap + 2));
+    EXPECT_EQ(default_cap + 2, ifacemgr.getPacketQueueCapacity4());
+}
+
+// Verify that we can manipulate the DHCPv6 packet queue.
+TEST_F(IfaceMgrTest, packetQueue6) {
+    NakedIfaceMgr ifacemgr;
+  
+    // Verify the default packet queue exists and has the default capacity.
+    size_t default_cap = PacketQueueRing6::DEFAULT_RING_CAPACITY;
+    EXPECT_EQ(default_cap, ifacemgr.getPacketQueueCapacity6());
+
+    PacketQueue6Ptr myQueue;
+    // Verify we cannot set the queue to an empty pointer.
+    ASSERT_THROW(ifacemgr.setPacketQueue6(myQueue), BadValue);
+
+    // Verify we can replace the default packet queue with our own.
+    myQueue.reset(new PacketQueueRing6(default_cap + 1));
+    ASSERT_NO_THROW(ifacemgr.setPacketQueue6(myQueue));
+
+    // Verify the new queue has the expected capacity.
+    EXPECT_EQ(default_cap + 1, ifacemgr.getPacketQueueCapacity6());
+
+    // Verify we can't set the capacity to an invalid value.
+    ASSERT_THROW(ifacemgr.setPacketQueueCapacity6(0), BadValue);
+
+    // Verify we can set the capacity to an invalid value.
+    ASSERT_NO_THROW(ifacemgr.setPacketQueueCapacity6(default_cap + 2));
+    EXPECT_EQ(default_cap + 2, ifacemgr.getPacketQueueCapacity6());
+}
+
 TEST_F(IfaceMgrTest, receiveTimeout6) {
     using namespace boost::posix_time;
     std::cout << "Testing DHCPv6 packet reception timeouts."
@@ -714,6 +709,8 @@ TEST_F(IfaceMgrTest, receiveTimeout6) {
     );
     // Socket is open if result is non-negative.
     ASSERT_GE(socket1, 0);
+    // Start receiver.
+    ASSERT_NO_THROW(ifacemgr->startDHCPReceiver(AF_INET6));
 
     // Remember when we call receive6().
     ptime start_time = microsec_clock::universal_time();
@@ -749,6 +746,9 @@ TEST_F(IfaceMgrTest, receiveTimeout6) {
     // Test with invalid fractional timeout values.
     EXPECT_THROW(ifacemgr->receive6(0, 1000000), isc::BadValue);
     EXPECT_THROW(ifacemgr->receive6(1, 1000010), isc::BadValue);
+
+    // Stop receiver.
+    EXPECT_NO_THROW(ifacemgr->stopReceiver());
 }
 
 TEST_F(IfaceMgrTest, receiveTimeout4) {
@@ -766,6 +766,8 @@ TEST_F(IfaceMgrTest, receiveTimeout4) {
     );
     // Socket is open if returned value is non-negative.
     ASSERT_GE(socket1, 0);
+    // Start receiver.
+    ASSERT_NO_THROW(ifacemgr->startDHCPReceiver(AF_INET));
 
     Pkt4Ptr pkt;
     // Remember when we call receive4().
@@ -801,6 +803,9 @@ TEST_F(IfaceMgrTest, receiveTimeout4) {
     // Test with invalid fractional timeout values.
     EXPECT_THROW(ifacemgr->receive6(0, 1000000), isc::BadValue);
     EXPECT_THROW(ifacemgr->receive6(2, 1000005), isc::BadValue);
+
+    // Stop receiver.
+    EXPECT_NO_THROW(ifacemgr->stopReceiver());
 }
 
 TEST_F(IfaceMgrTest, multipleSockets) {
@@ -1089,6 +1094,7 @@ TEST_F(IfaceMgrTest, sendReceive6) {
     EXPECT_GE(socket1, 0);
     EXPECT_GE(socket2, 0);
 
+    ifacemgr->startDHCPReceiver(AF_INET6);
 
     // prepare dummy payload
     uint8_t data[128];
@@ -1124,6 +1130,8 @@ TEST_F(IfaceMgrTest, sendReceive6) {
     // assume the one or the other will always be chosen for sending data. Therefore
     // we should accept both values as source ports.
     EXPECT_TRUE((rcvPkt->getRemotePort() == 10546) || (rcvPkt->getRemotePort() == 10547));
+
+    ifacemgr->stopReceiver();
 }
 
 TEST_F(IfaceMgrTest, sendReceive4) {
@@ -1142,6 +1150,8 @@ TEST_F(IfaceMgrTest, sendReceive4) {
 
     EXPECT_GE(socket1, 0);
 
+    ifacemgr->startDHCPReceiver(AF_INET);
+
     boost::shared_ptr<Pkt4> sendPkt(new Pkt4(DHCPDISCOVER, 1234) );
 
     sendPkt->setLocalAddr(IOAddress("127.0.0.1"));
@@ -1208,31 +1218,23 @@ TEST_F(IfaceMgrTest, sendReceive4) {
     // assume the one or the other will always be chosen for sending data. We should
     // skip checking source port of sent address.
 
+
     // Close the socket. Further we will test if errors are reported
     // properly on attempt to use closed socket.
     close(socket1);
 
-// Warning: kernel bug on FreeBSD. The following code checks that attempt to
-// read through invalid descriptor will result in exception. The reason why
-// this failure is expected is that select() function should result in EBADF
-// error when invalid descriptor is passed to it. In particular, closed socket
-// descriptor is invalid. On the following OS:
-//
-// 8.1-RELEASE FreeBSD 8.1-RELEASE #0: Mon Jul 19 02:55:53 UTC 2010
-//
-// calling select() using invalid descriptor results in timeout and eventually
-// value of 0 is returned. This has been identified and reported as a bug in
-// FreeBSD: http://www.freebsd.org/cgi/query-pr.cgi?pr=155606
-//
-// @todo: This part of the test is currently disabled on all BSD systems as it was
-// the quick fix. We need a more elegant (config-based) solution to disable
-// this check on affected systems only. The ticket has been submitted for this
-// work: http://kea.isc.org/ticket/2971
-#ifndef OS_BSD
+#if 0
+    // @todo Closing the socket does NOT cause a read error out of the
+    // receiveDHCP<X>Packets() select.  Apparently this is because the
+    // thread is already inside the select when the socket is closed,
+    // and (at least under Centos 7.5), this does not interrupt the 
+    // select.
     EXPECT_THROW(ifacemgr->receive4(10), SocketReadError);
 #endif
 
     EXPECT_THROW(ifacemgr->send(sendPkt), SocketWriteError);
+
+    ifacemgr->stopReceiver();
 }
 
 // Verifies that it is possible to set custom packet filter object
@@ -1271,8 +1273,8 @@ TEST_F(IfaceMgrTest, setPacketFilter) {
     EXPECT_THROW(iface_mgr->setPacketFilter(custom_packet_filter),
                  PacketFilterChangeDenied);
 
-    // So, let's close the open IPv4 sockets and retry. Now it should succeed.
-    iface_mgr->closeSockets(AF_INET);
+    // So, let's close the open sockets and retry. Now it should succeed.
+    iface_mgr->closeSockets();
     EXPECT_NO_THROW(iface_mgr->setPacketFilter(custom_packet_filter));
 }
 
@@ -1312,8 +1314,8 @@ TEST_F(IfaceMgrTest, setPacketFilter6) {
     EXPECT_THROW(iface_mgr->setPacketFilter(custom_packet_filter),
                  PacketFilterChangeDenied);
 
-    // So, let's close the IPv6 sockets and retry. Now it should succeed.
-    iface_mgr->closeSockets(AF_INET6);
+    // So, let's close the sockets and retry. Now it should succeed.
+    iface_mgr->closeSockets();
     EXPECT_NO_THROW(iface_mgr->setPacketFilter(custom_packet_filter));
 
 }
@@ -2528,6 +2530,8 @@ TEST_F(IfaceMgrTest, SingleExternalSocket4) {
     EXPECT_TRUE(pipe(pipefd) == 0);
     EXPECT_NO_THROW(ifacemgr->addExternalSocket(pipefd[0], my_callback));
 
+    ASSERT_NO_THROW(ifacemgr->startDHCPReceiver(AF_INET));
+
     Pkt4Ptr pkt4;
     ASSERT_NO_THROW(pkt4 = ifacemgr->receive4(1));
 
@@ -2552,6 +2556,8 @@ TEST_F(IfaceMgrTest, SingleExternalSocket4) {
     // close both pipe ends
     close(pipefd[1]);
     close(pipefd[0]);
+
+    ASSERT_NO_THROW(ifacemgr->stopReceiver());
 }
 
 // Tests if multiple external sockets and their callbacks can be passed and
diff --git a/src/lib/dhcp/tests/packet_queue4_unittest.cc b/src/lib/dhcp/tests/packet_queue4_unittest.cc
new file mode 100644 (file)
index 0000000..61ea4c6
--- /dev/null
@@ -0,0 +1,289 @@
+// Copyright (C) 2018 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <config.h>
+
+#include <asiolink/io_address.h>
+#include <dhcp/packet_queue.h>
+
+#include <boost/shared_ptr.hpp>
+#include <gtest/gtest.h>
+
+using namespace std;
+using namespace isc;
+using namespace isc::dhcp;
+
+namespace {
+
+class TestQueue4 : public PacketQueueRing<Pkt4Ptr> {
+public:
+
+    /// @brief Constructor
+    ///
+    /// @param queue_size maximum number of packets the queue can hold
+    TestQueue4(size_t queue_size) 
+        : PacketQueueRing(queue_size), drop_enabled_(false), eat_count_(0) {
+    };
+
+    /// @brief virtual Destructor
+    virtual ~TestQueue4(){};
+
+    /// @brief Determines is a packet should be dropped.
+    ///
+    /// If drop is enabled and either the packet transaction
+    /// id or the socket source port are even numbers, drop the packet
+    ///
+    /// @param packet the packet under consideration
+    /// @param source the socket the packet came from
+    ///
+    /// @return True if the packet should be dropped.
+    virtual bool dropPacket(Pkt4Ptr packet,
+                            const SocketInfo& source) {
+        if (drop_enabled_) {
+            return ((packet->getTransid() % 2 == 0) || 
+                    (source.port_ % 2 == 0)); 
+        }
+
+        return (false);
+    }
+
+    /// @brief Discards a number of packets from one end of the queue
+    ///
+    /// Dequeue and discard eat_count_ packets from the given end of
+    /// the queue_.
+    ///
+    /// @param from end of the queue from which packets should discarded
+    ///
+    /// @return The number of packets discarded.
+    virtual int eatPackets(const QueueEnd& from) {
+        int eaten = 0;
+        for ( ; eaten < eat_count_; ++eaten) {
+            Pkt4Ptr pkt = popPacket(from);
+            if (!pkt) {
+                break;
+            }
+        }
+
+        return (eaten);
+    }
+
+    bool drop_enabled_;
+    int eat_count_;
+};
+
+TEST(TestQueue4, interfaceBasics) {
+    PacketQueue4Ptr q4(new TestQueue4(100));
+    ASSERT_TRUE(q4);
+    EXPECT_TRUE(q4->empty());
+    EXPECT_EQ(100, q4->getCapacity());
+    EXPECT_EQ(0, q4->getSize());
+
+    size_t min = TestQueue4::MIN_RING_CAPACITY;
+    ASSERT_THROW(q4->setCapacity(min - 1), BadValue);
+    ASSERT_NO_THROW(q4->setCapacity(min));
+    EXPECT_EQ(min, q4->getCapacity());
+}
+
+TEST(TestQueue4, ringTest) {
+    PacketQueue4Ptr q4(new TestQueue4(3));
+
+    EXPECT_EQ(3, q4->getCapacity());
+    // Enqueue five packets.  The first two should be pushed off.
+    SocketInfo sock1(isc::asiolink::IOAddress("127.0.0.1"), 777, 10);
+    for (int i = 1; i < 6; ++i) {
+        Pkt4Ptr pkt(new Pkt4(DHCPDISCOVER, 1000+i));
+        ASSERT_NO_THROW(q4->enqueuePacket(pkt, sock1));
+
+        int exp_size = (i > 3 ? 3 : i);
+        EXPECT_EQ(exp_size, q4->getSize()); 
+    }
+
+    // We should have transids 1005,1004,1003  (back to front)
+
+    // Peek front should be transid 1003.
+    Pkt4Ptr pkt;
+    ASSERT_NO_THROW(pkt = q4->peek(QueueEnd::FRONT));
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1003, pkt->getTransid());
+
+    // Peek back should be transid 1005.
+    ASSERT_NO_THROW(pkt = q4->peek(QueueEnd::BACK));
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1005, pkt->getTransid());
+
+    // Pop front should return transid 1003.
+    ASSERT_NO_THROW(pkt = q4->popPacket(QueueEnd::FRONT));
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1003, pkt->getTransid());
+
+    // Pop back should return transid 1005.
+    ASSERT_NO_THROW(pkt = q4->popPacket(QueueEnd::BACK));
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1005, pkt->getTransid());
+
+    // Peek front should be transid 1004.
+    ASSERT_NO_THROW(pkt = q4->peek(QueueEnd::FRONT));
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1004, pkt->getTransid());
+
+    // Peek back should be transid 1004.
+    ASSERT_NO_THROW(pkt = q4->peek(QueueEnd::BACK));
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1004, pkt->getTransid());
+
+    // Pop front should return transid 1004.
+    ASSERT_NO_THROW(pkt = q4->popPacket(QueueEnd::FRONT));
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1004, pkt->getTransid());
+
+    // Pop front should return an empty pointer.
+    ASSERT_NO_THROW(pkt = q4->popPacket(QueueEnd::BACK));
+    ASSERT_FALSE(pkt);
+
+    // Enqueue three packets.
+    for (int i = 1; i < 3; ++i) {
+        Pkt4Ptr pkt(new Pkt4(DHCPDISCOVER, 1000+i));
+        ASSERT_NO_THROW(q4->enqueuePacket(pkt, sock1));
+        EXPECT_EQ(i, q4->getSize()); 
+    }
+
+    // Let's flush the buffer and then verify it is empty.
+    q4->clear();
+    EXPECT_TRUE(q4->empty()); 
+    EXPECT_EQ(0, q4->getSize()); 
+}
+
+TEST(TestQueue4, enqueueDequeueTest) {
+    PacketQueue4Ptr q4(new TestQueue4(100));
+    EXPECT_TRUE(q4->empty());
+
+    SocketInfo sock1(isc::asiolink::IOAddress("127.0.0.1"), 777, 10);
+
+    // Enqueue the first packet.
+    Pkt4Ptr pkt(new Pkt4(DHCPDISCOVER, 1002));
+    ASSERT_NO_THROW(q4->enqueuePacket(pkt, sock1));
+    EXPECT_EQ(1, q4->getSize());
+
+    // Enqueue a packet onto the front.
+    pkt.reset(new Pkt4(DHCPDISCOVER, 1003));
+    ASSERT_NO_THROW(q4->enqueuePacket(pkt, sock1, QueueEnd::FRONT));
+    EXPECT_EQ(2, q4->getSize());
+
+    // Enqueue a packet onto the back.
+    pkt.reset(new Pkt4(DHCPDISCOVER, 1001));
+    ASSERT_NO_THROW(q4->enqueuePacket(pkt, sock1, QueueEnd::BACK));
+    EXPECT_EQ(3, q4->getSize());
+
+    // By default we dequeue from the front. We should get transid 1003.
+    ASSERT_NO_THROW(pkt = q4->dequeuePacket());
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1003, pkt->getTransid());
+
+    // Dequeue from the back, we should get transid 1001.
+    ASSERT_NO_THROW(pkt = q4->dequeuePacket(QueueEnd::BACK));
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1001, pkt->getTransid());
+
+    // Dequeue from the front, we should get transid 1002.
+    ASSERT_NO_THROW(pkt = q4->dequeuePacket(QueueEnd::FRONT));
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1002, pkt->getTransid());
+
+    // Queue should be empty.
+    ASSERT_NO_THROW(pkt = q4->dequeuePacket());
+    ASSERT_FALSE(pkt);
+}
+
+TEST(TestQueue4, dropPacketTest) {
+    TestQueue4 q4(100);
+    EXPECT_TRUE(q4.empty());
+    ASSERT_FALSE(q4.drop_enabled_);
+    ASSERT_EQ(0, q4.eat_count_);
+
+    SocketInfo sockEven(isc::asiolink::IOAddress("127.0.0.1"), 888, 10);
+    SocketInfo sockOdd(isc::asiolink::IOAddress("127.0.0.1"), 777, 11);
+
+    // Drop is not enabled. 
+    // We should be able to enqueu a packet with even numbered values.
+    Pkt4Ptr pkt(new Pkt4(DHCPDISCOVER, 1002));
+    ASSERT_NO_THROW(q4.enqueuePacket(pkt, sockEven));
+    EXPECT_EQ(1, q4.getSize());
+
+    // We should be able to enqueu a packet with odd numbered values.
+    pkt.reset(new Pkt4(DHCPDISCOVER, 1003));
+    ASSERT_NO_THROW(q4.enqueuePacket(pkt, sockOdd));
+    EXPECT_EQ(2, q4.getSize());
+
+    // Enable drop logic.
+    q4.drop_enabled_ = true;
+
+    // We should not be able to add one with an even-numbered transid.      
+    pkt.reset(new Pkt4(DHCPDISCOVER, 1004));
+    ASSERT_NO_THROW(q4.enqueuePacket(pkt, sockOdd));
+    EXPECT_EQ(2, q4.getSize());
+
+    // We should not be able to add one with from even-numbered port.      
+    pkt.reset(new Pkt4(DHCPDISCOVER, 1005));
+    ASSERT_NO_THROW(q4.enqueuePacket(pkt, sockEven));
+    EXPECT_EQ(2, q4.getSize());
+
+    // We should be able to add one with an odd-numbered values.
+    pkt.reset(new Pkt4(DHCPDISCOVER, 1007));
+    ASSERT_NO_THROW(q4.enqueuePacket(pkt, sockOdd));
+    EXPECT_EQ(3, q4.getSize());
+
+    // Dequeue them and make sure they are as expected: 1002,1003, and 1007. 
+    ASSERT_NO_THROW(pkt = q4.dequeuePacket());
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1002, pkt->getTransid());
+
+    ASSERT_NO_THROW(pkt = q4.dequeuePacket());
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1003, pkt->getTransid());
+
+    ASSERT_NO_THROW(pkt = q4.dequeuePacket());
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1007, pkt->getTransid());
+
+    // Queue should be empty.
+    ASSERT_NO_THROW(pkt = q4.dequeuePacket());
+    ASSERT_FALSE(pkt);
+}
+
+TEST(TestQueue4, eatPacketsTest) {
+    TestQueue4 q4(100);
+    EXPECT_TRUE(q4.empty());
+    ASSERT_FALSE(q4.drop_enabled_);
+    ASSERT_EQ(0, q4.eat_count_);
+
+    SocketInfo sock(isc::asiolink::IOAddress("127.0.0.1"), 888, 10);
+
+    Pkt4Ptr pkt;
+    // Let's add five packets.
+    for (int i = 1; i < 6; ++i) {
+        pkt.reset(new Pkt4(DHCPDISCOVER, 1000 + i));
+        ASSERT_NO_THROW(q4.enqueuePacket(pkt, sock));
+        EXPECT_EQ(i, q4.getSize());
+    }
+
+    // Setting eat count to two and dequeuing (from the front, by default), 
+    // should discard 1001 and 1002, resulting in a dequeue of 1003.
+    q4.eat_count_ = 2;
+    ASSERT_NO_THROW(pkt = q4.dequeuePacket());
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1003, pkt->getTransid());
+    EXPECT_EQ(2, q4.getSize());
+
+    // Setting eat count to one and dequeing from the back, should discard
+    // 1005 and dequeue 104.
+    q4.eat_count_ = 1;
+    ASSERT_NO_THROW(pkt = q4.dequeuePacket(QueueEnd::BACK));
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1004, pkt->getTransid());
+    EXPECT_EQ(0, q4.getSize());
+}
+
+} // end of anonymous namespace
diff --git a/src/lib/dhcp/tests/packet_queue6_unittest.cc b/src/lib/dhcp/tests/packet_queue6_unittest.cc
new file mode 100644 (file)
index 0000000..5d2f91c
--- /dev/null
@@ -0,0 +1,293 @@
+// Copyright (C) 2018 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <config.h>
+
+#include <asiolink/io_address.h>
+#include <dhcp/dhcp6.h>
+#include <dhcp/packet_queue.h>
+
+#include <boost/shared_ptr.hpp>
+#include <gtest/gtest.h>
+
+#include <iostream>
+#include <sstream>
+
+using namespace std;
+using namespace isc;
+using namespace isc::dhcp;
+
+namespace {
+
+class TestQueue6 : public PacketQueueRing<Pkt6Ptr> {
+public:
+
+    /// @brief Constructor
+    ///
+    /// @param queue_size maximum number of packets the queue can hold
+    TestQueue6(size_t queue_size) 
+        : PacketQueueRing(queue_size), drop_enabled_(false), eat_count_(0) {
+    };
+
+    /// @brief virtual Destructor
+    virtual ~TestQueue6(){};
+
+    /// @brief Determines is a packet should be dropped.
+    ///
+    /// If drop is enabled and either the packet transaction
+    /// id or the socket source port are even numbers, drop the packet
+    ///
+    /// @param packet the packet under consideration
+    /// @param source the socket the packet came from
+    ///
+    /// @return True if the packet should be dropped.
+    virtual bool dropPacket(Pkt6Ptr packet,
+                            const SocketInfo& source) {
+        if (drop_enabled_) {
+            return ((packet->getTransid() % 2 == 0) || 
+                    (source.port_ % 2 == 0)); 
+        }
+
+        return (false);
+    }
+
+    /// @brief Discards a number of packets from one end of the queue
+    ///
+    /// Dequeue and discard eat_count_ packets from the given end of
+    /// the queue_.
+    ///
+    /// @param from end of the queue from which packets should discarded
+    ///
+    /// @return The number of packets discarded.
+    virtual int eatPackets(const QueueEnd& from) {
+        int eaten = 0;
+        for ( ; eaten < eat_count_; ++eaten) {
+            Pkt6Ptr pkt = popPacket(from);
+            if (!pkt) {
+                break;
+            }
+        }
+
+        return (eaten);
+    }
+
+    bool drop_enabled_;
+    int eat_count_;
+};
+
+TEST(TestQueue6, interfaceBasics) {
+    PacketQueue6Ptr q6(new TestQueue6(100));
+    ASSERT_TRUE(q6);
+    EXPECT_TRUE(q6->empty());
+    EXPECT_EQ(100, q6->getCapacity());
+    EXPECT_EQ(0, q6->getSize());
+
+    size_t min = TestQueue6::MIN_RING_CAPACITY;
+    ASSERT_THROW(q6->setCapacity(min - 1), BadValue);
+    ASSERT_NO_THROW(q6->setCapacity(min));
+    EXPECT_EQ(min, q6->getCapacity());
+}
+
+TEST(TestQueue6, ringTest) {
+    PacketQueue6Ptr q6(new TestQueue6(3));
+
+    EXPECT_EQ(3, q6->getCapacity());
+    // Enqueue five packets.  The first two should be pushed off.
+    SocketInfo sock1(isc::asiolink::IOAddress("127.0.0.1"), 777, 10);
+    for (int i = 1; i < 6; ++i) {
+        Pkt6Ptr pkt(new Pkt6(DHCPV6_SOLICIT, 1000+i));
+        ASSERT_NO_THROW(q6->enqueuePacket(pkt, sock1));
+
+        int exp_size = (i > 3 ? 3 : i);
+        EXPECT_EQ(exp_size, q6->getSize()); 
+    }
+
+    // We should have transids 1005,1004,1003  (back to front)
+
+    // Peek front should be transid 1003.
+    Pkt6Ptr pkt;
+    ASSERT_NO_THROW(pkt = q6->peek(QueueEnd::FRONT));
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1003, pkt->getTransid());
+
+    // Peek back should be transid 1005.
+    ASSERT_NO_THROW(pkt = q6->peek(QueueEnd::BACK));
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1005, pkt->getTransid());
+
+    // Pop front should return transid 1003.
+    ASSERT_NO_THROW(pkt = q6->popPacket(QueueEnd::FRONT));
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1003, pkt->getTransid());
+
+    // Pop back should return transid 1005.
+    ASSERT_NO_THROW(pkt = q6->popPacket(QueueEnd::BACK));
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1005, pkt->getTransid());
+
+    // Peek front should be transid 1004.
+    ASSERT_NO_THROW(pkt = q6->peek(QueueEnd::FRONT));
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1004, pkt->getTransid());
+
+    // Peek back should be transid 1004.
+    ASSERT_NO_THROW(pkt = q6->peek(QueueEnd::BACK));
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1004, pkt->getTransid());
+
+    // Pop front should return transid 1004.
+    ASSERT_NO_THROW(pkt = q6->popPacket(QueueEnd::FRONT));
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1004, pkt->getTransid());
+
+    // Pop front should return an empty pointer.
+    ASSERT_NO_THROW(pkt = q6->popPacket(QueueEnd::BACK));
+    ASSERT_FALSE(pkt);
+
+    // Enqueue three packets.
+    for (int i = 1; i < 3; ++i) {
+        Pkt6Ptr pkt(new Pkt6(DHCPV6_SOLICIT, 1000+i));
+        ASSERT_NO_THROW(q6->enqueuePacket(pkt, sock1));
+        EXPECT_EQ(i, q6->getSize()); 
+    }
+
+    // Let's flush the buffer and then verify it is empty.
+    q6->clear();
+    EXPECT_TRUE(q6->empty()); 
+    EXPECT_EQ(0, q6->getSize()); 
+}
+
+TEST(TestQueue6, enqueueDequeueTest) {
+    PacketQueue6Ptr q6(new TestQueue6(100));
+    EXPECT_TRUE(q6->empty());
+
+    SocketInfo sock1(isc::asiolink::IOAddress("127.0.0.1"), 777, 10);
+
+    // Enqueue the first packet.
+    Pkt6Ptr pkt(new Pkt6(DHCPV6_SOLICIT, 1002));
+    ASSERT_NO_THROW(q6->enqueuePacket(pkt, sock1));
+    EXPECT_EQ(1, q6->getSize());
+
+    // Enqueue a packet onto the front.
+    pkt.reset(new Pkt6(DHCPV6_SOLICIT, 1003));
+    ASSERT_NO_THROW(q6->enqueuePacket(pkt, sock1, QueueEnd::FRONT));
+    EXPECT_EQ(2, q6->getSize());
+
+    // Enqueue a packet onto the back.
+    pkt.reset(new Pkt6(DHCPV6_SOLICIT, 1001));
+    ASSERT_NO_THROW(q6->enqueuePacket(pkt, sock1, QueueEnd::BACK));
+    EXPECT_EQ(3, q6->getSize());
+
+    // By default we dequeue from the front. We should get transid 1003.
+    ASSERT_NO_THROW(pkt = q6->dequeuePacket());
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1003, pkt->getTransid());
+
+    // Dequeue from the back, we should get transid 1001.
+    ASSERT_NO_THROW(pkt = q6->dequeuePacket(QueueEnd::BACK));
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1001, pkt->getTransid());
+
+    // Dequeue from the front, we should get transid 1002.
+    ASSERT_NO_THROW(pkt = q6->dequeuePacket(QueueEnd::FRONT));
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1002, pkt->getTransid());
+
+    // Queue should be empty.
+    ASSERT_NO_THROW(pkt = q6->dequeuePacket());
+    ASSERT_FALSE(pkt);
+}
+
+TEST(TestQueue6, dropPacketTest) {
+    TestQueue6 q6(100);
+    EXPECT_TRUE(q6.empty());
+    ASSERT_FALSE(q6.drop_enabled_);
+    ASSERT_EQ(0, q6.eat_count_);
+
+    SocketInfo sockEven(isc::asiolink::IOAddress("127.0.0.1"), 888, 10);
+    SocketInfo sockOdd(isc::asiolink::IOAddress("127.0.0.1"), 777, 11);
+
+    // Drop is not enabled. 
+    // We should be able to enqueu a packet with even numbered values.
+    Pkt6Ptr pkt(new Pkt6(DHCPV6_SOLICIT, 1002));
+    ASSERT_NO_THROW(q6.enqueuePacket(pkt, sockEven));
+    EXPECT_EQ(1, q6.getSize());
+
+    // We should be able to enqueu a packet with odd numbered values.
+    pkt.reset(new Pkt6(DHCPV6_SOLICIT, 1003));
+    ASSERT_NO_THROW(q6.enqueuePacket(pkt, sockOdd));
+    EXPECT_EQ(2, q6.getSize());
+
+    // Enable drop logic.
+    q6.drop_enabled_ = true;
+
+    // We should not be able to add one with an even-numbered transid.      
+    pkt.reset(new Pkt6(DHCPV6_SOLICIT, 1004));
+    ASSERT_NO_THROW(q6.enqueuePacket(pkt, sockOdd));
+    EXPECT_EQ(2, q6.getSize());
+
+    // We should not be able to add one with from even-numbered port.      
+    pkt.reset(new Pkt6(DHCPV6_SOLICIT, 1005));
+    ASSERT_NO_THROW(q6.enqueuePacket(pkt, sockEven));
+    EXPECT_EQ(2, q6.getSize());
+
+    // We should be able to add one with an odd-numbered values.
+    pkt.reset(new Pkt6(DHCPV6_SOLICIT, 1007));
+    ASSERT_NO_THROW(q6.enqueuePacket(pkt, sockOdd));
+    EXPECT_EQ(3, q6.getSize());
+
+    // Dequeue them and make sure they are as expected: 1002,1003, and 1007. 
+    ASSERT_NO_THROW(pkt = q6.dequeuePacket());
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1002, pkt->getTransid());
+
+    ASSERT_NO_THROW(pkt = q6.dequeuePacket());
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1003, pkt->getTransid());
+
+    ASSERT_NO_THROW(pkt = q6.dequeuePacket());
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1007, pkt->getTransid());
+
+    // Queue should be empty.
+    ASSERT_NO_THROW(pkt = q6.dequeuePacket());
+    ASSERT_FALSE(pkt);
+}
+
+TEST(TestQueue6, eatPacketsTest) {
+    TestQueue6 q6(100);
+    EXPECT_TRUE(q6.empty());
+    ASSERT_FALSE(q6.drop_enabled_);
+    ASSERT_EQ(0, q6.eat_count_);
+
+    SocketInfo sock(isc::asiolink::IOAddress("127.0.0.1"), 888, 10);
+
+    Pkt6Ptr pkt;
+    // Let's add five packets.
+    for (int i = 1; i < 6; ++i) {
+        pkt.reset(new Pkt6(DHCPV6_SOLICIT, 1000 + i));
+        ASSERT_NO_THROW(q6.enqueuePacket(pkt, sock));
+        EXPECT_EQ(i, q6.getSize());
+    }
+
+    // Setting eat count to two and dequeuing (from the front, by default), 
+    // should discard 1001 and 1002, resulting in a dequeue of 1003.
+    q6.eat_count_ = 2;
+    ASSERT_NO_THROW(pkt = q6.dequeuePacket());
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1003, pkt->getTransid());
+    EXPECT_EQ(2, q6.getSize());
+
+    // Setting eat count to one and dequeing from the back, should discard
+    // 1005 and dequeue 104.
+    q6.eat_count_ = 1;
+    ASSERT_NO_THROW(pkt = q6.dequeuePacket(QueueEnd::BACK));
+    ASSERT_TRUE(pkt);
+    EXPECT_EQ(1004, pkt->getTransid());
+    EXPECT_EQ(0, q6.getSize());
+}
+
+} // end of anonymous namespace
index e6ba36f44ea93f9428d37f33857072542b971542..b60a841c14a3d70d7b2aaf96c66e57f489ad75d9 100644 (file)
@@ -28,6 +28,7 @@ CfgIface::CfgIface()
 
 void
 CfgIface::closeSockets() const {
+    IfaceMgr::instance().stopReceiver();
     IfaceMgr::instance().closeSockets();
 }
 
@@ -172,10 +173,14 @@ CfgIface::openSockets(const uint16_t family, const uint16_t port,
         // use_bcast is ignored for V6.
         sopen = IfaceMgr::instance().openSockets6(port, error_callback);
     }
-
-    // If no socket were opened, log a warning because the server will
-    // not respond to any queries.
-    if (!sopen) {
+    
+    if (sopen) {
+        // @todo we may consider starting/stopping this when DHCP service is
+        // enable/disabled, rather then when we open sockets.
+        IfaceMgr::instance().startDHCPReceiver(family);
+    } else {
+        // If no socket were opened, log a warning because the server will
+        // not respond to any queries.
         LOG_WARN(dhcpsrv_logger, DHCPSRV_NO_SOCKETS_OPEN);
     }
 }
index 4cb318cbdb52c325102b42af89e90363ec9703d1..743001277cbae3911bb9f04ac631022c8c4b4dec 100644 (file)
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2015,2017 Internet Systems Consortium, Inc. ("ISC")
+// Copyright (C) 2014-2018 Internet Systems Consortium, Inc. ("ISC")
 //
 // This Source Code Form is subject to the terms of the Mozilla Public
 // License, v. 2.0. If a copy of the MPL was not distributed with this
@@ -157,6 +157,7 @@ public:
     CfgIface();
 
     /// @brief Convenience function which closes all open sockets.
+    /// It stops the receiver thread too.
     void closeSockets() const;
 
     /// @brief Compares two @c CfgIface objects for equality.
@@ -172,6 +173,7 @@ public:
     /// sockets bound to unicast address. See @c CfgIface::use function
     /// documentation for details how to specify interfaces and unicast
     /// addresses to bind the sockets to.
+    /// This function starts the family receiver.
     ///
     /// @param family Address family (AF_INET or AF_INET6).
     /// @param port Port number to be used to bind sockets to.
index c5559d810d9fb3c2fa6fb8ba9f12f89fc337e46c..d93431b7e2d8768c3f2935adcffa1764ca7b349a 100644 (file)
@@ -1,4 +1,4 @@
-// Copyright (C) 2015-2017 Internet Systems Consortium, Inc. ("ISC")
+// Copyright (C) 2015-2018 Internet Systems Consortium, Inc. ("ISC")
 //
 // This Source Code Form is subject to the terms of the Mozilla Public
 // License, v. 2.0. If a copy of the MPL was not distributed with this
@@ -47,6 +47,7 @@ IfacesConfigParser::parse(const CfgIfacePtr& cfg,
     if (re_detect) {
         // Interface clear will drop opened socket information
         // so close them if the caller did not.
+        IfaceMgr::instance().stopReceiver();
         IfaceMgr::instance().closeSockets();
         IfaceMgr::instance().clearIfaces();
         IfaceMgr::instance().detectIfaces();
index 3a9984187fc52412c94d6a5eedaa59785b6de39c..76eda4ee43ed2f2b57d07277f6dea74348627410 100644 (file)
@@ -20,6 +20,7 @@ public:
     /// Constructor
     DatabaseConnectionCallbackTest()
         : db_reconnect_ctl_(0) {
+        DatabaseConnection::db_lost_callback = 0;
     }
 
     /// @brief Callback to register with a DatabaseConnection
index 2f56c33e254c5171b435748e2d2daecc89a78f7f..b64621a09489b80dfc47e9ed85315336bdb98703 100644 (file)
@@ -1,4 +1,4 @@
-// Copyright (C) 2012-2015 Internet Systems Consortium, Inc. ("ISC")
+// Copyright (C) 2012-2018 Internet Systems Consortium, Inc. ("ISC")
 //
 // This Source Code Form is subject to the terms of the Mozilla Public
 // License, v. 2.0. If a copy of the MPL was not distributed with this
@@ -97,6 +97,9 @@ private:
     Impl* impl_;
 };
 
+/// @brief Thread pointer type.
+typedef boost::shared_ptr<Thread> ThreadPtr;
+
 }
 }
 }