#if USE_ADAPTATION
#include "adaptation/AccessCheck.h"
#include "adaptation/Iterator.h"
+#include "base/AsyncCall.h"
#endif
// implemented in client_side_reply.cc until sides have a common parent
, adaptationAccessCheckPending(false)
, startedAdaptation(false)
#endif
+ ,theVirginReply(NULL),
+ theFinalReply(NULL)
{
fwd = theFwdState;
entry = fwd->entry;
return;
}
#endif
- handleMoreRequestBodyAvailable();
+ if (requestBodySource == bp)
+ handleMoreRequestBodyAvailable();
}
// the entire request or adapted response body was provided, successfully
return;
}
#endif
- handleRequestBodyProductionEnded();
+ if (requestBodySource == bp)
+ handleRequestBodyProductionEnded();
}
// premature end of the request or adapted response body production
return;
}
#endif
- handleRequestBodyProducerAborted();
+ if (requestBodySource == bp)
+ handleRequestBodyProducerAborted();
}
handleAdaptationAborted(!final);
}
-// more adapted response body is available
void
-ServerStateData::handleMoreAdaptedBodyAvailable()
+ServerStateData::resumeBodyStorage()
{
- const size_t contentSize = adaptedBodySource->buf().contentSize();
+ if (abortOnBadEntry("store entry aborted while kick producer callback"))
+ return;
- debugs(11,5, HERE << "consuming " << contentSize << " bytes of adapted " <<
- "response body at offset " << adaptedBodySource->consumedSize());
+ if (!adaptedBodySource)
+ return;
+
+ handleMoreAdaptedBodyAvailable();
+
+ if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
+ endAdaptedBodyConsumption();
+}
+// more adapted response body is available
+void
+ServerStateData::handleMoreAdaptedBodyAvailable()
+{
if (abortOnBadEntry("entry refuses adapted body"))
return;
assert(entry);
+
+ size_t contentSize = adaptedBodySource->buf().contentSize();
+
+ if (!contentSize)
+ return; // XXX: bytesWanted asserts on zero-size ranges
+
+ // XXX: entry->bytesWanted returns contentSize-1 if entry can accept data.
+ // We have to add 1 to avoid suspending forever.
+ const size_t bytesWanted = entry->bytesWanted(Range<size_t>(0, contentSize));
+ const size_t spaceAvailable = bytesWanted > 0 ? (bytesWanted + 1) : 0;
+
+ if (spaceAvailable < contentSize ) {
+ // No or partial body data consuming
+ typedef NullaryMemFunT<ServerStateData> Dialer;
+ AsyncCall::Pointer call = asyncCall(93, 5, "ServerStateData::resumeBodyStorage",
+ Dialer(this, &ServerStateData::resumeBodyStorage));
+ entry->deferProducer(call);
+ }
+
+ // XXX: bytesWanted API does not allow us to write just one byte!
+ if (!spaceAvailable && contentSize > 1) {
+ debugs(11, 5, HERE << "NOT storing " << contentSize << " bytes of adapted " <<
+ "response body at offset " << adaptedBodySource->consumedSize());
+ return;
+ }
+
+ if (spaceAvailable < contentSize ) {
+ debugs(11, 5, HERE << "postponing storage of " <<
+ (contentSize - spaceAvailable) << " body bytes");
+ contentSize = spaceAvailable;
+ }
+
+ debugs(11,5, HERE << "storing " << contentSize << " bytes of adapted " <<
+ "response body at offset " << adaptedBodySource->consumedSize());
+
BodyPipeCheckout bpc(*adaptedBodySource);
- const StoreIOBuffer ioBuf(&bpc.buf, currentOffset);
- currentOffset += bpc.buf.size;
+ const StoreIOBuffer ioBuf(&bpc.buf, currentOffset, contentSize);
+ currentOffset += ioBuf.length;
entry->write(ioBuf);
bpc.buf.consume(contentSize);
bpc.checkIn();
void
ServerStateData::handleAdaptedBodyProductionEnded()
{
- stopConsumingFrom(adaptedBodySource);
-
if (abortOnBadEntry("entry went bad while waiting for adapted body eof"))
return;
+ // end consumption if we consumed everything
+ if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
+ endAdaptedBodyConsumption();
+ // else resumeBodyStorage() will eventually consume the rest
+}
+
+void
+ServerStateData::endAdaptedBodyConsumption()
+{
+ stopConsumingFrom(adaptedBodySource);
handleAdaptationCompleted();
}
void handleAdaptationCompleted();
void handleAdaptationAborted(bool bypassable = false);
+
+ /// called by StoreEntry when it has more buffer space available
+ void resumeBodyStorage();
+ /// called when the entire adapted response body is consumed
+ void endAdaptedBodyConsumption();
#endif
protected:
virtual void lock();
virtual void release();
+#if USE_ADAPTATION
+ /// call back producer when more buffer space is available
+ void deferProducer(const AsyncCall::Pointer &producer);
+ /// calls back producer registered with deferProducer
+ void kickProducer();
+#endif
+
private:
static MemAllocator *pool;
+#if USE_ADAPTATION
+ /// producer callback registered with deferProducer
+ AsyncCall::Pointer deferredProducer;
+#endif
+
bool validLength() const;
bool hasOneOfEtags(const String &reqETags, const bool allowWeakMatch) const;
};
flags.error = 0;
}
+ StoreIOBuffer(MemBuf *aMemBuf, int64_t anOffset, size_t anLength) :
+ length(anLength),
+ offset (anOffset),
+ data(aMemBuf->content()) {
+ flags.error = 0;
+ }
+
Range<int64_t> range() const {
return Range<int64_t>(offset, offset + length);
}
handleAdaptationFailure(!final);
}
+void
+ClientHttpRequest::resumeBodyStorage()
+{
+ if (!adaptedBodySource)
+ return;
+
+ noteMoreBodyDataAvailable(adaptedBodySource);
+}
+
void
ClientHttpRequest::noteMoreBodyDataAvailable(BodyPipe::Pointer)
{
assert(request_satisfaction_mode);
assert(adaptedBodySource != NULL);
- if (const size_t contentSize = adaptedBodySource->buf().contentSize()) {
+ if (size_t contentSize = adaptedBodySource->buf().contentSize()) {
+ // XXX: entry->bytesWanted returns contentSize-1 if entry can accept data.
+ // We have to add 1 to avoid suspending forever.
+ const size_t bytesWanted = storeEntry()->bytesWanted(Range<size_t>(0,contentSize));
+ const size_t spaceAvailable = bytesWanted > 0 ? (bytesWanted + 1) : 0;
+
+ if (spaceAvailable < contentSize ) {
+ // No or partial body data consuming
+ typedef NullaryMemFunT<ClientHttpRequest> Dialer;
+ AsyncCall::Pointer call = asyncCall(93, 5, "ClientHttpRequest::resumeBodyStorage",
+ Dialer(this, &ClientHttpRequest::resumeBodyStorage));
+ storeEntry()->deferProducer(call);
+ }
+
+ // XXX: bytesWanted API does not allow us to write just one byte!
+ if (!spaceAvailable && contentSize > 1)
+ return;
+
+ if (spaceAvailable < contentSize )
+ contentSize = spaceAvailable;
+
BodyPipeCheckout bpc(*adaptedBodySource);
- const StoreIOBuffer ioBuf(&bpc.buf, request_satisfaction_offset);
+ const StoreIOBuffer ioBuf(&bpc.buf, request_satisfaction_offset, contentSize);
storeEntry()->write(ioBuf);
- // assume can write everything
- request_satisfaction_offset += contentSize;
+ // assume StoreEntry::write() writes the entire ioBuf
+ request_satisfaction_offset += ioBuf.length;
bpc.buf.consume(contentSize);
bpc.checkIn();
}
ClientHttpRequest::noteBodyProductionEnded(BodyPipe::Pointer)
{
assert(!virginHeadSource);
- if (adaptedBodySource != NULL) { // did not end request satisfaction yet
- // We do not expect more because noteMoreBodyDataAvailable always
- // consumes everything. We do not even have a mechanism to consume
- // leftovers after noteMoreBodyDataAvailable notifications seize.
- assert(adaptedBodySource->exhausted());
+ // should we end request satisfaction now?
+ if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
endRequestSatisfaction();
- }
}
void
virtual void noteBodyProducerAborted(BodyPipe::Pointer);
void endRequestSatisfaction();
+ /// called by StoreEntry when it has more buffer space available
+ void resumeBodyStorage();
private:
CbcPointer<Adaptation::Initiate> virginHeadSource;
swap_dirn = -1;
}
+#if USE_ADAPTATION
+void
+StoreEntry::deferProducer(const AsyncCall::Pointer &producer)
+{
+ if (!deferredProducer)
+ deferredProducer = producer;
+ else
+ debugs(20, 5, HERE << "Deferred producer call is allready set to: " <<
+ *deferredProducer << ", requested call: " << *producer);
+}
+
+void
+StoreEntry::kickProducer()
+{
+ if (deferredProducer != NULL) {
+ ScheduleCallHere(deferredProducer);
+ deferredProducer = NULL;
+ }
+}
+#endif
+
void
StoreEntry::destroyMemObject()
{
copying = false;
storeClientCopy2(entry, this);
+
+#if USE_ADAPTATION
+ if (entry)
+ entry->kickProducer();
+#endif
}
/*
else
mem->kickReads();
+#if USE_ADAPTATION
+ e->kickProducer();
+#endif
+
return 1;
}
assert (aHeader.lowestOffset() == 0);
assert (aHeader.write (StoreIOBuffer()));
assert (aHeader.lowestOffset() == 0);
- assert (aHeader.write (StoreIOBuffer(0, 1, NULL)));
+ assert (aHeader.write (StoreIOBuffer(0, 1, (char *)NULL)));
assert (aHeader.lowestOffset() == 0);
char * sampleData = xstrdup ("A");
assert (aHeader.write (StoreIOBuffer(1, 100, sampleData)));