]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Merging async-call branch changes to HEAD:
authorrousskov <>
Wed, 13 Feb 2008 06:55:26 +0000 (06:55 +0000)
committerrousskov <>
Wed, 13 Feb 2008 06:55:26 +0000 (06:55 +0000)
Async-call work replaces event-based asynchronous calls with
stand-alone implementation. The common async call API allows Squid
core do call, debug, and troubleshoot all callback handlers in a
uniform way.

An async "job" API is introduced to manage independent logical threads
or work such as protocol transaction handlers on client, server, and
ICAP sides. These jobs should communicate with each other using async
calls to minimize dependencies and avoid reentrant callback loops.

These changes will eventually improve overall code quality, debugging
quality, and Squid robustness.

Below you will find log messages from the async-call branch that are
relevant to the file(s) being committed.

        Convert the comm_* calls to use CommCalls.

        Use the AsyncJob::deleteThis method as "delete this"
        replacement instead of the previously commited block "if
        (inCall) musStop(...) else delete this"

        ICAPInitiate::sendAnswer dialers take care of message locking
        now.

src/Server.cc
src/Server.h
src/ftp.cc
src/http.cc
src/http.h

index 8179112bc34456bb13f5cad54d441141b513e1f5..a65238553b951aa70f03c14189dd2cf7dd1aa280 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * $Id: Server.cc,v 1.24 2008/02/08 18:30:18 rousskov Exp $
+ * $Id: Server.cc,v 1.25 2008/02/12 23:55:26 rousskov Exp $
  *
  * DEBUG:
  * AUTHOR: Duane Wessels
@@ -45,7 +45,7 @@
 extern ICAPConfig TheICAPConfig;
 #endif
 
-ServerStateData::ServerStateData(FwdState *theFwdState): requestSender(NULL)
+ServerStateData::ServerStateData(FwdState *theFwdState): AsyncJob("ServerStateData"),requestSender(NULL)
 #if ICAP_CLIENT
     , icapAccessCheckPending(false)
 #endif
@@ -178,7 +178,8 @@ void ServerStateData::quitIfAllDone() {
     }
 
     debugs(11,3, HERE << "transaction done");
-    delete this;
+
+    deleteThis("ServerStateData::quitIfAllDone");
 }
 
 // FTP side overloads this to work around multiple calls to fwd->complete
@@ -221,10 +222,10 @@ ServerStateData::abortOnBadEntry(const char *abortReason)
 
 // more request or adapted response body is available
 void
