]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/dnsdist.hh
dnsdist: Implement FFI LB policies
[thirdparty/pdns.git] / pdns / dnsdist.hh
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22 #pragma once
23 #include "config.h"
24 #include "ext/luawrapper/include/LuaContext.hpp"
25
26 #include <atomic>
27 #include <mutex>
28 #include <string>
29 #include <thread>
30 #include <time.h>
31 #include <unistd.h>
32 #include <unordered_map>
33
34 #include <boost/variant.hpp>
35
36 #include "bpf-filter.hh"
37 #include "capabilities.hh"
38 #include "circular_buffer.hh"
39 #include "dnscrypt.hh"
40 #include "dnsdist-cache.hh"
41 #include "dnsdist-dynbpf.hh"
42 #include "dnsname.hh"
43 #include "doh.hh"
44 #include "ednsoptions.hh"
45 #include "gettime.hh"
46 #include "iputils.hh"
47 #include "misc.hh"
48 #include "mplexer.hh"
49 #include "sholder.hh"
50 #include "tcpiohandler.hh"
51 #include "uuid-utils.hh"
52
53 void carbonDumpThread();
54 uint64_t uptimeOfProcess(const std::string& str);
55
56 extern uint16_t g_ECSSourcePrefixV4;
57 extern uint16_t g_ECSSourcePrefixV6;
58 extern bool g_ECSOverride;
59
60 typedef std::unordered_map<string, string> QTag;
61
62 struct DNSQuestion
63 {
64 DNSQuestion(const DNSName* name, uint16_t type, uint16_t class_, unsigned int consumed_, const ComboAddress* lc, const ComboAddress* rem, struct dnsheader* header, size_t bufferSize, uint16_t queryLen, bool isTcp, const struct timespec* queryTime_):
65 qname(name), local(lc), remote(rem), dh(header), queryTime(queryTime_), size(bufferSize), consumed(consumed_), tempFailureTTL(boost::none), qtype(type), qclass(class_), len(queryLen), ecsPrefixLength(rem->sin4.sin_family == AF_INET ? g_ECSSourcePrefixV4 : g_ECSSourcePrefixV6), tcp(isTcp), ecsOverride(g_ECSOverride) {
66 const uint16_t* flags = getFlagsFromDNSHeader(dh);
67 origFlags = *flags;
68 }
69 DNSQuestion(const DNSQuestion&) = delete;
70 DNSQuestion& operator=(const DNSQuestion&) = delete;
71 DNSQuestion(DNSQuestion&&) = default;
72
73 std::string getTrailingData() const;
74 bool setTrailingData(const std::string&);
75
76 #ifdef HAVE_PROTOBUF
77 boost::optional<boost::uuids::uuid> uniqueId;
78 #endif
79 Netmask ecs;
80 boost::optional<Netmask> subnet;
81 std::string sni; /* Server Name Indication, if any (DoT or DoH) */
82 std::string poolname;
83 const DNSName* qname{nullptr};
84 const ComboAddress* local{nullptr};
85 const ComboAddress* remote{nullptr};
86 std::shared_ptr<QTag> qTag{nullptr};
87 std::shared_ptr<std::map<uint16_t, EDNSOptionView> > ednsOptions;
88 std::shared_ptr<DNSCryptQuery> dnsCryptQuery{nullptr};
89 std::shared_ptr<DNSDistPacketCache> packetCache{nullptr};
90 struct dnsheader* dh{nullptr};
91 const struct timespec* queryTime{nullptr};
92 struct DOHUnit* du{nullptr};
93 size_t size;
94 unsigned int consumed{0};
95 int delayMsec{0};
96 boost::optional<uint32_t> tempFailureTTL;
97 uint32_t cacheKeyNoECS;
98 uint32_t cacheKey;
99 const uint16_t qtype;
100 const uint16_t qclass;
101 uint16_t len;
102 uint16_t ecsPrefixLength;
103 uint16_t origFlags;
104 uint8_t ednsRCode{0};
105 const bool tcp;
106 bool skipCache{false};
107 bool ecsOverride;
108 bool useECS{true};
109 bool addXPF{true};
110 bool ecsSet{false};
111 bool ecsAdded{false};
112 bool ednsAdded{false};
113 bool useZeroScope{false};
114 bool dnssecOK{false};
115 };
116
117 struct DNSResponse : DNSQuestion
118 {
119 DNSResponse(const DNSName* name, uint16_t type, uint16_t class_, unsigned int consumed_, const ComboAddress* lc, const ComboAddress* rem, struct dnsheader* header, size_t bufferSize, uint16_t responseLen, bool isTcp, const struct timespec* queryTime_):
120 DNSQuestion(name, type, class_, consumed_, lc, rem, header, bufferSize, responseLen, isTcp, queryTime_) { }
121 DNSResponse(const DNSResponse&) = delete;
122 DNSResponse& operator=(const DNSResponse&) = delete;
123 DNSResponse(DNSResponse&&) = default;
124 };
125
/* so what could you do:
   drop,
   fake up nxdomain,
   provide actual answer,
   allow and stop processing,
   continue processing,
   modify header: (servfail|refused|notimp), set TC=1,
   send to pool */
