]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Replacing ClientBody class with BodyReader.
authorwessels <>
Fri, 28 Apr 2006 01:27:37 +0000 (01:27 +0000)
committerwessels <>
Fri, 28 Apr 2006 01:27:37 +0000 (01:27 +0000)
The old ClientBody code did not allow us to insert ICAP into the
flow of an HTTP request.  Code in http.cc called read functions in
client_side.cc when forwarding a request body.  But with ICAP in
the middle, HTTP needs to get the message body from ICAP, not
client_side.

The new BodyReader is similar to ClientBody.  Now read and abort
functions are pointers, rather than hard-coded in HTTP/FTP modules.

23 files changed:
src/BodyReader.cc [new file with mode: 0644]
src/BodyReader.h [new file with mode: 0644]
src/HttpRequest.cc
src/HttpRequest.h
src/ICAP/ICAPClientReqmodPrecache.cc
src/ICAP/ICAPClientReqmodPrecache.h
src/ICAP/ICAPConfig.cc
src/ICAP/ICAPModXact.cc
src/ICAP/ICAPXaction.cc
src/ICAP/MsgPipe.cc
src/Makefile.am
src/Makefile.in
src/client_side.cc
src/client_side.h
src/client_side_request.cc
src/client_side_request.h
src/forward.cc
src/ftp.cc
src/http.cc
src/http.h
src/protos.h
src/structs.h
src/typedefs.h

diff --git a/src/BodyReader.cc b/src/BodyReader.cc
new file mode 100644 (file)
index 0000000..37a913b
--- /dev/null
@@ -0,0 +1,147 @@
+
+
+#include "squid.h"
+#include "MemBuf.h"
+#include "BodyReader.h"
+
+BodyReader::BodyReader(size_t len, BodyReadFunc *r, BodyAbortFunc *a, BodyKickFunc *k, void *d) :
+        _remaining(len), _available(0),
+        read_func(r), abort_func(a), kick_func(k), read_func_data(d),
+        read_callback(NULL), read_callback_data(NULL)
+{
+    theBuf.init(4096, 65536);
+    debugs(32,3,HERE << this << " " << "created new BodyReader for content-length " << len);
+    bytes_read = 0;
+}
+
+BodyReader::~BodyReader()
+{
+    if (_remaining && abort_func)
+        abort_func(read_func_data, _remaining);
+
+    if (callbackPending())
+        doCallback();
+
+}
+
+void
+BodyReader::read(CBCB *callback, void *cbdata)
+{
+    assert(_remaining || theBuf.contentSize());
+    debugs(32,3,HERE << this << " " << "remaining = " << _remaining);
+    debugs(32,3,HERE << this << " " << "available = " << _available);
+
+    if (read_callback == NULL) {
+        read_callback = callback;
+        read_callback_data = cbdataReference(cbdata);
+    } else {
+        assert(read_callback == callback);
+        assert(read_callback_data == cbdata);
+    }
+
+    if ((_available == 0) && (theBuf.contentSize() == 0)) {
+        debugs(32,3,HERE << this << " " << "read: no body data available, saving callback pointers");
+
+        if (kick_func)
+            kick_func(read_func_data);
+
+        return;
+    }
+
+    debugs(32,3,HERE << this << " " << "read_func=" << (int) read_func);
+    debugs(32,3,HERE << this << " " << "data=" << read_func_data);
+    size_t size = theBuf.potentialSpaceSize();
+
+    if (size > _available)
+        size = _available;
+
+    if (size > 0) {
+        debugs(32,3,HERE << this << " " << "calling read_func for " << size << " bytes");
+
+        size_t nread = read_func(read_func_data, theBuf, size);
+
+        if (nread > 0) {
+            _available -= nread;
+            _remaining -= nread;
+        } else {
+            debugs(32,3,HERE << this << " " << "Help, read_func() ret " << nread);
+        }
+    }
+
+    if (theBuf.contentSize() > 0) {
+        debugs(32,3,HERE << this << " have " << theBuf.contentSize() << " bytes in theBuf, calling back");
+        doCallback();
+    }
+}
+
+void
+BodyReader::notify(size_t now_available)
+{
+    debugs(32,3,HERE << this << " " << "old available = " << _available);
+    debugs(32,3,HERE << this << " " << "now_available = " << now_available);
+    _available = now_available;
+
+    if (!callbackPending()) {
+        debugs(32,3,HERE << this << " " << "no callback pending, nothing to do");
+        return;
+    }
+
+    debugs(32,3,HERE << this << " " << "have data and pending callback, calling read()");
+
+    read(read_callback, read_callback_data);
+}
+
+bool
+BodyReader::callbackPending()
+{
+    return read_callback ? true : false;
+}
+
+/*
+ * doCallback
+ *
+ * Execute the read callback if there is a function registered
+ * and the read_callback_data is still valid.
+ */
+bool
+BodyReader::doCallback()
+{
+    CBCB *t_callback = read_callback;
+    void *t_cbdata;
+
+    if (t_callback == NULL)
+        return false;
+
+    read_callback = NULL;
+
+    if (!cbdataReferenceValidDone(read_callback_data, &t_cbdata))
+        return false;
+
+    debugs(32,3,HERE << this << " doing callback, theBuf size = " << theBuf.contentSize());
+
+    t_callback(theBuf, t_cbdata);
+
+    return true;
+}
+
+bool
+BodyReader::consume(size_t size)
+{
+    debugs(32,3,HERE << this << " BodyReader::consume consuming " << size);
+
+    if (theBuf.contentSize() < (mb_size_t) size) {
+        debugs(0,0,HERE << this << "BodyReader::consume failed");
+        debugs(0,0,HERE << this << "BodyReader::consume size = " << size);
+        debugs(0,0,HERE << this << "BodyReader::consume contentSize() = " << theBuf.contentSize());
+        return false;
+    }
+
+    theBuf.consume(size);
+
+    if (callbackPending() && _available > 0) {
+        debugs(32,3,HERE << this << " " << "data avail and pending callback, calling read()");
+        read(read_callback, read_callback_data);
+    }
+
+    return true;
+}
diff --git a/src/BodyReader.h b/src/BodyReader.h
new file mode 100644 (file)
index 0000000..0087587
--- /dev/null
@@ -0,0 +1,51 @@
+
+#ifndef SQUID_BODY_READER_H
+#define SQUID_BODY_READER_H
+
+typedef void CBCB (MemBuf &mb, void *data);
+typedef size_t BodyReadFunc (void *, MemBuf &mb, size_t size);
+typedef void BodyAbortFunc (void *, size_t);
+typedef void BodyKickFunc (void *);
+
+class BodyReader : public RefCountable
+{
+
+public:
+    typedef RefCount<BodyReader> Pointer;
+    BodyReader(size_t len, BodyReadFunc *r, BodyAbortFunc *a, BodyKickFunc *k, void *d);
+    ~BodyReader();
+    void read(CBCB *, void *);
+    void notify(size_t now_available);
+    size_t remaining() { return _remaining; }
+
+    bool callbackPending();
+    bool consume(size_t size);
+
+    int bytes_read;
+
+private:
+    size_t _remaining;
+    size_t _available;
+    MemBuf theBuf;
+
+    /*
+     * These are for interacting with things that
+     * "provide" body content.  ie, ConnStateData and
+            * ICAPReqMod after adapation.
+     */
+    BodyReadFunc *read_func;
+    BodyAbortFunc *abort_func;
+    BodyKickFunc *kick_func;
+    void *read_func_data;
+
+    /*
+     * These are for interacting with things that
+     * "consume" body content. ie, HttpStateData and
+     * ICAPReqMod before adaptation.
+     */
+    CBCB *read_callback;
+    void *read_callback_data;
+    bool doCallback();
+};
+
+#endif
index 34f57c0e609fe2c36538fc81ef1145aeb896d167..3817fd41960c1584137e5de2ce59e24f69d32ac5 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: HttpRequest.cc,v 1.61 2006/04/22 05:29:18 robertc Exp $
+ * $Id: HttpRequest.cc,v 1.62 2006/04/27 19:27:37 wessels Exp $
  *
  * DEBUG: section 73    HTTP Request
  * AUTHOR: Duane Wessels
@@ -86,7 +86,7 @@ HttpRequest::init()
     my_addr = no_addr;
     my_port = 0;
     client_port = 0;
-    body_connection = NULL;
+    body_reader = NULL;
     // hier
     errType = ERR_NONE;
     peer_login = NULL;         // not allocated/deallocated by this class
