]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
SourceLayout: Comm Write cleanups
authorAmos Jeffries <amosjeffries@squid-cache.org>
Sat, 27 Nov 2010 01:46:22 +0000 (18:46 -0700)
committerAmos Jeffries <amosjeffries@squid-cache.org>
Sat, 27 Nov 2010 01:46:22 +0000 (18:46 -0700)
* creates namespace Comm.

* The comm_write() functions are moved into that scope as Comm::Write()
  and only accept AsyncCall now. Old wrapper functions are removed.

* commio_* functions are all moved to methods of a new Comm::IoCallback
  object. Which represents either a read or a write callback event
  waiting to happen. Old wrapper functions have been removed.

* The fdc_table of pending read and write callbacks has been moved into
  the Comm scope with (the name iocb_table) and should be considered private.
  For now the COMMIO_*_CB() macros are retained to produce a pointer to
  a callback object in this table.

* libcomm-listener.la has been renamed to libcomm.la

28 files changed:
src/Makefile.am
src/MemBuf.cc
src/Packer.cc
src/Server.cc
src/Server.h
src/adaptation/icap/Xaction.cc
src/client_side.cc
src/client_side_request.cc
src/comm.cc
src/comm.h
src/comm/IoCallback.cc [new file with mode: 0644]
src/comm/IoCallback.h [new file with mode: 0644]
src/comm/Makefile.am
src/comm/Write.cc [new file with mode: 0644]
src/comm/Write.h [new file with mode: 0644]
src/comm_err_t.h [new file with mode: 0644]
src/dns_internal.cc
src/errorpage.cc
src/ftp.cc
src/gopher.cc
src/helper.cc
src/http.cc
src/ident/Ident.cc
src/ipc/UdsOp.cc
src/mgr/Inquirer.cc
src/mgr/StoreToCommWriter.cc
src/tunnel.cc
src/whois.cc

index d1ef27212b31d0c0a53d7789eed8bf929e6ed41d..00d06638ee63687e6c6537abfb4b86bdd42baf89 100644 (file)
@@ -255,6 +255,7 @@ squid_COMMSOURCES = \
 libsquid_la_SOURCES = \
        comm.cc \
        comm.h \
+       comm_err_t.h \
        CommCalls.cc \
        CommCalls.h \
        DescriptorSet.cc \
@@ -550,7 +551,7 @@ nodist_squid_SOURCES = \
 
 squid_LDADD = \
        $(COMMON_LIBS) \
-       comm/libcomm-listener.la \
+       comm/libcomm.la \
        eui/libeui.la \
        icmp/libicmp.la icmp/libicmp-core.la \
        log/liblog.la \
@@ -1279,10 +1280,10 @@ tests_testCacheManager_SOURCES = \
        wordlist.cc
 nodist_tests_testCacheManager_SOURCES = \
        $(BUILT_SOURCES)
-# comm.cc only requires comm/libcomm-listener.la until fdc_table is dead.
+# comm.cc only requires comm/libcomm.la until fdc_table is dead.
 tests_testCacheManager_LDADD = \
        $(COMMON_LIBS) \
-       comm/libcomm-listener.la \
+       comm/libcomm.la \
        icmp/libicmp.la icmp/libicmp-core.la \
        log/liblog.la \
        $(REPL_OBJS) \
@@ -1486,7 +1487,7 @@ nodist_tests_testEvent_SOURCES = \
 tests_testEvent_LDADD = \
        $(COMMON_LIBS) \
        icmp/libicmp.la icmp/libicmp-core.la \
-       comm/libcomm-listener.la \
+       comm/libcomm.la \
        log/liblog.la \
        $(REPL_OBJS) \
        ${ADAPTATION_LIBS} \
@@ -1649,7 +1650,7 @@ nodist_tests_testEventLoop_SOURCES = \
 tests_testEventLoop_LDADD = \
        $(COMMON_LIBS) \
        icmp/libicmp.la icmp/libicmp-core.la \
-       comm/libcomm-listener.la \
+       comm/libcomm.la \
        log/liblog.la \
        $(REPL_OBJS) \
        ${ADAPTATION_LIBS} \
@@ -1807,7 +1808,7 @@ nodist_tests_test_http_range_SOURCES = \
 tests_test_http_range_LDADD = \
        $(COMMON_LIBS) \
        icmp/libicmp.la icmp/libicmp-core.la \
-       comm/libcomm-listener.la \
+       comm/libcomm.la \
        log/liblog.la \
        $(REPL_OBJS) \
        ${ADAPTATION_LIBS} \
@@ -1970,7 +1971,7 @@ nodist_tests_testHttpRequest_SOURCES = \
 tests_testHttpRequest_LDADD = \
        $(COMMON_LIBS) \
        icmp/libicmp.la icmp/libicmp-core.la \
-       comm/libcomm-listener.la \
+       comm/libcomm.la \
        log/liblog.la \
        $(REPL_OBJS) \
        ${ADAPTATION_LIBS} \
@@ -2385,7 +2386,7 @@ nodist_tests_testURL_SOURCES = \
 tests_testURL_LDADD = \
        $(COMMON_LIBS) \
        icmp/libicmp.la icmp/libicmp-core.la \
-       comm/libcomm-listener.la \
+       comm/libcomm.la \
        log/liblog.la \
        $(REGEXLIB) \
        $(REPL_OBJS) \
index 1a2713a7af37a7e526e30880ab3f5e01836e3e0e..70480bec6506f9e6780aa386a509ab99525e10d4 100644 (file)
@@ -41,7 +41,7 @@
  * Rationale:
  * ----------
  *
- * Here is how one would comm_write an object without MemBuffer:
+ * Here is how one would Comm::Write an object without MemBuffer:
  *
  * {
  * -- allocate:
@@ -53,7 +53,7 @@
  * ...
  *
  * -- write
- * comm_write(buf, free, ...);
+ * Comm::Write(buf, free, ...);
  * }
  *
  * The whole "packing" idea is quite messy: We are given a buffer of fixed
@@ -91,7 +91,7 @@
  * ...
  *
  * -- write
- * comm_write_mbuf(fd, buf, handler, data);
+ * Comm::Write(fd, buf, callback);
  *
  * -- *iff* you did not give the buffer away, free it yourself
  * -- buf.clean();
index 2128c0709a8b941e19ecbde3be18728aa181dd23..4e50d1a5ad468c6c456d5a24a8e49a8c30caea26 100644 (file)
@@ -45,7 +45,7 @@
  * Comm.c lacks commAppend[Printf] because comm does not handle its own
  * buffers (no mem_obj equivalent for comm.c).
  *
- * Thus, if one wants to be able to store _and_ comm_write an object, s/he
+ * Thus, if one wants to be able to store _and_ Comm::Write an object, s/he
  * has to implement two almost identical functions.
  *
  * Packer
  * Packer has its own append and printf routines that "know" where to send
  * incoming data. In case of store interface, Packer sends data to
  * storeAppend.  Otherwise, Packer uses a MemBuf that can be flushed later to
- * comm_write.
+ * Comm::Write.
  *
  * Thus, one can write just one function that will either "pack" things for
- * comm_write or "append" things to store, depending on actual packer
+ * Comm::Write or "append" things to store, depending on actual packer
  * supplied.
  *
  * It is amazing how much work a tiny object can save. :)
index bb02d4c4439ce778a9f1352e8ec703c236920303..8256a1f77a2f9c6a45a8e154102d37a7d6a18d7d 100644 (file)
@@ -34,6 +34,7 @@
 
 #include "squid.h"
 #include "base/TextException.h"
+#include "comm/Write.h"
 #include "Server.h"
 #include "Store.h"
 #include "fde.h" /* for fd_table[fd].closing */
@@ -425,7 +426,7 @@ ServerStateData::sendMoreRequestBody()
         typedef CommCbMemFunT<ServerStateData, CommIoCbParams> Dialer;
         requestSender = JobCallback(93,3,
                                     Dialer, this, ServerStateData::sentRequestBody);