134
135 class DNSAction
136 {
137 public:
138 enum class Action { Drop, Nxdomain, Refused, Spoof, Allow, HeaderModify, Pool, Delay, Truncate, ServFail, None, NoOp, NoRecurse, SpoofRaw };
139 static std::string typeToString(const Action& action)
140 {
141 switch(action) {
142 case Action::Drop:
143 return "Drop";
144 case Action::Nxdomain:
145 return "Send NXDomain";
146 case Action::Refused:
147 return "Send Refused";
148 case Action::Spoof:
149 return "Spoof an answer";
150 case Action::SpoofRaw:
151 return "Spoof an answer from raw bytes";
152 case Action::Allow:
153 return "Allow";
154 case Action::HeaderModify:
155 return "Modify the header";
156 case Action::Pool:
157 return "Route to a pool";
158 case Action::Delay:
159 return "Delay";
160 case Action::Truncate:
161 return "Truncate over UDP";
162 case Action::ServFail:
163 return "Send ServFail";
164 case Action::None:
165 case Action::NoOp:
166 return "Do nothing";
167 case Action::NoRecurse:
168 return "Set rd=0";
169 }
170
171 return "Unknown";
172 }
173
174 virtual Action operator()(DNSQuestion*, string* ruleresult) const =0;
175 virtual ~DNSAction()
176 {
177 }
178 virtual string toString() const = 0;
179 virtual std::map<string, double> getStats() const
180 {
181 return {{}};
182 }
183 };
184
185 class DNSResponseAction
186 {
187 public:
188 enum class Action { Allow, Delay, Drop, HeaderModify, ServFail, None };
189 virtual Action operator()(DNSResponse*, string* ruleresult) const =0;
190 virtual ~DNSResponseAction()
191 {
192 }
193 virtual string toString() const = 0;
194 };
195
196 struct DynBlock
197 {
198 DynBlock(): action(DNSAction::Action::None), warning(false)
199 {
200 }
201
202 DynBlock(const std::string& reason_, const struct timespec& until_, const DNSName& domain_, DNSAction::Action action_): reason(reason_), until(until_), domain(domain_), action(action_), warning(false)
203 {
204 }
205
206 DynBlock(const DynBlock& rhs): reason(rhs.reason), until(rhs.until), domain(rhs.domain), action(rhs.action), warning(rhs.warning)
207 {
208 blocks.store(rhs.blocks);
209 }
210
211 DynBlock& operator=(const DynBlock& rhs)
212 {
213 reason=rhs.reason;
214 until=rhs.until;
215 domain=rhs.domain;
216 action=rhs.action;
217 blocks.store(rhs.blocks);
218 warning=rhs.warning;
219 return *this;
220 }
221
222 string reason;
223 struct timespec until;
224 DNSName domain;
225 DNSAction::Action action;
226 mutable std::atomic<unsigned int> blocks;
227 bool warning;
228 };
229
230 extern GlobalStateHolder<NetmaskTree<DynBlock>> g_dynblockNMG;
231
232 extern vector<pair<struct timeval, std::string> > g_confDelta;
233
234 extern uint64_t getLatencyCount(const std::string&);
235
236 struct DNSDistStats
237 {
238 using stat_t=std::atomic<uint64_t>; // aww yiss ;-)
239 stat_t responses{0};
240 stat_t servfailResponses{0};
241 stat_t queries{0};
242 stat_t frontendNXDomain{0};
243 stat_t frontendServFail{0};
244 stat_t frontendNoError{0};
245 stat_t nonCompliantQueries{0};
246 stat_t nonCompliantResponses{0};
247 stat_t rdQueries{0};
248 stat_t emptyQueries{0};
249 stat_t aclDrops{0};
250 stat_t dynBlocked{0};
251 stat_t ruleDrop{0};
252 stat_t ruleNXDomain{0};
253 stat_t ruleRefused{0};
254 stat_t ruleServFail{0};
255 stat_t selfAnswered{0};
256 stat_t downstreamTimeouts{0};
257 stat_t downstreamSendErrors{0};
258 stat_t truncFail{0};
259 stat_t noPolicy{0};
260 stat_t cacheHits{0};
261 stat_t cacheMisses{0};
262 stat_t latency0_1{0}, latency1_10{0}, latency10_50{0}, latency50_100{0}, latency100_1000{0}, latencySlow{0}, latencySum{0};
263 stat_t securityStatus{0};
264
265 double latencyAvg100{0}, latencyAvg1000{0}, latencyAvg10000{0}, latencyAvg1000000{0};
266 typedef std::function<uint64_t(const std::string&)> statfunction_t;
267 typedef boost::variant<stat_t*, double*, statfunction_t> entry_t;
268 std::vector<std::pair<std::string, entry_t>> entries{
269 {"responses", &responses},
270 {"servfail-responses", &servfailResponses},
271 {"queries", &queries},
272 {"frontend-nxdomain", &frontendNXDomain},
273 {"frontend-servfail", &frontendServFail},
274 {"frontend-noerror", &frontendNoError},
275 {"acl-drops", &aclDrops},
276 {"rule-drop", &ruleDrop},
277 {"rule-nxdomain", &ruleNXDomain},
278 {"rule-refused", &ruleRefused},
279 {"rule-servfail", &ruleServFail},
280 {"self-answered", &selfAnswered},
281 {"downstream-timeouts", &downstreamTimeouts},
282 {"downstream-send-errors", &downstreamSendErrors},
283 {"trunc-failures", &truncFail},
284 {"no-policy", &noPolicy},
285 {"latency0-1", &latency0_1},
286 {"latency1-10", &latency1_10},
287 {"latency10-50", &latency10_50},
288 {"latency50-100", &latency50_100},
289 {"latency100-1000", &latency100_1000},
290 {"latency-slow", &latencySlow},
291 {"latency-avg100", &latencyAvg100},
292 {"latency-avg1000", &latencyAvg1000},
293 {"latency-avg10000", &latencyAvg10000},
294 {"latency-avg1000000", &latencyAvg1000000},
295 {"uptime", uptimeOfProcess},
296 {"real-memory-usage", getRealMemoryUsage},
297 {"special-memory-usage", getSpecialMemoryUsage},
298 {"udp-in-errors", boost::bind(udpErrorStats, "udp-in-errors")},
299 {"udp-noport-errors", boost::bind(udpErrorStats, "udp-noport-errors")},
300 {"udp-recvbuf-errors", boost::bind(udpErrorStats, "udp-recvbuf-errors")},
301 {"udp-sndbuf-errors", boost::bind(udpErrorStats, "udp-sndbuf-errors")},
302 {"noncompliant-queries", &nonCompliantQueries},
303 {"noncompliant-responses", &nonCompliantResponses},
304 {"rdqueries", &rdQueries},
305 {"empty-queries", &emptyQueries},
306 {"cache-hits", &cacheHits},
307 {"cache-misses", &cacheMisses},
308 {"cpu-iowait", getCPUIOWait},
309 {"cpu-steal", getCPUSteal},
310 {"cpu-sys-msec", getCPUTimeSystem},
311 {"cpu-user-msec", getCPUTimeUser},
312 {"fd-usage", getOpenFileDescriptors},
313 {"dyn-blocked", &dynBlocked},
314 {"dyn-block-nmg-size", [](const std::string&) { return g_dynblockNMG.getLocal()->size(); }},
315 {"security-status", &securityStatus},
316 // Latency histogram
317 {"latency-sum", &latencySum},
318 {"latency-count", getLatencyCount},
319 };
320 };
321
322 extern struct DNSDistStats g_stats;
323 void doLatencyStats(double udiff);
324
325
326 struct StopWatch
327 {
328 StopWatch(bool realTime=false): d_needRealTime(realTime)
329 {
330 }
331 struct timespec d_start{0,0};
332 bool d_needRealTime{false};
333
334 void start() {
335 if(gettime(&d_start, d_needRealTime) < 0)
336 unixDie("Getting timestamp");
337
338 }
339
340 void set(const struct timespec& from) {
341 d_start = from;
342 }
343
344 double udiff() const {
345 struct timespec now;
346 if(gettime(&now, d_needRealTime) < 0)
347 unixDie("Getting timestamp");
348
349 return 1000000.0*(now.tv_sec - d_start.tv_sec) + (now.tv_nsec - d_start.tv_nsec)/1000.0;
350 }
351
352 double udiffAndSet() {
353 struct timespec now;
354 if(gettime(&now, d_needRealTime) < 0)
355 unixDie("Getting timestamp");
356
357 auto ret= 1000000.0*(now.tv_sec - d_start.tv_sec) + (now.tv_nsec - d_start.tv_nsec)/1000.0;
358 d_start = now;
359 return ret;
360 }
361
362 };
363
364 class BasicQPSLimiter
365 {
366 public:
367 BasicQPSLimiter()
368 {
369 }
370
371 BasicQPSLimiter(unsigned int burst): d_tokens(burst)
372 {
373 d_prev.start();
374 }
375
376 bool check(unsigned int rate, unsigned int burst) const // this is not quite fair
377 {
378 auto delta = d_prev.udiffAndSet();
379
380 if(delta > 0.0) // time, frequently, does go backwards..
381 d_tokens += 1.0 * rate * (delta/1000000.0);
382
383 if(d_tokens > burst) {
384 d_tokens = burst;
385 }
386
387 bool ret=false;
388 if(d_tokens >= 1.0) { // we need this because burst=1 is weird otherwise
389 ret=true;
390 --d_tokens;
391 }
392
393 return ret;
394 }
395
396 bool seenSince(const struct timespec& cutOff) const
397 {
398 return cutOff < d_prev.d_start;
399 }
400
401 protected:
402 mutable StopWatch d_prev;
403 mutable double d_tokens;
404 };
405
406 class QPSLimiter : public BasicQPSLimiter
407 {
408 public:
409 QPSLimiter(): BasicQPSLimiter()
410 {
411 }
412
413 QPSLimiter(unsigned int rate, unsigned int burst): BasicQPSLimiter(burst), d_rate(rate), d_burst(burst), d_passthrough(false)
414 {
415 d_prev.start();
416 }
417
418 unsigned int getRate() const
419 {
420 return d_passthrough ? 0 : d_rate;
421 }
422
423 int getPassed() const
424 {
425 return d_passed;
426 }
427
428 int getBlocked() const
429 {
430 return d_blocked;
431 }
432
433 bool check() const // this is not quite fair
434 {
435 if (d_passthrough) {
436 return true;
437 }
438
439 bool ret = BasicQPSLimiter::check(d_rate, d_burst);
440 if (ret) {
441 d_passed++;
442 }
443 else {
444 d_blocked++;
445 }
446
447 return ret;
448 }
449 private:
450 mutable unsigned int d_passed{0};
451 mutable unsigned int d_blocked{0};
452 unsigned int d_rate;
453 unsigned int d_burst;
454 bool d_passthrough{true};
455 };
456
457 struct ClientState;
458
459 struct IDState
460 {
461 IDState(): sentTime(true), delayMsec(0), tempFailureTTL(boost::none) { origDest.sin4.sin_family = 0;}
462 IDState(const IDState& orig): origRemote(orig.origRemote), origDest(orig.origDest), age(orig.age)
463 {
464 usageIndicator.store(orig.usageIndicator.load());
465 origFD = orig.origFD;
466 origID = orig.origID;
467 delayMsec = orig.delayMsec;
468 tempFailureTTL = orig.tempFailureTTL;
469 }
470
471 static const int64_t unusedIndicator = -1;
472
473 static bool isInUse(int64_t usageIndicator)
474 {
475 return usageIndicator != unusedIndicator;
476 }
477
478 bool isInUse() const
479 {
480 return usageIndicator != unusedIndicator;
481 }
482
483 /* return true if the value has been successfully replaced meaning that
484 no-one updated the usage indicator in the meantime */
485 bool tryMarkUnused(int64_t expectedUsageIndicator)
486 {
487 return usageIndicator.compare_exchange_strong(expectedUsageIndicator, unusedIndicator);
488 }
489
490 /* mark as unused no matter what, return true if the state was in use before */
491 bool markAsUsed()
492 {
493 auto currentGeneration = generation++;
494 return markAsUsed(currentGeneration);
495 }
496
497 /* mark as unused no matter what, return true if the state was in use before */
498 bool markAsUsed(int64_t currentGeneration)
499 {
500 int64_t oldUsage = usageIndicator.exchange(currentGeneration);
501 return oldUsage != unusedIndicator;
502 }
503
504 /* We use this value to detect whether this state is in use.
505 For performance reasons we don't want to use a lock here, but that means
506 we need to be very careful when modifying this value. Modifications happen
507 from:
508 - one of the UDP or DoH 'client' threads receiving a query, selecting a backend
509 then picking one of the states associated to this backend (via the idOffset).
510 Most of the time this state should not be in use and usageIndicator is -1, but we
511 might not yet have received a response for the query previously associated to this
512 state, meaning that we will 'reuse' this state and erase the existing state.
513 If we ever receive a response for this state, it will be discarded. This is
514 mostly fine for UDP except that we still need to be careful in order to miss
515 the 'outstanding' counters, which should only be increased when we are picking
516 an empty state, and not when reusing ;
517 For DoH, though, we have dynamically allocated a DOHUnit object that needs to
518 be freed, as well as internal objects internals to libh2o.
519 - one of the UDP receiver threads receiving a response from a backend, picking
520 the corresponding state and sending the response to the client ;
521 - the 'healthcheck' thread scanning the states to actively discover timeouts,
522 mostly to keep some counters like the 'outstanding' one sane.
523 We previously based that logic on the origFD (FD on which the query was received,
524 and therefore from where the response should be sent) but this suffered from an
525 ABA problem since it was quite likely that a UDP 'client thread' would reset it to the
526 same value since we only have so much incoming sockets:
527 - 1/ 'client' thread gets a query and set origFD to its FD, say 5 ;
528 - 2/ 'receiver' thread gets a response, read the value of origFD to 5, check that the qname,
529 qtype and qclass match
530 - 3/ during that time the 'client' thread reuses the state, setting again origFD to 5 ;
531 - 4/ the 'receiver' thread uses compare_exchange_strong() to only replace the value if it's still
532 5, except it's not the same 5 anymore and it overrides a fresh state.
533 We now use a 32-bit unsigned counter instead, which is incremented every time the state is set,
534 wrapping around if necessary, and we set an atomic signed 64-bit value, so that we still have -1
535 when the state is unused and the value of our counter otherwise.
536 */
537 std::atomic<int64_t> usageIndicator{unusedIndicator}; // set to unusedIndicator to indicate this state is empty // 8
538 std::atomic<uint32_t> generation{0}; // increased every time a state is used, to be able to detect an ABA issue // 4
539 ComboAddress origRemote; // 28
540 ComboAddress origDest; // 28
541 StopWatch sentTime; // 16
542 DNSName qname; // 80
543 std::shared_ptr<DNSCryptQuery> dnsCryptQuery{nullptr};
544 #ifdef HAVE_PROTOBUF
545 boost::optional<boost::uuids::uuid> uniqueId;
546 #endif
547 boost::optional<Netmask> subnet{boost::none};
548 std::shared_ptr<DNSDistPacketCache> packetCache{nullptr};
549 std::shared_ptr<QTag> qTag{nullptr};
550 const ClientState* cs{nullptr};
551 DOHUnit* du{nullptr};
552 uint32_t cacheKey; // 4
553 uint32_t cacheKeyNoECS; // 4
554 uint16_t age; // 4
555 uint16_t qtype; // 2
556 uint16_t qclass; // 2
557 uint16_t origID; // 2
558 uint16_t origFlags; // 2
559 int origFD{-1};
560 int delayMsec;
561 boost::optional<uint32_t> tempFailureTTL;
562 bool ednsAdded{false};
563 bool ecsAdded{false};
564 bool skipCache{false};
565 bool destHarvested{false}; // if true, origDest holds the original dest addr, otherwise the listening addr
566 bool dnssecOK{false};
567 bool useZeroScope;
568 };
569
570 typedef std::unordered_map<string, unsigned int> QueryCountRecords;
571 typedef std::function<std::tuple<bool, string>(const DNSQuestion* dq)> QueryCountFilter;
572 struct QueryCount {
573 QueryCount()
574 {
575 pthread_rwlock_init(&queryLock, nullptr);
576 }
577 ~QueryCount()
578 {
579 pthread_rwlock_destroy(&queryLock);
580 }
581 QueryCountRecords records;
582 QueryCountFilter filter;
583 pthread_rwlock_t queryLock;
584 bool enabled{false};
585 };
586
587 extern QueryCount g_qcount;
588
589 struct ClientState
590 {
591 ClientState(const ComboAddress& local_, bool isTCP_, bool doReusePort, int fastOpenQueue, const std::string& itfName, const std::set<int>& cpus_): cpus(cpus_), local(local_), interface(itfName), fastOpenQueueSize(fastOpenQueue), tcp(isTCP_), reuseport(doReusePort)
592 {
593 }
594
595 std::set<int> cpus;
596 ComboAddress local;
597 std::shared_ptr<DNSCryptContext> dnscryptCtx{nullptr};
598 std::shared_ptr<TLSFrontend> tlsFrontend{nullptr};
599 std::shared_ptr<DOHFrontend> dohFrontend{nullptr};
600 std::string interface;
601 std::atomic<uint64_t> queries{0};
602 mutable std::atomic<uint64_t> responses{0};
603 std::atomic<uint64_t> tcpDiedReadingQuery{0};
604 std::atomic<uint64_t> tcpDiedSendingResponse{0};
605 std::atomic<uint64_t> tcpGaveUp{0};
606 std::atomic<uint64_t> tcpClientTimeouts{0};
607 std::atomic<uint64_t> tcpDownstreamTimeouts{0};
608 std::atomic<uint64_t> tcpCurrentConnections{0};
609 std::atomic<uint64_t> tlsNewSessions{0}; // A new TLS session has been negotiated, no resumption
610 std::atomic<uint64_t> tlsResumptions{0}; // A TLS session has been resumed, either via session id or via a TLS ticket
611 std::atomic<uint64_t> tlsUnknownTicketKey{0}; // A TLS ticket has been presented but we don't have the associated key (might have expired)
612 std::atomic<uint64_t> tlsInactiveTicketKey{0}; // A TLS ticket has been successfully resumed but the key is no longer active, we should issue a new one
613 std::atomic<uint64_t> tls10queries{0}; // valid DNS queries received via TLSv1.0
614 std::atomic<uint64_t> tls11queries{0}; // valid DNS queries received via TLSv1.1
615 std::atomic<uint64_t> tls12queries{0}; // valid DNS queries received via TLSv1.2
616 std::atomic<uint64_t> tls13queries{0}; // valid DNS queries received via TLSv1.3
617 std::atomic<uint64_t> tlsUnknownqueries{0}; // valid DNS queries received via unknown TLS version
618 std::atomic<double> tcpAvgQueriesPerConnection{0.0};
619 /* in ms */
620 std::atomic<double> tcpAvgConnectionDuration{0.0};
621 int udpFD{-1};
622 int tcpFD{-1};
623 int fastOpenQueueSize{0};
624 bool muted{false};
625 bool tcp;
626 bool reuseport;
627 bool ready{false};
628
629 int getSocket() const
630 {
631 return udpFD != -1 ? udpFD : tcpFD;
632 }
633
634 bool isUDP() const
635 {
636 return udpFD != -1;
637 }
638
639 bool isTCP() const
640 {
641 return udpFD == -1;
642 }
643
644 bool hasTLS() const
645 {
646 return tlsFrontend != nullptr || dohFrontend != nullptr;
647 }
648
649 std::string getType() const
650 {
651 std::string result = udpFD != -1 ? "UDP" : "TCP";
652
653 if (dohFrontend) {
654 result += " (DNS over HTTPS)";
655 }
656 else if (tlsFrontend) {
657 result += " (DNS over TLS)";
658 }
659 else if (dnscryptCtx) {
660 result += " (DNSCrypt)";
661 }
662
663 return result;
664 }
665
666 #ifdef HAVE_EBPF
667 shared_ptr<BPFFilter> d_filter;
668
669 void detachFilter()
670 {
671 if (d_filter) {
672 d_filter->removeSocket(getSocket());
673 d_filter = nullptr;
674 }
675 }
676
677 void attachFilter(shared_ptr<BPFFilter> bpf)
678 {
679 detachFilter();
680
681 bpf->addSocket(getSocket());
682 d_filter = bpf;
683 }
684 #endif /* HAVE_EBPF */
685
686 void updateTCPMetrics(size_t nbQueries, uint64_t durationMs)
687 {
688 tcpAvgQueriesPerConnection = (99.0 * tcpAvgQueriesPerConnection / 100.0) + (nbQueries / 100.0);
689 tcpAvgConnectionDuration = (99.0 * tcpAvgConnectionDuration / 100.0) + (durationMs / 100.0);
690 }
691 };
692
693 class TCPClientCollection {
694 std::vector<int> d_tcpclientthreads;
695 std::atomic<uint64_t> d_numthreads{0};
696 std::atomic<uint64_t> d_pos{0};
697 std::atomic<uint64_t> d_queued{0};
698 const uint64_t d_maxthreads{0};
699 std::mutex d_mutex;
700 int d_singlePipe[2];
701 const bool d_useSinglePipe;
702 public:
703
704 TCPClientCollection(size_t maxThreads, bool useSinglePipe=false): d_maxthreads(maxThreads), d_singlePipe{-1,-1}, d_useSinglePipe(useSinglePipe)
705
706 {
707 d_tcpclientthreads.reserve(maxThreads);
708
709 if (d_useSinglePipe) {
710 if (pipe(d_singlePipe) < 0) {
711 int err = errno;
712 throw std::runtime_error("Error creating the TCP single communication pipe: " + stringerror(err));
713 }
714
715 if (!setNonBlocking(d_singlePipe[0])) {
716 int err = errno;
717 close(d_singlePipe[0]);
718 close(d_singlePipe[1]);
719 throw std::runtime_error("Error setting the TCP single communication pipe non-blocking: " + stringerror(err));
720 }
721
722 if (!setNonBlocking(d_singlePipe[1])) {
723 int err = errno;
724 close(d_singlePipe[0]);
725 close(d_singlePipe[1]);
726 throw std::runtime_error("Error setting the TCP single communication pipe non-blocking: " + stringerror(err));
727 }
728 }
729 }
730 int getThread()
731 {
732 uint64_t pos = d_pos++;
733 ++d_queued;
734 return d_tcpclientthreads[pos % d_numthreads];
735 }
736 bool hasReachedMaxThreads() const
737 {
738 return d_numthreads >= d_maxthreads;
739 }
740 uint64_t getThreadsCount() const
741 {
742 return d_numthreads;
743 }
744 uint64_t getQueuedCount() const
745 {
746 return d_queued;
747 }
748 void decrementQueuedCount()
749 {
750 --d_queued;
751 }
752 void addTCPClientThread();
753 };
754
755 extern std::unique_ptr<TCPClientCollection> g_tcpclientthreads;
756
757 struct DownstreamState
758 {
759 typedef std::function<std::tuple<DNSName, uint16_t, uint16_t>(const DNSName&, uint16_t, uint16_t, dnsheader*)> checkfunc_t;
760
761 DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf, const std::string& sourceItfName, size_t numberOfSockets, bool connect);
762 DownstreamState(const ComboAddress& remote_): DownstreamState(remote_, ComboAddress(), 0, std::string(), 1, true) {}
763 ~DownstreamState()
764 {
765 for (auto& fd : sockets) {
766 if (fd >= 0) {
767 close(fd);
768 fd = -1;
769 }
770 }
771 pthread_rwlock_destroy(&d_lock);
772 }
773 boost::uuids::uuid id;
774 std::set<unsigned int> hashes;
775 mutable pthread_rwlock_t d_lock;
776 std::vector<int> sockets;
777 const std::string sourceItfName;
778 std::mutex socketsLock;
779 std::mutex connectLock;
780 std::unique_ptr<FDMultiplexer> mplexer{nullptr};
781 std::thread tid;
782 const ComboAddress remote;
783 QPSLimiter qps;
784 vector<IDState> idStates;
785 const ComboAddress sourceAddr;
786 checkfunc_t checkFunction;
787 DNSName checkName{"a.root-servers.net."};
788 QType checkType{QType::A};
789 uint16_t checkClass{QClass::IN};
790 std::atomic<uint64_t> idOffset{0};
791 std::atomic<uint64_t> sendErrors{0};
792 std::atomic<uint64_t> outstanding{0};
793 std::atomic<uint64_t> reuseds{0};
794 std::atomic<uint64_t> queries{0};
795 std::atomic<uint64_t> responses{0};
796 struct {
797 std::atomic<uint64_t> sendErrors{0};
798 std::atomic<uint64_t> reuseds{0};
799 std::atomic<uint64_t> queries{0};
800 } prev;
801 std::atomic<uint64_t> tcpDiedSendingQuery{0};
802 std::atomic<uint64_t> tcpDiedReadingResponse{0};
803 std::atomic<uint64_t> tcpGaveUp{0};
804 std::atomic<uint64_t> tcpReadTimeouts{0};
805 std::atomic<uint64_t> tcpWriteTimeouts{0};
806 std::atomic<uint64_t> tcpCurrentConnections{0};
807 std::atomic<double> tcpAvgQueriesPerConnection{0.0};
808 /* in ms */
809 std::atomic<double> tcpAvgConnectionDuration{0.0};
810 size_t socketsOffset{0};
811 double queryLoad{0.0};
812 double dropRate{0.0};
813 double latencyUsec{0.0};
814 int order{1};
815 int weight{1};
816 int tcpConnectTimeout{5};
817 int tcpRecvTimeout{30};
818 int tcpSendTimeout{30};
819 unsigned int checkInterval{1};
820 unsigned int lastCheck{0};
821 const unsigned int sourceItf{0};
822 uint16_t retries{5};
823 uint16_t xpfRRCode{0};
824 uint16_t checkTimeout{1000}; /* in milliseconds */
825 uint8_t currentCheckFailures{0};
826 uint8_t consecutiveSuccessfulChecks{0};
827 uint8_t maxCheckFailures{1};
828 uint8_t minRiseSuccesses{1};
829 StopWatch sw;
830 set<string> pools;
831 enum class Availability { Up, Down, Auto} availability{Availability::Auto};
832 bool mustResolve{false};
833 bool upStatus{false};
834 bool useECS{false};
835 bool setCD{false};
836 bool disableZeroScope{false};
837 std::atomic<bool> connected{false};
838 std::atomic_flag threadStarted;
839 bool tcpFastOpen{false};
840 bool ipBindAddrNoPort{true};
841
842 bool isUp() const
843 {
844 if(availability == Availability::Down)
845 return false;
846 if(availability == Availability::Up)
847 return true;
848 return upStatus;
849 }
850 void setUp() { availability = Availability::Up; }
851 void setDown() { availability = Availability::Down; }
852 void setAuto() { availability = Availability::Auto; }
853 string getName() const {
854 return name;
855 }
856 string getNameWithAddr() const {
857 return nameWithAddr;
858 }
859 void setName(const std::string& newName)
860 {
861 name = newName;
862 if (newName.empty()) {
863 nameWithAddr = newName.empty() ? remote.toStringWithPort() : (name + " (" + remote.toStringWithPort()+ ")");
864 }
865 }
866
867 string getStatus() const
868 {
869 string status;
870 if(availability == DownstreamState::Availability::Up)
871 status = "UP";
872 else if(availability == DownstreamState::Availability::Down)
873 status = "DOWN";
874 else
875 status = (upStatus ? "up" : "down");
876 return status;
877 }
878 bool reconnect();
879 void hash();
880 void setId(const boost::uuids::uuid& newId);
881 void setWeight(int newWeight);
882
883 void updateTCPMetrics(size_t nbQueries, uint64_t durationMs)
884 {
885 tcpAvgQueriesPerConnection = (99.0 * tcpAvgQueriesPerConnection / 100.0) + (nbQueries / 100.0);
886 tcpAvgConnectionDuration = (99.0 * tcpAvgConnectionDuration / 100.0) + (durationMs / 100.0);
887 }
888 private:
889 std::string name;
890 std::string nameWithAddr;
891 };
892 using servers_t =vector<std::shared_ptr<DownstreamState>>;
893
894 void responderThread(std::shared_ptr<DownstreamState> state);
895 extern std::mutex g_luamutex;
896 extern LuaContext g_lua;
897 extern std::string g_outputBuffer; // locking for this is ok, as locked by g_luamutex
898
899 class DNSRule
900 {
901 public:
902 virtual ~DNSRule ()
903 {
904 }
905 virtual bool matches(const DNSQuestion* dq) const =0;
906 virtual string toString() const = 0;
907 mutable std::atomic<uint64_t> d_matches{0};
908 };
909
910 struct dnsdist_ffi_servers_list_t;
911 struct dnsdist_ffi_server_t;
912 struct dnsdist_ffi_dnsquestion_t;
913
914 struct ServerPolicy
915 {
916 template <class T> using NumberedVector = std::vector<std::pair<unsigned int, T> >;
917 using NumberedServerVector = NumberedVector<shared_ptr<DownstreamState>>;
918 typedef std::function<shared_ptr<DownstreamState>(const NumberedServerVector& servers, const DNSQuestion*)> policyfunc_t;
919 typedef std::function<unsigned int(dnsdist_ffi_servers_list_t* servers, dnsdist_ffi_dnsquestion_t* dq)> ffipolicyfunc_t;
920
921 ServerPolicy(const std::string& name_, policyfunc_t policy_, bool isLua_): name(name_), policy(policy_), isLua(isLua_)
922 {
923 }
924 ServerPolicy(const std::string& name_, ffipolicyfunc_t policy_): name(name_), ffipolicy(policy_), isLua(true), isFFI(true)
925 {
926 }
927 ServerPolicy()
928 {
929 }
930
931 string name;
932 policyfunc_t policy;
933 ffipolicyfunc_t ffipolicy;
934 bool isLua{false};
935 bool isFFI{false};
936
937 std::string toString() const {
938 return string("ServerPolicy") + (isLua ? " (Lua)" : "") + " \"" + name + "\"";
939 }
940 };
941
942 struct ServerPool
943 {
944 ServerPool()
945 {
946 pthread_rwlock_init(&d_lock, nullptr);
947 }
948 ~ServerPool()
949 {
950 pthread_rwlock_destroy(&d_lock);
951 }
952
953 const std::shared_ptr<DNSDistPacketCache> getCache() const { return packetCache; };
954
955 bool getECS() const
956 {
957 return d_useECS;
958 }
959
960 void setECS(bool useECS)
961 {
962 d_useECS = useECS;
963 }
964
965 std::shared_ptr<DNSDistPacketCache> packetCache{nullptr};
966 std::shared_ptr<ServerPolicy> policy{nullptr};
967
968 size_t countServers(bool upOnly)
969 {
970 size_t count = 0;
971 ReadLock rl(&d_lock);
972 for (const auto& server : d_servers) {
973 if (!upOnly || std::get<1>(server)->isUp() ) {
974 count++;
975 }
976 }
977 return count;
978 }
979
980 ServerPolicy::NumberedServerVector getServers()
981 {
982 ServerPolicy::NumberedServerVector result;
983 {
984 ReadLock rl(&d_lock);
985 result = d_servers;
986 }
987 return result;
988 }
989
990 void addServer(shared_ptr<DownstreamState>& server)
991 {
992 WriteLock wl(&d_lock);
993 unsigned int count = (unsigned int) d_servers.size();
994 d_servers.push_back(make_pair(++count, server));
995 /* we need to reorder based on the server 'order' */
996 std::stable_sort(d_servers.begin(), d_servers.end(), [](const std::pair<unsigned int,std::shared_ptr<DownstreamState> >& a, const std::pair<unsigned int,std::shared_ptr<DownstreamState> >& b) {
997 return a.second->order < b.second->order;
998 });
999 /* and now we need to renumber for Lua (custom policies) */
1000 size_t idx = 1;
1001 for (auto& serv : d_servers) {
1002 serv.first = idx++;
1003 }
1004 }
1005
1006 void removeServer(shared_ptr<DownstreamState>& server)
1007 {
1008 WriteLock wl(&d_lock);
1009 size_t idx = 1;
1010 bool found = false;
1011 for (auto it = d_servers.begin(); it != d_servers.end();) {
1012 if (found) {
1013 /* we need to renumber the servers placed
1014 after the removed one, for Lua (custom policies) */
1015 it->first = idx++;
1016 it++;
1017 }
1018 else if (it->second == server) {
1019 it = d_servers.erase(it);
1020 found = true;
1021 } else {
1022 idx++;
1023 it++;
1024 }
1025 }
1026 }
1027
1028 private:
1029 ServerPolicy::NumberedServerVector d_servers;
1030 pthread_rwlock_t d_lock;
1031 bool d_useECS{false};
1032 };
/* Pools are stored in a map keyed on the pool name, each of them being held via a shared pointer. */
1033 using pools_t=map<std::string,std::shared_ptr<ServerPool>>;
/* Helpers altering the pool designated by poolName; they are only declared here.
   NOTE(review): how a pool name that is not in the map yet is handled is not
   visible from this header -- check the implementation. */