@@ -101,8 +101,8 @@ HttpRequest::init()
 void
 HttpRequest::clean()
 {
-    if (body_connection.getRaw() != NULL)
-        fatal ("request being destroyed with body connection intact\n");
+    if (body_reader != NULL)
+        fatal ("request being destroyed with body reader intact\n");
 
     if (auth_user_request) {
         auth_user_request->unlock();
index 48aeb95a4d8e2f605ec2f14a53b4cc93b6b0f4fb..abf3e52e48ffe1c082dc8d131fca3848aa70bdb7 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: HttpRequest.h,v 1.19 2006/02/17 18:10:59 wessels Exp $
+ * $Id: HttpRequest.h,v 1.20 2006/04/27 19:27:37 wessels Exp $
  *
  *
  * SQUID Web Proxy Cache          http://www.squid-cache.org/
@@ -37,6 +37,7 @@
 #include "HttpMsg.h"
 #include "client_side.h"
 #include "HierarchyLogEntry.h"
+#include "BodyReader.h"
 
 /*  Http Request */
 extern int httpRequestHdrAllowed(const HttpHeaderEntry * e, String * strConnection);
@@ -104,7 +105,7 @@ public:
 
     unsigned short client_port;
 
-    ConnStateData::Pointer body_connection;    /* used by clientReadBody() */
+    BodyReader::Pointer body_reader;
 
     HierarchyLogEntry hier;
 
index fd81eef4764a2afd0b3f46ab6d34baba6c72d90c..0b4507c6dbe8908b06f9c2b615fdd11783fc3acc 100644 (file)
@@ -26,8 +26,17 @@ ICAPClientReqmodPrecache::~ICAPClientReqmodPrecache()
     if (virgin != NULL)
         freeVirgin();
 
-    if (adapted != NULL)
+    if (adapted != NULL) {
+        /*
+         * adapted->sink is equal to this.  Remove the pointer since
+         * we are deleting this.
+         */
+
+        if (adapted->sink)
+            adapted->sink = NULL;
+
         freeAdapted();
+    }
 }
 
 void ICAPClientReqmodPrecache::startReqMod(ClientHttpRequest *aHttp, HttpRequest *request)
@@ -116,18 +125,33 @@ void ICAPClientReqmodPrecache::noteSourceStart(MsgPipe *p)
      * tell the other side to use the original request/response
      * headers.
      */
+    HttpRequest *req = dynamic_cast<HttpRequest*>(adapted->data->header);
+
+    if (req && req->content_length > 0) {
+        assert(req->body_reader == NULL);
+        req->body_reader = new BodyReader(req->content_length, readBody, abortBody, kickBody, this);
+    }
+
     http->takeAdaptedHeaders(adapted->data->header);
     noteSourceProgress(p);
 }
 
-// ICAP client sends more data
+/*
+ * This is where we receive a notification from the other
+ * side of the MsgPipe that new adapted data is available.
+ * We, in turn, tell whoever is reading from the request's
+ * body_reader about the new data.
+ */
 void ICAPClientReqmodPrecache::noteSourceProgress(MsgPipe *p)
 {
     debug(93,3)("ICAPClientReqmodPrecache::noteSourceProgress() called\n");
     //tell ClientHttpRequest to store a fresh portion of the adapted response
 
     if (p->data->body->hasContent()) {
-        http->takeAdaptedBody(p->data->body);
+        HttpRequest *req = dynamic_cast<HttpRequest*>(adapted->data->header);
+        assert(req);
+        debugs(32,3,HERE << "notifying body_reader, contentSize() = " << p->data->body->contentSize());
+        req->body_reader->notify(p->data->body->contentSize());
     }
 }
 
@@ -159,6 +183,12 @@ void ICAPClientReqmodPrecache::stop(Notify notify)
         freeVirgin();
     }
 
+#if DONT_FREE_ADAPTED
+    /*
+     * NOTE: We do not clean up "adapted->sink" here because it may
+     * have an HTTP message body that needs to stay around a little
+     * while longer so that the HTTP server-side can forward it on.
+     */
     if (adapted != NULL) {
         if (notify == notifyIcap)
             adapted->sendSinkAbort();
@@ -168,6 +198,8 @@ void ICAPClientReqmodPrecache::stop(Notify notify)
         freeAdapted();
     }
 
+#endif
+
     if (http) {
         if (notify == notifyOwner)
             // tell ClientHttpRequest that we are aborting prematurely
@@ -189,3 +221,60 @@ void ICAPClientReqmodPrecache::freeAdapted()
 {
     adapted = NULL;    // refcounted
 }
+
+/*
+ * Something that needs to read the adapated request body
+ * calls this function, via the BodyReader class.  We copy
+ * the body data from our bodybuf object to the BodyReader
+ * MemBuf, which was passed as a reference to this function.
+ */
+size_t
+ICAPClientReqmodPrecache::readBody(void *data, MemBuf &mb, size_t size)
+{
+    ICAPClientReqmodPrecache *icap = static_cast<ICAPClientReqmodPrecache *>(data);
+    assert(icap != NULL);
+    assert(icap->adapted != NULL);
+    assert(icap->adapted->data != NULL);
+    MemBuf *bodybuf = icap->adapted->data->body;
+    assert(bodybuf != NULL);
+    debugs(32,3,HERE << "readBody requested size " << size);
+    debugs(32,3,HERE << "readBody bodybuf size " << bodybuf->contentSize());
+
+    if ((mb_size_t) size > bodybuf->contentSize())
+        size = bodybuf->contentSize();
+
+    debugs(32,3,HERE << "readBody actual size " << size);
+
+    assert(size);
+
+    mb.append(bodybuf->content(), size);
+
+    bodybuf->consume(size);
+
+    return size;
+}
+
+void
+ICAPClientReqmodPrecache::abortBody(void *data, size_t remaining)
+{
+    if (remaining >= 0) {
+        debugs(0,0,HERE << "ICAPClientReqmodPrecache::abortBody size " << remaining);
+        // more?
+    }
+
+    ICAPClientReqmodPrecache *icap = static_cast<ICAPClientReqmodPrecache *>(data);
+    icap->stop(notifyIcap);
+}
+
+/*
+ * Restart reading the adapted response from the ICAP server in case
+ * the body buffer became full and we stopped reading.
+ */
+void
+ICAPClientReqmodPrecache::kickBody(void *data)
+{
+    debugs(32,3,HERE << "ICAPClientReqmodPrecache::kickBody");
+    ICAPClientReqmodPrecache *icap = static_cast<ICAPClientReqmodPrecache *>(data);
+    assert(icap->adapted != NULL);
+    icap->adapted->sendSinkNeed();
+}
index f46312768a556edc688c193619183edb20d6b185..bf5d5273578f5f6bf9b5b34dc9dca2d32d6657e2 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: ICAPClientReqmodPrecache.h,v 1.2 2005/12/22 22:26:31 wessels Exp $
+ * $Id: ICAPClientReqmodPrecache.h,v 1.3 2006/04/27 19:27:37 wessels Exp $
  *
  *
  * SQUID Web Proxy Cache          http://www.squid-cache.org/
@@ -79,6 +79,7 @@ public:
     ClientHttpRequest *http;
     MsgPipe::Pointer virgin;
     MsgPipe::Pointer adapted;
+    BodyReader::Pointer body_reader;
 
 private:
     typedef enum { notifyNone, notifyOwner, notifyIcap } Notify;
@@ -86,6 +87,12 @@ private:
     void freeVirgin();
     void freeAdapted();
     CBDATA_CLASS2(ICAPClientReqmodPrecache);
+
+    // Hooks to BodyReader so HttpStateData can get the
+    // adapted request body
+    static BodyReadFunc readBody;
+    static BodyAbortFunc abortBody;
+    static BodyKickFunc kickBody;
 };
 
 #endif /* SQUID_ICAPCLIENTSIDEHOOK_H */
index 4d3c634a924948bb4454c16d4a85ca002be6a372..49381ac9d29af3e9a5cde06c0ff50f0e245b5b47 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: ICAPConfig.cc,v 1.8 2006/04/27 19:07:16 wessels Exp $
+ * $Id: ICAPConfig.cc,v 1.9 2006/04/27 19:27:37 wessels Exp $
  *
  * SQUID Web Proxy Cache          http://www.squid-cache.org/
  * ----------------------------------------------------------
@@ -47,6 +47,7 @@
 
 extern ConfigParser LegacyParser;      // from cache_cf.cc
 ICAPConfig TheICAPConfig;
+extern ConfigParser LegacyParser;      // found in cache_cf.cc
 
 ICAPServiceRep::Pointer
 ICAPConfig::findService(const String& key)
index 3559d443ffba309f8abb1dd6e5813d4def5fed9e..a0bbe6b58c5340ba595fd5acde389e08b38047d6 100644 (file)
@@ -149,12 +149,13 @@ void ICAPModXact::handleCommConnected()
     requestBuf.init();
 
     makeRequestHeaders(requestBuf);
