std::vector<int> sockets;
sockets.reserve(dss->sockets.size());
- for(;;) {
+ for (;;) {
try {
+ if (dss->isStopped()) {
+ break;
+ }
+
+ if (!dss->connected) {
+ /* the sockets are not connected yet, likely because we detected a problem,
+ tried to reconnect and it failed. We will try to reconnect after the next
+ successful health-check (unless reconnectOnUp is false), or when trying
+ to send in the UDP listener thread, but until then we simply need to wait. */
+ dss->waitUntilConnected();
+ continue;
+ }
+
dss->pickSocketsReadyForReceiving(sockets);
+
+ /* check a second time here because we might have waited quite a bit
+ since the first check */
if (dss->isStopped()) {
break;
}
We don't want to reconnect the real socket if the healthcheck failed,
because it's not using the same socket.
*/
- if (!healthCheck && (savederrno == EINVAL || savederrno == ENODEV || savederrno == ENETUNREACH)) {
+ if (!healthCheck && (savederrno == EINVAL || savederrno == ENODEV || savederrno == ENETUNREACH || savederrno == EBADF)) {
ss->reconnect();
}
}
#include "config.h"
#include "ext/luawrapper/include/LuaContext.hpp"
+#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>
std::thread tid;
std::mutex connectLock;
+ std::condition_variable d_connectedWait;
std::atomic_flag threadStarted;
bool d_stopped{false};
public:
}
bool reconnect();
+ void waitUntilConnected();
void hash();
void setId(const boost::uuids::uuid& newId);
void setWeight(int newWeight);
}
}
+ if (connected) {
+ tl.unlock();
+ d_connectedWait.notify_all();
+ }
+
return connected;
}
+void DownstreamState::waitUntilConnected()
+{
+ if (d_stopped) {
+ return;
+ }
+ if (connected) {
+ return;
+ }
+ {
+ std::unique_lock<std::mutex> lock(connectLock);
+ d_connectedWait.wait(lock, [this]{
+ return connected.load();
+ });
+ }
+}
+
void DownstreamState::stop()
{
if (d_stopped) {