]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[153-netconf-fd-watcher] Added file descriptor watcher code from kea-yang
authorFrancis Dupont <fdupont@isc.org>
Sat, 6 Oct 2018 16:13:11 +0000 (18:13 +0200)
committerFrancis Dupont <fdupont@isc.org>
Sat, 6 Oct 2018 16:13:11 +0000 (18:13 +0200)
src/bin/netconf/Makefile.am
src/bin/netconf/fd_watcher.cc [new file with mode: 0644]
src/bin/netconf/fd_watcher.h [new file with mode: 0644]
src/bin/netconf/netconf_process.cc
src/bin/netconf/tests/Makefile.am
src/bin/netconf/tests/fd_watcher_unittests.cc [new file with mode: 0644]

index 70c29377d2476f23897ed6576a06f79614b92e18..b77ce7b917645d667974e3150ef638bbd843df8f 100644 (file)
@@ -44,7 +44,8 @@ BUILT_SOURCES = netconf_messages.h netconf_messages.cc
 
 noinst_LTLIBRARIES = libnetconf.la
 
-libnetconf_la_SOURCES  = netconf_cfg_mgr.cc netconf_cfg_mgr.h
+libnetconf_la_SOURCES  = fd_watcher.cc fd_watcher.h
+libnetconf_la_SOURCES += netconf_cfg_mgr.cc netconf_cfg_mgr.h
 libnetconf_la_SOURCES += netconf_config.cc netconf_config.h
 libnetconf_la_SOURCES += netconf_controller.cc netconf_controller.h
 libnetconf_la_SOURCES += netconf_log.cc netconf_log.h
