#include <functional>
#include <thread>
#include <mutex>
+#include <condition_variable>
#include <system_error>
#include <lldpctl.h>
"Couldn't find interface '" + if_name + "'");
}
- std::scoped_lock lock { mutex_ };
+ std::unique_lock lock { mutex_ };
if (0 == interface_callbacks_.erase(if_name) ) {
throw std::system_error(LLDPCTL_ERR_NOT_EXIST,
"No callback registered for interface '" + if_name + "'");
}
+
+ /* Wait for any active callbacks to complete. See comment in WatchCallback about callback and mutex handling. */
+ cv_.wait(
+ lock,
+ [this]
+ {
+ return active_callbacks_ == 0;
+ } );
}
private:
auto self { static_cast<LldpWatch<X, Y> *>(p) };
- std::scoped_lock lock { self->mutex_ };
+ /*
+ * Run the callbacks without holding the mutex to avoid a deadlock that occurs when
+ * - the client's callback also uses a mutex, and when
+ * - while that mutex is held, RegisterInterfaceCallback or UnregisterInterfaceCallback is called while a WatchCallback is running.
+ * To achieve that we copy the callback information into local variables.
+ * We also need to track the number of active callbacks to allow UnregisterInterfaceCallback to wait
+ * for any running callbacks to complete.
+ */
+ std::optional<std::pair<ChangeCallback<X>, X *>> general_cb;
+ std::optional<std::pair<ChangeCallback<Y>, Y *>> interface_cb;
+
+ {
+ std::unique_lock lock{ self->mutex_ };
+ ++self->active_callbacks_;
+
+ if (self->general_callback_.has_value()) {
+ general_cb.emplace(self->general_callback_.value());
+ }
- if (self->general_callback_.has_value()) {
- auto [callback, ctx] { self->general_callback_.value() };
+ if (auto it{ self->interface_callbacks_.find(if_name) };
+ it != self->interface_callbacks_.end()) {
+ interface_cb.emplace(it->second);
+ }
+ }
+
+ /* Run the callbacks without holding the mutex. */
+ if (general_cb.has_value()) {
+ auto [callback, ctx]{ general_cb.value() };
callback(if_name, change, interface_atom, neighbor_atom, ctx);
}
- if (auto it { self->interface_callbacks_.find(if_name) };
- it != self->interface_callbacks_.end()) {
- auto [callback, ctx] { it->second };
+ if (interface_cb.has_value()) {
+ auto [callback, ctx]{ interface_cb.value() };
callback(if_name, change, interface_atom, neighbor_atom, ctx);
}
+
+ /* Decrement active callbacks and notify waiting threads. */
+ {
+ std::unique_lock lock{ self->mutex_ };
+ --self->active_callbacks_;
+ self->cv_.notify_all();
+ }
}
lldpctl_conn_t *conn_ { ::lldpctl_new(nullptr, nullptr, this) };
std::jthread thread_;
std::mutex mutex_;
+ std::condition_variable cv_;
+ size_t active_callbacks_{ 0 };
const std::optional<std::pair<ChangeCallback<X>, X *>> general_callback_;
std::map<std::string, std::pair<ChangeCallback<Y>, Y *>, std::less<>>
interface_callbacks_;