-    debugs(93, 9, "ICAPModXact ICAP request prefix " << status() << ":\n" <<
+    debugs(93, 9, "ICAPModXact ICAP status " << status() << " will write:\n" <<
            (requestBuf.terminate(), requestBuf.content()));
 
     // write headers
     state.writing = State::writingHeaders;
     scheduleWrite(requestBuf);
+    virgin->sendSinkNeed();
 }
 
 void ICAPModXact::handleCommWrote(size_t sz)
@@ -172,13 +173,11 @@ void ICAPModXact::handleCommWroteHeaders()
     Must(state.writing == State::writingHeaders);
 
     if (virginBody.expected()) {
-        debugs(98, 5, HERE);
         state.writing = preview.enabled() ?
                         State::writingPreview : State::writingPrime;
         virginWriteClaim.protectAll();
         writeMore();
     } else {
-        debugs(98, 5, HERE);
         stopWriting();
     }
 }
@@ -425,15 +424,21 @@ void ICAPModXact::startReading()
 
 void ICAPModXact::readMore()
 {
-    if (reader || doneReading())
+    if (reader || doneReading()) {
+        debugs(32,3,HERE << "returning from readMore because reader or doneReading()");
         return;
+    }
 
     // do not fill readBuf if we have no space to store the result
-    if (!adapted->data->body->hasPotentialSpace())
+    if (!adapted->data->body->hasPotentialSpace()) {
+        debugs(93,1,HERE << "Not reading because ICAP reply buffer is full");
         return;
+    }
 
     if (readBuf.hasSpace())
         scheduleRead();
+    else
+        debugs(93,1,HERE << "nothing to do because !readBuf.hasSpace()");
 }
 
 // comm module read a portion of the ICAP response for us
@@ -788,8 +793,13 @@ bool ICAPModXact::parsePresentBody()
     if (parsed)
         return true;
 
-    if (bodyParser->needsMoreData())
+    debugs(32,3,HERE << this << " needsMoreData = " << bodyParser->needsMoreData());
+
+    if (bodyParser->needsMoreData()) {
+        debugs(32,3,HERE << this);
         Must(mayReadMore());
+        readMore();
+    }
 
     if (bodyParser->needsMoreSpace()) {
         Must(!doneSending()); // can hope for more space
@@ -864,11 +874,10 @@ void ICAPModXact::noteSinkNeed(MsgPipe *p)
 
     if (state.sending == State::sendingVirgin)
         echoMore();
+    else if (state.sending == State::sendingAdapted)
+        parseMore();
     else
-        if (state.sending == State::sendingAdapted)
-            parseMore();
-        else
-            Must(state.sending == State::sendingUndecided);
+        Must(state.sending == State::sendingUndecided);
 
     ICAPXaction_Exit();
 }
index d4bf8fa05fdab9da6169898c7b33e5c9f56b8c51..2411127657fe51e9691998c9d9c4e77efce4cc67 100644 (file)
@@ -54,6 +54,7 @@ void ICAPXaction_noteCommWrote(int, char *, size_t size, comm_err_t status, void
 static
 void ICAPXaction_noteCommRead(int, char *, size_t size, comm_err_t status, int xerrno, void *data)
 {
+    debugs(93,3,HERE << data << " read returned " << size);
     ICAPXaction_fromData(data).noteCommRead(status, size);
 }
 
@@ -238,8 +239,10 @@ void ICAPXaction::handleCommClosed()
 
 bool ICAPXaction::done() const
 {
-    if (stopReason != NULL) // mustStop() has been called
+    if (stopReason != NULL) { // mustStop() has been called
+        debugs(93,1,HERE << "ICAPXaction is done() because " << stopReason);
         return true;
+    }
 
     return doneAll();
 }
@@ -276,7 +279,7 @@ void ICAPXaction::noteCommRead(comm_err_t commStatus, size_t sz)
     Must(commStatus == COMM_OK);
     Must(sz >= 0);
 
-    debugs(93, 5, HERE << "read " << sz << " bytes");
+    debugs(93, 3, HERE << "read " << sz << " bytes");
 
     /*
      * See comments in ICAPXaction.h about why we use commBuf
@@ -396,7 +399,7 @@ void ICAPXaction::callException(const TextException &e)
 void ICAPXaction::callEnd()
 {
     if (done()) {
-        debugs(93, 5, "ICAPXaction::" << inCall << " ends xaction " <<
+        debugs(93, 5, HERE << "ICAPXaction::" << inCall << " ends xaction " <<
                status());
         doStop(); // may delete us
         return;
index 3c18f72010d59b23f098331cb53c09358ea96dd1..0c6c7e00e90eb022016b29542c89ad269f20382f 100644 (file)
@@ -32,8 +32,8 @@ MsgPipe::MsgPipe(const char *aName): name(aName),
 MsgPipe::~MsgPipe()
 {
     delete data;
-    delete source;
-    delete sink;
+    assert(source == NULL);
+    assert(sink == NULL);
 };
 
 void MsgPipe::sendSourceStart()
index 7f2b95c74590bac04ff6f384a5d46b8e89a767c5..30f496371b9f1df6daa583fee6ef60eaf3f2a35f 100644 (file)
@@ -1,7 +1,7 @@
 #
 #  Makefile for the Squid Object Cache server
 #
-#  $Id: Makefile.am,v 1.138 2006/04/27 19:04:15 wessels Exp $
+#  $Id: Makefile.am,v 1.139 2006/04/27 19:27:37 wessels Exp $
 #
 #  Uncomment and customize the following to suit your needs:
 #
@@ -376,8 +376,8 @@ squid_SOURCES = \
        client_side_reply.h \
        client_side_request.cc \
        client_side_request.h \
-       ClientBody.cc \
-       ClientBody.h \
+       BodyReader.cc \
+       BodyReader.h \
        ClientRequestContext.h \
        clientStream.cc \
        clientStream.h \
@@ -625,7 +625,7 @@ recv_announce_SOURCES = recv-announce.cc SquidNew.cc
 ## SwapDir wants ConfigOption
 
 ufsdump_SOURCES = \
-       ClientBody.cc \
+       BodyReader.cc \
        ConfigParser.cc \
        debug.cc \
        int.cc \
index 1f851cb61f78671c953cea9ee55783b1f2d7dfac..87f95d48d82f76b99b4518ac8c0445a12f6b5327 100644 (file)
@@ -17,7 +17,7 @@
 #
 #  Makefile for the Squid Object Cache server
 #
-#  $Id: Makefile.in,v 1.376 2006/04/27 19:04:15 wessels Exp $
+#  $Id: Makefile.in,v 1.377 2006/04/27 19:27:37 wessels Exp $
 #
 #  Uncomment and customize the following to suit your needs:
 #
@@ -163,8 +163,8 @@ am__squid_SOURCES_DIST = access_log.cc AccessLogEntry.h acl.cc \
        AuthUser.cc AuthUserRequest.cc cache_cf.cc CacheDigest.cc \
        cache_manager.cc carp.cc cbdata.cc client_db.cc client_side.cc \
        client_side.h client_side_reply.cc client_side_reply.h \
-       client_side_request.cc client_side_request.h ClientBody.cc \
-       ClientBody.h ClientRequestContext.h clientStream.cc \
+       client_side_request.cc client_side_request.h BodyReader.cc \
+       BodyReader.h ClientRequestContext.h clientStream.cc \
        clientStream.h comm.cc comm.h CommIO.h comm_select.cc \
        comm_poll.cc comm_epoll.cc comm_kqueue.cc CommRead.h \
        ConfigOption.cc ConfigParser.cc ConfigParser.h \
@@ -277,7 +277,7 @@ am_squid_OBJECTS = access_log.$(OBJEXT) acl.$(OBJEXT) \
        CacheDigest.$(OBJEXT) cache_manager.$(OBJEXT) carp.$(OBJEXT) \
        cbdata.$(OBJEXT) client_db.$(OBJEXT) client_side.$(OBJEXT) \
        client_side_reply.$(OBJEXT) client_side_request.$(OBJEXT) \
-       ClientBody.$(OBJEXT) clientStream.$(OBJEXT) comm.$(OBJEXT) \
+       BodyReader.$(OBJEXT) clientStream.$(OBJEXT) comm.$(OBJEXT) \
        comm_select.$(OBJEXT) comm_poll.$(OBJEXT) comm_epoll.$(OBJEXT) \
        comm_kqueue.$(OBJEXT) ConfigOption.$(OBJEXT) \
        ConfigParser.$(OBJEXT) debug.$(OBJEXT) $(am__objects_5) \
@@ -568,7 +568,7 @@ am__tests_testUfs_SOURCES_DIST = tests/testUfs.cc tests/testMain.cc \
 am_tests_testUfs_OBJECTS = tests/testUfs.$(OBJEXT) \
        tests/testMain.$(OBJEXT) $(am__objects_27)
 tests_testUfs_OBJECTS = $(am_tests_testUfs_OBJECTS)
-am__ufsdump_SOURCES_DIST = ClientBody.cc ConfigParser.cc debug.cc \
+am__ufsdump_SOURCES_DIST = BodyReader.cc ConfigParser.cc debug.cc \
        int.cc ufsdump.cc store.cc StoreFileSystem.cc StoreMeta.cc \
        StoreMeta.h StoreMetaMD5.cc StoreMetaMD5.h StoreMetaSTD.cc \
        StoreMetaSTD.h StoreMetaUnpacker.cc StoreMetaUnpacker.h \
@@ -620,7 +620,7 @@ am__ufsdump_SOURCES_DIST = ClientBody.cc ConfigParser.cc debug.cc \
        store_swapmeta.cc store_swapout.cc structs.h SwapDir.cc \
        tools.cc typedefs.h unlinkd.cc url.cc urn.cc useragent.cc \
        wais.cc wccp.cc whois.cc wordlist.cc win32.cc
-am_ufsdump_OBJECTS = ClientBody.$(OBJEXT) ConfigParser.$(OBJEXT) \
+am_ufsdump_OBJECTS = BodyReader.$(OBJEXT) ConfigParser.$(OBJEXT) \
        debug.$(OBJEXT) int.$(OBJEXT) ufsdump.$(OBJEXT) \
        store.$(OBJEXT) StoreFileSystem.$(OBJEXT) StoreMeta.$(OBJEXT) \
        StoreMetaMD5.$(OBJEXT) StoreMetaSTD.$(OBJEXT) \
@@ -1215,8 +1215,8 @@ squid_SOURCES = \
        client_side_reply.h \
        client_side_request.cc \
        client_side_request.h \
-       ClientBody.cc \
-       ClientBody.h \
+       BodyReader.cc \
+       BodyReader.h \
        ClientRequestContext.h \
        clientStream.cc \
        clientStream.h \
@@ -1449,7 +1449,7 @@ pinger_SOURCES = \
 dnsserver_SOURCES = dnsserver.cc SquidNew.cc
 recv_announce_SOURCES = recv-announce.cc SquidNew.cc
 ufsdump_SOURCES = \
-       ClientBody.cc \
+       BodyReader.cc \
        ConfigParser.cc \
        debug.cc \
        int.cc \
@@ -2566,7 +2566,7 @@ distclean-compile:
 @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/AuthUser.Po@am__quote@
 @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/AuthUserRequest.Po@am__quote@
 @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/CacheDigest.Po@am__quote@
-@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ClientBody.Po@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/BodyReader.Po@am__quote@
 @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ConfigOption.Po@am__quote@
 @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ConfigParser.Po@am__quote@
 @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DelayBucket.Po@am__quote@
index 34c972c4ea6254f1539210b719c5c5e05f43a941..41b7c1f9523b10ee70fd31f411b729b751e88ea2 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: client_side.cc,v 1.718 2006/04/22 05:29:19 robertc Exp $
+ * $Id: client_side.cc,v 1.719 2006/04/27 19:27:37 wessels Exp $
  *
  * DEBUG: section 33    Client-side Routines
  * AUTHOR: Duane Wessels
@@ -73,7 +73,6 @@
 #include "client_side_reply.h"
 #include "ClientRequestContext.h"
 #include "MemBuf.h"
-#include "ClientBody.h"
 
 #if LINGERING_CLOSE
 #define comm_close comm_lingering_close
@@ -135,6 +134,8 @@ static ClientSocketContext *parseHttpRequest(ConnStateData::Pointer &, method_t
 #if USE_IDENT
 static IDCB clientIdentDone;
 #endif
+static BodyReadFunc clientReadBody;
+static BodyAbortFunc clientAbortBody;
 static CSCB clientSocketRecipient;
 static CSD clientSocketDetach;
 static void clientSetKeepaliveFlag(ClientHttpRequest *);
@@ -163,7 +164,7 @@ static void trimTrailingSpaces(char *aString, size_t len);
 static ClientSocketContext *parseURIandHTTPVersion(char **url_p, HttpVersion * http_ver_p, ConnStateData::Pointer& conn, char *http_version_str);
 static int connReadWasError(ConnStateData::Pointer& conn, comm_err_t, int size, int xerrno);
 static int connFinishedWithConn(ConnStateData::Pointer& conn, int size);
-static void connNoteUseOfBuffer(ConnStateData::Pointer & conn, size_t byteCount);
+static void connNoteUseOfBuffer(ConnStateData* conn, size_t byteCount);
 static int connKeepReadingIncompleteRequest(ConnStateData::Pointer & conn);
 static void connCancelIncompleteRequests(ConnStateData::Pointer & conn);
 
@@ -614,18 +615,6 @@ ConnStateData::close()
                     auth_user_request,this);
         auth_user_request->onConnectionClose(this);
     }
-
-    /*
-     * This is awkward: body has a RefCount::Pointer to this.  We must
-     * destroy body so that our own reference count will go to zero.
-     * Furthermore, there currently exists a potential loop because
-     * ~ConnStateData() will delete body if it is not NULL.
-     */
-    if (body) {
-        ClientBody *tmp = body;
-        body = NULL;
-        delete tmp;
-    }
 }
 
 bool
