/*
 * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 05 Socket Functions */

#include "squid.h"

#if USE_POLL
#include "anyp/PortCfg.h"
#include "comm/Connection.h"
#include "comm/Loops.h"
#include "fd.h"
#include "fde.h"
#include "globals.h"
#include "ICP.h"
#include "mgr/Registration.h"
#include "SquidConfig.h"
#include "StatCounters.h"
#include "Store.h"

#include <cerrno>
#if HAVE_POLL_H
#include <poll.h>
#endif

/* Needed for poll() on Linux at least */
#if USE_POLL
#ifndef POLLRDNORM
#define POLLRDNORM POLLIN
#endif
#ifndef POLLWRNORM
#define POLLWRNORM POLLOUT
#endif
#endif

static int MAX_POLL_TIME = 1000; /* see also Comm::QuickPollRequired() */

#ifndef howmany
#define howmany(x, y) (((x)+((y)-1))/(y))
#endif
#ifndef NBBY
#define NBBY 8
#endif
#define FD_MASK_BYTES sizeof(fd_mask)
#define FD_MASK_BITS (FD_MASK_BYTES*NBBY)

/* STATIC */
static int fdIsTcpListen(int fd);
static int fdIsUdpListen(int fd);
static int fdIsDns(int fd);
static OBJH commIncomingStats;
static int comm_check_incoming_poll_handlers(int nfds, int *fds);
static void comm_poll_dns_incoming(void);

/*
 * Automatic tuning for incoming requests:
 *
 * INCOMING sockets are the ICP and HTTP ports. We need to check these
 * fairly regularly, but how often? When the load increases, we
 * want to check the incoming sockets more often. If we have a lot
 * of incoming ICP, then we need to check these sockets more than
 * if we just have HTTP.
 *
 * The variables 'incoming_udp_interval', 'incoming_dns_interval', and
 * 'incoming_tcp_interval' determine how many normal I/O events to process
 * before checking incoming sockets again. Note we store the incoming_interval
 * multiplied by a factor of (2^INCOMING_FACTOR) to have some
 * pseudo-floating point precision.
 *
 * The variables 'udp_io_events', 'dns_io_events', and 'tcp_io_events' count
 * how many normal I/O events have been processed since the last check on
 * the incoming sockets. When io_events > incoming_interval, it's time to
 * check incoming sockets.
 *
 * Every time we check incoming sockets, we count how many new messages
 * or connections were processed. This is used to adjust the
 * incoming_interval for the next iteration. The new incoming_interval
 * is calculated as the current incoming_interval, plus what we would
 * like to see as an average number of events, minus the number of
 * events just processed:
 *
 *  incoming_interval = incoming_interval + target_average - number_of_events_processed
 *
 * There are separate incoming_interval counters for TCP-based, UDP-based,
 * and DNS events.
 *
 * You can see the current values of the incoming_interval counters, as well
 * as a histogram of 'incoming_events', by asking the cache manager
 * for 'comm_poll_incoming', e.g.:
 *
 *  % ./client mgr:comm_poll_incoming
 *
 * Caveats:
 *
 *  - We have MAX_INCOMING_INTEGER as a magic upper limit on
 *    incoming_interval for both types of sockets. At the
 *    largest value the cache will effectively be idling.
 *
 *  - The higher the INCOMING_FACTOR, the slower the algorithm will
 *    respond to load spikes/increases/decreases in demand. A value
 *    between 3 and 8 is recommended.
 */

#define MAX_INCOMING_INTEGER 256
#define INCOMING_FACTOR 5
#define MAX_INCOMING_INTERVAL (MAX_INCOMING_INTEGER << INCOMING_FACTOR)
static int udp_io_events = 0; ///< I/O events passed since last UDP receiver socket poll
static int dns_io_events = 0; ///< I/O events passed since last DNS socket poll
static int tcp_io_events = 0; ///< I/O events passed since last TCP listening socket poll
static int incoming_udp_interval = 16 << INCOMING_FACTOR;
static int incoming_dns_interval = 16 << INCOMING_FACTOR;
static int incoming_tcp_interval = 16 << INCOMING_FACTOR;
#define commCheckUdpIncoming (++udp_io_events > (incoming_udp_interval >> INCOMING_FACTOR))
#define commCheckDnsIncoming (++dns_io_events > (incoming_dns_interval >> INCOMING_FACTOR))
#define commCheckTcpIncoming (++tcp_io_events > (incoming_tcp_interval >> INCOMING_FACTOR))
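
/*
 * Worked example of the adjustment described above (the numbers are
 * illustrative, not squid.conf defaults): with incoming_udp_interval at its
 * initial value of 16 << INCOMING_FACTOR (512), commCheckUdpIncoming becomes
 * true once more than 512 >> 5 = 16 normal I/O events have passed.  If that
 * check then handles 6 messages while Config.comm_incoming.udp.average is 4,
 * the interval becomes 512 + 4 - 6 = 510 and the incoming sockets get checked
 * slightly sooner next time; if it handles none, the interval drifts up
 * toward MAX_INCOMING_INTERVAL and the checks become less frequent.
 */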

void
Comm::SetSelect(int fd, unsigned int type, PF * handler, void *client_data, time_t timeout)
{
    fde *F = &fd_table[fd];
    assert(fd >= 0);
    assert(F->flags.open || (!handler && !client_data && !timeout));
    debugs(5, 5, "FD " << fd << ", type=" << type <<
           ", handler=" << handler << ", client_data=" << client_data <<
           ", timeout=" << timeout);

    if (type & COMM_SELECT_READ) {
        F->read_handler = handler;
        F->read_data = client_data;
    }

    if (type & COMM_SELECT_WRITE) {
        F->write_handler = handler;
        F->write_data = client_data;
    }

    if (timeout)
        F->timeout = squid_curtime + timeout;
}
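
/*
 * Illustrative usage (the handler and state names below are hypothetical,
 * not defined in this file): a caller that wants to hear about readability
 * on fd, with a 30-second timeout, would register
 *
 *     Comm::SetSelect(fd, COMM_SELECT_READ, myReadHandler, myState, 30);
 *
 * and can later drop that interest by passing a null handler, null data, and
 * a zero timeout.  Note that the poll loops below clear read_handler or
 * write_handler before invoking it, so a handler that wants further events
 * must re-register itself.
 */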

static int
fdIsUdpListen(int fd)
{
    if (icpIncomingConn != nullptr && icpIncomingConn->fd == fd)
        return 1;

    if (icpOutgoingConn != nullptr && icpOutgoingConn->fd == fd)
        return 1;

    return 0;
}

static int
fdIsDns(int fd)
{
    if (fd == DnsSocketA)
        return 1;

    if (fd == DnsSocketB)
        return 1;

    return 0;
}

static int
fdIsTcpListen(int fd)
{
    for (AnyP::PortCfgPointer s = HttpPortList; s != nullptr; s = s->next) {
        if (s->listenConn != nullptr && s->listenConn->fd == fd)
            return 1;
    }

    return 0;
}

static int
comm_check_incoming_poll_handlers(int nfds, int *fds)
{
    int i;
    int fd;
    PF *hdl = nullptr;
    int npfds;

    struct pollfd pfds[3 + MAXTCPLISTENPORTS];
    incoming_sockets_accepted = 0;

    for (i = npfds = 0; i < nfds; ++i) {
        int events;
        fd = fds[i];
        events = 0;

        if (fd_table[fd].read_handler)
            events |= POLLRDNORM;

        if (fd_table[fd].write_handler)
            events |= POLLWRNORM;

        if (events) {
            pfds[npfds].fd = fd;
            pfds[npfds].events = events;
            pfds[npfds].revents = 0;
            ++npfds;
        }
    }

    if (!nfds)
        return -1;

    getCurrentTime();
    ++ statCounter.syscalls.selects;

    if (poll(pfds, npfds, 0) < 1)
        return incoming_sockets_accepted;

    for (i = 0; i < npfds; ++i) {
        int revents;

        if (((revents = pfds[i].revents) == 0) || ((fd = pfds[i].fd) == -1))
            continue;

        if (revents & (POLLRDNORM | POLLIN | POLLHUP | POLLERR)) {
            if ((hdl = fd_table[fd].read_handler)) {
                fd_table[fd].read_handler = nullptr;
                hdl(fd, fd_table[fd].read_data);
            } else if (pfds[i].events & POLLRDNORM)
                debugs(5, DBG_IMPORTANT, "comm_poll_incoming: FD " << fd << " NULL read handler");
        }

        if (revents & (POLLWRNORM | POLLOUT | POLLHUP | POLLERR)) {
            if ((hdl = fd_table[fd].write_handler)) {
                fd_table[fd].write_handler = nullptr;
                hdl(fd, fd_table[fd].write_data);
            } else if (pfds[i].events & POLLWRNORM)
                debugs(5, DBG_IMPORTANT, "comm_poll_incoming: FD " << fd << " NULL write_handler");
        }
    }

    return incoming_sockets_accepted;
}
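
/*
 * Note on the return value above: -1 means nothing was polled (nfds was
 * zero); otherwise the function returns incoming_sockets_accepted, a global
 * that the accept/read handlers invoked above are expected to bump while
 * they run, so callers can feed it into their interval adjustment.
 */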

static void
comm_poll_udp_incoming(void)
{
    int nfds = 0;
    int fds[2];
    int nevents;
    udp_io_events = 0;

    if (Comm::IsConnOpen(icpIncomingConn)) {
        fds[nfds] = icpIncomingConn->fd;
        ++nfds;
    }

    if (icpIncomingConn != icpOutgoingConn && Comm::IsConnOpen(icpOutgoingConn)) {
        fds[nfds] = icpOutgoingConn->fd;
        ++nfds;
    }

    if (nfds == 0)
        return;

    nevents = comm_check_incoming_poll_handlers(nfds, fds);

    incoming_udp_interval += Config.comm_incoming.udp.average - nevents;

    if (incoming_udp_interval < Config.comm_incoming.udp.min_poll)
        incoming_udp_interval = Config.comm_incoming.udp.min_poll;

    if (incoming_udp_interval > MAX_INCOMING_INTERVAL)
        incoming_udp_interval = MAX_INCOMING_INTERVAL;

    if (nevents > INCOMING_UDP_MAX)
        nevents = INCOMING_UDP_MAX;

    statCounter.comm_udp_incoming.count(nevents);
}

static void
comm_poll_tcp_incoming(void)
{
    int nfds = 0;
    int fds[MAXTCPLISTENPORTS];
    int j;
    int nevents;
    tcp_io_events = 0;

    // XXX: only poll sockets that won't be deferred. But how do we identify them?

    for (j = 0; j < NHttpSockets; ++j) {
        if (HttpSockets[j] < 0)
            continue;

        fds[nfds] = HttpSockets[j];
        ++nfds;
    }

    nevents = comm_check_incoming_poll_handlers(nfds, fds);
    incoming_tcp_interval = incoming_tcp_interval
                            + Config.comm_incoming.tcp.average - nevents;

    if (incoming_tcp_interval < Config.comm_incoming.tcp.min_poll)
        incoming_tcp_interval = Config.comm_incoming.tcp.min_poll;

    if (incoming_tcp_interval > MAX_INCOMING_INTERVAL)
        incoming_tcp_interval = MAX_INCOMING_INTERVAL;

    if (nevents > INCOMING_TCP_MAX)
        nevents = INCOMING_TCP_MAX;

    statCounter.comm_tcp_incoming.count(nevents);
}

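/*
 * Return values of DoSelect() (summarising the control flow below):
 *   Comm::OK          at least one descriptor was ready (or had a pending
 *                     read) and its handlers were dispatched
 *   Comm::TIMEOUT     the msec deadline passed with nothing ready
 *   Comm::IDLE        no descriptors were registered at all
 *   Comm::SHUTDOWN    same as IDLE, but while shutting_down is set
 *   Comm::COMM_ERROR  poll() failed with a non-ignorable errno
 */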
/* poll all sockets; call handlers for those that are ready. */
Comm::Flag
Comm::DoSelect(int msec)
{
    struct pollfd pfds[SQUID_MAXFD];

    PF *hdl = nullptr;
    int fd;
    int maxfd;
    unsigned long nfds;
    unsigned long npending;
    int num;
    int calldns = 0, calludp = 0, calltcp = 0;
    double timeout = current_dtime + (msec / 1000.0);

    do {
        double start;
        getCurrentTime();
        start = current_dtime;

        if (commCheckUdpIncoming)
            comm_poll_udp_incoming();

        if (commCheckDnsIncoming)
            comm_poll_dns_incoming();

        if (commCheckTcpIncoming)
            comm_poll_tcp_incoming();

        calldns = calludp = calltcp = 0;

        nfds = 0;

        npending = 0;

        maxfd = Biggest_FD + 1;

        for (int i = 0; i < maxfd; ++i) {
            int events;
            events = 0;
            /* Check each open socket for a handler. */

            if (fd_table[i].read_handler)
                events |= POLLRDNORM;

            if (fd_table[i].write_handler)
                events |= POLLWRNORM;

            if (events) {
                pfds[nfds].fd = i;
                pfds[nfds].events = events;
                pfds[nfds].revents = 0;
                ++nfds;

                if ((events & POLLRDNORM) && fd_table[i].flags.read_pending)
                    ++npending;
            }
        }

        if (npending)
            msec = 0;

        if (msec > MAX_POLL_TIME)
            msec = MAX_POLL_TIME;

        /* nothing to do
         *
         * Note that this will only ever trigger when there are no log files
         * and stdout/err/in are all closed too.
         */
        if (nfds == 0 && npending == 0) {
            if (shutting_down)
                return Comm::SHUTDOWN;
            else
                return Comm::IDLE;
        }

        for (;;) {
            ++ statCounter.syscalls.selects;
            num = poll(pfds, nfds, msec);
            int xerrno = errno;
            ++ statCounter.select_loops;

            if (num >= 0 || npending > 0)
                break;

            if (ignoreErrno(xerrno))
                continue;

            debugs(5, DBG_CRITICAL, MYNAME << "poll failure: " << xstrerr(xerrno));

            assert(xerrno != EINVAL);

            return Comm::COMM_ERROR;

            /* NOTREACHED */
        }

        getCurrentTime();

        debugs(5, num ? 5 : 8, "comm_poll: " << num << "+" << npending << " FDs ready");
        statCounter.select_fds_hist.count(num);

        if (num == 0 && npending == 0)
            continue;

        /* scan each socket but the accept socket. Poll this
         * more frequently to minimize losses due to the 5 connect
         * limit in SunOS */

        for (size_t loopIndex = 0; loopIndex < nfds; ++loopIndex) {
            fde *F;
            int revents = pfds[loopIndex].revents;
            fd = pfds[loopIndex].fd;

            if (fd == -1)
                continue;

            if (fd_table[fd].flags.read_pending)
                revents |= POLLIN;

            if (revents == 0)
                continue;

            if (fdIsUdpListen(fd)) {
                calludp = 1;
                continue;
            }

            if (fdIsDns(fd)) {
                calldns = 1;
                continue;
            }

            if (fdIsTcpListen(fd)) {
                calltcp = 1;
                continue;
            }

            F = &fd_table[fd];

            if (revents & (POLLRDNORM | POLLIN | POLLHUP | POLLERR)) {
                debugs(5, 6, "comm_poll: FD " << fd << " ready for reading");

                if ((hdl = F->read_handler)) {
                    F->read_handler = nullptr;
                    hdl(fd, F->read_data);
                    ++ statCounter.select_fds;

                    if (commCheckUdpIncoming)
                        comm_poll_udp_incoming();

                    if (commCheckDnsIncoming)
                        comm_poll_dns_incoming();

                    if (commCheckTcpIncoming)
                        comm_poll_tcp_incoming();
                }
            }

            if (revents & (POLLWRNORM | POLLOUT | POLLHUP | POLLERR)) {
                debugs(5, 6, "comm_poll: FD " << fd << " ready for writing");

                if ((hdl = F->write_handler)) {
                    F->write_handler = nullptr;
                    hdl(fd, F->write_data);
                    ++ statCounter.select_fds;

                    if (commCheckUdpIncoming)
                        comm_poll_udp_incoming();

                    if (commCheckDnsIncoming)
                        comm_poll_dns_incoming();

                    if (commCheckTcpIncoming)
                        comm_poll_tcp_incoming();
                }
            }

            if (revents & POLLNVAL) {
                AsyncCall::Pointer ch;
                debugs(5, DBG_CRITICAL, "WARNING: FD " << fd << " has handlers, but it's invalid.");
                debugs(5, DBG_CRITICAL, "FD " << fd << " is a " << fdTypeStr[F->type]);
                debugs(5, DBG_CRITICAL, "--> " << F->desc);
                debugs(5, DBG_CRITICAL, "tmout:" << F->timeoutHandler << "read:" <<
                       F->read_handler << " write:" << F->write_handler);

                for (ch = F->closeHandler; ch != nullptr; ch = ch->Next())
                    debugs(5, DBG_CRITICAL, " close handler: " << ch);

                if (F->closeHandler != nullptr) {
                    commCallCloseHandlers(fd);
                } else if (F->timeoutHandler != nullptr) {
                    debugs(5, DBG_CRITICAL, "comm_poll: Calling Timeout Handler");
                    ScheduleCallHere(F->timeoutHandler);
                }

                F->closeHandler = nullptr;
                F->timeoutHandler = nullptr;
                F->read_handler = nullptr;
                F->write_handler = nullptr;

                if (F->flags.open)
                    fd_close(fd);
            }
        }

        if (calludp)
            comm_poll_udp_incoming();

        if (calldns)
            comm_poll_dns_incoming();

        if (calltcp)
            comm_poll_tcp_incoming();

        getCurrentTime();

        statCounter.select_time += (current_dtime - start);

        return Comm::OK;
    } while (timeout > current_dtime);

    debugs(5, 8, "comm_poll: time out: " << squid_curtime << ".");

    return Comm::TIMEOUT;
}

static void
comm_poll_dns_incoming(void)
{
    int nfds = 0;
    int fds[2];
    int nevents;
    dns_io_events = 0;

    if (DnsSocketA < 0 && DnsSocketB < 0)
        return;

    if (DnsSocketA >= 0) {
        fds[nfds] = DnsSocketA;
        ++nfds;
    }

    if (DnsSocketB >= 0) {
        fds[nfds] = DnsSocketB;
        ++nfds;
    }

    nevents = comm_check_incoming_poll_handlers(nfds, fds);

    if (nevents < 0)
        return;

    incoming_dns_interval += Config.comm_incoming.dns.average - nevents;

    if (incoming_dns_interval < Config.comm_incoming.dns.min_poll)
        incoming_dns_interval = Config.comm_incoming.dns.min_poll;

    if (incoming_dns_interval > MAX_INCOMING_INTERVAL)
        incoming_dns_interval = MAX_INCOMING_INTERVAL;

    if (nevents > INCOMING_DNS_MAX)
        nevents = INCOMING_DNS_MAX;

    statCounter.comm_dns_incoming.count(nevents);
}

static void
commPollRegisterWithCacheManager(void)
{
    Mgr::RegisterAction("comm_poll_incoming",
                        "comm_incoming() stats",
                        commIncomingStats, 0, 1);
}

void
Comm::SelectLoopInit(void)
{
    commPollRegisterWithCacheManager();
}

static void
commIncomingStats(StoreEntry * sentry)
{
    storeAppendPrintf(sentry, "Current incoming_udp_interval: %d\n",
                      incoming_udp_interval >> INCOMING_FACTOR);
    storeAppendPrintf(sentry, "Current incoming_dns_interval: %d\n",
                      incoming_dns_interval >> INCOMING_FACTOR);
    storeAppendPrintf(sentry, "Current incoming_tcp_interval: %d\n",
                      incoming_tcp_interval >> INCOMING_FACTOR);
    storeAppendPrintf(sentry, "\n");
    storeAppendPrintf(sentry, "Histogram of events per incoming socket type\n");
    storeAppendPrintf(sentry, "ICP Messages handled per comm_poll_udp_incoming() call:\n");
    statCounter.comm_udp_incoming.dump(sentry, statHistIntDumper);
    storeAppendPrintf(sentry, "DNS Messages handled per comm_poll_dns_incoming() call:\n");
    statCounter.comm_dns_incoming.dump(sentry, statHistIntDumper);
    storeAppendPrintf(sentry, "HTTP Messages handled per comm_poll_tcp_incoming() call:\n");
    statCounter.comm_tcp_incoming.dump(sentry, statHistIntDumper);
}

/* Called by async-io or diskd to speed up the polling */
void
Comm::QuickPollRequired(void)
{
    MAX_POLL_TIME = 10;
}

#endif /* USE_POLL */