1034 void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr<ServerPolicy> policy);
1035 void addServerToPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server);
1036 void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server);
1037
/* Settings of one Carbon export target; the list of targets is held in g_carbon.
   NOTE(review): the meaning of the individual fields is inferred from their
   names, confirm against the code filling and reading them. */
1038 struct CarbonConfig
1039 {
1040 ComboAddress server;
1041 std::string namespace_name;
1042 std::string ourname;
1043 std::string instance_name;
/* no default value: it has to be set explicitly when the object is built
   (presumably a number of seconds -- TODO confirm) */
1044 unsigned int interval;
1045 };
1046
/* Flag values for the EDNS header. 32768 is 0x8000, i.e. only the highest bit of
   a 16-bit field is set. NOTE(review): presumably the DNSSEC OK (DO) bit of the
   EDNS flags -- confirm against the EDNS specifications. */
1047 enum ednsHeaderFlags {
1048 EDNS_HEADER_FLAG_NONE = 0,
1049 EDNS_HEADER_FLAG_DO = 32768
1050 };
1051
/* Associates a rule (DNSRule) with an action (DNSAction), along with a UUID and
   a creation order. None of the fields has a default initializer.
   NOTE(review): presumably the action is applied to the queries matching the
   rule -- confirm in the rule processing code. */
