/*
- * $Id: comm.cc,v 1.369 2003/03/04 07:55:04 robertc Exp $
+ * $Id: comm.cc,v 1.370 2003/03/08 09:35:15 robertc Exp $
*
* DEBUG: section 5 Socket Functions
* AUTHOR: Harvest Derived
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
*
+ *
+ * Copyright (c) 2003, Robert Collins <robertc@squid-cache.org>
*/
#include "squid.h"
#include <netinet/tcp.h>
#endif
-/*
- * This magic determines how many times to call accept()
- * at a go.
- */
-#define MAX_ACCEPT_PER_LOOP 10
-typedef struct
+class ConnectStateData
{
+
+public:
+ static void Connect (int fd, void *me);
+ void connect();
+ void callCallback(comm_err_t status, int xerrno);
+ void defaults();
char *host;
u_short port;
struct sockaddr_in S;
- CNCB *callback;
- void *data;
+ CallBack<CNCB> callback;
struct in_addr in_addr;
int locks;
int tries;
int addrcount;
int connstart;
-}
-
-ConnectStateData;
+};
/* STATIC */
#endif
static void commSetTcpRcvbuf(int, int);
static PF commConnectFree;
-static PF commConnectHandle;
static PF commHandleWrite;
static IPH commConnectDnsHandle;
-static void commConnectCallback(ConnectStateData * cs, comm_err_t status, int xerrno);
static int commResetFD(ConnectStateData * cs);
static int commRetryConnect(ConnectStateData * cs);
CBDATA_TYPE(ConnectStateData);
{
public:
+ AcceptFD() : check_delay(0), count(0), finished_(false){}
+
+ void doCallback(int fd, int newfd, comm_err_t errcode, int xerrno, ConnectionDetail *);
+ void nullCallback();
+ void beginAccepting() {count = 0; finished(false);}
+
+ size_t acceptCount() const { return count;}
+
+ bool finishedAccepting() const;
int check_delay;
- IOACB *handler;
+ CallBack<IOACB> callback;
+ bool finished() const;
+ void finished(bool);
+
+private:
+ static size_t const MAX_ACCEPT_PER_LOOP = 10;
+ size_t count;
+ bool finished_;
+};
+
+class CommWrite
+{
+
+public:
+ CommWrite() : buf(NULL), size(0), curofs(0), handler(NULL), handler_data(NULL){}
+
+ const char *buf;
+ int size;
+ int curofs;
+ IOCB *handler;
void *handler_data;
};
{
public:
- int active;
- int fd;
- dlink_list CommCallbackList;
+ void acceptOne(int fd);
+ void beginAccepting();
+ int acceptCount() const;
+ fdc_t() : active(0), fd(-1), half_closed (false){CommCallbackList.head = NULL;CommCallbackList.tail = NULL; fill.amountDone = 0; fill.handler = NULL; fill.handler_data = NULL;}
- struct
+ fdc_t(int anFD) : active(0), fd(anFD), half_closed(false)
{
- char *buf;
- int size;
- IOCB *handler;
- void *handler_data;
+ CommCallbackList.head = NULL;
+ CommCallbackList.tail = NULL;
+ fill.amountDone = 0;
+ fill.handler = NULL;
+ fill.handler_data = NULL;
+ read.fd = anFD;
}
- read;
+ int active;
+ int fd;
+ dlink_list CommCallbackList;
- struct
- {
- const char *buf;
- int size;
- int curofs;
- IOCB *handler;
- void *handler_data;
- }
+ CommRead read;
- write;
+ CommWrite write;
struct
{
/* how often (in msec) to re-check if we're out of fds on an accept() */
-
- struct sockaddr_in & me() {return connDetails.me;}
-
- struct sockaddr_in & pn() {return connDetails.peer;}
-
AcceptFD accept;
ConnectionDetail connDetails;
}
bool half_closed;
};
-
typedef enum {
COMM_CB_READ = 1,
- COMM_CB_WRITE,
- COMM_CB_ACCEPT,
- COMM_CB_FILL
+ COMM_CB_DERIVED,
} comm_callback_t;
-struct _CommCallbackData
+static int CommCallbackSeqnum = 1;
+
+class CommCommonCallback
{
- comm_callback_t type;
- dlink_node fd_node;
- dlink_node h_node;
+
+public:
+ CommCommonCallback() : fd (-1), errcode (COMM_OK), xerrno(0), seqnum (CommCallbackSeqnum){}
+
+ CommCommonCallback(int anFD, comm_err_t errcode, int anErrno) : fd (anFD), errcode (errcode), xerrno(anErrno), seqnum (CommCallbackSeqnum){}
+
int fd;
- int newfd; /* for accept() */
- char *buf;
- int retval;
- union {
- IOCB *r_callback;
- IOACB *a_callback;
- IOFCB *f_callback;
- IOWCB *w_callback;
- } c;
- void *callback_data;
comm_err_t errcode;
int xerrno;
int seqnum;
- ConnectionDetail details;
+};
+
+class CommCallbackData
+{
+
+public:
+ void *operator new(size_t);
+ void operator delete(void *);
+ virtual void deleteSelf() const;
+ CommCallbackData(CommCommonCallback const &);
+ virtual comm_callback_t getType() const { return COMM_CB_DERIVED; }
+
+ void callACallback();
+ void fdClosing();
+ virtual void callCallback() = 0;
+ void registerSelf();
+ void deRegisterSelf();
+ char *buf;
StoreIOBuffer sb;
+
+protected:
+ CommCommonCallback result;
+ friend void _comm_close(int fd, char *file, int line);
+ friend void comm_calliocallback(void);
+
+private:
+ static MemPool *Pool;
+ dlink_node fd_node;
+ dlink_node h_node;
};
-typedef struct _CommCallbackData CommCallbackData;
+class CommReadCallbackData : public CommCallbackData
+{
+
+public:
+ void *operator new(size_t);
+ void operator delete(void *);
+ void deleteSelf() const;
+ CommReadCallbackData(CommCommonCallback const &, CallBack<IOCB> aCallback, int);
+ virtual comm_callback_t getType() const { return COMM_CB_READ; }
+
+ virtual void callCallback();
+
+private:
+ static MemPool *Pool;
+ CallBack<IOCB> callback;
+ int retval;
+};
+
+class CommAcceptCallbackData : public CommCallbackData
+{
+
+public:
+ void *operator new(size_t);
+ void operator delete(void *);
+ void deleteSelf() const;
+ CommAcceptCallbackData(int const anFd, CallBack<IOACB>, comm_err_t, int, int, ConnectionDetail const &);
+ virtual void callCallback();
+
+private:
+ static MemPool *Pool;
+ CallBack<IOACB> callback;
+ int newfd;
+ ConnectionDetail details;
+};
+
+class CommFillCallbackData : public CommCallbackData
+{
+
+public:
+ void *operator new(size_t);
+ void operator delete(void *);
+ void deleteSelf() const;
+ CommFillCallbackData(int const anFd, CallBack<IOFCB> aCallback, comm_err_t, int);
+ virtual void callCallback();
+
+private:
+ static MemPool *Pool;
+ CallBack<IOFCB> callback;
+};
+
+class CommWriteCallbackData : public CommCallbackData
+{
+
+public:
+ void *operator new(size_t);
+ void operator delete(void *);
+ void deleteSelf() const;
+ CommWriteCallbackData(int const anFd, CallBack<IOWCB> aCallback, comm_err_t, int, int);
+ virtual void callCallback();
+
+private:
+ static MemPool *Pool;
+ CallBack<IOWCB> callback;
+ int retval;
+};
struct _fd_debug_t
{
static MemPool *comm_write_pool = NULL;
static MemPool *conn_close_pool = NULL;
-static MemPool *comm_callback_pool = NULL;
fdc_t *fdc_table = NULL;
fd_debug_t *fdd_table = NULL;
dlink_list CommCallbackList;
-static int CommCallbackSeqnum = 1;
/* New and improved stuff */
-/*
- * return whether there are entries in the callback queue
- */
-int
-comm_existsiocallback(void)
+MemPool *CommCallbackData::Pool(NULL);
+void *
+CommCallbackData::operator new (size_t byteCount)
{
- return CommCallbackList.head == NULL;
+ /* derived classes with different sizes must implement their own new */
+ assert (byteCount == sizeof (CommCallbackData));
+
+ if (!Pool)
+ Pool = memPoolCreate("CommCallbackData", sizeof (CommCallbackData));
+
+ return memPoolAlloc(Pool);
}
-/*
- * add an IO callback
- *
- * IO callbacks are added when we want to notify someone that some IO
- * has finished but we don't want to risk re-entering a non-reentrant
- * code block.
- */
-static void
-comm_addreadcallback(int fd, IOCB *callback, char *buf, size_t retval, comm_err_t errcode,
- int xerrno, void *callback_data)
+void
+CommCallbackData::operator delete (void *address)
{
- CommCallbackData *cio;
+ memPoolFree (Pool, address);
+}
- assert(fdc_table[fd].active == 1);
+void
+CommCallbackData::deleteSelf() const
+{
+ delete this;
+}
- /* Allocate a new struct */
- cio = (CommCallbackData *)memPoolAlloc(comm_callback_pool);
+MemPool *CommReadCallbackData::Pool(NULL);
+void *
+CommReadCallbackData::operator new (size_t byteCount)
+{
+ /* derived classes with different sizes must implement their own new */
+ assert (byteCount == sizeof (CommReadCallbackData));
- /* Throw our data into it */
- cio->fd = fd;
- cio->retval = retval;
- cio->xerrno = xerrno;
- cio->errcode = errcode;
- cio->c.r_callback = callback;
- cio->callback_data = callback_data;
- cio->seqnum = CommCallbackSeqnum;
- cio->buf = buf;
- cio->type = COMM_CB_READ;
+ if (!Pool)
+ Pool = memPoolCreate("CommReadCallbackData", sizeof (CommReadCallbackData));
- /* Add it to the end of the list */
- dlinkAddTail(cio, &(cio->h_node), &CommCallbackList);
+ return memPoolAlloc(Pool);
+}
- /* and add it to the end of the fd list */
- dlinkAddTail(cio, &(cio->fd_node), &(fdc_table[fd].CommCallbackList));
+void
+CommReadCallbackData::operator delete (void *address)
+{
+ memPoolFree (Pool, address);
+}
+void
+CommReadCallbackData::deleteSelf() const
+{
+ delete this;
}
+MemPool *CommAcceptCallbackData::Pool(NULL);
+void *
+CommAcceptCallbackData::operator new (size_t byteCount)
+{
+ /* derived classes with different sizes must implement their own new */
+ assert (byteCount == sizeof (CommAcceptCallbackData));
+ if (!Pool)
+ Pool = memPoolCreate("CommAcceptCallbackData", sizeof (CommAcceptCallbackData));
-static void
-comm_addacceptcallback(int fd, int newfd, IOACB *callback, ConnectionDetail details, comm_err_t errcode, int xerrno, void *callback_data)
+ return memPoolAlloc(Pool);
+}
+
+void
+CommAcceptCallbackData::operator delete (void *address)
{
- CommCallbackData *cio;
+ memPoolFree (Pool, address);
+}
- assert(fdc_table[fd].active == 1);
+void
+CommAcceptCallbackData::deleteSelf() const
+{
+ delete this;
+}
- /* Allocate a new struct */
- cio = (CommCallbackData *)memPoolAlloc(comm_callback_pool);
+MemPool *CommFillCallbackData::Pool(NULL);
+void *
+CommFillCallbackData::operator new (size_t byteCount)
+{
+ /* derived classes with different sizes must implement their own new */
+ assert (byteCount == sizeof (CommFillCallbackData));
- /* Throw our data into it */
- cio->fd = fd;
- cio->xerrno = xerrno;
- cio->errcode = errcode;
- cio->c.a_callback = callback;
- cio->callback_data = callback_data;
- cio->seqnum = CommCallbackSeqnum;
- cio->type = COMM_CB_ACCEPT;
- cio->newfd = newfd;
- cio->details = details;
+ if (!Pool)
+ Pool = memPoolCreate("CommFillCallbackData", sizeof (CommFillCallbackData));
+
+ return memPoolAlloc(Pool);
+}
+
+void
+CommFillCallbackData::operator delete (void *address)
+{
+ memPoolFree (Pool, address);
+}
+
+void
+CommFillCallbackData::deleteSelf() const
+{
+ delete this;
+}
+
+
+MemPool *CommWriteCallbackData::Pool(NULL);
+void *
+CommWriteCallbackData::operator new (size_t byteCount)
+{
+ /* derived classes with different sizes must implement their own new */
+ assert (byteCount == sizeof (CommWriteCallbackData));
+
+ if (!Pool)
+ Pool = memPoolCreate("CommWriteCallbackData", sizeof (CommWriteCallbackData));
+
+ return memPoolAlloc(Pool);
+}
+
+void
+CommWriteCallbackData::operator delete (void *address)
+{
+ memPoolFree (Pool, address);
+}
+
+void
+CommWriteCallbackData::deleteSelf() const
+{
+ delete this;
+}
+
+CommCallbackData::CommCallbackData(CommCommonCallback const &newResults) : result (newResults)
+{
+ assert(fdc_table[result.fd].active == 1);
+ registerSelf();
+}
+
+CommReadCallbackData::CommReadCallbackData(CommCommonCallback const &aResult, CallBack<IOCB> aCallback, int aRetval) : CommCallbackData(aResult), callback(aCallback), retval(aRetval)
+{}
+
+CommAcceptCallbackData::CommAcceptCallbackData(int const anFd, CallBack<IOACB> aCallback, comm_err_t anErrcode, int anErrno, int aNewFD, ConnectionDetail const &newDetails) :CommCallbackData(CommCommonCallback(anFd, anErrcode, anErrno)), callback (aCallback), newfd(aNewFD), details(newDetails)
+{}
+
+CommFillCallbackData::CommFillCallbackData(int const anFd, CallBack<IOFCB> aCallback, comm_err_t anErrcode, int anErrno) :CommCallbackData(CommCommonCallback(anFd, anErrcode, anErrno)), callback (aCallback)
+{}
+CommWriteCallbackData::CommWriteCallbackData(int const anFd, CallBack<IOWCB> aCallback, comm_err_t anErrcode, int anErrno, int aRetval) :CommCallbackData(CommCommonCallback(anFd, anErrcode, anErrno)), callback (aCallback), retval (aRetval)
+{}
+
+void
+CommCallbackData::registerSelf()
+{
/* Add it to the end of the list */
- dlinkAddTail(cio, &(cio->h_node), &CommCallbackList);
+ dlinkAddTail(this, &h_node, &CommCallbackList);
/* and add it to the end of the fd list */
- dlinkAddTail(cio, &(cio->fd_node), &(fdc_table[fd].CommCallbackList));
+ dlinkAddTail(this, &fd_node, &(fdc_table[result.fd].CommCallbackList));
+}
+void
+CommCallbackData::deRegisterSelf()
+{
+ dlinkDelete(&h_node, &CommCallbackList);
+ dlinkDelete(&fd_node, &(fdc_table[result.fd].CommCallbackList));
}
+/*
+ * add an IO callback
+ *
+ * IO callbacks are added when we want to notify someone that some IO
+ * has finished but we don't want to risk re-entering a non-reentrant
+ * code block.
+ */
static void
-comm_add_fill_callback(int fd, size_t retval, comm_err_t errcode, int xerrno)
+comm_add_fill_callback(int fd, size_t length, comm_err_t errcode, int xerrno)
{
CommCallbackData *cio;
- assert(fdc_table[fd].active == 1);
-
- /* Allocate a new struct */
- cio = (CommCallbackData *)memPoolAlloc(comm_callback_pool);
+ cio = new CommFillCallbackData(fd, CallBack<IOFCB>(fdc_table[fd].fill.handler, fdc_table[fd].fill.handler_data), errcode, xerrno);
/* Throw our data into it */
- cio->fd = fd;
- cio->xerrno = xerrno;
- cio->errcode = errcode;
- cio->c.f_callback = fdc_table[fd].fill.handler;
- cio->callback_data = fdc_table[fd].fill.handler_data;
- cio->seqnum = CommCallbackSeqnum;
- cio->type = COMM_CB_FILL;
- /* retval not used */
- cio->retval = -1;
cio->sb = fdc_table[fd].fill.requestedData;
- cio->sb.length = retval;
+ cio->sb.length = length;
/* Clear out fd state */
fdc_table[fd].fill.handler = NULL;
fdc_table[fd].fill.handler_data = NULL;
-
- /* Add it to the end of the list */
- dlinkAddTail(cio, &(cio->h_node), &CommCallbackList);
-
- /* and add it to the end of the fd list */
- dlinkAddTail(cio, &(cio->fd_node), &(fdc_table[fd].CommCallbackList));
}
static void
{
CommCallbackData *cio;
- assert(fdc_table[fd].active == 1);
-
- /* Allocate a new struct */
- cio = (CommCallbackData *)memPoolAlloc(comm_callback_pool);
-
- /* Throw our data into it */
- cio->fd = fd;
- cio->xerrno = xerrno;
- cio->errcode = errcode;
- cio->c.w_callback = fdc_table[fd].write.handler;
- cio->callback_data = fdc_table[fd].write.handler_data;
- cio->seqnum = CommCallbackSeqnum;
- cio->type = COMM_CB_WRITE;
- cio->retval = retval;
+ cio = new CommWriteCallbackData(fd, CallBack<IOWCB>(fdc_table[fd].write.handler, fdc_table[fd].write.handler_data), errcode, xerrno, retval);
/* Clear out fd state */
fdc_table[fd].write.handler = NULL;
fdc_table[fd].write.handler_data = NULL;
-
- /* Add it to the end of the list */
- dlinkAddTail(cio, &(cio->h_node), &CommCallbackList);
-
- /* and add it to the end of the fd list */
- dlinkAddTail(cio, &(cio->fd_node), &(fdc_table[fd].CommCallbackList));
}
-
-
-static void
-comm_call_io_callback(CommCallbackData *cio)
+void
+CommReadCallbackData::callCallback()
{
- switch(cio->type) {
-
- case COMM_CB_READ:
- cio->c.r_callback(cio->fd, cio->buf, cio->retval, cio->errcode, cio->xerrno,
- cio->callback_data);
- break;
+ callback.handler(result.fd, buf, retval, result.errcode, result.xerrno, callback.data);
+}
- case COMM_CB_WRITE:
- cio->c.w_callback(cio->fd, cio->buf, cio->retval, cio->errcode, cio->xerrno,
- cio->callback_data);
- break;
+void
+CommAcceptCallbackData::callCallback()
+{
+ callback.handler(result.fd, newfd, &details, result.errcode, result.xerrno, callback.data);
+}
- case COMM_CB_ACCEPT:
- cio->c.a_callback(cio->fd, cio->newfd, &cio->details, cio->errcode,
- cio->xerrno, cio->callback_data);
- break;
+void
+CommWriteCallbackData::callCallback()
+{
+ callback.handler(result.fd, buf, retval, result.errcode, result.xerrno, callback.data);
+}
- case COMM_CB_FILL:
- cio->c.f_callback(cio->fd, cio->sb, cio->errcode,
- cio->xerrno, cio->callback_data);
- break;
+void
+CommFillCallbackData::callCallback()
+{
+ callback.handler(result.fd, sb, result.errcode, result.xerrno, callback.data);
+}
- default:
- fatal("unknown comm io callback type!");
- break;
- };
+void
+CommCallbackData::fdClosing()
+{
+ result.errcode = COMM_ERR_CLOSING;
}
+void
+CommCallbackData::callACallback()
+{
+ assert(fdc_table[result.fd].active == 1);
+ deRegisterSelf();
+ callCallback();
+}
/*
* call the IO callbacks
/* Call our callbacks until we hit NULL or the seqnum changes */
- while (CommCallbackList.head != NULL) {
+ while (CommCallbackList.head != NULL && oldseqnum != ((CommCallbackData *)CommCallbackList.head)->result.seqnum) {
+
node = (dlink_node *)CommCallbackList.head;
cio = (CommCallbackData *)node->data;
+ cio->callACallback();
+ cio->deleteSelf();
+ }
+}
- /* If seqnum isn't the same, its time to die */
-
- if (cio->seqnum != oldseqnum)
- break; /* we've hit newly-added events */
-
- assert(fdc_table[cio->fd].active == 1);
-
- dlinkDelete(&cio->h_node, &CommCallbackList);
-
- dlinkDelete(&cio->fd_node, &(fdc_table[cio->fd].CommCallbackList));
+void
+CommRead::queueCallback(size_t retval, comm_err_t errcode, int xerrno)
+{
+ hasCallbackInvariant();
- comm_call_io_callback(cio);
+ CommCallbackData *cio;
+ cio = new CommReadCallbackData(CommCommonCallback(fd, errcode, xerrno),callback, retval);
- memPoolFree(comm_callback_pool, cio);
- }
+ /* Throw our data into it */
+ cio->buf = buf;
+ callback = CallBack<IOCB>();
}
-
-/*
- * Queue a callback
- */
-static void
-comm_read_callback(int fd, size_t retval, comm_err_t errcode, int xerrno)
+void
+CommRead::hasCallbackInvariant() const
{
- fdc_t *Fc = &fdc_table[fd];
+ assert (hasCallback());
+}
- assert(Fc->read.handler != NULL);
+void
+CommRead::hasNoCallbackInvariant() const
+{
+ assert (!hasCallback());
+}
- comm_addreadcallback(fd, Fc->read.handler, Fc->read.buf, retval, errcode, xerrno,
- Fc->read.handler_data);
- Fc->read.handler = NULL;
- Fc->read.handler_data = NULL;
+bool
+CommRead::hasCallback() const
+{
+ return callback.handler != NULL;
}
/*
* If the read attempt succeeds or fails, call the callback.
* Else, wait for another IO notification.
*/
-static void
-comm_read_try(int fd, void *data)
+void
+CommRead::ReadTry(int fd, void *data)
{
fdc_t *Fc = &fdc_table[fd];
- int retval;
+ assert (Fc->read.fd == fd);
+ assert (data == NULL);
+ Fc->read.tryReading();
+}
- /* make sure we actually have a callback */
- assert(Fc->read.handler != NULL);
+void
+CommRead::tryReading()
+{
+ hasCallbackInvariant();
/* Attempt a read */
statCounter.syscalls.sock.reads++;
errno = 0;
- retval = FD_READ_METHOD(fd, Fc->read.buf, Fc->read.size);
+ int retval;
+ retval = FD_READ_METHOD(fd, buf, len);
debug(5, 3) ("comm_read_try: fd %d, size %d, retval %d, errno %d\n",
- fd, Fc->read.size, retval, errno);
+ fd, len, retval, errno);
if (retval < 0 && !ignoreErrno(errno)) {
debug(5, 3) ("comm_read_try: scheduling COMM_ERROR\n");
- comm_read_callback(fd, 0, COMM_ERROR, errno);
+ queueCallback(0, COMM_ERROR, errno);
return;
};
/* Note - read 0 == socket EOF, which is a valid read */
if (retval >= 0) {
fd_bytes(fd, retval, FD_READ);
- comm_read_callback(fd, retval, COMM_OK, 0);
+ queueCallback(retval, COMM_OK, 0);
return;
}
/* Nope, register for some more IO */
- commSetSelect(fd, COMM_SELECT_READ, comm_read_try, NULL, 0);
+ commSetSelect(fd, COMM_SELECT_READ, ReadTry, NULL, 0);
}
/*
{
/* Make sure we're not reading anything and we're not closing */
assert(fdc_table[fd].active == 1);
- assert(fdc_table[fd].read.handler == NULL);
+ fdc_table[fd].read.hasNoCallbackInvariant();
assert(!fd_table[fd].flags.closing);
debug(5,4)("comm_read, queueing read for FD %d\n",fd);
/* Queue a read */
- fdc_table[fd].read.buf = buf;
- fdc_table[fd].read.size = size;
- fdc_table[fd].read.handler = handler;
- fdc_table[fd].read.handler_data = handler_data;
+ fdc_table[fd].read = CommRead(fd, buf, size, handler, handler_data);
+ fdc_table[fd].read.read();
+}
+void
+CommRead::read()
+{
#if OPTIMISTIC_IO
- comm_read_try(fd, NULL);
+ tryReading();
#else
- /* Register intrest in a FD read */
- commSetSelect(fd, COMM_SELECT_READ, comm_read_try, NULL, 0);
+
+ initiateActualRead();
#endif
}
+void
+CommRead::initiateActualRead()
+{
+ /* Register intrest in a FD read */
+ commSetSelect(fd, COMM_SELECT_READ, ReadTry, NULL, 0);
+}
+
static void
comm_fill_read(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
{
/* TODO use a reference to the table entry, or use C++ :] */
- StoreIOBuffer *sb;
fdc_t::CommFiller *fill;
assert(fdc_table[fd].active == 1);
fill->amountDone += len;
- sb = &fdc_table[fd].fill.requestedData;
+ StoreIOBuffer *sb = &fdc_table[fd].fill.requestedData;
assert(fill->amountDone <= sb->length);
while (node != NULL) {
cd = (CommCallbackData *)node->data;
- if (cd->type == COMM_CB_READ)
+ if (cd->getType() == COMM_CB_READ)
return 1;
node = node->next;
assert(fd_table[fd].flags.open == 1);
assert(fdc_table[fd].active == 1);
- return (fdc_table[fd].read.handler != NULL);
+ return (fdc_table[fd].read.hasCallback());
}
/*
assert(fd_table[fd].flags.open == 1);
assert(fdc_table[fd].active == 1);
- assert(fdc_table[fd].read.handler == callback);
- assert(fdc_table[fd].read.handler_data == data);
+ assert(fdc_table[fd].read.callback == CallBack<IOCB>(callback,data));
assert(!comm_has_pending_read_callback(fd));
/* Ok, we can be reasonably sure we won't lose any data here! */
/* Delete the callback */
- fdc_table[fd].read.handler = NULL;
- fdc_table[fd].read.handler_data = NULL;
+ fdc_table[fd].read.callback = CallBack<IOCB>();
/* And the IO event */
commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
return comm_openex(sock_type, proto, addr, port, flags, 0, note);
}
+static bool
+limitError(int const anErrno)
+{
+ return anErrno == ENFILE || anErrno == EMFILE;
+}
/* Create a socket. Default is blocking, stream (TCP) socket. IO_TYPE
* is OR of flags specified in defines.h:COMM_* */
* are failing because the open file table is full. This
* limits the number of simultaneous clients */
- switch (errno) {
-
- case ENFILE:
-
- case EMFILE:
+ if (limitError(errno)) {
debug(50, 1) ("comm_open: socket failure: %s\n", xstrerror());
fdAdjustReserved();
- break;
-
- default:
+ } else {
debug(50, 0) ("comm_open: socket failure: %s\n", xstrerror());
}
cs->fd = fd;
cs->host = xstrdup(host);
cs->port = port;
- cs->callback = callback;
- cs->data = cbdataReference(data);
+ cs->callback = CallBack<CNCB>(callback,cbdataReference(data));
comm_add_close_handler(fd, commConnectFree, cs);
cs->locks++;
ipcache_nbgethostbyname(host, commConnectDnsHandle, cs);
}
assert(dns_error_message != NULL);
- commConnectCallback(cs, COMM_ERR_DNS, 0);
+ cs->callCallback(COMM_ERR_DNS, 0);
return;
}
ipcacheCycleAddr(cs->host, NULL);
cs->addrcount = ia->count;
cs->connstart = squid_curtime;
- commConnectHandle(cs->fd, cs);
+ cs->connect();
}
-static void
-commConnectCallback(ConnectStateData * cs, comm_err_t status, int xerrno)
-{
- CNCB *callback = cs->callback;
- void *cbdata = cs->data;
- int fd = cs->fd;
- debug(5, 3) ("commConnectCallback: fd %d, data %p\n", fd, cbdata);
- comm_remove_close_handler(fd, commConnectFree, cs);
- cs->callback = NULL;
- cs->data = NULL;
+void
+ConnectStateData::callCallback(comm_err_t status, int xerrno)
+{
+ debug(5, 3) ("commConnectCallback: fd %d, data %p\n", fd, callback.data);
+ comm_remove_close_handler(fd, commConnectFree, this);
+ CallBack<CNCB> aCallback = callback;
+ callback = CallBack<CNCB>();
commSetTimeout(fd, -1, NULL, NULL);
- commConnectFree(fd, cs);
+ commConnectFree(fd, this);
- if (cbdataReferenceValid(cbdata))
- callback(fd, status, xerrno, cbdata);
+ if (cbdataReferenceValid(aCallback.data))
+ aCallback.handler(fd, status, xerrno, aCallback.data);
- cbdataReferenceDone(cbdata);
+ cbdataReferenceDone(aCallback.data);
}
static void
{
ConnectStateData *cs = (ConnectStateData *)data;
debug(5, 3) ("commConnectFree: FD %d\n", fd);
- cbdataReferenceDone(cs->data);
+ cbdataReferenceDone(cs->callback.data);
safe_free(cs->host);
cbdataFree(cs);
}
+static void
+copyFDFlags(int to, fde *F)
+{
+ if (F->flags.close_on_exec)
+ commSetCloseOnExec(to);
+
+ if (F->flags.nonblocking)
+ commSetNonBlocking(to);
+
+#ifdef TCP_NODELAY
+
+ if (F->flags.nodelay)
+ commSetTcpNoDelay(to);
+
+#endif
+
+ if (Config.tcpRcvBufsz > 0)
+ commSetTcpRcvbuf(to, Config.tcpRcvBufsz);
+}
+
/* Reset FD so that we can connect() again */
static int
commResetFD(ConnectStateData * cs)
{
- int fd2;
- fde *F;
-
- if (!cbdataReferenceValid(cs->data))
+ if (!cbdataReferenceValid(cs->callback.data))
return 0;
statCounter.syscalls.sock.sockets++;
- fd2 = socket(AF_INET, SOCK_STREAM, 0);
+ int fd2 = socket(AF_INET, SOCK_STREAM, 0);
statCounter.syscalls.sock.sockets++;
}
close(fd2);
- F = &fd_table[cs->fd];
+ fde *F = &fd_table[cs->fd];
fd_table[cs->fd].flags.called_connect = 0;
/*
* yuck, this has assumptions about comm_open() arguments for
#ifdef IP_TOS
if (F->tos) {
- int tos = F->tos;
-
- if (setsockopt(cs->fd, IPPROTO_IP, IP_TOS, (char *) &tos, sizeof(int)) < 0)
+ if (setsockopt(cs->fd, IPPROTO_IP, IP_TOS, (char *) &F->tos, sizeof(int)) < 0)
debug(50, 1) ("commResetFD: setsockopt(IP_TOS) on FD %d: %s\n", cs->fd, xstrerror());
}
#endif
- if (F->flags.close_on_exec)
- commSetCloseOnExec(cs->fd);
-
- if (F->flags.nonblocking)
- commSetNonBlocking(cs->fd);
-
-#ifdef TCP_NODELAY
-
- if (F->flags.nodelay)
- commSetTcpNoDelay(cs->fd);
-
-#endif
-
- if (Config.tcpRcvBufsz > 0)
- commSetTcpRcvbuf(cs->fd, Config.tcpRcvBufsz);
+ copyFDFlags (cs->fd, F);
return 1;
}
}
/* Connect SOCK to specified DEST_PORT at DEST_HOST. */
-static void
-commConnectHandle(int fd, void *data)
+void
+ConnectStateData::Connect (int fd, void *me)
{
- ConnectStateData *cs = (ConnectStateData *)data;
+ ConnectStateData *cs = (ConnectStateData *)me;
+ assert (cs->fd == fd);
+ cs->connect();
+}
+
+void
+ConnectStateData::defaults()
+{
+ S.sin_family = AF_INET;
+ S.sin_addr = in_addr;
+ S.sin_port = htons(port);
- if (cs->S.sin_addr.s_addr == 0) {
- cs->S.sin_family = AF_INET;
- cs->S.sin_addr = cs->in_addr;
- cs->S.sin_port = htons(cs->port);
+ if (Config.onoff.log_fqdn)
+ fqdncache_gethostbyaddr(S.sin_addr, FQDN_LOOKUP_IF_MISS);
+}
- if (Config.onoff.log_fqdn)
- fqdncache_gethostbyaddr(cs->S.sin_addr, FQDN_LOOKUP_IF_MISS);
- }
+void
+ConnectStateData::connect()
+{
+ if (S.sin_addr.s_addr == 0)
+ defaults();
- switch (comm_connect_addr(fd, &cs->S)) {
+ switch (comm_connect_addr(fd, &S)) {
case COMM_INPROGRESS:
debug(5, 5) ("commConnectHandle: FD %d: COMM_INPROGRESS\n", fd);
- commSetSelect(fd, COMM_SELECT_WRITE, commConnectHandle, cs, 0);
+ commSetSelect(fd, COMM_SELECT_WRITE, ConnectStateData::Connect, this, 0);
break;
case COMM_OK:
- ipcacheMarkGoodAddr(cs->host, cs->S.sin_addr);
- commConnectCallback(cs, COMM_OK, 0);
+ ipcacheMarkGoodAddr(host, S.sin_addr);
+ callCallback(COMM_OK, 0);
break;
default:
- cs->tries++;
- ipcacheMarkBadAddr(cs->host, cs->S.sin_addr);
+ tries++;
+ ipcacheMarkBadAddr(host, S.sin_addr);
if (Config.onoff.test_reachability)
- netdbDeleteAddrNetwork(cs->S.sin_addr);
+ netdbDeleteAddrNetwork(S.sin_addr);
- if (commRetryConnect(cs)) {
- cs->locks++;
- ipcache_nbgethostbyname(cs->host, commConnectDnsHandle, cs);
+ if (commRetryConnect(this)) {
+ locks++;
+ ipcache_nbgethostbyname(host, commConnectDnsHandle, this);
} else {
- commConnectCallback(cs, COMM_ERR_CONNECT, errno);
+ callCallback(COMM_ERR_CONNECT, errno);
}
-
- break;
}
}
int
commSetTimeout(int fd, int timeout, PF * handler, void *data)
{
- fde *F;
debug(5, 3) ("commSetTimeout: FD %d timeout %d\n", fd, timeout);
assert(fd >= 0);
assert(fd < Squid_MaxFD);
- F = &fd_table[fd];
+ fde *F = &fd_table[fd];
assert(F->flags.open);
if (timeout < 0) {
comm_close(fd);
}
+void
+CommRead::nullCallback()
+{
+ callback = CallBack<IOCB>();
+}
+
+void
+AcceptFD::nullCallback()
+{
+ callback = CallBack<IOACB>();
+}
+
+void
+CommRead::doCallback(comm_err_t errcode, int xerrno)
+{
+ if (callback.handler)
+ callback.handler(fd, buf, 0, errcode, xerrno, callback.data);
+
+ nullCallback();
+}
+
+void
+AcceptFD::doCallback(int fd, int newfd, comm_err_t errcode, int xerrno, ConnectionDetail *connDetails)
+{
+ if (callback.handler) {
+ CallBack<IOACB> aCallback = callback;
+ nullCallback();
+ aCallback.handler(fd, newfd, connDetails, errcode, xerrno, aCallback.data);
+ }
+}
/*
* Close the socket fd.
}
/* Do callbacks for read/accept/fill routines, if any */
- if (fdc_table[fd].read.handler) {
- fdc_table[fd].read.handler(fd, fdc_table[fd].read.buf, 0,
- COMM_ERR_CLOSING, 0, fdc_table[fd].read.handler_data);
- fdc_table[fd].read.handler = NULL;
- }
+ assert (fd == fdc_table[fd].read.fd);
- if (fdc_table[fd].accept.accept.handler) {
- fdc_table[fd].accept.accept.handler(fd, -1, NULL, COMM_ERR_CLOSING,
- 0, fdc_table[fd].accept.accept.handler_data);
- fdc_table[fd].accept.accept.handler = NULL;
- }
+ fdc_table[fd].read.doCallback(COMM_ERR_CLOSING, 0);
+
+ fdc_table[fd].accept.accept.doCallback(fd, -1, COMM_ERR_CLOSING, 0, NULL);
if (fdc_table[fd].fill.handler) {
fdc_table[fd].fill.handler(fd, fdc_table[fd].fill.requestedData, COMM_ERR_CLOSING, 0,
while (fdc_table[fd].CommCallbackList.head != NULL) {
node = fdc_table[fd].CommCallbackList.head;
cio = (CommCallbackData *)node->data;
- assert(fd == cio->fd); /* just paranoid */
- dlinkDelete(&cio->h_node, &CommCallbackList);
- dlinkDelete(&cio->fd_node, &(fdc_table[cio->fd].CommCallbackList));
+ assert(fd == cio->result.fd); /* just paranoid */
/* We're closing! */
- cio->errcode = COMM_ERR_CLOSING;
- comm_call_io_callback(cio);
- memPoolFree(comm_callback_pool, cio);
+ cio->fdClosing();
+ cio->callACallback();
+ cio->deleteSelf();
}
commCallCloseHandlers(fd);
fdc_table[fd].half_closed = false;
}
- bzero(&fdc_table[fd], sizeof(fdc_t));
+ fdc_table[fd] = fdc_t(fd);
statCounter.syscalls.sock.closes++;
x = sendto(fd, buf, len, 0, (struct sockaddr *) to_addr, addr_len);
PROF_stop(comm_udp_sendto);
- if (x < 0)
- {
+ if (x >= 0)
+ return x;
+
#ifdef _SQUID_LINUX_
- if (ECONNREFUSED != errno)
+ if (ECONNREFUSED != errno)
#endif
- debug(50, 1) ("comm_udp_sendto: FD %d, %s, port %d: %s\n",
- fd,
- inet_ntoa(to_addr->sin_addr),
- (int) htons(to_addr->sin_port),
- xstrerror());
-
- return COMM_ERROR;
- }
+ debug(50, 1) ("comm_udp_sendto: FD %d, %s, port %d: %s\n",
+ fd,
+ inet_ntoa(to_addr->sin_addr),
+ (int) htons(to_addr->sin_port),
+ xstrerror());
- return x;
+ return COMM_ERROR;
}
void
{
fd_table =(fde *) xcalloc(Squid_MaxFD, sizeof(fde));
fdd_table = (fd_debug_t *)xcalloc(Squid_MaxFD, sizeof(fd_debug_t));
- fdc_table = (fdc_t *)xcalloc(Squid_MaxFD, sizeof(fdc_t));
+ fdc_table = new fdc_t[Squid_MaxFD];
+
+ for (int pos = 0; pos < Squid_MaxFD; ++pos)
+ fdc_table[pos] = fdc_t(pos);
+
/* XXX account fd_table */
/* Keep a few file descriptors free so that we don't run out of FD's
* after accepting a client but before it opens a socket or a file.
* Since Squid_MaxFD can be as high as several thousand, don't waste them */
RESERVED_FD = XMIN(100, Squid_MaxFD / 4);
+
CBDATA_INIT_TYPE(ConnectStateData);
- comm_callback_pool = memPoolCreate("comm callbacks", sizeof(CommCallbackData));
comm_write_pool = memPoolCreate("CommWriteStateData", sizeof(CommWriteStateData));
+
conn_close_pool = memPoolCreate("close_handler", sizeof(close_handler));
}
}
}
+static bool
+AlreadyTimedOut(fde *F)
+{
+ if (!F->flags.open)
+ return true;
+
+ if (F->timeout == 0)
+ return true;
+
+ if (F->timeout > squid_curtime)
+ return true;
+
+ return false;
+}
+
void
checkTimeouts(void)
{
for (fd = 0; fd <= Biggest_FD; fd++) {
F = &fd_table[fd];
- if (!F->flags.open)
- continue;
-
- if (F->timeout == 0)
- continue;
-
- if (F->timeout > squid_curtime)
+ if (AlreadyTimedOut(F))
continue;
debug(5, 5) ("checkTimeouts: FD %d Expired\n", fd);
return sock;
}
+void
+fdc_t::beginAccepting()
+{
+ accept.accept.beginAccepting();
+}
-/*
- * This callback is called whenever a filedescriptor is ready
- * to dupe itself and fob off an accept()ed connection
- */
-static void
-comm_accept_try(int fd, void *data)
+int
+fdc_t::acceptCount() const
{
- int newfd;
- fdc_t *Fc;
- int count;
- IOACB *hdl;
+ return accept.accept.acceptCount();
+}
- assert(fdc_table[fd].active == 1);
+void
+fdc_t::acceptOne(int fd)
+{
+ /* If we're out of fds, register an event and return now */
- Fc = &(fdc_table[fd]);
+ if (fdNFree() < RESERVED_FD) {
+ debug(5, 3) ("comm_accept_try: we're out of fds - deferring io!\n");
+ eventAdd("comm_accept_check_event", comm_accept_check_event, this,
+ 1000.0 / (double)(accept.accept.check_delay), 1);
+ accept.accept.finished(true);
+ return;
+ }
- for (count = 0; count < MAX_ACCEPT_PER_LOOP; count++) {
- /* If we're out of fds, register an event and return now */
+ /* Accept a new connection */
+ int newfd = comm_old_accept(fd, accept.connDetails);
- if (fdNFree() < RESERVED_FD) {
- debug(5, 3) ("comm_accept_try: we're out of fds - deferring io!\n");
- eventAdd("comm_accept_check_event", comm_accept_check_event, &fdc_table[fd],
- 1000.0 / (double)(fdc_table[fd].accept.accept.check_delay), 1);
+ /* Check for errors */
+ if (newfd < 0) {
+ if (newfd == COMM_NOMESSAGE) {
+ /* register interest again */
+ commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0);
+ accept.accept.finished(true);
return;
}
- /* Accept a new connection */
- newfd = comm_old_accept(fd, Fc->accept.connDetails);
+ /* A non-recoverable error - register an error callback */
+ new CommAcceptCallbackData(fd, accept.accept.callback, COMM_ERROR, errno, -1, accept.connDetails);
- /* Check for errors */
- if (newfd < 0) {
- if (newfd == COMM_NOMESSAGE) {
- /* register interest again */
- commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0);
- return;
- }
+ accept.accept.callback = CallBack<IOACB>();
- /* A non-recoverable error - register an error callback */
- comm_addacceptcallback(fd, -1, Fc->accept.accept.handler, Fc->accept.connDetails, COMM_ERROR, errno, Fc->accept.accept.handler_data);
+ accept.accept.finished(true);
- Fc->accept.accept.handler = NULL;
+ return;
+ }
- Fc->accept.accept.handler_data = NULL;
+ accept.accept.doCallback(fd, newfd, COMM_OK, 0, &accept.connDetails);
- return;
- }
+ /* If we weren't re-registed, don't bother trying again! */
- /* Try the callback! */
- hdl = Fc->accept.accept.handler;
+ if (accept.accept.callback.handler == NULL)
+ accept.accept.finished(true);
+}
- Fc->accept.accept.handler = NULL;
+bool
+AcceptFD::finished() const
+{
+ return finished_;
+}
- hdl(fd, newfd, &Fc->accept.connDetails, COMM_OK, 0, Fc->accept.accept.handler_data);
+void
+AcceptFD::finished(bool newValue)
+{
+ finished_ = newValue;
+}
- /* If we weren't re-registed, don't bother trying again! */
- if (Fc->accept.accept.handler == NULL)
- return;
- }
+bool
+AcceptFD::finishedAccepting() const
+{
+ return acceptCount() >= MAX_ACCEPT_PER_LOOP || finished();
}
+/*
+ * This callback is called whenever a filedescriptor is ready
+ * to dupe itself and fob off an accept()ed connection
+ */
+static void
+comm_accept_try(int fd, void *data)
+{
+ assert(fdc_table[fd].active == 1);
+
+ fdc_table[fd].beginAccepting();
+
+ while (!fdc_table[fd].accept.accept.finishedAccepting())
+ fdc_table[fd].acceptOne(fd);
+}
/*
* Notes:
assert(fdc_table[fd].active == 1);
/* make sure we're not pending! */
- assert(fdc_table[fd].accept.accept.handler == NULL);
+ assert(fdc_table[fd].accept.accept.callback.handler == NULL);
/* Record our details */
Fc = &fdc_table[fd];
- Fc->accept.accept.handler = handler;
- Fc->accept.accept.handler_data = handler_data;
+ Fc->accept.accept.callback = CallBack<IOACB> (handler, handler_data);
/* Kick off the accept */
#if OPTIMISTIC_IO
*/
}
-CommRead::CommRead() : fd(-1), buf(NULL), len(0), handler(NULL), data(NULL)
+CommRead::CommRead() : fd(-1), buf(NULL), len(0)
{}
CommRead::CommRead(int fd_, char *buf_, int len_, IOCB *handler_, void *data_)
- : fd(fd_), buf(buf_), len(len_), handler(handler_), data(data_)
+ : fd(fd_), buf(buf_), len(len_), callback(handler_, data_)
{}
DeferredRead::DeferredRead () : theReader(NULL), theContext(NULL), theRead(), cancelled(false)
{
cancelled = true;
}
+
+ConnectionDetail::ConnectionDetail()
+{
+ bzero(&me, sizeof(me));
+ bzero(&peer, sizeof(peer));
+}