]> git.ipfire.org Git - thirdparty/squid.git/blob - src/comm.cc
Import IPv6 support from squid3-ipv6 branch to 3-HEAD.
[thirdparty/squid.git] / src / comm.cc
1
2 /*
3 * $Id: comm.cc,v 1.439 2007/12/14 23:11:46 amosjeffries Exp $
4 *
5 * DEBUG: section 5 Socket Functions
6 * AUTHOR: Harvest Derived
7 *
8 * SQUID Web Proxy Cache http://www.squid-cache.org/
9 * ----------------------------------------------------------
10 *
11 * Squid is the result of efforts by numerous individuals from
12 * the Internet community; see the CONTRIBUTORS file for full
13 * details. Many organizations have provided support for Squid's
14 * development; see the SPONSORS file for full details. Squid is
15 * Copyrighted (C) 2001 by the Regents of the University of
16 * California; see the COPYRIGHT file for full details. Squid
17 * incorporates software developed and/or copyrighted by other
18 * sources; see the CREDITS file for full details.
19 *
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
24 *
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
29 *
30 * You should have received a copy of the GNU General Public License
31 * along with this program; if not, write to the Free Software
32 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
33 *
34 *
35 * Copyright (c) 2003, Robert Collins <robertc@squid-cache.org>
36 */
37
38 #include "squid.h"
39 #include "StoreIOBuffer.h"
40 #include "comm.h"
41 #include "event.h"
42 #include "fde.h"
43 #include "CommIO.h"
44 #include "CommRead.h"
45 #include "ConnectionDetail.h"
46 #include "MemBuf.h"
47 #include "pconn.h"
48 #include "SquidTime.h"
49 #include "IPAddress.h"
50
51 #if defined(_SQUID_CYGWIN_)
52 #include <sys/ioctl.h>
53 #endif
54 #ifdef HAVE_NETINET_TCP_H
55 #include <netinet/tcp.h>
56 #endif
57
58 /*
59 * New C-like simple comm code. This stuff is a mess and doesn't really buy us anything.
60 */
61
/* Kind of I/O operation a comm_io_callback_t slot describes. */
enum iocb_type {
    IOCB_NONE,
    IOCB_READ,
    IOCB_WRITE
};
67
68 struct _comm_io_callback {
69 iocb_type type;
70 int fd;
71 IOCB *callback;
72 void *callback_data;
73 char *buf;
74 FREE *freefunc;
75 int size;
76 int offset;
77 bool active;
78 bool completed;
79 comm_err_t errcode;
80 int xerrno;
81 dlink_node node;
82 };
83 typedef struct _comm_io_callback comm_io_callback_t;
84
85 struct _comm_fd {
86 int fd;
87 comm_io_callback_t readcb;
88 comm_io_callback_t writecb;
89 };
90 typedef struct _comm_fd comm_fd_t;
91 comm_fd_t *commfd_table;
92
93 dlink_list commfd_completed_events;
94
95 bool
96 commio_has_callback(int fd, iocb_type type, comm_io_callback_t *ccb)
97 {
98 assert(ccb->fd == fd);
99 assert(ccb->type == type);
100 return ccb->active == true;
101 }
102
103 /*
104 * Set the given handler and mark active
105 *
106 * @param fd filedescriptor
107 * @param ccb comm io callback
108 * @param cb callback
109 * @param cbdata callback data (must be cbdata'ed)
110 * @param buf buffer, if applicable
111 * @param freefunc freefunc, if applicable
112 * @param size buffer size
113 */
114 void
115 commio_set_callback(int fd, iocb_type type, comm_io_callback_t *ccb, IOCB *cb, void *cbdata, char *buf, FREE *freefunc, int size)
116 {
117 assert(ccb->active == false);
118 assert(ccb->type == type);
119 ccb->fd = fd;
120 ccb->callback = cb;
121 ccb->callback_data = cbdataReference(cbdata);
122 ccb->buf = buf;
123 ccb->freefunc = freefunc;
124 ccb->size = size;
125 ccb->active = true;
126 ccb->completed = false;
127 ccb->offset = 0;
128 }
129
130
131 /*
132 * Complete the callback
133 *
134 * Someone may have already called this function once on a non-completed callback.
135 * This happens in the comm_close() routine - the IO may have completed
136 * but comm_close() is called bfeore teh callback has been called.
137 * In this case, leave the details the same (offset, for example) but just update
138 * the error codes.
139 */
140 void
141 commio_complete_callback(int fd, comm_io_callback_t *ccb, comm_err_t code, int xerrno)
142 {
143 debugs(5, 3, "commio_complete_callback: called for " << fd << " (" << code << ", " << xerrno << ")");
144 assert(ccb->active == true);
145 assert(ccb->fd == fd);
146 ccb->errcode = code;
147 ccb->xerrno = xerrno;
148 if (! ccb->completed)
149 dlinkAddTail(ccb, &ccb->node, &commfd_completed_events);
150 ccb->completed = true;
151 }
152
153
154 /*
155 * Cancel the given callback
156 *
157 * Remember that the data is cbdataRef'ed.
158 */
159 void
160 commio_cancel_callback(int fd, comm_io_callback_t *ccb)
161 {
162 debugs(5, 3, "commio_cancel_callback: called for " << fd);
163 assert(ccb->fd == fd);
164 assert(ccb->active == true);
165
166 if (ccb->completed == true) {
167 dlinkDelete(&ccb->node, &commfd_completed_events);
168 }
169 if (ccb->callback_data)
170 cbdataReferenceDone(ccb->callback_data);
171
172 ccb->xerrno = 0;
173 ccb->active = false;
174 ccb->completed = false;
175 ccb->callback = NULL;
176 ccb->callback_data = NULL;
177 }
178
179 /*
180 * Call the given comm callback; assumes the callback is valid.
181 *
182 * @param ccb io completion callback
183 */
184 void
185 commio_call_callback(comm_io_callback_t *ccb)
186 {
187 comm_io_callback_t cb = *ccb;
188 void *cbdata;
189 assert(cb.active == true);
190 assert(cb.completed == true);
191 debugs(5, 3, "commio_call_callback: called for " << ccb->fd);
192
193 /* We've got a copy; blow away the real one */
194 /* XXX duplicate code from commio_cancel_callback! */
195 dlinkDelete(&ccb->node, &commfd_completed_events);
196 ccb->xerrno = 0;
197 ccb->active = false;
198 ccb->completed = false;
199 ccb->callback = NULL;
200 ccb->callback_data = NULL;
201
202 /* free data */
203 if (cb.freefunc) {
204 cb.freefunc(cb.buf);
205 cb.buf = NULL;
206 }
207 if (cb.callback && cbdataReferenceValidDone(cb.callback_data, &cbdata)) {
208 /* XXX truely ugly for now! */
209 cb.callback(cb.fd, cb.buf, cb.offset, cb.errcode, cb.xerrno, cbdata);
210 }
211 }
212
213 void
214 commio_call_callbacks(void)
215 {
216 comm_io_callback_t *ccb;
217 while (commfd_completed_events.head != NULL) {
218 ccb = (comm_io_callback_t *) commfd_completed_events.head->data;
219 commio_call_callback(ccb);
220 }
221 }
222
223
224 class ConnectStateData
225 {
226
227 public:
228 void *operator new (size_t);
229 void operator delete (void *);
230 static void Connect (int fd, void *me);
231 void connect();
232 void callCallback(comm_err_t status, int xerrno);
233 void defaults();
234
235 // defaults given by client
236 char *host;
237 u_short default_port;
238 IPAddress default_addr;
239 // NP: CANNOT store the default addr:port together as it gets set/reset differently.
240
241 IPAddress S;
242 CallBack<CNCB> callback;
243
244 int fd;
245 int tries;
246 int addrcount;
247 int connstart;
248
249 private:
250 int commResetFD();
251 int commRetryConnect();
252 CBDATA_CLASS(ConnectStateData);
253 };
254
255 /* STATIC */
256
257 static comm_err_t commBind(int s, struct addrinfo &);
258 static void commSetReuseAddr(int);
259 static void commSetNoLinger(int);
260 #ifdef TCP_NODELAY
261 static void commSetTcpNoDelay(int);
262 #endif
263 static void commSetTcpRcvbuf(int, int);
264 static PF commConnectFree;
265 static PF commHandleWrite;
266 static IPH commConnectDnsHandle;
267 static void requireOpenAndActive(int const fd);
268
269 static PF comm_accept_try;
270
271 class AcceptFD
272 {
273
274 public:
275 AcceptFD() : count(0), finished_(false){}
276
277 void doCallback(int fd, int newfd, comm_err_t errcode, int xerrno, ConnectionDetail *);
278 void nullCallback();
279 void beginAccepting() {count = 0; finished(false);}
280
281 size_t acceptCount() const { return count;}
282
283 bool finishedAccepting() const;
284 CallBack<IOACB> callback;
285 bool finished() const;
286 void finished(bool);
287
288 private:
289 static size_t const MAX_ACCEPT_PER_LOOP;
290 size_t count;
291 bool finished_;
292 };
293
294 size_t const AcceptFD::MAX_ACCEPT_PER_LOOP(10);
295
296 class fdc_t
297 {
298
299 public:
300 void acceptOne(int fd);
301 void beginAccepting();
302 int acceptCount() const;
303 fdc_t() : active(0), fd(-1), half_closed (false){CommCallbackList.head = NULL;CommCallbackList.tail = NULL; }
304
305 fdc_t(int anFD) : active(0), fd(anFD), half_closed(false)
306 {
307 CommCallbackList.head = NULL;
308 CommCallbackList.tail = NULL;
309 }
310
311 int active;
312 int fd;
313 dlink_list CommCallbackList;
314
315 template<class P>
316 bool findCallback(P predicate);
317
318 class Accept
319 {
320
321 public:
322 AcceptFD accept;
323 ConnectionDetail connDetails;
324 };
325
326 Accept accept;
327
328 bool half_closed;
329 };
330
/* Type tag reported by CommCallbackData::getType(). */
enum comm_callback_t {
    COMM_CB_READ = 1,
    COMM_CB_DERIVED
};
335
336 static int CommCallbackSeqnum = 1;
337
338 class CommCommonCallback
339 {
340
341 public:
342 CommCommonCallback() : fd (-1), errcode (COMM_OK), xerrno(0), seqnum (CommCallbackSeqnum){}
343
344 CommCommonCallback(int anFD, comm_err_t errcode, int anErrno) : fd (anFD), errcode (errcode), xerrno(anErrno), seqnum (CommCallbackSeqnum){}
345
346 int fd;
347 comm_err_t errcode;
348 int xerrno;
349 int seqnum;
350 };
351
352 class CommCallbackData
353 {
354
355 public:
356 MEMPROXY_CLASS(CommCallbackData);
357 CommCallbackData(CommCommonCallback const &);
358 virtual ~CommCallbackData() {}
359
360 virtual comm_callback_t getType() const { return COMM_CB_DERIVED; }
361
362 void callACallback();
363 void fdClosing();
364 virtual void callCallback() = 0;
365 void registerSelf();
366 void deRegisterSelf();
367 char *buf;
368 StoreIOBuffer sb;
369
370 protected:
371 CommCommonCallback result;
372 friend void _comm_close(int fd, char const *file, int line);
373 friend void comm_calliocallback(void);
374
375 private:
376 dlink_node fd_node;
377 dlink_node h_node;
378 };
379
380 MEMPROXY_CLASS_INLINE(CommCallbackData)
381
382 class CommAcceptCallbackData : public CommCallbackData
383 {
384
385 public:
386 MEMPROXY_CLASS(CommAcceptCallbackData);
387 CommAcceptCallbackData(int const anFd, CallBack<IOACB>, comm_err_t, int, int, ConnectionDetail const &);
388 virtual void callCallback();
389
390 private:
391 CallBack<IOACB> callback;
392 int newfd;
393 ConnectionDetail details;
394 };
395
396 MEMPROXY_CLASS_INLINE(CommAcceptCallbackData)
397
398 class CommFillCallbackData : public CommCallbackData
399 {
400
401 public:
402 MEMPROXY_CLASS(CommFillCallbackData);
403 CommFillCallbackData(int const anFd, CallBack<IOFCB> aCallback, comm_err_t, int);
404 virtual void callCallback();
405
406 private:
407 CallBack<IOFCB> callback;
408 };
409
410 MEMPROXY_CLASS_INLINE(CommFillCallbackData)
411
/* Per-descriptor debugging record: a source file name and line number for a close. */
typedef struct _fd_debug_t {
    char const *close_file;
    int close_line;
} fd_debug_t;
419
420 static MemAllocator *conn_close_pool = NULL;
421 fdc_t *fdc_table = NULL;
422 fd_debug_t *fdd_table = NULL;
423 dlink_list CommCallbackList;
424
425
426 /* New and improved stuff */
427
428 CommCallbackData::CommCallbackData(CommCommonCallback const &newResults) : result (newResults)
429 {
430 assert(fdc_table[result.fd].active == 1);
431 registerSelf();
432 }
433
434 CommAcceptCallbackData::CommAcceptCallbackData(int const anFd, CallBack<IOACB> aCallback, comm_err_t anErrcode, int anErrno, int aNewFD, ConnectionDetail const &newDetails) :CommCallbackData(CommCommonCallback(anFd, anErrcode, anErrno)), callback (aCallback), newfd(aNewFD), details(newDetails)
435 {}
436
437 void
438 CommCallbackData::registerSelf()
439 {
440 /* Add it to the end of the list */
441 dlinkAddTail(this, &h_node, &CommCallbackList);
442
443 /* and add it to the end of the fd list */
444 dlinkAddTail(this, &fd_node, &(fdc_table[result.fd].CommCallbackList));
445 }
446
447 void
448 CommCallbackData::deRegisterSelf()
449 {
450 dlinkDelete(&h_node, &CommCallbackList);
451 dlinkDelete(&fd_node, &(fdc_table[result.fd].CommCallbackList));
452 }
453
454 /**
455 * add an IO callback
456 *
457 * IO callbacks are added when we want to notify someone that some IO
458 * has finished but we don't want to risk re-entering a non-reentrant
459 * code block.
460 */
461 void
462 CommAcceptCallbackData::callCallback()
463 {
464 PROF_start(CommAcceptCallbackData_callCallback);
465 callback.handler(result.fd, newfd, &details, result.errcode, result.xerrno, callback.data);
466 PROF_stop(CommAcceptCallbackData_callCallback);
467 }
468
469 void
470 CommCallbackData::fdClosing()
471 {
472 result.errcode = COMM_ERR_CLOSING;
473 }
474
475 void
476 CommCallbackData::callACallback()
477 {
478 assert(fdc_table[result.fd].active == 1);
479 deRegisterSelf();
480 callCallback();
481 }
482
483 /**
484 * call the IO callbacks
485 *
486 * This should be called before comm_select() so code can attempt to
487 * initiate some IO.
488 *
489 * When io callbacks are added, they are added with the current
490 * sequence number. The sequence number is incremented in this routine -
491 * since callbacks are added to the _tail_ of the list, when we hit a
492 * callback with a seqnum _not_ what it was when we entered this routine,
493 * we can stop.
494 */
495 void
496 comm_calliocallback(void)
497 {
498 CommCallbackData *cio;
499 int oldseqnum = CommCallbackSeqnum++;
500
501 /* Call our callbacks until we hit NULL or the seqnum changes */
502
503 /* This will likely rap other counts - again, thats ok (for now)
504 * What we should see is the total of the various callback subclasses
505 * equaling this counter.
506 * If they don't, someone has added a class but not profiled it.
507 */
508 PROF_start(comm_calliocallback);
509
510 debugs(5, 7, "comm_calliocallback: " << CommCallbackList.head);
511
512 while (CommCallbackList.head != NULL && oldseqnum != ((CommCallbackData *)CommCallbackList.head->data)->result.seqnum) {
513 dlink_node *node = (dlink_node *)CommCallbackList.head;
514 cio = (CommCallbackData *)node->data;
515 cio->callACallback();
516 delete cio;
517 }
518
519 PROF_stop(comm_calliocallback);
520 }
521
522 bool
523 comm_iocallbackpending(void)
524 {
525 debugs(5, 7, "comm_iocallbackpending: " << CommCallbackList.head);
526 return (CommCallbackList.head != NULL) || (commfd_completed_events.head != NULL);
527 }
528
529 /**
530 * Attempt a read
531 *
532 * If the read attempt succeeds or fails, call the callback.
533 * Else, wait for another IO notification.
534 */
535 void
536 commHandleRead(int fd, void *data)
537 {
538 comm_io_callback_t *ccb = (comm_io_callback_t *) data;
539
540 assert(data == COMMIO_FD_READCB(fd));
541 assert(commio_has_callback(fd, IOCB_READ, ccb));
542 /* Attempt a read */
543 statCounter.syscalls.sock.reads++;
544 errno = 0;
545 int retval;
546 retval = FD_READ_METHOD(fd, ccb->buf, ccb->size);
547 debugs(5, 3, "comm_read_try: FD " << fd << ", size " << ccb->size << ", retval " << retval << ", errno " << errno);
548
549 if (retval < 0 && !ignoreErrno(errno)) {
550 debugs(5, 3, "comm_read_try: scheduling COMM_ERROR");
551 ccb->offset = 0;
552 commio_complete_callback(fd, ccb, COMM_ERROR, errno);
553 return;
554 };
555
556 /* See if we read anything */
557 /* Note - read 0 == socket EOF, which is a valid read */
558 if (retval >= 0) {
559 fd_bytes(fd, retval, FD_READ);
560 ccb->offset = retval;
561 commio_complete_callback(fd, ccb, COMM_OK, errno);
562 return;
563 }
564
565 /* Nope, register for some more IO */
566 commSetSelect(fd, COMM_SELECT_READ, commHandleRead, data, 0);
567 }
568
569 /**
570 * Queue a read. handler/handler_data are called when the read
571 * completes, on error, or on file descriptor close.
572 */
573 void
574 comm_read(int fd, char *buf, int size, IOCB *handler, void *handler_data)
575 {
576 /* Make sure we're not reading anything and we're not closing */
577 assert(fdc_table[fd].active == 1);
578 assert(!fd_table[fd].flags.closing);
579
580 debugs(5, 4, "comm_read, queueing read for FD " << fd);
581
582 /* Queue the read */
583 /* XXX ugly */
584 commio_set_callback(fd, IOCB_READ, COMMIO_FD_READCB(fd), handler, handler_data, (char *)buf, NULL, size);
585 commSetSelect(fd, COMM_SELECT_READ, commHandleRead, COMMIO_FD_READCB(fd), 0);
586 }
587
/**
 * Empty the read buffers
 *
 * This is a magical routine that empties the read buffers.
 * Under some platforms (Linux) if a buffer has data in it before
 * you call close(), the socket will hang and take quite a while
 * to timeout.
 *
 * Only compiled in on Linux, and only acts on non-blocking descriptors:
 * it keeps reading until a read returns nothing.
 */
