/*
 * $Id$
 *
 * DEBUG: section 05    Socket Functions
 *
 * SQUID Web Proxy Cache          http://www.squid-cache.org/
 * ----------------------------------------------------------
 *
 * Squid is the result of efforts by numerous individuals from
 * the Internet community; see the CONTRIBUTORS file for full
 * details.  Many organizations have provided support for Squid's
 * development; see the SPONSORS file for full details.  Squid is
 * Copyrighted (C) 2001 by the Regents of the University of
 * California; see the COPYRIGHT file for full details.  Squid
 * incorporates software developed and/or copyrighted by other
 * sources; see the CREDITS file for full details.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
 *
 */
#include "config.h"

#if USE_POLL

#include "squid.h"
#include "comm/Connection.h"
#include "comm/Loops.h"
#include "fde.h"
#include "ICP.h"
#include "mgr/Registration.h"
#include "SquidTime.h"
#include "Store.h"

#if HAVE_POLL_H
#include <poll.h>
#endif

/* Needed for poll() on Linux at least */
#if USE_POLL
#ifndef POLLRDNORM
#define POLLRDNORM POLLIN
#endif
#ifndef POLLWRNORM
#define POLLWRNORM POLLOUT
#endif
#endif

static int MAX_POLL_TIME = 1000;	/* see also Comm::QuickPollRequired() */

#ifndef howmany
#define howmany(x, y)   (((x)+((y)-1))/(y))
#endif
#ifndef NBBY
#define NBBY 8
#endif
#define FD_MASK_BYTES sizeof(fd_mask)
#define FD_MASK_BITS (FD_MASK_BYTES*NBBY)

/* STATIC */
static int fdIsHttp(int fd);
static int fdIsIcp(int fd);
static int fdIsDns(int fd);
static OBJH commIncomingStats;
static int comm_check_incoming_poll_handlers(int nfds, int *fds);
static void comm_poll_dns_incoming(void);

/*
 * Automatic tuning for incoming requests:
 *
 * INCOMING sockets are the ICP and HTTP ports.  We need to check these
 * fairly regularly, but how often?  When the load increases, we
 * want to check the incoming sockets more often.  If we have a lot
 * of incoming ICP, then we need to check these sockets more than
 * if we just have HTTP.
 *
 * The variables 'incoming_icp_interval' and 'incoming_http_interval'
 * determine how many normal I/O events to process before checking
 * incoming sockets again.  Note we store the incoming_interval
 * multiplied by a factor of (2^INCOMING_FACTOR) to have some
 * pseudo-floating point precision.
 *
 * The variables 'icp_io_events' and 'http_io_events' count how many normal
 * I/O events have been processed since the last check on the incoming
 * sockets.  When io_events > incoming_interval, it is time to check the
 * incoming sockets.
 *
 * Every time we check incoming sockets, we count how many new messages
 * or connections were processed.  This is used to adjust the
 * incoming_interval for the next iteration.  The new incoming_interval
 * is calculated as the current incoming_interval plus what we would
 * like to see as an average number of events, minus the number of
 * events just processed.
 *
 *  incoming_interval = incoming_interval + target_average - number_of_events_processed
 *
 * There are separate incoming_interval counters for HTTP and ICP events.
 *
 * You can see the current values of the incoming_intervals, as well as
 * a histogram of 'incoming_events', by asking the cache manager
 * for 'comm_poll_incoming', e.g.:
 *
 *  % ./client mgr:comm_poll_incoming
 *
 * Caveats:
 *
 *  - We have MAX_INCOMING_INTEGER as a magic upper limit on
 *    incoming_interval for both types of sockets.  At the
 *    largest value the cache will effectively be idling.
 *
 *  - The higher the INCOMING_FACTOR, the slower the algorithm will
 *    respond to load spikes/increases/decreases in demand.  A value
 *    between 3 and 8 is recommended.
 */

#define MAX_INCOMING_INTEGER 256
#define INCOMING_FACTOR 5
#define MAX_INCOMING_INTERVAL (MAX_INCOMING_INTEGER << INCOMING_FACTOR)
static int icp_io_events = 0;
static int dns_io_events = 0;
static int http_io_events = 0;
static int incoming_icp_interval = 16 << INCOMING_FACTOR;
static int incoming_dns_interval = 16 << INCOMING_FACTOR;
static int incoming_http_interval = 16 << INCOMING_FACTOR;
#define commCheckICPIncoming (++icp_io_events > (incoming_icp_interval >> INCOMING_FACTOR))
#define commCheckDNSIncoming (++dns_io_events > (incoming_dns_interval >> INCOMING_FACTOR))
#define commCheckHTTPIncoming (++http_io_events > (incoming_http_interval >> INCOMING_FACTOR))

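/*
 * Illustrative sketch only (not compiled into Squid): one adaptation step of
 * the fixed-point scheme described above, using the ICP interval as the
 * example.  The concrete numbers and the names 'events_handled' and
 * 'target_average' are invented for the example; the real code uses the
 * Config.comm_incoming.* settings and the statics defined above.
 *
 *   int interval = 16 << INCOMING_FACTOR;          // stored shifted, i.e. 512
 *   int threshold = interval >> INCOMING_FACTOR;   // check incoming after ~16 I/O events
 *   int events_handled = 20;                       // messages seen on the last check
 *   int target_average = 16;                       // stand-in for icp_average
 *   interval += target_average - events_handled;   // busier than target => smaller interval,
 *                                                   // so the incoming sockets get polled sooner
 *   if (interval > MAX_INCOMING_INTERVAL)
 *       interval = MAX_INCOMING_INTERVAL;          // quiet caches are clamped at the top end
 */
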
void
Comm::SetSelect(int fd, unsigned int type, PF * handler, void *client_data, time_t timeout)
{
    fde *F = &fd_table[fd];
    assert(fd >= 0);
    assert(F->flags.open);
    debugs(5, 5, "commSetSelect: FD " << fd << " type " << type);

    if (type & COMM_SELECT_READ) {
        F->read_handler = handler;
        F->read_data = client_data;
    }

    if (type & COMM_SELECT_WRITE) {
        F->write_handler = handler;
        F->write_data = client_data;
    }

    if (timeout)
        F->timeout = squid_curtime + timeout;
}
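
/*
 * Usage sketch (hypothetical caller, not part of this file): a module that
 * wants to hear when 'fd' becomes readable registers a one-shot handler like
 * this.  The handler is cleared before it is invoked, so it must call
 * SetSelect() again if it wants further notifications.
 *
 *   static void myReadHandler(int fd, void *data);   // assumed PF-style callback
 *   Comm::SetSelect(fd, COMM_SELECT_READ, myReadHandler, myState, 0);
 */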

void
Comm::ResetSelect(int fd)
{
    /* nothing to reset for the poll() engine: handler state lives in fd_table only */
}

static int
fdIsIcp(int fd)
{
    if (icpIncomingConn != NULL && icpIncomingConn->fd == fd)
        return 1;

    if (icpOutgoingConn != NULL && icpOutgoingConn->fd == fd)
        return 1;

    return 0;
}

static int
fdIsDns(int fd)
{
    if (fd == DnsSocketA)
        return 1;

    if (fd == DnsSocketB)
        return 1;

    return 0;
}

static int
fdIsHttp(int fd)
{
    int j;

    for (j = 0; j < NHttpSockets; j++) {
        if (fd == HttpSockets[j])
            return 1;
    }

    return 0;
}

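/*
 * Poll only the given incoming sockets, with a zero timeout so the call never
 * blocks, and run any read/write handlers that are ready.  Returns the number
 * of new messages or connections accepted (tracked via the global
 * incoming_sockets_accepted), or -1 when there was nothing to poll.
 */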
static int
comm_check_incoming_poll_handlers(int nfds, int *fds)
{
    int i;
    int fd;
    PF *hdl = NULL;
    int npfds;

    struct pollfd pfds[3 + MAXHTTPPORTS];
    PROF_start(comm_check_incoming);
    incoming_sockets_accepted = 0;

    for (i = npfds = 0; i < nfds; i++) {
        int events;
        fd = fds[i];
        events = 0;

        if (fd_table[fd].read_handler)
            events |= POLLRDNORM;

        if (fd_table[fd].write_handler)
            events |= POLLWRNORM;

        if (events) {
            pfds[npfds].fd = fd;
            pfds[npfds].events = events;
            pfds[npfds].revents = 0;
            npfds++;
        }
    }

    if (!nfds) {
        PROF_stop(comm_check_incoming);
        return -1;
    }

    getCurrentTime();
    statCounter.syscalls.selects++;

    if (poll(pfds, npfds, 0) < 1) {
        PROF_stop(comm_check_incoming);
        return incoming_sockets_accepted;
    }

    for (i = 0; i < npfds; i++) {
        int revents;

        if (((revents = pfds[i].revents) == 0) || ((fd = pfds[i].fd) == -1))
            continue;

        if (revents & (POLLRDNORM | POLLIN | POLLHUP | POLLERR)) {
            if ((hdl = fd_table[fd].read_handler)) {
                fd_table[fd].read_handler = NULL;
                hdl(fd, fd_table[fd].read_data);
            } else if (pfds[i].events & POLLRDNORM)
                debugs(5, 1, "comm_poll_incoming: FD " << fd << " NULL read handler");
        }

        if (revents & (POLLWRNORM | POLLOUT | POLLHUP | POLLERR)) {
            if ((hdl = fd_table[fd].write_handler)) {
                fd_table[fd].write_handler = NULL;
                hdl(fd, fd_table[fd].write_data);
            } else if (pfds[i].events & POLLWRNORM)
                debugs(5, 1, "comm_poll_incoming: FD " << fd << " NULL write_handler");
        }
    }

    PROF_stop(comm_check_incoming);
    return incoming_sockets_accepted;
}

static void
comm_poll_icp_incoming(void)
{
    int nfds = 0;
    int fds[2];
    int nevents;
    icp_io_events = 0;

    if (Comm::IsConnOpen(icpIncomingConn))
        fds[nfds++] = icpIncomingConn->fd;

    if (icpIncomingConn != icpOutgoingConn && Comm::IsConnOpen(icpOutgoingConn))
        fds[nfds++] = icpOutgoingConn->fd;

    if (nfds == 0)
        return;

    nevents = comm_check_incoming_poll_handlers(nfds, fds);

    incoming_icp_interval += Config.comm_incoming.icp_average - nevents;

    if (incoming_icp_interval < Config.comm_incoming.icp_min_poll)
        incoming_icp_interval = Config.comm_incoming.icp_min_poll;

    if (incoming_icp_interval > MAX_INCOMING_INTERVAL)
        incoming_icp_interval = MAX_INCOMING_INTERVAL;

    if (nevents > INCOMING_ICP_MAX)
        nevents = INCOMING_ICP_MAX;

    statHistCount(&statCounter.comm_icp_incoming, nevents);
}

static void
comm_poll_http_incoming(void)
{
    int nfds = 0;
    int fds[MAXHTTPPORTS];
    int j;
    int nevents;
    http_io_events = 0;

    /* only poll sockets that won't be deferred */

    for (j = 0; j < NHttpSockets; j++) {
        if (HttpSockets[j] < 0)
            continue;

        fds[nfds++] = HttpSockets[j];
    }

    nevents = comm_check_incoming_poll_handlers(nfds, fds);
    incoming_http_interval = incoming_http_interval
                             + Config.comm_incoming.http_average - nevents;

    if (incoming_http_interval < Config.comm_incoming.http_min_poll)
        incoming_http_interval = Config.comm_incoming.http_min_poll;

    if (incoming_http_interval > MAX_INCOMING_INTERVAL)
        incoming_http_interval = MAX_INCOMING_INTERVAL;

    if (nevents > INCOMING_HTTP_MAX)
        nevents = INCOMING_HTTP_MAX;

    statHistCount(&statCounter.comm_http_incoming, nevents);
}

/* Poll all registered sockets and call the handlers for those that are ready.
 * 'msec' caps how long poll() may wait.  Returns COMM_OK once ready
 * descriptors have been serviced, COMM_IDLE or COMM_SHUTDOWN when there was
 * nothing to poll, COMM_TIMEOUT when the deadline passed without activity,
 * and COMM_ERROR on an unrecoverable poll() failure. */
comm_err_t
Comm::DoSelect(int msec)
{
    struct pollfd pfds[SQUID_MAXFD];

    PF *hdl = NULL;
    int fd;
    int maxfd;
    unsigned long nfds;
    unsigned long npending;
    int num;
    int callicp = 0, callhttp = 0;
    int calldns = 0;
    double timeout = current_dtime + (msec / 1000.0);

    do {
        double start;
        getCurrentTime();
        start = current_dtime;

        if (commCheckICPIncoming)
            comm_poll_icp_incoming();

        if (commCheckDNSIncoming)
            comm_poll_dns_incoming();

        if (commCheckHTTPIncoming)
            comm_poll_http_incoming();

        PROF_start(comm_poll_prep_pfds);

        callicp = calldns = callhttp = 0;

        nfds = 0;

        npending = 0;

        maxfd = Biggest_FD + 1;

        for (int i = 0; i < maxfd; i++) {
            int events;
            events = 0;
            /* Check each open socket for a handler. */

            if (fd_table[i].read_handler)
                events |= POLLRDNORM;

            if (fd_table[i].write_handler)
                events |= POLLWRNORM;

            if (events) {
                pfds[nfds].fd = i;
                pfds[nfds].events = events;
                pfds[nfds].revents = 0;
                nfds++;

                if ((events & POLLRDNORM) && fd_table[i].flags.read_pending)
                    npending++;
            }
        }

        PROF_stop(comm_poll_prep_pfds);

        if (npending)
            msec = 0;

        if (msec > MAX_POLL_TIME)
            msec = MAX_POLL_TIME;

        /* nothing to do
         *
         * Note that this will only ever trigger when there are no log files
         * and stdout/err/in are all closed too.
         */
        if (nfds == 0 && npending == 0) {
            if (shutting_down)
                return COMM_SHUTDOWN;
            else
                return COMM_IDLE;
        }

        for (;;) {
            PROF_start(comm_poll_normal);
            ++statCounter.syscalls.selects;
            num = poll(pfds, nfds, msec);
            ++statCounter.select_loops;
            PROF_stop(comm_poll_normal);

            if (num >= 0 || npending > 0)
                break;

            if (ignoreErrno(errno))
                continue;

            debugs(5, 0, "comm_poll: poll failure: " << xstrerror());

            assert(errno != EINVAL);

            return COMM_ERROR;

            /* NOTREACHED */
        }

        getCurrentTime();

        debugs(5, num ? 5 : 8, "comm_poll: " << num << "+" << npending << " FDs ready");
        statHistCount(&statCounter.select_fds_hist, num);

        if (num == 0 && npending == 0)
            continue;

        /* scan each socket but the accept socket. Poll this
         * more frequently to minimize losses due to the 5 connect
         * limit in SunOS */
        PROF_start(comm_handle_ready_fd);

        for (size_t loopIndex = 0; loopIndex < nfds; loopIndex++) {
            fde *F;
            int revents = pfds[loopIndex].revents;
            fd = pfds[loopIndex].fd;

            if (fd == -1)
                continue;

            if (fd_table[fd].flags.read_pending)
                revents |= POLLIN;

            if (revents == 0)
                continue;

            if (fdIsIcp(fd)) {
                callicp = 1;
                continue;
            }

            if (fdIsDns(fd)) {
                calldns = 1;
                continue;
            }

            if (fdIsHttp(fd)) {
                callhttp = 1;
                continue;
            }

            F = &fd_table[fd];

            if (revents & (POLLRDNORM | POLLIN | POLLHUP | POLLERR)) {
                debugs(5, 6, "comm_poll: FD " << fd << " ready for reading");

                if (NULL == (hdl = F->read_handler))
                    (void) 0;
                else {
                    PROF_start(comm_read_handler);
                    F->read_handler = NULL;
                    F->flags.read_pending = 0;
                    hdl(fd, F->read_data);
                    PROF_stop(comm_read_handler);
                    statCounter.select_fds++;

                    if (commCheckICPIncoming)
                        comm_poll_icp_incoming();

                    if (commCheckDNSIncoming)
                        comm_poll_dns_incoming();

                    if (commCheckHTTPIncoming)
                        comm_poll_http_incoming();
                }
            }

            if (revents & (POLLWRNORM | POLLOUT | POLLHUP | POLLERR)) {
                debugs(5, 5, "comm_poll: FD " << fd << " ready for writing");

                if ((hdl = F->write_handler)) {
                    PROF_start(comm_write_handler);
                    F->write_handler = NULL;
                    hdl(fd, F->write_data);
                    PROF_stop(comm_write_handler);
                    statCounter.select_fds++;

                    if (commCheckICPIncoming)
                        comm_poll_icp_incoming();

                    if (commCheckDNSIncoming)
                        comm_poll_dns_incoming();

                    if (commCheckHTTPIncoming)
                        comm_poll_http_incoming();
                }
            }

            if (revents & POLLNVAL) {
                AsyncCall::Pointer ch;
                debugs(5, 0, "WARNING: FD " << fd << " has handlers, but it's invalid.");
                debugs(5, 0, "FD " << fd << " is a " << fdTypeStr[F->type]);
                debugs(5, 0, "--> " << F->desc);
                debugs(5, 0, "tmout:" << F->timeoutHandler << "read:" <<
                       F->read_handler << " write:" << F->write_handler);

                for (ch = F->closeHandler; ch != NULL; ch = ch->Next())
                    debugs(5, 0, " close handler: " << ch);

                if (F->closeHandler != NULL) {
                    commCallCloseHandlers(fd);
                } else if (F->timeoutHandler != NULL) {
                    debugs(5, 0, "comm_poll: Calling Timeout Handler");
                    ScheduleCallHere(F->timeoutHandler);
                }

                F->closeHandler = NULL;
                F->timeoutHandler = NULL;
                F->read_handler = NULL;
                F->write_handler = NULL;

                if (F->flags.open)
                    fd_close(fd);
            }
        }

        PROF_stop(comm_handle_ready_fd);

        if (callicp)
            comm_poll_icp_incoming();

        if (calldns)
            comm_poll_dns_incoming();

        if (callhttp)
            comm_poll_http_incoming();

        getCurrentTime();

        statCounter.select_time += (current_dtime - start);

        return COMM_OK;
    } while (timeout > current_dtime);

    debugs(5, 8, "comm_poll: time out: " << squid_curtime << ".");

    return COMM_TIMEOUT;
}

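/*
 * Usage sketch (the real caller is Squid's main event loop, not this file):
 *
 *   for (;;) {
 *       int rc = Comm::DoSelect(1000);   // wait up to one second for I/O
 *       if (rc == COMM_SHUTDOWN)
 *           break;
 *       // COMM_OK / COMM_IDLE / COMM_TIMEOUT: run timed events, then poll again
 *   }
 */
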
static void
comm_poll_dns_incoming(void)
{
    int nfds = 0;
    int fds[2];
    int nevents;
    dns_io_events = 0;

    if (DnsSocketA < 0 && DnsSocketB < 0)
        return;

    if (DnsSocketA >= 0)
        fds[nfds++] = DnsSocketA;

    if (DnsSocketB >= 0)
        fds[nfds++] = DnsSocketB;

    nevents = comm_check_incoming_poll_handlers(nfds, fds);

    if (nevents < 0)
        return;

    incoming_dns_interval += Config.comm_incoming.dns_average - nevents;

    if (incoming_dns_interval < Config.comm_incoming.dns_min_poll)
        incoming_dns_interval = Config.comm_incoming.dns_min_poll;

    if (incoming_dns_interval > MAX_INCOMING_INTERVAL)
        incoming_dns_interval = MAX_INCOMING_INTERVAL;

    if (nevents > INCOMING_DNS_MAX)
        nevents = INCOMING_DNS_MAX;

    statHistCount(&statCounter.comm_dns_incoming, nevents);
}

static void
commPollRegisterWithCacheManager(void)
{
    Mgr::RegisterAction("comm_poll_incoming",
                        "comm_incoming() stats",
                        commIncomingStats, 0, 1);
}

void
Comm::SelectLoopInit(void)
{
    commPollRegisterWithCacheManager();
}

static void
commIncomingStats(StoreEntry * sentry)
{
    StatCounters *f = &statCounter;
    storeAppendPrintf(sentry, "Current incoming_icp_interval: %d\n",
                      incoming_icp_interval >> INCOMING_FACTOR);
    storeAppendPrintf(sentry, "Current incoming_dns_interval: %d\n",
                      incoming_dns_interval >> INCOMING_FACTOR);
    storeAppendPrintf(sentry, "Current incoming_http_interval: %d\n",
                      incoming_http_interval >> INCOMING_FACTOR);
    storeAppendPrintf(sentry, "\n");
    storeAppendPrintf(sentry, "Histogram of events per incoming socket type\n");
    storeAppendPrintf(sentry, "ICP Messages handled per comm_poll_icp_incoming() call:\n");
    statHistDump(&f->comm_icp_incoming, sentry, statHistIntDumper);
    storeAppendPrintf(sentry, "DNS Messages handled per comm_poll_dns_incoming() call:\n");
    statHistDump(&f->comm_dns_incoming, sentry, statHistIntDumper);
    storeAppendPrintf(sentry, "HTTP Messages handled per comm_poll_http_incoming() call:\n");
    statHistDump(&f->comm_http_incoming, sentry, statHistIntDumper);
}

/* Called by async-io or diskd to speed up the polling */
void
Comm::QuickPollRequired(void)
{
    MAX_POLL_TIME = 10;
}

#endif /* USE_POLL */