From: Francis Dupont Date: Sat, 6 Oct 2018 16:13:11 +0000 (+0200) Subject: [153-netconf-fd-watcher] Added file descriptor watcher code from kea-yang X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c83ea1468a7b2552a5405bbff444b0689368a903;p=thirdparty%2Fkea.git [153-netconf-fd-watcher] Added file descriptor watcher code from kea-yang --- diff --git a/src/bin/netconf/Makefile.am b/src/bin/netconf/Makefile.am index 70c29377d2..b77ce7b917 100644 --- a/src/bin/netconf/Makefile.am +++ b/src/bin/netconf/Makefile.am @@ -44,7 +44,8 @@ BUILT_SOURCES = netconf_messages.h netconf_messages.cc noinst_LTLIBRARIES = libnetconf.la -libnetconf_la_SOURCES = netconf_cfg_mgr.cc netconf_cfg_mgr.h +libnetconf_la_SOURCES = fd_watcher.cc fd_watcher.h +libnetconf_la_SOURCES += netconf_cfg_mgr.cc netconf_cfg_mgr.h libnetconf_la_SOURCES += netconf_config.cc netconf_config.h libnetconf_la_SOURCES += netconf_controller.cc netconf_controller.h libnetconf_la_SOURCES += netconf_log.cc netconf_log.h diff --git a/src/bin/netconf/fd_watcher.cc b/src/bin/netconf/fd_watcher.cc new file mode 100644 index 0000000000..783e72d039 --- /dev/null +++ b/src/bin/netconf/fd_watcher.cc @@ -0,0 +1,306 @@ +// Copyright (C) 2018 Internet Systems Consortium, Inc. ("ISC") +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +/// @file fd_watcher.cc +/// Contains the file descriptor watcher. + +#include + +#include + +#include +#include +#include + +#include + +using namespace std; +using namespace isc::util::thread; + +namespace isc { +namespace netconf { + +FdWatcherPtr FdWatcher::fd_watcher_; + +FdWatcher::FdWatcher() { +} + +FdWatcherPtr& +FdWatcher::instance() { + if (!fd_watcher_) { + fd_watcher_.reset(new FdWatcher()); + } + return (fd_watcher_); +} + +FdWatcher::~FdWatcher() { + this->clear(); +} + +int +FdWatcher::init(const asiolink::IOServicePtr& io_service) { + // Remember the IO service. + io_service_ = io_service; + + // If it was already called the initial watcher is in the receiving set. + if (!readFds.empty()) { + return (SR_ERR_OK); + } + + // sr_fd_watcher_init is C code. + int watcher_fd = 0; + // Set a termination handler so closed file descriptors can be flushed. + int ret = sr_fd_watcher_init(&watcher_fd, []() { return; }); + if (ret != SR_ERR_OK) { + return (ret); + } + + // Add the watcher file descriptor to the receiving set. + addFd(watcher_fd, true, false); + + // Create the first polling thread. + thread_.reset(new Thread(threadBody)); + + return (ret); +} + +void +FdWatcher::clear() { + // Clear sets. + writeFds.clear(); + readFds.clear(); + + // Wake up the thread. + if (thread_) { + watch_terminate_.markReady(); + thread_->wait(); + thread_.reset(); + } + + // Clear the terminate watch socket. + watch_terminate_.clearReady(); + + // Clean up sysrepo. + sr_fd_watcher_cleanup(); +} + +void +FdWatcher::addFd(int fd, bool reading, bool writing) { + // Ignore return values. + if (reading) { + readFds.insert(fd); + } + if (writing) { + writeFds.insert(fd); + } +} + +void +FdWatcher::delFd(int fd, bool reading, bool writing) { + if (reading) { + auto it = readFds.find(fd); + if (it != readFds.end()) { + readFds.erase(it); + } + } + if (writing) { + auto it = writeFds.find(fd); + if (it != writeFds.end()) { + writeFds.erase(it); + } + } +} + +void +FdWatcher::postHandler() { + // Get the singleton. + FdWatcherPtr watcher = FdWatcher::instance(); + + // Collect the thread. + if (watcher->thread_) { + watcher->thread_->wait(); + watcher->thread_.reset(); + } + + // Nothing to do if not initialized. + if (watcher->readFds.empty()) { + return; + } + + // Get reading file descriptors. + int maxfd = 0; + fd_set rd_set; + FD_ZERO(&rd_set); + set readFds = watcher->readFds; + for (int fd : readFds) { + FD_SET(fd, &rd_set); + if (fd > maxfd) { + maxfd = fd; + } + } + + // Get writing file descriptors. + fd_set wr_set; + FD_ZERO(&wr_set); + set writeFds = watcher->writeFds; + for (int fd : writeFds) { + FD_SET(fd, &wr_set); + if (fd > maxfd) { + maxfd = fd; + } + } + + // No wait, + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 0; + + // zero out the errno to be safe. + errno = 0; + + int ret = select(maxfd + 1, &rd_set, &wr_set, 0, &tv); + if (ret < 0) { + if (errno == EINTR) { + // Not lucky... Just retry. + watcher->io_service_->post(postHandler); + return; + } else if (errno == EBADF) { + // A file descriptor was closed before being removed. + for (int fd : readFds) { + if (fcntl(fd, F_GETFD) == -1) { + watcher->delFd(fd, true, true); + } + } + for (int fd : writeFds) { + if (fcntl(fd, F_GETFD) == -1) { + watcher->delFd(fd, true, true); + } + } + // Retry. + watcher->io_service_->post(postHandler); + return; + } else { + isc_throw(Unexpected, "select failed with " << strerror(errno)); + } + } else if (ret != 0) { + // Scan reading file descriptors. + // Work on the copy as the watcher sets can be modified. + bool processed = false; + for (int fd : readFds) { + if (!FD_ISSET(fd, &rd_set)) { + continue; + } + + // sr_fd_event_process is C code. + sr_fd_change_t* change_set = 0; + size_t cnt = 0; + // ignore return because we don't know what to do on error. + sr_fd_event_process(fd, SR_FD_INPUT_READY, &change_set, &cnt); + for (size_t i = 0; i < cnt; ++i) { + if (change_set[i].action == SR_FD_START_WATCHING) { + watcher->addFd(change_set[i].fd, + (change_set[i].events & SR_FD_INPUT_READY) != 0, + (change_set[i].events & SR_FD_OUTPUT_READY) != 0); + } else if (change_set[i].action == SR_FD_STOP_WATCHING) { + watcher->delFd(change_set[i].fd, + (change_set[i].events & SR_FD_INPUT_READY) != 0, + (change_set[i].events & SR_FD_OUTPUT_READY) != 0); + } + } + free(change_set); + processed = true; + break; + } + + // Restart if something was done. + if (processed) { + watcher->io_service_->post(postHandler); + return; + } + + // Scan writing file descriptors. + // Work on the copy as the watcher sets can be modified. + for (int fd : writeFds) { + if (!FD_ISSET(fd, &rd_set)) { + continue; + } + + // sr_fd_event_process is C code. + sr_fd_change_t* change_set = 0; + size_t cnt = 0; + // ignore return because we don't know what to do on error. + sr_fd_event_process(fd, SR_FD_OUTPUT_READY, &change_set, &cnt); + for (size_t i = 0; i < cnt; ++i) { + if (change_set[i].action == SR_FD_START_WATCHING) { + watcher->addFd(change_set[i].fd, + (change_set[i].events & SR_FD_INPUT_READY) != 0, + (change_set[i].events & SR_FD_OUTPUT_READY) != 0); + } else if (change_set[i].action == SR_FD_STOP_WATCHING) { + watcher->delFd(change_set[i].fd, + (change_set[i].events & SR_FD_INPUT_READY) != 0, + (change_set[i].events & SR_FD_OUTPUT_READY) != 0); + } + } + free(change_set); + processed = true; + break; + } + + // Restart if something was done. + if (processed) { + watcher->io_service_->post(postHandler); + return; + } + } + + // Either select() returns 0 or nothing was done. + // Create a new polling thread. + watcher->thread_.reset(new Thread(threadBody)); +} + +void +FdWatcher::threadBody() { + // Get the singleton. + FdWatcherPtr watcher = FdWatcher::instance(); + + // Get reading file descriptors. + int maxfd = 0; + fd_set rd_set; + FD_ZERO(&rd_set); + for (int fd : watcher->readFds) { + FD_SET(fd, &rd_set); + if (fd > maxfd) { + maxfd = fd; + } + } + // Add the terminate watch socket. + int wt_fd = watcher->watch_terminate_.getSelectFd(); + FD_SET(wt_fd, &rd_set); + if (wt_fd > maxfd) { + maxfd = wt_fd; + } + + // Get writing file descriptors. + fd_set wr_set; + FD_ZERO(&wr_set); + for (int fd : watcher->writeFds) { + FD_SET(fd, &wr_set); + if (fd > maxfd) { + maxfd = fd; + } + } + + // Wait until something happens. + select(maxfd + 1, &rd_set, &wr_set, 0, 0); + + // Deal with events in the Kea event loop. + watcher->io_service_->post(postHandler); + + return; +} + +} // namespace netconf +} // namespace isc diff --git a/src/bin/netconf/fd_watcher.h b/src/bin/netconf/fd_watcher.h new file mode 100644 index 0000000000..d4fe603022 --- /dev/null +++ b/src/bin/netconf/fd_watcher.h @@ -0,0 +1,118 @@ +// Copyright (C) 2018 Internet Systems Consortium, Inc. ("ISC") +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +/// @file netconf_log.h +/// Contains declarations for loggers used by the Kea netconf agent. + +#ifndef NETCONF_FD_WATCHER_H +#define NETCONF_FD_WATCHER_H + +#include +#include +#include + +#include +#include + +#include + +namespace isc { +namespace netconf { + +/// @brief Forward declaration to the @c FdWatcher. +class FdWatcher; + +/// @brief Type definition for the pointer to the @c FdWatcher. +typedef boost::shared_ptr FdWatcherPtr; + +/// @brief Type definition for the pointer to the @c isc::util::thread::Thread. +typedef boost::shared_ptr ThreadPtr; + +/// @brief FdWatcher utility. +/// +/// This class implements a sysrepo file descriptor watcher. +/// The alternative is to leave sysrepo to create its own thread to +/// handle suscriptions with an independent and likely incompatible +/// event loop and of course locking issues. +/// To simplify callbacks this class is implemented as a singleton. +class FdWatcher : public boost::noncopyable { +public: + + /// @brief Static singleton instance method. + /// + /// This method returns the class singleton instance member. + /// It instantiates the singleton upon first invocation. + /// + /// @return the pointer reference to the singleton instance. + static FdWatcherPtr& instance(); + + /// @brief Destructor (final cleanup). + virtual ~FdWatcher(); + + /// @brief Initialize the fd watcher. + /// + /// @param io_service The IO service aka the Kea event loop. + /// @return SR_ERR_OK (0) on success, SR_ERR_XXX (>0) on error. + int init(const asiolink::IOServicePtr& io_service); + + /// @brief Clear the fd watcher. + void clear(); + + /// @brief Add a file descriptor to watch. + /// + /// @param fd The file descriptor to add. + /// @param reading Boolean flag: true to watch for reading. + /// @param writing Boolean flag: true to watch for writing. + void addFd(int fd, bool reading, bool writing); + + /// @brief Delete a file descriptor to watch. + /// + /// @param fd The file descriptor to delete. + /// @param reading Boolean flag: true to watch for reading. + /// @param writing Boolean flag: true to watch for writing. + void delFd(int fd, bool reading, bool writing); + + /// @brief Post handler. + /// + /// The thread posts this handler on the IO service when there + /// should be a file descriptor available for IO. + static void postHandler(); + + /// @brief Thread body. + /// + /// The thread body: select() on file descriptors, when one is + /// available posts fdAvailable and returns. + static void threadBody(); + + /// @brief Shared pointer to the IOService object where to post callbacks. + asiolink::IOServicePtr io_service_; + + /// @brief Terminate watch socket. + /// + /// Used to wake up the thread. + isc::util::WatchSocket watch_terminate_; + + /// @brief Polling thread. + ThreadPtr thread_; + + /// @brief Reading file descriptors. + std::set readFds; + + /// @brief Write file descriptors. + std::set writeFds; + +private: + /// @brief Constructor (private to protect the singleton instance). + FdWatcher(); + + /// @brief Singleton instance value. + static FdWatcherPtr fd_watcher_; +}; + +} // namespace netconf +} // namespace isc + +#endif // NETCONF_FD_WATCHER_H diff --git a/src/bin/netconf/netconf_process.cc b/src/bin/netconf/netconf_process.cc index d3c469a610..45bd94102e 100644 --- a/src/bin/netconf/netconf_process.cc +++ b/src/bin/netconf/netconf_process.cc @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -42,6 +43,9 @@ NetconfProcess::run() { LOG_INFO(netconf_logger, NETCONF_STARTED).arg(VERSION); try { + // Initialize file descriptor watcher. + FdWatcher::instance()->init(getIoService()); + // Let's process incoming data or expiring timers in a loop until // shutdown condition is detected. while (!shouldShutdown()) { @@ -51,6 +55,7 @@ NetconfProcess::run() { } catch (const std::exception& ex) { LOG_FATAL(netconf_logger, NETCONF_FAILED).arg(ex.what()); try { + FdWatcher::instance()->clear(); stopIOService(); } catch (...) { // Ignore double errors @@ -59,6 +64,7 @@ NetconfProcess::run() { "Process run method failed: " << ex.what()); } + FdWatcher::instance()->clear(); LOG_DEBUG(netconf_logger, isc::log::DBGLVL_START_SHUT, NETCONF_RUN_EXIT); } diff --git a/src/bin/netconf/tests/Makefile.am b/src/bin/netconf/tests/Makefile.am index 0c14ae51e8..fd821ca5be 100644 --- a/src/bin/netconf/tests/Makefile.am +++ b/src/bin/netconf/tests/Makefile.am @@ -44,7 +44,8 @@ noinst_LTLIBRARIES = libbasic.la TESTS += netconf_unittests -netconf_unittests_SOURCES = get_config_unittest.cc +netconf_unittests_SOURCES = fd_watcher_unittests.cc +netconf_unittests_SOURCES += get_config_unittest.cc netconf_unittests_SOURCES += netconf_cfg_mgr_unittests.cc netconf_unittests_SOURCES += netconf_controller_unittests.cc netconf_unittests_SOURCES += netconf_process_unittests.cc diff --git a/src/bin/netconf/tests/fd_watcher_unittests.cc b/src/bin/netconf/tests/fd_watcher_unittests.cc new file mode 100644 index 0000000000..79b2b284a1 --- /dev/null +++ b/src/bin/netconf/tests/fd_watcher_unittests.cc @@ -0,0 +1,289 @@ +// Copyright (C) 2018 Internet Systems Consortium, Inc. ("ISC") +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#include +#include +#include +#include +#include +#include +#include + +using namespace std; +using namespace isc; +using namespace isc::netconf; +using namespace isc::asiolink; +using namespace isc::data; +using namespace isc::yang; +using namespace isc::util::thread; + +namespace { + +/// @brief Test subscribe callback. +class TestCallback : public Callback { +public: + /// @brief Module change callback. + /// + /// @param sess The running datastore session. + /// @param module_name The module name. + /// @param event The event. + /// @param private_ctx The private context. + /// @return the sysrepo return code. + int module_change(S_Session sess, + const char* /*module_name*/, + sr_notif_event_t /*event*/, + void* /*private_ctx*/) { + ++called; + thread_id = pthread_self(); + return (SR_ERR_OK); + } + + /// @brief Called counter. + static size_t called; + + /// @brief Thread ID where the callback was run. + static pthread_t thread_id; +}; + +pthread_t TestCallback::thread_id; +size_t TestCallback::called; + +/// @brief Modify. +/// +/// Used as a thread body to modify the configuration. +void +modify() { + S_Connection conn(new Connection("modify thread")); + S_Session sess(new Session(conn)); + + // Set valid lifetime to 12345. + string xpath = "/kea-dhcp4-server:config/valid-lifetime"; + ConstElementPtr elem = Element::create(12345); + TranslatorBasic tb(sess); + try { + tb.setItem(xpath, elem, SR_UINT32_T); + } catch (const std::exception& ex) { + cerr << "setItem failed with: " << ex.what() << endl; + } + + // Commit is so it becomes visible to subscriptions. + try { + sess->commit(); + } catch (const std::exception& ex) { + cerr << "commit failed with: " << ex.what() << endl; + } + + sess.reset(); + conn.reset(); +} + +// Verifies that before initialization file descriptor sets are empty. +TEST(FdWatcherTest, constructor) { + FdWatcherPtr fw = FdWatcher::instance(); + ASSERT_TRUE(fw); + EXPECT_EQ(0, fw->readFds.size()); + EXPECT_EQ(0, fw->writeFds.size()); + EXPECT_FALSE(fw->thread_); +} + +// Verifies that init works as expected, e.g. there is one element in readFds. +TEST(FdWatcherTest, init) { + FdWatcherPtr fw = FdWatcher::instance(); + ASSERT_TRUE(fw); + IOServicePtr io_service(new IOService()); + ASSERT_TRUE(io_service); + int ret = 0; + EXPECT_NO_THROW(ret = fw->init(io_service)); + + // Check the fd watcher. + EXPECT_EQ(SR_ERR_OK, ret); + EXPECT_EQ(1, fw->readFds.size()); + EXPECT_EQ(0, fw->writeFds.size()); + EXPECT_TRUE(fw->thread_); + + // init can be called twice. + EXPECT_NO_THROW(ret = fw->init(io_service)); + EXPECT_EQ(SR_ERR_OK, ret); + + // Final cleanup (will go in a fixture class). + fw->clear(); + io_service->stop(); + io_service.reset(); +} + +// Verifies that without the fd watcher a second thread handles subscriptions. +TEST(FdWatcherTest, subscribeThread) { + pthread_t main_id = pthread_self(); + S_Connection conn(new Connection("testing main")); + S_Session sess(new Session(conn)); + S_Subscribe subs(new Subscribe(sess)); + S_Callback cb(new TestCallback()); + string model = KEA_DHCP4_SERVER; + + // Check that a subscription opens file descriptors. + int fd = open("/dev/null", O_RDONLY); + close(fd); + ASSERT_NO_THROW(subs->module_change_subscribe(model.c_str(), cb)); + int fd2 = open("/dev/null", O_RDONLY); + close(fd2); + EXPECT_LT(fd, fd2); + + // Modify the configuration in a thread. + ThreadPtr th(new Thread(modify)); + th->wait(); + th.reset(); + subs.reset(); + sess.reset(); + conn.reset(); + + // Check the subscribe callback is run in another thread. + EXPECT_NE(main_id, TestCallback::thread_id); +} + +// Verifies that with the fd watcher handles a subscription. +TEST(FdWatcherTest, subscribe) { + FdWatcherPtr fw = FdWatcher::instance(); + ASSERT_TRUE(fw); + IOServicePtr io_service(new IOService()); + ASSERT_TRUE(io_service); + int ret = 0; + ASSERT_NO_THROW(fw->init(io_service)); + ASSERT_EQ(SR_ERR_OK, ret); + EXPECT_EQ(1, fw->readFds.size()); + EXPECT_EQ(0, fw->writeFds.size()); + EXPECT_TRUE(fw->thread_); + + // Get a session and subscribe to module changes. + S_Connection conn(new Connection("testing main")); + S_Session sess(new Session(conn)); + S_Subscribe subs(new Subscribe(sess)); + S_Callback cb(new TestCallback()); + string model = KEA_DHCP4_SERVER; + ASSERT_NO_THROW(subs->module_change_subscribe(model.c_str(), cb)); + + // Check some file descriptors were opened. + io_service->poll(); + EXPECT_EQ(3, fw->readFds.size()); + EXPECT_EQ(0, fw->writeFds.size()); + + // Reset subscription and check file descriptors are closed. + subs.reset(); + io_service->poll(); + EXPECT_EQ(1, fw->readFds.size()); + + // Cleanup. + sess.reset(); + conn.reset(); + fw->clear(); + io_service->stop(); + io_service.reset(); +} + +// Verifies that with the fd watcher no second thread handles subscriptions. +TEST(FdWatcherTest, subscribeNoThread) { + FdWatcherPtr fw = FdWatcher::instance(); + ASSERT_TRUE(fw); + IOServicePtr io_service(new IOService()); + ASSERT_TRUE(io_service); + int ret = 0; + ASSERT_NO_THROW(fw->init(io_service)); + ASSERT_EQ(SR_ERR_OK, ret); + EXPECT_EQ(1, fw->readFds.size()); + EXPECT_EQ(0, fw->writeFds.size()); + EXPECT_TRUE(fw->thread_); + + // Get a session and subscribe to module changes. + pthread_t main_id = pthread_self(); + S_Connection conn(new Connection("testing main")); + S_Session sess(new Session(conn)); + S_Subscribe subs(new Subscribe(sess)); + S_Callback cb(new TestCallback()); + string model = KEA_DHCP4_SERVER; + ASSERT_NO_THROW(subs->module_change_subscribe(model.c_str(), cb)); + + // Check some file descriptors were opened. + io_service->poll(); + EXPECT_EQ(3, fw->readFds.size()); + EXPECT_EQ(0, fw->writeFds.size()); + + // Modify the configuration in a thread. + TestCallback::called = 0; + ThreadPtr th(new Thread(modify)); + while (TestCallback::called == 0) { + io_service->run_one(); + } + io_service->poll(); + th->wait(); + th.reset(); + + // Reset subscription and check file descriptors are closed. + subs.reset(); + io_service->poll(); + EXPECT_EQ(1, fw->readFds.size()); + + // Cleanup. + sess.reset(); + conn.reset(); + fw->clear(); + io_service->stop(); + io_service.reset(); + + // Check the subscribe callback is run in the Kea thread. + EXPECT_EQ(main_id, TestCallback::thread_id); +} + +// Verifies that with the fd watcher handles subscribe/unsubscribe sequence. +TEST(FdWatcherTest, unsubscribe) { + FdWatcherPtr fw = FdWatcher::instance(); + ASSERT_TRUE(fw); + IOServicePtr io_service(new IOService()); + ASSERT_TRUE(io_service); + int ret = 0; + ASSERT_NO_THROW(fw->init(io_service)); + ASSERT_EQ(SR_ERR_OK, ret); + EXPECT_EQ(1, fw->readFds.size()); + EXPECT_EQ(0, fw->writeFds.size()); + EXPECT_TRUE(fw->thread_); + + // Get a session and subscribe to module changes. + S_Connection conn(new Connection("testing main")); + S_Session sess(new Session(conn)); + S_Subscribe subs1(new Subscribe(sess)); + S_Callback cb(new TestCallback()); + string model = KEA_DHCP4_SERVER; + ASSERT_NO_THROW(subs1->module_change_subscribe(model.c_str(), cb)); + + // Check some file descriptors were opened. + io_service->poll(); + EXPECT_EQ(3, fw->readFds.size()); + EXPECT_EQ(0, fw->writeFds.size()); + + // Reset subscription and check file descriptors are closed. + subs1.reset(); + io_service->poll(); + EXPECT_EQ(1, fw->readFds.size()); + + // Try again to subscribe. + S_Subscribe subs2(new Subscribe(sess)); + ASSERT_NO_THROW(subs2->module_change_subscribe(model.c_str(), cb)); + io_service->poll(); + EXPECT_EQ(3, fw->readFds.size()); + EXPECT_EQ(0, fw->writeFds.size()); + + // Unsubscribe. + subs2.reset(); + io_service->poll(); + EXPECT_EQ(1, fw->readFds.size()); + + // Cleanup. + sess.reset(); + conn.reset(); + fw->clear(); + io_service->stop(); + io_service.reset(); +} + +}