#include "adaptation/AccessCheck.h"
#include "adaptation/Answer.h"
#include "adaptation/Iterator.h"
+#include "base/AsyncCall.h"
#endif
// implemented in client_side_reply.cc until sides have a common parent
adaptationAccessCheckPending(false),
startedAdaptation(false),
#endif
- receivedWholeRequestBody(false)
+ receivedWholeRequestBody(false),
+ theVirginReply(NULL),
+ theFinalReply(NULL)
{
fwd = theFwdState;
entry = fwd->entry;
return;
}
#endif
- handleMoreRequestBodyAvailable();
+ if (requestBodySource == bp)
+ handleMoreRequestBodyAvailable();
}
// the entire request or adapted response body was provided, successfully
return;
}
#endif
- handleRequestBodyProductionEnded();
+ if (requestBodySource == bp)
+ handleRequestBodyProductionEnded();
}
// premature end of the request or adapted response body production
return;
}
#endif
- handleRequestBodyProducerAborted();
+ if (requestBodySource == bp)
+ handleRequestBodyProducerAborted();
}
}
}
-// more adapted response body is available
void
-ServerStateData::handleMoreAdaptedBodyAvailable()
+ServerStateData::resumeBodyStorage()
{
- const size_t contentSize = adaptedBodySource->buf().contentSize();
+ if (abortOnBadEntry("store entry aborted while kick producer callback"))
+ return;
- debugs(11,5, HERE << "consuming " << contentSize << " bytes of adapted " <<
- "response body at offset " << adaptedBodySource->consumedSize());
+ if(!adaptedBodySource)
+ return;
+
+ handleMoreAdaptedBodyAvailable();
+
+ if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
+ endAdaptedBodyConsumption();
+}
+// more adapted response body is available
+void
+ServerStateData::handleMoreAdaptedBodyAvailable()
+{
if (abortOnBadEntry("entry refuses adapted body"))
return;
assert(entry);
+
+ size_t contentSize = adaptedBodySource->buf().contentSize();
+ bool consumedPartially = false;
+
+ if (!contentSize)
+ return; // XXX: bytesWanted asserts on zero-size ranges
+
+ // XXX: entry->bytesWanted returns contentSize-1 if entry can accept data.
+ // We have to add 1 to avoid suspending forever.
+ const size_t bytesWanted = entry->bytesWanted(Range<size_t>(0, contentSize));
+ const size_t spaceAvailable = bytesWanted > 0 ? (bytesWanted + 1) : 0;
+
+ if (spaceAvailable < contentSize ) {
+ // No or partial body data consuming
+ typedef NullaryMemFunT<ServerStateData> Dialer;
+ AsyncCall::Pointer call = asyncCall(93, 5, "ServerStateData::resumeBodyStorage",
+ Dialer(this, &ServerStateData::resumeBodyStorage));
+ entry->deferProducer(call);
+ }
+
+ // XXX: bytesWanted API does not allow us to write just one byte!
+ if (!spaceAvailable && contentSize > 1) {
+ debugs(11, 5, HERE << "NOT storing " << contentSize << " bytes of adapted " <<
+ "response body at offset " << adaptedBodySource->consumedSize());
+ return;
+ }
+
+ if (spaceAvailable < contentSize ) {
+ debugs(11, 5, HERE << "postponing storage of " <<
+ (contentSize - spaceAvailable) << " body bytes");
+ contentSize = spaceAvailable;
+ consumedPartially=true;
+ }
+
+ debugs(11,5, HERE << "storing " << contentSize << " bytes of adapted " <<
+ "response body at offset " << adaptedBodySource->consumedSize());
+
BodyPipeCheckout bpc(*adaptedBodySource);
- const StoreIOBuffer ioBuf(&bpc.buf, currentOffset);
- currentOffset += bpc.buf.size;
+ const StoreIOBuffer ioBuf(&bpc.buf, currentOffset, contentSize);
+ currentOffset += ioBuf.length;
entry->write(ioBuf);
bpc.buf.consume(contentSize);
bpc.checkIn();
void
ServerStateData::handleAdaptedBodyProductionEnded()
{
- stopConsumingFrom(adaptedBodySource);
-
if (abortOnBadEntry("entry went bad while waiting for adapted body eof"))
return;
+
+ // end consumption if we consumed everything
+ if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
+ endAdaptedBodyConsumption();
+ // else resumeBodyStorage() will eventually consume the rest
+}
+void
+ServerStateData::endAdaptedBodyConsumption()
+{
+ stopConsumingFrom(adaptedBodySource);
handleAdaptationCompleted();
}
AclMatchedName = NULL;
}
+void
+ClientHttpRequest::resumeBodyStorage()
+{
+ if(!adaptedBodySource)
+ return;
+
+ noteMoreBodyDataAvailable(adaptedBodySource);
+}
+
void
ClientHttpRequest::noteMoreBodyDataAvailable(BodyPipe::Pointer)
{
assert(request_satisfaction_mode);
assert(adaptedBodySource != NULL);
- if (const size_t contentSize = adaptedBodySource->buf().contentSize()) {
+ if (size_t contentSize = adaptedBodySource->buf().contentSize()) {
+ // XXX: entry->bytesWanted returns contentSize-1 if entry can accept data.
+ // We have to add 1 to avoid suspending forever.
+ const size_t bytesWanted = storeEntry()->bytesWanted(Range<size_t>(0,contentSize));
+ const size_t spaceAvailable = bytesWanted > 0 ? (bytesWanted + 1) : 0;
+
+ if (spaceAvailable < contentSize ) {
+ // No or partial body data consuming
+ typedef NullaryMemFunT<ClientHttpRequest> Dialer;
+ AsyncCall::Pointer call = asyncCall(93, 5, "ClientHttpRequest::resumeBodyStorage",
+ Dialer(this, &ClientHttpRequest::resumeBodyStorage));
+ storeEntry()->deferProducer(call);
+ }
+
+ // XXX: bytesWanted API does not allow us to write just one byte!
+ if (!spaceAvailable && contentSize > 1)
+ return;
+
+ if (spaceAvailable < contentSize )
+ contentSize = spaceAvailable;
+
BodyPipeCheckout bpc(*adaptedBodySource);
- const StoreIOBuffer ioBuf(&bpc.buf, request_satisfaction_offset);
+ const StoreIOBuffer ioBuf(&bpc.buf, request_satisfaction_offset, contentSize);
storeEntry()->write(ioBuf);
- // assume can write everything
- request_satisfaction_offset += contentSize;
+ // assume StoreEntry::write() writes the entire ioBuf
+ request_satisfaction_offset += ioBuf.length;
bpc.buf.consume(contentSize);
bpc.checkIn();
}
ClientHttpRequest::noteBodyProductionEnded(BodyPipe::Pointer)
{
assert(!virginHeadSource);
- if (adaptedBodySource != NULL) { // did not end request satisfaction yet
- // We do not expect more because noteMoreBodyDataAvailable always
- // consumes everything. We do not even have a mechanism to consume
- // leftovers after noteMoreBodyDataAvailable notifications seize.
- assert(adaptedBodySource->exhausted());
+ // should we end request satisfaction now?
+ if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
endRequestSatisfaction();
- }
}
void