1052 struct DNSDistRuleAction
1053 {
1054 std::shared_ptr<DNSRule> d_rule;
1055 std::shared_ptr<DNSAction> d_action;
1056 boost::uuids::uuid d_id;
1057 uint64_t d_creationOrder;
1058 };
1059
/* Same layout as DNSDistRuleAction, except that the action is a
   DNSResponseAction instead of a DNSAction. */
1060 struct DNSDistResponseRuleAction
1061 {
1062 std::shared_ptr<DNSRule> d_rule;
1063 std::shared_ptr<DNSResponseAction> d_action;
1064 boost::uuids::uuid d_id;
1065 uint64_t d_creationOrder;
1066 };
1067
/* Dynamic blocks indexed by name suffix, and the action used for dynamic blocks.
   NOTE(review): semantics inferred from the names, the definitions live elsewhere. */
1068 extern GlobalStateHolder<SuffixMatchTree<DynBlock>> g_dynblockSMT;
1069 extern DNSAction::Action g_dynBlockAction;
1070
/* Global configuration objects, all wrapped into a GlobalStateHolder. The
   LocalHolders structure declared further down takes a local handle on several
   of them via getLocal(). */
1071 extern GlobalStateHolder<vector<CarbonConfig> > g_carbon;
1072 extern GlobalStateHolder<ServerPolicy> g_policy;
1073 extern GlobalStateHolder<servers_t> g_dstates;
1074 extern GlobalStateHolder<pools_t> g_pools;
1075 extern GlobalStateHolder<vector<DNSDistRuleAction> > g_rulactions;
1076 extern GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_resprulactions;
1077 extern GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_cachehitresprulactions;
1078 extern GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_selfansweredresprulactions;
1079 extern GlobalStateHolder<NetmaskGroup> g_ACL;
1080
1081 extern ComboAddress g_serverControl; // not changed during runtime
1082
/* Listening configuration and frontends.
   NOTE(review): the meaning of the individual fields of the g_locals tuple is
   not visible from this header -- check the code building the tuples. */
