From: wessels <> Date: Fri, 28 Apr 2006 01:27:37 +0000 (+0000) Subject: Replacing ClientBody class with BodyReader. X-Git-Tag: SQUID_3_0_PRE4~217 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=3b299123c03cdf557bdfae37260f1f80014890f3;p=thirdparty%2Fsquid.git Replacing ClientBody class with BodyReader. The old ClientBody code did not allow us to insert ICAP into the flow of an HTTP request. Code in http.cc called read functions in client_side.cc when forwarding a request body. But with ICAP in the middle, HTTP needs to get the message body from ICAP, not client_side. The new BodyReader is similar to ClientBody. Now read and abort functions are pointers, rather than hard-coded in HTTP/FTP modules. --- diff --git a/src/BodyReader.cc b/src/BodyReader.cc new file mode 100644 index 0000000000..37a913b2f2 --- /dev/null +++ b/src/BodyReader.cc @@ -0,0 +1,147 @@ + + +#include "squid.h" +#include "MemBuf.h" +#include "BodyReader.h" + +BodyReader::BodyReader(size_t len, BodyReadFunc *r, BodyAbortFunc *a, BodyKickFunc *k, void *d) : + _remaining(len), _available(0), + read_func(r), abort_func(a), kick_func(k), read_func_data(d), + read_callback(NULL), read_callback_data(NULL) +{ + theBuf.init(4096, 65536); + debugs(32,3,HERE << this << " " << "created new BodyReader for content-length " << len); + bytes_read = 0; +} + +BodyReader::~BodyReader() +{ + if (_remaining && abort_func) + abort_func(read_func_data, _remaining); + + if (callbackPending()) + doCallback(); + +} + +void +BodyReader::read(CBCB *callback, void *cbdata) +{ + assert(_remaining || theBuf.contentSize()); + debugs(32,3,HERE << this << " " << "remaining = " << _remaining); + debugs(32,3,HERE << this << " " << "available = " << _available); + + if (read_callback == NULL) { + read_callback = callback; + read_callback_data = cbdataReference(cbdata); + } else { + assert(read_callback == callback); + assert(read_callback_data == cbdata); + } + + if ((_available == 0) && (theBuf.contentSize() == 0)) { + debugs(32,3,HERE << this << " " << "read: no body data available, saving callback pointers"); + + if (kick_func) + kick_func(read_func_data); + + return; + } + + debugs(32,3,HERE << this << " " << "read_func=" << (int) read_func); + debugs(32,3,HERE << this << " " << "data=" << read_func_data); + size_t size = theBuf.potentialSpaceSize(); + + if (size > _available) + size = _available; + + if (size > 0) { + debugs(32,3,HERE << this << " " << "calling read_func for " << size << " bytes"); + + size_t nread = read_func(read_func_data, theBuf, size); + + if (nread > 0) { + _available -= nread; + _remaining -= nread; + } else { + debugs(32,3,HERE << this << " " << "Help, read_func() ret " << nread); + } + } + + if (theBuf.contentSize() > 0) { + debugs(32,3,HERE << this << " have " << theBuf.contentSize() << " bytes in theBuf, calling back"); + doCallback(); + } +} + +void +BodyReader::notify(size_t now_available) +{ + debugs(32,3,HERE << this << " " << "old available = " << _available); + debugs(32,3,HERE << this << " " << "now_available = " << now_available); + _available = now_available; + + if (!callbackPending()) { + debugs(32,3,HERE << this << " " << "no callback pending, nothing to do"); + return; + } + + debugs(32,3,HERE << this << " " << "have data and pending callback, calling read()"); + + read(read_callback, read_callback_data); +} + +bool +BodyReader::callbackPending() +{ + return read_callback ? true : false; +} + +/* + * doCallback + * + * Execute the read callback if there is a function registered + * and the read_callback_data is still valid. + */ +bool +BodyReader::doCallback() +{ + CBCB *t_callback = read_callback; + void *t_cbdata; + + if (t_callback == NULL) + return false; + + read_callback = NULL; + + if (!cbdataReferenceValidDone(read_callback_data, &t_cbdata)) + return false; + + debugs(32,3,HERE << this << " doing callback, theBuf size = " << theBuf.contentSize()); + + t_callback(theBuf, t_cbdata); + + return true; +} + +bool +BodyReader::consume(size_t size) +{ + debugs(32,3,HERE << this << " BodyReader::consume consuming " << size); + + if (theBuf.contentSize() < (mb_size_t) size) { + debugs(0,0,HERE << this << "BodyReader::consume failed"); + debugs(0,0,HERE << this << "BodyReader::consume size = " << size); + debugs(0,0,HERE << this << "BodyReader::consume contentSize() = " << theBuf.contentSize()); + return false; + } + + theBuf.consume(size); + + if (callbackPending() && _available > 0) { + debugs(32,3,HERE << this << " " << "data avail and pending callback, calling read()"); + read(read_callback, read_callback_data); + } + + return true; +} diff --git a/src/BodyReader.h b/src/BodyReader.h new file mode 100644 index 0000000000..0087587b70 --- /dev/null +++ b/src/BodyReader.h @@ -0,0 +1,51 @@ + +#ifndef SQUID_BODY_READER_H +#define SQUID_BODY_READER_H + +typedef void CBCB (MemBuf &mb, void *data); +typedef size_t BodyReadFunc (void *, MemBuf &mb, size_t size); +typedef void BodyAbortFunc (void *, size_t); +typedef void BodyKickFunc (void *); + +class BodyReader : public RefCountable +{ + +public: + typedef RefCount Pointer; + BodyReader(size_t len, BodyReadFunc *r, BodyAbortFunc *a, BodyKickFunc *k, void *d); + ~BodyReader(); + void read(CBCB *, void *); + void notify(size_t now_available); + size_t remaining() { return _remaining; } + + bool callbackPending(); + bool consume(size_t size); + + int bytes_read; + +private: + size_t _remaining; + size_t _available; + MemBuf theBuf; + + /* + * These are for interacting with things that + * "provide" body content. ie, ConnStateData and + * ICAPReqMod after adapation. + */ + BodyReadFunc *read_func; + BodyAbortFunc *abort_func; + BodyKickFunc *kick_func; + void *read_func_data; + + /* + * These are for interacting with things that + * "consume" body content. ie, HttpStateData and + * ICAPReqMod before adaptation. + */ + CBCB *read_callback; + void *read_callback_data; + bool doCallback(); +}; + +#endif diff --git a/src/HttpRequest.cc b/src/HttpRequest.cc index 34f57c0e60..3817fd4196 100644 --- a/src/HttpRequest.cc +++ b/src/HttpRequest.cc @@ -1,6 +1,6 @@ /* - * $Id: HttpRequest.cc,v 1.61 2006/04/22 05:29:18 robertc Exp $ + * $Id: HttpRequest.cc,v 1.62 2006/04/27 19:27:37 wessels Exp $ * * DEBUG: section 73 HTTP Request * AUTHOR: Duane Wessels @@ -86,7 +86,7 @@ HttpRequest::init() my_addr = no_addr; my_port = 0; client_port = 0; - body_connection = NULL; + body_reader = NULL; // hier errType = ERR_NONE; peer_login = NULL; // not allocated/deallocated by this class @@ -101,8 +101,8 @@ HttpRequest::init() void HttpRequest::clean() { - if (body_connection.getRaw() != NULL) - fatal ("request being destroyed with body connection intact\n"); + if (body_reader != NULL) + fatal ("request being destroyed with body reader intact\n"); if (auth_user_request) { auth_user_request->unlock(); diff --git a/src/HttpRequest.h b/src/HttpRequest.h index 48aeb95a4d..abf3e52e48 100644 --- a/src/HttpRequest.h +++ b/src/HttpRequest.h @@ -1,6 +1,6 @@ /* - * $Id: HttpRequest.h,v 1.19 2006/02/17 18:10:59 wessels Exp $ + * $Id: HttpRequest.h,v 1.20 2006/04/27 19:27:37 wessels Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -37,6 +37,7 @@ #include "HttpMsg.h" #include "client_side.h" #include "HierarchyLogEntry.h" +#include "BodyReader.h" /* Http Request */ extern int httpRequestHdrAllowed(const HttpHeaderEntry * e, String * strConnection); @@ -104,7 +105,7 @@ public: unsigned short client_port; - ConnStateData::Pointer body_connection; /* used by clientReadBody() */ + BodyReader::Pointer body_reader; HierarchyLogEntry hier; diff --git a/src/ICAP/ICAPClientReqmodPrecache.cc b/src/ICAP/ICAPClientReqmodPrecache.cc index fd81eef476..0b4507c6db 100644 --- a/src/ICAP/ICAPClientReqmodPrecache.cc +++ b/src/ICAP/ICAPClientReqmodPrecache.cc @@ -26,8 +26,17 @@ ICAPClientReqmodPrecache::~ICAPClientReqmodPrecache() if (virgin != NULL) freeVirgin(); - if (adapted != NULL) + if (adapted != NULL) { + /* + * adapted->sink is equal to this. Remove the pointer since + * we are deleting this. + */ + + if (adapted->sink) + adapted->sink = NULL; + freeAdapted(); + } } void ICAPClientReqmodPrecache::startReqMod(ClientHttpRequest *aHttp, HttpRequest *request) @@ -116,18 +125,33 @@ void ICAPClientReqmodPrecache::noteSourceStart(MsgPipe *p) * tell the other side to use the original request/response * headers. */ + HttpRequest *req = dynamic_cast(adapted->data->header); + + if (req && req->content_length > 0) { + assert(req->body_reader == NULL); + req->body_reader = new BodyReader(req->content_length, readBody, abortBody, kickBody, this); + } + http->takeAdaptedHeaders(adapted->data->header); noteSourceProgress(p); } -// ICAP client sends more data +/* + * This is where we receive a notification from the other + * side of the MsgPipe that new adapted data is available. + * We, in turn, tell whoever is reading from the request's + * body_reader about the new data. + */ void ICAPClientReqmodPrecache::noteSourceProgress(MsgPipe *p) { debug(93,3)("ICAPClientReqmodPrecache::noteSourceProgress() called\n"); //tell ClientHttpRequest to store a fresh portion of the adapted response if (p->data->body->hasContent()) { - http->takeAdaptedBody(p->data->body); + HttpRequest *req = dynamic_cast(adapted->data->header); + assert(req); + debugs(32,3,HERE << "notifying body_reader, contentSize() = " << p->data->body->contentSize()); + req->body_reader->notify(p->data->body->contentSize()); } } @@ -159,6 +183,12 @@ void ICAPClientReqmodPrecache::stop(Notify notify) freeVirgin(); } +#if DONT_FREE_ADAPTED + /* + * NOTE: We do not clean up "adapted->sink" here because it may + * have an HTTP message body that needs to stay around a little + * while longer so that the HTTP server-side can forward it on. + */ if (adapted != NULL) { if (notify == notifyIcap) adapted->sendSinkAbort(); @@ -168,6 +198,8 @@ void ICAPClientReqmodPrecache::stop(Notify notify) freeAdapted(); } +#endif + if (http) { if (notify == notifyOwner) // tell ClientHttpRequest that we are aborting prematurely @@ -189,3 +221,60 @@ void ICAPClientReqmodPrecache::freeAdapted() { adapted = NULL; // refcounted } + +/* + * Something that needs to read the adapated request body + * calls this function, via the BodyReader class. We copy + * the body data from our bodybuf object to the BodyReader + * MemBuf, which was passed as a reference to this function. + */ +size_t +ICAPClientReqmodPrecache::readBody(void *data, MemBuf &mb, size_t size) +{ + ICAPClientReqmodPrecache *icap = static_cast(data); + assert(icap != NULL); + assert(icap->adapted != NULL); + assert(icap->adapted->data != NULL); + MemBuf *bodybuf = icap->adapted->data->body; + assert(bodybuf != NULL); + debugs(32,3,HERE << "readBody requested size " << size); + debugs(32,3,HERE << "readBody bodybuf size " << bodybuf->contentSize()); + + if ((mb_size_t) size > bodybuf->contentSize()) + size = bodybuf->contentSize(); + + debugs(32,3,HERE << "readBody actual size " << size); + + assert(size); + + mb.append(bodybuf->content(), size); + + bodybuf->consume(size); + + return size; +} + +void +ICAPClientReqmodPrecache::abortBody(void *data, size_t remaining) +{ + if (remaining >= 0) { + debugs(0,0,HERE << "ICAPClientReqmodPrecache::abortBody size " << remaining); + // more? + } + + ICAPClientReqmodPrecache *icap = static_cast(data); + icap->stop(notifyIcap); +} + +/* + * Restart reading the adapted response from the ICAP server in case + * the body buffer became full and we stopped reading. + */ +void +ICAPClientReqmodPrecache::kickBody(void *data) +{ + debugs(32,3,HERE << "ICAPClientReqmodPrecache::kickBody"); + ICAPClientReqmodPrecache *icap = static_cast(data); + assert(icap->adapted != NULL); + icap->adapted->sendSinkNeed(); +} diff --git a/src/ICAP/ICAPClientReqmodPrecache.h b/src/ICAP/ICAPClientReqmodPrecache.h index f46312768a..bf5d527357 100644 --- a/src/ICAP/ICAPClientReqmodPrecache.h +++ b/src/ICAP/ICAPClientReqmodPrecache.h @@ -1,6 +1,6 @@ /* - * $Id: ICAPClientReqmodPrecache.h,v 1.2 2005/12/22 22:26:31 wessels Exp $ + * $Id: ICAPClientReqmodPrecache.h,v 1.3 2006/04/27 19:27:37 wessels Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -79,6 +79,7 @@ public: ClientHttpRequest *http; MsgPipe::Pointer virgin; MsgPipe::Pointer adapted; + BodyReader::Pointer body_reader; private: typedef enum { notifyNone, notifyOwner, notifyIcap } Notify; @@ -86,6 +87,12 @@ private: void freeVirgin(); void freeAdapted(); CBDATA_CLASS2(ICAPClientReqmodPrecache); + + // Hooks to BodyReader so HttpStateData can get the + // adapted request body + static BodyReadFunc readBody; + static BodyAbortFunc abortBody; + static BodyKickFunc kickBody; }; #endif /* SQUID_ICAPCLIENTSIDEHOOK_H */ diff --git a/src/ICAP/ICAPConfig.cc b/src/ICAP/ICAPConfig.cc index 4d3c634a92..49381ac9d2 100644 --- a/src/ICAP/ICAPConfig.cc +++ b/src/ICAP/ICAPConfig.cc @@ -1,6 +1,6 @@ /* - * $Id: ICAPConfig.cc,v 1.8 2006/04/27 19:07:16 wessels Exp $ + * $Id: ICAPConfig.cc,v 1.9 2006/04/27 19:27:37 wessels Exp $ * * SQUID Web Proxy Cache http://www.squid-cache.org/ * ---------------------------------------------------------- @@ -47,6 +47,7 @@ extern ConfigParser LegacyParser; // from cache_cf.cc ICAPConfig TheICAPConfig; +extern ConfigParser LegacyParser; // found in cache_cf.cc ICAPServiceRep::Pointer ICAPConfig::findService(const String& key) diff --git a/src/ICAP/ICAPModXact.cc b/src/ICAP/ICAPModXact.cc index 3559d443ff..a0bbe6b58c 100644 --- a/src/ICAP/ICAPModXact.cc +++ b/src/ICAP/ICAPModXact.cc @@ -149,12 +149,13 @@ void ICAPModXact::handleCommConnected() requestBuf.init(); makeRequestHeaders(requestBuf); - debugs(93, 9, "ICAPModXact ICAP request prefix " << status() << ":\n" << + debugs(93, 9, "ICAPModXact ICAP status " << status() << " will write:\n" << (requestBuf.terminate(), requestBuf.content())); // write headers state.writing = State::writingHeaders; scheduleWrite(requestBuf); + virgin->sendSinkNeed(); } void ICAPModXact::handleCommWrote(size_t sz) @@ -172,13 +173,11 @@ void ICAPModXact::handleCommWroteHeaders() Must(state.writing == State::writingHeaders); if (virginBody.expected()) { - debugs(98, 5, HERE); state.writing = preview.enabled() ? State::writingPreview : State::writingPrime; virginWriteClaim.protectAll(); writeMore(); } else { - debugs(98, 5, HERE); stopWriting(); } } @@ -425,15 +424,21 @@ void ICAPModXact::startReading() void ICAPModXact::readMore() { - if (reader || doneReading()) + if (reader || doneReading()) { + debugs(32,3,HERE << "returning from readMore because reader or doneReading()"); return; + } // do not fill readBuf if we have no space to store the result - if (!adapted->data->body->hasPotentialSpace()) + if (!adapted->data->body->hasPotentialSpace()) { + debugs(93,1,HERE << "Not reading because ICAP reply buffer is full"); return; + } if (readBuf.hasSpace()) scheduleRead(); + else + debugs(93,1,HERE << "nothing to do because !readBuf.hasSpace()"); } // comm module read a portion of the ICAP response for us @@ -788,8 +793,13 @@ bool ICAPModXact::parsePresentBody() if (parsed) return true; - if (bodyParser->needsMoreData()) + debugs(32,3,HERE << this << " needsMoreData = " << bodyParser->needsMoreData()); + + if (bodyParser->needsMoreData()) { + debugs(32,3,HERE << this); Must(mayReadMore()); + readMore(); + } if (bodyParser->needsMoreSpace()) { Must(!doneSending()); // can hope for more space @@ -864,11 +874,10 @@ void ICAPModXact::noteSinkNeed(MsgPipe *p) if (state.sending == State::sendingVirgin) echoMore(); + else if (state.sending == State::sendingAdapted) + parseMore(); else - if (state.sending == State::sendingAdapted) - parseMore(); - else - Must(state.sending == State::sendingUndecided); + Must(state.sending == State::sendingUndecided); ICAPXaction_Exit(); } diff --git a/src/ICAP/ICAPXaction.cc b/src/ICAP/ICAPXaction.cc index d4bf8fa05f..2411127657 100644 --- a/src/ICAP/ICAPXaction.cc +++ b/src/ICAP/ICAPXaction.cc @@ -54,6 +54,7 @@ void ICAPXaction_noteCommWrote(int, char *, size_t size, comm_err_t status, void static void ICAPXaction_noteCommRead(int, char *, size_t size, comm_err_t status, int xerrno, void *data) { + debugs(93,3,HERE << data << " read returned " << size); ICAPXaction_fromData(data).noteCommRead(status, size); } @@ -238,8 +239,10 @@ void ICAPXaction::handleCommClosed() bool ICAPXaction::done() const { - if (stopReason != NULL) // mustStop() has been called + if (stopReason != NULL) { // mustStop() has been called + debugs(93,1,HERE << "ICAPXaction is done() because " << stopReason); return true; + } return doneAll(); } @@ -276,7 +279,7 @@ void ICAPXaction::noteCommRead(comm_err_t commStatus, size_t sz) Must(commStatus == COMM_OK); Must(sz >= 0); - debugs(93, 5, HERE << "read " << sz << " bytes"); + debugs(93, 3, HERE << "read " << sz << " bytes"); /* * See comments in ICAPXaction.h about why we use commBuf @@ -396,7 +399,7 @@ void ICAPXaction::callException(const TextException &e) void ICAPXaction::callEnd() { if (done()) { - debugs(93, 5, "ICAPXaction::" << inCall << " ends xaction " << + debugs(93, 5, HERE << "ICAPXaction::" << inCall << " ends xaction " << status()); doStop(); // may delete us return; diff --git a/src/ICAP/MsgPipe.cc b/src/ICAP/MsgPipe.cc index 3c18f72010..0c6c7e00e9 100644 --- a/src/ICAP/MsgPipe.cc +++ b/src/ICAP/MsgPipe.cc @@ -32,8 +32,8 @@ MsgPipe::MsgPipe(const char *aName): name(aName), MsgPipe::~MsgPipe() { delete data; - delete source; - delete sink; + assert(source == NULL); + assert(sink == NULL); }; void MsgPipe::sendSourceStart() diff --git a/src/Makefile.am b/src/Makefile.am index 7f2b95c745..30f496371b 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1,7 +1,7 @@ # # Makefile for the Squid Object Cache server # -# $Id: Makefile.am,v 1.138 2006/04/27 19:04:15 wessels Exp $ +# $Id: Makefile.am,v 1.139 2006/04/27 19:27:37 wessels Exp $ # # Uncomment and customize the following to suit your needs: # @@ -376,8 +376,8 @@ squid_SOURCES = \ client_side_reply.h \ client_side_request.cc \ client_side_request.h \ - ClientBody.cc \ - ClientBody.h \ + BodyReader.cc \ + BodyReader.h \ ClientRequestContext.h \ clientStream.cc \ clientStream.h \ @@ -625,7 +625,7 @@ recv_announce_SOURCES = recv-announce.cc SquidNew.cc ## SwapDir wants ConfigOption ufsdump_SOURCES = \ - ClientBody.cc \ + BodyReader.cc \ ConfigParser.cc \ debug.cc \ int.cc \ diff --git a/src/Makefile.in b/src/Makefile.in index 1f851cb61f..87f95d48d8 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -17,7 +17,7 @@ # # Makefile for the Squid Object Cache server # -# $Id: Makefile.in,v 1.376 2006/04/27 19:04:15 wessels Exp $ +# $Id: Makefile.in,v 1.377 2006/04/27 19:27:37 wessels Exp $ # # Uncomment and customize the following to suit your needs: # @@ -163,8 +163,8 @@ am__squid_SOURCES_DIST = access_log.cc AccessLogEntry.h acl.cc \ AuthUser.cc AuthUserRequest.cc cache_cf.cc CacheDigest.cc \ cache_manager.cc carp.cc cbdata.cc client_db.cc client_side.cc \ client_side.h client_side_reply.cc client_side_reply.h \ - client_side_request.cc client_side_request.h ClientBody.cc \ - ClientBody.h ClientRequestContext.h clientStream.cc \ + client_side_request.cc client_side_request.h BodyReader.cc \ + BodyReader.h ClientRequestContext.h clientStream.cc \ clientStream.h comm.cc comm.h CommIO.h comm_select.cc \ comm_poll.cc comm_epoll.cc comm_kqueue.cc CommRead.h \ ConfigOption.cc ConfigParser.cc ConfigParser.h \ @@ -277,7 +277,7 @@ am_squid_OBJECTS = access_log.$(OBJEXT) acl.$(OBJEXT) \ CacheDigest.$(OBJEXT) cache_manager.$(OBJEXT) carp.$(OBJEXT) \ cbdata.$(OBJEXT) client_db.$(OBJEXT) client_side.$(OBJEXT) \ client_side_reply.$(OBJEXT) client_side_request.$(OBJEXT) \ - ClientBody.$(OBJEXT) clientStream.$(OBJEXT) comm.$(OBJEXT) \ + BodyReader.$(OBJEXT) clientStream.$(OBJEXT) comm.$(OBJEXT) \ comm_select.$(OBJEXT) comm_poll.$(OBJEXT) comm_epoll.$(OBJEXT) \ comm_kqueue.$(OBJEXT) ConfigOption.$(OBJEXT) \ ConfigParser.$(OBJEXT) debug.$(OBJEXT) $(am__objects_5) \ @@ -568,7 +568,7 @@ am__tests_testUfs_SOURCES_DIST = tests/testUfs.cc tests/testMain.cc \ am_tests_testUfs_OBJECTS = tests/testUfs.$(OBJEXT) \ tests/testMain.$(OBJEXT) $(am__objects_27) tests_testUfs_OBJECTS = $(am_tests_testUfs_OBJECTS) -am__ufsdump_SOURCES_DIST = ClientBody.cc ConfigParser.cc debug.cc \ +am__ufsdump_SOURCES_DIST = BodyReader.cc ConfigParser.cc debug.cc \ int.cc ufsdump.cc store.cc StoreFileSystem.cc StoreMeta.cc \ StoreMeta.h StoreMetaMD5.cc StoreMetaMD5.h StoreMetaSTD.cc \ StoreMetaSTD.h StoreMetaUnpacker.cc StoreMetaUnpacker.h \ @@ -620,7 +620,7 @@ am__ufsdump_SOURCES_DIST = ClientBody.cc ConfigParser.cc debug.cc \ store_swapmeta.cc store_swapout.cc structs.h SwapDir.cc \ tools.cc typedefs.h unlinkd.cc url.cc urn.cc useragent.cc \ wais.cc wccp.cc whois.cc wordlist.cc win32.cc -am_ufsdump_OBJECTS = ClientBody.$(OBJEXT) ConfigParser.$(OBJEXT) \ +am_ufsdump_OBJECTS = BodyReader.$(OBJEXT) ConfigParser.$(OBJEXT) \ debug.$(OBJEXT) int.$(OBJEXT) ufsdump.$(OBJEXT) \ store.$(OBJEXT) StoreFileSystem.$(OBJEXT) StoreMeta.$(OBJEXT) \ StoreMetaMD5.$(OBJEXT) StoreMetaSTD.$(OBJEXT) \ @@ -1215,8 +1215,8 @@ squid_SOURCES = \ client_side_reply.h \ client_side_request.cc \ client_side_request.h \ - ClientBody.cc \ - ClientBody.h \ + BodyReader.cc \ + BodyReader.h \ ClientRequestContext.h \ clientStream.cc \ clientStream.h \ @@ -1449,7 +1449,7 @@ pinger_SOURCES = \ dnsserver_SOURCES = dnsserver.cc SquidNew.cc recv_announce_SOURCES = recv-announce.cc SquidNew.cc ufsdump_SOURCES = \ - ClientBody.cc \ + BodyReader.cc \ ConfigParser.cc \ debug.cc \ int.cc \ @@ -2566,7 +2566,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/AuthUser.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/AuthUserRequest.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/CacheDigest.Po@am__quote@ -@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ClientBody.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/BodyReader.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ConfigOption.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ConfigParser.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DelayBucket.Po@am__quote@ diff --git a/src/client_side.cc b/src/client_side.cc index 34c972c4ea..41b7c1f952 100644 --- a/src/client_side.cc +++ b/src/client_side.cc @@ -1,6 +1,6 @@ /* - * $Id: client_side.cc,v 1.718 2006/04/22 05:29:19 robertc Exp $ + * $Id: client_side.cc,v 1.719 2006/04/27 19:27:37 wessels Exp $ * * DEBUG: section 33 Client-side Routines * AUTHOR: Duane Wessels @@ -73,7 +73,6 @@ #include "client_side_reply.h" #include "ClientRequestContext.h" #include "MemBuf.h" -#include "ClientBody.h" #if LINGERING_CLOSE #define comm_close comm_lingering_close @@ -135,6 +134,8 @@ static ClientSocketContext *parseHttpRequest(ConnStateData::Pointer &, method_t #if USE_IDENT static IDCB clientIdentDone; #endif +static BodyReadFunc clientReadBody; +static BodyAbortFunc clientAbortBody; static CSCB clientSocketRecipient; static CSD clientSocketDetach; static void clientSetKeepaliveFlag(ClientHttpRequest *); @@ -163,7 +164,7 @@ static void trimTrailingSpaces(char *aString, size_t len); static ClientSocketContext *parseURIandHTTPVersion(char **url_p, HttpVersion * http_ver_p, ConnStateData::Pointer& conn, char *http_version_str); static int connReadWasError(ConnStateData::Pointer& conn, comm_err_t, int size, int xerrno); static int connFinishedWithConn(ConnStateData::Pointer& conn, int size); -static void connNoteUseOfBuffer(ConnStateData::Pointer & conn, size_t byteCount); +static void connNoteUseOfBuffer(ConnStateData* conn, size_t byteCount); static int connKeepReadingIncompleteRequest(ConnStateData::Pointer & conn); static void connCancelIncompleteRequests(ConnStateData::Pointer & conn); @@ -614,18 +615,6 @@ ConnStateData::close() auth_user_request,this); auth_user_request->onConnectionClose(this); } - - /* - * This is awkward: body has a RefCount::Pointer to this. We must - * destroy body so that our own reference count will go to zero. - * Furthermore, there currently exists a potential loop because - * ~ConnStateData() will delete body if it is not NULL. - */ - if (body) { - ClientBody *tmp = body; - body = NULL; - delete tmp; - } } bool @@ -649,8 +638,7 @@ ConnStateData::~ConnStateData() cbdataReferenceDone(port); - if (body) - delete body; + body_reader = NULL; // refcounted } /* @@ -1523,22 +1511,31 @@ ClientSocketContext::doClose() void ClientSocketContext::initiateClose() { - if (!http || !http->getConn().getRaw()) { - doClose(); - return; - } - - if (http->getConn()->body_size_left > 0) { - debug(33, 5) ("ClientSocketContext::initiateClose: closing, but first we need to read the rest of the request\n"); - /* XXX We assumes the reply does fit in the TCP transmit window. - * If not the connection may stall while sending the reply - * (before reaching here) if the client does not try to read the - * response while sending the request body. As of yet we have - * not received any complaints indicating this may be an issue. + if (http != NULL) { + ConnStateData::Pointer conn = http->getConn(); + + if (conn != NULL) { + if (conn->bodySizeLeft() > 0) { + debug(33, 5) ("ClientSocketContext::initiateClose: closing, but first we need to read the rest of the request\n"); + /* + * XXX We assume the reply fits in the TCP transmit + * window. If not the connection may stall while sending + * the reply (before reaching here) if the client does not + * try to read the response while sending the request body. + * As of yet we have not received any complaints indicating + * this may be an issue. */ - http->getConn()->closing(true); - clientAbortBody(http->request); - return; + conn->closing(true); + /* + * Trigger the BodyReader abort handler, if necessary, + * by destroying it. It is a refcounted pointer, so + * set it to NULL and let the destructor be called when + * all references are gone. + */ + http->request->body_reader = NULL; // refcounted + return; + } + } } doClose(); @@ -2119,7 +2116,7 @@ connFinishedWithConn(ConnStateData::Pointer & conn, int size) } void -connNoteUseOfBuffer(ConnStateData::Pointer & conn, size_t byteCount) +connNoteUseOfBuffer(ConnStateData* conn, size_t byteCount) { assert(byteCount > 0 && byteCount <= conn->in.notYetUsed); conn->in.notYetUsed -= byteCount; @@ -2171,13 +2168,14 @@ clientMaybeReadData(ConnStateData::Pointer &conn, int do_next_read) static void clientAfterReadingRequests(int fd, ConnStateData::Pointer &conn, int do_next_read) { - fde *F = &fd_table[fd]; - - /* Check if a half-closed connection was aborted in the middle */ + /* + * If (1) we are reading a message body, (2) and the connection + * is half-closed, and (3) we didn't get the entire HTTP request + * yet, then close this connection. + */ - if (F->flags.socket_eof) { - if (conn->in.notYetUsed != conn->body_size_left) { - /* != 0 when no request body */ + if (fd_table[fd].flags.socket_eof) { + if ((ssize_t) conn->in.notYetUsed < conn->bodySizeLeft()) { /* Partial request received. Abort client connection! */ debug(33, 3) ("clientAfterReadingRequests: FD %d aborted, partial request\n", fd); comm_close(fd); @@ -2195,7 +2193,7 @@ clientProcessRequest(ConnStateData::Pointer &conn, ClientSocketContext *context, HttpRequest *request = NULL; /* We have an initial client stream in place should it be needed */ /* setup our private context */ - connNoteUseOfBuffer(conn, http->req_sz); + connNoteUseOfBuffer(conn.getRaw(), http->req_sz); context->registerWithConn(); @@ -2304,9 +2302,17 @@ clientProcessRequest(ConnStateData::Pointer &conn, ClientSocketContext *context, /* Do we expect a request-body? */ if (request->content_length > 0) { - conn->body = new ClientBody(conn, request); - conn->body_size_left = request->content_length; - request->body_connection = conn; + request->body_reader = new BodyReader(request->content_length, + clientReadBody, + clientAbortBody, + NULL, + conn.getRaw()); + conn->body_reader = request->body_reader; + request->body_reader->notify(conn->in.notYetUsed); + + if (request->body_reader->remaining()) + conn->readSomeData(); + /* Is it too large? */ if (!clientIsRequestBodyValid(request->content_length) || @@ -2359,6 +2365,21 @@ connOkToAddRequest(ConnStateData::Pointer &conn) return result; } +/* + * bodySizeLeft + * + * Report on the number of bytes of body content that we + * know are yet to be read on this connection. + */ +ssize_t +ConnStateData::bodySizeLeft() +{ + if (body_reader != NULL) + return body_reader->remaining(); + + return 0; +} + /* * Attempt to parse one or more requests from the input buffer. * If a request is successfully parsed, even if the next request @@ -2377,7 +2398,7 @@ clientParseRequest(ConnStateData::Pointer conn, bool &do_next_read) debug(33, 5) ("clientParseRequest: FD %d: attempting to parse\n", conn->fd); - while (conn->in.notYetUsed > 0 && conn->body_size_left == 0) { + while (conn->in.notYetUsed > 0 && conn->bodySizeLeft() == 0) { size_t req_line_sz; connStripBufferWhitespace (conn); @@ -2416,7 +2437,7 @@ clientParseRequest(ConnStateData::Pointer conn, bool &do_next_read) parsed_req = true; if (context->mayUseConnection()) { - debug (33, 3) ("clientReadRequest: Not reading, as this request may need the connection\n"); + debug (33, 3) ("clientParseRequest: Not reading, as this request may need the connection\n"); do_next_read = 0; break; } @@ -2426,9 +2447,9 @@ clientParseRequest(ConnStateData::Pointer conn, bool &do_next_read) break; } - continue; /* while offset > 0 && body_size_left == 0 */ + continue; /* while offset > 0 && conn->bodySizeLeft() == 0 */ } - } /* while offset > 0 && conn->body_size_left == 0 */ + } /* while offset > 0 && conn->bodySizeLeft() == 0 */ return parsed_req; } @@ -2437,6 +2458,7 @@ static void clientReadRequest(int fd, char *buf, size_t size, comm_err_t flag, int xerrno, void *data) { + debugs(33,5,HERE << "clientReadRequest FD " << fd << " size " << size); ConnStateData::Pointer conn ((ConnStateData *)data); conn->reading(false); bool do_next_read = 1; /* the default _is_ to read data! - adrian */ @@ -2463,6 +2485,15 @@ clientReadRequest(int fd, char *buf, size_t size, comm_err_t flag, int xerrno, if (flag == COMM_OK) { if (size > 0) { + /* + * If this assertion fails, we need to handle the case + * where a persistent connection has some leftover body + * request data from a previous, aborted transaction. + * Probably just decrement size and adjust/memmove the + * 'buf' pointer accordingly. + */ + assert(conn->in.abortedSize == 0); + char *current_buf = conn->in.addressToReadInto(); kb_incr(&statCounter.client_http.kbytes_in, size); @@ -2472,6 +2503,9 @@ clientReadRequest(int fd, char *buf, size_t size, comm_err_t flag, int xerrno, conn->in.notYetUsed += size; conn->in.buf[conn->in.notYetUsed] = '\0'; /* Terminate the string */ + + if (conn->body_reader != NULL) + conn->body_reader->notify(conn->in.notYetUsed); } else if (size == 0) { debug(33, 5) ("clientReadRequest: FD %d closed?\n", fd); @@ -2497,12 +2531,6 @@ clientReadRequest(int fd, char *buf, size_t size, comm_err_t flag, int xerrno, } } - - /* Process request body if any */ - if (conn->in.notYetUsed > 0 && conn->body && conn->body->hasCallback()) { - conn->body->process(); - } - /* Process next request */ if (conn->getConcurrentRequestCount() == 0) fd_note(conn->fd, "Reading next request"); @@ -2525,64 +2553,66 @@ clientReadRequest(int fd, char *buf, size_t size, comm_err_t flag, int xerrno, } } -/* file_read like function, for reading body content */ -void -clientReadBody(HttpRequest * request, char *buf, size_t size, CBCB * callback, - void *cbdata) +/* + * clientReadBody + * + * A request to receive some HTTP request body data. This is a + * 'read_func' of BodyReader class. Feels to me like this function + * belongs to ConnStateData class. + * + * clientReadBody is of type 'BodyReadFunc' + */ +size_t +clientReadBody(void *data, MemBuf &mb, size_t size) { - ConnStateData::Pointer conn = request->body_connection; + ConnStateData *conn = (ConnStateData *) data; + assert(conn); + debugs(32,3,HERE << "clientReadBody requested size " << size); + debugs(32,3,HERE << "clientReadBody FD " << conn->fd); + debugs(32,3,HERE << "clientReadBody in.notYetUsed " << conn->in.notYetUsed); - if (conn.getRaw() == NULL) { - debug(33, 5) ("clientReadBody: no body to read, request=%p\n", request); - callback(buf, 0, cbdata); /* Signal end of body */ - return; - } + if (size > conn->in.notYetUsed) + size = conn->in.notYetUsed; + + debugs(32,3,HERE << "clientReadBody actual size " << size); + + assert(size); + + mb.append(conn->in.buf, size); - debug(33, 2) ("clientReadBody: start FD %d body_size=%lu in.notYetUsed=%ld cb=%p req=%p\n", - conn->fd, (unsigned long int) conn->body_size_left, - (unsigned long int) conn->in.notYetUsed, callback, request); - conn->body->init(buf, size, callback, cbdata); - conn->body->process(); + connNoteUseOfBuffer(conn, size); + + return size; } -/* A dummy handler that throws away a request-body */ +/* + * clientAbortBody + * + * A dummy callback that consumes the remains of a request + * body for an aborted transaction. + * + * clientAbortBody is of type 'BodyAbortFunc' + */ static void -clientReadBodyAbortHandler(char *buf, ssize_t size, void *data) +clientAbortBody(void *data, size_t remaining) { - static char bodyAbortBuf[SQUID_TCP_SO_RCVBUF]; ConnStateData *conn = (ConnStateData *) data; - debug(33, 2) ("clientReadBodyAbortHandler: FD %d body_size=%lu in.notYetUsed=%lu\n", - conn->fd, (unsigned long int) conn->body_size_left, - (unsigned long) conn->in.notYetUsed); + debugs(32,3,HERE << "clientAbortBody FD " << conn->fd); + debugs(32,3,HERE << "clientAbortBody in.notYetUsed " << conn->in.notYetUsed); + debugs(32,3,HERE << "clientAbortBody remaining " << remaining); + conn->in.abortedSize += remaining; - if (size != 0 && conn->body_size_left != 0) { - debug(33, 3) ("clientReadBodyAbortHandler: FD %d shedule next read\n", - conn->fd); - conn->body->init(bodyAbortBuf, sizeof(bodyAbortBuf), clientReadBodyAbortHandler, data); + if (conn->in.notYetUsed) { + size_t to_discard = XMIN(conn->in.notYetUsed, conn->in.abortedSize); + debugs(32,3,HERE << "to_discard " << to_discard); + conn->in.abortedSize -= to_discard; + connNoteUseOfBuffer(conn, to_discard); } if (conn->closing()) comm_close(conn->fd); } -/* Abort a body request */ -int -clientAbortBody(HttpRequest * request) -{ - ConnStateData::Pointer conn = request->body_connection; - - if (conn.getRaw() == NULL || conn->body_size_left <= 0 || conn->body->getRequest() != request) - return 0; /* No body to abort */ - - if (conn->body->hasCallback()) - conn->body->negativeCallback(); - - clientReadBodyAbortHandler(NULL, -1, conn.getRaw()); /* Install abort handler */ - - /* ClientBody::process() */ - return 1; /* Aborted */ -} - /* general lifetime handler for HTTP requests */ static void requestTimeout(int fd, void *data) @@ -3192,7 +3222,7 @@ ConnStateData::operator delete (void *address) cbdataFree(t); } -ConnStateData::ConnStateData() : body_size_left(0), transparent_ (false), reading_ (false), closing_ (false) +ConnStateData::ConnStateData() : transparent_ (false), reading_ (false), closing_ (false) { openReference = this; } diff --git a/src/client_side.h b/src/client_side.h index 1979e765b4..11316e3021 100644 --- a/src/client_side.h +++ b/src/client_side.h @@ -1,6 +1,6 @@ /* - * $Id: client_side.h,v 1.14 2006/03/04 05:38:34 wessels Exp $ + * $Id: client_side.h,v 1.15 2006/04/27 19:27:37 wessels Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -35,6 +35,7 @@ #define SQUID_CLIENTSIDE_H #include "StoreIOBuffer.h" +#include "BodyReader.h" #include "RefCount.h" class ConnStateData; @@ -43,9 +44,6 @@ class ClientHttpRequest; class clientStreamNode; -class ClientBody; -; - template class Range; @@ -159,19 +157,37 @@ public: char *buf; size_t notYetUsed; size_t allocatedSize; + /* + * abortedSize is the amount of data that should be read + * from the socket and immediately discarded. It may be + * set when there is a request body and that transaction + * gets aborted. The client side should read the remaining + * body content and just discard it, if the connection + * will be staying open. + */ + size_t abortedSize; } in; - ClientBody *body; - size_t body_size_left; - - auth_type_t auth_type; /* Is this connection based authentication? if so what type it is. */ - /* note this is ONLY connection based because NTLM is against HTTP spec */ - /* the user details for connection based authentication */ + BodyReader::Pointer body_reader; + ssize_t bodySizeLeft(); + + /* + * Is this connection based authentication? if so what type it + * is. + */ + auth_type_t auth_type; + /* + * note this is ONLY connection based because NTLM is against HTTP spec. + * the user details for connection based authentication + */ auth_user_request_t *auth_user_request; - /* TODO: generalise the connection owner concept */ - ClientSocketContext::Pointer currentobject; /* used by the owner of the connection. Opaque otherwise */ + /* + * used by the owner of the connection, opaque otherwise + * TODO: generalise the connection owner concept. + */ + ClientSocketContext::Pointer currentobject; struct sockaddr_in peer; diff --git a/src/client_side_request.cc b/src/client_side_request.cc index 813855fdc8..09e094183a 100644 --- a/src/client_side_request.cc +++ b/src/client_side_request.cc @@ -1,6 +1,6 @@ /* - * $Id: client_side_request.cc,v 1.61 2006/04/23 11:10:31 robertc Exp $ + * $Id: client_side_request.cc,v 1.62 2006/04/27 19:27:37 wessels Exp $ * * DEBUG: section 85 Client-side Request Routines * AUTHOR: Robert Collins (Originally Duane Wessels in client_side.c) @@ -238,9 +238,9 @@ ClientHttpRequest::~ClientHttpRequest() * found the end of the body yet */ - if (request && request->body_connection.getRaw() != NULL) { - clientAbortBody(request); /* abort body transter */ - request->body_connection = NULL; + if (request && request->body_reader != NULL) { + request->body_reader = NULL; // refcounted + debugs(32,3,HERE << "setting body_reader = NULL for request " << request); } /* the ICP check here was erroneous @@ -870,9 +870,10 @@ ClientRequestContext::clientRedirectDone(char *result) ; } - if (old_request->body_connection.getRaw() != NULL) { - new_request->body_connection = old_request->body_connection; - old_request->body_connection = NULL; + if (old_request->body_reader != NULL) { + new_request->body_reader = old_request->body_reader; + old_request->body_reader = NULL; + debugs(0,0,HERE << "setting body_reader = NULL for request " << old_request); } new_request->content_length = old_request->content_length; @@ -1114,17 +1115,116 @@ ClientHttpRequest::doIcap(ICAPServiceRep::Pointer service) icap = new ICAPClientReqmodPrecache(service); (void) cbdataReference(icap); icap->startReqMod(this, request); - icap->doneSending(); + + if (request->body_reader == NULL) { + debugs(32,3,HERE << "client request hasnt body..."); + icap->doneSending(); + + } + return 0; } +/* + * icapSendRequestBodyWrapper + * + * A callback wrapper for ::icapSendRequestBody() + * + * icapSendRequestBodyWrapper is of type CBCB + */ +void +ClientHttpRequest::icapSendRequestBodyWrapper(MemBuf &mb, void *data) +{ + ClientHttpRequest *chr = static_cast(data); + chr->icapSendRequestBody(mb); +} + + +/* + * icapSendRequestBody + * + * Sends some chunk of a request body to the ICAP side. Must make sure + * that the ICAP-side can accept the data we have. If there is more + * body data to read, then schedule another BodyReader callback. + */ +void +ClientHttpRequest::icapSendRequestBody(MemBuf &mb) +{ + ssize_t size_to_send = mb.contentSize(); + debugs(32,3,HERE << "have " << mb.contentSize() << " bytes in mb"); + + if (size_to_send == 0) { + /* + * An error occurred during this transaction. Tell ICAP that we're done. + */ + + if (icap) + icap->doneSending(); + + return; + } + + debugs(32,3,HERE << "icap->potentialSpaceSize() = " << icap->potentialSpaceSize()); + + if (size_to_send > icap->potentialSpaceSize()) + size_to_send = icap->potentialSpaceSize(); + + if (size_to_send) { + debugs(32,3,HERE << "sending " << size_to_send << " body bytes to ICAP"); + StoreIOBuffer sbuf(size_to_send, 0, mb.content()); + icap->sendMoreData(sbuf); + icap->body_reader->consume(size_to_send); + icap->body_reader->bytes_read += size_to_send; + debugs(32,3," HTTP client body bytes_read=" << icap->body_reader->bytes_read); + } else { + debugs(32,0,HERE << "cannot send body data to ICAP"); + debugs(32,0,HERE << "\tBodyReader MemBuf has " << mb.contentSize()); + debugs(32,0,HERE << "\tbut icap->potentialSpaceSize() is " << icap->potentialSpaceSize()); + return; + } + + /* + * If we sent some data this time, and there is more data to + * read, then schedule another read request via BodyReader. + */ + if (size_to_send && icap->body_reader->remaining()) { + debugs(32,3,HERE << "calling body_reader->read()"); + icap->body_reader->read(icapSendRequestBodyWrapper, this); + } else { + debugs(32,3,HERE << "No more request body bytes to send"); + icap->doneSending(); + } +} + /* * Called by ICAPAnchor when it has space available for us. */ void ClientHttpRequest::icapSpaceAvailable() { - debug(85,3)("ClientHttpRequest::icapSpaceAvailable() called\n"); + debugs(85,3,HERE << this << " ClientHttpRequest::icapSpaceAvailable() called\n"); + + if (request->body_reader != NULL && icap->body_reader == NULL) { + debugs(32,3,HERE << "reassigning HttpRequest->body_reader to ICAP"); + /* + * ICAP hooks on to the BodyReader that gets data from + * ConnStateData. We'll make a new BodyReader that + * HttpStateData can use if the adapted response has a + * request body. See ICAPClientReqmodPrecache::noteSourceStart() + */ + icap->body_reader = request->body_reader; + request->body_reader = NULL; + } + + if (icap->body_reader == NULL) + return; + + if (icap->body_reader->callbackPending()) + return; + + debugs(32,3,HERE << "Calling read() for body data"); + + icap->body_reader->read(icapSendRequestBodyWrapper, this); } void @@ -1139,8 +1239,6 @@ ClientHttpRequest::takeAdaptedHeaders(HttpMsg *msg) * Move the "body_connection" over, then unlink old and * link new to the http state. */ - new_req->body_connection = request->body_connection; - request->body_connection = NULL; HTTPMSGUNLOCK(request); request = HTTPMSGLOCK(new_req); /* @@ -1195,12 +1293,18 @@ ClientHttpRequest::abortAdapting() if ((NULL == storeEntry()) || storeEntry()->isEmpty()) { debug(85,3)("WARNING: ICAP REQMOD callout failed, proceeding with original request\n"); - doCallouts(); + + if (calloutContext) + doCallouts(); + #if ICAP_HARD_ERROR clientStreamNode *node = (clientStreamNode *)client_stream.tail->prev->data; + clientReplyContext *repContext = dynamic_cast(node->data.getRaw()); + assert (repContext); + // Note if this code is ever used, clientBuildError() should be modified to // accept an errno arg repContext->setReplyToError(ERR_ICAP_FAILURE, HTTP_INTERNAL_SERVER_ERROR, @@ -1209,8 +1313,11 @@ ClientHttpRequest::abortAdapting() NULL, getConn().getRaw() != NULL && getConn()->auth_user_request ? getConn()-> auth_user_request : request->auth_user_request, errno); + node = (clientStreamNode *)client_stream.tail->data; + clientStreamRead(node, this, node->readBuffer); + #endif return; diff --git a/src/client_side_request.h b/src/client_side_request.h index 6b60321bea..535f634147 100644 --- a/src/client_side_request.h +++ b/src/client_side_request.h @@ -1,6 +1,6 @@ /* - * $Id: client_side_request.h,v 1.25 2006/04/25 07:13:33 robertc Exp $ + * $Id: client_side_request.h,v 1.26 2006/04/27 19:27:37 wessels Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -160,6 +160,8 @@ private: public: ICAPClientReqmodPrecache *icap; int doIcap(ICAPServiceRep::Pointer); + void icapSendRequestBody(MemBuf&); + static void icapSendRequestBodyWrapper(MemBuf&, void*); void icapSpaceAvailable(); void takeAdaptedHeaders(HttpMsg *); void takeAdaptedBody(MemBuf *); diff --git a/src/forward.cc b/src/forward.cc index 404de983b3..598f2259c6 100644 --- a/src/forward.cc +++ b/src/forward.cc @@ -1,6 +1,6 @@ /* - * $Id: forward.cc,v 1.137 2006/04/02 14:32:35 serassio Exp $ + * $Id: forward.cc,v 1.138 2006/04/27 19:27:37 wessels Exp $ * * DEBUG: section 17 Request Forwarding * AUTHOR: Duane Wessels @@ -386,7 +386,7 @@ FwdState::checkRetriable() * even if the method is indempotent */ - if (request->body_connection.getRaw() != NULL) + if (request->body_reader != NULL) return false; /* RFC2616 9.1 Safe and Idempotent Methods */ diff --git a/src/ftp.cc b/src/ftp.cc index b2189077a0..b54d302ef6 100644 --- a/src/ftp.cc +++ b/src/ftp.cc @@ -1,6 +1,6 @@ /* - * $Id: ftp.cc,v 1.391 2006/04/23 11:10:31 robertc Exp $ + * $Id: ftp.cc,v 1.392 2006/04/27 19:27:37 wessels Exp $ * * DEBUG: section 9 File Transfer Protocol (FTP) * AUTHOR: Harvest Derived @@ -211,7 +211,7 @@ public: static IOCB ftpReadControlReply; static IOWCB ftpWriteCommandCallback; static HttpReply *ftpAuthRequired(HttpRequest * request, const char *realm); - static void ftpRequestBody(char *buf, ssize_t size, void *data); + static CBCB ftpRequestBody; static wordlist *ftpParseControlReply(char *, size_t, int *, int *); #if ICAP_CLIENT @@ -2769,19 +2769,19 @@ ftpReadTransferDone(FtpStateData * ftpState) /* This will be called when there is data available to put */ void -FtpStateData::ftpRequestBody(char *buf, ssize_t size, void *data) +FtpStateData::ftpRequestBody(MemBuf &mb, void *data) { FtpStateData *ftpState = (FtpStateData *) data; - debug(9, 3) ("ftpRequestBody: buf=%p size=%d ftpState=%p\n", buf, (int) size, data); + debugs(9, 3, HERE << "ftpRequestBody: size=" << mb.contentSize() << " ftpState=%p" << data); - if (size > 0) { + if (mb.contentSize() > 0) { /* DataWrite */ - comm_write(ftpState->data.fd, buf, size, FtpStateData::ftpDataWriteCallback, ftpState); - } else if (size < 0) { + comm_write(ftpState->data.fd, mb.content(), mb.contentSize(), FtpStateData::ftpDataWriteCallback, ftpState); + } else if (mb.contentSize() < 0) { /* Error */ debug(9, 1) ("ftpRequestBody: request aborted"); ftpState->failed(ERR_READ_ERROR, 0); - } else if (size == 0) { + } else if (mb.contentSize() == 0) { /* End of transfer */ ftpState->dataComplete(); } @@ -2815,11 +2815,7 @@ FtpStateData::ftpDataWrite(int ftp, void *data) FtpStateData *ftpState = (FtpStateData *) data; debug(9, 3) ("ftpDataWrite\n"); /* This starts the body transfer */ - clientReadBody(ftpState->request, - ftpState->data.readBuf->content(), - ftpState->data.readBuf->contentSize(), - ftpRequestBody, - ftpState); + ftpState->request->body_reader->read(ftpRequestBody, ftpState); } static void diff --git a/src/http.cc b/src/http.cc index 27f6647a31..7322f3a164 100644 --- a/src/http.cc +++ b/src/http.cc @@ -1,6 +1,6 @@ /* - * $Id: http.cc,v 1.492 2006/04/22 09:02:44 robertc Exp $ + * $Id: http.cc,v 1.493 2006/04/27 19:27:37 wessels Exp $ * * DEBUG: section 11 Hypertext Transfer Protocol (HTTP) * AUTHOR: Harvest Derived @@ -136,18 +136,12 @@ HttpStateData::HttpStateData(FwdState *theFwdState) : ServerStateData(theFwdStat HttpStateData::~HttpStateData() { /* - * don't forget about ~ServerStateData() + * don't forget that ~ServerStateData() gets called automatically */ - if (request_body_buf) { - if (orig_request->body_connection != NULL) { - clientAbortBody(orig_request); - } - - if (request_body_buf) { - memFree(request_body_buf, MEM_8K_BUF); - request_body_buf = NULL; - } + if (orig_request->body_reader != NULL) { + orig_request->body_reader = NULL; + debugs(32,3,HERE << "setting body_reader = NULL for request " << orig_request); } if (!readBuf->isNull()) @@ -1760,7 +1754,9 @@ HttpStateData::sendRequest() flags.do_next_read = 1; maybeReadData(); - if (orig_request->body_connection != NULL) + debugs(32,3,HERE<< "request " << request << " body_reader = " << orig_request->body_reader.getRaw()); + + if (orig_request->body_reader != NULL) sendHeaderDone = HttpStateData::SendRequestEntityWrapper; else sendHeaderDone = HttpStateData::SendComplete; @@ -1850,25 +1846,28 @@ HttpStateData::sendRequestEntityDone() } } +/* + * RequestBodyHandlerWrapper + * + * BodyReader calls this when it has some body data for us. + * It is of type CBCB. + */ void -HttpStateData::RequestBodyHandlerWrapper(char *buf, ssize_t size, void *data) +HttpStateData::RequestBodyHandlerWrapper(MemBuf &mb, void *data) { HttpStateData *httpState = static_cast(data); - httpState->requestBodyHandler(buf, size); + httpState->requestBodyHandler(mb); } void -HttpStateData::requestBodyHandler(char *buf, ssize_t size) +HttpStateData::requestBodyHandler(MemBuf &mb) { - request_body_buf = NULL; - if (eof || fd < 0) { debugs(11, 1, HERE << "Transaction aborted while reading HTTP body"); - memFree8K(buf); return; } - if (size > 0) { + if (mb.contentSize() > 0) { if (flags.headers_parsed && !flags.abuse_detected) { flags.abuse_detected = 1; debug(11, 1) ("httpSendRequestEntryDone: Likely proxy abuse detected '%s' -> '%s'\n", @@ -1876,21 +1875,27 @@ HttpStateData::requestBodyHandler(char *buf, ssize_t size) storeUrl(entry)); if (getReply()->sline.status == HTTP_INVALID_HEADER) { - memFree8K(buf); comm_close(fd); return; } } - comm_old_write(fd, buf, size, SendRequestEntityWrapper, this, memFree8K); - } else if (size == 0) { + /* + * mb's content will be consumed in the SendRequestEntityWrapper + * callback after comm_write is done. + */ + flags.consume_body_data = 1; + + comm_old_write(fd, mb.content(), mb.contentSize(), SendRequestEntityWrapper, this, NULL); + } else if (orig_request->body_reader == NULL) { + /* Failed to get whole body, probably aborted */ + SendComplete(fd, NULL, 0, COMM_ERR_CLOSING, this); + } else if (orig_request->body_reader->remaining() == 0) { /* End of body */ - memFree8K(buf); sendRequestEntityDone(); } else { /* Failed to get whole body, probably aborted */ - memFree8K(buf); - HttpStateData::SendComplete(fd, NULL, 0, COMM_ERR_CLOSING, this); + SendComplete(fd, NULL, 0, COMM_ERR_CLOSING, this); } } @@ -1906,11 +1911,19 @@ HttpStateData::sendRequestEntity(int fd, size_t size, comm_err_t errflag) { debug(11, 5) ("httpSendRequestEntity: FD %d: size %d: errflag %d.\n", fd, (int) size, errflag); + debugs(32,3,HERE << "httpSendRequestEntity called"); + assert(orig_request->body_reader != NULL); if (size > 0) { fd_bytes(fd, size, FD_WRITE); kb_incr(&statCounter.server.all.kbytes_out, size); kb_incr(&statCounter.server.http.kbytes_out, size); + + if (flags.consume_body_data) { + orig_request->body_reader->consume(size); + orig_request->body_reader->bytes_read += size; + debugs(32,3," HTTP server body bytes_read=" << orig_request->body_reader->bytes_read); + } } if (errflag == COMM_ERR_CLOSING) @@ -1930,8 +1943,16 @@ HttpStateData::sendRequestEntity(int fd, size_t size, comm_err_t errflag) return; } - request_body_buf = (char *)memAllocate(MEM_8K_BUF); - clientReadBody(orig_request, request_body_buf, 8192, RequestBodyHandlerWrapper, this); + size_t r = orig_request->body_reader->remaining(); + debugs(32,3,HERE << "body remaining = " << r); + + if (r) { + debugs(32,3,HERE << "reading more body data"); + orig_request->body_reader->read(RequestBodyHandlerWrapper, this); + } else { + debugs(32,3,HERE << "done reading body data"); + sendRequestEntityDone(); + } } void diff --git a/src/http.h b/src/http.h index 437a62d2a1..7953eaa2be 100644 --- a/src/http.h +++ b/src/http.h @@ -1,6 +1,6 @@ /* - * $Id: http.h,v 1.23 2006/03/07 18:47:48 wessels Exp $ + * $Id: http.h,v 1.24 2006/04/27 19:27:37 wessels Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -38,6 +38,7 @@ #include "comm.h" #include "forward.h" #include "Server.h" +#include "BodyReader.h" #if ICAP_CLIENT #include "ICAP/ICAPServiceRep.h" @@ -84,7 +85,6 @@ public: HttpRequest *orig_request; int fd; http_state_flags flags; - char *request_body_buf; off_t currentOffset; size_t read_sz; int body_bytes_read; /* to find end of response, independent of StoreEntry */ @@ -125,7 +125,7 @@ private: void transactionComplete(); void writeReplyBody(const char *data, int len); void sendRequestEntityDone(); - void requestBodyHandler(char *buf, ssize_t size); + void requestBodyHandler(MemBuf &); void sendRequestEntity(int fd, size_t size, comm_err_t errflag); mb_size_t buildRequestPrefix(HttpRequest * request, HttpRequest * orig_request, diff --git a/src/protos.h b/src/protos.h index 0669db39dc..ab61ef8ee4 100644 --- a/src/protos.h +++ b/src/protos.h @@ -1,6 +1,6 @@ /* - * $Id: protos.h,v 1.522 2006/04/23 11:10:32 robertc Exp $ + * $Id: protos.h,v 1.523 2006/04/27 19:27:37 wessels Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -105,8 +105,6 @@ SQUIDCEXTERN void clientdbFreeMemory(void); SQUIDCEXTERN int clientdbEstablished(struct IN_ADDR, int); SQUIDCEXTERN void clientOpenListenSockets(void); SQUIDCEXTERN void clientHttpConnectionsClose(void); -SQUIDCEXTERN void clientReadBody(HttpRequest * req, char *buf, size_t size, CBCB * callback, void *data); -SQUIDCEXTERN int clientAbortBody(HttpRequest * req); SQUIDCEXTERN void httpRequestFree(void *); extern void clientAccessCheck(void *); diff --git a/src/structs.h b/src/structs.h index d11d4ef1f9..7a398eee7f 100644 --- a/src/structs.h +++ b/src/structs.h @@ -1,6 +1,6 @@ /* - * $Id: structs.h,v 1.539 2006/04/23 11:10:32 robertc Exp $ + * $Id: structs.h,v 1.540 2006/04/27 19:27:37 wessels Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -931,6 +931,9 @@ unsigned int request_sent: unsigned int do_next_read: 1; + +unsigned int consume_body_data: + 1; }; struct _ipcache_addrs diff --git a/src/typedefs.h b/src/typedefs.h index 405e6bd870..886345e70c 100644 --- a/src/typedefs.h +++ b/src/typedefs.h @@ -1,6 +1,6 @@ /* - * $Id: typedefs.h,v 1.181 2006/04/23 11:10:32 robertc Exp $ + * $Id: typedefs.h,v 1.182 2006/04/27 19:27:37 wessels Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -275,7 +275,6 @@ class wordlist; typedef void UH(void *data, wordlist *); typedef int READ_HANDLER(int, char *, int); typedef int WRITE_HANDLER(int, const char *, int); -typedef void CBCB(char *buf, ssize_t size, void *data); typedef void STIOCB(void *their_data, int errflag, storeIOState *); typedef void STFNCB(void *their_data, int errflag, storeIOState *);