From: Razvan Becheriu Date: Mon, 25 Mar 2019 14:34:27 +0000 (+0200) Subject: added thread pool X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=6005c5d51273faafac051d8510a6316e1ddb9b7f;p=thirdparty%2Fkea.git added thread pool --- diff --git a/src/bin/dhcp4/ctrl_dhcp4_srv.cc b/src/bin/dhcp4/ctrl_dhcp4_srv.cc index 0595c1ad01..868bdace55 100644 --- a/src/bin/dhcp4/ctrl_dhcp4_srv.cc +++ b/src/bin/dhcp4/ctrl_dhcp4_srv.cc @@ -5,29 +5,38 @@ // file, You can obtain one at http://mozilla.org/MPL/2.0/. #include -#include + #include +#include +#include #include +#include +#include #include #include #include -#include #include -#include +#include #include -#include +#include +#include #include #include -#include +#include +#include + #include + +#include #include -using namespace isc::data; +using namespace isc::config; using namespace isc::db; using namespace isc::dhcp; +using namespace isc::data; using namespace isc::hooks; -using namespace isc::config; using namespace isc::stats; +using namespace isc::util::thread; using namespace std; namespace { @@ -76,34 +85,6 @@ namespace dhcp { ControlledDhcpv4Srv* ControlledDhcpv4Srv::server_ = NULL; -void -ControlledDhcpv4Srv::init(const std::string& file_name) { - // Configure the server using JSON file. - ConstElementPtr result = loadConfigFile(file_name); - int rcode; - ConstElementPtr comment = isc::config::parseAnswer(rcode, result); - if (rcode != 0) { - string reason = comment ? comment->stringValue() : - "no details available"; - isc_throw(isc::BadValue, reason); - } - - // We don't need to call openActiveSockets() or startD2() as these - // methods are called in processConfig() which is called by - // processCommand("config-set", ...) - - // Set signal handlers. When the SIGHUP is received by the process - // the server reconfiguration will be triggered. When SIGTERM or - // SIGINT will be received, the server will start shutting down. - signal_set_.reset(new isc::util::SignalSet(SIGINT, SIGHUP, SIGTERM)); - // Set the pointer to the handler function. - signal_handler_ = signalHandler; -} - -void ControlledDhcpv4Srv::cleanup() { - // Nothing to do here. No need to disconnect from anything. -} - /// @brief Configure DHCPv4 server using the configuration file specified. /// /// This function is used to both configure the DHCP server on its startup @@ -120,16 +101,14 @@ ControlledDhcpv4Srv::loadConfigFile(const std::string& file_name) { // configuration from a JSON file. isc::data::ConstElementPtr json; - isc::data::ConstElementPtr dhcp4; - isc::data::ConstElementPtr logger; isc::data::ConstElementPtr result; // Basic sanity check: file name must not be empty. try { if (file_name.empty()) { // Basic sanity check: file name must not be empty. - isc_throw(isc::BadValue, "JSON configuration file not specified." - " Please use -c command line option."); + isc_throw(isc::BadValue, "JSON configuration file not specified. Please " + "use -c command line option."); } // Read contents of the file and parse it as JSON @@ -183,6 +162,38 @@ ControlledDhcpv4Srv::loadConfigFile(const std::string& file_name) { return (result); } +void +ControlledDhcpv4Srv::init(const std::string& file_name) { + // Configure the server using JSON file. + ConstElementPtr result; + { + LockGuard lock(serverLock()); + result = loadConfigFile(file_name); + } + + int rcode; + ConstElementPtr comment = isc::config::parseAnswer(rcode, result); + if (rcode != 0) { + string reason = comment ? comment->stringValue() : + "no details available"; + isc_throw(isc::BadValue, reason); + } + + // We don't need to call openActiveSockets() or startD2() as these + // methods are called in processConfig() which is called by + // processCommand("config-set", ...) + + // Set signal handlers. When the SIGHUP is received by the process + // the server reconfiguration will be triggered. When SIGTERM or + // SIGINT will be received, the server will start shutting down. + signal_set_.reset(new isc::util::SignalSet(SIGINT, SIGHUP, SIGTERM)); + // Set the pointer to the handler function. + signal_handler_ = signalHandler; +} + +void ControlledDhcpv4Srv::cleanup() { + // Nothing to do here. No need to disconnect from anything. +} ConstElementPtr ControlledDhcpv4Srv::commandShutdownHandler(const string&, ConstElementPtr) { @@ -190,28 +201,26 @@ ControlledDhcpv4Srv::commandShutdownHandler(const string&, ConstElementPtr) { ControlledDhcpv4Srv::getInstance()->shutdown(); } else { LOG_WARN(dhcp4_logger, DHCP4_NOT_RUNNING); - ConstElementPtr answer = isc::config::createAnswer(1, - "Shutdown failure."); + ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_ERROR, "Shutdown failure."); return (answer); } - ConstElementPtr answer = isc::config::createAnswer(0, "Shutting down."); + ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_SUCCESS, "Shutting down."); return (answer); } ConstElementPtr ControlledDhcpv4Srv::commandLibReloadHandler(const string&, ConstElementPtr) { - /// @todo delete any stored CalloutHandles referring to the old libraries /// Get list of currently loaded libraries and reload them. HookLibsCollection loaded = HooksManager::getLibraryInfo(); bool status = HooksManager::loadLibraries(loaded); if (!status) { LOG_ERROR(dhcp4_logger, DHCP4_HOOKS_LIBS_RELOAD_FAIL); - ConstElementPtr answer = isc::config::createAnswer(1, + ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_ERROR, "Failed to reload hooks libraries."); return (answer); } - ConstElementPtr answer = isc::config::createAnswer(0, + ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_SUCCESS, "Hooks libraries successfully reloaded."); return (answer); } @@ -219,7 +228,6 @@ ControlledDhcpv4Srv::commandLibReloadHandler(const string&, ConstElementPtr) { ConstElementPtr ControlledDhcpv4Srv::commandConfigReloadHandler(const string&, ConstElementPtr /*args*/) { - // Get configuration file name. std::string file = ControlledDhcpv4Srv::getInstance()->getConfigFile(); try { @@ -232,7 +240,7 @@ ControlledDhcpv4Srv::commandConfigReloadHandler(const string&, LOG_ERROR(dhcp4_logger, DHCP4_DYNAMIC_RECONFIGURATION_FAIL) .arg(file); return (createAnswer(CONTROL_RESULT_ERROR, - "Config reload failed:" + string(ex.what()))); + "Config reload failed: " + string(ex.what()))); } } @@ -245,8 +253,7 @@ ControlledDhcpv4Srv::commandConfigGetHandler(const string&, } ConstElementPtr -ControlledDhcpv4Srv::commandConfigWriteHandler(const string&, - ConstElementPtr args) { +ControlledDhcpv4Srv::commandConfigWriteHandler(const string&, ConstElementPtr args) { string filename; if (args) { @@ -265,6 +272,7 @@ ControlledDhcpv4Srv::commandConfigWriteHandler(const string&, if (filename.empty()) { // filename parameter was not specified, so let's use whatever we remember + // from the command-line filename = getConfigFile(); } @@ -299,7 +307,7 @@ ControlledDhcpv4Srv::commandConfigWriteHandler(const string&, ConstElementPtr ControlledDhcpv4Srv::commandConfigSetHandler(const string&, ConstElementPtr args) { - const int status_code = CONTROL_RESULT_ERROR; // 1 indicates an error + const int status_code = CONTROL_RESULT_ERROR; ConstElementPtr dhcp4; string message; @@ -351,7 +359,7 @@ ControlledDhcpv4Srv::commandConfigSetHandler(const string&, // the logging first in case there's a configuration failure. int rcode = 0; isc::config::parseAnswer(rcode, result); - if (rcode == 0) { + if (rcode == CONTROL_RESULT_SUCCESS) { CfgMgr::instance().getStagingCfg()->applyLoggingCfg(); // Use new configuration. @@ -467,17 +475,16 @@ ControlledDhcpv4Srv::commandVersionGetHandler(const string&, ConstElementPtr) { ElementPtr extended = Element::create(Dhcpv4Srv::getVersion(true)); ElementPtr arguments = Element::createMap(); arguments->set("extended", extended); - ConstElementPtr answer = isc::config::createAnswer(0, + ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_SUCCESS, Dhcpv4Srv::getVersion(false), arguments); return (answer); } ConstElementPtr -ControlledDhcpv4Srv::commandBuildReportHandler(const string&, - ConstElementPtr) { +ControlledDhcpv4Srv::commandBuildReportHandler(const string&, ConstElementPtr) { ConstElementPtr answer = - isc::config::createAnswer(0, isc::detail::getConfigReport()); + isc::config::createAnswer(CONTROL_RESULT_SUCCESS, isc::detail::getConfigReport()); return (answer); } @@ -518,12 +525,20 @@ ControlledDhcpv4Srv::processCommand(const string& command, ControlledDhcpv4Srv* srv = ControlledDhcpv4Srv::getInstance(); if (!srv) { - ConstElementPtr no_srv = isc::config::createAnswer(1, - "Server object not initialized, so can't process command '" + + ConstElementPtr no_srv = isc::config::createAnswer(CONTROL_RESULT_ERROR, + "Server object not initialized, can't process command '" + command + "', arguments: '" + txt + "'."); return (no_srv); } + if (srv->run_multithreaded_) { + { + ReverseLock rlk(srv->serverLock()); + srv->pkt_thread_pool_.destroy(); + } + srv->pkt_thread_pool_.create(Dhcpv4Srv::threadCount()); + } + try { if (command == "shutdown") { return (srv->commandShutdownHandler(command, args)); @@ -562,12 +577,11 @@ ControlledDhcpv4Srv::processCommand(const string& command, return (srv->commandConfigWriteHandler(command, args)); } - ConstElementPtr answer = isc::config::createAnswer(1, - "Unrecognized command:" + command); - return (answer); + return(isc::config::createAnswer(CONTROL_RESULT_ERROR, "Unrecognized command: " + + command)); } catch (const Exception& ex) { - return (isc::config::createAnswer(1, "Error while processing command '" - + command + "':" + ex.what() + + return (isc::config::createAnswer(CONTROL_RESULT_ERROR, "Error while processing command '" + + command + "': " + ex.what() + ", params: '" + txt + "'")); } } @@ -585,7 +599,7 @@ ControlledDhcpv4Srv::processConfig(isc::data::ConstElementPtr config) { if (!srv) { err << "Server object not initialized, can't process config."; - return (isc::config::createAnswer(1, err.str())); + return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str())); } ConstElementPtr answer = configureDhcp4Server(*srv, config); @@ -599,8 +613,8 @@ ControlledDhcpv4Srv::processConfig(isc::data::ConstElementPtr config) { return (answer); } } catch (const std::exception& ex) { - err << "Failed to process configuration:" << ex.what(); - return (isc::config::createAnswer(1, err.str())); + err << "Failed to process configuration: " << ex.what(); + return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str())); } // Re-open lease and host database with new parameters. @@ -612,7 +626,7 @@ ControlledDhcpv4Srv::processConfig(isc::data::ConstElementPtr config) { cfg_db->createManagers(); } catch (const std::exception& ex) { err << "Unable to open database: " << ex.what(); - return (isc::config::createAnswer(1, err.str())); + return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str())); } // Server will start DDNS communications if its enabled. @@ -621,7 +635,7 @@ ControlledDhcpv4Srv::processConfig(isc::data::ConstElementPtr config) { } catch (const std::exception& ex) { err << "Error starting DHCP_DDNS client after server reconfiguration: " << ex.what(); - return (isc::config::createAnswer(1, err.str())); + return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str())); } // Setup DHCPv4-over-DHCPv6 IPC @@ -631,7 +645,7 @@ ControlledDhcpv4Srv::processConfig(isc::data::ConstElementPtr config) { std::ostringstream err; err << "error starting DHCPv4-over-DHCPv6 IPC " " after server reconfiguration: " << ex.what(); - return (isc::config::createAnswer(1, err.str())); + return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str())); } // Configure DHCP packet queueing @@ -657,7 +671,7 @@ ControlledDhcpv4Srv::processConfig(isc::data::ConstElementPtr config) { // is no need to rollback configuration if socket fails to open on any // of the interfaces. CfgMgr::instance().getStagingCfg()->getCfgIface()-> - openSockets(AF_INET, srv->getServerPort(), + openSockets(AF_INET, srv->getServerPort(), srv->serverLock(), getInstance()->useBroadcast()); // Install the timers for handling leases reclamation. @@ -671,7 +685,7 @@ ControlledDhcpv4Srv::processConfig(isc::data::ConstElementPtr config) { err << "unable to setup timers for periodically running the" " reclamation of the expired leases: " << ex.what() << "."; - return (isc::config::createAnswer(1, err.str())); + return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str())); } auto ctl_info = CfgMgr::instance().getStagingCfg()->getConfigControlInfo(); @@ -728,7 +742,7 @@ isc::data::ConstElementPtr ControlledDhcpv4Srv::checkConfig(isc::data::ConstElementPtr config) { LOG_DEBUG(dhcp4_logger, DBG_DHCP4_COMMAND, DHCP4_CONFIG_RECEIVED) - .arg(config->str()); + .arg(config->str()); ControlledDhcpv4Srv* srv = ControlledDhcpv4Srv::getInstance(); @@ -737,15 +751,16 @@ ControlledDhcpv4Srv::checkConfig(isc::data::ConstElementPtr config) { if (!srv) { err << "Server object not initialized, can't process config."; - return (isc::config::createAnswer(1, err.str())); + return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str())); } return (configureDhcp4Server(*srv, config, true)); } ControlledDhcpv4Srv::ControlledDhcpv4Srv(uint16_t server_port /*= DHCP4_SERVER_PORT*/, - uint16_t client_port /*= 0*/) - : Dhcpv4Srv(server_port, client_port), io_service_(), + uint16_t client_port /*= 0*/, + bool run_multithreaded /*= false*/) + : Dhcpv4Srv(server_port, client_port, run_multithreaded), io_service_(), timer_mgr_(TimerMgr::instance()) { if (getInstance()) { isc_throw(InvalidOperation, @@ -801,21 +816,20 @@ ControlledDhcpv4Srv::ControlledDhcpv4Srv(uint16_t server_port /*= DHCP4_SERVER_P CommandMgr::instance().registerCommand("statistic-get", boost::bind(&StatsMgr::statisticGetHandler, _1, _2)); - CommandMgr::instance().registerCommand("statistic-reset", - boost::bind(&StatsMgr::statisticResetHandler, _1, _2)); - - CommandMgr::instance().registerCommand("statistic-remove", - boost::bind(&StatsMgr::statisticRemoveHandler, _1, _2)); - CommandMgr::instance().registerCommand("statistic-get-all", boost::bind(&StatsMgr::statisticGetAllHandler, _1, _2)); + CommandMgr::instance().registerCommand("statistic-reset", + boost::bind(&StatsMgr::statisticResetHandler, _1, _2)); + CommandMgr::instance().registerCommand("statistic-reset-all", boost::bind(&StatsMgr::statisticResetAllHandler, _1, _2)); + CommandMgr::instance().registerCommand("statistic-remove", + boost::bind(&StatsMgr::statisticRemoveHandler, _1, _2)); + CommandMgr::instance().registerCommand("statistic-remove-all", boost::bind(&StatsMgr::statisticRemoveAllHandler, _1, _2)); - } void ControlledDhcpv4Srv::shutdown() { @@ -839,14 +853,14 @@ ControlledDhcpv4Srv::~ControlledDhcpv4Srv() { // Deregister any registered commands (please keep in alphabetic order) CommandMgr::instance().deregisterCommand("build-report"); CommandMgr::instance().deregisterCommand("config-get"); + CommandMgr::instance().deregisterCommand("config-set"); CommandMgr::instance().deregisterCommand("config-reload"); CommandMgr::instance().deregisterCommand("config-test"); CommandMgr::instance().deregisterCommand("config-write"); - CommandMgr::instance().deregisterCommand("leases-reclaim"); - CommandMgr::instance().deregisterCommand("libreload"); - CommandMgr::instance().deregisterCommand("config-set"); CommandMgr::instance().deregisterCommand("dhcp-disable"); CommandMgr::instance().deregisterCommand("dhcp-enable"); + CommandMgr::instance().deregisterCommand("leases-reclaim"); + CommandMgr::instance().deregisterCommand("libreload"); CommandMgr::instance().deregisterCommand("shutdown"); CommandMgr::instance().deregisterCommand("statistic-get"); CommandMgr::instance().deregisterCommand("statistic-get-all"); @@ -862,8 +876,8 @@ ControlledDhcpv4Srv::~ControlledDhcpv4Srv() { ; } - server_ = NULL; // forget this instance. Noone should call any handlers at - // this stage. + server_ = NULL; // forget this instance. There should be no callback anymore + // at this stage anyway. } void ControlledDhcpv4Srv::sessionReader(void) { @@ -997,5 +1011,5 @@ ControlledDhcpv4Srv::cbFetchUpdates(const SrvConfigPtr& srv_cfg, } } -}; // end of isc::dhcp namespace -}; // end of isc namespace +} // namespace dhcp +} // namespace isc diff --git a/src/bin/dhcp4/ctrl_dhcp4_srv.h b/src/bin/dhcp4/ctrl_dhcp4_srv.h index c3120af240..f05f72481c 100644 --- a/src/bin/dhcp4/ctrl_dhcp4_srv.h +++ b/src/bin/dhcp4/ctrl_dhcp4_srv.h @@ -29,11 +29,13 @@ public: /// /// @param server_port UDP port to be opened for DHCP traffic /// @param client_port UDP port where all responses are sent to. + /// @param run_multithreaded enables or disables multithreaded mode ControlledDhcpv4Srv(uint16_t server_port = DHCP4_SERVER_PORT, - uint16_t client_port = 0); + uint16_t client_port = 0, + bool run_multithreaded = false); /// @brief Destructor. - ~ControlledDhcpv4Srv(); + virtual ~ControlledDhcpv4Srv(); /// @brief Initializes the server. /// @@ -43,12 +45,12 @@ public: /// This method may throw if initialization fails. void init(const std::string& config_file); - /// @brief Loads specific config file + /// @brief Loads specific configuration file /// /// This utility method is called whenever we know a filename of the config /// and need to load it. It calls config-set command once the content of /// the file has been loaded and verified to be a sane JSON configuration. - /// config-set handler will process the config file (load it as current + /// config-set handler will process the config file (apply it as current /// configuration). /// /// @param file_name name of the file to be loaded @@ -126,7 +128,7 @@ private: /// @brief Callback that will be called from iface_mgr when data /// is received over control socket. /// - /// This static callback method is called from IfaceMgr::receive6() method, + /// This static callback method is called from IfaceMgr::receive4() method, /// when there is a new command or configuration sent over control socket /// (that was sent from some yet unspecified sender). static void sessionReader(void); @@ -247,7 +249,6 @@ private: commandDhcpEnableHandler(const std::string& command, isc::data::ConstElementPtr args); - /// @Brief handler for processing 'version-get' command /// /// This handler processes version-get command, which returns @@ -377,7 +378,7 @@ private: /// @brief Static pointer to the sole instance of the DHCP server. /// /// This is required for config and command handlers to gain access to - /// the server + /// the server. Some of them need to be static methods. static ControlledDhcpv4Srv* server_; /// @brief IOService object, used for all ASIO operations. @@ -390,7 +391,7 @@ private: TimerMgrPtr timer_mgr_; }; -}; // namespace isc::dhcp -}; // namespace isc +} // namespace dhcp +} // namespace isc #endif diff --git a/src/bin/dhcp4/dhcp4_srv.cc b/src/bin/dhcp4/dhcp4_srv.cc index 9f37271225..d3c9197220 100644 --- a/src/bin/dhcp4/dhcp4_srv.cc +++ b/src/bin/dhcp4/dhcp4_srv.cc @@ -46,7 +46,7 @@ #include #include #include -#include +#include #include #include #include @@ -70,6 +70,7 @@ #include + using namespace isc; using namespace isc::asiolink; using namespace isc::cryptolink; @@ -80,6 +81,8 @@ using namespace isc::log; using namespace isc::stats; using namespace std; +using isc::util::thread::LockGuard; + namespace { /// Structure that holds registered hook indexes @@ -187,7 +190,7 @@ Dhcpv4Exchange::Dhcpv4Exchange(const AllocEnginePtr& alloc_engine, .arg(query_->getLabel()) .arg(classes.toText()); } -}; +} void Dhcpv4Exchange::initResponse() { @@ -443,15 +446,20 @@ Dhcpv4Exchange::setReservedMessageFields() { const std::string Dhcpv4Srv::VENDOR_CLASS_PREFIX("VENDOR_CLASS_"); Dhcpv4Srv::Dhcpv4Srv(uint16_t server_port, uint16_t client_port, + bool run_multithreaded /* = false */, const bool use_bcast, const bool direct_response_desired) : io_service_(new IOService()), shutdown_(true), alloc_engine_(), server_port_(server_port), use_bcast_(use_bcast), client_port_(client_port), network_state_(new NetworkState(NetworkState::DHCPv4)), - cb_control_(new CBControlDHCPv4()) { + cb_control_(new CBControlDHCPv4()), + run_multithreaded_(run_multithreaded) { + + mutex_.reset(new std::mutex()); LOG_DEBUG(dhcp4_logger, DBG_DHCP4_START, DHCP4_OPEN_SOCKET) .arg(server_port); + try { // Port 0 is used for testing purposes where we don't open broadcast // capable sockets. So, set the packet filter handling direct traffic @@ -734,10 +742,21 @@ Dhcpv4Srv::sendPacket(const Pkt4Ptr& packet) { bool Dhcpv4Srv::run() { + if (run_multithreaded_) { + // Creating the process packet thread pool + // The number of thread pool's threads should be read from configuration + // file or it should be determined by the number of hardware threads and + // the number of Cassandra DB nodes. + pkt_thread_pool_.create(Dhcpv4Srv::threadCount()); + } + while (!shutdown_) { try { run_one(); - getIOService()->poll(); + { + LockGuard lock(serverLock()); + getIOService()->poll(); + } } catch (const std::exception& e) { // General catch-all exception that are not caught by more specific // catches. This one is for exceptions derived from std::exception. @@ -751,6 +770,11 @@ Dhcpv4Srv::run() { } } + // destroying the thread pool + if (run_multithreaded_) { + pkt_thread_pool_.destroy(); + } + return (true); } @@ -761,11 +785,27 @@ Dhcpv4Srv::run_one() { Pkt4Ptr rsp; try { + + // Do not read more packets from socket if there are enough + // packets to be processed in the packet thread pool queue + const int max_queued_pkt_per_thread = Dhcpv4Srv::maxThreadQueueSize(); + const auto queue_full_wait = std::chrono::milliseconds(1); + size_t pkt_queue_size = pkt_thread_pool_.count(); + if (pkt_queue_size >= Dhcpv4Srv::threadCount() * + max_queued_pkt_per_thread) { + std::this_thread::sleep_for(queue_full_wait); + return; + } + // Set select() timeout to 1s. This value should not be modified // because it is important that the select() returns control // frequently so as the IOService can be polled for ready handlers. uint32_t timeout = 1; - query = receivePacket(timeout); + { + // LOG_DEBUG(packet4_logger, DBG_DHCP4_DETAIL, DHCP4_BUFFER_WAIT).arg(timeout); + LockGuard lock(serverLock()); + query = receivePacket(timeout); + } // Log if packet has arrived. We can't log the detailed information // about the DHCP message because it hasn't been unpacked/parsed @@ -812,6 +852,7 @@ Dhcpv4Srv::run_one() { // receivePacket the process could wait up to the duration of timeout // of select() to terminate. try { + LockGuard lock(serverLock()); handleSignal(); } catch (const std::exception& e) { // Standard exception occurred. Let's be on the safe side to @@ -834,9 +875,32 @@ Dhcpv4Srv::run_one() { .arg(query->getLabel()); return; } else { - processPacket(query, rsp); + if (run_multithreaded_) { + ThreadPool::WorkItemCallBack call_back = + std::bind(&Dhcpv4Srv::processPacketAndSendResponseNoThrow, this, query, rsp); + pkt_thread_pool_.add(call_back); + } else { + processPacketAndSendResponse(query, rsp); + } + } +} + +void +Dhcpv4Srv::processPacketAndSendResponseNoThrow(Pkt4Ptr& query, Pkt4Ptr& rsp) { + try { + processPacketAndSendResponse(query, rsp); + } catch (const std::exception& e) { + LOG_ERROR(packet4_logger, DHCP4_PACKET_PROCESS_STD_EXCEPTION) + .arg(e.what()); + } catch (...) { + LOG_ERROR(packet4_logger, DHCP4_PACKET_PROCESS_EXCEPTION); } +} +void +Dhcpv4Srv::processPacketAndSendResponse(Pkt4Ptr& query, Pkt4Ptr& rsp) { + LockGuard lock(serverLock()); + processPacket(query, rsp); if (!rsp) { return; } @@ -3682,6 +3746,23 @@ int Dhcpv4Srv::getHookIndexLease4Decline() { return (Hooks.hook_index_lease4_decline_); } +uint32_t Dhcpv4Srv::threadCount() { + uint32_t sys_threads = CfgMgr::instance().getCurrentCfg()->getServerThreadCount(); + if (sys_threads) { + return sys_threads; + } + sys_threads = std::thread::hardware_concurrency(); + return sys_threads * 4; +} + +uint32_t Dhcpv4Srv::maxThreadQueueSize() { + uint32_t max_thread_queue_size = CfgMgr::instance().getCurrentCfg()->getServerMaxThreadQueueSize(); + if (max_thread_queue_size) { + return max_thread_queue_size; + } + return 4; +} + void Dhcpv4Srv::discardPackets() { // Clear any packets held by the callhout handle store and // all parked packets @@ -3690,5 +3771,5 @@ void Dhcpv4Srv::discardPackets() { HooksManager::clearParkingLots(); } -} // namespace dhcp -} // namespace isc +} // namespace dhcp +} // namespace isc diff --git a/src/bin/dhcp4/dhcp4_srv.h b/src/bin/dhcp4/dhcp4_srv.h index 72680fff36..ff02b7781b 100644 --- a/src/bin/dhcp4/dhcp4_srv.h +++ b/src/bin/dhcp4/dhcp4_srv.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -30,6 +31,8 @@ #include #include #include +#include +#include // Undefine the macro OPTIONAL which is defined in some operating // systems but conflicts with a member of the RequirementLevel enum in @@ -223,11 +226,13 @@ public: /// /// @param server_port specifies port number to listen on /// @param client_port specifies port number to send to + /// @param run_multithreaded enables or disables multithreaded mode /// @param use_bcast configure sockets to support broadcast messages. /// @param direct_response_desired specifies if it is desired to /// use direct V4 traffic. Dhcpv4Srv(uint16_t server_port = DHCP4_SERVER_PORT, uint16_t client_port = 0, + bool run_multithreaded = false, const bool use_bcast = true, const bool direct_response_desired = true); @@ -265,6 +270,17 @@ public: /// redeclaration/redefinition. @ref isc::process::Daemon::getVersion() static std::string getVersion(bool extended); + /// @brief returns Kea DHCPv4 server thread count. + static uint32_t threadCount(); + + /// @brief returns Kea DHCPv4 server max thread queue size. + static uint32_t maxThreadQueueSize(); + + /// @brief returns Kea DHCPv4 server mutex. + std::mutex* serverLock() { + return mutex_.get(); + } + /// @brief Main server processing loop. /// /// Main server processing loop. Call the processing step routine @@ -280,6 +296,24 @@ public: /// a response. void run_one(); + /// @brief Process a single incoming DHCPv4 packet and sends the response. + /// + /// It verifies correctness of the passed packet, call per-type processXXX + /// methods, generates appropriate answer, sends the answer to the client. + /// + /// @param query A pointer to the packet to be processed. + /// @param rsp A pointer to the response + void processPacketAndSendResponse(Pkt4Ptr& query, Pkt4Ptr& rsp); + + /// @brief Process a single incoming DHCPv4 packet and sends the response. + /// + /// It verifies correctness of the passed packet, call per-type processXXX + /// methods, generates appropriate answer, sends the answer to the client. + /// + /// @param query A pointer to the packet to be processed. + /// @param rsp A pointer to the response + void processPacketAndSendResponseNoThrow(Pkt4Ptr& query, Pkt4Ptr& rsp); + /// @brief Process a single incoming DHCPv4 packet. /// /// It verifies correctness of the passed packet, call per-type processXXX @@ -302,8 +336,8 @@ public: /// /// @brief Get UDP port on which server should listen. /// - /// Typically, server listens on UDP port number 67. Other ports are used - /// for testing purposes only. + /// Typically, server listens on UDP port 67. Other ports are only + /// used for testing purposes. /// /// @return UDP port on which server should listen. uint16_t getServerPort() const { @@ -994,6 +1028,20 @@ protected: /// @brief Controls access to the configuration backends. CBControlDHCPv4Ptr cb_control_; + /// @brief Packet processing thread pool + ThreadPool pkt_thread_pool_; + + // Global mutex used to serialize packet thread pool's threads + // on the not thread safe code and allow threads to run + // simultaneously on the thread safe portions + // (e.g. CqlLeaseMgr class instance). + boost::scoped_ptr mutex_; + + // Specifies if the application will use a thread pool or will process + // received DHCP packets on the main thread. + // It is mandatory to be set on false when running the test cases. + std::atomic_bool run_multithreaded_; + public: /// Class methods for DHCPv4-over-DHCPv6 handler @@ -1034,7 +1082,7 @@ public: static int getHookIndexLease4Decline(); }; -}; // namespace isc::dhcp -}; // namespace isc +} // namespace dhcp +} // namespace isc #endif // DHCP4_SRV_H diff --git a/src/bin/dhcp6/ctrl_dhcp6_srv.cc b/src/bin/dhcp6/ctrl_dhcp6_srv.cc index c76c82af98..b0d9fca834 100644 --- a/src/bin/dhcp6/ctrl_dhcp6_srv.cc +++ b/src/bin/dhcp6/ctrl_dhcp6_srv.cc @@ -5,21 +5,29 @@ // file, You can obtain one at http://mozilla.org/MPL/2.0/. #include -#include + #include +#include +#include #include +#include #include -#include -#include #include -#include #include +#include #include #include +#include +#include +#include #include #include -#include +#include +#include + #include + +#include #include using namespace isc::config; @@ -28,6 +36,7 @@ using namespace isc::dhcp; using namespace isc::data; using namespace isc::hooks; using namespace isc::stats; +using namespace isc::util::thread; using namespace std; namespace { @@ -95,8 +104,6 @@ ControlledDhcpv6Srv::loadConfigFile(const std::string& file_name) { // configuration from a JSON file. isc::data::ConstElementPtr json; - isc::data::ConstElementPtr dhcp6; - isc::data::ConstElementPtr logger; isc::data::ConstElementPtr result; // Basic sanity check: file name must not be empty. @@ -138,8 +145,7 @@ ControlledDhcpv6Srv::loadConfigFile(const std::string& file_name) { // Now check is the returned result is successful (rcode=0) or not // (see @ref isc::config::parseAnswer). int rcode; - isc::data::ConstElementPtr comment = - isc::config::parseAnswer(rcode, result); + ConstElementPtr comment = isc::config::parseAnswer(rcode, result); if (rcode != 0) { string reason = comment ? comment->stringValue() : "no details available"; @@ -159,11 +165,15 @@ ControlledDhcpv6Srv::loadConfigFile(const std::string& file_name) { return (result); } - void ControlledDhcpv6Srv::init(const std::string& file_name) { // Configure the server using JSON file. - ConstElementPtr result = loadConfigFile(file_name); + ConstElementPtr result; + { + LockGuard lock(serverLock()); + result = loadConfigFile(file_name); + } + int rcode; ConstElementPtr comment = isc::config::parseAnswer(rcode, result); if (rcode != 0) { @@ -188,17 +198,16 @@ void ControlledDhcpv6Srv::cleanup() { // Nothing to do here. No need to disconnect from anything. } - ConstElementPtr ControlledDhcpv6Srv::commandShutdownHandler(const string&, ConstElementPtr) { - if (ControlledDhcpv6Srv::server_) { - ControlledDhcpv6Srv::server_->shutdown(); + if (ControlledDhcpv6Srv::getInstance()) { + ControlledDhcpv6Srv::getInstance()->shutdown(); } else { LOG_WARN(dhcp6_logger, DHCP6_NOT_RUNNING); - ConstElementPtr answer = isc::config::createAnswer(1, "Shutdown failure."); + ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_ERROR, "Shutdown failure."); return (answer); } - ConstElementPtr answer = isc::config::createAnswer(0, "Shutting down."); + ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_SUCCESS, "Shutting down."); return (answer); } @@ -210,11 +219,11 @@ ControlledDhcpv6Srv::commandLibReloadHandler(const string&, ConstElementPtr) { bool status = HooksManager::loadLibraries(loaded); if (!status) { LOG_ERROR(dhcp6_logger, DHCP6_HOOKS_LIBS_RELOAD_FAIL); - ConstElementPtr answer = isc::config::createAnswer(1, + ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_ERROR, "Failed to reload hooks libraries."); return (answer); } - ConstElementPtr answer = isc::config::createAnswer(0, + ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_SUCCESS, "Hooks libraries successfully reloaded."); return (answer); } @@ -234,7 +243,7 @@ ControlledDhcpv6Srv::commandConfigReloadHandler(const string&, LOG_ERROR(dhcp6_logger, DHCP6_DYNAMIC_RECONFIGURATION_FAIL) .arg(file); return (createAnswer(CONTROL_RESULT_ERROR, - "Config reload failed:" + string(ex.what()))); + "Config reload failed: " + string(ex.what()))); } } @@ -469,7 +478,7 @@ ControlledDhcpv6Srv::commandVersionGetHandler(const string&, ConstElementPtr) { ElementPtr extended = Element::create(Dhcpv6Srv::getVersion(true)); ElementPtr arguments = Element::createMap(); arguments->set("extended", extended); - ConstElementPtr answer = isc::config::createAnswer(0, + ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_SUCCESS, Dhcpv6Srv::getVersion(false), arguments); return (answer); @@ -478,14 +487,14 @@ ControlledDhcpv6Srv::commandVersionGetHandler(const string&, ConstElementPtr) { ConstElementPtr ControlledDhcpv6Srv::commandBuildReportHandler(const string&, ConstElementPtr) { ConstElementPtr answer = - isc::config::createAnswer(0, isc::detail::getConfigReport()); + isc::config::createAnswer(CONTROL_RESULT_SUCCESS, isc::detail::getConfigReport()); return (answer); } ConstElementPtr ControlledDhcpv6Srv::commandLeasesReclaimHandler(const string&, ConstElementPtr args) { - int status_code = 1; + int status_code = CONTROL_RESULT_ERROR; string message; // args must be { "remove": } @@ -508,9 +517,9 @@ ControlledDhcpv6Srv::commandLeasesReclaimHandler(const string&, return (answer); } -isc::data::ConstElementPtr -ControlledDhcpv6Srv::processCommand(const std::string& command, - isc::data::ConstElementPtr args) { +ConstElementPtr +ControlledDhcpv6Srv::processCommand(const string& command, + ConstElementPtr args) { string txt = args ? args->str() : "(none)"; LOG_DEBUG(dhcp6_logger, DBG_DHCP6_COMMAND, DHCP6_COMMAND_RECEIVED) @@ -519,12 +528,20 @@ ControlledDhcpv6Srv::processCommand(const std::string& command, ControlledDhcpv6Srv* srv = ControlledDhcpv6Srv::getInstance(); if (!srv) { - ConstElementPtr no_srv = isc::config::createAnswer(1, + ConstElementPtr no_srv = isc::config::createAnswer(CONTROL_RESULT_ERROR, "Server object not initialized, can't process command '" + command + "', arguments: '" + txt + "'."); return (no_srv); } + if (srv->run_multithreaded_) { + { + ReverseLock rlk(srv->serverLock()); + srv->pkt_thread_pool_.destroy(); + } + srv->pkt_thread_pool_.create(Dhcpv6Srv::threadCount()); + } + try { if (command == "shutdown") { return (srv->commandShutdownHandler(command, args)); @@ -563,13 +580,12 @@ ControlledDhcpv6Srv::processCommand(const std::string& command, return (srv->commandConfigWriteHandler(command, args)); } - - return (isc::config::createAnswer(1, "Unrecognized command:" - + command)); - + return(isc::config::createAnswer(CONTROL_RESULT_ERROR, "Unrecognized command: " + + command)); } catch (const Exception& ex) { - return (isc::config::createAnswer(1, "Error while processing command '" - + command + "':" + ex.what())); + return (isc::config::createAnswer(CONTROL_RESULT_ERROR, "Error while processing command '" + + command + "': " + ex.what() + + ", params: '" + txt + "'")); } } @@ -581,11 +597,12 @@ ControlledDhcpv6Srv::processConfig(isc::data::ConstElementPtr config) { ControlledDhcpv6Srv* srv = ControlledDhcpv6Srv::getInstance(); + // Single stream instance used in all error clauses + std::ostringstream err; + if (!srv) { - ConstElementPtr no_srv = isc::config::createAnswer( - CONTROL_RESULT_ERROR, - "Server object not initialized, can't process config."); - return (no_srv); + err << "Server object not initialized, can't process config."; + return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str())); } ConstElementPtr answer = configureDhcp6Server(*srv, config); @@ -599,8 +616,8 @@ ControlledDhcpv6Srv::processConfig(isc::data::ConstElementPtr config) { return (answer); } } catch (const std::exception& ex) { - return (isc::config::createAnswer(1, "Failed to process configuration:" - + string(ex.what()))); + err << "Failed to process configuration: " << ex.what(); + return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str())); } // Re-open lease and host database with new parameters. @@ -611,14 +628,13 @@ ControlledDhcpv6Srv::processConfig(isc::data::ConstElementPtr config) { cfg_db->setAppendedParameters("universe=6"); cfg_db->createManagers(); } catch (const std::exception& ex) { - return (isc::config::createAnswer(1, "Unable to open database: " - + std::string(ex.what()))); + err << "Unable to open database: " << ex.what(); + return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str())); } // Regenerate server identifier if needed. try { - const std::string duid_file = - std::string(CfgMgr::instance().getDataDir()) + "/" + + const std::string duid_file = CfgMgr::instance().getDataDir() + "/" + std::string(SERVER_DUID_FILE); DuidPtr duid = CfgMgr::instance().getStagingCfg()->getCfgDUID()->create(duid_file); server_->serverid_.reset(new Option(Option::V6, D6O_SERVERID, duid->getDuid())); @@ -631,17 +647,16 @@ ControlledDhcpv6Srv::processConfig(isc::data::ConstElementPtr config) { } catch (const std::exception& ex) { std::ostringstream err; err << "unable to configure server identifier: " << ex.what(); - return (isc::config::createAnswer(1, err.str())); + return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str())); } // Server will start DDNS communications if its enabled. try { srv->startD2(); } catch (const std::exception& ex) { - std::ostringstream err; - err << "error starting DHCP_DDNS client " - " after server reconfiguration: " << ex.what(); - return (isc::config::createAnswer(1, err.str())); + err << "Error starting DHCP_DDNS client after server reconfiguration: " + << ex.what(); + return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str())); } // Setup DHCPv4-over-DHCPv6 IPC @@ -651,7 +666,7 @@ ControlledDhcpv6Srv::processConfig(isc::data::ConstElementPtr config) { std::ostringstream err; err << "error starting DHCPv4-over-DHCPv6 IPC " " after server reconfiguration: " << ex.what(); - return (isc::config::createAnswer(1, err.str())); + return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str())); } // Configure DHCP packet queueing @@ -664,7 +679,6 @@ ControlledDhcpv6Srv::processConfig(isc::data::ConstElementPtr config) { } } catch (const std::exception& ex) { - std::ostringstream err; err << "Error setting packet queue controls after server reconfiguration: " << ex.what(); return (isc::config::createAnswer(1, err.str())); @@ -678,7 +692,7 @@ ControlledDhcpv6Srv::processConfig(isc::data::ConstElementPtr config) { // is no need to rollback configuration if socket fails to open on any // of the interfaces. CfgMgr::instance().getStagingCfg()->getCfgIface()-> - openSockets(AF_INET6, srv->getServerPort()); + openSockets(AF_INET6, srv->getServerPort(), srv->serverLock()); // Install the timers for handling leases reclamation. try { @@ -688,11 +702,10 @@ ControlledDhcpv6Srv::processConfig(isc::data::ConstElementPtr config) { server_); } catch (const std::exception& ex) { - std::ostringstream err; err << "unable to setup timers for periodically running the" " reclamation of the expired leases: " << ex.what() << "."; - return (isc::config::createAnswer(1, err.str())); + return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str())); } // Setup config backend polling, if configured for it. @@ -758,24 +771,27 @@ ControlledDhcpv6Srv::checkConfig(isc::data::ConstElementPtr config) { ControlledDhcpv6Srv* srv = ControlledDhcpv6Srv::getInstance(); + // Single stream instance used in all error clauses + std::ostringstream err; + if (!srv) { - ConstElementPtr no_srv = isc::config::createAnswer(1, - "Server object not initialized, can't process config."); - return (no_srv); + err << "Server object not initialized, can't process config."; + return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str())); } return (configureDhcp6Server(*srv, config, true)); } ControlledDhcpv6Srv::ControlledDhcpv6Srv(uint16_t server_port, - uint16_t client_port) - : Dhcpv6Srv(server_port, client_port), io_service_(), + uint16_t client_port, + bool run_multithreaded /*= false*/) + : Dhcpv6Srv(server_port, client_port, run_multithreaded), io_service_(), timer_mgr_(TimerMgr::instance()) { - if (server_) { + if (getInstance()) { isc_throw(InvalidOperation, "There is another Dhcpv6Srv instance already."); } - server_ = this; // remember this instance for use in callback + server_ = this; // remember this instance for later use in handlers // TimerMgr uses IO service to run asynchronous timers. TimerMgr::instance()->setIOService(getIOService()); @@ -794,26 +810,26 @@ ControlledDhcpv6Srv::ControlledDhcpv6Srv(uint16_t server_port, CommandMgr::instance().registerCommand("config-reload", boost::bind(&ControlledDhcpv6Srv::commandConfigReloadHandler, this, _1, _2)); + CommandMgr::instance().registerCommand("config-set", + boost::bind(&ControlledDhcpv6Srv::commandConfigSetHandler, this, _1, _2)); + CommandMgr::instance().registerCommand("config-test", boost::bind(&ControlledDhcpv6Srv::commandConfigTestHandler, this, _1, _2)); CommandMgr::instance().registerCommand("config-write", boost::bind(&ControlledDhcpv6Srv::commandConfigWriteHandler, this, _1, _2)); - CommandMgr::instance().registerCommand("dhcp-disable", - boost::bind(&ControlledDhcpv6Srv::commandDhcpDisableHandler, this, _1, _2)); - CommandMgr::instance().registerCommand("dhcp-enable", boost::bind(&ControlledDhcpv6Srv::commandDhcpEnableHandler, this, _1, _2)); - CommandMgr::instance().registerCommand("leases-reclaim", - boost::bind(&ControlledDhcpv6Srv::commandLeasesReclaimHandler, this, _1, _2)); + CommandMgr::instance().registerCommand("dhcp-disable", + boost::bind(&ControlledDhcpv6Srv::commandDhcpDisableHandler, this, _1, _2)); CommandMgr::instance().registerCommand("libreload", boost::bind(&ControlledDhcpv6Srv::commandLibReloadHandler, this, _1, _2)); - CommandMgr::instance().registerCommand("config-set", - boost::bind(&ControlledDhcpv6Srv::commandConfigSetHandler, this, _1, _2)); + CommandMgr::instance().registerCommand("leases-reclaim", + boost::bind(&ControlledDhcpv6Srv::commandLeasesReclaimHandler, this, _1, _2)); CommandMgr::instance().registerCommand("shutdown", boost::bind(&ControlledDhcpv6Srv::commandShutdownHandler, this, _1, _2)); @@ -892,8 +908,8 @@ ControlledDhcpv6Srv::~ControlledDhcpv6Srv() { void ControlledDhcpv6Srv::sessionReader(void) { // Process one asio event. If there are more events, iface_mgr will call // this callback more than once. - if (server_) { - server_->io_service_.run_one(); + if (getInstance()) { + getInstance()->io_service_.run_one(); } } @@ -932,12 +948,13 @@ ControlledDhcpv6Srv::dbReconnect(ReconnectCtlPtr db_reconnect_ctl) { if (reopened) { // Cancel the timer. if (TimerMgr::instance()->isTimerRegistered("Dhcp6DbReconnectTimer")) { - TimerMgr::instance()->cancel("Dhcp6DbReconnectTimer"); } + TimerMgr::instance()->cancel("Dhcp6DbReconnectTimer"); + } // Set network state to service enabled network_state_->enableService(); - // Toss the reconnct control, we're done with it + // Toss the reconnect control, we're done with it db_reconnect_ctl.reset(); } else { if (!db_reconnect_ctl->checkRetries()) { @@ -955,7 +972,7 @@ ControlledDhcpv6Srv::dbReconnect(ReconnectCtlPtr db_reconnect_ctl) { if (!TimerMgr::instance()->isTimerRegistered("Dhcp6DbReconnectTimer")) { TimerMgr::instance()->registerTimer("Dhcp6DbReconnectTimer", boost::bind(&ControlledDhcpv6Srv::dbReconnect, this, - db_reconnect_ctl), + db_reconnect_ctl), db_reconnect_ctl->retryInterval(), asiolink::IntervalTimer::ONE_SHOT); } @@ -1019,5 +1036,5 @@ ControlledDhcpv6Srv::cbFetchUpdates(const SrvConfigPtr& srv_cfg, } } -}; // end of isc::dhcp namespace -}; // end of isc namespace +} // namespace dhcp +} // namespace isc diff --git a/src/bin/dhcp6/ctrl_dhcp6_srv.h b/src/bin/dhcp6/ctrl_dhcp6_srv.h index ef776c83a8..7b4f99884e 100644 --- a/src/bin/dhcp6/ctrl_dhcp6_srv.h +++ b/src/bin/dhcp6/ctrl_dhcp6_srv.h @@ -29,8 +29,10 @@ public: /// /// @param server_port UDP port to be opened for DHCP traffic /// @param client_port UDP port where all responses are sent to. + /// @param run_multithreaded enables or disables multithreaded mode ControlledDhcpv6Srv(uint16_t server_port = DHCP6_SERVER_PORT, - uint16_t client_port = 0); + uint16_t client_port = 0, + bool run_multithreaded = false); /// @brief Destructor. virtual ~ControlledDhcpv6Srv(); @@ -65,7 +67,7 @@ public: /// @brief Initiates shutdown procedure for the whole DHCPv6 server. void shutdown(); - /// @brief command processor + /// @brief Command processor /// /// This method is uniform for all config backends. It processes received /// command (as a string + JSON arguments). Internally, it's just a @@ -75,9 +77,9 @@ public: /// Currently supported commands are: /// - config-reload /// - config-test - /// - leases-reclaim - /// - libreload /// - shutdown + /// - libreload + /// - leases-reclaim /// ... /// /// @note It never throws. @@ -89,7 +91,7 @@ public: static isc::data::ConstElementPtr processCommand(const std::string& command, isc::data::ConstElementPtr args); - /// @brief configuration processor + /// @brief Configuration processor /// /// This is a method for handling incoming configuration updates. /// This method should be called by all configuration backends when the @@ -114,7 +116,7 @@ public: isc::data::ConstElementPtr checkConfig(isc::data::ConstElementPtr new_config); - /// @brief returns pointer to the sole instance of Dhcpv6Srv + /// @brief Returns pointer to the sole instance of Dhcpv6Srv /// /// @return server instance (may return NULL, if called before server is spawned) static ControlledDhcpv6Srv* getInstance() { @@ -131,7 +133,7 @@ private: /// (that was sent from some yet unspecified sender). static void sessionReader(void); - /// @brief handler for processing 'shutdown' command + /// @brief Handler for processing 'shutdown' command /// /// This handler processes shutdown command, which initializes shutdown /// procedure. @@ -143,7 +145,7 @@ private: commandShutdownHandler(const std::string& command, isc::data::ConstElementPtr args); - /// @brief handler for processing 'libreload' command + /// @brief Handler for processing 'libreload' command /// /// This handler processes libreload command, which unloads all hook /// libraries and reloads them. @@ -156,7 +158,7 @@ private: commandLibReloadHandler(const std::string& command, isc::data::ConstElementPtr args); - /// @brief handler for processing 'config-reload' command + /// @brief Handler for processing 'config-reload' command /// /// This handler processes config-reload command, which processes /// configuration specified in args parameter. @@ -309,7 +311,6 @@ private: const bool remove_lease, const uint16_t max_unwarned_cycles); - /// @brief Deletes reclaimed leases and reschedules the timer. /// /// This is a wrapper method for @c AllocEngine::deleteExpiredReclaimed6. @@ -334,6 +335,7 @@ private: /// /// If the maximum number of retries has been exhausted an error is logged /// and the server shuts down. + /// /// @param db_reconnect_ctl pointer to the ReconnectCtl containing the /// configured reconnect parameters /// @@ -355,6 +357,8 @@ private: /// /// @param db_reconnect_ctl pointer to the ReconnectCtl containing the /// configured reconnect parameters + /// + /// @return false if reconnect is not configured, true otherwise bool dbLostCallback(db::ReconnectCtlPtr db_reconnect_ctl); /// @brief Callback invoked periodically to fetch configuration updates @@ -387,7 +391,7 @@ private: TimerMgrPtr timer_mgr_; }; -}; // namespace isc::dhcp -}; // namespace isc +} // namespace dhcp +} // namespace isc #endif diff --git a/src/bin/dhcp6/dhcp6_srv.cc b/src/bin/dhcp6/dhcp6_srv.cc index fba6cfbbe5..f3c0e45d8a 100644 --- a/src/bin/dhcp6/dhcp6_srv.cc +++ b/src/bin/dhcp6/dhcp6_srv.cc @@ -45,11 +45,11 @@ #include #include #include - #include #include #include #include +#include #include #include #include @@ -90,6 +90,8 @@ using namespace isc::stats; using namespace isc::util; using namespace std; +using isc::util::thread::LockGuard; + namespace { /// Structure that holds registered hook indexes @@ -172,19 +174,23 @@ createStatusCode(const Pkt6& pkt, const Option6IA& ia, const uint16_t status_cod return (option_status); } -}; // anonymous namespace +} // namespace namespace isc { namespace dhcp { const std::string Dhcpv6Srv::VENDOR_CLASS_PREFIX("VENDOR_CLASS_"); -Dhcpv6Srv::Dhcpv6Srv(uint16_t server_port, uint16_t client_port) +Dhcpv6Srv::Dhcpv6Srv(uint16_t server_port, uint16_t client_port, bool run_multithreaded /* = false */) : io_service_(new IOService()), server_port_(server_port), client_port_(client_port), serverid_(), shutdown_(true), alloc_engine_(), name_change_reqs_(), network_state_(new NetworkState(NetworkState::DHCPv6)), - cb_control_(new CBControlDHCPv6()) { + cb_control_(new CBControlDHCPv6()), + run_multithreaded_(run_multithreaded) { + + mutex_.reset(new std::mutex()); + LOG_DEBUG(dhcp6_logger, DBG_DHCP6_START, DHCP6_OPEN_SOCKET) .arg(server_port); @@ -237,7 +243,7 @@ Dhcpv6Srv::~Dhcpv6Srv() { // LOG_ERROR(dhcp6_logger, DHCP6_SRV_DHCP4O6_ERROR).arg(ex.what()); } - IfaceMgr::instance().closeSockets(); + IfaceMgr::instance().closeSockets(serverLock()); LeaseMgrFactory::destroy(); @@ -251,7 +257,7 @@ void Dhcpv6Srv::shutdown() { } Pkt6Ptr Dhcpv6Srv::receivePacket(int timeout) { - return (IfaceMgr::instance().receive6(timeout)); + return (IfaceMgr::instance().receive6(timeout, 0, serverLock())); } void Dhcpv6Srv::sendPacket(const Pkt6Ptr& packet) { @@ -405,10 +411,21 @@ Dhcpv6Srv::initContext(const Pkt6Ptr& pkt, } bool Dhcpv6Srv::run() { + if (run_multithreaded_) { + // Creating the process packet thread pool + // The number of thread pool's threads should be read from configuration + // file or it should be determined by the number of hardware threads and + // the number of Cassandra DB nodes. + pkt_thread_pool_.create(Dhcpv6Srv::threadCount()); + } + while (!shutdown_) { try { run_one(); - getIOService()->poll(); + { + LockGuard lock(serverLock()); + getIOService()->poll(); + } } catch (const std::exception& e) { // General catch-all standard exceptions that are not caught by more // specific catches. @@ -421,6 +438,11 @@ bool Dhcpv6Srv::run() { } } + // destroying the thread pool + if (run_multithreaded_) { + pkt_thread_pool_.destroy(); + } + return (true); } @@ -430,11 +452,27 @@ void Dhcpv6Srv::run_one() { Pkt6Ptr rsp; try { + + // Do not read more packets from socket if there are enough + // packets to be processed in the packet thread pool queue + const int max_queued_pkt_per_thread = Dhcpv6Srv::maxThreadQueueSize(); + const auto queue_full_wait = std::chrono::milliseconds(1); + size_t pkt_queue_size = pkt_thread_pool_.count(); + if (pkt_queue_size >= Dhcpv6Srv::threadCount() * + max_queued_pkt_per_thread) { + std::this_thread::sleep_for(queue_full_wait); + return; + } + // Set select() timeout to 1s. This value should not be modified // because it is important that the select() returns control // frequently so as the IOService can be polled for ready handlers. uint32_t timeout = 1; - query = receivePacket(timeout); + { + // LOG_DEBUG(packet6_logger, DBG_DHCP6_DETAIL, DHCP6_BUFFER_WAIT).arg(timeout); + LockGuard lock(serverLock()); + query = receivePacket(timeout); + } // Log if packet has arrived. We can't log the detailed information // about the DHCP message because it hasn't been unpacked/parsed @@ -487,6 +525,7 @@ void Dhcpv6Srv::run_one() { // process could wait up to the duration of timeout of select() to // terminate. try { + LockGuard lock(serverLock()); handleSignal(); } catch (const std::exception& e) { // An (a standard or ISC) exception occurred. @@ -507,9 +546,32 @@ void Dhcpv6Srv::run_one() { .arg(query->getLabel()); return; } else { - processPacket(query, rsp); + if (run_multithreaded_) { + ThreadPool::WorkItemCallBack call_back = + std::bind(&Dhcpv6Srv::processPacketAndSendResponseNoThrow, this, query, rsp); + pkt_thread_pool_.add(call_back); + } else { + processPacketAndSendResponse(query, rsp); + } + } +} + +void +Dhcpv6Srv::processPacketAndSendResponseNoThrow(Pkt6Ptr& query, Pkt6Ptr& rsp) { + try { + processPacketAndSendResponse(query, rsp); + } catch (const std::exception& e) { + LOG_ERROR(packet6_logger, DHCP6_PACKET_PROCESS_STD_EXCEPTION) + .arg(e.what()); + } catch (...) { + LOG_ERROR(packet6_logger, DHCP6_PACKET_PROCESS_EXCEPTION); } +} +void +Dhcpv6Srv::processPacketAndSendResponse(Pkt6Ptr& query, Pkt6Ptr& rsp) { + LockGuard lock(serverLock()); + processPacket(query, rsp); if (!rsp) { return; } @@ -1289,7 +1351,7 @@ Dhcpv6Srv::sanityCheck(const Pkt6Ptr& pkt) { switch (pkt->getType()) { case DHCPV6_SOLICIT: case DHCPV6_REBIND: - case DHCPV6_CONFIRM: + case DHCPV6_CONFIRM: sanityCheck(pkt, MANDATORY, FORBIDDEN); return (true); @@ -1511,6 +1573,7 @@ Dhcpv6Srv::assignLeases(const Pkt6Ptr& question, Pkt6Ptr& answer, if (answer_opt) { answer->addOption(answer_opt); } + break; } default: break; @@ -2242,7 +2305,6 @@ Dhcpv6Srv::extendIA_PD(const Pkt6Ptr& query, (*l)->preferred_lft_, (*l)->valid_lft_)); ia_rsp->addOption(prf); - if (pd_exclude_requested) { // PD exclude option has been requested via ORO, thus we need to // include it if the pool configuration specifies this option. @@ -2257,7 +2319,6 @@ Dhcpv6Srv::extendIA_PD(const Pkt6Ptr& query, } } - LOG_INFO(lease6_logger, DHCP6_PD_LEASE_RENEW) .arg(query->getLabel()) .arg((*l)->addr_.toText()) @@ -3862,6 +3923,23 @@ Dhcpv6Srv::requestedInORO(const Pkt6Ptr& query, const uint16_t code) const { return (false); } +uint32_t Dhcpv6Srv::threadCount() { + uint32_t sys_threads = CfgMgr::instance().getCurrentCfg()->getServerThreadCount(); + if (sys_threads) { + return sys_threads; + } + sys_threads = std::thread::hardware_concurrency(); + return sys_threads * 4; +} + +uint32_t Dhcpv6Srv::maxThreadQueueSize() { + uint32_t max_thread_queue_size = CfgMgr::instance().getCurrentCfg()->getServerMaxThreadQueueSize(); + if (max_thread_queue_size) { + return max_thread_queue_size; + } + return 4; +} + void Dhcpv6Srv::discardPackets() { // Dump all of our current packets, anything that is mid-stream isc::dhcp::Pkt6Ptr pkt6ptr_empty; @@ -3869,5 +3947,5 @@ void Dhcpv6Srv::discardPackets() { HooksManager::clearParkingLots(); } -}; -}; +} // namespace dhcp +} // namespace isc diff --git a/src/bin/dhcp6/dhcp6_srv.h b/src/bin/dhcp6/dhcp6_srv.h index 1e0670cc6e..0484538cc7 100644 --- a/src/bin/dhcp6/dhcp6_srv.h +++ b/src/bin/dhcp6/dhcp6_srv.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -29,6 +30,8 @@ #include #include #include +#include +#include // Undefine the macro OPTIONAL which is defined in some operating // systems but conflicts with a member of the RequirementLevel enum in @@ -51,7 +54,7 @@ public: /// @brief DHCPv6 server service. /// -/// This class represents DHCPv6 server. It contains all +/// This singleton class represents DHCPv6 server. It contains all /// top-level methods and routines necessary for server operation. /// In particular, it instantiates IfaceMgr, loads or generates DUID /// that is going to be used as server-identifier, receives incoming @@ -83,8 +86,10 @@ public: /// /// @param server_port port on which all sockets will listen /// @param client_port port to which all responses will be sent + /// @param run_multithreaded enables or disables multithreaded mode Dhcpv6Srv(uint16_t server_port = DHCP6_SERVER_PORT, - uint16_t client_port = 0); + uint16_t client_port = 0, + bool run_multithreaded = false); /// @brief Destructor. Used during DHCPv6 service shutdown. virtual ~Dhcpv6Srv(); @@ -120,6 +125,17 @@ public: /// redeclaration/redefinition. @ref isc::process::Daemon::getVersion() static std::string getVersion(bool extended); + /// @brief returns Kea DHCPv6 server thread count. + static uint32_t threadCount(); + + /// @brief returns Kea DHCPv6 server max thread queue size. + static uint32_t maxThreadQueueSize(); + + /// @brief returns Kea DHCPv6 server mutex. + std::mutex* serverLock() { + return mutex_.get(); + } + /// @brief Returns server-identifier option. /// /// @return server-id option @@ -1018,9 +1034,23 @@ protected: /// @brief Controls access to the configuration backends. CBControlDHCPv6Ptr cb_control_; + + /// @brief Packet processing thread pool + ThreadPool pkt_thread_pool_; + + // Global mutex used to serialize packet thread pool's threads + // on the not thread safe code and allow threads to run + // simultaneously on the thread safe portions + // (e.g. CqlLeaseMgr class instance). + boost::scoped_ptr mutex_; + + // Specifies if the application will use a thread pool or will process + // received DHCP packets on the main thread. + // It is mandatory to be set on false when running the test cases. + std::atomic_bool run_multithreaded_; }; -}; // namespace isc::dhcp -}; // namespace isc +} // namespace dhcp +} // namespace isc #endif // DHCP6_SRV_H diff --git a/src/lib/dhcpsrv/Makefile.am b/src/lib/dhcpsrv/Makefile.am index 2c590feeab..491f6a4baa 100644 --- a/src/lib/dhcpsrv/Makefile.am +++ b/src/lib/dhcpsrv/Makefile.am @@ -115,6 +115,7 @@ libkea_dhcpsrv_la_SOURCES += lease_mgr.cc lease_mgr.h libkea_dhcpsrv_la_SOURCES += lease_mgr_factory.cc lease_mgr_factory.h libkea_dhcpsrv_la_SOURCES += memfile_lease_mgr.cc memfile_lease_mgr.h libkea_dhcpsrv_la_SOURCES += memfile_lease_storage.h +libkea_dhcpsrv_la_SOURCES += thread_pool.cc thread_pool.h if HAVE_MYSQL libkea_dhcpsrv_la_SOURCES += mysql_lease_mgr.cc mysql_lease_mgr.h