From: wessels <> Date: Tue, 21 Jul 1998 23:03:48 +0000 (+0000) Subject: forked comm_select.c from comm.c X-Git-Tag: SQUID_3_0_PRE1~3027 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=6a98830880e6218baef702470777098e2d895f06;p=thirdparty%2Fsquid.git forked comm_select.c from comm.c --- diff --git a/src/Makefile.in b/src/Makefile.in index 5accef0212..90589f5748 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -1,7 +1,7 @@ # # Makefile for the Squid Object Cache server # -# $Id: Makefile.in,v 1.154 1998/07/17 00:48:21 wessels Exp $ +# $Id: Makefile.in,v 1.155 1998/07/21 17:03:48 wessels Exp $ # # Uncomment and customize the following to suit your needs: # @@ -89,6 +89,7 @@ OBJS = \ client_db.o \ client_side.o \ comm.o \ + comm_select.o \ debug.o \ disk.o \ dns.o \ diff --git a/src/comm.cc b/src/comm.cc index 7ef441d08a..be6f0ecdd7 100644 --- a/src/comm.cc +++ b/src/comm.cc @@ -1,7 +1,7 @@ /* - * $Id: comm.cc,v 1.275 1998/07/20 20:20:56 wessels Exp $ + * $Id: comm.cc,v 1.276 1998/07/21 17:03:50 wessels Exp $ * * DEBUG: section 5 Socket Functions * AUTHOR: Harvest Derived @@ -106,14 +106,13 @@ */ #include "squid.h" -#include #ifdef HAVE_NETINET_TCP_H #include #endif #if USE_ASYNC_IO -#define MAX_POLL_TIME 50 +#define MAX_POLL_TIME 10 #else #define MAX_POLL_TIME 1000 #endif @@ -133,15 +132,12 @@ typedef struct { } ConnectStateData; /* STATIC */ -static int incame = 0; static int commBind(int s, struct in_addr, u_short port); #if !HAVE_POLL static int examine_select(fd_set *, fd_set *); #endif -static void checkTimeouts(void); static void commSetReuseAddr(int); static void commSetNoLinger(int); -static void comm_incoming(void); static void CommWriteStateCallbackAndFree(int fd, int code); #ifdef TCP_NODELAY static void commSetTcpNoDelay(int); @@ -150,66 +146,10 @@ static void commSetTcpRcvbuf(int, int); static PF commConnectFree; static PF commConnectHandle; static PF commHandleWrite; -static int fdIsHttpOrIcp(int fd); static IPH commConnectDnsHandle; static void commConnectCallback(ConnectStateData * cs, int status); -static int commDeferRead(int fd); static int commResetFD(ConnectStateData * cs); static int commRetryConnect(ConnectStateData * cs); -static OBJH commIncomingStats; - -static struct timeval zero_tv; - -/* - * Automatic tuning for incoming requests: - * - * INCOMING sockets are the ICP and HTTP ports. We need to check these - * fairly regularly, but how often? When the load increases, we - * want to check the incoming sockets more often. If we have a lot - * of incoming ICP, then we need to check these sockets more than - * if we just have HTTP. - * - * The variable 'incoming_interval' determines how many normal I/O - * events to process before checking incoming sockets again. - * Note we store the incoming_interval multipled by a factor - * of 16 (e.g. <<4) to have some pseudo-floating point precision. - * - * The variable 'io_events' counts how many normal I/O events have - * been processed. When io_events > incoming_interval, its time - * to check incoming sockets. - * - * Every time we check incoming sockets, we count how many new messages - * or connections were processed. This is used to adjust the - * incoming_interval for the next iteration. The new incoming_interval - * is calculated as the average of the current incoming_interval and - * 32 divided by the number of incoming events just processed. e.g. - * - * 1 1 32 - * incoming_interval = - incoming_interval + - ----------------- - * 2 2 incoming_events - * - * You can see the current value of incoming_interval, as well as - * a histogram of 'incoming_events' by asking the cache manager - * for 'comm_incoming', e.g.: - * - * % ./client mgr:comm_incoming - * - * Bugs: - * - * - We have 32 as a magic upper limit on incoming_interval. - * - INCOMING_TOTAL_MAX = INCOMING_ICP_MAX + INCOMING_HTTP_MAX, - * but this assumes only one ICP socket and one HTTP socket. - * If there are multiple incoming HTTP sockets, the we could - * conceivably process more than INCOMING_TOTAL_MAX events - * in comm_incoming(). - * - * The 'invert32[]' array is a pre-calculated array of division for 32/i - * - */ -static int io_events = 0; -static int incoming_interval = 16 << 4; -static int invert32[INCOMING_TOTAL_MAX]; -#define commCheckIncoming (++io_events > (incoming_interval>>4)) static void CommWriteStateCallbackAndFree(int fd, int code) @@ -364,7 +304,7 @@ void commConnectStart(int fd, const char *host, u_short port, CNCB * callback, void *data) { ConnectStateData *cs = xcalloc(1, sizeof(ConnectStateData)); - debug(5,3)("commConnectStart: FD %d, %s:%d\n", fd, host, (int) port); + debug(5, 3) ("commConnectStart: FD %d, %s:%d\n", fd, host, (int) port); cbdataAdd(cs, MEM_NONE); cs->fd = fd; cs->host = xstrdup(host); @@ -755,323 +695,6 @@ commSetDefer(int fd, DEFER * func, void *data) F->defer_data = data; } -static int -commDeferRead(int fd) -{ - fde *F = &fd_table[fd]; - if (F->defer_check == NULL) - return 0; - return F->defer_check(fd, F->defer_data); -} - -static void -comm_incoming(void) -{ - int j; - incame = 0; - io_events = 0; -#if !ALARM_UPDATES_TIME - getCurrentTime(); -#endif - if (theInIcpConnection > 0) { - icpHandleUdp(theInIcpConnection, &incame); - if (theInIcpConnection != theOutIcpConnection) - icpHandleUdp(theOutIcpConnection, &incame); - if (IcpQueueHead) - icpUdpSendQueue(theOutIcpConnection, NULL); - } - for (j = 0; j < NHttpSockets; j++) { - if (HttpSockets[j] < 0) - continue; - httpAccept(HttpSockets[j], &incame); - } - statHistCount(&Counter.comm_incoming, incame); - if (incame < INCOMING_TOTAL_MAX) - incoming_interval = (incoming_interval >> 1) + (invert32[incame] << 3); -} - -static int -fdIsHttpOrIcp(int fd) -{ - int j; - if (fd == theInIcpConnection) - return 1; - if (fd == theOutIcpConnection) - return 1; - for (j = 0; j < NHttpSockets; j++) { - if (fd == HttpSockets[j]) - return 1; - } - return 0; -} - -#if HAVE_POLL -/* poll all sockets; call handlers for those that are ready. */ -int -comm_poll(int msec) -{ - struct pollfd pfds[SQUID_MAXFD]; - PF *hdl = NULL; - int fd; - int i; - int maxfd; - unsigned long nfds; - int num; - static time_t last_timeout = 0; - static int lastinc = 0; - double timeout = current_dtime + (msec / 1000.0); - do { -#if !ALARM_UPDATES_TIME - getCurrentTime(); -#endif - if (shutting_down) { - serverConnectionsClose(); - dnsShutdownServers(); - redirectShutdownServers(); - /* shutting_down will be set to - * +1 for SIGTERM - * -1 for SIGINT */ - if (shutting_down > 0) - setSocketShutdownLifetimes(Config.shutdownLifetime); - else - setSocketShutdownLifetimes(1); - } -#if USE_ASYNC_IO - aioCheckCallbacks(); -#endif - comm_incoming(); - nfds = 0; - maxfd = Biggest_FD + 1; - for (i = 0; i < maxfd; i++) { - int events; - events = 0; - /* Check each open socket for a handler. */ - if (fd_table[i].read_handler && !commDeferRead(i)) - events |= POLLRDNORM; - if (fd_table[i].write_handler) - events |= POLLWRNORM; - if (events) { - pfds[nfds].fd = i; - pfds[nfds].events = events; - pfds[nfds].revents = 0; - nfds++; - } - } - if (shutting_down) - debug(5, 2) ("comm_poll: Still waiting on %d FDs\n", nfds); - if (nfds == 0) - return COMM_SHUTDOWN; - if (msec > MAX_POLL_TIME) - msec = MAX_POLL_TIME; - for (;;) { - num = poll(pfds, nfds, msec); - Counter.select_loops++; - if (num >= 0) - break; - if (ignoreErrno(errno)) - continue; - debug(5, 0) ("comm_poll: poll failure: %s\n", xstrerror()); - assert(errno != EINVAL); - return COMM_ERROR; - /* NOTREACHED */ - } - debug(5, num ? 5 : 8) ("comm_poll: %d sockets ready\n", num); - /* Check timeout handlers ONCE each second. */ - if (squid_curtime > last_timeout) { - last_timeout = squid_curtime; - checkTimeouts(); - } - if (num == 0) - continue; - /* scan each socket but the accept socket. Poll this - * more frequently to minimize losses due to the 5 connect - * limit in SunOS */ - for (i = 0; i < nfds; i++) { - int revents; - if (((revents = pfds[i].revents) == 0) || ((fd = pfds[i].fd) == -1)) - continue; - if (fdIsHttpOrIcp(fd)) - continue; - if (revents & (POLLRDNORM | POLLIN | POLLHUP | POLLERR)) { - debug(5, 6) ("comm_poll: FD %d ready for reading\n", fd); - if ((hdl = fd_table[fd].read_handler)) { - fd_table[fd].read_handler = NULL; - hdl(fd, fd_table[fd].read_data); - } - if (commCheckIncoming) - comm_incoming(); - } - if (revents & (POLLWRNORM | POLLOUT | POLLHUP | POLLERR)) { - debug(5, 5) ("comm_poll: FD %d ready for writing\n", fd); - if ((hdl = fd_table[fd].write_handler)) { - fd_table[fd].write_handler = NULL; - hdl(fd, fd_table[fd].write_data); - } - if (commCheckIncoming) - comm_incoming(); - } - if (revents & POLLNVAL) { - close_handler *ch; - fde *F = &fd_table[fd]; - debug(5, 0) ("WARNING: FD %d has handlers, but it's invalid.\n", fd); - debug(5, 0) ("FD %d is a %s\n", fd, fdTypeStr[fd_table[fd].type]); - debug(5, 0) ("--> %s\n", fd_table[fd].desc); - debug(5, 0) ("tmout:%p read:%p write:%p\n", - F->timeout_handler, - F->read_handler, - F->write_handler); - for (ch = F->close_handler; ch; ch = ch->next) - debug(5, 0) (" close handler: %p\n", ch->handler); - if (F->close_handler) { - commCallCloseHandlers(fd); - } else if (F->timeout_handler) { - debug(5, 0) ("comm_poll: Calling Timeout Handler\n"); - F->timeout_handler(fd, F->timeout_data); - } - F->close_handler = NULL; - F->timeout_handler = NULL; - F->read_handler = NULL; - F->write_handler = NULL; - if (F->open != 0) - fd_close(fd); - } - lastinc = incame; - } - return COMM_OK; - } while (timeout > current_dtime); - debug(5, 8) ("comm_poll: time out: %d.\n", squid_curtime); - return COMM_TIMEOUT; -} - -#else - -/* Select on all sockets; call handlers for those that are ready. */ -int -comm_select(int msec) -{ - fd_set readfds; - fd_set writefds; - PF *hdl = NULL; - int fd; - int i; - int maxfd; - int nfds; - int num; - static time_t last_timeout = 0; - struct timeval poll_time; - static int lastinc; - double timeout = current_dtime + (msec / 1000.0); - - do { -#if !ALARM_UPDATES_TIME - getCurrentTime(); -#endif - -#if USE_ASYNC_IO - aioCheckCallbacks(); -#endif - - FD_ZERO(&readfds); - FD_ZERO(&writefds); - if (shutting_down) { - serverConnectionsClose(); - dnsShutdownServers(); - redirectShutdownServers(); - /* shutting_down will be set to - * +1 for SIGTERM - * -1 for SIGINT */ - if (shutting_down > 0) - setSocketShutdownLifetimes(Config.shutdownLifetime); - else - setSocketShutdownLifetimes(1); - } - comm_incoming(); - nfds = 0; - maxfd = Biggest_FD + 1; - for (i = 0; i < maxfd; i++) { - /* Check each open socket for a handler. */ - if (fd_table[i].read_handler && !commDeferRead(i)) { - nfds++; - FD_SET(i, &readfds); - } - if (fd_table[i].write_handler) { - nfds++; - FD_SET(i, &writefds); - } - } - if (shutting_down) - debug(5, 2) ("comm_select: Still waiting on %d FDs\n", nfds); - if (nfds == 0) - return COMM_SHUTDOWN; - if (msec > MAX_POLL_TIME) - msec = MAX_POLL_TIME; - for (;;) { - poll_time.tv_sec = msec / 1000; - poll_time.tv_usec = (msec % 1000) * 1000; - num = select(maxfd, &readfds, &writefds, NULL, &poll_time); - Counter.select_loops++; - if (num >= 0) - break; - if (ignoreErrno(errno)) - break; - debug(50, 0) ("comm_select: select failure: %s\n", - xstrerror()); - examine_select(&readfds, &writefds); - return COMM_ERROR; - /* NOTREACHED */ - } - if (num < 0) - continue; - debug(5, num ? 5 : 8) ("comm_select: %d sockets ready at %d\n", - num, (int) squid_curtime); - - /* Check lifetime and timeout handlers ONCE each second. - * Replaces brain-dead check every time through the loop! */ - if (squid_curtime > last_timeout) { - last_timeout = squid_curtime; - checkTimeouts(); - } - if (num == 0) - continue; - - /* scan each socket but the accept socket. Poll this - * more frequently to minimize losses due to the 5 connect - * limit in SunOS */ - - for (fd = 0; fd < maxfd; fd++) { - if (!FD_ISSET(fd, &readfds) && !FD_ISSET(fd, &writefds)) - continue; - if (fdIsHttpOrIcp(fd)) - continue; - if (FD_ISSET(fd, &readfds)) { - debug(5, 6) ("comm_select: FD %d ready for reading\n", fd); - if (fd_table[fd].read_handler) { - hdl = fd_table[fd].read_handler; - fd_table[fd].read_handler = NULL; - hdl(fd, fd_table[fd].read_data); - } - if (commCheckIncoming) - comm_incoming(); - } - if (FD_ISSET(fd, &writefds)) { - debug(5, 5) ("comm_select: FD %d ready for writing\n", fd); - if (fd_table[fd].write_handler) { - hdl = fd_table[fd].write_handler; - fd_table[fd].write_handler = NULL; - hdl(fd, fd_table[fd].write_data); - } - if (commCheckIncoming) - comm_incoming(); - } - lastinc = incame; - } - return COMM_OK; - } while (timeout > current_dtime); - debug(5, 8) ("comm_select: time out: %d\n", (int) squid_curtime); - return COMM_TIMEOUT; -} -#endif - void commSetSelect(int fd, unsigned int type, PF * handler, void *client_data, time_t timeout) { @@ -1098,7 +721,7 @@ comm_add_close_handler(int fd, PF * handler, void *data) close_handler *c; debug(5, 5) ("comm_add_close_handler: FD %d, handler=%p, data=%p\n", fd, handler, data); - for (c = fd_table[fd].close_handler; c; c=c->next) + for (c = fd_table[fd].close_handler; c; c = c->next) assert(c->handler != handler || c->data != data); new->handler = handler; new->data = data; @@ -1197,24 +820,16 @@ commSetTcpNoDelay(int fd) } #endif + void comm_init(void) { - int i; fd_table = xcalloc(Squid_MaxFD, sizeof(fde)); /* XXX account fd_table */ /* Keep a few file descriptors free so that we don't run out of FD's * after accepting a client but before it opens a socket or a file. * Since Squid_MaxFD can be as high as several thousand, don't waste them */ RESERVED_FD = XMIN(100, Squid_MaxFD / 4); - zero_tv.tv_sec = 0; - zero_tv.tv_usec = 0; - invert32[0] = 32; - for (i = 1; i < INCOMING_TOTAL_MAX; i++) - invert32[i] = (int) (32.0 / (double) i + 0.5); - cachemgrRegister("comm_incoming", - "comm_incoming() stats", - commIncomingStats, 0, 1); } @@ -1286,33 +901,6 @@ examine_select(fd_set * readfds, fd_set * writefds) } #endif -static void -checkTimeouts(void) -{ - int fd; - fde *F = NULL; - PF *callback; - for (fd = 0; fd <= Biggest_FD; fd++) { - F = &fd_table[fd]; - if (F->open != FD_OPEN) - continue; - if (F->timeout == 0) - continue; - if (F->timeout > squid_curtime) - continue; - debug(5, 5) ("checkTimeouts: FD %d Expired\n", fd); - if (F->timeout_handler) { - debug(5, 5) ("checkTimeouts: FD %d: Call timeout handler\n", fd); - callback = F->timeout_handler; - F->timeout_handler = NULL; - callback(fd, F->timeout_data); - } else { - debug(5, 5) ("checkTimeouts: FD %d: Forcing comm_close()\n", fd); - comm_close(fd); - } - } -} - /* Write to FD. */ static void commHandleWrite(int fd, void *data) @@ -1326,7 +914,7 @@ commHandleWrite(int fd, void *data) nleft = state->size - state->offset; len = write(fd, state->buf + state->offset, nleft); - debug(5,5)("commHandleWrite: write() returns %d\n", len); + debug(5, 5) ("commHandleWrite: write() returns %d\n", len); fd_bytes(fd, len, FD_WRITE); if (len == 0) { @@ -1424,15 +1012,3 @@ ignoreErrno(int ierrno) } /* NOTREACHED */ } - -static void -commIncomingStats(StoreEntry * sentry) -{ - StatCounters *f = &Counter; - storeAppendPrintf(sentry, "Current incoming_interval: %d\n", - incoming_interval >> 4); - storeAppendPrintf(sentry, "\n"); - storeAppendPrintf(sentry, "Histogram of number of incoming sockets or\n"); - storeAppendPrintf(sentry, "Messages handled per comm_incoming() call:\n"); - statHistDump(&f->comm_incoming, sentry, statHistIntDumper); -} diff --git a/src/comm_select.cc b/src/comm_select.cc new file mode 100644 index 0000000000..da6c45737a --- /dev/null +++ b/src/comm_select.cc @@ -0,0 +1,775 @@ + + +/* + * $Id: comm_select.cc,v 1.1 1998/07/21 17:03:49 wessels Exp $ + * + * DEBUG: section 5 Socket Functions + * + * SQUID Internet Object Cache http://squid.nlanr.net/Squid/ + * -------------------------------------------------------- + * + * Squid is the result of efforts by numerous individuals from the + * Internet community. Development is led by Duane Wessels of the + * National Laboratory for Applied Network Research and funded by + * the National Science Foundation. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. + * + */ + +#include "squid.h" + +#if USE_ASYNC_IO +#define MAX_POLL_TIME 10 +#else +#define MAX_POLL_TIME 1000 +#endif + +/* STATIC */ +#if !HAVE_POLL +static int examine_select(fd_set *, fd_set *); +#endif +static int fdIsHttp(int fd); +static int fdIsIcp(int fd); +static int commDeferRead(int fd); +static void checkTimeouts(void); +static OBJH commIncomingStats; +#if HAVE_POLL +static int comm_check_incoming_poll_handlers(int nfds, int *fds); +#else +static int comm_check_incoming_select_handlers(int nfds, int *fds); +#endif + +static struct timeval zero_tv; + +/* + * Automatic tuning for incoming requests: + * + * INCOMING sockets are the ICP and HTTP ports. We need to check these + * fairly regularly, but how often? When the load increases, we + * want to check the incoming sockets more often. If we have a lot + * of incoming ICP, then we need to check these sockets more than + * if we just have HTTP. + * + * The variables 'incoming_icp_interval' and 'incoming_http_interval' + * determine how many normal I/O events to process before checking + * incoming sockets again. Note we store the incoming_interval + * multipled by a factor of (2^INCOMING_FACTOR) to have some + * pseudo-floating point precision. + * + * The variable 'icp_io_events' and 'http_io_events' counts how many normal + * I/O events have been processed since the last check on the incoming + * sockets. When io_events > incoming_interval, its time to check incoming + * sockets. + * + * Every time we check incoming sockets, we count how many new messages + * or connections were processed. This is used to adjust the + * incoming_interval for the next iteration. The new incoming_interval + * is calculated as the current incoming_interval plus what we would + * like to see as an average number of events minus the number of + * events just processed. + * + * incoming_interval = incoming_interval + 1 - number_of_events_processed + * + * There are separate incoming_interval counters for both HTTP and ICP events + * + * You can see the current values of the incoming_interval's, as well as + * a histogram of 'incoming_events' by asking the cache manager + * for 'comm_incoming', e.g.: + * + * % ./client mgr:comm_incoming + * + * Caveats: + * + * - We have MAX_INCOMING_INTEGER as a magic upper limit on + * incoming_interval for both types of sockets. At the + * largest value the cache will effectively be idling. + * + * - The higher the INCOMING_FACTOR, the slower the algorithm will + * respond to load spikes/increases/decreases in demand. A value + * between 3 and 8 is recommended. + */ + +#define MAX_INCOMING_INTEGER 256 +#define INCOMING_FACTOR 5 +#define MAX_INCOMING_INTERVAL (MAX_INCOMING_INTEGER << INCOMING_FACTOR) +static int icp_io_events = 0; +static int http_io_events = 0; +static int incoming_icp_interval = 16 << INCOMING_FACTOR; +static int incoming_http_interval = 16 << INCOMING_FACTOR; +#define commCheckICPIncoming (++icp_io_events > (incoming_icp_interval>> INCOMING_FACTOR)) +#define commCheckHTTPIncoming (++http_io_events > (incoming_http_interval>> INCOMING_FACTOR)) + +static int +commDeferRead(int fd) +{ + fde *F = &fd_table[fd]; + if (F->defer_check == NULL) + return 0; + return F->defer_check(fd, F->defer_data); +} + +static int +fdIsIcp(int fd) +{ + if (fd == theInIcpConnection) + return 1; + if (fd == theOutIcpConnection) + return 1; + return 0; +} + +static int +fdIsHttp(int fd) +{ + int j; + for (j = 0; j < NHttpSockets; j++) { + if (fd == HttpSockets[j]) + return 1; + } + return 0; +} + +#if HAVE_POLL +static int +comm_check_incoming_poll_handlers(int nfds, int *fds) +{ + int i; + int fd; + int incame = 0; + PF *hdl = NULL; + int npfds; + struct pollfd pfds[3 + MAXHTTPPORTS]; + for (i = npfds = 0; i < nfds; i++) { + int events; + fd = fds[i]; + events = 0; + if (fd_table[fd].read_handler) + events |= POLLRDNORM; + if (fd_table[fd].write_handler) + events |= POLLWRNORM; + if (events) { + pfds[npfds].fd = fd; + pfds[npfds].events = events; + pfds[npfds].revents = 0; + npfds++; + } + } + if (!nfds) + return incame; +#if !ALARM_UPDATES_TIME + getCurrentTime(); +#endif + if (poll(pfds, npfds, 0) < 1) + return incame; + for (i = 0; i < npfds; i++) { + int revents; + if (((revents = pfds[i].revents) == 0) || ((fd = pfds[i].fd) == -1)) + continue; + if (revents & (POLLRDNORM | POLLIN | POLLHUP | POLLERR)) { + if ((hdl = fd_table[fd].read_handler)) { + fd_table[fd].read_handler = NULL; + hdl(fd, &incame); + } else + debug(5, 1) ("comm_poll_incoming: NULL read handler\n"); + } + if (revents & (POLLWRNORM | POLLOUT | POLLHUP | POLLERR)) { + if ((hdl = fd_table[fd].write_handler)) { + fd_table[fd].write_handler = NULL; + hdl(fd, &incame); + } else + debug(5, 1) ("comm_poll_incoming: NULL write handler\n"); + } + } + return incame; +} + +static void +comm_poll_icp_incoming(void) +{ + int nfds = 0; + int fds[2]; + int nevents; + icp_io_events = 0; + if (theInIcpConnection >= 0) + fds[nfds++] = theInIcpConnection; + if (theInIcpConnection != theOutIcpConnection) + if (theOutIcpConnection >= 0) + fds[nfds++] = theOutIcpConnection; + if (nfds == 0) + return; + nevents = comm_check_incoming_poll_handlers(nfds, fds); + incoming_icp_interval = incoming_icp_interval + 1 - nevents; + if (incoming_icp_interval < 0) + incoming_icp_interval = 0; + if (incoming_icp_interval > MAX_INCOMING_INTERVAL) + incoming_icp_interval = MAX_INCOMING_INTERVAL; + if (nevents > INCOMING_ICP_MAX) + nevents = INCOMING_ICP_MAX; + statHistCount(&Counter.comm_icp_incoming, nevents); +} + +static void +comm_poll_http_incoming(void) +{ + int nfds = 0; + int fds[MAXHTTPPORTS]; + int j; + int nevents; + http_io_events = 0; + for (j = 0; j < NHttpSockets; j++) { + if (HttpSockets[j] < 0) + continue; + if (commDeferRead(HttpSockets[j])) + continue; + fds[nfds++] = HttpSockets[j]; + } + nevents = comm_check_incoming_poll_handlers(nfds, fds); + incoming_http_interval = incoming_http_interval + 1 - nevents; + if (incoming_http_interval < 0) + incoming_http_interval = 0; + if (incoming_http_interval > MAX_INCOMING_INTERVAL) + incoming_http_interval = MAX_INCOMING_INTERVAL; + if (nevents > INCOMING_HTTP_MAX) + nevents = INCOMING_HTTP_MAX; + statHistCount(&Counter.comm_http_incoming, nevents); +} + +/* poll all sockets; call handlers for those that are ready. */ +int +comm_poll(int msec) +{ + struct pollfd pfds[SQUID_MAXFD]; + PF *hdl = NULL; + int fd; + int i; + int maxfd; + unsigned long nfds; + int num; + int callicp = 0, callhttp = 0; + static time_t last_timeout = 0; + double timeout = current_dtime + (msec / 1000.0); + do { +#if !ALARM_UPDATES_TIME + getCurrentTime(); +#endif + if (shutting_down) { + serverConnectionsClose(); + dnsShutdownServers(); + redirectShutdownServers(); + /* shutting_down will be set to + * +1 for SIGTERM + * -1 for SIGINT */ + if (shutting_down > 0) + setSocketShutdownLifetimes(Config.shutdownLifetime); + else + setSocketShutdownLifetimes(1); + } +#if USE_ASYNC_IO + aioCheckCallbacks(); +#endif + if (commCheckICPIncoming) + comm_poll_icp_incoming(); + if (commCheckHTTPIncoming) + comm_poll_http_incoming(); + callicp = callhttp = 0; + nfds = 0; + maxfd = Biggest_FD + 1; + for (i = 0; i < maxfd; i++) { + int events; + events = 0; + /* Check each open socket for a handler. */ + if (fd_table[i].read_handler && !commDeferRead(i)) + events |= POLLRDNORM; + if (fd_table[i].write_handler) + events |= POLLWRNORM; + if (events) { + pfds[nfds].fd = i; + pfds[nfds].events = events; + pfds[nfds].revents = 0; + nfds++; + } + } + if (shutting_down) + debug(5, 2) ("comm_poll: Still waiting on %d FDs\n", nfds); + if (nfds == 0) + return COMM_SHUTDOWN; + if (msec > MAX_POLL_TIME) + msec = MAX_POLL_TIME; + for (;;) { + num = poll(pfds, nfds, msec); + Counter.select_loops++; + if (num >= 0) + break; + if (ignoreErrno(errno)) + continue; + debug(5, 0) ("comm_poll: poll failure: %s\n", xstrerror()); + assert(errno != EINVAL); + return COMM_ERROR; + /* NOTREACHED */ + } + debug(5, num ? 5 : 8) ("comm_poll: %d sockets ready\n", num); + /* Check timeout handlers ONCE each second. */ + if (squid_curtime > last_timeout) { + last_timeout = squid_curtime; + checkTimeouts(); + } + if (num == 0) + continue; + /* scan each socket but the accept socket. Poll this + * more frequently to minimize losses due to the 5 connect + * limit in SunOS */ + for (i = 0; i < nfds; i++) { + int revents; + if (((revents = pfds[i].revents) == 0) || ((fd = pfds[i].fd) == -1)) + continue; + if (fdIsIcp(fd)) { + callicp = 1; + continue; + } + if (fdIsHttp(fd)) { + callhttp = 1; + continue; + } + if (revents & (POLLRDNORM | POLLIN | POLLHUP | POLLERR)) { + debug(5, 6) ("comm_poll: FD %d ready for reading\n", fd); + if ((hdl = fd_table[fd].read_handler)) { + fd_table[fd].read_handler = NULL; + hdl(fd, fd_table[fd].read_data); + } + if (commCheckICPIncoming) + comm_poll_icp_incoming(); + if (commCheckHTTPIncoming) + comm_poll_http_incoming(); + } + if (revents & (POLLWRNORM | POLLOUT | POLLHUP | POLLERR)) { + debug(5, 5) ("comm_poll: FD %d ready for writing\n", fd); + if ((hdl = fd_table[fd].write_handler)) { + fd_table[fd].write_handler = NULL; + hdl(fd, fd_table[fd].write_data); + } + if (commCheckICPIncoming) + comm_poll_icp_incoming(); + if (commCheckHTTPIncoming) + comm_poll_http_incoming(); + } + if (revents & POLLNVAL) { + close_handler *ch; + fde *F = &fd_table[fd]; + debug(5, 0) ("WARNING: FD %d has handlers, but it's invalid.\n", fd); + debug(5, 0) ("FD %d is a %s\n", fd, fdTypeStr[fd_table[fd].type]); + debug(5, 0) ("--> %s\n", fd_table[fd].desc); + debug(5, 0) ("tmout:%p read:%p write:%p\n", + F->timeout_handler, + F->read_handler, + F->write_handler); + for (ch = F->close_handler; ch; ch = ch->next) + debug(5, 0) (" close handler: %p\n", ch->handler); + if (F->close_handler) { + commCallCloseHandlers(fd); + } else if (F->timeout_handler) { + debug(5, 0) ("comm_poll: Calling Timeout Handler\n"); + F->timeout_handler(fd, F->timeout_data); + } + F->close_handler = NULL; + F->timeout_handler = NULL; + F->read_handler = NULL; + F->write_handler = NULL; + if (F->open != 0) + fd_close(fd); + } + } + if (callicp) + comm_poll_icp_incoming(); + if (callhttp) + comm_poll_http_incoming(); + return COMM_OK; + } while (timeout > current_dtime); + debug(5, 8) ("comm_poll: time out: %d.\n", squid_curtime); + return COMM_TIMEOUT; +} + +#else + +static int +comm_check_incoming_select_handlers(int nfds, int *fds) +{ + int i; + int fd; + int incame = 0; + int maxfd = 0; + PF *hdl = NULL; + fd_set read_mask; + fd_set write_mask; + FD_ZERO(&read_mask); + FD_ZERO(&write_mask); + for (i = 0; i < nfds; i++) { + fd = fds[i]; + if (fd_table[fd].read_handler) { + FD_SET(fd, &read_mask); + if (fd > maxfd) + maxfd = fd; + } + if (fd_table[fd].write_handler) { + FD_SET(fd, &write_mask); + if (fd > maxfd) + maxfd = fd; + } + } + if (maxfd++ == 0) + return incame; +#if !ALARM_UPDATES_TIME + getCurrentTime(); +#endif + if (select(maxfd, &read_mask, &write_mask, NULL, &zero_tv) < 1) + return incame; + for (i = 0; i < nfds; i++) { + fd = fds[i]; + if (FD_ISSET(fd, &read_mask)) { + if ((hdl = fd_table[fd].read_handler) != NULL) { + fd_table[fd].read_handler = NULL; + hdl(fd, &incame); + } else { + debug(5, 1) ("comm_select_incoming: NULL read handler\n"); + } + } + if (FD_ISSET(fd, &write_mask)) { + if ((hdl = fd_table[fd].write_handler) != NULL) { + fd_table[fd].write_handler = NULL; + hdl(fd, &incame); + } else { + debug(5, 1) ("comm_select_incoming: NULL write handler\n"); + } + } + } + return incame; +} + +static void +comm_select_icp_incoming(void) +{ + int nfds = 0; + int fds[2]; + int nevents; + icp_io_events = 0; + if (theInIcpConnection >= 0) + fds[nfds++] = theInIcpConnection; + if (theInIcpConnection != theOutIcpConnection) + if (theOutIcpConnection >= 0) + fds[nfds++] = theOutIcpConnection; + if (nfds == 0) + return; + nevents = comm_check_incoming_select_handlers(nfds, fds); + incoming_icp_interval = incoming_icp_interval + 1 - nevents; + if (incoming_icp_interval < 0) + incoming_icp_interval = 0; + if (incoming_icp_interval > MAX_INCOMING_INTERVAL) + incoming_icp_interval = MAX_INCOMING_INTERVAL; + if (nevents > INCOMING_ICP_MAX) + nevents = INCOMING_ICP_MAX; + statHistCount(&Counter.comm_icp_incoming, nevents); +} + +static void +comm_select_http_incoming(void) +{ + int nfds = 0; + int fds[MAXHTTPPORTS]; + int j; + int nevents; + http_io_events = 0; + for (j = 0; j < NHttpSockets; j++) { + if (HttpSockets[j] < 0) + continue; + if (commDeferRead(HttpSockets[j])) + continue; + fds[nfds++] = HttpSockets[j]; + } + nevents = comm_check_incoming_select_handlers(nfds, fds); + incoming_http_interval = incoming_http_interval + 1 - nevents; + if (incoming_http_interval < 0) + incoming_http_interval = 0; + if (incoming_http_interval > MAX_INCOMING_INTERVAL) + incoming_http_interval = MAX_INCOMING_INTERVAL; + if (nevents > INCOMING_HTTP_MAX) + nevents = INCOMING_HTTP_MAX; + statHistCount(&Counter.comm_http_incoming, nevents); +} + +/* Select on all sockets; call handlers for those that are ready. */ +int +comm_select(int msec) +{ + fd_set readfds; + fd_set writefds; + PF *hdl = NULL; + int fd; + int i; + int maxfd; + int nfds; + int num; + int callicp = 0, callhttp = 0; + static time_t last_timeout = 0; + struct timeval poll_time; + double timeout = current_dtime + (msec / 1000.0); + do { +#if !ALARM_UPDATES_TIME + getCurrentTime(); +#endif +#if USE_ASYNC_IO + aioCheckCallbacks(); +#endif + FD_ZERO(&readfds); + FD_ZERO(&writefds); + if (shutting_down) { + serverConnectionsClose(); + dnsShutdownServers(); + redirectShutdownServers(); + /* shutting_down will be set to + * +1 for SIGTERM + * -1 for SIGINT */ + if (shutting_down > 0) + setSocketShutdownLifetimes(Config.shutdownLifetime); + else + setSocketShutdownLifetimes(1); + } + if (commCheckICPIncoming) + comm_select_icp_incoming(); + if (commCheckHTTPIncoming) + comm_select_http_incoming(); + callicp = callhttp = 0; + nfds = 0; + maxfd = Biggest_FD + 1; + for (i = 0; i < maxfd; i++) { + /* Check each open socket for a handler. */ + if (fd_table[i].read_handler && !commDeferRead(i)) { + nfds++; + FD_SET(i, &readfds); + } + if (fd_table[i].write_handler) { + nfds++; + FD_SET(i, &writefds); + } + } + if (shutting_down) + debug(5, 2) ("comm_select: Still waiting on %d FDs\n", nfds); + if (nfds == 0) + return COMM_SHUTDOWN; + if (msec > MAX_POLL_TIME) + msec = MAX_POLL_TIME; + for (;;) { + poll_time.tv_sec = msec / 1000; + poll_time.tv_usec = (msec % 1000) * 1000; + num = select(maxfd, &readfds, &writefds, NULL, &poll_time); + Counter.select_loops++; + if (num >= 0) + break; + if (ignoreErrno(errno)) + break; + debug(50, 0) ("comm_select: select failure: %s\n", + xstrerror()); + examine_select(&readfds, &writefds); + return COMM_ERROR; + /* NOTREACHED */ + } + if (num < 0) + continue; + debug(5, num ? 5 : 8) ("comm_select: %d sockets ready at %d\n", + num, (int) squid_curtime); + /* Check lifetime and timeout handlers ONCE each second. + * Replaces brain-dead check every time through the loop! */ + if (squid_curtime > last_timeout) { + last_timeout = squid_curtime; + checkTimeouts(); + } + if (num == 0) + continue; + /* scan each socket but the accept socket. Poll this + * more frequently to minimize losses due to the 5 connect + * limit in SunOS */ + for (fd = 0; fd < maxfd; fd++) { + if (!FD_ISSET(fd, &readfds) && !FD_ISSET(fd, &writefds)) + continue; + if (fdIsIcp(fd)) { + callicp = 1; + continue; + } + if (fdIsHttp(fd)) { + callhttp = 1; + continue; + } + if (FD_ISSET(fd, &readfds)) { + debug(5, 6) ("comm_select: FD %d ready for reading\n", fd); + if (fd_table[fd].read_handler) { + hdl = fd_table[fd].read_handler; + fd_table[fd].read_handler = NULL; + hdl(fd, fd_table[fd].read_data); + } + if (commCheckICPIncoming) + comm_select_icp_incoming(); + if (commCheckHTTPIncoming) + comm_select_http_incoming(); + } + if (FD_ISSET(fd, &writefds)) { + debug(5, 5) ("comm_select: FD %d ready for writing\n", fd); + if (fd_table[fd].write_handler) { + hdl = fd_table[fd].write_handler; + fd_table[fd].write_handler = NULL; + hdl(fd, fd_table[fd].write_data); + } + if (commCheckICPIncoming) + comm_select_icp_incoming(); + if (commCheckHTTPIncoming) + comm_select_http_incoming(); + } + } + if (callicp) + comm_select_icp_incoming(); + if (callhttp) + comm_select_http_incoming(); + return COMM_OK; + } while (timeout > current_dtime); + debug(5, 8) ("comm_select: time out: %d\n", (int) squid_curtime); + return COMM_TIMEOUT; +} +#endif + +void +comm_select_init(void) +{ + zero_tv.tv_sec = 0; + zero_tv.tv_usec = 0; + cachemgrRegister("comm_incoming", + "comm_incoming() stats", + commIncomingStats, 0, 1); +} + +#if !HAVE_POLL +/* + * examine_select - debug routine. + * + * I spend the day chasing this core dump that occurs when both the client + * and the server side of a cache fetch simultaneoulsy abort the + * connection. While I haven't really studied the code to figure out how + * it happens, the snippet below may prevent the cache from exitting: + * + * Call this from where the select loop fails. + */ +static int +examine_select(fd_set * readfds, fd_set * writefds) +{ + int fd = 0; + fd_set read_x; + fd_set write_x; + int num; + struct timeval tv; + close_handler *ch = NULL; + fde *F = NULL; + debug(5, 0) ("examine_select: Examining open file descriptors...\n"); + for (fd = 0; fd < Squid_MaxFD; fd++) { + FD_ZERO(&read_x); + FD_ZERO(&write_x); + tv.tv_sec = tv.tv_usec = 0; + if (FD_ISSET(fd, readfds)) + FD_SET(fd, &read_x); + else if (FD_ISSET(fd, writefds)) + FD_SET(fd, &write_x); + else + continue; + num = select(Squid_MaxFD, &read_x, &write_x, NULL, &tv); + if (num > -1) { + debug(5, 5) ("FD %d is valid.\n", fd); + continue; + } + F = &fd_table[fd]; + debug(5, 0) ("FD %d: %s\n", fd, xstrerror()); + debug(5, 0) ("WARNING: FD %d has handlers, but it's invalid.\n", fd); + debug(5, 0) ("FD %d is a %s called '%s'\n", + fd, + fdTypeStr[fd_table[fd].type], + F->desc); + debug(5, 0) ("tmout:%p read:%p write:%p\n", + F->timeout_handler, + F->read_handler, + F->write_handler); + for (ch = F->close_handler; ch; ch = ch->next) + debug(5, 0) (" close handler: %p\n", ch->handler); + if (F->close_handler) { + commCallCloseHandlers(fd); + } else if (F->timeout_handler) { + debug(5, 0) ("examine_select: Calling Timeout Handler\n"); + F->timeout_handler(fd, F->timeout_data); + } + F->close_handler = NULL; + F->timeout_handler = NULL; + F->read_handler = NULL; + F->write_handler = NULL; + FD_CLR(fd, readfds); + FD_CLR(fd, writefds); + } + return 0; +} +#endif + +static void +checkTimeouts(void) +{ + int fd; + fde *F = NULL; + PF *callback; + for (fd = 0; fd <= Biggest_FD; fd++) { + F = &fd_table[fd]; + if (F->open != FD_OPEN) + continue; + if (F->timeout == 0) + continue; + if (F->timeout > squid_curtime) + continue; + debug(5, 5) ("checkTimeouts: FD %d Expired\n", fd); + if (F->timeout_handler) { + debug(5, 5) ("checkTimeouts: FD %d: Call timeout handler\n", fd); + callback = F->timeout_handler; + F->timeout_handler = NULL; + callback(fd, F->timeout_data); + } else { + debug(5, 5) ("checkTimeouts: FD %d: Forcing comm_close()\n", fd); + comm_close(fd); + } + } +} + +static void +commIncomingStats(StoreEntry * sentry) +{ + StatCounters *f = &Counter; + storeAppendPrintf(sentry, "Current incoming_icp_interval: %d\n", + incoming_icp_interval >> INCOMING_FACTOR); + storeAppendPrintf(sentry, "Current incoming_http_interval: %d\n", + incoming_http_interval >> INCOMING_FACTOR); + storeAppendPrintf(sentry, "\n"); + storeAppendPrintf(sentry, "Histogram of events per incoming socket type\n"); +#ifdef HAVE_POLL + storeAppendPrintf(sentry, "ICP Messages handled per comm_poll_icp_incoming() call:\n"); +#else + storeAppendPrintf(sentry, "ICP Messages handled per comm_select_icp_incoming() call:\n"); +#endif + statHistDump(&f->comm_icp_incoming, sentry, statHistIntDumper); +#ifdef HAVE_POLL + storeAppendPrintf(sentry, "HTTP Messages handled per comm_poll_http_incoming() call:\n"); +#else + storeAppendPrintf(sentry, "HTTP Messages handled per comm_select_http_incoming() call:\n"); +#endif + statHistDump(&f->comm_http_incoming, sentry, statHistIntDumper); +} diff --git a/src/main.cc b/src/main.cc index c46092e7e6..a33e2704fc 100644 --- a/src/main.cc +++ b/src/main.cc @@ -1,6 +1,6 @@ /* - * $Id: main.cc,v 1.256 1998/07/21 03:16:05 wessels Exp $ + * $Id: main.cc,v 1.257 1998/07/21 17:04:55 wessels Exp $ * * DEBUG: section 1 Startup and Main Loop * AUTHOR: Harvest Derived @@ -599,6 +599,7 @@ main(int argc, char **argv) /*init comm module */ comm_init(); + comm_select_init(); /* we have to init fdstat here. */ fd_open(0, FD_LOG, "stdin"); diff --git a/src/protos.h b/src/protos.h index e31dc8a81b..edd1fb8dc5 100644 --- a/src/protos.h +++ b/src/protos.h @@ -104,11 +104,7 @@ extern void comm_init(void); extern int comm_listen(int sock); extern int comm_open(int, int, struct in_addr, u_short port, int, const char *note); extern u_short comm_local_port(int fd); -#if HAVE_POLL -extern int comm_poll(int); -#else -extern int comm_select(int); -#endif + extern void commSetSelect(int, unsigned int, PF *, void *, time_t); extern void comm_add_close_handler(int fd, PF *, void *); extern void comm_remove_close_handler(int fd, PF *, void *); @@ -125,6 +121,16 @@ extern int commSetTimeout(int fd, int, PF *, void *); extern void commSetDefer(int fd, DEFER * func, void *); extern int ignoreErrno(int); +/* + * comm_select.c + */ +extern void comm_select_init(void); +#if HAVE_POLL +extern int comm_poll(int); +#else +extern int comm_select(int); +#endif + extern void packerToStoreInit(Packer * p, StoreEntry * e); extern void packerToMemInit(Packer * p, MemBuf * mb); extern void packerClean(Packer * p);