]> git.ipfire.org Git - thirdparty/squid.git/blob - src/icp_v2.cc
Fix Regression: SMP broke ICP outgoing IP lookup if no udp_outgoing_addr set
[thirdparty/squid.git] / src / icp_v2.cc
1 /*
2 * DEBUG: section 12 Internet Cache Protocol (ICP)
3 * AUTHOR: Duane Wessels
4 *
5 * SQUID Web Proxy Cache http://www.squid-cache.org/
6 * ----------------------------------------------------------
7 *
8 * Squid is the result of efforts by numerous individuals from
9 * the Internet community; see the CONTRIBUTORS file for full
10 * details. Many organizations have provided support for Squid's
11 * development; see the SPONSORS file for full details. Squid is
12 * Copyrighted (C) 2001 by the Regents of the University of
13 * California; see the COPYRIGHT file for full details. Squid
14 * incorporates software developed and/or copyrighted by other
15 * sources; see the CREDITS file for full details.
16 *
17 * This program is free software; you can redistribute it and/or modify
18 * it under the terms of the GNU General Public License as published by
19 * the Free Software Foundation; either version 2 of the License, or
20 * (at your option) any later version.
21 *
22 * This program is distributed in the hope that it will be useful,
23 * but WITHOUT ANY WARRANTY; without even the implied warranty of
24 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
25 * GNU General Public License for more details.
26 *
27 * You should have received a copy of the GNU General Public License
28 * along with this program; if not, write to the Free Software
29 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
30 *
31 */
32
33 /**
34 \defgroup ServerProtocolICPInternal2 ICPv2 Internals
35 \ingroup ServerProtocolICPAPI
36 */
37
38 #include "squid.h"
39 #include "Store.h"
40 #include "comm.h"
41 #include "ICP.h"
42 #include "HttpRequest.h"
43 #include "acl/FilledChecklist.h"
44 #include "acl/Acl.h"
45 #include "AccessLogEntry.h"
46 #include "wordlist.h"
47 #include "SquidTime.h"
48 #include "SwapDir.h"
49 #include "icmp/net_db.h"
50 #include "ip/Address.h"
51 #include "ip/tools.h"
52 #include "ipc/StartListening.h"
53 #include "rfc1738.h"
54
55 /// dials icpIncomingConnectionOpened call
56 class IcpListeningStartedDialer: public CallDialer,
57 public Ipc::StartListeningCb
58 {
59 public:
60 typedef void (*Handler)(int fd, int errNo, Ip::Address& addr);
61 IcpListeningStartedDialer(Handler aHandler, Ip::Address& anAddr):
62 handler(aHandler), addr(anAddr) {}
63
64 virtual void print(std::ostream &os) const {
65 startPrint(os) <<
66 ", address=" << addr << ')';
67 }
68
69 virtual bool canDial(AsyncCall &) const { return true; }
70 virtual void dial(AsyncCall &) { (handler)(fd, errNo, addr); }
71
72 public:
73 Handler handler;
74 Ip::Address addr;
75 };
76
77 static void icpIncomingConnectionOpened(int fd, int errNo, Ip::Address& addr);
78
79 /// \ingroup ServerProtocolICPInternal2
80 static void icpLogIcp(const Ip::Address &, log_type, int, const char *, int);
81
82 /// \ingroup ServerProtocolICPInternal2
83 static void icpHandleIcpV2(int, Ip::Address &, char *, int);
84
85 /// \ingroup ServerProtocolICPInternal2
86 static void icpCount(void *, int, size_t, int);
87
88 /**
89 \ingroup ServerProtocolICPInternal2
90 * IcpQueueHead is global so comm_incoming() knows whether or not
91 * to call icpUdpSendQueue.
92 */
93 static icpUdpData *IcpQueueHead = NULL;
94 /// \ingroup ServerProtocolICPInternal2
95 static icpUdpData *IcpQueueTail = NULL;
96
97 /// \ingroup ServerProtocolICPInternal2
98 Ip::Address theOutICPAddr;
99
100 /* icp_common_t */
101 _icp_common_t::_icp_common_t() : opcode(ICP_INVALID), version(0), length(0), reqnum(0), flags(0), pad(0), shostid(0)
102 {}
103
104 _icp_common_t::_icp_common_t(char *buf, unsigned int len)
105 {
106 if (len < sizeof(_icp_common_t)) {
107 /* mark as invalid */
108 length = len + 1;
109 return;
110 }
111
112 xmemcpy(this, buf, sizeof(icp_common_t));
113 /*
114 * Convert network order sensitive fields
115 */
116 length = ntohs(length);
117 reqnum = ntohl(reqnum);
118 flags = ntohl(flags);
119 pad = ntohl(pad);
120 }
121
122 icp_opcode
123 _icp_common_t::getOpCode() const
124 {
125 if (opcode > (char)ICP_END)
126 return ICP_INVALID;
127
128 return (icp_opcode)opcode;
129 }
130
131 /* ICPState */
132
133 ICPState::ICPState(icp_common_t &aHeader, HttpRequest *aRequest):
134 header(aHeader),
135 request(HTTPMSGLOCK(aRequest)),
136 fd(-1),
137 url(NULL)
138 {}
139
140 ICPState::~ICPState()
141 {
142 safe_free(url);
143 HTTPMSGUNLOCK(request);
144 }
145
146
147 /* End ICPState */
148
149 /* ICP2State */
150
151 /// \ingroup ServerProtocolICPInternal2
152 class ICP2State : public ICPState, public StoreClient
153 {
154
155 public:
156 ICP2State(icp_common_t & aHeader, HttpRequest *aRequest):
157 ICPState(aHeader, aRequest),rtt(0),src_rtt(0),flags(0) {}
158
159 ~ICP2State();
160 void created(StoreEntry * newEntry);
161
162 int rtt;
163 int src_rtt;
164 uint32_t flags;
165 };
166
167 ICP2State::~ICP2State()
168 {}
169
170 void
171 ICP2State::created(StoreEntry *newEntry)
172 {
173 StoreEntry *entry = newEntry->isNull () ? NULL : newEntry;
174 debugs(12, 5, "icpHandleIcpV2: OPCODE " << icp_opcode_str[header.opcode]);
175 icp_opcode codeToSend;
176
177 if (icpCheckUdpHit(entry, request)) {
178 codeToSend = ICP_HIT;
179 } else {
180 #if USE_ICMP
181 if (Config.onoff.test_reachability && rtt == 0) {
182 if ((rtt = netdbHostRtt(request->GetHost())) == 0)
183 netdbPingSite(request->GetHost());
184 }
185 #endif /* USE_ICMP */
186
187 if (icpGetCommonOpcode() != ICP_ERR)
188 codeToSend = icpGetCommonOpcode();
189 else if (Config.onoff.test_reachability && rtt == 0)
190 codeToSend = ICP_MISS_NOFETCH;
191 else
192 codeToSend = ICP_MISS;
193 }
194
195 icpCreateAndSend(codeToSend, flags, url, header.reqnum, src_rtt, fd, from);
196 delete this;
197 }
198
199 /* End ICP2State */
200
201 /// \ingroup ServerProtocolICPInternal2
202 static void
203 icpLogIcp(const Ip::Address &caddr, log_type logcode, int len, const char *url, int delay)
204 {
205 AccessLogEntry al;
206
207 if (LOG_TAG_NONE == logcode)
208 return;
209
210 if (LOG_ICP_QUERY == logcode)
211 return;
212
213 clientdbUpdate(caddr, logcode, PROTO_ICP, len);
214
215 if (!Config.onoff.log_udp)
216 return;
217
218 al.icp.opcode = ICP_QUERY;
219
220 al.url = url;
221
222 al.cache.caddr = caddr;
223
224 al.cache.replySize = len;
225
226 al.cache.code = logcode;
227
228 al.cache.msec = delay;
229
230 accessLogLog(&al, NULL);
231 }
232
233 /// \ingroup ServerProtocolICPInternal2
234 void
235 icpUdpSendQueue(int fd, void *unused)
236 {
237 icpUdpData *q;
238
239 while ((q = IcpQueueHead) != NULL) {
240 int delay = tvSubUsec(q->queue_time, current_time);
241 /* increment delay to prevent looping */
242 const int x = icpUdpSend(fd, q->address, (icp_common_t *) q->msg, q->logcode, ++delay);
243 IcpQueueHead = q->next;
244 xfree(q);
245
246 if (x < 0)
247 break;
248 }
249 }
250
251 _icp_common_t *
252 _icp_common_t::createMessage(
253 icp_opcode opcode,
254 int flags,
255 const char *url,
256 int reqnum,
257 int pad)
258 {
259 char *buf = NULL;
260 icp_common_t *headerp = NULL;
261 char *urloffset = NULL;
262 int buf_len;
263 buf_len = sizeof(icp_common_t) + strlen(url) + 1;
264
265 if (opcode == ICP_QUERY)
266 buf_len += sizeof(uint32_t);
267
268 buf = (char *) xcalloc(buf_len, 1);
269
270 headerp = (icp_common_t *) (void *) buf;
271
272 headerp->opcode = (char) opcode;
273
274 headerp->version = ICP_VERSION_CURRENT;
275
276 headerp->length = (uint16_t) htons(buf_len);
277
278 headerp->reqnum = htonl(reqnum);
279
280 headerp->flags = htonl(flags);
281
282 headerp->pad = htonl(pad);
283
284 theOutICPAddr.GetInAddr( *((struct in_addr*)&headerp->shostid) );
285
286 urloffset = buf + sizeof(icp_common_t);
287
288 if (opcode == ICP_QUERY)
289 urloffset += sizeof(uint32_t);
290
291 xmemcpy(urloffset, url, strlen(url));
292
293 return (icp_common_t *)buf;
294 }
295
296 int
297 icpUdpSend(int fd,
298 const Ip::Address &to,
299 icp_common_t * msg,
300 log_type logcode,
301 int delay)
302 {
303 icpUdpData *queue;
304 int x;
305 int len;
306 len = (int) ntohs(msg->length);
307 debugs(12, 5, "icpUdpSend: FD " << fd << " sending " <<
308 icp_opcode_str[msg->opcode] << ", " << len << " bytes to " << to);
309
310 x = comm_udp_sendto(fd, to, msg, len);
311
312 if (x >= 0) {
313 /* successfully written */
314 icpLogIcp(to, logcode, len, (char *) (msg + 1), delay);
315 icpCount(msg, SENT, (size_t) len, delay);
316 safe_free(msg);
317 } else if (0 == delay) {
318 /* send failed, but queue it */
319 queue = (icpUdpData *) xcalloc(1, sizeof(icpUdpData));
320 queue->address = to;
321 queue->msg = msg;
322 queue->len = (int) ntohs(msg->length);
323 queue->queue_time = current_time;
324 queue->logcode = logcode;
325
326 if (IcpQueueHead == NULL) {
327 IcpQueueHead = queue;
328 IcpQueueTail = queue;
329 } else if (IcpQueueTail == IcpQueueHead) {
330 IcpQueueTail = queue;
331 IcpQueueHead->next = queue;
332 } else {
333 IcpQueueTail->next = queue;
334 IcpQueueTail = queue;
335 }
336
337 commSetSelect(fd, COMM_SELECT_WRITE, icpUdpSendQueue, NULL, 0);
338 statCounter.icp.replies_queued++;
339 } else {
340 /* don't queue it */
341 statCounter.icp.replies_dropped++;
342 }
343
344 return x;
345 }
346
347 int
348 icpCheckUdpHit(StoreEntry * e, HttpRequest * request)
349 {
350 if (e == NULL)
351 return 0;
352
353 if (!e->validToSend())
354 return 0;
355
356 if (Config.onoff.icp_hit_stale)
357 return 1;
358
359 if (refreshCheckICP(e, request))
360 return 0;
361
362 return 1;
363 }
364
365 /**
366 * This routine selects an ICP opcode for ICP misses.
367 *
368 \retval ICP_ERR no opcode selected here
369 \retval ICP_MISS_NOFETCH store is rebuilding, no fetch is possible yet
370 */
371 icp_opcode
372 icpGetCommonOpcode()
373 {
374 /* if store is rebuilding, return a UDP_MISS_NOFETCH */
375
376 if ((StoreController::store_dirs_rebuilding && opt_reload_hit_only) ||
377 hit_only_mode_until > squid_curtime) {
378 return ICP_MISS_NOFETCH;
379 }
380
381 return ICP_ERR;
382 }
383
384 log_type
385 icpLogFromICPCode(icp_opcode opcode)
386 {
387 if (opcode == ICP_ERR)
388 return LOG_UDP_INVALID;
389
390 if (opcode == ICP_DENIED)
391 return LOG_UDP_DENIED;
392
393 if (opcode == ICP_HIT)
394 return LOG_UDP_HIT;
395
396 if (opcode == ICP_MISS)
397 return LOG_UDP_MISS;
398
399 if (opcode == ICP_MISS_NOFETCH)
400 return LOG_UDP_MISS_NOFETCH;
401
402 fatal("expected ICP opcode\n");
403
404 return LOG_UDP_INVALID;
405 }
406
407 void
408 icpCreateAndSend(icp_opcode opcode, int flags, char const *url, int reqnum, int pad, int fd, const Ip::Address &from)
409 {
410 icp_common_t *reply = _icp_common_t::createMessage(opcode, flags, url, reqnum, pad);
411 icpUdpSend(fd, from, reply, icpLogFromICPCode(opcode), 0);
412 }
413
414 void
415 icpDenyAccess(Ip::Address &from, char *url, int reqnum, int fd)
416 {
417 debugs(12, 2, "icpDenyAccess: Access Denied for " << from << " by " << AclMatchedName << ".");
418
419 if (clientdbCutoffDenied(from)) {
420 /*
421 * count this DENIED query in the clientdb, even though
422 * we're not sending an ICP reply...
423 */
424 clientdbUpdate(from, LOG_UDP_DENIED, PROTO_ICP, 0);
425 } else {
426 icpCreateAndSend(ICP_DENIED, 0, url, reqnum, 0, fd, from);
427 }
428 }
429
430 int
431 icpAccessAllowed(Ip::Address &from, HttpRequest * icp_request)
432 {
433 /* absent an explicit allow, we deny all */
434 if (!Config.accessList.icp)
435 return 0;
436
437 ACLFilledChecklist checklist(Config.accessList.icp, icp_request, NULL);
438 checklist.src_addr = from;
439 checklist.my_addr.SetNoAddr();
440 int result = checklist.fastCheck();
441 return result;
442 }
443
444 char const *
445 icpGetUrlToSend(char *url)
446 {
447 if (strpbrk(url, w_space))
448 return rfc1738_escape(url);
449 else
450 return url;
451 }
452
453 HttpRequest *
454 icpGetRequest(char *url, int reqnum, int fd, Ip::Address &from)
455 {
456 if (strpbrk(url, w_space)) {
457 url = rfc1738_escape(url);
458 icpCreateAndSend(ICP_ERR, 0, rfc1738_escape(url), reqnum, 0, fd, from);
459 return NULL;
460 }
461
462 HttpRequest *result;
463
464 if ((result = HttpRequest::CreateFromUrl(url)) == NULL)
465 icpCreateAndSend(ICP_ERR, 0, url, reqnum, 0, fd, from);
466
467 return result;
468
469 }
470
471 static void
472 doV2Query(int fd, Ip::Address &from, char *buf, icp_common_t header)
473 {
474 int rtt = 0;
475 int src_rtt = 0;
476 uint32_t flags = 0;
477 /* We have a valid packet */
478 char *url = buf + sizeof(icp_common_t) + sizeof(uint32_t);
479 HttpRequest *icp_request = icpGetRequest(url, header.reqnum, fd, from);
480
481 if (!icp_request)
482 return;
483
484 HTTPMSGLOCK(icp_request);
485
486 if (!icpAccessAllowed(from, icp_request)) {
487 icpDenyAccess(from, url, header.reqnum, fd);
488 HTTPMSGUNLOCK(icp_request);
489 return;
490 }
491 #if USE_ICMP
492 if (header.flags & ICP_FLAG_SRC_RTT) {
493 rtt = netdbHostRtt(icp_request->GetHost());
494 int hops = netdbHostHops(icp_request->GetHost());
495 src_rtt = ((hops & 0xFFFF) << 16) | (rtt & 0xFFFF);
496
497 if (rtt)
498 flags |= ICP_FLAG_SRC_RTT;
499 }
500 #endif /* USE_ICMP */
501
502 /* The peer is allowed to use this cache */
503 ICP2State *state = new ICP2State (header, icp_request);
504
505 state->fd = fd;
506
507 state->from = from;
508
509 state->url = xstrdup (url);
510
511 state->flags = flags;
512
513 state->rtt = rtt;
514
515 state->src_rtt = src_rtt;
516
517 StoreEntry::getPublic (state, url, METHOD_GET);
518
519 HTTPMSGUNLOCK(icp_request);
520 }
521
522 void
523 _icp_common_t::handleReply(char *buf, Ip::Address &from)
524 {
525 if (neighbors_do_private_keys && reqnum == 0) {
526 debugs(12, 0, "icpHandleIcpV2: Neighbor " << from << " returned reqnum = 0");
527 debugs(12, 0, "icpHandleIcpV2: Disabling use of private keys");
528 neighbors_do_private_keys = 0;
529 }
530
531 char *url = buf + sizeof(icp_common_t);
532 debugs(12, 3, "icpHandleIcpV2: " << icp_opcode_str[opcode] << " from " << from << " for '" << url << "'");
533
534 const cache_key *key = icpGetCacheKey(url, (int) reqnum);
535 /* call neighborsUdpAck even if ping_status != PING_WAITING */
536 neighborsUdpAck(key, this, from);
537 }
538
539 static void
540 icpHandleIcpV2(int fd, Ip::Address &from, char *buf, int len)
541 {
542 if (len <= 0) {
543 debugs(12, 3, "icpHandleIcpV2: ICP message is too small");
544 return;
545 }
546
547 icp_common_t header(buf, len);
548 /*
549 * Length field should match the number of bytes read
550 */
551
552 if (len != header.length) {
553 debugs(12, 3, "icpHandleIcpV2: ICP message is too small");
554 return;
555 }
556
557 switch (header.opcode) {
558
559 case ICP_QUERY:
560 /* We have a valid packet */
561 doV2Query(fd, from, buf, header);
562 break;
563
564 case ICP_HIT:
565
566 case ICP_DECHO:
567
568 case ICP_MISS:
569
570 case ICP_DENIED:
571
572 case ICP_MISS_NOFETCH:
573 header.handleReply(buf, from);
574 break;
575
576 case ICP_INVALID:
577
578 case ICP_ERR:
579 break;
580
581 default:
582 debugs(12, 0, "icpHandleIcpV2: UNKNOWN OPCODE: " << header.opcode << " from " << from);
583
584 break;
585 }
586 }
587
#ifdef ICP_PKT_DUMP
/// Debugging aid (built only when ICP_PKT_DUMP is defined): prints every
/// header field of a raw, network-order ICP packet at debug level 9,
/// followed by the bytes after the header as a C string.
static void
icpPktDump(icp_common_t * pkt)
{
    /* an integer cannot be cast to struct in_addr; fill the struct instead */
    struct in_addr senderId;
    senderId.s_addr = pkt->shostid;

    Ip::Address a;
    a = senderId;

    /* print the opcode as a number rather than as a raw character */
    debugs(12, 9, "opcode: " << std::setw(3) << (int) pkt->opcode << " " << icp_opcode_str[pkt->opcode]);
    debugs(12, 9, "version: "<< std::left << std::setw(8) << pkt->version);
    debugs(12, 9, "length: "<< std::left << std::setw(8) << ntohs(pkt->length));
    debugs(12, 9, "reqnum: "<< std::left << std::setw(8) << ntohl(pkt->reqnum));
    debugs(12, 9, "flags: "<< std::left << std::hex << std::setw(8) << ntohl(pkt->flags));
    debugs(12, 9, "shostid: " << a );
    debugs(12, 9, "payload: " << (char *) pkt + sizeof(icp_common_t));
}