-        comm_write_mbuf(fd, &buf, requestSender);
+        Comm::Write(fd, &buf, requestSender);
     } else {
         debugs(9,3, HERE << "will wait for more request body bytes or eof");
         requestSender = NULL;
index 7bb1c9ac6448ec373692f20a844fdc656b982cc9..31db3c98dac6d71d58c941d8a835c2273da90988 100644 (file)
@@ -185,7 +185,7 @@ public: // should not be
 
 protected:
     BodyPipe::Pointer requestBodySource;  /**< to consume request body */
-    AsyncCall::Pointer requestSender;     /**< set if we are expecting comm_write to call us back */
+    AsyncCall::Pointer requestSender;     /**< set if we are expecting Comm::Write to call us back */
 
 #if USE_ADAPTATION
     BodyPipe::Pointer virginBodyDestination;  /**< to provide virgin response body */
index b37032a14dc5b73cf2d9dae2589392d83a176365..3e1f3b5d4156e3a158cacefb655d7dda97eb5911 100644 (file)
@@ -4,6 +4,7 @@
 
 #include "squid.h"
 #include "comm.h"
+#include "comm/Write.h"
 #include "CommCalls.h"
 #include "HttpMsg.h"
 #include "adaptation/icap/Xaction.h"
@@ -237,7 +238,7 @@ void Adaptation::Icap::Xaction::scheduleWrite(MemBuf &buf)
     writer = JobCallback(93,3,
                          Dialer, this, Adaptation::Icap::Xaction::noteCommWrote);
 
-    comm_write_mbuf(connection, &buf, writer);
+    Comm::Write(connection, &buf, writer);
     updateTimeout();
 }
 
index e962654758be9678d2b7a4f22245350024e54176..6abeb3e5fc142e0b161bde1a55146eeaab59ade0 100644 (file)
@@ -92,6 +92,7 @@
 #include "ClientRequestContext.h"
 #include "clientStream.h"
 #include "comm.h"
+#include "comm/Write.h"
 #include "comm/ListenStateData.h"
 #include "base/TextException.h"
 #include "ConnectionDetail.h"
@@ -376,7 +377,7 @@ ClientSocketContext::writeControlMsg(HttpControlMsg &msg)
 
     AsyncCall::Pointer call = commCbCall(33, 5, "ClientSocketContext::wroteControlMsg",
                                          CommIoCbPtrFun(&WroteControlMsg, this));
-    comm_write_mbuf(fd(), mb, call);
+    Comm::Write(fd(), mb, call);
 
     delete mb;
 }
@@ -962,7 +963,7 @@ ClientSocketContext::sendBody(HttpReply * rep, StoreIOBuffer bodyData)
         noteSentBodyBytes (length);
         AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteBodyComplete",
                                              CommIoCbPtrFun(clientWriteBodyComplete, this));
-        comm_write(fd(), bodyData.data, length, call );
+        Comm::Write(fd(), bodyData.data, length, call, NULL);
         return;
     }
 
@@ -977,7 +978,7 @@ ClientSocketContext::sendBody(HttpReply * rep, StoreIOBuffer bodyData)
         /* write */
         AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteComplete",
                                              CommIoCbPtrFun(clientWriteComplete, this));
-        comm_write_mbuf(fd(), &mb, call);
+        Comm::Write(fd(), &mb, call);
     }  else
         writeComplete(fd(), NULL, 0, COMM_OK);
 }
@@ -1380,7 +1381,7 @@ ClientSocketContext::sendStartOfMessage(HttpReply * rep, StoreIOBuffer bodyData)
     debugs(33,7, HERE << "sendStartOfMessage schedules clientWriteComplete");
     AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteComplete",
                                          CommIoCbPtrFun(clientWriteComplete, this));
-    comm_write_mbuf(fd(), mb, call);
+    Comm::Write(fd(), mb, call);
 
     delete mb;
 }
index 8d1c457d026f4ce24283da63f44b64e850cdf3a5..280828ea1cc7a956460b636d18a2dae4a9e1067b 100644 (file)
@@ -59,6 +59,7 @@
 #include "client_side_reply.h"
 #include "client_side_request.h"
 #include "ClientRequestContext.h"
+#include "comm/Write.h"
 #include "compat/inet_pton.h"
 #include "fde.h"
 #include "HttpReply.h"
@@ -1205,8 +1206,9 @@ ClientHttpRequest::sslBumpStart()
     // TODO: Unify with tunnel.cc and add a Server(?) header
     static const char *const conn_established =
         "HTTP/1.1 200 Connection established\r\n\r\n";
-    comm_write(fd, conn_established, strlen(conn_established),
-               &SslBumpEstablish, this, NULL);
+    AsyncCall::Pointer call = commCbCall(85, 5, "ClientSocketContext::sslBumpEstablish",
+                                         CommIoCbPtrFun(&SslBumpEstablish, this));
+    Comm::Write(fd, conn_established, strlen(conn_established), call, NULL);
 }
 
 #endif
index b5e6a8190e06e0db10eb05b7d0317e487f120a76..fc7e131f3fb7f761adacb83c08e9c736696b88ec 100644 (file)
@@ -39,6 +39,8 @@
 #include "fde.h"
 #include "comm/AcceptLimiter.h"
 #include "comm/comm_internal.h"
+#include "comm/IoCallback.h"
+#include "comm/Write.h"
 #include "comm/ListenStateData.h"
 #include "CommIO.h"
 #include "CommRead.h"
  * New C-like simple comm code. This stuff is a mess and doesn't really buy us anything.
  */
 
-typedef enum {
-    IOCB_NONE,
-    IOCB_READ,
-    IOCB_WRITE
-} iocb_type;
-
 static void commStopHalfClosedMonitor(int fd);
 static IOCB commHalfClosedReader;
 static void comm_init_opened(int new_socket, Ip::Address &addr, tos_t tos, nfmark_t nfmark, const char *note, struct addrinfo *AI);
@@ -84,142 +80,6 @@ CBDATA_CLASS_INIT(CommQuotaQueue);
 static void commHandleWriteHelper(void * data);
 #endif
 