1083 extern std::vector<std::tuple<ComboAddress, bool, bool, int, std::string, std::set<int>>> g_locals; // not changed at runtime (we hope XXX)
1084 extern std::vector<shared_ptr<TLSFrontend>> g_tlslocals;
1085 extern std::vector<shared_ptr<DOHFrontend>> g_dohlocals;
1086 extern std::vector<std::unique_ptr<ClientState>> g_frontends;
/* Global tunables, defined elsewhere. Only g_configurationDone,
   g_cacheCleaningDelay and g_cacheCleaningPercentage are atomics, the other
   ones are plain variables. */
1087 extern bool g_truncateTC;
1088 extern bool g_fixupCase;
1089 extern int g_tcpRecvTimeout;
1090 extern int g_tcpSendTimeout;
1091 extern int g_udpTimeout;
1092 extern uint16_t g_maxOutstanding;
1093 extern std::atomic<bool> g_configurationDone;
1094 extern uint64_t g_maxTCPClientThreads;
1095 extern uint64_t g_maxTCPQueuedConnections;
1096 extern size_t g_maxTCPQueriesPerConn;
1097 extern size_t g_maxTCPConnectionDuration;
1098 extern size_t g_maxTCPConnectionsPerClient;
1099 extern std::atomic<uint16_t> g_cacheCleaningDelay;
1100 extern std::atomic<uint16_t> g_cacheCleaningPercentage;
1101 extern uint32_t g_staleCacheEntriesTTL;
1102 extern bool g_apiReadWrite;
1103 extern std::string g_apiConfigDirectory;
1104 extern bool g_servFailOnNoPolicy;
1105 extern uint32_t g_hashperturb;
1106 extern bool g_useTCPSinglePipe;
1107 extern uint16_t g_downstreamTCPCleanupInterval;
1108 extern size_t g_udpVectorSize;
1109 extern bool g_preserveTrailingData;
1110 extern bool g_allowEmptyResponse;
1111 extern bool g_roundrobinFailOnNoServer;
1112 extern double g_consistentHashBalancingFactor;
1113
/* eBPF filters, only declared when the build defines HAVE_EBPF. */
1114 #ifdef HAVE_EBPF
1115 extern shared_ptr<BPFFilter> g_defaultBPFFilter;
1116 extern std::vector<std::shared_ptr<DynBPFFilter> > g_dynBPFFilters;
1117 #endif /* HAVE_EBPF */
1118
1119 struct LocalHolders
1120 {
1121 LocalHolders(): acl(g_ACL.getLocal()), policy(g_policy.getLocal()), rulactions(g_rulactions.getLocal()), cacheHitRespRulactions(g_cachehitresprulactions.getLocal()), selfAnsweredRespRulactions(g_selfansweredresprulactions.getLocal()), servers(g_dstates.getLocal()), dynNMGBlock(g_dynblockNMG.getLocal()), dynSMTBlock(g_dynblockSMT.getLocal()), pools(g_pools.getLocal())
1122 {
1123 }
1124
1125 LocalStateHolder<NetmaskGroup> acl;
1126 LocalStateHolder<ServerPolicy> policy;
1127 LocalStateHolder<vector<DNSDistRuleAction> > rulactions;
1128 LocalStateHolder<vector<DNSDistResponseRuleAction> > cacheHitRespRulactions;
1129 LocalStateHolder<vector<DNSDistResponseRuleAction> > selfAnsweredRespRulactions;
1130 LocalStateHolder<servers_t> servers;
1131 LocalStateHolder<NetmaskTree<DynBlock> > dynNMGBlock;
1132 LocalStateHolder<SuffixMatchTree<DynBlock> > dynSMTBlock;
1133 LocalStateHolder<pools_t> pools;
1134 };
1135
/* forward declaration, struct dnsheader is only used via pointers in the declarations below */
1136 struct dnsheader;
1137
1138 void controlThread(int fd, ComboAddress local);
/* Pool lookup helpers, working on a pools_t and a pool name.
   NOTE(review): judging by its name and its non-const pools parameter,
   createPoolIfNotExists() may insert into the map, while getPool() cannot since
   it gets a const reference -- what getPool() does for an unknown name has to be
   checked in the implementation. */
