/*
- * $Id: comm.cc,v 1.444 2008/02/11 22:30:10 rousskov Exp $
+ * $Id: comm.cc,v 1.445 2008/02/12 22:58:29 rousskov Exp $
*
* DEBUG: section 5 Socket Functions
* AUTHOR: Harvest Derived
#include "MemBuf.h"
#include "pconn.h"
#include "SquidTime.h"
+#include "CommCalls.h"
#include "IPAddress.h"
#if defined(_SQUID_CYGWIN_)
IOCB_WRITE
} iocb_type;
-struct _comm_io_callback {
+struct comm_io_callback_t {
iocb_type type;
int fd;
- IOCB *callback;
- void *callback_data;
+ AsyncCall::Pointer callback;
char *buf;
FREE *freefunc;
int size;
int offset;
- bool active;
- bool completed;
comm_err_t errcode;
int xerrno;
- dlink_node node;
+
+ bool active() const { return callback != NULL; }
};
-typedef struct _comm_io_callback comm_io_callback_t;
struct _comm_fd {
int fd;
typedef struct _comm_fd comm_fd_t;
comm_fd_t *commfd_table;
-dlink_list commfd_completed_events;
-
+// TODO: make this a comm_io_callback_t method?
bool
commio_has_callback(int fd, iocb_type type, comm_io_callback_t *ccb)
{
assert(ccb->fd == fd);
assert(ccb->type == type);
- return ccb->active == true;
+ return ccb->active();
}
/*
- * Set the given handler and mark active
+ * Configure comm_io_callback_t for I/O
*
* @param fd filedescriptor
* @param ccb comm io callback
* @param freefunc freefunc, if applicable
* @param size buffer size
*/
-void
-commio_set_callback(int fd, iocb_type type, comm_io_callback_t *ccb, IOCB *cb, void *cbdata, char *buf, FREE *freefunc, int size)
+static void
+commio_set_callback(int fd, iocb_type type, comm_io_callback_t *ccb,
+ AsyncCall::Pointer &cb, char *buf, FREE *freefunc, int size)
{
- assert(ccb->active == false);
+ assert(!ccb->active());
assert(ccb->type == type);
+ assert(cb != NULL);
ccb->fd = fd;
ccb->callback = cb;
- ccb->callback_data = cbdataReference(cbdata);
ccb->buf = buf;
ccb->freefunc = freefunc;
ccb->size = size;
- ccb->active = true;
- ccb->completed = false;
ccb->offset = 0;
}
-/*
- * Complete the callback
- *
- * Someone may have already called this function once on a non-completed callback.
- * This happens in the comm_close() routine - the IO may have completed
- * but comm_close() is called bfeore teh callback has been called.
- * In this case, leave the details the same (offset, for example) but just update
- * the error codes.
- */
-void
-commio_complete_callback(int fd, comm_io_callback_t *ccb, comm_err_t code, int xerrno)
+// Schedule the callback call and clear the callback
+static void
+commio_finish_callback(int fd, comm_io_callback_t *ccb, comm_err_t code, int xerrno)
{
- debugs(5, 3, "commio_complete_callback: called for " << fd << " (" << code << ", " << xerrno << ")");
- assert(ccb->active == true);
+ debugs(5, 3, "commio_finish_callback: called for FD " << fd << " (" <<
+ code << ", " << xerrno << ")");
+ assert(ccb->active());
assert(ccb->fd == fd);
ccb->errcode = code;
ccb->xerrno = xerrno;
- if (! ccb->completed)
- dlinkAddTail(ccb, &ccb->node, &commfd_completed_events);
- ccb->completed = true;
+
+ comm_io_callback_t cb = *ccb;
+
+ /* We've got a copy; blow away the real one */
+ /* XXX duplicate code from commio_cancel_callback! */
+ ccb->xerrno = 0;
+ ccb->callback = NULL; // cb has it
+
+ /* free data */
+ if (cb.freefunc) {
+ cb.freefunc(cb.buf);
+ cb.buf = NULL;
+ }
+
+ if (cb.callback != NULL) {
+ typedef CommIoCbParams Params;
+ Params ¶ms = GetCommParams<Params>(cb.callback);
+ params.fd = cb.fd;
+ params.buf = cb.buf;
+ params.size = cb.offset;
+ params.flag = cb.errcode;
+ params.xerrno = cb.xerrno;
+ ScheduleCallHere(cb.callback);
+ }
}
*
* Remember that the data is cbdataRef'ed.
*/
-void
+// TODO: make this a comm_io_callback_t method
+static void
commio_cancel_callback(int fd, comm_io_callback_t *ccb)
{
- debugs(5, 3, "commio_cancel_callback: called for " << fd);
+ debugs(5, 3, "commio_cancel_callback: called for FD " << fd);
assert(ccb->fd == fd);
- assert(ccb->active == true);
-
- if (ccb->completed == true) {
- dlinkDelete(&ccb->node, &commfd_completed_events);
- }
- if (ccb->callback_data)
- cbdataReferenceDone(ccb->callback_data);
+ assert(ccb->active());
ccb->xerrno = 0;
- ccb->active = false;
- ccb->completed = false;
+// delete ccb->callback;
+ ccb->callback = NULL;
ccb->callback = NULL;
- ccb->callback_data = NULL;
}
/*
void
commio_call_callback(comm_io_callback_t *ccb)
{
- comm_io_callback_t cb = *ccb;
- void *cbdata;
- assert(cb.active == true);
- assert(cb.completed == true);
- debugs(5, 3, "commio_call_callback: called for " << ccb->fd);
-
- /* We've got a copy; blow away the real one */
- /* XXX duplicate code from commio_cancel_callback! */
- dlinkDelete(&ccb->node, &commfd_completed_events);
- ccb->xerrno = 0;
- ccb->active = false;
- ccb->completed = false;
- ccb->callback = NULL;
- ccb->callback_data = NULL;
-
- /* free data */
- if (cb.freefunc) {
- cb.freefunc(cb.buf);
- cb.buf = NULL;
- }
- if (cb.callback && cbdataReferenceValidDone(cb.callback_data, &cbdata)) {
- /* XXX truely ugly for now! */
- cb.callback(cb.fd, cb.buf, cb.offset, cb.errcode, cb.xerrno, cbdata);
- }
-}
-
-void
-commio_call_callbacks(void)
-{
- comm_io_callback_t *ccb;
- while (commfd_completed_events.head != NULL) {
- ccb = (comm_io_callback_t *) commfd_completed_events.head->data;
- commio_call_callback(ccb);
- }
}
-
class ConnectStateData
{
// NP: CANNOT store the default addr:port together as it gets set/reset differently.
IPAddress S;
- CallBack<CNCB> callback;
+ AsyncCall::Pointer callback;
int fd;
int tries;
static PF commConnectFree;
static PF commHandleWrite;
static IPH commConnectDnsHandle;
-static void requireOpenAndActive(int const fd);
static PF comm_accept_try;
{
public:
- AcceptFD() : count(0), finished_(false){}
-
- void doCallback(int fd, int newfd, comm_err_t errcode, int xerrno, ConnectionDetail *);
- void nullCallback();
- void beginAccepting() {count = 0; finished(false);}
-
- size_t acceptCount() const { return count;}
-
- bool finishedAccepting() const;
- CallBack<IOACB> callback;
- bool finished() const;
- void finished(bool);
-
-private:
- static size_t const MAX_ACCEPT_PER_LOOP;
- size_t count;
- bool finished_;
-};
-
-size_t const AcceptFD::MAX_ACCEPT_PER_LOOP(10);
-
-class fdc_t
-{
-
-public:
- void acceptOne(int fd);
- void beginAccepting();
- int acceptCount() const;
- fdc_t() : active(0), fd(-1), half_closed (false){CommCallbackList.head = NULL;CommCallbackList.tail = NULL; }
+ AcceptFD(int aFd = -1): fd(aFd), theCallback(0), mayAcceptMore(false) {}
- fdc_t(int anFD) : active(0), fd(anFD), half_closed(false)
- {
- CommCallbackList.head = NULL;
- CommCallbackList.tail = NULL;
- }
+ void subscribe(AsyncCall::Pointer &call);
+ void acceptNext();
+ void notify(int newfd, comm_err_t, int xerrno, const ConnectionDetail &);
- int active;
int fd;
- dlink_list CommCallbackList;
-
- template<class P>
- bool findCallback(P predicate);
- class Accept
- {
-
- public:
- AcceptFD accept;
- ConnectionDetail connDetails;
- };
-
- Accept accept;
+private:
+ bool acceptOne();
- bool half_closed;
+ AsyncCall::Pointer theCallback;
+ bool mayAcceptMore;
};
typedef enum {
COMM_CB_DERIVED,
} comm_callback_t;
-static int CommCallbackSeqnum = 1;
-
-class CommCommonCallback
-{
-
-public:
- CommCommonCallback() : fd (-1), errcode (COMM_OK), xerrno(0), seqnum (CommCallbackSeqnum){}
-
- CommCommonCallback(int anFD, comm_err_t errcode, int anErrno) : fd (anFD), errcode (errcode), xerrno(anErrno), seqnum (CommCallbackSeqnum){}
-
- int fd;
- comm_err_t errcode;
- int xerrno;
- int seqnum;
-};
-
-class CommCallbackData
-{
-
-public:
- MEMPROXY_CLASS(CommCallbackData);
- CommCallbackData(CommCommonCallback const &);
- virtual ~CommCallbackData() {}
-
- virtual comm_callback_t getType() const { return COMM_CB_DERIVED; }
-
- void callACallback();
- void fdClosing();
- virtual void callCallback() = 0;
- void registerSelf();
- void deRegisterSelf();
- char *buf;
- StoreIOBuffer sb;
-
-protected:
- CommCommonCallback result;
- friend void _comm_close(int fd, char const *file, int line);
- friend void comm_calliocallback(void);
-
-private:
- dlink_node fd_node;
- dlink_node h_node;
-};
-
-MEMPROXY_CLASS_INLINE(CommCallbackData)
-
-class CommAcceptCallbackData : public CommCallbackData
-{
-
-public:
- MEMPROXY_CLASS(CommAcceptCallbackData);
- CommAcceptCallbackData(int const anFd, CallBack<IOACB>, comm_err_t, int, int, ConnectionDetail const &);
- virtual void callCallback();
-
-private:
- CallBack<IOACB> callback;
- int newfd;
- ConnectionDetail details;
-};
-
-MEMPROXY_CLASS_INLINE(CommAcceptCallbackData)
-
-class CommFillCallbackData : public CommCallbackData
-{
-
-public:
- MEMPROXY_CLASS(CommFillCallbackData);
- CommFillCallbackData(int const anFd, CallBack<IOFCB> aCallback, comm_err_t, int);
- virtual void callCallback();
-
-private:
- CallBack<IOFCB> callback;
-};
-
-MEMPROXY_CLASS_INLINE(CommFillCallbackData)
-
struct _fd_debug_t
{
char const *close_file;
typedef struct _fd_debug_t fd_debug_t;
static MemAllocator *conn_close_pool = NULL;
-fdc_t *fdc_table = NULL;
+AcceptFD *fdc_table = NULL; // TODO: rename. And use Vector<>?
fd_debug_t *fdd_table = NULL;
-dlink_list CommCallbackList;
-
-
-/* New and improved stuff */
-
-CommCallbackData::CommCallbackData(CommCommonCallback const &newResults) : result (newResults)
-{
- assert(fdc_table[result.fd].active == 1);
- registerSelf();
-}
-
-CommAcceptCallbackData::CommAcceptCallbackData(int const anFd, CallBack<IOACB> aCallback, comm_err_t anErrcode, int anErrno, int aNewFD, ConnectionDetail const &newDetails) :CommCallbackData(CommCommonCallback(anFd, anErrcode, anErrno)), callback (aCallback), newfd(aNewFD), details(newDetails)
-{}
-
-void
-CommCallbackData::registerSelf()
-{
- /* Add it to the end of the list */
- dlinkAddTail(this, &h_node, &CommCallbackList);
-
- /* and add it to the end of the fd list */
- dlinkAddTail(this, &fd_node, &(fdc_table[result.fd].CommCallbackList));
-}
-
-void
-CommCallbackData::deRegisterSelf()
-{
- dlinkDelete(&h_node, &CommCallbackList);
- dlinkDelete(&fd_node, &(fdc_table[result.fd].CommCallbackList));
-}
-
-/**
- * add an IO callback
- *
- * IO callbacks are added when we want to notify someone that some IO
- * has finished but we don't want to risk re-entering a non-reentrant
- * code block.
- */
-void
-CommAcceptCallbackData::callCallback()
-{
- PROF_start(CommAcceptCallbackData_callCallback);
- callback.handler(result.fd, newfd, &details, result.errcode, result.xerrno, callback.data);
- PROF_stop(CommAcceptCallbackData_callCallback);
-}
-
-void
-CommCallbackData::fdClosing()
-{
- result.errcode = COMM_ERR_CLOSING;
-}
-
-void
-CommCallbackData::callACallback()
-{
- assert(fdc_table[result.fd].active == 1);
- deRegisterSelf();
- callCallback();
-}
-
-/**
- * call the IO callbacks
- *
- * This should be called before comm_select() so code can attempt to
- * initiate some IO.
- *
- * When io callbacks are added, they are added with the current
- * sequence number. The sequence number is incremented in this routine -
- * since callbacks are added to the _tail_ of the list, when we hit a
- * callback with a seqnum _not_ what it was when we entered this routine,
- * we can stop.
- */
-void
-comm_calliocallback(void)
-{
- CommCallbackData *cio;
- int oldseqnum = CommCallbackSeqnum++;
-
- /* Call our callbacks until we hit NULL or the seqnum changes */
-
- /* This will likely rap other counts - again, thats ok (for now)
- * What we should see is the total of the various callback subclasses
- * equaling this counter.
- * If they don't, someone has added a class but not profiled it.
- */
- PROF_start(comm_calliocallback);
-
- debugs(5, 7, "comm_calliocallback: " << CommCallbackList.head);
-
- while (CommCallbackList.head != NULL && oldseqnum != ((CommCallbackData *)CommCallbackList.head->data)->result.seqnum) {
- dlink_node *node = (dlink_node *)CommCallbackList.head;
- cio = (CommCallbackData *)node->data;
- cio->callACallback();
- delete cio;
- }
-
- PROF_stop(comm_calliocallback);
-}
-bool
-comm_iocallbackpending(void)
+static bool
+isOpen(const int fd)
{
- debugs(5, 7, "comm_iocallbackpending: " << CommCallbackList.head);
- return (CommCallbackList.head != NULL) || (commfd_completed_events.head != NULL);
+ return fd_table[fd].flags.open != 0;
}
/**
if (retval < 0 && !ignoreErrno(errno)) {
debugs(5, 3, "comm_read_try: scheduling COMM_ERROR");
ccb->offset = 0;
- commio_complete_callback(fd, ccb, COMM_ERROR, errno);
+ commio_finish_callback(fd, ccb, COMM_ERROR, errno);
return;
};
/* Note - read 0 == socket EOF, which is a valid read */
if (retval >= 0) {
fd_bytes(fd, retval, FD_READ);
- ccb->offset = retval;
- commio_complete_callback(fd, ccb, COMM_OK, errno);
+ ccb->offset = retval;
+ commio_finish_callback(fd, ccb, COMM_OK, errno);
return;
}
*/
void
comm_read(int fd, char *buf, int size, IOCB *handler, void *handler_data)
+{
+ AsyncCall::Pointer call = commCbCall(5,4, "SomeCommReadHandler",
+ CommIoCbPtrFun(handler, handler_data));
+ comm_read(fd, buf, size, call);
+}
+
+void
+comm_read(int fd, char *buf, int size, AsyncCall::Pointer &callback)
{
/* Make sure we're not reading anything and we're not closing */
- assert(fdc_table[fd].active == 1);
+ assert(isOpen(fd));
assert(!fd_table[fd].flags.closing);
debugs(5, 4, "comm_read, queueing read for FD " << fd);
/* Queue the read */
/* XXX ugly */
- commio_set_callback(fd, IOCB_READ, COMMIO_FD_READCB(fd), handler, handler_data, (char *)buf, NULL, size);
+ commio_set_callback(fd, IOCB_READ, COMMIO_FD_READCB(fd),
+ callback, (char *)buf, NULL, size);
commSetSelect(fd, COMM_SELECT_READ, commHandleRead, COMMIO_FD_READCB(fd), 0);
}
#endif
}
-static void
-requireOpenAndActive(int const fd)
-{
- assert(fd_table[fd].flags.open == 1);
- assert(fdc_table[fd].active == 1);
-}
/**
* Return whether the FD has a pending completed callback.
int
comm_has_pending_read_callback(int fd)
{
- requireOpenAndActive(fd);
- return COMMIO_FD_READCB(fd)->active && COMMIO_FD_READCB(fd)->completed;
-}
-
-template <class P>
-bool
-fdc_t::findCallback(P predicate)
-{
- /*
- * XXX I don't like having to walk the list!
- * Instead, if this routine is called often enough, we should
- * also maintain a linked list of _read_ events - we can just
- * check if the list head a HEAD..
- * - adrian
- */
- dlink_node *node = CommCallbackList.head;
-
- while (node != NULL) {
- if (predicate((CommCallbackData *)node->data))
- return true;
-
- node = node->next;
- }
-
- /* Not found */
+ assert(isOpen(fd));
+ // XXX: We do not know whether there is a read callback scheduled.
+ // This is used for pconn management that should probably be more
+ // tightly integrated into comm to minimize the chance that a
+ // closing pconn socket will be used for a new transaction.
return false;
}
-/**
- * return whether a file descriptor has a read handler
- *
- * Assumptions: the fd is open
- * the fd is a comm fd.
- *
- * Again - is this "pending read", or "pending completed event", or what?
- * I'll assume its pending read, not pending completed.
- *
- * This makes no sense though - if this is called to check whether there's
- * a pending read -before- submitting a read then it won't matter whether
- * its completed or not! Ie:
- *
- * + if there's no read and you want to schedule one; fine.
- * + if a read has completed then the callback block has been deactivated before
- * the callback is called - if something decides to register for a read
- * callback once again it should find !active and !completed.
- * + scheduling a read event when the fd is ! active -and- completed, thats
- * a bug
- * + like, afaict, anything else is.
- */
+// Does comm check this fd for read readiness?
+// Note that when comm is not monitoring, there can be a pending callback
+// call, which may resume comm monitoring once fired.
bool
-comm_has_pending_read(int fd)
+comm_monitors_read(int fd)
{
- requireOpenAndActive(fd);
- return COMMIO_FD_READCB(fd)->active && (! COMMIO_FD_READCB(fd)->completed);
+ assert(isOpen(fd));
+ // Being active is usually the same as monitoring because we always
+ // start monitoring the FD when we configure comm_io_callback_t for I/O
+ // and we usually configure comm_io_callback_t for I/O when we starting
+ // monitoring a FD for reading. TODO: replace with commio_has_callback
+ return COMMIO_FD_READCB(fd)->active();
}
/**
* Cancel a pending read. Assert that we have the right parameters,
* and that there are no pending read events!
*
+ * XXX: We do not assert that there are no pending read events and
+ * with async calls it becomes even more difficult.
+ * The whole interface should be reworked to do callback->cancel()
+ * instead of searching for places where the callback may be stored and
+ * updating the state of those places.
+ *
* AHC Don't call the comm handlers?
*/
void
comm_read_cancel(int fd, IOCB *callback, void *data)
{
- requireOpenAndActive(fd);
+ if (!isOpen(fd)) {
+ debugs(5, 4, "comm_read_cancel fails: FD " << fd << " closed");
+ return;
+ }
+
+ comm_io_callback_t *cb = COMMIO_FD_READCB(fd);
+ // TODO: is "active" == "monitors FD"?
+ if (!cb->active()) {
+ debugs(5, 4, "comm_read_cancel fails: FD " << fd << " inactive");
+ return;
+ }
+
+ typedef CommCbFunPtrCallT<CommIoCbPtrFun> Call;
+ Call *call = dynamic_cast<Call*>(cb->callback.getRaw());
+ if (!call) {
+ debugs(5, 4, "comm_read_cancel fails: FD " << fd << " lacks callback");
+ return;
+ }
+
+ typedef CommIoCbParams Params;
+ const Params ¶ms = GetCommParams<Params>(cb->callback);
/* Ok, we can be reasonably sure we won't lose any data here! */
- assert(COMMIO_FD_READCB(fd)->callback == callback);
- assert(COMMIO_FD_READCB(fd)->callback_data == data);
+ assert(call->dialer.handler == callback);
+ assert(params.data == data);
/* Delete the callback */
- commio_cancel_callback(fd, COMMIO_FD_READCB(fd));
+ commio_cancel_callback(fd, cb);
/* And the IO event */
commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
}
-
-/**
- * Open a filedescriptor, set some sane defaults
- * XXX DPW 2006-05-30 what is the point of this?
- */
void
-fdc_open(int fd, unsigned int type, char const *desc)
+comm_read_cancel(int fd, AsyncCall::Pointer &callback)
{
- assert(fdc_table[fd].active == 0);
+ callback->cancel("comm_read_cancel");
+
+ if (!isOpen(fd)) {
+ debugs(5, 4, "comm_read_cancel fails: FD " << fd << " closed");
+ return;
+ }
+
+ comm_io_callback_t *cb = COMMIO_FD_READCB(fd);
+
+ if (!cb->active()) {
+ debugs(5, 4, "comm_read_cancel fails: FD " << fd << " inactive");
+ return;
+ }
+
+ AsyncCall::Pointer call = cb->callback;
+ assert(call != NULL); // XXX: should never fails (active() checks for callback==NULL)
+
+ /* Ok, we can be reasonably sure we won't lose any data here! */
+ assert(call == callback);
- fdc_table[fd].active = 1;
- fdc_table[fd].fd = fd;
- fd_open(fd, type, desc);
+ /* Delete the callback */
+ commio_cancel_callback(fd, cb);
+
+ /* And the IO event */
+ commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
}
bool
comm_has_incomplete_write(int fd)
{
- requireOpenAndActive(fd);
- return COMMIO_FD_WRITECB(fd)->active;
+ assert(isOpen(fd));
+ return COMMIO_FD_WRITECB(fd)->active();
}
/**
/* update fdstat */
debugs(5, 5, "comm_open: FD " << new_socket << " is a new socket");
+ assert(!isOpen(new_socket));
fd_open(new_socket, FD_SOCKET, note);
fdd_table[new_socket].close_file = NULL;
fdd_table[new_socket].close_line = 0;
- assert(fdc_table[new_socket].active == 0);
-
- fdc_table[new_socket].active = 1;
-
F = &fd_table[new_socket];
F->local_addr = addr;
cbdataFree(address);
}
+
+
void
-commConnectStart(int fd, const char *host, u_short port, CNCB * callback, void *data)
+commConnectStart(int fd, const char *host, u_short port, AsyncCall::Pointer &cb)
{
+ debugs(cb->debugSection, cb->debugLevel, "commConnectStart: FD " << fd <<
+ ", cb " << cb << ", " << host << ":" << port); // TODO: just print *cb
+
ConnectStateData *cs;
- debugs(5, 3, "commConnectStart: FD " << fd << ", data " << data << ", " << host << ":" << port);
cs = new ConnectStateData;
cs->fd = fd;
cs->host = xstrdup(host);
cs->default_port = port;
- cs->callback = CallBack<CNCB>(callback, data);
+ cs->callback = cb;
+
comm_add_close_handler(fd, commConnectFree, cs);
ipcache_nbgethostbyname(host, commConnectDnsHandle, cs);
}
+// TODO: Remove this and similar callback registration functions by replacing
+// (callback,data) parameters with an AsyncCall so that we do not have to use
+// a generic call name and debug level when creating an AsyncCall. This will
+// also cut the number of callback registration routines in half.
+void
+commConnectStart(int fd, const char *host, u_short port, CNCB * callback, void *data)
+{
+ debugs(5, 5, "commConnectStart: FD " << fd << ", data " << data << ", " << host << ":" << port);
+ AsyncCall::Pointer call = commCbCall(5,3,
+ "SomeCommConnectHandler", CommConnectCbPtrFun(callback, data));
+ commConnectStart(fd, host, port, call);
+}
+
static void
commConnectDnsHandle(const ipcache_addrs * ia, void *data)
{
void
ConnectStateData::callCallback(comm_err_t status, int xerrno)
{
- debugs(5, 3, "commConnectCallback: FD " << fd << ", data " << callback.data << ", status " << status);
+ debugs(5, 3, "commConnectCallback: FD " << fd);
comm_remove_close_handler(fd, commConnectFree, this);
- CallBack<CNCB> aCallback = callback;
- callback = CallBack<CNCB>();
commSetTimeout(fd, -1, NULL, NULL);
- if (aCallback.dataValid())
- aCallback.handler(fd, status, xerrno, aCallback.data);
+ typedef CommConnectCbParams Params;
+ Params ¶ms = GetCommParams<Params>(callback);
+ params.fd = fd;
+ params.flag = status;
+ params.xerrno = xerrno;
+ ScheduleCallHere(callback);
+ callback = NULL;
commConnectFree(fd, this);
}
{
ConnectStateData *cs = (ConnectStateData *)data;
debugs(5, 3, "commConnectFree: FD " << fd);
- cs->callback = CallBack<CNCB>();
+// delete cs->callback;
+ cs->callback = NULL;
safe_free(cs->host);
delete cs;
}
struct addrinfo *AI = NULL;
IPAddress nul;
- if (!cbdataReferenceValid(callback.data))
- return 0;
+// XXX: do we have to check this?
+//
+// if (!cbdataReferenceValid(callback.data))
+// return 0;
statCounter.syscalls.sock.sockets++;
}
}
}
-
+/*
int
-commSetTimeout(int fd, int timeout, PF * handler, void *data)
+commSetTimeout_old(int fd, int timeout, PF * handler, void *data)
{
debugs(5, 3, "commSetTimeout: FD " << fd << " timeout " << timeout);
assert(fd >= 0);
return F->timeout;
}
+*/
+
+int
+commSetTimeout(int fd, int timeout, PF * handler, void *data)
+{
+ AsyncCall::Pointer call;
+ debugs(5, 3, "commSetTimeout: FD " << fd << " timeout " << timeout);
+ if(handler != NULL)
+ call=commCbCall(5,4, "SomeTimeoutHandler", CommTimeoutCbPtrFun(handler, data));
+ else
+ call = NULL;
+ return commSetTimeout(fd, timeout, call);
+}
+
+
+int commSetTimeout(int fd, int timeout, AsyncCall::Pointer &callback)
+{
+ debugs(5, 3, "commSetTimeout: FD " << fd << " timeout " << timeout);
+ assert(fd >= 0);
+ assert(fd < Squid_MaxFD);
+ fde *F = &fd_table[fd];
+ assert(F->flags.open);
+
+ if (timeout < 0) {
+ F->timeoutHandler = NULL;
+ F->timeout = 0;
+ } else {
+ if (callback != NULL) {
+ typedef CommTimeoutCbParams Params;
+ Params ¶ms = GetCommParams<Params>(callback);
+ params.fd = fd;
+ F->timeoutHandler = callback;
+ }
+
+ F->timeout = squid_curtime + (time_t) timeout;
+ }
+
+ return F->timeout;
+
+}
int
comm_connect_addr(int sock, const IPAddress &address)
assert(address.GetPort() != 0);
- debugs(5, 9, "comm_connect_addr: connecting socket " << sock << " to " << address << " (want family: " << F->sock_family <<
- ") Old-State=" << fdc_table[sock].active);
+ debugs(5, 9, "comm_connect_addr: connecting socket " << sock << " to " << address << " (want family: " << F->sock_family << ")");
address.GetAddrInfo(AI, F->sock_family);
fd_open(sock, FD_SOCKET, "HTTP Request");
fdd_table[sock].close_file = NULL;
fdd_table[sock].close_line = 0;
- fdc_table[sock].active = 1;
fde *F = &fd_table[sock];
details.peer.NtoA(F->ipaddr,MAX_IPSTRLEN);
F->remote_port = details.peer.GetPort();
debugs(5, 5, "commCallCloseHandlers: FD " << fd);
while (F->closeHandler != NULL) {
- close_handler ch = *F->closeHandler;
- conn_close_pool->free(F->closeHandler); /* AAA */
- F->closeHandler = ch.next;
- ch.next = NULL;
- debugs(5, 5, "commCallCloseHandlers: ch->handler=" << ch.handler << " data=" << ch.data);
-
- if (cbdataReferenceValid(ch.data))
- ch.handler(fd, ch.data);
-
- cbdataReferenceDone(ch.data);
+ AsyncCall::Pointer call = F->closeHandler;
+ F->closeHandler = call->Next();
+ call->setNext(NULL);
+ // If call is not canceled schedule it for execution else ignore it
+ if(!call->canceled()){
+ debugs(5, 5, "commCallCloseHandlers: ch->handler=" << call);
+ typedef CommCloseCbParams Params;
+ Params ¶ms = GetCommParams<Params>(call);
+ params.fd = fd;
+ ScheduleCallHere(call);
+ }
}
}
}
void
-CommRead::nullCallback()
+CommRead::doCallback(comm_err_t errcode, int xerrno)
{
- callback = CallBack<IOCB>();
+ if (callback != NULL) {
+ typedef CommIoCbParams Params;
+ Params ¶ms = GetCommParams<Params>(callback);
+ params.fd = fd;
+ params.size = 0;
+ params.flag = errcode;
+ params.xerrno = xerrno;
+ ScheduleCallHere(callback);
+ callback = NULL;
+ }
}
-void
-AcceptFD::nullCallback()
+void
+comm_close_complete(int fd, void *data)
{
- callback = CallBack<IOACB>();
-}
+#if USE_SSL
+ fde *F = &fd_table[fd];
-void
-CommRead::doCallback(comm_err_t errcode, int xerrno)
-{
- if (callback.handler)
- callback.handler(fd, buf, 0, errcode, xerrno, callback.data);
+ if (F->ssl) {
+ SSL_free(F->ssl);
+ F->ssl = NULL;
+ }
- nullCallback();
-}
+#endif
+ fd_close(fd); /* update fdstat */
+
+ close(fd);
+
+ if (AbortChecker::Instance().isMonitoring(fd))
+ AbortChecker::Instance().stopMonitoring(fd);
+
+ fdc_table[fd] = AcceptFD(fd);
+
+ statCounter.syscalls.sock.closes++;
+
+ /* When an fd closes, give accept() a chance, if need be */
+
+ if (fdNFree() >= RESERVED_FD)
+ AcceptLimiter::Instance().kick();
-void
-AcceptFD::doCallback(int fd, int newfd, comm_err_t errcode, int xerrno, ConnectionDetail *connDetails)
-{
- if (callback.handler) {
- CallBack<IOACB> aCallback = callback;
- nullCallback();
- aCallback.handler(fd, newfd, connDetails, errcode, xerrno, aCallback.data);
- }
}
/*
_comm_close(int fd, char const *file, int line)
{
fde *F = NULL;
- dlink_node *node;
- CommCallbackData *cio;
debugs(5, 5, "comm_close: FD " << fd);
assert(fd >= 0);
assert(F->flags.open);
/* The following fails because ipc.c is doing calls to pipe() to create sockets! */
- assert(fdc_table[fd].active == 1);
+ assert(isOpen(fd));
assert(F->type != FD_FILE);
/* new-style read/write handler stuff */
if (commio_has_callback(fd, IOCB_WRITE, COMMIO_FD_WRITECB(fd))) {
- commio_complete_callback(fd, COMMIO_FD_WRITECB(fd), COMM_ERR_CLOSING, errno);
- commio_call_callback(COMMIO_FD_WRITECB(fd));
+ commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), COMM_ERR_CLOSING, errno);
}
if (commio_has_callback(fd, IOCB_READ, COMMIO_FD_READCB(fd))) {
- commio_complete_callback(fd, COMMIO_FD_READCB(fd), COMM_ERR_CLOSING, errno);
- commio_call_callback(COMMIO_FD_READCB(fd));
+ commio_finish_callback(fd, COMMIO_FD_READCB(fd), COMM_ERR_CLOSING, errno);
}
/* Do callbacks for read/accept routines, if any */
- fdc_table[fd].accept.accept.doCallback(fd, -1, COMM_ERR_CLOSING, 0, NULL);
-
- /* Complete (w/ COMM_ERR_CLOSING!) any pending io callbacks */
- while (fdc_table[fd].CommCallbackList.head != NULL) {
- node = fdc_table[fd].CommCallbackList.head;
- cio = (CommCallbackData *)node->data;
- assert(fd == cio->result.fd); /* just paranoid */
- /* We're closing! */
- cio->fdClosing();
- cio->callACallback();
- delete cio;
- }
+ fdc_table[fd].notify(-1, COMM_ERR_CLOSING, 0, ConnectionDetail());
commCallCloseHandlers(fd);
F->pconn.pool->count(F->pconn.uses);
comm_empty_os_read_buffers(fd);
+
-#if USE_SSL
-
- if (F->ssl) {
- SSL_free(F->ssl);
- F->ssl = NULL;
- }
-
-#endif
- fd_close(fd); /* update fdstat */
-
- close(fd);
-
- fdc_table[fd].active = 0;
-
- if (fdc_table[fd].half_closed) {
- AbortChecker::Instance().stopMonitoring(fd);
- fdc_table[fd].half_closed = false;
- }
-
- fdc_table[fd] = fdc_t(fd);
-
- statCounter.syscalls.sock.closes++;
+ AsyncCall::Pointer call=commCbCall(5,4, "comm_close_complete",
+ CommCloseCbPtrFun(comm_close_complete, NULL));
+ typedef CommCloseCbParams Params;
+ Params ¶ms = GetCommParams<Params>(call);
+ params.fd = fd;
+ ScheduleCallHere(call);
PROF_stop(comm_close);
- /* When an fd closes, give accept() a chance, if need be */
-
- if (fdNFree() >= RESERVED_FD)
- AcceptLimiter::Instance().kick();
}
/* Send a udp datagram to specified TO_ADDR. */
void
comm_add_close_handler(int fd, PF * handler, void *data)
{
- close_handler *newHandler = (close_handler *)conn_close_pool->alloc(); /* AAA */
- close_handler *c;
debugs(5, 5, "comm_add_close_handler: FD " << fd << ", handler=" <<
handler << ", data=" << data);
- for (c = fd_table[fd].closeHandler; c; c = c->next)
- assert(c->handler != handler || c->data != data);
+ AsyncCall::Pointer call=commCbCall(5,4, "SomeCloseHandler",
+ CommCloseCbPtrFun(handler, data));
+ comm_add_close_handler(fd, call);
+}
- newHandler->handler = handler;
+void
+comm_add_close_handler(int fd, AsyncCall::Pointer &call)
+{
+ debugs(5, 5, "comm_add_close_handler: FD " << fd << ", AsyncCall=" << call);
- newHandler->data = cbdataReference(data);
+ /*TODO:Check for a similar scheduled AsyncCall*/
+// for (c = fd_table[fd].closeHandler; c; c = c->next)
+// assert(c->handler != handler || c->data != data);
- newHandler->next = fd_table[fd].closeHandler;
+ call->setNext(fd_table[fd].closeHandler);
- fd_table[fd].closeHandler = newHandler;
+ fd_table[fd].closeHandler = call;
}
+
+// remove function-based close handler
void
comm_remove_close_handler(int fd, PF * handler, void *data)
{
- assert (fdc_table[fd].active);
- close_handler *p = NULL;
- close_handler *last = NULL;
+ assert (isOpen(fd));
/* Find handler in list */
debugs(5, 5, "comm_remove_close_handler: FD " << fd << ", handler=" <<
handler << ", data=" << data);
- for (p = fd_table[fd].closeHandler; p != NULL; last = p, p = p->next)
- if (p->handler == handler && p->data == data)
- break; /* This is our handler */
+ AsyncCall::Pointer p;
+ for (p = fd_table[fd].closeHandler; p != NULL; p = p->Next()){
+ typedef CommCbFunPtrCallT<CommCloseCbPtrFun> Call;
+ const Call *call = dynamic_cast<const Call*>(p.getRaw());
+ if (!call) // method callbacks have their own comm_remove_close_handler
+ continue;
+ typedef CommCloseCbParams Params;
+ const Params ¶ms = GetCommParams<Params>(p);
+ if (call->dialer.handler == handler && params.data == data)
+ break; /* This is our handler */
+ }
assert(p != NULL);
+ p->cancel("comm_remove_close_handler");
+}
- /* Remove list entry */
- if (last)
- last->next = p->next;
- else
- fd_table[fd].closeHandler = p->next;
+// remove method-based close handler
+void
+comm_remove_close_handler(int fd, AsyncCall::Pointer &call)
+{
+ assert (isOpen(fd));
+ /* Find handler in list */
+ debugs(5, 5, "comm_remove_close_handler: FD " << fd << ", AsyncCall=" << call);
- cbdataReferenceDone(p->data);
+ // Check to see if really exist the given AsyncCall in comm_close handlers
+ // TODO: optimize: this slow code is only needed for the assert() below
+ AsyncCall::Pointer p;
+ for (p = fd_table[fd].closeHandler; p != NULL && p != call; p = p->Next());
+ assert(p == call);
- conn_close_pool->free(p);
+ call->cancel("comm_remove_close_handler");
}
static void
comm_init(void) {
fd_table =(fde *) xcalloc(Squid_MaxFD, sizeof(fde));
fdd_table = (fd_debug_t *)xcalloc(Squid_MaxFD, sizeof(fd_debug_t));
- fdc_table = new fdc_t[Squid_MaxFD];
- commfd_table = (comm_fd_t *) xcalloc(Squid_MaxFD, sizeof(comm_fd_t));
+ fdc_table = new AcceptFD[Squid_MaxFD];
for (int pos = 0; pos < Squid_MaxFD; ++pos) {
- fdc_table[pos] = fdc_t(pos);
+ fdc_table[pos] = AcceptFD(pos);
}
+
+ commfd_table = (comm_fd_t *) xcalloc(Squid_MaxFD, sizeof(comm_fd_t));
for (int pos = 0; pos < Squid_MaxFD; pos++) {
commfd_table[pos].fd = pos;
commfd_table[pos].readcb.fd = pos;
if (nleft != 0)
debugs(5, 1, "commHandleWrite: FD " << fd << ": write failure: connection closed with " << nleft << " bytes remaining.");
- commio_complete_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
+ commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
} else if (len < 0) {
/* An error */
if (fd_table[fd].flags.socket_eof) {
debugs(50, 2, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << ".");
- commio_complete_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
+ commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
} else if (ignoreErrno(errno)) {
debugs(50, 10, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << ".");
commSetSelect(fd,
0);
} else {
debugs(50, 2, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << ".");
- commio_complete_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
+ commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
}
} else {
/* A successful write, continue */
state,
0);
} else {
- commio_complete_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_OK : COMM_ERROR, errno);
+ commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_OK : COMM_ERROR, errno);
}
}
*/
void
comm_write(int fd, const char *buf, int size, IOCB * handler, void *handler_data, FREE * free_func)
+{
+ AsyncCall::Pointer call = commCbCall(5,5, "SomeCommWriteHander",
+ CommIoCbPtrFun(handler, handler_data));
+
+ comm_write(fd, buf, size, call, free_func);
+}
+
+void
+comm_write(int fd, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func)
{
assert(!fd_table[fd].flags.closing);
- debugs(5, 5, "comm_write: FD " << fd << ": sz " << size << ": hndl " << handler << ": data " << handler_data << ".");
+ debugs(5, 5, "comm_write: FD " << fd << ": sz " << size << ": asynCall " << callback << ".");
if (commio_has_callback(fd, IOCB_WRITE, COMMIO_FD_WRITECB(fd))) {
/* This means that the write has been scheduled, but has not
*/
fatalf ("comm_write: fd %d: pending callback!\n", fd);
}
- /* XXX ugly */
- commio_set_callback(fd, IOCB_WRITE, COMMIO_FD_WRITECB(fd), handler, handler_data, (char *)buf, free_func, size);
+
+ commio_set_callback(fd, IOCB_WRITE, COMMIO_FD_WRITECB(fd),
+ callback, (char *)buf, free_func, size);
commSetSelect(fd, COMM_SELECT_WRITE, commHandleWrite, COMMIO_FD_WRITECB(fd), 0);
}
+
/* a wrapper around comm_write to allow for MemBuf to be comm_written in a snap */
void
comm_write_mbuf(int fd, MemBuf *mb, IOCB * handler, void *handler_data) {
comm_write(fd, mb->buf, mb->size, handler, handler_data, mb->freeFunc());
}
+void
+comm_write_mbuf(int fd, MemBuf *mb, AsyncCall::Pointer &callback) {
+ comm_write(fd, mb->buf, mb->size, callback, mb->freeFunc());
+}
+
/*
* hm, this might be too general-purpose for all the places we'd
if (F->flags.ipc) /* don't close inter-process sockets */
continue;
- if (F->timeout_handler) {
- PF *callback = F->timeout_handler;
- void *cbdata = NULL;
- F->timeout_handler = NULL;
+ if (F->timeoutHandler != NULL) {
+ AsyncCall::Pointer callback = F->timeoutHandler;
+ F->timeoutHandler = NULL;
debugs(5, 5, "commCloseAllSockets: FD " << fd << ": Calling timeout handler");
-
- if (cbdataReferenceValidDone(F->timeout_data, &cbdata))
- callback(fd, cbdata);
+ ScheduleCallHere(callback);
} else {
debugs(5, 5, "commCloseAllSockets: FD " << fd << ": calling comm_close()");
comm_close(fd);
checkTimeouts(void) {
int fd;
fde *F = NULL;
- PF *callback;
+ AsyncCall::Pointer callback;
for (fd = 0; fd <= Biggest_FD; fd++) {
F = &fd_table[fd];
debugs(5, 5, "checkTimeouts: FD " << fd << " Expired");
- if (F->timeout_handler) {
+ if (F->timeoutHandler != NULL) {
debugs(5, 5, "checkTimeouts: FD " << fd << ": Call timeout handler");
- callback = F->timeout_handler;
- F->timeout_handler = NULL;
- callback(fd, F->timeout_data);
+ callback = F->timeoutHandler;
+ F->timeoutHandler = NULL;
+ ScheduleCallHere(callback);
} else {
debugs(5, 5, "checkTimeouts: FD " << fd << ": Forcing comm_close()");
comm_close(fd);
return sock;
}
+// AcceptFD::callback() wrapper
void
-fdc_t::beginAccepting() {
- accept.accept.beginAccepting();
+comm_accept(int fd, IOACB *handler, void *handler_data) {
+ debugs(5, 5, "comm_accept: FD " << fd << " handler: " << (void*)handler);
+ assert(isOpen(fd));
+
+ AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler",
+ CommAcceptCbPtrFun(handler, handler_data));
+ fdc_table[fd].subscribe(call);
}
-int
-fdc_t::acceptCount() const {
- return accept.accept.acceptCount();
+void
+comm_accept(int fd, AsyncCall::Pointer &call) {
+ debugs(5, 5, "comm_accept: FD " << fd << " AsyncCall: " << call);
+ assert(isOpen(fd));
+
+ fdc_table[fd].subscribe(call);
}
+// Called when somebody wants to be notified when our socket accepts new
+// connection. We do not probe the FD until there is such interest.
void
-fdc_t::acceptOne(int fd) {
+AcceptFD::subscribe(AsyncCall::Pointer &call) {
+ /* make sure we're not pending! */
+ assert(!theCallback);
+ theCallback = call;
+
+#if OPTIMISTIC_IO
+ mayAcceptMore = true; // even if we failed to accept last time
+#endif
+
+ if (mayAcceptMore)
+ acceptNext();
+ else
+ commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0);
+}
+
+bool
+AcceptFD::acceptOne() {
// If there is no callback and we accept, we will leak the accepted FD.
// When we are running out of FDs, there is often no callback.
- if (!accept.accept.callback.handler) {
- debugs(5, 5, "fdc_t::acceptOne orphaned: FD " << fd);
+ if (!theCallback) {
+ debugs(5, 5, "AcceptFD::acceptOne orphaned: FD " << fd);
// XXX: can we remove this and similar "just in case" calls and
// either listen always or listen only when there is a callback?
if (!AcceptLimiter::Instance().deferring())
commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0);
- accept.accept.finished(true);
- return;
+ return false;
}
/*
*/
/* Accept a new connection */
- int newfd = comm_old_accept(fd, accept.connDetails);
+ ConnectionDetail connDetails;
+ int newfd = comm_old_accept(fd, connDetails);
/* Check for errors */
if (newfd < 0) {
+ assert(theCallback != NULL);
+
if (newfd == COMM_NOMESSAGE) {
/* register interest again */
- debugs(5, 5, "fdc_t::acceptOne eof: FD " << fd << " handler: " << (void*)accept.accept.callback.handler);
+ debugs(5, 5, "AcceptFD::acceptOne eof: FD " << fd <<
+ " handler: " << *theCallback);
commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0);
- accept.accept.finished(true);
- return;
+ return false;
}
- /* A non-recoverable error - register an error callback */
- new CommAcceptCallbackData(fd, accept.accept.callback, COMM_ERROR, errno, -1, accept.connDetails);
-
- accept.accept.callback = CallBack<IOACB>();
-
- accept.accept.finished(true);
-
- return;
+ // A non-recoverable error; notify the caller */
+ notify(-1, COMM_ERROR, errno, connDetails);
+ return false;
}
- debugs(5, 5, HERE << "accepted: FD " << fd << " handler: " << (void*)accept.accept.callback.handler << " newfd: " << newfd << " from: " << accept.connDetails.peer);
-
- assert(accept.accept.callback.handler);
-
- accept.accept.doCallback(fd, newfd, COMM_OK, 0, &accept.connDetails);
-
- /* If we weren't re-registed, don't bother trying again! */
-
- if (accept.accept.callback.handler == NULL)
- accept.accept.finished(true);
-}
-
-bool
-AcceptFD::finished() const {
- return finished_;
+ assert(theCallback != NULL);
+ debugs(5, 5, "AcceptFD::acceptOne accepted: FD " << fd <<
+ " newfd: " << newfd << " from: " << connDetails.peer <<
+ " handler: " << *theCallback);
+ notify(newfd, COMM_OK, 0, connDetails);
+ return true;
}
void
-AcceptFD::finished(bool newValue) {
- finished_ = newValue;
+AcceptFD::acceptNext() {
+ mayAcceptMore = acceptOne();
}
-bool
-AcceptFD::finishedAccepting() const {
- return acceptCount() >= MAX_ACCEPT_PER_LOOP || finished();
+void
+AcceptFD::notify(int newfd, comm_err_t errcode, int xerrno, const ConnectionDetail &connDetails)
+{
+ if (theCallback != NULL) {
+ typedef CommAcceptCbParams Params;
+ Params ¶ms = GetCommParams<Params>(theCallback);
+ params.fd = fd;
+ params.nfd = newfd;
+ params.details = connDetails;
+ params.flag = errcode;
+ params.xerrno = xerrno;
+ ScheduleCallHere(theCallback);
+ theCallback = NULL;
+ }
}
/*
* to dupe itself and fob off an accept()ed connection
*/
static void
-comm_accept_try(int fd, void *data) {
- assert(fdc_table[fd].active == 1);
-
- fdc_table[fd].beginAccepting();
-
- while (!fdc_table[fd].accept.accept.finishedAccepting())
- fdc_table[fd].acceptOne(fd);
-}
-
-/*
- * Notes:
- * + the current interface will queue _one_ accept per io loop.
- * this isn't very optimal and should be revisited at a later date.
- */
-void
-comm_accept(int fd, IOACB *handler, void *handler_data) {
- debugs(5, 5, "comm_accept: FD " << fd << " handler: " << (void*)handler);
- requireOpenAndActive(fd);
-
- /* make sure we're not pending! */
- assert(fdc_table[fd].accept.accept.callback.handler == NULL);
-
- /* Record our details */
- fdc_table[fd].accept.accept.callback = CallBack<IOACB> (handler, handler_data);
-
- /* Kick off the accept */
-#if OPTIMISTIC_IO
-
- comm_accept_try(fd, NULL);
-#else
-
- commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0);
-#endif
+comm_accept_try(int fd, void *) {
+ assert(isOpen(fd));
+ fdc_table[fd].acceptNext();
}
void CommIO::Initialise() {
void
commMarkHalfClosed(int fd) {
- assert (fdc_table[fd].active && !fdc_table[fd].half_closed);
+ assert (isOpen(fd));
AbortChecker::Instance().monitor(fd);
- fdc_table[fd].half_closed = true;
}
int commIsHalfClosed(int fd) {
- assert (fdc_table[fd].active);
+ assert (isOpen(fd));
- return fdc_table[fd].half_closed;
+ return AbortChecker::Instance().isMonitoring(fd);
}
void
return lhs - rhs;
}
+bool
+AbortChecker::isMonitoring(int fd) const {
+ return contains(fd);
+}
+
bool
AbortChecker::contains (int const fd) const {
fds = fds->splay(fd, IntCompare);
*/
}
-CommRead::CommRead() : fd(-1), buf(NULL), len(0) {}
+CommRead::CommRead() : fd(-1), buf(NULL), len(0), callback(NULL) {}
-CommRead::CommRead(int fd_, char *buf_, int len_, IOCB *handler_, void *data_)
- : fd(fd_), buf(buf_), len(len_), callback(handler_, data_) {}
+CommRead::CommRead(int fd_, char *buf_, int len_, AsyncCall::Pointer &callback_)
+ : fd(fd_), buf(buf_), len(len_), callback(callback_) {}
DeferredRead::DeferredRead () : theReader(NULL), theContext(NULL), theRead(), cancelled(false) {}
ConnectionDetail::ConnectionDetail() : me(), peer() {
}
-bool
-CommDispatcher::dispatch() {
- bool result = comm_iocallbackpending();
- comm_calliocallback();
- /* and again to deal with indirectly queued events
- * resulting from the first call. These are usually
- * callbacks and should be dealt with immediately.
- */
- comm_calliocallback();
-
- /* Adrian's *new* stuff */
- commio_call_callbacks();
- return result;
-}
-
int
CommSelectEngine::checkEvents(int timeout) {
static time_t last_timeout = 0;