class DoHConnectionToBackend : public ConnectionToBackend
{
public:
+ using StreamID = int32_t;
+
DoHConnectionToBackend(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now, std::string&& proxyProtocolPayload);
void handleTimeout(const struct timeval& now, bool write) override;
private:
static ssize_t send_callback(nghttp2_session* session, const uint8_t* data, size_t length, int flags, void* user_data);
static int on_frame_recv_callback(nghttp2_session* session, const nghttp2_frame* frame, void* user_data);
- static int on_data_chunk_recv_callback(nghttp2_session* session, uint8_t flags, int32_t stream_id, const uint8_t* data, size_t len, void* user_data);
- static int on_stream_close_callback(nghttp2_session* session, int32_t stream_id, uint32_t error_code, void* user_data);
+ static int on_data_chunk_recv_callback(nghttp2_session* session, uint8_t flags, StreamID stream_id, const uint8_t* data, size_t len, void* user_data);
+ static int on_stream_close_callback(nghttp2_session* session, StreamID stream_id, uint32_t error_code, void* user_data);
static int on_header_callback(nghttp2_session* session, const nghttp2_frame* frame, const uint8_t* name, size_t namelen, const uint8_t* value, size_t valuelen, uint8_t flags, void* user_data);
static int on_error_callback(nghttp2_session* session, int lib_error_code, const char* msg, size_t len, void* user_data);
static void handleReadableIOCallback(int fd, FDMultiplexer::funcparam_t& param);
static const std::unordered_map<std::string, std::string> s_constants;
- std::unordered_map<int32_t, PendingRequest> d_currentStreams;
+ std::unordered_map<StreamID, PendingRequest> d_currentStreams;
std::string d_proxyProtocolPayload;
PacketBuffer d_out;
PacketBuffer d_in;
pending.d_query = std::move(query);
pending.d_sender = std::move(sender);
- uint32_t streamId = nghttp2_session_get_next_stream_id(d_session.get());
+ uint32_t tentativeStreamId = nghttp2_session_get_next_stream_id(d_session.get());
+ if (tentativeStreamId == static_cast<uint32_t>(1 << 31)) {
+ /* running out of stream IDs */
+ d_connectionDied = true;
+ nghttp2_session_terminate_session(d_session.get(), NGHTTP2_NO_ERROR);
+ throw std::runtime_error("No more stream IDs");
+ }
+
+ auto streamId = static_cast<StreamID>(tentativeStreamId);
auto insertPair = d_currentStreams.insert({streamId, std::move(pending)});
if (!insertPair.second) {
/* there is a stream ID collision, something is very wrong! */
nghttp2_data_provider data_provider;
data_provider.source.ptr = this;
- data_provider.read_callback = [](nghttp2_session* session, int32_t stream_id, uint8_t* buf, size_t length, uint32_t* data_flags, nghttp2_data_source* source, void* user_data) -> ssize_t {
+ data_provider.read_callback = [](nghttp2_session* session, StreamID stream_id, uint8_t* buf, size_t length, uint32_t* data_flags, nghttp2_data_source* source, void* user_data) -> ssize_t {
(void)session;
(void)source;
auto* conn = static_cast<DoHConnectionToBackend*>(user_data);
}
if (!d_inIOCallback) {
- auto rv = nghttp2_session_send(d_session.get());
- if (rv != 0) {
+ auto rtv = nghttp2_session_send(d_session.get());
+ if (rtv != 0) {
d_connectionDied = true;
++d_ds->tcpDiedSendingQuery;
d_currentStreams.erase(streamId);
- throw std::runtime_error("Error in nghttp2_session_send:" + std::to_string(rv));
+ throw std::runtime_error("Error in nghttp2_session_send:" + std::to_string(rtv));
}
}
return 0;
}
-int DoHConnectionToBackend::on_data_chunk_recv_callback(nghttp2_session* session, uint8_t flags, int32_t stream_id, const uint8_t* data, size_t len, void* user_data)
+int DoHConnectionToBackend::on_data_chunk_recv_callback(nghttp2_session* session, uint8_t flags, StreamID stream_id, const uint8_t* data, size_t len, void* user_data)
{
(void)session;
(void)flags;
return 0;
}
-int DoHConnectionToBackend::on_stream_close_callback(nghttp2_session* session, int32_t stream_id, uint32_t error_code, void* user_data)
+int DoHConnectionToBackend::on_stream_close_callback(nghttp2_session* session, StreamID stream_id, uint32_t error_code, void* user_data)
{
(void)session;
DoHConnectionToBackend* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data);