server_ = this; // remember this instance for later use in handlers
TimerMgr::instance()->setIOService(getIOService());
+ CommandMgr::instance().setIOService(getIOService());
// These are the commands always supported by the DHCPv4 server.
// Please keep the list in alphabetic order.
#include <config.h>
+#include <asiolink/io_service.h>
#include <cc/command_interpreter.h>
#include <config/command_mgr.h>
#include <dhcp/dhcp4.h>
#include <testutils/io_utils.h>
#include <testutils/unix_control_client.h>
+#include <util/threads/thread.h>
+
#include "marker_file.h"
#include "test_libraries.h"
~CtrlChannelDhcpv4SrvTest() {
LeaseMgrFactory::destroy();
StatsMgr::instance().removeAll();
+
+ CommandMgr::instance().closeCommandSocket();
+ if (getIOService()) {
+ getIOService()->stopWork();
+ getIOService()->poll();
+ }
+
server_.reset();
- reset();
};
+ /// @brief Returns pointer to the server's IO service.
+ IOServicePtr getIOService() {
+ return (server_ ? server_->getIOService() : IOServicePtr());
+ }
+
void createUnixChannelServer() {
::remove(socket_path_.c_str());
// make the actual configuration text.
std::string config_txt = header + socket_path_ + footer;
- ASSERT_NO_THROW(server_.reset(new NakedControlledDhcpv4Srv()));
+ server_.reset(new NakedControlledDhcpv4Srv());
+ CommandMgr::instance().setIOService(getIOService());
ConstElementPtr config;
ASSERT_NO_THROW(config = parseDHCP4(config_txt));
// detect the control socket connect and call the accept handler
ASSERT_TRUE(client->connectToServer(socket_path_));
ASSERT_NO_THROW(server_->receivePacket(0));
+ ASSERT_NO_THROW(getIOService()->run_one());
// Send the command and then call server's receivePacket() so it can
// detect the inbound data and call the read handler
ASSERT_TRUE(client->sendCommand(command));
ASSERT_NO_THROW(server_->receivePacket(0));
+ ASSERT_NO_THROW(getIOService()->run_one());
// Read the response generated by the server. Note that getResponse
// only fails if there an IO error or no response data was present.
// Now disconnect and process the close event
client->disconnectFromServer();
ASSERT_NO_THROW(server_->receivePacket(0));
+// ASSERT_NO_THROW(getIOService()->run_one());
+
+ ASSERT_NO_THROW(getIOService()->poll());
}
/// @brief Checks response for list-commands
ASSERT_NO_THROW(
server_.reset(new NakedControlledDhcpv4Srv());
+ CommandMgr::instance().setIOService(getIOService());
);
// Use empty parameters list
// Created server should register several additional commands.
ASSERT_NO_THROW(
server_.reset(new NakedControlledDhcpv4Srv());
+ CommandMgr::instance().setIOService(getIOService());
);
EXPECT_NO_THROW(answer = CommandMgr::instance().processCommand(list_cmds));
EXPECT_EQ("{ \"result\": 0, \"text\": \"Configuration successful.\" }",
response);
- // Check that the config was indeed applied.
- const Subnet4Collection* subnets =
+ /// Check that the config was indeed applied.
+ /* const Subnet4Collection* subnets =
CfgMgr::instance().getCurrentCfg()->getCfgSubnets4()->getAll();
EXPECT_EQ(1, subnets->size());
EXPECT_EQ(2, subnets->size());
// Clean up after the test.
- CfgMgr::instance().clear();
+ CfgMgr::instance().clear(); */
}
// Tests that the server properly responds to shtudown command sent
--- /dev/null
+// Copyright (C) 2017 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef IO_ACCEPTOR_H
+#define IO_ACCEPTOR_H
+
+#ifndef BOOST_ASIO_HPP
+#error "asio.hpp must be included before including this, see asiolink.h as to why"
+#endif
+
+#include <asiolink/io_service.h>
+#include <asiolink/io_socket.h>
+
+namespace isc {
+namespace asiolink {
+
+template<typename ProtocolType, typename CallbackType>
+class IOAcceptor : public IOSocket {
+public:
+
+ /// @brief Constructor.
+ ///
+ /// @param io_service IO service.
+ explicit IOAcceptor(IOService& io_service)
+ : IOSocket(),
+ acceptor_(new typename ProtocolType::acceptor(io_service.get_io_service())) {
+ }
+
+ /// @brief Destructor.
+ virtual ~IOAcceptor() { }
+
+ /// @brief Returns file descriptor of the underlying socket.
+ virtual int getNative() const {
+ return (acceptor_->native());
+ }
+
+ /// @brief Opens acceptor socket given the endpoint.
+ ///
+ /// @param endpoint Reference to the endpoint object which specifies the
+ /// address and port on which the acceptor service will run.
+ template<typename EndpointType>
+ void open(const EndpointType& endpoint) {
+ acceptor_->open(endpoint.getASIOEndpoint().protocol());
+ }
+
+ /// @brief Binds socket to an endpoint.
+ ///
+ /// @param endpoint Reference to an endpoint to which the socket is to
+ /// be bound.
+ template<typename EndpointType>
+ void bind(const EndpointType& endpoint) {
+ acceptor_->bind(endpoint.getASIOEndpoint());
+ }
+
+ /// @brief Sets socket option.
+ ///
+ /// @param socket_option Reference to the object encapsulating an option to
+ /// be set for the socket.
+ /// @tparam SettableSocketOption Type of the object encapsulating socket option
+ /// being set.
+ template<typename SettableSocketOption>
+ void setOption(const SettableSocketOption& socket_option) {
+ acceptor_->set_option(socket_option);
+ }
+
+ /// @brief Starts listening for the new connections.
+ void listen() {
+ acceptor_->listen();
+ }
+
+ template<template<typename> class SocketType, typename SocketCallback>
+ void asyncAccept(const SocketType<SocketCallback>& socket,
+ const CallbackType& callback) {
+ acceptor_->async_accept(socket.getASIOSocket(), callback);
+ }
+
+ /// @brief Checks if the acceptor is open.
+ ///
+ /// @return true if acceptor is open.
+ bool isOpen() const {
+ return (acceptor_->is_open());
+ }
+
+ /// @brief Closes the acceptor.
+ void close() const {
+ acceptor_->close();
+ }
+
+protected:
+
+ /// @brief Asynchronously accept new connection.
+ ///
+ /// This method accepts new connection into the specified socket. When the
+ /// new connection arrives or an error occurs the specified callback function
+ /// is invoked.
+ ///
+ /// @param socket Socket into which connection should be accepted.
+ /// @param callback Callback function to be invoked when the new connection
+ /// arrives.
+ /// @tparam SocketType
+ template<typename SocketType>
+ void asyncAcceptInternal(const SocketType& socket, const CallbackType& callback) {
+ acceptor_->async_accept(socket.getASIOSocket(), callback);
+ }
+
+
+ /// @brief Underlying ASIO acceptor implementation.
+ boost::shared_ptr<typename ProtocolType::acceptor> acceptor_;
+
+};
+
+
+} // end of namespace asiolink
+} // end of isc
+
+#endif // IO_ACCEPTOR_H
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
-// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+// file obtain one at http://mozilla.org/MPL/2.0/.
#ifndef IO_SOCKET_H
#define IO_SOCKET_H 1
impl_->close();
}
+boost::asio::local::stream_protocol::socket&
+UnixDomainSocket::getASIOSocket() const {
+ return (impl_->socket_);
+}
+
} // end of namespace asiolink
} // end of namespace isc
/// @brief Closes the socket.
void close();
+ /// @brief Returns reference to the underlying ASIO socket.
+ ///
+ /// @return Reference to underlying ASIO socket.
+ virtual boost::asio::local::stream_protocol::socket& getASIOSocket() const;
+
private:
/// @brief Pointer to the implementation of this class.
--- /dev/null
+// Copyright (C) 2017 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef UNIX_DOMAIN_SOCKET_ACCEPTOR_H
+#define UNIX_DOMAIN_SOCKET_ACCEPTOR_H
+
+#ifndef BOOST_ASIO_HPP
+#error "asio.hpp must be included before including this, see asiolink.h as to why"
+#endif
+
+#include <asiolink/io_acceptor.h>
+#include <asiolink/unix_domain_socket.h>
+#include <functional>
+
+namespace isc {
+namespace asiolink {
+
+class UnixDomainSocketAcceptor
+ : public IOAcceptor<boost::asio::local::stream_protocol,
+ std::function<void(const boost::system::error_code&)> > {
+public:
+
+ /// @brief Callback type used in call to @ref UnixDomainSocketAcceptor::asyncAccept.
+ typedef std::function<void(const boost::system::error_code&)> AcceptHandler;
+
+ explicit UnixDomainSocketAcceptor(IOService& io_service)
+ : IOAcceptor<boost::asio::local::stream_protocol,
+ std::function<void(const boost::system::error_code&)> >(io_service) {
+ }
+
+ virtual int getProtocol() const final {
+ return (AF_LOCAL);
+ }
+
+ /// @brief Asynchronously accept new connection.
+ ///
+ /// This method accepts new connection into the specified socket. When the
+ /// new connection arrives or an error occurs the specified callback function
+ /// is invoked.
+ ///
+ /// @param socket Socket into which connection should be accepted.
+ /// @param callback Callback function to be invoked when the new connection
+ /// arrives.
+ /// @tparam SocketType
+ void asyncAccept(const UnixDomainSocket& socket, const AcceptHandler& callback) {
+ asyncAcceptInternal(socket, callback);
+ }
+};
+
+} // end of namespace isc::asiolink
+} // end of namespace isc
+
+#endif // UNIX_DOMAIN_SOCKET_ACCEPTOR_H
--- /dev/null
+// Copyright (C) 2017 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef UNIX_DOMAIN_SOCKET_ENDPOINT_H
+#define UNIX_DOMAIN_SOCKET_ENDPOINT_H
+
+#ifndef BOOST_ASIO_HPP
+#error "asio.hpp must be included before including this, see asiolink.h as to why"
+#endif
+
+#include <string>
+
+namespace isc {
+namespace asiolink {
+
+class UnixDomainSocketEndpoint {
+public:
+
+ explicit UnixDomainSocketEndpoint(const std::string& endpoint_path)
+ : endpoint_(endpoint_path) {
+ }
+
+ const boost::asio::local::stream_protocol::endpoint&
+ getASIOEndpoint() const {
+ return (endpoint_);
+ }
+
+private:
+
+ boost::asio::local::stream_protocol::endpoint endpoint_;
+
+};
+
+}
+}
+
+#endif // UNIX_DOMAIN_SOCKET_ENDPOINT_H
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+#include <asiolink/asio_wrapper.h>
+#include <asiolink/io_service.h>
+#include <asiolink/unix_domain_socket.h>
+#include <asiolink/unix_domain_socket_acceptor.h>
+#include <asiolink/unix_domain_socket_endpoint.h>
#include <config/command_mgr.h>
#include <config/command_socket_factory.h>
#include <cc/data.h>
#include <dhcp/iface_mgr.h>
#include <config/config_log.h>
#include <boost/bind.hpp>
+#include <boost/enable_shared_from_this.hpp>
+#include <array>
#include <unistd.h>
+using namespace isc;
+using namespace isc::asiolink;
+using namespace isc::config;
using namespace isc::data;
+namespace {
+
+class ConnectionPool;
+
+class Connection : public boost::enable_shared_from_this<Connection> {
+public:
+
+ Connection(const boost::shared_ptr<UnixDomainSocket>& socket,
+ ConnectionPool& connection_pool)
+ : socket_(socket), connection_pool_(connection_pool),
+ response_in_progress_(false) {
+ isc::dhcp::IfaceMgr::instance().addExternalSocket(socket_->getNative(),
+ 0);
+
+ }
+
+ void start() {
+ socket_->asyncReceive(&buf_[0], sizeof(buf_),
+ boost::bind(&Connection::receiveHandler,
+ shared_from_this(), _1, _2));
+
+
+ }
+
+ void stop() {
+ if (!response_in_progress_) {
+ socket_->close();
+ }
+ }
+
+ void receiveHandler(const boost::system::error_code& ec,
+ size_t bytes_transferred);
+
+private:
+
+ boost::shared_ptr<UnixDomainSocket> socket_;
+
+ std::array<char, 65535> buf_;
+
+ ConnectionPool& connection_pool_;
+
+ bool response_in_progress_;
+
+};
+
+
+typedef boost::shared_ptr<Connection> ConnectionPtr;
+
+class ConnectionPool {
+public:
+
+ void start(const ConnectionPtr& connection) {
+ connection->start();
+ connections_.insert(connection);
+ }
+
+ void stop(const ConnectionPtr& connection) {
+ connection->stop();
+ connections_.erase(connection);
+ }
+
+ void stopAll() {
+ for (auto conn = connections_.begin(); conn != connections_.end();
+ ++conn) {
+ (*conn)->stop();
+ }
+ connections_.clear();
+ }
+
+private:
+
+ std::set<ConnectionPtr> connections_;
+
+};
+
+
+void
+Connection::receiveHandler(const boost::system::error_code& ec,
+ size_t bytes_transferred) {
+ if (ec) {
+ return;
+ }
+
+ ConstElementPtr cmd, rsp;
+
+ try {
+
+ response_in_progress_ = true;
+
+ // Try to interpret it as JSON.
+ std::string sbuf(&buf_[0], bytes_transferred);
+ cmd = Element::fromJSON(sbuf, true);
+
+ // If successful, then process it as a command.
+ rsp = CommandMgr::instance().processCommand(cmd);
+ } catch (const Exception& ex) {
+ LOG_WARN(command_logger, COMMAND_PROCESS_ERROR1).arg(ex.what());
+ rsp = createAnswer(CONTROL_RESULT_ERROR, std::string(ex.what()));
+ }
+
+ if (!rsp) {
+ response_in_progress_ = false;
+ LOG_WARN(command_logger, COMMAND_RESPONSE_ERROR);
+ return;
+ }
+
+ // Let's convert JSON response to text. Note that at this stage
+ // the rsp pointer is always set.
+ std::string txt = rsp->str();
+ size_t len = txt.length();
+ if (len > 65535) {
+ // Hmm, our response is too large. Let's send the first
+ // 64KB and hope for the best.
+ LOG_ERROR(command_logger, COMMAND_SOCKET_RESPONSE_TOOLARGE).arg(len);
+
+ len = 65535;
+ }
+
+ // Send the data back over socket.
+ socket_->write(txt.c_str(), len);
+
+ response_in_progress_ = false;
+
+
+ isc::dhcp::IfaceMgr::instance().deleteExternalSocket(socket_->getNative());
+
+ connection_pool_.stop(shared_from_this());
+}
+
+}
+
namespace isc {
namespace config {
-CommandMgr::CommandMgr()
- : HookedCommandMgr() {
-}
+class CommandMgrImpl {
+public:
-CommandSocketPtr
-CommandMgr::openCommandSocket(const isc::data::ConstElementPtr& socket_info) {
- if (socket_) {
- isc_throw(SocketError, "There is already a control socket open");
+ CommandMgrImpl()
+ : io_service_(), acceptor_(), socket_(), socket_name_(),
+ connection_pool_() {
}
- socket_ = CommandSocketFactory::create(socket_info);
+ void openCommandSocket(const isc::data::ConstElementPtr& socket_info);
+
+ void doAccept();
- return (socket_);
+ IOServicePtr io_service_;
+
+ boost::shared_ptr<UnixDomainSocketAcceptor> acceptor_;
+
+ boost::shared_ptr<UnixDomainSocket> socket_;
+
+ std::string socket_name_;
+
+ ConnectionPool connection_pool_;
+};
+
+void
+CommandMgrImpl::openCommandSocket(const isc::data::ConstElementPtr& socket_info) {
+ socket_name_.clear();
+
+ ConstElementPtr type = socket_info->get("socket-type");
+ if (!type) {
+ isc_throw(BadSocketInfo, "Mandatory 'socket-type' parameter missing");
+ }
+
+ if (type->stringValue() != "unix") {
+ isc_throw(BadSocketInfo, "Invalid 'socket-type' parameter value "
+ << type->stringValue());
+ }
+
+ // UNIX socket is requested. It takes one parameter: socket-name that
+ // specifies UNIX path of the socket.
+ ConstElementPtr name = socket_info->get("socket-name");
+ if (!name) {
+ isc_throw(BadSocketInfo, "Mandatory 'socket-name' parameter missing");
+ }
+
+ if (name->getType() != Element::string) {
+ isc_throw(BadSocketInfo, "'socket-name' parameter expected to be a string");
+ }
+
+ socket_name_ = name->stringValue();
+
+ acceptor_.reset(new UnixDomainSocketAcceptor(*io_service_));
+ UnixDomainSocketEndpoint endpoint(socket_name_);
+ acceptor_->open(endpoint);
+ acceptor_->bind(endpoint);
+ acceptor_->listen();
+
+ doAccept();
+
+ // Install this socket in Interface Manager.
+ isc::dhcp::IfaceMgr::instance().addExternalSocket(acceptor_->getNative(), 0);
+
+}
+
+void
+CommandMgrImpl::doAccept() {
+ socket_.reset(new UnixDomainSocket(*io_service_));
+ acceptor_->asyncAccept(*socket_,
+ [this](const boost::system::error_code& ec) {
+ if (!ec) {
+ ConnectionPtr connection(new Connection(socket_, connection_pool_));
+ connection_pool_.start(connection);
+ doAccept();
+ }
+ });
+}
+
+CommandMgr::CommandMgr()
+ : HookedCommandMgr(), impl_(new CommandMgrImpl()) {
+}
+
+void
+CommandMgr::openCommandSocket(const isc::data::ConstElementPtr& socket_info) {
+ impl_->openCommandSocket(socket_info);
}
void CommandMgr::closeCommandSocket() {
- // First, let's close the socket for incoming new connections.
- if (socket_) {
- socket_->close();
- socket_.reset();
+ impl_->connection_pool_.stopAll();
+
+ if (impl_->acceptor_) {
+ isc::dhcp::IfaceMgr::instance().deleteExternalSocket(impl_->acceptor_->getNative());
+ impl_->acceptor_->close();
+ static_cast<void>(::remove(impl_->socket_name_.c_str()));
}
- // Now let's close all existing connections that we may have.
+/* // Now let's close all existing connections that we may have.
for (std::list<CommandSocketPtr>::iterator conn = connections_.begin();
conn != connections_.end(); ++conn) {
(*conn)->close();
}
- connections_.clear();
+ connections_.clear(); */
}
return (false);
}
+int
+CommandMgr::getControlSocketFD() {
+ return (impl_->acceptor_ ? impl_->acceptor_->getNative() : -1);
+}
+
+
CommandMgr&
CommandMgr::instance() {
static CommandMgr cmd_mgr;
return (cmd_mgr);
}
+void
+CommandMgr::setIOService(const IOServicePtr& io_service) {
+ closeCommandSocket();
+ impl_->io_service_ = io_service;
+}
+
void
CommandMgr::commandReader(int sockfd) {
#ifndef COMMAND_MGR_H
#define COMMAND_MGR_H
+#include <asiolink/io_service.h>
#include <cc/data.h>
#include <config/hooked_command_mgr.h>
#include <config/command_socket.h>
#include <boost/noncopyable.hpp>
+#include <boost/shared_ptr.hpp>
#include <list>
namespace isc {
namespace config {
+class CommandMgrImpl;
+
/// @brief Commands Manager implementation for the Kea servers.
///
/// This class extends @ref BaseCommandMgr with the ability to receive and
/// @return the only existing instance of the manager
static CommandMgr& instance();
+ /// @brief Sets IO service to be used by the command manager.
+ ///
+ /// @param io_service Pointer to the IO service.
+ void setIOService(const asiolink::IOServicePtr& io_service);
+
/// @brief Opens control socket with parameters specified in socket_info
///
/// Currently supported types are:
///
/// @param socket_info describes control socket parameters
/// @return object representing a socket
- CommandSocketPtr
+ void
openCommandSocket(const isc::data::ConstElementPtr& socket_info);
/// @brief Shuts down any open control sockets
/// @brief Returns control socket descriptor
///
/// This method should be used only in tests.
- int getControlSocketFD() const {
- return (socket_->getFD());
- }
+ int getControlSocketFD();
private:
/// Registers internal 'list-commands' command.
CommandMgr();
+ boost::shared_ptr<CommandMgrImpl> impl_;
+
/// @brief Control socket structure
///
/// This is the socket that accepts incoming connections. There can be at