]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Merging async-call branch changes to HEAD:
authorrousskov <>
Wed, 13 Feb 2008 05:58:29 +0000 (05:58 +0000)
committerrousskov <>
Wed, 13 Feb 2008 05:58:29 +0000 (05:58 +0000)
Async-call work replaces event-based asynchronous calls with
stand-alone implementation. The common async call API allows Squid
core do call, debug, and troubleshoot all callback handlers in a
uniform way.

An async "job" API is introduced to manage independent logical threads
or work such as protocol transaction handlers on client, server, and
ICAP sides. These jobs should communicate with each other using async
calls to minimize dependencies and avoid reentrant callback loops.

These changes will eventually improve overall code quality, debugging
quality, and Squid robustness.

Below you will find log messages from the async-call branch that are
relevant to the file(s) being committed.

    Added initial implelentation of AsyncCall-based wrappers for
    comm callbacks. The comm layer no longer calls callbacks from
    the select loop. Instead, the select loop schedules async
    calls. Scheduled calls are then called from the main loop (like
    all other async calls), after the select loop completes.

    Removed accept loop. We cannot loop because async calls do not
    get fired during the loop and, hence, cannot register new
    callbacks for new ready FDs.

    The loop is implicit now. When the next callback is registered,
    we check whether the last accept(2) call was successful or
    OPTIMISTIC_IO is defined and call acceptNext() again if yes.
    AcceptNext() may schedule another async call (using the
    being-submitted callback) if the socket was still ready. Since
    callbacks are fired until there are no callabacks left, we
    still have an accept loop.

    Removed CommDispatcher as unused.

    Removed unused IOFCB, IOWCB, and CWCB.

    Removed class fdc_t. After CommCallbackData removal, fdc_t was
    only used to check that a FD is "active" and to check that a FD
    is half_closed.  fd_table flags.open seems to be identical to
    the "active" state flag so we use that now, via newly added
    isOpen() static function. AbortChecker already maintains
    half_closed status.

    The accept-specific functionality is still implemented by
    AcceptFD class. Removed fdc_t used to marshall accept-ralted
    calls to AcceptFD anyway. fdc_table now stores AcceptFDs
    directly. I did not rename the table to ease merging with other
    code, but added a TODO for that.

    Removed calls to comm_iocallbackpending(). They were added to
    "Speed up processing of queued events significantly, to not
    cause large delays when under low load" but I do not see how
    having pending callbacks can be relevant because all pending
    callbacks are (should be) executed before FDs are probed.

    Removed unused nullCallback() methods.

    Removed CommCallbackData and related code. It looks like it
    remained after one of the big comm rewrites, just to handle
    accept errors. We can now schedule an async call to notify of
    those errors and do not seem to need CommCallbackData at all.

    Removed commfd_completed_events: a list of completed (but not
    yet fired) callbacks. We simply schedule the async call now
    instead of maintaining our own list of callbacks to call.  This
    change allows us to eliminate the CommDispatcher class (which
    was the motivation behind these changes), but I have not done
    that yet.

    For comm_io_callback_t, being active seems to be the same as
    having a callback. Replaced active data member with a method
    that checks for the callback presence.

    Relaxed comm_read_cancel() preconditions so that the callers do
    not have to check all the assertions inside before calling that
    function.

    The CommCall code is still more complex than I want it to be.
    However, these are low-level interfaces that most developers
    will not have to deal with and the complexity will be
    significantly reduced if we get rid of old style
    function-pointer based callbacks, which is probably desirable
    for several reasons.

src/CommCalls.cc [new file with mode: 0644]
src/CommCalls.h [new file with mode: 0644]
src/Makefile.am
src/comm.cc
src/comm.h

diff --git a/src/CommCalls.cc b/src/CommCalls.cc
new file mode 100644 (file)
index 0000000..b5593ef
--- /dev/null
@@ -0,0 +1,206 @@
+#include "squid.h"
+#include "CommCalls.h"
+
+/* CommCommonCbParams */
+
+CommCommonCbParams::CommCommonCbParams(void *aData):
+    data(cbdataReference(aData)), fd(-1), xerrno(0), flag(COMM_OK) 
+{
+}
+
+CommCommonCbParams::CommCommonCbParams(const CommCommonCbParams &p):
+    data(cbdataReference(p.data)), fd(p.fd), xerrno(p.xerrno), flag(p.flag)
+{
+}
+
+CommCommonCbParams::~CommCommonCbParams()
+{
+    cbdataReferenceDone(data);
+}
+
+void
+CommCommonCbParams::print(std::ostream &os) const
+{
+    os << "FD " << fd;
+    if (xerrno)
+        os << ", errno=" << xerrno;
+    if (flag != COMM_OK)
+        os << ", flag=" << flag;
+    if (data)
+        os << ", data=" << data;
+}
+
+
+/* CommAcceptCbParams */
+
+CommAcceptCbParams::CommAcceptCbParams(void *aData): CommCommonCbParams(aData),
+    nfd(-1)
+{
+}
+
+void
+CommAcceptCbParams::print(std::ostream &os) const
+{
+    CommCommonCbParams::print(os);
+    if (nfd >= 0)
+        os << ", newFD " << nfd;
+}
+
+
+/* CommConnectCbParams */
+
+CommConnectCbParams::CommConnectCbParams(void *aData):
+    CommCommonCbParams(aData)
+{
+}
+
+
+/* CommIoCbParams */
+
+CommIoCbParams::CommIoCbParams(void *aData): CommCommonCbParams(aData),
+    buf(NULL), size(0)
+{
+}
+
+void
+CommIoCbParams::print(std::ostream &os) const
+{
+    CommCommonCbParams::print(os);
+    if (buf) {
+        os << ", size=" << size;
+        os << ", buf=" << (void*)buf;
+       }
+}
+
+
+/* CommCloseCbParams */
+
+CommCloseCbParams::CommCloseCbParams(void *aData):
+    CommCommonCbParams(aData)
+{
+}
+
+/* CommTimeoutCbParams */
+
+CommTimeoutCbParams::CommTimeoutCbParams(void *aData):
+    CommCommonCbParams(aData)
+{
+}
+
+
+/* CommAcceptCbPtrFun */
+
+CommAcceptCbPtrFun::CommAcceptCbPtrFun(IOACB *aHandler,
+    const CommAcceptCbParams &aParams):
+    CommDialerParamsT<CommAcceptCbParams>(aParams),
+    handler(aHandler)
+{
+}
+
+void
+CommAcceptCbPtrFun::dial()
+{
+    handler(params.fd, params.nfd, &params.details, params.flag, params.xerrno, params.data);
+}
+
+void
+CommAcceptCbPtrFun::print(std::ostream &os) const
+{
+    os << '(';
+    params.print(os);
+    os << ')';
+}
+
+
+/* CommConnectCbPtrFun */
+
+CommConnectCbPtrFun::CommConnectCbPtrFun(CNCB *aHandler,
+    const CommConnectCbParams &aParams):
+    CommDialerParamsT<CommConnectCbParams>(aParams),
+    handler(aHandler)
+{
+}
+
+void
+CommConnectCbPtrFun::dial()
+{
+    handler(params.fd, params.flag, params.xerrno, params.data);
+}
+
+void
+CommConnectCbPtrFun::print(std::ostream &os) const
+{
+    os << '(';
+    params.print(os);
+    os << ')';
+}
+
+
+/* CommIoCbPtrFun */
+
+CommIoCbPtrFun::CommIoCbPtrFun(IOCB *aHandler, const CommIoCbParams &aParams):
+    CommDialerParamsT<CommIoCbParams>(aParams),
+    handler(aHandler)
+{
+}
+
+void
+CommIoCbPtrFun::dial()
+{
+    handler(params.fd, params.buf, params.size, params.flag, params.xerrno, params.data);
+}
+
+void
+CommIoCbPtrFun::print(std::ostream &os) const
+{
+    os << '(';
+    params.print(os);
+    os << ')';
+}
+
+
+/* CommCloseCbPtrFun */
+
+CommCloseCbPtrFun::CommCloseCbPtrFun(PF *aHandler,
+    const CommCloseCbParams &aParams):
+    CommDialerParamsT<CommCloseCbParams>(aParams),
+    handler(aHandler)
+{
+}
+
+void
+CommCloseCbPtrFun::dial()
+{
+    handler(params.fd, params.data);
+}
+
+void
+CommCloseCbPtrFun::print(std::ostream &os) const
+{
+    os << '(';
+    params.print(os);
+    os << ')';
+}
+
+/* CommTimeoutCbPtrFun */
+
+CommTimeoutCbPtrFun::CommTimeoutCbPtrFun(PF *aHandler,
+    const CommTimeoutCbParams &aParams):
+    CommDialerParamsT<CommTimeoutCbParams>(aParams),
+    handler(aHandler)
+{
+}
+
+void
+CommTimeoutCbPtrFun::dial()
+{
+    handler(params.fd, params.data);
+}
+
+void
+CommTimeoutCbPtrFun::print(std::ostream &os) const
+{
+    os << '(';
+    params.print(os);
+    os << ')';
+}
diff --git a/src/CommCalls.h b/src/CommCalls.h
new file mode 100644 (file)
index 0000000..afb112a
--- /dev/null
@@ -0,0 +1,291 @@
+
+/*
+ * $Id: CommCalls.h,v 1.1 2008/02/12 22:58:29 rousskov Exp $
+ */
+
+#ifndef SQUID_COMMCALLS_H
+#define SQUID_COMMCALLS_H
+
+#include "comm.h"
+#include "ConnectionDetail.h"
+#include "AsyncCall.h"
+#include "AsyncJobCalls.h"
+
+/* CommCalls implement AsyncCall interface for comm_* callbacks.
+ * The classes cover two call dialer kinds:
+ *     - A C-style call using a function pointer (depricated);
+ *     - A C++-style call to an AsyncJob child.
+ * and three comm_* callback kinds:
+ *     - accept (IOACB),
+ *     - connect (CNCB),
+ *     - I/O (IOCB).
+ */
+
+ /*
+  * TODO: When there are no function-pointer-based callbacks left, all
+  * this complexity can be removed. Jobs that need comm services will just
+  * implement CommReader, CommWriter, etc. interfaces and receive calls
+  * using general (not comm-specific) AsyncCall code. For now, we have to
+  * allow the caller to create a callback that comm can modify to set
+  * parameters, which is not trivial when the caller type/kind is not
+  * known to comm and there are many kinds of parameters.
+  */
+
+
+/* Comm*CbParams classes below handle callback parameters */
+
+// Maintains parameters common to all comm callbacks
+class CommCommonCbParams {
+public:
+    CommCommonCbParams(void *aData);
+    CommCommonCbParams(const CommCommonCbParams &params);
+    ~CommCommonCbParams();
+
+    void print(std::ostream &os) const;
+
+public:
+    void *data; // cbdata-protected
+    int fd;
+    int xerrno;
+    comm_err_t flag;
+
+private:
+    // should not be needed and not yet implemented
+    CommCommonCbParams &operator =(const CommCommonCbParams &params); 
+};
+
+// accept parameters
+class CommAcceptCbParams: public CommCommonCbParams {
+public:
+    CommAcceptCbParams(void *aData);
+
+    void print(std::ostream &os) const;
+
+public:
+    ConnectionDetail details;
+    int nfd; // TODO: rename to fdNew or somesuch
+};
+
+// connect parameters
+class CommConnectCbParams: public CommCommonCbParams {
+public:
+    CommConnectCbParams(void *aData);
+};
+
+// read/write (I/O) parameters
+class CommIoCbParams: public CommCommonCbParams {
+public:
+    CommIoCbParams(void *aData);
+
+    void print(std::ostream &os) const;
+
+public:
+    char *buf;
+    size_t size;
+};
+
+// close parameters
+class CommCloseCbParams: public CommCommonCbParams {
+public:
+    CommCloseCbParams(void *aData);
+};
+
+class CommTimeoutCbParams: public  CommCommonCbParams {
+public:
+    CommTimeoutCbParams(void *aData);
+};
+
+// Interface to expose comm callback parameters of all comm dialers.
+// GetCommParams() uses this interface to access comm parameters.
+template <class Params_>
+class CommDialerParamsT {
+public:
+    typedef Params_ Params;
+    CommDialerParamsT(const Params &io): params(io) {}
+
+public:
+    Params params;
+};
+
+// Get comm params of an async comm call
+template <class Params>
+Params &GetCommParams(AsyncCall::Pointer &call) { 
+       typedef CommDialerParamsT<Params> DialerParams;
+    DialerParams *dp = dynamic_cast<DialerParams*>(call->getDialer());
+    assert(dp);
+    return dp->params;
+}
+
+
+// All job dialers with comm parameters are merged into one since they
+// all have exactly one callback argument and differ in Params type only
+template <class C, class Params_>
+class CommCbMemFunT: public JobDialer, public CommDialerParamsT<Params_>
+{
+public:
+    typedef Params_ Params;
+    typedef void (C::*Method)(const Params &io);
+
+    CommCbMemFunT(C *obj, Method meth): JobDialer(obj),
+        CommDialerParamsT<Params>(obj), object(obj), method(meth) {}
+
+    virtual void print(std::ostream &os) const {
+        os << '('; this->params.print(os); os << ')'; }
+
+public:
+       C *object;
+    Method method;
+
+protected:
+    virtual void doDial() { (object->*method)(this->params); }
+};
+
+
+// accept (IOACB) dialer
+class CommAcceptCbPtrFun: public CallDialer,
+    public CommDialerParamsT<CommAcceptCbParams>
+{
+public:
+    typedef CommAcceptCbParams Params;
+
+    CommAcceptCbPtrFun(IOACB *aHandler, const CommAcceptCbParams &aParams);
+    void dial();
+
+    virtual void print(std::ostream &os) const;
+
+public:
+    IOACB *handler;
+};
+
+// connect (CNCB) dialer
+class CommConnectCbPtrFun: public CallDialer,
+    public CommDialerParamsT<CommConnectCbParams>
+{
+public:
+    typedef CommConnectCbParams Params;
+
+    CommConnectCbPtrFun(CNCB *aHandler, const Params &aParams);
+    void dial();
+
+    virtual void print(std::ostream &os) const;
+
+public:
+    CNCB *handler;
+};
+
+
+// read/write (IOCB) dialer
+class CommIoCbPtrFun: public CallDialer,
+    public CommDialerParamsT<CommIoCbParams>
+{
+public:
+    typedef CommIoCbParams Params;
+
+    CommIoCbPtrFun(IOCB *aHandler, const Params &aParams);
+    void dial();
+
+    virtual void print(std::ostream &os) const;
+
+public:
+    IOCB *handler;
+};
+
+
+// close (PF) dialer
+class CommCloseCbPtrFun: public CallDialer,
+    public CommDialerParamsT<CommCloseCbParams>
+{
+public:
+    typedef CommCloseCbParams Params;
+
+    CommCloseCbPtrFun(PF *aHandler, const Params &aParams);
+    void dial();
+
+    virtual void print(std::ostream &os) const;
+
+public:
+    PF *handler;
+};
+
+class CommTimeoutCbPtrFun:public CallDialer,
+    public CommDialerParamsT<CommTimeoutCbParams>
+{
+public:
+    typedef CommTimeoutCbParams Params;
+
+    CommTimeoutCbPtrFun(PF *aHandler, const Params &aParams);
+    void dial();
+
+    virtual void print(std::ostream &os) const;
+
+public:
+    PF *handler;
+};
+
+// AsyncCall to comm handlers implemented as global functions.
+// The dialer is one of the Comm*CbPtrFunT above
+// TODO: Get rid of this class by moving canFire() to canDial() method
+// of dialers.
+template <class Dialer>
+class CommCbFunPtrCallT: public AsyncCall {
+public:
+    typedef typename Dialer::Params Params;
+
+    inline CommCbFunPtrCallT(int debugSection, int debugLevel,
+        const char *callName, const Dialer &aDialer);
+
+    virtual CallDialer* getDialer() { return &dialer; }
+
+public:
+    Dialer dialer;
+
+protected:
+    inline virtual bool canFire();
+    inline virtual void fire();
+};
+
+// Conveninece wrapper: It is often easier to call a templated function than
+// to create a templated class.
+template <class Dialer>
+inline
+CommCbFunPtrCallT<Dialer> *commCbCall(int debugSection, int debugLevel,
+    const char *callName, const Dialer &dialer)
+{
+    return new CommCbFunPtrCallT<Dialer>(debugSection, debugLevel, callName,
+        dialer);
+}
+
+/* inlined implementation of templated methods */
+
+/* CommCbFunPtrCallT */
+
+template <class Dialer>
+CommCbFunPtrCallT<Dialer>::CommCbFunPtrCallT(int debugSection, int debugLevel,
+    const char *callName, const Dialer &aDialer): 
+        AsyncCall(debugSection, debugLevel, callName),
+        dialer(aDialer)
+{
+}
+
+
+template <class Dialer>
+bool
+CommCbFunPtrCallT<Dialer>::canFire()
+{
+    if (!AsyncCall::canFire())
+        return false;
+
+    if (!cbdataReferenceValid(dialer.params.data))
+        return cancel("callee gone");
+
+    return true;
+}
+
+template <class Dialer>
+void
+CommCbFunPtrCallT<Dialer>::fire()
+{
+    dialer.dial();
+}
+
+#endif /* SQUID_COMMCALLS_H */
index 6d65e87db6270879d0c8718e2faef222bff562bd..11d5935f54610f986f91c633896d4fad44d7a1f1 100644 (file)
@@ -1,7 +1,7 @@
 #
 #  Makefile for the Squid Object Cache server
 #
