]> git.ipfire.org Git - thirdparty/squid.git/blob - src/comm/ModSelect.cc
6f3a58d3ef0ccc2036b2882b13069a30190e2c1e
[thirdparty/squid.git] / src / comm / ModSelect.cc
1 /*
2 * Copyright (C) 1996-2025 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 05 Socket Functions */
10
11 #include "squid.h"
12
13 #if USE_SELECT
14
15 #include "anyp/PortCfg.h"
16 #include "comm/Connection.h"
17 #include "comm/Loops.h"
18 #include "fde.h"
19 #include "globals.h"
20 #include "ICP.h"
21 #include "mgr/Registration.h"
22 #include "SquidConfig.h"
23 #include "StatCounters.h"
24 #include "StatHist.h"
25 #include "Store.h"
26
27 #include <cerrno>
28 #if HAVE_SYS_STAT_H
29 #include <sys/stat.h>
30 #endif
31
/// Upper bound, in milliseconds, on a single select(2) wait.
/// Comm::QuickPollRequired() lowers it at runtime.
static int MAX_POLL_TIME = 1000;

// Fallbacks for platforms whose headers do not provide these helpers.
#if !defined(howmany)
/// ceiling of x / y: how many y-sized units are needed to hold x items
#define howmany(x, y) (((x)+((y)-1))/(y))
#endif
#if !defined(NBBY)
/// bits per byte
#define NBBY 8
#endif

