DOHServerConfig(uint32_t idleTimeout, uint32_t internalPipeBufferSize): accept_ctx(std::make_shared<DOHAcceptContext>())
{
int fd[2];
+#ifndef USE_SINGLE_ACCEPTOR_THREAD
if (pipe(fd) < 0) {
unixDie("Creating a pipe for DNS over HTTPS");
}
if (internalPipeBufferSize > 0) {
setPipeBufferSize(dohquerypair[0], internalPipeBufferSize);
}
+#endif /* USE_SINGLE_ACCEPTOR_THREAD */
if (pipe(fd) < 0) {
+#ifndef USE_SINGLE_ACCEPTOR_THREAD
close(dohquerypair[0]);
close(dohquerypair[1]);
+#endif /* USE_SINGLE_ACCEPTOR_THREAD */
unixDie("Creating a pipe for DNS over HTTPS");
}
std::shared_ptr<DOHAcceptContext> accept_ctx{nullptr};
ClientState* cs{nullptr};
std::shared_ptr<DOHFrontend> df{nullptr};
+#ifndef USE_SINGLE_ACCEPTOR_THREAD
int dohquerypair[2]{-1,-1};
+#endif /* USE_SINGLE_ACCEPTOR_THREAD */
int dohresponsepair[2]{-1,-1};
};
auto sender = std::make_shared<DoHTCPCrossQuerySender>(*query.d_idstate.cs);
return sender;
}
+
+ DOHUnitUniquePtr&& releaseDU()
+ {
+ return std::move(query.d_idstate.du);
+ }
};
/*
We are not in the main DoH thread but in the DoH 'client' thread.
*/
-static void processDOHQuery(DOHUnitUniquePtr&& unit)
+static void processDOHQuery(DOHUnitUniquePtr&& unit, bool inMainThread = false)
{
+ const auto handleImmediateResponse = [inMainThread](DOHUnitUniquePtr&& du, const char* reason) {
+ if (inMainThread) {
+ handleResponse(*du->dsc->df, du->req, du->status_code, du->response, du->dsc->df->d_customResponseHeaders, du->contentType, true);
+ /* so the unique pointer is stored in the InternalState which itself is stored in the unique pointer itself. We likely need
+ a better design, but for now let's just reset the internal one since we know it is no longer needed. */
+ du->ids.du.reset();
+ }
+ else {
+ sendDoHUnitToTheMainThread(std::move(du), reason);
+ }
+ };
+
auto& ids = unit->ids;
ids.du = std::move(unit);
auto& du = ids.du;
// but we should be fine as long as we don't touch du->req
// outside of the main DoH thread
du->status_code = 500;
- sendDoHUnitToTheMainThread(std::move(du), "DoH killed in flight");
+ handleImmediateResponse(std::move(du), "DoH killed in flight");
return;
}
+
+ {
+ // if there was no EDNS, we add it with a large buffer size
+ // so we can use UDP to talk to the backend.
+ auto dh = const_cast<struct dnsheader*>(reinterpret_cast<const struct dnsheader*>(du->query.data()));
+
+ if (!dh->arcount) {
+ if (generateOptRR(std::string(), du->query, 4096, 4096, 0, false)) {
+ dh = const_cast<struct dnsheader*>(reinterpret_cast<const struct dnsheader*>(du->query.data())); // may have reallocated
+ dh->arcount = htons(1);
+ du->ids.ednsAdded = true;
+ }
+ }
+ else {
+ // we leave existing EDNS in place
+ }
+ }
+
remote = du->ids.origRemote;
DOHServerConfig* dsc = du->dsc;
auto& holders = dsc->holders;
++g_stats.nonCompliantQueries;
++cs.nonCompliantQueries;
du->status_code = 400;
- sendDoHUnitToTheMainThread(std::move(du), "DoH non-compliant query");
+ handleImmediateResponse(std::move(du), "DoH non-compliant query");
return;
}
if (!checkQueryHeaders(dh, cs)) {
du->status_code = 400;
- sendDoHUnitToTheMainThread(std::move(du), "DoH invalid headers");
+ handleImmediateResponse(std::move(du), "DoH invalid headers");
return;
}
dh->qr = true;
du->response = std::move(du->query);
- sendDoHUnitToTheMainThread(std::move(du), "DoH empty query");
+ handleImmediateResponse(std::move(du), "DoH empty query");
return;
}
if (result == ProcessQueryResult::Drop) {
du->status_code = 403;
- sendDoHUnitToTheMainThread(std::move(du), "DoH dropped query");
+ handleImmediateResponse(std::move(du), "DoH dropped query");
return;
}
handleResponseSent(ids.qname, QType(ids.qtype), 0., du->ids.origDest, ComboAddress(), du->response.size(), *dh, dnsdist::Protocol::DoH, dnsdist::Protocol::DoH);
}
- sendDoHUnitToTheMainThread(std::move(du), "DoH self-answered response");
+ handleImmediateResponse(std::move(du), "DoH self-answered response");
return;
}
if (result != ProcessQueryResult::PassToBackend) {
du->status_code = 500;
- sendDoHUnitToTheMainThread(std::move(du), "DoH no backend available");
+ handleImmediateResponse(std::move(du), "DoH no backend available");
return;
}
auto downstream = du->downstream;
if (downstream == nullptr) {
du->status_code = 502;
- sendDoHUnitToTheMainThread(std::move(du), "DoH no backend available");
+ handleImmediateResponse(std::move(du), "DoH no backend available");
return;
}
return;
}
else {
- cpq->handleInternalError();
+ if (inMainThread) {
+ du = cpq->releaseDU();
+ du->status_code = 502;
+ handleImmediateResponse(std::move(du), "DoH internal error");
+ }
+ else {
+ cpq->handleInternalError();
+ }
return;
}
}
ComboAddress dest = dq.ids.origDest;
if (!assignOutgoingUDPQueryToBackend(downstream, htons(queryId), dq, du->query, dest)) {
- du->status_code = 500;
- sendDoHUnitToTheMainThread(std::move(du), "DoH internal error");
+ du->status_code = 502;
+ handleImmediateResponse(std::move(du), "DoH internal error");
return;
}
}
catch (const std::exception& e) {
vinfolog("Got an error in DOH question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
du->status_code = 500;
- sendDoHUnitToTheMainThread(std::move(du), "DoH internal error");
+ handleImmediateResponse(std::move(du), "DoH internal error");
return;
}
du->sni = sni;
}
#endif /* HAVE_H2O_SOCKET_GET_SSL_SERVER_NAME */
-
du->self = reinterpret_cast<DOHUnit**>(h2o_mem_alloc_shared(&req->pool, sizeof(*self), on_generator_dispose));
auto ptr = du.release();
*(ptr->self) = ptr;
+
+#ifdef USE_SINGLE_ACCEPTOR_THREAD
+ processDOHQuery(DOHUnitUniquePtr(ptr, DOHUnit::release), true);
+#else /* USE_SINGLE_ACCEPTOR_THREAD */
try {
static_assert(sizeof(ptr) <= PIPE_BUF, "Writes up to PIPE_BUF are guaranteed not to be interleaved and to either fully succeed or fail");
ssize_t sent = write(dsc->dohquerypair[0], &ptr, sizeof(ptr));
ptr->release();
}
}
+#endif /* USE_SINGLE_ACCEPTOR_THREAD */
}
catch (const std::exception& e) {
vinfolog("Had error parsing DoH DNS packet from %s: %s", remote.toStringWithPort(), e.what());
contentType = contentType_;
}
+#ifndef USE_SINGLE_ACCEPTOR_THREAD
/* query has been parsed by h2o, which called doh_handler() in the main DoH thread.
- In order not to blockfor long, doh_handler() called doh_dispatch_query() which allocated
+ In order not to block for long, doh_handler() called doh_dispatch_query() which allocated
a DOHUnit object and passed it to us */
static void dnsdistclient(int qsock)
{
continue;
}
- // if there was no EDNS, we add it with a large buffer size
- // so we can use UDP to talk to the backend.
- auto dh = const_cast<struct dnsheader*>(reinterpret_cast<const struct dnsheader*>(du->query.data()));
-
- if (!dh->arcount) {
- if (generateOptRR(std::string(), du->query, 4096, 4096, 0, false)) {
- dh = const_cast<struct dnsheader*>(reinterpret_cast<const struct dnsheader*>(du->query.data())); // may have reallocated
- dh->arcount = htons(1);
- du->ids.ednsAdded = true;
- }
- }
- else {
- // we leave existing EDNS in place
- }
-
- processDOHQuery(std::move(du));
+ processDOHQuery(std::move(du), false);
}
catch (const std::exception& e) {
errlog("Error while processing query received over DoH: %s", e.what());
}
}
}
+#endif /* USE_SINGLE_ACCEPTOR_THREAD */
/* Called in the main DoH thread if h2o finds that dnsdist gave us an answer by writing into
the dohresponsepair[0] side of the pipe so from:
dsc->df = cs->dohFrontend;
dsc->h2o_config.server_name = h2o_iovec_init(df->d_serverTokens.c_str(), df->d_serverTokens.size());
+#ifndef USE_SINGLE_ACCEPTOR_THREAD
std::thread dnsdistThread(dnsdistclient, dsc->dohquerypair[1]);
dnsdistThread.detach(); // gets us better error reporting
+#endif
setThreadName("dnsdist/doh");
// I wonder if this registers an IP address.. I think it does