-#  $Id: Makefile.am,v 1.199 2008/02/11 22:24:38 rousskov Exp $
+#  $Id: Makefile.am,v 1.200 2008/02/12 22:58:29 rousskov Exp $
 #
 #  Uncomment and customize the following to suit your needs:
 #
@@ -403,8 +403,14 @@ squid_COMMSOURCES = \
 libsquid_la_SOURCES = \
        comm.cc \
        comm.h \
+       CommCalls.cc \
+       CommCalls.h \
        IPInterception.cc \
-       IPInterception.h
+       IPInterception.h \
+        ICAP/AsyncJob.cc \
+        ICAP/AsyncJob.h \
+        TextException.cc \
+        TextException.h
 
 # authentication framework
 libauth_la_SOURCES = \
@@ -425,8 +431,11 @@ squid_SOURCES = \
        ACLChecklist.h \
        $(squid_ACLSOURCES) \
        asn.cc \
+       AsyncCallQueue.cc \
+       AsyncCallQueue.h \
        AsyncCall.cc \
        AsyncCall.h \
+       AsyncJobCalls.h \
        AsyncEngine.cc \
        AsyncEngine.h \
        authenticate.cc \
@@ -619,8 +628,6 @@ squid_SOURCES = \
        structs.h \
        SwapDir.cc \
        SwapDir.h \
-       TextException.cc \
-       TextException.h \
        time.cc \
        tools.cc \
        tunnel.cc \
@@ -691,8 +698,6 @@ squid_DEPENDENCIES = $(top_builddir)/lib/libmiscutil.a \
        @ICAP_LIBS@
 
 ICAP_libicap_a_SOURCES = \
-       ICAP/AsyncJob.cc \
-       ICAP/AsyncJob.h \
        ICAP/ICAPClient.cc \
        ICAP/ICAPClient.h \
        ICAP/ICAPInitiator.cc \
@@ -769,6 +774,8 @@ ufsdump_SOURCES = \
        time.cc \
        ufsdump.cc \
        url.cc \
+       AsyncCallQueue.cc \
+       AsyncCallQueue.h \
        AsyncCall.cc \
        AsyncCall.h \
        BodyPipe.cc \
@@ -898,8 +905,6 @@ ufsdump_SOURCES = \
        store_swapout.cc \
        structs.h \
        SwapDir.cc \
-       TextException.cc \
-       TextException.h \
        tools.cc \
        typedefs.h \
        $(UNLINKDSOURCE) \
@@ -1328,6 +1333,8 @@ tests_testCacheManager_SOURCES = \
        ACLStringData.cc \
        ACLRegexData.cc \
        ACLUserData.cc \
+       AsyncCallQueue.cc \
+       AsyncCallQueue.h \
        AsyncCall.cc \
        authenticate.cc \
        BodyPipe.cc \
@@ -1429,7 +1436,6 @@ tests_testCacheManager_SOURCES = \
        StoreMetaURL.cc \
        StoreMetaVary.cc \
        StoreSwapLogData.cc \
-       TextException.cc \
        tools.cc \
        tunnel.cc \
        SwapDir.cc \
@@ -1498,6 +1504,8 @@ tests_testEvent_SOURCES = \
        ACLStringData.cc \
        ACLRegexData.cc \
        ACLUserData.cc \
+       AsyncCallQueue.cc \
+       AsyncCallQueue.h \
        AsyncCall.cc \
        authenticate.cc \
        BodyPipe.cc \
@@ -1598,7 +1606,6 @@ tests_testEvent_SOURCES = \
        StoreMetaURL.cc \
        StoreMetaVary.cc \
        StoreSwapLogData.cc \
-       TextException.cc \
        tools.cc \
        tunnel.cc \
        SwapDir.cc \
@@ -1654,6 +1661,8 @@ tests_testEventLoop_SOURCES = \
        ACLStringData.cc \
        ACLRegexData.cc \
        ACLUserData.cc \