1139 std::shared_ptr<ServerPool> getPool(const pools_t& pools, const std::string& poolName);
1140 std::shared_ptr<ServerPool> createPoolIfNotExists(pools_t& pools, const string& poolName);
1141 ServerPolicy::NumberedServerVector getDownstreamCandidates(const pools_t& pools, const std::string& poolName);
1142
/* Load-balancing policies. All of them have the parameters and the return type
   of ServerPolicy::policyfunc_t: the numbered list of candidate servers and the
   query in, a shared pointer to a backend out. */
1143 std::shared_ptr<DownstreamState> firstAvailable(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq);
1144
1145 std::shared_ptr<DownstreamState> leastOutstanding(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq);
1146 std::shared_ptr<DownstreamState> wrandom(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq);
1147 std::shared_ptr<DownstreamState> whashed(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq);
1148 std::shared_ptr<DownstreamState> chashed(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq);
1149 std::shared_ptr<DownstreamState> roundrobin(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq);
1150
/* Settings of the built-in web server: password, API key and optional custom headers.
   The std::mutex member makes this structure neither copyable nor movable.
   NOTE(review): the mutex presumably guards the other fields -- confirm in the
   code reading and updating them. */
1151 struct WebserverConfig
1152 {
1153 std::string password;
1154 std::string apiKey;
1155 boost::optional<std::map<std::string, std::string> > customHeaders;
1156 std::mutex lock;
1157 };
1158
/* Setters for the web server settings. The API key and the custom headers are
   passed as boost::optional values, so they can be given as unset. */
