{ "setPoolServerPolicy", true, "name, func, pool", "set the server selection policy for the pool `pool` to the policy named `name` and implemented by the function `func`" },
{ "setQueryCount", true, "bool", "set whether queries should be counted" },
{ "setQueryCountFilter", true, "func", "filter queries that would be counted, where `func` is a function with parameter `dq` that decides whether and how a query should be counted" },
+ { "setRingBuffersLockRetries", true, "n", "set to `n` the number of attempts to get a non-blocking lock on a ringbuffer shard before falling back to a blocking lock" },
{ "setRingBuffersSize", true, "n", "set the capacity of the ringbuffers used for live traffic inspection to `n`" },
{ "setRules", true, "list of rules", "replace the current rules with the supplied list of pairs of DNS Rules and DNS Actions (see `newRuleAction()`)" },
{ "setServerPolicy", true, "policy", "set the server selection policy to the supplied policy" },
if (ret->connected) {
if(g_launchWork) {
g_launchWork->push_back([ret,cpus]() {
- ret->tid = thread(responderThread, ret, g_rings.getResponseInserterId());
+ ret->tid = thread(responderThread, ret);
if (!cpus.empty()) {
mapThreadToCPUList(ret->tid.native_handle(), cpus);
}
});
}
else {
- ret->tid = thread(responderThread, ret, g_rings.getResponseInserterId());
+ ret->tid = thread(responderThread, ret);
if (!cpus.empty()) {
mapThreadToCPUList(ret->tid.native_handle(), cpus);
}
g_rings.setCapacity(capacity, numberOfShards ? *numberOfShards : 1);
});
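+ /* expose the try-lock behaviour to the Lua configuration: the value is the
+    number of shards probed without blocking before an insertion waits on a lock */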
+ g_lua.writeFunction("setRingBuffersLockRetries", [](size_t retries) {
+ setLuaSideEffect();
+ g_rings.setNumberOfLockRetries(retries);
+ });
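+ /* hypothetical usage from the configuration, assuming the optional shard count
+    argument handled above:
+      setRingBuffersSize(100000, 10) -- 100k entries spread over 10 shards
+      setRingBuffersLockRetries(5)   -- probe up to 5 shards before blocking */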
+
g_lua.writeFunction("setWHashedPertubation", [](uint32_t pertub) {
setLuaSideEffect();
g_hashperturb = pertub;
#include "dnsdist.hh"
#include "lock.hh"
-std::atomic<size_t> Rings::s_queryInserterId;
-std::atomic<size_t> Rings::s_responseInserterId;
-
size_t Rings::numDistinctRequestors()
{
std::set<ComboAddress, ComboAddress::addressOnlyLessThan> s;
/* we get launched with a pipe on which we receive file descriptors from clients that we own
from that point on */
- const auto queryInserterId = g_rings.getQueryInserterId();
bool outstanding = false;
time_t lastTCPCleanup = time(nullptr);
DNSName qname(query, qlen, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
DNSQuestion dq(&qname, qtype, qclass, &dest, &ci.remote, dh, queryBuffer.capacity(), qlen, true, &queryRealTime);
- if (!processQuery(holders, dq, poolname, &delayMsec, now, queryInserterId)) {
+ if (!processQuery(holders, dq, poolname, &delayMsec, now)) {
goto drop;
}
struct timespec answertime;
gettime(&answertime);
unsigned int udiff = 1000000.0*DiffTime(now,answertime);
- g_rings.insertResponse(answertime, ci.remote, qname, dq.qtype, (unsigned int)udiff, (unsigned int)responseLen, *dh, ds->remote, queryInserterId);
+ g_rings.insertResponse(answertime, ci.remote, qname, dq.qtype, (unsigned int)udiff, (unsigned int)responseLen, *dh, ds->remote);
rewrittenResponse.clear();
}
}
// listens on a dedicated socket, lobs answers from downstream servers to original requestors
-void* responderThread(std::shared_ptr<DownstreamState> dss, const size_t responseInserterId)
+void* responderThread(std::shared_ptr<DownstreamState> dss)
try {
auto localRespRulactions = g_resprulactions.getLocal();
#ifdef HAVE_DNSCRYPT
struct timespec ts;
gettime(&ts);
- g_rings.insertResponse(ts, ids->origRemote, ids->qname, ids->qtype, (unsigned int)udiff, (unsigned int)got, *dh, dss->remote, responseInserterId);
+ g_rings.insertResponse(ts, ids->origRemote, ids->qname, ids->qtype, (unsigned int)udiff, (unsigned int)got, *dh, dss->remote);
if(dh->rcode == RCode::ServFail)
g_stats.servfailResponses++;
}
}
-bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now, size_t queryInserterId)
+bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now)
{
- g_rings.insertQuery(now,*dq.remote,*dq.qname,dq.len,dq.qtype,*dq.dh, queryInserterId);
+ g_rings.insertQuery(now,*dq.remote,*dq.qname,dq.len,dq.qtype,*dq.dh);
if(g_qcount.enabled) {
string qname = (*dq.qname).toString(".");
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
-static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, char* respCBuf, size_t queryInserterId)
+static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, char* respCBuf)
{
assert(responsesVect == nullptr || (queuedResponses != nullptr && respIOV != nullptr && respCBuf != nullptr));
uint16_t queryId = 0;
DNSName qname(query, len, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
DNSQuestion dq(&qname, qtype, qclass, dest.sin4.sin_family != 0 ? &dest : &cs.local, &remote, dh, queryBufferSize, len, false, &queryRealTime);
- if (!processQuery(holders, dq, poolname, &delayMsec, now, queryInserterId))
+ if (!processQuery(holders, dq, poolname, &delayMsec, now))
{
return;
}
}
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
-static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders, size_t queryInserterId)
+static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders)
{
struct MMReceiver
{
continue;
}
- processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast<uint16_t>(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, recvData[msgIdx].cbuf, queryInserterId);
+ processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast<uint16_t>(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, recvData[msgIdx].cbuf);
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
// listens to incoming queries, sends out to downstream servers, noting the intended return path
-static void* udpClientThread(ClientState* cs, size_t queryInserterId)
+static void* udpClientThread(ClientState* cs)
try
{
LocalHolders holders;
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
if (g_udpVectorSize > 1) {
- MultipleMessagesUDPClientThread(cs, holders, queryInserterId);
+ MultipleMessagesUDPClientThread(cs, holders);
}
else
continue;
}
- processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), s_udpIncomingBufferSize, nullptr, nullptr, nullptr, nullptr, queryInserterId);
+ processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), s_udpIncomingBufferSize, nullptr, nullptr, nullptr, nullptr);
}
}
}
}
if (dss->connected) {
- dss->tid = thread(responderThread, dss, g_rings.getResponseInserterId());
+ dss->tid = thread(responderThread, dss);
}
}
memset(&fake, 0, sizeof(fake));
fake.id = ids.origID;
- g_rings.insertResponse(ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote, 0);
+ g_rings.insertResponse(ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote);
}
}
}
auto ret=std::make_shared<DownstreamState>(ComboAddress(address, 53));
addServerToPool(localPools, "", ret);
if (ret->connected) {
- ret->tid = thread(responderThread, ret, g_rings.getResponseInserterId());
+ ret->tid = thread(responderThread, ret);
}
g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
}
for(auto& cs : toLaunch) {
if (cs->udpFD >= 0) {
- thread t1(udpClientThread, cs, g_rings.getQueryInserterId());
+ thread t1(udpClientThread, cs);
if (!cs->cpus.empty()) {
mapThreadToCPUList(t1.native_handle(), cs->cpus);
}
std::mutex respLock;
};
- Rings(size_t capacity=10000, size_t numberOfShards=1): d_numberOfShards(numberOfShards)
+ Rings(size_t capacity=10000, size_t numberOfShards=1, size_t nbLockTries=5): d_numberOfShards(numberOfShards), d_nbLockTries(nbLockTries)
{
setCapacity(capacity, numberOfShards);
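+ /* with a single shard the non-blocking pass would keep retrying the same lock,
+    so disable it */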
+ if (numberOfShards <= 1) {
+ d_nbLockTries = 0;
+ }
}
std::unordered_map<int, vector<boost::variant<string,double> > > getTopBandwidth(unsigned int numentries);
size_t numDistinctRequestors();
}
}
- static size_t getQueryInserterId()
- {
- return s_queryInserterId++;
- }
-
- static size_t getResponseInserterId()
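+ /* runtime setter behind the setRingBuffersLockRetries() Lua directive;
+    a value of 0 makes insertions take the blocking path directly */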
+ void setNumberOfLockRetries(size_t retries)
{
- return s_responseInserterId++;
+ if (d_numberOfShards <= 1) {
+ d_nbLockTries = 0;
+ } else {
+ d_nbLockTries = retries;
+ }
}
size_t getNumberOfShards() const
{
return d_numberOfShards;
}
- void insertQuery(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, uint16_t size, const struct dnsheader& dh, size_t queryInserterId)
+ void insertQuery(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, uint16_t size, const struct dnsheader& dh)
{
- auto shardId = getShardId(queryInserterId);
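+ /* fast path: probe up to d_nbLockTries shards, moving on to the next shard
+    whenever the current one is already locked */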
+ for (size_t idx = 0; idx < d_nbLockTries; idx++) {
+ auto shardId = getShardId();
+ std::unique_lock<std::mutex> wl(d_shards[shardId]->queryLock, std::try_to_lock);
+ if (wl.owns_lock()) {
+ d_shards[shardId]->queryRing.push_back({when, requestor, name, size, qtype, dh});
+ return;
+ }
+ }
+
+ /* out of luck, let's just wait */
+ auto shardId = getShardId();
std::lock_guard<std::mutex> wl(d_shards[shardId]->queryLock);
d_shards[shardId]->queryRing.push_back({when, requestor, name, size, qtype, dh});
}
- void insertResponse(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, unsigned int usec, unsigned int size, const struct dnsheader& dh, const ComboAddress& backend, size_t responseInserterId)
+ void insertResponse(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, unsigned int usec, unsigned int size, const struct dnsheader& dh, const ComboAddress& backend)
{
- auto shardId = getShardId(responseInserterId);
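+ /* same non-blocking strategy as insertQuery() */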
+ for (size_t idx = 0; idx < d_nbLockTries; idx++) {
+ auto shardId = getShardId();
+ std::unique_lock<std::mutex> wl(d_shards[shardId]->respLock, std::try_to_lock);
+ if (wl.owns_lock()) {
+ d_shards[shardId]->respRing.push_back({when, requestor, name, qtype, usec, size, dh, backend});
+ return;
+ }
+ }
+
+ /* out of luck, let's just wait */
+ auto shardId = getShardId();
std::lock_guard<std::mutex> wl(d_shards[shardId]->respLock);
d_shards[shardId]->respRing.push_back({when, requestor, name, qtype, usec, size, dh, backend});
}
std::vector<std::unique_ptr<Shard> > d_shards;
private:
- size_t getShardId(size_t id) const
+ size_t getShardId()
{
- return (id % d_numberOfShards);
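+ /* shards are now picked round-robin via an atomic counter rather than derived
+    from a per-thread inserter id */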
+ return (d_currentShardId++ % d_numberOfShards);
}
- static std::atomic<size_t> s_queryInserterId;
- static std::atomic<size_t> s_responseInserterId;
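+ /* monotonically increasing counter used to spread insertions over the shards */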
+ std::atomic<size_t> d_currentShardId{0};
size_t d_numberOfShards;
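+ /* number of try-lock attempts before blocking; 0 disables the fast path */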
+ size_t d_nbLockTries = 5;
+
};
extern Rings g_rings;
template <class T> using NumberedVector = std::vector<std::pair<unsigned int, T> >;
-void* responderThread(std::shared_ptr<DownstreamState> state, size_t responseInserterId);
+void* responderThread(std::shared_ptr<DownstreamState> state);
extern std::mutex g_luamutex;
extern LuaContext g_lua;
extern std::string g_outputBuffer; // locking for this is ok, as locked by g_luamutex
void resetLuaSideEffect(); // reset to indeterminate state
bool responseContentMatches(const char* response, const uint16_t responseLen, const DNSName& qname, const uint16_t qtype, const uint16_t qclass, const ComboAddress& remote);
-bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now, size_t queryInserterId);
+bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now);
bool processResponse(LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr, int* delayMsec);
bool fixUpResponse(char** response, uint16_t* responseLen, size_t* responseSize, const DNSName& qname, uint16_t origFlags, bool ednsAdded, bool ecsAdded, std::vector<uint8_t>& rewrittenResponse, uint16_t addRoom);
void restoreFlags(struct dnsheader* dh, uint16_t origFlags);