+       AsyncCallQueue.cc \
+       AsyncCallQueue.h \
        AsyncCall.cc \
        authenticate.cc \
        BodyPipe.cc \
@@ -1754,7 +1763,6 @@ tests_testEventLoop_SOURCES = \
        StoreMetaURL.cc \
        StoreMetaVary.cc \
        StoreSwapLogData.cc \
-       TextException.cc \
        tools.cc \
        tunnel.cc \
        SwapDir.cc \
@@ -1834,6 +1842,8 @@ tests_test_http_range_SOURCES = \
        ACLStringData.cc \
        ACLRegexData.cc \
        ACLUserData.cc \
+       AsyncCallQueue.cc \
+       AsyncCallQueue.h \
        AsyncCall.cc \
        authenticate.cc \
        BodyPipe.cc \
@@ -1940,7 +1950,6 @@ tests_test_http_range_SOURCES = \
        StoreSwapLogData.cc \
        String.cc \
        SwapDir.cc \
-       TextException.cc \
        time.cc \
        tools.cc \
        tunnel.cc \
@@ -1997,6 +2006,8 @@ tests_testHttpRequest_SOURCES = \
        ACLStringData.cc \
        ACLRegexData.cc \
        ACLUserData.cc \
+       AsyncCallQueue.cc \
+       AsyncCallQueue.h \
        AsyncCall.cc \
        authenticate.cc \
        BodyPipe.cc \
@@ -2098,7 +2109,6 @@ tests_testHttpRequest_SOURCES = \
        StoreMetaURL.cc \
        StoreMetaVary.cc \
        StoreSwapLogData.cc \
-       TextException.cc \
        tools.cc \
        tunnel.cc \
        SwapDir.cc \
@@ -2357,6 +2367,8 @@ tests_testURL_SOURCES = \
        ACLStringData.cc \
        ACLRegexData.cc \
        ACLUserData.cc \
+       AsyncCallQueue.cc \
+       AsyncCallQueue.h \
        AsyncCall.cc \
        authenticate.cc \
        BodyPipe.cc \
@@ -2457,7 +2469,6 @@ tests_testURL_SOURCES = \
        StoreMetaURL.cc \
        StoreMetaVary.cc \
        StoreSwapLogData.cc \
-       TextException.cc \
        tools.cc \
        tunnel.cc \
        SwapDir.cc \
index 8e7789065b06dac7fcf7ab5ce8d32ebd728673c2..d0b038d35316ebb77f2a3d0345d420ac5e303654 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: comm.cc,v 1.444 2008/02/11 22:30:10 rousskov Exp $
+ * $Id: comm.cc,v 1.445 2008/02/12 22:58:29 rousskov Exp $
  *
  * DEBUG: section 5     Socket Functions
  * AUTHOR: Harvest Derived
@@ -46,6 +46,7 @@
 #include "MemBuf.h"
 #include "pconn.h"
 #include "SquidTime.h"
+#include "CommCalls.h"
 #include "IPAddress.h"
 
 #if defined(_SQUID_CYGWIN_)
@@ -65,22 +66,19 @@ typedef enum {
        IOCB_WRITE
 } iocb_type;
 
