]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Merging async-call branch changes to HEAD:
authorrousskov <>
Wed, 13 Feb 2008 06:07:52 +0000 (06:07 +0000)
committerrousskov <>
Wed, 13 Feb 2008 06:07:52 +0000 (06:07 +0000)
Async-call work replaces event-based asynchronous calls with
stand-alone implementation. The common async call API allows Squid
core do call, debug, and troubleshoot all callback handlers in a
uniform way.

An async "job" API is introduced to manage independent logical threads
or work such as protocol transaction handlers on client, server, and
ICAP sides. These jobs should communicate with each other using async
calls to minimize dependencies and avoid reentrant callback loops.

These changes will eventually improve overall code quality, debugging
quality, and Squid robustness.

Below you will find log messages from the async-call branch that are
relevant to the file(s) being committed.

        Made comm_read and comm_write calls to use the new CommCalls.

        Converted the ConnStateData related code in client_side.cc to
        use CommCalls.

        BodyPipe now uses the  new job calls interface. Many changes in
        BodyPipe's related code.

src/client_side.cc
src/client_side.h
src/client_side_reply.cc
src/client_side_request.cc
src/client_side_request.h

index a21415201853022b71e40074a7552e5bd149df28..544a566934151d3a554856ce9175411653394fe0 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: client_side.cc,v 1.775 2008/02/11 22:25:22 rousskov Exp $
+ * $Id: client_side.cc,v 1.776 2008/02/12 23:07:52 rousskov Exp $
  *
  * DEBUG: section 33    Client-side Routines
  * AUTHOR: Duane Wessels
@@ -123,15 +123,11 @@ static ClientSocketContext *ClientSocketContextNew(ClientHttpRequest *);
 /* other */
 static IOCB clientWriteComplete;
 static IOCB clientWriteBodyComplete;
-static IOCB clientReadRequest;
-static bool clientParseRequest(ConnStateData::Pointer conn, bool &do_next_read);
-static void clientAfterReadingRequests(int fd, ConnStateData::Pointer &conn, int do_next_read);
-static PF connStateClosed;
-static PF requestTimeout;
+static bool clientParseRequest(ConnStateData * conn, bool &do_next_read);
 static PF clientLifetimeTimeout;
-static ClientSocketContext *parseHttpRequestAbort(ConnStateData::Pointer & conn,
+static ClientSocketContext *parseHttpRequestAbort(ConnStateData * conn,
         const char *uri);
-static ClientSocketContext *parseHttpRequest(ConnStateData::Pointer &, HttpParser *, HttpRequestMethod *, HttpVersion *);
+static ClientSocketContext *parseHttpRequest(ConnStateData *, HttpParser *, HttpRequestMethod *, HttpVersion *);
 #if USE_IDENT
 static IDCB clientIdentDone;
 #endif
@@ -149,18 +145,16 @@ static void clientUpdateHierCounters(HierarchyLogEntry *);
 static bool clientPingHasFinished(ping_data const *aPing);
 static void clientPrepareLogWithRequestDetails(HttpRequest *, AccessLogEntry *);
 #ifndef PURIFY
-static int connIsUsable(ConnStateData::Pointer conn);
+static int connIsUsable(ConnStateData * conn);
 #endif
 static int responseFinishedOrFailed(HttpReply * rep, StoreIOBuffer const &receivedData);
-static void ClientSocketContextPushDeferredIfNeeded(ClientSocketContext::Pointer deferredRequest, ConnStateData::Pointer & conn);
+static void ClientSocketContextPushDeferredIfNeeded(ClientSocketContext::Pointer deferredRequest, ConnStateData * conn);
 static void clientUpdateSocketStats(log_type logType, size_t size);
 
 char *skipLeadingSpace(char *aString);
-static int connReadWasError(ConnStateData::Pointer& conn, comm_err_t, int size, int xerrno);
-static int connFinishedWithConn(ConnStateData::Pointer& conn, int size);
 static void connNoteUseOfBuffer(ConnStateData* conn, size_t byteCount);
-static int connKeepReadingIncompleteRequest(ConnStateData::Pointer & conn);
-static void connCancelIncompleteRequests(ConnStateData::Pointer & conn);
+static int connKeepReadingIncompleteRequest(ConnStateData * conn);
+static void connCancelIncompleteRequests(ConnStateData * conn);
 
 static ConnStateData *connStateCreate(const IPAddress &peer, const IPAddress &me, int fd, http_port_list *port);
 
@@ -204,19 +198,18 @@ ConnStateData::readSomeData()
 
     makeSpaceAvailable();
 
-    /* Make sure we are not still reading from the client side! */
-    /* XXX this could take a bit of CPU time! aiee! -- adrian */
-    assert(!comm_has_pending_read(fd));
-
-    comm_read(fd, in.addressToReadInto(), getAvailableBufferLength(), clientReadRequest, this);
+    typedef CommCbMemFunT<ConnStateData, CommIoCbParams> Dialer;
+    AsyncCall::Pointer call = asyncCall(33, 5, "ConnStateData::clientReadRequest",
+                                        Dialer(this, &ConnStateData::clientReadRequest));
+    comm_read(fd, in.addressToReadInto(), getAvailableBufferLength(), call);
 }
 
 
 void
