return 0;
}
-DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
+void DownstreamState::reconnect()
{
+ connected = false;
+ if (fd != -1) {
+ /* shutdown() is needed to wake up recv() in the responderThread */
+ shutdown(fd, SHUT_RDWR);
+ close(fd);
+ fd = -1;
+ }
if (!IsAnyAddress(remote)) {
fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
if (!IsAnyAddress(sourceAddr)) {
catch(const std::runtime_error& error) {
infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
}
+ }
+}
+
+DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
+{
+ if (!IsAnyAddress(remote)) {
+ reconnect();
idStates.resize(g_maxOutstanding);
sw.start();
infolog("Added downstream server %s", remote.toStringWithPort());
static ssize_t udpClientSendRequestToBackend(DownstreamState* ss, const int sd, const char* request, const size_t requestLen)
{
+ ssize_t result;
+
if (ss->sourceItf == 0) {
- return send(sd, request, requestLen, 0);
+ result = send(sd, request, requestLen, 0);
+ }
+ else {
+ struct msghdr msgh;
+ struct iovec iov;
+ char cbuf[256];
+ fillMSGHdr(&msgh, &iov, cbuf, sizeof(cbuf), const_cast<char*>(request), requestLen, &ss->remote);
+ addCMsgSrcAddr(&msgh, cbuf, &ss->sourceAddr, ss->sourceItf);
+ result = sendmsg(sd, &msgh, 0);
}
- struct msghdr msgh;
- struct iovec iov;
- char cbuf[256];
- fillMSGHdr(&msgh, &iov, cbuf, sizeof(cbuf), const_cast<char*>(request), requestLen, &ss->remote);
- addCMsgSrcAddr(&msgh, cbuf, &ss->sourceAddr, ss->sourceItf);
- return sendmsg(sd, &msgh, 0);
+ if (result == -1) {
+ int savederrno = errno;
+ vinfolog("Error sending request to backend %s: %d", ss->remote.toStringWithPort(), savederrno);
+
+ /* This might sound silly, but on Linux send() might fail with EINVAL
+ if the interface the socket was bound to doesn't exist anymore. */
+ if (savederrno == EINVAL) {
+ ss->reconnect();
+ }
+ }
+
+ return result;
}
// listens to incoming queries, sends out to downstream servers, noting the intended return path
}
catch(const std::runtime_error& error) {
infolog("Error connecting to new server with address %s: %s", dss->remote.toStringWithPort(), error.what());
+ newState = false;
+ dss->connected = false;
}
}