static void
comm_empty_os_read_buffers(int fd)
{
#ifdef _SQUID_LINUX_
    /* prevent those nasty RST packets */
    char scratch[SQUID_TCP_SO_RCVBUF];

    if (fd_table[fd].flags.nonblocking != 1)
        return;

    while (FD_READ_METHOD(fd, scratch, SQUID_TCP_SO_RCVBUF) > 0) {
        /* discard */
    }

#endif
}
608
609 static void
610 requireOpenAndActive(int const fd)
611 {
612 assert(fd_table[fd].flags.open == 1);
613 assert(fdc_table[fd].active == 1);
614 }
615
616 /**
617 * Return whether the FD has a pending completed callback.
618 */
619 int
620 comm_has_pending_read_callback(int fd)
621 {
622 requireOpenAndActive(fd);
623 return COMMIO_FD_READCB(fd)->active && COMMIO_FD_READCB(fd)->completed;
624 }
625
626 template <class P>
627 bool
628 fdc_t::findCallback(P predicate)
629 {
630 /*
631 * XXX I don't like having to walk the list!
632 * Instead, if this routine is called often enough, we should
633 * also maintain a linked list of _read_ events - we can just
634 * check if the list head a HEAD..
635 * - adrian
636 */
637 dlink_node *node = CommCallbackList.head;
638
639 while (node != NULL) {
640 if (predicate((CommCallbackData *)node->data))
641 return true;
642
643 node = node->next;
644 }
645
646 /* Not found */
647 return false;
648 }
649
650 /**
651 * return whether a file descriptor has a read handler
652 *
653 * Assumptions: the fd is open
654 * the fd is a comm fd.
655 *
656 * Again - is this "pending read", or "pending completed event", or what?
657 * I'll assume its pending read, not pending completed.
658 *
659 * This makes no sense though - if this is called to check whether there's
660 * a pending read -before- submitting a read then it won't matter whether
661 * its completed or not! Ie:
662 *
663 * + if there's no read and you want to schedule one; fine.
664 * + if a read has completed then the callback block has been deactivated before
665 * the callback is called - if something decides to register for a read
666 * callback once again it should find !active and !completed.
667 * + scheduling a read event when the fd is ! active -and- completed, thats
668 * a bug
669 * + like, afaict, anything else is.
670 */
671 bool
672 comm_has_pending_read(int fd)
673 {
674 requireOpenAndActive(fd);
675 return COMMIO_FD_READCB(fd)->active && (! COMMIO_FD_READCB(fd)->completed);
676 }
677
678 /**
679 * Cancel a pending read. Assert that we have the right parameters,
680 * and that there are no pending read events!
681 *
682 * AHC Don't call the comm handlers?
683 */
684 void
685 comm_read_cancel(int fd, IOCB *callback, void *data)
686 {
687 requireOpenAndActive(fd);
688
689 /* Ok, we can be reasonably sure we won't lose any data here! */
690 assert(COMMIO_FD_READCB(fd)->callback == callback);
691 assert(COMMIO_FD_READCB(fd)->callback_data == data);
692
693 /* Delete the callback */
694 commio_cancel_callback(fd, COMMIO_FD_READCB(fd));
695
696 /* And the IO event */
697 commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
698 }
699
700
701 /**
702 * Open a filedescriptor, set some sane defaults
703 * XXX DPW 2006-05-30 what is the point of this?
704 */
705 void
706 fdc_open(int fd, unsigned int type, char const *desc)
707 {
708 assert(fdc_table[fd].active == 0);
709
710 fdc_table[fd].active = 1;
711 fdc_table[fd].fd = fd;
712 fd_open(fd, type, desc);
713 }
714
715
716 /**
717 * synchronous wrapper around udp socket functions
718 */
719 int
720 comm_udp_recvfrom(int fd, void *buf, size_t len, int flags, IPAddress &from)
721 {
722 statCounter.syscalls.sock.recvfroms++;
723 int x = 0;
724 struct addrinfo *AI = NULL;
725
726 debugs(5,8, "comm_udp_recvfrom: FD " << fd << " from " << from);
727
728 assert( NULL == AI );
729
730 from.InitAddrInfo(AI);
731
732 x = recvfrom(fd, buf, len, flags, AI->ai_addr, &AI->ai_addrlen);
733
734 from = *AI;
735
736 from.FreeAddrInfo(AI);
737
738 return x;
739 }
740
741 int
742 comm_udp_recv(int fd, void *buf, size_t len, int flags)
743 {
744 IPAddress nul;
745 return comm_udp_recvfrom(fd, buf, len, flags, nul);
746 }
747
/* Thin wrapper around send(); returns whatever send() returns. */
ssize_t
comm_udp_send(int s, const void *buf, size_t len, int flags)
{
    const ssize_t sent = send(s, buf, len, flags);

    return sent;
}
753
754
755 bool
756 comm_has_incomplete_write(int fd)
757 {
758 requireOpenAndActive(fd);
759 return COMMIO_FD_WRITECB(fd)->active;
760 }
761
762 /**
763 * Queue a write. handler/handler_data are called when the write fully
764 * completes, on error, or on file descriptor close.
765 */
766
767 /* Return the local port associated with fd. */
768 u_short
769 comm_local_port(int fd)
770 {
771 IPAddress temp;
772 struct addrinfo *addr = NULL;
773 fde *F = &fd_table[fd];
774
775 /* If the fd is closed already, just return */
776
777 if (!F->flags.open) {
778 debugs(5, 0, "comm_local_port: FD " << fd << " has been closed.");
779 return 0;
780 }
781
782 if (F->local_addr.GetPort())
783 return F->local_addr.GetPort();
784
785 temp.InitAddrInfo(addr);
786
787 if (getsockname(fd, addr->ai_addr, &(addr->ai_addrlen)) ) {
788 debugs(50, 1, "comm_local_port: Failed to retrieve TCP/UDP port number for socket: FD " << fd << ": " << xstrerror());
789 temp.FreeAddrInfo(addr);
790 return 0;
791 }
792 temp = *addr;
793
794 temp.FreeAddrInfo(addr);
795
796 F->local_addr.SetPort(temp.GetPort());
797
798 // grab default socket information for this address
799 temp.GetAddrInfo(addr);
800
801 F->sock_family = addr->ai_family;
802
803 temp.FreeAddrInfo(addr);
804
805 debugs(5, 6, "comm_local_port: FD " << fd << ": port " << F->local_addr.GetPort());
806 return F->local_addr.GetPort();
807 }
808
809 static comm_err_t
810 commBind(int s, struct addrinfo &inaddr)
811 {
812 statCounter.syscalls.sock.binds++;
813
814 if (bind(s, inaddr.ai_addr, inaddr.ai_addrlen) == 0)
815 return COMM_OK;
816
817 debugs(50, 0, "commBind: Cannot bind socket FD " << s << " to " << fd_table[s].local_addr << ": " << xstrerror());
818
819 return COMM_ERROR;
820 }
821
822 /**
823 * Create a socket. Default is blocking, stream (TCP) socket. IO_TYPE
824 * is OR of flags specified in comm.h. Defaults TOS
825 */
826 int
827 comm_open(int sock_type,
828 int proto,
829 IPAddress &addr,
830 int flags,
831 const char *note)
832 {
833 return comm_openex(sock_type, proto, addr, flags, 0, note);
834 }
835
/* True if anErrno is ENFILE or EMFILE. */
static bool
limitError(int const anErrno)
{
    switch (anErrno) {

    case ENFILE:

    case EMFILE:
        return true;

    default:
        return false;
    }
}
841
842 int
843 comm_set_tos(int fd, int tos)
844 {
845 #ifdef IP_TOS
846 int x = setsockopt(fd, IPPROTO_IP, IP_TOS, (char *) &tos, sizeof(int));
847 if (x < 0)
848 debugs(50, 1, "comm_set_tos: setsockopt(IP_TOS) on FD " << fd << ": " << xstrerror());
849 return x;
850 #else
851 debugs(50, 0, "WARNING: setsockopt(IP_TOS) not supported on this platform");
852 return -1;
853 #endif
854 }
855
856 void
857 comm_set_v6only(int fd, int tos)
858 {
859 #ifdef IPV6_V6ONLY
860 if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &tos, sizeof(int)) < 0) {
861 debugs(50, 1, "comm_open: setsockopt(IPV6_V6ONLY) on FD " << fd << ": " << xstrerror());
862 }
863 #else
864 debugs(50, 0, "WARNING: comm_open: setsockopt(IPV6_V6ONLY) not supported on this platform");
865 #endif /* sockopt */
866 }
867
868 /**
869 * Create a socket. Default is blocking, stream (TCP) socket. IO_TYPE
870 * is OR of flags specified in defines.h:COMM_*
871 */
872 int
873 comm_openex(int sock_type,
874 int proto,
875 IPAddress &addr,
876 int flags,
877 unsigned char TOS,
878 const char *note)
879 {
880 int new_socket;
881 fde *F = NULL;
882 int tos = 0;
883 struct addrinfo *AI = NULL;
884
885 PROF_start(comm_open);
886 /* Create socket for accepting new connections. */
887 statCounter.syscalls.sock.sockets++;
888
889 /* Setup the socket addrinfo details for use */
890 addr.GetAddrInfo(AI);
891 AI->ai_socktype = sock_type;
892 AI->ai_protocol = proto;
893 AI->ai_flags = flags;
894
895 debugs(50, 3, "comm_openex: Attempt open socket for: " << addr );
896
897 if ((new_socket = socket(AI->ai_family, AI->ai_socktype, AI->ai_protocol)) < 0)
898 {
899 /* Increase the number of reserved fd's if calls to socket()
900 * are failing because the open file table is full. This
901 * limits the number of simultaneous clients */
902
903 if (limitError(errno)) {
904 debugs(50, 1, "comm_open: socket failure: " << xstrerror());
905 fdAdjustReserved();
906 } else {
907 debugs(50, 0, "comm_open: socket failure: " << xstrerror());
908 }
909
910 addr.FreeAddrInfo(AI);
911
912 PROF_stop(comm_open);
913 return -1;
914 }
915
916 debugs(50, 3, "comm_openex: Opened socket FD " << new_socket << " : family=" << AI->ai_family << ", type=" << AI->ai_socktype << ", protocol=" << AI->ai_protocol );
917
918 /* set TOS if needed */
919 if (TOS && comm_set_tos(new_socket, TOS) ) {
920 tos = TOS;
921 }
922
923 #if IPV6_SPECIAL_SPLITSTACK
924
925 if( addr.IsIPv6() )
926 comm_set_v6only(new_socket, tos);
927
928 #endif
929
930 #if IPV6_SPECIAL_V4MAPPED && defined(_SQUID_MSWIN_)
931
932 /* Windows Vista supports Dual-Sockets. BUT defaults them to V6ONLY. Turn it OFF. */
933 if( addr.IsIPv6() )
934 comm_set_v6only(new_socket, 0);
935
936 #endif
937
938 /* update fdstat */
939 debugs(5, 5, "comm_open: FD " << new_socket << " is a new socket");
940
941 fd_open(new_socket, FD_SOCKET, note);
942
943 fdd_table[new_socket].close_file = NULL;
944
945 fdd_table[new_socket].close_line = 0;
946
947 assert(fdc_table[new_socket].active == 0);
948
949 fdc_table[new_socket].active = 1;
950
951 F = &fd_table[new_socket];
952
953 F->local_addr = addr;
954
955 F->tos = TOS;
956
957 F->sock_family = AI->ai_family;
958
959 if (!(flags & COMM_NOCLOEXEC))
960 commSetCloseOnExec(new_socket);
961
962 if ((flags & COMM_REUSEADDR))
963 commSetReuseAddr(new_socket);
964
965 if (addr.GetPort() > (u_short) 0)
966 {
967 #ifdef _SQUID_MSWIN_
968
969 if (sock_type != SOCK_DGRAM)
970 #endif
971
972 commSetNoLinger(new_socket);
973
974 if (opt_reuseaddr)
975 commSetReuseAddr(new_socket);
976 }
977
978 if (!addr.IsNoAddr())
979 {
980 if (commBind(new_socket, *AI) != COMM_OK) {
981 comm_close(new_socket);
982 addr.FreeAddrInfo(AI);
983 return -1;
984 PROF_stop(comm_open);
985 }
986 }
987
988 addr.FreeAddrInfo(AI);
989
990 if (flags & COMM_NONBLOCKING)
991 if (commSetNonBlocking(new_socket) == COMM_ERROR)
992 {
993 return -1;
994 PROF_stop(comm_open);
995 }
996
997 #ifdef TCP_NODELAY
998 if (sock_type == SOCK_STREAM)
999 commSetTcpNoDelay(new_socket);
1000
1001 #endif
1002
1003 if (Config.tcpRcvBufsz > 0 && sock_type == SOCK_STREAM)
1004 commSetTcpRcvbuf(new_socket, Config.tcpRcvBufsz);
1005
1006 PROF_stop(comm_open);
1007
1008 return new_socket;
1009 }
1010
1011 CBDATA_CLASS_INIT(ConnectStateData);
1012
1013 void *
1014 ConnectStateData::operator new (size_t size)
1015 {
1016 CBDATA_INIT_TYPE(ConnectStateData);
1017 return cbdataAlloc(ConnectStateData);
1018 }
1019
1020 void
1021 ConnectStateData::operator delete (void *address)
1022 {
1023 cbdataFree(address);
1024 }
1025
1026 void
1027 commConnectStart(int fd, const char *host, u_short port, CNCB * callback, void *data)
1028 {
1029 ConnectStateData *cs;
1030 debugs(5, 3, "commConnectStart: FD " << fd << ", data " << data << ", " << host << ":" << port);
1031 cs = new ConnectStateData;
1032 cs->fd = fd;
1033 cs->host = xstrdup(host);
1034 cs->default_port = port;
1035 cs->callback = CallBack<CNCB>(callback, data);
1036 comm_add_close_handler(fd, commConnectFree, cs);
1037 ipcache_nbgethostbyname(host, commConnectDnsHandle, cs);
1038 }
1039
1040 static void
1041 commConnectDnsHandle(const ipcache_addrs * ia, void *data)
1042 {
1043 ConnectStateData *cs = (ConnectStateData *)data;
1044
1045 if (ia == NULL) {
1046 debugs(5, 3, "commConnectDnsHandle: Unknown host: " << cs->host);
1047
1048 if (!dns_error_message) {
1049 dns_error_message = "Unknown DNS error";
1050 debugs(5, 1, "commConnectDnsHandle: Bad dns_error_message");
1051 }
1052
1053 assert(dns_error_message != NULL);
1054 cs->callCallback(COMM_ERR_DNS, 0);
1055 return;
1056 }
1057
1058 assert(ia->cur < ia->count);
1059
1060 cs->default_addr = ia->in_addrs[ia->cur];
1061
1062 if (Config.onoff.balance_on_multiple_ip)
1063 ipcacheCycleAddr(cs->host, NULL);
1064
1065 cs->addrcount = ia->count;
1066
1067 cs->connstart = squid_curtime;
1068
1069 cs->connect();
1070 }
1071
1072 void
1073 ConnectStateData::callCallback(comm_err_t status, int xerrno)
1074 {
1075 debugs(5, 3, "commConnectCallback: FD " << fd << ", data " << callback.data << ", status " << status);
1076
1077 comm_remove_close_handler(fd, commConnectFree, this);
1078 CallBack<CNCB> aCallback = callback;
1079 callback = CallBack<CNCB>();
1080 commSetTimeout(fd, -1, NULL, NULL);
1081
1082 if (aCallback.dataValid())
1083 aCallback.handler(fd, status, xerrno, aCallback.data);
1084
1085 commConnectFree(fd, this);
1086 }
1087
1088 static void
1089 commConnectFree(int fd, void *data)
1090 {
1091 ConnectStateData *cs = (ConnectStateData *)data;
1092 debugs(5, 3, "commConnectFree: FD " << fd);
1093 cs->callback = CallBack<CNCB>();
1094 safe_free(cs->host);
1095 delete cs;
1096 }
1097
1098 static void
1099 copyFDFlags(int to, fde *F)
1100 {
1101 if (F->flags.close_on_exec)
1102 commSetCloseOnExec(to);
1103
1104 if (F->flags.nonblocking)
1105 commSetNonBlocking(to);
1106
1107 #ifdef TCP_NODELAY
1108
1109 if (F->flags.nodelay)
1110 commSetTcpNoDelay(to);
1111
1112 #endif
1113
1114 if (Config.tcpRcvBufsz > 0)
1115 commSetTcpRcvbuf(to, Config.tcpRcvBufsz);
1116 }
1117
1118 /* Reset FD so that we can connect() again */
1119 int
1120 ConnectStateData::commResetFD()
1121 {
1122 struct addrinfo *AI = NULL;
1123 IPAddress nul;
1124
1125 if (!cbdataReferenceValid(callback.data))
1126 return 0;
1127
1128 statCounter.syscalls.sock.sockets++;
1129
1130 /* setup a bare-bones addrinfo */
1131 nul.GetAddrInfo(AI);
1132
1133 int fd2 = socket(AI->ai_family, AI->ai_socktype, AI->ai_protocol);
1134
1135 nul.FreeAddrInfo(AI);
1136
1137 if (fd2 < 0) {
1138 debugs(5, 0, "commResetFD: socket: " << xstrerror());
1139
1140 if (ENFILE == errno || EMFILE == errno)
1141 fdAdjustReserved();
1142
1143 return 0;
1144 }
1145
1146 #ifdef _SQUID_MSWIN_
1147
1148 /* On Windows dup2() can't work correctly on Sockets, the */
1149 /* workaround is to close the destination Socket before call them. */
1150 close(fd);
1151
1152 #endif
1153
1154 if (dup2(fd2, fd) < 0) {
1155 debugs(5, 0, "commResetFD: dup2: " << xstrerror());
1156
1157 if (ENFILE == errno || EMFILE == errno)
1158 fdAdjustReserved();
1159
1160 close(fd2);
1161
1162 return 0;
1163 }
1164
1165 close(fd2);
1166 fde *F = &fd_table[fd];
1167 fd_table[fd].flags.called_connect = 0;
1168 /*
1169 * yuck, this has assumptions about comm_open() arguments for
1170 * the original socket
1171 */
1172
1173 AI = NULL;
1174 F->local_addr.GetAddrInfo(AI);
1175
1176 if (commBind(fd, *AI) != COMM_OK) {
1177 debugs(5, 0, "commResetFD: bind: " << xstrerror());
1178 F->local_addr.FreeAddrInfo(AI);
1179 return 0;
1180 }
1181 F->local_addr.FreeAddrInfo(AI);
1182
1183 if (F->tos)
1184 comm_set_tos(fd, F->tos);
1185
1186 #if IPV6_SPECIAL_SPLITSTACK
1187
1188 if( F->local_addr.IsIPv6() )
1189 comm_set_v6only(fd, F->tos);
1190
1191 #endif
1192
1193 copyFDFlags (fd, F);
1194
1195 return 1;
1196 }
1197
1198 int
1199 ConnectStateData::commRetryConnect()
1200 {
1201 assert(addrcount > 0);
1202
1203 if (addrcount == 1) {
1204 if (tries >= Config.retry.maxtries)
1205 return 0;
1206
1207 if (squid_curtime - connstart > Config.Timeout.connect)
1208 return 0;
1209 } else {
1210 if (tries > addrcount)
1211 return 0;
1212 }
1213
1214 return commResetFD();
1215 }
1216
1217 static void
1218 commReconnect(void *data)
1219 {
1220 ConnectStateData *cs = (ConnectStateData *)data;
1221 ipcache_nbgethostbyname(cs->host, commConnectDnsHandle, cs);
1222 }
1223
1224 /* Connect SOCK to specified DEST_PORT at DEST_HOST. */
1225 void
1226 ConnectStateData::Connect (int fd, void *me)
1227 {
1228 ConnectStateData *cs = (ConnectStateData *)me;
1229 assert (cs->fd == fd);
1230 cs->connect();
1231 }
1232
1233 void
1234 ConnectStateData::defaults()
1235 {
1236 S = default_addr;
1237 S.SetPort(default_port);
1238 }
1239
1240 void
1241 ConnectStateData::connect()
1242 {
1243 if (S.IsAnyAddr())
1244 defaults();
1245
1246 debugs(5,5, "ConnectSateData::connect: to " << S);
1247
1248 switch (comm_connect_addr(fd, S) ) {
1249
1250 case COMM_INPROGRESS:
1251 debugs(5, 5, "ConnectStateData::connect: FD " << fd << ": COMM_INPROGRESS");
1252 commSetSelect(fd, COMM_SELECT_WRITE, ConnectStateData::Connect, this, 0);
1253 break;
1254
1255 case COMM_OK:
1256 debugs(5, 5, "ConnectStateData::connect: FD " << fd << ": COMM_OK - connected");
1257 ipcacheMarkGoodAddr(host, S);
1258 callCallback(COMM_OK, 0);
1259 break;
1260
1261 default:
1262 debugs(5, 5, "ConnectStateData::connect: FD " << fd << ": * - try again");
1263 tries++;
1264 ipcacheMarkBadAddr(host, S);
1265
1266 if (Config.onoff.test_reachability)
1267 netdbDeleteAddrNetwork(S);
1268
1269 if (commRetryConnect()) {
1270 eventAdd("commReconnect", commReconnect, this, this->addrcount == 1 ? 0.05 : 0.0, 0);
1271 } else {
1272 debugs(5, 5, "ConnectStateData::connect: FD " << fd << ": * - ERR tried too many times already.");
1273 callCallback(COMM_ERR_CONNECT, errno);
1274 }
1275 }
1276 }
1277
1278 int
1279 commSetTimeout(int fd, int timeout, PF * handler, void *data)
1280 {
1281 debugs(5, 3, "commSetTimeout: FD " << fd << " timeout " << timeout);
1282 assert(fd >= 0);
1283 assert(fd < Squid_MaxFD);
1284 fde *F = &fd_table[fd];
1285 assert(F->flags.open);
1286
1287 if (timeout < 0) {
1288 cbdataReferenceDone(F->timeout_data);
1289 F->timeout_handler = NULL;
1290 F->timeout = 0;
1291 } else {
1292 if (handler) {
1293 cbdataReferenceDone(F->timeout_data);
1294 F->timeout_handler = handler;
1295 F->timeout_data = cbdataReference(data);
1296 }
1297
1298 F->timeout = squid_curtime + (time_t) timeout;
1299 }
1300
1301 return F->timeout;
1302 }
1303
1304 int
1305 comm_connect_addr(int sock, const IPAddress &address)
1306 {
1307 comm_err_t status = COMM_OK;
1308 fde *F = &fd_table[sock];
1309 int x = 0;
1310 int err = 0;
1311 socklen_t errlen;
1312 struct addrinfo *AI = NULL;
1313 PROF_start(comm_connect_addr);
1314
1315 assert(address.GetPort() != 0);
1316
1317 debugs(5, 9, "comm_connect_addr: connecting socket " << sock << " to " << address << " (want family: " << F->sock_family <<
1318 ") Old-State=" << fdc_table[sock].active);
1319
1320 address.GetAddrInfo(AI, F->sock_family);
1321
1322 /* Establish connection. */
1323 errno = 0;
1324
1325 if (!F->flags.called_connect)
1326 {
1327 F->flags.called_connect = 1;
1328 statCounter.syscalls.sock.connects++;
1329
1330 x = connect(sock, AI->ai_addr, AI->ai_addrlen);
1331
1332 if (x < 0)
1333 {
1334 debugs(5,5, "comm_connect_addr: sock=" << sock << ", addrinfo( " <<
1335 " flags=" << AI->ai_flags <<
1336 ", family=" << AI->ai_family <<
1337 ", socktype=" << AI->ai_socktype <<
1338 ", protocol=" << AI->ai_protocol <<
1339 ", &addr=" << AI->ai_addr <<
1340 ", addrlen=" << AI->ai_addrlen <<
1341 " )" );
1342 debugs(5, 9, "connect FD " << sock << ": (" << x << ") " << xstrerror());
1343 debugs(14,9, "connecting to: " << address );
1344 }
1345 } else
1346 {
1347 #if defined(_SQUID_NEWSOS6_)
1348 /* Makoto MATSUSHITA <matusita@ics.es.osaka-u.ac.jp> */
1349
1350 connect(sock, AI->ai_addr, AI->ai_addrlen);
1351
1352 if (errno == EINVAL) {
1353 errlen = sizeof(err);
1354 x = getsockopt(sock, SOL_SOCKET, SO_ERROR, &err, &errlen);
1355
1356 if (x >= 0)
1357 errno = x;
1358 }
1359
1360 #else
1361 errlen = sizeof(err);
1362
1363 x = getsockopt(sock, SOL_SOCKET, SO_ERROR, &err, &errlen);
1364
1365 if (x == 0)
1366 errno = err;
1367
1368 #if defined(_SQUID_SOLARIS_)
1369 /*
1370 * Solaris 2.4's socket emulation doesn't allow you
1371 * to determine the error from a failed non-blocking
1372 * connect and just returns EPIPE. Create a fake
1373 * error message for connect. -- fenner@parc.xerox.com
1374 */
1375 if (x < 0 && errno == EPIPE)
1376 errno = ENOTCONN;
1377
1378 #endif
1379 #endif
1380
1381 }
1382
1383 #ifdef _SQUID_LINUX_
1384 /* 2007-11-27:
1385 * Linux Debian replaces our allocated AI pointer with garbage when
1386 * connect() fails. This leads to segmentation faults deallocating
1387 * the system-allocated memory when we go to clean up our pointer.
1388 * HACK: is to leak the memory returned since we can't deallocate.
1389 */
1390 if(errno != 0) {
1391 AI = NULL;
1392 }
1393 #endif
1394
1395 address.FreeAddrInfo(AI);
1396
1397 PROF_stop(comm_connect_addr);
1398
1399 if (errno == 0 || errno == EISCONN)
1400 status = COMM_OK;
1401 else if (ignoreErrno(errno))
1402 status = COMM_INPROGRESS;
1403 else
1404 #if USE_IPV6
1405 if( address.IsIPv4() && F->sock_family == AF_INET6 ) {
1406
1407 /* failover to trying IPv4-only link if an IPv6 one fails */
1408 /* to catch the edge case of apps listening on IPv4-localhost */
1409 F->sock_family = AF_INET;
1410 int res = comm_connect_addr(sock, address);
1411
1412 /* if that fails too, undo our temporary socktype hack so the repeat works properly. */
1413 if(res == COMM_ERROR)
1414 F->sock_family = AF_INET6;
1415
1416 return res;
1417 }
1418 else
1419 #endif
1420 return COMM_ERROR;
1421
1422 address.NtoA(F->ipaddr, MAX_IPSTRLEN);
1423
1424 F->remote_port = address.GetPort(); /* remote_port is HS */
1425
1426 if (status == COMM_OK)
1427 {
1428 debugs(5, 10, "comm_connect_addr: FD " << sock << " connected to " << address);
1429 } else if (status == COMM_INPROGRESS)
1430 {
1431 debugs(5, 10, "comm_connect_addr: FD " << sock << " connection pending");
1432 }
1433
1434 return status;
1435 }
1436
1437 /* Wait for an incoming connection on FD. FD should be a socket returned
1438 * from comm_listen. */
1439 static int
1440 comm_old_accept(int fd, ConnectionDetail &details)
1441 {
1442 PROF_start(comm_accept);
1443 statCounter.syscalls.sock.accepts++;
1444 int sock;
1445 struct addrinfo *gai = NULL;
1446 details.me.InitAddrInfo(gai);
1447
1448 if ((sock = accept(fd, gai->ai_addr, &gai->ai_addrlen)) < 0) {
1449
1450 details.me.FreeAddrInfo(gai);
1451
1452 PROF_stop(comm_accept);
1453
1454 if (ignoreErrno(errno))
1455 {
1456 debugs(50, 5, "comm_old_accept: FD " << fd << ": " << xstrerror());
1457 return COMM_NOMESSAGE;
1458 } else if (ENFILE == errno || EMFILE == errno)
1459 {
1460 debugs(50, 3, "comm_old_accept: FD " << fd << ": " << xstrerror());
1461 return COMM_ERROR;
1462 } else
1463 {
1464 debugs(50, 1, "comm_old_accept: FD " << fd << ": " << xstrerror());
1465 return COMM_ERROR;
1466 }
1467 }
1468
1469 details.peer = *gai;
1470
1471 details.me.SetEmpty();
1472 getsockname(sock, gai->ai_addr, &gai->ai_addrlen);
1473 details.me = *gai;
1474
1475 commSetCloseOnExec(sock);
1476
1477 /* fdstat update */
1478 fd_open(sock, FD_SOCKET, "HTTP Request");
1479 fdd_table[sock].close_file = NULL;
1480 fdd_table[sock].close_line = 0;
1481 fdc_table[sock].active = 1;
1482 fde *F = &fd_table[sock];
1483 details.peer.NtoA(F->ipaddr,MAX_IPSTRLEN);
1484 F->remote_port = details.peer.GetPort();
1485 F->local_addr.SetPort(details.me.GetPort());
1486 F->sock_family = gai->ai_family;
1487
1488 commSetNonBlocking(sock);
1489
1490 details.me.FreeAddrInfo(gai);
1491
1492 PROF_stop(comm_accept);
1493 return sock;
1494 }
1495
1496 void
1497 commCallCloseHandlers(int fd)
1498 {
1499 fde *F = &fd_table[fd];
1500 debugs(5, 5, "commCallCloseHandlers: FD " << fd);
1501
1502 while (F->closeHandler != NULL) {
1503 close_handler ch = *F->closeHandler;
1504 conn_close_pool->free(F->closeHandler); /* AAA */
1505 F->closeHandler = ch.next;
1506 ch.next = NULL;
1507 debugs(5, 5, "commCallCloseHandlers: ch->handler=" << ch.handler << " data=" << ch.data);
1508
1509 if (cbdataReferenceValid(ch.data))
1510 ch.handler(fd, ch.data);
1511
1512 cbdataReferenceDone(ch.data);
1513 }
1514 }
1515
1516 #if LINGERING_CLOSE
1517 static void
1518 commLingerClose(int fd, void *unused)
1519 {
1520 LOCAL_ARRAY(char, buf, 1024);
1521 int n;
1522 n = FD_READ_METHOD(fd, buf, 1024);
1523
1524 if (n < 0)
1525 debugs(5, 3, "commLingerClose: FD " << fd << " read: " << xstrerror());
1526
1527 comm_close(fd);
1528 }
1529
1530 static void
1531 commLingerTimeout(int fd, void *unused)
1532 {
1533 debugs(5, 3, "commLingerTimeout: FD " << fd);
1534 comm_close(fd);
1535 }
1536
1537 /*
1538 * Inspired by apache
1539 */
1540 void
1541 comm_lingering_close(int fd)
1542 {
1543 #if USE_SSL
1544
1545 if (fd_table[fd].ssl)
1546 ssl_shutdown_method(fd);
1547
1548 #endif
1549
1550 if (shutdown(fd, 1) < 0) {
1551 comm_close(fd);
1552 return;
1553 }
1554
1555 fd_note(fd, "lingering close");
1556 commSetTimeout(fd, 10, commLingerTimeout, NULL);
1557 commSetSelect(fd, COMM_SELECT_READ, commLingerClose, NULL, 0);
1558 }
1559
1560 #endif
1561
1562 /*
1563 * enable linger with time of 0 so that when the socket is
1564 * closed, TCP generates a RESET
1565 */
1566 void
1567 comm_reset_close(int fd)
1568 {
1569
1570 struct linger L;
1571 L.l_onoff = 1;
1572 L.l_linger = 0;
1573
1574 if (setsockopt(fd, SOL_SOCKET, SO_LINGER, (char *) &L, sizeof(L)) < 0)
1575 debugs(50, 0, "commResetTCPClose: FD " << fd << ": " << xstrerror());
1576
1577 comm_close(fd);
1578 }
1579
1580 void
1581 CommRead::nullCallback()
1582 {
1583 callback = CallBack<IOCB>();
1584 }
1585
1586 void
1587 AcceptFD::nullCallback()
1588 {
1589 callback = CallBack<IOACB>();
1590 }
1591
1592 void
1593 CommRead::doCallback(comm_err_t errcode, int xerrno)
1594 {
1595 if (callback.handler)
1596 callback.handler(fd, buf, 0, errcode, xerrno, callback.data);
1597
1598 nullCallback();
1599 }
1600
1601 void
1602 AcceptFD::doCallback(int fd, int newfd, comm_err_t errcode, int xerrno, ConnectionDetail *connDetails)
1603 {
1604 if (callback.handler) {
1605 CallBack<IOACB> aCallback = callback;
1606 nullCallback();
1607 aCallback.handler(fd, newfd, connDetails, errcode, xerrno, aCallback.data);
1608 }
1609 }
1610
1611 /*
1612 * Close the socket fd.
1613 *
1614 * + call write handlers with ERR_CLOSING
1615 * + call read handlers with ERR_CLOSING
1616 * + call closing handlers
1617 *
1618 * NOTE: COMM_ERR_CLOSING will NOT be called for CommReads' sitting in a
1619 * DeferredReadManager.
1620 */
1621 void
1622 _comm_close(int fd, char const *file, int line)
1623 {
1624 fde *F = NULL;
1625 dlink_node *node;
1626 CommCallbackData *cio;
1627
1628 debugs(5, 5, "comm_close: FD " << fd);
1629 assert(fd >= 0);
1630 assert(fd < Squid_MaxFD);
1631 F = &fd_table[fd];
1632 fdd_table[fd].close_file = file;
1633 fdd_table[fd].close_line = line;
1634
1635 if (F->flags.closing)
1636 return;
1637
1638 if (shutting_down && (!F->flags.open || F->type == FD_FILE))
1639 return;
1640
1641 assert(F->flags.open);
1642
1643 /* The following fails because ipc.c is doing calls to pipe() to create sockets! */
1644 assert(fdc_table[fd].active == 1);
1645
1646 assert(F->type != FD_FILE);
1647
1648 PROF_start(comm_close);
1649
1650 F->flags.closing = 1;
1651
1652 #if USE_SSL
1653
1654 if (F->ssl)
1655 ssl_shutdown_method(fd);
1656
1657 #endif
1658
1659 commSetTimeout(fd, -1, NULL, NULL);
1660
1661 /* new-style read/write handler stuff */
1662 if (commio_has_callback(fd, IOCB_WRITE, COMMIO_FD_WRITECB(fd))) {
1663 commio_complete_callback(fd, COMMIO_FD_WRITECB(fd), COMM_ERR_CLOSING, errno);
1664 commio_call_callback(COMMIO_FD_WRITECB(fd));
1665 }
1666 if (commio_has_callback(fd, IOCB_READ, COMMIO_FD_READCB(fd))) {
1667 commio_complete_callback(fd, COMMIO_FD_READCB(fd), COMM_ERR_CLOSING, errno);
1668 commio_call_callback(COMMIO_FD_READCB(fd));
1669 }
1670
1671 /* Do callbacks for read/accept routines, if any */
1672 fdc_table[fd].accept.accept.doCallback(fd, -1, COMM_ERR_CLOSING, 0, NULL);
1673
1674 /* Complete (w/ COMM_ERR_CLOSING!) any pending io callbacks */
1675 while (fdc_table[fd].CommCallbackList.head != NULL) {
1676 node = fdc_table[fd].CommCallbackList.head;
1677 cio = (CommCallbackData *)node->data;
1678 assert(fd == cio->result.fd); /* just paranoid */
1679 /* We're closing! */
1680 cio->fdClosing();
1681 cio->callACallback();
1682 delete cio;
1683 }
1684
1685 commCallCloseHandlers(fd);
1686
1687 if (F->pconn.uses)
1688 F->pconn.pool->count(F->pconn.uses);
1689
1690 comm_empty_os_read_buffers(fd);
1691
1692 #if USE_SSL
1693
1694 if (F->ssl) {
1695 SSL_free(F->ssl);
1696 F->ssl = NULL;
1697 }
1698
1699 #endif
1700 fd_close(fd); /* update fdstat */
1701
1702 close(fd);
1703
1704 fdc_table[fd].active = 0;
1705
1706 if (fdc_table[fd].half_closed) {
1707 AbortChecker::Instance().stopMonitoring(fd);
1708 fdc_table[fd].half_closed = false;
1709 }
1710
1711 fdc_table[fd] = fdc_t(fd);
1712
1713 statCounter.syscalls.sock.closes++;
1714
1715 PROF_stop(comm_close);
1716 /* When an fd closes, give accept() a chance, if need be */
1717
1718 if (fdNFree() >= RESERVED_FD)
1719 AcceptLimiter::Instance().kick();
1720 }
1721
1722 /* Send a udp datagram to specified TO_ADDR. */
1723 int
1724 comm_udp_sendto(int fd,
1725 const IPAddress &to_addr,
1726 const void *buf,
1727 int len)
1728 {
1729 int x = 0;
1730 struct addrinfo *AI = NULL;
1731
1732 PROF_start(comm_udp_sendto);
1733 statCounter.syscalls.sock.sendtos++;
1734
1735 debugs(50, 3, "comm_udp_sendto: Attempt to send UDP packet to " << to_addr <<
1736 " using FD " << fd << " using Port " << comm_local_port(fd) );
1737
1738 /* BUG: something in the above macro appears to occasionally be setting AI to garbage. */
1739 /* AYJ: 2007-08-27 : or was it because I wasn't then setting 'fd_table[fd].sock_family' to fill properly. */
1740 assert( NULL == AI );
1741
1742 to_addr.GetAddrInfo(AI, fd_table[fd].sock_family);
1743
1744 x = sendto(fd, buf, len, 0, AI->ai_addr, AI->ai_addrlen);
1745
1746 to_addr.FreeAddrInfo(AI);
1747
1748 PROF_stop(comm_udp_sendto);
1749
1750 if (x >= 0)
1751 return x;
1752
1753 #ifdef _SQUID_LINUX_
1754
1755 if (ECONNREFUSED != errno)
1756 #endif
1757
1758 debugs(50, 1, "comm_udp_sendto: FD " << fd << ", (family=" << fd_table[fd].sock_family << ") " << to_addr << ": " << xstrerror());
1759
1760 return COMM_ERROR;
1761 }
1762
1763 void
1764 comm_add_close_handler(int fd, PF * handler, void *data)
1765 {
1766 close_handler *newHandler = (close_handler *)conn_close_pool->alloc(); /* AAA */
1767 close_handler *c;
1768 debugs(5, 5, "comm_add_close_handler: FD " << fd << ", handler=" <<
1769 handler << ", data=" << data);
1770
1771 for (c = fd_table[fd].closeHandler; c; c = c->next)
1772 assert(c->handler != handler || c->data != data);
1773
1774 newHandler->handler = handler;
1775
1776 newHandler->data = cbdataReference(data);
1777
1778 newHandler->next = fd_table[fd].closeHandler;
1779
1780 fd_table[fd].closeHandler = newHandler;
1781 }
1782
1783 void
1784 comm_remove_close_handler(int fd, PF * handler, void *data)
1785 {
1786 assert (fdc_table[fd].active);
1787 close_handler *p = NULL;
1788 close_handler *last = NULL;
1789 /* Find handler in list */
1790 debugs(5, 5, "comm_remove_close_handler: FD " << fd << ", handler=" <<
1791 handler << ", data=" << data);
1792
1793 for (p = fd_table[fd].closeHandler; p != NULL; last = p, p = p->next)
1794 if (p->handler == handler && p->data == data)
1795 break; /* This is our handler */
1796
1797 assert(p != NULL);
1798
1799 /* Remove list entry */
1800 if (last)
1801 last->next = p->next;
1802 else
1803 fd_table[fd].closeHandler = p->next;
1804
1805 cbdataReferenceDone(p->data);
1806
1807 conn_close_pool->free(p);
1808 }
1809
1810 static void
1811 commSetNoLinger(int fd)
1812 {
1813
1814 struct linger L;
1815 L.l_onoff = 0; /* off */
1816 L.l_linger = 0;
1817
1818 if (setsockopt(fd, SOL_SOCKET, SO_LINGER, (char *) &L, sizeof(L)) < 0)
1819 debugs(50, 0, "commSetNoLinger: FD " << fd << ": " << xstrerror());
1820
1821 fd_table[fd].flags.nolinger = 1;
1822 }
1823
1824 static void
1825 commSetReuseAddr(int fd)
1826 {
1827 int on = 1;
1828
1829 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on)) < 0)
1830 debugs(50, 1, "commSetReuseAddr: FD " << fd << ": " << xstrerror());
1831 }
1832
1833 static void
1834 commSetTcpRcvbuf(int fd, int size)
1835 {
1836 if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (char *) &size, sizeof(size)) < 0)
1837 debugs(50, 1, "commSetTcpRcvbuf: FD " << fd << ", SIZE " << size << ": " << xstrerror());
1838 if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (char *) &size, sizeof(size)) < 0)
1839 debugs(50, 1, "commSetTcpRcvbuf: FD " << fd << ", SIZE " << size << ": " << xstrerror());
1840 #ifdef TCP_WINDOW_CLAMP
1841 if (setsockopt(fd, SOL_TCP, TCP_WINDOW_CLAMP, (char *) &size, sizeof(size)) < 0)
1842 debugs(50, 1, "commSetTcpRcvbuf: FD " << fd << ", SIZE " << size << ": " << xstrerror());
1843 #endif
1844 }
1845
1846 int
1847 commSetNonBlocking(int fd)
1848 {
1849 #ifndef _SQUID_MSWIN_
1850 int flags;
1851 int dummy = 0;
1852 #endif
1853 #ifdef _SQUID_WIN32_
1854
1855 int nonblocking = TRUE;
1856
1857 #ifdef _SQUID_CYGWIN_
1858
1859 if (fd_table[fd].type != FD_PIPE) {
1860 #endif
1861
1862 if (ioctl(fd, FIONBIO, &nonblocking) < 0) {
1863 debugs(50, 0, "commSetNonBlocking: FD " << fd << ": " << xstrerror() << " " << fd_table[fd].type);
1864 return COMM_ERROR;
1865 }
1866
1867 #ifdef _SQUID_CYGWIN_
1868
1869 } else {
1870 #endif
1871 #endif
1872 #ifndef _SQUID_MSWIN_
1873
1874 if ((flags = fcntl(fd, F_GETFL, dummy)) < 0) {
1875 debugs(50, 0, "FD " << fd << ": fcntl F_GETFL: " << xstrerror());
1876 return COMM_ERROR;
1877 }
1878
1879 if (fcntl(fd, F_SETFL, flags | SQUID_NONBLOCK) < 0) {
1880 debugs(50, 0, "commSetNonBlocking: FD " << fd << ": " << xstrerror());
1881 return COMM_ERROR;
1882 }
1883
1884 #endif
1885 #ifdef _SQUID_CYGWIN_
1886
1887 }
1888
1889 #endif
1890 fd_table[fd].flags.nonblocking = 1;
1891
1892 return 0;
1893 }
1894
1895 int
1896 commUnsetNonBlocking(int fd)
1897 {
1898 #ifdef _SQUID_MSWIN_
1899 int nonblocking = FALSE;
1900
1901 if (ioctlsocket(fd, FIONBIO, (unsigned long *) &nonblocking) < 0) {
1902 #else
1903 int flags;
1904 int dummy = 0;
1905
1906 if ((flags = fcntl(fd, F_GETFL, dummy)) < 0) {
1907 debugs(50, 0, "FD " << fd << ": fcntl F_GETFL: " << xstrerror());
1908 return COMM_ERROR;
1909 }
1910
1911 if (fcntl(fd, F_SETFL, flags & (~SQUID_NONBLOCK)) < 0) {
1912 #endif
1913 debugs(50, 0, "commUnsetNonBlocking: FD " << fd << ": " << xstrerror());
1914 return COMM_ERROR;
1915 }
1916
1917 fd_table[fd].flags.nonblocking = 0;
1918 return 0;
1919 }
1920
1921 void
1922 commSetCloseOnExec(int fd) {
1923 #ifdef FD_CLOEXEC
1924 int flags;
1925 int dummy = 0;
1926
1927 if ((flags = fcntl(fd, F_GETFL, dummy)) < 0) {
1928 debugs(50, 0, "FD " << fd << ": fcntl F_GETFL: " << xstrerror());
1929 return;
1930 }
1931
1932 if (fcntl(fd, F_SETFD, flags | FD_CLOEXEC) < 0)
1933 debugs(50, 0, "FD " << fd << ": set close-on-exec failed: " << xstrerror());
1934
1935 fd_table[fd].flags.close_on_exec = 1;
1936
1937 #endif
1938 }
1939
1940 #ifdef TCP_NODELAY
1941 static void
1942 commSetTcpNoDelay(int fd) {
1943 int on = 1;
1944
1945 if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &on, sizeof(on)) < 0)
1946 debugs(50, 1, "commSetTcpNoDelay: FD " << fd << ": " << xstrerror());
1947
1948 fd_table[fd].flags.nodelay = 1;
1949 }
1950
1951 #endif
1952
1953
1954 void
1955 comm_init(void) {
1956 fd_table =(fde *) xcalloc(Squid_MaxFD, sizeof(fde));
1957 fdd_table = (fd_debug_t *)xcalloc(Squid_MaxFD, sizeof(fd_debug_t));
1958 fdc_table = new fdc_t[Squid_MaxFD];
1959 commfd_table = (comm_fd_t *) xcalloc(Squid_MaxFD, sizeof(comm_fd_t));
1960
1961 for (int pos = 0; pos < Squid_MaxFD; ++pos) {
1962 fdc_table[pos] = fdc_t(pos);
1963 }
1964 for (int pos = 0; pos < Squid_MaxFD; pos++) {
1965 commfd_table[pos].fd = pos;
1966 commfd_table[pos].readcb.fd = pos;
1967 commfd_table[pos].readcb.type = IOCB_READ;
1968 commfd_table[pos].writecb.fd = pos;
1969 commfd_table[pos].writecb.type = IOCB_WRITE;
1970 }
1971
1972 /* XXX account fd_table */
1973 /* Keep a few file descriptors free so that we don't run out of FD's
1974 * after accepting a client but before it opens a socket or a file.
1975 * Since Squid_MaxFD can be as high as several thousand, don't waste them */
1976 RESERVED_FD = XMIN(100, Squid_MaxFD / 4);
1977
1978 conn_close_pool = memPoolCreate("close_handler", sizeof(close_handler));
1979 }
1980
1981 void
1982 comm_exit(void) {
1983 safe_free(fd_table);
1984 safe_free(fdd_table);
1985 if (fdc_table) {
1986 delete[] fdc_table;
1987 fdc_table = NULL;
1988 }
1989 safe_free(commfd_table);
1990 }
1991
1992 /* Write to FD. */
1993 static void
1994 commHandleWrite(int fd, void *data) {
1995 comm_io_callback_t *state = (comm_io_callback_t *)data;
1996 int len = 0;
1997 int nleft;
1998
1999 assert(state == COMMIO_FD_WRITECB(fd));
2000
2001 PROF_start(commHandleWrite);
2002 debugs(5, 5, "commHandleWrite: FD " << fd << ": off " <<
2003 (long int) state->offset << ", sz " << (long int) state->size << ".");
2004
2005 nleft = state->size - state->offset;
2006 len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
2007 debugs(5, 5, "commHandleWrite: write() returns " << len);
2008 fd_bytes(fd, len, FD_WRITE);
2009 statCounter.syscalls.sock.writes++;
2010
2011 if (len == 0) {
2012 /* Note we even call write if nleft == 0 */
2013 /* We're done */
2014
2015 if (nleft != 0)
2016 debugs(5, 1, "commHandleWrite: FD " << fd << ": write failure: connection closed with " << nleft << " bytes remaining.");
2017
2018 commio_complete_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
2019 } else if (len < 0) {
2020 /* An error */
2021
2022 if (fd_table[fd].flags.socket_eof) {
2023 debugs(50, 2, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << ".");
2024 commio_complete_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
2025 } else if (ignoreErrno(errno)) {
2026 debugs(50, 10, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << ".");
2027 commSetSelect(fd,
2028 COMM_SELECT_WRITE,
2029 commHandleWrite,
2030 state,
2031 0);
2032 } else {
2033 debugs(50, 2, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << ".");
2034 commio_complete_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
2035 }
2036 } else {
2037 /* A successful write, continue */
2038 state->offset += len;
2039
2040 if (state->offset < (off_t)state->size) {
2041 /* Not done, reinstall the write handler and write some more */
2042 commSetSelect(fd,
2043 COMM_SELECT_WRITE,
2044 commHandleWrite,
2045 state,
2046 0);
2047 } else {
2048 commio_complete_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_OK : COMM_ERROR, errno);
2049 }
2050 }
2051
2052 PROF_stop(commHandleWrite);
2053 }
2054
2055 /*
2056 * Queue a write. handler/handler_data are called when the write
2057 * completes, on error, or on file descriptor close.
2058 *
2059 * free_func is used to free the passed buffer when the write has completed.
2060 */
2061 void
2062 comm_write(int fd, const char *buf, int size, IOCB * handler, void *handler_data, FREE * free_func)
2063 {
2064 assert(!fd_table[fd].flags.closing);
2065
2066 debugs(5, 5, "comm_write: FD " << fd << ": sz " << size << ": hndl " << handler << ": data " << handler_data << ".");
2067
2068 if (commio_has_callback(fd, IOCB_WRITE, COMMIO_FD_WRITECB(fd))) {
2069 /* This means that the write has been scheduled, but has not
2070 * triggered yet
2071 */
2072 fatalf ("comm_write: fd %d: pending callback!\n", fd);
2073 }
2074 /* XXX ugly */
2075 commio_set_callback(fd, IOCB_WRITE, COMMIO_FD_WRITECB(fd), handler, handler_data, (char *)buf, free_func, size);
2076 commSetSelect(fd, COMM_SELECT_WRITE, commHandleWrite, COMMIO_FD_WRITECB(fd), 0);
2077 }
2078
2079 /* a wrapper around comm_write to allow for MemBuf to be comm_written in a snap */
2080 void
2081 comm_write_mbuf(int fd, MemBuf *mb, IOCB * handler, void *handler_data) {
2082 comm_write(fd, mb->buf, mb->size, handler, handler_data, mb->freeFunc());
2083 }
2084
2085
/*
 * Tell whether ierrno is an error that just means "try again later":
 * EINPROGRESS, EWOULDBLOCK, EAGAIN, EALREADY, EINTR and, where defined,
 * ERESTART.
 *
 * hm, this might be too general-purpose for all the places we'd
 * like to use it.
 *
 * Returns 1 for such an error, 0 for anything else.
 */