-static void commSelectOrQueueWrite(const int fd);
-
-struct comm_io_callback_t {
-    iocb_type type;
-    int fd;
-    AsyncCall::Pointer callback;
-    char *buf;
-    FREE *freefunc;
-    int size;
-    int offset;
-    comm_err_t errcode;
-    int xerrno;
-#if DELAY_POOLS
-    unsigned int quotaQueueReserv; ///< reservation ID from CommQuotaQueue
-#endif
-
-
-    bool active() const { return callback != NULL; }
-};
-
-struct _comm_fd {
-    int fd;
-    comm_io_callback_t readcb;
-    comm_io_callback_t writecb;
-};
-typedef struct _comm_fd comm_fd_t;
-comm_fd_t *commfd_table;
-
-// TODO: make this a comm_io_callback_t method?
-bool
-commio_has_callback(int fd, iocb_type type, comm_io_callback_t *ccb)
-{
-    assert(ccb->fd == fd);
-    assert(ccb->type == type);
-    return ccb->active();
-}
-
-/*
- * Configure comm_io_callback_t for I/O
- *
- * @param fd           filedescriptor
- * @param ccb          comm io callback
- * @param cb           callback
- * @param cbdata       callback data (must be cbdata'ed)
- * @param buf          buffer, if applicable
- * @param freefunc     freefunc, if applicable
- * @param size         buffer size
- */
-static void
-commio_set_callback(int fd, iocb_type type, comm_io_callback_t *ccb,
-                    AsyncCall::Pointer &cb, char *buf, FREE *freefunc, int size)
-{
-    assert(!ccb->active());
-    assert(ccb->type == type);
-    assert(cb != NULL);
-    ccb->fd = fd;
-    ccb->callback = cb;
-    ccb->buf = buf;
-    ccb->freefunc = freefunc;
-    ccb->size = size;
-    ccb->offset = 0;
-}
-
-
-// Schedule the callback call and clear the callback
-static void
-commio_finish_callback(int fd, comm_io_callback_t *ccb, comm_err_t code, int xerrno)
-{
-    debugs(5, 3, "commio_finish_callback: called for FD " << fd << " (" <<
-           code << ", " << xerrno << ")");
-    assert(ccb->active());
-    assert(ccb->fd == fd);
-    ccb->errcode = code;
-    ccb->xerrno = xerrno;
-
-#if DELAY_POOLS
-    ccb->quotaQueueReserv = 0;
-#endif
-
-    comm_io_callback_t cb = *ccb;
-
-    /* We've got a copy; blow away the real one */
-    /* XXX duplicate code from commio_cancel_callback! */
-    ccb->xerrno = 0;
-    ccb->callback = NULL; // cb has it
-
-    /* free data */
-    if (cb.freefunc) {
-        cb.freefunc(cb.buf);
-        cb.buf = NULL;
-    }
-
-    if (cb.callback != NULL) {
-        typedef CommIoCbParams Params;
-        Params &params = GetCommParams<Params>(cb.callback);
-        params.fd = cb.fd;
-        params.buf = cb.buf;
-        params.size = cb.offset;
-        params.flag = cb.errcode;
-        params.xerrno = cb.xerrno;
-        ScheduleCallHere(cb.callback);
-    }
-}
-
-
-/*
- * Cancel the given callback
- *
- * Remember that the data is cbdataRef'ed.
- */
-// TODO: make this a comm_io_callback_t method
-static void
-commio_cancel_callback(int fd, comm_io_callback_t *ccb)
-{
-    debugs(5, 3, "commio_cancel_callback: called for FD " << fd);
-    assert(ccb->fd == fd);
-    assert(ccb->active());
-
-    ccb->xerrno = 0;
-    ccb->callback = NULL;
-
-#if DELAY_POOLS
-    ccb->quotaQueueReserv = 0;
-#endif
-}
-
-/*
- * Call the given comm callback; assumes the callback is valid.
- *
- * @param ccb          io completion callback
- */
-void
-commio_call_callback(comm_io_callback_t *ccb)
-{
-}
-
 class ConnectStateData
 {
 
@@ -267,7 +127,6 @@ static void commSetTcpNoDelay(int);
 #endif
 static void commSetTcpRcvbuf(int, int);
 static PF commConnectFree;
-static PF commHandleWrite;
 static IPH commConnectDnsHandle;
 
 typedef enum {
@@ -293,10 +152,10 @@ isOpen(const int fd)
 void
 commHandleRead(int fd, void *data)
 {
-    comm_io_callback_t *ccb = (comm_io_callback_t *) data;
+    Comm::IoCallback *ccb = (Comm::IoCallback *) data;
 
     assert(data == COMMIO_FD_READCB(fd));
-    assert(commio_has_callback(fd, IOCB_READ, ccb));
+    assert(ccb->active());
     /* Attempt a read */
     statCounter.syscalls.sock.reads++;
     errno = 0;
@@ -307,7 +166,7 @@ commHandleRead(int fd, void *data)
     if (retval < 0 && !ignoreErrno(errno)) {
         debugs(5, 3, "comm_read_try: scheduling COMM_ERROR");
         ccb->offset = 0;
-        commio_finish_callback(fd, ccb, COMM_ERROR, errno);
+        ccb->finish(COMM_ERROR, errno);
         return;
     };
 
@@ -316,7 +175,7 @@ commHandleRead(int fd, void *data)
     if (retval >= 0) {
         fd_bytes(fd, retval, FD_READ);
         ccb->offset = retval;
-        commio_finish_callback(fd, ccb, COMM_OK, errno);
+        ccb->finish(COMM_OK, errno);
         return;
     }
 
@@ -344,7 +203,7 @@ comm_read(int fd, char *buf, int size, AsyncCall::Pointer &callback)
     /* Make sure we are open and not closing */
     assert(isOpen(fd));
     assert(!fd_table[fd].closing());
-    comm_io_callback_t *ccb = COMMIO_FD_READCB(fd);
+    Comm::IoCallback *ccb = COMMIO_FD_READCB(fd);
 
     // Make sure we are either not reading or just passively monitoring.
     // Active/passive conflicts are OK and simply cancel passive monitoring.
@@ -356,7 +215,7 @@ comm_read(int fd, char *buf, int size, AsyncCall::Pointer &callback)
     }
 
     /* Queue the read */
-    commio_set_callback(fd, IOCB_READ, ccb, callback, (char *)buf, NULL, size);
+    ccb->setCallback(Comm::IOCB_READ, callback, (char *)buf, NULL, size);
     commSetSelect(fd, COMM_SELECT_READ, commHandleRead, ccb, 0);
 }
 
@@ -404,9 +263,9 @@ comm_monitors_read(int fd)
 {
     assert(isOpen(fd));
     // Being active is usually the same as monitoring because we always
-    // start monitoring the FD when we configure comm_io_callback_t for I/O
-    // and we usually configure comm_io_callback_t for I/O when we starting
-    // monitoring a FD for reading. TODO: replace with commio_has_callback
+    // start monitoring the FD when we configure Comm::IoCallback for I/O
+    // and we usually configure Comm::IoCallback for I/O when we starting
+    // monitoring a FD for reading.
     return COMMIO_FD_READCB(fd)->active();
 }
 
@@ -430,7 +289,7 @@ comm_read_cancel(int fd, IOCB *callback, void *data)
         return;
     }
 
-    comm_io_callback_t *cb = COMMIO_FD_READCB(fd);
+    Comm::IoCallback *cb = COMMIO_FD_READCB(fd);
     // TODO: is "active" == "monitors FD"?
     if (!cb->active()) {
         debugs(5, 4, "comm_read_cancel fails: FD " << fd << " inactive");
@@ -454,7 +313,7 @@ comm_read_cancel(int fd, IOCB *callback, void *data)
     assert(params.data == data);
 
     /* Delete the callback */
-    commio_cancel_callback(fd, cb);
+    cb->cancel("old comm_read_cancel");
 
     /* And the IO event */
     commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
@@ -470,7 +329,7 @@ comm_read_cancel(int fd, AsyncCall::Pointer &callback)
         return;
     }
 
-    comm_io_callback_t *cb = COMMIO_FD_READCB(fd);
+    Comm::IoCallback *cb = COMMIO_FD_READCB(fd);
 
     if (!cb->active()) {
         debugs(5, 4, "comm_read_cancel fails: FD " << fd << " inactive");
@@ -484,7 +343,7 @@ comm_read_cancel(int fd, AsyncCall::Pointer &callback)
     assert(call == callback);
 
     /* Delete the callback */
-    commio_cancel_callback(fd, cb);
+    cb->cancel("comm_read_cancel");
 
     /* And the IO event */
     commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
@@ -1605,13 +1464,13 @@ _comm_close(int fd, char const *file, int line)
     commSetTimeout(fd, -1, NULL, NULL);
 
     // notify read/write handlers after canceling select reservations, if any
-    if (commio_has_callback(fd, IOCB_WRITE, COMMIO_FD_WRITECB(fd))) {
+    if (COMMIO_FD_WRITECB(fd)->active()) {
         commSetSelect(fd, COMM_SELECT_WRITE, NULL, NULL, 0);
-        commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), COMM_ERR_CLOSING, errno);
+        COMMIO_FD_WRITECB(fd)->finish(COMM_ERR_CLOSING, errno);
     }
