*/
static void on_dnsdist(h2o_socket_t *listener, const char *err)
{
- DOHUnit *du = nullptr;
- DOHServerConfig* dsc = reinterpret_cast<DOHServerConfig*>(listener->data);
- ssize_t got = read(dsc->dohresponsepair[1], &du, sizeof(du));
-
- if (got < 0) {
- warnlog("Error reading a DOH internal response: %s", strerror(errno));
- return;
- }
- else if (static_cast<size_t>(got) != sizeof(du)) {
- return;
- }
+ /* we want to read as many responses from the pipe as possible before
+ giving up. Even if we are overloaded and fighting with the DoH connections
+ for the CPU, the first thing we need to do is to send responses to free slots
+ anyway. */
+ while (true) {
+ DOHUnit *du = nullptr;
+ DOHServerConfig* dsc = reinterpret_cast<DOHServerConfig*>(listener->data);
+ ssize_t got = read(dsc->dohresponsepair[1], &du, sizeof(du));
+
+ if (got < 0) {
+ if (errno != EWOULDBLOCK && errno != EAGAIN) {
+ warnlog("Error reading a DOH internal response: %s", strerror(errno));
+ }
+ return;
+ }
+ else if (static_cast<size_t>(got) != sizeof(du)) {
+ return;
+ }
- if (!du->req) { // it got killed in flight
- du->self = nullptr;
- du->release();
- return;
- }
+ if (!du->req) { // it got killed in flight
+ du->self = nullptr;
+ du->release();
+ continue;
+ }
- if (!du->tcp && du->truncated && du->response.size() > sizeof(dnsheader)) {
- /* restoring the original ID */
- dnsheader* queryDH = reinterpret_cast<struct dnsheader*>(du->query.data() + du->proxyProtocolPayloadSize);
- queryDH->id = du->ids.origID;
+ if (!du->tcp && du->truncated && du->response.size() > sizeof(dnsheader)) {
+ /* restoring the original ID */
+ dnsheader* queryDH = reinterpret_cast<struct dnsheader*>(du->query.data() + du->proxyProtocolPayloadSize);
+ queryDH->id = du->ids.origID;
- auto cpq = std::make_unique<DoHCrossProtocolQuery>(du);
+ auto cpq = std::make_unique<DoHCrossProtocolQuery>(du);
- du->get();
- du->tcp = true;
- du->truncated = false;
+ du->get();
+ du->tcp = true;
+ du->truncated = false;
- if (g_tcpclientthreads && g_tcpclientthreads->passCrossProtocolQueryToThread(std::move(cpq))) {
- return;
- }
- else {
- du->release();
+ if (g_tcpclientthreads && g_tcpclientthreads->passCrossProtocolQueryToThread(std::move(cpq))) {
+ continue;
+ }
+ else {
+ du->release();
+ continue;
+ }
}
- }
- if (du->self) {
- // we are back in the h2o main thread now, so we don't risk
- // a race (h2o killing the query) when accessing du->req anymore
- *du->self = nullptr; // so we don't clean up again in on_generator_dispose
- du->self = nullptr;
- }
+ if (du->self) {
+ // we are back in the h2o main thread now, so we don't risk
+ // a race (h2o killing the query) when accessing du->req anymore
+ *du->self = nullptr; // so we don't clean up again in on_generator_dispose
+ du->self = nullptr;
+ }
- handleResponse(*dsc->df, du->req, du->status_code, du->response, dsc->df->d_customResponseHeaders, du->contentType, true);
+ handleResponse(*dsc->df, du->req, du->status_code, du->response, dsc->df->d_customResponseHeaders, du->contentType, true);
- du->release();
+ du->release();
+ }
}
/* called when a TCP connection has been accepted, the TLS session has not been established */