]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
added thread pool
authorRazvan Becheriu <razvan@isc.org>
Mon, 25 Mar 2019 14:34:27 +0000 (16:34 +0200)
committerRazvan Becheriu <razvan@isc.org>
Fri, 12 Apr 2019 12:07:33 +0000 (15:07 +0300)
src/bin/dhcp4/ctrl_dhcp4_srv.cc
src/bin/dhcp4/ctrl_dhcp4_srv.h
src/bin/dhcp4/dhcp4_srv.cc
src/bin/dhcp4/dhcp4_srv.h
src/bin/dhcp6/ctrl_dhcp6_srv.cc
src/bin/dhcp6/ctrl_dhcp6_srv.h
src/bin/dhcp6/dhcp6_srv.cc
src/bin/dhcp6/dhcp6_srv.h
src/lib/dhcpsrv/Makefile.am

index 0595c1ad01e68e1e3866322401fdca97614e095d..868bdace55ad6e2f9fd6061cee5369b1f0295ecc 100644 (file)
@@ -5,29 +5,38 @@
 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 #include <config.h>
-#include <cc/data.h>
+
 #include <cc/command_interpreter.h>
+#include <cc/data.h>
+#include <cfgrpt/config_report.h>
 #include <config/command_mgr.h>
+#include <database/dbaccess_parser.h>
+#include <dhcp/libdhcp++.h>
 #include <dhcp4/ctrl_dhcp4_srv.h>
 #include <dhcp4/dhcp4_log.h>
 #include <dhcp4/dhcp4to6_ipc.h>
-#include <dhcp4/parser_context.h>
 #include <dhcp4/json_config_parser.h>
-#include <dhcpsrv/cfgmgr.h>
+#include <dhcp4/parser_context.h>
 #include <dhcpsrv/cfg_db_access.h>
-#include <hooks/hooks.h>
+#include <dhcpsrv/cfgmgr.h>
+#include <dhcpsrv/db_type.h>
 #include <hooks/hooks_manager.h>
 #include <stats/stats_mgr.h>
-#include <cfgrpt/config_report.h>
+#include <util/threads/lock_guard.h>
+#include <util/threads/reverse_lock.h>
+
 #include <signal.h>
+
+#include <memory>
 #include <sstream>
 
-using namespace isc::data;
+using namespace isc::config;
 using namespace isc::db;
 using namespace isc::dhcp;
+using namespace isc::data;
 using namespace isc::hooks;
-using namespace isc::config;
 using namespace isc::stats;