-    if (commio_has_callback(fd, IOCB_READ, COMMIO_FD_READCB(fd))) {
+    if (COMMIO_FD_READCB(fd)->active()) {
         commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
-        commio_finish_callback(fd, COMMIO_FD_READCB(fd), COMM_ERR_CLOSING, errno);
+        COMMIO_FD_READCB(fd)->finish(COMM_ERR_CLOSING, errno);
     }
 
 #if DELAY_POOLS
@@ -1939,14 +1798,8 @@ comm_init(void)
     /* make sure the accept() socket FIFO delay queue exists */
     Comm::AcceptLimiter::Instance();
 
-    commfd_table = (comm_fd_t *) xcalloc(Squid_MaxFD, sizeof(comm_fd_t));
-    for (int pos = 0; pos < Squid_MaxFD; pos++) {
-        commfd_table[pos].fd = pos;
-        commfd_table[pos].readcb.fd = pos;
-        commfd_table[pos].readcb.type = IOCB_READ;
-        commfd_table[pos].writecb.fd = pos;
-        commfd_table[pos].writecb.type = IOCB_WRITE;
-    }
+    // make sure the IO pending callback table exists
+    Comm::CallbackTableInit();
 
     /* XXX account fd_table */
     /* Keep a few file descriptors free so that we don't run out of FD's
@@ -1967,12 +1820,12 @@ comm_exit(void)
 
     safe_free(fd_table);
     safe_free(fdd_table);
-    safe_free(commfd_table);
+    Comm::CallbackTableDestruct();
 }
 
 #if DELAY_POOLS
 // called when the queue is done waiting for the client bucket to fill
-static void
+void
 commHandleWriteHelper(void * data)
 {
     CommQuotaQueue *queue = static_cast<CommQuotaQueue*>(data);
@@ -1991,14 +1844,14 @@ commHandleWriteHelper(void * data)
     do {
         // check that the head descriptor is still relevant
         const int head = clientInfo->quotaPeekFd();
-        comm_io_callback_t *ccb = COMMIO_FD_WRITECB(head);
+        Comm::IoCallback *ccb = COMMIO_FD_WRITECB(head);
 
         if (fd_table[head].clientInfo == clientInfo &&
                 clientInfo->quotaPeekReserv() == ccb->quotaQueueReserv &&
                 !fd_table[head].closing()) {
 
             // wait for the head descriptor to become ready for writing
-            commSetSelect(head, COMM_SELECT_WRITE, commHandleWrite, ccb, 0);
+            commSetSelect(head, COMM_SELECT_WRITE, Comm::HandleWrite, ccb, 0);
             clientInfo->selectWaiting = true;
             return;
         }
@@ -2195,191 +2048,8 @@ CommQuotaQueue::dequeue()
     fds.pop_front();
     ++outs;
 }
-
-#endif
-
-/* Write to FD. */
-static void
-commHandleWrite(int fd, void *data)
-{
-    comm_io_callback_t *state = (comm_io_callback_t *)data;
-    int len = 0;
-    int nleft;
-
-    assert(state == COMMIO_FD_WRITECB(fd));
-
-    PROF_start(commHandleWrite);
-    debugs(5, 5, "commHandleWrite: FD " << fd << ": off " <<
-           (long int) state->offset << ", sz " << (long int) state->size << ".");
-
-    nleft = state->size - state->offset;
-
-#if DELAY_POOLS
-    ClientInfo * clientInfo=fd_table[fd].clientInfo;
-
-    if (clientInfo && !clientInfo->writeLimitingActive)
-        clientInfo = NULL; // we only care about quota limits here
-
-    if (clientInfo) {
-        assert(clientInfo->selectWaiting);
-        clientInfo->selectWaiting = false;
-
-        assert(clientInfo->hasQueue());
-        assert(clientInfo->quotaPeekFd() == fd);
-        clientInfo->quotaDequeue(); // we will write or requeue below
-
-        if (nleft > 0) {
-            const int quota = clientInfo->quotaForDequed();
-            if (!quota) {  // if no write quota left, queue this fd
-                state->quotaQueueReserv = clientInfo->quotaEnqueue(fd);
-                clientInfo->kickQuotaQueue();
-                PROF_stop(commHandleWrite);
-                return;
-            }
-
-            const int nleft_corrected = min(nleft, quota);
-            if (nleft != nleft_corrected) {
-                debugs(5, 5, HERE << "FD " << fd << " writes only " <<
-                       nleft_corrected << " out of " << nleft);
-                nleft = nleft_corrected;
-            }
-
-        }
-    }
-
-#endif
-
-    /* actually WRITE data */
-    len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
-    debugs(5, 5, "commHandleWrite: write() returns " << len);
-
-#if DELAY_POOLS
-    if (clientInfo) {
-        if (len > 0) {
-            /* we wrote data - drain them from bucket */
-            clientInfo->bucketSize -= len;
-            if (clientInfo->bucketSize < 0.0) {
-                debugs(5,1, HERE << "drained too much"); // should not happen
-                clientInfo->bucketSize = 0;
-            }
-        }
-
-        // even if we wrote nothing, we were served; give others a chance
-        clientInfo->kickQuotaQueue();
-    }
-#endif
-
-    fd_bytes(fd, len, FD_WRITE);
-    statCounter.syscalls.sock.writes++;
-    // After each successful partial write,
-    // reset fde::writeStart to the current time.
-    fd_table[fd].writeStart = squid_curtime;
-
-    if (len == 0) {
-        /* Note we even call write if nleft == 0 */
-        /* We're done */
-
-        if (nleft != 0)
-            debugs(5, 1, "commHandleWrite: FD " << fd << ": write failure: connection closed with " << nleft << " bytes remaining.");
-
-        commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
-    } else if (len < 0) {
-        /* An error */
-
-        if (fd_table[fd].flags.socket_eof) {
-            debugs(50, 2, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << ".");
-            commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
-        } else if (ignoreErrno(errno)) {
-            debugs(50, 10, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << ".");
-            commSelectOrQueueWrite(fd);
-        } else {
-            debugs(50, 2, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << ".");
-            commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
-        }
-    } else {
-        /* A successful write, continue */
-        state->offset += len;
-
-        if (state->offset < state->size) {
-            /* Not done, reinstall the write handler and write some more */
-            commSelectOrQueueWrite(fd);
-        } else {
-            commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_OK : COMM_ERROR, errno);
-        }
-    }
-
-    PROF_stop(commHandleWrite);
-}
-
-/*
- * Queue a write. handler/handler_data are called when the write
- * completes, on error, or on file descriptor close.
- *
- * free_func is used to free the passed buffer when the write has completed.
- */
-void
-comm_write(int fd, const char *buf, int size, IOCB * handler, void *handler_data, FREE * free_func)
-{
-    AsyncCall::Pointer call = commCbCall(5,5, "SomeCommWriteHander",
-                                         CommIoCbPtrFun(handler, handler_data));
-
-    comm_write(fd, buf, size, call, free_func);
-}
-
-void
-comm_write(int fd, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func)
-{
-    debugs(5, 5, "comm_write: FD " << fd << ": sz " << size << ": asynCall " << callback);
-
-    /* Make sure we are open, not closing, and not writing */
-    assert(isOpen(fd));
-    assert(!fd_table[fd].closing());
-    comm_io_callback_t *ccb = COMMIO_FD_WRITECB(fd);
-    assert(!ccb->active());
-
-    fd_table[fd].writeStart = squid_curtime;
-    /* Queue the write */
-    commio_set_callback(fd, IOCB_WRITE, ccb, callback,
-                        (char *)buf, free_func, size);
-
-    commSelectOrQueueWrite(fd);
-}
-
-// called when fd needs to write but may need to wait in line for its quota
-static void
-commSelectOrQueueWrite(const int fd)
-{
-    comm_io_callback_t *ccb = COMMIO_FD_WRITECB(fd);
-
-#if DELAY_POOLS
-    // stand in line if there is one
-    if (ClientInfo *clientInfo = fd_table[fd].clientInfo) {
-        if (clientInfo->writeLimitingActive) {
-            ccb->quotaQueueReserv = clientInfo->quotaEnqueue(fd);
-            clientInfo->kickQuotaQueue();
-            return;
-        }
-    }
 #endif
 
-    commSetSelect(fd, COMM_SELECT_WRITE, commHandleWrite, ccb, 0);
-}
-
-
-/* a wrapper around comm_write to allow for MemBuf to be comm_written in a snap */
-void
-comm_write_mbuf(int fd, MemBuf *mb, IOCB * handler, void *handler_data)
-{
-    comm_write(fd, mb->buf, mb->size, handler, handler_data, mb->freeFunc());
-}
-
-void
-comm_write_mbuf(int fd, MemBuf *mb, AsyncCall::Pointer &callback)
-{
-    comm_write(fd, mb->buf, mb->size, callback, mb->freeFunc());
-}
-
-
 /*
  * hm, this might be too general-purpose for all the places we'd
  * like to use it.
@@ -2462,7 +2132,7 @@ AlreadyTimedOut(fde *F)
 static bool
 writeTimedOut(int fd)
 {
-    if (!commio_has_callback(fd, IOCB_WRITE, COMMIO_FD_WRITECB(fd)))
+    if (!COMMIO_FD_WRITECB(fd)->active())
         return false;
 
     if ((squid_curtime - fd_table[fd].writeStart) < Config.Timeout.write)
@@ -2485,7 +2155,7 @@ checkTimeouts(void)
             // We have an active write callback and we are timed out
             debugs(5, 5, "checkTimeouts: FD " << fd << " auto write timeout");
             commSetSelect(fd, COMM_SELECT_WRITE, NULL, NULL, 0);
-            commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), COMM_ERROR, ETIMEDOUT);
+            COMMIO_FD_WRITECB(fd)->finish(COMM_ERROR, ETIMEDOUT);
         } else if (AlreadyTimedOut(F))
             continue;
 
index 19e0f0c06b946708a6952ca5bec2d7ac80156053..ed3217c8b82b0f3012c5ff292cb6bf3a87e3e21c 100644 (file)
@@ -4,28 +4,12 @@
 #include "squid.h"
 #include "AsyncEngine.h"
 #include "base/AsyncCall.h"
+#include "comm_err_t.h"
+#include "comm/IoCallback.h"
 #include "StoreIOBuffer.h"
 #include "Array.h"
 #include "ip/Address.h"
 
-#define COMMIO_FD_READCB(fd)    (&commfd_table[(fd)].readcb)
-#define COMMIO_FD_WRITECB(fd)   (&commfd_table[(fd)].writecb)
-
-typedef enum {
-    COMM_OK = 0,
-    COMM_ERROR = -1,
-    COMM_NOMESSAGE = -3,
-    COMM_TIMEOUT = -4,
-    COMM_SHUTDOWN = -5,
-    COMM_IDLE = -6, /* there are no active fds and no pending callbacks. */
-    COMM_INPROGRESS = -7,
-    COMM_ERR_CONNECT = -8,
-    COMM_ERR_DNS = -9,
-    COMM_ERR_CLOSING = -10,
-    COMM_ERR_PROTOCOL = -11, /* IPv4 or IPv6 cannot be used on the fd socket */
-    COMM_ERR__END__ = -999999 /* Dummy entry to make syntax valid (comma on line above), do not use. New entries added above */
-} comm_err_t;
-
 class DnsLookupDetails;
 typedef void CNCB(int fd, const DnsLookupDetails &dns, comm_err_t status, int xerrno, void *data);
 