-struct _comm_io_callback {
+struct comm_io_callback_t {
        iocb_type type;
        int fd;
-       IOCB *callback;
-       void *callback_data;
+       AsyncCall::Pointer callback;
        char *buf;
        FREE *freefunc;
        int size;
        int offset;
-       bool active;
-       bool completed;
        comm_err_t errcode;
        int xerrno;
-       dlink_node node;
+
+       bool active() const { return callback != NULL; }
 };
-typedef struct _comm_io_callback comm_io_callback_t;
 
 struct _comm_fd {
        int fd;
@@ -90,18 +88,17 @@ struct _comm_fd {
 typedef struct _comm_fd comm_fd_t;
 comm_fd_t *commfd_table;
 
-dlink_list commfd_completed_events;
-
+// TODO: make this a comm_io_callback_t method?
 bool
 commio_has_callback(int fd, iocb_type type, comm_io_callback_t *ccb)
 {
        assert(ccb->fd == fd);
        assert(ccb->type == type);
-       return ccb->active == true;
+       return ccb->active();
 }
 
 /*
- * Set the given handler and mark active
+ * Configure comm_io_callback_t for I/O
  *
  * @param fd           filedescriptor
  * @param ccb          comm io callback
@@ -111,43 +108,56 @@ commio_has_callback(int fd, iocb_type type, comm_io_callback_t *ccb)
  * @param freefunc     freefunc, if applicable
  * @param size         buffer size
  */
-void
-commio_set_callback(int fd, iocb_type type, comm_io_callback_t *ccb, IOCB *cb, void *cbdata, char *buf, FREE *freefunc, int size)
+static void
+commio_set_callback(int fd, iocb_type type, comm_io_callback_t *ccb,
+    AsyncCall::Pointer &cb, char *buf, FREE *freefunc, int size)
 {
-       assert(ccb->active == false);
+       assert(!ccb->active());
        assert(ccb->type == type);
+       assert(cb != NULL);
        ccb->fd = fd;
        ccb->callback = cb;
-       ccb->callback_data = cbdataReference(cbdata);
        ccb->buf = buf;
        ccb->freefunc = freefunc;
        ccb->size = size;
-       ccb->active = true;
-       ccb->completed = false;
        ccb->offset = 0;
 }
 
 
-/*
- * Complete the callback
- *
- * Someone may have already called this function once on a non-completed callback.
- * This happens in the comm_close() routine - the IO may have completed
- * but comm_close() is called bfeore teh callback has been called.
- * In this case, leave the details the same (offset, for example) but just update
- * the error codes.
- */
-void
-commio_complete_callback(int fd, comm_io_callback_t *ccb, comm_err_t code, int xerrno)
+// Schedule the callback call and clear the callback
+static void
+commio_finish_callback(int fd, comm_io_callback_t *ccb, comm_err_t code, int xerrno)
 {
-        debugs(5, 3, "commio_complete_callback: called for " << fd << " (" << code << ", " << xerrno << ")");
-       assert(ccb->active == true);
+    debugs(5, 3, "commio_finish_callback: called for FD " << fd << " (" <<
+        code << ", " << xerrno << ")");
+       assert(ccb->active());
        assert(ccb->fd == fd);
        ccb->errcode = code;
        ccb->xerrno = xerrno;
-       if (! ccb->completed)
-               dlinkAddTail(ccb, &ccb->node, &commfd_completed_events);
-       ccb->completed = true;
+
+       comm_io_callback_t cb = *ccb;
+
+       /* We've got a copy; blow away the real one */
+       /* XXX duplicate code from commio_cancel_callback! */
+       ccb->xerrno = 0;
+       ccb->callback = NULL; // cb has it
+
+       /* free data */
+       if (cb.freefunc) {
+               cb.freefunc(cb.buf);
+               cb.buf = NULL;
+       }
+
+       if (cb.callback != NULL) {
+        typedef CommIoCbParams Params;
+        Params &params = GetCommParams<Params>(cb.callback);
+        params.fd = cb.fd;
+        params.buf = cb.buf;
+        params.size = cb.offset;
+        params.flag = cb.errcode;
+        params.xerrno = cb.xerrno;
+        ScheduleCallHere(cb.callback);
+       }
 }
 
 
@@ -156,24 +166,18 @@ commio_complete_callback(int fd, comm_io_callback_t *ccb, comm_err_t code, int x
  *
  * Remember that the data is cbdataRef'ed.
  */
-void
+// TODO: make this a comm_io_callback_t method
+static void
 commio_cancel_callback(int fd, comm_io_callback_t *ccb)
 {
-        debugs(5, 3, "commio_cancel_callback: called for " << fd);
+    debugs(5, 3, "commio_cancel_callback: called for FD " << fd);
        assert(ccb->fd == fd);
-       assert(ccb->active == true);
-
-       if (ccb->completed == true) {
-               dlinkDelete(&ccb->node, &commfd_completed_events);
-       }
-       if (ccb->callback_data)
-               cbdataReferenceDone(ccb->callback_data);
+       assert(ccb->active());
 
        ccb->xerrno = 0;
-       ccb->active = false;
-       ccb->completed = false;
+//     delete ccb->callback;
+       ccb->callback = NULL;
        ccb->callback = NULL;
-       ccb->callback_data = NULL;
 }
 
 /*
@@ -184,43 +188,8 @@ commio_cancel_callback(int fd, comm_io_callback_t *ccb)
 void
 commio_call_callback(comm_io_callback_t *ccb)
 {
-       comm_io_callback_t cb = *ccb;
-       void *cbdata;
-       assert(cb.active == true);
-       assert(cb.completed == true);
-        debugs(5, 3, "commio_call_callback: called for " << ccb->fd);
-
-       /* We've got a copy; blow away the real one */
-       /* XXX duplicate code from commio_cancel_callback! */
-       dlinkDelete(&ccb->node, &commfd_completed_events);
-       ccb->xerrno = 0;
-       ccb->active = false;
-       ccb->completed = false;
-       ccb->callback = NULL;
-       ccb->callback_data = NULL;
-
-       /* free data */
-       if (cb.freefunc) {
-               cb.freefunc(cb.buf);
-               cb.buf = NULL;
-       }
-       if (cb.callback && cbdataReferenceValidDone(cb.callback_data, &cbdata)) {
-               /* XXX truely ugly for now! */
-               cb.callback(cb.fd, cb.buf, cb.offset, cb.errcode, cb.xerrno, cbdata);
-       }
-}
-
-void
-commio_call_callbacks(void)
-{
-       comm_io_callback_t *ccb;
-       while (commfd_completed_events.head != NULL) {
-               ccb = (comm_io_callback_t *) commfd_completed_events.head->data;
-               commio_call_callback(ccb);
-       }
 }
 
-
 class ConnectStateData
 {
 
@@ -239,7 +208,7 @@ public:
     // NP: CANNOT store the default addr:port together as it gets set/reset differently.
 
     IPAddress S;
-    CallBack<CNCB> callback;
+    AsyncCall::Pointer callback;
 
     int fd;
     int tries;
@@ -264,7 +233,6 @@ static void commSetTcpRcvbuf(int, int);
 static PF commConnectFree;
 static PF commHandleWrite;
 static IPH commConnectDnsHandle;
-static void requireOpenAndActive(int const fd);
 
 static PF comm_accept_try;
 
@@ -272,60 +240,19 @@ class AcceptFD
 {
 
 public:
-    AcceptFD() : count(0), finished_(false){}
-
-    void doCallback(int fd, int newfd, comm_err_t errcode, int xerrno, ConnectionDetail *);
-    void nullCallback();
-    void beginAccepting() {count = 0; finished(false);}
-
-    size_t acceptCount() const { return count;}
-
-    bool finishedAccepting() const;
-    CallBack<IOACB> callback;
-    bool finished() const;
-    void finished(bool);
-
-private:
-    static size_t const MAX_ACCEPT_PER_LOOP;
-    size_t count;
-    bool finished_;
-};
-
-size_t const AcceptFD::MAX_ACCEPT_PER_LOOP(10);
-
-class fdc_t
-{
-
-public:
-    void acceptOne(int fd);
-    void beginAccepting();
-    int acceptCount() const;
-    fdc_t() : active(0), fd(-1), half_closed (false){CommCallbackList.head = NULL;CommCallbackList.tail = NULL; }
+    AcceptFD(int aFd = -1): fd(aFd), theCallback(0), mayAcceptMore(false) {}
 
-    fdc_t(int anFD) : active(0), fd(anFD), half_closed(false)
-    {
-        CommCallbackList.head = NULL;
-        CommCallbackList.tail = NULL;
-    }
+    void subscribe(AsyncCall::Pointer &call);
+    void acceptNext();
+    void notify(int newfd, comm_err_t, int xerrno, const ConnectionDetail &);
 
-    int active;
     int fd;
-    dlink_list CommCallbackList;
-
-    template<class P>
-    bool findCallback(P predicate);
 
-    class Accept
-    {
-
-    public:
-        AcceptFD accept;
-        ConnectionDetail connDetails;
-    };
-
-    Accept accept;
+private:
+    bool acceptOne();
 
-    bool half_closed;
+    AsyncCall::Pointer theCallback;
+    bool mayAcceptMore;
 };
 
 typedef enum {
@@ -333,82 +260,6 @@ typedef enum {
     COMM_CB_DERIVED,
 } comm_callback_t;
 
-static int CommCallbackSeqnum = 1;
-
-class CommCommonCallback
-{
-
-public:
-    CommCommonCallback() : fd (-1), errcode (COMM_OK), xerrno(0), seqnum (CommCallbackSeqnum){}
-
-    CommCommonCallback(int anFD, comm_err_t errcode, int anErrno) : fd (anFD), errcode (errcode), xerrno(anErrno), seqnum (CommCallbackSeqnum){}
-
-    int fd;
-    comm_err_t errcode;
-    int xerrno;
-    int seqnum;
-};
-
-class CommCallbackData
-{
-
-public:
-    MEMPROXY_CLASS(CommCallbackData);
-    CommCallbackData(CommCommonCallback const &);
-    virtual ~CommCallbackData() {}
-
-    virtual comm_callback_t getType() const { return COMM_CB_DERIVED; }
-
-    void callACallback();
-    void fdClosing();
-    virtual void callCallback() = 0;
-    void registerSelf();
-    void deRegisterSelf();
-    char *buf;
-    StoreIOBuffer sb;
-
-protected:
-    CommCommonCallback result;
-    friend void _comm_close(int fd, char const *file, int line);
-    friend void comm_calliocallback(void);
-
-private:
-    dlink_node fd_node;
-    dlink_node h_node;
-};
-
-MEMPROXY_CLASS_INLINE(CommCallbackData)
-
-class CommAcceptCallbackData : public CommCallbackData
-{
-
-public:
-    MEMPROXY_CLASS(CommAcceptCallbackData);
-    CommAcceptCallbackData(int const anFd, CallBack<IOACB>, comm_err_t, int, int, ConnectionDetail const &);
-    virtual void callCallback();
-
-private:
-    CallBack<IOACB> callback;
-    int newfd;
-    ConnectionDetail details;
-};
-
-MEMPROXY_CLASS_INLINE(CommAcceptCallbackData)
-
-class CommFillCallbackData : public CommCallbackData
-{
-
-public:
-    MEMPROXY_CLASS(CommFillCallbackData);
-    CommFillCallbackData(int const anFd, CallBack<IOFCB> aCallback, comm_err_t, int);
-    virtual void callCallback();
-
-private:
-    CallBack<IOFCB> callback;
-};
-
-MEMPROXY_CLASS_INLINE(CommFillCallbackData)
-
 struct _fd_debug_t
 {
     char const *close_file;
@@ -418,112 +269,13 @@ struct _fd_debug_t
 typedef struct _fd_debug_t fd_debug_t;
 
 static MemAllocator *conn_close_pool = NULL;
-fdc_t *fdc_table = NULL;
+AcceptFD *fdc_table = NULL; // TODO: rename. And use Vector<>?
 fd_debug_t *fdd_table = NULL;
-dlink_list CommCallbackList;
-
-
-/* New and improved stuff */
-
-CommCallbackData::CommCallbackData(CommCommonCallback const &newResults) : result (newResults)
-{
-    assert(fdc_table[result.fd].active == 1);
-    registerSelf();
-}
-
-CommAcceptCallbackData::CommAcceptCallbackData(int const anFd, CallBack<IOACB> aCallback, comm_err_t anErrcode, int anErrno, int aNewFD, ConnectionDetail const &newDetails) :CommCallbackData(CommCommonCallback(anFd, anErrcode, anErrno)), callback (aCallback), newfd(aNewFD), details(newDetails)
-{}
-
-void
-CommCallbackData::registerSelf()
-{
-    /* Add it to the end of the list */
-    dlinkAddTail(this, &h_node, &CommCallbackList);
-
-    /* and add it to the end of the fd list */
-    dlinkAddTail(this, &fd_node, &(fdc_table[result.fd].CommCallbackList));
-}
-
-void
-CommCallbackData::deRegisterSelf()
-{
-    dlinkDelete(&h_node, &CommCallbackList);
-    dlinkDelete(&fd_node, &(fdc_table[result.fd].CommCallbackList));
-}
-
-/**
- * add an IO callback
- *
- * IO callbacks are added when we want to notify someone that some IO
- * has finished but we don't want to risk re-entering a non-reentrant
- * code block.
- */
-void
-CommAcceptCallbackData::callCallback()
-{
-    PROF_start(CommAcceptCallbackData_callCallback);
-    callback.handler(result.fd, newfd, &details, result.errcode, result.xerrno, callback.data);
-    PROF_stop(CommAcceptCallbackData_callCallback);
-}
-
-void
-CommCallbackData::fdClosing()
-{
-    result.errcode = COMM_ERR_CLOSING;
-}
-
-void
-CommCallbackData::callACallback()
-{
-    assert(fdc_table[result.fd].active == 1);
-    deRegisterSelf();
-    callCallback();
-}
-
-/**
- * call the IO callbacks
- *
- * This should be called before comm_select() so code can attempt to
- * initiate some IO.
- *
- * When io callbacks are added, they are added with the current
- * sequence number. The sequence number is incremented in this routine -
- * since callbacks are added to the _tail_ of the list, when we hit a
- * callback with a seqnum _not_ what it was when we entered this routine,    
- * we can stop.
- */
-void
-comm_calliocallback(void)
-{
-    CommCallbackData *cio;
-    int oldseqnum = CommCallbackSeqnum++;
-
-    /* Call our callbacks until we hit NULL or the seqnum changes */
-
-    /* This will likely rap other counts - again, thats ok (for now)
-     * What we should see is the total of the various callback subclasses
-     * equaling this counter.
-     * If they don't, someone has added a class but not profiled it.
-     */
-    PROF_start(comm_calliocallback);
-
-    debugs(5, 7, "comm_calliocallback: " << CommCallbackList.head);
-
-    while (CommCallbackList.head != NULL && oldseqnum != ((CommCallbackData *)CommCallbackList.head->data)->result.seqnum) {
-        dlink_node *node = (dlink_node *)CommCallbackList.head;
-        cio = (CommCallbackData *)node->data;
-        cio->callACallback();
-        delete cio;
-    }
-
-    PROF_stop(comm_calliocallback);
-}
 
-bool
-comm_iocallbackpending(void)
+static bool
+isOpen(const int fd)
 {
-    debugs(5, 7, "comm_iocallbackpending: " << CommCallbackList.head);
-    return (CommCallbackList.head != NULL) || (commfd_completed_events.head != NULL);
+   return fd_table[fd].flags.open != 0;
 }
 
 /**
@@ -549,7 +301,7 @@ commHandleRead(int fd, void *data)
     if (retval < 0 && !ignoreErrno(errno)) {
         debugs(5, 3, "comm_read_try: scheduling COMM_ERROR");
        ccb->offset = 0;
-       commio_complete_callback(fd, ccb, COMM_ERROR, errno);
+       commio_finish_callback(fd, ccb, COMM_ERROR, errno);
         return;
     };
 
@@ -557,8 +309,8 @@ commHandleRead(int fd, void *data)
     /* Note - read 0 == socket EOF, which is a valid read */
     if (retval >= 0) {
         fd_bytes(fd, retval, FD_READ);
-       ccb->offset = retval;
-       commio_complete_callback(fd, ccb, COMM_OK, errno);
+       ccb->offset = retval;    
+       commio_finish_callback(fd, ccb, COMM_OK, errno);
         return;
     }
 
@@ -572,16 +324,25 @@ commHandleRead(int fd, void *data)
  */
 void
 comm_read(int fd, char *buf, int size, IOCB *handler, void *handler_data)
+{
+    AsyncCall::Pointer call = commCbCall(5,4, "SomeCommReadHandler",
+                                        CommIoCbPtrFun(handler, handler_data));
+    comm_read(fd, buf, size, call);
+}
+
+void
+comm_read(int fd, char *buf, int size, AsyncCall::Pointer &callback)
 {
     /* Make sure we're not reading anything and we're not closing */
-    assert(fdc_table[fd].active == 1);
+    assert(isOpen(fd));
     assert(!fd_table[fd].flags.closing);
 
     debugs(5, 4, "comm_read, queueing read for FD " << fd);
 
     /* Queue the read */
     /* XXX ugly */
-    commio_set_callback(fd, IOCB_READ, COMMIO_FD_READCB(fd), handler, handler_data, (char *)buf, NULL, size);
+    commio_set_callback(fd, IOCB_READ, COMMIO_FD_READCB(fd),
+        callback, (char *)buf, NULL, size);
     commSetSelect(fd, COMM_SELECT_READ, commHandleRead, COMMIO_FD_READCB(fd), 0);
 }
 
@@ -606,12 +367,6 @@ comm_empty_os_read_buffers(int fd)
 #endif
 }
 
-static void
-requireOpenAndActive(int const fd)
-{
-    assert(fd_table[fd].flags.open == 1);
-    assert(fdc_table[fd].active == 1);
-}
 
 /**
  * Return whether the FD has a pending completed callback.
@@ -619,97 +374,104 @@ requireOpenAndActive(int const fd)
 int
 comm_has_pending_read_callback(int fd)
 {
-    requireOpenAndActive(fd);
-    return COMMIO_FD_READCB(fd)->active && COMMIO_FD_READCB(fd)->completed;
-}
-
-template <class P>
-bool
-fdc_t::findCallback(P predicate)
-{
-    /*
-     * XXX I don't like having to walk the list!
-     * Instead, if this routine is called often enough, we should
-     * also maintain a linked list of _read_ events - we can just
-     * check if the list head a HEAD..
-     * - adrian
-     */
-    dlink_node *node = CommCallbackList.head;
-
-    while (node != NULL) {
-        if (predicate((CommCallbackData *)node->data))
-            return true;
-
-        node = node->next;
-    }
-
-    /* Not found */
+    assert(isOpen(fd));
+    // XXX: We do not know whether there is a read callback scheduled.
+    // This is used for pconn management that should probably be more
+    // tightly integrated into comm to minimize the chance that a
+    // closing pconn socket will be used for a new transaction.
     return false;
 }
 
-/**
- * return whether a file descriptor has a read handler
- *
- * Assumptions: the fd is open
- *             the fd is a comm fd.
- *
- * Again - is this "pending read", or "pending completed event", or what?
- * I'll assume its pending read, not pending completed.
- *
- * This makes no sense though - if this is called to check whether there's
- * a pending read -before- submitting a read then it won't matter whether
- * its completed or not! Ie:
- *
- * + if there's no read and you want to schedule one; fine.
- * + if a read has completed then the callback block has been deactivated before
- *   the callback is called - if something decides to register for a read
- *   callback once again it should find !active and !completed.
- * + scheduling a read event when the fd is ! active -and- completed, thats
- *   a bug
- * + like, afaict, anything else is.
- */
+// Does comm check this fd for read readiness?
+// Note that when comm is not monitoring, there can be a pending callback
+// call, which may resume comm monitoring once fired.
 bool
-comm_has_pending_read(int fd)
+comm_monitors_read(int fd)
 {
-    requireOpenAndActive(fd);
-    return COMMIO_FD_READCB(fd)->active && (! COMMIO_FD_READCB(fd)->completed);
+    assert(isOpen(fd));
+    // Being active is usually the same as monitoring because we always
+    // start monitoring the FD when we configure comm_io_callback_t for I/O
+    // and we usually configure comm_io_callback_t for I/O when we starting
+    // monitoring a FD for reading. TODO: replace with commio_has_callback
+    return COMMIO_FD_READCB(fd)->active();
 }
 
 /**
  * Cancel a pending read. Assert that we have the right parameters,
  * and that there are no pending read events!
  *
+ * XXX: We do not assert that there are no pending read events and
+ * with async calls it becomes even more difficult.
+ * The whole interface should be reworked to do callback->cancel()
+ * instead of searching for places where the callback may be stored and
+ * updating the state of those places.
+ *
  * AHC Don't call the comm handlers?
  */
 void
 comm_read_cancel(int fd, IOCB *callback, void *data)
 {
-    requireOpenAndActive(fd);
+    if (!isOpen(fd)) {
+        debugs(5, 4, "comm_read_cancel fails: FD " << fd << " closed");
+        return;
+       }
+
+    comm_io_callback_t *cb = COMMIO_FD_READCB(fd);
+    // TODO: is "active" == "monitors FD"?
+    if (!cb->active()) {
+        debugs(5, 4, "comm_read_cancel fails: FD " << fd << " inactive");
+        return;
+       }
+
+    typedef CommCbFunPtrCallT<CommIoCbPtrFun> Call;
+    Call *call = dynamic_cast<Call*>(cb->callback.getRaw());
+    if (!call) {
+        debugs(5, 4, "comm_read_cancel fails: FD " << fd << " lacks callback");
+        return;
+       }
+
+    typedef CommIoCbParams Params;
+    const Params &params = GetCommParams<Params>(cb->callback);
 
     /* Ok, we can be reasonably sure we won't lose any data here! */
-    assert(COMMIO_FD_READCB(fd)->callback == callback);
-    assert(COMMIO_FD_READCB(fd)->callback_data == data);
+    assert(call->dialer.handler == callback);
+    assert(params.data == data);
 
     /* Delete the callback */
-    commio_cancel_callback(fd, COMMIO_FD_READCB(fd));
+    commio_cancel_callback(fd, cb);
 
     /* And the IO event */
     commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
 }
 
-
-/**
- * Open a filedescriptor, set some sane defaults
- * XXX DPW 2006-05-30 what is the point of this?
- */
 void
-fdc_open(int fd, unsigned int type, char const *desc)
+comm_read_cancel(int fd, AsyncCall::Pointer &callback)
 {
-    assert(fdc_table[fd].active == 0);
+    callback->cancel("comm_read_cancel");
+    
+    if (!isOpen(fd)) {
+        debugs(5, 4, "comm_read_cancel fails: FD " << fd << " closed");
+        return;
+    }
+
+    comm_io_callback_t *cb = COMMIO_FD_READCB(fd);
+
+    if (!cb->active()) {
+        debugs(5, 4, "comm_read_cancel fails: FD " << fd << " inactive");
+        return;
+    }
+
+    AsyncCall::Pointer call = cb->callback;
+    assert(call != NULL); // XXX: should never fails (active() checks for callback==NULL)
+    
+    /* Ok, we can be reasonably sure we won't lose any data here! */
+    assert(call == callback);
 
-    fdc_table[fd].active = 1;
-    fdc_table[fd].fd = fd;
-    fd_open(fd, type, desc);
+    /* Delete the callback */
+    commio_cancel_callback(fd, cb);
+
+    /* And the IO event */
+    commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
 }
 
 
@@ -755,8 +517,8 @@ comm_udp_send(int s, const void *buf, size_t len, int flags)
 bool
 comm_has_incomplete_write(int fd)
 {
-    requireOpenAndActive(fd);
-    return COMMIO_FD_WRITECB(fd)->active;
+    assert(isOpen(fd));
+    return COMMIO_FD_WRITECB(fd)->active();
 }
 
 /**
@@ -938,16 +700,13 @@ comm_openex(int sock_type,
     /* update fdstat */
     debugs(5, 5, "comm_open: FD " << new_socket << " is a new socket");
 
+    assert(!isOpen(new_socket));
     fd_open(new_socket, FD_SOCKET, note);
 
     fdd_table[new_socket].close_file = NULL;
 
     fdd_table[new_socket].close_line = 0;
 
-    assert(fdc_table[new_socket].active == 0);
-
-    fdc_table[new_socket].active = 1;
-
     F = &fd_table[new_socket];
 
     F->local_addr = addr;
@@ -1023,20 +782,38 @@ ConnectStateData::operator delete (void *address)
     cbdataFree(address);
 }
 
+
+
 void
-commConnectStart(int fd, const char *host, u_short port, CNCB * callback, void *data)
+commConnectStart(int fd, const char *host, u_short port, AsyncCall::Pointer &cb)
 {
+    debugs(cb->debugSection, cb->debugLevel, "commConnectStart: FD " << fd <<
+        ", cb " << cb << ", " << host << ":" << port); // TODO: just print *cb
+
     ConnectStateData *cs;
-    debugs(5, 3, "commConnectStart: FD " << fd << ", data " << data << ", " << host << ":" << port);
     cs = new ConnectStateData;
     cs->fd = fd;
     cs->host = xstrdup(host);
     cs->default_port = port;
-    cs->callback = CallBack<CNCB>(callback, data);
+    cs->callback = cb;
+
     comm_add_close_handler(fd, commConnectFree, cs);
     ipcache_nbgethostbyname(host, commConnectDnsHandle, cs);
 }
 
+// TODO: Remove this and similar callback registration functions by replacing
+// (callback,data) parameters with an AsyncCall so that we do not have to use
+// a generic call name and debug level when creating an AsyncCall. This will
+// also cut the number of callback registration routines in half.
+void
+commConnectStart(int fd, const char *host, u_short port, CNCB * callback, void *data)
+{
+    debugs(5, 5, "commConnectStart: FD " << fd << ", data " << data << ", " << host << ":" << port);
+    AsyncCall::Pointer call = commCbCall(5,3,
+                                        "SomeCommConnectHandler", CommConnectCbPtrFun(callback, data));
+    commConnectStart(fd, host, port, call);
+}
+
 static void
 commConnectDnsHandle(const ipcache_addrs * ia, void *data)
 {
@@ -1072,15 +849,18 @@ commConnectDnsHandle(const ipcache_addrs * ia, void *data)
 void
 ConnectStateData::callCallback(comm_err_t status, int xerrno)
 {
-    debugs(5, 3, "commConnectCallback: FD " << fd << ", data " << callback.data << ", status " << status);
+    debugs(5, 3, "commConnectCallback: FD " << fd);
 
     comm_remove_close_handler(fd, commConnectFree, this);
-    CallBack<CNCB> aCallback = callback;
-    callback = CallBack<CNCB>();
     commSetTimeout(fd, -1, NULL, NULL);
 
-    if (aCallback.dataValid())
-        aCallback.handler(fd, status, xerrno, aCallback.data);
+    typedef CommConnectCbParams Params;
+    Params &params = GetCommParams<Params>(callback);
+    params.fd = fd;
+    params.flag = status;
+    params.xerrno = xerrno;
+    ScheduleCallHere(callback);
+    callback = NULL;
 
     commConnectFree(fd, this);
 }
@@ -1090,7 +870,8 @@ commConnectFree(int fd, void *data)
 {
     ConnectStateData *cs = (ConnectStateData *)data;
     debugs(5, 3, "commConnectFree: FD " << fd);
-    cs->callback = CallBack<CNCB>();
+//    delete cs->callback;
+    cs->callback = NULL;
     safe_free(cs->host);
     delete cs;
 }
@@ -1122,8 +903,10 @@ ConnectStateData::commResetFD()
     struct addrinfo *AI = NULL;
     IPAddress nul;
 
-    if (!cbdataReferenceValid(callback.data))
-        return 0;
+// XXX: do we have to check this?
+//
+//    if (!cbdataReferenceValid(callback.data))
+//        return 0;
 
     statCounter.syscalls.sock.sockets++;
 
@@ -1275,9 +1058,9 @@ ConnectStateData::connect()
         }
     }
 }
-
+/*
 int
-commSetTimeout(int fd, int timeout, PF * handler, void *data)
+commSetTimeout_old(int fd, int timeout, PF * handler, void *data)
 {
     debugs(5, 3, "commSetTimeout: FD " << fd << " timeout " << timeout);
     assert(fd >= 0);
@@ -1301,6 +1084,46 @@ commSetTimeout(int fd, int timeout, PF * handler, void *data)
 
     return F->timeout;
 }
+*/
+
+int
+commSetTimeout(int fd, int timeout, PF * handler, void *data)
+{
+    AsyncCall::Pointer call;
+    debugs(5, 3, "commSetTimeout: FD " << fd << " timeout " << timeout);
+    if(handler != NULL)
+       call=commCbCall(5,4, "SomeTimeoutHandler", CommTimeoutCbPtrFun(handler, data));
+    else
+       call = NULL;
+    return commSetTimeout(fd, timeout, call);
+}
+
+
+int commSetTimeout(int fd, int timeout, AsyncCall::Pointer &callback)
+{
+    debugs(5, 3, "commSetTimeout: FD " << fd << " timeout " << timeout);
+    assert(fd >= 0);
+    assert(fd < Squid_MaxFD);
+    fde *F = &fd_table[fd];
+    assert(F->flags.open);
+
+    if (timeout < 0) {
+        F->timeoutHandler = NULL;
+        F->timeout = 0;
+    } else {
+        if (callback != NULL) {
+           typedef CommTimeoutCbParams Params;
+           Params &params = GetCommParams<Params>(callback);
+           params.fd = fd;
+            F->timeoutHandler = callback;
+        }
+
+        F->timeout = squid_curtime + (time_t) timeout;
+    }
+
+    return F->timeout;
+
+}
 
 int
 comm_connect_addr(int sock, const IPAddress &address)
@@ -1315,8 +1138,7 @@ comm_connect_addr(int sock, const IPAddress &address)
 
     assert(address.GetPort() != 0);
 
-    debugs(5, 9, "comm_connect_addr: connecting socket " << sock << " to " << address << " (want family: " << F->sock_family <<
-                 ") Old-State=" << fdc_table[sock].active);
+    debugs(5, 9, "comm_connect_addr: connecting socket " << sock << " to " << address << " (want family: " << F->sock_family << ")");
 
     address.GetAddrInfo(AI, F->sock_family);
 
@@ -1488,7 +1310,6 @@ comm_old_accept(int fd, ConnectionDetail &details)
     fd_open(sock, FD_SOCKET, "HTTP Request");
     fdd_table[sock].close_file = NULL;
     fdd_table[sock].close_line = 0;
-    fdc_table[sock].active = 1;
     fde *F = &fd_table[sock];
     details.peer.NtoA(F->ipaddr,MAX_IPSTRLEN);
     F->remote_port = details.peer.GetPort();
@@ -1513,16 +1334,17 @@ commCallCloseHandlers(int fd)
     debugs(5, 5, "commCallCloseHandlers: FD " << fd);
 
     while (F->closeHandler != NULL) {
-        close_handler ch = *F->closeHandler;
-        conn_close_pool->free(F->closeHandler);        /* AAA */
-        F->closeHandler = ch.next;
-        ch.next = NULL;
-        debugs(5, 5, "commCallCloseHandlers: ch->handler=" << ch.handler << " data=" << ch.data);
-
-        if (cbdataReferenceValid(ch.data))
-            ch.handler(fd, ch.data);
-
-        cbdataReferenceDone(ch.data);
+        AsyncCall::Pointer call = F->closeHandler;
+       F->closeHandler = call->Next();
+       call->setNext(NULL);
+       // If call is not canceled schedule it for execution else ignore it
+       if(!call->canceled()){
+           debugs(5, 5, "commCallCloseHandlers: ch->handler=" << call);
+           typedef CommCloseCbParams Params;
+           Params &params = GetCommParams<Params>(call);
+           params.fd = fd;
+           ScheduleCallHere(call);
+       }
     }
 }
 