1159 void setWebserverAPIKey(const boost::optional<std::string> apiKey);
1160 void setWebserverPassword(const std::string& password);
1161 void setWebserverCustomHeaders(const boost::optional<std::map<std::string, std::string> > customHeaders);
1162
/* Entry points of the web server, TCP acceptor and (when built with
   HAVE_DNS_OVER_HTTPS) DNS over HTTPS threads.
   NOTE(review): the type expected behind the void* parameter of
   tcpAcceptorThread() is not visible here -- check its callers. */
1163 void dnsdistWebserverThread(int sock, const ComboAddress& local);
1164 void tcpAcceptorThread(void* p);
1165 #ifdef HAVE_DNS_OVER_HTTPS
1166 void dohThread(ClientState* cs);
1167 #endif /* HAVE_DNS_OVER_HTTPS */
1168
/* Tracking of the side effects of Lua calls. As described by the per-function
   notes, there are three states: indeterminate, "no side effect" and "side
   effect", the latter overriding any "no side effect" declaration. */
1169 void setLuaNoSideEffect(); // if nothing has been declared, set that there are no side effects
1170 void setLuaSideEffect(); // set to report a side effect, cancelling all _no_ side effect calls
1171 bool getLuaNoSideEffect(); // set if there were only explicit declarations of _no_ side effect
1172 void resetLuaSideEffect(); // reset to indeterminate state
1173
/* Response handling helpers, declared only.
   NOTE(review): the exact meaning of the boolean results, and of the 'consumed'
   and 'drop' output parameters, is not visible from this header -- check the
   implementations. */