@@ -649,8 +638,7 @@ ConnStateData::~ConnStateData()
 
     cbdataReferenceDone(port);
 
-    if (body)
-        delete body;
+    body_reader = NULL;                // refcounted
 }
 
 /*
@@ -1523,22 +1511,31 @@ ClientSocketContext::doClose()
 void
 ClientSocketContext::initiateClose()
 {
-    if (!http || !http->getConn().getRaw()) {
-        doClose();
-        return;
-    }
-
-    if (http->getConn()->body_size_left > 0)  {
-        debug(33, 5) ("ClientSocketContext::initiateClose: closing, but first we need to read the rest of the request\n");
-        /* XXX We assumes the reply does fit in the TCP transmit window.
-         * If not the connection may stall while sending the reply
-         * (before reaching here) if the client does not try to read the
-                * response while sending the request body. As of yet we have
-                * not received any complaints indicating this may be an issue.
+    if (http != NULL) {
+        ConnStateData::Pointer conn = http->getConn();
+
+        if (conn != NULL) {
+            if (conn->bodySizeLeft() > 0) {
+                debug(33, 5) ("ClientSocketContext::initiateClose: closing, but first we need to read the rest of the request\n");
+                /*
+                * XXX We assume the reply fits in the TCP transmit
+                * window.  If not the connection may stall while sending
+                * the reply (before reaching here) if the client does not
+                * try to read the response while sending the request body.
+                * As of yet we have not received any complaints indicating
+                * this may be an issue.
                 */
-        http->getConn()->closing(true);
-        clientAbortBody(http->request);
-        return;
+                conn->closing(true);
+                /*
+                 * Trigger the BodyReader abort handler, if necessary,
+                 * by destroying it.  It is a refcounted pointer, so
+                 * set it to NULL and let the destructor be called when
+                 * all references are gone.
+                 */
+                http->request->body_reader = NULL;     // refcounted
+                return;
+            }
+        }
     }
 
     doClose();
@@ -2119,7 +2116,7 @@ connFinishedWithConn(ConnStateData::Pointer & conn, int size)
 }
 
 void