diff --git a/src/bin/netconf/fd_watcher.cc b/src/bin/netconf/fd_watcher.cc
new file mode 100644 (file)
index 0000000..783e72d
--- /dev/null
@@ -0,0 +1,306 @@
+// Copyright (C) 2018 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+/// @file fd_watcher.cc
+/// Contains the file descriptor watcher.
+
+#include <config.h>
+
+#include <netconf/fd_watcher.h>
+
+#include <boost/function.hpp>
+#include <sys/select.h>
+#include <fcntl.h>
+
+#include <sysrepo.h>
+
+using namespace std;
+using namespace isc::util::thread;
+
+namespace isc {
+namespace netconf {
+
+FdWatcherPtr FdWatcher::fd_watcher_;
+
+FdWatcher::FdWatcher() {
+}
+
+FdWatcherPtr&
+FdWatcher::instance() {
+    if (!fd_watcher_) {
+        fd_watcher_.reset(new FdWatcher());
+    }
+    return (fd_watcher_);
+}
+
+FdWatcher::~FdWatcher() {
+    this->clear();
+}
+
+int
+FdWatcher::init(const asiolink::IOServicePtr& io_service) {
+    // Remember the IO service.
+    io_service_ = io_service;
+
+    // If it was already called the initial watcher is in the receiving set.
+    if (!readFds.empty()) {
+        return (SR_ERR_OK);
+    }
+
+    // sr_fd_watcher_init is C code.
+    int watcher_fd = 0;
+    // Set a termination handler so closed file descriptors can be flushed.
+    int ret = sr_fd_watcher_init(&watcher_fd, []() { return; });
+    if (ret != SR_ERR_OK) {
+        return (ret);
+    }
+
+    // Add the watcher file descriptor to the receiving set.
+    addFd(watcher_fd, true, false);
+
+    // Create the first polling thread.
+    thread_.reset(new Thread(threadBody));
+
+    return (ret);
+}
+
+void
+FdWatcher::clear() {
+    // Clear sets.
+    writeFds.clear();
+    readFds.clear();
+
+    // Wake up the thread.
+    if (thread_) {
+        watch_terminate_.markReady();
+        thread_->wait();
+        thread_.reset();
+    }
+
+    // Clear the terminate watch socket.
+    watch_terminate_.clearReady();
+
+    // Clean up sysrepo.
+    sr_fd_watcher_cleanup();
+}
+
+void
+FdWatcher::addFd(int fd, bool reading, bool writing) {
+    // Ignore return values.
+    if (reading) {
+        readFds.insert(fd);
+    }
+    if (writing) {
+        writeFds.insert(fd);
+    }
+}
+
+void
+FdWatcher::delFd(int fd, bool reading, bool writing) {
+    if (reading) {
+        auto it = readFds.find(fd);
+        if (it != readFds.end()) {
+            readFds.erase(it);
+        }
+    }
+    if (writing) {
+        auto it = writeFds.find(fd);
+        if (it != writeFds.end()) {
+            writeFds.erase(it);
+        }
+    }
+}
+
+void
+FdWatcher::postHandler() {
+    // Get the singleton.
+    FdWatcherPtr watcher = FdWatcher::instance();
+
+    // Collect the thread.
+    if (watcher->thread_) {
+        watcher->thread_->wait();
+        watcher->thread_.reset();
+    }
+
+    // Nothing to do if not initialized.
+    if (watcher->readFds.empty()) {
+        return;
+    }
+
+    // Get reading file descriptors.
+    int maxfd = 0;
+    fd_set rd_set;
+    FD_ZERO(&rd_set);
+    set<int> readFds = watcher->readFds;
+    for (int fd : readFds) {
+        FD_SET(fd, &rd_set);
+        if (fd > maxfd) {
+            maxfd = fd;
+        }
+    }
+
+    // Get writing file descriptors.
+    fd_set wr_set;
+    FD_ZERO(&wr_set);
+    set<int> writeFds = watcher->writeFds;
+    for (int fd : writeFds) {
+        FD_SET(fd, &wr_set);
+        if (fd > maxfd) {
+            maxfd = fd;
+        }
+    }
+
+    // No wait,
+    struct timeval tv;
+    tv.tv_sec = 0;
+    tv.tv_usec = 0;
+
+    // zero out the errno to be safe.
+    errno = 0;
+
+    int ret = select(maxfd + 1, &rd_set, &wr_set, 0, &tv);
+    if (ret < 0) {
+        if (errno == EINTR) {
+            // Not lucky... Just retry.
+            watcher->io_service_->post(postHandler);
+            return;
+        } else if (errno == EBADF) {
+            // A file descriptor was closed before being removed.
+            for (int fd : readFds) {
+                if (fcntl(fd, F_GETFD) == -1) {
+                    watcher->delFd(fd, true, true);
+                }
+            }
+            for (int fd : writeFds) {
+                if (fcntl(fd, F_GETFD) == -1) {
+                    watcher->delFd(fd, true, true);
+                }
+            }
+            // Retry.
+            watcher->io_service_->post(postHandler);
+            return;
+        } else {
+            isc_throw(Unexpected, "select failed with " << strerror(errno));
+        }
+    } else if (ret != 0) {
+        // Scan reading file descriptors.
+        // Work on the copy as the watcher sets can be modified.
+        bool processed = false;
+        for (int fd : readFds) {
+            if (!FD_ISSET(fd, &rd_set)) {
+                continue;
+            }
+
+            // sr_fd_event_process is C code.
+            sr_fd_change_t* change_set = 0;
+            size_t cnt = 0;
+            // ignore return because we don't know what to do on error.
+            sr_fd_event_process(fd, SR_FD_INPUT_READY, &change_set, &cnt);
+            for (size_t i = 0; i < cnt; ++i) {
+                if (change_set[i].action == SR_FD_START_WATCHING) {
+                    watcher->addFd(change_set[i].fd,
+                        (change_set[i].events & SR_FD_INPUT_READY) != 0,
+                        (change_set[i].events & SR_FD_OUTPUT_READY) != 0);
+                } else if (change_set[i].action == SR_FD_STOP_WATCHING) {
+                    watcher->delFd(change_set[i].fd,
+                        (change_set[i].events & SR_FD_INPUT_READY) != 0,
+                        (change_set[i].events & SR_FD_OUTPUT_READY) != 0);
+                }
+            }
+            free(change_set);
+            processed = true;
+            break;
+        }
+
+        // Restart if something was done.
+        if (processed) {
+            watcher->io_service_->post(postHandler);
+            return;
+        }
+
+        // Scan writing file descriptors.
+        // Work on the copy as the watcher sets can be modified.
+        for (int fd : writeFds) {
+            if (!FD_ISSET(fd, &rd_set)) {
+                continue;
+            }
+
+            // sr_fd_event_process is C code.
+            sr_fd_change_t* change_set = 0;
+            size_t cnt = 0;
+            // ignore return because we don't know what to do on error.
+            sr_fd_event_process(fd, SR_FD_OUTPUT_READY, &change_set, &cnt);
+            for (size_t i = 0; i < cnt; ++i) {
+                if (change_set[i].action == SR_FD_START_WATCHING) {
+                    watcher->addFd(change_set[i].fd,
+                        (change_set[i].events & SR_FD_INPUT_READY) != 0,
+                        (change_set[i].events & SR_FD_OUTPUT_READY) != 0);
+                } else if (change_set[i].action == SR_FD_STOP_WATCHING) {
+                    watcher->delFd(change_set[i].fd,
+                        (change_set[i].events & SR_FD_INPUT_READY) != 0,
+                        (change_set[i].events & SR_FD_OUTPUT_READY) != 0);
+                }
+            }
+            free(change_set);
+            processed = true;
+            break;
+        }
+
+        // Restart if something was done.
+        if (processed) {
+            watcher->io_service_->post(postHandler);
+            return;
+        }
+    }
+
+    // Either select() returns 0 or nothing was done.
+    // Create a new polling thread.
+    watcher->thread_.reset(new Thread(threadBody));
+}
+
+void
+FdWatcher::threadBody() {
+    // Get the singleton.
+    FdWatcherPtr watcher = FdWatcher::instance();
+
+    // Get reading file descriptors.
+    int maxfd = 0;
+    fd_set rd_set;
+    FD_ZERO(&rd_set);
+    for (int fd : watcher->readFds) {
+        FD_SET(fd, &rd_set);
+        if (fd > maxfd) {
+            maxfd = fd;
+        }
+    }
+    // Add the terminate watch socket.
+    int wt_fd = watcher->watch_terminate_.getSelectFd();
+    FD_SET(wt_fd, &rd_set);
+    if (wt_fd > maxfd) {
+        maxfd = wt_fd;
+    }
+
+    // Get writing file descriptors.
+    fd_set wr_set;
+    FD_ZERO(&wr_set);
+    for (int fd : watcher->writeFds) {
+        FD_SET(fd, &wr_set);
+        if (fd > maxfd) {
+            maxfd = fd;
+        }
+    }
+
+    // Wait until something happens.
+    select(maxfd + 1, &rd_set, &wr_set, 0, 0);
+
+    // Deal with events in the Kea event loop.
+    watcher->io_service_->post(postHandler);
+
+    return;
+}
+
+} // namespace netconf
+} // namespace isc
diff --git a/src/bin/netconf/fd_watcher.h b/src/bin/netconf/fd_watcher.h
new file mode 100644 (file)
index 0000000..d4fe603
--- /dev/null
@@ -0,0 +1,118 @@
+// Copyright (C) 2018 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+/// @file netconf_log.h
+/// Contains declarations for loggers used by the Kea netconf agent.
+
+#ifndef NETCONF_FD_WATCHER_H
+#define NETCONF_FD_WATCHER_H
+
+#include <util/watch_socket.h>
+#include <util/threads/thread.h>
+#include <asiolink/io_service.h>
+
+#include <boost/noncopyable.hpp>
+#include <boost/shared_ptr.hpp>
+
+#include <set>
+
+namespace isc {
+namespace netconf {
+
+/// @brief Forward declaration to the @c FdWatcher.
+class FdWatcher;
+
+/// @brief Type definition for the pointer to the @c FdWatcher.
+typedef boost::shared_ptr<FdWatcher> FdWatcherPtr;
+
+/// @brief Type definition for the pointer to the @c isc::util::thread::Thread.
+typedef boost::shared_ptr<isc::util::thread::Thread> ThreadPtr;
+
+/// @brief FdWatcher utility.
+///
+/// This class implements a sysrepo file descriptor watcher.
+/// The alternative is to leave sysrepo to create its own thread to
+/// handle suscriptions with an independent and likely incompatible
+/// event loop and of course locking issues.
+/// To simplify callbacks this class is implemented as a singleton.
+class FdWatcher : public boost::noncopyable {
+public:
+
+    /// @brief Static singleton instance method.
+    ///
+    /// This method returns the class singleton instance member.
+    /// It instantiates the singleton upon first invocation.
+    ///
+    /// @return the pointer reference to the singleton instance.
+    static FdWatcherPtr& instance();
+
+    /// @brief Destructor (final cleanup).
+    virtual ~FdWatcher();
+
+    /// @brief Initialize the fd watcher.
+    ///
+    /// @param io_service The IO service aka the Kea event loop.
+    /// @return SR_ERR_OK (0) on success, SR_ERR_XXX (>0) on error.
+    int init(const asiolink::IOServicePtr& io_service);
+
+    /// @brief Clear the fd watcher.
+    void clear();
+
+    /// @brief Add a file descriptor to watch.
+    ///
+    /// @param fd The file descriptor to add.
+    /// @param reading Boolean flag: true to watch for reading.
+    /// @param writing Boolean flag: true to watch for writing.
+    void addFd(int fd, bool reading, bool writing);
+
+    /// @brief Delete a file descriptor to watch.
+    ///
+    /// @param fd The file descriptor to delete.
+    /// @param reading Boolean flag: true to watch for reading.
+    /// @param writing Boolean flag: true to watch for writing.
+    void delFd(int fd, bool reading, bool writing);
+
+    /// @brief Post handler.
+    ///
+    /// The thread posts this handler on the IO service when there
+    /// should be a file descriptor available for IO.
+    static void postHandler();
+
+    /// @brief Thread body.
+    ///
+    /// The thread body: select() on file descriptors, when one is
+    /// available posts fdAvailable and returns.
+    static void threadBody();
+
+    /// @brief Shared pointer to the IOService object where to post callbacks.
+    asiolink::IOServicePtr io_service_;
+
+    /// @brief Terminate watch socket.
+    ///
+    /// Used to wake up the thread.
+    isc::util::WatchSocket watch_terminate_;
+
+    /// @brief Polling thread.
+    ThreadPtr thread_;
+
+    /// @brief Reading file descriptors.
+    std::set<int> readFds;
+
+    /// @brief Write file descriptors.
+    std::set<int> writeFds;
+
+private:
+    /// @brief Constructor (private to protect the singleton instance).
+    FdWatcher();
+
+    /// @brief Singleton instance value.
+    static FdWatcherPtr fd_watcher_;
+};
+
+} // namespace netconf
+} // namespace isc
+
+#endif // NETCONF_FD_WATCHER_H
index d3c469a610c1fa5edee0a9267323ec8683ec2ad8..45bd94102e38d05d0682f2b7afd140a247d28a64 100644 (file)
@@ -9,6 +9,7 @@
 #include <netconf/netconf_process.h>
 #include <netconf/netconf_controller.h>
 #include <netconf/netconf_log.h>
