/*
- * $Id: comm.cc,v 1.423 2006/09/03 21:05:20 hno Exp $
+ * $Id: comm.cc,v 1.424 2006/09/19 07:56:57 adrian Exp $
*
* DEBUG: section 5 Socket Functions
* AUTHOR: Harvest Derived
#include <netinet/tcp.h>
#endif
+/*
+ * New C-like simple comm code. This stuff is a mess and doesn't really buy us anything.
+ */
+
+typedef enum {
+ IOCB_NONE,
+ IOCB_READ,
+ IOCB_WRITE
+} iocb_type;
+
+struct _comm_io_callback {
+ iocb_type type;
+ int fd;
+ IOCB *callback;
+ void *callback_data;
+ char *buf;
+ FREE *freefunc;
+ int size;
+ int offset;
+ bool active;
+ bool completed;
+ comm_err_t errcode;
+ int xerrno;
+ dlink_node node;
+};
+typedef struct _comm_io_callback comm_io_callback_t;
+
+struct _comm_fd {
+ int fd;
+ comm_io_callback_t readcb;
+ comm_io_callback_t writecb;
+};
+typedef struct _comm_fd comm_fd_t;
+comm_fd_t *commfd_table;
+
+dlink_list commfd_completed_events;
+
+bool
+commio_has_callback(int fd, iocb_type type, comm_io_callback_t *ccb)
+{
+ assert(ccb->fd == fd);
+ assert(ccb->type == type);
+ return ccb->active == true;
+}
+
+/*
+ * Set the given handler and mark active
+ *
+ * @param fd filedescriptor
+ * @param ccb comm io callback
+ * @param cb callback
+ * @param cbdata callback data (must be cbdata'ed)
+ * @param buf buffer, if applicable
+ * @param freefunc freefunc, if applicable
+ * @param size buffer size
+ */
+void
+commio_set_callback(int fd, iocb_type type, comm_io_callback_t *ccb, IOCB *cb, void *cbdata, char *buf, FREE *freefunc, int size)
+{
+ assert(ccb->active == false);
+ assert(ccb->type == type);
+ ccb->fd = fd;
+ ccb->callback = cb;
+ ccb->callback_data = cbdata;
+ if (cbdata) {
+ cbdataReference(cbdata);
+ }
+ ccb->buf = buf;
+ ccb->freefunc = freefunc;
+ ccb->size = size;
+ ccb->active = true;
+ ccb->completed = false;
+ ccb->offset = 0;
+}
+
+
+/*
+ * Complete the callback
+ *
+ * Someone may have already called this function once on a non-completed callback.
+ * This happens in the comm_close() routine - the IO may have completed
+ * but comm_close() is called bfeore teh callback has been called.
+ * In this case, leave the details the same (offset, for example) but just update
+ * the error codes.
+ */
+void
+commio_complete_callback(int fd, comm_io_callback_t *ccb, comm_err_t code, int xerrno)
+{
+ debug(5, 3) ("commio_complete_callback: called for %d (%d, %d)\n", fd, code, xerrno);
+ assert(ccb->active == true);
+ assert(ccb->fd == fd);
+ ccb->errcode = code;
+ ccb->xerrno = xerrno;
+ if (! ccb->completed)
+ dlinkAddTail(ccb, &ccb->node, &commfd_completed_events);
+ ccb->completed = true;
+}
+
+
+/*
+ * Cancel the given callback
+ *
+ * Remember that the data is cbdataRef'ed.
+ */
+void
+commio_cancel_callback(int fd, comm_io_callback_t *ccb)
+{
+ debug(5, 3) ("commio_cancel_callback: called for %d\n", fd);
+ assert(ccb->fd == fd);
+ assert(ccb->active == true);
+
+ if (ccb->completed == true) {
+ dlinkDelete(&ccb->node, &commfd_completed_events);
+ }
+ if (ccb->callback_data)
+ cbdataReferenceDone(ccb->callback_data);
+
+ ccb->xerrno = 0;
+ ccb->active = false;
+ ccb->completed = false;
+ ccb->callback = NULL;
+ ccb->callback_data = NULL;
+}
+
+/*
+ * Call the given comm callback; assumes the callback is valid.
+ *
+ * @param ccb io completion callback
+ */
+void
+commio_call_callback(comm_io_callback_t *ccb)
+{
+ comm_io_callback_t cb = *ccb;
+ void *cbdata;
+ assert(cb.active == true);
+ assert(cb.completed == true);
+ debug(5, 3) ("commio_call_callback: called for %d\n", ccb->fd);
+
+ /* We've got a copy; blow away the real one */
+ /* XXX duplicate code from commio_cancel_callback! */
+ dlinkDelete(&ccb->node, &commfd_completed_events);
+ ccb->xerrno = 0;
+ ccb->active = false;
+ ccb->completed = false;
+ ccb->callback = NULL;
+ ccb->callback_data = NULL;
+
+ /* free data */
+ if (cb.freefunc) {
+ cb.freefunc(cb.buf);
+ cb.buf = NULL;
+ }
+ if (cb.callback && cbdataReferenceValidDone(cb.callback_data, &cbdata)) {
+ /* XXX truely ugly for now! */
+ cb.callback(cb.fd, cb.buf, cb.offset, cb.errcode, cb.xerrno, cbdata);
+ }
+}
+
+void
+commio_call_callbacks(void)
+{
+ comm_io_callback_t *ccb;
+ while (commfd_completed_events.head != NULL) {
+ ccb = (comm_io_callback_t *) commfd_completed_events.head->data;
+ commio_call_callback(ccb);
+ }
+}
+
class ConnectStateData
{
static comm_err_t commBind(int s, struct IN_ADDR, u_short port);
static void commSetReuseAddr(int);
static void commSetNoLinger(int);
-static void CommWriteStateCallbackAndFree(int fd, comm_err_t code);
#ifdef TCP_NODELAY
static void commSetTcpNoDelay(int);
#endif
size_t const AcceptFD::MAX_ACCEPT_PER_LOOP(10);
-class CommWrite
-{
-
-public:
- CommWrite() : buf(NULL), size(0), curofs(0), handler(NULL), handler_data(NULL){}
-
- const char *buf;
- int size;
- int curofs;
- IOCB *handler;
- void *handler_data;
-};
-
class fdc_t
{
void acceptOne(int fd);
void beginAccepting();
int acceptCount() const;
- fdc_t() : active(0), fd(-1), half_closed (false){CommCallbackList.head = NULL;CommCallbackList.tail = NULL; fill.amountDone = 0; fill.handler = NULL; fill.handler_data = NULL;}
+ fdc_t() : active(0), fd(-1), half_closed (false){CommCallbackList.head = NULL;CommCallbackList.tail = NULL; }
fdc_t(int anFD) : active(0), fd(anFD), half_closed(false)
{
CommCallbackList.head = NULL;
CommCallbackList.tail = NULL;
- fill.amountDone = 0;
- fill.handler = NULL;
- fill.handler_data = NULL;
- read.fd = anFD;
}
int active;
int fd;
dlink_list CommCallbackList;
- CommRead read;
-
- bool hasIncompleteWrite();
-
template<class P>
bool findCallback(P predicate);
- CommWrite write;
-
class Accept
{
Accept accept;
- struct CommFiller
- {
- StoreIOBuffer requestedData;
- size_t amountDone;
- IOFCB *handler;
- void *handler_data;
- }
-
- fill;
-
bool half_closed;
};
MEMPROXY_CLASS_INLINE(CommCallbackData)
-class CommReadCallbackData : public CommCallbackData
-{
-
-public:
- MEMPROXY_CLASS(CommReadCallbackData);
- CommReadCallbackData(CommCommonCallback const &, CallBack<IOCB> aCallback, int);
- virtual comm_callback_t getType() const { return COMM_CB_READ; }
-
- virtual void callCallback();
-
-private:
- CallBack<IOCB> callback;
- int retval;
-};
-
-MEMPROXY_CLASS_INLINE(CommReadCallbackData);
-
class CommAcceptCallbackData : public CommCallbackData
{
MEMPROXY_CLASS_INLINE(CommFillCallbackData)
-class CommWriteCallbackData : public CommCallbackData
-{
-
-public:
- MEMPROXY_CLASS(CommWriteCallbackData);
- CommWriteCallbackData(int const anFd, CallBack<IOWCB> aCallback, comm_err_t, int, int);
- virtual void callCallback();
-
-private:
- CallBack<IOWCB> callback;
- int retval;
-};
-
-MEMPROXY_CLASS_INLINE(CommWriteCallbackData)
-
struct _fd_debug_t
{
char const *close_file;
typedef struct _fd_debug_t fd_debug_t;
-static MemAllocator *comm_write_pool = NULL;
static MemAllocator *conn_close_pool = NULL;
fdc_t *fdc_table = NULL;
fd_debug_t *fdd_table = NULL;
registerSelf();
}
-CommReadCallbackData::CommReadCallbackData(CommCommonCallback const &aResult, CallBack<IOCB> aCallback, int aRetval) : CommCallbackData(aResult), callback(aCallback), retval(aRetval)
-{}
-
CommAcceptCallbackData::CommAcceptCallbackData(int const anFd, CallBack<IOACB> aCallback, comm_err_t anErrcode, int anErrno, int aNewFD, ConnectionDetail const &newDetails) :CommCallbackData(CommCommonCallback(anFd, anErrcode, anErrno)), callback (aCallback), newfd(aNewFD), details(newDetails)
{}
-CommFillCallbackData::CommFillCallbackData(int const anFd, CallBack<IOFCB> aCallback, comm_err_t anErrcode, int anErrno) :CommCallbackData(CommCommonCallback(anFd, anErrcode, anErrno)), callback (aCallback)
-{}
-
-CommWriteCallbackData::CommWriteCallbackData(int const anFd, CallBack<IOWCB> aCallback, comm_err_t anErrcode, int anErrno, int aRetval) :CommCallbackData(CommCommonCallback(anFd, anErrcode, anErrno)), callback (aCallback), retval (aRetval)
-{}
-
void
CommCallbackData::registerSelf()
{
* has finished but we don't want to risk re-entering a non-reentrant
* code block.
*/
-static void
-comm_add_fill_callback(int fd, size_t length, comm_err_t errcode, int xerrno)
-{
- CommCallbackData *cio;
-
- cio = new CommFillCallbackData(fd, CallBack<IOFCB>(fdc_table[fd].fill.handler, fdc_table[fd].fill.handler_data), errcode, xerrno);
-
- /* Throw our data into it */
- cio->sb = fdc_table[fd].fill.requestedData;
- cio->sb.length = length;
- /* Clear out fd state */
- fdc_table[fd].fill.handler = NULL;
- fdc_table[fd].fill.handler_data = NULL;
-}
-
-static void
-comm_add_write_callback(int fd, size_t retval, comm_err_t errcode, int xerrno)
-{
- CommCallbackData *cio;
-
- cio = new CommWriteCallbackData(fd, CallBack<IOWCB>(fdc_table[fd].write.handler, fdc_table[fd].write.handler_data), errcode, xerrno, retval);
-
- /* Clear out fd state */
- fdc_table[fd].write.handler = NULL;
- fdc_table[fd].write.handler_data = NULL;
-}
-
-void
-CommReadCallbackData::callCallback()
-{
- PROF_start(CommReadCallbackData_callCallback);
- callback.handler(result.fd, buf, retval, result.errcode, result.xerrno, callback.data);
- PROF_stop(CommReadCallbackData_callCallback);
-}
-
void
CommAcceptCallbackData::callCallback()
{
PROF_stop(CommAcceptCallbackData_callCallback);
}
-void
-CommWriteCallbackData::callCallback()
-{
- PROF_start(CommWriteCallbackData_callCallback);
- callback.handler(result.fd, buf, retval, result.errcode, result.xerrno, callback.data);
- PROF_stop(CommWriteCallbackData_callCallback);
-}
-
-void
-CommFillCallbackData::callCallback()
-{
- PROF_start(CommFillCallbackData_callCallback);
- callback.handler(result.fd, sb, result.errcode, result.xerrno, callback.data);
- PROF_stop(CommFillCallbackData_callCallback);
-}
-
void
CommCallbackData::fdClosing()
{
return CommCallbackList.head != NULL;
}
-void
-CommRead::queueCallback(size_t retval, comm_err_t errcode, int xerrno)
-{
- hasCallbackInvariant();
-
- CommCallbackData *cio;
- cio = new CommReadCallbackData(CommCommonCallback(fd, errcode, xerrno),callback, retval);
-
- /* Throw our data into it */
- cio->buf = buf;
- callback = CallBack<IOCB>();
-}
-
-void
-CommRead::hasCallbackInvariant() const
-{
- assert (hasCallback());
-}
-
-void
-CommRead::hasNoCallbackInvariant() const
-{
- assert (!hasCallback());
-}
-
-bool
-CommRead::hasCallback() const
-{
- return callback.handler != NULL;
-}
-
/*
* Attempt a read
*
* Else, wait for another IO notification.
*/
void
-CommRead::ReadTry(int fd, void *data)
-{
- fdc_t *Fc = &fdc_table[fd];
- assert (Fc->read.fd == fd);
- assert (data == NULL);
- Fc->read.tryReading();
-}
-
-void
-CommRead::tryReading()
+commHandleRead(int fd, void *data)
{
- hasCallbackInvariant();
+ comm_io_callback_t *ccb = (comm_io_callback_t *) data;
+
+ assert(data == COMMIO_FD_READCB(fd));
+ assert(commio_has_callback(fd, IOCB_READ, ccb));
/* Attempt a read */
statCounter.syscalls.sock.reads++;
errno = 0;
int retval;
- retval = FD_READ_METHOD(fd, buf, len);
+ retval = FD_READ_METHOD(fd, ccb->buf, ccb->size);
debug(5, 3) ("comm_read_try: FD %d, size %d, retval %d, errno %d\n",
- fd, len, retval, errno);
+ fd, ccb->size, retval, errno);
if (retval < 0 && !ignoreErrno(errno)) {
debug(5, 3) ("comm_read_try: scheduling COMM_ERROR\n");
- queueCallback(0, COMM_ERROR, errno);
+ ccb->offset = 0;
+ commio_complete_callback(fd, ccb, COMM_ERROR, errno);
return;
};
/* Note - read 0 == socket EOF, which is a valid read */
if (retval >= 0) {
fd_bytes(fd, retval, FD_READ);
- queueCallback(retval, COMM_OK, 0);
+ ccb->offset = retval;
+ commio_complete_callback(fd, ccb, COMM_OK, errno);
return;
}
/* Nope, register for some more IO */
- commSetSelect(fd, COMM_SELECT_READ, ReadTry, NULL, 0);
+ commSetSelect(fd, COMM_SELECT_READ, commHandleRead, data, 0);
}
/*
{
/* Make sure we're not reading anything and we're not closing */
assert(fdc_table[fd].active == 1);
- fdc_table[fd].read.hasNoCallbackInvariant();
assert(!fd_table[fd].flags.closing);
debug(5,4)("comm_read, queueing read for FD %d\n",fd);
- /* Queue a read */
- fdc_table[fd].read = CommRead(fd, buf, size, handler, handler_data);
- fdc_table[fd].read.read();
-}
-
-void
-CommRead::read()
-{
-#if OPTIMISTIC_IO
-
- tryReading();
-#else
-
- initiateActualRead();
-#endif
-}
-
-void
-CommRead::initiateActualRead()
-{
- /* Register intrest in a FD read */
- commSetSelect(fd, COMM_SELECT_READ, ReadTry, NULL, 0);
-}
-
-static void
-comm_fill_read(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
-{
- /* TODO use a reference to the table entry, or use C++ :] */
- fdc_t::CommFiller *fill;
- assert(fdc_table[fd].active == 1);
-
- if (flag != COMM_OK) {
- /* Error! */
- /* XXX This was -1 below, but -1 can't be used for size_t parameters.
- * The callback should set -1 to the client if needed based on the flags
- */
- comm_add_fill_callback(fd, 0, flag, xerrno);
- return;
- }
-
- /* flag is COMM_OK */
- /* We handle EOFs as read lengths of 0! Its eww, but its consistent */
- fill = &fdc_table[fd].fill;
-
- fill->amountDone += len;
-
- assert(fill->amountDone <= fdc_table[fd].fill.requestedData.length);
-
- comm_add_fill_callback(fd, fill->amountDone, COMM_OK, 0);
-}
-
-/*
- * Try filling a StoreIOBuffer with some data, and call a callback when successful
- */
-void
-comm_fill_immediate(int fd, StoreIOBuffer sb, IOFCB *callback, void *data)
-{
- assert(fdc_table[fd].fill.handler == NULL);
- /* prevent confusion */
- assert (sb.offset == 0);
-
- /* If we don't have any data, record details and schedule a read */
- fdc_table[fd].fill.handler = callback;
- fdc_table[fd].fill.handler_data = data;
- fdc_table[fd].fill.requestedData = sb;
- fdc_table[fd].fill.amountDone = 0;
-
- comm_read(fd, sb.data, sb.length, comm_fill_read, NULL);
+ /* Queue the read */
+ /* XXX ugly */
+ commio_set_callback(fd, IOCB_READ, COMMIO_FD_READCB(fd), handler, handler_data, (char *)buf, NULL, size);
+ commSetSelect(fd, COMM_SELECT_READ, commHandleRead, COMMIO_FD_READCB(fd), 0);
}
-
/*
* Empty the read buffers
*
}
/*
- * Return whether a file descriptor has any pending read request callbacks
- *
- * Assumptions: the fd is open (ie, its not closing)
+ * Return whether the FD has a pending completed callback.
*/
-
-struct FindReadCallback
-{
- bool operator () (CommCallbackData *cd)
- {
- return cd->getType() == COMM_CB_READ;
- }
-};
-
-
int
comm_has_pending_read_callback(int fd)
{
requireOpenAndActive(fd);
-
- if (fdc_table[fd].findCallback(FindReadCallback()))
- return 1;
-
- return 0;
+ return COMMIO_FD_READCB(fd)->active && COMMIO_FD_READCB(fd)->completed;
}
template <class P>
*
* Assumptions: the fd is open
* the fd is a comm fd.
+ *
+ * Again - is this "pending read", or "pending completed event", or what?
+ * I'll assume its pending read, not pending completed.
+ *
+ * This makes no sense though - if this is called to check whether there's
+ * a pending read -before- submitting a read then it won't matter whether
+ * its completed or not! Ie:
+ *
+ * + if there's no read and you want to schedule one; fine.
+ * + if a read has completed then the callback block has been deactivated before
+ * the callback is called - if something decides to register for a read
+ * callback once again it should find !active and !completed.
+ * + scheduling a read event when the fd is ! active -and- completed, thats
+ * a bug
+ * + like, afaict, anything else is.
*/
bool
comm_has_pending_read(int fd)
{
requireOpenAndActive(fd);
- return (fdc_table[fd].read.hasCallback());
+ return COMMIO_FD_READCB(fd)->active && (! COMMIO_FD_READCB(fd)->completed);
}
/*
* Cancel a pending read. Assert that we have the right parameters,
* and that there are no pending read events!
+ *
+ * AHC Don't call the comm handlers?
*/
void
comm_read_cancel(int fd, IOCB *callback, void *data)
{
requireOpenAndActive(fd);
- assert(fdc_table[fd].read.callback == CallBack<IOCB>(callback,data));
-
- assert(!comm_has_pending_read_callback(fd));
-
/* Ok, we can be reasonably sure we won't lose any data here! */
+ assert(COMMIO_FD_READCB(fd)->callback == callback);
+ assert(COMMIO_FD_READCB(fd)->callback_data == data);
/* Delete the callback */
- fdc_table[fd].read.callback = CallBack<IOCB>();
+ commio_cancel_callback(fd, COMMIO_FD_READCB(fd));
/* And the IO event */
commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
}
-/*
- * The new-style comm_write magic
- */
-
-struct FindWriteCallback
-{
- bool operator () (CommCallbackData *cd)
- {
- return dynamic_cast<CommWriteCallbackData *>(cd) != NULL;
- }
-};
-
bool
comm_has_incomplete_write(int fd)
{
requireOpenAndActive(fd);
-
- if (fdc_table[fd].hasIncompleteWrite())
- return true;
-
- return (fdc_table[fd].findCallback(FindWriteCallback()));
-}
-
-bool
-fdc_t::hasIncompleteWrite()
-{
- return write.handler != NULL;
-}
-
-/*
- * Attempt a write
- *
- * If the write attempt succeeds or fails, call the callback.
- * Else, wait for another IO notification.
- */
-static void
-comm_write_try(int fd, void *data)
-{
- fdc_t *Fc = &fdc_table[fd];
- int retval;
-
- /* make sure we actually have a callback */
- assert(Fc->write.handler != NULL);
-
- /* Attempt a write */
- statCounter.syscalls.sock.writes++;
- errno = 0;
- retval = FD_WRITE_METHOD(fd, Fc->write.buf + Fc->write.curofs, Fc->write.size - Fc->write.curofs);
- debug(5, 3) ("comm_write_try: FD %d: tried to write %d bytes, retval %d, errno %d\n",
- fd, Fc->write.size - Fc->write.curofs, retval, errno);
-
- if (retval < 0 && !ignoreErrno(errno)) {
- debug(5, 3) ("comm_write_try: can't ignore error: scheduling COMM_ERROR callback\n");
- comm_add_write_callback(fd, 0, COMM_ERROR, errno);
- return;
- }
-
- if (retval >= 0) {
- fd_bytes(fd, retval, FD_WRITE);
- Fc->write.curofs += retval;
- assert(Fc->write.curofs <= Fc->write.size);
- /* All? */
-
- if (Fc->write.curofs == Fc->write.size) {
- comm_add_write_callback(fd, Fc->write.size, COMM_OK, 0);
- return;
- }
- }
-
- /* if we get here, we need to write more! */
- commSetSelect(fd, COMM_SELECT_WRITE, comm_write_try, NULL, 0);
+ return COMMIO_FD_WRITECB(fd)->active;
}
/*
* Queue a write. handler/handler_data are called when the write fully
* completes, on error, or on file descriptor close.
*/
-void
-comm_write(int fd, const char *buf, size_t size, IOWCB *handler, void *handler_data)
-{
- /* Make sure we're not writing anything and we're not closing */
- assert(fdc_table[fd].active == 1);
- assert(fdc_table[fd].write.handler == NULL);
- assert(!fd_table[fd].flags.closing);
-
- /* Can't queue a write with no callback */
- assert(handler);
-
- /* Queue a write */
- fdc_table[fd].write.buf = buf;
- fdc_table[fd].write.size = size;
- fdc_table[fd].write.handler = handler;
- fdc_table[fd].write.handler_data = handler_data;
- fdc_table[fd].write.curofs = 0;
-
-#if OPTIMISTIC_IO
-
- comm_write_try(fd, NULL);
-#else
- /* Register intrest in a FD read */
- commSetSelect(fd, COMM_SELECT_WRITE, comm_write_try, NULL, 0);
-#endif
-}
-
-/* Older stuff */
-
-static void
-CommWriteStateCallbackAndFree(int fd, comm_err_t code)
-{
- CommWriteStateData *CommWriteState = fd_table[fd].wstate;
- CWCB *callback = NULL;
- void *cbdata;
- fd_table[fd].wstate = NULL;
-
- if (CommWriteState == NULL)
- return;
-
- if (CommWriteState->free_func) {
- FREE *free_func = CommWriteState->free_func;
- void *free_buf = CommWriteState->buf;
- CommWriteState->free_func = NULL;
- CommWriteState->buf = NULL;
- free_func(free_buf);
- }
-
- callback = CommWriteState->handler;
- CommWriteState->handler = NULL;
-
- if (callback && cbdataReferenceValidDone(CommWriteState->handler_data, &cbdata))
- callback(fd, CommWriteState->buf, CommWriteState->offset, code, cbdata);
-
- comm_write_pool->free(CommWriteState);
-}
/* Return the local port associated with fd. */
u_short
commSetTimeout(fd, -1, NULL, NULL);
- CommWriteStateCallbackAndFree(fd, COMM_ERR_CLOSING);
-
- /* Do callbacks for read/accept/fill routines, if any */
- assert (fd == fdc_table[fd].read.fd);
-
- fdc_table[fd].read.doCallback(COMM_ERR_CLOSING, 0);
+ /* new-style read/write handler stuff */
+ if (commio_has_callback(fd, IOCB_WRITE, COMMIO_FD_WRITECB(fd))) {
+ commio_complete_callback(fd, COMMIO_FD_WRITECB(fd), COMM_ERR_CLOSING, errno);
+ commio_call_callback(COMMIO_FD_WRITECB(fd));
+ }
+ if (commio_has_callback(fd, IOCB_READ, COMMIO_FD_READCB(fd))) {
+ commio_complete_callback(fd, COMMIO_FD_READCB(fd), COMM_ERR_CLOSING, errno);
+ commio_call_callback(COMMIO_FD_READCB(fd));
+ }
+ /* Do callbacks for read/accept routines, if any */
fdc_table[fd].accept.accept.doCallback(fd, -1, COMM_ERR_CLOSING, 0, NULL);
- if (fdc_table[fd].fill.handler) {
- fdc_table[fd].fill.handler(fd, fdc_table[fd].fill.requestedData, COMM_ERR_CLOSING, 0,
- fdc_table[fd].fill.handler_data);
- fdc_table[fd].fill.handler = NULL;
- }
-
/* Complete (w/ COMM_ERR_CLOSING!) any pending io callbacks */
while (fdc_table[fd].CommCallbackList.head != NULL) {
node = fdc_table[fd].CommCallbackList.head;
fd_table =(fde *) xcalloc(Squid_MaxFD, sizeof(fde));
fdd_table = (fd_debug_t *)xcalloc(Squid_MaxFD, sizeof(fd_debug_t));
fdc_table = new fdc_t[Squid_MaxFD];
+ commfd_table = (comm_fd_t *) xcalloc(Squid_MaxFD, sizeof(comm_fd_t));
- for (int pos = 0; pos < Squid_MaxFD; ++pos)
+ for (int pos = 0; pos < Squid_MaxFD; ++pos) {
fdc_table[pos] = fdc_t(pos);
+ }
+ for (int pos = 0; pos < Squid_MaxFD; pos++) {
+ commfd_table[pos].fd = pos;
+ commfd_table[pos].readcb.fd = pos;
+ commfd_table[pos].readcb.type = IOCB_READ;
+ commfd_table[pos].writecb.fd = pos;
+ commfd_table[pos].writecb.type = IOCB_WRITE;
+ }
/* XXX account fd_table */
/* Keep a few file descriptors free so that we don't run out of FD's
* Since Squid_MaxFD can be as high as several thousand, don't waste them */
RESERVED_FD = XMIN(100, Squid_MaxFD / 4);
- comm_write_pool = memPoolCreate("CommWriteStateData", sizeof(CommWriteStateData));
-
conn_close_pool = memPoolCreate("close_handler", sizeof(close_handler));
}
/* Write to FD. */
static void
commHandleWrite(int fd, void *data) {
- CommWriteStateData *state = (CommWriteStateData *)data;
+ comm_io_callback_t *state = (comm_io_callback_t *)data;
int len = 0;
int nleft;
+ assert(state == COMMIO_FD_WRITECB(fd));
+
PROF_start(commHandleWrite);
debug(5, 5) ("commHandleWrite: FD %d: off %ld, sz %ld.\n",
fd, (long int) state->offset, (long int) state->size);
if (nleft != 0)
debug(5, 1) ("commHandleWrite: FD %d: write failure: connection closed with %d bytes remaining.\n", fd, nleft);
- CommWriteStateCallbackAndFree(fd, nleft ? COMM_ERROR : COMM_OK);
+ commio_complete_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
} else if (len < 0) {
/* An error */
if (fd_table[fd].flags.socket_eof) {
debug(50, 2) ("commHandleWrite: FD %d: write failure: %s.\n",
fd, xstrerror());
- CommWriteStateCallbackAndFree(fd, COMM_ERROR);
+ commio_complete_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
} else if (ignoreErrno(errno)) {
debug(50, 10) ("commHandleWrite: FD %d: write failure: %s.\n",
fd, xstrerror());
} else {
debug(50, 2) ("commHandleWrite: FD %d: write failure: %s.\n",
fd, xstrerror());
- CommWriteStateCallbackAndFree(fd, COMM_ERROR);
+ commio_complete_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
}
} else {
/* A successful write, continue */
state,
0);
} else {
- CommWriteStateCallbackAndFree(fd, COMM_OK);
+ commio_complete_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_OK : COMM_ERROR, errno);
}
}
* free_func is used to free the passed buffer when the write has completed.
*/
void
-comm_old_write(int fd, const char *buf, int size, CWCB * handler, void *handler_data, FREE * free_func) {
- CommWriteStateData *state = fd_table[fd].wstate;
-
+comm_write(int fd, const char *buf, int size, IOCB * handler, void *handler_data, FREE * free_func)
+{
assert(!fd_table[fd].flags.closing);
debug(5, 5) ("comm_write: FD %d: sz %d: hndl %p: data %p.\n",
fd, size, handler, handler_data);
- if (NULL != state) {
+ if (commio_has_callback(fd, IOCB_WRITE, COMMIO_FD_WRITECB(fd))) {
/* This means that the write has been scheduled, but has not
* triggered yet
*/
- fatalf ("comm_write: fd_table[%d].wstate != NULL\n", fd);
- comm_write_pool->free(state);
- fd_table[fd].wstate = NULL;
+ fatalf ("comm_write: fd %d: pending callback!\n", fd);
}
-
- fd_table[fd].wstate = state = (CommWriteStateData *)comm_write_pool->alloc();
- state->buf = (char *) buf;
- state->size = size;
- state->offset = 0;
- state->handler = handler;
- state->handler_data = cbdataReference(handler_data);
- state->free_func = free_func;
- commSetSelect(fd, COMM_SELECT_WRITE, commHandleWrite, state, 0);
+ /* XXX ugly */
+ commio_set_callback(fd, IOCB_WRITE, COMMIO_FD_WRITECB(fd), handler, handler_data, (char *)buf, free_func, size);
+ commSetSelect(fd, COMM_SELECT_WRITE, commHandleWrite, COMMIO_FD_WRITECB(fd), 0);
}
/* a wrapper around comm_write to allow for MemBuf to be comm_written in a snap */
void
-comm_old_write_mbuf(int fd, MemBuf *mb, CWCB * handler, void *handler_data) {
- comm_old_write(fd, mb->buf, mb->size, handler, handler_data, mb->freeFunc());
+comm_write_mbuf(int fd, MemBuf *mb, IOCB * handler, void *handler_data) {
+ comm_write(fd, mb->buf, mb->size, handler, handler_data, mb->freeFunc());
}
* callbacks and should be dealt with immediately.
*/
comm_calliocallback();
+
+ /* Adrian's *new* stuff */
+ commio_call_callbacks();
return result;
}