-ClientSocketContext::removeFromConnectionList(ConnStateData::Pointer conn)
+ClientSocketContext::removeFromConnectionList(ConnStateData * conn)
 {
     ClientSocketContext::Pointer *tempContextPointer;
-    assert(conn != NULL);
+    assert(conn != NULL && cbdataReferenceValid(conn));
     assert(conn->getCurrentContext() != NULL);
     /* Unlink us from the connection request list */
     tempContextPointer = & conn->currentobject;
@@ -435,7 +428,7 @@ ClientHttpRequest::updateCounters()
         statCounter.client_http.errors++;
 
     clientUpdateStatHistCounters(logType,
-                                 tvSubMsec(start, current_time));
+                                 tvSubMsec(start_time, current_time));
 
     clientUpdateHierCounters(&request->hier);
 }
@@ -504,7 +497,7 @@ ClientHttpRequest::logRequest()
 
         al.cache.code = logType;
 
-        al.cache.msec = tvSubMsec(start, current_time);
+        al.cache.msec = tvSubMsec(start_time, current_time);
 
         if (request)
             clientPrepareLogWithRequestDetails(request, &al);
@@ -593,19 +586,17 @@ ConnStateData::freeAllContexts()
 }
 
 /* This is a handler normally called by comm_close() */
-static void
-connStateClosed(int fd, void *data)
+void ConnStateData::connStateClosed(const CommCloseCbParams &io)
 {
-    ConnStateData *connState = (ConnStateData *)data;
-    assert (fd == connState->fd);
-    connState->close();
+    assert (fd == io.fd);
+    close();
 }
 
 void
 ConnStateData::close()
 {
     debugs(33, 3, "ConnStateData::close: FD " << fd);
-    openReference = NULL;
+    deleteThis("ConnStateData::close");
     fd = -1;
     flags.readMoreRequests = false;
     clientdbEstablished(peer, -1);     /* decrement */
@@ -621,7 +612,7 @@ ConnStateData::close()
 bool
 ConnStateData::isOpen() const
 {
-    return openReference != NULL;
+    return cbdataReferenceValid(this);
 }
 
 ConnStateData::~ConnStateData()
@@ -712,9 +703,9 @@ clientIsRequestBodyTooLargeForPolicy(int64_t bodyLength)
 
 #ifndef PURIFY
 int
-connIsUsable(ConnStateData::Pointer conn)
+connIsUsable(ConnStateData * conn)
 {
-    if (conn == NULL || conn->fd == -1)
+    if (conn == NULL || !cbdataReferenceValid(conn) || conn->fd == -1)
         return 0;
 
     return 1;
@@ -816,7 +807,9 @@ ClientSocketContext::sendBody(HttpReply * rep, StoreIOBuffer bodyData)
     if (!multipartRangeRequest()) {
         size_t length = lengthToSend(bodyData.range());
         noteSentBodyBytes (length);
-        comm_write(fd(), bodyData.data, length, clientWriteBodyComplete, this, NULL);
+       AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteBodyComplete", 
+                                            CommIoCbPtrFun(clientWriteBodyComplete, this));
+       comm_write(fd(), bodyData.data, length, call );
         return;
     }
 
@@ -824,10 +817,12 @@ ClientSocketContext::sendBody(HttpReply * rep, StoreIOBuffer bodyData)
     mb.init();
     packRange(bodyData, &mb);
 
-    if (mb.contentSize())
+    if (mb.contentSize()){
         /* write */
-        comm_write_mbuf(fd(), &mb, clientWriteComplete, this);
-    else
+       AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteComplete", 
+                                            CommIoCbPtrFun(clientWriteComplete, this));
+       comm_write_mbuf(fd(), &mb, call);
+    }  else
         writeComplete(fd(), NULL, 0, COMM_OK);
 }
 
@@ -1209,7 +1204,9 @@ ClientSocketContext::sendStartOfMessage(HttpReply * rep, StoreIOBuffer bodyData)
 
     /* write */
     debugs(33,7, HERE << "sendStartOfMessage schedules clientWriteComplete");
-    comm_write_mbuf(fd(), mb, clientWriteComplete, this);
+    AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteComplete", 
+                                        CommIoCbPtrFun(clientWriteComplete, this));
+    comm_write_mbuf(fd(), mb, call);
 
     delete mb;
 }
@@ -1308,14 +1305,17 @@ ConnStateData::readNextRequest()
     /*
      * Set the timeout BEFORE calling clientReadRequest().
      */
-    commSetTimeout(fd, Config.Timeout.persistent_request,
-                   requestTimeout, this);
+    typedef CommCbMemFunT<ConnStateData, CommTimeoutCbParams> TimeoutDialer;
+    AsyncCall::Pointer timeoutCall =  asyncCall(33, 5, "ConnStateData::requestTimeout",
+                TimeoutDialer(this, &ConnStateData::requestTimeout));
+    commSetTimeout(fd, Config.Timeout.persistent_request, timeoutCall);
+
     readSomeData();
     /* Please don't do anything with the FD past here! */
 }
 
 void
