// file, You can obtain one at http://mozilla.org/MPL/2.0/.
#include <config.h>
-#include <cc/data.h>
+
#include <cc/command_interpreter.h>
+#include <cc/data.h>
+#include <cfgrpt/config_report.h>
#include <config/command_mgr.h>
#include <dhcp/libdhcp++.h>
-#include <dhcpsrv/cfgmgr.h>
-#include <dhcpsrv/cfg_db_access.h>
#include <dhcp4/ctrl_dhcp4_srv.h>
#include <dhcp4/dhcp4_log.h>
#include <dhcp4/dhcp4to6_ipc.h>
#include <dhcp4/json_config_parser.h>
#include <dhcp4/parser_context.h>
+#include <dhcpsrv/cfg_db_access.h>
+#include <dhcpsrv/cfgmgr.h>
+#include <dhcpsrv/db_type.h>
#include <hooks/hooks.h>
#include <hooks/hooks_manager.h>
#include <stats/stats_mgr.h>
-#include <cfgrpt/config_report.h>
#include <signal.h>
+
#include <sstream>
using namespace isc::config;
-using namespace isc::db;
using namespace isc::data;
+using namespace isc::db;
using namespace isc::dhcp;
using namespace isc::hooks;
using namespace isc::stats;
+using namespace isc::util;
using namespace std;
namespace {
// configuration from a JSON file.
isc::data::ConstElementPtr json;
- isc::data::ConstElementPtr dhcp4;
- isc::data::ConstElementPtr logger;
isc::data::ConstElementPtr result;
// Basic sanity check: file name must not be empty.
ConstElementPtr
ControlledDhcpv4Srv::commandLibReloadHandler(const string&, ConstElementPtr) {
-
/// @todo delete any stored CalloutHandles referring to the old libraries
/// Get list of currently loaded libraries and reload them.
HookLibsCollection loaded = HooksManager::getLibraryInfo();
ConstElementPtr
ControlledDhcpv4Srv::commandConfigReloadHandler(const string&,
ConstElementPtr /*args*/) {
-
// Get configuration file name.
std::string file = ControlledDhcpv4Srv::getInstance()->getConfigFile();
try {
if (filename.empty()) {
// filename parameter was not specified, so let's use whatever we remember
+ // from the command-line
filename = getConfigFile();
}
ConstElementPtr
ControlledDhcpv4Srv::commandConfigSetHandler(const string&,
ConstElementPtr args) {
- const int status_code = CONTROL_RESULT_ERROR; // 1 indicates an error
+ const int status_code = CONTROL_RESULT_ERROR;
ConstElementPtr dhcp4;
string message;
return (no_srv);
}
+ if (Dhcpv4Srv::threadCount()) {
+ if (srv->pkt_thread_pool_.size()) {
+ srv->pkt_thread_pool_.stop();
+ }
+ MultiThreadingMgr::instance().setMode(true);
+ srv->pkt_thread_pool_.start(Dhcpv4Srv::threadCount());
+ } else {
+ MultiThreadingMgr::instance().setMode(false);
+ }
+
try {
if (command == "shutdown") {
return (srv->commandShutdownHandler(command, args));
return (isc::config::createAnswer(1, err.str()));
}
+ // Setup config backend polling, if configured for it.
auto ctl_info = CfgMgr::instance().getStagingCfg()->getConfigControlInfo();
if (ctl_info) {
long fetch_time = static_cast<long>(ctl_info->getConfigFetchWaitTime());
CommandMgr::instance().deregisterCommand("build-report");
CommandMgr::instance().deregisterCommand("config-backend-pull");
CommandMgr::instance().deregisterCommand("config-get");
- CommandMgr::instance().deregisterCommand("config-reload");
CommandMgr::instance().deregisterCommand("config-set");
+ CommandMgr::instance().deregisterCommand("config-reload");
CommandMgr::instance().deregisterCommand("config-test");
CommandMgr::instance().deregisterCommand("config-write");
CommandMgr::instance().deregisterCommand("dhcp-disable");
;
}
- server_ = NULL; // forget this instance. Noone should call any handlers at
- // this stage.
+ server_ = NULL; // forget this instance. There should be no callback anymore
+ // at this stage anyway.
}
void ControlledDhcpv4Srv::sessionReader(void) {
}
}
-}; // end of isc::dhcp namespace
-}; // end of isc namespace
+} // namespace dhcp
+} // namespace isc
uint16_t client_port = 0);
/// @brief Destructor.
- ~ControlledDhcpv4Srv();
+ virtual ~ControlledDhcpv4Srv();
/// @brief Initializes the server.
///
/// This method may throw if initialization fails.
void init(const std::string& config_file);
- /// @brief Loads specific config file
+ /// @brief Loads specific configuration file
///
/// This utility method is called whenever we know a filename of the config
/// and need to load it. It calls config-set command once the content of
/// the file has been loaded and verified to be a sane JSON configuration.
- /// config-set handler will process the config file (load it as current
+ /// config-set handler will process the config file (apply it as current
/// configuration).
///
/// @param file_name name of the file to be loaded
return (server_);
}
-
private:
+
/// @brief Callback that will be called from iface_mgr when data
/// is received over control socket.
///
commandDhcpEnableHandler(const std::string& command,
isc::data::ConstElementPtr args);
-
/// @Brief handler for processing 'version-get' command
///
/// This handler processes version-get command, which returns
/// @brief Static pointer to the sole instance of the DHCP server.
///
/// This is required for config and command handlers to gain access to
- /// the server
+ /// the server. Some of them need to be static methods.
static ControlledDhcpv4Srv* server_;
/// @brief IOService object, used for all ASIO operations.
TimerMgrPtr timer_mgr_;
};
-}; // namespace isc::dhcp
-}; // namespace isc
+} // namespace dhcp
+} // namespace isc
#endif
#include <dhcpsrv/subnet.h>
#include <dhcpsrv/subnet_selector.h>
#include <dhcpsrv/utils.h>
-#include <dhcpsrv/utils.h>
#include <eval/evaluate.h>
#include <eval/eval_messages.h>
#include <hooks/callout_handle.h>
#include <hooks/hooks_manager.h>
#include <stats/stats_mgr.h>
#include <util/strutil.h>
-#include <stats/stats_mgr.h>
#include <log/logger.h>
#include <cryptolink/cryptolink.h>
#include <cfgrpt/config_report.h>
using namespace isc::hooks;
using namespace isc::log;
using namespace isc::stats;
+using namespace isc::util;
using namespace std;
namespace {
hook_index_pkt4_receive_ = HooksManager::registerHook("pkt4_receive");
hook_index_subnet4_select_ = HooksManager::registerHook("subnet4_select");
hook_index_leases4_committed_ = HooksManager::registerHook("leases4_committed");
- hook_index_pkt4_send_ = HooksManager::registerHook("pkt4_send");
hook_index_lease4_release_ = HooksManager::registerHook("lease4_release");
+ hook_index_pkt4_send_ = HooksManager::registerHook("pkt4_send");
hook_index_buffer4_send_ = HooksManager::registerHook("buffer4_send");
hook_index_lease4_decline_ = HooksManager::registerHook("lease4_decline");
hook_index_host4_identifier_ = HooksManager::registerHook("host4_identifier");
.arg(query_->getLabel())
.arg(classes.toText());
}
-};
+}
void
Dhcpv4Exchange::initResponse() {
Dhcpv4Srv::Dhcpv4Srv(uint16_t server_port, uint16_t client_port,
const bool use_bcast, const bool direct_response_desired)
- : io_service_(new IOService()), shutdown_(true), alloc_engine_(),
- use_bcast_(use_bcast), server_port_(server_port),
- client_port_(client_port),
+ : io_service_(new IOService()), server_port_(server_port),
+ client_port_(client_port), shutdown_(true),
+ alloc_engine_(), use_bcast_(use_bcast),
network_state_(new NetworkState(NetworkState::DHCPv4)),
cb_control_(new CBControlDHCPv4()) {
LOG_DEBUG(dhcp4_logger, DBG_DHCP4_START, DHCP4_OPEN_SOCKET)
.arg(server_port);
+
try {
// Port 0 is used for testing purposes where we don't open broadcast
// capable sockets. So, set the packet filter handling direct traffic
}
}
+ // destroying the thread pool
+ if (Dhcpv4Srv::threadCount()) {
+ pkt_thread_pool_.reset();
+ }
+
return (true);
}
Pkt4Ptr rsp;
try {
+
+ // Do not read more packets from socket if there are enough
+ // packets to be processed in the packet thread pool queue
+ const int max_queued_pkt_per_thread = Dhcpv4Srv::maxThreadQueueSize();
+ const auto queue_full_wait = std::chrono::milliseconds(1);
+ size_t pkt_queue_size = pkt_thread_pool_.count();
+ if (pkt_queue_size >= Dhcpv4Srv::threadCount() *
+ max_queued_pkt_per_thread) {
+ return;
+ }
+
// Set select() timeout to 1s. This value should not be modified
// because it is important that the select() returns control
// frequently so as the IOService can be polled for ready handlers.
.arg(query->getLabel());
return;
} else {
- processPacket(query, rsp);
+ if (Dhcpv4Srv::threadCount()) {
+ typedef function<void()> CallBack;
+ boost::shared_ptr<CallBack> call_back =
+ boost::make_shared<CallBack>(std::bind(&Dhcpv4Srv::processPacketAndSendResponseNoThrow,
+ this, query, rsp));
+ pkt_thread_pool_.add(call_back);
+ } else {
+ processPacketAndSendResponse(query, rsp);
+ }
}
+}
+void
+Dhcpv4Srv::processPacketAndSendResponseNoThrow(Pkt4Ptr& query, Pkt4Ptr& rsp) {
+ try {
+ processPacketAndSendResponse(query, rsp);
+ } catch (const std::exception& e) {
+ LOG_ERROR(packet4_logger, DHCP4_PACKET_PROCESS_STD_EXCEPTION)
+ .arg(e.what());
+ } catch (...) {
+ LOG_ERROR(packet4_logger, DHCP4_PACKET_PROCESS_EXCEPTION);
+ }
+}
+
+void
+Dhcpv4Srv::processPacketAndSendResponse(Pkt4Ptr& query, Pkt4Ptr& rsp) {
+ processPacket(query, rsp);
if (!rsp) {
return;
}
return (Hooks.hook_index_lease4_decline_);
}
+uint32_t Dhcpv4Srv::threadCount() {
+ uint32_t sys_threads = CfgMgr::instance().getCurrentCfg()->getServerThreadCount();
+ if (sys_threads) {
+ return sys_threads;
+ }
+ sys_threads = std::thread::hardware_concurrency();
+ return sys_threads * 0;
+}
+
+uint32_t Dhcpv4Srv::maxThreadQueueSize() {
+ uint32_t max_thread_queue_size = CfgMgr::instance().getCurrentCfg()->getServerMaxThreadQueueSize();
+ if (max_thread_queue_size) {
+ return max_thread_queue_size;
+ }
+ return 4;
+}
+
void Dhcpv4Srv::discardPackets() {
// Clear any packets held by the callhout handle store and
// all parked packets
#include <asiolink/io_service.h>
#include <dhcp/dhcp4.h>
-#include <dhcp/pkt4.h>
#include <dhcp/option.h>
#include <dhcp/option_string.h>
#include <dhcp/option4_client_fqdn.h>
#include <dhcp/option_custom.h>
+#include <dhcp/pkt4.h>
#include <dhcp_ddns/ncr_msg.h>
#include <dhcpsrv/alloc_engine.h>
+#include <dhcpsrv/callout_handle_store.h>
#include <dhcpsrv/cb_ctl_dhcp4.h>
#include <dhcpsrv/cfg_option.h>
-#include <dhcpsrv/callout_handle_store.h>
#include <dhcpsrv/d2_client_mgr.h>
#include <dhcpsrv/network_state.h>
#include <dhcpsrv/subnet.h>
#include <hooks/callout_handle.h>
#include <process/daemon.h>
-
-#include <boost/noncopyable.hpp>
+#include <util/thread_pool.h>
#include <functional>
#include <iostream>
/// @brief Pointer to the allocation engine used by the server.
AllocEnginePtr alloc_engine_;
+
/// @brief Pointer to the DHCPv4 message sent by the client.
Pkt4Ptr query_;
+
/// @brief Pointer to the DHCPv4 message to be sent to the client.
Pkt4Ptr resp_;
+
/// @brief Context for use with allocation engine.
AllocEngine::ClientContext4Ptr context_;
+
/// @brief Configured option list.
/// @note The configured option list is an *ordered* list of
/// @c CfgOption objects used to append options to the response.
/// @brief Destructor. Used during DHCPv4 service shutdown.
virtual ~Dhcpv4Srv();
- /// @brief Checks if the server is running in a test mode.
+ /// @brief Checks if the server is running in unit test mode.
///
- /// @return true if the server is running in the test mode,
+ /// @return true if the server is running in unit test mode,
/// false otherwise.
bool inTestMode() const {
return (server_port_ == 0);
/// redeclaration/redefinition. @ref isc::process::Daemon::getVersion()
static std::string getVersion(bool extended);
+ /// @brief returns Kea DHCPv4 server thread count.
+ static uint32_t threadCount();
+
+ /// @brief returns Kea DHCPv4 server max thread queue size.
+ static uint32_t maxThreadQueueSize();
+
/// @brief Main server processing loop.
///
/// Main server processing loop. Call the processing step routine
/// a response.
void run_one();
+ /// @brief Process a single incoming DHCPv4 packet and sends the response.
+ ///
+ /// It verifies correctness of the passed packet, call per-type processXXX
+ /// methods, generates appropriate answer, sends the answer to the client.
+ ///
+ /// @param query A pointer to the packet to be processed.
+ /// @param rsp A pointer to the response
+ void processPacketAndSendResponse(Pkt4Ptr& query, Pkt4Ptr& rsp);
+
+ /// @brief Process a single incoming DHCPv4 packet and sends the response.
+ ///
+ /// It verifies correctness of the passed packet, call per-type processXXX
+ /// methods, generates appropriate answer, sends the answer to the client.
+ ///
+ /// @param query A pointer to the packet to be processed.
+ /// @param rsp A pointer to the response
+ void processPacketAndSendResponseNoThrow(Pkt4Ptr& query, Pkt4Ptr& rsp);
+
/// @brief Process a single incoming DHCPv4 packet.
///
/// It verifies correctness of the passed packet, call per-type processXXX
NameChangeSender::Result result,
dhcp_ddns::NameChangeRequestPtr& ncr);
- /// @brief Discard all in-progress packets
+ /// @brief Discards cached and parked packets
+ /// Clears the call_handle store and packet parking lots
+ /// of all packets. Called during reconfigure and shutdown.
void discardPackets();
protected:
bool& drop,
bool sanity_only = false) const;
- /// indicates if shutdown is in progress. Setting it to true will
- /// initiate server shutdown procedure.
- volatile bool shutdown_;
-
/// @brief dummy wrapper around IfaceMgr::receive4
///
/// This method is useful for testing purposes, where its replacement
void processPacketBufferSend(hooks::CalloutHandlePtr& callout_handle,
Pkt4Ptr& rsp);
- /// @brief Allocation Engine.
- /// Pointer to the allocation engine that we are currently using
- /// It must be a pointer, because we will support changing engines
- /// during normal operation (e.g. to use different allocators)
- boost::shared_ptr<AllocEngine> alloc_engine_;
private:
/// @return Option that contains netmask information
static OptionPtr getNetmaskOption(const Subnet4Ptr& subnet);
- /// Should broadcast be enabled on sockets (if true).
- bool use_bcast_;
-
protected:
/// UDP port number on which server listens.
uint16_t server_port_;
- /// UDP port number to which server sends responses.
+ /// UDP port number to which server sends all responses.
uint16_t client_port_;
+ /// Indicates if shutdown is in progress. Setting it to true will
+ /// initiate server shutdown procedure.
+ volatile bool shutdown_;
+
+ /// @brief Allocation Engine.
+ /// Pointer to the allocation engine that we are currently using
+ /// It must be a pointer, because we will support changing engines
+ /// during normal operation (e.g. to use different allocators)
+ boost::shared_ptr<AllocEngine> alloc_engine_;
+
+ /// Should broadcast be enabled on sockets (if true).
+ bool use_bcast_;
+
/// @brief Holds information about disabled DHCP service and/or
/// disabled subnet/network scopes.
NetworkStatePtr network_state_;
/// @brief Controls access to the configuration backends.
CBControlDHCPv4Ptr cb_control_;
+ /// @brief Packet processing thread pool
+ isc::util::ThreadPool<std::function<void()>> pkt_thread_pool_;
+
public:
/// Class methods for DHCPv4-over-DHCPv6 handler
static int getHookIndexLease4Decline();
};
-}; // namespace isc::dhcp
-}; // namespace isc
+} // namespace dhcp
+} // namespace isc
#endif // DHCP4_SRV_H
#include <config.h>
#include <kea_version.h>
+#include <cfgrpt/config_report.h>
#include <dhcp4/ctrl_dhcp4_srv.h>
#include <dhcp4/dhcp4_log.h>
#include <dhcp4/parser_context.h>
#include <dhcp4/json_config_parser.h>
-#include <cc/command_interpreter.h>
#include <dhcpsrv/cfgmgr.h>
+#include <exceptions/exceptions.h>
#include <log/logger_support.h>
#include <log/logger_manager.h>
-#include <cfgrpt/config_report.h>
+#include <process/daemon.h>
#include <boost/lexical_cast.hpp>
/// instantiates ControlledDhcpv4Srv class that is responsible for establishing
/// connection with msgq (receiving commands and configuration) and also
/// creating Dhcpv4 server object as well.
+///
+/// For detailed explanation or relations between main(), ControlledDhcpv4Srv,
+/// Dhcpv4Srv and other classes, see \ref dhcpv4Session.
namespace {
<< "(useful for testing only)" << endl;
cerr << " -P number: specify non-standard client port number 1-65535 "
<< "(useful for testing only)" << endl;
+ cerr << " -N number: specify thread count 0-65535 "
+ << "(0 means multi-threading disabled)" << endl;
exit(EXIT_FAILURE);
}
-} // end of anonymous namespace
+} // namespace
int
main(int argc, char* argv[]) {
int server_port_number = DHCP4_SERVER_PORT;
// Not zero values are useful for testing only.
int client_port_number = 0;
+ // Number of threads. 0 means multi-threading disabled
+ int thread_count = 0;
bool verbose_mode = false; // Should server be verbose?
bool check_mode = false; // Check syntax
config_file = optarg;
break;
- case 'p':
+ case 'p': // server port number
try {
server_port_number = boost::lexical_cast<int>(optarg);
} catch (const boost::bad_lexical_cast &) {
}
break;
- case 'P':
+ case 'P': // client port number
try {
client_port_number = boost::lexical_cast<int>(optarg);
} catch (const boost::bad_lexical_cast &) {
}
break;
+ case 'N': // number of threads
+ try {
+ thread_count = boost::lexical_cast<int>(optarg);
+ } catch (const boost::bad_lexical_cast &) {
+ cerr << "Failed to parse thread count number: [" << optarg
+ << "], 0-65535 allowed." << endl;
+ usage();
+ }
+ if (thread_count < 0 || thread_count > 65535) {
+ cerr << "Failed to parse thread count number: [" << optarg
+ << "], 0-65535 allowed." << endl;
+ usage();
+ }
+ break;
+
default:
usage();
}
usage();
}
-
// Configuration file is required.
if (config_file.empty()) {
cerr << "Configuration file not specified." << endl;
if (check_mode) {
try {
-
// We need to initialize logging, in case any error messages are to be printed.
// This is just a test, so we don't care about lockfile.
setenv("KEA_LOCKFILE_DIR", "none", 0);
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
#include <config.h>
-#include <cc/data.h>
+
#include <cc/command_interpreter.h>
+#include <cc/data.h>
+#include <cfgrpt/config_report.h>
#include <config/command_mgr.h>
#include <dhcp/libdhcp++.h>
-#include <dhcpsrv/cfgmgr.h>
-#include <dhcpsrv/cfg_db_access.h>
#include <dhcp6/ctrl_dhcp6_srv.h>
#include <dhcp6/dhcp6_log.h>
#include <dhcp6/dhcp6to4_ipc.h>
#include <dhcp6/json_config_parser.h>
#include <dhcp6/parser_context.h>
+#include <dhcpsrv/cfg_db_access.h>
+#include <dhcpsrv/cfgmgr.h>
+#include <dhcpsrv/db_type.h>
#include <hooks/hooks.h>
#include <hooks/hooks_manager.h>
#include <stats/stats_mgr.h>
-#include <cfgrpt/config_report.h>
#include <signal.h>
+
#include <sstream>
using namespace isc::config;
-using namespace isc::db;
using namespace isc::data;
+using namespace isc::db;
using namespace isc::dhcp;
using namespace isc::hooks;
using namespace isc::stats;
+using namespace isc::util;
using namespace std;
namespace {
// configuration from a JSON file.
isc::data::ConstElementPtr json;
- isc::data::ConstElementPtr dhcp6;
- isc::data::ConstElementPtr logger;
isc::data::ConstElementPtr result;
// Basic sanity check: file name must not be empty.
return (result);
}
-
void
ControlledDhcpv6Srv::init(const std::string& file_name) {
// Keep the call timestamp.
// Configure the server using JSON file.
ConstElementPtr result = loadConfigFile(file_name);
+
int rcode;
ConstElementPtr comment = isc::config::parseAnswer(rcode, result);
if (rcode != 0) {
// Nothing to do here. No need to disconnect from anything.
}
-
ConstElementPtr
ControlledDhcpv6Srv::commandShutdownHandler(const string&, ConstElementPtr) {
- if (ControlledDhcpv6Srv::server_) {
- ControlledDhcpv6Srv::server_->shutdown();
+ if (ControlledDhcpv6Srv::getInstance()) {
+ ControlledDhcpv6Srv::getInstance()->shutdown();
} else {
LOG_WARN(dhcp6_logger, DHCP6_NOT_RUNNING);
ConstElementPtr answer = isc::config::createAnswer(1, "Shutdown failure.");
isc::config::parseAnswer(rcode, result);
if (rcode == CONTROL_RESULT_SUCCESS) {
CfgMgr::instance().getStagingCfg()->applyLoggingCfg();
+
// Use new configuration.
CfgMgr::instance().commit();
} else {
return (no_srv);
}
+ if (Dhcpv6Srv::threadCount()) {
+ if (srv->pkt_thread_pool_.size()) {
+ srv->pkt_thread_pool_.stop();
+ }
+ MultiThreadingMgr::instance().setMode(true);
+ srv->pkt_thread_pool_.start(Dhcpv6Srv::threadCount());
+ } else {
+ MultiThreadingMgr::instance().setMode(false);
+ }
+
try {
if (command == "shutdown") {
return (srv->commandShutdownHandler(command, args));
uint16_t client_port)
: Dhcpv6Srv(server_port, client_port), io_service_(),
timer_mgr_(TimerMgr::instance()) {
- if (server_) {
+ if (getInstance()) {
isc_throw(InvalidOperation,
"There is another Dhcpv6Srv instance already.");
}
- server_ = this; // remember this instance for use in callback
+ server_ = this; // remember this instance for later use in handlers
// TimerMgr uses IO service to run asynchronous timers.
TimerMgr::instance()->setIOService(getIOService());
void ControlledDhcpv6Srv::sessionReader(void) {
// Process one asio event. If there are more events, iface_mgr will call
// this callback more than once.
- if (server_) {
- server_->io_service_.run_one();
+ if (getInstance()) {
+ getInstance()->io_service_.run_one();
}
}
if (reopened) {
// Cancel the timer.
if (TimerMgr::instance()->isTimerRegistered("Dhcp6DbReconnectTimer")) {
- TimerMgr::instance()->cancel("Dhcp6DbReconnectTimer"); }
+ TimerMgr::instance()->cancel("Dhcp6DbReconnectTimer");
+ }
// Set network state to service enabled
network_state_->enableService();
- // Toss the reconnct control, we're done with it
+ // Toss the reconnect control, we're done with it
db_reconnect_ctl.reset();
} else {
if (!db_reconnect_ctl->checkRetries()) {
if (!TimerMgr::instance()->isTimerRegistered("Dhcp6DbReconnectTimer")) {
TimerMgr::instance()->registerTimer("Dhcp6DbReconnectTimer",
boost::bind(&ControlledDhcpv6Srv::dbReconnect, this,
- db_reconnect_ctl),
+ db_reconnect_ctl),
db_reconnect_ctl->retryInterval(),
asiolink::IntervalTimer::ONE_SHOT);
}
}
}
-}; // end of isc::dhcp namespace
-}; // end of isc namespace
+} // namespace dhcp
+} // namespace isc
/// @brief Initiates shutdown procedure for the whole DHCPv6 server.
void shutdown();
- /// @brief command processor
+ /// @brief Command processor
///
/// This method is uniform for all config backends. It processes received
/// command (as a string + JSON arguments). Internally, it's just a
/// Currently supported commands are:
/// - config-reload
/// - config-test
- /// - leases-reclaim
- /// - libreload
/// - shutdown
+ /// - libreload
+ /// - leases-reclaim
/// ...
///
/// @note It never throws.
static isc::data::ConstElementPtr
processCommand(const std::string& command, isc::data::ConstElementPtr args);
- /// @brief configuration processor
+ /// @brief Configuration processor
///
/// This is a method for handling incoming configuration updates.
/// This method should be called by all configuration backends when the
isc::data::ConstElementPtr
checkConfig(isc::data::ConstElementPtr new_config);
- /// @brief returns pointer to the sole instance of Dhcpv6Srv
+ /// @brief Returns pointer to the sole instance of Dhcpv6Srv
///
/// @return server instance (may return NULL, if called before server is spawned)
static ControlledDhcpv6Srv* getInstance() {
/// (that was sent from some yet unspecified sender).
static void sessionReader(void);
- /// @brief handler for processing 'shutdown' command
+ /// @brief Handler for processing 'shutdown' command
///
/// This handler processes shutdown command, which initializes shutdown
/// procedure.
commandShutdownHandler(const std::string& command,
isc::data::ConstElementPtr args);
- /// @brief handler for processing 'libreload' command
+ /// @brief Handler for processing 'libreload' command
///
/// This handler processes libreload command, which unloads all hook
/// libraries and reloads them.
commandLibReloadHandler(const std::string& command,
isc::data::ConstElementPtr args);
- /// @brief handler for processing 'config-reload' command
+ /// @brief Handler for processing 'config-reload' command
///
/// This handler processes config-reload command, which processes
/// configuration specified in args parameter.
const bool remove_lease,
const uint16_t max_unwarned_cycles);
-
/// @brief Deletes reclaimed leases and reschedules the timer.
///
/// This is a wrapper method for @c AllocEngine::deleteExpiredReclaimed6.
///
/// If the maximum number of retries has been exhausted an error is logged
/// and the server shuts down.
+ ///
/// @param db_reconnect_ctl pointer to the ReconnectCtl containing the
/// configured reconnect parameters
///
///
/// @param db_reconnect_ctl pointer to the ReconnectCtl containing the
/// configured reconnect parameters
+ ///
+ /// @return false if reconnect is not configured, true otherwise
bool dbLostCallback(db::ReconnectCtlPtr db_reconnect_ctl);
/// @brief Callback invoked periodically to fetch configuration updates
TimerMgrPtr timer_mgr_;
};
-}; // namespace isc::dhcp
-}; // namespace isc
+} // namespace dhcp
+} // namespace isc
#endif
#include <hooks/hooks_log.h>
#include <hooks/hooks_manager.h>
#include <stats/stats_mgr.h>
-
#include <util/encode/hex.h>
#include <util/io_utilities.h>
#include <util/pointer_util.h>
}
}
+ // destroying the thread pool
+ if (Dhcpv6Srv::threadCount()) {
+ pkt_thread_pool_.reset();
+ }
+
return (true);
}
Pkt6Ptr rsp;
try {
+
+ // Do not read more packets from socket if there are enough
+ // packets to be processed in the packet thread pool queue
+ const int max_queued_pkt_per_thread = Dhcpv6Srv::maxThreadQueueSize();
+ const auto queue_full_wait = std::chrono::milliseconds(1);
+ size_t pkt_queue_size = pkt_thread_pool_.count();
+ if (pkt_queue_size >= Dhcpv6Srv::threadCount() *
+ max_queued_pkt_per_thread) {
+ return;
+ }
+
// Set select() timeout to 1s. This value should not be modified
// because it is important that the select() returns control
// frequently so as the IOService can be polled for ready handlers.
.arg(query->getLabel());
return;
} else {
- processPacket(query, rsp);
+ if (Dhcpv6Srv::threadCount()) {
+ typedef function<void()> CallBack;
+ boost::shared_ptr<CallBack> call_back =
+ boost::make_shared<CallBack>(std::bind(&Dhcpv6Srv::processPacketAndSendResponseNoThrow,
+ this, query, rsp));
+ pkt_thread_pool_.add(call_back);
+ } else {
+ processPacketAndSendResponse(query, rsp);
+ }
+ }
+}
+
+void
+Dhcpv6Srv::processPacketAndSendResponseNoThrow(Pkt6Ptr& query, Pkt6Ptr& rsp) {
+ try {
+ processPacketAndSendResponse(query, rsp);
+ } catch (const std::exception& e) {
+ LOG_ERROR(packet6_logger, DHCP6_PACKET_PROCESS_STD_EXCEPTION)
+ .arg(e.what());
+ } catch (...) {
+ LOG_ERROR(packet6_logger, DHCP6_PACKET_PROCESS_EXCEPTION);
}
+}
+void
+Dhcpv6Srv::processPacketAndSendResponse(Pkt6Ptr& query, Pkt6Ptr& rsp) {
+ processPacket(query, rsp);
if (!rsp) {
return;
}
return (false);
}
+uint32_t Dhcpv6Srv::threadCount() {
+ uint32_t sys_threads = CfgMgr::instance().getCurrentCfg()->getServerThreadCount();
+ if (sys_threads) {
+ return sys_threads;
+ }
+ sys_threads = std::thread::hardware_concurrency();
+ return sys_threads * 0;
+}
+
+uint32_t Dhcpv6Srv::maxThreadQueueSize() {
+ uint32_t max_thread_queue_size = CfgMgr::instance().getCurrentCfg()->getServerMaxThreadQueueSize();
+ if (max_thread_queue_size) {
+ return max_thread_queue_size;
+ }
+ return 4;
+}
+
void Dhcpv6Srv::discardPackets() {
// Dump all of our current packets, anything that is mid-stream
isc::dhcp::Pkt6Ptr pkt6ptr_empty;
#define DHCPV6_SRV_H
#include <asiolink/io_service.h>
-#include <dhcp_ddns/ncr_msg.h>
#include <dhcp/dhcp6.h>
#include <dhcp/duid.h>
#include <dhcp/option.h>
+#include <dhcp/option_string.h>
#include <dhcp/option6_client_fqdn.h>
#include <dhcp/option6_ia.h>
+#include <dhcp/option_custom.h>
#include <dhcp/option_definition.h>
+#include <dhcp_ddns/ncr_msg.h>
#include <dhcp/pkt6.h>
#include <dhcpsrv/alloc_engine.h>
#include <dhcpsrv/callout_handle_store.h>
#include <dhcpsrv/subnet.h>
#include <hooks/callout_handle.h>
#include <process/daemon.h>
+#include <util/thread_pool.h>
#include <functional>
#include <iostream>
/// @brief DHCPv6 server service.
///
-/// This class represents DHCPv6 server. It contains all
+/// This singleton class represents DHCPv6 server. It contains all
/// top-level methods and routines necessary for server operation.
/// In particular, it instantiates IfaceMgr, loads or generates DUID
/// that is going to be used as server-identifier, receives incoming
/// packets, processes them, manages leases assignment and generates
/// appropriate responses.
+///
+/// This class does not support any controlling mechanisms directly.
+/// See the derived \ref ControlledDhcpv6Srv class for support for
+/// command and configuration updates over msgq.
class Dhcpv6Srv : public process::Daemon {
private:
/// Instantiates necessary services, required to run DHCPv6 server.
/// In particular, creates IfaceMgr that will be responsible for
/// network interaction. Will instantiate lease manager, and load
- /// old or create new DUID.
+ /// old or create new DUID. It is possible to specify alternate
+ /// port on which DHCPv6 server will listen on and alternate port
+ /// where DHCPv6 server sends all responses to. Those are mostly useful
+ /// for testing purposes.
///
- /// @param server_port port on which all sockets will listen
- /// @param client_port port to which all responses will be sent
+ /// @param server_port specifies port number to listen on
+ /// @param client_port specifies port number to send to
Dhcpv6Srv(uint16_t server_port = DHCP6_SERVER_PORT,
uint16_t client_port = 0);
/// redeclaration/redefinition. @ref isc::process::Daemon::getVersion()
static std::string getVersion(bool extended);
+ /// @brief returns Kea DHCPv6 server thread count.
+ static uint32_t threadCount();
+
+ /// @brief returns Kea DHCPv6 server max thread queue size.
+ static uint32_t maxThreadQueueSize();
+
/// @brief Returns server-identifier option.
///
/// @return server-id option
/// a response.
void run_one();
+ /// @brief Process a single incoming DHCPv6 packet and sends the response.
+ ///
+ /// It verifies correctness of the passed packet, call per-type processXXX
+ /// methods, generates appropriate answer, sends the answer to the client.
+ ///
+ /// @param query A pointer to the packet to be processed.
+ /// @param rsp A pointer to the response
+ void processPacketAndSendResponse(Pkt6Ptr& query, Pkt6Ptr& rsp);
+
+ /// @brief Process a single incoming DHCPv6 packet and sends the response.
+ ///
+ /// It verifies correctness of the passed packet, call per-type processXXX
+ /// methods, generates appropriate answer, sends the answer to the client.
+ ///
+ /// @param query A pointer to the packet to be processed.
+ /// @param rsp A pointer to the response
+ void processPacketAndSendResponseNoThrow(Pkt6Ptr& query, Pkt6Ptr& rsp);
+
/// @brief Process a single incoming DHCPv6 packet.
///
/// It verifies correctness of the passed packet, call per-type processXXX
/// @brief Instructs the server to shut down.
void shutdown();
+ ///
+ /// @name Public accessors returning values required to (re)open sockets.
+ ///
+ //@{
+ ///
/// @brief Get UDP port on which server should listen.
///
- /// Typically, server listens on UDP port 547. Other ports are only
- /// used for testing purposes.
+ /// Typically, server listens on UDP port number 547. Other ports are used
+ /// for testing purposes only.
///
/// @return UDP port on which server should listen.
uint16_t getServerPort() const {
return (server_port_);
}
+ //@}
/// @brief Starts DHCP_DDNS client IO if DDNS updates are enabled.
///
/// @brief Controls access to the configuration backends.
CBControlDHCPv6Ptr cb_control_;
+
+ /// @brief Packet processing thread pool
+ isc::util::ThreadPool<std::function<void()>> pkt_thread_pool_;
};
-}; // namespace isc::dhcp
-}; // namespace isc
+} // namespace dhcp
+} // namespace isc
#endif // DHCP6_SRV_H
#include <config.h>
#include <kea_version.h>
+#include <cfgrpt/config_report.h>
#include <dhcp6/ctrl_dhcp6_srv.h>
#include <dhcp6/dhcp6_log.h>
#include <dhcp6/parser_context.h>
#include <dhcp6/json_config_parser.h>
#include <dhcpsrv/cfgmgr.h>
+#include <exceptions/exceptions.h>
#include <log/logger_support.h>
#include <log/logger_manager.h>
-#include <exceptions/exceptions.h>
-#include <cfgrpt/config_report.h>
#include <process/daemon.h>
#include <boost/lexical_cast.hpp>
/// Dhcpv6Srv and other classes, see \ref dhcpv6Session.
namespace {
-const char* const DHCP6_NAME = "kea-dhcp6";
-const char* const DHCP6_LOGGER_NAME = "kea-dhcp6";
+const char* const DHCP6_NAME = "kea-dhcp6";
/// @brief Prints Kea Usage and exits
///
<< "(useful for testing only)" << endl;
cerr << " -P number: specify non-standard client port number 1-65535 "
<< "(useful for testing only)" << endl;
+ cerr << " -N number: specify thread count 0-65535 "
+ << "(0 means multi-threading disabled)" << endl;
exit(EXIT_FAILURE);
}
-} // end of anonymous namespace
+} // namespace
int
main(int argc, char* argv[]) {
int server_port_number = DHCP6_SERVER_PORT;
// Not zero values are useful for testing only.
int client_port_number = 0;
+ // Number of threads. 0 means multi-threading disabled
+ int thread_count = 0;
bool verbose_mode = false; // Should server be verbose?
bool check_mode = false; // Check syntax
}
break;
+ case 'N': // number of threads
+ try {
+ thread_count = boost::lexical_cast<int>(optarg);
+ } catch (const boost::bad_lexical_cast &) {
+ cerr << "Failed to parse thread count number: [" << optarg
+ << "], 0-65535 allowed." << endl;
+ usage();
+ }
+ if (thread_count < 0 || thread_count > 65535) {
+ cerr << "Failed to parse thread count number: [" << optarg
+ << "], 0-65535 allowed." << endl;
+ usage();
+ }
+ break;
+
default:
usage();
}
cerr << "Error encountered: " << answer->stringValue() << endl;
return (EXIT_FAILURE);
}
-
-
- return (EXIT_SUCCESS);
} catch (const std::exception& ex) {
- cerr << "Syntax check failed with " << ex.what() << endl;
+ cerr << "Syntax check failed with: " << ex.what() << endl;
}
return (EXIT_FAILURE);
}
// It is important that we set a default logger name because this name
// will be used when the user doesn't provide the logging configuration
// in the Kea configuration file.
- Daemon::setDefaultLoggerName(DHCP6_LOGGER_NAME);
+ Daemon::setDefaultLoggerName(DHCP6_ROOT_LOGGER_NAME);
// Initialize logging. If verbose, we'll use maximum verbosity.
- Daemon::loggerInit(DHCP6_LOGGER_NAME, verbose_mode);
-
+ Daemon::loggerInit(DHCP6_ROOT_LOGGER_NAME, verbose_mode);
LOG_DEBUG(dhcp6_logger, DBG_DHCP6_START, DHCP6_START_INFO)
.arg(getpid())
.arg(server_port_number)
// Remember verbose-mode
server.setVerbose(verbose_mode);
- // Create our PID file
+ // Create our PID file.
server.setProcName(DHCP6_NAME);
server.setConfigFile(config_file);
server.createPIDFile();
try {
- // Initialize the server, e.g. establish control session
- // Read a configuration file
+ // Initialize the server.
server.init(config_file);
-
} catch (const std::exception& ex) {
try {
LOG_ERROR(dhcp6_logger, DHCP6_INIT_FAIL).arg(ex.what());
} catch (...) {
// The exception thrown during the initialization could
- // originate from logger subsystem. Therefore LOG_ERROR() may
- // fail as well.
+ // originate from logger subsystem. Therefore LOG_ERROR()
+ // may fail as well.
cerr << "Failed to initialize server: " << ex.what() << endl;
}
}
ret = EXIT_FAILURE;
} catch (const std::exception& ex) {
-
// First, we print the error on stderr (that should always work)
cerr << DHCP6_NAME << "Fatal error during start up: " << ex.what()
<< endl;
cfg_host_operations6_(CfgHostOperations::createConfig6()),
class_dictionary_(new ClientClassDictionary()),
decline_timer_(0), echo_v4_client_id_(true), dhcp4o6_port_(0),
+ server_threads_(0),
+ server_max_thread_queue_size_(0),
d2_client_config_(new D2ClientConfig()),
configured_globals_(Element::createMap()),
cfg_consist_(new CfgConsistency()) {
cfg_host_operations6_(CfgHostOperations::createConfig6()),
class_dictionary_(new ClientClassDictionary()),
decline_timer_(0), echo_v4_client_id_(true), dhcp4o6_port_(0),
+ server_threads_(0),
+ server_max_thread_queue_size_(0),
d2_client_config_(new D2ClientConfig()),
configured_globals_(Element::createMap()),
cfg_consist_(new CfgConsistency()) {
void
SrvConfig::removeStatistics() {
-
// Removes statistics for v4 and v6 subnets
getCfgSubnets4()->removeStatistics();
return (dhcp4o6_port_);
}
+ /// @brief Sets the server thread count.
+ ///
+ /// @param threads value of the server thread count
+ void setServerThreadCount(uint32_t threads) {
+ server_threads_ = threads;
+ }
+
+ /// @brief Retrieves the server thread count.
+ ///
+ /// @return value of the server thread count
+ uint32_t getServerThreadCount() const {
+ return (server_threads_);
+ }
+
+ /// @brief Sets the server max thread queue size.
+ ///
+ /// @param size max thread queue size
+ void setServerMaxThreadQueueSize(uint32_t size) {
+ server_max_thread_queue_size_ = size;
+ }
+
+ /// @brief Retrieves the server max thread queue size.
+ ///
+ /// @return value of the max thread queue size
+ uint32_t getServerMaxThreadQueueSize() const {
+ return (server_max_thread_queue_size_);
+ }
+
/// @brief Returns pointer to the D2 client configuration
D2ClientConfigPtr getD2ClientConfig() {
return (d2_client_config_);
/// this socket is bound and connected to this port and port + 1
uint16_t dhcp4o6_port_;
+ /// @brief The server thread count.
+ uint32_t server_threads_;
+
+ /// @brief The server max thread queue size.
+ uint32_t server_max_thread_queue_size_;
+
/// @brief Stores D2 client configuration
D2ClientConfigPtr d2_client_config_;
typedef boost::shared_ptr<const SrvConfig> ConstSrvConfigPtr;
//@}
-} // namespace isc::dhcp
-} // namespace isc
+} // namespace dhcp
+} // namespace isc
#endif // DHCPSRV_CONFIG_H