-ServerStateData::noteMoreBodyDataAvailable(BodyPipe &bp)
+ServerStateData::noteMoreBodyDataAvailable(BodyPipe::Pointer bp)
 {
 #if ICAP_CLIENT
-    if (adaptedBodySource == &bp) {
+    if (adaptedBodySource == bp) {
         handleMoreAdaptedBodyAvailable();
         return;
     }
@@ -234,10 +235,10 @@ ServerStateData::noteMoreBodyDataAvailable(BodyPipe &bp)
 
 // the entire request or adapted response body was provided, successfully
 void
-ServerStateData::noteBodyProductionEnded(BodyPipe &bp)
+ServerStateData::noteBodyProductionEnded(BodyPipe::Pointer bp)
 {
 #if ICAP_CLIENT
-    if (adaptedBodySource == &bp) {
+    if (adaptedBodySource == bp) {
         handleAdaptedBodyProductionEnded();
         return;
     }
@@ -247,10 +248,10 @@ ServerStateData::noteBodyProductionEnded(BodyPipe &bp)
 
 // premature end of the request or adapted response body production
 void
-ServerStateData::noteBodyProducerAborted(BodyPipe &bp)
+ServerStateData::noteBodyProducerAborted(BodyPipe::Pointer bp)
 {
 #if ICAP_CLIENT
-    if (adaptedBodySource == &bp) {
+    if (adaptedBodySource == bp) {
         handleAdaptedBodyProducerAborted();
         return;
     }
@@ -302,29 +303,22 @@ ServerStateData::handleRequestBodyProducerAborted()
     // kids extend this
 }
 
-void
-ServerStateData::sentRequestBodyWrapper(int fd, char *bufnotused, size_t size, comm_err_t errflag, int xerrno, void *data)
-{
-    ServerStateData *server = static_cast<ServerStateData *>(data);
-    server->sentRequestBody(fd, size, errflag);
-}
-
 // called when we wrote request headers(!) or a part of the body
 void
-ServerStateData::sentRequestBody(int fd, size_t size, comm_err_t errflag)
+ServerStateData::sentRequestBody(const CommIoCbParams &io)
 {
-    debugs(11, 5, "sentRequestBody: FD " << fd << ": size " << size << ": errflag " << errflag << ".");
+    debugs(11, 5, "sentRequestBody: FD " << io.fd << ": size " << io.size << ": errflag " << io.flag << ".");
     debugs(32,3,HERE << "sentRequestBody called");
 
     requestSender = NULL;
 
-    if (size > 0) {
-        fd_bytes(fd, size, FD_WRITE);
-        kb_incr(&statCounter.server.all.kbytes_out, size);
+    if (io.size > 0) {
+        fd_bytes(io.fd, io.size, FD_WRITE);
+        kb_incr(&statCounter.server.all.kbytes_out, io.size);
         // kids should increment their counters
     }
 
-    if (errflag == COMM_ERR_CLOSING)
+    if (io.flag == COMM_ERR_CLOSING)
         return;
 
     if (!requestBodySource) {
@@ -332,8 +326,8 @@ ServerStateData::sentRequestBody(int fd, size_t size, comm_err_t errflag)
         return; // do nothing;
     }
 
-    if (errflag) {
-        debugs(11, 1, "sentRequestBody error: FD " << fd << ": " << xstrerr(errno));
+    if (io.flag) {
+        debugs(11, 1, "sentRequestBody error: FD " << io.fd << ": " << xstrerr(errno));
         ErrorState *err;
         err = errorCon(ERR_WRITE_ERROR, HTTP_BAD_GATEWAY, fwd->request);
         err->xerrno = errno;
@@ -361,8 +355,10 @@ ServerStateData::sendMoreRequestBody()
     MemBuf buf;
     if (requestBodySource->getMoreData(buf)) {
         debugs(9,3, HERE << "will write " << buf.contentSize() << " request body bytes");
-        requestSender = &ServerStateData::sentRequestBodyWrapper;
-        comm_write_mbuf(dataDescriptor(), &buf, requestSender, this);
+       typedef CommCbMemFunT<ServerStateData, CommIoCbParams> Dialer;
+       requestSender = asyncCall(93,3, "ServerStateData::sentRequestBody",
+                                 Dialer(this, &ServerStateData::sentRequestBody));
+        comm_write_mbuf(dataDescriptor(), &buf, requestSender);
     } else {
         debugs(9,3, HERE << "will wait for more request body bytes or eof");
         requestSender = NULL;
@@ -418,7 +414,7 @@ ServerStateData::startIcap(ICAPServiceRep::Pointer service, HttpRequest *cause)
 
     adaptedHeadSource = initiateIcap(
         new ICAPModXactLauncher(this, vrep, cause, service));
-    return true;
+    return adaptedHeadSource != NULL;
 }
 
 // properly cleans up ICAP-related state
@@ -488,7 +484,7 @@ ServerStateData::adaptVirginReplyBody(const char *data, ssize_t len)
 
 // can supply more virgin response body data
 void
-ServerStateData::noteMoreBodySpaceAvailable(BodyPipe &)
+ServerStateData::noteMoreBodySpaceAvailable(BodyPipe::Pointer)
 {
     if (responseBodyBuffer) {
         addVirginReplyBody(NULL, 0); // kick the buffered fragment alive again
@@ -502,7 +498,7 @@ ServerStateData::noteMoreBodySpaceAvailable(BodyPipe &)
 
 // the consumer of our virgin response body aborted
 void
-ServerStateData::noteBodyConsumerAborted(BodyPipe &bp)
+ServerStateData::noteBodyConsumerAborted(BodyPipe::Pointer)
 {
     stopProducingFor(virginBodyDestination, false);
 
@@ -537,7 +533,6 @@ ServerStateData::noteIcapAnswer(HttpMsg *msg)
         if (doneWithIcap()) // we may still be sending virgin response
             handleIcapCompleted();
     }
-
 }
 
 // will not receive adapted response headers (and, hence, body)
@@ -603,7 +598,6 @@ ServerStateData::handleIcapCompleted()
     }
 
     completeForwarding();
-
     quitIfAllDone();
 }
 
index a360ed679aaa7725be36516fec0cb60d37ef0dcf..6369d63a8f9624748f7f216b38111e20db3b4c5d 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: Server.h,v 1.12 2008/02/08 18:30:18 rousskov Exp $
+ * $Id: Server.h,v 1.13 2008/02/12 23:55:26 rousskov Exp $
  *
  * AUTHOR: Duane Wessels
  *
@@ -49,6 +49,8 @@
 #include "StoreIOBuffer.h"
 #include "forward.h"
 #include "BodyPipe.h"
+#include "ICAP/AsyncJob.h"
+#include "CommCalls.h"
 
 #if ICAP_CLIENT
 #include "ICAP/ICAPServiceRep.h"
@@ -75,9 +77,9 @@ public:
     // BodyConsumer: consume request body or adapted response body.
     // The implementation just calls the corresponding HTTP or ICAP handle*()
     // method, depending on the pipe.
-    virtual void noteMoreBodyDataAvailable(BodyPipe &);
-    virtual void noteBodyProductionEnded(BodyPipe &);
-    virtual void noteBodyProducerAborted(BodyPipe &);
+    virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer);
+    virtual void noteBodyProductionEnded(BodyPipe::Pointer);
+    virtual void noteBodyProducerAborted(BodyPipe::Pointer);
 
     // read response data from the network
     virtual void maybeReadVirginBody() = 0;
@@ -97,11 +99,19 @@ public:
     virtual void noteIcapQueryAbort(bool final);
 
     // BodyProducer: provide virgin response body to ICAP.
-    virtual void noteMoreBodySpaceAvailable(BodyPipe &);
-    virtual void noteBodyConsumerAborted(BodyPipe &);
+    virtual void noteMoreBodySpaceAvailable(BodyPipe::Pointer );
+    virtual void noteBodyConsumerAborted(BodyPipe::Pointer );
 #endif
     virtual void processReplyBody() = 0;
 
+//AsyncJob virtual methods
+    virtual bool doneAll() const { return
+#if ICAP_CLIENT
+                                       ICAPInitiator::doneAll() &&
+                                      BodyProducer::doneAll() &&
+#endif
+                                      BodyConsumer::doneAll() && false;}
+
 public: // should be protected
     void serverComplete(); // call when no server communication is expected
 
@@ -123,9 +133,8 @@ protected:
     // sending of the request body to the server
     void sendMoreRequestBody();
     // has body; kids overwrite to increment I/O stats counters
-    virtual void sentRequestBody(int fd, size_t size, comm_err_t errflag) = 0;
+    virtual void sentRequestBody(const CommIoCbParams &io) = 0;
     virtual void doneSendingRequestBody() = 0;
-    static IOCB sentRequestBodyWrapper;
 
     virtual void closeServer() = 0; // end communication with the server
     virtual bool doneWithServer() const = 0; // did we end communication?
@@ -173,7 +182,7 @@ public: // should not be
 
 protected:
     BodyPipe::Pointer requestBodySource; // to consume request body
-    IOCB *requestSender; // set if we are expecting comm_write to call us back
+    AsyncCall::Pointer requestSender; // set if we are expecting comm_write to call us back
 
 #if ICAP_CLIENT
     BodyPipe::Pointer virginBodyDestination; // to provide virgin response body
index 8c5dde6e699431aa51fb2efbcf0cc71f81d26125..0a194b618346b948175e08736a54f1787a6ca338 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * $Id: ftp.cc,v 1.444 2008/01/19 07:15:29 amosjeffries Exp $
+ * $Id: ftp.cc,v 1.445 2008/02/12 23:55:26 rousskov Exp $
  *
  * DEBUG: section 9     File Transfer Protocol (FTP)
  * AUTHOR: Harvest Derived
@@ -116,6 +116,8 @@ class FtpStateData : public ServerStateData
 public:
     void *operator new (size_t);
     void operator delete (void *);
+    void *toCbdata() { return this; }
+
     FtpStateData(FwdState *);
     ~FtpStateData();
     char user[MAX_URL];
@@ -172,6 +174,7 @@ public:
     struct _ftp_flags flags;
 
 private:
+    AsyncCall::Pointer closeHandler;
     CBDATA_CLASS(FtpStateData);
 
 public:
@@ -192,7 +195,7 @@ public:
     char *htmlifyListEntry(const char *line);
     void parseListing();
     void dataComplete();
-    void dataRead(int fd, char *buf, size_t len, comm_err_t errflag, int xerrno);
+    void dataRead(const CommIoCbParams &io);
     int checkAuth(const HttpHeader * req_hdr);
     void checkUrlpath();
     void buildTitleUrl();
@@ -207,18 +210,19 @@ public:
     void processReplyBody();
     void writeCommand(const char *buf);
 
-    static PF ftpSocketClosed;
     static CNCB ftpPasvCallback;
-    static IOCB dataReadWrapper;
     static PF ftpDataWrite;
-    static PF ftpTimeout;
-    static IOCB ftpReadControlReply;
-    static IOCB ftpWriteCommandCallback;
+    void ftpTimeout(const CommTimeoutCbParams &io);
+    void ftpSocketClosed(const CommCloseCbParams &io);
+    void ftpReadControlReply(const CommIoCbParams &io);
+    void ftpWriteCommandCallback(const CommIoCbParams &io);
+    void ftpAcceptDataConnection(const CommAcceptCbParams &io);
+
     static HttpReply *ftpAuthRequired(HttpRequest * request, const char *realm);
     static wordlist *ftpParseControlReply(char *, size_t, int *, size_t *);
 
     // sending of the request body to the server
-    virtual void sentRequestBody(int fd, size_t size, comm_err_t errflag);
+    virtual void sentRequestBody(const CommIoCbParams&);
     virtual void doneSendingRequestBody();
 
     virtual void haveParsedReplyHeaders();
@@ -379,15 +383,14 @@ FTPSM *FTP_SM_FUNCS[] =
         ftpReadMkdir           /* SENT_MKDIR */
     };
 
-void
-FtpStateData::ftpSocketClosed(int fdnotused, void *data)
+void 
+FtpStateData::ftpSocketClosed(const CommCloseCbParams &io)
 {
-    FtpStateData *ftpState = (FtpStateData *)data;
-    ftpState->ctrl.fd = -1;
-    delete ftpState;
+    ctrl.fd = -1;
+    deleteThis("FtpStateData::ftpSocketClosed");
 }
 
-FtpStateData::FtpStateData(FwdState *theFwdState) : ServerStateData(theFwdState)
+FtpStateData::FtpStateData(FwdState *theFwdState) : AsyncJob("FtpStateData"), ServerStateData(theFwdState)
 {
     const char *url = entry->url();
     debugs(9, 3, HERE << "'" << url << "'" );
@@ -403,7 +406,10 @@ FtpStateData::FtpStateData(FwdState *theFwdState) : ServerStateData(theFwdState)
 
     flags.rest_supported = 1;
 
-    comm_add_close_handler(ctrl.fd, ftpSocketClosed, this);
+    typedef CommCbMemFunT<FtpStateData, CommCloseCbParams> Dialer;
+    closeHandler = asyncCall(9, 5, "FtpStateData::ftpSocketClosed",
+                                Dialer(this,&FtpStateData::ftpSocketClosed));
+    comm_add_close_handler(ctrl.fd, closeHandler);
 
     if (request->method == METHOD_PUT)
         flags.put = 1;
@@ -497,20 +503,18 @@ FtpStateData::loginParser(const char *login, int escaped)
 }
 
 void
-FtpStateData::ftpTimeout(int fd, void *data)
+FtpStateData::ftpTimeout(const CommTimeoutCbParams &io)
 {
-    FtpStateData *ftpState = (FtpStateData *)data;
-    StoreEntry *entry = ftpState->entry;
-    debugs(9, 4, HERE << "FD " << fd << ": '" << entry->url() << "'" );
+    debugs(9, 4, "ftpTimeout: FD " << io.fd << ": '" << entry->url() << "'" );
 
-    if (SENT_PASV == ftpState->state && fd == ftpState->data.fd) {
+    if (SENT_PASV == state && io.fd == data.fd) {
         /* stupid ftp.netscape.com */
-        ftpState->fwd->dontRetry(false);
-        ftpState->fwd->ftpPasvFailed(true);
-        debugs(9, DBG_IMPORTANT, "Timeout in SENT_PASV state" );
+        fwd->dontRetry(false);
+        fwd->ftpPasvFailed(true);
+        debugs(9, DBG_IMPORTANT, "ftpTimeout: timeout in SENT_PASV state" );
     }
 
-    ftpState->failed(ERR_READ_TIMEOUT, 0);
+    failed(ERR_READ_TIMEOUT, 0);
     /* failed() closes ctrl.fd and frees ftpState */
 }
 
@@ -1212,14 +1216,6 @@ FtpStateData::dataComplete()
     scheduleReadControlReply(0);
 }
 
-void
-FtpStateData::dataReadWrapper(int fd, char *buf, size_t len, comm_err_t errflag, int xerrno, void *data)
-{
-    FtpStateData *ftpState = (FtpStateData *)data;
-    ftpState->data.read_pending = false;
-    ftpState->dataRead(fd, buf, len, errflag, xerrno);
-}
-
 void
 FtpStateData::maybeReadVirginBody()
 {
@@ -1238,30 +1234,38 @@ FtpStateData::maybeReadVirginBody()
 
     data.read_pending = true;
 
-    commSetTimeout(data.fd, Config.Timeout.read, ftpTimeout, this);
+    typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
+    AsyncCall::Pointer timeoutCall =  asyncCall(9, 5, "FtpStateData::ftpTimeout",
+                        TimeoutDialer(this,&FtpStateData::ftpTimeout));
+    commSetTimeout(data.fd, Config.Timeout.read, timeoutCall);
 
     debugs(9,5,HERE << "queueing read on FD " << data.fd);
 
-    entry->delayAwareRead(data.fd, data.readBuf->space(), read_sz, dataReadWrapper, this);
+    typedef CommCbMemFunT<FtpStateData, CommIoCbParams> Dialer;
+    entry->delayAwareRead(data.fd, data.readBuf->space(), read_sz,
+       asyncCall(9, 5, "FtpStateData::dataRead",
+             Dialer(this, &FtpStateData::dataRead)));
 }
 
 void
-FtpStateData::dataRead(int fd, char *buf, size_t len, comm_err_t errflag, int xerrno)
+FtpStateData::dataRead(const CommIoCbParams &io)
 {
     int j;
     int bin;
 
-    debugs(9, 3, HERE << "FD " << fd << " Read " << len << " bytes");
+    data.read_pending = false;
+
+    debugs(9, 3, HERE << "ftpDataRead: FD " << io.fd << " Read " << io.size << " bytes");
 
-    if (len > 0) {
-        kb_incr(&statCounter.server.all.kbytes_in, len);
-        kb_incr(&statCounter.server.ftp.kbytes_in, len);
+    if (io.size > 0) {
+        kb_incr(&statCounter.server.all.kbytes_in, io.size);
+        kb_incr(&statCounter.server.ftp.kbytes_in, io.size);
     }
 
-    if (errflag == COMM_ERR_CLOSING)
+    if (io.flag == COMM_ERR_CLOSING)
         return;
 
-    assert(fd == data.fd);
+    assert(io.fd == data.fd);
 
 #if DELAY_POOLS
 
@@ -1274,36 +1278,41 @@ FtpStateData::dataRead(int fd, char *buf, size_t len, comm_err_t errflag, int xe
         return;
     }
 
-    if (errflag == COMM_OK && len > 0) {
+    if (io.flag == COMM_OK && io.size > 0) {
 #if DELAY_POOLS
-        delayId.bytesIn(len);
+        delayId.bytesIn(io.size);
 #endif
 
     }
 
 
-    if (errflag == COMM_OK && len > 0) {
-        debugs(9,5,HERE << "appended " << len << " bytes to readBuf");
-        data.readBuf->appended(len);
+    if (io.flag == COMM_OK && io.size > 0) {
+        debugs(9,5,HERE << "appended " << io.size << " bytes to readBuf");
+        data.readBuf->appended(io.size);
 #if DELAY_POOLS
 
         DelayId delayId = entry->mem_obj->mostBytesAllowed();
-        delayId.bytesIn(len);
+        delayId.bytesIn(io.size);
 #endif
 
         IOStats.Ftp.reads++;
 
-        for (j = len - 1, bin = 0; j; bin++)
+        for (j = io.size - 1, bin = 0; j; bin++)
             j >>= 1;
 
         IOStats.Ftp.read_hist[bin]++;
     }
 
-    if (errflag != COMM_OK || len < 0) {
-         debugs(50, ignoreErrno(xerrno) ? 3 : DBG_IMPORTANT, HERE << "read error: " << xstrerr(xerrno));
+    if (io.flag != COMM_OK || io.size < 0) {
+         debugs(50, ignoreErrno(io.xerrno) ? 3 : DBG_IMPORTANT,
+             "ftpDataRead: read error: " << xstrerr(io.xerrno));
+
+        if (ignoreErrno(io.xerrno)) {
+            typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
+            AsyncCall::Pointer timeoutCall =  asyncCall(9, 5, "FtpStateData::ftpTimeout",
+                        TimeoutDialer(this,&FtpStateData::ftpTimeout));
+            commSetTimeout(io.fd, Config.Timeout.read, timeoutCall);
 
-        if (ignoreErrno(xerrno)) {
-            commSetTimeout(fd, Config.Timeout.read, ftpTimeout, this);
             maybeReadVirginBody();
         } else {
             if (!flags.http_header_sent && !fwd->ftpPasvFailed() && flags.pasv_supported) {
@@ -1315,8 +1324,8 @@ FtpStateData::dataRead(int fd, char *buf, size_t len, comm_err_t errflag, int xe
             /* failed closes ctrl.fd and frees ftpState */
             return;
         }
-    } else if (len == 0) {
-       debugs(9,3, HERE << "Calling dataComplete() because len == 0");
+    } else if (io.size == 0) {
+        debugs(9,3, HERE << "Calling dataComplete() because io.size == 0");
        /*
         * DPW 2007-04-23
         * Dangerous curves ahead.  This call to dataComplete was
@@ -1336,7 +1345,7 @@ FtpStateData::dataRead(int fd, char *buf, size_t len, comm_err_t errflag, int xe
 void
 FtpStateData::processReplyBody()
 {
-    debugs(9, 3, HERE);
+    debugs(9, 3, HERE << "FtpStateData::processReplyBody starting.");
 
     if (request->method == METHOD_HEAD && (flags.isdir || theSize != -1)) {
         serverComplete();
@@ -1590,34 +1599,35 @@ FtpStateData::writeCommand(const char *buf)
 
     ctrl.last_command = ebuf;
 
+    typedef CommCbMemFunT<FtpStateData, CommIoCbParams> Dialer;
+    AsyncCall::Pointer call = asyncCall(9, 5, "FtpStateData::ftpWriteCommandCallback",
+                                       Dialer(this, &FtpStateData::ftpWriteCommandCallback));
     comm_write(ctrl.fd,
                ctrl.last_command,
                strlen(ctrl.last_command),
-               FtpStateData::ftpWriteCommandCallback,
-               this, NULL);
+              call);
 
     scheduleReadControlReply(0);
 }
 
 void
-FtpStateData::ftpWriteCommandCallback(int fd, char *buf, size_t size, comm_err_t errflag, int xerrno, void *data)
+FtpStateData::ftpWriteCommandCallback(const CommIoCbParams &io)
 {
-    FtpStateData *ftpState = (FtpStateData *)data;
 
-    debugs(9, 5, HERE << "wrote " << size << " bytes");
+    debugs(9, 5, "ftpWriteCommandCallback: wrote " << io.size << " bytes");
 
-    if (size > 0) {
-        fd_bytes(fd, size, FD_WRITE);
-        kb_incr(&statCounter.server.all.kbytes_out, size);
-        kb_incr(&statCounter.server.ftp.kbytes_out, size);
+    if (io.size > 0) {
+        fd_bytes(io.fd, io.size, FD_WRITE);
+        kb_incr(&statCounter.server.all.kbytes_out, io.size);
+        kb_incr(&statCounter.server.ftp.kbytes_out, io.size);
     }
 
-    if (errflag == COMM_ERR_CLOSING)
+    if (io.flag == COMM_ERR_CLOSING)
         return;
 
-    if (errflag) {
-        debugs(9, DBG_IMPORTANT, HERE << "FD " << fd << ": " << xstrerr(xerrno));
-        ftpState->failed(ERR_WRITE_ERROR, xerrno);
+    if (io.flag) {
+        debugs(9, DBG_IMPORTANT, "ftpWriteCommandCallback: FD " << io.fd << ": " << xstrerr(io.xerrno));
+        failed(ERR_WRITE_ERROR, io.xerrno);
         /* failed closes ctrl.fd and frees ftpState */
         return;
     }
@@ -1727,54 +1737,59 @@ FtpStateData::scheduleReadControlReply(int buffered_ok)
         handleControlReply();
     } else {
         /* XXX What about Config.Timeout.read? */
-        comm_read(ctrl.fd, ctrl.buf + ctrl.offset, ctrl.size - ctrl.offset, ftpReadControlReply, this);
+       typedef CommCbMemFunT<FtpStateData, CommIoCbParams> Dialer;
+       AsyncCall::Pointer reader=asyncCall(9, 5, "FtpStateData::ftpReadControlReply",
+                                   Dialer(this, &FtpStateData::ftpReadControlReply));
+       comm_read(ctrl.fd, ctrl.buf + ctrl.offset, ctrl.size - ctrl.offset, reader);
         /*
          * Cancel the timeout on the Data socket (if any) and
          * establish one on the control socket.
          */
 
-        if (data.fd > -1)
-            commSetTimeout(data.fd, -1, NULL, NULL);
+        if (data.fd > -1){
+           AsyncCall::Pointer nullCall =  NULL;
+            commSetTimeout(data.fd, -1, nullCall);
+       }
+
+       typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
+       AsyncCall::Pointer timeoutCall =  asyncCall(9, 5, "FtpStateData::ftpTimeout",
+                                           TimeoutDialer(this,&FtpStateData::ftpTimeout));
 
-        commSetTimeout(ctrl.fd, Config.Timeout.read, ftpTimeout,
-                       this);
+        commSetTimeout(ctrl.fd, Config.Timeout.read, timeoutCall);
     }
 }
 
-void
-FtpStateData::ftpReadControlReply(int fd, char *buf, size_t len, comm_err_t errflag, int xerrno, void *data)
+void FtpStateData::ftpReadControlReply(const CommIoCbParams &io)
 {
-    FtpStateData *ftpState = (FtpStateData *)data;
-    StoreEntry *entry = ftpState->entry;
-    debugs(9, 3, HERE "FD " << fd << ", Read " << len << " bytes");
+    debugs(9, 3, "ftpReadControlReply: FD " << io.fd << ", Read " << io.size << " bytes");
 
-    if (len > 0) {
-        kb_incr(&statCounter.server.all.kbytes_in, len);
-        kb_incr(&statCounter.server.ftp.kbytes_in, len);
+    if (io.size > 0) {
+        kb_incr(&statCounter.server.all.kbytes_in, io.size);
+        kb_incr(&statCounter.server.ftp.kbytes_in, io.size);
     }
 
-    if (errflag == COMM_ERR_CLOSING)
+    if (io.flag == COMM_ERR_CLOSING)
         return;
 
     if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
-        ftpState->abortTransaction("entry aborted during control reply read");
+        abortTransaction("entry aborted during control reply read");
         return;
     }
 
-    assert(ftpState->ctrl.offset < ftpState->ctrl.size);
+    assert(ctrl.offset < ctrl.size);
 
-    if (errflag == COMM_OK && len > 0) {
-        fd_bytes(fd, len, FD_READ);
+    if (io.flag == COMM_OK && io.size > 0) {
+        fd_bytes(io.fd, io.size, FD_READ);
     }
 
+    if (io.flag != COMM_OK || io.size < 0) {
+        debugs(50, ignoreErrno(io.xerrno) ? 3 : DBG_IMPORTANT, 
+            "ftpReadControlReply: read error: " << xstrerr(io.xerrno));
 
-    if (errflag != COMM_OK || len < 0) {
-         debugs(50, ignoreErrno(xerrno) ? 3 : DBG_IMPORTANT, "ftpReadControlReply: read error: " << xstrerr(xerrno));
-
-        if (ignoreErrno(xerrno)) {
-            ftpState->scheduleReadControlReply(0);
+        if (ignoreErrno(io.xerrno)) {
+            scheduleReadControlReply(0);
         } else {
-            ftpState->failed(ERR_READ_ERROR, xerrno);
+            failed(ERR_READ_ERROR, io.xerrno);
             /* failed closes ctrl.fd and frees ftpState */
             return;
         }
@@ -1782,22 +1797,22 @@ FtpStateData::ftpReadControlReply(int fd, char *buf, size_t len, comm_err_t errf
         return;
     }
 
-    if (len == 0) {
+    if (io.size == 0) {
         if (entry->store_status == STORE_PENDING) {
-            ftpState->failed(ERR_FTP_FAILURE, 0);
+            failed(ERR_FTP_FAILURE, 0);
             /* failed closes ctrl.fd and frees ftpState */
             return;
         }
 
     /* XXX this may end up having to be serverComplete() .. */
-        ftpState->abortTransaction("zero control reply read");
+        abortTransaction("zero control reply read");
         return;
     }
 
-    len += ftpState->ctrl.offset;
-    ftpState->ctrl.offset = len;
-    assert(len <= ftpState->ctrl.size);
-    ftpState->handleControlReply();
+    unsigned int len =io.size + ctrl.offset;
+    ctrl.offset = len;
+    assert(len <= ctrl.size);
+    handleControlReply();
 }
 
 void
@@ -2512,7 +2527,11 @@ ftpSendPassive(FtpStateData * ftpState)
      * ugly hack for ftp servers like ftp.netscape.com that sometimes
      * dont acknowledge PASV commands.
      */
-    commSetTimeout(ftpState->data.fd, 15, FtpStateData::ftpTimeout, ftpState);
+    typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
+    AsyncCall::Pointer timeoutCall =  asyncCall(9, 5, "FtpStateData::ftpTimeout",
+                                           TimeoutDialer(ftpState,&FtpStateData::ftpTimeout));
+
+    commSetTimeout(ftpState->data.fd, 15, timeoutCall);
 }
 
 void
@@ -2857,26 +2876,18 @@ ftpReadEPRT(FtpStateData * ftpState)
  \par
  * "read" handler to accept FTP data connections.
  *
- \param fd     Handle/FD for the listening connection which has received a connect request.
- \param details        Some state data for the listening connection
- \param newfd  Handle/FD to the connection which has just been opened.
- \param flag   Error details for the listening connection. 
- \param xerrno ??
- \param data   ??
+ \param io comm accept(2) callback parameters
  */
-static void
-ftpAcceptDataConnection(int fd, int newfd, ConnectionDetail *details,
-                        comm_err_t flag, int xerrno, void *data)
+void FtpStateData::ftpAcceptDataConnection(const CommAcceptCbParams &io)
 {
     char ntoapeer[MAX_IPSTRLEN];
-    FtpStateData *ftpState = (FtpStateData *)data;
-    debugs(9, 3, HERE);
+    debugs(9, 3, "ftpAcceptDataConnection");
 
-    if (flag == COMM_ERR_CLOSING)
+    if (io.flag == COMM_ERR_CLOSING)
         return;
 
-    if (EBIT_TEST(ftpState->entry->flags, ENTRY_ABORTED)) {
-        ftpState->abortTransaction("entry aborted when accepting data conn");
+    if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
+        abortTransaction("entry aborted when accepting data conn");
         return;
     }
 
@@ -2886,52 +2897,58 @@ ftpAcceptDataConnection(int fd, int newfd, ConnectionDetail *details,
      * This prevents third-party hacks, but also third-party load balancing handshakes.
      */
     if (Config.Ftp.sanitycheck) {
-        details->peer.NtoA(ntoapeer,MAX_IPSTRLEN);
-
-        if (strcmp(fd_table[ftpState->ctrl.fd].ipaddr, ntoapeer) != 0) {
-            debugs(9, DBG_IMPORTANT, "FTP data connection from unexpected server (" <<
-                   details->peer << "), expecting " << fd_table[ftpState->ctrl.fd].ipaddr);
-
-            comm_close(newfd);
-            comm_accept(ftpState->data.fd, ftpAcceptDataConnection, ftpState);
+        io.details.peer.NtoA(ntoapeer,MAX_IPSTRLEN);
+
+        if (strcmp(fd_table[ctrl.fd].ipaddr, ntoapeer) != 0) {
+            debugs(9, DBG_IMPORTANT, 
+                "FTP data connection from unexpected server (" <<
+                io.details.peer << "), expecting " <<
+                fd_table[ctrl.fd].ipaddr);
+
+            comm_close(io.nfd);
+           typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
+           AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
+                        acceptDialer(this, &FtpStateData::ftpAcceptDataConnection));
+            comm_accept(data.fd, acceptCall);
             return;
         }
     }
 
-    if (flag != COMM_OK) {
-        debugs(9, DBG_IMPORTANT, HERE << "Comm Error for FD " << newfd << ": " << xstrerr(xerrno));
+    if (io.flag != COMM_OK) {
+        debugs(9, DBG_IMPORTANT, "ftpHandleDataAccept: comm_accept(" << io.nfd << "): " << xstrerr(io.xerrno));
         /** \todo XXX Need to set error message */
-        ftpFail(ftpState);
+        ftpFail(this);
         return;
     }
 
     /**\par
      * Replace the Listen socket with the accepted data socket */
-    debugs(9, 3, HERE << "Connected data socket on FD " << newfd);
-
-    /* remember that details is state for fd, it will be erased by the following comm_close() */
-    ftpState->data.port = details->peer.GetPort();
-
-    details->peer.NtoA(ftpState->data.host,SQUIDHOSTNAMELEN);
+    comm_close(data.fd);
 
-    comm_close(ftpState->data.fd);
+    data.fd = io.nfd;
+    data.port = io.details.peer.GetPort();
+    io.details.peer.NtoA(data.host,SQUIDHOSTNAMELEN);
 
-    ftpState->data.fd = newfd;
+    debugs(9, 3, "ftpAcceptDataConnection: Connected data socket on " <<
+        "FD " << io.nfd << " to " << io.details.peer << " FD table says: " <<
+        "ctrl-peer= " << fd_table[ctrl.fd].ipaddr << ", " <<
+        "data-peer= " << fd_table[data.fd].ipaddr);
 
-    debugs(9, 3, "FTP connection to " << details->peer << " FD table says: " <<
-                 " ctrl-peer= " << fd_table[ftpState->ctrl.fd].ipaddr << ", " <<
-                 " data-peer= " << fd_table[ftpState->data.fd].ipaddr );
 
-    commSetTimeout(ftpState->ctrl.fd, -1, NULL, NULL);
+    AsyncCall::Pointer nullCall = NULL;
+    commSetTimeout(ctrl.fd, -1, nullCall);
 
-    commSetTimeout(ftpState->data.fd, Config.Timeout.read, FtpStateData::ftpTimeout, ftpState);
+    typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
+    AsyncCall::Pointer timeoutCall =  asyncCall(9, 5, "FtpStateData::ftpTimeout",
+                                           TimeoutDialer(this,&FtpStateData::ftpTimeout));
+    commSetTimeout(data.fd, Config.Timeout.read, timeoutCall);
 
     /*\todo XXX We should have a flag to track connect state...
      *    host NULL -> not connected, port == local port
      *    host set  -> connected, port == remote port
      */
     /* Restart state (SENT_NLST/LIST/RETR) */
-    FTP_SM_FUNCS[ftpState->state] (ftpState);
+    FTP_SM_FUNCS[state] (this);
 }
 
 static void
@@ -3006,17 +3023,26 @@ void FtpStateData::readStor() {
          * Cancel the timeout on the Control socket and
          * establish one on the data socket.
          */
-        commSetTimeout(ctrl.fd, -1, NULL, NULL);
-        commSetTimeout(data.fd, Config.Timeout.read, FtpStateData::ftpTimeout,
-                       this);
+       AsyncCall::Pointer nullCall = NULL;
+        commSetTimeout(ctrl.fd, -1, nullCall);
+
+       typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
+       AsyncCall::Pointer timeoutCall =  asyncCall(9, 5, "FtpStateData::ftpTimeout",
+                                           TimeoutDialer(this,&FtpStateData::ftpTimeout));
+
+        commSetTimeout(data.fd, Config.Timeout.read, timeoutCall);
 
         state = WRITING_DATA;
         debugs(9, 3, HERE << "writing data channel");
     } else if (code == 150) {
         /*\par
          * When client code is 150 with a hostname, Accept data channel. */
-        debugs(9, 3, HERE << "accepting data channel");
-        comm_accept(data.fd, ftpAcceptDataConnection, this);
+        debugs(9, 3, "ftpReadStor: accepting data channel");
+        typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
+        AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
+            acceptDialer(this, &FtpStateData::ftpAcceptDataConnection));
+
+        comm_accept(data.fd, acceptCall);
     } else {
         debugs(9, DBG_IMPORTANT, HERE << "Unexpected reply code "<< std::setfill('0') << std::setw(3) << code);
         ftpFail(this);
@@ -3138,17 +3164,27 @@ ftpReadList(FtpStateData * ftpState)
          * Cancel the timeout on the Control socket and establish one
          * on the data socket
          */
-        commSetTimeout(ftpState->ctrl.fd, -1, NULL, NULL);
+       AsyncCall::Pointer nullCall = NULL;
+        commSetTimeout(ftpState->ctrl.fd, -1, nullCall);
         return;
     } else if (code == 150) {
         /* Accept data channel */
-        comm_accept(ftpState->data.fd, ftpAcceptDataConnection, ftpState);
+       typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
+       AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
+                              acceptDialer(ftpState, &FtpStateData::ftpAcceptDataConnection));
+
+        comm_accept(ftpState->data.fd, acceptCall);
         /*
          * Cancel the timeout on the Control socket and establish one
          * on the data socket
          */
-        commSetTimeout(ftpState->ctrl.fd, -1, NULL, NULL);
-        commSetTimeout(ftpState->data.fd, Config.Timeout.read, FtpStateData::ftpTimeout, ftpState);
+       AsyncCall::Pointer nullCall = NULL;
+        commSetTimeout(ftpState->ctrl.fd, -1, nullCall);
+       
+       typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
+       AsyncCall::Pointer timeoutCall =  asyncCall(9, 5, "FtpStateData::ftpTimeout",
+                                           TimeoutDialer(ftpState,&FtpStateData::ftpTimeout));
+        commSetTimeout(ftpState->data.fd, Config.Timeout.read, timeoutCall);
         return;
     } else if (!ftpState->flags.tried_nlst && code > 300) {
         ftpSendNlst(ftpState);
@@ -3189,17 +3225,25 @@ ftpReadRetr(FtpStateData * ftpState)
          * Cancel the timeout on the Control socket and establish one
          * on the data socket
          */
-        commSetTimeout(ftpState->ctrl.fd, -1, NULL, NULL);
+       AsyncCall::Pointer nullCall = NULL;
+        commSetTimeout(ftpState->ctrl.fd, -1, nullCall);
     } else if (code == 150) {
         /* Accept data channel */
-        comm_accept(ftpState->data.fd, ftpAcceptDataConnection, ftpState);
+       typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
+       AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
+                        acceptDialer(ftpState, &FtpStateData::ftpAcceptDataConnection));
+        comm_accept(ftpState->data.fd, acceptCall);
         /*
          * Cancel the timeout on the Control socket and establish one
          * on the data socket
          */
-        commSetTimeout(ftpState->ctrl.fd, -1, NULL, NULL);
-        commSetTimeout(ftpState->data.fd, Config.Timeout.read, FtpStateData::ftpTimeout,
-                       ftpState);
+       AsyncCall::Pointer nullCall = NULL;
+        commSetTimeout(ftpState->ctrl.fd, -1, nullCall);
+
+       typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
+       AsyncCall::Pointer timeoutCall =  asyncCall(9, 5, "FtpStateData::ftpTimeout",
+                                           TimeoutDialer(ftpState,&FtpStateData::ftpTimeout));
+        commSetTimeout(ftpState->data.fd, Config.Timeout.read, timeoutCall);
     } else if (code >= 300) {
         if (!ftpState->flags.try_slash_hack) {
             /* Try this as a directory missing trailing slash... */
@@ -3244,11 +3288,11 @@ FtpStateData::handleRequestBodyProducerAborted()
 
 /* This will be called when the put write is completed */
 void
-FtpStateData::sentRequestBody(int fd, size_t size, comm_err_t errflag)
+FtpStateData::sentRequestBody(const CommIoCbParams &io)
 {
-    if (size > 0)
-        kb_incr(&statCounter.server.ftp.kbytes_out, size);
-    ServerStateData::sentRequestBody(fd, size, errflag);
+    if (io.size > 0)
+        kb_incr(&statCounter.server.ftp.kbytes_out, io.size);
+    ServerStateData::sentRequestBody(io);
 }
 
 static void
@@ -3713,7 +3757,8 @@ FtpStateData::closeServer()
 
     if (ctrl.fd > -1) {
         fwd->unregister(ctrl.fd);
-        comm_remove_close_handler(ctrl.fd, ftpSocketClosed, this);
+       comm_remove_close_handler(ctrl.fd, closeHandler);
+        closeHandler = NULL;
         comm_close(ctrl.fd);
         ctrl.fd = -1;
     }
@@ -3761,5 +3806,5 @@ FtpStateData::abortTransaction(const char *reason)
     }
     
     fwd->handleUnregisteredServerEnd();
-    delete this;
+    deleteThis("FtpStateData::abortTransaction");
 }
index 88ab0ffa93c5669bed5dfb3ccfd897ab34c210ac..3b48de45d734dfabdc705cd3f485c64e86f06d76 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: http.cc,v 1.546 2008/02/03 10:00:30 amosjeffries Exp $
+ * $Id: http.cc,v 1.547 2008/02/12 23:55:26 rousskov Exp $
  *
  * DEBUG: section 11    Hypertext Transfer Protocol (HTTP)
  * AUTHOR: Harvest Derived
@@ -71,14 +71,12 @@ CBDATA_CLASS_INIT(HttpStateData);
 
 static const char *const crlf = "\r\n";
 
-static PF httpStateFree;
-static PF httpTimeout;
 static void httpMaybeRemovePublic(StoreEntry *, http_status);
 static void copyOneHeaderFromClientsideRequestToUpstreamRequest(const HttpHeaderEntry *e, String strConnection, HttpRequest * request, HttpRequest * orig_request,
         HttpHeader * hdr_out, int we_do_ranges, http_state_flags);
 
-HttpStateData::HttpStateData(FwdState *theFwdState) : ServerStateData(theFwdState),
-                      header_bytes_read(0), reply_bytes_read(0), httpChunkDecoder(NULL)
+HttpStateData::HttpStateData(FwdState *theFwdState) : AsyncJob("HttpStateData"), ServerStateData(theFwdState),
+        header_bytes_read(0), reply_bytes_read(0), httpChunkDecoder(NULL)
 {
     debugs(11,5,HERE << "HttpStateData " << this << " created");
     ignoreCacheControl = false;
@@ -134,7 +132,10 @@ HttpStateData::HttpStateData(FwdState *theFwdState) : ServerStateData(theFwdStat
     /*
      * register the handler to free HTTP state data when the FD closes
      */
-    comm_add_close_handler(fd, httpStateFree, this);
+    typedef CommCbMemFunT<HttpStateData, CommCloseCbParams> Dialer;
+    closeHandler = asyncCall(9, 5, "httpStateData::httpStateConnClosed",
+                                 Dialer(this,&HttpStateData::httpStateConnClosed));
+    comm_add_close_handler(fd, closeHandler);
 }
 
 HttpStateData::~HttpStateData()
@@ -161,13 +162,20 @@ HttpStateData::dataDescriptor() const
 {
     return fd;
 }
-
+/*
 static void
 httpStateFree(int fd, void *data)
 {
     HttpStateData *httpState = static_cast<HttpStateData *>(data);
     debugs(11, 5, "httpStateFree: FD " << fd << ", httpState=" << data);
     delete httpState;
+}*/
+
+void 
+HttpStateData::httpStateConnClosed(const CommCloseCbParams &params)
+{
+    debugs(11, 5, "httpStateFree: FD " << params.fd << ", httpState=" << params.data);
+    deleteThis("HttpStateData::httpStateConnClosed");
 }
 
 int
@@ -183,15 +191,13 @@ httpCachable(const HttpRequestMethod& method)
     return 1;
 }
 
-static void
-httpTimeout(int fd, void *data)
+void
+HttpStateData::httpTimeout(const CommTimeoutCbParams &params)
 {
-    HttpStateData *httpState = static_cast<HttpStateData *>(data);
-    StoreEntry *entry = httpState->entry;
     debugs(11, 4, "httpTimeout: FD " << fd << ": '" << entry->url() << "'" );
 
     if (entry->store_status == STORE_PENDING) {
-        httpState->fwd->fail(errorCon(ERR_READ_TIMEOUT, HTTP_GATEWAY_TIMEOUT, httpState->fwd->request));
+        fwd->fail(errorCon(ERR_READ_TIMEOUT, HTTP_GATEWAY_TIMEOUT, fwd->request));
     }
 
     comm_close(fd);
@@ -946,6 +952,7 @@ HttpStateData::persistentConnStatus() const
 /*
  * This is the callback after some data has been read from the network
  */
+/*
 void
 HttpStateData::ReadReplyWrapper(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
 {
@@ -956,19 +963,23 @@ HttpStateData::ReadReplyWrapper(int fd, char *buf, size_t len, comm_err_t flag,
     httpState->readReply (len, flag, xerrno);
     PROF_stop(HttpStateData_readReply);
 }
-
+*/
 /* XXX this function is too long! */
 void
-HttpStateData::readReply (size_t len, comm_err_t flag, int xerrno)
+HttpStateData::readReply (const CommIoCbParams &io)
 {
     int bin;
     int clen;
-    flags.do_next_read = 0;
+    int len = io.size;
 
+    assert(fd == io.fd);
+
+    flags.do_next_read = 0;
+   
     debugs(11, 5, "httpReadReply: FD " << fd << ": len " << len << ".");
 
     // Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us
-    if (flag == COMM_ERR_CLOSING) {
+    if (io.flag == COMM_ERR_CLOSING) {
         debugs(11, 3, "http socket closing");
         return;
     }
@@ -979,15 +990,15 @@ HttpStateData::readReply (size_t len, comm_err_t flag, int xerrno)
     }
 
     // handle I/O errors
-    if (flag != COMM_OK || len < 0) {
+    if (io.flag != COMM_OK || len < 0) {
         debugs(11, 2, "httpReadReply: FD " << fd << ": read failure: " << xstrerror() << ".");
 
-        if (ignoreErrno(xerrno)) {
+        if (ignoreErrno(io.xerrno)) {
             flags.do_next_read = 1;
         } else {
             ErrorState *err;
             err = errorCon(ERR_READ_ERROR, HTTP_BAD_GATEWAY, fwd->request);
-            err->xerrno = xerrno;
+            err->xerrno = io.xerrno;
             fwd->fail(err);
             flags.do_next_read = 0;
             comm_close(fd);
@@ -1155,7 +1166,7 @@ HttpStateData::decodeAndWriteReplyBody()
 void
 HttpStateData::processReplyBody()
 {
-
+    AsyncCall::Pointer call;
     IPAddress client_addr;
 
     if (!flags.headers_parsed) {
@@ -1194,15 +1205,15 @@ HttpStateData::processReplyBody()
         (void) 0;
     } else
         switch (persistentConnStatus()) {
-
         case INCOMPLETE_MSG:
             debugs(11, 5, "processReplyBody: INCOMPLETE_MSG");
             /* Wait for more data or EOF condition */
-
             if (flags.keepalive_broken) {
-                commSetTimeout(fd, 10, NULL, NULL);
+               call = NULL;
+                commSetTimeout(fd, 10, call);
             } else {
-                commSetTimeout(fd, Config.Timeout.read, NULL, NULL);
+               call = NULL;
+                commSetTimeout(fd, Config.Timeout.read, call);
             }
 
             flags.do_next_read = 1;
@@ -1211,10 +1222,12 @@ HttpStateData::processReplyBody()
         case COMPLETE_PERSISTENT_MSG:
             debugs(11, 5, "processReplyBody: COMPLETE_PERSISTENT_MSG");
             /* yes we have to clear all these! */
-            commSetTimeout(fd, -1, NULL, NULL);
+           call = NULL;
+            commSetTimeout(fd, -1, call);
             flags.do_next_read = 0;
 
-            comm_remove_close_handler(fd, httpStateFree, this);
+           comm_remove_close_handler(fd, closeHandler);
+            closeHandler = NULL;
             fwd->unregister(fd);
 #if LINUX_TPROXY
 
@@ -1270,8 +1283,11 @@ HttpStateData::maybeReadVirginBody()
     }
 
     if (flags.do_next_read) {
-       flags.do_next_read = 0;
-       entry->delayAwareRead(fd, readBuf->space(read_sz), read_sz, ReadReplyWrapper, this);
+        flags.do_next_read = 0;
+        typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer;
+        entry->delayAwareRead(fd, readBuf->space(read_sz), read_sz,
+            asyncCall(11, 5, "HttpStateData::readReply",
+            Dialer(this, &HttpStateData::readReply)));
     }
 }
 
@@ -1279,29 +1295,28 @@ HttpStateData::maybeReadVirginBody()
  * This will be called when request write is complete.
  */
 void
-HttpStateData::SendComplete(int fd, char *bufnotused, size_t size, comm_err_t errflag, int xerrno, void *data)
+HttpStateData::sendComplete(const CommIoCbParams &io)
 {
-    HttpStateData *httpState = static_cast<HttpStateData *>(data);
-    debugs(11, 5, "httpSendComplete: FD " << fd << ": size " << size << ": errflag " << errflag << ".");
+    debugs(11, 5, "httpSendComplete: FD " << fd << ": size " << io.size << ": errflag " << io.flag << ".");
 #if URL_CHECKSUM_DEBUG
 
     entry->mem_obj->checkUrlChecksum();
 #endif
 
-    if (size > 0) {
-        fd_bytes(fd, size, FD_WRITE);
-        kb_incr(&statCounter.server.all.kbytes_out, size);
-        kb_incr(&statCounter.server.http.kbytes_out, size);
+    if (io.size > 0) {
+        fd_bytes(fd, io.size, FD_WRITE);
+        kb_incr(&statCounter.server.all.kbytes_out, io.size);
+        kb_incr(&statCounter.server.http.kbytes_out, io.size);
     }
 
-    if (errflag == COMM_ERR_CLOSING)
+    if (io.flag == COMM_ERR_CLOSING)
         return;
 
-    if (errflag) {
+    if (io.flag) {
         ErrorState *err;
-        err = errorCon(ERR_WRITE_ERROR, HTTP_BAD_GATEWAY, httpState->fwd->request);
-        err->xerrno = xerrno;
-        httpState->fwd->fail(err);
+        err = errorCon(ERR_WRITE_ERROR, HTTP_BAD_GATEWAY, fwd->request);
+        err->xerrno = io.xerrno;
+        fwd->fail(err);
         comm_close(fd);
         return;
     }
@@ -1314,9 +1329,13 @@ HttpStateData::SendComplete(int fd, char *bufnotused, size_t size, comm_err_t er
      * the timeout for POST/PUT requests that have very large
      * request bodies.
      */
-    commSetTimeout(fd, Config.Timeout.read, httpTimeout, httpState);
+    typedef CommCbMemFunT<HttpStateData, CommTimeoutCbParams> TimeoutDialer;
+    AsyncCall::Pointer timeoutCall =  asyncCall(11, 5, "HttpStateData::httpTimeout",
+                        TimeoutDialer(this,&HttpStateData::httpTimeout));
 
-    httpState->flags.request_sent = 1;
+    commSetTimeout(fd, Config.Timeout.read, timeoutCall);
+
+    flags.request_sent = 1;
 }
 
 // Close the HTTP server connection. Used by serverComplete().
@@ -1327,7 +1346,8 @@ HttpStateData::closeServer()
 
     if (fd >= 0) {
         fwd->unregister(fd);
-        comm_remove_close_handler(fd, httpStateFree, this);
+       comm_remove_close_handler(fd, closeHandler);
+        closeHandler = NULL;
         comm_close(fd);
         fd = -1;
     }
@@ -1754,18 +1774,24 @@ HttpStateData::sendRequest()
     MemBuf mb;
 
     debugs(11, 5, "httpSendRequest: FD " << fd << ", request " << request << ", this " << this << ".");
-
-    commSetTimeout(fd, Config.Timeout.lifetime, httpTimeout, this);
+    typedef CommCbMemFunT<HttpStateData, CommTimeoutCbParams> TimeoutDialer;
+    AsyncCall::Pointer timeoutCall =  asyncCall(11, 5, "HttpStateData::httpTimeout",
+                        TimeoutDialer(this,&HttpStateData::httpTimeout));
+    commSetTimeout(fd, Config.Timeout.lifetime, timeoutCall);
     flags.do_next_read = 1;
     maybeReadVirginBody();
 
     if (orig_request->body_pipe != NULL) {
         if (!startRequestBodyFlow()) // register to receive body data
             return false;
-        requestSender = HttpStateData::sentRequestBodyWrapper;
+       typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer;
+        Dialer dialer(this, &HttpStateData::sentRequestBody);
+       requestSender = asyncCall(11,5, "HttpStateData::sentRequestBody", dialer);
     } else {
         assert(!requestBodySource);
-        requestSender = HttpStateData::SendComplete;
+       typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer;
+        Dialer dialer(this, &HttpStateData::sendComplete);
+       requestSender = asyncCall(11,5, "HttpStateData::SendComplete", dialer);
     }
 
     if (_peer != NULL) {
@@ -1805,7 +1831,7 @@ HttpStateData::sendRequest()
     mb.init();
     buildRequestPrefix(request, orig_request, entry, &mb, flags);
     debugs(11, 6, "httpSendRequest: FD " << fd << ":\n" << mb.buf);
-    comm_write_mbuf(fd, &mb, requestSender, this);
+    comm_write_mbuf(fd, &mb, requestSender);
 
     return true;
 }
@@ -1846,13 +1872,22 @@ HttpStateData::doneSendingRequestBody()
 
     if (!Config.accessList.brokenPosts) {
         debugs(11, 5, "doneSendingRequestBody: No brokenPosts list");
-        HttpStateData::SendComplete(fd, NULL, 0, COMM_OK, 0, this);
+       CommIoCbParams io(NULL);
+       io.fd=fd;
+       io.flag=COMM_OK;
+       sendComplete(io);
     } else if (!ch.fastCheck()) {
         debugs(11, 5, "doneSendingRequestBody: didn't match brokenPosts");
-        HttpStateData::SendComplete(fd, NULL, 0, COMM_OK, 0, this);
+       CommIoCbParams io(NULL);
+       io.fd=fd;
+       io.flag=COMM_OK;
+       sendComplete(io);
     } else {
         debugs(11, 2, "doneSendingRequestBody: matched brokenPosts");
-        comm_write(fd, "\r\n", 2, HttpStateData::SendComplete, this, NULL);
+       typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer;
+        Dialer dialer(this, &HttpStateData::sendComplete);
+       AsyncCall::Pointer call= asyncCall(11,5, "HttpStateData::SendComplete", dialer);
+        comm_write(fd, "\r\n", 2, call);
     }
 }
 
@@ -1893,17 +1928,20 @@ HttpStateData::handleRequestBodyProducerAborted()
 {
     ServerStateData::handleRequestBodyProducerAborted();
     // XXX: SendComplete(COMM_ERR_CLOSING) does little. Is it enough?
-    SendComplete(fd, NULL, 0, COMM_ERR_CLOSING, 0, this);
+    CommIoCbParams io(NULL);
+    io.fd=fd;
+    io.flag=COMM_ERR_CLOSING;
+    sendComplete(io);
 }
 
 // called when we wrote request headers(!) or a part of the body
 void
-HttpStateData::sentRequestBody(int fd, size_t size, comm_err_t errflag)
+HttpStateData::sentRequestBody(const CommIoCbParams &io)
 {
-    if (size > 0)
-        kb_incr(&statCounter.server.http.kbytes_out, size);
+    if (io.size > 0)
+        kb_incr(&statCounter.server.http.kbytes_out, io.size);
 
-    ServerStateData::sentRequestBody(fd, size, errflag);
+    ServerStateData::sentRequestBody(io);
 }
 
 // Quickly abort the transaction
@@ -1921,7 +1959,7 @@ HttpStateData::abortTransaction(const char *reason)
     }
 
     fwd->handleUnregisteredServerEnd();
-    delete this;
+    deleteThis("HttpStateData::abortTransaction");
 }
 
 void
index ffec6722a3226aafbb0188e8f16f28aea0410b3d..db8fdf2753f15999db5d73279199883b2830d48c 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: http.h,v 1.33 2007/12/26 23:39:55 hno Exp $
+ * $Id: http.h,v 1.34 2008/02/12 23:55:26 rousskov Exp $
  *
  *
  * SQUID Web Proxy Cache          http://www.squid-cache.org/
@@ -47,8 +47,6 @@ public:
     HttpStateData(FwdState *);
     ~HttpStateData();
 
-    static IOCB SendComplete;
-    static IOCB ReadReplyWrapper;
     static void httpBuildRequestHeader(HttpRequest * request,
                                        HttpRequest * orig_request,
                                        StoreEntry * entry,
@@ -60,7 +58,7 @@ public:
     bool sendRequest();
     void processReplyHeader();
     void processReplyBody();
-    void readReply(size_t len, comm_err_t flag, int xerrno);
+    void readReply(const CommIoCbParams &io);
     virtual void maybeReadVirginBody(); // read response data from the network
     int cacheableReply();
 
@@ -82,6 +80,7 @@ protected:
     virtual HttpRequest *originalRequest();
 
 private:
+    AsyncCall::Pointer closeHandler;
     enum ConnectionStatus {
         INCOMPLETE_MSG,
         COMPLETE_PERSISTENT_MSG,
@@ -107,7 +106,11 @@ private:
     bool decodeAndWriteReplyBody();
     void doneSendingRequestBody();
     void requestBodyHandler(MemBuf &);
-    virtual void sentRequestBody(int fd, size_t size, comm_err_t errflag);
+    virtual void sentRequestBody(const CommIoCbParams &io);
+    void sendComplete(const CommIoCbParams &io);
+    void httpStateConnClosed(const CommCloseCbParams &params);
+    void httpTimeout(const CommTimeoutCbParams &params);
+
     mb_size_t buildRequestPrefix(HttpRequest * request,
                                  HttpRequest * orig_request,
                                  StoreEntry * entry,