From: Amos Jeffries Date: Thu, 5 Jun 2014 08:28:20 +0000 (-0700) Subject: Update the Comm:: API for read(2) X-Git-Tag: SQUID_3_5_0_1~203 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=7e66d5e2a0145f95566f5a83e854364a3d95199c;p=thirdparty%2Fsquid.git Update the Comm:: API for read(2) ... using an algorithm suggested by Alex Rousskov. The code for Comm:: read operations is shuffled into comm/libcomm.la and the files comm/Read.{h,cc} in symmetry with the current Comm::Write API. The new API consists of: * Comm::Read() which accepts the Comm::Connection pointer for the socket to read on and an AsyncCall callback to be run when read is ready. The Job is responsible for separately initiating read(2) or alternative action when that callback is run. * Comm::ReadNow() which accepts an SBuf buffer and a CommIoCbParams initialized to contain the Comm::Connection pointer for the socket to read on. TheCommIoCbParams will be filled out with result flag, xerrno, and size. This synchronously performs read(2) operations to append bytes to the provided buffer. It returns a comm_err_t flag for use in determining how to handle the results and signalling one of OK, INPROGRESS, ERROR, EOF as having happened. comm_read() API is retained for backward compatibility during the transitional period. However it is now deprecated and scheduled for removal ASAP. The SBuf overloaded variant is now removed. * Comm::ReadCancel() - a renaming of the comm_read_cancel() AsyncCall API. Other cancel API(s) are now deprecated and will be removed ASAP. Code using comm_read_cancel() with AsyncCall may immediately switch to this new API with no logic changes necessary even if they are not using other new Comm API calls. * Comm::MonitorsRead() - a renaming of comm_monitors_read() AsyncCall API. comm_monitors_read() is now removed. Other changes: - the unused comm_has_pending_read_callback() API is erased. - the IoCallback::buf2 mechanism previously used for SBuf read I/O is erased. - ConnStateData is converted to this new API for filling its SBuf I/O buffer and for monitoring pinned connection closures. - fde::readPending() converted to new Comm::MonitorsRead() API. - Comm half-closed monitoring feature is also converted to this new API. NP: one bug in ConnStateData handling of intercepted HTTPS traffic is noted but not fixed in this patch. --- diff --git a/src/CommCalls.h b/src/CommCalls.h index 93bbafe5e6..084171b928 100644 --- a/src/CommCalls.h +++ b/src/CommCalls.h @@ -106,8 +106,6 @@ public: bool syncWithComm(); // see CommCommonCbParams::syncWithComm }; -class SBuf; - // read/write (I/O) parameters class CommIoCbParams: public CommCommonCbParams { @@ -120,7 +118,6 @@ public: public: char *buf; size_t size; - SBuf *buf2; // alternative buffer for use when buf is unset }; // close parameters diff --git a/src/client_side.cc b/src/client_side.cc index 0cb7be8d1f..baade6c0e0 100644 --- a/src/client_side.cc +++ b/src/client_side.cc @@ -94,6 +94,7 @@ #include "comm.h" #include "comm/Connection.h" #include "comm/Loops.h" +#include "comm/Read.h" #include "comm/TcpAcceptor.h" #include "comm/Write.h" #include "CommCalls.h" @@ -237,8 +238,8 @@ ClientSocketContext::getClientReplyContext() const } /** - * This routine should be called to grow the inbuf and then - * call comm_read(). + * This routine should be called to grow the in.buf and then + * call Comm::Read(). */ void ConnStateData::readSomeData() @@ -253,7 +254,7 @@ ConnStateData::readSomeData() typedef CommCbMemFunT Dialer; reader = JobCallback(33, 5, Dialer, this, ConnStateData::clientReadRequest); - comm_read(clientConnection, in.buf, reader); + Comm::Read(clientConnection, reader); } void @@ -2416,26 +2417,6 @@ ConnStateData::getConcurrentRequestCount() const return result; } -int -ConnStateData::connReadWasError(comm_err_t flag, int size, int xerrno) -{ - if (flag != COMM_OK) { - debugs(33, 2, "connReadWasError: FD " << clientConnection << ": got flag " << flag); - return 1; - } - - if (size < 0) { - if (!ignoreErrno(xerrno)) { - debugs(33, 2, "connReadWasError: FD " << clientConnection << ": " << xstrerr(xerrno)); - return 1; - } else if (in.buf.isEmpty()) { - debugs(33, 2, "connReadWasError: FD " << clientConnection << ": no data to process (" << xstrerr(xerrno) << ")"); - } - } - - return 0; -} - int ConnStateData::connFinishedWithConn(int size) { @@ -2984,14 +2965,13 @@ ConnStateData::clientParseRequests() void ConnStateData::clientReadRequest(const CommIoCbParams &io) { - debugs(33,5,HERE << io.conn << " size " << io.size); + debugs(33,5, io.conn); Must(reading()); reader = NULL; /* Bail out quickly on COMM_ERR_CLOSING - close handlers will tidy up */ - if (io.flag == COMM_ERR_CLOSING) { - debugs(33,5, HERE << io.conn << " closing Bailout."); + debugs(33,5, io.conn << " closing Bailout."); return; } @@ -2999,47 +2979,58 @@ ConnStateData::clientReadRequest(const CommIoCbParams &io) assert(io.conn->fd == clientConnection->fd); /* - * Don't reset the timeout value here. The timeout value will be - * set to Config.Timeout.request by httpAccept() and - * clientWriteComplete(), and should apply to the request as a - * whole, not individual read() calls. Plus, it breaks our - * lame half-close detection + * Don't reset the timeout value here. The value should be + * counting Config.Timeout.request and applies to the request + * as a whole, not individual read() calls. + * Plus, it breaks our lame *HalfClosed() detection */ - if (connReadWasError(io.flag, io.size, io.xerrno)) { - notifyAllContexts(io.xerrno); - io.conn->close(); + + CommIoCbParams rd(this); // will be expanded with ReadNow results + rd.conn = io.conn; + switch (Comm::ReadNow(rd, in.buf)) + { + case COMM_INPROGRESS: + if (in.buf.isEmpty()) + debugs(33, 2, io.conn << ": no data to process, " << xstrerr(rd.xerrno)); + readSomeData(); return; - } - if (io.flag == COMM_OK) { - if (io.size > 0) { - kb_incr(&(statCounter.client_http.kbytes_in), io.size); + case COMM_OK: + kb_incr(&(statCounter.client_http.kbytes_in), rd.size); + // may comm_close or setReplyToError + if (!handleReadData()) + return; - // may comm_close or setReplyToError - if (!handleReadData(io.buf2)) - return; + /* Continue to process previously read data */ + break; - } else if (io.size == 0) { - debugs(33, 5, HERE << io.conn << " closed?"); + case COMM_EOF: // close detected by 0-byte read + debugs(33, 5, io.conn << " closed?"); - if (connFinishedWithConn(io.size)) { - clientConnection->close(); - return; - } + if (connFinishedWithConn(rd.size)) { + clientConnection->close(); + return; + } - /* It might be half-closed, we can't tell */ - fd_table[io.conn->fd].flags.socket_eof = true; + /* It might be half-closed, we can't tell */ + fd_table[io.conn->fd].flags.socket_eof = true; + commMarkHalfClosed(io.conn->fd); + fd_note(io.conn->fd, "half-closed"); - commMarkHalfClosed(io.conn->fd); + /* There is one more close check at the end, to detect aborted + * (partial) requests. At this point we can't tell if the request + * is partial. + */ - fd_note(io.conn->fd, "half-closed"); + /* Continue to process previously read data */ + break; - /* There is one more close check at the end, to detect aborted - * (partial) requests. At this point we can't tell if the request - * is partial. - */ - /* Continue to process previously read data */ - } + // case COMM_ERROR: + default: // no other flags should ever occur + debugs(33, 2, io.conn << ": got flag " << rd.flag << "; " << xstrerr(rd.xerrno)); + notifyAllContexts(rd.xerrno); + io.conn->close(); + return; } /* Process next request */ @@ -3077,10 +3068,8 @@ ConnStateData::clientReadRequest(const CommIoCbParams &io) * \retval true we did not call comm_close or setReplyToError */ bool -ConnStateData::handleReadData(SBuf *buf) +ConnStateData::handleReadData() { - assert(buf == &in.buf); // XXX: make this abort the transaction if this fails - // if we are reading a body, stuff data into the body pipe if (bodyPipe != NULL) return handleRequestBodyData(); @@ -3631,8 +3620,9 @@ httpsSslBumpAccessCheckDone(allow_t answer, void *data) // fake a CONNECT request to force connState to tunnel static char ip[MAX_IPSTRLEN]; connState->clientConnection->local.toUrl(ip, sizeof(ip)); + // XXX need to *pre-pend* this fake request to the TLS bits already in the buffer connState->in.buf.append("CONNECT ").append(ip).append(" HTTP/1.1\r\nHost: ").append(ip).append("\r\n\r\n"); - bool ret = connState->handleReadData(&connState->in.buf); + bool ret = connState->handleReadData(); if (ret) ret = connState->clientParseRequests(); @@ -4298,7 +4288,7 @@ void ConnStateData::stopReading() { if (reading()) { - comm_read_cancel(clientConnection->fd, reader); + Comm::ReadCancel(clientConnection->fd, reader); reader = NULL; } } @@ -4498,15 +4488,14 @@ ConnStateData::startPinnedConnectionMonitoring() typedef CommCbMemFunT Dialer; pinning.readHandler = JobCallback(33, 3, Dialer, this, ConnStateData::clientPinnedConnectionRead); - static char unusedBuf[8]; - comm_read(pinning.serverConnection, unusedBuf, sizeof(unusedBuf), pinning.readHandler); + Comm::Read(pinning.serverConnection, pinning.readHandler); } void ConnStateData::stopPinnedConnectionMonitoring() { if (pinning.readHandler != NULL) { - comm_read_cancel(pinning.serverConnection->fd, pinning.readHandler); + Comm::ReadCancel(pinning.serverConnection->fd, pinning.readHandler); pinning.readHandler = NULL; } } diff --git a/src/client_side.h b/src/client_side.h index 7ec69f182b..792dfc3c4b 100644 --- a/src/client_side.h +++ b/src/client_side.h @@ -290,7 +290,7 @@ public: virtual void noteMoreBodySpaceAvailable(BodyPipe::Pointer); virtual void noteBodyConsumerAborted(BodyPipe::Pointer); - bool handleReadData(SBuf *buf); + bool handleReadData(); bool handleRequestBodyData(); /** @@ -385,7 +385,6 @@ protected: void clientPinnedConnectionRead(const CommIoCbParams &io); private: - int connReadWasError(comm_err_t flag, int size, int xerrno); int connFinishedWithConn(int size); void clientAfterReadingRequests(); bool concurrentRequestQueueFilled() const; diff --git a/src/comm.cc b/src/comm.cc index 2548680d05..653e67c5a6 100644 --- a/src/comm.cc +++ b/src/comm.cc @@ -39,6 +39,7 @@ #include "comm/Connection.h" #include "comm/IoCallback.h" #include "comm/Loops.h" +#include "comm/Read.h" #include "comm/TcpAcceptor.h" #include "comm/Write.h" #include "CommRead.h" @@ -80,7 +81,6 @@ * New C-like simple comm code. This stuff is a mess and doesn't really buy us anything. */ -static void commStopHalfClosedMonitor(int fd); static IOCB commHalfClosedReader; static void comm_init_opened(const Comm::ConnectionPointer &conn, tos_t tos, nfmark_t nfmark, const char *note, struct addrinfo *AI); static int comm_apply_flags(int new_socket, Ip::Address &addr, int flags, struct addrinfo *AI); @@ -114,116 +114,6 @@ isOpen(const int fd) return fd >= 0 && fd_table && fd_table[fd].flags.open != 0; } -/** - * Attempt a read - * - * If the read attempt succeeds or fails, call the callback. - * Else, wait for another IO notification. - */ -void -commHandleRead(int fd, void *data) -{ - Comm::IoCallback *ccb = (Comm::IoCallback *) data; - - assert(data == COMMIO_FD_READCB(fd)); - assert(ccb->active()); - /* Attempt a read */ - ++ statCounter.syscalls.sock.reads; - errno = 0; - int retval; - if (ccb->buf) { - retval = FD_READ_METHOD(fd, ccb->buf, ccb->size); - debugs(5, 3, "char FD " << fd << ", size " << ccb->size << ", retval " << retval << ", errno " << errno); - } else { - assert(ccb->buf2 != NULL); - SBuf::size_type sz = ccb->buf2->spaceSize(); - char *buf = ccb->buf2->rawSpace(sz); - retval = FD_READ_METHOD(fd, buf, sz-1); // blocking synchronous read(2) - if (retval > 0) { - ccb->buf2->append(buf, retval); - } - debugs(5, 3, "SBuf FD " << fd << ", size " << sz << ", retval " << retval << ", errno " << errno); - } - - if (retval < 0 && !ignoreErrno(errno)) { - debugs(5, 3, "comm_read_try: scheduling COMM_ERROR"); - ccb->offset = 0; - ccb->finish(COMM_ERROR, errno); - return; - }; - - /* See if we read anything */ - /* Note - read 0 == socket EOF, which is a valid read */ - if (retval >= 0) { - fd_bytes(fd, retval, FD_READ); - ccb->offset = retval; - ccb->finish(COMM_OK, errno); - return; - } - - /* Nope, register for some more IO */ - Comm::SetSelect(fd, COMM_SELECT_READ, commHandleRead, data, 0); -} - -/** - * Queue a read. handler/handler_data are called when the read - * completes, on error, or on file descriptor close. - */ -void -comm_read(const Comm::ConnectionPointer &conn, char *buf, int size, AsyncCall::Pointer &callback) -{ - debugs(5, 5, "comm_read, queueing read for " << conn << "; asynCall " << callback); - - /* Make sure we are open and not closing */ - assert(Comm::IsConnOpen(conn)); - assert(!fd_table[conn->fd].closing()); - Comm::IoCallback *ccb = COMMIO_FD_READCB(conn->fd); - - // Make sure we are either not reading or just passively monitoring. - // Active/passive conflicts are OK and simply cancel passive monitoring. - if (ccb->active()) { - // if the assertion below fails, we have an active comm_read conflict - assert(fd_table[conn->fd].halfClosedReader != NULL); - commStopHalfClosedMonitor(conn->fd); - assert(!ccb->active()); - } - ccb->conn = conn; - - /* Queue the read */ - ccb->setCallback(Comm::IOCB_READ, callback, (char *)buf, NULL, size); - Comm::SetSelect(conn->fd, COMM_SELECT_READ, commHandleRead, ccb, 0); -} - -/** - * Queue a read. handler/handler_data are called when the read - * completes, on error, or on file descriptor close. - */ -void -comm_read(const Comm::ConnectionPointer &conn, SBuf &buf, AsyncCall::Pointer &callback) -{ - debugs(5, 5, "comm_read, queueing read for " << conn << "; asynCall " << callback); - - /* Make sure we are open and not closing */ - assert(Comm::IsConnOpen(conn)); - assert(!fd_table[conn->fd].closing()); - Comm::IoCallback *ccb = COMMIO_FD_READCB(conn->fd); - - // Make sure we are either not reading or just passively monitoring. - // Active/passive conflicts are OK and simply cancel passive monitoring. - if (ccb->active()) { - // if the assertion below fails, we have an active comm_read conflict - assert(fd_table[conn->fd].halfClosedReader != NULL); - commStopHalfClosedMonitor(conn->fd); - assert(!ccb->active()); - } - ccb->conn = conn; - ccb->buf2 = &buf; - - /* Queue the read */ - ccb->setCallback(Comm::IOCB_READ, callback, NULL, NULL, buf.spaceSize()); - Comm::SetSelect(conn->fd, COMM_SELECT_READ, commHandleRead, ccb, 0); -} - /** * Empty the read buffers * @@ -245,115 +135,6 @@ comm_empty_os_read_buffers(int fd) #endif } -/** - * Return whether the FD has a pending completed callback. - * NP: does not work. - */ -int -comm_has_pending_read_callback(int fd) -{ - assert(isOpen(fd)); - // XXX: We do not know whether there is a read callback scheduled. - // This is used for pconn management that should probably be more - // tightly integrated into comm to minimize the chance that a - // closing pconn socket will be used for a new transaction. - return false; -} - -// Does comm check this fd for read readiness? -// Note that when comm is not monitoring, there can be a pending callback -// call, which may resume comm monitoring once fired. -bool -comm_monitors_read(int fd) -{ - assert(isOpen(fd) && COMMIO_FD_READCB(fd)); - // Being active is usually the same as monitoring because we always - // start monitoring the FD when we configure Comm::IoCallback for I/O - // and we usually configure Comm::IoCallback for I/O when we starting - // monitoring a FD for reading. - return COMMIO_FD_READCB(fd)->active(); -} - -/** - * Cancel a pending read. Assert that we have the right parameters, - * and that there are no pending read events! - * - * XXX: We do not assert that there are no pending read events and - * with async calls it becomes even more difficult. - * The whole interface should be reworked to do callback->cancel() - * instead of searching for places where the callback may be stored and - * updating the state of those places. - * - * AHC Don't call the comm handlers? - */ -void -comm_read_cancel(int fd, IOCB *callback, void *data) -{ - if (!isOpen(fd)) { - debugs(5, 4, "comm_read_cancel fails: FD " << fd << " closed"); - return; - } - - Comm::IoCallback *cb = COMMIO_FD_READCB(fd); - // TODO: is "active" == "monitors FD"? - if (!cb->active()) { - debugs(5, 4, "comm_read_cancel fails: FD " << fd << " inactive"); - return; - } - - typedef CommCbFunPtrCallT Call; - Call *call = dynamic_cast(cb->callback.getRaw()); - if (!call) { - debugs(5, 4, "comm_read_cancel fails: FD " << fd << " lacks callback"); - return; - } - - call->cancel("old comm_read_cancel"); - - typedef CommIoCbParams Params; - const Params ¶ms = GetCommParams(cb->callback); - - /* Ok, we can be reasonably sure we won't lose any data here! */ - assert(call->dialer.handler == callback); - assert(params.data == data); - - /* Delete the callback */ - cb->cancel("old comm_read_cancel"); - - /* And the IO event */ - Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); -} - -void -comm_read_cancel(int fd, AsyncCall::Pointer &callback) -{ - callback->cancel("comm_read_cancel"); - - if (!isOpen(fd)) { - debugs(5, 4, "comm_read_cancel fails: FD " << fd << " closed"); - return; - } - - Comm::IoCallback *cb = COMMIO_FD_READCB(fd); - - if (!cb->active()) { - debugs(5, 4, "comm_read_cancel fails: FD " << fd << " inactive"); - return; - } - - AsyncCall::Pointer call = cb->callback; - assert(call != NULL); // XXX: should never fail (active() checks for callback==NULL) - - /* Ok, we can be reasonably sure we won't lose any data here! */ - assert(call == callback); - - /* Delete the callback */ - cb->cancel("comm_read_cancel"); - - /* And the IO event */ - Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); -} - /** * synchronous wrapper around udp socket functions */ @@ -1886,7 +1667,7 @@ commHalfClosedCheck(void *) if (!fd_table[c->fd].halfClosedReader) { // not reading already AsyncCall::Pointer call = commCbCall(5,4, "commHalfClosedReader", CommIoCbPtrFun(&commHalfClosedReader, NULL)); - comm_read(c, NULL, 0, call); + Comm::Read(c, call); fd_table[c->fd].halfClosedReader = call; } else c->fd = -1; // XXX: temporary. prevent c replacement erase closing listed FD @@ -1905,7 +1686,7 @@ commHasHalfClosedMonitor(int fd) } /// stop waiting for possibly half-closed connection to close -static void +void commStopHalfClosedMonitor(int const fd) { debugs(5, 5, HERE << "removing FD " << fd << " from " << *TheHalfClosed); diff --git a/src/comm.h b/src/comm.h index feb8c36788..46d377ca4f 100644 --- a/src/comm.h +++ b/src/comm.h @@ -77,12 +77,6 @@ void comm_add_close_handler(int fd, AsyncCall::Pointer &); void comm_remove_close_handler(int fd, CLCB *, void *); void comm_remove_close_handler(int fd, AsyncCall::Pointer &); -int comm_has_pending_read_callback(int fd); -bool comm_monitors_read(int fd); -void comm_read(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback); -void comm_read(const Comm::ConnectionPointer &conn, SBuf &buf, AsyncCall::Pointer &callback); -void comm_read_cancel(int fd, IOCB *callback, void *data); -void comm_read_cancel(int fd, AsyncCall::Pointer &callback); int comm_udp_recvfrom(int fd, void *buf, size_t len, int flags, Ip::Address &from); int comm_udp_recv(int fd, void *buf, size_t len, int flags); ssize_t comm_udp_send(int s, const void *buf, size_t len, int flags); diff --git a/src/comm/IoCallback.cc b/src/comm/IoCallback.cc index 39f359429a..a018953668 100644 --- a/src/comm/IoCallback.cc +++ b/src/comm/IoCallback.cc @@ -89,7 +89,6 @@ void Comm::IoCallback::reset() { conn = NULL; - buf2 = NULL; // we do not own this buffer. if (freefunc) { freefunc(buf); buf = NULL; @@ -121,7 +120,6 @@ Comm::IoCallback::finish(comm_err_t code, int xerrn) Params ¶ms = GetCommParams(callback); if (conn != NULL) params.fd = conn->fd; // for legacy write handlers... params.conn = conn; - params.buf2 = buf2; params.buf = buf; params.size = offset; params.flag = code; diff --git a/src/comm/IoCallback.h b/src/comm/IoCallback.h index b2574795f5..705ebf845c 100644 --- a/src/comm/IoCallback.h +++ b/src/comm/IoCallback.h @@ -25,14 +25,6 @@ public: iocb_type type; Comm::ConnectionPointer conn; AsyncCall::Pointer callback; - - /// Buffer to store read(2) into when set. - // This is a pointer to the Jobs buffer rather than an SBuf using - // the same store since we cannot know when or how the Job will - // alter its SBuf while we are reading. - SBuf *buf2; - - // Legacy c-string buffers used when buf2 is unset. char *buf; FREE *freefunc; int size; diff --git a/src/comm/Makefile.am b/src/comm/Makefile.am index 42ff5a4e4f..45cfc77c9d 100644 --- a/src/comm/Makefile.am +++ b/src/comm/Makefile.am @@ -21,6 +21,8 @@ libcomm_la_SOURCES= \ ModPoll.cc \ ModSelect.cc \ ModSelectWin32.cc \ + Read.cc \ + Read.h \ TcpAcceptor.cc \ TcpAcceptor.h \ UdpOpenDialer.h \ diff --git a/src/comm/Read.cc b/src/comm/Read.cc new file mode 100644 index 0000000000..558f3be9ce --- /dev/null +++ b/src/comm/Read.cc @@ -0,0 +1,234 @@ +/* + * DEBUG: section 05 Socket Functions + */ +#include "squid.h" +#include "comm.h" +#include "comm_internal.h" +#include "CommCalls.h" +#include "comm/IoCallback.h" +#include "comm/Loops.h" +#include "comm/Read.h" +#include "Debug.h" +#include "fd.h" +#include "fde.h" +#include "SBuf.h" +#include "StatCounters.h" +//#include "tools.h" + +// Does comm check this fd for read readiness? +// Note that when comm is not monitoring, there can be a pending callback +// call, which may resume comm monitoring once fired. +bool +Comm::MonitorsRead(int fd) +{ + assert(isOpen(fd) && COMMIO_FD_READCB(fd)); + // Being active is usually the same as monitoring because we always + // start monitoring the FD when we configure Comm::IoCallback for I/O + // and we usually configure Comm::IoCallback for I/O when we starting + // monitoring a FD for reading. + return COMMIO_FD_READCB(fd)->active(); +} + +void +Comm::Read(const Comm::ConnectionPointer &conn, AsyncCall::Pointer &callback) +{ + // TODO: move comm_read_base() internals into here + // when comm_read() char* API is no longer needed + comm_read_base(conn, NULL, 0, callback); +} + +/** + * Queue a read. + * If a buffer is given the callback is scheduled when the read + * completes, on error, or on file descriptor close. + * + * If no buffer (NULL) is given the callback is scheduled when + * the socket FD is ready for a read(2)/recv(2). + */ +void +comm_read_base(const Comm::ConnectionPointer &conn, char *buf, int size, AsyncCall::Pointer &callback) +{ + debugs(5, 5, "comm_read, queueing read for " << conn << "; asynCall " << callback); + + /* Make sure we are open and not closing */ + assert(Comm::IsConnOpen(conn)); + assert(!fd_table[conn->fd].closing()); + Comm::IoCallback *ccb = COMMIO_FD_READCB(conn->fd); + + // Make sure we are either not reading or just passively monitoring. + // Active/passive conflicts are OK and simply cancel passive monitoring. + if (ccb->active()) { + // if the assertion below fails, we have an active comm_read conflict + assert(fd_table[conn->fd].halfClosedReader != NULL); + commStopHalfClosedMonitor(conn->fd); + assert(!ccb->active()); + } + ccb->conn = conn; + + /* Queue the read */ + ccb->setCallback(Comm::IOCB_READ, callback, (char *)buf, NULL, size); + Comm::SetSelect(conn->fd, COMM_SELECT_READ, Comm::HandleRead, ccb, 0); +} + +comm_err_t +Comm::ReadNow(CommIoCbParams ¶ms, SBuf &buf) +{ + /* Attempt a read */ + ++ statCounter.syscalls.sock.reads; + const SBuf::size_type sz = buf.spaceSize(); + char *theBuf = buf.rawSpace(sz); + errno = 0; + const int retval = FD_READ_METHOD(params.conn->fd, theBuf, sz); + params.xerrno = errno; + + debugs(5, 3, params.conn << ", size " << sz << ", retval " << retval << ", errno " << params.xerrno); + + if (retval > 0) { // data read most common case + buf.append(theBuf, retval); + fd_bytes(params.conn->fd, retval, FD_READ); + params.flag = COMM_OK; + params.size = retval; + + } else if (retval == 0) { // remote closure (somewhat less) common + // Note - read 0 == socket EOF, which is a valid read. + params.flag = COMM_EOF; + + } else if (retval < 0) { // connection errors are worst-case + debugs(5, 3, params.conn << " COMM_ERROR: " << xstrerr(params.xerrno)); + if (ignoreErrno(params.xerrno)) + params.flag = COMM_INPROGRESS; + else + params.flag = COMM_ERROR; + } + + return params.flag; +} + +/** + * Handle an FD which is ready for read(2). + * + * If there is no provided buffer to fill call the callback. + * + * Otherwise attempt a read into the provided buffer. + * If the read attempt succeeds or fails, call the callback. + * Else, wait for another IO notification. + */ +void +Comm::HandleRead(int fd, void *data) +{ + Comm::IoCallback *ccb = (Comm::IoCallback *) data; + + assert(data == COMMIO_FD_READCB(fd)); + assert(ccb->active()); + + // without a buffer, just call back + if (!ccb->buf) { + ccb->finish(COMM_OK, 0); + return; + } + + /* For legacy callers : Attempt a read */ + // Keep in sync with Comm::ReadNow()! + ++ statCounter.syscalls.sock.reads; + errno = 0; + int retval = FD_READ_METHOD(fd, ccb->buf, ccb->size); + debugs(5, 3, "FD " << fd << ", size " << ccb->size << ", retval " << retval << ", errno " << errno); + + /* See if we read anything */ + /* Note - read 0 == socket EOF, which is a valid read */ + if (retval >= 0) { + fd_bytes(fd, retval, FD_READ); + ccb->offset = retval; + ccb->finish(COMM_OK, errno); + return; + + } else if (retval < 0 && !ignoreErrno(errno)) { + debugs(5, 3, "comm_read_try: scheduling COMM_ERROR"); + ccb->offset = 0; + ccb->finish(COMM_ERROR, errno); + return; + }; + + + /* Nope, register for some more IO */ + Comm::SetSelect(fd, COMM_SELECT_READ, Comm::HandleRead, data, 0); +} + +/** + * Cancel a pending read. Assert that we have the right parameters, + * and that there are no pending read events! + * + * XXX: We do not assert that there are no pending read events and + * with async calls it becomes even more difficult. + * The whole interface should be reworked to do callback->cancel() + * instead of searching for places where the callback may be stored and + * updating the state of those places. + * + * AHC Don't call the comm handlers? + */ +void +comm_read_cancel(int fd, IOCB *callback, void *data) +{ + if (!isOpen(fd)) { + debugs(5, 4, "fails: FD " << fd << " closed"); + return; + } + + Comm::IoCallback *cb = COMMIO_FD_READCB(fd); + // TODO: is "active" == "monitors FD"? + if (!cb->active()) { + debugs(5, 4, "fails: FD " << fd << " inactive"); + return; + } + + typedef CommCbFunPtrCallT Call; + Call *call = dynamic_cast(cb->callback.getRaw()); + if (!call) { + debugs(5, 4, "fails: FD " << fd << " lacks callback"); + return; + } + + call->cancel("old comm_read_cancel"); + + typedef CommIoCbParams Params; + const Params ¶ms = GetCommParams(cb->callback); + + /* Ok, we can be reasonably sure we won't lose any data here! */ + assert(call->dialer.handler == callback); + assert(params.data == data); + + /* Delete the callback */ + cb->cancel("old comm_read_cancel"); + + /* And the IO event */ + Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); +} + +void +Comm::ReadCancel(int fd, AsyncCall::Pointer &callback) +{ + callback->cancel("comm_read_cancel"); + + if (!isOpen(fd)) { + debugs(5, 4, "fails: FD " << fd << " closed"); + return; + } + + Comm::IoCallback *cb = COMMIO_FD_READCB(fd); + + if (!cb->active()) { + debugs(5, 4, "fails: FD " << fd << " inactive"); + return; + } + + AsyncCall::Pointer call = cb->callback; + + /* Ok, we can be reasonably sure we won't lose any data here! */ + assert(call == callback); + + /* Delete the callback */ + cb->cancel("comm_read_cancel"); + + /* And the IO event */ + Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); +} diff --git a/src/comm/Read.h b/src/comm/Read.h new file mode 100644 index 0000000000..3ba7f3415a --- /dev/null +++ b/src/comm/Read.h @@ -0,0 +1,55 @@ +#ifndef _SQUID_COMM_READ_H +#define _SQUID_COMM_READ_H + +#include "base/AsyncCall.h" +#include "CommCalls.h" +#include "comm/forward.h" + +class SBuf; + +namespace Comm +{ + +/** + * Start monitoring for read. + * + * callback is scheduled when the read is possible, + * or on file descriptor close. + */ +void Read(const Comm::ConnectionPointer &conn, AsyncCall::Pointer &callback); + +/// whether the FD socket is being monitored for read +bool MonitorsRead(int fd); + +/** + * Perform a read(2) on a connection immediately. + * + * The returned flag is also placed in params.flag. + * + * \retval COMM_OK data has been read and placed in buf, amount in params.size + * \retval COMM_ERROR an error occured, the code is placed in params.xerrno + * \retval COMM_INPROGRESS unable to read at this time, or a minor error occured + * \retval COMM_ERR_CLOSING 0-byte read has occured. + * Usually indicates the remote end has disconnected. + */ +comm_err_t ReadNow(CommIoCbParams ¶ms, SBuf &buf); + +/// Cancel the read pending on FD. No action if none pending. +void ReadCancel(int fd, AsyncCall::Pointer &callback); + +/// callback handler to process an FD which is available for reading +extern PF HandleRead; + +} // namespace Comm + +// Legacy API to be removed +void comm_read_base(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback); +inline void comm_read(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback) +{ + assert(buf != NULL); + comm_read_base(conn, buf, len, callback); +} +void comm_read_cancel(int fd, IOCB *callback, void *data); +inline void comm_read_cancel(int fd, AsyncCall::Pointer &callback) {Comm::ReadCancel(fd,callback);} + +#endif /* _SQUID_COMM_READ_H */ diff --git a/src/comm/comm_err_t.h b/src/comm/comm_err_t.h index 1b22d3bbfc..88148569ac 100644 --- a/src/comm/comm_err_t.h +++ b/src/comm/comm_err_t.h @@ -13,6 +13,7 @@ typedef enum { COMM_ERR_DNS = -9, COMM_ERR_CLOSING = -10, COMM_ERR_PROTOCOL = -11, /* IPv4 or IPv6 cannot be used on the fd socket */ + COMM_EOF = -12, /**< read(2) returned success, but with 0 bytes */ COMM_ERR__END__ = -999999 /* Dummy entry to make syntax valid (comma on line above), do not use. New entries added above */ } comm_err_t; diff --git a/src/comm/comm_internal.h b/src/comm/comm_internal.h index b0f8efb990..ba2a1ff6d6 100644 --- a/src/comm/comm_internal.h +++ b/src/comm/comm_internal.h @@ -12,5 +12,6 @@ typedef struct _fd_debug_t fd_debug_t; extern fd_debug_t *fdd_table; bool isOpen(const int fd); +void commStopHalfClosedMonitor(int fd); #endif diff --git a/src/dns_internal.cc b/src/dns_internal.cc index 120e9dd03f..3cca775949 100644 --- a/src/dns_internal.cc +++ b/src/dns_internal.cc @@ -36,6 +36,7 @@ #include "comm/Connection.h" #include "comm/ConnOpener.h" #include "comm/Loops.h" +#include "comm/Read.h" #include "comm/Write.h" #include "dlink.h" #include "event.h" diff --git a/src/fde.cc b/src/fde.cc index 6128efd59c..705a33414a 100644 --- a/src/fde.cc +++ b/src/fde.cc @@ -32,7 +32,7 @@ */ #include "squid.h" -#include "comm.h" +#include "comm/Read.h" #include "fde.h" #include "globals.h" #include "SquidTime.h" @@ -44,7 +44,7 @@ bool fde::readPending(int fdNumber) { if (type == FD_SOCKET) - return comm_monitors_read(fdNumber); + return Comm::MonitorsRead(fdNumber); return read_handler ? true : false ; } diff --git a/src/ftp.cc b/src/ftp.cc index 2acaafee9e..172d371d9c 100644 --- a/src/ftp.cc +++ b/src/ftp.cc @@ -34,6 +34,7 @@ #include "acl/FilledChecklist.h" #include "comm.h" #include "comm/ConnOpener.h" +#include "comm/Read.h" #include "comm/TcpAcceptor.h" #include "comm/Write.h" #include "CommCalls.h" diff --git a/src/gopher.cc b/src/gopher.cc index 8a467737fb..c72039ec5c 100644 --- a/src/gopher.cc +++ b/src/gopher.cc @@ -31,6 +31,7 @@ #include "squid.h" #include "comm.h" +#include "comm/Read.h" #include "comm/Write.h" #include "errorpage.h" #include "fd.h" diff --git a/src/helper.cc b/src/helper.cc index c28ea14cf4..04bc003bc6 100644 --- a/src/helper.cc +++ b/src/helper.cc @@ -34,6 +34,7 @@ #include "base/AsyncCbdataCalls.h" #include "comm.h" #include "comm/Connection.h" +#include "comm/Read.h" #include "comm/Write.h" #include "fd.h" #include "fde.h" diff --git a/src/ident/Ident.cc b/src/ident/Ident.cc index 1f90afebec..064d52ac22 100644 --- a/src/ident/Ident.cc +++ b/src/ident/Ident.cc @@ -35,6 +35,7 @@ #include "comm.h" #include "comm/Connection.h" #include "comm/ConnOpener.h" +#include "comm/Read.h" #include "comm/Write.h" #include "CommCalls.h" #include "globals.h" diff --git a/src/ipc/Port.cc b/src/ipc/Port.cc index 2535d05b25..79d8820b6d 100644 --- a/src/ipc/Port.cc +++ b/src/ipc/Port.cc @@ -6,6 +6,7 @@ #include "squid.h" #include "comm.h" #include "comm/Connection.h" +#include "comm/Read.h" #include "CommCalls.h" #include "globals.h" #include "ipc/Port.h" diff --git a/src/pconn.cc b/src/pconn.cc index 0ae66630f4..4d115612b2 100644 --- a/src/pconn.cc +++ b/src/pconn.cc @@ -34,6 +34,7 @@ #include "CachePeer.h" #include "comm.h" #include "comm/Connection.h" +#include "comm/Read.h" #include "fd.h" #include "fde.h" #include "globals.h" diff --git a/src/store.cc b/src/store.cc index ebea92f716..207061db9b 100644 --- a/src/store.cc +++ b/src/store.cc @@ -35,6 +35,7 @@ #include "CacheDigest.h" #include "CacheManager.h" #include "comm/Connection.h" +#include "comm/Read.h" #include "ETag.h" #include "event.h" #include "fde.h" diff --git a/src/tests/stub_client_side.cc b/src/tests/stub_client_side.cc index d1a4fc94e9..662be12ea6 100644 --- a/src/tests/stub_client_side.cc +++ b/src/tests/stub_client_side.cc @@ -51,7 +51,7 @@ void ConnStateData::stopSending(const char *error) STUB void ConnStateData::expectNoForwarding() STUB void ConnStateData::noteMoreBodySpaceAvailable(BodyPipe::Pointer) STUB void ConnStateData::noteBodyConsumerAborted(BodyPipe::Pointer) STUB -bool ConnStateData::handleReadData(SBuf *buf) STUB_RETVAL(false) +bool ConnStateData::handleReadData() STUB_RETVAL(false) bool ConnStateData::handleRequestBodyData() STUB_RETVAL(false) void ConnStateData::pinConnection(const Comm::ConnectionPointer &pinServerConn, HttpRequest *request, CachePeer *peer, bool auth) STUB void ConnStateData::unpinConnection() STUB diff --git a/src/tests/stub_libcomm.cc b/src/tests/stub_libcomm.cc index 16f19d1ba3..f31768877d 100644 --- a/src/tests/stub_libcomm.cc +++ b/src/tests/stub_libcomm.cc @@ -48,6 +48,16 @@ void Comm::ResetSelect(int) STUB comm_err_t Comm::DoSelect(int) STUB_RETVAL(COMM_ERROR) void Comm::QuickPollRequired(void) STUB +#include "comm/Read.h" +void Comm::Read(const Comm::ConnectionPointer &conn, AsyncCall::Pointer &callback) STUB +bool Comm::MonitorsRead(int fd) STUB_RETVAL(false) +comm_err_t Comm::ReadNow(CommIoCbParams ¶ms, SBuf &buf) STUB_RETVAL(COMM_ERROR) +void Comm::ReadCancel(int fd, AsyncCall::Pointer &callback) STUB +//void Comm::HandleRead(int, void*) STUB + +void comm_read_base(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback) STUB +void comm_read_cancel(int fd, IOCB *callback, void *data) STUB + #include "comm/TcpAcceptor.h" //Comm::TcpAcceptor(const Comm::ConnectionPointer &conn, const char *note, const Subscription::Pointer &aSub) STUB void Comm::TcpAcceptor::subscribe(const Subscription::Pointer &aSub) STUB diff --git a/src/tunnel.cc b/src/tunnel.cc index ffabddbd77..08228155ee 100644 --- a/src/tunnel.cc +++ b/src/tunnel.cc @@ -40,6 +40,7 @@ #include "comm.h" #include "comm/Connection.h" #include "comm/ConnOpener.h" +#include "comm/Read.h" #include "comm/Write.h" #include "errorpage.h" #include "fde.h" diff --git a/src/whois.cc b/src/whois.cc index 7cfb39e016..00a9f792c6 100644 --- a/src/whois.cc +++ b/src/whois.cc @@ -33,6 +33,7 @@ #include "squid.h" #include "comm.h" +#include "comm/Read.h" #include "comm/Write.h" #include "errorpage.h" #include "FwdState.h"