@@ -1591,34 +1413,48 @@ comm_reset_close(int fd)
 }
 
 void
-CommRead::nullCallback()
+CommRead::doCallback(comm_err_t errcode, int xerrno)
 {
-    callback = CallBack<IOCB>();
+    if (callback != NULL) {
+        typedef CommIoCbParams Params;
+        Params &params = GetCommParams<Params>(callback);
+        params.fd = fd;
+        params.size = 0;
+        params.flag = errcode;
+        params.xerrno = xerrno;
+        ScheduleCallHere(callback);
+        callback = NULL;
+    }
 }
 
-void
-AcceptFD::nullCallback()
+void 
+comm_close_complete(int fd, void *data)
 {
-    callback = CallBack<IOACB>();
-}
+#if USE_SSL
+    fde *F = &fd_table[fd];
 
-void
-CommRead::doCallback(comm_err_t errcode, int xerrno)
-{
-    if (callback.handler)
-        callback.handler(fd, buf, 0, errcode, xerrno, callback.data);
+    if (F->ssl) {
+        SSL_free(F->ssl);
+        F->ssl = NULL;
+    }
 
-    nullCallback();
-}
+#endif
+    fd_close(fd);              /* update fdstat */
+
+    close(fd);
+
+    if (AbortChecker::Instance().isMonitoring(fd))
+        AbortChecker::Instance().stopMonitoring(fd);
+
+    fdc_table[fd] = AcceptFD(fd);
+
+    statCounter.syscalls.sock.closes++;
+
+    /* When an fd closes, give accept() a chance, if need be */
+
+    if (fdNFree() >= RESERVED_FD)
+        AcceptLimiter::Instance().kick();
 
