/*
 * $Id$
 *
 * DEBUG: section 05    Socket Functions
 *
 * SQUID Web Proxy Cache    http://www.squid-cache.org/
 * ----------------------------------------------------------
 *
 * Squid is the result of efforts by numerous individuals from
 * the Internet community; see the CONTRIBUTORS file for full
 * details. Many organizations have provided support for Squid's
 * development; see the SPONSORS file for full details. Squid is
 * Copyrighted (C) 2001 by the Regents of the University of
 * California; see the COPYRIGHT file for full details. Squid
 * incorporates software developed and/or copyrighted by other
 * sources; see the CREDITS file for full details.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
 *
 */
#include "squid.h"

#if USE_POLL
#include "anyp/PortCfg.h"
#include "comm/Connection.h"
#include "comm/Loops.h"
#include "fde.h"
#include "globals.h"
#include "ICP.h"
#include "mgr/Registration.h"
#include "profiler/Profiler.h"
#include "protos.h"
#include "SquidTime.h"
#include "StatCounters.h"
#include "Store.h"

#if HAVE_POLL_H
#include <poll.h>
#endif
#if HAVE_ERRNO_H
#include <errno.h>
#endif

/* Needed for poll() on Linux at least */
#if USE_POLL
#ifndef POLLRDNORM
#define POLLRDNORM POLLIN
#endif
#ifndef POLLWRNORM
#define POLLWRNORM POLLOUT
#endif
#endif

static int MAX_POLL_TIME = 1000; /* see also Comm::QuickPollRequired() */

#ifndef howmany
#define howmany(x, y) (((x)+((y)-1))/(y))
#endif
#ifndef NBBY
#define NBBY 8
#endif
#define FD_MASK_BYTES sizeof(fd_mask)
#define FD_MASK_BITS (FD_MASK_BYTES*NBBY)

/* STATIC */
static int fdIsTcpListen(int fd);
static int fdIsUdpListen(int fd);
static int fdIsDns(int fd);
static OBJH commIncomingStats;
static int comm_check_incoming_poll_handlers(int nfds, int *fds);
static void comm_poll_dns_incoming(void);

/*
 * Automatic tuning for incoming requests:
 *
 * INCOMING sockets are the ICP and HTTP ports, plus the DNS sockets.
 * We need to check these fairly regularly, but how often? When the
 * load increases, we want to check the incoming sockets more often.
 * If we have a lot of incoming ICP, then we need to check these
 * sockets more than if we just have HTTP.
 *
 * The variables 'incoming_udp_interval', 'incoming_dns_interval' and
 * 'incoming_tcp_interval' determine how many normal I/O events to
 * process before checking incoming sockets again. Note we store the
 * incoming_interval multiplied by a factor of (2^INCOMING_FACTOR) to
 * have some pseudo-floating point precision.
 *
 * The variables 'udp_io_events', 'dns_io_events' and 'tcp_io_events'
 * count how many normal I/O events have been processed since the last
 * check on the incoming sockets. When io_events > incoming_interval,
 * it's time to check incoming sockets.
 *
 * Every time we check incoming sockets, we count how many new messages
 * or connections were processed. This is used to adjust the
 * incoming_interval for the next iteration. The new incoming_interval
 * is calculated as the current incoming_interval plus what we would
 * like to see as an average number of events minus the number of
 * events just processed.
 *
 *  incoming_interval = incoming_interval + target_average - number_of_events_processed
 *
 * There are separate incoming_interval counters for TCP-based, UDP-based, and DNS events.
 *
 * You can see the current values of the incoming_intervals, as well as
 * a histogram of 'incoming_events' by asking the cache manager
 * for 'comm_poll_incoming', e.g.:
 *
 *  % ./client mgr:comm_poll_incoming
 *
 * Caveats:
 *
 *  - We have MAX_INCOMING_INTEGER as a magic upper limit on
 *    incoming_interval for all socket types. At the
 *    largest value the cache will effectively be idling.
 *
 *  - The higher the INCOMING_FACTOR, the slower the algorithm will
 *    respond to load spikes/increases/decreases in demand. A value
 *    between 3 and 8 is recommended.
 */

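/*
 * Worked example of the adaptation above (the numbers are illustrative
 * only; the real 'average' and 'min_poll' values come from squid.conf
 * via Config.comm_incoming.*):
 *
 *   incoming_udp_interval = 16 << INCOMING_FACTOR = 512   (effective: 512 >> 5 = 16 I/O events)
 *   suppose udp.average = 4 and one check handles nevents = 10
 *   incoming_udp_interval += 4 - 10          -> 506        (effective: 506 >> 5 = 15 I/O events)
 *
 * A busy period (nevents > average) therefore shortens the effective
 * interval, so the incoming sockets are checked more often; a quiet
 * period lengthens it, up to MAX_INCOMING_INTERVAL. Keeping the value
 * shifted by INCOMING_FACTOR preserves 1/32 "fractional" precision.
 */
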
#define MAX_INCOMING_INTEGER 256
#define INCOMING_FACTOR 5
#define MAX_INCOMING_INTERVAL (MAX_INCOMING_INTEGER << INCOMING_FACTOR)
static int udp_io_events = 0; ///< I/O events passed since last UDP receiver socket poll
static int dns_io_events = 0; ///< I/O events passed since last DNS socket poll
static int tcp_io_events = 0; ///< I/O events passed since last TCP listening socket poll
static int incoming_udp_interval = 16 << INCOMING_FACTOR;
static int incoming_dns_interval = 16 << INCOMING_FACTOR;
static int incoming_tcp_interval = 16 << INCOMING_FACTOR;
#define commCheckUdpIncoming (++udp_io_events > (incoming_udp_interval >> INCOMING_FACTOR))
#define commCheckDnsIncoming (++dns_io_events > (incoming_dns_interval >> INCOMING_FACTOR))
#define commCheckTcpIncoming (++tcp_io_events > (incoming_tcp_interval >> INCOMING_FACTOR))

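/*
 * Register (or, when 'handler' is NULL, clear) the read and/or write
 * handler for an already open descriptor. 'type' is a bitmask of
 * COMM_SELECT_READ and COMM_SELECT_WRITE; 'client_data' is handed back
 * to the handler when it fires. A non-zero 'timeout' re-arms the
 * descriptor timeout relative to squid_curtime; zero leaves any
 * existing timeout untouched.
 *
 * Minimal usage sketch ('myReadHandler' and 'myState' are hypothetical):
 *
 *   static void myReadHandler(int fd, void *data);
 *   Comm::SetSelect(fd, COMM_SELECT_READ, myReadHandler, myState, 0);
 */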
void
Comm::SetSelect(int fd, unsigned int type, PF * handler, void *client_data, time_t timeout)
{
    assert(fd >= 0);
    fde *F = &fd_table[fd];
    assert(F->flags.open);
    debugs(5, 5, HERE << "FD " << fd << ", type=" << type <<
           ", handler=" << handler << ", client_data=" << client_data <<
           ", timeout=" << timeout);

    if (type & COMM_SELECT_READ) {
        F->read_handler = handler;
        F->read_data = client_data;
    }

    if (type & COMM_SELECT_WRITE) {
        F->write_handler = handler;
        F->write_data = client_data;
    }

    if (timeout)
        F->timeout = squid_curtime + timeout;
}

void
Comm::ResetSelect(int fd)
{
    /* nothing to do here: the poll engine rebuilds its pollfd array from fd_table on every DoSelect() pass */
}

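/*
 * Predicates used by DoSelect() to recognize the "incoming" sockets:
 * the ICP/UDP sockets, the DNS sockets, and the HTTP TCP listeners.
 * Ready descriptors matching one of these are not handled inline in the
 * main scan; they are flagged and serviced via comm_poll_*_incoming().
 */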
static int
fdIsUdpListen(int fd)
{
    if (icpIncomingConn != NULL && icpIncomingConn->fd == fd)
        return 1;

    if (icpOutgoingConn != NULL && icpOutgoingConn->fd == fd)
        return 1;

    return 0;
}

static int
fdIsDns(int fd)
{
    if (fd == DnsSocketA)
        return 1;

    if (fd == DnsSocketB)
        return 1;

    return 0;
}

static int
fdIsTcpListen(int fd)
{
    for (const AnyP::PortCfg *s = Config.Sockaddr.http; s; s = s->next) {
        if (s->listenConn != NULL && s->listenConn->fd == fd)
            return 1;
    }

    return 0;
}

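/*
 * Poll the supplied descriptors with a zero timeout and run any
 * registered read/write handlers that are ready. Returns the number of
 * incoming connections accepted while those handlers ran (the global
 * incoming_sockets_accepted), or -1 when called with no descriptors.
 */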
static int
comm_check_incoming_poll_handlers(int nfds, int *fds)
{
    int i;
    int fd;
    PF *hdl = NULL;
    int npfds;

    struct pollfd pfds[3 + MAXTCPLISTENPORTS];
    PROF_start(comm_check_incoming);
    incoming_sockets_accepted = 0;

    for (i = npfds = 0; i < nfds; ++i) {
        int events;
        fd = fds[i];
        events = 0;

        if (fd_table[fd].read_handler)
            events |= POLLRDNORM;

        if (fd_table[fd].write_handler)
            events |= POLLWRNORM;

        if (events) {
            pfds[npfds].fd = fd;
            pfds[npfds].events = events;
            pfds[npfds].revents = 0;
            ++npfds;
        }
    }

    if (!nfds) {
        PROF_stop(comm_check_incoming);
        return -1;
    }

    getCurrentTime();
    ++ statCounter.syscalls.selects;

    if (poll(pfds, npfds, 0) < 1) {
        PROF_stop(comm_check_incoming);
        return incoming_sockets_accepted;
    }

    for (i = 0; i < npfds; ++i) {
        int revents;

        if (((revents = pfds[i].revents) == 0) || ((fd = pfds[i].fd) == -1))
            continue;

        if (revents & (POLLRDNORM | POLLIN | POLLHUP | POLLERR)) {
            if ((hdl = fd_table[fd].read_handler)) {
                fd_table[fd].read_handler = NULL;
                hdl(fd, fd_table[fd].read_data);
            } else if (pfds[i].events & POLLRDNORM)
                debugs(5, DBG_IMPORTANT, "comm_poll_incoming: FD " << fd << " NULL read handler");
        }

        if (revents & (POLLWRNORM | POLLOUT | POLLHUP | POLLERR)) {
            if ((hdl = fd_table[fd].write_handler)) {
                fd_table[fd].write_handler = NULL;
                hdl(fd, fd_table[fd].write_data);
            } else if (pfds[i].events & POLLWRNORM)
                debugs(5, DBG_IMPORTANT, "comm_poll_incoming: FD " << fd << " NULL write_handler");
        }
    }

    PROF_stop(comm_check_incoming);
    return incoming_sockets_accepted;
}

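/*
 * Check the ICP/UDP sockets for pending work and adapt
 * incoming_udp_interval from the number of events handled, as described
 * in the tuning comment above.
 */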
static void
comm_poll_udp_incoming(void)
{
    int nfds = 0;
    int fds[2];
    int nevents;
    udp_io_events = 0;

    if (Comm::IsConnOpen(icpIncomingConn)) {
        fds[nfds] = icpIncomingConn->fd;
        ++nfds;
    }

    if (icpIncomingConn != icpOutgoingConn && Comm::IsConnOpen(icpOutgoingConn)) {
        fds[nfds] = icpOutgoingConn->fd;
        ++nfds;
    }

    if (nfds == 0)
        return;

    nevents = comm_check_incoming_poll_handlers(nfds, fds);

    incoming_udp_interval += Config.comm_incoming.udp.average - nevents;

    if (incoming_udp_interval < Config.comm_incoming.udp.min_poll)
        incoming_udp_interval = Config.comm_incoming.udp.min_poll;

    if (incoming_udp_interval > MAX_INCOMING_INTERVAL)
        incoming_udp_interval = MAX_INCOMING_INTERVAL;

    if (nevents > INCOMING_UDP_MAX)
        nevents = INCOMING_UDP_MAX;

    statCounter.comm_udp_incoming.count(nevents);
}

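/*
 * Check the HTTP TCP listening sockets for pending connections and
 * adapt incoming_tcp_interval from the number of events handled.
 */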
static void
comm_poll_tcp_incoming(void)
{
    int nfds = 0;
    int fds[MAXTCPLISTENPORTS];
    int j;
    int nevents;
    tcp_io_events = 0;

    // XXX: only poll sockets that won't be deferred. But how do we identify them?

    for (j = 0; j < NHttpSockets; ++j) {
        if (HttpSockets[j] < 0)
            continue;

        fds[nfds] = HttpSockets[j];
        ++nfds;
    }

    nevents = comm_check_incoming_poll_handlers(nfds, fds);
    incoming_tcp_interval = incoming_tcp_interval
                            + Config.comm_incoming.tcp.average - nevents;

    if (incoming_tcp_interval < Config.comm_incoming.tcp.min_poll)
        incoming_tcp_interval = Config.comm_incoming.tcp.min_poll;

    if (incoming_tcp_interval > MAX_INCOMING_INTERVAL)
        incoming_tcp_interval = MAX_INCOMING_INTERVAL;

    if (nevents > INCOMING_TCP_MAX)
        nevents = INCOMING_TCP_MAX;

    statCounter.comm_tcp_incoming.count(nevents);
}

/* poll all sockets; call handlers for those that are ready. */
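/*
 * 'msec' caps how long a single poll() may block; it is never allowed to
 * exceed MAX_POLL_TIME and is forced to zero while read-pending work is
 * queued. Returns COMM_OK once ready handlers have been dispatched,
 * COMM_IDLE or COMM_SHUTDOWN when there is nothing to poll, COMM_ERROR
 * on an unrecoverable poll() failure, and COMM_TIMEOUT when the time
 * budget expires without any descriptor becoming ready.
 */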
comm_err_t
Comm::DoSelect(int msec)
{
    struct pollfd pfds[SQUID_MAXFD];

    PF *hdl = NULL;
    int fd;
    int maxfd;
    unsigned long nfds;
    unsigned long npending;
    int num;
    int calldns = 0, calludp = 0, calltcp = 0;
    double timeout = current_dtime + (msec / 1000.0);

    do {
        double start;
        getCurrentTime();
        start = current_dtime;

        if (commCheckUdpIncoming)
            comm_poll_udp_incoming();

        if (commCheckDnsIncoming)
            comm_poll_dns_incoming();

        if (commCheckTcpIncoming)
            comm_poll_tcp_incoming();

        PROF_start(comm_poll_prep_pfds);

        calldns = calludp = calltcp = 0;

        nfds = 0;

        npending = 0;

        maxfd = Biggest_FD + 1;

        for (int i = 0; i < maxfd; ++i) {
            int events;
            events = 0;
            /* Check each open socket for a handler. */

            if (fd_table[i].read_handler)
                events |= POLLRDNORM;

            if (fd_table[i].write_handler)
                events |= POLLWRNORM;

            if (events) {
                pfds[nfds].fd = i;
                pfds[nfds].events = events;
                pfds[nfds].revents = 0;
                ++nfds;

                if ((events & POLLRDNORM) && fd_table[i].flags.read_pending)
                    ++npending;
            }
        }

        PROF_stop(comm_poll_prep_pfds);

        if (npending)
            msec = 0;

        if (msec > MAX_POLL_TIME)
            msec = MAX_POLL_TIME;

        /* nothing to do
         *
         * Note that this will only ever trigger when there are no log files
         * and stdout/err/in are all closed too.
         */
        if (nfds == 0 && npending == 0) {
            if (shutting_down)
                return COMM_SHUTDOWN;
            else
                return COMM_IDLE;
        }

        for (;;) {
            PROF_start(comm_poll_normal);
            ++ statCounter.syscalls.selects;
            num = poll(pfds, nfds, msec);
            ++ statCounter.select_loops;
            PROF_stop(comm_poll_normal);

            if (num >= 0 || npending > 0)
                break;

            if (ignoreErrno(errno))
                continue;

            debugs(5, DBG_CRITICAL, "comm_poll: poll failure: " << xstrerror());

            assert(errno != EINVAL);

            return COMM_ERROR;

            /* NOTREACHED */
        }

        getCurrentTime();

        debugs(5, num ? 5 : 8, "comm_poll: " << num << "+" << npending << " FDs ready");
        statCounter.select_fds_hist.count(num);

        if (num == 0 && npending == 0)
            continue;

        /* Scan each ready socket, skipping the incoming "accept" sockets;
         * those are polled more frequently elsewhere to minimize losses
         * due to the 5-connection listen queue limit in SunOS. */
        PROF_start(comm_handle_ready_fd);

        for (size_t loopIndex = 0; loopIndex < nfds; ++loopIndex) {
            fde *F;
            int revents = pfds[loopIndex].revents;
            fd = pfds[loopIndex].fd;

            if (fd == -1)
                continue;

            if (fd_table[fd].flags.read_pending)
                revents |= POLLIN;

            if (revents == 0)
                continue;

            if (fdIsUdpListen(fd)) {
                calludp = 1;
                continue;
            }

            if (fdIsDns(fd)) {
                calldns = 1;
                continue;
            }

            if (fdIsTcpListen(fd)) {
                calltcp = 1;
                continue;
            }

            F = &fd_table[fd];

            if (revents & (POLLRDNORM | POLLIN | POLLHUP | POLLERR)) {
                debugs(5, 6, "comm_poll: FD " << fd << " ready for reading");

                if ((hdl = F->read_handler)) {
                    PROF_start(comm_read_handler);
                    F->read_handler = NULL;
                    F->flags.read_pending = 0;
                    hdl(fd, F->read_data);
                    PROF_stop(comm_read_handler);
                    ++ statCounter.select_fds;

                    if (commCheckUdpIncoming)
                        comm_poll_udp_incoming();

                    if (commCheckDnsIncoming)
                        comm_poll_dns_incoming();

                    if (commCheckTcpIncoming)
                        comm_poll_tcp_incoming();
                }
            }

            if (revents & (POLLWRNORM | POLLOUT | POLLHUP | POLLERR)) {
                debugs(5, 6, "comm_poll: FD " << fd << " ready for writing");

                if ((hdl = F->write_handler)) {
                    PROF_start(comm_write_handler);
                    F->write_handler = NULL;
                    hdl(fd, F->write_data);
                    PROF_stop(comm_write_handler);
                    ++ statCounter.select_fds;

                    if (commCheckUdpIncoming)
                        comm_poll_udp_incoming();

                    if (commCheckDnsIncoming)
                        comm_poll_dns_incoming();

                    if (commCheckTcpIncoming)
                        comm_poll_tcp_incoming();
                }
            }

            if (revents & POLLNVAL) {
                AsyncCall::Pointer ch;
                debugs(5, DBG_CRITICAL, "WARNING: FD " << fd << " has handlers, but it's invalid.");
                debugs(5, DBG_CRITICAL, "FD " << fd << " is a " << fdTypeStr[F->type]);
                debugs(5, DBG_CRITICAL, "--> " << F->desc);
                debugs(5, DBG_CRITICAL, "tmout:" << F->timeoutHandler << " read:" <<
                       F->read_handler << " write:" << F->write_handler);

                for (ch = F->closeHandler; ch != NULL; ch = ch->Next())
                    debugs(5, DBG_CRITICAL, " close handler: " << ch);

                if (F->closeHandler != NULL) {
                    commCallCloseHandlers(fd);
                } else if (F->timeoutHandler != NULL) {
                    debugs(5, DBG_CRITICAL, "comm_poll: Calling Timeout Handler");
                    ScheduleCallHere(F->timeoutHandler);
                }

                F->closeHandler = NULL;
                F->timeoutHandler = NULL;
                F->read_handler = NULL;
                F->write_handler = NULL;

                if (F->flags.open)
                    fd_close(fd);
            }
        }

        PROF_stop(comm_handle_ready_fd);

        if (calludp)
            comm_poll_udp_incoming();

        if (calldns)
            comm_poll_dns_incoming();

        if (calltcp)
            comm_poll_tcp_incoming();

        getCurrentTime();

        statCounter.select_time += (current_dtime - start);

        return COMM_OK;
    } while (timeout > current_dtime);

    debugs(5, 8, "comm_poll: time out: " << squid_curtime << ".");

    return COMM_TIMEOUT;
}

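/*
 * Check the DNS sockets for pending replies and adapt
 * incoming_dns_interval from the number of events handled.
 */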
static void
comm_poll_dns_incoming(void)
{
    int nfds = 0;
    int fds[2];
    int nevents;
    dns_io_events = 0;

    if (DnsSocketA < 0 && DnsSocketB < 0)
        return;

    if (DnsSocketA >= 0) {
        fds[nfds] = DnsSocketA;
        ++nfds;
    }

    if (DnsSocketB >= 0) {
        fds[nfds] = DnsSocketB;
        ++nfds;
    }

    nevents = comm_check_incoming_poll_handlers(nfds, fds);

    if (nevents < 0)
        return;

    incoming_dns_interval += Config.comm_incoming.dns.average - nevents;

    if (incoming_dns_interval < Config.comm_incoming.dns.min_poll)
        incoming_dns_interval = Config.comm_incoming.dns.min_poll;

    if (incoming_dns_interval > MAX_INCOMING_INTERVAL)
        incoming_dns_interval = MAX_INCOMING_INTERVAL;

    if (nevents > INCOMING_DNS_MAX)
        nevents = INCOMING_DNS_MAX;

    statCounter.comm_dns_incoming.count(nevents);
}

static void
commPollRegisterWithCacheManager(void)
{
    Mgr::RegisterAction("comm_poll_incoming",
                        "comm_incoming() stats",
                        commIncomingStats, 0, 1);
}

void
Comm::SelectLoopInit(void)
{
    commPollRegisterWithCacheManager();
}

static void
commIncomingStats(StoreEntry * sentry)
{
    storeAppendPrintf(sentry, "Current incoming_udp_interval: %d\n",
                      incoming_udp_interval >> INCOMING_FACTOR);
    storeAppendPrintf(sentry, "Current incoming_dns_interval: %d\n",
                      incoming_dns_interval >> INCOMING_FACTOR);
    storeAppendPrintf(sentry, "Current incoming_tcp_interval: %d\n",
                      incoming_tcp_interval >> INCOMING_FACTOR);
    storeAppendPrintf(sentry, "\n");
    storeAppendPrintf(sentry, "Histogram of events per incoming socket type\n");
    storeAppendPrintf(sentry, "ICP Messages handled per comm_poll_udp_incoming() call:\n");
    statCounter.comm_udp_incoming.dump(sentry, statHistIntDumper);
    storeAppendPrintf(sentry, "DNS Messages handled per comm_poll_dns_incoming() call:\n");
    statCounter.comm_dns_incoming.dump(sentry, statHistIntDumper);
    storeAppendPrintf(sentry, "HTTP Messages handled per comm_poll_tcp_incoming() call:\n");
    statCounter.comm_tcp_incoming.dump(sentry, statHistIntDumper);
}

/* Called by async-io or diskd to speed up the polling */
void
Comm::QuickPollRequired(void)
{
    MAX_POLL_TIME = 10;
}

#endif /* USE_POLL */