@@ -81,10 +65,6 @@ SQUIDCEXTERN void commSetSelect(int, unsigned int, PF *, void *, time_t);
 SQUIDCEXTERN void commResetSelect(int);
 
 SQUIDCEXTERN int comm_udp_sendto(int sock, const Ip::Address &to, const void *buf, int buflen);
-extern void comm_write(int fd, const char *buf, int len, IOCB *callback, void *callback_data, FREE *func);
-extern void comm_write(int fd, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func = NULL);
-SQUIDCEXTERN void comm_write_mbuf(int fd, MemBuf *mb, IOCB * handler, void *handler_data);
-extern void comm_write_mbuf(int fd, MemBuf *mb, AsyncCall::Pointer &callback);
 SQUIDCEXTERN void commCallCloseHandlers(int fd);
 SQUIDCEXTERN int commSetTimeout(int fd, int, PF *, void *);
 extern int commSetTimeout(int fd, int, AsyncCall::Pointer &calback);
diff --git a/src/comm/IoCallback.cc b/src/comm/IoCallback.cc
new file mode 100644 (file)
index 0000000..b7b0d75
--- /dev/null
@@ -0,0 +1,125 @@
+#include "config.h"
+#include "ClientInfo.h"
+#include "comm/IoCallback.h"
+#include "comm/Write.h"
+#include "CommCalls.h"
+#include "fde.h"
+
+Comm::CbEntry *Comm::iocb_table;
+
+void
+Comm::CallbackTableInit()
+{
+    // XXX: convert this to a std::map<> ?
+    iocb_table = static_cast<CbEntry*>(xcalloc(Squid_MaxFD, sizeof(CbEntry)));
+    for (int pos = 0; pos < Squid_MaxFD; pos++) {
+        iocb_table[pos].fd = pos;
+        iocb_table[pos].readcb.fd = pos;
+        iocb_table[pos].readcb.type = IOCB_READ;
+        iocb_table[pos].writecb.fd = pos;
+        iocb_table[pos].writecb.type = IOCB_WRITE;
+    }
+}
+
+void
+Comm::CallbackTableDestruct()
+{
+    safe_free(iocb_table);
+}
+
+/**
+ * Configure Comm::Callback for I/O
+ *
+ * @param fd            filedescriptor
+ * @param t             IO callback type (read or write)
+ * @param cb            callback
+ * @param buf           buffer, if applicable
+ * @param func          freefunc, if applicable
+ * @param sz            buffer size
+ */
+void
+Comm::IoCallback::setCallback(Comm::iocb_type t, AsyncCall::Pointer &cb, char *b, FREE *f, int sz)
+{
+    assert(!active());
+    assert(type == t);
+    assert(cb != NULL);
+
+    callback = cb;
+    buf = b;
+    freefunc = f;
+    size = sz;
+    offset = 0;
+}
+
+void
+Comm::IoCallback::selectOrQueueWrite()
+{
+#if DELAY_POOLS
+    // stand in line if there is one
+    if (ClientInfo *clientInfo = fd_table[fd].clientInfo) {
+        if (clientInfo->writeLimitingActive) {
+            quotaQueueReserv = clientInfo->quotaEnqueue(fd);
+            clientInfo->kickQuotaQueue();
+            return;
+        }
+    }
+#endif
+
+    commSetSelect(fd, COMM_SELECT_WRITE, Comm::HandleWrite, this, 0);
+}
+
+void
+Comm::IoCallback::cancel(const char *reason)
+{
+    if (!active())
+        return;
+
+    callback->cancel(reason);
+    callback = NULL;
+    reset();
+}
+
+void
+Comm::IoCallback::reset()
+{
+    if (freefunc) {
+        freefunc(buf);
+        buf = NULL;
+        freefunc = NULL;
+    }
+    xerrno = 0;
+
+#if DELAY_POOLS
+    quotaQueueReserv = 0;
+#endif
+}
+
+// Schedule the callback call and clear the callback
+void
+Comm::IoCallback::finish(comm_err_t code, int xerrn)
+{
+    debugs(5, 3, HERE << "called for FD " << fd << " (" << code << ", " << xerrno << ")");
+    assert(active());
+
+    /* free data */
+    if (freefunc) {
+        freefunc(buf);
+        buf = NULL;
+        freefunc = NULL;
+    }
+
+    if (callback != NULL) {
+        typedef CommIoCbParams Params;
+        Params &params = GetCommParams<Params>(callback);
+        params.fd = fd;
+        params.buf = buf;
+        params.size = offset;
+        params.flag = code;
+        params.xerrno = xerrn;
+        ScheduleCallHere(callback);
+        callback = NULL;
+    }
+
+    /* Reset for next round. */
+    reset();
+}
diff --git a/src/comm/IoCallback.h b/src/comm/IoCallback.h
new file mode 100644 (file)
index 0000000..1cf423e
--- /dev/null
@@ -0,0 +1,70 @@
+#ifndef _SQUID_COMM_IOCALLBACK_H
+#define _SQUID_COMM_IOCALLBACK_H
+
+#include "config.h"
+#include "base/AsyncCall.h"
+#include "comm_err_t.h"
+
+namespace Comm {
+
+/// Type of IO callbacks the Comm layer deals with.
+typedef enum {
+    IOCB_NONE,
+    IOCB_READ,
+    IOCB_WRITE
+} iocb_type;
+
+/// Details about a particular Comm IO callback event.
+class IoCallback {
+public:
+    iocb_type type;
+    int fd;
+    AsyncCall::Pointer callback;
+    char *buf;
+    FREE *freefunc;
+    int size;
+    int offset;
+    comm_err_t errcode;
+    int xerrno;
+#if DELAY_POOLS
+    unsigned int quotaQueueReserv; ///< reservation ID from CommQuotaQueue
+#endif
+
+    bool active() const { return callback != NULL; }
+    void setCallback(iocb_type type, AsyncCall::Pointer &cb, char *buf, FREE *func, int sz);
+
+    /// called when fd needs to write but may need to wait in line for its quota
+    void selectOrQueueWrite();
+
+    /// Actively cancel the given callback
+    void cancel(const char *reason);
+
+    /// finish the IO operation imediately and schedule the callback with the current state.
+    void finish(comm_err_t code, int xerrn);
+
+private:
+    void reset();
+};
+
+/// Entry nodes for the IO callback table: iocb_table
+/// Keyed off the FD which the event applies to.
+class CbEntry {
+public:
+    int fd;
+    IoCallback  readcb;
+    IoCallback  writecb;
+};
+
+/// Table of scheduled IO events which have yet to be processed ??
+/// Callbacks which might be scheduled in future are stored in fd_table.
+extern CbEntry *iocb_table;
+
+extern void CallbackTableInit();
+extern void CallbackTableDestruct();
+
+#define COMMIO_FD_READCB(fd)    (&Comm::iocb_table[(fd)].readcb)
+#define COMMIO_FD_WRITECB(fd)   (&Comm::iocb_table[(fd)].writecb)
+
+}; // namespace Comm
+
+#endif /* _SQUID_COMM_IOCALLBACK_H */
index 09cb1c107644c513b7bae3c7973666af0895a6a1..6a2c3f38716c589fb58fc125cf15cc792e03fd2c 100644 (file)
@@ -1,13 +1,18 @@
 include $(top_srcdir)/src/Common.am
 include $(top_srcdir)/src/TestHeaders.am
 