+#include <netconf/fd_watcher.h>
 #include <asiolink/io_address.h>
 #include <asiolink/io_error.h>
 #include <cc/command_interpreter.h>
@@ -42,6 +43,9 @@ NetconfProcess::run() {
     LOG_INFO(netconf_logger, NETCONF_STARTED).arg(VERSION);
 
     try {
+        // Initialize file descriptor watcher.
+        FdWatcher::instance()->init(getIoService());
+
         // Let's process incoming data or expiring timers in a loop until
         // shutdown condition is detected.
         while (!shouldShutdown()) {
@@ -51,6 +55,7 @@ NetconfProcess::run() {
     } catch (const std::exception& ex) {
         LOG_FATAL(netconf_logger, NETCONF_FAILED).arg(ex.what());
         try {
+            FdWatcher::instance()->clear();
             stopIOService();
         } catch (...) {
             // Ignore double errors
@@ -59,6 +64,7 @@ NetconfProcess::run() {
                   "Process run method failed: " << ex.what());
     }
 
+    FdWatcher::instance()->clear();
     LOG_DEBUG(netconf_logger, isc::log::DBGLVL_START_SHUT, NETCONF_RUN_EXIT);
 }
 
index 0c14ae51e88bf3ccad4e0d03fbfb94bda19aed91..fd821ca5be61812d8462bde31d34117c40caecf9 100644 (file)
@@ -44,7 +44,8 @@ noinst_LTLIBRARIES = libbasic.la
 
 TESTS += netconf_unittests
 
-netconf_unittests_SOURCES  = get_config_unittest.cc
+netconf_unittests_SOURCES  = fd_watcher_unittests.cc
+netconf_unittests_SOURCES += get_config_unittest.cc
 netconf_unittests_SOURCES += netconf_cfg_mgr_unittests.cc
 netconf_unittests_SOURCES += netconf_controller_unittests.cc
 netconf_unittests_SOURCES += netconf_process_unittests.cc
diff --git a/src/bin/netconf/tests/fd_watcher_unittests.cc b/src/bin/netconf/tests/fd_watcher_unittests.cc
new file mode 100644 (file)
index 0000000..79b2b28
--- /dev/null
@@ -0,0 +1,289 @@
+// Copyright (C) 2018 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <config.h>
+#include <netconf/fd_watcher.h>
+#include <yang/translator.h>
+#include <yang/yang_models.h>
+#include <asiolink/io_service.h>
+#include <gtest/gtest.h>
+#include <fcntl.h>
+
+using namespace std;
+using namespace isc;
+using namespace isc::netconf;
+using namespace isc::asiolink;
+using namespace isc::data;
+using namespace isc::yang;
+using namespace isc::util::thread;
+
+namespace {
+
+/// @brief Test subscribe callback.
+class TestCallback : public Callback {
+public:
+    /// @brief Module change callback.
+    ///
+    /// @param sess The running datastore session.
+    /// @param module_name The module name.
+    /// @param event The event.
+    /// @param private_ctx The private context.
+    /// @return the sysrepo return code.
+    int module_change(S_Session sess,
+                      const char* /*module_name*/,
+                      sr_notif_event_t /*event*/,
+                      void* /*private_ctx*/) {
+        ++called;
+        thread_id = pthread_self();
+        return (SR_ERR_OK);
+    }
+
+    /// @brief Called counter.
+    static size_t called;
+
+    /// @brief Thread ID where the callback was run.
+    static pthread_t thread_id;
+};
+
+pthread_t TestCallback::thread_id;
+size_t TestCallback::called;
+
+/// @brief Modify.
+///
+/// Used as a thread body to modify the configuration.
+void
+modify() {
+    S_Connection conn(new Connection("modify thread"));
+    S_Session sess(new Session(conn));
+
+    // Set valid lifetime to 12345.
+    string xpath = "/kea-dhcp4-server:config/valid-lifetime";
+    ConstElementPtr elem = Element::create(12345);
+    TranslatorBasic tb(sess);
+    try {
+        tb.setItem(xpath, elem, SR_UINT32_T);
+    } catch (const std::exception& ex) {
+        cerr << "setItem failed with: " << ex.what() << endl;
+    }
+
+    // Commit is so it becomes visible to subscriptions.
+    try {
+        sess->commit();
+    } catch (const std::exception& ex) {
+        cerr << "commit failed with: " << ex.what() << endl;
+    }
+
+    sess.reset();
+    conn.reset();
+}
+
+// Verifies that before initialization file descriptor sets are empty.
+TEST(FdWatcherTest, constructor) {
+    FdWatcherPtr fw = FdWatcher::instance();
+    ASSERT_TRUE(fw);
+    EXPECT_EQ(0, fw->readFds.size());
+    EXPECT_EQ(0, fw->writeFds.size());
+    EXPECT_FALSE(fw->thread_);
+}
+
+// Verifies that init works as expected, e.g. there is one element in readFds.
+TEST(FdWatcherTest, init) {
+    FdWatcherPtr fw = FdWatcher::instance();
+    ASSERT_TRUE(fw);
+    IOServicePtr io_service(new IOService());
+    ASSERT_TRUE(io_service);
+    int ret = 0;
+    EXPECT_NO_THROW(ret = fw->init(io_service));
+
+    // Check the fd watcher.
+    EXPECT_EQ(SR_ERR_OK, ret);
+    EXPECT_EQ(1, fw->readFds.size());
+    EXPECT_EQ(0, fw->writeFds.size());
+    EXPECT_TRUE(fw->thread_);
+
+    // init can be called twice.
+    EXPECT_NO_THROW(ret = fw->init(io_service));
+    EXPECT_EQ(SR_ERR_OK, ret);
+
+    // Final cleanup (will go in a fixture class).
+    fw->clear();
+    io_service->stop();
+    io_service.reset();
+}
+
+// Verifies that without the fd watcher a second thread handles subscriptions.
+TEST(FdWatcherTest, subscribeThread) {
+    pthread_t main_id = pthread_self();
+    S_Connection conn(new Connection("testing main"));
+    S_Session sess(new Session(conn));
+    S_Subscribe subs(new Subscribe(sess));
+    S_Callback cb(new TestCallback());
+    string model = KEA_DHCP4_SERVER;
+
+    // Check that a subscription opens file descriptors.
+    int fd = open("/dev/null", O_RDONLY);
+    close(fd);
+    ASSERT_NO_THROW(subs->module_change_subscribe(model.c_str(), cb));
+    int fd2 = open("/dev/null", O_RDONLY);
+    close(fd2);
+    EXPECT_LT(fd, fd2);
+
+    // Modify the configuration in a thread.
+    ThreadPtr th(new Thread(modify));
+    th->wait();
+    th.reset();
+    subs.reset();
+    sess.reset();
+    conn.reset();
+
+    // Check the subscribe callback is run in another thread.
+    EXPECT_NE(main_id, TestCallback::thread_id);
+}
+
+// Verifies that with the fd watcher handles a subscription.
+TEST(FdWatcherTest, subscribe) {
+    FdWatcherPtr fw = FdWatcher::instance();
+    ASSERT_TRUE(fw);
+    IOServicePtr io_service(new IOService());
+    ASSERT_TRUE(io_service);
+    int ret = 0;
+    ASSERT_NO_THROW(fw->init(io_service));
+    ASSERT_EQ(SR_ERR_OK, ret);
+    EXPECT_EQ(1, fw->readFds.size());
+    EXPECT_EQ(0, fw->writeFds.size());
+    EXPECT_TRUE(fw->thread_);
+
+    // Get a session and subscribe to module changes.
+    S_Connection conn(new Connection("testing main"));
+    S_Session sess(new Session(conn));
+    S_Subscribe subs(new Subscribe(sess));
+    S_Callback cb(new TestCallback());
+    string model = KEA_DHCP4_SERVER;
+    ASSERT_NO_THROW(subs->module_change_subscribe(model.c_str(), cb));
+
+    // Check some file descriptors were opened.
+    io_service->poll();
+    EXPECT_EQ(3, fw->readFds.size());
+    EXPECT_EQ(0, fw->writeFds.size());
+
+    // Reset subscription and check file descriptors are closed.
+    subs.reset();
+    io_service->poll();
+    EXPECT_EQ(1, fw->readFds.size());
+
+    // Cleanup.
+    sess.reset();
+    conn.reset();
+    fw->clear();
+    io_service->stop();
+    io_service.reset();
+}
+
+// Verifies that with the fd watcher no second thread handles subscriptions.
+TEST(FdWatcherTest, subscribeNoThread) {
+    FdWatcherPtr fw = FdWatcher::instance();
+    ASSERT_TRUE(fw);
+    IOServicePtr io_service(new IOService());
+    ASSERT_TRUE(io_service);
+    int ret = 0;
+    ASSERT_NO_THROW(fw->init(io_service));
+    ASSERT_EQ(SR_ERR_OK, ret);
+    EXPECT_EQ(1, fw->readFds.size());
+    EXPECT_EQ(0, fw->writeFds.size());
+    EXPECT_TRUE(fw->thread_);
+
+    // Get a session and subscribe to module changes.
+    pthread_t main_id = pthread_self();
+    S_Connection conn(new Connection("testing main"));
+    S_Session sess(new Session(conn));
+    S_Subscribe subs(new Subscribe(sess));
+    S_Callback cb(new TestCallback());
+    string model = KEA_DHCP4_SERVER;
+    ASSERT_NO_THROW(subs->module_change_subscribe(model.c_str(), cb));
+
+    // Check some file descriptors were opened.
+    io_service->poll();
+    EXPECT_EQ(3, fw->readFds.size());
+    EXPECT_EQ(0, fw->writeFds.size());
+
+    // Modify the configuration in a thread.
+    TestCallback::called = 0;
+    ThreadPtr th(new Thread(modify));
+    while (TestCallback::called == 0) {
+        io_service->run_one();
+    }
+    io_service->poll();
+    th->wait();
+    th.reset();
+
+    // Reset subscription and check file descriptors are closed.
+    subs.reset();
+    io_service->poll();
+    EXPECT_EQ(1, fw->readFds.size());
+
+    // Cleanup.
+    sess.reset();
+    conn.reset();
+    fw->clear();
+    io_service->stop();
+    io_service.reset();
+
+    // Check the subscribe callback is run in the Kea thread.
+    EXPECT_EQ(main_id, TestCallback::thread_id);
+}
+
+// Verifies that with the fd watcher handles subscribe/unsubscribe sequence.
+TEST(FdWatcherTest, unsubscribe) {
+    FdWatcherPtr fw = FdWatcher::instance();
+    ASSERT_TRUE(fw);
+    IOServicePtr io_service(new IOService());
+    ASSERT_TRUE(io_service);
+    int ret = 0;
+    ASSERT_NO_THROW(fw->init(io_service));
+    ASSERT_EQ(SR_ERR_OK, ret);
+    EXPECT_EQ(1, fw->readFds.size());
+    EXPECT_EQ(0, fw->writeFds.size());
+    EXPECT_TRUE(fw->thread_);
+
+    // Get a session and subscribe to module changes.
+    S_Connection conn(new Connection("testing main"));
+    S_Session sess(new Session(conn));
+    S_Subscribe subs1(new Subscribe(sess));
+    S_Callback cb(new TestCallback());
+    string model = KEA_DHCP4_SERVER;
+    ASSERT_NO_THROW(subs1->module_change_subscribe(model.c_str(), cb));
+
+    // Check some file descriptors were opened.
+    io_service->poll();
+    EXPECT_EQ(3, fw->readFds.size());
+    EXPECT_EQ(0, fw->writeFds.size());
+
+    // Reset subscription and check file descriptors are closed.
+    subs1.reset();
+    io_service->poll();
+    EXPECT_EQ(1, fw->readFds.size());
+
+    // Try again to subscribe.
+    S_Subscribe subs2(new Subscribe(sess));
+    ASSERT_NO_THROW(subs2->module_change_subscribe(model.c_str(), cb));
+    io_service->poll();
+    EXPECT_EQ(3, fw->readFds.size());
+    EXPECT_EQ(0, fw->writeFds.size());
+
+    // Unsubscribe.
+    subs2.reset();
+    io_service->poll();
+    EXPECT_EQ(1, fw->readFds.size());
+
+    // Cleanup.
+    sess.reset();
+    conn.reset();
+    fw->clear();
+    io_service->stop();
+    io_service.reset();
+}
+
+}