]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Update the Comm:: API for read(2)
authorAmos Jeffries <squid3@treenet.co.nz>
Thu, 5 Jun 2014 08:28:20 +0000 (01:28 -0700)
committerAmos Jeffries <squid3@treenet.co.nz>
Thu, 5 Jun 2014 08:28:20 +0000 (01:28 -0700)
... using an algorithm suggested by Alex Rousskov.

The code for Comm:: read operations is shuffled into comm/libcomm.la and
the files comm/Read.{h,cc} in symmetry with the current Comm::Write API.

The new API consists of:

 * Comm::Read() which accepts the Comm::Connection pointer for the
socket to read on and an AsyncCall callback to be run when read is
ready. The Job is responsible for separately initiating read(2) or
alternative action when that callback is run.

 * Comm::ReadNow() which accepts an SBuf buffer and a CommIoCbParams
initialized to contain the Comm::Connection pointer for the socket to
read on. TheCommIoCbParams will be filled out with result flag, xerrno,
and size.
This synchronously performs read(2) operations to append bytes to the
provided buffer. It returns a comm_err_t flag for use in determining how
to handle the results and signalling one of OK, INPROGRESS, ERROR, EOF
as having happened.

comm_read() API is retained for backward compatibility during the
transitional period. However it is now deprecated and scheduled for
removal ASAP. The SBuf overloaded variant is now removed.

 * Comm::ReadCancel() - a renaming of the comm_read_cancel() AsyncCall
API. Other cancel API(s) are now deprecated and will be removed ASAP.

Code using comm_read_cancel() with AsyncCall may immediately switch to
this new API with no logic changes necessary even if they are not using
other new Comm API calls.

 * Comm::MonitorsRead() - a renaming of comm_monitors_read() AsyncCall
API. comm_monitors_read() is now removed.

Other changes:
 - the unused comm_has_pending_read_callback() API is erased.
 - the IoCallback::buf2 mechanism previously used for SBuf read I/O is
   erased.
 - ConnStateData is converted to this new API for filling its SBuf I/O
   buffer and for monitoring pinned connection closures.
 - fde::readPending() converted to new Comm::MonitorsRead() API.
 - Comm half-closed monitoring feature is also converted to this new API.

NP: one bug in ConnStateData handling of intercepted HTTPS traffic is
noted but not fixed in this patch.

25 files changed:
src/CommCalls.h
src/client_side.cc
src/client_side.h
src/comm.cc
src/comm.h
src/comm/IoCallback.cc
src/comm/IoCallback.h
src/comm/Makefile.am
src/comm/Read.cc [new file with mode: 0644]
src/comm/Read.h [new file with mode: 0644]
src/comm/comm_err_t.h
src/comm/comm_internal.h
src/dns_internal.cc
src/fde.cc
src/ftp.cc
src/gopher.cc
src/helper.cc
src/ident/Ident.cc
src/ipc/Port.cc
src/pconn.cc
src/store.cc
src/tests/stub_client_side.cc
src/tests/stub_libcomm.cc
src/tunnel.cc
src/whois.cc

index 93bbafe5e6c7ee728c36f92500fd99a5f980d401..084171b9281f0610b66a1695a25a7f4de9e2dcc3 100644 (file)
@@ -106,8 +106,6 @@ public:
     bool syncWithComm(); // see CommCommonCbParams::syncWithComm
 };
 
-class SBuf;
-
 // read/write (I/O) parameters
 class CommIoCbParams: public CommCommonCbParams
 {
@@ -120,7 +118,6 @@ public:
 public:
     char *buf;
     size_t size;
-    SBuf *buf2;  // alternative buffer for use when buf is unset
 };
 
 // close parameters
index 0cb7be8d1f2285b47f9d9431f3d21320c57b1c13..baade6c0e03d50190118585d9682795e752be219 100644 (file)
@@ -94,6 +94,7 @@
 #include "comm.h"
 #include "comm/Connection.h"
 #include "comm/Loops.h"
+#include "comm/Read.h"
 #include "comm/TcpAcceptor.h"
 #include "comm/Write.h"
 #include "CommCalls.h"
@@ -237,8 +238,8 @@ ClientSocketContext::getClientReplyContext() const
 }
 
 /**
- * This routine should be called to grow the inbuf and then
- * call comm_read().
+ * This routine should be called to grow the in.buf and then
+ * call Comm::Read().
  */
 void
 ConnStateData::readSomeData()
@@ -253,7 +254,7 @@ ConnStateData::readSomeData()
 
     typedef CommCbMemFunT<ConnStateData, CommIoCbParams> Dialer;
     reader = JobCallback(33, 5, Dialer, this, ConnStateData::clientReadRequest);
-    comm_read(clientConnection, in.buf, reader);
+    Comm::Read(clientConnection, reader);
 }
 
 void
@@ -2416,26 +2417,6 @@ ConnStateData::getConcurrentRequestCount() const
     return result;
 }
 
