--- /dev/null
+
+#include "squid.h"
+#include "BodyPipe.h"
+
+CBDATA_CLASS_INIT(BodyPipe);
+
+// inform the pipe that we are done and clear the Pointer
+void BodyProducer::stopProducingFor(RefCount<BodyPipe> &pipe, bool atEof)
+{
+ debugs(91,7, HERE << this << " will not produce for " << pipe <<
+ "; atEof: " << atEof);
+ assert(pipe != NULL); // be strict: the caller state may depend on this
+ pipe->clearProducer(atEof);
+ pipe = NULL;
+}
+
+// inform the pipe that we are done and clear the Pointer
+void BodyConsumer::stopConsumingFrom(RefCount<BodyPipe> &pipe)
+{
+ debugs(91,7, HERE << this << " will not consume from " << pipe);
+ assert(pipe != NULL); // be strict: the caller state may depend on this
+ pipe->clearConsumer();
+ pipe = NULL;
+}
+
+/* BodyPipe */
+
+BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1),
+ theProducer(aProducer), theConsumer(0),
+ thePutSize(0), theGetSize(0), mustAutoConsume(false), isCheckedOut(false)
+{
+ // TODO: teach MemBuf to start with zero minSize
+ // TODO: limit maxSize by theBodySize, when known?
+ theBuf.init(2*1024, MaxCapacity);
+ debugs(91,7, HERE << "created BodyPipe" << status());
+}
+
+BodyPipe::~BodyPipe()
+{
+ debugs(91,7, HERE << "destroying BodyPipe" << status());
+ assert(!theProducer);
+ assert(!theConsumer);
+ theBuf.clean();
+}
+
+void BodyPipe::setBodySize(size_t aBodySize)
+{
+ assert(!bodySizeKnown());
+ assert(aBodySize >= 0);
+ assert(thePutSize <= aBodySize);
+
+ // If this assert fails, we need to add code to check for eof and inform
+ // the consumer about the eof condition via scheduleBodyEndNotification,
+ // because just setting a body size limit may trigger the eof condition.
+ assert(!theConsumer);
+
+ theBodySize = aBodySize;
+ debugs(91,7, HERE << "set body size" << status());
+}
+
+size_t BodyPipe::bodySize() const
+{
+ assert(bodySizeKnown());
+ return static_cast<size_t>(theBodySize);
+}
+
+bool BodyPipe::expectMoreAfter(size_t offset) const
+{
+ assert(theGetSize <= offset);
+ return offset < thePutSize || // buffer has more now or
+ (!productionEnded() && mayNeedMoreData()); // buffer will have more
+}
+
+bool BodyPipe::exhausted() const
+{
+ return !expectMoreAfter(theGetSize);
+}
+
+size_t BodyPipe::unproducedSize() const
+{
+ return bodySize() - thePutSize; // bodySize() asserts that size is known
+}
+
+void
+BodyPipe::clearProducer(bool atEof)
+{
+ if (theProducer) {
+ debugs(91,7, HERE << "clearing BodyPipe producer" << status());
+ theProducer = NULL;
+ if (atEof) {
+ if (!bodySizeKnown())
+ theBodySize = thePutSize;
+ else
+ if (bodySize() != thePutSize)
+ debugs(91,1, HERE << "aborting on premature eof" << status());
+ } else {
+ // asserta that we can detect the abort if the consumer joins later
+ assert(!bodySizeKnown() || bodySize() != thePutSize);
+ }
+ scheduleBodyEndNotification();
+ }
+}
+
+size_t
+BodyPipe::putMoreData(const char *buf, size_t size)
+{
+ if (bodySizeKnown())
+ size = XMIN(size, unproducedSize());
+
+ const size_t spaceSize = static_cast<size_t>(theBuf.potentialSpaceSize());
+ if ((size = XMIN(size, spaceSize))) {
+ theBuf.append(buf, size);
+ postAppend(size);
+ return size;
+ }
+ return 0;
+}
+
+bool
+BodyPipe::setConsumerIfNotLate(Consumer *aConsumer)
+{
+ assert(!theConsumer);
+ assert(aConsumer);
+
+ // TODO: convert this into an exception and remove IfNotLate suffix
+ // If there is something consumed already, we are in an auto-consuming mode
+ // and it is too late to attach a real consumer to the pipe.
+ if (theGetSize > 0) {
+ assert(mustAutoConsume);
+ return false;
+ }
+
+ theConsumer = aConsumer;
+ debugs(91,7, HERE << "set consumer" << status());
+ if (theBuf.hasContent())
+ AsyncCall(91,5, this, BodyPipe::tellMoreBodyDataAvailable);
+ if (!theProducer)
+ scheduleBodyEndNotification();
+
+ return true;
+}
+
+void
+BodyPipe::clearConsumer() {
+ if (theConsumer) {
+ debugs(91,7, HERE << "clearing consumer" << status());
+ theConsumer = NULL;
+ if (!exhausted())
+ AsyncCall(91,5, this, BodyPipe::tellBodyConsumerAborted);
+ }
+}
+
+size_t
+BodyPipe::getMoreData(MemBuf &buf)
+{
+ if (!theBuf.hasContent())
+ return 0; // did not touch the possibly uninitialized buf
+
+ if (buf.isNull())
+ buf.init();
+ const size_t size = XMIN(theBuf.contentSize(), buf.potentialSpaceSize());
+ buf.append(theBuf.content(), size);
+ theBuf.consume(size);
+ postConsume(size);
+ return size; // cannot be zero if we called buf.init above
+}
+
+void
+BodyPipe::consume(size_t size)
+{
+ theBuf.consume(size);
+ postConsume(size);
+}
+
+void
+BodyPipe::enableAutoConsumption() {
+ mustAutoConsume = true;
+ debugs(91,5, HERE << "enabled auto consumption" << status());
+ if (!theConsumer && theBuf.hasContent())
+ AsyncCall(91,5, this, BodyPipe::tellMoreBodyDataAvailable);
+}
+
+MemBuf &
+BodyPipe::checkOut() {
+ assert(!isCheckedOut);
+ isCheckedOut = true;
+ return theBuf;
+}
+
+void
+BodyPipe::checkIn(Checkout &checkout)
+{
+ assert(isCheckedOut);
+ isCheckedOut = false;
+ const size_t currentSize = theBuf.contentSize();
+ if (checkout.checkedOutSize > currentSize)
+ postConsume(checkout.checkedOutSize - currentSize);
+ else
+ if (checkout.checkedOutSize < currentSize)
+ postAppend(currentSize - checkout.checkedOutSize);
+}
+
+void
+BodyPipe::undoCheckOut(Checkout &checkout)
+{
+ assert(isCheckedOut);
+ const size_t currentSize = theBuf.contentSize();
+ // We can only undo if size did not change, and even that carries
+ // some risk. If this becomes a problem, the code checking out
+ // raw buffers should always check them in (possibly unchanged)
+ // instead of relying on the automated undo mechanism of Checkout.
+ // The code can always use a temporary buffer to accomplish that.
+ assert(checkout.checkedOutSize == currentSize);
+}
+
+// TODO: Optimize: inform consumer/producer about more data/space only if
+// they used the data/space since we notified them last time.
+
+void
+BodyPipe::postConsume(size_t size) {
+ assert(!isCheckedOut);
+ theGetSize += size;
+ debugs(91,7, HERE << "consumed " << size << " bytes" << status());
+ if (mayNeedMoreData())
+ AsyncCall(91,5, this, BodyPipe::tellMoreBodySpaceAvailable);
+}
+
+void
+BodyPipe::postAppend(size_t size) {
+ assert(!isCheckedOut);
+ thePutSize += size;
+ debugs(91,7, HERE << "added " << size << " bytes" << status());
+
+ // We should not consume here even if mustAutoConsume because the
+ // caller may not be ready for the data to be consumed during this call.
+ AsyncCall(91,5, this, BodyPipe::tellMoreBodyDataAvailable);
+
+ if (!mayNeedMoreData())
+ clearProducer(true); // reached end-of-body
+}
+
+
+void
+BodyPipe::scheduleBodyEndNotification()
+{
+ if (bodySizeKnown() && bodySize() == thePutSize)
+ AsyncCall(91,5, this, BodyPipe::tellBodyProductionEnded);
+ else
+ AsyncCall(91,5, this, BodyPipe::tellBodyProducerAborted);
+}
+
+void BodyPipe::tellMoreBodySpaceAvailable()
+{
+ if (theProducer != NULL)
+ theProducer->noteMoreBodySpaceAvailable(*this);
+}
+
+void BodyPipe::tellBodyConsumerAborted()
+{
+ if (theProducer != NULL)
+ theProducer->noteBodyConsumerAborted(*this);
+}
+
+void BodyPipe::tellMoreBodyDataAvailable()
+{
+ if (theConsumer != NULL)
+ theConsumer->noteMoreBodyDataAvailable(*this);
+ else
+ if (mustAutoConsume && theBuf.hasContent())
+ consume(theBuf.contentSize());
+}
+
+void BodyPipe::tellBodyProductionEnded()
+{
+ if (theConsumer != NULL)
+ theConsumer->noteBodyProductionEnded(*this);
+}
+
+void BodyPipe::tellBodyProducerAborted()
+{
+ if (theConsumer != NULL)
+ theConsumer->noteBodyProducerAborted(*this);
+}
+
+// a short temporary string describing buffer status for debugging
+const char *BodyPipe::status() const
+{
+ static MemBuf buf;
+ buf.reset();
+
+ buf.append(" [", 2);
+
+ buf.Printf("%d<=%d", (int)theGetSize, (int)thePutSize);
+ if (theBodySize >= 0)
+ buf.Printf("<=%d", (int)theBodySize);
+ else
+ buf.append("<=?", 3);
+
+ buf.Printf(" %d+%d", (int)theBuf.contentSize(), (int)theBuf.spaceSize());
+
+ buf.Printf(" pipe%p", this);
+ if (theProducer)
+ buf.Printf(" prod%p", theProducer);
+ if (theConsumer)
+ buf.Printf(" cons%p", theConsumer);
+
+ if (mustAutoConsume)
+ buf.append(" A", 2);
+ if (isCheckedOut)
+ buf.append(" L", 2); // Locked
+
+ buf.append("]", 1);
+
+ buf.terminate();
+
+ return buf.content();
+}
+
+
+/* BodyPipeCheckout */
+
+BodyPipeCheckout::BodyPipeCheckout(BodyPipe &aPipe): pipe(aPipe),
+ buf(aPipe.checkOut()), offset(aPipe.consumedSize()),
+ checkedOutSize(buf.contentSize()), checkedIn(false)
+{
+}
+
+BodyPipeCheckout::~BodyPipeCheckout()
+{
+ if (!checkedIn)
+ pipe.undoCheckOut(*this);
+}
+
+void
+BodyPipeCheckout::checkIn()
+{
+ assert(!checkedIn);
+ pipe.checkIn(*this);
+ checkedIn = true;
+}
+
+
+BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout &c): pipe(c.pipe),
+ buf(c.buf), offset(c.offset), checkedOutSize(c.checkedOutSize),
+ checkedIn(c.checkedIn)
+{
+ assert(false); // prevent copying
+}
+
+BodyPipeCheckout &
+BodyPipeCheckout::operator =(const BodyPipeCheckout &)
+{
+ assert(false); // prevent assignment
+ return *this;
+}
--- /dev/null
+
+#ifndef SQUID_BODY_PIPE_H
+#define SQUID_BODY_PIPE_H
+
+#include "MemBuf.h"
+#include "AsyncCall.h"
+
+class BodyPipe;
+
+// Interface for those who want to produce body content for others.
+// BodyProducer is expected to create the BodyPipe.
+// One pipe cannot have more than one producer.
+class BodyProducer {
+ public:
+ virtual ~BodyProducer() {}
+
+ virtual void noteMoreBodySpaceAvailable(BodyPipe &bp) = 0;
+ virtual void noteBodyConsumerAborted(BodyPipe &bp) = 0;
+
+ protected:
+ void stopProducingFor(RefCount<BodyPipe> &pipe, bool atEof);
+};
+
+// Interface for those who want to consume body content from others.
+// BodyConsumer is expected to register with an existing BodyPipe
+// by calling BodyPipe::setConsumer().
+// One pipe cannot have more than one consumer.
+class BodyConsumer {
+ public:
+ virtual ~BodyConsumer() {}
+
+ virtual void noteMoreBodyDataAvailable(BodyPipe &bp) = 0;
+ virtual void noteBodyProductionEnded(BodyPipe &bp) = 0;
+ virtual void noteBodyProducerAborted(BodyPipe &bp) = 0;
+
+ protected:
+ void stopConsumingFrom(RefCount<BodyPipe> &pipe);
+};
+
+// Makes raw buffer checkin/checkout interface efficient and exception-safe.
+// Either append or consume operations can be performed on a checkedout buffer.
+class BodyPipeCheckout {
+ public:
+ friend class BodyPipe;
+
+ public:
+ BodyPipeCheckout(BodyPipe &pipe); // checks out
+ ~BodyPipeCheckout(); // aborts checkout unless checkedIn
+
+ void checkIn();
+
+ public:
+ BodyPipe &pipe;
+ MemBuf &buf;
+ const size_t offset; // of current content, relative to the body start
+
+ protected:
+ const size_t checkedOutSize;
+ bool checkedIn;
+
+ private:
+ BodyPipeCheckout(const BodyPipeCheckout &); // prevent copying
+ BodyPipeCheckout &operator =(const BodyPipeCheckout &); // prevent assignment
+};
+
+// Connects those who produces message body content with those who
+// consume it. For example, connects ConnStateData with FtpStateData OR
+// ICAPModXact with HttpStateData.
+class BodyPipe: public RefCountable {
+ public:
+ typedef RefCount<BodyPipe> Pointer;
+ typedef BodyProducer Producer;
+ typedef BodyConsumer Consumer;
+ typedef BodyPipeCheckout Checkout;
+
+ enum { MaxCapacity = SQUID_TCP_SO_RCVBUF };
+
+ friend class BodyPipeCheckout;
+
+ public:
+ BodyPipe(Producer *aProducer);
+ ~BodyPipe(); // asserts that producer and consumer are cleared
+
+ void setBodySize(size_t aSize); // set body size
+ bool bodySizeKnown() const { return theBodySize >= 0; }
+ size_t bodySize() const;
+ size_t consumedSize() const { return theGetSize; }
+ bool productionEnded() const { return !theProducer; }
+
+ // called by producers
+ void clearProducer(bool atEof); // aborts or sends eof
+ size_t putMoreData(const char *buf, size_t size);
+ bool mayNeedMoreData() const { return !bodySizeKnown() || needsMoreData(); }
+ bool needsMoreData() const { return bodySizeKnown() && unproducedSize() > 0; }
+ size_t unproducedSize() const; // size of still unproduced data
+
+ // called by consumers
+ bool setConsumerIfNotLate(Consumer *aConsumer);
+ void clearConsumer(); // aborts if still piping
+ size_t getMoreData(MemBuf &buf);
+ void consume(size_t size);
+ bool expectMoreAfter(size_t offset) const;
+ bool exhausted() const; // saw eof/abort and all data consumed
+
+ // start or continue consuming when there is no consumer
+ void enableAutoConsumption();
+
+ const MemBuf &buf() const { return theBuf; }
+
+ const char *status() const; // for debugging only
+
+ protected:
+ // lower-level interface used by Checkout
+ MemBuf &checkOut(); // obtain raw buffer
+ void checkIn(Checkout &checkout); // return updated raw buffer
+ void undoCheckOut(Checkout &checkout); // undo checkout efffect
+
+ void scheduleBodyEndNotification();
+
+ // keep counters in sync and notify producer or consumer
+ void postConsume(size_t size);
+ void postAppend(size_t size);
+
+ public: /* public to enable callbacks, but treat as private */
+
+ /* these methods are calling producer and sibscriber note*()
+ * callbacks with this BodyPipe as a parameter, which allows
+ * a single producer or consumer to support multiple pipes. */
+
+ void tellMoreBodySpaceAvailable();
+ void tellBodyConsumerAborted();
+ void tellMoreBodyDataAvailable();
+ void tellBodyProductionEnded();
+ void tellBodyProducerAborted();
+
+ AsyncCallWrapper(91,5, BodyPipe, tellMoreBodySpaceAvailable);
+ AsyncCallWrapper(91,5, BodyPipe, tellBodyConsumerAborted);
+ AsyncCallWrapper(91,5, BodyPipe, tellMoreBodyDataAvailable);
+ AsyncCallWrapper(91,5, BodyPipe, tellBodyProductionEnded);
+ AsyncCallWrapper(91,5, BodyPipe, tellBodyProducerAborted);
+
+ private:
+ ssize_t theBodySize; // expected total content length, if known
+ Producer *theProducer; // content producer, if any
+ Consumer *theConsumer; // content consumer, if any
+
+ size_t thePutSize; // ever-increasing total
+ size_t theGetSize; // ever-increasing total
+
+ MemBuf theBuf; // produced but not yet consumed content, if any
+
+ bool mustAutoConsume; // consume when there is no consumer
+ bool isCheckedOut; // to keep track of checkout violations
+
+ CBDATA_CLASS2(BodyPipe);
+};
+
+#endif /* SQUID_BODY_PIPE_H */
+++ /dev/null
-
-
-#include "squid.h"
-#include "MemBuf.h"
-#include "BodyReader.h"
-
-BodyReader::BodyReader(size_t len, BodyReadFunc *r, BodyAbortFunc *a, BodyKickFunc *k, void *d) :
- _remaining(len), _available(0),
- read_func(r), abort_func(a), kick_func(k), read_func_data(d),
- read_callback(NULL), read_callback_data(NULL)
-{
- theBuf.init(4096, 65536);
- debugs(32,3,HERE << this << " " << "created new BodyReader for content-length " << len);
- bytes_read = 0;
-}
-
-BodyReader::~BodyReader()
-{
- if (_remaining && abort_func)
- abort_func(read_func_data, _remaining);
-
- if (callbackPending())
- doCallback();
-
-}
-
-void
-BodyReader::read(CBCB *callback, void *cbdata)
-{
- assert(_remaining || theBuf.contentSize());
- debugs(32,3,HERE << this << " " << "remaining = " << _remaining);
- debugs(32,3,HERE << this << " " << "available = " << _available);
-
- if (read_callback == NULL) {
- read_callback = callback;
- read_callback_data = cbdataReference(cbdata);
- } else {
- assert(read_callback == callback);
- assert(read_callback_data == cbdata);
- }
-
- if ((_available == 0) && (theBuf.contentSize() == 0)) {
- debugs(32,3,HERE << this << " " << "read: no body data available, saving callback pointers");
-
- if (kick_func)
- kick_func(read_func_data);
-
- return;
- }
-
- debugs(32,3,HERE << this << " " << "read_func=" << read_func);
- debugs(32,3,HERE << this << " " << "data=" << read_func_data);
- size_t size = theBuf.potentialSpaceSize();
-
- debugs(32, 3, "BodyReader::read: available: " << _available << ", size " << size << ", remaining: " << _remaining);
-
- if (size > _available)
- size = _available;
-
- if (size > _remaining)
- size = _remaining;
-
- if (size > 0) {
- debugs(32,3,HERE << this << " " << "calling read_func for " << size << " bytes");
-
- size_t nread = read_func(read_func_data, theBuf, size);
-
- if (nread > 0) {
- _available -= nread;
- reduce_remaining(nread);
- } else {
- debugs(32,3,HERE << this << " " << "Help, read_func() ret " << nread);
- }
- }
-
- if (theBuf.contentSize() > 0) {
- debugs(32,3,HERE << this << " have " << theBuf.contentSize() << " bytes in theBuf, calling back");
- doCallback();
- }
-}
-
-void
-BodyReader::notify(size_t now_available)
-{
- debugs(32,3,HERE << this << " " << "old available = " << _available);
- debugs(32,3,HERE << this << " " << "now_available = " << now_available);
- _available = now_available;
-
- if (!callbackPending()) {
- debugs(32,3,HERE << this << " " << "no callback pending, nothing to do");
- return;
- }
-
- debugs(32,3,HERE << this << " " << "have data and pending callback, calling read()");
-
- read(read_callback, read_callback_data);
-}
-
-bool
-BodyReader::callbackPending()
-{
- return read_callback ? true : false;
-}
-
-/*
- * doCallback
- *
- * Execute the read callback if there is a function registered
- * and the read_callback_data is still valid.
- */
-bool
-BodyReader::doCallback()
-{
- CBCB *t_callback = read_callback;
- void *t_cbdata;
-
- if (t_callback == NULL)
- return false;
-
- read_callback = NULL;
-
- if (!cbdataReferenceValidDone(read_callback_data, &t_cbdata))
- return false;
-
- debugs(32,3,HERE << this << " doing callback, theBuf size = " << theBuf.contentSize());
-
- t_callback(theBuf, t_cbdata);
-
- return true;
-}
-
-bool
-BodyReader::consume(size_t size)
-{
- debugs(32,3,HERE << this << " BodyReader::consume consuming " << size);
-
- if (theBuf.contentSize() < (mb_size_t) size) {
- debugs(0,0,HERE << this << "BodyReader::consume failed");
- debugs(0,0,HERE << this << "BodyReader::consume size = " << size);
- debugs(0,0,HERE << this << "BodyReader::consume contentSize() = " << theBuf.contentSize());
- return false;
- }
-
- theBuf.consume(size);
-
- if (callbackPending() && _available > 0) {
- debugs(32,3,HERE << this << " " << "data avail and pending callback, calling read()");
- read(read_callback, read_callback_data);
- }
-
- return true;
-}
-
-void
-BodyReader::reduce_remaining(size_t size)
-{
- assert(size <= _remaining);
- _remaining -= size;
-}
+++ /dev/null
-
-#ifndef SQUID_BODY_READER_H
-#define SQUID_BODY_READER_H
-
-typedef void CBCB (MemBuf &mb, void *data);
-typedef size_t BodyReadFunc (void *, MemBuf &mb, size_t size);
-typedef void BodyAbortFunc (void *, size_t);
-typedef void BodyKickFunc (void *);
-
-class BodyReader : public RefCountable
-{
-
-public:
- typedef RefCount<BodyReader> Pointer;
- BodyReader(size_t len, BodyReadFunc *r, BodyAbortFunc *a, BodyKickFunc *k, void *d);
- ~BodyReader();
- void read(CBCB *, void *);
- void notify(size_t now_available);
- size_t remaining() { return _remaining; }
-
- bool callbackPending();
- bool consume(size_t size);
-
- int bytes_read;
-
- /* reduce the number of bytes that the BodyReader is looking for.
- * Will trigger an assertion if it tries to reduce below zero
- */
- void reduce_remaining(size_t size);
-
-private:
- size_t _remaining;
- size_t _available;
- MemBuf theBuf;
-
- /*
- * These are for interacting with things that
- * "provide" body content. ie, ConnStateData and
- * ICAPReqMod after adapation.
- */
- BodyReadFunc *read_func;
- BodyAbortFunc *abort_func;
- BodyKickFunc *kick_func;
- void *read_func_data;
-
- /*
- * These are for interacting with things that
- * "consume" body content. ie, HttpStateData and
- * ICAPReqMod before adaptation.
- */
- CBCB *read_callback;
- void *read_callback_data;
- bool doCallback();
-};
-
-#endif
+++ /dev/null
-#include "squid.h"
-#include "client_side.h"
-#include "ClientBody.h"
-#include "HttpRequest.h"
-
-
-ClientBody::ClientBody(ConnStateData::Pointer & aConn, HttpRequest *Request) : conn(aConn), request(NULL), buf (NULL), bufsize(0), callback(NULL), cbdata(NULL)
-{
- request = HTTPMSGLOCK(Request);
-}
-
-ClientBody::~ClientBody()
-{
- if (cbdata)
- cbdataReferenceDone(cbdata);
-
- HTTPMSGUNLOCK(request);
-
- conn = NULL; // refcounted
-}
-
-/* Called by clientReadRequest to process body content */
-void
-ClientBody::process()
-{
-
- debug(33, 2) ("clientBody::process: start FD %d body_size=%lu in.notYetUsed=%lu cb=%p req=%p\n",
- conn->fd,
- (unsigned long int) conn->body_size_left,
- (unsigned long int) conn->in.notYetUsed,
- callback,
- request);
-
- if (conn->in.notYetUsed)
- processBuffer();
- else
- conn->readSomeData();
-}
-
-void
-ClientBody::processBuffer()
-{
- /* Some sanity checks... */
- assert(conn->body_size_left > 0);
- assert(conn->in.notYetUsed > 0);
- assert(callback != NULL);
- assert(buf != NULL);
- /* How much do we have to process? */
- size_t size = conn->in.notYetUsed;
-
- if (size > conn->body_size_left) /* only process the body part */
- size = conn->body_size_left;
-
- if (size > bufsize) /* don't copy more than requested */
- size = bufsize;
-
- xmemcpy(buf, conn->in.buf, size);
-
- conn->body_size_left -= size;
-
- /* Move any remaining data */
- conn->in.notYetUsed -= size;
-
- if (conn->in.notYetUsed > 0)
- xmemmove(conn->in.buf, conn->in.buf + size, conn->in.notYetUsed);
-
- /* Remove request link if this is the last part of the body, as
- * clientReadRequest automatically continues to process next request */
- if (conn->body_size_left <= 0 && request != NULL)
- request->body_connection = NULL;
-
- request->flags.body_sent = 1;
-
- doCallback(size);
-
- debug(33, 2) ("ClientBody::process: end FD %d size=%lu body_size=%lu in.notYetUsed=%lu cb=%p req=%p\n",
- conn->fd, (unsigned long int)size, (unsigned long int) conn->body_size_left,
- (unsigned long) conn->in.notYetUsed, callback, request);
-}
-
-void
-ClientBody::init(char *Buf, size_t Bufsize, CBCB *Callback, void *Cbdata)
-{
- buf = Buf;
- bufsize = Bufsize;
- callback = Callback;
- cbdata = cbdataReference(Cbdata);
-}
-
-void
-ClientBody::doCallback(size_t theSize)
-{
- char *theBuf = buf;
- CBCB *theCallback = callback;
- void *theCbdata = cbdata;
-
- buf = NULL;
- bufsize = 0;
- callback = NULL;
- cbdata = NULL;
-
- void *someCbdata;
-
- if (cbdataReferenceValidDone(theCbdata, &someCbdata))
- theCallback(theBuf, theSize, someCbdata);
-}
-
-void
-ClientBody::negativeCallback()
-{
- doCallback((size_t)-1);
-}
+++ /dev/null
-#ifndef SQUID_CLIENTBODY_H
-#define SQUID_CLIENTBODY_H
-
-class ClientBody
-{
-
-public:
- ClientBody (ConnStateData::Pointer &, HttpRequest *);
- ~ClientBody();
- void process();
- void processBuffer();
- void init(char *, size_t, CBCB *, void *);
-bool hasCallback() const { return callback ? true : false; };
-
- void doCallback(size_t);
- void negativeCallback();
- HttpRequest * getRequest() { return request; };
-
-private:
- ConnStateData::Pointer conn;
- HttpRequest *request;
- char *buf;
- size_t bufsize;
- CBCB *callback;
- void *cbdata;
-};
-
-
-#endif