From: rousskov <> Date: Wed, 13 Feb 2008 06:55:26 +0000 (+0000) Subject: Merging async-call branch changes to HEAD: X-Git-Tag: BASIC_TPROXY4~96 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=dc56a9b1420c7f77b90b5abf111d6cb98cd0d7a1;p=thirdparty%2Fsquid.git Merging async-call branch changes to HEAD: Async-call work replaces event-based asynchronous calls with stand-alone implementation. The common async call API allows Squid core do call, debug, and troubleshoot all callback handlers in a uniform way. An async "job" API is introduced to manage independent logical threads or work such as protocol transaction handlers on client, server, and ICAP sides. These jobs should communicate with each other using async calls to minimize dependencies and avoid reentrant callback loops. These changes will eventually improve overall code quality, debugging quality, and Squid robustness. Below you will find log messages from the async-call branch that are relevant to the file(s) being committed. Convert the comm_* calls to use CommCalls. Use the AsyncJob::deleteThis method as "delete this" replacement instead of the previously commited block "if (inCall) musStop(...) else delete this" ICAPInitiate::sendAnswer dialers take care of message locking now. --- diff --git a/src/Server.cc b/src/Server.cc index 8179112bc3..a65238553b 100644 --- a/src/Server.cc +++ b/src/Server.cc @@ -1,5 +1,5 @@ /* - * $Id: Server.cc,v 1.24 2008/02/08 18:30:18 rousskov Exp $ + * $Id: Server.cc,v 1.25 2008/02/12 23:55:26 rousskov Exp $ * * DEBUG: * AUTHOR: Duane Wessels @@ -45,7 +45,7 @@ extern ICAPConfig TheICAPConfig; #endif -ServerStateData::ServerStateData(FwdState *theFwdState): requestSender(NULL) +ServerStateData::ServerStateData(FwdState *theFwdState): AsyncJob("ServerStateData"),requestSender(NULL) #if ICAP_CLIENT , icapAccessCheckPending(false) #endif @@ -178,7 +178,8 @@ void ServerStateData::quitIfAllDone() { } debugs(11,3, HERE << "transaction done"); - delete this; + + deleteThis("ServerStateData::quitIfAllDone"); } // FTP side overloads this to work around multiple calls to fwd->complete @@ -221,10 +222,10 @@ ServerStateData::abortOnBadEntry(const char *abortReason) // more request or adapted response body is available void -ServerStateData::noteMoreBodyDataAvailable(BodyPipe &bp) +ServerStateData::noteMoreBodyDataAvailable(BodyPipe::Pointer bp) { #if ICAP_CLIENT - if (adaptedBodySource == &bp) { + if (adaptedBodySource == bp) { handleMoreAdaptedBodyAvailable(); return; } @@ -234,10 +235,10 @@ ServerStateData::noteMoreBodyDataAvailable(BodyPipe &bp) // the entire request or adapted response body was provided, successfully void -ServerStateData::noteBodyProductionEnded(BodyPipe &bp) +ServerStateData::noteBodyProductionEnded(BodyPipe::Pointer bp) { #if ICAP_CLIENT - if (adaptedBodySource == &bp) { + if (adaptedBodySource == bp) { handleAdaptedBodyProductionEnded(); return; } @@ -247,10 +248,10 @@ ServerStateData::noteBodyProductionEnded(BodyPipe &bp) // premature end of the request or adapted response body production void -ServerStateData::noteBodyProducerAborted(BodyPipe &bp) +ServerStateData::noteBodyProducerAborted(BodyPipe::Pointer bp) { #if ICAP_CLIENT - if (adaptedBodySource == &bp) { + if (adaptedBodySource == bp) { handleAdaptedBodyProducerAborted(); return; } @@ -302,29 +303,22 @@ ServerStateData::handleRequestBodyProducerAborted() // kids extend this } -void -ServerStateData::sentRequestBodyWrapper(int fd, char *bufnotused, size_t size, comm_err_t errflag, int xerrno, void *data) -{ - ServerStateData *server = static_cast(data); - server->sentRequestBody(fd, size, errflag); -} - // called when we wrote request headers(!) or a part of the body void -ServerStateData::sentRequestBody(int fd, size_t size, comm_err_t errflag) +ServerStateData::sentRequestBody(const CommIoCbParams &io) { - debugs(11, 5, "sentRequestBody: FD " << fd << ": size " << size << ": errflag " << errflag << "."); + debugs(11, 5, "sentRequestBody: FD " << io.fd << ": size " << io.size << ": errflag " << io.flag << "."); debugs(32,3,HERE << "sentRequestBody called"); requestSender = NULL; - if (size > 0) { - fd_bytes(fd, size, FD_WRITE); - kb_incr(&statCounter.server.all.kbytes_out, size); + if (io.size > 0) { + fd_bytes(io.fd, io.size, FD_WRITE); + kb_incr(&statCounter.server.all.kbytes_out, io.size); // kids should increment their counters } - if (errflag == COMM_ERR_CLOSING) + if (io.flag == COMM_ERR_CLOSING) return; if (!requestBodySource) { @@ -332,8 +326,8 @@ ServerStateData::sentRequestBody(int fd, size_t size, comm_err_t errflag) return; // do nothing; } - if (errflag) { - debugs(11, 1, "sentRequestBody error: FD " << fd << ": " << xstrerr(errno)); + if (io.flag) { + debugs(11, 1, "sentRequestBody error: FD " << io.fd << ": " << xstrerr(errno)); ErrorState *err; err = errorCon(ERR_WRITE_ERROR, HTTP_BAD_GATEWAY, fwd->request); err->xerrno = errno; @@ -361,8 +355,10 @@ ServerStateData::sendMoreRequestBody() MemBuf buf; if (requestBodySource->getMoreData(buf)) { debugs(9,3, HERE << "will write " << buf.contentSize() << " request body bytes"); - requestSender = &ServerStateData::sentRequestBodyWrapper; - comm_write_mbuf(dataDescriptor(), &buf, requestSender, this); + typedef CommCbMemFunT Dialer; + requestSender = asyncCall(93,3, "ServerStateData::sentRequestBody", + Dialer(this, &ServerStateData::sentRequestBody)); + comm_write_mbuf(dataDescriptor(), &buf, requestSender); } else { debugs(9,3, HERE << "will wait for more request body bytes or eof"); requestSender = NULL; @@ -418,7 +414,7 @@ ServerStateData::startIcap(ICAPServiceRep::Pointer service, HttpRequest *cause) adaptedHeadSource = initiateIcap( new ICAPModXactLauncher(this, vrep, cause, service)); - return true; + return adaptedHeadSource != NULL; } // properly cleans up ICAP-related state @@ -488,7 +484,7 @@ ServerStateData::adaptVirginReplyBody(const char *data, ssize_t len) // can supply more virgin response body data void -ServerStateData::noteMoreBodySpaceAvailable(BodyPipe &) +ServerStateData::noteMoreBodySpaceAvailable(BodyPipe::Pointer) { if (responseBodyBuffer) { addVirginReplyBody(NULL, 0); // kick the buffered fragment alive again @@ -502,7 +498,7 @@ ServerStateData::noteMoreBodySpaceAvailable(BodyPipe &) // the consumer of our virgin response body aborted void -ServerStateData::noteBodyConsumerAborted(BodyPipe &bp) +ServerStateData::noteBodyConsumerAborted(BodyPipe::Pointer) { stopProducingFor(virginBodyDestination, false); @@ -537,7 +533,6 @@ ServerStateData::noteIcapAnswer(HttpMsg *msg) if (doneWithIcap()) // we may still be sending virgin response handleIcapCompleted(); } - } // will not receive adapted response headers (and, hence, body) @@ -603,7 +598,6 @@ ServerStateData::handleIcapCompleted() } completeForwarding(); - quitIfAllDone(); } diff --git a/src/Server.h b/src/Server.h index a360ed679a..6369d63a8f 100644 --- a/src/Server.h +++ b/src/Server.h @@ -1,6 +1,6 @@ /* - * $Id: Server.h,v 1.12 2008/02/08 18:30:18 rousskov Exp $ + * $Id: Server.h,v 1.13 2008/02/12 23:55:26 rousskov Exp $ * * AUTHOR: Duane Wessels * @@ -49,6 +49,8 @@ #include "StoreIOBuffer.h" #include "forward.h" #include "BodyPipe.h" +#include "ICAP/AsyncJob.h" +#include "CommCalls.h" #if ICAP_CLIENT #include "ICAP/ICAPServiceRep.h" @@ -75,9 +77,9 @@ public: // BodyConsumer: consume request body or adapted response body. // The implementation just calls the corresponding HTTP or ICAP handle*() // method, depending on the pipe. - virtual void noteMoreBodyDataAvailable(BodyPipe &); - virtual void noteBodyProductionEnded(BodyPipe &); - virtual void noteBodyProducerAborted(BodyPipe &); + virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer); + virtual void noteBodyProductionEnded(BodyPipe::Pointer); + virtual void noteBodyProducerAborted(BodyPipe::Pointer); // read response data from the network virtual void maybeReadVirginBody() = 0; @@ -97,11 +99,19 @@ public: virtual void noteIcapQueryAbort(bool final); // BodyProducer: provide virgin response body to ICAP. - virtual void noteMoreBodySpaceAvailable(BodyPipe &); - virtual void noteBodyConsumerAborted(BodyPipe &); + virtual void noteMoreBodySpaceAvailable(BodyPipe::Pointer ); + virtual void noteBodyConsumerAborted(BodyPipe::Pointer ); #endif virtual void processReplyBody() = 0; +//AsyncJob virtual methods + virtual bool doneAll() const { return +#if ICAP_CLIENT + ICAPInitiator::doneAll() && + BodyProducer::doneAll() && +#endif + BodyConsumer::doneAll() && false;} + public: // should be protected void serverComplete(); // call when no server communication is expected @@ -123,9 +133,8 @@ protected: // sending of the request body to the server void sendMoreRequestBody(); // has body; kids overwrite to increment I/O stats counters - virtual void sentRequestBody(int fd, size_t size, comm_err_t errflag) = 0; + virtual void sentRequestBody(const CommIoCbParams &io) = 0; virtual void doneSendingRequestBody() = 0; - static IOCB sentRequestBodyWrapper; virtual void closeServer() = 0; // end communication with the server virtual bool doneWithServer() const = 0; // did we end communication? @@ -173,7 +182,7 @@ public: // should not be protected: BodyPipe::Pointer requestBodySource; // to consume request body - IOCB *requestSender; // set if we are expecting comm_write to call us back + AsyncCall::Pointer requestSender; // set if we are expecting comm_write to call us back #if ICAP_CLIENT BodyPipe::Pointer virginBodyDestination; // to provide virgin response body diff --git a/src/ftp.cc b/src/ftp.cc index 8c5dde6e69..0a194b6183 100644 --- a/src/ftp.cc +++ b/src/ftp.cc @@ -1,5 +1,5 @@ /* - * $Id: ftp.cc,v 1.444 2008/01/19 07:15:29 amosjeffries Exp $ + * $Id: ftp.cc,v 1.445 2008/02/12 23:55:26 rousskov Exp $ * * DEBUG: section 9 File Transfer Protocol (FTP) * AUTHOR: Harvest Derived @@ -116,6 +116,8 @@ class FtpStateData : public ServerStateData public: void *operator new (size_t); void operator delete (void *); + void *toCbdata() { return this; } + FtpStateData(FwdState *); ~FtpStateData(); char user[MAX_URL]; @@ -172,6 +174,7 @@ public: struct _ftp_flags flags; private: + AsyncCall::Pointer closeHandler; CBDATA_CLASS(FtpStateData); public: @@ -192,7 +195,7 @@ public: char *htmlifyListEntry(const char *line); void parseListing(); void dataComplete(); - void dataRead(int fd, char *buf, size_t len, comm_err_t errflag, int xerrno); + void dataRead(const CommIoCbParams &io); int checkAuth(const HttpHeader * req_hdr); void checkUrlpath(); void buildTitleUrl(); @@ -207,18 +210,19 @@ public: void processReplyBody(); void writeCommand(const char *buf); - static PF ftpSocketClosed; static CNCB ftpPasvCallback; - static IOCB dataReadWrapper; static PF ftpDataWrite; - static PF ftpTimeout; - static IOCB ftpReadControlReply; - static IOCB ftpWriteCommandCallback; + void ftpTimeout(const CommTimeoutCbParams &io); + void ftpSocketClosed(const CommCloseCbParams &io); + void ftpReadControlReply(const CommIoCbParams &io); + void ftpWriteCommandCallback(const CommIoCbParams &io); + void ftpAcceptDataConnection(const CommAcceptCbParams &io); + static HttpReply *ftpAuthRequired(HttpRequest * request, const char *realm); static wordlist *ftpParseControlReply(char *, size_t, int *, size_t *); // sending of the request body to the server - virtual void sentRequestBody(int fd, size_t size, comm_err_t errflag); + virtual void sentRequestBody(const CommIoCbParams&); virtual void doneSendingRequestBody(); virtual void haveParsedReplyHeaders(); @@ -379,15 +383,14 @@ FTPSM *FTP_SM_FUNCS[] = ftpReadMkdir /* SENT_MKDIR */ }; -void -FtpStateData::ftpSocketClosed(int fdnotused, void *data) +void +FtpStateData::ftpSocketClosed(const CommCloseCbParams &io) { - FtpStateData *ftpState = (FtpStateData *)data; - ftpState->ctrl.fd = -1; - delete ftpState; + ctrl.fd = -1; + deleteThis("FtpStateData::ftpSocketClosed"); } -FtpStateData::FtpStateData(FwdState *theFwdState) : ServerStateData(theFwdState) +FtpStateData::FtpStateData(FwdState *theFwdState) : AsyncJob("FtpStateData"), ServerStateData(theFwdState) { const char *url = entry->url(); debugs(9, 3, HERE << "'" << url << "'" ); @@ -403,7 +406,10 @@ FtpStateData::FtpStateData(FwdState *theFwdState) : ServerStateData(theFwdState) flags.rest_supported = 1; - comm_add_close_handler(ctrl.fd, ftpSocketClosed, this); + typedef CommCbMemFunT Dialer; + closeHandler = asyncCall(9, 5, "FtpStateData::ftpSocketClosed", + Dialer(this,&FtpStateData::ftpSocketClosed)); + comm_add_close_handler(ctrl.fd, closeHandler); if (request->method == METHOD_PUT) flags.put = 1; @@ -497,20 +503,18 @@ FtpStateData::loginParser(const char *login, int escaped) } void -FtpStateData::ftpTimeout(int fd, void *data) +FtpStateData::ftpTimeout(const CommTimeoutCbParams &io) { - FtpStateData *ftpState = (FtpStateData *)data; - StoreEntry *entry = ftpState->entry; - debugs(9, 4, HERE << "FD " << fd << ": '" << entry->url() << "'" ); + debugs(9, 4, "ftpTimeout: FD " << io.fd << ": '" << entry->url() << "'" ); - if (SENT_PASV == ftpState->state && fd == ftpState->data.fd) { + if (SENT_PASV == state && io.fd == data.fd) { /* stupid ftp.netscape.com */ - ftpState->fwd->dontRetry(false); - ftpState->fwd->ftpPasvFailed(true); - debugs(9, DBG_IMPORTANT, "Timeout in SENT_PASV state" ); + fwd->dontRetry(false); + fwd->ftpPasvFailed(true); + debugs(9, DBG_IMPORTANT, "ftpTimeout: timeout in SENT_PASV state" ); } - ftpState->failed(ERR_READ_TIMEOUT, 0); + failed(ERR_READ_TIMEOUT, 0); /* failed() closes ctrl.fd and frees ftpState */ } @@ -1212,14 +1216,6 @@ FtpStateData::dataComplete() scheduleReadControlReply(0); } -void -FtpStateData::dataReadWrapper(int fd, char *buf, size_t len, comm_err_t errflag, int xerrno, void *data) -{ - FtpStateData *ftpState = (FtpStateData *)data; - ftpState->data.read_pending = false; - ftpState->dataRead(fd, buf, len, errflag, xerrno); -} - void FtpStateData::maybeReadVirginBody() { @@ -1238,30 +1234,38 @@ FtpStateData::maybeReadVirginBody() data.read_pending = true; - commSetTimeout(data.fd, Config.Timeout.read, ftpTimeout, this); + typedef CommCbMemFunT TimeoutDialer; + AsyncCall::Pointer timeoutCall = asyncCall(9, 5, "FtpStateData::ftpTimeout", + TimeoutDialer(this,&FtpStateData::ftpTimeout)); + commSetTimeout(data.fd, Config.Timeout.read, timeoutCall); debugs(9,5,HERE << "queueing read on FD " << data.fd); - entry->delayAwareRead(data.fd, data.readBuf->space(), read_sz, dataReadWrapper, this); + typedef CommCbMemFunT Dialer; + entry->delayAwareRead(data.fd, data.readBuf->space(), read_sz, + asyncCall(9, 5, "FtpStateData::dataRead", + Dialer(this, &FtpStateData::dataRead))); } void -FtpStateData::dataRead(int fd, char *buf, size_t len, comm_err_t errflag, int xerrno) +FtpStateData::dataRead(const CommIoCbParams &io) { int j; int bin; - debugs(9, 3, HERE << "FD " << fd << " Read " << len << " bytes"); + data.read_pending = false; + + debugs(9, 3, HERE << "ftpDataRead: FD " << io.fd << " Read " << io.size << " bytes"); - if (len > 0) { - kb_incr(&statCounter.server.all.kbytes_in, len); - kb_incr(&statCounter.server.ftp.kbytes_in, len); + if (io.size > 0) { + kb_incr(&statCounter.server.all.kbytes_in, io.size); + kb_incr(&statCounter.server.ftp.kbytes_in, io.size); } - if (errflag == COMM_ERR_CLOSING) + if (io.flag == COMM_ERR_CLOSING) return; - assert(fd == data.fd); + assert(io.fd == data.fd); #if DELAY_POOLS @@ -1274,36 +1278,41 @@ FtpStateData::dataRead(int fd, char *buf, size_t len, comm_err_t errflag, int xe return; } - if (errflag == COMM_OK && len > 0) { + if (io.flag == COMM_OK && io.size > 0) { #if DELAY_POOLS - delayId.bytesIn(len); + delayId.bytesIn(io.size); #endif } - if (errflag == COMM_OK && len > 0) { - debugs(9,5,HERE << "appended " << len << " bytes to readBuf"); - data.readBuf->appended(len); + if (io.flag == COMM_OK && io.size > 0) { + debugs(9,5,HERE << "appended " << io.size << " bytes to readBuf"); + data.readBuf->appended(io.size); #if DELAY_POOLS DelayId delayId = entry->mem_obj->mostBytesAllowed(); - delayId.bytesIn(len); + delayId.bytesIn(io.size); #endif IOStats.Ftp.reads++; - for (j = len - 1, bin = 0; j; bin++) + for (j = io.size - 1, bin = 0; j; bin++) j >>= 1; IOStats.Ftp.read_hist[bin]++; } - if (errflag != COMM_OK || len < 0) { - debugs(50, ignoreErrno(xerrno) ? 3 : DBG_IMPORTANT, HERE << "read error: " << xstrerr(xerrno)); + if (io.flag != COMM_OK || io.size < 0) { + debugs(50, ignoreErrno(io.xerrno) ? 3 : DBG_IMPORTANT, + "ftpDataRead: read error: " << xstrerr(io.xerrno)); + + if (ignoreErrno(io.xerrno)) { + typedef CommCbMemFunT TimeoutDialer; + AsyncCall::Pointer timeoutCall = asyncCall(9, 5, "FtpStateData::ftpTimeout", + TimeoutDialer(this,&FtpStateData::ftpTimeout)); + commSetTimeout(io.fd, Config.Timeout.read, timeoutCall); - if (ignoreErrno(xerrno)) { - commSetTimeout(fd, Config.Timeout.read, ftpTimeout, this); maybeReadVirginBody(); } else { if (!flags.http_header_sent && !fwd->ftpPasvFailed() && flags.pasv_supported) { @@ -1315,8 +1324,8 @@ FtpStateData::dataRead(int fd, char *buf, size_t len, comm_err_t errflag, int xe /* failed closes ctrl.fd and frees ftpState */ return; } - } else if (len == 0) { - debugs(9,3, HERE << "Calling dataComplete() because len == 0"); + } else if (io.size == 0) { + debugs(9,3, HERE << "Calling dataComplete() because io.size == 0"); /* * DPW 2007-04-23 * Dangerous curves ahead. This call to dataComplete was @@ -1336,7 +1345,7 @@ FtpStateData::dataRead(int fd, char *buf, size_t len, comm_err_t errflag, int xe void FtpStateData::processReplyBody() { - debugs(9, 3, HERE); + debugs(9, 3, HERE << "FtpStateData::processReplyBody starting."); if (request->method == METHOD_HEAD && (flags.isdir || theSize != -1)) { serverComplete(); @@ -1590,34 +1599,35 @@ FtpStateData::writeCommand(const char *buf) ctrl.last_command = ebuf; + typedef CommCbMemFunT Dialer; + AsyncCall::Pointer call = asyncCall(9, 5, "FtpStateData::ftpWriteCommandCallback", + Dialer(this, &FtpStateData::ftpWriteCommandCallback)); comm_write(ctrl.fd, ctrl.last_command, strlen(ctrl.last_command), - FtpStateData::ftpWriteCommandCallback, - this, NULL); + call); scheduleReadControlReply(0); } void -FtpStateData::ftpWriteCommandCallback(int fd, char *buf, size_t size, comm_err_t errflag, int xerrno, void *data) +FtpStateData::ftpWriteCommandCallback(const CommIoCbParams &io) { - FtpStateData *ftpState = (FtpStateData *)data; - debugs(9, 5, HERE << "wrote " << size << " bytes"); + debugs(9, 5, "ftpWriteCommandCallback: wrote " << io.size << " bytes"); - if (size > 0) { - fd_bytes(fd, size, FD_WRITE); - kb_incr(&statCounter.server.all.kbytes_out, size); - kb_incr(&statCounter.server.ftp.kbytes_out, size); + if (io.size > 0) { + fd_bytes(io.fd, io.size, FD_WRITE); + kb_incr(&statCounter.server.all.kbytes_out, io.size); + kb_incr(&statCounter.server.ftp.kbytes_out, io.size); } - if (errflag == COMM_ERR_CLOSING) + if (io.flag == COMM_ERR_CLOSING) return; - if (errflag) { - debugs(9, DBG_IMPORTANT, HERE << "FD " << fd << ": " << xstrerr(xerrno)); - ftpState->failed(ERR_WRITE_ERROR, xerrno); + if (io.flag) { + debugs(9, DBG_IMPORTANT, "ftpWriteCommandCallback: FD " << io.fd << ": " << xstrerr(io.xerrno)); + failed(ERR_WRITE_ERROR, io.xerrno); /* failed closes ctrl.fd and frees ftpState */ return; } @@ -1727,54 +1737,59 @@ FtpStateData::scheduleReadControlReply(int buffered_ok) handleControlReply(); } else { /* XXX What about Config.Timeout.read? */ - comm_read(ctrl.fd, ctrl.buf + ctrl.offset, ctrl.size - ctrl.offset, ftpReadControlReply, this); + typedef CommCbMemFunT Dialer; + AsyncCall::Pointer reader=asyncCall(9, 5, "FtpStateData::ftpReadControlReply", + Dialer(this, &FtpStateData::ftpReadControlReply)); + comm_read(ctrl.fd, ctrl.buf + ctrl.offset, ctrl.size - ctrl.offset, reader); /* * Cancel the timeout on the Data socket (if any) and * establish one on the control socket. */ - if (data.fd > -1) - commSetTimeout(data.fd, -1, NULL, NULL); + if (data.fd > -1){ + AsyncCall::Pointer nullCall = NULL; + commSetTimeout(data.fd, -1, nullCall); + } + + typedef CommCbMemFunT TimeoutDialer; + AsyncCall::Pointer timeoutCall = asyncCall(9, 5, "FtpStateData::ftpTimeout", + TimeoutDialer(this,&FtpStateData::ftpTimeout)); - commSetTimeout(ctrl.fd, Config.Timeout.read, ftpTimeout, - this); + commSetTimeout(ctrl.fd, Config.Timeout.read, timeoutCall); } } -void -FtpStateData::ftpReadControlReply(int fd, char *buf, size_t len, comm_err_t errflag, int xerrno, void *data) +void FtpStateData::ftpReadControlReply(const CommIoCbParams &io) { - FtpStateData *ftpState = (FtpStateData *)data; - StoreEntry *entry = ftpState->entry; - debugs(9, 3, HERE "FD " << fd << ", Read " << len << " bytes"); + debugs(9, 3, "ftpReadControlReply: FD " << io.fd << ", Read " << io.size << " bytes"); - if (len > 0) { - kb_incr(&statCounter.server.all.kbytes_in, len); - kb_incr(&statCounter.server.ftp.kbytes_in, len); + if (io.size > 0) { + kb_incr(&statCounter.server.all.kbytes_in, io.size); + kb_incr(&statCounter.server.ftp.kbytes_in, io.size); } - if (errflag == COMM_ERR_CLOSING) + if (io.flag == COMM_ERR_CLOSING) return; if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { - ftpState->abortTransaction("entry aborted during control reply read"); + abortTransaction("entry aborted during control reply read"); return; } - assert(ftpState->ctrl.offset < ftpState->ctrl.size); + assert(ctrl.offset < ctrl.size); - if (errflag == COMM_OK && len > 0) { - fd_bytes(fd, len, FD_READ); + if (io.flag == COMM_OK && io.size > 0) { + fd_bytes(io.fd, io.size, FD_READ); } + if (io.flag != COMM_OK || io.size < 0) { + debugs(50, ignoreErrno(io.xerrno) ? 3 : DBG_IMPORTANT, + "ftpReadControlReply: read error: " << xstrerr(io.xerrno)); - if (errflag != COMM_OK || len < 0) { - debugs(50, ignoreErrno(xerrno) ? 3 : DBG_IMPORTANT, "ftpReadControlReply: read error: " << xstrerr(xerrno)); - - if (ignoreErrno(xerrno)) { - ftpState->scheduleReadControlReply(0); + if (ignoreErrno(io.xerrno)) { + scheduleReadControlReply(0); } else { - ftpState->failed(ERR_READ_ERROR, xerrno); + failed(ERR_READ_ERROR, io.xerrno); /* failed closes ctrl.fd and frees ftpState */ return; } @@ -1782,22 +1797,22 @@ FtpStateData::ftpReadControlReply(int fd, char *buf, size_t len, comm_err_t errf return; } - if (len == 0) { + if (io.size == 0) { if (entry->store_status == STORE_PENDING) { - ftpState->failed(ERR_FTP_FAILURE, 0); + failed(ERR_FTP_FAILURE, 0); /* failed closes ctrl.fd and frees ftpState */ return; } /* XXX this may end up having to be serverComplete() .. */ - ftpState->abortTransaction("zero control reply read"); + abortTransaction("zero control reply read"); return; } - len += ftpState->ctrl.offset; - ftpState->ctrl.offset = len; - assert(len <= ftpState->ctrl.size); - ftpState->handleControlReply(); + unsigned int len =io.size + ctrl.offset; + ctrl.offset = len; + assert(len <= ctrl.size); + handleControlReply(); } void @@ -2512,7 +2527,11 @@ ftpSendPassive(FtpStateData * ftpState) * ugly hack for ftp servers like ftp.netscape.com that sometimes * dont acknowledge PASV commands. */ - commSetTimeout(ftpState->data.fd, 15, FtpStateData::ftpTimeout, ftpState); + typedef CommCbMemFunT TimeoutDialer; + AsyncCall::Pointer timeoutCall = asyncCall(9, 5, "FtpStateData::ftpTimeout", + TimeoutDialer(ftpState,&FtpStateData::ftpTimeout)); + + commSetTimeout(ftpState->data.fd, 15, timeoutCall); } void @@ -2857,26 +2876,18 @@ ftpReadEPRT(FtpStateData * ftpState) \par * "read" handler to accept FTP data connections. * - \param fd Handle/FD for the listening connection which has received a connect request. - \param details Some state data for the listening connection - \param newfd Handle/FD to the connection which has just been opened. - \param flag Error details for the listening connection. - \param xerrno ?? - \param data ?? + \param io comm accept(2) callback parameters */ -static void -ftpAcceptDataConnection(int fd, int newfd, ConnectionDetail *details, - comm_err_t flag, int xerrno, void *data) +void FtpStateData::ftpAcceptDataConnection(const CommAcceptCbParams &io) { char ntoapeer[MAX_IPSTRLEN]; - FtpStateData *ftpState = (FtpStateData *)data; - debugs(9, 3, HERE); + debugs(9, 3, "ftpAcceptDataConnection"); - if (flag == COMM_ERR_CLOSING) + if (io.flag == COMM_ERR_CLOSING) return; - if (EBIT_TEST(ftpState->entry->flags, ENTRY_ABORTED)) { - ftpState->abortTransaction("entry aborted when accepting data conn"); + if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { + abortTransaction("entry aborted when accepting data conn"); return; } @@ -2886,52 +2897,58 @@ ftpAcceptDataConnection(int fd, int newfd, ConnectionDetail *details, * This prevents third-party hacks, but also third-party load balancing handshakes. */ if (Config.Ftp.sanitycheck) { - details->peer.NtoA(ntoapeer,MAX_IPSTRLEN); - - if (strcmp(fd_table[ftpState->ctrl.fd].ipaddr, ntoapeer) != 0) { - debugs(9, DBG_IMPORTANT, "FTP data connection from unexpected server (" << - details->peer << "), expecting " << fd_table[ftpState->ctrl.fd].ipaddr); - - comm_close(newfd); - comm_accept(ftpState->data.fd, ftpAcceptDataConnection, ftpState); + io.details.peer.NtoA(ntoapeer,MAX_IPSTRLEN); + + if (strcmp(fd_table[ctrl.fd].ipaddr, ntoapeer) != 0) { + debugs(9, DBG_IMPORTANT, + "FTP data connection from unexpected server (" << + io.details.peer << "), expecting " << + fd_table[ctrl.fd].ipaddr); + + comm_close(io.nfd); + typedef CommCbMemFunT acceptDialer; + AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection", + acceptDialer(this, &FtpStateData::ftpAcceptDataConnection)); + comm_accept(data.fd, acceptCall); return; } } - if (flag != COMM_OK) { - debugs(9, DBG_IMPORTANT, HERE << "Comm Error for FD " << newfd << ": " << xstrerr(xerrno)); + if (io.flag != COMM_OK) { + debugs(9, DBG_IMPORTANT, "ftpHandleDataAccept: comm_accept(" << io.nfd << "): " << xstrerr(io.xerrno)); /** \todo XXX Need to set error message */ - ftpFail(ftpState); + ftpFail(this); return; } /**\par * Replace the Listen socket with the accepted data socket */ - debugs(9, 3, HERE << "Connected data socket on FD " << newfd); - - /* remember that details is state for fd, it will be erased by the following comm_close() */ - ftpState->data.port = details->peer.GetPort(); - - details->peer.NtoA(ftpState->data.host,SQUIDHOSTNAMELEN); + comm_close(data.fd); - comm_close(ftpState->data.fd); + data.fd = io.nfd; + data.port = io.details.peer.GetPort(); + io.details.peer.NtoA(data.host,SQUIDHOSTNAMELEN); - ftpState->data.fd = newfd; + debugs(9, 3, "ftpAcceptDataConnection: Connected data socket on " << + "FD " << io.nfd << " to " << io.details.peer << " FD table says: " << + "ctrl-peer= " << fd_table[ctrl.fd].ipaddr << ", " << + "data-peer= " << fd_table[data.fd].ipaddr); - debugs(9, 3, "FTP connection to " << details->peer << " FD table says: " << - " ctrl-peer= " << fd_table[ftpState->ctrl.fd].ipaddr << ", " << - " data-peer= " << fd_table[ftpState->data.fd].ipaddr ); - commSetTimeout(ftpState->ctrl.fd, -1, NULL, NULL); + AsyncCall::Pointer nullCall = NULL; + commSetTimeout(ctrl.fd, -1, nullCall); - commSetTimeout(ftpState->data.fd, Config.Timeout.read, FtpStateData::ftpTimeout, ftpState); + typedef CommCbMemFunT TimeoutDialer; + AsyncCall::Pointer timeoutCall = asyncCall(9, 5, "FtpStateData::ftpTimeout", + TimeoutDialer(this,&FtpStateData::ftpTimeout)); + commSetTimeout(data.fd, Config.Timeout.read, timeoutCall); /*\todo XXX We should have a flag to track connect state... * host NULL -> not connected, port == local port * host set -> connected, port == remote port */ /* Restart state (SENT_NLST/LIST/RETR) */ - FTP_SM_FUNCS[ftpState->state] (ftpState); + FTP_SM_FUNCS[state] (this); } static void @@ -3006,17 +3023,26 @@ void FtpStateData::readStor() { * Cancel the timeout on the Control socket and * establish one on the data socket. */ - commSetTimeout(ctrl.fd, -1, NULL, NULL); - commSetTimeout(data.fd, Config.Timeout.read, FtpStateData::ftpTimeout, - this); + AsyncCall::Pointer nullCall = NULL; + commSetTimeout(ctrl.fd, -1, nullCall); + + typedef CommCbMemFunT TimeoutDialer; + AsyncCall::Pointer timeoutCall = asyncCall(9, 5, "FtpStateData::ftpTimeout", + TimeoutDialer(this,&FtpStateData::ftpTimeout)); + + commSetTimeout(data.fd, Config.Timeout.read, timeoutCall); state = WRITING_DATA; debugs(9, 3, HERE << "writing data channel"); } else if (code == 150) { /*\par * When client code is 150 with a hostname, Accept data channel. */ - debugs(9, 3, HERE << "accepting data channel"); - comm_accept(data.fd, ftpAcceptDataConnection, this); + debugs(9, 3, "ftpReadStor: accepting data channel"); + typedef CommCbMemFunT acceptDialer; + AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection", + acceptDialer(this, &FtpStateData::ftpAcceptDataConnection)); + + comm_accept(data.fd, acceptCall); } else { debugs(9, DBG_IMPORTANT, HERE << "Unexpected reply code "<< std::setfill('0') << std::setw(3) << code); ftpFail(this); @@ -3138,17 +3164,27 @@ ftpReadList(FtpStateData * ftpState) * Cancel the timeout on the Control socket and establish one * on the data socket */ - commSetTimeout(ftpState->ctrl.fd, -1, NULL, NULL); + AsyncCall::Pointer nullCall = NULL; + commSetTimeout(ftpState->ctrl.fd, -1, nullCall); return; } else if (code == 150) { /* Accept data channel */ - comm_accept(ftpState->data.fd, ftpAcceptDataConnection, ftpState); + typedef CommCbMemFunT acceptDialer; + AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection", + acceptDialer(ftpState, &FtpStateData::ftpAcceptDataConnection)); + + comm_accept(ftpState->data.fd, acceptCall); /* * Cancel the timeout on the Control socket and establish one * on the data socket */ - commSetTimeout(ftpState->ctrl.fd, -1, NULL, NULL); - commSetTimeout(ftpState->data.fd, Config.Timeout.read, FtpStateData::ftpTimeout, ftpState); + AsyncCall::Pointer nullCall = NULL; + commSetTimeout(ftpState->ctrl.fd, -1, nullCall); + + typedef CommCbMemFunT TimeoutDialer; + AsyncCall::Pointer timeoutCall = asyncCall(9, 5, "FtpStateData::ftpTimeout", + TimeoutDialer(ftpState,&FtpStateData::ftpTimeout)); + commSetTimeout(ftpState->data.fd, Config.Timeout.read, timeoutCall); return; } else if (!ftpState->flags.tried_nlst && code > 300) { ftpSendNlst(ftpState); @@ -3189,17 +3225,25 @@ ftpReadRetr(FtpStateData * ftpState) * Cancel the timeout on the Control socket and establish one * on the data socket */ - commSetTimeout(ftpState->ctrl.fd, -1, NULL, NULL); + AsyncCall::Pointer nullCall = NULL; + commSetTimeout(ftpState->ctrl.fd, -1, nullCall); } else if (code == 150) { /* Accept data channel */ - comm_accept(ftpState->data.fd, ftpAcceptDataConnection, ftpState); + typedef CommCbMemFunT acceptDialer; + AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection", + acceptDialer(ftpState, &FtpStateData::ftpAcceptDataConnection)); + comm_accept(ftpState->data.fd, acceptCall); /* * Cancel the timeout on the Control socket and establish one * on the data socket */ - commSetTimeout(ftpState->ctrl.fd, -1, NULL, NULL); - commSetTimeout(ftpState->data.fd, Config.Timeout.read, FtpStateData::ftpTimeout, - ftpState); + AsyncCall::Pointer nullCall = NULL; + commSetTimeout(ftpState->ctrl.fd, -1, nullCall); + + typedef CommCbMemFunT TimeoutDialer; + AsyncCall::Pointer timeoutCall = asyncCall(9, 5, "FtpStateData::ftpTimeout", + TimeoutDialer(ftpState,&FtpStateData::ftpTimeout)); + commSetTimeout(ftpState->data.fd, Config.Timeout.read, timeoutCall); } else if (code >= 300) { if (!ftpState->flags.try_slash_hack) { /* Try this as a directory missing trailing slash... */ @@ -3244,11 +3288,11 @@ FtpStateData::handleRequestBodyProducerAborted() /* This will be called when the put write is completed */ void -FtpStateData::sentRequestBody(int fd, size_t size, comm_err_t errflag) +FtpStateData::sentRequestBody(const CommIoCbParams &io) { - if (size > 0) - kb_incr(&statCounter.server.ftp.kbytes_out, size); - ServerStateData::sentRequestBody(fd, size, errflag); + if (io.size > 0) + kb_incr(&statCounter.server.ftp.kbytes_out, io.size); + ServerStateData::sentRequestBody(io); } static void @@ -3713,7 +3757,8 @@ FtpStateData::closeServer() if (ctrl.fd > -1) { fwd->unregister(ctrl.fd); - comm_remove_close_handler(ctrl.fd, ftpSocketClosed, this); + comm_remove_close_handler(ctrl.fd, closeHandler); + closeHandler = NULL; comm_close(ctrl.fd); ctrl.fd = -1; } @@ -3761,5 +3806,5 @@ FtpStateData::abortTransaction(const char *reason) } fwd->handleUnregisteredServerEnd(); - delete this; + deleteThis("FtpStateData::abortTransaction"); } diff --git a/src/http.cc b/src/http.cc index 88ab0ffa93..3b48de45d7 100644 --- a/src/http.cc +++ b/src/http.cc @@ -1,6 +1,6 @@ /* - * $Id: http.cc,v 1.546 2008/02/03 10:00:30 amosjeffries Exp $ + * $Id: http.cc,v 1.547 2008/02/12 23:55:26 rousskov Exp $ * * DEBUG: section 11 Hypertext Transfer Protocol (HTTP) * AUTHOR: Harvest Derived @@ -71,14 +71,12 @@ CBDATA_CLASS_INIT(HttpStateData); static const char *const crlf = "\r\n"; -static PF httpStateFree; -static PF httpTimeout; static void httpMaybeRemovePublic(StoreEntry *, http_status); static void copyOneHeaderFromClientsideRequestToUpstreamRequest(const HttpHeaderEntry *e, String strConnection, HttpRequest * request, HttpRequest * orig_request, HttpHeader * hdr_out, int we_do_ranges, http_state_flags); -HttpStateData::HttpStateData(FwdState *theFwdState) : ServerStateData(theFwdState), - header_bytes_read(0), reply_bytes_read(0), httpChunkDecoder(NULL) +HttpStateData::HttpStateData(FwdState *theFwdState) : AsyncJob("HttpStateData"), ServerStateData(theFwdState), + header_bytes_read(0), reply_bytes_read(0), httpChunkDecoder(NULL) { debugs(11,5,HERE << "HttpStateData " << this << " created"); ignoreCacheControl = false; @@ -134,7 +132,10 @@ HttpStateData::HttpStateData(FwdState *theFwdState) : ServerStateData(theFwdStat /* * register the handler to free HTTP state data when the FD closes */ - comm_add_close_handler(fd, httpStateFree, this); + typedef CommCbMemFunT Dialer; + closeHandler = asyncCall(9, 5, "httpStateData::httpStateConnClosed", + Dialer(this,&HttpStateData::httpStateConnClosed)); + comm_add_close_handler(fd, closeHandler); } HttpStateData::~HttpStateData() @@ -161,13 +162,20 @@ HttpStateData::dataDescriptor() const { return fd; } - +/* static void httpStateFree(int fd, void *data) { HttpStateData *httpState = static_cast(data); debugs(11, 5, "httpStateFree: FD " << fd << ", httpState=" << data); delete httpState; +}*/ + +void +HttpStateData::httpStateConnClosed(const CommCloseCbParams ¶ms) +{ + debugs(11, 5, "httpStateFree: FD " << params.fd << ", httpState=" << params.data); + deleteThis("HttpStateData::httpStateConnClosed"); } int @@ -183,15 +191,13 @@ httpCachable(const HttpRequestMethod& method) return 1; } -static void -httpTimeout(int fd, void *data) +void +HttpStateData::httpTimeout(const CommTimeoutCbParams ¶ms) { - HttpStateData *httpState = static_cast(data); - StoreEntry *entry = httpState->entry; debugs(11, 4, "httpTimeout: FD " << fd << ": '" << entry->url() << "'" ); if (entry->store_status == STORE_PENDING) { - httpState->fwd->fail(errorCon(ERR_READ_TIMEOUT, HTTP_GATEWAY_TIMEOUT, httpState->fwd->request)); + fwd->fail(errorCon(ERR_READ_TIMEOUT, HTTP_GATEWAY_TIMEOUT, fwd->request)); } comm_close(fd); @@ -946,6 +952,7 @@ HttpStateData::persistentConnStatus() const /* * This is the callback after some data has been read from the network */ +/* void HttpStateData::ReadReplyWrapper(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) { @@ -956,19 +963,23 @@ HttpStateData::ReadReplyWrapper(int fd, char *buf, size_t len, comm_err_t flag, httpState->readReply (len, flag, xerrno); PROF_stop(HttpStateData_readReply); } - +*/ /* XXX this function is too long! */ void -HttpStateData::readReply (size_t len, comm_err_t flag, int xerrno) +HttpStateData::readReply (const CommIoCbParams &io) { int bin; int clen; - flags.do_next_read = 0; + int len = io.size; + assert(fd == io.fd); + + flags.do_next_read = 0; + debugs(11, 5, "httpReadReply: FD " << fd << ": len " << len << "."); // Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us - if (flag == COMM_ERR_CLOSING) { + if (io.flag == COMM_ERR_CLOSING) { debugs(11, 3, "http socket closing"); return; } @@ -979,15 +990,15 @@ HttpStateData::readReply (size_t len, comm_err_t flag, int xerrno) } // handle I/O errors - if (flag != COMM_OK || len < 0) { + if (io.flag != COMM_OK || len < 0) { debugs(11, 2, "httpReadReply: FD " << fd << ": read failure: " << xstrerror() << "."); - if (ignoreErrno(xerrno)) { + if (ignoreErrno(io.xerrno)) { flags.do_next_read = 1; } else { ErrorState *err; err = errorCon(ERR_READ_ERROR, HTTP_BAD_GATEWAY, fwd->request); - err->xerrno = xerrno; + err->xerrno = io.xerrno; fwd->fail(err); flags.do_next_read = 0; comm_close(fd); @@ -1155,7 +1166,7 @@ HttpStateData::decodeAndWriteReplyBody() void HttpStateData::processReplyBody() { - + AsyncCall::Pointer call; IPAddress client_addr; if (!flags.headers_parsed) { @@ -1194,15 +1205,15 @@ HttpStateData::processReplyBody() (void) 0; } else switch (persistentConnStatus()) { - case INCOMPLETE_MSG: debugs(11, 5, "processReplyBody: INCOMPLETE_MSG"); /* Wait for more data or EOF condition */ - if (flags.keepalive_broken) { - commSetTimeout(fd, 10, NULL, NULL); + call = NULL; + commSetTimeout(fd, 10, call); } else { - commSetTimeout(fd, Config.Timeout.read, NULL, NULL); + call = NULL; + commSetTimeout(fd, Config.Timeout.read, call); } flags.do_next_read = 1; @@ -1211,10 +1222,12 @@ HttpStateData::processReplyBody() case COMPLETE_PERSISTENT_MSG: debugs(11, 5, "processReplyBody: COMPLETE_PERSISTENT_MSG"); /* yes we have to clear all these! */ - commSetTimeout(fd, -1, NULL, NULL); + call = NULL; + commSetTimeout(fd, -1, call); flags.do_next_read = 0; - comm_remove_close_handler(fd, httpStateFree, this); + comm_remove_close_handler(fd, closeHandler); + closeHandler = NULL; fwd->unregister(fd); #if LINUX_TPROXY @@ -1270,8 +1283,11 @@ HttpStateData::maybeReadVirginBody() } if (flags.do_next_read) { - flags.do_next_read = 0; - entry->delayAwareRead(fd, readBuf->space(read_sz), read_sz, ReadReplyWrapper, this); + flags.do_next_read = 0; + typedef CommCbMemFunT Dialer; + entry->delayAwareRead(fd, readBuf->space(read_sz), read_sz, + asyncCall(11, 5, "HttpStateData::readReply", + Dialer(this, &HttpStateData::readReply))); } } @@ -1279,29 +1295,28 @@ HttpStateData::maybeReadVirginBody() * This will be called when request write is complete. */ void -HttpStateData::SendComplete(int fd, char *bufnotused, size_t size, comm_err_t errflag, int xerrno, void *data) +HttpStateData::sendComplete(const CommIoCbParams &io) { - HttpStateData *httpState = static_cast(data); - debugs(11, 5, "httpSendComplete: FD " << fd << ": size " << size << ": errflag " << errflag << "."); + debugs(11, 5, "httpSendComplete: FD " << fd << ": size " << io.size << ": errflag " << io.flag << "."); #if URL_CHECKSUM_DEBUG entry->mem_obj->checkUrlChecksum(); #endif - if (size > 0) { - fd_bytes(fd, size, FD_WRITE); - kb_incr(&statCounter.server.all.kbytes_out, size); - kb_incr(&statCounter.server.http.kbytes_out, size); + if (io.size > 0) { + fd_bytes(fd, io.size, FD_WRITE); + kb_incr(&statCounter.server.all.kbytes_out, io.size); + kb_incr(&statCounter.server.http.kbytes_out, io.size); } - if (errflag == COMM_ERR_CLOSING) + if (io.flag == COMM_ERR_CLOSING) return; - if (errflag) { + if (io.flag) { ErrorState *err; - err = errorCon(ERR_WRITE_ERROR, HTTP_BAD_GATEWAY, httpState->fwd->request); - err->xerrno = xerrno; - httpState->fwd->fail(err); + err = errorCon(ERR_WRITE_ERROR, HTTP_BAD_GATEWAY, fwd->request); + err->xerrno = io.xerrno; + fwd->fail(err); comm_close(fd); return; } @@ -1314,9 +1329,13 @@ HttpStateData::SendComplete(int fd, char *bufnotused, size_t size, comm_err_t er * the timeout for POST/PUT requests that have very large * request bodies. */ - commSetTimeout(fd, Config.Timeout.read, httpTimeout, httpState); + typedef CommCbMemFunT TimeoutDialer; + AsyncCall::Pointer timeoutCall = asyncCall(11, 5, "HttpStateData::httpTimeout", + TimeoutDialer(this,&HttpStateData::httpTimeout)); - httpState->flags.request_sent = 1; + commSetTimeout(fd, Config.Timeout.read, timeoutCall); + + flags.request_sent = 1; } // Close the HTTP server connection. Used by serverComplete(). @@ -1327,7 +1346,8 @@ HttpStateData::closeServer() if (fd >= 0) { fwd->unregister(fd); - comm_remove_close_handler(fd, httpStateFree, this); + comm_remove_close_handler(fd, closeHandler); + closeHandler = NULL; comm_close(fd); fd = -1; } @@ -1754,18 +1774,24 @@ HttpStateData::sendRequest() MemBuf mb; debugs(11, 5, "httpSendRequest: FD " << fd << ", request " << request << ", this " << this << "."); - - commSetTimeout(fd, Config.Timeout.lifetime, httpTimeout, this); + typedef CommCbMemFunT TimeoutDialer; + AsyncCall::Pointer timeoutCall = asyncCall(11, 5, "HttpStateData::httpTimeout", + TimeoutDialer(this,&HttpStateData::httpTimeout)); + commSetTimeout(fd, Config.Timeout.lifetime, timeoutCall); flags.do_next_read = 1; maybeReadVirginBody(); if (orig_request->body_pipe != NULL) { if (!startRequestBodyFlow()) // register to receive body data return false; - requestSender = HttpStateData::sentRequestBodyWrapper; + typedef CommCbMemFunT Dialer; + Dialer dialer(this, &HttpStateData::sentRequestBody); + requestSender = asyncCall(11,5, "HttpStateData::sentRequestBody", dialer); } else { assert(!requestBodySource); - requestSender = HttpStateData::SendComplete; + typedef CommCbMemFunT Dialer; + Dialer dialer(this, &HttpStateData::sendComplete); + requestSender = asyncCall(11,5, "HttpStateData::SendComplete", dialer); } if (_peer != NULL) { @@ -1805,7 +1831,7 @@ HttpStateData::sendRequest() mb.init(); buildRequestPrefix(request, orig_request, entry, &mb, flags); debugs(11, 6, "httpSendRequest: FD " << fd << ":\n" << mb.buf); - comm_write_mbuf(fd, &mb, requestSender, this); + comm_write_mbuf(fd, &mb, requestSender); return true; } @@ -1846,13 +1872,22 @@ HttpStateData::doneSendingRequestBody() if (!Config.accessList.brokenPosts) { debugs(11, 5, "doneSendingRequestBody: No brokenPosts list"); - HttpStateData::SendComplete(fd, NULL, 0, COMM_OK, 0, this); + CommIoCbParams io(NULL); + io.fd=fd; + io.flag=COMM_OK; + sendComplete(io); } else if (!ch.fastCheck()) { debugs(11, 5, "doneSendingRequestBody: didn't match brokenPosts"); - HttpStateData::SendComplete(fd, NULL, 0, COMM_OK, 0, this); + CommIoCbParams io(NULL); + io.fd=fd; + io.flag=COMM_OK; + sendComplete(io); } else { debugs(11, 2, "doneSendingRequestBody: matched brokenPosts"); - comm_write(fd, "\r\n", 2, HttpStateData::SendComplete, this, NULL); + typedef CommCbMemFunT Dialer; + Dialer dialer(this, &HttpStateData::sendComplete); + AsyncCall::Pointer call= asyncCall(11,5, "HttpStateData::SendComplete", dialer); + comm_write(fd, "\r\n", 2, call); } } @@ -1893,17 +1928,20 @@ HttpStateData::handleRequestBodyProducerAborted() { ServerStateData::handleRequestBodyProducerAborted(); // XXX: SendComplete(COMM_ERR_CLOSING) does little. Is it enough? - SendComplete(fd, NULL, 0, COMM_ERR_CLOSING, 0, this); + CommIoCbParams io(NULL); + io.fd=fd; + io.flag=COMM_ERR_CLOSING; + sendComplete(io); } // called when we wrote request headers(!) or a part of the body void -HttpStateData::sentRequestBody(int fd, size_t size, comm_err_t errflag) +HttpStateData::sentRequestBody(const CommIoCbParams &io) { - if (size > 0) - kb_incr(&statCounter.server.http.kbytes_out, size); + if (io.size > 0) + kb_incr(&statCounter.server.http.kbytes_out, io.size); - ServerStateData::sentRequestBody(fd, size, errflag); + ServerStateData::sentRequestBody(io); } // Quickly abort the transaction @@ -1921,7 +1959,7 @@ HttpStateData::abortTransaction(const char *reason) } fwd->handleUnregisteredServerEnd(); - delete this; + deleteThis("HttpStateData::abortTransaction"); } void diff --git a/src/http.h b/src/http.h index ffec6722a3..db8fdf2753 100644 --- a/src/http.h +++ b/src/http.h @@ -1,6 +1,6 @@ /* - * $Id: http.h,v 1.33 2007/12/26 23:39:55 hno Exp $ + * $Id: http.h,v 1.34 2008/02/12 23:55:26 rousskov Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -47,8 +47,6 @@ public: HttpStateData(FwdState *); ~HttpStateData(); - static IOCB SendComplete; - static IOCB ReadReplyWrapper; static void httpBuildRequestHeader(HttpRequest * request, HttpRequest * orig_request, StoreEntry * entry, @@ -60,7 +58,7 @@ public: bool sendRequest(); void processReplyHeader(); void processReplyBody(); - void readReply(size_t len, comm_err_t flag, int xerrno); + void readReply(const CommIoCbParams &io); virtual void maybeReadVirginBody(); // read response data from the network int cacheableReply(); @@ -82,6 +80,7 @@ protected: virtual HttpRequest *originalRequest(); private: + AsyncCall::Pointer closeHandler; enum ConnectionStatus { INCOMPLETE_MSG, COMPLETE_PERSISTENT_MSG, @@ -107,7 +106,11 @@ private: bool decodeAndWriteReplyBody(); void doneSendingRequestBody(); void requestBodyHandler(MemBuf &); - virtual void sentRequestBody(int fd, size_t size, comm_err_t errflag); + virtual void sentRequestBody(const CommIoCbParams &io); + void sendComplete(const CommIoCbParams &io); + void httpStateConnClosed(const CommCloseCbParams ¶ms); + void httpTimeout(const CommTimeoutCbParams ¶ms); + mb_size_t buildRequestPrefix(HttpRequest * request, HttpRequest * orig_request, StoreEntry * entry,