1174 bool responseContentMatches(const char* response, const uint16_t responseLen, const DNSName& qname, const uint16_t qtype, const uint16_t qclass, const ComboAddress& remote, unsigned int& consumed);
1175 bool processResponse(char** response, uint16_t* responseLen, size_t* responseSize, LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr, size_t addRoom, std::vector<uint8_t>& rewrittenResponse, bool muted);
1176 bool processRulesResult(const DNSAction::Action& action, DNSQuestion& dq, std::string& ruleresult, bool& drop);
1177
1178 bool checkQueryHeaders(const struct dnsheader* dh);
1179
/* DNSCrypt contexts and query handling */
1180 extern std::vector<std::shared_ptr<DNSCryptContext>> g_dnsCryptLocals;
1181 int handleDNSCryptQuery(char* packet, uint16_t len, std::shared_ptr<DNSCryptQuery> query, uint16_t* decryptedQueryLen, bool tcp, time_t now, std::vector<uint8_t>& response);
1182 boost::optional<std::vector<uint8_t>> checkDNSCryptQuery(const ClientState& cs, const char* query, uint16_t& len, std::shared_ptr<DNSCryptQuery>& dnsCryptQuery, time_t now, bool tcp);
1183
1184 bool addXPF(DNSQuestion& dq, uint16_t optionCode);
1185
1186 uint16_t getRandomDNSID();
1187
1188 #include "dnsdist-snmp.hh"
1189
/* SNMP related globals; DNSDistSNMPAgent is expected to come from the
   dnsdist-snmp.hh header included right before these declarations. */
1190 extern bool g_snmpEnabled;
1191 extern bool g_snmpTrapsEnabled;
1192 extern DNSDistSNMPAgent* g_snmpAgent;
1193 extern bool g_addEDNSToSelfGeneratedResponses;
1194
1195 extern std::set<std::string> g_capabilitiesToRetain;
/* size limits; being 'static const' at namespace scope in a header, these
   constants have internal linkage, every translation unit getting its own copy */
1196 static const uint16_t s_udpIncomingBufferSize{1500}; // don't accept UDP queries larger than this value
1197 static const size_t s_maxPacketCacheEntrySize{4096}; // don't cache responses larger than this value
1198
/* Possible outcomes of processQuery().
   NOTE(review): going by the names, the query is either dropped, answered
   directly, or passed to the backend returned via selectedBackend -- confirm in
   the implementation. */
1199 enum class ProcessQueryResult { Drop, SendAnswer, PassToBackend };
1200 ProcessQueryResult processQuery(DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend);
1201
/* conversions between the state kept for a query (IDState) and the DNSQuestion / DNSResponse objects */
1202 DNSResponse makeDNSResponseFromIDState(IDState& ids, struct dnsheader* dh, size_t bufferSize, uint16_t responseLen, bool isTCP);
1203 void setIDStateFromDNSQuestion(IDState& ids, DNSQuestion& dq, DNSName&& qname);
1204
/* sending of UDP queries to a backend; healthCheck defaults to false */
1205 int pickBackendSocketForSending(std::shared_ptr<DownstreamState>& state);
1206 ssize_t udpClientSendRequestToBackend(const std::shared_ptr<DownstreamState>& ss, const int sd, const char* request, const size_t requestLen, bool healthCheck=false);