-connNoteUseOfBuffer(ConnStateData::Pointer & conn, size_t byteCount)
+connNoteUseOfBuffer(ConnStateData* conn, size_t byteCount)
 {
     assert(byteCount > 0 && byteCount <= conn->in.notYetUsed);
     conn->in.notYetUsed -= byteCount;
@@ -2171,13 +2168,14 @@ clientMaybeReadData(ConnStateData::Pointer &conn, int do_next_read)
 static void
 clientAfterReadingRequests(int fd, ConnStateData::Pointer &conn, int do_next_read)
 {
-    fde *F = &fd_table[fd];
-
-    /* Check if a half-closed connection was aborted in the middle */
+    /*
+     * If (1) we are reading a message body, (2) and the connection
+     * is half-closed, and (3) we didn't get the entire HTTP request
+     * yet, then close this connection.
+     */
 
-    if (F->flags.socket_eof) {
-        if (conn->in.notYetUsed != conn->body_size_left) {
-            /* != 0 when no request body */
+    if (fd_table[fd].flags.socket_eof) {
+        if ((ssize_t) conn->in.notYetUsed < conn->bodySizeLeft()) {
             /* Partial request received. Abort client connection! */
             debug(33, 3) ("clientAfterReadingRequests: FD %d aborted, partial request\n", fd);
             comm_close(fd);
@@ -2195,7 +2193,7 @@ clientProcessRequest(ConnStateData::Pointer &conn, ClientSocketContext *context,
     HttpRequest *request = NULL;
     /* We have an initial client stream in place should it be needed */
     /* setup our private context */
-    connNoteUseOfBuffer(conn, http->req_sz);
+    connNoteUseOfBuffer(conn.getRaw(), http->req_sz);
 
     context->registerWithConn();
 
@@ -2304,9 +2302,17 @@ clientProcessRequest(ConnStateData::Pointer &conn, ClientSocketContext *context,
     /* Do we expect a request-body? */
 
     if (request->content_length > 0) {
-        conn->body = new ClientBody(conn, request);
-        conn->body_size_left = request->content_length;
-        request->body_connection = conn;
+        request->body_reader = new BodyReader(request->content_length,
+                                              clientReadBody,
+                                              clientAbortBody,
+                                              NULL,
+                                              conn.getRaw());
+        conn->body_reader = request->body_reader;
+        request->body_reader->notify(conn->in.notYetUsed);
+
+        if (request->body_reader->remaining())
+            conn->readSomeData();
+
         /* Is it too large? */
 
         if (!clientIsRequestBodyValid(request->content_length) ||
@@ -2359,6 +2365,21 @@ connOkToAddRequest(ConnStateData::Pointer &conn)
     return result;
 }
 
+/*
+ * bodySizeLeft
+ *
+ * Report on the number of bytes of body content that we
+ * know are yet to be read on this connection.
+ */
+ssize_t
+ConnStateData::bodySizeLeft()
+{
+    if (body_reader != NULL)
+        return body_reader->remaining();
+
+    return 0;
+}
+
 /*
  * Attempt to parse one or more requests from the input buffer.
  * If a request is successfully parsed, even if the next request
@@ -2377,7 +2398,7 @@ clientParseRequest(ConnStateData::Pointer conn, bool &do_next_read)
 
     debug(33, 5) ("clientParseRequest: FD %d: attempting to parse\n", conn->fd);
 
-    while (conn->in.notYetUsed > 0 && conn->body_size_left == 0) {
+    while (conn->in.notYetUsed > 0 && conn->bodySizeLeft() == 0) {
         size_t req_line_sz;
         connStripBufferWhitespace (conn);
 
@@ -2416,7 +2437,7 @@ clientParseRequest(ConnStateData::Pointer conn, bool &do_next_read)
             parsed_req = true;
 
             if (context->mayUseConnection()) {
-                debug (33, 3) ("clientReadRequest: Not reading, as this request may need the connection\n");
+                debug (33, 3) ("clientParseRequest: Not reading, as this request may need the connection\n");
                 do_next_read = 0;
                 break;
             }
@@ -2426,9 +2447,9 @@ clientParseRequest(ConnStateData::Pointer conn, bool &do_next_read)
                 break;
             }
 
-            continue;          /* while offset > 0 && body_size_left == 0 */
+            continue;          /* while offset > 0 && conn->bodySizeLeft() == 0 */
         }
-    }                          /* while offset > 0 && conn->body_size_left == 0 */
+    }                          /* while offset > 0 && conn->bodySizeLeft() == 0 */
 
     return parsed_req;
 }
@@ -2437,6 +2458,7 @@ static void
 clientReadRequest(int fd, char *buf, size_t size, comm_err_t flag, int xerrno,
                   void *data)
 {
+    debugs(33,5,HERE << "clientReadRequest FD " << fd << " size " << size);
     ConnStateData::Pointer conn ((ConnStateData *)data);
     conn->reading(false);
     bool do_next_read = 1; /* the default _is_ to read data! - adrian */
@@ -2463,6 +2485,15 @@ clientReadRequest(int fd, char *buf, size_t size, comm_err_t flag, int xerrno,
 
     if (flag == COMM_OK) {
         if (size > 0) {
+            /*
+             * If this assertion fails, we need to handle the case 
+             * where a persistent connection has some leftover body
+             * request data from a previous, aborted transaction.
+             * Probably just decrement size and adjust/memmove the
+             * 'buf' pointer accordingly.
+             */
+            assert(conn->in.abortedSize == 0);
+
             char *current_buf = conn->in.addressToReadInto();
             kb_incr(&statCounter.client_http.kbytes_in, size);
 
@@ -2472,6 +2503,9 @@ clientReadRequest(int fd, char *buf, size_t size, comm_err_t flag, int xerrno,
             conn->in.notYetUsed += size;
 
             conn->in.buf[conn->in.notYetUsed] = '\0';   /* Terminate the string */
+
+            if (conn->body_reader != NULL)
+                conn->body_reader->notify(conn->in.notYetUsed);
         } else if (size == 0) {
             debug(33, 5) ("clientReadRequest: FD %d closed?\n", fd);
 
@@ -2497,12 +2531,6 @@ clientReadRequest(int fd, char *buf, size_t size, comm_err_t flag, int xerrno,
         }
     }
 
-
-    /* Process request body if any */
-    if (conn->in.notYetUsed > 0 && conn->body && conn->body->hasCallback()) {
-        conn->body->process();
-    }
-
     /* Process next request */
     if (conn->getConcurrentRequestCount() == 0)
         fd_note(conn->fd, "Reading next request");
@@ -2525,64 +2553,66 @@ clientReadRequest(int fd, char *buf, size_t size, comm_err_t flag, int xerrno,
     }
 }
 
-/* file_read like function, for reading body content */
-void
-clientReadBody(HttpRequest * request, char *buf, size_t size, CBCB * callback,
-               void *cbdata)
+/*
+ * clientReadBody
+ *
+ * A request to receive some HTTP request body data.  This is a
+ * 'read_func' of BodyReader class.  Feels to me like this function
+ * belongs to ConnStateData class.
+ *
+ * clientReadBody is of type 'BodyReadFunc'
+ */
+size_t
+clientReadBody(void *data, MemBuf &mb, size_t size)
 {
-    ConnStateData::Pointer conn = request->body_connection;
+    ConnStateData *conn = (ConnStateData *) data;
+    assert(conn);
+    debugs(32,3,HERE << "clientReadBody requested size " << size);
+    debugs(32,3,HERE << "clientReadBody FD " << conn->fd);
+    debugs(32,3,HERE << "clientReadBody in.notYetUsed " << conn->in.notYetUsed);
 
-    if (conn.getRaw() == NULL) {
-        debug(33, 5) ("clientReadBody: no body to read, request=%p\n", request);
-        callback(buf, 0, cbdata);      /* Signal end of body */
-        return;
-    }
+    if (size > conn->in.notYetUsed)
+        size = conn->in.notYetUsed;
+
+    debugs(32,3,HERE << "clientReadBody actual size " << size);
+
+    assert(size);
+
+    mb.append(conn->in.buf, size);
 
-    debug(33, 2) ("clientReadBody: start FD %d body_size=%lu in.notYetUsed=%ld cb=%p req=%p\n",
-                  conn->fd, (unsigned long int) conn->body_size_left,
-                  (unsigned long int) conn->in.notYetUsed, callback, request);
-    conn->body->init(buf, size, callback, cbdata);
-    conn->body->process();
+    connNoteUseOfBuffer(conn, size);
+
+    return size;
 }
 
-/* A dummy handler that throws away a request-body */
+/*
+ * clientAbortBody
+ *
+ * A dummy callback that consumes the remains of a request
+ * body for an aborted transaction.
+ *
+ * clientAbortBody is of type 'BodyAbortFunc'
+ */
 static void
-clientReadBodyAbortHandler(char *buf, ssize_t size, void *data)
+clientAbortBody(void *data, size_t remaining)
 {
-    static char bodyAbortBuf[SQUID_TCP_SO_RCVBUF];
     ConnStateData *conn = (ConnStateData *) data;
-    debug(33, 2) ("clientReadBodyAbortHandler: FD %d body_size=%lu in.notYetUsed=%lu\n",
-                  conn->fd, (unsigned long int) conn->body_size_left,
-                  (unsigned long) conn->in.notYetUsed);
+    debugs(32,3,HERE << "clientAbortBody FD " << conn->fd);
+    debugs(32,3,HERE << "clientAbortBody in.notYetUsed " << conn->in.notYetUsed);
+    debugs(32,3,HERE << "clientAbortBody remaining " << remaining);
+    conn->in.abortedSize += remaining;
 
-    if (size != 0 && conn->body_size_left != 0) {
-        debug(33, 3) ("clientReadBodyAbortHandler: FD %d shedule next read\n",
-                      conn->fd);
-        conn->body->init(bodyAbortBuf, sizeof(bodyAbortBuf), clientReadBodyAbortHandler, data);
+    if (conn->in.notYetUsed) {
+        size_t to_discard = XMIN(conn->in.notYetUsed, conn->in.abortedSize);
+        debugs(32,3,HERE << "to_discard " << to_discard);
+        conn->in.abortedSize -= to_discard;
+        connNoteUseOfBuffer(conn, to_discard);
     }
 
     if (conn->closing())
         comm_close(conn->fd);
 }
 
-/* Abort a body request */
-int
-clientAbortBody(HttpRequest * request)
-{
-    ConnStateData::Pointer conn = request->body_connection;
-
-    if (conn.getRaw() == NULL || conn->body_size_left <= 0 || conn->body->getRequest() != request)
-        return 0;              /* No body to abort */
-
-    if (conn->body->hasCallback())
-        conn->body->negativeCallback();
-
-    clientReadBodyAbortHandler(NULL, -1, conn.getRaw());       /* Install abort handler */
-
-    /* ClientBody::process() */
-    return 1;                  /* Aborted */
-}
-
 /* general lifetime handler for HTTP requests */
 static void
 requestTimeout(int fd, void *data)
@@ -3192,7 +3222,7 @@ ConnStateData::operator delete (void *address)
     cbdataFree(t);
 }
 
-ConnStateData::ConnStateData() : body_size_left(0), transparent_ (false), reading_ (false), closing_ (false)
+ConnStateData::ConnStateData() : transparent_ (false), reading_ (false), closing_ (false)
 {
     openReference = this;
 }
index 1979e765b465b96ec3a5929973780e7535805a9c..11316e302121e5e50adf728f090cee042ee57cbc 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: client_side.h,v 1.14 2006/03/04 05:38:34 wessels Exp $
+ * $Id: client_side.h,v 1.15 2006/04/27 19:27:37 wessels Exp $
  *
  *
  * SQUID Web Proxy Cache          http://www.squid-cache.org/
@@ -35,6 +35,7 @@
 #define SQUID_CLIENTSIDE_H
 
 #include "StoreIOBuffer.h"
+#include "BodyReader.h"
 #include "RefCount.h"
 
 class ConnStateData;
@@ -43,9 +44,6 @@ class ClientHttpRequest;
 
 class clientStreamNode;
 
-class ClientBody;
-;
-
 template <class T>
 
 class Range;
@@ -159,19 +157,37 @@ public:
         char *buf;
         size_t notYetUsed;
         size_t allocatedSize;
+        /*
+         * abortedSize is the amount of data that should be read
+         * from the socket and immediately discarded.  It may be
+         * set when there is a request body and that transaction
+         * gets aborted.  The client side should read the remaining
+         * body content and just discard it, if the connection
+         * will be staying open.
+         */
+        size_t abortedSize;
     }
 
     in;
 
-    ClientBody *body;
-    size_t body_size_left;
-
-    auth_type_t auth_type;     /* Is this connection based authentication? if so what type it is. */
-    /* note this is ONLY connection based because NTLM is against HTTP spec */
-    /* the user details for connection based authentication */
+    BodyReader::Pointer body_reader;
+    ssize_t bodySizeLeft();
+
+    /*
+     * Is this connection based authentication? if so what type it
+     * is.
+     */
+    auth_type_t auth_type;
+    /*
+     * note this is ONLY connection based because NTLM is against HTTP spec.
+     * the user details for connection based authentication
+     */
     auth_user_request_t *auth_user_request;
-    /* TODO: generalise the connection owner concept */
-    ClientSocketContext::Pointer currentobject;        /* used by the owner of the connection. Opaque otherwise */
+    /*
+     * used by the owner of the connection, opaque otherwise
+     * TODO: generalise the connection owner concept.
+     */
+    ClientSocketContext::Pointer currentobject;
 
     struct sockaddr_in peer;
 
index 813855fdc821c2519d1e31e82088e05f610c76e8..09e094183ab879a428d6a40f9de20b69dcdf9193 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: client_side_request.cc,v 1.61 2006/04/23 11:10:31 robertc Exp $
+ * $Id: client_side_request.cc,v 1.62 2006/04/27 19:27:37 wessels Exp $
  * 
  * DEBUG: section 85    Client-side Request Routines
  * AUTHOR: Robert Collins (Originally Duane Wessels in client_side.c)
@@ -238,9 +238,9 @@ ClientHttpRequest::~ClientHttpRequest()
      * found the end of the body yet
      */
 
-    if (request && request->body_connection.getRaw() != NULL) {
-        clientAbortBody(request);      /* abort body transter */
-        request->body_connection = NULL;
+    if (request && request->body_reader != NULL) {
+        request->body_reader = NULL;   // refcounted
+        debugs(32,3,HERE << "setting body_reader = NULL for request " << request);
     }
 
     /* the ICP check here was erroneous
@@ -870,9 +870,10 @@ ClientRequestContext::clientRedirectDone(char *result)
             ;
         }
 
-        if (old_request->body_connection.getRaw() != NULL) {
-            new_request->body_connection = old_request->body_connection;
-            old_request->body_connection = NULL;
+        if (old_request->body_reader != NULL) {
+            new_request->body_reader = old_request->body_reader;
+            old_request->body_reader = NULL;
+            debugs(0,0,HERE << "setting body_reader = NULL for request " << old_request);
         }
 
         new_request->content_length = old_request->content_length;
@@ -1114,17 +1115,116 @@ ClientHttpRequest::doIcap(ICAPServiceRep::Pointer service)
     icap = new ICAPClientReqmodPrecache(service);
     (void) cbdataReference(icap);
     icap->startReqMod(this, request);
-    icap->doneSending();
+
+    if (request->body_reader == NULL) {
+        debugs(32,3,HERE << "client request hasnt body...");
+        icap->doneSending();
+
+    }
+
     return 0;
 }
 
+/*
+ * icapSendRequestBodyWrapper
+ *
+ * A callback wrapper for ::icapSendRequestBody()
+ *
+ * icapSendRequestBodyWrapper is of type CBCB
+ */
+void
+ClientHttpRequest::icapSendRequestBodyWrapper(MemBuf &mb, void *data)
+{
+    ClientHttpRequest *chr = static_cast<ClientHttpRequest*>(data);
+    chr->icapSendRequestBody(mb);
+}
+
+
+/*
+ * icapSendRequestBody
+ *
+ * Sends some chunk of a request body to the ICAP side.  Must make sure
+ * that the ICAP-side can accept the data we have.  If there is more
+ * body data to read, then schedule another BodyReader callback.
+ */
+void
+ClientHttpRequest::icapSendRequestBody(MemBuf &mb)
+{
+    ssize_t size_to_send  = mb.contentSize();
+    debugs(32,3,HERE << "have " << mb.contentSize() << " bytes in mb");
+
+    if (size_to_send == 0) {
+        /*
+         * An error occurred during this transaction.  Tell ICAP that we're done.
+         */
+
+        if (icap)
+            icap->doneSending();
+
+        return;
+    }
+
+    debugs(32,3,HERE << "icap->potentialSpaceSize() = " << icap->potentialSpaceSize());
+
+    if (size_to_send > icap->potentialSpaceSize())
+        size_to_send = icap->potentialSpaceSize();
+
+    if (size_to_send) {
+        debugs(32,3,HERE << "sending " << size_to_send << " body bytes to ICAP");
+        StoreIOBuffer sbuf(size_to_send, 0, mb.content());
+        icap->sendMoreData(sbuf);
+        icap->body_reader->consume(size_to_send);
+        icap->body_reader->bytes_read += size_to_send;
+        debugs(32,3," HTTP client body bytes_read=" << icap->body_reader->bytes_read);
+    } else {
+        debugs(32,0,HERE << "cannot send body data to ICAP");
+        debugs(32,0,HERE << "\tBodyReader MemBuf has " << mb.contentSize());
+        debugs(32,0,HERE << "\tbut icap->potentialSpaceSize() is " << icap->potentialSpaceSize());
+        return;
+    }
+
+    /*
+     * If we sent some data this time, and there is more data to
+     * read, then schedule another read request via BodyReader.
+     */
+    if (size_to_send && icap->body_reader->remaining()) {
+        debugs(32,3,HERE << "calling body_reader->read()");
+        icap->body_reader->read(icapSendRequestBodyWrapper, this);
+    } else {
+        debugs(32,3,HERE << "No more request body bytes to send");
+        icap->doneSending();
+    }
+}
+
 /*
  * Called by ICAPAnchor when it has space available for us.
  */
 void
 ClientHttpRequest::icapSpaceAvailable()
 {
-    debug(85,3)("ClientHttpRequest::icapSpaceAvailable() called\n");
+    debugs(85,3,HERE << this << " ClientHttpRequest::icapSpaceAvailable() called\n");
+
+    if (request->body_reader != NULL && icap->body_reader == NULL) {
+        debugs(32,3,HERE << "reassigning HttpRequest->body_reader to ICAP");
+        /*
+         * ICAP hooks on to the BodyReader that gets data from
+         * ConnStateData.  We'll make a new BodyReader that
+         * HttpStateData can use if the adapted response has a
+         * request body.  See ICAPClientReqmodPrecache::noteSourceStart()
+         */
+        icap->body_reader = request->body_reader;
+        request->body_reader = NULL;
+    }
+
+    if (icap->body_reader == NULL)
+        return;
+
+    if (icap->body_reader->callbackPending())
+        return;
+
+    debugs(32,3,HERE << "Calling read() for body data");
+
+    icap->body_reader->read(icapSendRequestBodyWrapper, this);
 }
 
 void
@@ -1139,8 +1239,6 @@ ClientHttpRequest::takeAdaptedHeaders(HttpMsg *msg)
          * Move the "body_connection" over, then unlink old and
          * link new to the http state.
          */
-        new_req->body_connection = request->body_connection;
-        request->body_connection = NULL;
         HTTPMSGUNLOCK(request);
         request = HTTPMSGLOCK(new_req);
         /*
@@ -1195,12 +1293,18 @@ ClientHttpRequest::abortAdapting()
 
     if ((NULL == storeEntry()) || storeEntry()->isEmpty()) {
         debug(85,3)("WARNING: ICAP REQMOD callout failed, proceeding with original request\n");
-        doCallouts();
+
+        if (calloutContext)
+            doCallouts();
+
 #if ICAP_HARD_ERROR
 
         clientStreamNode *node = (clientStreamNode *)client_stream.tail->prev->data;
+
         clientReplyContext *repContext = dynamic_cast<clientReplyContext *>(node->data.getRaw());
+
         assert (repContext);
+
         // Note if this code is ever used, clientBuildError() should be modified to
         // accept an errno arg
         repContext->setReplyToError(ERR_ICAP_FAILURE, HTTP_INTERNAL_SERVER_ERROR,
@@ -1209,8 +1313,11 @@ ClientHttpRequest::abortAdapting()
                                     NULL, getConn().getRaw() != NULL
                                     && getConn()->auth_user_request ? getConn()->
                                     auth_user_request : request->auth_user_request, errno);
+
         node = (clientStreamNode *)client_stream.tail->data;
+
         clientStreamRead(node, this, node->readBuffer);
+
 #endif
 
         return;
index 6b60321bea5f43aca7d9a68e036c2ced9269fb85..535f634147f8327f4f3c066c7a5ae335f87e0548 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: client_side_request.h,v 1.25 2006/04/25 07:13:33 robertc Exp $
+ * $Id: client_side_request.h,v 1.26 2006/04/27 19:27:37 wessels Exp $
  *
  *
  * SQUID Web Proxy Cache          http://www.squid-cache.org/
@@ -160,6 +160,8 @@ private:
 public:
     ICAPClientReqmodPrecache *icap;
     int doIcap(ICAPServiceRep::Pointer);
+    void icapSendRequestBody(MemBuf&);
+    static void icapSendRequestBodyWrapper(MemBuf&, void*);
     void icapSpaceAvailable();
     void takeAdaptedHeaders(HttpMsg *);
     void takeAdaptedBody(MemBuf *);
index 404de983b348676adcf157b8ebe3066c0bcd34db..598f2259c6e4dacef09b6e69594a4033e0da6cf9 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: forward.cc,v 1.137 2006/04/02 14:32:35 serassio Exp $
+ * $Id: forward.cc,v 1.138 2006/04/27 19:27:37 wessels Exp $
  *
  * DEBUG: section 17    Request Forwarding
  * AUTHOR: Duane Wessels
@@ -386,7 +386,7 @@ FwdState::checkRetriable()
      * even if the method is indempotent
      */
 
-    if (request->body_connection.getRaw() != NULL)
+    if (request->body_reader != NULL)
         return false;
 
     /* RFC2616 9.1 Safe and Idempotent Methods */
index b2189077a0b8c99047bc728f90b987c57b34d778..b54d302ef6c5c077000183ff7422f4cd5d1828d4 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: ftp.cc,v 1.391 2006/04/23 11:10:31 robertc Exp $
+ * $Id: ftp.cc,v 1.392 2006/04/27 19:27:37 wessels Exp $
  *
  * DEBUG: section 9     File Transfer Protocol (FTP)
  * AUTHOR: Harvest Derived
@@ -211,7 +211,7 @@ public:
     static IOCB ftpReadControlReply;
     static IOWCB ftpWriteCommandCallback;
     static HttpReply *ftpAuthRequired(HttpRequest * request, const char *realm);
-    static void ftpRequestBody(char *buf, ssize_t size, void *data);
+    static CBCB ftpRequestBody;
     static wordlist *ftpParseControlReply(char *, size_t, int *, int *);
 
 #if ICAP_CLIENT
@@ -2769,19 +2769,19 @@ ftpReadTransferDone(FtpStateData * ftpState)
 
 /* This will be called when there is data available to put */
 void
-FtpStateData::ftpRequestBody(char *buf, ssize_t size, void *data)
+FtpStateData::ftpRequestBody(MemBuf &mb, void *data)
 {
     FtpStateData *ftpState = (FtpStateData *) data;
-    debug(9, 3) ("ftpRequestBody: buf=%p size=%d ftpState=%p\n", buf, (int) size, data);
+    debugs(9, 3, HERE << "ftpRequestBody: size=" << mb.contentSize() << " ftpState=%p" << data);
 
-    if (size > 0) {
+    if (mb.contentSize() > 0) {
         /* DataWrite */
-        comm_write(ftpState->data.fd, buf, size, FtpStateData::ftpDataWriteCallback, ftpState);
-    } else if (size < 0) {
+        comm_write(ftpState->data.fd, mb.content(), mb.contentSize(), FtpStateData::ftpDataWriteCallback, ftpState);
+    } else if (mb.contentSize() < 0) {
         /* Error */
         debug(9, 1) ("ftpRequestBody: request aborted");
         ftpState->failed(ERR_READ_ERROR, 0);
-    } else if (size == 0) {
+    } else if (mb.contentSize() == 0) {
         /* End of transfer */
         ftpState->dataComplete();
     }
@@ -2815,11 +2815,7 @@ FtpStateData::ftpDataWrite(int ftp, void *data)
     FtpStateData *ftpState = (FtpStateData *) data;
     debug(9, 3) ("ftpDataWrite\n");
     /* This starts the body transfer */
-    clientReadBody(ftpState->request,
-                   ftpState->data.readBuf->content(),
-                   ftpState->data.readBuf->contentSize(),
-                   ftpRequestBody,
-                   ftpState);
+    ftpState->request->body_reader->read(ftpRequestBody, ftpState);
 }
 
 static void
index 27f6647a31d9a1d9015d091bd9c187b4a54deed9..7322f3a16470d4cccdc8d7d79b771401f1918b1b 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: http.cc,v 1.492 2006/04/22 09:02:44 robertc Exp $
+ * $Id: http.cc,v 1.493 2006/04/27 19:27:37 wessels Exp $
  *
  * DEBUG: section 11    Hypertext Transfer Protocol (HTTP)
  * AUTHOR: Harvest Derived
@@ -136,18 +136,12 @@ HttpStateData::HttpStateData(FwdState *theFwdState) : ServerStateData(theFwdStat
 HttpStateData::~HttpStateData()
 {
     /*
-     * don't forget about ~ServerStateData()
+     * don't forget that ~ServerStateData() gets called automatically
      */
 
-    if (request_body_buf) {
-        if (orig_request->body_connection != NULL) {
-            clientAbortBody(orig_request);
-        }
-
-        if (request_body_buf) {
-            memFree(request_body_buf, MEM_8K_BUF);
-            request_body_buf = NULL;
-        }
+    if (orig_request->body_reader != NULL) {
+        orig_request->body_reader = NULL;
+        debugs(32,3,HERE << "setting body_reader = NULL for request " << orig_request);
     }
 
     if (!readBuf->isNull())
@@ -1760,7 +1754,9 @@ HttpStateData::sendRequest()
     flags.do_next_read = 1;
     maybeReadData();
 
-    if (orig_request->body_connection != NULL)
+    debugs(32,3,HERE<< "request " << request << " body_reader = " << orig_request->body_reader.getRaw());
+
+    if (orig_request->body_reader != NULL)
         sendHeaderDone = HttpStateData::SendRequestEntityWrapper;
     else
         sendHeaderDone = HttpStateData::SendComplete;
@@ -1850,25 +1846,28 @@ HttpStateData::sendRequestEntityDone()
     }
 }
 
+/*
+ * RequestBodyHandlerWrapper
+ *
+ * BodyReader calls this when it has some body data for us.
+ * It is of type CBCB.
+ */
 void
-HttpStateData::RequestBodyHandlerWrapper(char *buf, ssize_t size, void *data)
+HttpStateData::RequestBodyHandlerWrapper(MemBuf &mb, void *data)
 {
     HttpStateData *httpState = static_cast<HttpStateData *>(data);
-    httpState->requestBodyHandler(buf, size);
+    httpState->requestBodyHandler(mb);
 }
 
 void
-HttpStateData::requestBodyHandler(char *buf, ssize_t size)
+HttpStateData::requestBodyHandler(MemBuf &mb)
 {
-    request_body_buf = NULL;
-
     if (eof || fd < 0) {
         debugs(11, 1, HERE << "Transaction aborted while reading HTTP body");
-        memFree8K(buf);
         return;
     }
 
-    if (size > 0) {
+    if (mb.contentSize() > 0) {
         if (flags.headers_parsed && !flags.abuse_detected) {
             flags.abuse_detected = 1;
             debug(11, 1) ("httpSendRequestEntryDone: Likely proxy abuse detected '%s' -> '%s'\n",
@@ -1876,21 +1875,27 @@ HttpStateData::requestBodyHandler(char *buf, ssize_t size)
                           storeUrl(entry));
 
             if (getReply()->sline.status == HTTP_INVALID_HEADER) {
-                memFree8K(buf);
                 comm_close(fd);
                 return;
             }
         }
 
-        comm_old_write(fd, buf, size, SendRequestEntityWrapper, this, memFree8K);
-    } else if (size == 0) {
+        /*
+         * mb's content will be consumed in the SendRequestEntityWrapper
+         * callback after comm_write is done.
+         */
+        flags.consume_body_data = 1;
+
+        comm_old_write(fd, mb.content(), mb.contentSize(), SendRequestEntityWrapper, this, NULL);
+    } else if (orig_request->body_reader == NULL) {
+        /* Failed to get whole body, probably aborted */
+        SendComplete(fd, NULL, 0, COMM_ERR_CLOSING, this);
+    } else if (orig_request->body_reader->remaining() == 0) {
         /* End of body */
-        memFree8K(buf);
         sendRequestEntityDone();
     } else {
         /* Failed to get whole body, probably aborted */
-        memFree8K(buf);
-        HttpStateData::SendComplete(fd, NULL, 0, COMM_ERR_CLOSING, this);
+        SendComplete(fd, NULL, 0, COMM_ERR_CLOSING, this);
     }
 }
 
@@ -1906,11 +1911,19 @@ HttpStateData::sendRequestEntity(int fd, size_t size, comm_err_t errflag)
 {
     debug(11, 5) ("httpSendRequestEntity: FD %d: size %d: errflag %d.\n",
                   fd, (int) size, errflag);
+    debugs(32,3,HERE << "httpSendRequestEntity called");
+    assert(orig_request->body_reader != NULL);
 
     if (size > 0) {
         fd_bytes(fd, size, FD_WRITE);
         kb_incr(&statCounter.server.all.kbytes_out, size);
         kb_incr(&statCounter.server.http.kbytes_out, size);
+
+        if (flags.consume_body_data) {
+            orig_request->body_reader->consume(size);
+            orig_request->body_reader->bytes_read += size;
+            debugs(32,3," HTTP server body bytes_read=" << orig_request->body_reader->bytes_read);
+        }
     }
 
     if (errflag == COMM_ERR_CLOSING)
@@ -1930,8 +1943,16 @@ HttpStateData::sendRequestEntity(int fd, size_t size, comm_err_t errflag)
         return;
     }
 
-    request_body_buf = (char *)memAllocate(MEM_8K_BUF);
-    clientReadBody(orig_request, request_body_buf, 8192, RequestBodyHandlerWrapper, this);
+    size_t r = orig_request->body_reader->remaining();
+    debugs(32,3,HERE << "body remaining = " << r);
+
+    if (r) {
+        debugs(32,3,HERE << "reading more body data");
+        orig_request->body_reader->read(RequestBodyHandlerWrapper, this);
+    } else {
+        debugs(32,3,HERE << "done reading body data");
+        sendRequestEntityDone();
+    }
 }
 
 void
index 437a62d2a140884516302f5a4fc6a94cd896ed32..7953eaa2be8e8e64a2ecac71972440fdf5ac7a3c 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: http.h,v 1.23 2006/03/07 18:47:48 wessels Exp $
+ * $Id: http.h,v 1.24 2006/04/27 19:27:37 wessels Exp $
  *
  *
  * SQUID Web Proxy Cache          http://www.squid-cache.org/
@@ -38,6 +38,7 @@
 #include "comm.h"
 #include "forward.h"
 #include "Server.h"
+#include "BodyReader.h"
 
 #if ICAP_CLIENT
 #include "ICAP/ICAPServiceRep.h"
@@ -84,7 +85,6 @@ public:
     HttpRequest *orig_request;
     int fd;
     http_state_flags flags;
-    char *request_body_buf;
     off_t currentOffset;
     size_t read_sz;
     int body_bytes_read;       /* to find end of response, independent of StoreEntry */
@@ -125,7 +125,7 @@ private:
     void transactionComplete();
     void writeReplyBody(const char *data, int len);
     void sendRequestEntityDone();
-    void requestBodyHandler(char *buf, ssize_t size);
+    void requestBodyHandler(MemBuf &);
     void sendRequestEntity(int fd, size_t size, comm_err_t errflag);
     mb_size_t buildRequestPrefix(HttpRequest * request,
                                  HttpRequest * orig_request,
index 0669db39dc5d27da9aaabf1fd56c46c8a9416b2c..ab61ef8ee4da211a731b806fd77bbb0132472fac 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: protos.h,v 1.522 2006/04/23 11:10:32 robertc Exp $
+ * $Id: protos.h,v 1.523 2006/04/27 19:27:37 wessels Exp $
  *
  *
  * SQUID Web Proxy Cache          http://www.squid-cache.org/
@@ -105,8 +105,6 @@ SQUIDCEXTERN void clientdbFreeMemory(void);
 SQUIDCEXTERN int clientdbEstablished(struct IN_ADDR, int);
 SQUIDCEXTERN void clientOpenListenSockets(void);
 SQUIDCEXTERN void clientHttpConnectionsClose(void);
-SQUIDCEXTERN void clientReadBody(HttpRequest * req, char *buf, size_t size, CBCB * callback, void *data);
-SQUIDCEXTERN int clientAbortBody(HttpRequest * req);
 SQUIDCEXTERN void httpRequestFree(void *);
 
 extern void clientAccessCheck(void *);
index d11d4ef1f9d0937e454798f8b0a2139d08e42aaa..7a398eee7f6907a5002b6a388af19fe43c1f1519 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: structs.h,v 1.539 2006/04/23 11:10:32 robertc Exp $
+ * $Id: structs.h,v 1.540 2006/04/27 19:27:37 wessels Exp $
  *
  *
  * SQUID Web Proxy Cache          http://www.squid-cache.org/
@@ -931,6 +931,9 @@ unsigned int request_sent:
 
 unsigned int do_next_read:
     1;
+
+unsigned int consume_body_data:
+    1;
 };
 
 struct _ipcache_addrs
index 405e6bd870027b2ba615e4dd76e81d38ba6db9f7..886345e70c93d1f4d232afe004867d4dd71aa634 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: typedefs.h,v 1.181 2006/04/23 11:10:32 robertc Exp $
+ * $Id: typedefs.h,v 1.182 2006/04/27 19:27:37 wessels Exp $
  *
  *
  * SQUID Web Proxy Cache          http://www.squid-cache.org/
@@ -275,7 +275,6 @@ class wordlist;
 typedef void UH(void *data, wordlist *);
 typedef int READ_HANDLER(int, char *, int);
 typedef int WRITE_HANDLER(int, const char *, int);
-typedef void CBCB(char *buf, ssize_t size, void *data);
 
 typedef void STIOCB(void *their_data, int errflag, storeIOState *);
 typedef void STFNCB(void *their_data, int errflag, storeIOState *);