... using an algorithm suggested by Alex Rousskov.
The code for Comm:: read operations is shuffled into comm/libcomm.la and
the files comm/Read.{h,cc} in symmetry with the current Comm::Write API.
The new API consists of:
* Comm::Read() which accepts the Comm::Connection pointer for the
socket to read on and an AsyncCall callback to be run when read is
ready. The Job is responsible for separately initiating read(2) or
alternative action when that callback is run.
* Comm::ReadNow() which accepts an SBuf buffer and a CommIoCbParams
initialized to contain the Comm::Connection pointer for the socket to
read on. TheCommIoCbParams will be filled out with result flag, xerrno,
and size.
This synchronously performs read(2) operations to append bytes to the
provided buffer. It returns a comm_err_t flag for use in determining how
to handle the results and signalling one of OK, INPROGRESS, ERROR, EOF
as having happened.
comm_read() API is retained for backward compatibility during the
transitional period. However it is now deprecated and scheduled for
removal ASAP. The SBuf overloaded variant is now removed.
* Comm::ReadCancel() - a renaming of the comm_read_cancel() AsyncCall
API. Other cancel API(s) are now deprecated and will be removed ASAP.
Code using comm_read_cancel() with AsyncCall may immediately switch to
this new API with no logic changes necessary even if they are not using
other new Comm API calls.
* Comm::MonitorsRead() - a renaming of comm_monitors_read() AsyncCall
API. comm_monitors_read() is now removed.
Other changes:
- the unused comm_has_pending_read_callback() API is erased.
- the IoCallback::buf2 mechanism previously used for SBuf read I/O is
erased.
- ConnStateData is converted to this new API for filling its SBuf I/O
buffer and for monitoring pinned connection closures.
- fde::readPending() converted to new Comm::MonitorsRead() API.
- Comm half-closed monitoring feature is also converted to this new API.
NP: one bug in ConnStateData handling of intercepted HTTPS traffic is
noted but not fixed in this patch.
bool syncWithComm(); // see CommCommonCbParams::syncWithComm
};
-class SBuf;
-
// read/write (I/O) parameters
class CommIoCbParams: public CommCommonCbParams
{
public:
char *buf;
size_t size;
- SBuf *buf2; // alternative buffer for use when buf is unset
};
// close parameters
#include "comm.h"
#include "comm/Connection.h"
#include "comm/Loops.h"
+#include "comm/Read.h"
#include "comm/TcpAcceptor.h"
#include "comm/Write.h"
#include "CommCalls.h"
}
/**
- * This routine should be called to grow the inbuf and then
- * call comm_read().
+ * This routine should be called to grow the in.buf and then
+ * call Comm::Read().
*/
void
ConnStateData::readSomeData()
typedef CommCbMemFunT<ConnStateData, CommIoCbParams> Dialer;
reader = JobCallback(33, 5, Dialer, this, ConnStateData::clientReadRequest);
- comm_read(clientConnection, in.buf, reader);
+ Comm::Read(clientConnection, reader);
}
void
return result;
}
-int
-ConnStateData::connReadWasError(comm_err_t flag, int size, int xerrno)
-{
- if (flag != COMM_OK) {
- debugs(33, 2, "connReadWasError: FD " << clientConnection << ": got flag " << flag);
- return 1;
- }
-
- if (size < 0) {
- if (!ignoreErrno(xerrno)) {
- debugs(33, 2, "connReadWasError: FD " << clientConnection << ": " << xstrerr(xerrno));
- return 1;
- } else if (in.buf.isEmpty()) {
- debugs(33, 2, "connReadWasError: FD " << clientConnection << ": no data to process (" << xstrerr(xerrno) << ")");
- }
- }
-
- return 0;
-}
-
int
ConnStateData::connFinishedWithConn(int size)
{
void
ConnStateData::clientReadRequest(const CommIoCbParams &io)
{
- debugs(33,5,HERE << io.conn << " size " << io.size);
+ debugs(33,5, io.conn);
Must(reading());
reader = NULL;
/* Bail out quickly on COMM_ERR_CLOSING - close handlers will tidy up */
-
if (io.flag == COMM_ERR_CLOSING) {
- debugs(33,5, HERE << io.conn << " closing Bailout.");
+ debugs(33,5, io.conn << " closing Bailout.");
return;
}
assert(io.conn->fd == clientConnection->fd);
/*
- * Don't reset the timeout value here. The timeout value will be
- * set to Config.Timeout.request by httpAccept() and
- * clientWriteComplete(), and should apply to the request as a
- * whole, not individual read() calls. Plus, it breaks our
- * lame half-close detection
+ * Don't reset the timeout value here. The value should be
+ * counting Config.Timeout.request and applies to the request
+ * as a whole, not individual read() calls.
+ * Plus, it breaks our lame *HalfClosed() detection
*/
- if (connReadWasError(io.flag, io.size, io.xerrno)) {
- notifyAllContexts(io.xerrno);
- io.conn->close();
+
+ CommIoCbParams rd(this); // will be expanded with ReadNow results
+ rd.conn = io.conn;
+ switch (Comm::ReadNow(rd, in.buf))
+ {
+ case COMM_INPROGRESS:
+ if (in.buf.isEmpty())
+ debugs(33, 2, io.conn << ": no data to process, " << xstrerr(rd.xerrno));
+ readSomeData();
return;
- }
- if (io.flag == COMM_OK) {
- if (io.size > 0) {
- kb_incr(&(statCounter.client_http.kbytes_in), io.size);
+ case COMM_OK:
+ kb_incr(&(statCounter.client_http.kbytes_in), rd.size);
+ // may comm_close or setReplyToError
+ if (!handleReadData())
+ return;
- // may comm_close or setReplyToError
- if (!handleReadData(io.buf2))
- return;
+ /* Continue to process previously read data */
+ break;
- } else if (io.size == 0) {
- debugs(33, 5, HERE << io.conn << " closed?");
+ case COMM_EOF: // close detected by 0-byte read
+ debugs(33, 5, io.conn << " closed?");
- if (connFinishedWithConn(io.size)) {
- clientConnection->close();
- return;
- }
+ if (connFinishedWithConn(rd.size)) {
+ clientConnection->close();
+ return;
+ }
- /* It might be half-closed, we can't tell */
- fd_table[io.conn->fd].flags.socket_eof = true;
+ /* It might be half-closed, we can't tell */
+ fd_table[io.conn->fd].flags.socket_eof = true;
+ commMarkHalfClosed(io.conn->fd);
+ fd_note(io.conn->fd, "half-closed");
- commMarkHalfClosed(io.conn->fd);
+ /* There is one more close check at the end, to detect aborted
+ * (partial) requests. At this point we can't tell if the request
+ * is partial.
+ */
- fd_note(io.conn->fd, "half-closed");
+ /* Continue to process previously read data */
+ break;
- /* There is one more close check at the end, to detect aborted
- * (partial) requests. At this point we can't tell if the request
- * is partial.
- */
- /* Continue to process previously read data */
- }
+ // case COMM_ERROR:
+ default: // no other flags should ever occur
+ debugs(33, 2, io.conn << ": got flag " << rd.flag << "; " << xstrerr(rd.xerrno));
+ notifyAllContexts(rd.xerrno);
+ io.conn->close();
+ return;
}
/* Process next request */
* \retval true we did not call comm_close or setReplyToError
*/
bool
-ConnStateData::handleReadData(SBuf *buf)
+ConnStateData::handleReadData()
{
- assert(buf == &in.buf); // XXX: make this abort the transaction if this fails
-
// if we are reading a body, stuff data into the body pipe
if (bodyPipe != NULL)
return handleRequestBodyData();
// fake a CONNECT request to force connState to tunnel
static char ip[MAX_IPSTRLEN];
connState->clientConnection->local.toUrl(ip, sizeof(ip));
+ // XXX need to *pre-pend* this fake request to the TLS bits already in the buffer
connState->in.buf.append("CONNECT ").append(ip).append(" HTTP/1.1\r\nHost: ").append(ip).append("\r\n\r\n");
- bool ret = connState->handleReadData(&connState->in.buf);
+ bool ret = connState->handleReadData();
if (ret)
ret = connState->clientParseRequests();
ConnStateData::stopReading()
{
if (reading()) {
- comm_read_cancel(clientConnection->fd, reader);
+ Comm::ReadCancel(clientConnection->fd, reader);
reader = NULL;
}
}
typedef CommCbMemFunT<ConnStateData, CommIoCbParams> Dialer;
pinning.readHandler = JobCallback(33, 3,
Dialer, this, ConnStateData::clientPinnedConnectionRead);
- static char unusedBuf[8];
- comm_read(pinning.serverConnection, unusedBuf, sizeof(unusedBuf), pinning.readHandler);
+ Comm::Read(pinning.serverConnection, pinning.readHandler);
}
void
ConnStateData::stopPinnedConnectionMonitoring()
{
if (pinning.readHandler != NULL) {
- comm_read_cancel(pinning.serverConnection->fd, pinning.readHandler);
+ Comm::ReadCancel(pinning.serverConnection->fd, pinning.readHandler);
pinning.readHandler = NULL;
}
}
virtual void noteMoreBodySpaceAvailable(BodyPipe::Pointer);
virtual void noteBodyConsumerAborted(BodyPipe::Pointer);
- bool handleReadData(SBuf *buf);
+ bool handleReadData();
bool handleRequestBodyData();
/**
void clientPinnedConnectionRead(const CommIoCbParams &io);
private:
- int connReadWasError(comm_err_t flag, int size, int xerrno);
int connFinishedWithConn(int size);
void clientAfterReadingRequests();
bool concurrentRequestQueueFilled() const;
#include "comm/Connection.h"
#include "comm/IoCallback.h"
#include "comm/Loops.h"
+#include "comm/Read.h"
#include "comm/TcpAcceptor.h"
#include "comm/Write.h"
#include "CommRead.h"
* New C-like simple comm code. This stuff is a mess and doesn't really buy us anything.
*/
-static void commStopHalfClosedMonitor(int fd);
static IOCB commHalfClosedReader;
static void comm_init_opened(const Comm::ConnectionPointer &conn, tos_t tos, nfmark_t nfmark, const char *note, struct addrinfo *AI);
static int comm_apply_flags(int new_socket, Ip::Address &addr, int flags, struct addrinfo *AI);
return fd >= 0 && fd_table && fd_table[fd].flags.open != 0;
}
-/**
- * Attempt a read
- *
- * If the read attempt succeeds or fails, call the callback.
- * Else, wait for another IO notification.
- */
-void
-commHandleRead(int fd, void *data)
-{
- Comm::IoCallback *ccb = (Comm::IoCallback *) data;
-
- assert(data == COMMIO_FD_READCB(fd));
- assert(ccb->active());
- /* Attempt a read */
- ++ statCounter.syscalls.sock.reads;
- errno = 0;
- int retval;
- if (ccb->buf) {
- retval = FD_READ_METHOD(fd, ccb->buf, ccb->size);
- debugs(5, 3, "char FD " << fd << ", size " << ccb->size << ", retval " << retval << ", errno " << errno);
- } else {
- assert(ccb->buf2 != NULL);
- SBuf::size_type sz = ccb->buf2->spaceSize();
- char *buf = ccb->buf2->rawSpace(sz);
- retval = FD_READ_METHOD(fd, buf, sz-1); // blocking synchronous read(2)
- if (retval > 0) {
- ccb->buf2->append(buf, retval);
- }
- debugs(5, 3, "SBuf FD " << fd << ", size " << sz << ", retval " << retval << ", errno " << errno);
- }
-
- if (retval < 0 && !ignoreErrno(errno)) {
- debugs(5, 3, "comm_read_try: scheduling COMM_ERROR");
- ccb->offset = 0;
- ccb->finish(COMM_ERROR, errno);
- return;
- };
-
- /* See if we read anything */
- /* Note - read 0 == socket EOF, which is a valid read */
- if (retval >= 0) {
- fd_bytes(fd, retval, FD_READ);
- ccb->offset = retval;
- ccb->finish(COMM_OK, errno);
- return;
- }
-
- /* Nope, register for some more IO */
- Comm::SetSelect(fd, COMM_SELECT_READ, commHandleRead, data, 0);
-}
-
-/**
- * Queue a read. handler/handler_data are called when the read
- * completes, on error, or on file descriptor close.
- */
-void
-comm_read(const Comm::ConnectionPointer &conn, char *buf, int size, AsyncCall::Pointer &callback)
-{
- debugs(5, 5, "comm_read, queueing read for " << conn << "; asynCall " << callback);
-
- /* Make sure we are open and not closing */
- assert(Comm::IsConnOpen(conn));
- assert(!fd_table[conn->fd].closing());
- Comm::IoCallback *ccb = COMMIO_FD_READCB(conn->fd);
-
- // Make sure we are either not reading or just passively monitoring.
- // Active/passive conflicts are OK and simply cancel passive monitoring.
- if (ccb->active()) {
- // if the assertion below fails, we have an active comm_read conflict
- assert(fd_table[conn->fd].halfClosedReader != NULL);
- commStopHalfClosedMonitor(conn->fd);
- assert(!ccb->active());
- }
- ccb->conn = conn;
-
- /* Queue the read */
- ccb->setCallback(Comm::IOCB_READ, callback, (char *)buf, NULL, size);
- Comm::SetSelect(conn->fd, COMM_SELECT_READ, commHandleRead, ccb, 0);
-}
-
-/**
- * Queue a read. handler/handler_data are called when the read
- * completes, on error, or on file descriptor close.
- */
-void
-comm_read(const Comm::ConnectionPointer &conn, SBuf &buf, AsyncCall::Pointer &callback)
-{
- debugs(5, 5, "comm_read, queueing read for " << conn << "; asynCall " << callback);
-
- /* Make sure we are open and not closing */
- assert(Comm::IsConnOpen(conn));
- assert(!fd_table[conn->fd].closing());
- Comm::IoCallback *ccb = COMMIO_FD_READCB(conn->fd);
-
- // Make sure we are either not reading or just passively monitoring.
- // Active/passive conflicts are OK and simply cancel passive monitoring.
- if (ccb->active()) {
- // if the assertion below fails, we have an active comm_read conflict
- assert(fd_table[conn->fd].halfClosedReader != NULL);
- commStopHalfClosedMonitor(conn->fd);
- assert(!ccb->active());
- }
- ccb->conn = conn;
- ccb->buf2 = &buf;
-
- /* Queue the read */
- ccb->setCallback(Comm::IOCB_READ, callback, NULL, NULL, buf.spaceSize());
- Comm::SetSelect(conn->fd, COMM_SELECT_READ, commHandleRead, ccb, 0);
-}
-
/**
* Empty the read buffers
*
#endif
}
-/**
- * Return whether the FD has a pending completed callback.
- * NP: does not work.
- */
-int
-comm_has_pending_read_callback(int fd)
-{
- assert(isOpen(fd));
- // XXX: We do not know whether there is a read callback scheduled.
- // This is used for pconn management that should probably be more
- // tightly integrated into comm to minimize the chance that a
- // closing pconn socket will be used for a new transaction.
- return false;
-}
-
-// Does comm check this fd for read readiness?
-// Note that when comm is not monitoring, there can be a pending callback
-// call, which may resume comm monitoring once fired.
-bool
-comm_monitors_read(int fd)
-{
- assert(isOpen(fd) && COMMIO_FD_READCB(fd));
- // Being active is usually the same as monitoring because we always
- // start monitoring the FD when we configure Comm::IoCallback for I/O
- // and we usually configure Comm::IoCallback for I/O when we starting
- // monitoring a FD for reading.
- return COMMIO_FD_READCB(fd)->active();
-}
-
-/**
- * Cancel a pending read. Assert that we have the right parameters,
- * and that there are no pending read events!
- *
- * XXX: We do not assert that there are no pending read events and
- * with async calls it becomes even more difficult.
- * The whole interface should be reworked to do callback->cancel()
- * instead of searching for places where the callback may be stored and
- * updating the state of those places.
- *
- * AHC Don't call the comm handlers?
- */
-void
-comm_read_cancel(int fd, IOCB *callback, void *data)
-{
- if (!isOpen(fd)) {
- debugs(5, 4, "comm_read_cancel fails: FD " << fd << " closed");
- return;
- }
-
- Comm::IoCallback *cb = COMMIO_FD_READCB(fd);
- // TODO: is "active" == "monitors FD"?
- if (!cb->active()) {
- debugs(5, 4, "comm_read_cancel fails: FD " << fd << " inactive");
- return;
- }
-
- typedef CommCbFunPtrCallT<CommIoCbPtrFun> Call;
- Call *call = dynamic_cast<Call*>(cb->callback.getRaw());
- if (!call) {
- debugs(5, 4, "comm_read_cancel fails: FD " << fd << " lacks callback");
- return;
- }
-
- call->cancel("old comm_read_cancel");
-
- typedef CommIoCbParams Params;
- const Params ¶ms = GetCommParams<Params>(cb->callback);
-
- /* Ok, we can be reasonably sure we won't lose any data here! */
- assert(call->dialer.handler == callback);
- assert(params.data == data);
-
- /* Delete the callback */
- cb->cancel("old comm_read_cancel");
-
- /* And the IO event */
- Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
-}
-
-void
-comm_read_cancel(int fd, AsyncCall::Pointer &callback)
-{
- callback->cancel("comm_read_cancel");
-
- if (!isOpen(fd)) {
- debugs(5, 4, "comm_read_cancel fails: FD " << fd << " closed");
- return;
- }
-
- Comm::IoCallback *cb = COMMIO_FD_READCB(fd);
-
- if (!cb->active()) {
- debugs(5, 4, "comm_read_cancel fails: FD " << fd << " inactive");
- return;
- }
-
- AsyncCall::Pointer call = cb->callback;
- assert(call != NULL); // XXX: should never fail (active() checks for callback==NULL)
-
- /* Ok, we can be reasonably sure we won't lose any data here! */
- assert(call == callback);
-
- /* Delete the callback */
- cb->cancel("comm_read_cancel");
-
- /* And the IO event */
- Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
-}
-
/**
* synchronous wrapper around udp socket functions
*/
if (!fd_table[c->fd].halfClosedReader) { // not reading already
AsyncCall::Pointer call = commCbCall(5,4, "commHalfClosedReader",
CommIoCbPtrFun(&commHalfClosedReader, NULL));
- comm_read(c, NULL, 0, call);
+ Comm::Read(c, call);
fd_table[c->fd].halfClosedReader = call;
} else
c->fd = -1; // XXX: temporary. prevent c replacement erase closing listed FD
}
/// stop waiting for possibly half-closed connection to close
-static void
+void
commStopHalfClosedMonitor(int const fd)
{
debugs(5, 5, HERE << "removing FD " << fd << " from " << *TheHalfClosed);
void comm_remove_close_handler(int fd, CLCB *, void *);
void comm_remove_close_handler(int fd, AsyncCall::Pointer &);
-int comm_has_pending_read_callback(int fd);
-bool comm_monitors_read(int fd);
-void comm_read(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback);
-void comm_read(const Comm::ConnectionPointer &conn, SBuf &buf, AsyncCall::Pointer &callback);
-void comm_read_cancel(int fd, IOCB *callback, void *data);
-void comm_read_cancel(int fd, AsyncCall::Pointer &callback);
int comm_udp_recvfrom(int fd, void *buf, size_t len, int flags, Ip::Address &from);
int comm_udp_recv(int fd, void *buf, size_t len, int flags);
ssize_t comm_udp_send(int s, const void *buf, size_t len, int flags);
Comm::IoCallback::reset()
{
conn = NULL;
- buf2 = NULL; // we do not own this buffer.
if (freefunc) {
freefunc(buf);
buf = NULL;
Params ¶ms = GetCommParams<Params>(callback);
if (conn != NULL) params.fd = conn->fd; // for legacy write handlers...
params.conn = conn;
- params.buf2 = buf2;
params.buf = buf;
params.size = offset;
params.flag = code;
iocb_type type;
Comm::ConnectionPointer conn;
AsyncCall::Pointer callback;
-
- /// Buffer to store read(2) into when set.
- // This is a pointer to the Jobs buffer rather than an SBuf using
- // the same store since we cannot know when or how the Job will
- // alter its SBuf while we are reading.
- SBuf *buf2;
-
- // Legacy c-string buffers used when buf2 is unset.
char *buf;
FREE *freefunc;
int size;
ModPoll.cc \
ModSelect.cc \
ModSelectWin32.cc \
+ Read.cc \
+ Read.h \
TcpAcceptor.cc \
TcpAcceptor.h \
UdpOpenDialer.h \
--- /dev/null
+/*
+ * DEBUG: section 05 Socket Functions
+ */
+#include "squid.h"
+#include "comm.h"
+#include "comm_internal.h"
+#include "CommCalls.h"
+#include "comm/IoCallback.h"
+#include "comm/Loops.h"
+#include "comm/Read.h"
+#include "Debug.h"
+#include "fd.h"
+#include "fde.h"
+#include "SBuf.h"
+#include "StatCounters.h"
+//#include "tools.h"
+
+// Does comm check this fd for read readiness?
+// Note that when comm is not monitoring, there can be a pending callback
+// call, which may resume comm monitoring once fired.
+bool
+Comm::MonitorsRead(int fd)
+{
+ assert(isOpen(fd) && COMMIO_FD_READCB(fd));
+ // Being active is usually the same as monitoring because we always
+ // start monitoring the FD when we configure Comm::IoCallback for I/O
+ // and we usually configure Comm::IoCallback for I/O when we starting
+ // monitoring a FD for reading.
+ return COMMIO_FD_READCB(fd)->active();
+}
+
+void
+Comm::Read(const Comm::ConnectionPointer &conn, AsyncCall::Pointer &callback)
+{
+ // TODO: move comm_read_base() internals into here
+ // when comm_read() char* API is no longer needed
+ comm_read_base(conn, NULL, 0, callback);
+}
+
+/**
+ * Queue a read.
+ * If a buffer is given the callback is scheduled when the read
+ * completes, on error, or on file descriptor close.
+ *
+ * If no buffer (NULL) is given the callback is scheduled when
+ * the socket FD is ready for a read(2)/recv(2).
+ */
+void
+comm_read_base(const Comm::ConnectionPointer &conn, char *buf, int size, AsyncCall::Pointer &callback)
+{
+ debugs(5, 5, "comm_read, queueing read for " << conn << "; asynCall " << callback);
+
+ /* Make sure we are open and not closing */
+ assert(Comm::IsConnOpen(conn));
+ assert(!fd_table[conn->fd].closing());
+ Comm::IoCallback *ccb = COMMIO_FD_READCB(conn->fd);
+
+ // Make sure we are either not reading or just passively monitoring.
+ // Active/passive conflicts are OK and simply cancel passive monitoring.
+ if (ccb->active()) {
+ // if the assertion below fails, we have an active comm_read conflict
+ assert(fd_table[conn->fd].halfClosedReader != NULL);
+ commStopHalfClosedMonitor(conn->fd);
+ assert(!ccb->active());
+ }
+ ccb->conn = conn;
+
+ /* Queue the read */
+ ccb->setCallback(Comm::IOCB_READ, callback, (char *)buf, NULL, size);
+ Comm::SetSelect(conn->fd, COMM_SELECT_READ, Comm::HandleRead, ccb, 0);
+}
+
+comm_err_t
+Comm::ReadNow(CommIoCbParams ¶ms, SBuf &buf)
+{
+ /* Attempt a read */
+ ++ statCounter.syscalls.sock.reads;
+ const SBuf::size_type sz = buf.spaceSize();
+ char *theBuf = buf.rawSpace(sz);
+ errno = 0;
+ const int retval = FD_READ_METHOD(params.conn->fd, theBuf, sz);
+ params.xerrno = errno;
+
+ debugs(5, 3, params.conn << ", size " << sz << ", retval " << retval << ", errno " << params.xerrno);
+
+ if (retval > 0) { // data read most common case
+ buf.append(theBuf, retval);
+ fd_bytes(params.conn->fd, retval, FD_READ);
+ params.flag = COMM_OK;
+ params.size = retval;
+
+ } else if (retval == 0) { // remote closure (somewhat less) common
+ // Note - read 0 == socket EOF, which is a valid read.
+ params.flag = COMM_EOF;
+
+ } else if (retval < 0) { // connection errors are worst-case
+ debugs(5, 3, params.conn << " COMM_ERROR: " << xstrerr(params.xerrno));
+ if (ignoreErrno(params.xerrno))
+ params.flag = COMM_INPROGRESS;
+ else
+ params.flag = COMM_ERROR;
+ }
+
+ return params.flag;
+}
+
+/**
+ * Handle an FD which is ready for read(2).
+ *
+ * If there is no provided buffer to fill call the callback.
+ *
+ * Otherwise attempt a read into the provided buffer.
+ * If the read attempt succeeds or fails, call the callback.
+ * Else, wait for another IO notification.
+ */
+void
+Comm::HandleRead(int fd, void *data)
+{
+ Comm::IoCallback *ccb = (Comm::IoCallback *) data;
+
+ assert(data == COMMIO_FD_READCB(fd));
+ assert(ccb->active());
+
+ // without a buffer, just call back
+ if (!ccb->buf) {
+ ccb->finish(COMM_OK, 0);
+ return;
+ }
+
+ /* For legacy callers : Attempt a read */
+ // Keep in sync with Comm::ReadNow()!
+ ++ statCounter.syscalls.sock.reads;
+ errno = 0;
+ int retval = FD_READ_METHOD(fd, ccb->buf, ccb->size);
+ debugs(5, 3, "FD " << fd << ", size " << ccb->size << ", retval " << retval << ", errno " << errno);
+
+ /* See if we read anything */
+ /* Note - read 0 == socket EOF, which is a valid read */
+ if (retval >= 0) {
+ fd_bytes(fd, retval, FD_READ);
+ ccb->offset = retval;
+ ccb->finish(COMM_OK, errno);
+ return;
+
+ } else if (retval < 0 && !ignoreErrno(errno)) {
+ debugs(5, 3, "comm_read_try: scheduling COMM_ERROR");
+ ccb->offset = 0;
+ ccb->finish(COMM_ERROR, errno);
+ return;
+ };
+
+
+ /* Nope, register for some more IO */
+ Comm::SetSelect(fd, COMM_SELECT_READ, Comm::HandleRead, data, 0);
+}
+
+/**
+ * Cancel a pending read. Assert that we have the right parameters,
+ * and that there are no pending read events!
+ *
+ * XXX: We do not assert that there are no pending read events and
+ * with async calls it becomes even more difficult.
+ * The whole interface should be reworked to do callback->cancel()
+ * instead of searching for places where the callback may be stored and
+ * updating the state of those places.
+ *
+ * AHC Don't call the comm handlers?
+ */
+void
+comm_read_cancel(int fd, IOCB *callback, void *data)
+{
+ if (!isOpen(fd)) {
+ debugs(5, 4, "fails: FD " << fd << " closed");
+ return;
+ }
+
+ Comm::IoCallback *cb = COMMIO_FD_READCB(fd);
+ // TODO: is "active" == "monitors FD"?
+ if (!cb->active()) {
+ debugs(5, 4, "fails: FD " << fd << " inactive");
+ return;
+ }
+
+ typedef CommCbFunPtrCallT<CommIoCbPtrFun> Call;
+ Call *call = dynamic_cast<Call*>(cb->callback.getRaw());
+ if (!call) {
+ debugs(5, 4, "fails: FD " << fd << " lacks callback");
+ return;
+ }
+
+ call->cancel("old comm_read_cancel");
+
+ typedef CommIoCbParams Params;
+ const Params ¶ms = GetCommParams<Params>(cb->callback);
+
+ /* Ok, we can be reasonably sure we won't lose any data here! */
+ assert(call->dialer.handler == callback);
+ assert(params.data == data);
+
+ /* Delete the callback */
+ cb->cancel("old comm_read_cancel");
+
+ /* And the IO event */
+ Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
+}
+
+void
+Comm::ReadCancel(int fd, AsyncCall::Pointer &callback)
+{
+ callback->cancel("comm_read_cancel");
+
+ if (!isOpen(fd)) {
+ debugs(5, 4, "fails: FD " << fd << " closed");
+ return;
+ }
+
+ Comm::IoCallback *cb = COMMIO_FD_READCB(fd);
+
+ if (!cb->active()) {
+ debugs(5, 4, "fails: FD " << fd << " inactive");
+ return;
+ }
+
+ AsyncCall::Pointer call = cb->callback;
+
+ /* Ok, we can be reasonably sure we won't lose any data here! */
+ assert(call == callback);
+
+ /* Delete the callback */
+ cb->cancel("comm_read_cancel");
+
+ /* And the IO event */
+ Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
+}
--- /dev/null
+#ifndef _SQUID_COMM_READ_H
+#define _SQUID_COMM_READ_H
+
+#include "base/AsyncCall.h"
+#include "CommCalls.h"
+#include "comm/forward.h"
+
+class SBuf;
+
+namespace Comm
+{
+
+/**
+ * Start monitoring for read.
+ *
+ * callback is scheduled when the read is possible,
+ * or on file descriptor close.
+ */
+void Read(const Comm::ConnectionPointer &conn, AsyncCall::Pointer &callback);
+
+/// whether the FD socket is being monitored for read
+bool MonitorsRead(int fd);
+
+/**
+ * Perform a read(2) on a connection immediately.
+ *
+ * The returned flag is also placed in params.flag.
+ *
+ * \retval COMM_OK data has been read and placed in buf, amount in params.size
+ * \retval COMM_ERROR an error occured, the code is placed in params.xerrno
+ * \retval COMM_INPROGRESS unable to read at this time, or a minor error occured
+ * \retval COMM_ERR_CLOSING 0-byte read has occured.
+ * Usually indicates the remote end has disconnected.
+ */
+comm_err_t ReadNow(CommIoCbParams ¶ms, SBuf &buf);
+
+/// Cancel the read pending on FD. No action if none pending.
+void ReadCancel(int fd, AsyncCall::Pointer &callback);
+
+/// callback handler to process an FD which is available for reading
+extern PF HandleRead;
+
+} // namespace Comm
+
+// Legacy API to be removed
+void comm_read_base(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback);
+inline void comm_read(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback)
+{
+ assert(buf != NULL);
+ comm_read_base(conn, buf, len, callback);
+}
+void comm_read_cancel(int fd, IOCB *callback, void *data);
+inline void comm_read_cancel(int fd, AsyncCall::Pointer &callback) {Comm::ReadCancel(fd,callback);}
+
+#endif /* _SQUID_COMM_READ_H */
COMM_ERR_DNS = -9,
COMM_ERR_CLOSING = -10,
COMM_ERR_PROTOCOL = -11, /* IPv4 or IPv6 cannot be used on the fd socket */
+ COMM_EOF = -12, /**< read(2) returned success, but with 0 bytes */
COMM_ERR__END__ = -999999 /* Dummy entry to make syntax valid (comma on line above), do not use. New entries added above */
} comm_err_t;
extern fd_debug_t *fdd_table;
bool isOpen(const int fd);
+void commStopHalfClosedMonitor(int fd);
#endif
#include "comm/Connection.h"
#include "comm/ConnOpener.h"
#include "comm/Loops.h"
+#include "comm/Read.h"
#include "comm/Write.h"
#include "dlink.h"
#include "event.h"
*/
#include "squid.h"
-#include "comm.h"
+#include "comm/Read.h"
#include "fde.h"
#include "globals.h"
#include "SquidTime.h"
fde::readPending(int fdNumber)
{
if (type == FD_SOCKET)
- return comm_monitors_read(fdNumber);
+ return Comm::MonitorsRead(fdNumber);
return read_handler ? true : false ;
}
#include "acl/FilledChecklist.h"
#include "comm.h"
#include "comm/ConnOpener.h"
+#include "comm/Read.h"
#include "comm/TcpAcceptor.h"
#include "comm/Write.h"
#include "CommCalls.h"
#include "squid.h"
#include "comm.h"
+#include "comm/Read.h"
#include "comm/Write.h"
#include "errorpage.h"
#include "fd.h"
#include "base/AsyncCbdataCalls.h"
#include "comm.h"
#include "comm/Connection.h"
+#include "comm/Read.h"
#include "comm/Write.h"
#include "fd.h"
#include "fde.h"
#include "comm.h"
#include "comm/Connection.h"
#include "comm/ConnOpener.h"
+#include "comm/Read.h"
#include "comm/Write.h"
#include "CommCalls.h"
#include "globals.h"
#include "squid.h"
#include "comm.h"
#include "comm/Connection.h"
+#include "comm/Read.h"
#include "CommCalls.h"
#include "globals.h"
#include "ipc/Port.h"
#include "CachePeer.h"
#include "comm.h"
#include "comm/Connection.h"
+#include "comm/Read.h"
#include "fd.h"
#include "fde.h"
#include "globals.h"
#include "CacheDigest.h"
#include "CacheManager.h"
#include "comm/Connection.h"
+#include "comm/Read.h"
#include "ETag.h"
#include "event.h"
#include "fde.h"
void ConnStateData::expectNoForwarding() STUB
void ConnStateData::noteMoreBodySpaceAvailable(BodyPipe::Pointer) STUB
void ConnStateData::noteBodyConsumerAborted(BodyPipe::Pointer) STUB
-bool ConnStateData::handleReadData(SBuf *buf) STUB_RETVAL(false)
+bool ConnStateData::handleReadData() STUB_RETVAL(false)
bool ConnStateData::handleRequestBodyData() STUB_RETVAL(false)
void ConnStateData::pinConnection(const Comm::ConnectionPointer &pinServerConn, HttpRequest *request, CachePeer *peer, bool auth) STUB
void ConnStateData::unpinConnection() STUB
comm_err_t Comm::DoSelect(int) STUB_RETVAL(COMM_ERROR)
void Comm::QuickPollRequired(void) STUB
+#include "comm/Read.h"
+void Comm::Read(const Comm::ConnectionPointer &conn, AsyncCall::Pointer &callback) STUB
+bool Comm::MonitorsRead(int fd) STUB_RETVAL(false)
+comm_err_t Comm::ReadNow(CommIoCbParams ¶ms, SBuf &buf) STUB_RETVAL(COMM_ERROR)
+void Comm::ReadCancel(int fd, AsyncCall::Pointer &callback) STUB
+//void Comm::HandleRead(int, void*) STUB
+
+void comm_read_base(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback) STUB
+void comm_read_cancel(int fd, IOCB *callback, void *data) STUB
+
#include "comm/TcpAcceptor.h"
//Comm::TcpAcceptor(const Comm::ConnectionPointer &conn, const char *note, const Subscription::Pointer &aSub) STUB
void Comm::TcpAcceptor::subscribe(const Subscription::Pointer &aSub) STUB
#include "comm.h"
#include "comm/Connection.h"
#include "comm/ConnOpener.h"
+#include "comm/Read.h"
#include "comm/Write.h"
#include "errorpage.h"
#include "fde.h"
#include "squid.h"
#include "comm.h"
+#include "comm/Read.h"
#include "comm/Write.h"
#include "errorpage.h"
#include "FwdState.h"