/// size of one fd_set word (fd_mask), in bytes
#define FD_MASK_BYTES sizeof(fd_mask)
/// number of descriptor bits held by one fd_set word
#define FD_MASK_BITS (FD_MASK_BYTES*NBBY)
42
43 /* STATIC */
44 static int examine_select(fd_set *, fd_set *);
45 static int fdIsTcpListener(int fd);
46 static int fdIsUdpListener(int fd);
47 static int fdIsDns(int fd);
48 static OBJH commIncomingStats;
49 static int comm_check_incoming_select_handlers(int nfds, int *fds);
50 static void comm_select_dns_incoming(void);
51 static void commUpdateReadBits(int fd, PF * handler);
52 static void commUpdateWriteBits(int fd, PF * handler);
53
54 static struct timeval zero_tv;
55 static fd_set global_readfds;
56 static fd_set global_writefds;
57 static int nreadfds;
58 static int nwritefds;
59
60 void
61 Comm::SetSelect(int fd, unsigned int type, PF * handler, void *client_data, time_t timeout)
62 {
63 fde *F = &fd_table[fd];
64 assert(fd >= 0);
65 assert(F->flags.open || (!handler && !client_data && !timeout));
66 debugs(5, 5, "FD " << fd << ", type=" << type <<
67 ", handler=" << handler << ", client_data=" << client_data <<
68 ", timeout=" << timeout);
69
70 if (type & COMM_SELECT_READ) {
71 F->read_handler = handler;
72 F->read_data = client_data;
73 commUpdateReadBits(fd, handler);
74 }
75
76 if (type & COMM_SELECT_WRITE) {
77 F->write_handler = handler;
78 F->write_data = client_data;
79 commUpdateWriteBits(fd, handler);
80 }
81
82 if (timeout)
83 F->timeout = squid_curtime + timeout;
84 }
85
86 static int
87 fdIsUdpListener(int fd)
88 {
89 if (icpIncomingConn != nullptr && fd == icpIncomingConn->fd)
90 return 1;
91
92 if (icpOutgoingConn != nullptr && fd == icpOutgoingConn->fd)
93 return 1;
94
95 return 0;
96 }
97
98 static int
99 fdIsDns(int fd)
100 {
101 if (fd == DnsSocketA)
102 return 1;
103
104 if (fd == DnsSocketB)
105 return 1;
106
107 return 0;
108 }
109
110 static int
111 fdIsTcpListener(int fd)
112 {
113 for (AnyP::PortCfgPointer s = HttpPortList; s != nullptr; s = s->next) {
114 if (s->listenConn != nullptr && s->listenConn->fd == fd)
115 return 1;
116 }
117
118 return 0;
119 }
120
121 static int
122 comm_check_incoming_select_handlers(int nfds, int *fds)
123 {
124 int i;
125 int fd;
126 int maxfd = 0;
127 PF *hdl = nullptr;
128 fd_set read_mask;
129 fd_set write_mask;
130 FD_ZERO(&read_mask);
131 FD_ZERO(&write_mask);
132 incoming_sockets_accepted = 0;
133
134 for (i = 0; i < nfds; ++i) {
135 fd = fds[i];
136
137 if (fd_table[fd].read_handler) {
138 FD_SET(fd, &read_mask);
139
140 if (fd > maxfd)
141 maxfd = fd;
142 }
143
144 if (fd_table[fd].write_handler) {
145 FD_SET(fd, &write_mask);
146
147 if (fd > maxfd)
148 maxfd = fd;
149 }
150 }
151
152 if (maxfd++ == 0)
153 return -1;
154
155 getCurrentTime();
156
157 ++ statCounter.syscalls.selects;
158
159 if (select(maxfd, &read_mask, &write_mask, nullptr, &zero_tv) < 1)
160 return incoming_sockets_accepted;
161
162 for (i = 0; i < nfds; ++i) {
163 fd = fds[i];
164
165 if (FD_ISSET(fd, &read_mask)) {
166 if ((hdl = fd_table[fd].read_handler) != nullptr) {
167 fd_table[fd].read_handler = nullptr;
168 commUpdateReadBits(fd, nullptr);
169 hdl(fd, fd_table[fd].read_data);
170 } else {
171 debugs(5, DBG_IMPORTANT, "comm_select_incoming: FD " << fd << " NULL read handler");
172 }
173 }
174
175 if (FD_ISSET(fd, &write_mask)) {
176 if ((hdl = fd_table[fd].write_handler) != nullptr) {
177 fd_table[fd].write_handler = nullptr;
178 commUpdateWriteBits(fd, nullptr);
179 hdl(fd, fd_table[fd].write_data);
180 } else {
181 debugs(5, DBG_IMPORTANT, "comm_select_incoming: FD " << fd << " NULL write handler");
182 }
183 }
184 }
185
186 return incoming_sockets_accepted;
187 }
188
189 static void
190 comm_select_udp_incoming(void)
191 {
192 int nfds = 0;
193 int fds[2];
194
195 if (Comm::IsConnOpen(icpIncomingConn)) {
196 fds[nfds] = icpIncomingConn->fd;
197 ++nfds;
198 }
199
200 if (Comm::IsConnOpen(icpOutgoingConn) && icpIncomingConn != icpOutgoingConn) {
201 fds[nfds] = icpOutgoingConn->fd;
202 ++nfds;
203 }
204
205 if (statCounter.comm_udp.startPolling(nfds)) {
206 auto n = comm_check_incoming_select_handlers(nfds, fds);
207 statCounter.comm_udp.finishPolling(n, Config.comm_incoming.udp);
208 }
209 }
210
211 static void
212 comm_select_tcp_incoming(void)
213 {
214 int nfds = 0;
215 int fds[MAXTCPLISTENPORTS];
216
217 // XXX: only poll sockets that won't be deferred. But how do we identify them?
218
219 for (AnyP::PortCfgPointer s = HttpPortList; s != nullptr; s = s->next) {
220 if (Comm::IsConnOpen(s->listenConn)) {
221 fds[nfds] = s->listenConn->fd;
222 ++nfds;
223 }
224 }
225
226 if (statCounter.comm_tcp.startPolling(nfds)) {
227 auto n = comm_check_incoming_select_handlers(nfds, fds);
228 statCounter.comm_tcp.finishPolling(n, Config.comm_incoming.tcp);
229 }
230 }
231
232 /* Select on all sockets; call handlers for those that are ready. */
233 Comm::Flag
234 Comm::DoSelect(int msec)
235 {
236 fd_set readfds;
237 fd_set pendingfds;
238 fd_set writefds;
239
240 PF *hdl = nullptr;
241 int fd;
242 int maxfd;
243 int num;
244 int pending;
245 int calldns = 0, calludp = 0, calltcp = 0;
246 int maxindex;
247 unsigned int k;
248 int j;
249 fd_mask *fdsp;
250 fd_mask *pfdsp;
251 fd_mask tmask;
252
253 struct timeval poll_time;
254 double timeout = current_dtime + (msec / 1000.0);
255 fde *F;
256
257 do {
258 double start;
259 getCurrentTime();
260 start = current_dtime;
261
262 if (statCounter.comm_udp.check())
263 comm_select_udp_incoming();
264
265 if (statCounter.comm_dns.check())
266 comm_select_dns_incoming();
267
268 if (statCounter.comm_tcp.check())
269 comm_select_tcp_incoming();
270
271 calldns = calludp = calltcp = 0;
272
273 maxfd = Biggest_FD + 1;
274
275 memcpy(&readfds, &global_readfds,
276 howmany(maxfd, FD_MASK_BITS) * FD_MASK_BYTES);
277
278 memcpy(&writefds, &global_writefds,
279 howmany(maxfd, FD_MASK_BITS) * FD_MASK_BYTES);
280
281 /* remove stalled FDs, and deal with pending descriptors */
282 pending = 0;
283
284 FD_ZERO(&pendingfds);
285
286 maxindex = howmany(maxfd, FD_MASK_BITS);
287
288 fdsp = (fd_mask *) & readfds;
289
290 for (j = 0; j < maxindex; ++j) {
291 if ((tmask = fdsp[j]) == 0)
292 continue; /* no bits here */
293
294 for (k = 0; k < FD_MASK_BITS; ++k) {
295 if (!EBIT_TEST(tmask, k))
296 continue;
297
298 /* Found a set bit */
299 fd = (j * FD_MASK_BITS) + k;
300
301 if (FD_ISSET(fd, &readfds) && fd_table[fd].flags.read_pending) {
302 FD_SET(fd, &pendingfds);
303 ++pending;
304 }
305 }
306 }
307
308 if (nreadfds + nwritefds == 0) {
309 assert(shutting_down);
310 return Comm::SHUTDOWN;
311 }
312
313 if (msec > MAX_POLL_TIME)
314 msec = MAX_POLL_TIME;
315
316 if (pending)
317 msec = 0;
318
319 for (;;) {
320 poll_time.tv_sec = msec / 1000;
321 poll_time.tv_usec = (msec % 1000) * 1000;
322 ++ statCounter.syscalls.selects;
323 num = select(maxfd, &readfds, &writefds, nullptr, &poll_time);
324 int xerrno = errno;
325 ++ statCounter.select_loops;
326
327 if (num >= 0 || pending > 0)
328 break;
329
330 if (ignoreErrno(xerrno))
331 break;
332
333 debugs(5, DBG_CRITICAL, MYNAME << "select failure: " << xstrerr(xerrno));
334
335 examine_select(&readfds, &writefds);
336
337 return Comm::COMM_ERROR;
338
339 /* NOTREACHED */
340 }
341
342 if (num < 0 && !pending)
343 continue;
344
345 getCurrentTime();
346
347 debugs(5, num ? 5 : 8, "comm_select: " << num << "+" << pending << " FDs ready");
348
349 statCounter.select_fds_hist.count(num);
350
351 if (num == 0 && pending == 0)
352 continue;
353
354 /* Scan return fd masks for ready descriptors */
355 fdsp = (fd_mask *) & readfds;
356
357 pfdsp = (fd_mask *) & pendingfds;
358
359 maxindex = howmany(maxfd, FD_MASK_BITS);
360
361 for (j = 0; j < maxindex; ++j) {
362 if ((tmask = (fdsp[j] | pfdsp[j])) == 0)
363 continue; /* no bits here */
364
365 for (k = 0; k < FD_MASK_BITS; ++k) {
366 if (tmask == 0)
367 break; /* no more bits left */
368
369 if (!EBIT_TEST(tmask, k))
370 continue;
371
372 /* Found a set bit */
373 fd = (j * FD_MASK_BITS) + k;
374
375 EBIT_CLR(tmask, k); /* this will be done */
376
377 if (fdIsUdpListener(fd)) {
378 calludp = 1;
379 continue;
380 }
381
382 if (fdIsDns(fd)) {
383 calldns = 1;
384 continue;
385 }
386
387 if (fdIsTcpListener(fd)) {
388 calltcp = 1;
389 continue;
390 }
391
392 F = &fd_table[fd];
393 debugs(5, 6, "comm_select: FD " << fd << " ready for reading");
394
395 if (nullptr == (hdl = F->read_handler))
396 (void) 0;
397 else {
398 F->read_handler = nullptr;
399 commUpdateReadBits(fd, nullptr);
400 hdl(fd, F->read_data);
401 ++ statCounter.select_fds;
402
403 if (statCounter.comm_udp.check())
404 comm_select_udp_incoming();
405
406 if (statCounter.comm_dns.check())
407 comm_select_dns_incoming();
408
409 if (statCounter.comm_tcp.check())
410 comm_select_tcp_incoming();
411 }
412 }
413 }
414
415 fdsp = (fd_mask *) & writefds;
416
417 for (j = 0; j < maxindex; ++j) {
418 if ((tmask = fdsp[j]) == 0)
419 continue; /* no bits here */
420
421 for (k = 0; k < FD_MASK_BITS; ++k) {
422 if (tmask == 0)
423 break; /* no more bits left */
424
425 if (!EBIT_TEST(tmask, k))
426 continue;
427
428 /* Found a set bit */
429 fd = (j * FD_MASK_BITS) + k;
430
431 EBIT_CLR(tmask, k); /* this will be done */
432
433 if (fdIsUdpListener(fd)) {
434 calludp = 1;
435 continue;
436 }
437
438 if (fdIsDns(fd)) {
439 calldns = 1;
440 continue;
441 }
442
443 if (fdIsTcpListener(fd)) {
444 calltcp = 1;
445 continue;
446 }
447
448 F = &fd_table[fd];
449 debugs(5, 6, "comm_select: FD " << fd << " ready for writing");
450
451 if ((hdl = F->write_handler)) {
452 F->write_handler = nullptr;
453 commUpdateWriteBits(fd, nullptr);
454 hdl(fd, F->write_data);
455 ++ statCounter.select_fds;
456
457 if (statCounter.comm_udp.check())
458 comm_select_udp_incoming();
459
460 if (statCounter.comm_dns.check())
461 comm_select_dns_incoming();
462
463 if (statCounter.comm_tcp.check())
464 comm_select_tcp_incoming();
465 }
466 }
467 }
468
469 if (calludp)
470 comm_select_udp_incoming();
471
472 if (calldns)
473 comm_select_dns_incoming();
474
475 if (calltcp)
476 comm_select_tcp_incoming();
477
478 getCurrentTime();
479
480 statCounter.select_time += (current_dtime - start);
481
482 return Comm::OK;
483 } while (timeout > current_dtime);
484 debugs(5, 8, "comm_select: time out: " << squid_curtime);
485
486 return Comm::TIMEOUT;
487 }
488
489 static void
490 comm_select_dns_incoming(void)
491 {
492 int nfds = 0;
493 int fds[3];
494
495 if (DnsSocketA >= 0) {
496 fds[nfds] = DnsSocketA;
497 ++nfds;
498 }
499
500 if (DnsSocketB >= 0) {
501 fds[nfds] = DnsSocketB;
502 ++nfds;
503 }
504
505 if (statCounter.comm_dns.startPolling(nfds)) {
506 auto n = comm_check_incoming_select_handlers(nfds, fds);
507 statCounter.comm_dns.finishPolling(n, Config.comm_incoming.dns);
508 }
509 }
510
511 void
512 Comm::SelectLoopInit(void)
513 {
514 zero_tv.tv_sec = 0;
515 zero_tv.tv_usec = 0;
516 FD_ZERO(&global_readfds);
517 FD_ZERO(&global_writefds);
518 nreadfds = nwritefds = 0;
519
520 Mgr::RegisterAction("comm_select_incoming",
521 "comm_incoming() stats",
522 commIncomingStats, 0, 1);
523 }
524
525 /*
526 * examine_select - debug routine.
527 *
528 * I spend the day chasing this core dump that occurs when both the client
529 * and the server side of a cache fetch simultaneoulsy abort the
530 * connection. While I haven't really studied the code to figure out how
531 * it happens, the snippet below may prevent the cache from exitting:
532 *
533 * Call this from where the select loop fails.
534 */
535 static int
536 examine_select(fd_set * readfds, fd_set * writefds)
537 {
538 int fd = 0;
539 fd_set read_x;
540 fd_set write_x;
541
542 struct timeval tv;
543 AsyncCall::Pointer ch = nullptr;
544 fde *F = nullptr;
545
546 struct stat sb;
547 debugs(5, DBG_CRITICAL, "examine_select: Examining open file descriptors...");
548
549 for (fd = 0; fd < Squid_MaxFD; ++fd) {
550 FD_ZERO(&read_x);
551 FD_ZERO(&write_x);
552 tv.tv_sec = tv.tv_usec = 0;
553
554 if (FD_ISSET(fd, readfds))
555 FD_SET(fd, &read_x);
556 else if (FD_ISSET(fd, writefds))
557 FD_SET(fd, &write_x);
558 else
559 continue;
560
561 ++ statCounter.syscalls.selects;
562 errno = 0;
563
564 if (!fstat(fd, &sb)) {
565 debugs(5, 5, "FD " << fd << " is valid.");
566 continue;
567 }
568 int xerrno = errno;
569
570 F = &fd_table[fd];
571 debugs(5, DBG_CRITICAL, "fstat(FD " << fd << "): " << xstrerr(xerrno));
572 debugs(5, DBG_CRITICAL, "WARNING: FD " << fd << " has handlers, but it's invalid.");
573 debugs(5, DBG_CRITICAL, "FD " << fd << " is a " << fdTypeStr[F->type] << " called '" << F->desc << "'");
574 debugs(5, DBG_CRITICAL, "tmout:" << F->timeoutHandler << " read:" << F->read_handler << " write:" << F->write_handler);
575
576 for (ch = F->closeHandler; ch != nullptr; ch = ch->Next())
577 debugs(5, DBG_CRITICAL, " close handler: " << ch);
578
579 if (F->closeHandler != nullptr) {
580 commCallCloseHandlers(fd);
581 } else if (F->timeoutHandler != nullptr) {
582 debugs(5, DBG_CRITICAL, "examine_select: Calling Timeout Handler");
583 ScheduleCallHere(F->timeoutHandler);
584 }
585
586 F->closeHandler = nullptr;
587 F->timeoutHandler = nullptr;
588 F->read_handler = nullptr;
589 F->write_handler = nullptr;
590 FD_CLR(fd, readfds);
591 FD_CLR(fd, writefds);
592 }
593
594 return 0;
595 }
596
597 static void
598 commIncomingStats(StoreEntry * sentry)
599 {
600 storeAppendPrintf(sentry, "Current incoming_udp_interval: %d\n",
601 statCounter.comm_udp.interval >> Comm::Incoming::Factor);
602 storeAppendPrintf(sentry, "Current incoming_dns_interval: %d\n",
603 statCounter.comm_dns.interval >> Comm::Incoming::Factor);
604 storeAppendPrintf(sentry, "Current incoming_tcp_interval: %d\n",
605 statCounter.comm_tcp.interval >> Comm::Incoming::Factor);
606 storeAppendPrintf(sentry, "\n");
607 storeAppendPrintf(sentry, "Histogram of events per incoming socket type\n");
608 storeAppendPrintf(sentry, "ICP Messages handled per comm_select_udp_incoming() call:\n");
609 statCounter.comm_udp.history.dump(sentry, statHistIntDumper);
610 storeAppendPrintf(sentry, "DNS Messages handled per comm_select_dns_incoming() call:\n");
611 statCounter.comm_dns.history.dump(sentry, statHistIntDumper);
612 storeAppendPrintf(sentry, "HTTP Messages handled per comm_select_tcp_incoming() call:\n");
613 statCounter.comm_tcp.history.dump(sentry, statHistIntDumper);
614 }
615
616 void
617 commUpdateReadBits(int fd, PF * handler)
618 {
619 if (handler && !FD_ISSET(fd, &global_readfds)) {
620 FD_SET(fd, &global_readfds);
621 ++nreadfds;
622 } else if (!handler && FD_ISSET(fd, &global_readfds)) {
623 FD_CLR(fd, &global_readfds);
624 --nreadfds;
625 }
626 }
627
628 void
629 commUpdateWriteBits(int fd, PF * handler)
630 {
631 if (handler && !FD_ISSET(fd, &global_writefds)) {
632 FD_SET(fd, &global_writefds);
633 ++nwritefds;
634 } else if (!handler && FD_ISSET(fd, &global_writefds)) {
635 FD_CLR(fd, &global_writefds);
636 --nwritefds;
637 }
638 }
639
640 /* Called by async-io or diskd to speed up the polling */
641 void
642 Comm::QuickPollRequired(void)
643 {
644 MAX_POLL_TIME = 10;
645 }
646
647 #endif /* USE_SELECT */
648