#endif
605
606 void
607 icpHandleUdp(int sock, void *data)
608 {
609 int *N = &incoming_sockets_accepted;
610
611 Ip::Address from;
612 LOCAL_ARRAY(char, buf, SQUID_UDP_SO_RCVBUF);
613 int len;
614 int icp_version;
615 int max = INCOMING_ICP_MAX;
616 commSetSelect(sock, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
617
618 while (max--) {
619 len = comm_udp_recvfrom(sock,
620 buf,
621 SQUID_UDP_SO_RCVBUF - 1,
622 0,
623 from);
624
625 if (len == 0)
626 break;
627
628 if (len < 0) {
629 if (ignoreErrno(errno))
630 break;
631
632 #ifdef _SQUID_LINUX_
633 /* Some Linux systems seem to set the FD for reading and then
634 * return ECONNREFUSED when sendto() fails and generates an ICMP
635 * port unreachable message. */
636 /* or maybe an EHOSTUNREACH "No route to host" message */
637 if (errno != ECONNREFUSED && errno != EHOSTUNREACH)
638 #endif
639
640 debugs(50, 1, "icpHandleUdp: FD " << sock << " recvfrom: " << xstrerror());
641
642 break;
643 }
644
645 (*N)++;
646 icpCount(buf, RECV, (size_t) len, 0);
647 buf[len] = '\0';
648 debugs(12, 4, "icpHandleUdp: FD " << sock << ": received " <<
649 (unsigned long int)len << " bytes from " << from);
650
651 #ifdef ICP_PACKET_DUMP
652
653 icpPktDump(buf);
654 #endif
655
656 if ((size_t) len < sizeof(icp_common_t)) {
657 debugs(12, 4, "icpHandleUdp: Ignoring too-small UDP packet");
658 break;
659 }
660
661 icp_version = (int) buf[1]; /* cheat! */
662
663 if (icp_version == ICP_VERSION_2)
664 icpHandleIcpV2(sock, from, buf, len);
665 else if (icp_version == ICP_VERSION_3)
666 icpHandleIcpV3(sock, from, buf, len);
667 else
668 debugs(12, 1, "WARNING: Unused ICP version " << icp_version <<
669 " received from " << from);
670 }
671 }
672
673 void
674 icpConnectionsOpen(void)
675 {
676 uint16_t port;
677 Ip::Address addr;
678
679 if ((port = Config.Port.icp) <= 0)
680 return;
681
682 addr = Config.Addrs.udp_incoming;
683 addr.SetPort(port);
684
685 if (!Ip::EnableIpv6 && !addr.SetIPv4()) {
686 debugs(12, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << addr << " is not an IPv4 address.");
687 fatal("ICP port cannot be opened.");
688 }
689 /* split-stack for now requires default IPv4-only ICP */
690 if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && addr.IsAnyAddr()) {
691 addr.SetIPv4();
692 }
693
694 AsyncCall::Pointer call = asyncCall(12, 2,
695 "icpIncomingConnectionOpened",
696 IcpListeningStartedDialer(&icpIncomingConnectionOpened, addr));
697
698 Ipc::StartListening(SOCK_DGRAM,
699 IPPROTO_UDP,
700 addr,
701 COMM_NONBLOCKING,
702 Ipc::fdnInIcpSocket, call);
703
704 addr.SetEmpty(); // clear for next use.
705 addr = Config.Addrs.udp_outgoing;
706 if ( !addr.IsNoAddr() ) {
707 enter_suid();
708 addr.SetPort(port);
709
710 if (!Ip::EnableIpv6 && !addr.SetIPv4()) {
711 debugs(49, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << addr << " is not an IPv4 address.");
712 fatal("ICP port cannot be opened.");
713 }
714 /* split-stack for now requires default IPv4-only ICP */
715 if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && addr.IsAnyAddr()) {
716 addr.SetIPv4();
717 }
718
719 theOutIcpConnection = comm_open_listener(SOCK_DGRAM,
720 IPPROTO_UDP,
721 addr,
722 COMM_NONBLOCKING,
723 "ICP Port");
724 leave_suid();
725
726 if (theOutIcpConnection < 0)
727 fatal("Cannot open Outgoing ICP Port");
728
729 commSetSelect(theOutIcpConnection,
730 COMM_SELECT_READ,
731 icpHandleUdp,
732 NULL,
733 0);
734
735 debugs(12, 1, "Outgoing ICP messages on port " << addr.GetPort() << ", FD " << theOutIcpConnection << ".");
736
737 fd_note(theOutIcpConnection, "Outgoing ICP socket");
738 icpGetOutgoingIpAddress();
739 }
740 }
741
742 static void
743 icpGetOutgoingIpAddress()
744 {
745 struct addrinfo *xai = NULL;
746 theOutICPAddr.SetEmpty();
747 theOutICPAddr.InitAddrInfo(xai);
748
749 if (getsockname(theOutIcpConnection, xai->ai_addr, &xai->ai_addrlen) < 0)
750 debugs(50, 1, "theOutIcpConnection FD " << theOutIcpConnection << ": getsockname: " << xstrerror());
751 else
752 theOutICPAddr = *xai;
753
754 theOutICPAddr.FreeAddrInfo(xai);
755 }
756
757 static void
758 icpIncomingConnectionOpened(int fd, int errNo, Ip::Address& addr)
759 {
760 theInIcpConnection = fd;
761
762 if (theInIcpConnection < 0)
763 fatal("Cannot open ICP Port");
764
765 commSetSelect(theInIcpConnection,
766 COMM_SELECT_READ,
767 icpHandleUdp,
768 NULL,
769 0);
770
771 for (const wordlist *s = Config.mcast_group_list; s; s = s->next)
772 ipcache_nbgethostbyname(s->key, mcastJoinGroups, NULL);
773
774 debugs(12, 1, "Accepting ICP messages at " << addr << ", FD " << theInIcpConnection << ".");
775
776 fd_note(theInIcpConnection, "Incoming ICP socket");
777
778 if (Config.Addrs.udp_outgoing.IsNoAddr()) {
779 theOutIcpConnection = theInIcpConnection;
780 icpGetOutgoingIpAddress();
781 }
782 }
783
784 /**
785 * icpConnectionShutdown only closes the 'in' socket if it is
786 * different than the 'out' socket.
787 */
788 void
789 icpConnectionShutdown(void)
790 {
791 if (theInIcpConnection < 0)
792 return;
793
794 if (theInIcpConnection != theOutIcpConnection) {
795 debugs(12, 1, "FD " << theInIcpConnection << " Closing ICP connection");
796 comm_close(theInIcpConnection);
797 }
798
799 /**
800 * Here we set 'theInIcpConnection' to -1 even though the ICP 'in'
801 * and 'out' sockets might be just one FD. This prevents this
802 * function from executing repeatedly. When we are really ready to
803 * exit or restart, main will comm_close the 'out' descriptor.
804 */
805 theInIcpConnection = -1;
806
807 /**
808 * Normally we only write to the outgoing ICP socket, but
809 * we also have a read handler there to catch messages sent
810 * to that specific interface. During shutdown, we must
811 * disable reading on the outgoing socket.
812 */
813 assert(theOutIcpConnection > -1);
814
815 commSetSelect(theOutIcpConnection, COMM_SELECT_READ, NULL, NULL, 0);
816 }
817
818 void
819 icpConnectionClose(void)
820 {
821 icpConnectionShutdown();
822
823 if (theOutIcpConnection > -1) {
824 debugs(12, 1, "FD " << theOutIcpConnection << " Closing ICP connection");
825 comm_close(theOutIcpConnection);
826 theOutIcpConnection = -1;
827 }
828 }
829
830 static void
831 icpCount(void *buf, int which, size_t len, int delay)
832 {
833 icp_common_t *icp = (icp_common_t *) buf;
834
835 if (len < sizeof(*icp))
836 return;
837
838 if (SENT == which) {
839 statCounter.icp.pkts_sent++;
840 kb_incr(&statCounter.icp.kbytes_sent, len);
841
842 if (ICP_QUERY == icp->opcode) {
843 statCounter.icp.queries_sent++;
844 kb_incr(&statCounter.icp.q_kbytes_sent, len);
845 } else {
846 statCounter.icp.replies_sent++;
847 kb_incr(&statCounter.icp.r_kbytes_sent, len);
848 /* this is the sent-reply service time */
849 statHistCount(&statCounter.icp.reply_svc_time, delay);
850 }
851
852 if (ICP_HIT == icp->opcode)
853 statCounter.icp.hits_sent++;
854 } else if (RECV == which) {
855 statCounter.icp.pkts_recv++;
856 kb_incr(&statCounter.icp.kbytes_recv, len);
857
858 if (ICP_QUERY == icp->opcode) {
859 statCounter.icp.queries_recv++;
860 kb_incr(&statCounter.icp.q_kbytes_recv, len);
861 } else {
862 statCounter.icp.replies_recv++;
863 kb_incr(&statCounter.icp.r_kbytes_recv, len);
864 /* statCounter.icp.query_svc_time set in clientUpdateCounters */
865 }
866
867 if (ICP_HIT == icp->opcode)
868 statCounter.icp.hits_recv++;
869 }
870 }
871
872 #define N_QUERIED_KEYS 8192
873 #define N_QUERIED_KEYS_MASK 8191
874 static cache_key queried_keys[N_QUERIED_KEYS][SQUID_MD5_DIGEST_LENGTH];
875
876 int
877 icpSetCacheKey(const cache_key * key)
878 {
879 static int reqnum = 0;
880
881 if (++reqnum < 0)
882 reqnum = 1;
883
884 storeKeyCopy(queried_keys[reqnum & N_QUERIED_KEYS_MASK], key);
885
886 return reqnum;
887 }
888
889 const cache_key *
890 icpGetCacheKey(const char *url, int reqnum)
891 {
892 if (neighbors_do_private_keys && reqnum)
893 return queried_keys[reqnum & N_QUERIED_KEYS_MASK];
894
895 return storeKeyPublic(url, METHOD_GET);
896 }