+using namespace isc::util::thread;
 using namespace std;
 
 namespace {
@@ -76,34 +85,6 @@ namespace dhcp {
 
 ControlledDhcpv4Srv* ControlledDhcpv4Srv::server_ = NULL;
 
-void
-ControlledDhcpv4Srv::init(const std::string& file_name) {
-    // Configure the server using JSON file.
-    ConstElementPtr result = loadConfigFile(file_name);
-    int rcode;
-    ConstElementPtr comment = isc::config::parseAnswer(rcode, result);
-    if (rcode != 0) {
-        string reason = comment ? comment->stringValue() :
-            "no details available";
-        isc_throw(isc::BadValue, reason);
-    }
-
-    // We don't need to call openActiveSockets() or startD2() as these
-    // methods are called in processConfig() which is called by
-    // processCommand("config-set", ...)
-
-    // Set signal handlers. When the SIGHUP is received by the process
-    // the server reconfiguration will be triggered. When SIGTERM or
-    // SIGINT will be received, the server will start shutting down.
-    signal_set_.reset(new isc::util::SignalSet(SIGINT, SIGHUP, SIGTERM));
-    // Set the pointer to the handler function.
-    signal_handler_ = signalHandler;
-}
-
-void ControlledDhcpv4Srv::cleanup() {
-    // Nothing to do here. No need to disconnect from anything.
-}
-
 /// @brief Configure DHCPv4 server using the configuration file specified.
 ///
 /// This function is used to both configure the DHCP server on its startup
@@ -120,16 +101,14 @@ ControlledDhcpv4Srv::loadConfigFile(const std::string& file_name) {
     // configuration from a JSON file.
 
     isc::data::ConstElementPtr json;
-    isc::data::ConstElementPtr dhcp4;
-    isc::data::ConstElementPtr logger;
     isc::data::ConstElementPtr result;
 
     // Basic sanity check: file name must not be empty.
     try {
         if (file_name.empty()) {
             // Basic sanity check: file name must not be empty.
-            isc_throw(isc::BadValue, "JSON configuration file not specified."
-                      " Please use -c command line option.");
+            isc_throw(isc::BadValue, "JSON configuration file not specified. Please "
+                      "use -c command line option.");
         }
 
         // Read contents of the file and parse it as JSON
@@ -183,6 +162,38 @@ ControlledDhcpv4Srv::loadConfigFile(const std::string& file_name) {
     return (result);
 }
 
+void
+ControlledDhcpv4Srv::init(const std::string& file_name) {
+    // Configure the server using JSON file.
+    ConstElementPtr result;
+    {
+        LockGuard<mutex> lock(serverLock());
+        result = loadConfigFile(file_name);
+    }
+
+    int rcode;
+    ConstElementPtr comment = isc::config::parseAnswer(rcode, result);
+    if (rcode != 0) {
+        string reason = comment ? comment->stringValue() :
+            "no details available";
+        isc_throw(isc::BadValue, reason);
+    }
+
+    // We don't need to call openActiveSockets() or startD2() as these
+    // methods are called in processConfig() which is called by
+    // processCommand("config-set", ...)
+
+    // Set signal handlers. When the SIGHUP is received by the process
+    // the server reconfiguration will be triggered. When SIGTERM or
+    // SIGINT will be received, the server will start shutting down.
+    signal_set_.reset(new isc::util::SignalSet(SIGINT, SIGHUP, SIGTERM));
+    // Set the pointer to the handler function.
+    signal_handler_ = signalHandler;
+}
+
+void ControlledDhcpv4Srv::cleanup() {
+    // Nothing to do here. No need to disconnect from anything.
+}
 
 ConstElementPtr
 ControlledDhcpv4Srv::commandShutdownHandler(const string&, ConstElementPtr) {
@@ -190,28 +201,26 @@ ControlledDhcpv4Srv::commandShutdownHandler(const string&, ConstElementPtr) {
         ControlledDhcpv4Srv::getInstance()->shutdown();
     } else {
         LOG_WARN(dhcp4_logger, DHCP4_NOT_RUNNING);
-        ConstElementPtr answer = isc::config::createAnswer(1,
-                                              "Shutdown failure.");
+        ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_ERROR, "Shutdown failure.");
         return (answer);
     }
-    ConstElementPtr answer = isc::config::createAnswer(0, "Shutting down.");
+    ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_SUCCESS, "Shutting down.");
     return (answer);
 }
 
 ConstElementPtr
 ControlledDhcpv4Srv::commandLibReloadHandler(const string&, ConstElementPtr) {
-
     /// @todo delete any stored CalloutHandles referring to the old libraries
     /// Get list of currently loaded libraries and reload them.
     HookLibsCollection loaded = HooksManager::getLibraryInfo();
     bool status = HooksManager::loadLibraries(loaded);
     if (!status) {
         LOG_ERROR(dhcp4_logger, DHCP4_HOOKS_LIBS_RELOAD_FAIL);
-        ConstElementPtr answer = isc::config::createAnswer(1,
+        ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_ERROR,
                                  "Failed to reload hooks libraries.");
         return (answer);
     }
-    ConstElementPtr answer = isc::config::createAnswer(0,
+    ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_SUCCESS,
                              "Hooks libraries successfully reloaded.");
     return (answer);
 }
@@ -219,7 +228,6 @@ ControlledDhcpv4Srv::commandLibReloadHandler(const string&, ConstElementPtr) {
 ConstElementPtr
 ControlledDhcpv4Srv::commandConfigReloadHandler(const string&,
                                                 ConstElementPtr /*args*/) {
-
     // Get configuration file name.
     std::string file = ControlledDhcpv4Srv::getInstance()->getConfigFile();
     try {
@@ -232,7 +240,7 @@ ControlledDhcpv4Srv::commandConfigReloadHandler(const string&,
         LOG_ERROR(dhcp4_logger, DHCP4_DYNAMIC_RECONFIGURATION_FAIL)
             .arg(file);
         return (createAnswer(CONTROL_RESULT_ERROR,
-                             "Config reload failed:" + string(ex.what())));
+                             "Config reload failed: " + string(ex.what())));
     }
 }
 
@@ -245,8 +253,7 @@ ControlledDhcpv4Srv::commandConfigGetHandler(const string&,
 }
 
 ConstElementPtr
-ControlledDhcpv4Srv::commandConfigWriteHandler(const string&,
-                                               ConstElementPtr args) {
+ControlledDhcpv4Srv::commandConfigWriteHandler(const string&, ConstElementPtr args) {
     string filename;
 
     if (args) {
@@ -265,6 +272,7 @@ ControlledDhcpv4Srv::commandConfigWriteHandler(const string&,
 
     if (filename.empty()) {
         // filename parameter was not specified, so let's use whatever we remember
+        // from the command-line
         filename = getConfigFile();
     }
 
@@ -299,7 +307,7 @@ ControlledDhcpv4Srv::commandConfigWriteHandler(const string&,
 ConstElementPtr
 ControlledDhcpv4Srv::commandConfigSetHandler(const string&,
                                              ConstElementPtr args) {
-    const int status_code = CONTROL_RESULT_ERROR; // 1 indicates an error
+    const int status_code = CONTROL_RESULT_ERROR;
     ConstElementPtr dhcp4;
     string message;
 
@@ -351,7 +359,7 @@ ControlledDhcpv4Srv::commandConfigSetHandler(const string&,
     // the logging first in case there's a configuration failure.
     int rcode = 0;
     isc::config::parseAnswer(rcode, result);
-    if (rcode == 0) {
+    if (rcode == CONTROL_RESULT_SUCCESS) {
         CfgMgr::instance().getStagingCfg()->applyLoggingCfg();
 
         // Use new configuration.
@@ -467,17 +475,16 @@ ControlledDhcpv4Srv::commandVersionGetHandler(const string&, ConstElementPtr) {
     ElementPtr extended = Element::create(Dhcpv4Srv::getVersion(true));
     ElementPtr arguments = Element::createMap();
     arguments->set("extended", extended);
-    ConstElementPtr answer = isc::config::createAnswer(0,
+    ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_SUCCESS,
                                 Dhcpv4Srv::getVersion(false),
                                 arguments);
     return (answer);
 }
 
 ConstElementPtr
-ControlledDhcpv4Srv::commandBuildReportHandler(const string&,
-                                               ConstElementPtr) {
+ControlledDhcpv4Srv::commandBuildReportHandler(const string&, ConstElementPtr) {
     ConstElementPtr answer =
-        isc::config::createAnswer(0, isc::detail::getConfigReport());
+        isc::config::createAnswer(CONTROL_RESULT_SUCCESS, isc::detail::getConfigReport());
     return (answer);
 }
 
@@ -518,12 +525,20 @@ ControlledDhcpv4Srv::processCommand(const string& command,
     ControlledDhcpv4Srv* srv = ControlledDhcpv4Srv::getInstance();
 
     if (!srv) {
-        ConstElementPtr no_srv = isc::config::createAnswer(1,
-          "Server object not initialized, so can't process command '" +
+        ConstElementPtr no_srv = isc::config::createAnswer(CONTROL_RESULT_ERROR,
+          "Server object not initialized, can't process command '" +
           command + "', arguments: '" + txt + "'.");
         return (no_srv);
     }
 
+    if (srv->run_multithreaded_) {
+        {
+            ReverseLock<std::mutex> rlk(srv->serverLock());
+            srv->pkt_thread_pool_.destroy();
+        }
+        srv->pkt_thread_pool_.create(Dhcpv4Srv::threadCount());
+    }
+
     try {
         if (command == "shutdown") {
             return (srv->commandShutdownHandler(command, args));
@@ -562,12 +577,11 @@ ControlledDhcpv4Srv::processCommand(const string& command,
             return (srv->commandConfigWriteHandler(command, args));
 
         }
-        ConstElementPtr answer = isc::config::createAnswer(1,
-                                 "Unrecognized command:" + command);
-        return (answer);
+        return(isc::config::createAnswer(CONTROL_RESULT_ERROR, "Unrecognized command: " +
+                                         command));
     } catch (const Exception& ex) {
-        return (isc::config::createAnswer(1, "Error while processing command '"
-                                          + command + "':" + ex.what() +
+        return (isc::config::createAnswer(CONTROL_RESULT_ERROR, "Error while processing command '" +
+                                          command + "': " + ex.what() +
                                           ", params: '" + txt + "'"));
     }
 }
@@ -585,7 +599,7 @@ ControlledDhcpv4Srv::processConfig(isc::data::ConstElementPtr config) {
 
     if (!srv) {
         err << "Server object not initialized, can't process config.";
-        return (isc::config::createAnswer(1, err.str()));
+        return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
     }
 
     ConstElementPtr answer = configureDhcp4Server(*srv, config);
@@ -599,8 +613,8 @@ ControlledDhcpv4Srv::processConfig(isc::data::ConstElementPtr config) {
             return (answer);
         }
     } catch (const std::exception& ex) {
-        err << "Failed to process configuration:" << ex.what();
-        return (isc::config::createAnswer(1, err.str()));
+        err << "Failed to process configuration: " << ex.what();
+        return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
     }
 
     // Re-open lease and host database with new parameters.
@@ -612,7 +626,7 @@ ControlledDhcpv4Srv::processConfig(isc::data::ConstElementPtr config) {
         cfg_db->createManagers();
     } catch (const std::exception& ex) {
         err << "Unable to open database: " << ex.what();
-        return (isc::config::createAnswer(1, err.str()));
+        return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
     }
 
     // Server will start DDNS communications if its enabled.
@@ -621,7 +635,7 @@ ControlledDhcpv4Srv::processConfig(isc::data::ConstElementPtr config) {
     } catch (const std::exception& ex) {
         err << "Error starting DHCP_DDNS client after server reconfiguration: "
             << ex.what();
-        return (isc::config::createAnswer(1, err.str()));
+        return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
     }
 
     // Setup DHCPv4-over-DHCPv6 IPC
@@ -631,7 +645,7 @@ ControlledDhcpv4Srv::processConfig(isc::data::ConstElementPtr config) {
         std::ostringstream err;
         err << "error starting DHCPv4-over-DHCPv6 IPC "
                " after server reconfiguration: " << ex.what();
-        return (isc::config::createAnswer(1, err.str()));
+        return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
     }
 
     // Configure DHCP packet queueing
@@ -657,7 +671,7 @@ ControlledDhcpv4Srv::processConfig(isc::data::ConstElementPtr config) {
     // is no need to rollback configuration if socket fails to open on any
     // of the interfaces.
     CfgMgr::instance().getStagingCfg()->getCfgIface()->
-        openSockets(AF_INET, srv->getServerPort(),
+        openSockets(AF_INET, srv->getServerPort(), srv->serverLock(),
                     getInstance()->useBroadcast());
 
     // Install the timers for handling leases reclamation.
@@ -671,7 +685,7 @@ ControlledDhcpv4Srv::processConfig(isc::data::ConstElementPtr config) {
         err << "unable to setup timers for periodically running the"
             " reclamation of the expired leases: "
             << ex.what() << ".";
-        return (isc::config::createAnswer(1, err.str()));
+        return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
     }
 
     auto ctl_info = CfgMgr::instance().getStagingCfg()->getConfigControlInfo();
@@ -728,7 +742,7 @@ isc::data::ConstElementPtr
 ControlledDhcpv4Srv::checkConfig(isc::data::ConstElementPtr config) {
 
     LOG_DEBUG(dhcp4_logger, DBG_DHCP4_COMMAND, DHCP4_CONFIG_RECEIVED)
-              .arg(config->str());
+        .arg(config->str());
 
     ControlledDhcpv4Srv* srv = ControlledDhcpv4Srv::getInstance();
 
@@ -737,15 +751,16 @@ ControlledDhcpv4Srv::checkConfig(isc::data::ConstElementPtr config) {
 
     if (!srv) {
         err << "Server object not initialized, can't process config.";
-        return (isc::config::createAnswer(1, err.str()));
+        return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
     }
 
     return (configureDhcp4Server(*srv, config, true));
 }
 
 ControlledDhcpv4Srv::ControlledDhcpv4Srv(uint16_t server_port /*= DHCP4_SERVER_PORT*/,
-                                         uint16_t client_port /*= 0*/)
-    : Dhcpv4Srv(server_port, client_port), io_service_(),
+                                         uint16_t client_port /*= 0*/,
+                                         bool run_multithreaded /*= false*/)
+    : Dhcpv4Srv(server_port, client_port, run_multithreaded), io_service_(),
       timer_mgr_(TimerMgr::instance()) {
     if (getInstance()) {
         isc_throw(InvalidOperation,
@@ -801,21 +816,20 @@ ControlledDhcpv4Srv::ControlledDhcpv4Srv(uint16_t server_port /*= DHCP4_SERVER_P
     CommandMgr::instance().registerCommand("statistic-get",
         boost::bind(&StatsMgr::statisticGetHandler, _1, _2));
 
-    CommandMgr::instance().registerCommand("statistic-reset",
-        boost::bind(&StatsMgr::statisticResetHandler, _1, _2));
-
-    CommandMgr::instance().registerCommand("statistic-remove",
-        boost::bind(&StatsMgr::statisticRemoveHandler, _1, _2));
-
     CommandMgr::instance().registerCommand("statistic-get-all",
         boost::bind(&StatsMgr::statisticGetAllHandler, _1, _2));
 
+    CommandMgr::instance().registerCommand("statistic-reset",
+        boost::bind(&StatsMgr::statisticResetHandler, _1, _2));
+
     CommandMgr::instance().registerCommand("statistic-reset-all",
         boost::bind(&StatsMgr::statisticResetAllHandler, _1, _2));
 
+    CommandMgr::instance().registerCommand("statistic-remove",
+        boost::bind(&StatsMgr::statisticRemoveHandler, _1, _2));
+
     CommandMgr::instance().registerCommand("statistic-remove-all",
         boost::bind(&StatsMgr::statisticRemoveAllHandler, _1, _2));
-
 }
 
 void ControlledDhcpv4Srv::shutdown() {
@@ -839,14 +853,14 @@ ControlledDhcpv4Srv::~ControlledDhcpv4Srv() {
         // Deregister any registered commands (please keep in alphabetic order)
         CommandMgr::instance().deregisterCommand("build-report");
         CommandMgr::instance().deregisterCommand("config-get");
+        CommandMgr::instance().deregisterCommand("config-set");
         CommandMgr::instance().deregisterCommand("config-reload");
         CommandMgr::instance().deregisterCommand("config-test");
         CommandMgr::instance().deregisterCommand("config-write");
-        CommandMgr::instance().deregisterCommand("leases-reclaim");
-        CommandMgr::instance().deregisterCommand("libreload");
-        CommandMgr::instance().deregisterCommand("config-set");
         CommandMgr::instance().deregisterCommand("dhcp-disable");
         CommandMgr::instance().deregisterCommand("dhcp-enable");
+        CommandMgr::instance().deregisterCommand("leases-reclaim");
+        CommandMgr::instance().deregisterCommand("libreload");
         CommandMgr::instance().deregisterCommand("shutdown");
         CommandMgr::instance().deregisterCommand("statistic-get");
         CommandMgr::instance().deregisterCommand("statistic-get-all");
@@ -862,8 +876,8 @@ ControlledDhcpv4Srv::~ControlledDhcpv4Srv() {
         ;
     }
 
-    server_ = NULL; // forget this instance. Noone should call any handlers at
-                    // this stage.
+    server_ = NULL; // forget this instance. There should be no callback anymore
+                    // at this stage anyway.
 }
 
 void ControlledDhcpv4Srv::sessionReader(void) {
@@ -997,5 +1011,5 @@ ControlledDhcpv4Srv::cbFetchUpdates(const SrvConfigPtr& srv_cfg,
     }
 }
 
-}; // end of isc::dhcp namespace
-}; // end of isc namespace
+}  // namespace dhcp
+}  // namespace isc
index c3120af2402e018e1539972f86f1146b4ca9c7e7..f05f72481c98401b3b58313064be5fecba7f7215 100644 (file)
@@ -29,11 +29,13 @@ public:
     ///
     /// @param server_port UDP port to be opened for DHCP traffic
     /// @param client_port UDP port where all responses are sent to.
+    /// @param run_multithreaded enables or disables multithreaded mode
     ControlledDhcpv4Srv(uint16_t server_port = DHCP4_SERVER_PORT,
-                        uint16_t client_port = 0);
+                        uint16_t client_port = 0,
+                        bool run_multithreaded = false);
 
     /// @brief Destructor.
-    ~ControlledDhcpv4Srv();
+    virtual ~ControlledDhcpv4Srv();
 
     /// @brief Initializes the server.
     ///
@@ -43,12 +45,12 @@ public:
     /// This method may throw if initialization fails.
     void init(const std::string& config_file);
 
-    /// @brief Loads specific config file
+    /// @brief Loads specific configuration file
     ///
     /// This utility method is called whenever we know a filename of the config
     /// and need to load it. It calls config-set command once the content of
     /// the file has been loaded and verified to be a sane JSON configuration.
-    /// config-set handler will process the config file (load it as current
+    /// config-set handler will process the config file (apply it as current
     /// configuration).
     ///
     /// @param file_name name of the file to be loaded
@@ -126,7 +128,7 @@ private:
     /// @brief Callback that will be called from iface_mgr when data
     /// is received over control socket.
     ///
-    /// This static callback method is called from IfaceMgr::receive6() method,
+    /// This static callback method is called from IfaceMgr::receive4() method,
     /// when there is a new command or configuration sent over control socket
     /// (that was sent from some yet unspecified sender).
     static void sessionReader(void);
@@ -247,7 +249,6 @@ private:
     commandDhcpEnableHandler(const std::string& command,
                              isc::data::ConstElementPtr args);
 
-
     /// @Brief handler for processing 'version-get' command
     ///
     /// This handler processes version-get command, which returns
@@ -377,7 +378,7 @@ private:
     /// @brief Static pointer to the sole instance of the DHCP server.
     ///
     /// This is required for config and command handlers to gain access to
-    /// the server
+    /// the server. Some of them need to be static methods.
     static ControlledDhcpv4Srv* server_;
 
     /// @brief IOService object, used for all ASIO operations.
@@ -390,7 +391,7 @@ private:
     TimerMgrPtr timer_mgr_;
 };
 
-}; // namespace isc::dhcp
-}; // namespace isc
+}  // namespace dhcp
+}  // namespace isc
 
 #endif
index 9f3727122589af39366752b407118694c41c2214..d3c9197220b96fd61326023dae00ff3b577c76c7 100644 (file)
@@ -46,7 +46,7 @@
 #include <hooks/hooks_manager.h>
 #include <stats/stats_mgr.h>
 #include <util/strutil.h>
-#include <stats/stats_mgr.h>
+#include <util/threads/lock_guard.h>
 #include <log/logger.h>
 #include <cryptolink/cryptolink.h>
 #include <cfgrpt/config_report.h>
@@ -70,6 +70,7 @@
 
 #include <iomanip>
 
+
 using namespace isc;
 using namespace isc::asiolink;
 using namespace isc::cryptolink;
@@ -80,6 +81,8 @@ using namespace isc::log;
 using namespace isc::stats;
 using namespace std;
 
+using isc::util::thread::LockGuard;
+
 namespace {
 
 /// Structure that holds registered hook indexes
@@ -187,7 +190,7 @@ Dhcpv4Exchange::Dhcpv4Exchange(const AllocEnginePtr& alloc_engine,
             .arg(query_->getLabel())
             .arg(classes.toText());
     }
-};
+}
 
 void
 Dhcpv4Exchange::initResponse() {
@@ -443,15 +446,20 @@ Dhcpv4Exchange::setReservedMessageFields() {
 const std::string Dhcpv4Srv::VENDOR_CLASS_PREFIX("VENDOR_CLASS_");
 
 Dhcpv4Srv::Dhcpv4Srv(uint16_t server_port, uint16_t client_port,
+                     bool run_multithreaded /* = false */,
                      const bool use_bcast, const bool direct_response_desired)
     : io_service_(new IOService()), shutdown_(true), alloc_engine_(),
       server_port_(server_port), use_bcast_(use_bcast),
       client_port_(client_port),
       network_state_(new NetworkState(NetworkState::DHCPv4)),
-      cb_control_(new CBControlDHCPv4()) {
+      cb_control_(new CBControlDHCPv4()),
+      run_multithreaded_(run_multithreaded) {
+
+    mutex_.reset(new std::mutex());
 
     LOG_DEBUG(dhcp4_logger, DBG_DHCP4_START, DHCP4_OPEN_SOCKET)
         .arg(server_port);
+
     try {
         // Port 0 is used for testing purposes where we don't open broadcast
         // capable sockets. So, set the packet filter handling direct traffic
@@ -734,10 +742,21 @@ Dhcpv4Srv::sendPacket(const Pkt4Ptr& packet) {
 
 bool
 Dhcpv4Srv::run() {
+    if (run_multithreaded_) {
+        // Creating the process packet thread pool
+        // The number of thread pool's threads should be read from configuration
+        // file or it should be determined by the number of hardware threads and
+        // the number of Cassandra DB nodes.
+        pkt_thread_pool_.create(Dhcpv4Srv::threadCount());
+    }
+
     while (!shutdown_) {
         try {
             run_one();
-            getIOService()->poll();
+            {
+                LockGuard<mutex> lock(serverLock());
+                getIOService()->poll();
+            }
         } catch (const std::exception& e) {
             // General catch-all exception that are not caught by more specific
             // catches. This one is for exceptions derived from std::exception.
@@ -751,6 +770,11 @@ Dhcpv4Srv::run() {
         }
     }
 
+    // destroying the thread pool
+    if (run_multithreaded_) {
+        pkt_thread_pool_.destroy();
+    }
+
     return (true);
 }
 
@@ -761,11 +785,27 @@ Dhcpv4Srv::run_one() {
     Pkt4Ptr rsp;
 
     try {
+
+        // Do not read more packets from socket if there are enough
+        // packets to be processed in the packet thread pool queue
+        const int max_queued_pkt_per_thread = Dhcpv4Srv::maxThreadQueueSize();
+        const auto queue_full_wait = std::chrono::milliseconds(1);
+        size_t pkt_queue_size = pkt_thread_pool_.count();
+        if (pkt_queue_size >= Dhcpv4Srv::threadCount() *
+            max_queued_pkt_per_thread) {
+            std::this_thread::sleep_for(queue_full_wait);
+            return;
+        }
+
         // Set select() timeout to 1s. This value should not be modified
         // because it is important that the select() returns control
         // frequently so as the IOService can be polled for ready handlers.
         uint32_t timeout = 1;
-        query = receivePacket(timeout);
+        {
+            // LOG_DEBUG(packet4_logger, DBG_DHCP4_DETAIL, DHCP4_BUFFER_WAIT).arg(timeout);
+            LockGuard<mutex> lock(serverLock());
+            query = receivePacket(timeout);
+        }
 
         // Log if packet has arrived. We can't log the detailed information
         // about the DHCP message because it hasn't been unpacked/parsed
@@ -812,6 +852,7 @@ Dhcpv4Srv::run_one() {
     // receivePacket the process could wait up to the duration of timeout
     // of select() to terminate.
     try {
+        LockGuard<mutex> lock(serverLock());
         handleSignal();
     } catch (const std::exception& e) {
         // Standard exception occurred. Let's be on the safe side to
@@ -834,9 +875,32 @@ Dhcpv4Srv::run_one() {
             .arg(query->getLabel());
         return;
     } else {
-        processPacket(query, rsp);
+    if (run_multithreaded_) {
+        ThreadPool::WorkItemCallBack call_back =
+            std::bind(&Dhcpv4Srv::processPacketAndSendResponseNoThrow, this, query, rsp);
+        pkt_thread_pool_.add(call_back);
+    } else {
+        processPacketAndSendResponse(query, rsp);
+    }
+    }
+}
+
+void
+Dhcpv4Srv::processPacketAndSendResponseNoThrow(Pkt4Ptr& query, Pkt4Ptr& rsp) {
+    try {
+        processPacketAndSendResponse(query, rsp);
+    } catch (const std::exception& e) {
+        LOG_ERROR(packet4_logger, DHCP4_PACKET_PROCESS_STD_EXCEPTION)
+            .arg(e.what());
+    } catch (...) {
+        LOG_ERROR(packet4_logger, DHCP4_PACKET_PROCESS_EXCEPTION);
     }
+}
 
+void
+Dhcpv4Srv::processPacketAndSendResponse(Pkt4Ptr& query, Pkt4Ptr& rsp) {
+    LockGuard<mutex> lock(serverLock());
+    processPacket(query, rsp);
     if (!rsp) {
         return;
     }
@@ -3682,6 +3746,23 @@ int Dhcpv4Srv::getHookIndexLease4Decline() {
     return (Hooks.hook_index_lease4_decline_);
 }
 
+uint32_t Dhcpv4Srv::threadCount() {
+    uint32_t sys_threads = CfgMgr::instance().getCurrentCfg()->getServerThreadCount();
+    if (sys_threads) {
+        return sys_threads;
+    }
+    sys_threads = std::thread::hardware_concurrency();
+    return sys_threads * 4;
+}
+
+uint32_t Dhcpv4Srv::maxThreadQueueSize() {
+    uint32_t max_thread_queue_size = CfgMgr::instance().getCurrentCfg()->getServerMaxThreadQueueSize();
+    if (max_thread_queue_size) {
+        return max_thread_queue_size;
+    }
+    return 4;
+}
+
 void Dhcpv4Srv::discardPackets() {
     // Clear any packets held by the callhout handle store and
     // all parked packets
@@ -3690,5 +3771,5 @@ void Dhcpv4Srv::discardPackets() {
     HooksManager::clearParkingLots();
 }
 
-}   // namespace dhcp
-}   // namespace isc
+}  // namespace dhcp
+}  // namespace isc
index 72680fff36e85f773dcc610d57a3e41d63653c30..ff02b7781b1ac093814dd45b6bd30a3c34575f89 100644 (file)
@@ -15,6 +15,7 @@
 #include <dhcp/option4_client_fqdn.h>
 #include <dhcp/option_custom.h>
 #include <dhcp_ddns/ncr_msg.h>
+#include <dhcpsrv/thread_pool.h>
 #include <dhcpsrv/alloc_engine.h>
 #include <dhcpsrv/cb_ctl_dhcp4.h>
 #include <dhcpsrv/cfg_option.h>
@@ -30,6 +31,8 @@
 #include <functional>
 #include <iostream>
 #include <queue>
+#include <boost/scoped_ptr.hpp>
+#include <atomic>
 
 // Undefine the macro OPTIONAL which is defined in some operating
 // systems but conflicts with a member of the RequirementLevel enum in
@@ -223,11 +226,13 @@ public:
     ///
     /// @param server_port specifies port number to listen on
     /// @param client_port specifies port number to send to
+    /// @param run_multithreaded enables or disables multithreaded mode
     /// @param use_bcast configure sockets to support broadcast messages.
     /// @param direct_response_desired specifies if it is desired to
     /// use direct V4 traffic.
     Dhcpv4Srv(uint16_t server_port = DHCP4_SERVER_PORT,
               uint16_t client_port = 0,
+              bool run_multithreaded = false,
               const bool use_bcast = true,
               const bool direct_response_desired = true);
 
@@ -265,6 +270,17 @@ public:
     /// redeclaration/redefinition. @ref isc::process::Daemon::getVersion()
     static std::string getVersion(bool extended);
 
+    /// @brief returns Kea DHCPv4 server thread count.
+    static uint32_t threadCount();
+
+    /// @brief returns Kea DHCPv4 server max thread queue size.
+    static uint32_t maxThreadQueueSize();
+
+    /// @brief returns Kea DHCPv4 server mutex.
+    std::mutex* serverLock() {
+        return mutex_.get();
+    }
+
     /// @brief Main server processing loop.
     ///
     /// Main server processing loop. Call the processing step routine
@@ -280,6 +296,24 @@ public:
     /// a response.
     void run_one();
 
+    /// @brief Process a single incoming DHCPv4 packet and sends the response.
+    ///
+    /// It verifies correctness of the passed packet, call per-type processXXX
+    /// methods, generates appropriate answer, sends the answer to the client.
+    ///
+    /// @param query A pointer to the packet to be processed.
+    /// @param rsp A pointer to the response
+    void processPacketAndSendResponse(Pkt4Ptr& query, Pkt4Ptr& rsp);
+
+    /// @brief Process a single incoming DHCPv4 packet and sends the response.
+    ///
+    /// It verifies correctness of the passed packet, call per-type processXXX
+    /// methods, generates appropriate answer, sends the answer to the client.
+    ///
+    /// @param query A pointer to the packet to be processed.
+    /// @param rsp A pointer to the response
+    void processPacketAndSendResponseNoThrow(Pkt4Ptr& query, Pkt4Ptr& rsp);
+
     /// @brief Process a single incoming DHCPv4 packet.
     ///
     /// It verifies correctness of the passed packet, call per-type processXXX
@@ -302,8 +336,8 @@ public:
     ///
     /// @brief Get UDP port on which server should listen.
     ///
-    /// Typically, server listens on UDP port number 67. Other ports are used
-    /// for testing purposes only.
+    /// Typically, server listens on UDP port 67. Other ports are only
+    /// used for testing purposes.
     ///
     /// @return UDP port on which server should listen.
     uint16_t getServerPort() const {
@@ -994,6 +1028,20 @@ protected:
     /// @brief Controls access to the configuration backends.
     CBControlDHCPv4Ptr cb_control_;
 
+    /// @brief Packet processing thread pool
+    ThreadPool pkt_thread_pool_;
+
+    // Global mutex used to serialize packet thread pool's threads
+    // on the not thread safe code and allow threads to run
+    // simultaneously on the thread safe portions
+    // (e.g. CqlLeaseMgr class instance).
+    boost::scoped_ptr<std::mutex> mutex_;
+
+    // Specifies if the application will use a thread pool or will process
+    // received DHCP packets on the main thread.
+    // It is mandatory to be set on false when running the test cases.
+    std::atomic_bool run_multithreaded_;
+
 public:
     /// Class methods for DHCPv4-over-DHCPv6 handler
 
@@ -1034,7 +1082,7 @@ public:
     static int getHookIndexLease4Decline();
 };
 
-}; // namespace isc::dhcp
-}; // namespace isc
+}  // namespace dhcp
+}  // namespace isc
 
 #endif // DHCP4_SRV_H
index c76c82af9833a0b3b17c7dac0b1b0ab12f00e239..b0d9fca834bd96221ada55a97d8f44d34133d65f 100644 (file)
@@ -5,21 +5,29 @@
 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 #include <config.h>
-#include <cc/data.h>
+
 #include <cc/command_interpreter.h>
+#include <cc/data.h>
+#include <cfgrpt/config_report.h>
 #include <config/command_mgr.h>
+#include <database/dbaccess_parser.h>
 #include <dhcp/libdhcp++.h>
-#include <dhcpsrv/cfgmgr.h>
-#include <dhcpsrv/cfg_db_access.h>
 #include <dhcp6/ctrl_dhcp6_srv.h>
-#include <dhcp6/dhcp6to4_ipc.h>
 #include <dhcp6/dhcp6_log.h>
+#include <dhcp6/dhcp6to4_ipc.h>
 #include <dhcp6/json_config_parser.h>
 #include <dhcp6/parser_context.h>
+#include <dhcpsrv/cfg_db_access.h>
+#include <dhcpsrv/cfgmgr.h>
+#include <dhcpsrv/db_type.h>
 #include <hooks/hooks_manager.h>
 #include <stats/stats_mgr.h>
-#include <cfgrpt/config_report.h>
+#include <util/threads/lock_guard.h>
+#include <util/threads/reverse_lock.h>
+
 #include <signal.h>
+
+#include <memory>
 #include <sstream>
 
 using namespace isc::config;
@@ -28,6 +36,7 @@ using namespace isc::dhcp;
 using namespace isc::data;
 using namespace isc::hooks;
 using namespace isc::stats;
+using namespace isc::util::thread;
 using namespace std;
 
 namespace {
@@ -95,8 +104,6 @@ ControlledDhcpv6Srv::loadConfigFile(const std::string& file_name) {
     // configuration from a JSON file.
 
     isc::data::ConstElementPtr json;
-    isc::data::ConstElementPtr dhcp6;
-    isc::data::ConstElementPtr logger;
     isc::data::ConstElementPtr result;
 
     // Basic sanity check: file name must not be empty.
@@ -138,8 +145,7 @@ ControlledDhcpv6Srv::loadConfigFile(const std::string& file_name) {
         // Now check is the returned result is successful (rcode=0) or not
         // (see @ref isc::config::parseAnswer).
         int rcode;
-        isc::data::ConstElementPtr comment =
-            isc::config::parseAnswer(rcode, result);
+        ConstElementPtr comment = isc::config::parseAnswer(rcode, result);
         if (rcode != 0) {
             string reason = comment ? comment->stringValue() :
                 "no details available";
@@ -159,11 +165,15 @@ ControlledDhcpv6Srv::loadConfigFile(const std::string& file_name) {
     return (result);
 }
 
-
 void
 ControlledDhcpv6Srv::init(const std::string& file_name) {
     // Configure the server using JSON file.
-    ConstElementPtr result = loadConfigFile(file_name);
+    ConstElementPtr result;
+    {
+        LockGuard<mutex> lock(serverLock());
+        result = loadConfigFile(file_name);
+    }
+
     int rcode;
     ConstElementPtr comment = isc::config::parseAnswer(rcode, result);
     if (rcode != 0) {
@@ -188,17 +198,16 @@ void ControlledDhcpv6Srv::cleanup() {
     // Nothing to do here. No need to disconnect from anything.
 }
 
-
 ConstElementPtr
 ControlledDhcpv6Srv::commandShutdownHandler(const string&, ConstElementPtr) {
-    if (ControlledDhcpv6Srv::server_) {
-        ControlledDhcpv6Srv::server_->shutdown();
+    if (ControlledDhcpv6Srv::getInstance()) {
+        ControlledDhcpv6Srv::getInstance()->shutdown();
     } else {
         LOG_WARN(dhcp6_logger, DHCP6_NOT_RUNNING);
-        ConstElementPtr answer = isc::config::createAnswer(1, "Shutdown failure.");
+        ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_ERROR, "Shutdown failure.");
         return (answer);
     }
-    ConstElementPtr answer = isc::config::createAnswer(0, "Shutting down.");
+    ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_SUCCESS, "Shutting down.");
     return (answer);
 }
 
@@ -210,11 +219,11 @@ ControlledDhcpv6Srv::commandLibReloadHandler(const string&, ConstElementPtr) {
     bool status = HooksManager::loadLibraries(loaded);
     if (!status) {
         LOG_ERROR(dhcp6_logger, DHCP6_HOOKS_LIBS_RELOAD_FAIL);
-        ConstElementPtr answer = isc::config::createAnswer(1,
+        ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_ERROR,
                                  "Failed to reload hooks libraries.");
         return (answer);
     }
-    ConstElementPtr answer = isc::config::createAnswer(0,
+    ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_SUCCESS,
                              "Hooks libraries successfully reloaded.");
     return (answer);
 }
@@ -234,7 +243,7 @@ ControlledDhcpv6Srv::commandConfigReloadHandler(const string&,
         LOG_ERROR(dhcp6_logger, DHCP6_DYNAMIC_RECONFIGURATION_FAIL)
             .arg(file);
         return (createAnswer(CONTROL_RESULT_ERROR,
-                             "Config reload failed:" + string(ex.what())));
+                             "Config reload failed: " + string(ex.what())));
     }
 }
 
@@ -469,7 +478,7 @@ ControlledDhcpv6Srv::commandVersionGetHandler(const string&, ConstElementPtr) {
     ElementPtr extended = Element::create(Dhcpv6Srv::getVersion(true));
     ElementPtr arguments = Element::createMap();
     arguments->set("extended", extended);
-    ConstElementPtr answer = isc::config::createAnswer(0,
+    ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_SUCCESS,
                                 Dhcpv6Srv::getVersion(false),
                                 arguments);
     return (answer);
@@ -478,14 +487,14 @@ ControlledDhcpv6Srv::commandVersionGetHandler(const string&, ConstElementPtr) {
 ConstElementPtr
 ControlledDhcpv6Srv::commandBuildReportHandler(const string&, ConstElementPtr) {
     ConstElementPtr answer =
-        isc::config::createAnswer(0, isc::detail::getConfigReport());
+        isc::config::createAnswer(CONTROL_RESULT_SUCCESS, isc::detail::getConfigReport());
     return (answer);
 }
 
 ConstElementPtr
 ControlledDhcpv6Srv::commandLeasesReclaimHandler(const string&,
                                                  ConstElementPtr args) {
-    int status_code = 1;
+    int status_code = CONTROL_RESULT_ERROR;
     string message;
 
     // args must be { "remove": <bool> }
@@ -508,9 +517,9 @@ ControlledDhcpv6Srv::commandLeasesReclaimHandler(const string&,
     return (answer);
 }
 
-isc::data::ConstElementPtr
-ControlledDhcpv6Srv::processCommand(const std::string& command,
-                                    isc::data::ConstElementPtr args) {
+ConstElementPtr
+ControlledDhcpv6Srv::processCommand(const string& command,
+                                    ConstElementPtr args) {
     string txt = args ? args->str() : "(none)";
 
     LOG_DEBUG(dhcp6_logger, DBG_DHCP6_COMMAND, DHCP6_COMMAND_RECEIVED)
@@ -519,12 +528,20 @@ ControlledDhcpv6Srv::processCommand(const std::string& command,
     ControlledDhcpv6Srv* srv = ControlledDhcpv6Srv::getInstance();
 
     if (!srv) {
-        ConstElementPtr no_srv = isc::config::createAnswer(1,
+        ConstElementPtr no_srv = isc::config::createAnswer(CONTROL_RESULT_ERROR,
           "Server object not initialized, can't process command '" +
           command + "', arguments: '" + txt + "'.");
         return (no_srv);
     }
 
+    if (srv->run_multithreaded_) {
+        {
+            ReverseLock<std::mutex> rlk(srv->serverLock());
+            srv->pkt_thread_pool_.destroy();
+        }
+        srv->pkt_thread_pool_.create(Dhcpv6Srv::threadCount());
+    }
+
     try {
         if (command == "shutdown") {
             return (srv->commandShutdownHandler(command, args));
@@ -563,13 +580,12 @@ ControlledDhcpv6Srv::processCommand(const std::string& command,
             return (srv->commandConfigWriteHandler(command, args));
 
         }
-
-        return (isc::config::createAnswer(1, "Unrecognized command:"
-                                          + command));
-
+        return(isc::config::createAnswer(CONTROL_RESULT_ERROR, "Unrecognized command: " +
+                                         command));
     } catch (const Exception& ex) {
-        return (isc::config::createAnswer(1, "Error while processing command '"
-                                          + command + "':" + ex.what()));
+        return (isc::config::createAnswer(CONTROL_RESULT_ERROR, "Error while processing command '" +
+                                          command + "': " + ex.what() +
+                                          ", params: '" + txt + "'"));
     }
 }
 
@@ -581,11 +597,12 @@ ControlledDhcpv6Srv::processConfig(isc::data::ConstElementPtr config) {
 
     ControlledDhcpv6Srv* srv = ControlledDhcpv6Srv::getInstance();
 
+    // Single stream instance used in all error clauses
+    std::ostringstream err;
+
     if (!srv) {
-        ConstElementPtr no_srv = isc::config::createAnswer(
-            CONTROL_RESULT_ERROR,
-            "Server object not initialized, can't process config.");
-        return (no_srv);
+        err << "Server object not initialized, can't process config.";
+        return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
     }
 
     ConstElementPtr answer = configureDhcp6Server(*srv, config);
@@ -599,8 +616,8 @@ ControlledDhcpv6Srv::processConfig(isc::data::ConstElementPtr config) {
             return (answer);
         }
     } catch (const std::exception& ex) {
-        return (isc::config::createAnswer(1, "Failed to process configuration:"
-                                          + string(ex.what())));
+        err << "Failed to process configuration: " << ex.what();
+        return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
     }
 
     // Re-open lease and host database with new parameters.
@@ -611,14 +628,13 @@ ControlledDhcpv6Srv::processConfig(isc::data::ConstElementPtr config) {
         cfg_db->setAppendedParameters("universe=6");
         cfg_db->createManagers();
     } catch (const std::exception& ex) {
-        return (isc::config::createAnswer(1, "Unable to open database: "
-                                          + std::string(ex.what())));
+        err << "Unable to open database: " << ex.what();
+        return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
     }
 
     // Regenerate server identifier if needed.
     try {
-        const std::string duid_file =
-            std::string(CfgMgr::instance().getDataDir()) + "/" +
+        const std::string duid_file = CfgMgr::instance().getDataDir() + "/" +
             std::string(SERVER_DUID_FILE);
         DuidPtr duid = CfgMgr::instance().getStagingCfg()->getCfgDUID()->create(duid_file);
         server_->serverid_.reset(new Option(Option::V6, D6O_SERVERID, duid->getDuid()));
@@ -631,17 +647,16 @@ ControlledDhcpv6Srv::processConfig(isc::data::ConstElementPtr config) {
     } catch (const std::exception& ex) {
         std::ostringstream err;
         err << "unable to configure server identifier: " << ex.what();
-        return (isc::config::createAnswer(1, err.str()));
+        return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
     }
 
     // Server will start DDNS communications if its enabled.
     try {
         srv->startD2();
     } catch (const std::exception& ex) {
-        std::ostringstream err;
-        err << "error starting DHCP_DDNS client "
-                " after server reconfiguration: " << ex.what();
-        return (isc::config::createAnswer(1, err.str()));
+        err << "Error starting DHCP_DDNS client after server reconfiguration: "
+            << ex.what();
+        return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
     }
 
     // Setup DHCPv4-over-DHCPv6 IPC
@@ -651,7 +666,7 @@ ControlledDhcpv6Srv::processConfig(isc::data::ConstElementPtr config) {
         std::ostringstream err;
         err << "error starting DHCPv4-over-DHCPv6 IPC "
                " after server reconfiguration: " << ex.what();
-        return (isc::config::createAnswer(1, err.str()));
+        return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
     }
 
     // Configure DHCP packet queueing
@@ -664,7 +679,6 @@ ControlledDhcpv6Srv::processConfig(isc::data::ConstElementPtr config) {
         }
 
     } catch (const std::exception& ex) {
-        std::ostringstream err;
         err << "Error setting packet queue controls after server reconfiguration: "
             << ex.what();
         return (isc::config::createAnswer(1, err.str()));
@@ -678,7 +692,7 @@ ControlledDhcpv6Srv::processConfig(isc::data::ConstElementPtr config) {
     // is no need to rollback configuration if socket fails to open on any
     // of the interfaces.
     CfgMgr::instance().getStagingCfg()->getCfgIface()->
-        openSockets(AF_INET6, srv->getServerPort());
+        openSockets(AF_INET6, srv->getServerPort(), srv->serverLock());
 
     // Install the timers for handling leases reclamation.
     try {
@@ -688,11 +702,10 @@ ControlledDhcpv6Srv::processConfig(isc::data::ConstElementPtr config) {
                         server_);
 
     } catch (const std::exception& ex) {
-        std::ostringstream err;
         err << "unable to setup timers for periodically running the"
             " reclamation of the expired leases: "
             << ex.what() << ".";
-        return (isc::config::createAnswer(1, err.str()));
+        return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
     }
 
     // Setup config backend polling, if configured for it.
@@ -758,24 +771,27 @@ ControlledDhcpv6Srv::checkConfig(isc::data::ConstElementPtr config) {
 
     ControlledDhcpv6Srv* srv = ControlledDhcpv6Srv::getInstance();
 
+    // Single stream instance used in all error clauses
+    std::ostringstream err;
+
     if (!srv) {
-        ConstElementPtr no_srv = isc::config::createAnswer(1,
-            "Server object not initialized, can't process config.");
-        return (no_srv);
+        err << "Server object not initialized, can't process config.";
+        return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
     }
 
     return (configureDhcp6Server(*srv, config, true));
 }
 
 ControlledDhcpv6Srv::ControlledDhcpv6Srv(uint16_t server_port,
-                                         uint16_t client_port)
-    : Dhcpv6Srv(server_port, client_port), io_service_(),
+                                         uint16_t client_port,
+                                         bool run_multithreaded /*= false*/)
+    : Dhcpv6Srv(server_port, client_port, run_multithreaded), io_service_(),
       timer_mgr_(TimerMgr::instance()) {
-    if (server_) {
+    if (getInstance()) {
         isc_throw(InvalidOperation,
                   "There is another Dhcpv6Srv instance already.");
     }
-    server_ = this; // remember this instance for use in callback
+    server_ = this; // remember this instance for later use in handlers
 
     // TimerMgr uses IO service to run asynchronous timers.
     TimerMgr::instance()->setIOService(getIOService());
@@ -794,26 +810,26 @@ ControlledDhcpv6Srv::ControlledDhcpv6Srv(uint16_t server_port,
     CommandMgr::instance().registerCommand("config-reload",
         boost::bind(&ControlledDhcpv6Srv::commandConfigReloadHandler, this, _1, _2));
 
+    CommandMgr::instance().registerCommand("config-set",
+        boost::bind(&ControlledDhcpv6Srv::commandConfigSetHandler, this, _1, _2));
+
     CommandMgr::instance().registerCommand("config-test",
         boost::bind(&ControlledDhcpv6Srv::commandConfigTestHandler, this, _1, _2));
 
     CommandMgr::instance().registerCommand("config-write",
         boost::bind(&ControlledDhcpv6Srv::commandConfigWriteHandler, this, _1, _2));
 
-    CommandMgr::instance().registerCommand("dhcp-disable",
-        boost::bind(&ControlledDhcpv6Srv::commandDhcpDisableHandler, this, _1, _2));
-
     CommandMgr::instance().registerCommand("dhcp-enable",
         boost::bind(&ControlledDhcpv6Srv::commandDhcpEnableHandler, this, _1, _2));
 
-    CommandMgr::instance().registerCommand("leases-reclaim",
-        boost::bind(&ControlledDhcpv6Srv::commandLeasesReclaimHandler, this, _1, _2));
+    CommandMgr::instance().registerCommand("dhcp-disable",
+        boost::bind(&ControlledDhcpv6Srv::commandDhcpDisableHandler, this, _1, _2));
 
     CommandMgr::instance().registerCommand("libreload",
         boost::bind(&ControlledDhcpv6Srv::commandLibReloadHandler, this, _1, _2));
 
-    CommandMgr::instance().registerCommand("config-set",
-        boost::bind(&ControlledDhcpv6Srv::commandConfigSetHandler, this, _1, _2));
+    CommandMgr::instance().registerCommand("leases-reclaim",
+        boost::bind(&ControlledDhcpv6Srv::commandLeasesReclaimHandler, this, _1, _2));
 
     CommandMgr::instance().registerCommand("shutdown",
         boost::bind(&ControlledDhcpv6Srv::commandShutdownHandler, this, _1, _2));
@@ -892,8 +908,8 @@ ControlledDhcpv6Srv::~ControlledDhcpv6Srv() {
 void ControlledDhcpv6Srv::sessionReader(void) {
     // Process one asio event. If there are more events, iface_mgr will call
     // this callback more than once.
-    if (server_) {
-        server_->io_service_.run_one();
+    if (getInstance()) {
+        getInstance()->io_service_.run_one();
     }
 }
 
@@ -932,12 +948,13 @@ ControlledDhcpv6Srv::dbReconnect(ReconnectCtlPtr db_reconnect_ctl) {
     if (reopened) {
         // Cancel the timer.
         if (TimerMgr::instance()->isTimerRegistered("Dhcp6DbReconnectTimer")) {
-            TimerMgr::instance()->cancel("Dhcp6DbReconnectTimer"); }
+            TimerMgr::instance()->cancel("Dhcp6DbReconnectTimer");
+        }
 
         // Set network state to service enabled
         network_state_->enableService();
 
-        // Toss the reconnct control, we're done with it
+        // Toss the reconnect control, we're done with it
         db_reconnect_ctl.reset();
     } else {
         if (!db_reconnect_ctl->checkRetries()) {
@@ -955,7 +972,7 @@ ControlledDhcpv6Srv::dbReconnect(ReconnectCtlPtr db_reconnect_ctl) {
         if (!TimerMgr::instance()->isTimerRegistered("Dhcp6DbReconnectTimer")) {
             TimerMgr::instance()->registerTimer("Dhcp6DbReconnectTimer",
                             boost::bind(&ControlledDhcpv6Srv::dbReconnect, this,
-                            db_reconnect_ctl),
+                                        db_reconnect_ctl),
                             db_reconnect_ctl->retryInterval(),
                             asiolink::IntervalTimer::ONE_SHOT);
         }
@@ -1019,5 +1036,5 @@ ControlledDhcpv6Srv::cbFetchUpdates(const SrvConfigPtr& srv_cfg,
     }
 }
 
-}; // end of isc::dhcp namespace
-}; // end of isc namespace
+}  // namespace dhcp
+}  // namespace isc
index ef776c83a8d2fcd9eac0025059cd017c6e33ff40..7b4f99884ea0d140ac60a0bb9d78adbc72ca19ca 100644 (file)
@@ -29,8 +29,10 @@ public:
     ///
     /// @param server_port UDP port to be opened for DHCP traffic
     /// @param client_port UDP port where all responses are sent to.
+    /// @param run_multithreaded enables or disables multithreaded mode
     ControlledDhcpv6Srv(uint16_t server_port = DHCP6_SERVER_PORT,
-                        uint16_t client_port = 0);
+                        uint16_t client_port = 0,
+                        bool run_multithreaded = false);
 
     /// @brief Destructor.
     virtual ~ControlledDhcpv6Srv();
@@ -65,7 +67,7 @@ public:
     /// @brief Initiates shutdown procedure for the whole DHCPv6 server.
     void shutdown();
 
-    /// @brief command processor
+    /// @brief Command processor
     ///
     /// This method is uniform for all config backends. It processes received
     /// command (as a string + JSON arguments). Internally, it's just a
@@ -75,9 +77,9 @@ public:
     /// Currently supported commands are:
     /// - config-reload
     /// - config-test
-    /// - leases-reclaim
-    /// - libreload
     /// - shutdown
+    /// - libreload
+    /// - leases-reclaim
     /// ...
     ///
     /// @note It never throws.
@@ -89,7 +91,7 @@ public:
     static isc::data::ConstElementPtr
     processCommand(const std::string& command, isc::data::ConstElementPtr args);
 
-    /// @brief configuration processor
+    /// @brief Configuration processor
     ///
     /// This is a method for handling incoming configuration updates.
     /// This method should be called by all configuration backends when the
@@ -114,7 +116,7 @@ public:
     isc::data::ConstElementPtr
     checkConfig(isc::data::ConstElementPtr new_config);
 
-    /// @brief returns pointer to the sole instance of Dhcpv6Srv
+    /// @brief Returns pointer to the sole instance of Dhcpv6Srv
     ///
     /// @return server instance (may return NULL, if called before server is spawned)
     static ControlledDhcpv6Srv* getInstance() {
@@ -131,7 +133,7 @@ private:
     /// (that was sent from some yet unspecified sender).
     static void sessionReader(void);
 
-    /// @brief handler for processing 'shutdown' command
+    /// @brief Handler for processing 'shutdown' command
     ///
     /// This handler processes shutdown command, which initializes shutdown
     /// procedure.
@@ -143,7 +145,7 @@ private:
     commandShutdownHandler(const std::string& command,
                            isc::data::ConstElementPtr args);
 
-    /// @brief handler for processing 'libreload' command
+    /// @brief Handler for processing 'libreload' command
     ///
     /// This handler processes libreload command, which unloads all hook
     /// libraries and reloads them.
@@ -156,7 +158,7 @@ private:
     commandLibReloadHandler(const std::string& command,
                             isc::data::ConstElementPtr args);
 
-    /// @brief handler for processing 'config-reload' command
+    /// @brief Handler for processing 'config-reload' command
     ///
     /// This handler processes config-reload command, which processes
     /// configuration specified in args parameter.
@@ -309,7 +311,6 @@ private:
                               const bool remove_lease,
                               const uint16_t max_unwarned_cycles);
 
-
     /// @brief Deletes reclaimed leases and reschedules the timer.
     ///
     /// This is a wrapper method for @c AllocEngine::deleteExpiredReclaimed6.
@@ -334,6 +335,7 @@ private:
     ///
     /// If the maximum number of retries has been exhausted an error is logged
     /// and the server shuts down.
+    ///
     /// @param db_reconnect_ctl pointer to the ReconnectCtl containing the
     /// configured reconnect parameters
     ///
@@ -355,6 +357,8 @@ private:
     ///
     /// @param db_reconnect_ctl pointer to the ReconnectCtl containing the
     /// configured reconnect parameters
+    ///
+    /// @return false if reconnect is not configured, true otherwise
     bool dbLostCallback(db::ReconnectCtlPtr db_reconnect_ctl);
 
     /// @brief Callback invoked periodically to fetch configuration updates
@@ -387,7 +391,7 @@ private:
     TimerMgrPtr timer_mgr_;
 };
 
-}; // namespace isc::dhcp
-}; // namespace isc
+}  // namespace dhcp
+}  // namespace isc
 
 #endif
index fba6cfbbe51604b2c0d609a98103d9682e971e84..f3c0e45d8a5b86b69581c295925e11699d2cbd3b 100644 (file)
 #include <hooks/hooks_log.h>
 #include <hooks/hooks_manager.h>
 #include <stats/stats_mgr.h>
-
 #include <util/encode/hex.h>
 #include <util/io_utilities.h>
 #include <util/pointer_util.h>
 #include <util/range_utilities.h>
+#include <util/threads/lock_guard.h>
 #include <log/logger.h>
 #include <cryptolink/cryptolink.h>
 #include <cfgrpt/config_report.h>
@@ -90,6 +90,8 @@ using namespace isc::stats;
 using namespace isc::util;
 using namespace std;
 
+using isc::util::thread::LockGuard;
+
 namespace {
 
 /// Structure that holds registered hook indexes
@@ -172,19 +174,23 @@ createStatusCode(const Pkt6& pkt, const Option6IA& ia, const uint16_t status_cod
     return (option_status);
 }
 
-}; // anonymous namespace
+}  // namespace
 
 namespace isc {
 namespace dhcp {
 
 const std::string Dhcpv6Srv::VENDOR_CLASS_PREFIX("VENDOR_CLASS_");
 
-Dhcpv6Srv::Dhcpv6Srv(uint16_t server_port, uint16_t client_port)
+Dhcpv6Srv::Dhcpv6Srv(uint16_t server_port, uint16_t client_port, bool run_multithreaded /* = false */)
     : io_service_(new IOService()), server_port_(server_port),
       client_port_(client_port), serverid_(), shutdown_(true),
       alloc_engine_(), name_change_reqs_(),
       network_state_(new NetworkState(NetworkState::DHCPv6)),
-      cb_control_(new CBControlDHCPv6()) {
+      cb_control_(new CBControlDHCPv6()),
+      run_multithreaded_(run_multithreaded) {
+
+    mutex_.reset(new std::mutex());
+
     LOG_DEBUG(dhcp6_logger, DBG_DHCP6_START, DHCP6_OPEN_SOCKET)
         .arg(server_port);
 
@@ -237,7 +243,7 @@ Dhcpv6Srv::~Dhcpv6Srv() {
         // LOG_ERROR(dhcp6_logger, DHCP6_SRV_DHCP4O6_ERROR).arg(ex.what());
     }
 
-    IfaceMgr::instance().closeSockets();
+    IfaceMgr::instance().closeSockets(serverLock());
 
     LeaseMgrFactory::destroy();
 
@@ -251,7 +257,7 @@ void Dhcpv6Srv::shutdown() {
 }
 
 Pkt6Ptr Dhcpv6Srv::receivePacket(int timeout) {
-    return (IfaceMgr::instance().receive6(timeout));
+    return (IfaceMgr::instance().receive6(timeout, 0, serverLock()));
 }
 
 void Dhcpv6Srv::sendPacket(const Pkt6Ptr& packet) {
@@ -405,10 +411,21 @@ Dhcpv6Srv::initContext(const Pkt6Ptr& pkt,
 }
 
 bool Dhcpv6Srv::run() {
+    if (run_multithreaded_) {
+        // Creating the process packet thread pool
+        // The number of thread pool's threads should be read from configuration
+        // file or it should be determined by the number of hardware threads and
+        // the number of Cassandra DB nodes.
+        pkt_thread_pool_.create(Dhcpv6Srv::threadCount());
+    }
+
     while (!shutdown_) {
         try {
             run_one();
-            getIOService()->poll();
+            {
+                LockGuard<mutex> lock(serverLock());
+                getIOService()->poll();
+            }
         } catch (const std::exception& e) {
             // General catch-all standard exceptions that are not caught by more
             // specific catches.
@@ -421,6 +438,11 @@ bool Dhcpv6Srv::run() {
         }
     }
 
+    // destroying the thread pool
+    if (run_multithreaded_) {
+        pkt_thread_pool_.destroy();
+    }
+
     return (true);
 }
 
@@ -430,11 +452,27 @@ void Dhcpv6Srv::run_one() {
     Pkt6Ptr rsp;
 
     try {
+
+        // Do not read more packets from socket if there are enough
+        // packets to be processed in the packet thread pool queue
+        const int max_queued_pkt_per_thread = Dhcpv6Srv::maxThreadQueueSize();
+        const auto queue_full_wait = std::chrono::milliseconds(1);
+        size_t pkt_queue_size = pkt_thread_pool_.count();
+        if (pkt_queue_size >= Dhcpv6Srv::threadCount() *
+            max_queued_pkt_per_thread) {
+            std::this_thread::sleep_for(queue_full_wait);
+            return;
+        }
+
         // Set select() timeout to 1s. This value should not be modified
         // because it is important that the select() returns control
         // frequently so as the IOService can be polled for ready handlers.
         uint32_t timeout = 1;
-        query = receivePacket(timeout);
+        {
+            // LOG_DEBUG(packet6_logger, DBG_DHCP6_DETAIL, DHCP6_BUFFER_WAIT).arg(timeout);
+            LockGuard<mutex> lock(serverLock());
+            query = receivePacket(timeout);
+        }
 
         // Log if packet has arrived. We can't log the detailed information
         // about the DHCP message because it hasn't been unpacked/parsed
@@ -487,6 +525,7 @@ void Dhcpv6Srv::run_one() {
     // process could wait up to the duration of timeout of select() to
     // terminate.
     try {
+        LockGuard<mutex> lock(serverLock());
         handleSignal();
     } catch (const std::exception& e) {
         // An (a standard or ISC) exception occurred.
@@ -507,9 +546,32 @@ void Dhcpv6Srv::run_one() {
             .arg(query->getLabel());
         return;
     } else {
-        processPacket(query, rsp);
+    if (run_multithreaded_) {
+        ThreadPool::WorkItemCallBack call_back =
+            std::bind(&Dhcpv6Srv::processPacketAndSendResponseNoThrow, this, query, rsp);
+        pkt_thread_pool_.add(call_back);
+    } else {
+        processPacketAndSendResponse(query, rsp);
+    }
+    }
+}
+
+void
+Dhcpv6Srv::processPacketAndSendResponseNoThrow(Pkt6Ptr& query, Pkt6Ptr& rsp) {
+    try {
+        processPacketAndSendResponse(query, rsp);
+    } catch (const std::exception& e) {
+        LOG_ERROR(packet6_logger, DHCP6_PACKET_PROCESS_STD_EXCEPTION)
+            .arg(e.what());
+    } catch (...) {
+        LOG_ERROR(packet6_logger, DHCP6_PACKET_PROCESS_EXCEPTION);
     }
+}
 
+void
+Dhcpv6Srv::processPacketAndSendResponse(Pkt6Ptr& query, Pkt6Ptr& rsp) {
+    LockGuard<mutex> lock(serverLock());
+    processPacket(query, rsp);
     if (!rsp) {
         return;
     }
@@ -1289,7 +1351,7 @@ Dhcpv6Srv::sanityCheck(const Pkt6Ptr& pkt) {
         switch (pkt->getType()) {
         case DHCPV6_SOLICIT:
         case DHCPV6_REBIND:
-    case DHCPV6_CONFIRM:
+        case DHCPV6_CONFIRM:
             sanityCheck(pkt, MANDATORY, FORBIDDEN);
             return (true);
 
@@ -1511,6 +1573,7 @@ Dhcpv6Srv::assignLeases(const Pkt6Ptr& question, Pkt6Ptr& answer,
             if (answer_opt) {
                 answer->addOption(answer_opt);
             }
+            break;
         }
         default:
             break;
@@ -2242,7 +2305,6 @@ Dhcpv6Srv::extendIA_PD(const Pkt6Ptr& query,
                                (*l)->preferred_lft_, (*l)->valid_lft_));
         ia_rsp->addOption(prf);
 
-
         if (pd_exclude_requested) {
             // PD exclude option has been requested via ORO, thus we need to
             // include it if the pool configuration specifies this option.
@@ -2257,7 +2319,6 @@ Dhcpv6Srv::extendIA_PD(const Pkt6Ptr& query,
             }
         }
 
-
         LOG_INFO(lease6_logger, DHCP6_PD_LEASE_RENEW)
             .arg(query->getLabel())
             .arg((*l)->addr_.toText())
@@ -3862,6 +3923,23 @@ Dhcpv6Srv::requestedInORO(const Pkt6Ptr& query, const uint16_t code) const {
     return (false);
 }
 
+uint32_t Dhcpv6Srv::threadCount() {
+    uint32_t sys_threads = CfgMgr::instance().getCurrentCfg()->getServerThreadCount();
+    if (sys_threads) {
+        return sys_threads;
+    }
+    sys_threads = std::thread::hardware_concurrency();
+    return sys_threads * 4;
+}
+
+uint32_t Dhcpv6Srv::maxThreadQueueSize() {
+    uint32_t max_thread_queue_size = CfgMgr::instance().getCurrentCfg()->getServerMaxThreadQueueSize();
+    if (max_thread_queue_size) {
+        return max_thread_queue_size;
+    }
+    return 4;
+}
+
 void Dhcpv6Srv::discardPackets() {
     // Dump all of our current packets, anything that is mid-stream
     isc::dhcp::Pkt6Ptr pkt6ptr_empty;
@@ -3869,5 +3947,5 @@ void Dhcpv6Srv::discardPackets() {
     HooksManager::clearParkingLots();
 }
 
-};
-};
+}  // namespace dhcp
+}  // namespace isc
index 1e0670cc6ec2390df59894e3de0c47d44f514b51..0484538cc765b6e6553d78c6109fc905b15667fd 100644 (file)
@@ -16,6 +16,7 @@
 #include <dhcp/option6_ia.h>
 #include <dhcp/option_definition.h>
 #include <dhcp/pkt6.h>
+#include <dhcpsrv/thread_pool.h>
 #include <dhcpsrv/alloc_engine.h>
 #include <dhcpsrv/callout_handle_store.h>
 #include <dhcpsrv/cb_ctl_dhcp6.h>
@@ -29,6 +30,8 @@
 #include <functional>
 #include <iostream>
 #include <queue>
+#include <boost/scoped_ptr.hpp>
+#include <atomic>
 
 // Undefine the macro OPTIONAL which is defined in some operating
 // systems but conflicts with a member of the RequirementLevel enum in
@@ -51,7 +54,7 @@ public:
 
 /// @brief DHCPv6 server service.
 ///
-/// This class represents DHCPv6 server. It contains all
+/// This singleton class represents DHCPv6 server. It contains all
 /// top-level methods and routines necessary for server operation.
 /// In particular, it instantiates IfaceMgr, loads or generates DUID
 /// that is going to be used as server-identifier, receives incoming
@@ -83,8 +86,10 @@ public:
     ///
     /// @param server_port port on which all sockets will listen
     /// @param client_port port to which all responses will be sent
+    /// @param run_multithreaded enables or disables multithreaded mode
     Dhcpv6Srv(uint16_t server_port = DHCP6_SERVER_PORT,
-              uint16_t client_port = 0);
+              uint16_t client_port = 0,
+              bool run_multithreaded = false);
 
     /// @brief Destructor. Used during DHCPv6 service shutdown.
     virtual ~Dhcpv6Srv();
@@ -120,6 +125,17 @@ public:
     /// redeclaration/redefinition. @ref isc::process::Daemon::getVersion()
     static std::string getVersion(bool extended);
 
+    /// @brief returns Kea DHCPv6 server thread count.
+    static uint32_t threadCount();
+
+    /// @brief returns Kea DHCPv6 server max thread queue size.
+    static uint32_t maxThreadQueueSize();
+
+    /// @brief returns Kea DHCPv6 server mutex.
+    std::mutex* serverLock() {
+        return mutex_.get();
+    }
+
     /// @brief Returns server-identifier option.
     ///
     /// @return server-id option
@@ -1018,9 +1034,23 @@ protected:
 
     /// @brief Controls access to the configuration backends.
     CBControlDHCPv6Ptr cb_control_;
+
+    /// @brief Packet processing thread pool
+    ThreadPool pkt_thread_pool_;
+
+    // Global mutex used to serialize packet thread pool's threads
+    // on the not thread safe code and allow threads to run
+    // simultaneously on the thread safe portions
+    // (e.g. CqlLeaseMgr class instance).
+    boost::scoped_ptr<std::mutex> mutex_;
+
+    // Specifies if the application will use a thread pool or will process
+    // received DHCP packets on the main thread.
+    // It is mandatory to be set on false when running the test cases.
+    std::atomic_bool run_multithreaded_;
 };
 
-}; // namespace isc::dhcp
-}; // namespace isc
+}  // namespace dhcp
+}  // namespace isc
 
 #endif // DHCP6_SRV_H
index 2c590feeab4ed181fb83f9dad6dff38c38961c3f..491f6a4baa93a242031b0a3f6f9be4ad5658c3de 100644 (file)
@@ -115,6 +115,7 @@ libkea_dhcpsrv_la_SOURCES += lease_mgr.cc lease_mgr.h
 libkea_dhcpsrv_la_SOURCES += lease_mgr_factory.cc lease_mgr_factory.h
 libkea_dhcpsrv_la_SOURCES += memfile_lease_mgr.cc memfile_lease_mgr.h
 libkea_dhcpsrv_la_SOURCES += memfile_lease_storage.h
+libkea_dhcpsrv_la_SOURCES += thread_pool.cc thread_pool.h
 
 if HAVE_MYSQL
 libkea_dhcpsrv_la_SOURCES += mysql_lease_mgr.cc mysql_lease_mgr.h