-void
-AcceptFD::doCallback(int fd, int newfd, comm_err_t errcode, int xerrno, ConnectionDetail *connDetails)
-{
-    if (callback.handler) {
-        CallBack<IOACB> aCallback = callback;
-        nullCallback();
-        aCallback.handler(fd, newfd, connDetails, errcode, xerrno, aCallback.data);
-    }
 }
 
 /*
@@ -1635,8 +1471,6 @@ void
 _comm_close(int fd, char const *file, int line)
 {
     fde *F = NULL;
-    dlink_node *node;
-    CommCallbackData *cio;
 
     debugs(5, 5, "comm_close: FD " << fd);
     assert(fd >= 0);
@@ -1654,7 +1488,7 @@ _comm_close(int fd, char const *file, int line)
     assert(F->flags.open);
 
     /* The following fails because ipc.c is doing calls to pipe() to create sockets! */
-    assert(fdc_table[fd].active == 1);
+    assert(isOpen(fd));
 
     assert(F->type != FD_FILE);
 
@@ -1673,27 +1507,14 @@ _comm_close(int fd, char const *file, int line)
 
     /* new-style read/write handler stuff */
     if (commio_has_callback(fd, IOCB_WRITE, COMMIO_FD_WRITECB(fd))) {
-        commio_complete_callback(fd, COMMIO_FD_WRITECB(fd), COMM_ERR_CLOSING, errno);
-        commio_call_callback(COMMIO_FD_WRITECB(fd));
+        commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), COMM_ERR_CLOSING, errno);
     }
     if (commio_has_callback(fd, IOCB_READ, COMMIO_FD_READCB(fd))) {
-        commio_complete_callback(fd, COMMIO_FD_READCB(fd), COMM_ERR_CLOSING, errno);
-        commio_call_callback(COMMIO_FD_READCB(fd));
+        commio_finish_callback(fd, COMMIO_FD_READCB(fd), COMM_ERR_CLOSING, errno);
     }
 
     /* Do callbacks for read/accept routines, if any */
-    fdc_table[fd].accept.accept.doCallback(fd, -1, COMM_ERR_CLOSING, 0, NULL);
-
-    /* Complete (w/ COMM_ERR_CLOSING!) any pending io callbacks */
-    while (fdc_table[fd].CommCallbackList.head != NULL) {
-        node = fdc_table[fd].CommCallbackList.head;
-        cio = (CommCallbackData *)node->data;
-        assert(fd == cio->result.fd); /* just paranoid */
-        /* We're closing! */
-        cio->fdClosing();
-        cio->callACallback();
-        delete cio;
-    }
+    fdc_table[fd].notify(-1, COMM_ERR_CLOSING, 0, ConnectionDetail());
 
     commCallCloseHandlers(fd);
 
@@ -1701,35 +1522,16 @@ _comm_close(int fd, char const *file, int line)
         F->pconn.pool->count(F->pconn.uses);
 
     comm_empty_os_read_buffers(fd);
+    
 
-#if USE_SSL
-
-    if (F->ssl) {
-        SSL_free(F->ssl);
-        F->ssl = NULL;
-    }
-
-#endif
-    fd_close(fd);              /* update fdstat */
-
-    close(fd);
-
-    fdc_table[fd].active = 0;
-
-    if (fdc_table[fd].half_closed) {
-        AbortChecker::Instance().stopMonitoring(fd);
-        fdc_table[fd].half_closed = false;
-    }
-
-    fdc_table[fd] = fdc_t(fd);
-
-    statCounter.syscalls.sock.closes++;
+    AsyncCall::Pointer call=commCbCall(5,4, "comm_close_complete",
+                                      CommCloseCbPtrFun(comm_close_complete, NULL));
+    typedef CommCloseCbParams Params;
+    Params &params = GetCommParams<Params>(call);
+    params.fd = fd;
+    ScheduleCallHere(call);
 
     PROF_stop(comm_close);
-    /* When an fd closes, give accept() a chance, if need be */
-
-    if (fdNFree() >= RESERVED_FD)
-        AcceptLimiter::Instance().kick();
 }
 
 /* Send a udp datagram to specified TO_ADDR. */
