void
HttpStateData::noteDelayAwareReadChance()
{
- flags.do_next_read = true;
+ waitingForDelayAwareReadChance = false;
maybeReadVirginBody();
}
void
HttpStateData::readReply(const CommIoCbParams &io)
{
- Must(!flags.do_next_read); // XXX: should have been set false by mayReadVirginBody()
- flags.do_next_read = false;
-
debugs(11, 5, io.conn);
+ waitingForCommRead = false;
// Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us
if (io.flag == Comm::ERR_CLOSING) {
const auto readSizeWanted = readSizeMax ? entry->bytesWanted(Range<size_t>(0, readSizeMax)) : 0;
if (readSizeWanted <= 0) {
- delayRead();
+ // XXX: If we avoid Comm::ReadNow(), we should not Comm::Read() again
+ // when the wait is over. We should go straight to readReply() instead.
+
+#if USE_ADAPTATION
+ // XXX: We are duplicating Client::calcBufferSpaceToReserve() logic.
+ // XXX: Some other delayRead() cases may lack kickReads() guarantees.
+ // TODO: Refactor maybeMakeSpaceAvailable() to properly treat each
+ // no-read case instead of calling delayRead() for the remaining cases.
+
+ if (responseBodyBuffer) {
+ debugs(11, 5, "avoid delayRead() to give adaptation a chance to drain overflow buffer: " << responseBodyBuffer->contentSize());
+ return; // wait for Client::noteMoreBodySpaceAvailable()
+ }
+
+ if (virginBodyDestination && virginBodyDestination->buf().hasContent()) {
+ debugs(11, 5, "avoid delayRead() to give adaptation a chance to drain body pipe buffer: " << virginBodyDestination->buf().contentSize());
+ return; // wait for Client::noteMoreBodySpaceAvailable()
+ }
+#endif
+
+ delayRead(); /// wait for Client::noteDelayAwareReadChance()
return;
}
case Comm::INPROGRESS:
if (inBuf.isEmpty())
debugs(33, 2, io.conn << ": no data to process, " << xstrerr(rd.xerrno));
- flags.do_next_read = true;
maybeReadVirginBody();
return;
case Comm::ENDFILE: // close detected by 0-byte read
eof = 1;
- flags.do_next_read = false;
/* Continue to process previously read data */
break;
const auto err = new ErrorState(ERR_READ_ERROR, Http::scBadGateway, fwd->request, fwd->al);
err->xerrno = rd.xerrno;
fwd->fail(err);
- flags.do_next_read = false;
closeServer();
mustStop("HttpStateData::readReply");
return;
if (!flags.headers_parsed && !eof) {
debugs(11, 9, "needs more at " << inBuf.length());
- flags.do_next_read = true;
/** \retval false If we have not finished parsing the headers and may get more data.
* Schedules more reads to retrieve the missing data.
*/
assert(error != ERR_NONE);
entry->reset();
fwd->fail(new ErrorState(error, Http::scBadGateway, fwd->request, fwd->al));
- flags.do_next_read = false;
closeServer();
mustStop("HttpStateData::continueAfterParsingHeader");
return false; // quit on error
addVirginReplyBody(decodedData.content(), decodedData.contentSize());
if (doneParsing) {
lastChunk = 1;
- flags.do_next_read = false;
markParsedVirginReplyAsWhole("http parsed last-chunk");
} else if (eof) {
markPrematureReplyBodyEofFailure();
HttpStateData::processReplyBody()
{
if (!flags.headers_parsed) {
- flags.do_next_read = true;
maybeReadVirginBody();
return;
}
if (entry->isAccepting()) {
if (flags.chunked) {
if (!decodeAndWriteReplyBody()) {
- flags.do_next_read = false;
serverComplete();
return;
}
} else {
commSetConnTimeout(serverConnection, Config.Timeout.read, nil);
}
-
- flags.do_next_read = true;
}
break;
// TODO: Remove serverConnectionSaved but preserve exception safety.
commUnsetConnTimeout(serverConnection);
- flags.do_next_read = false;
comm_remove_close_handler(serverConnection->fd, closeHandler);
closeHandler = nullptr;
void
HttpStateData::maybeReadVirginBody()
{
- // too late to read
- if (!Comm::IsConnOpen(serverConnection) || fd_table[serverConnection->fd].closing())
+ if (!Comm::IsConnOpen(serverConnection) || fd_table[serverConnection->fd].closing()) {
+ debugs(11, 3, "no, peer connection gone");
return;
+ }
+
+ if (eof) {
+ // tolerate hypothetical calls between Comm::ENDFILE and closeServer()
+ debugs(11, 3, "no, saw EOF");
+ return;
+ }
+
+ if (lastChunk) {
+ // tolerate hypothetical calls between setting lastChunk and clearing serverConnection
+ debugs(11, 3, "no, saw last-chunk");
+ return;
+ }
if (!canBufferMoreReplyBytes()) {
abortTransaction("more response bytes required, but the read buffer is full and cannot be drained");
return;
}
- // XXX: get rid of the do_next_read flag
- // check for the proper reasons preventing read(2)
- if (!flags.do_next_read)
+ if (waitingForDelayAwareReadChance) {
+ debugs(11, 5, "no, still waiting for noteDelayAwareReadChance()");
return;
+ }
- flags.do_next_read = false;
+ if (waitingForCommRead) {
+ debugs(11, 3, "no, already waiting for readReply()");
+ return;
+ }
- // must not already be waiting for read(2) ...
assert(!Comm::MonitorsRead(serverConnection->fd));
// wait for read(2) to be possible.
typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer;
AsyncCall::Pointer call = JobCallback(11, 5, Dialer, this, HttpStateData::readReply);
Comm::Read(serverConnection, call);
+ waitingForCommRead = true;
}
/// Desired inBuf capacity based on various capacity preferences/limits:
AsyncCall::Pointer timeoutCall = JobCallback(11, 5,
TimeoutDialer, this, HttpStateData::httpTimeout);
commSetConnTimeout(serverConnection, Config.Timeout.lifetime, timeoutCall);
- flags.do_next_read = true;
maybeReadVirginBody();
if (request->body_pipe != nullptr) {