-noinst_LTLIBRARIES = libcomm-listener.la
+noinst_LTLIBRARIES = libcomm.la
 
-## Library holding listener comm socket handlers
-libcomm_listener_la_SOURCES= \
+## Library holding comm socket handlers
+libcomm_la_SOURCES= \
        AcceptLimiter.cc \
        AcceptLimiter.h \
        ListenStateData.cc \
        ListenStateData.h \
        \
+       IoCallback.cc \
+       IoCallback.h \
+       Write.cc \
+       Write.h \
+       \
        comm_internal.h
diff --git a/src/comm/Write.cc b/src/comm/Write.cc
new file mode 100644 (file)
index 0000000..a348863
--- /dev/null
@@ -0,0 +1,146 @@
+#include "config.h"
+#if DELAY_POOLS
+#include "ClientInfo.h"
+#endif
+#include "comm/IoCallback.h"
+#include "comm/Write.h"
+#include "fde.h"
+#include "SquidTime.h"
+#include "MemBuf.h"
+
+void
+Comm::Write(int fd, MemBuf *mb, AsyncCall::Pointer &callback)
+{
+    Comm::Write(fd, mb->buf, mb->size, callback, mb->freeFunc());
+}
+
+void
+Comm::Write(int fd, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func)
+{
+    debugs(5, 5, HERE << "FD " << fd << ": sz " << size << ": asynCall " << callback);
+
+    /* Make sure we are open, not closing, and not writing */
+    assert(fd_table[fd].flags.open);
+    assert(!fd_table[fd].closing());
+    Comm::IoCallback *ccb = COMMIO_FD_WRITECB(fd);
+    assert(!ccb->active());
+
+    fd_table[fd].writeStart = squid_curtime;
+    /* Queue the write */
+    ccb->setCallback(IOCB_WRITE, callback, (char *)buf, free_func, size);
+    ccb->selectOrQueueWrite();
+}
+
+/** Write to FD.
+ * This function is used by the lowest level of IO loop which only has access to FD numbers.
+ * We have to use the comm iocb_table to map FD numbers to waiting data.
+ * Once the write has been concluded we schedule the waiting call with success/fail results.
+ */
+void
+Comm::HandleWrite(int fd, void *data)
+{
+    Comm::IoCallback *state = static_cast<Comm::IoCallback *>(data);
+    int len = 0;
+    int nleft;
+
+    assert(state->fd == fd);
+
+    PROF_start(commHandleWrite);
+    debugs(5, 5, HERE << "FD " << state->fd << ": off " <<
+           (long int) state->offset << ", sz " << (long int) state->size << ".");
+
+    nleft = state->size - state->offset;
+
+#if DELAY_POOLS
+    ClientInfo * clientInfo=fd_table[fd].clientInfo;
+
+    if (clientInfo && !clientInfo->writeLimitingActive)
+        clientInfo = NULL; // we only care about quota limits here
+
+    if (clientInfo) {
+        assert(clientInfo->selectWaiting);
+        clientInfo->selectWaiting = false;
+
+        assert(clientInfo->hasQueue());
+        assert(clientInfo->quotaPeekFd() == fd);
+        clientInfo->quotaDequeue(); // we will write or requeue below
+
+       if (nleft > 0) {
+            const int quota = clientInfo->quotaForDequed();
+            if (!quota) {  // if no write quota left, queue this fd
+                state->quotaQueueReserv = clientInfo->quotaEnqueue(fd);
+                clientInfo->kickQuotaQueue();
+                PROF_stop(commHandleWrite);
+                return;
+            }
+
+            const int nleft_corrected = min(nleft, quota);
+            if (nleft != nleft_corrected) {
+                debugs(5, 5, HERE << "FD " << fd << " writes only " <<
+                       nleft_corrected << " out of " << nleft);
+                nleft = nleft_corrected;
+            }
+
+        }
+    }
+#endif
+
+    /* actually WRITE data */
+    len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
+    debugs(5, 5, HERE << "write() returns " << len);
+
+#if DELAY_POOLS
+    if (clientInfo) {
+        if (len > 0) {
+            /* we wrote data - drain them from bucket */
+            clientInfo->bucketSize -= len;
+            if (clientInfo->bucketSize < 0.0) {
+                debugs(5,1, HERE << "drained too much"); // should not happen
+                clientInfo->bucketSize = 0;
+            }
+        }
+
+        // even if we wrote nothing, we were served; give others a chance
+        clientInfo->kickQuotaQueue();
+    }
+#endif
+
+    fd_bytes(fd, len, FD_WRITE);
+    statCounter.syscalls.sock.writes++;
+    // After each successful partial write,
+    // reset fde::writeStart to the current time.
+    fd_table[fd].writeStart = squid_curtime;
+
+    if (len == 0) {
+        /* Note we even call write if nleft == 0 */
+        /* We're done */
+        if (nleft != 0)
+            debugs(5, DBG_IMPORTANT, "FD " << fd << " write failure: connection closed with " << nleft << " bytes remaining.");
+
+        state->finish(nleft ? COMM_ERROR : COMM_OK, errno);
+    } else if (len < 0) {
+        /* An error */
+        if (fd_table[fd].flags.socket_eof) {
+            debugs(50, 2, HERE << "FD " << fd << " write failure: " << xstrerror() << ".");
+            state->finish(nleft ? COMM_ERROR : COMM_OK, errno);
+        } else if (ignoreErrno(errno)) {
+            debugs(50, 9, HERE << "FD " << fd << " write failure: " << xstrerror() << ".");
+            state->selectOrQueueWrite();
+        } else {
+            debugs(50, 2, HERE << "FD " << fd << " write failure: " << xstrerror() << ".");
+            state->finish(nleft ? COMM_ERROR : COMM_OK, errno);
+        }
+    } else {
+        /* A successful write, continue */
+        state->offset += len;
+
+        if (state->offset < state->size) {
+            /* Not done, reinstall the write handler and write some more */
+            state->selectOrQueueWrite();
+        } else {
+            state->finish(nleft ? COMM_OK : COMM_ERROR, errno);
+        }
+    }
+
+    PROF_stop(commHandleWrite);
+}
diff --git a/src/comm/Write.h b/src/comm/Write.h
new file mode 100644 (file)
index 0000000..420b760
--- /dev/null
@@ -0,0 +1,30 @@
+#ifndef _SQUID_COMM_IOWRITE_H
+#define _SQUID_COMM_IOWRITE_H
+
+#include "base/AsyncCall.h"
+
+namespace Comm {
+
+/**
+ * Queue a write. callback is scheduled when the write
+ * completes, on error, or on file descriptor close.
+ *
+ * free_func is used to free the passed buffer when the write has completed.
+ */
+void Write(int fd, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func);
+
+/**
+ * Queue a write. callback is scheduled when the write
+ * completes, on error, or on file descriptor close.
+ */
+void Write(int fd, MemBuf *mb, AsyncCall::Pointer &callback);
+
+/// Cancel the write pending on FD. No action if none pending.
+void WriteCancel(int fd, const char *reason);
+
+// callback handler to process an FD which is available for writing.
+extern PF HandleWrite;
+
+}; // namespace Comm
+
+#endif /* _SQUID_COMM_IOWRITE_H */
diff --git a/src/comm_err_t.h b/src/comm_err_t.h
new file mode 100644 (file)
index 0000000..1cad2a3
--- /dev/null
@@ -0,0 +1,21 @@
+#ifndef _SQUID_COMM_COMM_ERR_T_H
+#define _SQUID_COMM_COMM_ERR_T_H
+
+#include "config.h"
+
+typedef enum {
+    COMM_OK = 0,
+    COMM_ERROR = -1,
+    COMM_NOMESSAGE = -3,
+    COMM_TIMEOUT = -4,
+    COMM_SHUTDOWN = -5,
+    COMM_IDLE = -6, /* there are no active fds and no pending callbacks. */
+    COMM_INPROGRESS = -7,
+    COMM_ERR_CONNECT = -8,
+    COMM_ERR_DNS = -9,
+    COMM_ERR_CLOSING = -10,
+    COMM_ERR_PROTOCOL = -11, /* IPv4 or IPv6 cannot be used on the fd socket */
+    COMM_ERR__END__ = -999999 /* Dummy entry to make syntax valid (comma on line above), do not use. New entries added above */
+} comm_err_t;
+
+#endif /* _SQUID_COMM_COMM_ERR_T_H */
index da97ef7d91390c52ff068fed0d19ac2567eaec78..c6fe4a04f4d0b77357c610d8ae5e33e73b5538ca 100644 (file)
@@ -39,6 +39,7 @@
 #include "SquidTime.h"
 #include "Store.h"
 #include "comm.h"
