return now;
}
+void IncomingHTTP2Connection::updateIO(std::shared_ptr<IncomingTCPConnectionState>& conn, IOState newState, const timeval& now)
+{
+ (void)conn;
+ (void)now;
+ updateIO(newState, newState == IOState::NeedWrite ? handleWritableIOCallback : handleReadableIOCallback);
+}
+
void IncomingHTTP2Connection::updateIO(IOState newState, const FDMultiplexer::callbackfunc_t& callback)
{
boost::optional<struct timeval> ttd{boost::none};
+ if (newState == IOState::Async) {
+ auto shared = shared_from_this();
+ updateIOForAsync(shared);
+ return;
+ }
+
auto shared = std::dynamic_pointer_cast<IncomingHTTP2Connection>(shared_from_this());
if (!shared || !d_ioState) {
return;
void stopIO();
uint32_t getConcurrentStreamsCount() const;
+ void updateIO(std::shared_ptr<IncomingTCPConnectionState>& conn, IOState newState, const timeval& now) override;
void updateIO(IOState newState, const FDMultiplexer::callbackfunc_t& callback);
void handleIOError();
bool sendResponse(StreamID streamID, PendingQuery& context, uint16_t responseCode, const HeadersMap& customResponseHeaders, const std::string& contentType = "", bool addContentType = true);
static void handleIOCallback(int desc, FDMultiplexer::funcparam_t& param);
static void handleAsyncReady(int desc, FDMultiplexer::funcparam_t& param);
- static void updateIO(std::shared_ptr<IncomingTCPConnectionState>& state, IOState newState, const struct timeval& now);
static void queueResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response, bool fromBackend);
static void handleTimeout(std::shared_ptr<IncomingTCPConnectionState>& state, bool write);
virtual void handleIO();
+ virtual void updateIO(std::shared_ptr<IncomingTCPConnectionState>& conn, IOState newState, const timeval& now);
+ void updateIOForAsync(std::shared_ptr<IncomingTCPConnectionState>& conn);
QueryProcessingResult handleQuery(PacketBuffer&& query, const struct timeval& now, std::optional<int32_t> streamID);
virtual void handleResponse(const struct timeval& now, TCPResponse&& response) override;
// for the same reason we need to update the state right away, nobody will do that for us
if (state->active()) {
- updateIO(state, iostate, now);
+ state->updateIO(state, iostate, now);
// if we have not finished reading every available byte, we _need_ to do an actual read
// attempt before waiting for the socket to become readable again, because if there is
// buffered data available the socket might never become readable again.
}
}
+void IncomingTCPConnectionState::updateIOForAsync(std::shared_ptr<IncomingTCPConnectionState>& conn)
+{
+ auto fds = conn->d_handler.getAsyncFDs();
+ for (const auto desc : fds) {
+ conn->d_threadData.mplexer->addReadFD(desc, handleAsyncReady, conn);
+ }
+ conn->d_ioState->update(IOState::Done, handleIOCallback, conn);
+}
+
void IncomingTCPConnectionState::updateIO(std::shared_ptr<IncomingTCPConnectionState>& state, IOState newState, const struct timeval& now)
{
if (newState == IOState::Async) {
- auto fds = state->d_handler.getAsyncFDs();
- for (const auto desc : fds) {
- state->d_threadData.mplexer->addReadFD(desc, handleAsyncReady, state);
- }
- state->d_ioState->update(IOState::Done, handleIOCallback, state);
- }
- else {
- state->d_ioState->update(newState, handleIOCallback, state, newState == IOState::NeedWrite ? state->getClientWriteTTD(now) : state->getClientReadTTD(now));
+ updateIOForAsync(state);
+ return;
}
+
+ state->d_ioState->update(newState, handleIOCallback, state, newState == IOState::NeedWrite ? state->getClientWriteTTD(now) : state->getClientReadTTD(now));
}
/* called from the backend code when a new response has been received */