-ClientSocketContextPushDeferredIfNeeded(ClientSocketContext::Pointer deferredRequest, ConnStateData::Pointer & conn)
+ClientSocketContextPushDeferredIfNeeded(ClientSocketContext::Pointer deferredRequest, ConnStateData * conn)
 {
     debugs(33, 2, "ClientSocketContextPushDeferredIfNeeded: FD " << conn->fd << " Sending next");
 
@@ -1338,7 +1338,7 @@ ClientSocketContextPushDeferredIfNeeded(ClientSocketContext::Pointer deferredReq
 void
 ClientSocketContext::keepaliveNextRequest()
 {
-    ConnStateData::Pointer conn = http->getConn();
+    ConnStateData * conn = http->getConn();
     bool do_next_read = false;
 
     debugs(33, 3, "ClientSocketContext::keepaliveNextRequest: FD " << conn->fd);
@@ -1567,7 +1567,7 @@ ClientSocketContext::initiateClose(const char *reason)
     debugs(33, 5, HERE << "initiateClose: closing for " << reason);
 
     if (http != NULL) {
-        ConnStateData::Pointer conn = http->getConn();
+        ConnStateData * conn = http->getConn();
 
         if (conn != NULL) {
             if (const int64_t expecting = conn->bodySizeLeft()) {
@@ -1650,7 +1650,7 @@ extern "C" CSS clientReplyStatus;
 extern "C" CSD clientReplyDetach;
 
 static ClientSocketContext *
-parseHttpRequestAbort(ConnStateData::Pointer & conn, const char *uri)
+parseHttpRequestAbort(ConnStateData * conn, const char *uri)
 {
     ClientHttpRequest *http;
     ClientSocketContext *context;
@@ -1719,7 +1719,7 @@ setLogUri(ClientHttpRequest * http, char const *uri)
 }
 
 static void
-prepareAcceleratedURL(ConnStateData::Pointer & conn, ClientHttpRequest *http, char *url, const char *req_hdr)
+prepareAcceleratedURL(ConnStateData * conn, ClientHttpRequest *http, char *url, const char *req_hdr)
 {
     int vhost = conn->port->vhost;
     int vport = conn->port->vport;
@@ -1799,7 +1799,7 @@ prepareAcceleratedURL(ConnStateData::Pointer & conn, ClientHttpRequest *http, ch
 }
 
 static void
-prepareTransparentURL(ConnStateData::Pointer & conn, ClientHttpRequest *http, char *url, const char *req_hdr)
+prepareTransparentURL(ConnStateData * conn, ClientHttpRequest *http, char *url, const char *req_hdr)
 {
     char *host;
     char ntoabuf[MAX_IPSTRLEN];
@@ -1840,7 +1840,7 @@ prepareTransparentURL(ConnStateData::Pointer & conn, ClientHttpRequest *http, ch
  *  Sets result->flags.parsed_ok to 1 if we have a good request.
  */
 static ClientSocketContext *
-parseHttpRequest(ConnStateData::Pointer & conn, HttpParser *hp, HttpRequestMethod * method_p, HttpVersion *http_ver)
+parseHttpRequest(ConnStateData *conn, HttpParser *hp, HttpRequestMethod * method_p, HttpVersion *http_ver)
 {
     char *url = NULL;
     char *req_hdr = NULL;
@@ -2037,19 +2037,19 @@ ConnStateData::getConcurrentRequestCount() const
 }
 
 int
-connReadWasError(ConnStateData::Pointer & conn, comm_err_t flag, int size, int xerrno)
+ConnStateData::connReadWasError(comm_err_t flag, int size, int xerrno)
 {
     if (flag != COMM_OK) {
-        debugs(33, 2, "connReadWasError: FD " << conn->fd << ": got flag " << flag);
+        debugs(33, 2, "connReadWasError: FD " << fd << ": got flag " << flag);
         return 1;
     }
 
     if (size < 0) {
         if (!ignoreErrno(xerrno)) {
-            debugs(33, 2, "connReadWasError: FD " << conn->fd << ": " << xstrerr(xerrno));
+            debugs(33, 2, "connReadWasError: FD " << fd << ": " << xstrerr(xerrno));
             return 1;
-        } else if (conn->in.notYetUsed == 0) {
-            debugs(33, 2, "connReadWasError: FD " << conn->fd << ": no data to process (" << xstrerr(xerrno) << ")");
+        } else if (in.notYetUsed == 0) {
+            debugs(33, 2, "connReadWasError: FD " << fd << ": no data to process (" << xstrerr(xerrno) << ")");
         }
     }
 
@@ -2057,16 +2057,16 @@ connReadWasError(ConnStateData::Pointer & conn, comm_err_t flag, int size, int x
 }
 
 int
-connFinishedWithConn(ConnStateData::Pointer & conn, int size)
+ConnStateData::connFinishedWithConn(int size)
 {
     if (size == 0) {
-        if (conn->getConcurrentRequestCount() == 0 && conn->in.notYetUsed == 0) {
+        if (getConcurrentRequestCount() == 0 && in.notYetUsed == 0) {
             /* no current or pending requests */
-            debugs(33, 4, "connFinishedWithConn: FD " << conn->fd << " closed");
+            debugs(33, 4, "connFinishedWithConn: FD " << fd << " closed");
             return 1;
         } else if (!Config.onoff.half_closed_clients) {
             /* admin doesn't want to support half-closed client sockets */
-            debugs(33, 3, "connFinishedWithConn: FD " << conn->fd << " aborted (half_closed_clients disabled)");
+            debugs(33, 3, "connFinishedWithConn: FD " << fd << " aborted (half_closed_clients disabled)");
             return 1;
         }
     }
@@ -2091,13 +2091,13 @@ connNoteUseOfBuffer(ConnStateData* conn, size_t byteCount)
 }
 
 int
-connKeepReadingIncompleteRequest(ConnStateData::Pointer & conn)
+connKeepReadingIncompleteRequest(ConnStateData * conn)
 {
     return conn->in.notYetUsed >= Config.maxRequestHeaderSize ? 0 : 1;
 }
 
 void
-connCancelIncompleteRequests(ConnStateData::Pointer & conn)
+connCancelIncompleteRequests(ConnStateData * conn)
 {
     ClientSocketContext *context = parseHttpRequestAbort(conn, "error:request-too-large");
     clientStreamNode *node = context->getClientReplyContext();
@@ -2113,17 +2113,17 @@ connCancelIncompleteRequests(ConnStateData::Pointer & conn)
     context->pullData();
 }
 
-static void
-clientMaybeReadData(ConnStateData::Pointer &conn, int do_next_read)
+void
+ConnStateData::clientMaybeReadData(int do_next_read)
 {
     if (do_next_read) {
-        conn->flags.readMoreRequests = true;
-        conn->readSomeData();
+        flags.readMoreRequests = true;
+        readSomeData();
     }
 }
 
-static void
-clientAfterReadingRequests(int fd, ConnStateData::Pointer &conn, int do_next_read)
+void
+ConnStateData::clientAfterReadingRequests(int do_next_read)
 {
     /*
      * If (1) we are reading a message body, (2) and the connection
@@ -2132,7 +2132,7 @@ clientAfterReadingRequests(int fd, ConnStateData::Pointer &conn, int do_next_rea
      */
 
     if (fd_table[fd].flags.socket_eof) {
-        if ((int64_t)conn->in.notYetUsed < conn->bodySizeLeft()) {
+        if ((int64_t)in.notYetUsed < bodySizeLeft()) {
             /* Partial request received. Abort client connection! */
             debugs(33, 3, "clientAfterReadingRequests: FD " << fd << " aborted, partial request");
             comm_close(fd);
@@ -2140,11 +2140,11 @@ clientAfterReadingRequests(int fd, ConnStateData::Pointer &conn, int do_next_rea
         }
     }
 
-    clientMaybeReadData (conn, do_next_read);
+    clientMaybeReadData (do_next_read);
 }
 
 static void
-clientProcessRequest(ConnStateData::Pointer &conn, HttpParser *hp, ClientSocketContext *context, const HttpRequestMethod& method, HttpVersion http_ver)
+clientProcessRequest(ConnStateData *conn, HttpParser *hp, ClientSocketContext *context, const HttpRequestMethod& method, HttpVersion http_ver)
 {
     ClientHttpRequest *http = context->http;
     HttpRequest *request = NULL;
@@ -2264,7 +2264,7 @@ clientProcessRequest(ConnStateData::Pointer &conn, HttpParser *hp, ClientSocketC
         request->body_pipe = conn->expectRequestBody(request->content_length);
 
         // consume header early so that body pipe gets just the body
-        connNoteUseOfBuffer(conn.getRaw(), http->req_sz);
+        connNoteUseOfBuffer(conn, http->req_sz);
         notedUseOfBuffer = true;
 
         conn->handleRequestBodyData();
@@ -2296,7 +2296,7 @@ clientProcessRequest(ConnStateData::Pointer &conn, HttpParser *hp, ClientSocketC
     
 finish:
     if (!notedUseOfBuffer)
-        connNoteUseOfBuffer(conn.getRaw(), http->req_sz);
+        connNoteUseOfBuffer(conn, http->req_sz);
 
     /*
      * DPW 2007-05-18
@@ -2314,7 +2314,7 @@ finish:
 }
 
 static void
-connStripBufferWhitespace (ConnStateData::Pointer &conn)
+connStripBufferWhitespace (ConnStateData * conn)
 {
     while (conn->in.notYetUsed > 0 && xisspace(conn->in.buf[0])) {
         xmemmove(conn->in.buf, conn->in.buf + 1, conn->in.notYetUsed - 1);
@@ -2323,7 +2323,7 @@ connStripBufferWhitespace (ConnStateData::Pointer &conn)
 }
 
 static int
-connOkToAddRequest(ConnStateData::Pointer &conn)
+connOkToAddRequest(ConnStateData * conn)
 {
     int result = conn->getConcurrentRequestCount() < (Config.onoff.pipeline_prefetch ? 2 : 1);
 
@@ -2362,7 +2362,7 @@ ConnStateData::bodySizeLeft()
  * scheduled.
  */
 static bool
-clientParseRequest(ConnStateData::Pointer conn, bool &do_next_read)
+clientParseRequest(ConnStateData * conn, bool &do_next_read)
 {
     HttpRequestMethod method;
     ClientSocketContext *context;
@@ -2439,20 +2439,18 @@ clientParseRequest(ConnStateData::Pointer conn, bool &do_next_read)
     return parsed_req;
 }
 
-static void
-clientReadRequest(int fd, char *buf, size_t size, comm_err_t flag, int xerrno,
-                  void *data)
+void
+ConnStateData::clientReadRequest(const CommIoCbParams &io)
 {
-    debugs(33,5,HERE << "clientReadRequest FD " << fd << " size " << size);
-    ConnStateData::Pointer conn ((ConnStateData *)data);
-    conn->reading(false);
+    debugs(33,5,HERE << "clientReadRequest FD " << io.fd << " size " << io.size);
+    reading(false);
     bool do_next_read = 1; /* the default _is_ to read data! - adrian */
 
-    assert (fd == conn->fd);
+    assert (io.fd == fd);
 
     /* Bail out quickly on COMM_ERR_CLOSING - close handlers will tidy up */
 
-    if (flag == COMM_ERR_CLOSING) {
+    if (io.flag == COMM_ERR_CLOSING) {
         debugs(33,5, HERE  << " FD " << fd << " closing Bailout.");
         return;
     }
@@ -2464,25 +2462,25 @@ clientReadRequest(int fd, char *buf, size_t size, comm_err_t flag, int xerrno,
      * whole, not individual read() calls.  Plus, it breaks our
      * lame half-close detection
      */
-    if (connReadWasError(conn, flag, size, xerrno)) {
+    if (connReadWasError(io.flag, io.size, io.xerrno)) {
         comm_close(fd);
         return;
     }
 
-    if (flag == COMM_OK) {
-        if (size > 0) {
-            kb_incr(&statCounter.client_http.kbytes_in, size);
+    if (io.flag == COMM_OK) {
+        if (io.size > 0) {
+            kb_incr(&statCounter.client_http.kbytes_in, io.size);
 
-            conn->handleReadData(buf, size);
+            handleReadData(io.buf, io.size);
 
            /* The above may close the connection under our feets */
-           if (!conn->isOpen())
+           if (!isOpen())
                return;
 
-        } else if (size == 0) {
+        } else if (io.size == 0) {
             debugs(33, 5, "clientReadRequest: FD " << fd << " closed?");
 
-            if (connFinishedWithConn(conn, size)) {
+            if (connFinishedWithConn(io.size)) {
                 comm_close(fd);
                 return;
             }
@@ -2505,11 +2503,11 @@ clientReadRequest(int fd, char *buf, size_t size, comm_err_t flag, int xerrno,
     }
 
     /* Process next request */
-    if (conn->getConcurrentRequestCount() == 0)
-        fd_note(conn->fd, "Reading next request");
+    if (getConcurrentRequestCount() == 0)
+        fd_note(fd, "Reading next request");
 
-    if (! clientParseRequest(conn, do_next_read)) {
-       if (!conn->isOpen())
+    if (! clientParseRequest(this, do_next_read)) {
+       if (!isOpen())
            return;
         /*
          * If the client here is half closed and we failed
@@ -2518,17 +2516,17 @@ clientReadRequest(int fd, char *buf, size_t size, comm_err_t flag, int xerrno,
          * succeeds _if_ the buffer is empty which it won't
          * be if we have an incomplete request.
          */
-        if (conn->getConcurrentRequestCount() == 0 && commIsHalfClosed(fd)) {
+        if (getConcurrentRequestCount() == 0 && commIsHalfClosed(fd)) {
             debugs(33, 5, "clientReadRequest: FD " << fd << ": half-closed connection, no completed request parsed, connection closing.");
             comm_close(fd);
             return;
         }
     }
 
-    if (!conn->isOpen())
+    if (!isOpen())
         return;
 
-    clientAfterReadingRequests(fd, conn, do_next_read);
+    clientAfterReadingRequests(do_next_read);
 }
 
 // called when new request data has been read from the socket
@@ -2581,39 +2579,38 @@ ConnStateData::handleRequestBodyData()
 }
 
 void
-ConnStateData::noteMoreBodySpaceAvailable(BodyPipe &)
+ConnStateData::noteMoreBodySpaceAvailable(BodyPipe::Pointer )
 {
     handleRequestBodyData();
 }
 
 void
-ConnStateData::noteBodyConsumerAborted(BodyPipe &)
+ConnStateData::noteBodyConsumerAborted(BodyPipe::Pointer )
 {
     if (!closing())
         startClosing("body consumer aborted");
 }
 
 /* general lifetime handler for HTTP requests */
-static void
-requestTimeout(int fd, void *data)
+void
+ConnStateData::requestTimeout(const CommTimeoutCbParams &io)
 {
 #if THIS_CONFUSES_PERSISTENT_CONNECTION_AWARE_BROWSERS_AND_USERS
-    ConnStateData *conn = data;
-    debugs(33, 3, "requestTimeout: FD " << fd << ": lifetime is expired.");
+    debugs(33, 3, "requestTimeout: FD " << io.fd << ": lifetime is expired.");
 
-    if (COMMIO_FD_WRITECB(fd)->active) {
+    if (COMMIO_FD_WRITECB(io.fd)->active) {
         /* FIXME: If this code is reinstated, check the conn counters,
          * not the fd table state
          */
         /*
          * Some data has been sent to the client, just close the FD
          */
-        comm_close(fd);
-    } else if (conn->nrequests) {
+        comm_close(io.fd);
+    } else if (nrequests) {
         /*
          * assume its a persistent connection; just close it
          */
-        comm_close(fd);
+        comm_close(io.fd);
     } else {
         /*
          * Generate an error
@@ -2621,18 +2618,18 @@ requestTimeout(int fd, void *data)
         ClientHttpRequest **H;
         clientStreamNode *node;
         ClientHttpRequest *http =
-            parseHttpRequestAbort(conn, "error:Connection%20lifetime%20expired");
+            parseHttpRequestAbort(this, "error:Connection%20lifetime%20expired");
         node = http->client_stream.tail->prev->data;
         clientReplyContext *repContext = dynamic_cast<clientReplyContext *>(node->data.getRaw());
         assert (repContext);
         repContext->setReplyToError(ERR_LIFETIME_EXP,
-                                    HTTP_REQUEST_TIMEOUT, METHOD_NONE, "N/A", &conn->peer.sin_addr,
+                                    HTTP_REQUEST_TIMEOUT, METHOD_NONE, "N/A", &peer.sin_addr,
                                     NULL, NULL, NULL);
         /* No requests can be outstanded */
-        assert(conn->chr == NULL);
+        assert(chr == NULL);
         /* add to the client request queue */
 
-        for (H = &conn->chr; *H; H = &(*H)->next)
+        for (H = &chr; *H; H = &(*H)->next)
 
             ;
         *H = http;
@@ -2643,12 +2640,15 @@ requestTimeout(int fd, void *data)
         /*
          * if we don't close() here, we still need a timeout handler!
          */
-        commSetTimeout(fd, 30, requestTimeout, conn);
+       typedef CommCbMemFunT<ConnStateData, CommTimeoutCbParams> TimeoutDialer;
+       AsyncCall::Pointer timeoutCall =  asyncCall(33, 5, "ConnStateData::requestTimeout",
+                        TimeoutDialer(this,&ConnStateData::requestTimeout));
+       commSetTimeout(io.fd, 30, timeoutCall);
 
         /*
          * Aha, but we don't want a read handler!
          */
-        commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
+        commSetSelect(io.fd, COMM_SELECT_READ, NULL, NULL, 0);
     }
 
 #else
@@ -2660,13 +2660,15 @@ requestTimeout(int fd, void *data)
     * the open has already been completed on another
     * connection)
     */
-    debugs(33, 3, "requestTimeout: FD " << fd << ": lifetime is expired.");
+    debugs(33, 3, "requestTimeout: FD " << io.fd << ": lifetime is expired.");
 
-    comm_close(fd);
+    comm_close(io.fd);
 
 #endif
 }
 
+
+
 static void
 clientLifetimeTimeout(int fd, void *data)
 {
@@ -2765,13 +2767,20 @@ httpAccept(int sock, int newfd, ConnectionDetail *details,
 
     debugs(33, 4, "httpAccept: FD " << newfd << ": accepted");
     fd_note(newfd, "client http connect");
-    connState = connStateCreate(details->peer, details->me, newfd, s);
-    comm_add_close_handler(newfd, connStateClosed, connState);
+    connState = connStateCreate(&details->peer, &details->me, newfd, s);
+
+    typedef CommCbMemFunT<ConnStateData, CommCloseCbParams> Dialer;
+    AsyncCall::Pointer call = asyncCall(33, 5, "ConnStateData::connStateClosed",
+                                 Dialer(connState, &ConnStateData::connStateClosed));
+    comm_add_close_handler(newfd, call);
 
     if (Config.onoff.log_fqdn)
         fqdncache_gethostbyaddr(details->peer, FQDN_LOOKUP_IF_MISS);
 
-    commSetTimeout(newfd, Config.Timeout.request, requestTimeout, connState);
+       typedef CommCbMemFunT<ConnStateData, CommTimeoutCbParams> TimeoutDialer;
+       AsyncCall::Pointer timeoutCall =  asyncCall(33, 5, "ConnStateData::requestTimeout",
+                        TimeoutDialer(connState,&ConnStateData::requestTimeout));
+       commSetTimeout(newfd, Config.Timeout.read, timeoutCall);
 
 #if USE_IDENT
 
@@ -2974,14 +2983,22 @@ httpsAccept(int sock, int newfd, ConnectionDetail *details,
     if (!(ssl = httpsCreate(newfd, details, sslContext)))
         return;
 
+    debugs(33, 5, "httpsAccept: FD " << newfd << " accepted, starting SSL negotiation.");
+    fd_note(newfd, "client https connect");
     ConnStateData *connState = connStateCreate(details->peer, details->me,
         newfd, &s->http);
-    comm_add_close_handler(newfd, connStateClosed, connState);
+    typedef CommCbMemFunT<ConnStateData, CommCloseCbParams> Dialer;
+    AsyncCall::Pointer call = asyncCall(33, 5, "ConnStateData::connStateClosed",
+                                 Dialer(connState, &ConnStateData::connStateClosed));
+    comm_add_close_handler(newfd, call);
 
     if (Config.onoff.log_fqdn)
         fqdncache_gethostbyaddr(details->peer, FQDN_LOOKUP_IF_MISS);
 
-    commSetTimeout(newfd, Config.Timeout.request, requestTimeout, connState);
+       typedef CommCbMemFunT<ConnStateData, CommTimeoutCbParams> TimeoutDialer;
+       AsyncCall::Pointer timeoutCall =  asyncCall(33, 5, "ConnStateData::requestTimeout",
+                        TimeoutDialer(connState,&ConnStateData::requestTimeout));
+       commSetTimeout(newfd, Config.Timeout.request, timeoutCall);
 
 #if USE_IDENT
 
@@ -3244,8 +3261,8 @@ ACLChecklist *
 clientAclChecklistCreate(const acl_access * acl, ClientHttpRequest * http)
 {
     ACLChecklist *ch;
-    ConnStateData::Pointer conn = http->getConn();
-    ch = aclChecklistCreate(acl, http->request, conn.getRaw() != NULL ? conn->rfc931 : dash_str);
+    ConnStateData * conn = http->getConn();
+    ch = aclChecklistCreate(acl, http->request, cbdataReferenceValid(conn) && conn != NULL ? conn->rfc931 : dash_str);
 
     /*
      * hack for ident ACL. It needs to get full addresses, and a place to store
@@ -3259,7 +3276,7 @@ clientAclChecklistCreate(const acl_access * acl, ClientHttpRequest * http)
      * the server end.
      */
 
-    if (conn.getRaw() != NULL)
+    if (conn != NULL)
         ch->conn(conn);        /* unreferenced in acl.cc */
 
     return ch;
@@ -3267,9 +3284,8 @@ clientAclChecklistCreate(const acl_access * acl, ClientHttpRequest * http)
 
 CBDATA_CLASS_INIT(ConnStateData);
 
-ConnStateData::ConnStateData() : transparent_ (false), reading_ (false), closing_ (false)
+ConnStateData::ConnStateData() :AsyncJob("ConnStateData"), transparent_ (false), reading_ (false), closing_ (false)
 {
-    openReference = this;
 }
 
 bool
index 149c5a23766431ceb7c8631e8e4bc29aa3783670..e7babe58cbced62896aad1666b1899c51a7320d0 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * $Id: client_side.h,v 1.29 2008/02/11 22:25:22 rousskov Exp $
+ * $Id: client_side.h,v 1.30 2008/02/12 23:07:52 rousskov Exp $
  *
  *
  * SQUID Web Proxy Cache          http://www.squid-cache.org/
@@ -37,6 +37,8 @@
 #include "StoreIOBuffer.h"
 #include "BodyPipe.h"
 #include "RefCount.h"
+#include "ICAP/AsyncJob.h"
+#include "CommCalls.h"
 
 class ConnStateData;
 
@@ -109,7 +111,7 @@ unsigned parsed_ok: 1; /* Was this parsed correctly? */
     clientStreamNode * getTail() const;
     clientStreamNode * getClientReplyContext() const;
     void connIsFinished();
-    void removeFromConnectionList(RefCount<ConnStateData> conn);
+    void removeFromConnectionList(ConnStateData * conn);
     void deferRecipientForLater(clientStreamNode * node, HttpReply * rep, StoreIOBuffer receivedData);
     bool multipartRangeRequest() const;
     void registerWithConn();
@@ -126,11 +128,10 @@ private:
 };
 
 /* A connection to a socket */
-class ConnStateData : public BodyProducer, public RefCountable
+class ConnStateData : public BodyProducer/*, public RefCountable*/
 {
 
 public:
-    typedef RefCount<ConnStateData> Pointer;
 
     ConnStateData();
     ~ConnStateData();
@@ -202,12 +203,20 @@ public:
     void startClosing(const char *reason);
 
     BodyPipe::Pointer expectRequestBody(int64_t size);
-    virtual void noteMoreBodySpaceAvailable(BodyPipe &);
-    virtual void noteBodyConsumerAborted(BodyPipe &);
+    virtual void noteMoreBodySpaceAvailable(BodyPipe::Pointer);
+    virtual void noteBodyConsumerAborted(BodyPipe::Pointer);
 
     void handleReadData(char *buf, size_t size);
     void handleRequestBodyData();
 
+    // comm callbacks
+    void clientReadRequest(const CommIoCbParams &io);
+    void connStateClosed(const CommCloseCbParams &io);
+    void requestTimeout(const CommTimeoutCbParams &params);
+
+    // AsyncJob API
+    virtual bool doneAll() const { return BodyProducer::doneAll() && false;}
+
 #if USE_SSL
     bool switchToHttps();
     bool switchedToHttps() const { return switchedToHttps_; }
@@ -215,13 +224,19 @@ public:
     bool switchedToHttps() const { return false; }
 #endif
 
+private:
+    int connReadWasError(comm_err_t flag, int size, int xerrno);
+    int connFinishedWithConn(int size);
+    void clientMaybeReadData(int do_next_read);
+    void clientAfterReadingRequests(int do_next_read);
+
 private:
     CBDATA_CLASS2(ConnStateData);
     bool transparent_;
     bool reading_;
     bool closing_;
+
     bool switchedToHttps_;
-    Pointer openReference;
     BodyPipe::Pointer bodyPipe; // set when we are reading request body
 };
 
index bcd5d89f69d438d22c8925a810573296e252de2f..d7d495ebf6b6a5fb1635d49f5461d82cae1d4b0b 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: client_side_reply.cc,v 1.152 2008/02/08 18:31:02 rousskov Exp $
+ * $Id: client_side_reply.cc,v 1.153 2008/02/12 23:07:52 rousskov Exp $
  *
  * DEBUG: section 88    Client-side Reply Routines
  * AUTHOR: Robert Collins (Originally Duane Wessels in client_side.c)
@@ -1906,7 +1906,7 @@ clientReplyContext::sendMoreData (StoreIOBuffer result)
 
     StoreEntry *entry = http->storeEntry();
 
-    ConnStateData::Pointer conn = http->getConn();
+    ConnStateData * conn = http->getConn();
 
     int fd = conn != NULL ? conn->fd : -1;
 
index 9ad33815c5842cebb451d9a6a8cfec5b44ee8505..8d4375f61d079518ac3b849d15c33cc3ba48fdfc 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: client_side_request.cc,v 1.104 2008/02/11 22:26:16 rousskov Exp $
+ * $Id: client_side_request.cc,v 1.105 2008/02/12 23:07:52 rousskov Exp $
  * 
  * DEBUG: section 85    Client-side Request Routines
  * AUTHOR: Robert Collins (Originally Duane Wessels in client_side.c)
@@ -143,9 +143,13 @@ ClientHttpRequest::operator delete (void *address)
     cbdataFree(t);
 }
 
-ClientHttpRequest::ClientHttpRequest(ConnStateData::Pointer aConn) : loggingEntry_(NULL)
+ClientHttpRequest::ClientHttpRequest(ConnStateData * aConn) : 
+#if ICAP_CLIENT
+AsyncJob("ClientHttpRequest"),
+#endif
+loggingEntry_(NULL)
 {
-    start = current_time;
+    start_time = current_time;
     setConn(aConn);
     dlinkAdd(this, &active, &ClientActiveRequests);
 #if ICAP_CLIENT
@@ -261,6 +265,9 @@ ClientHttpRequest::~ClientHttpRequest()
     if (calloutContext)
         delete calloutContext;
 
+    if(conn_)
+       cbdataReferenceDone(conn_);
+
     /* moving to the next connection is handled by the context free */
     dlinkDelete(&active, &ClientActiveRequests);
 
@@ -282,7 +289,7 @@ clientBeginRequest(const HttpRequestMethod& method, char const *url, CSCB * stre
     ClientHttpRequest *http = new ClientHttpRequest(NULL);
     HttpRequest *request;
     StoreIOBuffer tempBuffer;
-    http->start = current_time;
+    http->start_time = current_time;
     /* this is only used to adjust the connection offset in client_side.c */
     http->req_sz = 0;
     tempBuffer.length = taillen;
@@ -1153,7 +1160,7 @@ ClientHttpRequest::startIcap(ICAPServiceRep::Pointer service)
     assert(!icapBodySource);
     icapHeadSource = initiateIcap(
         new ICAPModXactLauncher(this, request, NULL, service));
-    return true;
+    return icapHeadSource != NULL;
 }
 
 void
@@ -1214,7 +1221,7 @@ ClientHttpRequest::noteIcapQueryAbort(bool final)
 }
 
 void
-ClientHttpRequest::noteMoreBodyDataAvailable(BodyPipe &)
+ClientHttpRequest::noteMoreBodyDataAvailable(BodyPipe::Pointer)
 {
     assert(request_satisfaction_mode);
     assert(icapBodySource != NULL);
@@ -1235,7 +1242,7 @@ ClientHttpRequest::noteMoreBodyDataAvailable(BodyPipe &)
 }
 
 void
-ClientHttpRequest::noteBodyProductionEnded(BodyPipe &)
+ClientHttpRequest::noteBodyProductionEnded(BodyPipe::Pointer)
 {
     assert(!icapHeadSource);
     if (icapBodySource != NULL) { // did not end request satisfaction yet
@@ -1258,7 +1265,7 @@ ClientHttpRequest::endRequestSatisfaction() {
 }
 
 void
-ClientHttpRequest::noteBodyProducerAborted(BodyPipe &)
+ClientHttpRequest::noteBodyProducerAborted(BodyPipe::Pointer)
 {
     assert(!icapHeadSource);
     stopConsumingFrom(icapBodySource);
@@ -1292,7 +1299,7 @@ ClientHttpRequest::handleIcapFailure(bool bypassable)
     // true cause of the error at this point, so I did not pass it.
     IPAddress noAddr;
     noAddr.SetNoAddr();
-    ConnStateData::Pointer c = getConn();
+    ConnStateData * c = getConn();
     repContext->setReplyToError(ERR_ICAP_FAILURE, HTTP_INTERNAL_SERVER_ERROR,
         request->method, NULL,
         (c != NULL ? c->peer : noAddr), request, NULL,
index d0ceaceda9a24c772dba900fa628a264d2fe385d..e7ed08501659b5b227f9a72fe160700eddf983a3 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: client_side_request.h,v 1.36 2008/02/11 22:33:48 rousskov Exp $
+ * $Id: client_side_request.h,v 1.37 2008/02/12 23:07:52 rousskov Exp $
  *
  *
  * SQUID Web Proxy Cache          http://www.squid-cache.org/
@@ -41,6 +41,7 @@
 #include "client_side.h"
 #include "AccessLogEntry.h"
 #include "dlink.h"
+#include "ICAP/AsyncJob.h"
 
 #if ICAP_CLIENT
 #include "ICAP/ICAPServiceRep.h"
@@ -68,8 +69,10 @@ class ClientHttpRequest
 public:
     void *operator new (size_t);
     void operator delete (void *);
-
-    ClientHttpRequest(ConnStateData::Pointer);
+#if ICAP_CLIENT
+    void *toCbdata() { return this; }
+#endif
+    ClientHttpRequest(ConnStateData *);
     ~ClientHttpRequest();
     /* Not implemented - present to prevent synthetic operations */
     ClientHttpRequest(ClientHttpRequest const &);
@@ -90,9 +93,9 @@ public:
     _SQUID_INLINE_ StoreEntry *loggingEntry() const;
     void loggingEntry(StoreEntry *);
 
-    _SQUID_INLINE_ ConnStateData::Pointer getConn();
-    _SQUID_INLINE_ ConnStateData::Pointer const getConn() const;
-    _SQUID_INLINE_ void setConn(ConnStateData::Pointer);
+    _SQUID_INLINE_ ConnStateData * getConn();
+    _SQUID_INLINE_ ConnStateData * const getConn() const;
+    _SQUID_INLINE_ void setConn(ConnStateData *);
     HttpRequest *request;              /* Parsed URL ... */
     char *uri;
     char *log_uri;
@@ -109,7 +112,7 @@ public:
     size_t req_sz;             /* raw request size on input, not current request size */
     log_type logType;
 
-    struct timeval start;
+    struct timeval start_time;
     AccessLogEntry al;
 
     struct
@@ -147,12 +150,18 @@ unsigned int purging:
     ClientRequestContext *calloutContext;
     void doCallouts();
 
+#if ICAP_CLIENT
+//AsyncJob virtual methods
+    virtual bool doneAll() const { return ICAPInitiator::doneAll() && 
+                                      BodyConsumer::doneAll() && false;}
+#endif
+
 private:
     CBDATA_CLASS(ClientHttpRequest);
     int64_t maxReplyBodySize_;
     StoreEntry *entry_;
     StoreEntry *loggingEntry_;
-    ConnStateData::Pointer conn_;
+    ConnStateData * conn_;
 
 #if USE_SSL
 public:
@@ -175,9 +184,9 @@ private:
     virtual void noteIcapQueryAbort(bool final);
 
     // BodyConsumer API, called by BodyPipe
-    virtual void noteMoreBodyDataAvailable(BodyPipe &);
-    virtual void noteBodyProductionEnded(BodyPipe &);
-    virtual void noteBodyProducerAborted(BodyPipe &);
+    virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer);
+    virtual void noteBodyProductionEnded(BodyPipe::Pointer);
+    virtual void noteBodyProducerAborted(BodyPipe::Pointer);
 
     void endRequestSatisfaction();