return o.str();
}
+ bool reachedMaxStreamID() const;
bool canBeReused() const override;
+ /* full now but will become usable later */
+ bool willBeReusable() const;
void setHealthCheck(bool h)
{
void addToIOState(IOState state, FDMultiplexer::callbackfunc_t callback);
void updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback);
void stopIO();
+
void handleResponse(PendingRequest&& request);
void handleResponseError(PendingRequest&& request, const struct timeval& now);
void handleIOError();
size_t d_inPos{0};
uint32_t d_highestStreamID{0};
bool d_healthCheckQuery{false};
+ bool d_firstWrite{true};
};
class DownstreamDoHConnectionsManager
void DoHConnectionToBackend::handleTimeout(const struct timeval& now, bool write)
{
+ if (write) {
+ if (d_firstWrite) {
+ ++d_ds->tcpConnectTimeouts;
+ }
+ else {
+ ++d_ds->tcpWriteTimeouts;
+ }
+ }
+ else {
+ ++d_ds->tcpReadTimeouts;
+ }
+
handleIOError();
}
+bool DoHConnectionToBackend::reachedMaxStreamID() const
+{
+ const uint32_t maximumStreamID = (static_cast<uint32_t>(1) << 31) - 1;
+ return d_highestStreamID == maximumStreamID;
+}
+
bool DoHConnectionToBackend::canBeReused() const
{
if (d_connectionDied) {
return false;
}
- const uint32_t maximumStreamID = (static_cast<uint32_t>(1) << 31) - 1;
- if (d_highestStreamID == maximumStreamID) {
+ if (reachedMaxStreamID()) {
return false;
}
return true;
}
+bool DoHConnectionToBackend::willBeReusable() const
+{
+ if (!d_connectionDied && d_proxyProtocolPayload.empty() && !reachedMaxStreamID()) {
+ return true;
+ }
+
+ return false;
+}
+
const std::unordered_map<std::string, std::string> DoHConnectionToBackend::s_constants = {
{"method-name", ":method"},
{"method-value", "POST"},
void DoHConnectionToBackend::queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query)
{
- // cerr<<"in "<<__PRETTY_FUNCTION__<<" with query ID "<<ntohs(dh->id)<<endl;
auto payloadSize = std::to_string(query.d_buffer.size());
bool addXForwarded = d_ds->d_addXForwardedHeaders;
auto newStreamId = nghttp2_submit_request(d_session.get(), nullptr, headers.data(), headers.size(), &data_provider, this);
if (newStreamId < 0) {
d_connectionDied = true;
+ ++d_ds->tcpDiedSendingQuery;
d_currentStreams.erase(streamId);
throw std::runtime_error("Error submitting HTTP request:" + std::string(nghttp2_strerror(newStreamId)));
}
- // cerr<<"stream ID is "<<newStreamId<<" for a query of size "<<payloadSize<<endl;
auto rv = nghttp2_session_send(d_session.get());
if (rv != 0) {
d_connectionDied = true;
+ ++d_ds->tcpDiedSendingQuery;
d_currentStreams.erase(streamId);
throw std::runtime_error("Error in nghttp2_session_send:" + std::to_string(rv));
}
if (readlen > 0 && static_cast<size_t>(readlen) < conn->d_inPos) {
throw std::runtime_error("Fatal error while passing received data to nghttp2: " + std::string(nghttp2_strerror((int)readlen)));
}
+
+ struct timeval now;
+ gettimeofday(&now, nullptr);
+ conn->d_lastDataReceivedTime = now;
+
// cerr<<"after read send"<<endl;
nghttp2_session_send(conn->d_session.get());
}
}
catch (const std::exception& e) {
vinfolog("Exception while trying to read from HTTP backend connection: %s", e.what());
+ ++conn->d_ds->tcpDiedReadingResponse;
conn->handleIOError();
break;
}
}
else if (newState == IOState::Done) {
// cerr<<"done, buffer size was "<<conn->d_out.size()<<", pos was "<<conn->d_outPos<<endl;
- ++conn->d_queries;
+ conn->d_firstWrite = false;
conn->d_out.clear();
conn->d_outPos = 0;
conn->stopIO();
}
catch (const std::exception& e) {
vinfolog("Exception while trying to write (ready) to HTTP backend connection: %s", e.what());
+ ++conn->d_ds->tcpDiedSendingQuery;
conn->handleIOError();
}
}
else if (newState == IOState::NeedRead) {
ttd = getBackendReadTTD(now);
}
- else if (isFresh() && d_queries == 0) {
+ else if (isFresh() && d_firstWrite) {
/* first write just after the non-blocking connect */
ttd = getBackendConnectTTD(now);
}
if (state == IOState::NeedRead) {
ttd = getBackendReadTTD(now);
}
- else if (isFresh() && d_queries == 0) {
+ else if (isFresh() && d_firstWrite == 0) {
/* first write just after the non-blocking connect */
ttd = getBackendConnectTTD(now);
}
auto state = conn->d_handler->tryWrite(conn->d_out, conn->d_outPos, conn->d_out.size());
// cerr<<"got a "<<(int)state<<" state, "<<conn->d_out.size()-conn->d_outPos<<" bytes remaining"<<endl;
if (state == IOState::Done) {
- ++conn->d_queries;
+ conn->d_firstWrite = false;
conn->d_out.clear();
conn->d_outPos = 0;
conn->stopIO();
catch (const std::exception& e) {
vinfolog("Exception while trying to write (send) to HTTP backend connection: %s", e.what());
conn->handleIOError();
+ ++conn->d_ds->tcpDiedSendingQuery;
}
}
if (stream != conn->d_currentStreams.end()) {
// cerr<<"Stream "<<frame->hd.stream_id<<" is now finished"<<endl;
stream->second.d_finished = true;
+ ++conn->d_queries;
auto request = std::move(stream->second);
conn->d_currentStreams.erase(stream->first);
else {
vinfolog("Stream %d NOT FOUND", frame->hd.stream_id);
conn->d_connectionDied = true;
+ ++conn->d_ds->tcpDiedReadingResponse;
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
}
if (stream == conn->d_currentStreams.end()) {
vinfolog("Unable to match the stream ID %d to a known one!", stream_id);
conn->d_connectionDied = true;
+ ++conn->d_ds->tcpDiedReadingResponse;
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
if (len > std::numeric_limits<uint16_t>::max() || (std::numeric_limits<uint16_t>::max() - stream->second.d_buffer.size()) < len) {
vinfolog("Data frame of size %d is too large for a DNS response (we already have %d)", len, stream->second.d_buffer.size());
conn->d_connectionDied = true;
+ ++conn->d_ds->tcpDiedReadingResponse;
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
// cerr << "Stream " << stream_id << " closed with error_code=" << error_code << endl;
conn->d_connectionDied = true;
+ ++conn->d_ds->tcpDiedReadingResponse;
auto stream = conn->d_currentStreams.find(stream_id);
if (stream == conn->d_currentStreams.end()) {
catch (...) {
vinfolog("Error parsing the status header for stream ID %d", frame->hd.stream_id);
conn->d_connectionDied = true;
+ ++conn->d_ds->tcpDiedReadingResponse;
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
}
DoHConnectionToBackend* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data);
conn->d_connectionDied = true;
+ ++conn->d_ds->tcpDiedReadingResponse;
return 0;
}
nghttp2_session_callbacks* cbs = nullptr;
if (nghttp2_session_callbacks_new(&cbs) != 0) {
d_connectionDied = true;
+ ++d_ds->tcpDiedSendingQuery;
vinfolog("Unable to create a callback object for a new HTTP/2 session");
return;
}
nghttp2_session* sess = nullptr;
if (nghttp2_session_client_new(&sess, callbacks.get(), this) != 0) {
d_connectionDied = true;
+ ++d_ds->tcpDiedSendingQuery;
vinfolog("Coult not allocate a new HTTP/2 session");
return;
}
int rv = nghttp2_submit_settings(d_session.get(), NGHTTP2_FLAG_NONE, iv, sizeof(iv) / sizeof(*iv));
if (rv != 0) {
d_connectionDied = true;
+ ++d_ds->tcpDiedSendingQuery;
vinfolog("Could not submit SETTINGS: %s", nghttp2_strerror(rv));
return;
}
void DownstreamDoHConnectionsManager::cleanupClosedConnections(struct timeval now)
{
+ if (s_cleanupInterval <= 0 || (t_nextCleanup > 0 && t_nextCleanup > now.tv_sec)) {
+ return;
+ }
+ t_nextCleanup = now.tv_sec + s_cleanupInterval;
+
struct timeval freshCutOff = now;
freshCutOff.tv_sec -= 1;
for (auto dsIt = t_downstreamConnections.begin(); dsIt != t_downstreamConnections.end();) {
for (auto connIt = dsIt->second.begin(); connIt != dsIt->second.end();) {
if (!(*connIt)) {
- ++connIt;
+ connIt = dsIt->second.erase(connIt);
continue;
}
auto backendId = ds->getID();
- if (s_cleanupInterval > 0 && (t_nextCleanup == 0 || t_nextCleanup <= now.tv_sec)) {
- t_nextCleanup = now.tv_sec + s_cleanupInterval;
- //cerr<<"cleaning up"<<endl;
- cleanupClosedConnections(now);
- }
+ cleanupClosedConnections(now);
const bool haveProxyProtocol = !proxyProtocolPayload.empty();
if (!haveProxyProtocol) {
for (auto listIt = list.begin(); listIt != list.end();) {
auto& entry = *listIt;
if (!entry->canBeReused()) {
- listIt = list.erase(listIt);
+ if (!entry->willBeReusable()) {
+ listIt = list.erase(listIt);
+ }
+ else {
+ ++listIt;
+ }
continue;
}
entry->setReused();
setThreadName("dnsdist/dohClie");
DoHClientThreadData data;
-
data.mplexer->addReadFD(crossProtocolPipeFD, handleCrossProtocolQuery, &data);
struct timeval now;
if (now.tv_sec > lastTimeoutScan) {
lastTimeoutScan = now.tv_sec;
+ DownstreamDoHConnectionsManager::cleanupClosedConnections(now);
handleH2Timeouts(*data.mplexer, now);
if (g_dohStatesDumpRequested > 0) {
if (write(pipe, &tmp, sizeof(tmp)) != sizeof(tmp)) {
delete tmp;
+ ++g_stats.outgoingDoHQueryPipeFull;
tmp = nullptr;
return false;
}
#ifdef HAVE_NGHTTP2
if (g_outgoingDoHWorkerThreads > 0) {
g_dohClientThreads = std::make_unique<DoHClientCollection>(g_outgoingDoHWorkerThreads);
- g_dohClientThreads->addThread();
+ for (size_t idx = 0; idx < g_outgoingDoHWorkerThreads; idx++) {
+ g_dohClientThreads->addThread();
+ }
}
return true;
#else