-int
-ConnStateData::connReadWasError(comm_err_t flag, int size, int xerrno)
-{
-    if (flag != COMM_OK) {
-        debugs(33, 2, "connReadWasError: FD " << clientConnection << ": got flag " << flag);
-        return 1;
-    }
-
-    if (size < 0) {
-        if (!ignoreErrno(xerrno)) {
-            debugs(33, 2, "connReadWasError: FD " << clientConnection << ": " << xstrerr(xerrno));
-            return 1;
-        } else if (in.buf.isEmpty()) {
-            debugs(33, 2, "connReadWasError: FD " << clientConnection << ": no data to process (" << xstrerr(xerrno) << ")");
-        }
-    }
-
-    return 0;
-}
-
 int
 ConnStateData::connFinishedWithConn(int size)
 {
@@ -2984,14 +2965,13 @@ ConnStateData::clientParseRequests()
 void
 ConnStateData::clientReadRequest(const CommIoCbParams &io)
 {
-    debugs(33,5,HERE << io.conn << " size " << io.size);
+    debugs(33,5, io.conn);
     Must(reading());
     reader = NULL;
 
     /* Bail out quickly on COMM_ERR_CLOSING - close handlers will tidy up */
-
     if (io.flag == COMM_ERR_CLOSING) {
-        debugs(33,5, HERE << io.conn << " closing Bailout.");
+        debugs(33,5, io.conn << " closing Bailout.");
         return;
     }
 
@@ -2999,47 +2979,58 @@ ConnStateData::clientReadRequest(const CommIoCbParams &io)
     assert(io.conn->fd == clientConnection->fd);
 
     /*
-     * Don't reset the timeout value here.  The timeout value will be
-     * set to Config.Timeout.request by httpAccept() and
-     * clientWriteComplete(), and should apply to the request as a
-     * whole, not individual read() calls.  Plus, it breaks our
-     * lame half-close detection
+     * Don't reset the timeout value here. The value should be
+     * counting Config.Timeout.request and applies to the request
+     * as a whole, not individual read() calls.
+     * Plus, it breaks our lame *HalfClosed() detection
      */
-    if (connReadWasError(io.flag, io.size, io.xerrno)) {
-        notifyAllContexts(io.xerrno);
-        io.conn->close();
+
+    CommIoCbParams rd(this); // will be expanded with ReadNow results
+    rd.conn = io.conn;
+    switch (Comm::ReadNow(rd, in.buf))
+    {
+    case COMM_INPROGRESS:
+        if (in.buf.isEmpty())
+            debugs(33, 2, io.conn << ": no data to process, " << xstrerr(rd.xerrno));
+        readSomeData();
         return;
-    }
 
-    if (io.flag == COMM_OK) {
-        if (io.size > 0) {
-            kb_incr(&(statCounter.client_http.kbytes_in), io.size);
+    case COMM_OK:
+        kb_incr(&(statCounter.client_http.kbytes_in), rd.size);
+        // may comm_close or setReplyToError
+        if (!handleReadData())
+            return;
 
-            // may comm_close or setReplyToError
-            if (!handleReadData(io.buf2))
-                return;
+        /* Continue to process previously read data */
+        break;
 
-        } else if (io.size == 0) {
-            debugs(33, 5, HERE << io.conn << " closed?");
+    case COMM_EOF: // close detected by 0-byte read
+        debugs(33, 5, io.conn << " closed?");
 
-            if (connFinishedWithConn(io.size)) {
-                clientConnection->close();
-                return;
-            }
+        if (connFinishedWithConn(rd.size)) {
+            clientConnection->close();
+            return;
+        }
 
-            /* It might be half-closed, we can't tell */
-            fd_table[io.conn->fd].flags.socket_eof = true;
+        /* It might be half-closed, we can't tell */
+        fd_table[io.conn->fd].flags.socket_eof = true;
+        commMarkHalfClosed(io.conn->fd);
+        fd_note(io.conn->fd, "half-closed");
 
-            commMarkHalfClosed(io.conn->fd);
+        /* There is one more close check at the end, to detect aborted
+         * (partial) requests. At this point we can't tell if the request
+         * is partial.
+         */
 
-            fd_note(io.conn->fd, "half-closed");
+        /* Continue to process previously read data */
+        break;
 
-            /* There is one more close check at the end, to detect aborted
-             * (partial) requests. At this point we can't tell if the request
-             * is partial.
-             */
-            /* Continue to process previously read data */
-        }
+    // case COMM_ERROR:
+    default: // no other flags should ever occur
+        debugs(33, 2, io.conn << ": got flag " << rd.flag << "; " << xstrerr(rd.xerrno));
+        notifyAllContexts(rd.xerrno);
+        io.conn->close();
+        return;
     }
 
     /* Process next request */
@@ -3077,10 +3068,8 @@ ConnStateData::clientReadRequest(const CommIoCbParams &io)
  * \retval true  we did not call comm_close or setReplyToError
  */
 bool
-ConnStateData::handleReadData(SBuf *buf)
+ConnStateData::handleReadData()
 {
-    assert(buf == &in.buf); // XXX: make this abort the transaction if this fails
-
     // if we are reading a body, stuff data into the body pipe
     if (bodyPipe != NULL)
         return handleRequestBodyData();
@@ -3631,8 +3620,9 @@ httpsSslBumpAccessCheckDone(allow_t answer, void *data)
         // fake a CONNECT request to force connState to tunnel
         static char ip[MAX_IPSTRLEN];
         connState->clientConnection->local.toUrl(ip, sizeof(ip));
+        // XXX need to *pre-pend* this fake request to the TLS bits already in the buffer
         connState->in.buf.append("CONNECT ").append(ip).append(" HTTP/1.1\r\nHost: ").append(ip).append("\r\n\r\n");
-        bool ret = connState->handleReadData(&connState->in.buf);
+        bool ret = connState->handleReadData();
         if (ret)
             ret = connState->clientParseRequests();
 
@@ -4298,7 +4288,7 @@ void
 ConnStateData::stopReading()
 {
     if (reading()) {
-        comm_read_cancel(clientConnection->fd, reader);
+        Comm::ReadCancel(clientConnection->fd, reader);
         reader = NULL;
     }
 }
@@ -4498,15 +4488,14 @@ ConnStateData::startPinnedConnectionMonitoring()
     typedef CommCbMemFunT<ConnStateData, CommIoCbParams> Dialer;
     pinning.readHandler = JobCallback(33, 3,
                                       Dialer, this, ConnStateData::clientPinnedConnectionRead);
-    static char unusedBuf[8];
-    comm_read(pinning.serverConnection, unusedBuf, sizeof(unusedBuf), pinning.readHandler);
+    Comm::Read(pinning.serverConnection, pinning.readHandler);
 }
 
 void
 ConnStateData::stopPinnedConnectionMonitoring()
 {
     if (pinning.readHandler != NULL) {
-        comm_read_cancel(pinning.serverConnection->fd, pinning.readHandler);
+        Comm::ReadCancel(pinning.serverConnection->fd, pinning.readHandler);
         pinning.readHandler = NULL;
     }
 }
index 7ec69f182b567b62d766c7ba7cdb1176bd8a291e..792dfc3c4ba5bb5777b0615297d71294996f3265 100644 (file)
@@ -290,7 +290,7 @@ public:
     virtual void noteMoreBodySpaceAvailable(BodyPipe::Pointer);
     virtual void noteBodyConsumerAborted(BodyPipe::Pointer);
 
-    bool handleReadData(SBuf *buf);
+    bool handleReadData();
     bool handleRequestBodyData();
 
     /**
@@ -385,7 +385,6 @@ protected:
     void clientPinnedConnectionRead(const CommIoCbParams &io);
 
 private:
-    int connReadWasError(comm_err_t flag, int size, int xerrno);
     int connFinishedWithConn(int size);
     void clientAfterReadingRequests();
     bool concurrentRequestQueueFilled() const;
index 2548680d0556a425d0aa15eaa58c7139b764ad48..653e67c5a60b9bdbc133efdbd3dffcfa53de06ec 100644 (file)
@@ -39,6 +39,7 @@
 #include "comm/Connection.h"
 #include "comm/IoCallback.h"
 #include "comm/Loops.h"
+#include "comm/Read.h"
 #include "comm/TcpAcceptor.h"
 #include "comm/Write.h"
 #include "CommRead.h"
@@ -80,7 +81,6 @@
  * New C-like simple comm code. This stuff is a mess and doesn't really buy us anything.
  */
 
-static void commStopHalfClosedMonitor(int fd);
 static IOCB commHalfClosedReader;
 static void comm_init_opened(const Comm::ConnectionPointer &conn, tos_t tos, nfmark_t nfmark, const char *note, struct addrinfo *AI);
 static int comm_apply_flags(int new_socket, Ip::Address &addr, int flags, struct addrinfo *AI);
@@ -114,116 +114,6 @@ isOpen(const int fd)
     return fd >= 0 && fd_table && fd_table[fd].flags.open != 0;
 }
 
-/**
- * Attempt a read
- *
- * If the read attempt succeeds or fails, call the callback.
- * Else, wait for another IO notification.
- */
-void
-commHandleRead(int fd, void *data)
-{
-    Comm::IoCallback *ccb = (Comm::IoCallback *) data;
-
-    assert(data == COMMIO_FD_READCB(fd));
-    assert(ccb->active());
-    /* Attempt a read */
-    ++ statCounter.syscalls.sock.reads;
-    errno = 0;
-    int retval;
-    if (ccb->buf) {
-        retval = FD_READ_METHOD(fd, ccb->buf, ccb->size);
-        debugs(5, 3, "char FD " << fd << ", size " << ccb->size << ", retval " << retval << ", errno " << errno);
-    } else {
-        assert(ccb->buf2 != NULL);
-        SBuf::size_type sz = ccb->buf2->spaceSize();
-        char *buf = ccb->buf2->rawSpace(sz);
-        retval = FD_READ_METHOD(fd, buf, sz-1); // blocking synchronous read(2)
-        if (retval > 0) {
-            ccb->buf2->append(buf, retval);
-        }
-        debugs(5, 3, "SBuf FD " << fd << ", size " << sz << ", retval " << retval << ", errno " << errno);
-    }
-
-    if (retval < 0 && !ignoreErrno(errno)) {
-        debugs(5, 3, "comm_read_try: scheduling COMM_ERROR");
-        ccb->offset = 0;
-        ccb->finish(COMM_ERROR, errno);
-        return;
-    };
-
-    /* See if we read anything */
-    /* Note - read 0 == socket EOF, which is a valid read */
-    if (retval >= 0) {
-        fd_bytes(fd, retval, FD_READ);
-        ccb->offset = retval;
-        ccb->finish(COMM_OK, errno);
-        return;
-    }
-
-    /* Nope, register for some more IO */
-    Comm::SetSelect(fd, COMM_SELECT_READ, commHandleRead, data, 0);
-}
-
-/**
- * Queue a read. handler/handler_data are called when the read
- * completes, on error, or on file descriptor close.
- */
-void
-comm_read(const Comm::ConnectionPointer &conn, char *buf, int size, AsyncCall::Pointer &callback)
-{
-    debugs(5, 5, "comm_read, queueing read for " << conn << "; asynCall " << callback);
-
-    /* Make sure we are open and not closing */
-    assert(Comm::IsConnOpen(conn));
-    assert(!fd_table[conn->fd].closing());
-    Comm::IoCallback *ccb = COMMIO_FD_READCB(conn->fd);
-
-    // Make sure we are either not reading or just passively monitoring.
-    // Active/passive conflicts are OK and simply cancel passive monitoring.
-    if (ccb->active()) {
-        // if the assertion below fails, we have an active comm_read conflict
-        assert(fd_table[conn->fd].halfClosedReader != NULL);
-        commStopHalfClosedMonitor(conn->fd);
-        assert(!ccb->active());
-    }
-    ccb->conn = conn;
-
-    /* Queue the read */
-    ccb->setCallback(Comm::IOCB_READ, callback, (char *)buf, NULL, size);
-    Comm::SetSelect(conn->fd, COMM_SELECT_READ, commHandleRead, ccb, 0);
-}
-
-/**
- * Queue a read. handler/handler_data are called when the read
- * completes, on error, or on file descriptor close.
- */
-void
-comm_read(const Comm::ConnectionPointer &conn, SBuf &buf, AsyncCall::Pointer &callback)
-{
-    debugs(5, 5, "comm_read, queueing read for " << conn << "; asynCall " << callback);
-
-    /* Make sure we are open and not closing */
-    assert(Comm::IsConnOpen(conn));
-    assert(!fd_table[conn->fd].closing());
-    Comm::IoCallback *ccb = COMMIO_FD_READCB(conn->fd);
-
-    // Make sure we are either not reading or just passively monitoring.
-    // Active/passive conflicts are OK and simply cancel passive monitoring.
-    if (ccb->active()) {
-        // if the assertion below fails, we have an active comm_read conflict
-        assert(fd_table[conn->fd].halfClosedReader != NULL);
-        commStopHalfClosedMonitor(conn->fd);
-        assert(!ccb->active());
-    }
-    ccb->conn = conn;
-    ccb->buf2 = &buf;
-
-    /* Queue the read */
-    ccb->setCallback(Comm::IOCB_READ, callback, NULL, NULL, buf.spaceSize());
-    Comm::SetSelect(conn->fd, COMM_SELECT_READ, commHandleRead, ccb, 0);
-}
-
 /**
  * Empty the read buffers
  *
@@ -245,115 +135,6 @@ comm_empty_os_read_buffers(int fd)
 #endif
 }
 
-/**
- * Return whether the FD has a pending completed callback.
- * NP: does not work.
- */
-int
-comm_has_pending_read_callback(int fd)
-{
-    assert(isOpen(fd));
-    // XXX: We do not know whether there is a read callback scheduled.
-    // This is used for pconn management that should probably be more
-    // tightly integrated into comm to minimize the chance that a
-    // closing pconn socket will be used for a new transaction.
-    return false;
-}
-
-// Does comm check this fd for read readiness?
-// Note that when comm is not monitoring, there can be a pending callback
-// call, which may resume comm monitoring once fired.
-bool
-comm_monitors_read(int fd)
-{
-    assert(isOpen(fd) && COMMIO_FD_READCB(fd));
-    // Being active is usually the same as monitoring because we always
-    // start monitoring the FD when we configure Comm::IoCallback for I/O
-    // and we usually configure Comm::IoCallback for I/O when we starting
-    // monitoring a FD for reading.
-    return COMMIO_FD_READCB(fd)->active();
-}
-
-/**
- * Cancel a pending read. Assert that we have the right parameters,
- * and that there are no pending read events!
- *
- * XXX: We do not assert that there are no pending read events and
- * with async calls it becomes even more difficult.
- * The whole interface should be reworked to do callback->cancel()
- * instead of searching for places where the callback may be stored and
- * updating the state of those places.
- *
- * AHC Don't call the comm handlers?
- */
-void
-comm_read_cancel(int fd, IOCB *callback, void *data)
-{
-    if (!isOpen(fd)) {
-        debugs(5, 4, "comm_read_cancel fails: FD " << fd << " closed");
-        return;
-    }
-
-    Comm::IoCallback *cb = COMMIO_FD_READCB(fd);
-    // TODO: is "active" == "monitors FD"?
-    if (!cb->active()) {
-        debugs(5, 4, "comm_read_cancel fails: FD " << fd << " inactive");
-        return;
-    }
-
-    typedef CommCbFunPtrCallT<CommIoCbPtrFun> Call;
-    Call *call = dynamic_cast<Call*>(cb->callback.getRaw());
-    if (!call) {
-        debugs(5, 4, "comm_read_cancel fails: FD " << fd << " lacks callback");
-        return;
-    }
-
-    call->cancel("old comm_read_cancel");
-
-    typedef CommIoCbParams Params;
-    const Params &params = GetCommParams<Params>(cb->callback);
-
-    /* Ok, we can be reasonably sure we won't lose any data here! */
-    assert(call->dialer.handler == callback);
-    assert(params.data == data);
-
-    /* Delete the callback */
-    cb->cancel("old comm_read_cancel");
-
-    /* And the IO event */
-    Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
-}
-
-void
-comm_read_cancel(int fd, AsyncCall::Pointer &callback)
-{
-    callback->cancel("comm_read_cancel");
-
-    if (!isOpen(fd)) {
-        debugs(5, 4, "comm_read_cancel fails: FD " << fd << " closed");
-        return;
-    }
-
-    Comm::IoCallback *cb = COMMIO_FD_READCB(fd);
-
-    if (!cb->active()) {
-        debugs(5, 4, "comm_read_cancel fails: FD " << fd << " inactive");
-        return;
-    }
-
-    AsyncCall::Pointer call = cb->callback;
-    assert(call != NULL); // XXX: should never fail (active() checks for callback==NULL)
-
-    /* Ok, we can be reasonably sure we won't lose any data here! */
-    assert(call == callback);
-
-    /* Delete the callback */
-    cb->cancel("comm_read_cancel");
-
-    /* And the IO event */
-    Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
-}
-
 /**
  * synchronous wrapper around udp socket functions
  */
@@ -1886,7 +1667,7 @@ commHalfClosedCheck(void *)
         if (!fd_table[c->fd].halfClosedReader) { // not reading already
             AsyncCall::Pointer call = commCbCall(5,4, "commHalfClosedReader",
                                                  CommIoCbPtrFun(&commHalfClosedReader, NULL));
-            comm_read(c, NULL, 0, call);
+            Comm::Read(c, call);
             fd_table[c->fd].halfClosedReader = call;
         } else
             c->fd = -1; // XXX: temporary. prevent c replacement erase closing listed FD
@@ -1905,7 +1686,7 @@ commHasHalfClosedMonitor(int fd)
 }
 
 /// stop waiting for possibly half-closed connection to close
-static void
+void
 commStopHalfClosedMonitor(int const fd)
 {
     debugs(5, 5, HERE << "removing FD " << fd << " from " << *TheHalfClosed);
index feb8c367886eaa7af147ca2b9df515a4dafb4892..46d377ca4f21bf42013f47bd03b18ac6226267f9 100644 (file)
@@ -77,12 +77,6 @@ void comm_add_close_handler(int fd, AsyncCall::Pointer &);
 void comm_remove_close_handler(int fd, CLCB *, void *);
 void comm_remove_close_handler(int fd, AsyncCall::Pointer &);
 
-int comm_has_pending_read_callback(int fd);
-bool comm_monitors_read(int fd);
-void comm_read(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback);
-void comm_read(const Comm::ConnectionPointer &conn, SBuf &buf, AsyncCall::Pointer &callback);
-void comm_read_cancel(int fd, IOCB *callback, void *data);
-void comm_read_cancel(int fd, AsyncCall::Pointer &callback);
 int comm_udp_recvfrom(int fd, void *buf, size_t len, int flags, Ip::Address &from);
 int comm_udp_recv(int fd, void *buf, size_t len, int flags);
 ssize_t comm_udp_send(int s, const void *buf, size_t len, int flags);
index 39f359429aadd3d11f865587379cf3542936ff4e..a018953668c41db40c7a21767501b840fd4b0e3a 100644 (file)
@@ -89,7 +89,6 @@ void
 Comm::IoCallback::reset()
 {
     conn = NULL;
-    buf2 = NULL; // we do not own this buffer.
     if (freefunc) {
         freefunc(buf);
         buf = NULL;
@@ -121,7 +120,6 @@ Comm::IoCallback::finish(comm_err_t code, int xerrn)
         Params &params = GetCommParams<Params>(callback);
         if (conn != NULL) params.fd = conn->fd; // for legacy write handlers...
         params.conn = conn;
-        params.buf2 = buf2;
         params.buf = buf;
         params.size = offset;
         params.flag = code;
index b2574795f5e9d935d0588784464d2edcc409c482..705ebf845c0314b2ea6a995b29c8bc3a7e56b7e1 100644 (file)
@@ -25,14 +25,6 @@ public:
     iocb_type type;
     Comm::ConnectionPointer conn;
     AsyncCall::Pointer callback;
-
-    /// Buffer to store read(2) into when set.
-    // This is a pointer to the Jobs buffer rather than an SBuf using
-    // the same store since we cannot know when or how the Job will
-    // alter its SBuf while we are reading.
-    SBuf *buf2;
-
-    // Legacy c-string buffers used when buf2 is unset.
     char *buf;
     FREE *freefunc;
     int size;
index 42ff5a4e4fb8cf634fda1cbf4219a2250dcbf650..45cfc77c9d17d714e255b066d2a2ac5137b0b62b 100644 (file)
@@ -21,6 +21,8 @@ libcomm_la_SOURCES= \
        ModPoll.cc \
        ModSelect.cc \
        ModSelectWin32.cc \
+       Read.cc \
+       Read.h \
        TcpAcceptor.cc \
        TcpAcceptor.h \
        UdpOpenDialer.h \
diff --git a/src/comm/Read.cc b/src/comm/Read.cc
new file mode 100644 (file)
index 0000000..558f3be
--- /dev/null
@@ -0,0 +1,234 @@
+/*
+ * DEBUG: section 05    Socket Functions
+ */
+#include "squid.h"
+#include "comm.h"
+#include "comm_internal.h"
+#include "CommCalls.h"
+#include "comm/IoCallback.h"
+#include "comm/Loops.h"
+#include "comm/Read.h"
+#include "Debug.h"
+#include "fd.h"
+#include "fde.h"
+#include "SBuf.h"
+#include "StatCounters.h"
+//#include "tools.h"
+
+// Does comm check this fd for read readiness?
+// Note that when comm is not monitoring, there can be a pending callback
+// call, which may resume comm monitoring once fired.
+bool
+Comm::MonitorsRead(int fd)
+{
+    assert(isOpen(fd) && COMMIO_FD_READCB(fd));
+    // Being active is usually the same as monitoring because we always
+    // start monitoring the FD when we configure Comm::IoCallback for I/O
+    // and we usually configure Comm::IoCallback for I/O when we starting
+    // monitoring a FD for reading.
+    return COMMIO_FD_READCB(fd)->active();
+}
+
+void
+Comm::Read(const Comm::ConnectionPointer &conn, AsyncCall::Pointer &callback)
+{
+    // TODO: move comm_read_base() internals into here
+    // when comm_read() char* API is no longer needed
+    comm_read_base(conn, NULL, 0, callback);
+}
+
+/**
+ * Queue a read.
+ * If a buffer is given the callback is scheduled when the read
+ * completes, on error, or on file descriptor close.
+ *
+ * If no buffer (NULL) is given the callback is scheduled when
+ * the socket FD is ready for a read(2)/recv(2).
+ */
+void
+comm_read_base(const Comm::ConnectionPointer &conn, char *buf, int size, AsyncCall::Pointer &callback)
+{
+    debugs(5, 5, "comm_read, queueing read for " << conn << "; asynCall " << callback);
+
+    /* Make sure we are open and not closing */
+    assert(Comm::IsConnOpen(conn));
+    assert(!fd_table[conn->fd].closing());
+    Comm::IoCallback *ccb = COMMIO_FD_READCB(conn->fd);
+
+    // Make sure we are either not reading or just passively monitoring.
+    // Active/passive conflicts are OK and simply cancel passive monitoring.
+    if (ccb->active()) {
+        // if the assertion below fails, we have an active comm_read conflict
+        assert(fd_table[conn->fd].halfClosedReader != NULL);
+        commStopHalfClosedMonitor(conn->fd);
+        assert(!ccb->active());
+    }
+    ccb->conn = conn;
+
+    /* Queue the read */
+    ccb->setCallback(Comm::IOCB_READ, callback, (char *)buf, NULL, size);
+    Comm::SetSelect(conn->fd, COMM_SELECT_READ, Comm::HandleRead, ccb, 0);
+}
+
+comm_err_t
+Comm::ReadNow(CommIoCbParams &params, SBuf &buf)
+{
+    /* Attempt a read */
+    ++ statCounter.syscalls.sock.reads;
+    const SBuf::size_type sz = buf.spaceSize();
+    char *theBuf = buf.rawSpace(sz);
+    errno = 0;
+    const int retval = FD_READ_METHOD(params.conn->fd, theBuf, sz);
+    params.xerrno = errno;
+
+    debugs(5, 3, params.conn << ", size " << sz << ", retval " << retval << ", errno " << params.xerrno);
+
+    if (retval > 0) { // data read most common case
+        buf.append(theBuf, retval);
+        fd_bytes(params.conn->fd, retval, FD_READ);
+        params.flag = COMM_OK;
+        params.size = retval;
+
+    } else if (retval == 0) { // remote closure (somewhat less) common
+        // Note - read 0 == socket EOF, which is a valid read.
+        params.flag = COMM_EOF;
+
+    } else if (retval < 0) { // connection errors are worst-case
+        debugs(5, 3, params.conn << " COMM_ERROR: " << xstrerr(params.xerrno));
+        if (ignoreErrno(params.xerrno))
+            params.flag =  COMM_INPROGRESS;
+        else
+            params.flag =  COMM_ERROR;
+    }
+
+    return params.flag;
+}
+
+/**
+ * Handle an FD which is ready for read(2).
+ *
+ * If there is no provided buffer to fill call the callback.
+ *
+ * Otherwise attempt a read into the provided buffer.
+ * If the read attempt succeeds or fails, call the callback.
+ * Else, wait for another IO notification.
+ */
+void
+Comm::HandleRead(int fd, void *data)
+{
+    Comm::IoCallback *ccb = (Comm::IoCallback *) data;
+
+    assert(data == COMMIO_FD_READCB(fd));
+    assert(ccb->active());
+
+    // without a buffer, just call back
+    if (!ccb->buf) {
+        ccb->finish(COMM_OK, 0);
+        return;
+    }
+
+    /* For legacy callers : Attempt a read */
+    // Keep in sync with Comm::ReadNow()!
+    ++ statCounter.syscalls.sock.reads;
+    errno = 0;
+    int retval = FD_READ_METHOD(fd, ccb->buf, ccb->size);
+    debugs(5, 3, "FD " << fd << ", size " << ccb->size << ", retval " << retval << ", errno " << errno);
+
+    /* See if we read anything */
+    /* Note - read 0 == socket EOF, which is a valid read */
+    if (retval >= 0) {
+        fd_bytes(fd, retval, FD_READ);
+        ccb->offset = retval;
+        ccb->finish(COMM_OK, errno);
+        return;
+
+    } else if (retval < 0 && !ignoreErrno(errno)) {
+        debugs(5, 3, "comm_read_try: scheduling COMM_ERROR");
+        ccb->offset = 0;
+        ccb->finish(COMM_ERROR, errno);
+        return;
+    };
+
+
+    /* Nope, register for some more IO */
+    Comm::SetSelect(fd, COMM_SELECT_READ, Comm::HandleRead, data, 0);
+}
+
+/**
+ * Cancel a pending read. Assert that we have the right parameters,
+ * and that there are no pending read events!
+ *
+ * XXX: We do not assert that there are no pending read events and
+ * with async calls it becomes even more difficult.
+ * The whole interface should be reworked to do callback->cancel()
+ * instead of searching for places where the callback may be stored and
+ * updating the state of those places.
+ *
+ * AHC Don't call the comm handlers?
+ */
+void
+comm_read_cancel(int fd, IOCB *callback, void *data)
+{
+    if (!isOpen(fd)) {
+        debugs(5, 4, "fails: FD " << fd << " closed");
+        return;
+    }
+
+    Comm::IoCallback *cb = COMMIO_FD_READCB(fd);
+    // TODO: is "active" == "monitors FD"?
+    if (!cb->active()) {
+        debugs(5, 4, "fails: FD " << fd << " inactive");
+        return;
+    }
+
+    typedef CommCbFunPtrCallT<CommIoCbPtrFun> Call;
+    Call *call = dynamic_cast<Call*>(cb->callback.getRaw());
+    if (!call) {
+        debugs(5, 4, "fails: FD " << fd << " lacks callback");
+        return;
+    }
+
+    call->cancel("old comm_read_cancel");
+
+    typedef CommIoCbParams Params;
+    const Params &params = GetCommParams<Params>(cb->callback);
+
+    /* Ok, we can be reasonably sure we won't lose any data here! */
+    assert(call->dialer.handler == callback);
+    assert(params.data == data);
+
+    /* Delete the callback */
+    cb->cancel("old comm_read_cancel");
+
+    /* And the IO event */
+    Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
+}
+
+void
+Comm::ReadCancel(int fd, AsyncCall::Pointer &callback)
+{
+    callback->cancel("comm_read_cancel");
+
+    if (!isOpen(fd)) {
+        debugs(5, 4, "fails: FD " << fd << " closed");
+        return;
+    }
+
+    Comm::IoCallback *cb = COMMIO_FD_READCB(fd);
+
+    if (!cb->active()) {
+        debugs(5, 4, "fails: FD " << fd << " inactive");
+        return;
+    }
+
+    AsyncCall::Pointer call = cb->callback;
+
+    /* Ok, we can be reasonably sure we won't lose any data here! */
+    assert(call == callback);
+
+    /* Delete the callback */
+    cb->cancel("comm_read_cancel");
+
+    /* And the IO event */
+    Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
+}
diff --git a/src/comm/Read.h b/src/comm/Read.h
new file mode 100644 (file)
index 0000000..3ba7f34
--- /dev/null
@@ -0,0 +1,55 @@
+#ifndef _SQUID_COMM_READ_H
+#define _SQUID_COMM_READ_H
+
+#include "base/AsyncCall.h"
+#include "CommCalls.h"
+#include "comm/forward.h"
+
+class SBuf;
+
+namespace Comm
+{
+
+/**
+ * Start monitoring for read.
+ *
+ * callback is scheduled when the read is possible,
+ * or on file descriptor close.
+ */
+void Read(const Comm::ConnectionPointer &conn, AsyncCall::Pointer &callback);
+
+/// whether the FD socket is being monitored for read
+bool MonitorsRead(int fd);
+
+/**
+ * Perform a read(2) on a connection immediately.
+ *
+ * The returned flag is also placed in params.flag.
+ *
+ * \retval COMM_OK          data has been read and placed in buf, amount in params.size
+ * \retval COMM_ERROR       an error occured, the code is placed in params.xerrno
+ * \retval COMM_INPROGRESS  unable to read at this time, or a minor error occured
+ * \retval COMM_ERR_CLOSING 0-byte read has occured.
+ *                          Usually indicates the remote end has disconnected.
+ */
+comm_err_t ReadNow(CommIoCbParams &params, SBuf &buf);
+
+/// Cancel the read pending on FD. No action if none pending.
+void ReadCancel(int fd, AsyncCall::Pointer &callback);
+
+/// callback handler to process an FD which is available for reading
+extern PF HandleRead;
+
+} // namespace Comm
+
+// Legacy API to be removed
+void comm_read_base(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback);
+inline void comm_read(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback)
+{
+    assert(buf != NULL);
+    comm_read_base(conn, buf, len, callback);
+}
+void comm_read_cancel(int fd, IOCB *callback, void *data);
+inline void comm_read_cancel(int fd, AsyncCall::Pointer &callback) {Comm::ReadCancel(fd,callback);}
+
+#endif /* _SQUID_COMM_READ_H */
index 1b22d3bbfc20e6e0188579930e8dbb394df22788..88148569ac2438ec62e328773cc0091fd3cd3db0 100644 (file)
@@ -13,6 +13,7 @@ typedef enum {
     COMM_ERR_DNS = -9,
     COMM_ERR_CLOSING = -10,
     COMM_ERR_PROTOCOL = -11, /* IPv4 or IPv6 cannot be used on the fd socket */
+    COMM_EOF = -12, /**< read(2) returned success, but with 0 bytes */
     COMM_ERR__END__ = -999999 /* Dummy entry to make syntax valid (comma on line above), do not use. New entries added above */
 } comm_err_t;
 
index b0f8efb9901018c2a6f2ceb012911dbf5e157c59..ba2a1ff6d6e89b32e911db70def511465742b394 100644 (file)
@@ -12,5 +12,6 @@ typedef struct _fd_debug_t fd_debug_t;
 extern fd_debug_t *fdd_table;
 
 bool isOpen(const int fd);
+void commStopHalfClosedMonitor(int fd);
 
 #endif
index 120e9dd03f2fbea0cae15b83cfc93901b7d9c896..3cca7759494e297535a574526bbc5482e43040d4 100644 (file)
@@ -36,6 +36,7 @@
 #include "comm/Connection.h"
 #include "comm/ConnOpener.h"
 #include "comm/Loops.h"
+#include "comm/Read.h"
 #include "comm/Write.h"
 #include "dlink.h"
 #include "event.h"
index 6128efd59c52e862dbccec1ee547492a04624c07..705a33414aea9be3693a6bd06d7d7bb19ce837ce 100644 (file)
@@ -32,7 +32,7 @@
  */
 
 #include "squid.h"
-#include "comm.h"
+#include "comm/Read.h"
 #include "fde.h"
 #include "globals.h"
 #include "SquidTime.h"
@@ -44,7 +44,7 @@ bool
 fde::readPending(int fdNumber)
 {
     if (type == FD_SOCKET)
-        return comm_monitors_read(fdNumber);
+        return Comm::MonitorsRead(fdNumber);
 
     return read_handler ? true : false ;
 }
index 2acaafee9ea807d541f14cebfba5538e697b50da..172d371d9c808d97e38bad1961bed0eb221f8fb5 100644 (file)
@@ -34,6 +34,7 @@
 #include "acl/FilledChecklist.h"
 #include "comm.h"
 #include "comm/ConnOpener.h"
+#include "comm/Read.h"
 #include "comm/TcpAcceptor.h"
 #include "comm/Write.h"
 #include "CommCalls.h"
index 8a467737fbc53590dad564cfea9f09a6f28e9b2a..c72039ec5c84d93ab3aff844696d9ea9ba921f18 100644 (file)
@@ -31,6 +31,7 @@
 
 #include "squid.h"
 #include "comm.h"
+#include "comm/Read.h"
 #include "comm/Write.h"
 #include "errorpage.h"
 #include "fd.h"
index c28ea14cf4617804ae76ec9a73d5c2f45f1301a2..04bc003bc68b2e80ec8c70899ecc8d61fae76616 100644 (file)
@@ -34,6 +34,7 @@
 #include "base/AsyncCbdataCalls.h"
 #include "comm.h"
 #include "comm/Connection.h"
+#include "comm/Read.h"
 #include "comm/Write.h"
 #include "fd.h"
 #include "fde.h"
index 1f90afebec34ee18ac76af3aaba953f41ae55c58..064d52ac220ba4162a91dfc4bc11e3e5dabf4268 100644 (file)
@@ -35,6 +35,7 @@
 #include "comm.h"
 #include "comm/Connection.h"
 #include "comm/ConnOpener.h"
+#include "comm/Read.h"
 #include "comm/Write.h"
 #include "CommCalls.h"
 #include "globals.h"
index 2535d05b25c4c1b8aa26ab5fadc32be0c713cacd..79d8820b6d41ddd18531749100ec8a4581f740da 100644 (file)
@@ -6,6 +6,7 @@
 #include "squid.h"
 #include "comm.h"
 #include "comm/Connection.h"
+#include "comm/Read.h"
 #include "CommCalls.h"
 #include "globals.h"
 #include "ipc/Port.h"
index 0ae66630f46cf1eb471f5039da65ec44d01234c1..4d115612b24054feb2b19ac7a30f64835f9ff7d7 100644 (file)
@@ -34,6 +34,7 @@
 #include "CachePeer.h"
 #include "comm.h"
 #include "comm/Connection.h"
+#include "comm/Read.h"
 #include "fd.h"
 #include "fde.h"
 #include "globals.h"
index ebea92f716578ce21fc1579ac5e366ccaa01a691..207061db9bb62d10604332347e823b7ecfb4d3f1 100644 (file)
@@ -35,6 +35,7 @@
 #include "CacheDigest.h"
 #include "CacheManager.h"
 #include "comm/Connection.h"
+#include "comm/Read.h"
 #include "ETag.h"
 #include "event.h"
 #include "fde.h"
index d1a4fc94e9a22f2ea1c5c0add44942353c00eac8..662be12ea648131b3b90c75800c32a3bb13f26a8 100644 (file)
@@ -51,7 +51,7 @@ void ConnStateData::stopSending(const char *error) STUB
 void ConnStateData::expectNoForwarding() STUB
 void ConnStateData::noteMoreBodySpaceAvailable(BodyPipe::Pointer) STUB
 void ConnStateData::noteBodyConsumerAborted(BodyPipe::Pointer) STUB
-bool ConnStateData::handleReadData(SBuf *buf) STUB_RETVAL(false)
+bool ConnStateData::handleReadData() STUB_RETVAL(false)
 bool ConnStateData::handleRequestBodyData() STUB_RETVAL(false)
 void ConnStateData::pinConnection(const Comm::ConnectionPointer &pinServerConn, HttpRequest *request, CachePeer *peer, bool auth) STUB
 void ConnStateData::unpinConnection() STUB
index 16f19d1ba3bb7bd9ad1f473bc8d6c3063cd9095a..f31768877d83e53fb940e25876ff0866447da75b 100644 (file)
@@ -48,6 +48,16 @@ void Comm::ResetSelect(int) STUB
 comm_err_t Comm::DoSelect(int) STUB_RETVAL(COMM_ERROR)
 void Comm::QuickPollRequired(void) STUB
 
+#include "comm/Read.h"
+void Comm::Read(const Comm::ConnectionPointer &conn, AsyncCall::Pointer &callback) STUB
+bool Comm::MonitorsRead(int fd) STUB_RETVAL(false)
+comm_err_t Comm::ReadNow(CommIoCbParams &params, SBuf &buf) STUB_RETVAL(COMM_ERROR)
+void Comm::ReadCancel(int fd, AsyncCall::Pointer &callback) STUB
+//void Comm::HandleRead(int, void*) STUB
+
+void comm_read_base(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback) STUB
+void comm_read_cancel(int fd, IOCB *callback, void *data) STUB
+
 #include "comm/TcpAcceptor.h"
 //Comm::TcpAcceptor(const Comm::ConnectionPointer &conn, const char *note, const Subscription::Pointer &aSub) STUB
 void Comm::TcpAcceptor::subscribe(const Subscription::Pointer &aSub) STUB
index ffabddbd778713032e062d5e8e6fbfe097a9c2be..08228155ee0b89fbd748671bafb0027cce8933c3 100644 (file)
@@ -40,6 +40,7 @@
 #include "comm.h"
 #include "comm/Connection.h"
 #include "comm/ConnOpener.h"
+#include "comm/Read.h"
 #include "comm/Write.h"
 #include "errorpage.h"
 #include "fde.h"
index 7cfb39e016cd51f85982d9d7c9baea0e5a6a8bd5..00a9f792c6ad651a21c1c40c483ba0e7089b7519 100644 (file)
@@ -33,6 +33,7 @@
 
 #include "squid.h"
 #include "comm.h"
+#include "comm/Read.h"
 #include "comm/Write.h"
 #include "errorpage.h"
 #include "FwdState.h"