g_pools.setState(localPools);
states.erase(remove(states.begin(), states.end(), server), states.end());
g_dstates.setState(states);
+ server->stop();
} );
g_lua.writeFunction("truncateTC", [](bool tc) { setLuaSideEffect(); g_truncateTC=tc; });
std::vector<int> sockets;
sockets.reserve(dss->sockets.size());
- for(;;) {
+ for(; !dss->isStopped(); ) {
dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
try {
pickBackendSocketsReadyForReceiving(dss, sockets);
char * response = packet;
size_t responseSize = sizeof(packet);
- if (got < 0 || static_cast<size_t>(got) < sizeof(dnsheader))
+ if (got == 0 && dss->isStopped()) {
+ break;
+ }
+
+ if (got < 0 || static_cast<size_t>(got) < sizeof(dnsheader)) {
continue;
+ }
uint16_t responseLen = static_cast<uint16_t>(got);
queryId = dh->id;
DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf, const std::string& sourceItfName, size_t numberOfSockets, bool connect);
DownstreamState(const ComboAddress& remote_): DownstreamState(remote_, ComboAddress(), 0, std::string(), 1, true) {}
- ~DownstreamState()
- {
- for (auto& fd : sockets) {
- if (fd >= 0) {
- close(fd);
- fd = -1;
- }
- }
- }
+ ~DownstreamState();
+
boost::uuids::uuid id;
std::vector<unsigned int> hashes;
mutable ReadWriteLock d_lock;
void hash();
void setId(const boost::uuids::uuid& newId);
void setWeight(int newWeight);
+ void stop();
+ bool isStopped() const
+ {
+ return d_stopped;
+ }
void updateTCPMetrics(size_t nbQueries, uint64_t durationMs)
{
private:
std::string name;
std::string nameWithAddr;
+ bool d_stopped{false};
};
using servers_t =vector<std::shared_ptr<DownstreamState>>;
return connected;
}
+
+void DownstreamState::stop()
+{
+ std::unique_lock<std::mutex> tl(connectLock);
+ std::lock_guard<std::mutex> slock(socketsLock);
+ d_stopped = true;
+
+ for (auto& fd : sockets) {
+ if (fd != -1) {
+ /* shutdown() is needed to wake up recv() in the responderThread */
+ shutdown(fd, SHUT_RDWR);
+ }
+ }
+}
+
void DownstreamState::hash()
{
vinfolog("Computing hashes for id=%s and weight=%d", id, weight);
idStates.resize(g_maxOutstanding);
sw.start();
}
+}
+
+DownstreamState::~DownstreamState()
+{
+ for (auto& fd : sockets) {
+ if (fd >= 0) {
+ close(fd);
+ fd = -1;
+ }
+ }
+ // we need to either detach or join the thread before it
+ // is destroyed
+ if (threadStarted.test_and_set()) {
+ tid.detach();
+ }
}