int
ignoreErrno(int ierrno)
{
    const bool transient =
        ierrno == EINPROGRESS ||
        ierrno == EWOULDBLOCK ||
#if EAGAIN != EWOULDBLOCK
        ierrno == EAGAIN ||
#endif
        ierrno == EALREADY ||
#ifdef ERESTART
        ierrno == ERESTART ||
#endif
        ierrno == EINTR;

    return transient ? 1 : 0;
}
2118
2119 void
2120 commCloseAllSockets(void) {
2121 int fd;
2122 fde *F = NULL;
2123
2124 for (fd = 0; fd <= Biggest_FD; fd++) {
2125 F = &fd_table[fd];
2126
2127 if (!F->flags.open)
2128 continue;
2129
2130 if (F->type != FD_SOCKET)
2131 continue;
2132
2133 if (F->flags.ipc) /* don't close inter-process sockets */
2134 continue;
2135
2136 if (F->timeout_handler) {
2137 PF *callback = F->timeout_handler;
2138 void *cbdata = NULL;
2139 F->timeout_handler = NULL;
2140 debugs(5, 5, "commCloseAllSockets: FD " << fd << ": Calling timeout handler");
2141
2142 if (cbdataReferenceValidDone(F->timeout_data, &cbdata))
2143 callback(fd, cbdata);
2144 } else {
2145 debugs(5, 5, "commCloseAllSockets: FD " << fd << ": calling comm_close()");
2146 comm_close(fd);
2147 }
2148 }
2149 }
2150
2151 static bool
2152 AlreadyTimedOut(fde *F) {
2153 if (!F->flags.open)
2154 return true;
2155
2156 if (F->timeout == 0)
2157 return true;
2158
2159 if (F->timeout > squid_curtime)
2160 return true;
2161
2162 return false;
2163 }
2164
2165 void
2166 checkTimeouts(void) {
2167 int fd;
2168 fde *F = NULL;
2169 PF *callback;
2170
2171 for (fd = 0; fd <= Biggest_FD; fd++) {
2172 F = &fd_table[fd];
2173
2174 if (AlreadyTimedOut(F))
2175 continue;
2176
2177 debugs(5, 5, "checkTimeouts: FD " << fd << " Expired");
2178
2179 if (F->timeout_handler) {
2180 debugs(5, 5, "checkTimeouts: FD " << fd << ": Call timeout handler");
2181 callback = F->timeout_handler;
2182 F->timeout_handler = NULL;
2183 callback(fd, F->timeout_data);
2184 } else {
2185 debugs(5, 5, "checkTimeouts: FD " << fd << ": Forcing comm_close()");
2186 comm_close(fd);
2187 }
2188 }
2189 }
2190
2191 /*
2192 * New-style listen and accept routines
2193 *
2194 * Listen simply registers our interest in an FD for listening,
2195 * and accept takes a callback to call when an FD has been
2196 * accept()ed.
2197 */
2198 int
2199 comm_listen(int sock) {
2200 int x;
2201
2202 if ((x = listen(sock, Squid_MaxFD >> 2)) < 0) {
2203 debugs(50, 0, "comm_listen: listen(" << (Squid_MaxFD >> 2) << ", " << sock << "): " << xstrerror());
2204 return x;
2205 }
2206
2207 if (Config.accept_filter && strcmp(Config.accept_filter, "none") != 0) {
2208 #ifdef SO_ACCEPTFILTER
2209 struct accept_filter_arg afa;
2210 bzero(&afa, sizeof(afa));
2211 debug(5, 0) ("Installing accept filter '%s' on FD %d\n",
2212 Config.accept_filter, sock);
2213 xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name));
2214 x = setsockopt(sock, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa));
2215 if (x < 0)
2216 debugs(5, 0, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
2217 #elif defined(TCP_DEFER_ACCEPT)
2218 int seconds = 30;
2219 if (strncmp(Config.accept_filter, "data=", 5) == 0)
2220 seconds = atoi(Config.accept_filter + 5);
2221 x = setsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds));
2222 if (x < 0)
2223 debugs(5, 0, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
2224 #else
2225 debugs(5, 0, "accept_filter not supported on your OS");
2226 #endif
2227 }
2228
2229 return sock;
2230 }
2231
2232 void
2233 fdc_t::beginAccepting() {
2234 accept.accept.beginAccepting();
2235 }
2236
2237 int
2238 fdc_t::acceptCount() const {
2239 return accept.accept.acceptCount();
2240 }
2241
2242 void
2243 fdc_t::acceptOne(int fd) {
2244 // If there is no callback and we accept, we will leak the accepted FD.
2245 // When we are running out of FDs, there is often no callback.
2246 if (!accept.accept.callback.handler) {
2247 debugs(5, 5, "fdc_t::acceptOne orphaned: FD " << fd);
2248 // XXX: can we remove this and similar "just in case" calls and
2249 // either listen always or listen only when there is a callback?
2250 if (!AcceptLimiter::Instance().deferring())
2251 commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0);
2252 accept.accept.finished(true);
2253 return;
2254 }
2255
2256 /*
2257 * We don't worry about running low on FDs here. Instead,
2258 * httpAccept() will use AcceptLimiter if we reach the limit
2259 * there.
2260 */
2261
2262 /* Accept a new connection */
2263 int newfd = comm_old_accept(fd, accept.connDetails);
2264
2265 /* Check for errors */
2266
2267 if (newfd < 0) {
2268 if (newfd == COMM_NOMESSAGE) {
2269 /* register interest again */
2270 debugs(5, 5, "fdc_t::acceptOne eof: FD " << fd << " handler: " << (void*)accept.accept.callback.handler);
2271 commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0);
2272 accept.accept.finished(true);
2273 return;
2274 }
2275
2276 /* A non-recoverable error - register an error callback */
2277 new CommAcceptCallbackData(fd, accept.accept.callback, COMM_ERROR, errno, -1, accept.connDetails);
2278
2279 accept.accept.callback = CallBack<IOACB>();
2280
2281 accept.accept.finished(true);
2282
2283 return;
2284 }
2285
2286 debugs(5, 5, "fdc_t::acceptOne accepted: FD " << fd << " handler: " << (void*)accept.accept.callback.handler << " newfd: " << newfd);
2287
2288 assert(accept.accept.callback.handler);
2289 accept.accept.doCallback(fd, newfd, COMM_OK, 0, &accept.connDetails);
2290
2291 /* If we weren't re-registed, don't bother trying again! */
2292
2293 if (accept.accept.callback.handler == NULL)
2294 accept.accept.finished(true);
2295 }
2296
2297 bool
2298 AcceptFD::finished() const {
2299 return finished_;
2300 }
2301
2302 void
2303 AcceptFD::finished(bool newValue) {
2304 finished_ = newValue;
2305 }
2306
2307 bool
2308 AcceptFD::finishedAccepting() const {
2309 return acceptCount() >= MAX_ACCEPT_PER_LOOP || finished();
2310 }
2311
2312 /*
2313 * This callback is called whenever a filedescriptor is ready
2314 * to dupe itself and fob off an accept()ed connection
2315 */
2316 static void
2317 comm_accept_try(int fd, void *data) {
2318 assert(fdc_table[fd].active == 1);
2319
2320 fdc_table[fd].beginAccepting();
2321
2322 while (!fdc_table[fd].accept.accept.finishedAccepting())
2323 fdc_table[fd].acceptOne(fd);
2324 }
2325
2326 /*
2327 * Notes:
2328 * + the current interface will queue _one_ accept per io loop.
2329 * this isn't very optimal and should be revisited at a later date.
2330 */
2331 void
2332 comm_accept(int fd, IOACB *handler, void *handler_data) {
2333 debugs(5, 5, "comm_accept: FD " << fd << " handler: " << (void*)handler);
2334 requireOpenAndActive(fd);
2335
2336 /* make sure we're not pending! */
2337 assert(fdc_table[fd].accept.accept.callback.handler == NULL);
2338
2339 /* Record our details */
2340 fdc_table[fd].accept.accept.callback = CallBack<IOACB> (handler, handler_data);
2341
2342 /* Kick off the accept */
2343 #if OPTIMISTIC_IO
2344
2345 comm_accept_try(fd, NULL);
2346 #else
2347
2348 commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0);
2349 #endif
2350 }
2351
2352 void CommIO::Initialise() {
2353 /* Initialize done pipe signal */
2354 int DonePipe[2];
2355 pipe(DonePipe);
2356 DoneFD = DonePipe[1];
2357 DoneReadFD = DonePipe[0];
2358 fd_open(DoneReadFD, FD_PIPE, "async-io completetion event: main");
2359 fd_open(DoneFD, FD_PIPE, "async-io completetion event: threads");
2360 commSetNonBlocking(DoneReadFD);
2361 commSetNonBlocking(DoneFD);
2362 commSetSelect(DoneReadFD, COMM_SELECT_READ, NULLFDHandler, NULL, 0);
2363 Initialised = true;
2364 }
2365
2366 void CommIO::NotifyIOClose() {
2367 /* Close done pipe signal */
2368 FlushPipe();
2369 close(DoneFD);
2370 close(DoneReadFD);
2371 fd_close(DoneFD);
2372 fd_close(DoneReadFD);
2373 Initialised = false;
2374 }
2375
2376 bool CommIO::Initialised = false;
2377 bool CommIO::DoneSignalled = false;
2378 int CommIO::DoneFD = -1;
2379 int CommIO::DoneReadFD = -1;
2380
2381 void
2382 CommIO::FlushPipe() {
2383 char buf[256];
2384 FD_READ_METHOD(DoneReadFD, buf, sizeof(buf));
2385 }
2386
2387 void
2388 CommIO::NULLFDHandler(int fd, void *data) {
2389 FlushPipe();
2390 commSetSelect(fd, COMM_SELECT_READ, NULLFDHandler, NULL, 0);
2391 }
2392
2393 void
2394 CommIO::ResetNotifications() {
2395 if (DoneSignalled) {
2396 FlushPipe();
2397 DoneSignalled = false;
2398 }
2399 }
2400
2401 AcceptLimiter AcceptLimiter::Instance_;
2402
2403 AcceptLimiter &AcceptLimiter::Instance() {
2404 return Instance_;
2405 }
2406
2407 bool
2408 AcceptLimiter::deferring() const {
2409 return deferred.size() > 0;
2410 }
2411
2412 void
2413 AcceptLimiter::defer (int fd, Acceptor::AcceptorFunction *aFunc, void *data) {
2414 debugs(5, 5, "AcceptLimiter::defer: FD " << fd << " handler: " << (void*)aFunc);
2415 Acceptor temp;
2416 temp.theFunction = aFunc;
2417 temp.acceptFD = fd;
2418 temp.theData = data;
2419 deferred.push_back(temp);
2420 }
2421
2422 void
2423 AcceptLimiter::kick() {
2424 if (!deferring())
2425 return;
2426
2427 /* Yes, this means the first on is the last off....
2428 * If the list container was a little more friendly, we could sensibly us it.
2429 */
2430 Acceptor temp = deferred.pop_back();
2431
2432 comm_accept (temp.acceptFD, temp.theFunction, temp.theData);
2433 }
2434
2435 void
2436 commMarkHalfClosed(int fd) {
2437 assert (fdc_table[fd].active && !fdc_table[fd].half_closed);
2438 AbortChecker::Instance().monitor(fd);
2439 fdc_table[fd].half_closed = true;
2440 }
2441
2442 int commIsHalfClosed(int fd) {
2443 assert (fdc_table[fd].active);
2444
2445 return fdc_table[fd].half_closed;
2446 }
2447
2448 void
2449 commCheckHalfClosed(void *data) {
2450 AbortChecker::Instance().doIOLoop();
2451 eventAdd("commCheckHalfClosed", commCheckHalfClosed, NULL, 1.0, false);
2452 }
2453
2454 AbortChecker &AbortChecker::Instance() {return Instance_;}
2455
2456 AbortChecker AbortChecker::Instance_;
2457
2458 void
2459 AbortChecker::AbortCheckReader(int fd, char *, size_t size, comm_err_t flag, int xerrno, void *data) {
2460 assert (size == 0);
2461 /* sketch:
2462 * if the read is ok and 0, the conn is still open.
2463 * if the read is a fail, close the conn
2464 */
2465
2466 if (flag != COMM_OK && flag != COMM_ERR_CLOSING) {
2467 debugs(5, 3, "AbortChecker::AbortCheckReader: FD " << fd << " aborted");
2468 comm_close(fd);
2469 }
2470 }
2471
2472 void
2473 AbortChecker::monitor(int fd) {
2474 assert (!contains(fd));
2475
2476 add
2477 (fd);
2478
2479 debugs(5, 3, "AbortChecker::monitor: monitoring half closed FD " << fd << " for aborts");
2480 }
2481
2482 void
2483 AbortChecker::stopMonitoring (int fd) {
2484 assert (contains (fd));
2485
2486 remove
2487 (fd);
2488
2489 debugs(5, 3, "AbortChecker::stopMonitoring: stopped monitoring half closed FD " << fd << " for aborts");
2490 }
2491
2492 #include "splay.h"
2493 void
2494 AbortChecker::doIOLoop() {
2495 fds->walk(RemoveCheck, this);
2496 fds->walk(AddCheck, this);
2497 }
2498
2499 void
2500 AbortChecker::AddCheck (int const &fd, void *data) {
2501 AbortChecker *me = (AbortChecker *)data;
2502 me->addCheck(fd);
2503 }
2504
2505 void
2506 AbortChecker::RemoveCheck (int const &fd, void *data) {
2507 AbortChecker *me = (AbortChecker *)data;
2508 me->removeCheck(fd);
2509 }
2510
2511
2512 int
2513 AbortChecker::IntCompare (int const &lhs, int const &rhs) {
2514 return lhs - rhs;
2515 }
2516
2517 bool
2518 AbortChecker::contains (int const fd) const {
2519 fds = fds->splay(fd, IntCompare);
2520
2521 if (splayLastResult != 0)
2522 return false;
2523
2524 return true;
2525 }
2526
2527 void
2528
2529 AbortChecker::remove
2530 (int const fd) {
2531
2532 fds = fds->remove
2533 (fd, IntCompare);
2534 }
2535
2536 void
2537
2538 AbortChecker::add
2539 (int const fd) {
2540 fds = fds->insert (fd, IntCompare);
2541 }
2542
2543 void
2544 AbortChecker::addCheck (int const fd) {
2545 /* assert comm_is_open (fd); */
2546 comm_read(fd, NULL, 0, AbortCheckReader, NULL);
2547 }
2548
2549 void
2550 AbortChecker::removeCheck (int const fd) {
2551 /*
2552 comm_read_cancel(fd, AbortCheckReader, NULL);
2553 */
2554 }
2555
2556 CommRead::CommRead() : fd(-1), buf(NULL), len(0) {}
2557
2558 CommRead::CommRead(int fd_, char *buf_, int len_, IOCB *handler_, void *data_)
2559 : fd(fd_), buf(buf_), len(len_), callback(handler_, data_) {}
2560
2561 DeferredRead::DeferredRead () : theReader(NULL), theContext(NULL), theRead(), cancelled(false) {}
2562
2563 DeferredRead::DeferredRead (DeferrableRead *aReader, void *data, CommRead const &aRead) : theReader(aReader), theContext (data), theRead(aRead), cancelled(false) {}
2564
2565 DeferredReadManager::~DeferredReadManager() {
2566 flushReads();
2567 assert (deferredReads.empty());
2568 }
2569
2570 /* explicit instantiation required for some systems */
2571
2572 template cbdata_type List<DeferredRead>
2573 ::CBDATA_List;
2574
2575 void
2576 DeferredReadManager::delayRead(DeferredRead const &aRead) {
2577 debugs(5, 3, "Adding deferred read on FD " << aRead.theRead.fd);
2578 List<DeferredRead> *temp = deferredReads.push_back(aRead);
2579 comm_add_close_handler (aRead.theRead.fd, CloseHandler, temp);
2580 }
2581
2582 void
2583 DeferredReadManager::CloseHandler(int fd, void *thecbdata) {
2584 if (!cbdataReferenceValid (thecbdata))
2585 return;
2586
2587 List<DeferredRead> *temp = (List<DeferredRead> *)thecbdata;
2588
2589 temp->element.markCancelled();
2590 }
2591
2592 DeferredRead
2593 DeferredReadManager::popHead(ListContainer<DeferredRead> &deferredReads) {
2594 assert (!deferredReads.empty());
2595
2596 if (!deferredReads.head->element.cancelled)
2597 comm_remove_close_handler(deferredReads.head->element.theRead.fd, CloseHandler, deferredReads.head);
2598
2599 DeferredRead result = deferredReads.pop_front();
2600
2601 return result;
2602 }
2603
2604 void
2605 DeferredReadManager::kickReads(int const count) {
2606 /* if we had List::size() we could consolidate this and flushReads */
2607
2608 if (count < 1) {
2609 flushReads();
2610 return;
2611 }
2612
2613 size_t remaining = count;
2614
2615 while (!deferredReads.empty() && remaining) {
2616 DeferredRead aRead = popHead(deferredReads);
2617 kickARead(aRead);
2618
2619 if (!aRead.cancelled)
2620 --remaining;
2621 }
2622 }
2623
2624 void
2625 DeferredReadManager::flushReads() {
2626 ListContainer<DeferredRead> reads;
2627 reads = deferredReads;
2628 deferredReads = ListContainer<DeferredRead>();
2629
2630 while (!reads.empty()) {
2631 DeferredRead aRead = popHead(reads);
2632 kickARead(aRead);
2633 }
2634 }
2635
2636 void
2637 DeferredReadManager::kickARead(DeferredRead const &aRead) {
2638 if (aRead.cancelled)
2639 return;
2640
2641 debugs(5, 3, "Kicking deferred read on FD " << aRead.theRead.fd);
2642
2643 aRead.theReader(aRead.theContext, aRead.theRead);
2644 }
2645
2646 void
2647 DeferredRead::markCancelled() {
2648 cancelled = true;
2649 }
2650
2651 ConnectionDetail::ConnectionDetail() : me(), peer() {
2652 }
2653
2654 bool
2655 CommDispatcher::dispatch() {
2656 bool result = comm_iocallbackpending();
2657 comm_calliocallback();
2658 /* and again to deal with indirectly queued events
2659 * resulting from the first call. These are usually
2660 * callbacks and should be dealt with immediately.
2661 */
2662 comm_calliocallback();
2663
2664 /* Adrian's *new* stuff */
2665 commio_call_callbacks();
2666 return result;
2667 }
2668
2669 int
2670 CommSelectEngine::checkEvents(int timeout) {
2671 static time_t last_timeout = 0;
2672
2673 /* No, this shouldn't be here. But it shouldn't be in each comm handler. -adrian */
2674 if (squid_curtime > last_timeout) {
2675 last_timeout = squid_curtime;
2676 checkTimeouts();
2677 }
2678
2679 switch (comm_select(timeout)) {
2680
2681 case COMM_OK:
2682
2683 case COMM_TIMEOUT:
2684 return 0;
2685
2686 case COMM_IDLE:
2687
2688 case COMM_SHUTDOWN:
2689 return EVENT_IDLE;
2690
2691 case COMM_ERROR:
2692 return EVENT_ERROR;
2693
2694 default:
2695 fatal_dump("comm.cc: Internal error -- this should never happen.");
2696 return EVENT_ERROR;
2697 };
2698 }