noinst_LTLIBRARIES = libnetconf.la
-libnetconf_la_SOURCES = netconf_cfg_mgr.cc netconf_cfg_mgr.h
+libnetconf_la_SOURCES = fd_watcher.cc fd_watcher.h
+libnetconf_la_SOURCES += netconf_cfg_mgr.cc netconf_cfg_mgr.h
libnetconf_la_SOURCES += netconf_config.cc netconf_config.h
libnetconf_la_SOURCES += netconf_controller.cc netconf_controller.h
libnetconf_la_SOURCES += netconf_log.cc netconf_log.h
--- /dev/null
+// Copyright (C) 2018 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+/// @file fd_watcher.cc
+/// Contains the file descriptor watcher.
+
+#include <config.h>
+
+#include <netconf/fd_watcher.h>
+
+#include <boost/function.hpp>
+#include <sys/select.h>
+#include <fcntl.h>
+
+#include <sysrepo.h>
+
+using namespace std;
+using namespace isc::util::thread;
+
+namespace isc {
+namespace netconf {
+
+FdWatcherPtr FdWatcher::fd_watcher_;
+
+FdWatcher::FdWatcher() {
+}
+
+FdWatcherPtr&
+FdWatcher::instance() {
+ if (!fd_watcher_) {
+ fd_watcher_.reset(new FdWatcher());
+ }
+ return (fd_watcher_);
+}
+
+FdWatcher::~FdWatcher() {
+ this->clear();
+}
+
+int
+FdWatcher::init(const asiolink::IOServicePtr& io_service) {
+ // Remember the IO service.
+ io_service_ = io_service;
+
+ // If it was already called the initial watcher is in the receiving set.
+ if (!readFds.empty()) {
+ return (SR_ERR_OK);
+ }
+
+ // sr_fd_watcher_init is C code.
+ int watcher_fd = 0;
+ // Set a termination handler so closed file descriptors can be flushed.
+ int ret = sr_fd_watcher_init(&watcher_fd, []() { return; });
+ if (ret != SR_ERR_OK) {
+ return (ret);
+ }
+
+ // Add the watcher file descriptor to the receiving set.
+ addFd(watcher_fd, true, false);
+
+ // Create the first polling thread.
+ thread_.reset(new Thread(threadBody));
+
+ return (ret);
+}
+
+void
+FdWatcher::clear() {
+ // Clear sets.
+ writeFds.clear();
+ readFds.clear();
+
+ // Wake up the thread.
+ if (thread_) {
+ watch_terminate_.markReady();
+ thread_->wait();
+ thread_.reset();
+ }
+
+ // Clear the terminate watch socket.
+ watch_terminate_.clearReady();
+
+ // Clean up sysrepo.
+ sr_fd_watcher_cleanup();
+}
+
+void
+FdWatcher::addFd(int fd, bool reading, bool writing) {
+ // Ignore return values.
+ if (reading) {
+ readFds.insert(fd);
+ }
+ if (writing) {
+ writeFds.insert(fd);
+ }
+}
+
+void
+FdWatcher::delFd(int fd, bool reading, bool writing) {
+ if (reading) {
+ auto it = readFds.find(fd);
+ if (it != readFds.end()) {
+ readFds.erase(it);
+ }
+ }
+ if (writing) {
+ auto it = writeFds.find(fd);
+ if (it != writeFds.end()) {
+ writeFds.erase(it);
+ }
+ }
+}
+
+void
+FdWatcher::postHandler() {
+ // Get the singleton.
+ FdWatcherPtr watcher = FdWatcher::instance();
+
+ // Collect the thread.
+ if (watcher->thread_) {
+ watcher->thread_->wait();
+ watcher->thread_.reset();
+ }
+
+ // Nothing to do if not initialized.
+ if (watcher->readFds.empty()) {
+ return;
+ }
+
+ // Get reading file descriptors.
+ int maxfd = 0;
+ fd_set rd_set;
+ FD_ZERO(&rd_set);
+ set<int> readFds = watcher->readFds;
+ for (int fd : readFds) {
+ FD_SET(fd, &rd_set);
+ if (fd > maxfd) {
+ maxfd = fd;
+ }
+ }
+
+ // Get writing file descriptors.
+ fd_set wr_set;
+ FD_ZERO(&wr_set);
+ set<int> writeFds = watcher->writeFds;
+ for (int fd : writeFds) {
+ FD_SET(fd, &wr_set);
+ if (fd > maxfd) {
+ maxfd = fd;
+ }
+ }
+
+ // No wait,
+ struct timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+
+ // zero out the errno to be safe.
+ errno = 0;
+
+ int ret = select(maxfd + 1, &rd_set, &wr_set, 0, &tv);
+ if (ret < 0) {
+ if (errno == EINTR) {
+ // Not lucky... Just retry.
+ watcher->io_service_->post(postHandler);
+ return;
+ } else if (errno == EBADF) {
+ // A file descriptor was closed before being removed.
+ for (int fd : readFds) {
+ if (fcntl(fd, F_GETFD) == -1) {
+ watcher->delFd(fd, true, true);
+ }
+ }
+ for (int fd : writeFds) {
+ if (fcntl(fd, F_GETFD) == -1) {
+ watcher->delFd(fd, true, true);
+ }
+ }
+ // Retry.
+ watcher->io_service_->post(postHandler);
+ return;
+ } else {
+ isc_throw(Unexpected, "select failed with " << strerror(errno));
+ }
+ } else if (ret != 0) {
+ // Scan reading file descriptors.
+ // Work on the copy as the watcher sets can be modified.
+ bool processed = false;
+ for (int fd : readFds) {
+ if (!FD_ISSET(fd, &rd_set)) {
+ continue;
+ }
+
+ // sr_fd_event_process is C code.
+ sr_fd_change_t* change_set = 0;
+ size_t cnt = 0;
+ // ignore return because we don't know what to do on error.
+ sr_fd_event_process(fd, SR_FD_INPUT_READY, &change_set, &cnt);
+ for (size_t i = 0; i < cnt; ++i) {
+ if (change_set[i].action == SR_FD_START_WATCHING) {
+ watcher->addFd(change_set[i].fd,
+ (change_set[i].events & SR_FD_INPUT_READY) != 0,
+ (change_set[i].events & SR_FD_OUTPUT_READY) != 0);
+ } else if (change_set[i].action == SR_FD_STOP_WATCHING) {
+ watcher->delFd(change_set[i].fd,
+ (change_set[i].events & SR_FD_INPUT_READY) != 0,
+ (change_set[i].events & SR_FD_OUTPUT_READY) != 0);
+ }
+ }
+ free(change_set);
+ processed = true;
+ break;
+ }
+
+ // Restart if something was done.
+ if (processed) {
+ watcher->io_service_->post(postHandler);
+ return;
+ }
+
+ // Scan writing file descriptors.
+ // Work on the copy as the watcher sets can be modified.
+ for (int fd : writeFds) {
+ if (!FD_ISSET(fd, &rd_set)) {
+ continue;
+ }
+
+ // sr_fd_event_process is C code.
+ sr_fd_change_t* change_set = 0;
+ size_t cnt = 0;
+ // ignore return because we don't know what to do on error.
+ sr_fd_event_process(fd, SR_FD_OUTPUT_READY, &change_set, &cnt);
+ for (size_t i = 0; i < cnt; ++i) {
+ if (change_set[i].action == SR_FD_START_WATCHING) {
+ watcher->addFd(change_set[i].fd,
+ (change_set[i].events & SR_FD_INPUT_READY) != 0,
+ (change_set[i].events & SR_FD_OUTPUT_READY) != 0);
+ } else if (change_set[i].action == SR_FD_STOP_WATCHING) {
+ watcher->delFd(change_set[i].fd,
+ (change_set[i].events & SR_FD_INPUT_READY) != 0,
+ (change_set[i].events & SR_FD_OUTPUT_READY) != 0);
+ }
+ }
+ free(change_set);
+ processed = true;
+ break;
+ }
+
+ // Restart if something was done.
+ if (processed) {
+ watcher->io_service_->post(postHandler);
+ return;
+ }
+ }
+
+ // Either select() returns 0 or nothing was done.
+ // Create a new polling thread.
+ watcher->thread_.reset(new Thread(threadBody));
+}
+
+void
+FdWatcher::threadBody() {
+ // Get the singleton.
+ FdWatcherPtr watcher = FdWatcher::instance();
+
+ // Get reading file descriptors.
+ int maxfd = 0;
+ fd_set rd_set;
+ FD_ZERO(&rd_set);
+ for (int fd : watcher->readFds) {
+ FD_SET(fd, &rd_set);
+ if (fd > maxfd) {
+ maxfd = fd;
+ }
+ }
+ // Add the terminate watch socket.
+ int wt_fd = watcher->watch_terminate_.getSelectFd();
+ FD_SET(wt_fd, &rd_set);
+ if (wt_fd > maxfd) {
+ maxfd = wt_fd;
+ }
+
+ // Get writing file descriptors.
+ fd_set wr_set;
+ FD_ZERO(&wr_set);
+ for (int fd : watcher->writeFds) {
+ FD_SET(fd, &wr_set);
+ if (fd > maxfd) {
+ maxfd = fd;
+ }
+ }
+
+ // Wait until something happens.
+ select(maxfd + 1, &rd_set, &wr_set, 0, 0);
+
+ // Deal with events in the Kea event loop.
+ watcher->io_service_->post(postHandler);
+
+ return;
+}
+
+} // namespace netconf
+} // namespace isc
--- /dev/null
+// Copyright (C) 2018 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+/// @file netconf_log.h
+/// Contains declarations for loggers used by the Kea netconf agent.
+
+#ifndef NETCONF_FD_WATCHER_H
+#define NETCONF_FD_WATCHER_H
+
+#include <util/watch_socket.h>
+#include <util/threads/thread.h>
+#include <asiolink/io_service.h>
+
+#include <boost/noncopyable.hpp>
+#include <boost/shared_ptr.hpp>
+
+#include <set>
+
+namespace isc {
+namespace netconf {
+
+/// @brief Forward declaration to the @c FdWatcher.
+class FdWatcher;
+
+/// @brief Type definition for the pointer to the @c FdWatcher.
+typedef boost::shared_ptr<FdWatcher> FdWatcherPtr;
+
+/// @brief Type definition for the pointer to the @c isc::util::thread::Thread.
+typedef boost::shared_ptr<isc::util::thread::Thread> ThreadPtr;
+
+/// @brief FdWatcher utility.
+///
+/// This class implements a sysrepo file descriptor watcher.
+/// The alternative is to leave sysrepo to create its own thread to
+/// handle suscriptions with an independent and likely incompatible
+/// event loop and of course locking issues.
+/// To simplify callbacks this class is implemented as a singleton.
+class FdWatcher : public boost::noncopyable {
+public:
+
+ /// @brief Static singleton instance method.
+ ///
+ /// This method returns the class singleton instance member.
+ /// It instantiates the singleton upon first invocation.
+ ///
+ /// @return the pointer reference to the singleton instance.
+ static FdWatcherPtr& instance();
+
+ /// @brief Destructor (final cleanup).
+ virtual ~FdWatcher();
+
+ /// @brief Initialize the fd watcher.
+ ///
+ /// @param io_service The IO service aka the Kea event loop.
+ /// @return SR_ERR_OK (0) on success, SR_ERR_XXX (>0) on error.
+ int init(const asiolink::IOServicePtr& io_service);
+
+ /// @brief Clear the fd watcher.
+ void clear();
+
+ /// @brief Add a file descriptor to watch.
+ ///
+ /// @param fd The file descriptor to add.
+ /// @param reading Boolean flag: true to watch for reading.
+ /// @param writing Boolean flag: true to watch for writing.
+ void addFd(int fd, bool reading, bool writing);
+
+ /// @brief Delete a file descriptor to watch.
+ ///
+ /// @param fd The file descriptor to delete.
+ /// @param reading Boolean flag: true to watch for reading.
+ /// @param writing Boolean flag: true to watch for writing.
+ void delFd(int fd, bool reading, bool writing);
+
+ /// @brief Post handler.
+ ///
+ /// The thread posts this handler on the IO service when there
+ /// should be a file descriptor available for IO.
+ static void postHandler();
+
+ /// @brief Thread body.
+ ///
+ /// The thread body: select() on file descriptors, when one is
+ /// available posts fdAvailable and returns.
+ static void threadBody();
+
+ /// @brief Shared pointer to the IOService object where to post callbacks.
+ asiolink::IOServicePtr io_service_;
+
+ /// @brief Terminate watch socket.
+ ///
+ /// Used to wake up the thread.
+ isc::util::WatchSocket watch_terminate_;
+
+ /// @brief Polling thread.
+ ThreadPtr thread_;
+
+ /// @brief Reading file descriptors.
+ std::set<int> readFds;
+
+ /// @brief Write file descriptors.
+ std::set<int> writeFds;
+
+private:
+ /// @brief Constructor (private to protect the singleton instance).
+ FdWatcher();
+
+ /// @brief Singleton instance value.
+ static FdWatcherPtr fd_watcher_;
+};
+
+} // namespace netconf
+} // namespace isc
+
+#endif // NETCONF_FD_WATCHER_H
#include <netconf/netconf_process.h>
#include <netconf/netconf_controller.h>
#include <netconf/netconf_log.h>
+#include <netconf/fd_watcher.h>
#include <asiolink/io_address.h>
#include <asiolink/io_error.h>
#include <cc/command_interpreter.h>
LOG_INFO(netconf_logger, NETCONF_STARTED).arg(VERSION);
try {
+ // Initialize file descriptor watcher.
+ FdWatcher::instance()->init(getIoService());
+
// Let's process incoming data or expiring timers in a loop until
// shutdown condition is detected.
while (!shouldShutdown()) {
} catch (const std::exception& ex) {
LOG_FATAL(netconf_logger, NETCONF_FAILED).arg(ex.what());
try {
+ FdWatcher::instance()->clear();
stopIOService();
} catch (...) {
// Ignore double errors
"Process run method failed: " << ex.what());
}
+ FdWatcher::instance()->clear();
LOG_DEBUG(netconf_logger, isc::log::DBGLVL_START_SHUT, NETCONF_RUN_EXIT);
}
TESTS += netconf_unittests
-netconf_unittests_SOURCES = get_config_unittest.cc
+netconf_unittests_SOURCES = fd_watcher_unittests.cc
+netconf_unittests_SOURCES += get_config_unittest.cc
netconf_unittests_SOURCES += netconf_cfg_mgr_unittests.cc
netconf_unittests_SOURCES += netconf_controller_unittests.cc
netconf_unittests_SOURCES += netconf_process_unittests.cc
--- /dev/null
+// Copyright (C) 2018 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <config.h>
+#include <netconf/fd_watcher.h>
+#include <yang/translator.h>
+#include <yang/yang_models.h>
+#include <asiolink/io_service.h>
+#include <gtest/gtest.h>
+#include <fcntl.h>
+
+using namespace std;
+using namespace isc;
+using namespace isc::netconf;
+using namespace isc::asiolink;
+using namespace isc::data;
+using namespace isc::yang;
+using namespace isc::util::thread;
+
+namespace {
+
+/// @brief Test subscribe callback.
+class TestCallback : public Callback {
+public:
+ /// @brief Module change callback.
+ ///
+ /// @param sess The running datastore session.
+ /// @param module_name The module name.
+ /// @param event The event.
+ /// @param private_ctx The private context.
+ /// @return the sysrepo return code.
+ int module_change(S_Session sess,
+ const char* /*module_name*/,
+ sr_notif_event_t /*event*/,
+ void* /*private_ctx*/) {
+ ++called;
+ thread_id = pthread_self();
+ return (SR_ERR_OK);
+ }
+
+ /// @brief Called counter.
+ static size_t called;
+
+ /// @brief Thread ID where the callback was run.
+ static pthread_t thread_id;
+};
+
+pthread_t TestCallback::thread_id;
+size_t TestCallback::called;
+
+/// @brief Modify.
+///
+/// Used as a thread body to modify the configuration.
+void
+modify() {
+ S_Connection conn(new Connection("modify thread"));
+ S_Session sess(new Session(conn));
+
+ // Set valid lifetime to 12345.
+ string xpath = "/kea-dhcp4-server:config/valid-lifetime";
+ ConstElementPtr elem = Element::create(12345);
+ TranslatorBasic tb(sess);
+ try {
+ tb.setItem(xpath, elem, SR_UINT32_T);
+ } catch (const std::exception& ex) {
+ cerr << "setItem failed with: " << ex.what() << endl;
+ }
+
+ // Commit is so it becomes visible to subscriptions.
+ try {
+ sess->commit();
+ } catch (const std::exception& ex) {
+ cerr << "commit failed with: " << ex.what() << endl;
+ }
+
+ sess.reset();
+ conn.reset();
+}
+
+// Verifies that before initialization file descriptor sets are empty.
+TEST(FdWatcherTest, constructor) {
+ FdWatcherPtr fw = FdWatcher::instance();
+ ASSERT_TRUE(fw);
+ EXPECT_EQ(0, fw->readFds.size());
+ EXPECT_EQ(0, fw->writeFds.size());
+ EXPECT_FALSE(fw->thread_);
+}
+
+// Verifies that init works as expected, e.g. there is one element in readFds.
+TEST(FdWatcherTest, init) {
+ FdWatcherPtr fw = FdWatcher::instance();
+ ASSERT_TRUE(fw);
+ IOServicePtr io_service(new IOService());
+ ASSERT_TRUE(io_service);
+ int ret = 0;
+ EXPECT_NO_THROW(ret = fw->init(io_service));
+
+ // Check the fd watcher.
+ EXPECT_EQ(SR_ERR_OK, ret);
+ EXPECT_EQ(1, fw->readFds.size());
+ EXPECT_EQ(0, fw->writeFds.size());
+ EXPECT_TRUE(fw->thread_);
+
+ // init can be called twice.
+ EXPECT_NO_THROW(ret = fw->init(io_service));
+ EXPECT_EQ(SR_ERR_OK, ret);
+
+ // Final cleanup (will go in a fixture class).
+ fw->clear();
+ io_service->stop();
+ io_service.reset();
+}
+
+// Verifies that without the fd watcher a second thread handles subscriptions.
+TEST(FdWatcherTest, subscribeThread) {
+ pthread_t main_id = pthread_self();
+ S_Connection conn(new Connection("testing main"));
+ S_Session sess(new Session(conn));
+ S_Subscribe subs(new Subscribe(sess));
+ S_Callback cb(new TestCallback());
+ string model = KEA_DHCP4_SERVER;
+
+ // Check that a subscription opens file descriptors.
+ int fd = open("/dev/null", O_RDONLY);
+ close(fd);
+ ASSERT_NO_THROW(subs->module_change_subscribe(model.c_str(), cb));
+ int fd2 = open("/dev/null", O_RDONLY);
+ close(fd2);
+ EXPECT_LT(fd, fd2);
+
+ // Modify the configuration in a thread.
+ ThreadPtr th(new Thread(modify));
+ th->wait();
+ th.reset();
+ subs.reset();
+ sess.reset();
+ conn.reset();
+
+ // Check the subscribe callback is run in another thread.
+ EXPECT_NE(main_id, TestCallback::thread_id);
+}
+
+// Verifies that with the fd watcher handles a subscription.
+TEST(FdWatcherTest, subscribe) {
+ FdWatcherPtr fw = FdWatcher::instance();
+ ASSERT_TRUE(fw);
+ IOServicePtr io_service(new IOService());
+ ASSERT_TRUE(io_service);
+ int ret = 0;
+ ASSERT_NO_THROW(fw->init(io_service));
+ ASSERT_EQ(SR_ERR_OK, ret);
+ EXPECT_EQ(1, fw->readFds.size());
+ EXPECT_EQ(0, fw->writeFds.size());
+ EXPECT_TRUE(fw->thread_);
+
+ // Get a session and subscribe to module changes.
+ S_Connection conn(new Connection("testing main"));
+ S_Session sess(new Session(conn));
+ S_Subscribe subs(new Subscribe(sess));
+ S_Callback cb(new TestCallback());
+ string model = KEA_DHCP4_SERVER;
+ ASSERT_NO_THROW(subs->module_change_subscribe(model.c_str(), cb));
+
+ // Check some file descriptors were opened.
+ io_service->poll();
+ EXPECT_EQ(3, fw->readFds.size());
+ EXPECT_EQ(0, fw->writeFds.size());
+
+ // Reset subscription and check file descriptors are closed.
+ subs.reset();
+ io_service->poll();
+ EXPECT_EQ(1, fw->readFds.size());
+
+ // Cleanup.
+ sess.reset();
+ conn.reset();
+ fw->clear();
+ io_service->stop();
+ io_service.reset();
+}
+
+// Verifies that with the fd watcher no second thread handles subscriptions.
+TEST(FdWatcherTest, subscribeNoThread) {
+ FdWatcherPtr fw = FdWatcher::instance();
+ ASSERT_TRUE(fw);
+ IOServicePtr io_service(new IOService());
+ ASSERT_TRUE(io_service);
+ int ret = 0;
+ ASSERT_NO_THROW(fw->init(io_service));
+ ASSERT_EQ(SR_ERR_OK, ret);
+ EXPECT_EQ(1, fw->readFds.size());
+ EXPECT_EQ(0, fw->writeFds.size());
+ EXPECT_TRUE(fw->thread_);
+
+ // Get a session and subscribe to module changes.
+ pthread_t main_id = pthread_self();
+ S_Connection conn(new Connection("testing main"));
+ S_Session sess(new Session(conn));
+ S_Subscribe subs(new Subscribe(sess));
+ S_Callback cb(new TestCallback());
+ string model = KEA_DHCP4_SERVER;
+ ASSERT_NO_THROW(subs->module_change_subscribe(model.c_str(), cb));
+
+ // Check some file descriptors were opened.
+ io_service->poll();
+ EXPECT_EQ(3, fw->readFds.size());
+ EXPECT_EQ(0, fw->writeFds.size());
+
+ // Modify the configuration in a thread.
+ TestCallback::called = 0;
+ ThreadPtr th(new Thread(modify));
+ while (TestCallback::called == 0) {
+ io_service->run_one();
+ }
+ io_service->poll();
+ th->wait();
+ th.reset();
+
+ // Reset subscription and check file descriptors are closed.
+ subs.reset();
+ io_service->poll();
+ EXPECT_EQ(1, fw->readFds.size());
+
+ // Cleanup.
+ sess.reset();
+ conn.reset();
+ fw->clear();
+ io_service->stop();
+ io_service.reset();
+
+ // Check the subscribe callback is run in the Kea thread.
+ EXPECT_EQ(main_id, TestCallback::thread_id);
+}
+
+// Verifies that with the fd watcher handles subscribe/unsubscribe sequence.
+TEST(FdWatcherTest, unsubscribe) {
+ FdWatcherPtr fw = FdWatcher::instance();
+ ASSERT_TRUE(fw);
+ IOServicePtr io_service(new IOService());
+ ASSERT_TRUE(io_service);
+ int ret = 0;
+ ASSERT_NO_THROW(fw->init(io_service));
+ ASSERT_EQ(SR_ERR_OK, ret);
+ EXPECT_EQ(1, fw->readFds.size());
+ EXPECT_EQ(0, fw->writeFds.size());
+ EXPECT_TRUE(fw->thread_);
+
+ // Get a session and subscribe to module changes.
+ S_Connection conn(new Connection("testing main"));
+ S_Session sess(new Session(conn));
+ S_Subscribe subs1(new Subscribe(sess));
+ S_Callback cb(new TestCallback());
+ string model = KEA_DHCP4_SERVER;
+ ASSERT_NO_THROW(subs1->module_change_subscribe(model.c_str(), cb));
+
+ // Check some file descriptors were opened.
+ io_service->poll();
+ EXPECT_EQ(3, fw->readFds.size());
+ EXPECT_EQ(0, fw->writeFds.size());
+
+ // Reset subscription and check file descriptors are closed.
+ subs1.reset();
+ io_service->poll();
+ EXPECT_EQ(1, fw->readFds.size());
+
+ // Try again to subscribe.
+ S_Subscribe subs2(new Subscribe(sess));
+ ASSERT_NO_THROW(subs2->module_change_subscribe(model.c_str(), cb));
+ io_service->poll();
+ EXPECT_EQ(3, fw->readFds.size());
+ EXPECT_EQ(0, fw->writeFds.size());
+
+ // Unsubscribe.
+ subs2.reset();
+ io_service->poll();
+ EXPECT_EQ(1, fw->readFds.size());
+
+ // Cleanup.
+ sess.reset();
+ conn.reset();
+ fw->clear();
+ io_service->stop();
+ io_service.reset();
+}
+
+}