+#include "comm/Write.h"
 #include "fde.h"
 #include "ip/tools.h"
 #include "MemBuf.h"
@@ -767,7 +768,10 @@ idnsDoSendQueryVC(nsvc *vc)
 
     commSetTimeout(vc->fd, Config.Timeout.idns_query, NULL, NULL);
 
-    comm_write_mbuf(vc->fd, mb, idnsSentQueryVC, vc);
+    AsyncCall::Pointer call = commCbCall(78, 5, "idnsSentQueryVC",
+                                         CommIoCbPtrFun(&idnsSentQueryVC, vc));
+
+    Comm::Write(vc->fd, mb, call);
 
     delete mb;
 }
index cc8acc96883758f557780df1447ffb1c8f286594..65ccb49495a4fc74631add457ad4f1a2d5ee12bf 100644 (file)
@@ -32,7 +32,7 @@
  *
  */
 #include "config.h"
-
+#include "comm/Write.h"
 #include "errorpage.h"
 #include "auth/UserRequest.h"
 #include "SquidTime.h"
@@ -461,7 +461,9 @@ errorSend(int fd, ErrorState * err)
     rep = err->BuildHttpReply();
 
     MemBuf *mb = rep->pack();
-    comm_write_mbuf(fd, mb, errorSendComplete, err);
+    AsyncCall::Pointer call = commCbCall(78, 5, "errorSendComplete",
+                                         CommIoCbPtrFun(&errorSendComplete, err));
+    Comm::Write(fd, mb, call);
     delete mb;
 
     delete rep;
index 8973d26624da95df32ffd117ea55113f323ced6c..1598a9861ea86dbf41f4bfd4fd015ff5e28d1851 100644 (file)
@@ -34,6 +34,7 @@
 
 #include "squid.h"
 #include "comm.h"
+#include "comm/Write.h"
 #include "comm/ListenStateData.h"
 #include "compat/strtoll.h"
 #include "ConnectionDetail.h"
@@ -1531,10 +1532,7 @@ FtpStateData::writeCommand(const char *buf)
     typedef CommCbMemFunT<FtpStateData, CommIoCbParams> Dialer;
     AsyncCall::Pointer call = JobCallback(9, 5,
                                           Dialer, this, FtpStateData::ftpWriteCommandCallback);
-    comm_write(ctrl.fd,
-               ctrl.last_command,
-               strlen(ctrl.last_command),
-               call);
+    Comm::Write(ctrl.fd, ctrl.last_command, strlen(ctrl.last_command), call, NULL);
 
     scheduleReadControlReply(0);
 }
index 26e232a268d30174ce1f2e0941b334f92f0e66f1..5023d83e9e8f5813e6d01ba7ee71990513688d34 100644 (file)
@@ -34,6 +34,7 @@
  */
 
 #include "squid.h"
+#include "comm/Write.h"
 #include "errorpage.h"
 #include "Store.h"
 #include "html_quote.h"
@@ -985,7 +986,9 @@ gopherSendRequest(int fd, void *data)
     }
 
     debugs(10, 5, "gopherSendRequest: FD " << fd);
-    comm_write(fd, buf, strlen(buf), gopherSendComplete, gopherState, NULL);
+    AsyncCall::Pointer call = commCbCall(5,5, "gopherSendComplete",
+                                         CommIoCbPtrFun(gopherSendComplete, gopherState));
+    Comm::Write(fd, buf, strlen(buf), call, NULL);
 
     if (EBIT_TEST(gopherState->entry->flags, ENTRY_CACHABLE))
         gopherState->entry->setPublicKey();    /* Make it public */
index 04ada1d1ea73495b163e2ba5effc1d26d6b16047..e0a69984602fb2bd4002faaba530ffe4ff936652 100644 (file)
@@ -33,6 +33,7 @@
  */
 
 #include "squid.h"
+#include "comm/Write.h"
 #include "helper.h"
 #include "SquidMath.h"
 #include "SquidTime.h"
@@ -1188,11 +1189,9 @@ helperDispatchWriteDone(int fd, char *buf, size_t len, comm_err_t flag, int xerr
         srv->writebuf = srv->wqueue;
         srv->wqueue = new MemBuf;
         srv->flags.writing = 1;
-        comm_write(srv->wfd,
-                   srv->writebuf->content(),
-                   srv->writebuf->contentSize(),
-                   helperDispatchWriteDone,    /* Handler */
-                   srv, NULL);                 /* Handler-data, freefunc */
+        AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
+                                             CommIoCbPtrFun(helperDispatchWriteDone, srv));
+        Comm::Write(srv->wfd, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
     }
 }
 
@@ -1234,11 +1233,9 @@ helperDispatch(helper_server * srv, helper_request * r)
         srv->writebuf = srv->wqueue;
         srv->wqueue = new MemBuf;
         srv->flags.writing = 1;
-        comm_write(srv->wfd,
-                   srv->writebuf->content(),
-                   srv->writebuf->contentSize(),
-                   helperDispatchWriteDone,    /* Handler */
-                   srv, NULL);                 /* Handler-data, free func */
+        AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
+                                             CommIoCbPtrFun(helperDispatchWriteDone, srv));
+        Comm::Write(srv->wfd, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
     }
 
     debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index + 1 << ", " << strlen(r->buf) << " bytes");
@@ -1289,11 +1286,9 @@ helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r
     srv->flags.reserved = 1;
     srv->request = r;
     srv->dispatch_time = current_time;
-    comm_write(srv->wfd,
-               r->buf,
-               strlen(r->buf),
-               helperStatefulDispatchWriteDone,        /* Handler */
-               hlp, NULL);                             /* Handler-data, free func */
+    AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
+                                         CommIoCbPtrFun(helperStatefulDispatchWriteDone, hlp));
+    Comm::Write(srv->wfd, r->buf, strlen(r->buf), call, NULL);
     debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
            hlp->id_name << " #" << srv->index + 1 << ", " <<
            (int) strlen(r->buf) << " bytes");
index 37f8996f3a1edc6aaac7571ab96cc3219e9d9ff9..2595c8516074315298917b61d43d58298b58f903 100644 (file)
 #include "base/AsyncJobCalls.h"
 #include "base/TextException.h"
 #include "base64.h"
+#include "comm/Write.h"
 #if DELAY_POOLS
 #include "DelayPools.h"
 #endif
 #include "errorpage.h"
-#include "fde.h"
 #include "http.h"
 #include "HttpControlMsg.h"
 #include "HttpHdrContRange.h"
@@ -2139,7 +2139,7 @@ HttpStateData::sendRequest()
     request->peer_host=_peer?_peer->host:NULL;
     buildRequestPrefix(request, orig_request, entry, &mb);
     debugs(11, 6, "httpSendRequest: FD " << fd << ":\n" << mb.buf);
-    comm_write_mbuf(fd, &mb, requestSender);
+    Comm::Write(fd, &mb, requestSender);
 
     return true;
 }
