d_healthCheckQuery = h;
}
+ void stopIO();
+
private:
static ssize_t send_callback(nghttp2_session* session, const uint8_t* data, size_t length, int flags, void* user_data);
static int on_frame_recv_callback(nghttp2_session* session, const nghttp2_frame* frame, void* user_data);
bool d_finished{false};
};
void addToIOState(IOState state, FDMultiplexer::callbackfunc_t callback);
- void updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback);
- void stopIO();
-
+ void updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback, bool noTTD=false);
+ void watchForRemoteHostClosingConnection();
void handleResponse(PendingRequest&& request);
void handleResponseError(PendingRequest&& request, const struct timeval& now);
void handleIOError();
if (newState == IOState::Done) {
if (conn->getConcurrentStreamsCount() == 0) {
conn->stopIO();
+ conn->watchForRemoteHostClosingConnection();
ioGuard.release();
break;
}
if (conn->getConcurrentStreamsCount() > 0) {
conn->updateIO(IOState::NeedRead, handleReadableIOCallback);
}
+ else {
+ conn->watchForRemoteHostClosingConnection();
+ }
}
ioGuard.release();
}
}
}
-void DoHConnectionToBackend::updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback)
+void DoHConnectionToBackend::updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback, bool noTTD)
{
struct timeval now;
gettimeofday(&now, nullptr);
boost::optional<struct timeval> ttd{boost::none};
- if (d_healthCheckQuery) {
- ttd = getBackendHealthCheckTTD(now);
- }
- else if (newState == IOState::NeedRead) {
- ttd = getBackendReadTTD(now);
- }
- else if (isFresh() && d_firstWrite) {
- /* first write just after the non-blocking connect */
- ttd = getBackendConnectTTD(now);
- }
- else {
- ttd = getBackendWriteTTD(now);
+ if (!noTTD) {
+ if (d_healthCheckQuery) {
+ ttd = getBackendHealthCheckTTD(now);
+ }
+ else if (newState == IOState::NeedRead) {
+ ttd = getBackendReadTTD(now);
+ }
+ else if (isFresh() && d_firstWrite) {
+ /* first write just after the non-blocking connect */
+ ttd = getBackendConnectTTD(now);
+ }
+ else {
+ ttd = getBackendWriteTTD(now);
+ }
}
auto shared = std::dynamic_pointer_cast<DoHConnectionToBackend>(shared_from_this());
}
}
+void DoHConnectionToBackend::watchForRemoteHostClosingConnection()
+{
+ if (willBeReusable() && !d_healthCheckQuery) {
+ updateIO(IOState::NeedRead, handleReadableIOCallback, false);
+ }
+}
+
void DoHConnectionToBackend::addToIOState(IOState state, FDMultiplexer::callbackfunc_t callback)
{
struct timeval now;
if (conn->getConcurrentStreamsCount() > 0) {
conn->updateIO(IOState::NeedRead, handleReadableIOCallback);
}
+ else {
+ conn->watchForRemoteHostClosingConnection();
+ }
}
else {
conn->updateIO(state, handleWritableIOCallback);
}
#endif
+ if (frame->hd.type == NGHTTP2_GOAWAY) {
+ conn->d_connectionDied = true;
+ }
+
/* is this the last frame for this stream? */
if ((frame->hd.type == NGHTTP2_HEADERS || frame->hd.type == NGHTTP2_DATA) && frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
auto stream = conn->d_currentStreams.find(frame->hd.stream_id);
conn->handleResponseError(std::move(request), now);
}
+
if (conn->getConcurrentStreamsCount() == 0) {
conn->stopIO();
+ conn->watchForRemoteHostClosingConnection();
}
}
else {
stream->second.d_buffer.insert(stream->second.d_buffer.end(), data, data + len);
if (stream->second.d_finished) {
- //cerr<<"we now have the full response!"<<endl;
- //cerr<<std::string(reinterpret_cast<const char*>(data), len)<<endl;
+ // cerr<<"we now have the full response!"<<endl;
+ // cerr<<std::string(reinterpret_cast<const char*>(data), len)<<endl;
auto request = std::move(stream->second);
conn->d_currentStreams.erase(stream->first);
}
if (conn->getConcurrentStreamsCount() == 0) {
conn->stopIO();
+ conn->watchForRemoteHostClosingConnection();
}
}
- else {
- //cerr<<"but the stream is not finished yet"<<endl;
- }
return 0;
}
if (conn->getConcurrentStreamsCount() == 0) {
//cerr<<"stopping IO"<<endl;
conn->stopIO();
- //cerr<<"our current refcnt is now "<<conn->getUsageCount()<<endl;
+ conn->watchForRemoteHostClosingConnection();
}
return 0;
size_t result = 0;
for (const auto& backend : t_downstreamConnections) {
result += backend.second.size();
+ for (auto& conn : backend.second) {
+ conn->stopIO();
+ }
}
t_downstreamConnections.clear();
return result;
void DownstreamDoHConnectionsManager::cleanupClosedConnections(struct timeval now)
{
+ //cerr<<"cleanup interval is "<<s_cleanupInterval<<", next cleanup is "<<t_nextCleanup<<", now is "<<now.tv_sec<<endl;
if (s_cleanupInterval <= 0 || (t_nextCleanup > 0 && t_nextCleanup > now.tv_sec)) {
return;
}
+
t_nextCleanup = now.tv_sec + s_cleanupInterval;
struct timeval freshCutOff = now;
return os;
}
-struct DOHConnection
+class DOHConnection
{
+public:
DOHConnection(bool needProxyProtocol) :
d_session(std::unique_ptr<nghttp2_session, void (*)(nghttp2_session*)>(nullptr, nghttp2_session_del)), d_needProxyProtocol(needProxyProtocol)
{
int rv = nghttp2_submit_response(d_session.get(), streamId, hdrs, sizeof(hdrs) / sizeof(*hdrs), &dataProvider);
// cerr<<"Submitting response for stream ID "<<streamId<<": "<<rv<<endl;
BOOST_CHECK_EQUAL(rv, 0);
+ /* just in case, see if we have anything to send */
+ rv = nghttp2_session_send(d_session.get());
+ BOOST_CHECK_EQUAL(rv, 0);
}
void submitError(uint32_t streamId, uint16_t status, const std::string& msg)
int rv = nghttp2_submit_response(d_session.get(), streamId, hdrs, sizeof(hdrs) / sizeof(*hdrs), nullptr);
BOOST_CHECK_EQUAL(rv, 0);
+ /* just in case, see if we have anything to send */
+ rv = nghttp2_session_send(d_session.get());
+ BOOST_CHECK_EQUAL(rv, 0);
}
void submitGoAway()
{
int rv = nghttp2_submit_goaway(d_session.get(), NGHTTP2_FLAG_NONE, 0, NGHTTP2_INTERNAL_ERROR, nullptr, 0);
BOOST_CHECK_EQUAL(rv, 0);
+ /* just in case, see if we have anything to send */
+ rv = nghttp2_session_send(d_session.get());
+ BOOST_CHECK_EQUAL(rv, 0);
}
private:
static int on_stream_close_callback(nghttp2_session* session, int32_t stream_id, uint32_t error_code, void* user_data)
{
- //DOHConnection* conn = reinterpret_cast<DOHConnection*>(user_data);
-
if (error_code == 0) {
return 0;
}
dynamic_cast<MockupFDMultiplexer*>(s_mplexer.get())->setReady(desc);
}},
/* read settings, headers and response from the server */
- {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max()},
+ {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max(), [](int desc, const ExpectedStep& step) {
+ /* set the outgoing descriptor (backend connection) as NOT ready anymore */
+ dynamic_cast<MockupFDMultiplexer*>(s_mplexer.get())->setNotReady(desc);
+ }},
/* acknowledge settings */
- {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max()},
+ {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max(), [](int desc, const ExpectedStep& step) {
+ s_connectionBuffers.at(desc)->submitGoAway();
+ dynamic_cast<MockupFDMultiplexer*>(s_mplexer.get())->setReady(desc);
+ }},
+ {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max()},
{ExpectedStep::ExpectedRequest::closeBackend, IOState::Done},
};
/* read settings, headers and responses from the server */
{ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max()},
/* acknowledge settings */
- {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max()},
+ {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max(), [](int desc, const ExpectedStep& step) {
+ s_connectionBuffers.at(desc)->submitGoAway();
+ dynamic_cast<MockupFDMultiplexer*>(s_mplexer.get())->setReady(desc);
+ }},
+ {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max()},
{ExpectedStep::ExpectedRequest::closeBackend, IOState::Done},
};
queries.push_back({std::move(sender), std::move(internalQuery)});
}
+ bool firstQueryDone = false;
s_steps = {
{ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done},
/* opening */
/* read settings, headers and responses from the server */
{ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max()},
/* acknowledge settings */
- {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max()},
+ {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max(), [&firstQueryDone](int desc, const ExpectedStep& step) {
+ firstQueryDone = true;
+ }},
/* headers */
- {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max()},
+ {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max(), [](int desc, const ExpectedStep& step) {
+ }},
/* data */
{ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max(), [](int desc, const ExpectedStep& step) {
/* set the outgoing descriptor (backend connection) as ready */
- dynamic_cast<MockupFDMultiplexer*>(s_mplexer.get())->setReady(desc);
+ dynamic_cast<MockupFDMultiplexer*>(s_mplexer.get())->setReady(desc);
}},
/* read settings, headers and responses from the server */
{ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max()},
+ /* later the backend sends a go away frame */
+ {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max(), [](int desc, const ExpectedStep& step) {
+ s_connectionBuffers.at(desc)->submitGoAway();
+ }},
{ExpectedStep::ExpectedRequest::closeBackend, IOState::Done},
};
bool result = sendH2Query(backend, s_mplexer, sliced, std::move(query.second), false);
BOOST_CHECK_EQUAL(result, true);
- while (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0) {
+ while (!firstQueryDone && (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0)) {
s_mplexer->run(&now);
}
BOOST_CHECK_EQUAL(query.first->d_valid, true);
+ BOOST_CHECK_EQUAL(firstQueryDone, true);
}
{
{ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max()},
/* acknowledge settings */
{ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max()},
+ /* try to read, the backend says to go away */
+ {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max(), [](int desc, const ExpectedStep& step) {
+ s_connectionBuffers.at(desc)->submitGoAway();
+ }},
{ExpectedStep::ExpectedRequest::closeBackend, IOState::Done},
};
queries.push_back({std::move(sender), std::move(internalQuery)});
}
+ bool done = false;
s_steps = {
{ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done},
/* opening */
/* read settings, headers and responses from the server */
{ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max()},
/* acknowledge settings */
- {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max()},
+ {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max(), [&done](int desc, const ExpectedStep& step) {
+ /* mark backend as not ready */
+ dynamic_cast<MockupFDMultiplexer*>(s_mplexer.get())->setNotReady(desc);
+ done = true;
+ }},
{ExpectedStep::ExpectedRequest::closeBackend, IOState::Done},
};
BOOST_CHECK_EQUAL(result, true);
}
- while (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0) {
+ while (!done && (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0)) {
s_mplexer->run(&now);
}
queries.push_back({std::move(sender), std::move(internalQuery)});
}
+ bool done = false;
s_steps = {
{ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done},
/* opening */
/* read settings, headers and responses (second attempt) */
{ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max()},
/* acknowledge settings */
- {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max()},
+ {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max(), [&done](int desc, const ExpectedStep& step) {
+ /* mark backend as not ready */
+ dynamic_cast<MockupFDMultiplexer*>(s_mplexer.get())->setNotReady(desc);
+ done = true;
+ }},
{ExpectedStep::ExpectedRequest::closeBackend, IOState::Done},
};
BOOST_CHECK_EQUAL(result, true);
}
- while (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0) {
+ while (!done && (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0)) {
s_mplexer->run(&now);
}
queries.push_back({std::move(sender), std::move(internalQuery)});
}
+ bool done = false;
s_steps = {
{ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done},
/* opening */
/* read settings, headers and response from the server */
{ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max()},
/* acknowledge settings */
- {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max()},
+ {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max(), [&done](int desc, const ExpectedStep& step) {
+ /* mark backend as not ready */
+ dynamic_cast<MockupFDMultiplexer*>(s_mplexer.get())->setNotReady(desc);
+ done = true;
+ }},
{ExpectedStep::ExpectedRequest::closeBackend, IOState::Done},
};
BOOST_CHECK_EQUAL(result, true);
}
- while (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0) {
+ while (!done && (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0)) {
s_mplexer->run(&now);
}
queries.push_back({std::move(sender), std::move(internalQuery)});
}
+ bool done = false;
s_steps = {
{ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done},
/* opening */
/* read settings, headers and responses from the server */
{ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max()},
/* acknowledge settings */
- {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max()},
+ {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max(), [&done](int desc, const ExpectedStep& step) {
+ /* mark backend as not ready */
+ dynamic_cast<MockupFDMultiplexer*>(s_mplexer.get())->setNotReady(desc);
+ done = true;
+ }},
{ExpectedStep::ExpectedRequest::closeBackend, IOState::Done},
};
BOOST_CHECK_EQUAL(result, true);
}
- while (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0) {
+ while (!done && (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0)) {
s_mplexer->run(&now);
}
for (auto& query : queries) {
BOOST_CHECK_EQUAL(query.first->d_valid, true);
}
+
+ BOOST_CHECK_EQUAL(clearH2Connections(), 0U);
}
BOOST_AUTO_TEST_SUITE_END();