@@ -1776,48 +1578,69 @@ comm_udp_sendto(int fd,
 void
 comm_add_close_handler(int fd, PF * handler, void *data)
 {
-    close_handler *newHandler = (close_handler *)conn_close_pool->alloc();             /* AAA */
-    close_handler *c;
     debugs(5, 5, "comm_add_close_handler: FD " << fd << ", handler=" <<
            handler << ", data=" << data);
 
-    for (c = fd_table[fd].closeHandler; c; c = c->next)
-        assert(c->handler != handler || c->data != data);
+    AsyncCall::Pointer call=commCbCall(5,4, "SomeCloseHandler",
+                                         CommCloseCbPtrFun(handler, data));
+    comm_add_close_handler(fd, call);
+}
 
-    newHandler->handler = handler;
+void
+comm_add_close_handler(int fd, AsyncCall::Pointer &call)
+{
+    debugs(5, 5, "comm_add_close_handler: FD " << fd << ", AsyncCall=" << call);
 
-    newHandler->data = cbdataReference(data);
+    /*TODO:Check for a similar scheduled AsyncCall*/
+//    for (c = fd_table[fd].closeHandler; c; c = c->next)
+//        assert(c->handler != handler || c->data != data);
 
-    newHandler->next = fd_table[fd].closeHandler;
+    call->setNext(fd_table[fd].closeHandler);
 
-    fd_table[fd].closeHandler = newHandler;
+    fd_table[fd].closeHandler = call;
 }
 
+
+// remove function-based close handler
 void
 comm_remove_close_handler(int fd, PF * handler, void *data)
 {
-    assert (fdc_table[fd].active);
-    close_handler *p = NULL;
-    close_handler *last = NULL;
+    assert (isOpen(fd));
     /* Find handler in list */
     debugs(5, 5, "comm_remove_close_handler: FD " << fd << ", handler=" <<
            handler << ", data=" << data);
 
-    for (p = fd_table[fd].closeHandler; p != NULL; last = p, p = p->next)
-        if (p->handler == handler && p->data == data)
-            break;             /* This is our handler */
+    AsyncCall::Pointer p;
+    for (p = fd_table[fd].closeHandler; p != NULL; p = p->Next()){
+        typedef CommCbFunPtrCallT<CommCloseCbPtrFun> Call;
+        const Call *call = dynamic_cast<const Call*>(p.getRaw());
+        if (!call) // method callbacks have their own comm_remove_close_handler
+            continue;
 
+        typedef CommCloseCbParams Params;
+        const Params &params = GetCommParams<Params>(p);
+        if (call->dialer.handler == handler && params.data == data)
+            break;             /* This is our handler */
+    }
     assert(p != NULL);
+    p->cancel("comm_remove_close_handler");
+}
 
-    /* Remove list entry */
-    if (last)
-        last->next = p->next;
-    else
-        fd_table[fd].closeHandler = p->next;
+// remove method-based close handler
+void
+comm_remove_close_handler(int fd, AsyncCall::Pointer &call)
+{
+    assert (isOpen(fd));
+    /* Find handler in list */
+    debugs(5, 5, "comm_remove_close_handler: FD " << fd << ", AsyncCall=" << call);
 
-    cbdataReferenceDone(p->data);
+    // Check to see if really exist  the given AsyncCall in comm_close handlers
+    // TODO: optimize: this slow code is only needed for the assert() below
+    AsyncCall::Pointer p;
+    for (p = fd_table[fd].closeHandler; p != NULL && p != call; p = p->Next());
+    assert(p == call);
 
-    conn_close_pool->free(p);
+    call->cancel("comm_remove_close_handler");
 }
 
 static void
@@ -1994,12 +1817,13 @@ void
 comm_init(void) {
     fd_table =(fde *) xcalloc(Squid_MaxFD, sizeof(fde));
     fdd_table = (fd_debug_t *)xcalloc(Squid_MaxFD, sizeof(fd_debug_t));
-    fdc_table = new fdc_t[Squid_MaxFD];
-    commfd_table = (comm_fd_t *) xcalloc(Squid_MaxFD, sizeof(comm_fd_t));
 
+    fdc_table = new AcceptFD[Squid_MaxFD];
     for (int pos = 0; pos < Squid_MaxFD; ++pos) {
-        fdc_table[pos] = fdc_t(pos);
+        fdc_table[pos] = AcceptFD(pos);
     }
+
+    commfd_table = (comm_fd_t *) xcalloc(Squid_MaxFD, sizeof(comm_fd_t));
     for (int pos = 0; pos < Squid_MaxFD; pos++) {
        commfd_table[pos].fd = pos;
        commfd_table[pos].readcb.fd = pos;
@@ -2054,13 +1878,13 @@ commHandleWrite(int fd, void *data) {
         if (nleft != 0)
             debugs(5, 1, "commHandleWrite: FD " << fd << ": write failure: connection closed with " << nleft << " bytes remaining.");
 
-        commio_complete_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
+        commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
     } else if (len < 0) {
         /* An error */
 
         if (fd_table[fd].flags.socket_eof) {
             debugs(50, 2, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << ".");
-            commio_complete_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
+            commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
         } else if (ignoreErrno(errno)) {
             debugs(50, 10, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << ".");
             commSetSelect(fd,
@@ -2070,7 +1894,7 @@ commHandleWrite(int fd, void *data) {
                           0);
         } else {
             debugs(50, 2, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << ".");
-            commio_complete_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
+            commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
         }
     } else {
         /* A successful write, continue */
@@ -2084,7 +1908,7 @@ commHandleWrite(int fd, void *data) {
                           state,
                           0);
         } else {
-            commio_complete_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_OK : COMM_ERROR, errno);
+            commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_OK : COMM_ERROR, errno);
         }
     }
 
@@ -2099,10 +1923,19 @@ commHandleWrite(int fd, void *data) {
  */
 void
 comm_write(int fd, const char *buf, int size, IOCB * handler, void *handler_data, FREE * free_func)
+{
+    AsyncCall::Pointer call = commCbCall(5,5, "SomeCommWriteHander",
+        CommIoCbPtrFun(handler, handler_data));
+
+    comm_write(fd, buf, size, call, free_func);
+}
+
+void
+comm_write(int fd, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func)
 {
     assert(!fd_table[fd].flags.closing);
 
-    debugs(5, 5, "comm_write: FD " << fd << ": sz " << size << ": hndl " << handler << ": data " << handler_data << ".");
+    debugs(5, 5, "comm_write: FD " << fd << ": sz " << size << ": asynCall " << callback  << ".");
 
     if (commio_has_callback(fd, IOCB_WRITE, COMMIO_FD_WRITECB(fd))) {
         /* This means that the write has been scheduled, but has not
@@ -2110,17 +1943,24 @@ comm_write(int fd, const char *buf, int size, IOCB * handler, void *handler_data
          */
         fatalf ("comm_write: fd %d: pending callback!\n", fd);
     }
-    /* XXX ugly */
-    commio_set_callback(fd, IOCB_WRITE, COMMIO_FD_WRITECB(fd), handler, handler_data, (char *)buf, free_func, size);
+
+    commio_set_callback(fd, IOCB_WRITE, COMMIO_FD_WRITECB(fd),
+        callback, (char *)buf, free_func, size);
     commSetSelect(fd, COMM_SELECT_WRITE, commHandleWrite, COMMIO_FD_WRITECB(fd), 0);
 }
 
+
 /* a wrapper around comm_write to allow for MemBuf to be comm_written in a snap */
 void
 comm_write_mbuf(int fd, MemBuf *mb, IOCB * handler, void *handler_data) {
     comm_write(fd, mb->buf, mb->size, handler, handler_data, mb->freeFunc());
 }
 
+void
+comm_write_mbuf(int fd, MemBuf *mb, AsyncCall::Pointer &callback) {
+    comm_write(fd, mb->buf, mb->size, callback, mb->freeFunc());
+}
+
 
 /*
  * hm, this might be too general-purpose for all the places we'd
@@ -2172,14 +2012,11 @@ commCloseAllSockets(void) {
         if (F->flags.ipc)      /* don't close inter-process sockets */
             continue;
 
-        if (F->timeout_handler) {
-            PF *callback = F->timeout_handler;
-            void *cbdata = NULL;
-            F->timeout_handler = NULL;
+        if (F->timeoutHandler != NULL) {
+            AsyncCall::Pointer callback = F->timeoutHandler;
+            F->timeoutHandler = NULL;
             debugs(5, 5, "commCloseAllSockets: FD " << fd << ": Calling timeout handler");
-
-            if (cbdataReferenceValidDone(F->timeout_data, &cbdata))
-                callback(fd, cbdata);
+           ScheduleCallHere(callback);
         } else {
             debugs(5, 5, "commCloseAllSockets: FD " << fd << ": calling comm_close()");
             comm_close(fd);
@@ -2205,7 +2042,7 @@ void
 checkTimeouts(void) {
     int fd;
     fde *F = NULL;
-    PF *callback;
+    AsyncCall::Pointer callback;
 
     for (fd = 0; fd <= Biggest_FD; fd++) {
         F = &fd_table[fd];
@@ -2215,11 +2052,11 @@ checkTimeouts(void) {
 
             debugs(5, 5, "checkTimeouts: FD " << fd << " Expired");
 
-        if (F->timeout_handler) {
+        if (F->timeoutHandler != NULL) {
             debugs(5, 5, "checkTimeouts: FD " << fd << ": Call timeout handler");
-            callback = F->timeout_handler;
-            F->timeout_handler = NULL;
-            callback(fd, F->timeout_data);
+            callback = F->timeoutHandler;
+            F->timeoutHandler = NULL;
+           ScheduleCallHere(callback);
         } else {
             debugs(5, 5, "checkTimeouts: FD " << fd << ": Forcing comm_close()");
             comm_close(fd);
@@ -2268,28 +2105,54 @@ comm_listen(int sock) {
     return sock;
 }
 
+// AcceptFD::callback() wrapper
 void
-fdc_t::beginAccepting() {
-    accept.accept.beginAccepting();
+comm_accept(int fd, IOACB *handler, void *handler_data) {
+    debugs(5, 5, "comm_accept: FD " << fd << " handler: " << (void*)handler);
+    assert(isOpen(fd));
+
+    AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler",
+        CommAcceptCbPtrFun(handler, handler_data));
+    fdc_table[fd].subscribe(call);
 }
 
-int
-fdc_t::acceptCount() const {
-    return accept.accept.acceptCount();
+void
+comm_accept(int fd, AsyncCall::Pointer &call) {
+    debugs(5, 5, "comm_accept: FD " << fd << " AsyncCall: " << call);
+    assert(isOpen(fd));
+    
+    fdc_table[fd].subscribe(call);
 }
 
+// Called when somebody wants to be notified when our socket accepts new 
+// connection. We do not probe the FD until there is such interest.
 void
-fdc_t::acceptOne(int fd) {
+AcceptFD::subscribe(AsyncCall::Pointer &call) {
+    /* make sure we're not pending! */
+    assert(!theCallback);
+    theCallback = call;
+
+#if OPTIMISTIC_IO
+    mayAcceptMore = true; // even if we failed to accept last time
+#endif
+
+    if (mayAcceptMore)
+        acceptNext();
+    else
+        commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0);
+}
+
+bool
+AcceptFD::acceptOne() {
     // If there is no callback and we accept, we will leak the accepted FD.
     // When we are running out of FDs, there is often no callback.
-    if (!accept.accept.callback.handler) {
-        debugs(5, 5, "fdc_t::acceptOne orphaned: FD " << fd);
+    if (!theCallback) {
+        debugs(5, 5, "AcceptFD::acceptOne orphaned: FD " << fd);
         // XXX: can we remove this and similar "just in case" calls and 
         // either listen always or listen only when there is a callback?
         if (!AcceptLimiter::Instance().deferring())
             commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0);
-        accept.accept.finished(true);
-        return;
+        return false;
     }
 
     /*
@@ -2299,54 +2162,54 @@ fdc_t::acceptOne(int fd) {
      */
 
     /* Accept a new connection */
-    int newfd = comm_old_accept(fd, accept.connDetails);
+    ConnectionDetail connDetails;
+    int newfd = comm_old_accept(fd, connDetails);
 
     /* Check for errors */
 
     if (newfd < 0) {
+        assert(theCallback != NULL);
+
         if (newfd == COMM_NOMESSAGE) {
             /* register interest again */
-            debugs(5, 5, "fdc_t::acceptOne eof: FD " << fd << " handler: " << (void*)accept.accept.callback.handler);
+            debugs(5, 5, "AcceptFD::acceptOne eof: FD " << fd <<
+                " handler: " << *theCallback);
             commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0);
-            accept.accept.finished(true);
-            return;
+            return false;
         }
 
-        /* A non-recoverable error - register an error callback */
-        new CommAcceptCallbackData(fd, accept.accept.callback, COMM_ERROR, errno, -1, accept.connDetails);
-
-        accept.accept.callback = CallBack<IOACB>();
-
-        accept.accept.finished(true);
-
-        return;
+        // A non-recoverable error; notify the caller */
+        notify(-1, COMM_ERROR, errno, connDetails);
+        return false;
     }
 
-    debugs(5, 5, HERE << "accepted: FD " << fd << " handler: " << (void*)accept.accept.callback.handler << " newfd: " << newfd << " from: " << accept.connDetails.peer);
-
-    assert(accept.accept.callback.handler);
-
-    accept.accept.doCallback(fd, newfd, COMM_OK, 0, &accept.connDetails);
-
-    /* If we weren't re-registed, don't bother trying again! */
-
-    if (accept.accept.callback.handler == NULL)
-        accept.accept.finished(true);
-}
-
-bool
-AcceptFD::finished() const {
-    return finished_;
+    assert(theCallback != NULL);
+    debugs(5, 5, "AcceptFD::acceptOne accepted: FD " << fd <<
+        " newfd: " << newfd << " from: " << connDetails.peer <<
+        " handler: " << *theCallback);
+    notify(newfd, COMM_OK, 0, connDetails);
+    return true;
 }
 
 void
-AcceptFD::finished(bool newValue) {
-    finished_ = newValue;
+AcceptFD::acceptNext() {
+    mayAcceptMore = acceptOne();
 }
 
-bool
-AcceptFD::finishedAccepting() const {
-    return acceptCount() >= MAX_ACCEPT_PER_LOOP || finished();
+void
+AcceptFD::notify(int newfd, comm_err_t errcode, int xerrno, const ConnectionDetail &connDetails)
+{
+    if (theCallback != NULL) {
+        typedef CommAcceptCbParams Params;
+        Params &params = GetCommParams<Params>(theCallback);
+        params.fd = fd;
+        params.nfd = newfd;
+        params.details = connDetails;
+        params.flag = errcode;
+        params.xerrno = xerrno;
+        ScheduleCallHere(theCallback);
+        theCallback = NULL;
+    }
 }
 
 /*
@@ -2354,39 +2217,9 @@ AcceptFD::finishedAccepting() const {
  * to dupe itself and fob off an accept()ed connection
  */
 static void
-comm_accept_try(int fd, void *data) {
-    assert(fdc_table[fd].active == 1);
-
-    fdc_table[fd].beginAccepting();
-
-    while (!fdc_table[fd].accept.accept.finishedAccepting())
-        fdc_table[fd].acceptOne(fd);
-}
-
-/*
- * Notes:
- * + the current interface will queue _one_ accept per io loop.
- *   this isn't very optimal and should be revisited at a later date.
- */
-void
-comm_accept(int fd, IOACB *handler, void *handler_data) {
-    debugs(5, 5, "comm_accept: FD " << fd << " handler: " << (void*)handler);
-    requireOpenAndActive(fd);
-
-    /* make sure we're not pending! */
-    assert(fdc_table[fd].accept.accept.callback.handler == NULL);
-
-    /* Record our details */
-    fdc_table[fd].accept.accept.callback = CallBack<IOACB> (handler, handler_data);
-
-    /* Kick off the accept */
-#if OPTIMISTIC_IO
-
-    comm_accept_try(fd, NULL);
-#else
-
-    commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0);
-#endif
+comm_accept_try(int fd, void *) {
+    assert(isOpen(fd));
+    fdc_table[fd].acceptNext();
 }
 
 void CommIO::Initialise() {
@@ -2474,15 +2307,14 @@ AcceptLimiter::kick() {
 
 void
 commMarkHalfClosed(int fd) {
-    assert (fdc_table[fd].active && !fdc_table[fd].half_closed);
+    assert (isOpen(fd));
     AbortChecker::Instance().monitor(fd);
-    fdc_table[fd].half_closed = true;
 }
 
 int commIsHalfClosed(int fd) {
-    assert (fdc_table[fd].active);
+    assert (isOpen(fd));
 
-    return fdc_table[fd].half_closed;
+    return AbortChecker::Instance().isMonitoring(fd);
 }
 
 void
@@ -2554,6 +2386,11 @@ AbortChecker::IntCompare (int const &lhs, int const &rhs) {
     return lhs - rhs;
 }
 
+bool
+AbortChecker::isMonitoring(int fd) const {
+    return contains(fd);
+}
+
 bool
 AbortChecker::contains (int const fd) const {
     fds = fds->splay(fd, IntCompare);
@@ -2593,10 +2430,10 @@ AbortChecker::removeCheck (int const fd) {
     */
 }
 
-CommRead::CommRead() : fd(-1), buf(NULL), len(0) {}
+CommRead::CommRead() : fd(-1), buf(NULL), len(0), callback(NULL) {}
 
-CommRead::CommRead(int fd_, char *buf_, int len_, IOCB *handler_, void *data_)
-        : fd(fd_), buf(buf_), len(len_), callback(handler_, data_) {}
+CommRead::CommRead(int fd_, char *buf_, int len_, AsyncCall::Pointer &callback_)
+        : fd(fd_), buf(buf_), len(len_), callback(callback_) {}
 
 DeferredRead::DeferredRead () : theReader(NULL), theContext(NULL), theRead(), cancelled(false) {}
 
@@ -2691,21 +2528,6 @@ DeferredRead::markCancelled() {
 ConnectionDetail::ConnectionDetail() : me(), peer() {
 }
 
-bool
-CommDispatcher::dispatch() {
-    bool result = comm_iocallbackpending();
-    comm_calliocallback();
-    /* and again to deal with indirectly queued events
-     * resulting from the first call. These are usually
-     * callbacks and should be dealt with immediately.
-     */
-    comm_calliocallback();
-
-    /* Adrian's *new* stuff */
-    commio_call_callbacks();
-    return result;
-}
-
 int
 CommSelectEngine::checkEvents(int timeout) {
     static time_t last_timeout = 0;
index d0a4fcdb0c34bdd08a9a4d4e7bdaa8956004ee2e..b87df719b9d892fd4da450ef251e429d85016d52 100644 (file)
@@ -3,7 +3,7 @@
 
 #include "squid.h"
 #include "AsyncEngine.h"
-#include "CompletionDispatcher.h"
+#include "AsyncCall.h"
 #include "StoreIOBuffer.h"
 #include "Array.h"
 #include "IPAddress.h"
@@ -23,16 +23,13 @@ typedef enum {
     COMM_ERR_DNS = -9,
     COMM_ERR_CLOSING = -10,
 } comm_err_t;
-typedef void IOFCB(int fd, StoreIOBuffer receivedData, comm_err_t flag, int xerrno, void *data);
-typedef void IOWCB(int fd, char *buffer, size_t len, comm_err_t flag, int xerrno, void *data);
 
-typedef void CWCB(int fd, char *, size_t size, comm_err_t flag, void *data);
 typedef void CNCB(int fd, comm_err_t status, int xerrno, void *data);
 
 typedef void IOCB(int fd, char *, size_t size, comm_err_t flag, int xerrno, void *data);
 
+
 /* comm.c */
-extern void comm_calliocallback(void);
 extern bool comm_iocallbackpending(void); /* inline candidate */
 
 extern int comm_listen(int fd);
@@ -47,6 +44,7 @@ SQUIDCEXTERN void comm_reset_close(int fd);
 SQUIDCEXTERN void comm_lingering_close(int fd);
 #endif
 SQUIDCEXTERN void commConnectStart(int fd, const char *, u_short, CNCB *, void *);
+void commConnectStart(int fd, const char *, u_short, AsyncCall::Pointer &cb);
 
 SQUIDCEXTERN int comm_connect_addr(int sock, const IPAddress &addr);
 SQUIDCEXTERN void comm_init(void);
@@ -63,9 +61,12 @@ SQUIDCEXTERN void commResetSelect(int);
 
 SQUIDCEXTERN int comm_udp_sendto(int sock, const IPAddress &to, const void *buf, int buflen);
 extern void comm_write(int fd, const char *buf, int len, IOCB *callback, void *callback_data, FREE *func);
+extern void comm_write(int fd, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func = NULL);
 SQUIDCEXTERN void comm_write_mbuf(int fd, MemBuf *mb, IOCB * handler, void *handler_data);
+extern void comm_write_mbuf(int fd, MemBuf *mb, AsyncCall::Pointer &callback);
 SQUIDCEXTERN void commCallCloseHandlers(int fd);
 SQUIDCEXTERN int commSetTimeout(int fd, int, PF *, void *);
+extern int commSetTimeout(int fd, int, AsyncCall::Pointer &calback);
 SQUIDCEXTERN int ignoreErrno(int);
 SQUIDCEXTERN void commCloseAllSockets(void);
 SQUIDCEXTERN void checkTimeouts(void);
@@ -81,14 +82,19 @@ SQUIDCEXTERN void comm_quick_poll_required(void);
 class ConnectionDetail;
 typedef void IOACB(int fd, int nfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data);
 extern void comm_accept(int fd, IOACB *handler, void *handler_data);
+extern void comm_accept(int fd, AsyncCall::Pointer &call);
 extern void comm_add_close_handler(int fd, PF *, void *);
+extern void comm_add_close_handler(int fd, AsyncCall::Pointer &);
 extern void comm_remove_close_handler(int fd, PF *, void *);
+extern void comm_remove_close_handler(int fd, AsyncCall::Pointer &);
+
 
 extern int comm_has_pending_read_callback(int fd);
-extern bool comm_has_pending_read(int fd);
+extern bool comm_monitors_read(int fd);
 extern void comm_read(int fd, char *buf, int len, IOCB *handler, void *data);
+extern void comm_read(int fd, char *buf, int len, AsyncCall::Pointer &callback);
 extern void comm_read_cancel(int fd, IOCB *callback, void *data);
-extern void fdc_open(int fd, unsigned int type, char const *desc);
+extern void comm_read_cancel(int fd, AsyncCall::Pointer &callback);
 extern int comm_udp_recvfrom(int fd, void *buf, size_t len, int flags, IPAddress &from);
 extern int comm_udp_recv(int fd, void *buf, size_t len, int flags);
 extern ssize_t comm_udp_send(int s, const void *buf, size_t len, int flags);
@@ -142,6 +148,8 @@ public:
      * instance with it).
      */
     static IOCB AbortCheckReader;
+
+    bool isMonitoring(int fd) const;
     void monitor (int);
     void stopMonitoring (int);
     void doIOLoop();
@@ -169,15 +177,6 @@ private:
     void removeCheck (int const);
 };
 
-/* a dispatcher for comms events */
-
-class CommDispatcher : public CompletionDispatcher
-{
-
-public:
-    virtual bool dispatch();
-};
-
 /* A comm engine that calls comm_select */
 
 class CommSelectEngine : public AsyncEngine