// file, You can obtain one at http://mozilla.org/MPL/2.0/.
#include <config.h>
-#include <cc/data.h>
+
#include <cc/command_interpreter.h>
+#include <cc/data.h>
+#include <cfgrpt/config_report.h>
#include <config/command_mgr.h>
+#include <database/dbaccess_parser.h>
+#include <dhcp/libdhcp++.h>
#include <dhcp4/ctrl_dhcp4_srv.h>
#include <dhcp4/dhcp4_log.h>
#include <dhcp4/dhcp4to6_ipc.h>
-#include <dhcp4/parser_context.h>
#include <dhcp4/json_config_parser.h>
-#include <dhcpsrv/cfgmgr.h>
+#include <dhcp4/parser_context.h>
#include <dhcpsrv/cfg_db_access.h>
-#include <hooks/hooks.h>
+#include <dhcpsrv/cfgmgr.h>
+#include <dhcpsrv/db_type.h>
#include <hooks/hooks_manager.h>
#include <stats/stats_mgr.h>
-#include <cfgrpt/config_report.h>
+#include <util/threads/lock_guard.h>
+#include <util/threads/reverse_lock.h>
+
#include <signal.h>
+
+#include <memory>
#include <sstream>
-using namespace isc::data;
+using namespace isc::config;
using namespace isc::db;
using namespace isc::dhcp;
+using namespace isc::data;
using namespace isc::hooks;
-using namespace isc::config;
using namespace isc::stats;
+using namespace isc::util::thread;
using namespace std;
namespace {
ControlledDhcpv4Srv* ControlledDhcpv4Srv::server_ = NULL;
-void
-ControlledDhcpv4Srv::init(const std::string& file_name) {
- // Configure the server using JSON file.
- ConstElementPtr result = loadConfigFile(file_name);
- int rcode;
- ConstElementPtr comment = isc::config::parseAnswer(rcode, result);
- if (rcode != 0) {
- string reason = comment ? comment->stringValue() :
- "no details available";
- isc_throw(isc::BadValue, reason);
- }
-
- // We don't need to call openActiveSockets() or startD2() as these
- // methods are called in processConfig() which is called by
- // processCommand("config-set", ...)
-
- // Set signal handlers. When the SIGHUP is received by the process
- // the server reconfiguration will be triggered. When SIGTERM or
- // SIGINT will be received, the server will start shutting down.
- signal_set_.reset(new isc::util::SignalSet(SIGINT, SIGHUP, SIGTERM));
- // Set the pointer to the handler function.
- signal_handler_ = signalHandler;
-}
-
-void ControlledDhcpv4Srv::cleanup() {
- // Nothing to do here. No need to disconnect from anything.
-}
-
/// @brief Configure DHCPv4 server using the configuration file specified.
///
/// This function is used to both configure the DHCP server on its startup
// configuration from a JSON file.
isc::data::ConstElementPtr json;
- isc::data::ConstElementPtr dhcp4;
- isc::data::ConstElementPtr logger;
isc::data::ConstElementPtr result;
// Basic sanity check: file name must not be empty.
try {
if (file_name.empty()) {
// Basic sanity check: file name must not be empty.
- isc_throw(isc::BadValue, "JSON configuration file not specified."
- " Please use -c command line option.");
+ isc_throw(isc::BadValue, "JSON configuration file not specified. Please "
+ "use -c command line option.");
}
// Read contents of the file and parse it as JSON
return (result);
}
+void
+ControlledDhcpv4Srv::init(const std::string& file_name) {
+ // Configure the server using JSON file.
+ ConstElementPtr result;
+ {
+ LockGuard<mutex> lock(serverLock());
+ result = loadConfigFile(file_name);
+ }
+
+ int rcode;
+ ConstElementPtr comment = isc::config::parseAnswer(rcode, result);
+ if (rcode != 0) {
+ string reason = comment ? comment->stringValue() :
+ "no details available";
+ isc_throw(isc::BadValue, reason);
+ }
+
+ // We don't need to call openActiveSockets() or startD2() as these
+ // methods are called in processConfig() which is called by
+ // processCommand("config-set", ...)
+
+ // Set signal handlers. When the SIGHUP is received by the process
+ // the server reconfiguration will be triggered. When SIGTERM or
+ // SIGINT will be received, the server will start shutting down.
+ signal_set_.reset(new isc::util::SignalSet(SIGINT, SIGHUP, SIGTERM));
+ // Set the pointer to the handler function.
+ signal_handler_ = signalHandler;
+}
+
+void ControlledDhcpv4Srv::cleanup() {
+ // Nothing to do here. No need to disconnect from anything.
+}
ConstElementPtr
ControlledDhcpv4Srv::commandShutdownHandler(const string&, ConstElementPtr) {
ControlledDhcpv4Srv::getInstance()->shutdown();
} else {
LOG_WARN(dhcp4_logger, DHCP4_NOT_RUNNING);
- ConstElementPtr answer = isc::config::createAnswer(1,
- "Shutdown failure.");
+ ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_ERROR, "Shutdown failure.");
return (answer);
}
- ConstElementPtr answer = isc::config::createAnswer(0, "Shutting down.");
+ ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_SUCCESS, "Shutting down.");
return (answer);
}
ConstElementPtr
ControlledDhcpv4Srv::commandLibReloadHandler(const string&, ConstElementPtr) {
-
/// @todo delete any stored CalloutHandles referring to the old libraries
/// Get list of currently loaded libraries and reload them.
HookLibsCollection loaded = HooksManager::getLibraryInfo();
bool status = HooksManager::loadLibraries(loaded);
if (!status) {
LOG_ERROR(dhcp4_logger, DHCP4_HOOKS_LIBS_RELOAD_FAIL);
- ConstElementPtr answer = isc::config::createAnswer(1,
+ ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_ERROR,
"Failed to reload hooks libraries.");
return (answer);
}
- ConstElementPtr answer = isc::config::createAnswer(0,
+ ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_SUCCESS,
"Hooks libraries successfully reloaded.");
return (answer);
}
ConstElementPtr
ControlledDhcpv4Srv::commandConfigReloadHandler(const string&,
ConstElementPtr /*args*/) {
-
// Get configuration file name.
std::string file = ControlledDhcpv4Srv::getInstance()->getConfigFile();
try {
LOG_ERROR(dhcp4_logger, DHCP4_DYNAMIC_RECONFIGURATION_FAIL)
.arg(file);
return (createAnswer(CONTROL_RESULT_ERROR,
- "Config reload failed:" + string(ex.what())));
+ "Config reload failed: " + string(ex.what())));
}
}
}
ConstElementPtr
-ControlledDhcpv4Srv::commandConfigWriteHandler(const string&,
- ConstElementPtr args) {
+ControlledDhcpv4Srv::commandConfigWriteHandler(const string&, ConstElementPtr args) {
string filename;
if (args) {
if (filename.empty()) {
// filename parameter was not specified, so let's use whatever we remember
+ // from the command-line
filename = getConfigFile();
}
ConstElementPtr
ControlledDhcpv4Srv::commandConfigSetHandler(const string&,
ConstElementPtr args) {
- const int status_code = CONTROL_RESULT_ERROR; // 1 indicates an error
+ const int status_code = CONTROL_RESULT_ERROR;
ConstElementPtr dhcp4;
string message;
// the logging first in case there's a configuration failure.
int rcode = 0;
isc::config::parseAnswer(rcode, result);
- if (rcode == 0) {
+ if (rcode == CONTROL_RESULT_SUCCESS) {
CfgMgr::instance().getStagingCfg()->applyLoggingCfg();
// Use new configuration.
ElementPtr extended = Element::create(Dhcpv4Srv::getVersion(true));
ElementPtr arguments = Element::createMap();
arguments->set("extended", extended);
- ConstElementPtr answer = isc::config::createAnswer(0,
+ ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_SUCCESS,
Dhcpv4Srv::getVersion(false),
arguments);
return (answer);
}
ConstElementPtr
-ControlledDhcpv4Srv::commandBuildReportHandler(const string&,
- ConstElementPtr) {
+ControlledDhcpv4Srv::commandBuildReportHandler(const string&, ConstElementPtr) {
ConstElementPtr answer =
- isc::config::createAnswer(0, isc::detail::getConfigReport());
+ isc::config::createAnswer(CONTROL_RESULT_SUCCESS, isc::detail::getConfigReport());
return (answer);
}
ControlledDhcpv4Srv* srv = ControlledDhcpv4Srv::getInstance();
if (!srv) {
- ConstElementPtr no_srv = isc::config::createAnswer(1,
- "Server object not initialized, so can't process command '" +
+ ConstElementPtr no_srv = isc::config::createAnswer(CONTROL_RESULT_ERROR,
+ "Server object not initialized, can't process command '" +
command + "', arguments: '" + txt + "'.");
return (no_srv);
}
+ if (srv->run_multithreaded_) {
+ {
+ ReverseLock<std::mutex> rlk(srv->serverLock());
+ srv->pkt_thread_pool_.destroy();
+ }
+ srv->pkt_thread_pool_.create(Dhcpv4Srv::threadCount());
+ }
+
try {
if (command == "shutdown") {
return (srv->commandShutdownHandler(command, args));
return (srv->commandConfigWriteHandler(command, args));
}
- ConstElementPtr answer = isc::config::createAnswer(1,
- "Unrecognized command:" + command);
- return (answer);
+ return(isc::config::createAnswer(CONTROL_RESULT_ERROR, "Unrecognized command: " +
+ command));
} catch (const Exception& ex) {
- return (isc::config::createAnswer(1, "Error while processing command '"
- + command + "':" + ex.what() +
+ return (isc::config::createAnswer(CONTROL_RESULT_ERROR, "Error while processing command '" +
+ command + "': " + ex.what() +
", params: '" + txt + "'"));
}
}
if (!srv) {
err << "Server object not initialized, can't process config.";
- return (isc::config::createAnswer(1, err.str()));
+ return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
}
ConstElementPtr answer = configureDhcp4Server(*srv, config);
return (answer);
}
} catch (const std::exception& ex) {
- err << "Failed to process configuration:" << ex.what();
- return (isc::config::createAnswer(1, err.str()));
+ err << "Failed to process configuration: " << ex.what();
+ return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
}
// Re-open lease and host database with new parameters.
cfg_db->createManagers();
} catch (const std::exception& ex) {
err << "Unable to open database: " << ex.what();
- return (isc::config::createAnswer(1, err.str()));
+ return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
}
// Server will start DDNS communications if its enabled.
} catch (const std::exception& ex) {
err << "Error starting DHCP_DDNS client after server reconfiguration: "
<< ex.what();
- return (isc::config::createAnswer(1, err.str()));
+ return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
}
// Setup DHCPv4-over-DHCPv6 IPC
std::ostringstream err;
err << "error starting DHCPv4-over-DHCPv6 IPC "
" after server reconfiguration: " << ex.what();
- return (isc::config::createAnswer(1, err.str()));
+ return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
}
// Configure DHCP packet queueing
// is no need to rollback configuration if socket fails to open on any
// of the interfaces.
CfgMgr::instance().getStagingCfg()->getCfgIface()->
- openSockets(AF_INET, srv->getServerPort(),
+ openSockets(AF_INET, srv->getServerPort(), srv->serverLock(),
getInstance()->useBroadcast());
// Install the timers for handling leases reclamation.
err << "unable to setup timers for periodically running the"
" reclamation of the expired leases: "
<< ex.what() << ".";
- return (isc::config::createAnswer(1, err.str()));
+ return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
}
auto ctl_info = CfgMgr::instance().getStagingCfg()->getConfigControlInfo();
ControlledDhcpv4Srv::checkConfig(isc::data::ConstElementPtr config) {
LOG_DEBUG(dhcp4_logger, DBG_DHCP4_COMMAND, DHCP4_CONFIG_RECEIVED)
- .arg(config->str());
+ .arg(config->str());
ControlledDhcpv4Srv* srv = ControlledDhcpv4Srv::getInstance();
if (!srv) {
err << "Server object not initialized, can't process config.";
- return (isc::config::createAnswer(1, err.str()));
+ return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
}
return (configureDhcp4Server(*srv, config, true));
}
ControlledDhcpv4Srv::ControlledDhcpv4Srv(uint16_t server_port /*= DHCP4_SERVER_PORT*/,
- uint16_t client_port /*= 0*/)
- : Dhcpv4Srv(server_port, client_port), io_service_(),
+ uint16_t client_port /*= 0*/,
+ bool run_multithreaded /*= false*/)
+ : Dhcpv4Srv(server_port, client_port, run_multithreaded), io_service_(),
timer_mgr_(TimerMgr::instance()) {
if (getInstance()) {
isc_throw(InvalidOperation,
CommandMgr::instance().registerCommand("statistic-get",
boost::bind(&StatsMgr::statisticGetHandler, _1, _2));
- CommandMgr::instance().registerCommand("statistic-reset",
- boost::bind(&StatsMgr::statisticResetHandler, _1, _2));
-
- CommandMgr::instance().registerCommand("statistic-remove",
- boost::bind(&StatsMgr::statisticRemoveHandler, _1, _2));
-
CommandMgr::instance().registerCommand("statistic-get-all",
boost::bind(&StatsMgr::statisticGetAllHandler, _1, _2));
+ CommandMgr::instance().registerCommand("statistic-reset",
+ boost::bind(&StatsMgr::statisticResetHandler, _1, _2));
+
CommandMgr::instance().registerCommand("statistic-reset-all",
boost::bind(&StatsMgr::statisticResetAllHandler, _1, _2));
+ CommandMgr::instance().registerCommand("statistic-remove",
+ boost::bind(&StatsMgr::statisticRemoveHandler, _1, _2));
+
CommandMgr::instance().registerCommand("statistic-remove-all",
boost::bind(&StatsMgr::statisticRemoveAllHandler, _1, _2));
-
}
void ControlledDhcpv4Srv::shutdown() {
// Deregister any registered commands (please keep in alphabetic order)
CommandMgr::instance().deregisterCommand("build-report");
CommandMgr::instance().deregisterCommand("config-get");
+ CommandMgr::instance().deregisterCommand("config-set");
CommandMgr::instance().deregisterCommand("config-reload");
CommandMgr::instance().deregisterCommand("config-test");
CommandMgr::instance().deregisterCommand("config-write");
- CommandMgr::instance().deregisterCommand("leases-reclaim");
- CommandMgr::instance().deregisterCommand("libreload");
- CommandMgr::instance().deregisterCommand("config-set");
CommandMgr::instance().deregisterCommand("dhcp-disable");
CommandMgr::instance().deregisterCommand("dhcp-enable");
+ CommandMgr::instance().deregisterCommand("leases-reclaim");
+ CommandMgr::instance().deregisterCommand("libreload");
CommandMgr::instance().deregisterCommand("shutdown");
CommandMgr::instance().deregisterCommand("statistic-get");
CommandMgr::instance().deregisterCommand("statistic-get-all");
;
}
- server_ = NULL; // forget this instance. Noone should call any handlers at
- // this stage.
+ server_ = NULL; // forget this instance. There should be no callback anymore
+ // at this stage anyway.
}
void ControlledDhcpv4Srv::sessionReader(void) {
}
}
-}; // end of isc::dhcp namespace
-}; // end of isc namespace
+} // namespace dhcp
+} // namespace isc
///
/// @param server_port UDP port to be opened for DHCP traffic
/// @param client_port UDP port where all responses are sent to.
+ /// @param run_multithreaded enables or disables multithreaded mode
ControlledDhcpv4Srv(uint16_t server_port = DHCP4_SERVER_PORT,
- uint16_t client_port = 0);
+ uint16_t client_port = 0,
+ bool run_multithreaded = false);
/// @brief Destructor.
- ~ControlledDhcpv4Srv();
+ virtual ~ControlledDhcpv4Srv();
/// @brief Initializes the server.
///
/// This method may throw if initialization fails.
void init(const std::string& config_file);
- /// @brief Loads specific config file
+ /// @brief Loads specific configuration file
///
/// This utility method is called whenever we know a filename of the config
/// and need to load it. It calls config-set command once the content of
/// the file has been loaded and verified to be a sane JSON configuration.
- /// config-set handler will process the config file (load it as current
+ /// config-set handler will process the config file (apply it as current
/// configuration).
///
/// @param file_name name of the file to be loaded
/// @brief Callback that will be called from iface_mgr when data
/// is received over control socket.
///
- /// This static callback method is called from IfaceMgr::receive6() method,
+ /// This static callback method is called from IfaceMgr::receive4() method,
/// when there is a new command or configuration sent over control socket
/// (that was sent from some yet unspecified sender).
static void sessionReader(void);
commandDhcpEnableHandler(const std::string& command,
isc::data::ConstElementPtr args);
-
/// @Brief handler for processing 'version-get' command
///
/// This handler processes version-get command, which returns
/// @brief Static pointer to the sole instance of the DHCP server.
///
/// This is required for config and command handlers to gain access to
- /// the server
+ /// the server. Some of them need to be static methods.
static ControlledDhcpv4Srv* server_;
/// @brief IOService object, used for all ASIO operations.
TimerMgrPtr timer_mgr_;
};
-}; // namespace isc::dhcp
-}; // namespace isc
+} // namespace dhcp
+} // namespace isc
#endif
#include <hooks/hooks_manager.h>
#include <stats/stats_mgr.h>
#include <util/strutil.h>
-#include <stats/stats_mgr.h>
+#include <util/threads/lock_guard.h>
#include <log/logger.h>
#include <cryptolink/cryptolink.h>
#include <cfgrpt/config_report.h>
#include <iomanip>
+
using namespace isc;
using namespace isc::asiolink;
using namespace isc::cryptolink;
using namespace isc::stats;
using namespace std;
+using isc::util::thread::LockGuard;
+
namespace {
/// Structure that holds registered hook indexes
.arg(query_->getLabel())
.arg(classes.toText());
}
-};
+}
void
Dhcpv4Exchange::initResponse() {
const std::string Dhcpv4Srv::VENDOR_CLASS_PREFIX("VENDOR_CLASS_");
Dhcpv4Srv::Dhcpv4Srv(uint16_t server_port, uint16_t client_port,
+ bool run_multithreaded /* = false */,
const bool use_bcast, const bool direct_response_desired)
: io_service_(new IOService()), shutdown_(true), alloc_engine_(),
server_port_(server_port), use_bcast_(use_bcast),
client_port_(client_port),
network_state_(new NetworkState(NetworkState::DHCPv4)),
- cb_control_(new CBControlDHCPv4()) {
+ cb_control_(new CBControlDHCPv4()),
+ run_multithreaded_(run_multithreaded) {
+
+ mutex_.reset(new std::mutex());
LOG_DEBUG(dhcp4_logger, DBG_DHCP4_START, DHCP4_OPEN_SOCKET)
.arg(server_port);
+
try {
// Port 0 is used for testing purposes where we don't open broadcast
// capable sockets. So, set the packet filter handling direct traffic
bool
Dhcpv4Srv::run() {
+ if (run_multithreaded_) {
+ // Creating the process packet thread pool
+ // The number of thread pool's threads should be read from configuration
+ // file or it should be determined by the number of hardware threads and
+ // the number of Cassandra DB nodes.
+ pkt_thread_pool_.create(Dhcpv4Srv::threadCount());
+ }
+
while (!shutdown_) {
try {
run_one();
- getIOService()->poll();
+ {
+ LockGuard<mutex> lock(serverLock());
+ getIOService()->poll();
+ }
} catch (const std::exception& e) {
// General catch-all exception that are not caught by more specific
// catches. This one is for exceptions derived from std::exception.
}
}
+ // destroying the thread pool
+ if (run_multithreaded_) {
+ pkt_thread_pool_.destroy();
+ }
+
return (true);
}
Pkt4Ptr rsp;
try {
+
+ // Do not read more packets from socket if there are enough
+ // packets to be processed in the packet thread pool queue
+ const int max_queued_pkt_per_thread = Dhcpv4Srv::maxThreadQueueSize();
+ const auto queue_full_wait = std::chrono::milliseconds(1);
+ size_t pkt_queue_size = pkt_thread_pool_.count();
+ if (pkt_queue_size >= Dhcpv4Srv::threadCount() *
+ max_queued_pkt_per_thread) {
+ std::this_thread::sleep_for(queue_full_wait);
+ return;
+ }
+
// Set select() timeout to 1s. This value should not be modified
// because it is important that the select() returns control
// frequently so as the IOService can be polled for ready handlers.
uint32_t timeout = 1;
- query = receivePacket(timeout);
+ {
+ // LOG_DEBUG(packet4_logger, DBG_DHCP4_DETAIL, DHCP4_BUFFER_WAIT).arg(timeout);
+ LockGuard<mutex> lock(serverLock());
+ query = receivePacket(timeout);
+ }
// Log if packet has arrived. We can't log the detailed information
// about the DHCP message because it hasn't been unpacked/parsed
// receivePacket the process could wait up to the duration of timeout
// of select() to terminate.
try {
+ LockGuard<mutex> lock(serverLock());
handleSignal();
} catch (const std::exception& e) {
// Standard exception occurred. Let's be on the safe side to
.arg(query->getLabel());
return;
} else {
- processPacket(query, rsp);
+ if (run_multithreaded_) {
+ ThreadPool::WorkItemCallBack call_back =
+ std::bind(&Dhcpv4Srv::processPacketAndSendResponseNoThrow, this, query, rsp);
+ pkt_thread_pool_.add(call_back);
+ } else {
+ processPacketAndSendResponse(query, rsp);
+ }
+ }
+}
+
+void
+Dhcpv4Srv::processPacketAndSendResponseNoThrow(Pkt4Ptr& query, Pkt4Ptr& rsp) {
+ try {
+ processPacketAndSendResponse(query, rsp);
+ } catch (const std::exception& e) {
+ LOG_ERROR(packet4_logger, DHCP4_PACKET_PROCESS_STD_EXCEPTION)
+ .arg(e.what());
+ } catch (...) {
+ LOG_ERROR(packet4_logger, DHCP4_PACKET_PROCESS_EXCEPTION);
}
+}
+void
+Dhcpv4Srv::processPacketAndSendResponse(Pkt4Ptr& query, Pkt4Ptr& rsp) {
+ LockGuard<mutex> lock(serverLock());
+ processPacket(query, rsp);
if (!rsp) {
return;
}
return (Hooks.hook_index_lease4_decline_);
}
+uint32_t Dhcpv4Srv::threadCount() {
+ uint32_t sys_threads = CfgMgr::instance().getCurrentCfg()->getServerThreadCount();
+ if (sys_threads) {
+ return sys_threads;
+ }
+ sys_threads = std::thread::hardware_concurrency();
+ return sys_threads * 4;
+}
+
+uint32_t Dhcpv4Srv::maxThreadQueueSize() {
+ uint32_t max_thread_queue_size = CfgMgr::instance().getCurrentCfg()->getServerMaxThreadQueueSize();
+ if (max_thread_queue_size) {
+ return max_thread_queue_size;
+ }
+ return 4;
+}
+
void Dhcpv4Srv::discardPackets() {
// Clear any packets held by the callhout handle store and
// all parked packets
HooksManager::clearParkingLots();
}
-} // namespace dhcp
-} // namespace isc
+} // namespace dhcp
+} // namespace isc
#include <dhcp/option4_client_fqdn.h>
#include <dhcp/option_custom.h>
#include <dhcp_ddns/ncr_msg.h>
+#include <dhcpsrv/thread_pool.h>
#include <dhcpsrv/alloc_engine.h>
#include <dhcpsrv/cb_ctl_dhcp4.h>
#include <dhcpsrv/cfg_option.h>
#include <functional>
#include <iostream>
#include <queue>
+#include <boost/scoped_ptr.hpp>
+#include <atomic>
// Undefine the macro OPTIONAL which is defined in some operating
// systems but conflicts with a member of the RequirementLevel enum in
///
/// @param server_port specifies port number to listen on
/// @param client_port specifies port number to send to
+ /// @param run_multithreaded enables or disables multithreaded mode
/// @param use_bcast configure sockets to support broadcast messages.
/// @param direct_response_desired specifies if it is desired to
/// use direct V4 traffic.
Dhcpv4Srv(uint16_t server_port = DHCP4_SERVER_PORT,
uint16_t client_port = 0,
+ bool run_multithreaded = false,
const bool use_bcast = true,
const bool direct_response_desired = true);
/// redeclaration/redefinition. @ref isc::process::Daemon::getVersion()
static std::string getVersion(bool extended);
+ /// @brief returns Kea DHCPv4 server thread count.
+ static uint32_t threadCount();
+
+ /// @brief returns Kea DHCPv4 server max thread queue size.
+ static uint32_t maxThreadQueueSize();
+
+ /// @brief returns Kea DHCPv4 server mutex.
+ std::mutex* serverLock() {
+ return mutex_.get();
+ }
+
/// @brief Main server processing loop.
///
/// Main server processing loop. Call the processing step routine
/// a response.
void run_one();
+ /// @brief Process a single incoming DHCPv4 packet and sends the response.
+ ///
+ /// It verifies correctness of the passed packet, call per-type processXXX
+ /// methods, generates appropriate answer, sends the answer to the client.
+ ///
+ /// @param query A pointer to the packet to be processed.
+ /// @param rsp A pointer to the response
+ void processPacketAndSendResponse(Pkt4Ptr& query, Pkt4Ptr& rsp);
+
+ /// @brief Process a single incoming DHCPv4 packet and sends the response.
+ ///
+ /// It verifies correctness of the passed packet, call per-type processXXX
+ /// methods, generates appropriate answer, sends the answer to the client.
+ ///
+ /// @param query A pointer to the packet to be processed.
+ /// @param rsp A pointer to the response
+ void processPacketAndSendResponseNoThrow(Pkt4Ptr& query, Pkt4Ptr& rsp);
+
/// @brief Process a single incoming DHCPv4 packet.
///
/// It verifies correctness of the passed packet, call per-type processXXX
///
/// @brief Get UDP port on which server should listen.
///
- /// Typically, server listens on UDP port number 67. Other ports are used
- /// for testing purposes only.
+ /// Typically, server listens on UDP port 67. Other ports are only
+ /// used for testing purposes.
///
/// @return UDP port on which server should listen.
uint16_t getServerPort() const {
/// @brief Controls access to the configuration backends.
CBControlDHCPv4Ptr cb_control_;
+ /// @brief Packet processing thread pool
+ ThreadPool pkt_thread_pool_;
+
+ // Global mutex used to serialize packet thread pool's threads
+ // on the not thread safe code and allow threads to run
+ // simultaneously on the thread safe portions
+ // (e.g. CqlLeaseMgr class instance).
+ boost::scoped_ptr<std::mutex> mutex_;
+
+ // Specifies if the application will use a thread pool or will process
+ // received DHCP packets on the main thread.
+ // It is mandatory to be set on false when running the test cases.
+ std::atomic_bool run_multithreaded_;
+
public:
/// Class methods for DHCPv4-over-DHCPv6 handler
static int getHookIndexLease4Decline();
};
-}; // namespace isc::dhcp
-}; // namespace isc
+} // namespace dhcp
+} // namespace isc
#endif // DHCP4_SRV_H
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
#include <config.h>
-#include <cc/data.h>
+
#include <cc/command_interpreter.h>
+#include <cc/data.h>
+#include <cfgrpt/config_report.h>
#include <config/command_mgr.h>
+#include <database/dbaccess_parser.h>
#include <dhcp/libdhcp++.h>
-#include <dhcpsrv/cfgmgr.h>
-#include <dhcpsrv/cfg_db_access.h>
#include <dhcp6/ctrl_dhcp6_srv.h>
-#include <dhcp6/dhcp6to4_ipc.h>
#include <dhcp6/dhcp6_log.h>
+#include <dhcp6/dhcp6to4_ipc.h>
#include <dhcp6/json_config_parser.h>
#include <dhcp6/parser_context.h>
+#include <dhcpsrv/cfg_db_access.h>
+#include <dhcpsrv/cfgmgr.h>
+#include <dhcpsrv/db_type.h>
#include <hooks/hooks_manager.h>
#include <stats/stats_mgr.h>
-#include <cfgrpt/config_report.h>
+#include <util/threads/lock_guard.h>
+#include <util/threads/reverse_lock.h>
+
#include <signal.h>
+
+#include <memory>
#include <sstream>
using namespace isc::config;
using namespace isc::data;
using namespace isc::hooks;
using namespace isc::stats;
+using namespace isc::util::thread;
using namespace std;
namespace {
// configuration from a JSON file.
isc::data::ConstElementPtr json;
- isc::data::ConstElementPtr dhcp6;
- isc::data::ConstElementPtr logger;
isc::data::ConstElementPtr result;
// Basic sanity check: file name must not be empty.
// Now check is the returned result is successful (rcode=0) or not
// (see @ref isc::config::parseAnswer).
int rcode;
- isc::data::ConstElementPtr comment =
- isc::config::parseAnswer(rcode, result);
+ ConstElementPtr comment = isc::config::parseAnswer(rcode, result);
if (rcode != 0) {
string reason = comment ? comment->stringValue() :
"no details available";
return (result);
}
-
void
ControlledDhcpv6Srv::init(const std::string& file_name) {
// Configure the server using JSON file.
- ConstElementPtr result = loadConfigFile(file_name);
+ ConstElementPtr result;
+ {
+ LockGuard<mutex> lock(serverLock());
+ result = loadConfigFile(file_name);
+ }
+
int rcode;
ConstElementPtr comment = isc::config::parseAnswer(rcode, result);
if (rcode != 0) {
// Nothing to do here. No need to disconnect from anything.
}
-
ConstElementPtr
ControlledDhcpv6Srv::commandShutdownHandler(const string&, ConstElementPtr) {
- if (ControlledDhcpv6Srv::server_) {
- ControlledDhcpv6Srv::server_->shutdown();
+ if (ControlledDhcpv6Srv::getInstance()) {
+ ControlledDhcpv6Srv::getInstance()->shutdown();
} else {
LOG_WARN(dhcp6_logger, DHCP6_NOT_RUNNING);
- ConstElementPtr answer = isc::config::createAnswer(1, "Shutdown failure.");
+ ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_ERROR, "Shutdown failure.");
return (answer);
}
- ConstElementPtr answer = isc::config::createAnswer(0, "Shutting down.");
+ ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_SUCCESS, "Shutting down.");
return (answer);
}
bool status = HooksManager::loadLibraries(loaded);
if (!status) {
LOG_ERROR(dhcp6_logger, DHCP6_HOOKS_LIBS_RELOAD_FAIL);
- ConstElementPtr answer = isc::config::createAnswer(1,
+ ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_ERROR,
"Failed to reload hooks libraries.");
return (answer);
}
- ConstElementPtr answer = isc::config::createAnswer(0,
+ ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_SUCCESS,
"Hooks libraries successfully reloaded.");
return (answer);
}
LOG_ERROR(dhcp6_logger, DHCP6_DYNAMIC_RECONFIGURATION_FAIL)
.arg(file);
return (createAnswer(CONTROL_RESULT_ERROR,
- "Config reload failed:" + string(ex.what())));
+ "Config reload failed: " + string(ex.what())));
}
}
ElementPtr extended = Element::create(Dhcpv6Srv::getVersion(true));
ElementPtr arguments = Element::createMap();
arguments->set("extended", extended);
- ConstElementPtr answer = isc::config::createAnswer(0,
+ ConstElementPtr answer = isc::config::createAnswer(CONTROL_RESULT_SUCCESS,
Dhcpv6Srv::getVersion(false),
arguments);
return (answer);
ConstElementPtr
ControlledDhcpv6Srv::commandBuildReportHandler(const string&, ConstElementPtr) {
ConstElementPtr answer =
- isc::config::createAnswer(0, isc::detail::getConfigReport());
+ isc::config::createAnswer(CONTROL_RESULT_SUCCESS, isc::detail::getConfigReport());
return (answer);
}
ConstElementPtr
ControlledDhcpv6Srv::commandLeasesReclaimHandler(const string&,
ConstElementPtr args) {
- int status_code = 1;
+ int status_code = CONTROL_RESULT_ERROR;
string message;
// args must be { "remove": <bool> }
return (answer);
}
-isc::data::ConstElementPtr
-ControlledDhcpv6Srv::processCommand(const std::string& command,
- isc::data::ConstElementPtr args) {
+ConstElementPtr
+ControlledDhcpv6Srv::processCommand(const string& command,
+ ConstElementPtr args) {
string txt = args ? args->str() : "(none)";
LOG_DEBUG(dhcp6_logger, DBG_DHCP6_COMMAND, DHCP6_COMMAND_RECEIVED)
ControlledDhcpv6Srv* srv = ControlledDhcpv6Srv::getInstance();
if (!srv) {
- ConstElementPtr no_srv = isc::config::createAnswer(1,
+ ConstElementPtr no_srv = isc::config::createAnswer(CONTROL_RESULT_ERROR,
"Server object not initialized, can't process command '" +
command + "', arguments: '" + txt + "'.");
return (no_srv);
}
+ if (srv->run_multithreaded_) {
+ {
+ ReverseLock<std::mutex> rlk(srv->serverLock());
+ srv->pkt_thread_pool_.destroy();
+ }
+ srv->pkt_thread_pool_.create(Dhcpv6Srv::threadCount());
+ }
+
try {
if (command == "shutdown") {
return (srv->commandShutdownHandler(command, args));
return (srv->commandConfigWriteHandler(command, args));
}
-
- return (isc::config::createAnswer(1, "Unrecognized command:"
- + command));
-
+ return(isc::config::createAnswer(CONTROL_RESULT_ERROR, "Unrecognized command: " +
+ command));
} catch (const Exception& ex) {
- return (isc::config::createAnswer(1, "Error while processing command '"
- + command + "':" + ex.what()));
+ return (isc::config::createAnswer(CONTROL_RESULT_ERROR, "Error while processing command '" +
+ command + "': " + ex.what() +
+ ", params: '" + txt + "'"));
}
}
ControlledDhcpv6Srv* srv = ControlledDhcpv6Srv::getInstance();
+ // Single stream instance used in all error clauses
+ std::ostringstream err;
+
if (!srv) {
- ConstElementPtr no_srv = isc::config::createAnswer(
- CONTROL_RESULT_ERROR,
- "Server object not initialized, can't process config.");
- return (no_srv);
+ err << "Server object not initialized, can't process config.";
+ return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
}
ConstElementPtr answer = configureDhcp6Server(*srv, config);
return (answer);
}
} catch (const std::exception& ex) {
- return (isc::config::createAnswer(1, "Failed to process configuration:"
- + string(ex.what())));
+ err << "Failed to process configuration: " << ex.what();
+ return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
}
// Re-open lease and host database with new parameters.
cfg_db->setAppendedParameters("universe=6");
cfg_db->createManagers();
} catch (const std::exception& ex) {
- return (isc::config::createAnswer(1, "Unable to open database: "
- + std::string(ex.what())));
+ err << "Unable to open database: " << ex.what();
+ return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
}
// Regenerate server identifier if needed.
try {
- const std::string duid_file =
- std::string(CfgMgr::instance().getDataDir()) + "/" +
+ const std::string duid_file = CfgMgr::instance().getDataDir() + "/" +
std::string(SERVER_DUID_FILE);
DuidPtr duid = CfgMgr::instance().getStagingCfg()->getCfgDUID()->create(duid_file);
server_->serverid_.reset(new Option(Option::V6, D6O_SERVERID, duid->getDuid()));
} catch (const std::exception& ex) {
std::ostringstream err;
err << "unable to configure server identifier: " << ex.what();
- return (isc::config::createAnswer(1, err.str()));
+ return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
}
// Server will start DDNS communications if its enabled.
try {
srv->startD2();
} catch (const std::exception& ex) {
- std::ostringstream err;
- err << "error starting DHCP_DDNS client "
- " after server reconfiguration: " << ex.what();
- return (isc::config::createAnswer(1, err.str()));
+ err << "Error starting DHCP_DDNS client after server reconfiguration: "
+ << ex.what();
+ return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
}
// Setup DHCPv4-over-DHCPv6 IPC
std::ostringstream err;
err << "error starting DHCPv4-over-DHCPv6 IPC "
" after server reconfiguration: " << ex.what();
- return (isc::config::createAnswer(1, err.str()));
+ return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
}
// Configure DHCP packet queueing
}
} catch (const std::exception& ex) {
- std::ostringstream err;
err << "Error setting packet queue controls after server reconfiguration: "
<< ex.what();
return (isc::config::createAnswer(1, err.str()));
// is no need to rollback configuration if socket fails to open on any
// of the interfaces.
CfgMgr::instance().getStagingCfg()->getCfgIface()->
- openSockets(AF_INET6, srv->getServerPort());
+ openSockets(AF_INET6, srv->getServerPort(), srv->serverLock());
// Install the timers for handling leases reclamation.
try {
server_);
} catch (const std::exception& ex) {
- std::ostringstream err;
err << "unable to setup timers for periodically running the"
" reclamation of the expired leases: "
<< ex.what() << ".";
- return (isc::config::createAnswer(1, err.str()));
+ return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
}
// Setup config backend polling, if configured for it.
ControlledDhcpv6Srv* srv = ControlledDhcpv6Srv::getInstance();
+ // Single stream instance used in all error clauses
+ std::ostringstream err;
+
if (!srv) {
- ConstElementPtr no_srv = isc::config::createAnswer(1,
- "Server object not initialized, can't process config.");
- return (no_srv);
+ err << "Server object not initialized, can't process config.";
+ return (isc::config::createAnswer(CONTROL_RESULT_ERROR, err.str()));
}
return (configureDhcp6Server(*srv, config, true));
}
ControlledDhcpv6Srv::ControlledDhcpv6Srv(uint16_t server_port,
- uint16_t client_port)
- : Dhcpv6Srv(server_port, client_port), io_service_(),
+ uint16_t client_port,
+ bool run_multithreaded /*= false*/)
+ : Dhcpv6Srv(server_port, client_port, run_multithreaded), io_service_(),
timer_mgr_(TimerMgr::instance()) {
- if (server_) {
+ if (getInstance()) {
isc_throw(InvalidOperation,
"There is another Dhcpv6Srv instance already.");
}
- server_ = this; // remember this instance for use in callback
+ server_ = this; // remember this instance for later use in handlers
// TimerMgr uses IO service to run asynchronous timers.
TimerMgr::instance()->setIOService(getIOService());
CommandMgr::instance().registerCommand("config-reload",
boost::bind(&ControlledDhcpv6Srv::commandConfigReloadHandler, this, _1, _2));
+ CommandMgr::instance().registerCommand("config-set",
+ boost::bind(&ControlledDhcpv6Srv::commandConfigSetHandler, this, _1, _2));
+
CommandMgr::instance().registerCommand("config-test",
boost::bind(&ControlledDhcpv6Srv::commandConfigTestHandler, this, _1, _2));
CommandMgr::instance().registerCommand("config-write",
boost::bind(&ControlledDhcpv6Srv::commandConfigWriteHandler, this, _1, _2));
- CommandMgr::instance().registerCommand("dhcp-disable",
- boost::bind(&ControlledDhcpv6Srv::commandDhcpDisableHandler, this, _1, _2));
-
CommandMgr::instance().registerCommand("dhcp-enable",
boost::bind(&ControlledDhcpv6Srv::commandDhcpEnableHandler, this, _1, _2));
- CommandMgr::instance().registerCommand("leases-reclaim",
- boost::bind(&ControlledDhcpv6Srv::commandLeasesReclaimHandler, this, _1, _2));
+ CommandMgr::instance().registerCommand("dhcp-disable",
+ boost::bind(&ControlledDhcpv6Srv::commandDhcpDisableHandler, this, _1, _2));
CommandMgr::instance().registerCommand("libreload",
boost::bind(&ControlledDhcpv6Srv::commandLibReloadHandler, this, _1, _2));
- CommandMgr::instance().registerCommand("config-set",
- boost::bind(&ControlledDhcpv6Srv::commandConfigSetHandler, this, _1, _2));
+ CommandMgr::instance().registerCommand("leases-reclaim",
+ boost::bind(&ControlledDhcpv6Srv::commandLeasesReclaimHandler, this, _1, _2));
CommandMgr::instance().registerCommand("shutdown",
boost::bind(&ControlledDhcpv6Srv::commandShutdownHandler, this, _1, _2));
void ControlledDhcpv6Srv::sessionReader(void) {
// Process one asio event. If there are more events, iface_mgr will call
// this callback more than once.
- if (server_) {
- server_->io_service_.run_one();
+ if (getInstance()) {
+ getInstance()->io_service_.run_one();
}
}
if (reopened) {
// Cancel the timer.
if (TimerMgr::instance()->isTimerRegistered("Dhcp6DbReconnectTimer")) {
- TimerMgr::instance()->cancel("Dhcp6DbReconnectTimer"); }
+ TimerMgr::instance()->cancel("Dhcp6DbReconnectTimer");
+ }
// Set network state to service enabled
network_state_->enableService();
- // Toss the reconnct control, we're done with it
+ // Toss the reconnect control, we're done with it
db_reconnect_ctl.reset();
} else {
if (!db_reconnect_ctl->checkRetries()) {
if (!TimerMgr::instance()->isTimerRegistered("Dhcp6DbReconnectTimer")) {
TimerMgr::instance()->registerTimer("Dhcp6DbReconnectTimer",
boost::bind(&ControlledDhcpv6Srv::dbReconnect, this,
- db_reconnect_ctl),
+ db_reconnect_ctl),
db_reconnect_ctl->retryInterval(),
asiolink::IntervalTimer::ONE_SHOT);
}
}
}
-}; // end of isc::dhcp namespace
-}; // end of isc namespace
+} // namespace dhcp
+} // namespace isc
///
/// @param server_port UDP port to be opened for DHCP traffic
/// @param client_port UDP port where all responses are sent to.
+ /// @param run_multithreaded enables or disables multithreaded mode
ControlledDhcpv6Srv(uint16_t server_port = DHCP6_SERVER_PORT,
- uint16_t client_port = 0);
+ uint16_t client_port = 0,
+ bool run_multithreaded = false);
/// @brief Destructor.
virtual ~ControlledDhcpv6Srv();
/// @brief Initiates shutdown procedure for the whole DHCPv6 server.
void shutdown();
- /// @brief command processor
+ /// @brief Command processor
///
/// This method is uniform for all config backends. It processes received
/// command (as a string + JSON arguments). Internally, it's just a
/// Currently supported commands are:
/// - config-reload
/// - config-test
- /// - leases-reclaim
- /// - libreload
/// - shutdown
+ /// - libreload
+ /// - leases-reclaim
/// ...
///
/// @note It never throws.
static isc::data::ConstElementPtr
processCommand(const std::string& command, isc::data::ConstElementPtr args);
- /// @brief configuration processor
+ /// @brief Configuration processor
///
/// This is a method for handling incoming configuration updates.
/// This method should be called by all configuration backends when the
isc::data::ConstElementPtr
checkConfig(isc::data::ConstElementPtr new_config);
- /// @brief returns pointer to the sole instance of Dhcpv6Srv
+ /// @brief Returns pointer to the sole instance of Dhcpv6Srv
///
/// @return server instance (may return NULL, if called before server is spawned)
static ControlledDhcpv6Srv* getInstance() {
/// (that was sent from some yet unspecified sender).
static void sessionReader(void);
- /// @brief handler for processing 'shutdown' command
+ /// @brief Handler for processing 'shutdown' command
///
/// This handler processes shutdown command, which initializes shutdown
/// procedure.
commandShutdownHandler(const std::string& command,
isc::data::ConstElementPtr args);
- /// @brief handler for processing 'libreload' command
+ /// @brief Handler for processing 'libreload' command
///
/// This handler processes libreload command, which unloads all hook
/// libraries and reloads them.
commandLibReloadHandler(const std::string& command,
isc::data::ConstElementPtr args);
- /// @brief handler for processing 'config-reload' command
+ /// @brief Handler for processing 'config-reload' command
///
/// This handler processes config-reload command, which processes
/// configuration specified in args parameter.
const bool remove_lease,
const uint16_t max_unwarned_cycles);
-
/// @brief Deletes reclaimed leases and reschedules the timer.
///
/// This is a wrapper method for @c AllocEngine::deleteExpiredReclaimed6.
///
/// If the maximum number of retries has been exhausted an error is logged
/// and the server shuts down.
+ ///
/// @param db_reconnect_ctl pointer to the ReconnectCtl containing the
/// configured reconnect parameters
///
///
/// @param db_reconnect_ctl pointer to the ReconnectCtl containing the
/// configured reconnect parameters
+ ///
+ /// @return false if reconnect is not configured, true otherwise
bool dbLostCallback(db::ReconnectCtlPtr db_reconnect_ctl);
/// @brief Callback invoked periodically to fetch configuration updates
TimerMgrPtr timer_mgr_;
};
-}; // namespace isc::dhcp
-}; // namespace isc
+} // namespace dhcp
+} // namespace isc
#endif
#include <hooks/hooks_log.h>
#include <hooks/hooks_manager.h>
#include <stats/stats_mgr.h>
-
#include <util/encode/hex.h>
#include <util/io_utilities.h>
#include <util/pointer_util.h>
#include <util/range_utilities.h>
+#include <util/threads/lock_guard.h>
#include <log/logger.h>
#include <cryptolink/cryptolink.h>
#include <cfgrpt/config_report.h>
using namespace isc::util;
using namespace std;
+using isc::util::thread::LockGuard;
+
namespace {
/// Structure that holds registered hook indexes
return (option_status);
}
-}; // anonymous namespace
+} // namespace
namespace isc {
namespace dhcp {
const std::string Dhcpv6Srv::VENDOR_CLASS_PREFIX("VENDOR_CLASS_");
-Dhcpv6Srv::Dhcpv6Srv(uint16_t server_port, uint16_t client_port)
+Dhcpv6Srv::Dhcpv6Srv(uint16_t server_port, uint16_t client_port, bool run_multithreaded /* = false */)
: io_service_(new IOService()), server_port_(server_port),
client_port_(client_port), serverid_(), shutdown_(true),
alloc_engine_(), name_change_reqs_(),
network_state_(new NetworkState(NetworkState::DHCPv6)),
- cb_control_(new CBControlDHCPv6()) {
+ cb_control_(new CBControlDHCPv6()),
+ run_multithreaded_(run_multithreaded) {
+
+ mutex_.reset(new std::mutex());
+
LOG_DEBUG(dhcp6_logger, DBG_DHCP6_START, DHCP6_OPEN_SOCKET)
.arg(server_port);
// LOG_ERROR(dhcp6_logger, DHCP6_SRV_DHCP4O6_ERROR).arg(ex.what());
}
- IfaceMgr::instance().closeSockets();
+ IfaceMgr::instance().closeSockets(serverLock());
LeaseMgrFactory::destroy();
}
Pkt6Ptr Dhcpv6Srv::receivePacket(int timeout) {
- return (IfaceMgr::instance().receive6(timeout));
+ return (IfaceMgr::instance().receive6(timeout, 0, serverLock()));
}
void Dhcpv6Srv::sendPacket(const Pkt6Ptr& packet) {
}
bool Dhcpv6Srv::run() {
+ if (run_multithreaded_) {
+ // Creating the process packet thread pool
+ // The number of thread pool's threads should be read from configuration
+ // file or it should be determined by the number of hardware threads and
+ // the number of Cassandra DB nodes.
+ pkt_thread_pool_.create(Dhcpv6Srv::threadCount());
+ }
+
while (!shutdown_) {
try {
run_one();
- getIOService()->poll();
+ {
+ LockGuard<mutex> lock(serverLock());
+ getIOService()->poll();
+ }
} catch (const std::exception& e) {
// General catch-all standard exceptions that are not caught by more
// specific catches.
}
}
+ // destroying the thread pool
+ if (run_multithreaded_) {
+ pkt_thread_pool_.destroy();
+ }
+
return (true);
}
Pkt6Ptr rsp;
try {
+
+ // Do not read more packets from socket if there are enough
+ // packets to be processed in the packet thread pool queue
+ const int max_queued_pkt_per_thread = Dhcpv6Srv::maxThreadQueueSize();
+ const auto queue_full_wait = std::chrono::milliseconds(1);
+ size_t pkt_queue_size = pkt_thread_pool_.count();
+ if (pkt_queue_size >= Dhcpv6Srv::threadCount() *
+ max_queued_pkt_per_thread) {
+ std::this_thread::sleep_for(queue_full_wait);
+ return;
+ }
+
// Set select() timeout to 1s. This value should not be modified
// because it is important that the select() returns control
// frequently so as the IOService can be polled for ready handlers.
uint32_t timeout = 1;
- query = receivePacket(timeout);
+ {
+ // LOG_DEBUG(packet6_logger, DBG_DHCP6_DETAIL, DHCP6_BUFFER_WAIT).arg(timeout);
+ LockGuard<mutex> lock(serverLock());
+ query = receivePacket(timeout);
+ }
// Log if packet has arrived. We can't log the detailed information
// about the DHCP message because it hasn't been unpacked/parsed
// process could wait up to the duration of timeout of select() to
// terminate.
try {
+ LockGuard<mutex> lock(serverLock());
handleSignal();
} catch (const std::exception& e) {
// An (a standard or ISC) exception occurred.
.arg(query->getLabel());
return;
} else {
- processPacket(query, rsp);
+ if (run_multithreaded_) {
+ ThreadPool::WorkItemCallBack call_back =
+ std::bind(&Dhcpv6Srv::processPacketAndSendResponseNoThrow, this, query, rsp);
+ pkt_thread_pool_.add(call_back);
+ } else {
+ processPacketAndSendResponse(query, rsp);
+ }
+ }
+}
+
+void
+Dhcpv6Srv::processPacketAndSendResponseNoThrow(Pkt6Ptr& query, Pkt6Ptr& rsp) {
+ try {
+ processPacketAndSendResponse(query, rsp);
+ } catch (const std::exception& e) {
+ LOG_ERROR(packet6_logger, DHCP6_PACKET_PROCESS_STD_EXCEPTION)
+ .arg(e.what());
+ } catch (...) {
+ LOG_ERROR(packet6_logger, DHCP6_PACKET_PROCESS_EXCEPTION);
}
+}
+void
+Dhcpv6Srv::processPacketAndSendResponse(Pkt6Ptr& query, Pkt6Ptr& rsp) {
+ LockGuard<mutex> lock(serverLock());
+ processPacket(query, rsp);
if (!rsp) {
return;
}
switch (pkt->getType()) {
case DHCPV6_SOLICIT:
case DHCPV6_REBIND:
- case DHCPV6_CONFIRM:
+ case DHCPV6_CONFIRM:
sanityCheck(pkt, MANDATORY, FORBIDDEN);
return (true);
if (answer_opt) {
answer->addOption(answer_opt);
}
+ break;
}
default:
break;
(*l)->preferred_lft_, (*l)->valid_lft_));
ia_rsp->addOption(prf);
-
if (pd_exclude_requested) {
// PD exclude option has been requested via ORO, thus we need to
// include it if the pool configuration specifies this option.
}
}
-
LOG_INFO(lease6_logger, DHCP6_PD_LEASE_RENEW)
.arg(query->getLabel())
.arg((*l)->addr_.toText())
return (false);
}
+uint32_t Dhcpv6Srv::threadCount() {
+ uint32_t sys_threads = CfgMgr::instance().getCurrentCfg()->getServerThreadCount();
+ if (sys_threads) {
+ return sys_threads;
+ }
+ sys_threads = std::thread::hardware_concurrency();
+ return sys_threads * 4;
+}
+
+uint32_t Dhcpv6Srv::maxThreadQueueSize() {
+ uint32_t max_thread_queue_size = CfgMgr::instance().getCurrentCfg()->getServerMaxThreadQueueSize();
+ if (max_thread_queue_size) {
+ return max_thread_queue_size;
+ }
+ return 4;
+}
+
void Dhcpv6Srv::discardPackets() {
// Dump all of our current packets, anything that is mid-stream
isc::dhcp::Pkt6Ptr pkt6ptr_empty;
HooksManager::clearParkingLots();
}
-};
-};
+} // namespace dhcp
+} // namespace isc
#include <dhcp/option6_ia.h>
#include <dhcp/option_definition.h>
#include <dhcp/pkt6.h>
+#include <dhcpsrv/thread_pool.h>
#include <dhcpsrv/alloc_engine.h>
#include <dhcpsrv/callout_handle_store.h>
#include <dhcpsrv/cb_ctl_dhcp6.h>
#include <functional>
#include <iostream>
#include <queue>
+#include <boost/scoped_ptr.hpp>
+#include <atomic>
// Undefine the macro OPTIONAL which is defined in some operating
// systems but conflicts with a member of the RequirementLevel enum in
/// @brief DHCPv6 server service.
///
-/// This class represents DHCPv6 server. It contains all
+/// This singleton class represents DHCPv6 server. It contains all
/// top-level methods and routines necessary for server operation.
/// In particular, it instantiates IfaceMgr, loads or generates DUID
/// that is going to be used as server-identifier, receives incoming
///
/// @param server_port port on which all sockets will listen
/// @param client_port port to which all responses will be sent
+ /// @param run_multithreaded enables or disables multithreaded mode
Dhcpv6Srv(uint16_t server_port = DHCP6_SERVER_PORT,
- uint16_t client_port = 0);
+ uint16_t client_port = 0,
+ bool run_multithreaded = false);
/// @brief Destructor. Used during DHCPv6 service shutdown.
virtual ~Dhcpv6Srv();
/// redeclaration/redefinition. @ref isc::process::Daemon::getVersion()
static std::string getVersion(bool extended);
+ /// @brief returns Kea DHCPv6 server thread count.
+ static uint32_t threadCount();
+
+ /// @brief returns Kea DHCPv6 server max thread queue size.
+ static uint32_t maxThreadQueueSize();
+
+ /// @brief returns Kea DHCPv6 server mutex.
+ std::mutex* serverLock() {
+ return mutex_.get();
+ }
+
/// @brief Returns server-identifier option.
///
/// @return server-id option
/// @brief Controls access to the configuration backends.
CBControlDHCPv6Ptr cb_control_;
+
+ /// @brief Packet processing thread pool
+ ThreadPool pkt_thread_pool_;
+
+ // Global mutex used to serialize packet thread pool's threads
+ // on the not thread safe code and allow threads to run
+ // simultaneously on the thread safe portions
+ // (e.g. CqlLeaseMgr class instance).
+ boost::scoped_ptr<std::mutex> mutex_;
+
+ // Specifies if the application will use a thread pool or will process
+ // received DHCP packets on the main thread.
+ // It is mandatory to be set on false when running the test cases.
+ std::atomic_bool run_multithreaded_;
};
-}; // namespace isc::dhcp
-}; // namespace isc
+} // namespace dhcp
+} // namespace isc
#endif // DHCP6_SRV_H
libkea_dhcpsrv_la_SOURCES += lease_mgr_factory.cc lease_mgr_factory.h
libkea_dhcpsrv_la_SOURCES += memfile_lease_mgr.cc memfile_lease_mgr.h
libkea_dhcpsrv_la_SOURCES += memfile_lease_storage.h
+libkea_dhcpsrv_la_SOURCES += thread_pool.cc thread_pool.h
if HAVE_MYSQL
libkea_dhcpsrv_la_SOURCES += mysql_lease_mgr.cc mysql_lease_mgr.h