@@ -2226,7 +2226,7 @@ HttpStateData::finishingBrokenPost()
     typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer;
     requestSender = JobCallback(11,5,
                                 Dialer, this, HttpStateData::wroteLast);
-    comm_write(fd, "\r\n", 2, requestSender);
+    Comm::Write(fd, "\r\n", 2, requestSender, NULL);
     return true;
 #else
     return false;
@@ -2248,7 +2248,7 @@ HttpStateData::finishingChunkedRequest()
     typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer;
     requestSender = JobCallback(11,5,
                                 Dialer, this, HttpStateData::wroteLast);
-    comm_write(fd, "0\r\n\r\n", 5, requestSender);
+    Comm::Write(fd, "0\r\n\r\n", 5, requestSender, NULL);
     return true;
 }
 
index 50f7a734e042551a7335b52a1d3bb83024435141..1f9292c748d78ade8f8eb941167648bea90dea94 100644 (file)
@@ -37,6 +37,7 @@
 #if USE_IDENT
 
 #include "comm.h"
+#include "comm/Write.h"
 #include "ident/Config.h"
 #include "ident/Ident.h"
 #include "MemBuf.h"
@@ -149,7 +150,9 @@ Ident::ConnectDone(int fd, const DnsLookupDetails &, comm_err_t status, int xerr
     mb.Printf("%d, %d\r\n",
               state->my_peer.GetPort(),
               state->me.GetPort());
-    comm_write_mbuf(fd, &mb, NULL, state);
+
+    AsyncCall::Pointer nil;
+    Comm::Write(fd, &mb, nil);
     comm_read(fd, state->buf, BUFSIZ, Ident::ReadReply, state);
     commSetTimeout(fd, Ident::TheConfig.timeout, Ident::Timeout, state);
 }
index 92383e790a19496a0d7a159c6932c57f53c3d1ff..873450574a8b0b667851cba9e1d675b2254379d2 100644 (file)
@@ -9,6 +9,7 @@
 #include "config.h"
 #include "comm.h"
 #include "CommCalls.h"
+#include "comm/Write.h"
 #include "base/TextException.h"
 #include "ipc/UdsOp.h"
 
@@ -106,7 +107,7 @@ void Ipc::UdsSender::write()
     typedef CommCbMemFunT<UdsSender, CommIoCbParams> Dialer;
     AsyncCall::Pointer writeHandler = JobCallback(54, 5,
                                       Dialer, this, UdsSender::wrote);
-    comm_write(fd(), message.raw(), message.size(), writeHandler);
+    Comm::Write(fd(), message.raw(), message.size(), writeHandler, NULL);
     writing = true;
 }
 
index 3f9dc27e5ec5494493d1a91bcff1b8c90776e120..d249c3a9136948cf1987ab7ad4dd416fc39667c2 100644 (file)
@@ -7,6 +7,7 @@
 
 #include "config.h"
 #include "base/TextException.h"
+#include "comm/Write.h"
 #include "CommCalls.h"
 #include "HttpReply.h"
 #include "ipc/Coordinator.h"
@@ -90,7 +91,7 @@ Mgr::Inquirer::start()
     std::auto_ptr<MemBuf> replyBuf(reply->pack());
     writer = asyncCall(16, 5, "Mgr::Inquirer::noteWroteHeader",
                        CommCbMemFunT<Inquirer, CommIoCbParams>(this, &Inquirer::noteWroteHeader));
-    comm_write_mbuf(fd, replyBuf.get(), writer);
+    Comm::Write(fd, replyBuf.get(), writer);
 }
 
 /// called when we wrote the response header
index 6eb3d22d75e80592d431fbb9632bcfa30f1cb7c9..12273337687ad01d0f444cf740792beda67e4aed 100644 (file)
@@ -8,6 +8,7 @@
 #include "config.h"
 #include "base/TextException.h"
 #include "CommCalls.h"
+#include "comm/Write.h"
 #include "ipc/FdNotes.h"
 #include "mgr/StoreToCommWriter.h"
 #include "StoreClient.h"
@@ -108,7 +109,7 @@ Mgr::StoreToCommWriter::scheduleCommWrite(const StoreIOBuffer& ioBuf)
     AsyncCall::Pointer writer =
         asyncCall(16, 5, "Mgr::StoreToCommWriter::noteCommWrote",
                   MyDialer(this, &StoreToCommWriter::noteCommWrote));
-    comm_write(fd, ioBuf.data, ioBuf.length, writer);
+    Comm::Write(fd, ioBuf.data, ioBuf.length, writer, NULL);
 }
 
 void
index 0221003ec5a5f0a5f63e16f334a99ad7aa63044d..a501859f73c8a7e52a2ad6c4cdcd11724d6a893e 100644 (file)
@@ -38,6 +38,7 @@
 #include "HttpRequest.h"
 #include "fde.h"
 #include "comm.h"
+#include "comm/Write.h"
 #include "client_side_request.h"
 #include "acl/FilledChecklist.h"
 #if DELAY_POOLS
@@ -321,8 +322,11 @@ TunnelStateData::copy (size_t len, comm_err_t errcode, int xerrno, Connection &f
         if (from.len == 0 && !fd_closed(to.fd()) ) {
             comm_close(to.fd());
         }
-    } else if (cbdataReferenceValid(this))
-        comm_write(to.fd(), from.buf, len, completion, this, NULL);
+    } else if (cbdataReferenceValid(this)) {
+        AsyncCall::Pointer call = commCbCall(5,5, "SomeTunnelWriteHandler",
+                                         CommIoCbPtrFun(completion, this));
+        Comm::Write(to.fd(), from.buf, len, call, NULL);
+    }
 
     cbdataInternalUnlock(this);        /* ??? */
 }
@@ -531,8 +535,9 @@ tunnelConnected(int fd, void *data)
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     debugs(26, 3, "tunnelConnected: FD " << fd << " tunnelState=" << tunnelState);
     *tunnelState->status_ptr = HTTP_OK;
-    comm_write(tunnelState->client.fd(), conn_established, strlen(conn_established),
-               tunnelConnectedWriteDone, tunnelState, NULL);
+    AsyncCall::Pointer call = commCbCall(5,5, "tunnelConnectedWriteDone",
+                                         CommIoCbPtrFun(tunnelConnectedWriteDone, tunnelState));
+    Comm::Write(tunnelState->client.fd(), conn_established, strlen(conn_established), call, NULL);
 }
 
 static void
@@ -742,7 +747,10 @@ tunnelProxyConnected(int fd, void *data)
     packerClean(&p);
     mb.append("\r\n", 2);
 
-    comm_write_mbuf(tunnelState->server.fd(), &mb, tunnelProxyConnectedWriteDone, tunnelState);
+    AsyncCall::Pointer call = commCbCall(5,5, "tunnelProxyConnectedWriteDone",
+                                         CommIoCbPtrFun(tunnelProxyConnectedWriteDone, tunnelState));
+
+    Comm::Write(tunnelState->server.fd(), &mb, call);
     commSetTimeout(tunnelState->server.fd(), Config.Timeout.read, tunnelTimeout, tunnelState);
 }
 
index d7297c485817eb5df341cf99ca239fac8fa1616b..fc300d3191d28bddf62ccdea10434d372eaa2e5c 100644 (file)
@@ -34,6 +34,7 @@
  */
 
 #include "squid.h"
+#include "comm/Write.h"
 #include "errorpage.h"
 #include "Store.h"
 #include "HttpReply.h"
@@ -101,7 +102,10 @@ whoisStart(FwdState * fwd)
     String str_print=p->request->urlpath.substr(1,p->request->urlpath.size());
     snprintf(buf, l, SQUIDSTRINGPH"\r\n", SQUIDSTRINGPRINT(str_print));
 
-    comm_write(fd, buf, strlen(buf), whoisWriteComplete, p, NULL);
+    AsyncCall::Pointer call = commCbCall(5,5, "whoisWriteComplete",
+                                         CommIoCbPtrFun(whoisWriteComplete, p));
+
+    Comm::Write(fd, buf, strlen(buf), call, NULL);
     comm_read(fd, p->buf, BUFSIZ, whoisReadReply, p);
     commSetTimeout(fd, Config.Timeout.read, whoisTimeout, p);
 }