]> git.ipfire.org Git - thirdparty/squid.git/blob - src/icp_v2.cc
Merge form trunk
[thirdparty/squid.git] / src / icp_v2.cc
1 /*
2 * DEBUG: section 12 Internet Cache Protocol (ICP)
3 * AUTHOR: Duane Wessels
4 *
5 * SQUID Web Proxy Cache http://www.squid-cache.org/
6 * ----------------------------------------------------------
7 *
8 * Squid is the result of efforts by numerous individuals from
9 * the Internet community; see the CONTRIBUTORS file for full
10 * details. Many organizations have provided support for Squid's
11 * development; see the SPONSORS file for full details. Squid is
12 * Copyrighted (C) 2001 by the Regents of the University of
13 * California; see the COPYRIGHT file for full details. Squid
14 * incorporates software developed and/or copyrighted by other
15 * sources; see the CREDITS file for full details.
16 *
17 * This program is free software; you can redistribute it and/or modify
18 * it under the terms of the GNU General Public License as published by
19 * the Free Software Foundation; either version 2 of the License, or
20 * (at your option) any later version.
21 *
22 * This program is distributed in the hope that it will be useful,
23 * but WITHOUT ANY WARRANTY; without even the implied warranty of
24 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
25 * GNU General Public License for more details.
26 *
27 * You should have received a copy of the GNU General Public License
28 * along with this program; if not, write to the Free Software
29 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
30 *
31 */
32
33 /**
34 \defgroup ServerProtocolICPInternal2 ICPv2 Internals
35 \ingroup ServerProtocolICPAPI
36 */
37
38 #include "squid.h"
39 #include "AccessLogEntry.h"
40 #include "acl/Acl.h"
41 #include "acl/FilledChecklist.h"
42 #include "comm/Connection.h"
43 #include "HttpRequest.h"
44 #include "icmp/net_db.h"
45 #include "ICP.h"
46 #include "ip/Address.h"
47 #include "ip/tools.h"
48 #include "ipc/StartListening.h"
49 #include "rfc1738.h"
50 #include "Store.h"
51 #include "SquidTime.h"
52 #include "SwapDir.h"
53 #include "wordlist.h"
54
55 /// dials icpIncomingConnectionOpened call
56 class IcpListeningStartedDialer: public CallDialer,
57 public Ipc::StartListeningCb
58 {
59 public:
60 typedef void (*Handler)(int errNo);
61 IcpListeningStartedDialer(Handler aHandler):
62 handler(aHandler) {}
63
64 virtual void print(std::ostream &os) const { startPrint(os) << ')'; }
65 virtual bool canDial(AsyncCall &) const { return true; }
66 virtual void dial(AsyncCall &) { (handler)(errNo); }
67
68 public:
69 Handler handler;
70 };
71
72 static void icpIncomingConnectionOpened(int errNo);
73
74 /// \ingroup ServerProtocolICPInternal2
75 static void icpLogIcp(const Ip::Address &, log_type, int, const char *, int);
76
77 /// \ingroup ServerProtocolICPInternal2
78 static void icpHandleIcpV2(int, Ip::Address &, char *, int);
79
80 /// \ingroup ServerProtocolICPInternal2
81 static void icpCount(void *, int, size_t, int);
82
83 /// \ingroup ServerProtocolICPInternal2
84 static void icpGetOutgoingIpAddress();
85
86 /**
87 \ingroup ServerProtocolICPInternal2
88 * IcpQueueHead is global so comm_incoming() knows whether or not
89 * to call icpUdpSendQueue.
90 */
91 static icpUdpData *IcpQueueHead = NULL;
92 /// \ingroup ServerProtocolICPInternal2
93 static icpUdpData *IcpQueueTail = NULL;
94
95 /// \ingroup ServerProtocolICPInternal2
96 Comm::ConnectionPointer icpIncomingConn = NULL;
97 /// \ingroup ServerProtocolICPInternal2
98 Comm::ConnectionPointer icpOutgoingConn = NULL;
99
100 /** \ingroup ServerProtocolICPInternal2
101 * ICP v2 uses the outgoing address as host ID.
102 * NP: this *may* be identical to icpOutgoingConn->local
103 * but when IN/OUT sockets are shared we can't guarantee that
104 * so a separate variable is used for now.
105 *
106 * We have one for private use (sent only by this local cache)
107 * and one for public use (for external caches to contact us)
108 */
109 Ip::Address theIcpPrivateHostID;
110
111 /// \see theIcpPrivateHostID
112 Ip::Address theIcpPublicHostID;
113
114
115 /* icp_common_t */
116 _icp_common_t::_icp_common_t() : opcode(ICP_INVALID), version(0), length(0), reqnum(0), flags(0), pad(0), shostid(0)
117 {}
118
119 _icp_common_t::_icp_common_t(char *buf, unsigned int len)
120 {
121 if (len < sizeof(_icp_common_t)) {
122 /* mark as invalid */
123 length = len + 1;
124 return;
125 }
126
127 xmemcpy(this, buf, sizeof(icp_common_t));
128 /*
129 * Convert network order sensitive fields
130 */
131 length = ntohs(length);
132 reqnum = ntohl(reqnum);
133 flags = ntohl(flags);
134 pad = ntohl(pad);
135 }
136
137 icp_opcode
138 _icp_common_t::getOpCode() const
139 {
140 if (opcode > (char)ICP_END)
141 return ICP_INVALID;
142
143 return (icp_opcode)opcode;
144 }
145
146 /* ICPState */
147
148 ICPState::ICPState(icp_common_t &aHeader, HttpRequest *aRequest):
149 header(aHeader),
150 request(HTTPMSGLOCK(aRequest)),
151 fd(-1),
152 url(NULL)
153 {}
154
155 ICPState::~ICPState()
156 {
157 safe_free(url);
158 HTTPMSGUNLOCK(request);
159 }
160
161
162 /* End ICPState */
163
164 /* ICP2State */
165
166 /// \ingroup ServerProtocolICPInternal2
167 class ICP2State : public ICPState, public StoreClient
168 {
169
170 public:
171 ICP2State(icp_common_t & aHeader, HttpRequest *aRequest):
172 ICPState(aHeader, aRequest),rtt(0),src_rtt(0),flags(0) {}
173
174 ~ICP2State();
175 void created(StoreEntry * newEntry);
176
177 int rtt;
178 int src_rtt;
179 uint32_t flags;
180 };
181
182 ICP2State::~ICP2State()
183 {}
184
185 void
186 ICP2State::created(StoreEntry *newEntry)
187 {
188 StoreEntry *entry = newEntry->isNull () ? NULL : newEntry;
189 debugs(12, 5, "icpHandleIcpV2: OPCODE " << icp_opcode_str[header.opcode]);
190 icp_opcode codeToSend;
191
192 if (icpCheckUdpHit(entry, request)) {
193 codeToSend = ICP_HIT;
194 } else {
195 #if USE_ICMP
196 if (Config.onoff.test_reachability && rtt == 0) {
197 if ((rtt = netdbHostRtt(request->GetHost())) == 0)
198 netdbPingSite(request->GetHost());
199 }
200 #endif /* USE_ICMP */
201
202 if (icpGetCommonOpcode() != ICP_ERR)
203 codeToSend = icpGetCommonOpcode();
204 else if (Config.onoff.test_reachability && rtt == 0)
205 codeToSend = ICP_MISS_NOFETCH;
206 else
207 codeToSend = ICP_MISS;
208 }
209
210 icpCreateAndSend(codeToSend, flags, url, header.reqnum, src_rtt, fd, from);
211 delete this;
212 }
213
214 /* End ICP2State */
215
216 /// \ingroup ServerProtocolICPInternal2
217 static void
218 icpLogIcp(const Ip::Address &caddr, log_type logcode, int len, const char *url, int delay)
219 {
220 AccessLogEntry al;
221
222 if (LOG_TAG_NONE == logcode)
223 return;
224
225 if (LOG_ICP_QUERY == logcode)
226 return;
227
228 clientdbUpdate(caddr, logcode, PROTO_ICP, len);
229
230 if (!Config.onoff.log_udp)
231 return;
232
233 al.icp.opcode = ICP_QUERY;
234
235 al.url = url;
236
237 al.cache.caddr = caddr;
238
239 al.cache.replySize = len;
240
241 al.cache.code = logcode;
242
243 al.cache.msec = delay;
244
245 accessLogLog(&al, NULL);
246 }
247
248 /// \ingroup ServerProtocolICPInternal2
249 void
250 icpUdpSendQueue(int fd, void *unused)
251 {
252 icpUdpData *q;
253
254 while ((q = IcpQueueHead) != NULL) {
255 int delay = tvSubUsec(q->queue_time, current_time);
256 /* increment delay to prevent looping */
257 const int x = icpUdpSend(fd, q->address, (icp_common_t *) q->msg, q->logcode, ++delay);
258 IcpQueueHead = q->next;
259 xfree(q);
260
261 if (x < 0)
262 break;
263 }
264 }
265
266 _icp_common_t *
267 _icp_common_t::createMessage(
268 icp_opcode opcode,
269 int flags,
270 const char *url,
271 int reqnum,
272 int pad)
273 {
274 char *buf = NULL;
275 icp_common_t *headerp = NULL;
276 char *urloffset = NULL;
277 int buf_len;
278 buf_len = sizeof(icp_common_t) + strlen(url) + 1;
279
280 if (opcode == ICP_QUERY)
281 buf_len += sizeof(uint32_t);
282
283 buf = (char *) xcalloc(buf_len, 1);
284
285 headerp = (icp_common_t *) (void *) buf;
286
287 headerp->opcode = (char) opcode;
288
289 headerp->version = ICP_VERSION_CURRENT;
290
291 headerp->length = (uint16_t) htons(buf_len);
292
293 headerp->reqnum = htonl(reqnum);
294
295 headerp->flags = htonl(flags);
296
297 headerp->pad = htonl(pad);
298
299 theIcpPrivateHostID.GetInAddr( *((struct in_addr*)&headerp->shostid) );
300
301 urloffset = buf + sizeof(icp_common_t);
302
303 if (opcode == ICP_QUERY)
304 urloffset += sizeof(uint32_t);
305
306 xmemcpy(urloffset, url, strlen(url));
307
308 return (icp_common_t *)buf;
309 }
310
311 int
312 icpUdpSend(int fd,
313 const Ip::Address &to,
314 icp_common_t * msg,
315 log_type logcode,
316 int delay)
317 {
318 icpUdpData *queue;
319 int x;
320 int len;
321 len = (int) ntohs(msg->length);
322 debugs(12, 5, "icpUdpSend: FD " << fd << " sending " <<
323 icp_opcode_str[msg->opcode] << ", " << len << " bytes to " << to);
324
325 x = comm_udp_sendto(fd, to, msg, len);
326
327 if (x >= 0) {
328 /* successfully written */
329 icpLogIcp(to, logcode, len, (char *) (msg + 1), delay);
330 icpCount(msg, SENT, (size_t) len, delay);
331 safe_free(msg);
332 } else if (0 == delay) {
333 /* send failed, but queue it */
334 queue = (icpUdpData *) xcalloc(1, sizeof(icpUdpData));
335 queue->address = to;
336 queue->msg = msg;
337 queue->len = (int) ntohs(msg->length);
338 queue->queue_time = current_time;
339 queue->logcode = logcode;
340
341 if (IcpQueueHead == NULL) {
342 IcpQueueHead = queue;
343 IcpQueueTail = queue;
344 } else if (IcpQueueTail == IcpQueueHead) {
345 IcpQueueTail = queue;
346 IcpQueueHead->next = queue;
347 } else {
348 IcpQueueTail->next = queue;
349 IcpQueueTail = queue;
350 }
351
352 commSetSelect(fd, COMM_SELECT_WRITE, icpUdpSendQueue, NULL, 0);
353 statCounter.icp.replies_queued++;
354 } else {
355 /* don't queue it */
356 statCounter.icp.replies_dropped++;
357 }
358
359 return x;
360 }
361
362 int
363 icpCheckUdpHit(StoreEntry * e, HttpRequest * request)
364 {
365 if (e == NULL)
366 return 0;
367
368 if (!e->validToSend())
369 return 0;
370
371 if (Config.onoff.icp_hit_stale)
372 return 1;
373
374 if (refreshCheckICP(e, request))
375 return 0;
376
377 return 1;
378 }
379
380 /**
381 * This routine selects an ICP opcode for ICP misses.
382 *
383 \retval ICP_ERR no opcode selected here
384 \retval ICP_MISS_NOFETCH store is rebuilding, no fetch is possible yet
385 */
386 icp_opcode
387 icpGetCommonOpcode()
388 {
389 /* if store is rebuilding, return a UDP_MISS_NOFETCH */
390
391 if ((StoreController::store_dirs_rebuilding && opt_reload_hit_only) ||
392 hit_only_mode_until > squid_curtime) {
393 return ICP_MISS_NOFETCH;
394 }
395
396 return ICP_ERR;
397 }
398
399 log_type
400 icpLogFromICPCode(icp_opcode opcode)
401 {
402 if (opcode == ICP_ERR)
403 return LOG_UDP_INVALID;
404
405 if (opcode == ICP_DENIED)
406 return LOG_UDP_DENIED;
407
408 if (opcode == ICP_HIT)
409 return LOG_UDP_HIT;
410
411 if (opcode == ICP_MISS)
412 return LOG_UDP_MISS;
413
414 if (opcode == ICP_MISS_NOFETCH)
415 return LOG_UDP_MISS_NOFETCH;
416
417 fatal("expected ICP opcode\n");
418
419 return LOG_UDP_INVALID;
420 }
421
422 void
423 icpCreateAndSend(icp_opcode opcode, int flags, char const *url, int reqnum, int pad, int fd, const Ip::Address &from)
424 {
425 icp_common_t *reply = _icp_common_t::createMessage(opcode, flags, url, reqnum, pad);
426 icpUdpSend(fd, from, reply, icpLogFromICPCode(opcode), 0);
427 }
428
429 void
430 icpDenyAccess(Ip::Address &from, char *url, int reqnum, int fd)
431 {
432 debugs(12, 2, "icpDenyAccess: Access Denied for " << from << " by " << AclMatchedName << ".");
433
434 if (clientdbCutoffDenied(from)) {
435 /*
436 * count this DENIED query in the clientdb, even though
437 * we're not sending an ICP reply...
438 */
439 clientdbUpdate(from, LOG_UDP_DENIED, PROTO_ICP, 0);
440 } else {
441 icpCreateAndSend(ICP_DENIED, 0, url, reqnum, 0, fd, from);
442 }
443 }
444
445 int
446 icpAccessAllowed(Ip::Address &from, HttpRequest * icp_request)
447 {
448 /* absent an explicit allow, we deny all */
449 if (!Config.accessList.icp)
450 return 0;
451
452 ACLFilledChecklist checklist(Config.accessList.icp, icp_request, NULL);
453 checklist.src_addr = from;
454 checklist.my_addr.SetNoAddr();
455 int result = checklist.fastCheck();
456 return result;
457 }
458
459 char const *
460 icpGetUrlToSend(char *url)
461 {
462 if (strpbrk(url, w_space))
463 return rfc1738_escape(url);
464 else
465 return url;
466 }
467
468 HttpRequest *
469 icpGetRequest(char *url, int reqnum, int fd, Ip::Address &from)
470 {
471 if (strpbrk(url, w_space)) {
472 url = rfc1738_escape(url);
473 icpCreateAndSend(ICP_ERR, 0, rfc1738_escape(url), reqnum, 0, fd, from);
474 return NULL;
475 }
476
477 HttpRequest *result;
478
479 if ((result = HttpRequest::CreateFromUrl(url)) == NULL)
480 icpCreateAndSend(ICP_ERR, 0, url, reqnum, 0, fd, from);
481
482 return result;
483
484 }
485
486 static void
487 doV2Query(int fd, Ip::Address &from, char *buf, icp_common_t header)
488 {
489 int rtt = 0;
490 int src_rtt = 0;
491 uint32_t flags = 0;
492 /* We have a valid packet */
493 char *url = buf + sizeof(icp_common_t) + sizeof(uint32_t);
494 HttpRequest *icp_request = icpGetRequest(url, header.reqnum, fd, from);
495
496 if (!icp_request)
497 return;
498
499 HTTPMSGLOCK(icp_request);
500
501 if (!icpAccessAllowed(from, icp_request)) {
502 icpDenyAccess(from, url, header.reqnum, fd);
503 HTTPMSGUNLOCK(icp_request);
504 return;
505 }
506 #if USE_ICMP
507 if (header.flags & ICP_FLAG_SRC_RTT) {
508 rtt = netdbHostRtt(icp_request->GetHost());
509 int hops = netdbHostHops(icp_request->GetHost());
510 src_rtt = ((hops & 0xFFFF) << 16) | (rtt & 0xFFFF);
511
512 if (rtt)
513 flags |= ICP_FLAG_SRC_RTT;
514 }
515 #endif /* USE_ICMP */
516
517 /* The peer is allowed to use this cache */
518 ICP2State *state = new ICP2State (header, icp_request);
519
520 state->fd = fd;
521
522 state->from = from;
523
524 state->url = xstrdup (url);
525
526 state->flags = flags;
527
528 state->rtt = rtt;
529
530 state->src_rtt = src_rtt;
531
532 StoreEntry::getPublic (state, url, METHOD_GET);
533
534 HTTPMSGUNLOCK(icp_request);
535 }
536
537 void
538 _icp_common_t::handleReply(char *buf, Ip::Address &from)
539 {
540 if (neighbors_do_private_keys && reqnum == 0) {
541 debugs(12, 0, "icpHandleIcpV2: Neighbor " << from << " returned reqnum = 0");
542 debugs(12, 0, "icpHandleIcpV2: Disabling use of private keys");
543 neighbors_do_private_keys = 0;
544 }
545
546 char *url = buf + sizeof(icp_common_t);
547 debugs(12, 3, "icpHandleIcpV2: " << icp_opcode_str[opcode] << " from " << from << " for '" << url << "'");
548
549 const cache_key *key = icpGetCacheKey(url, (int) reqnum);
550 /* call neighborsUdpAck even if ping_status != PING_WAITING */
551 neighborsUdpAck(key, this, from);
552 }
553
554 static void
555 icpHandleIcpV2(int fd, Ip::Address &from, char *buf, int len)
556 {
557 if (len <= 0) {
558 debugs(12, 3, "icpHandleIcpV2: ICP message is too small");
559 return;
560 }
561
562 icp_common_t header(buf, len);
563 /*
564 * Length field should match the number of bytes read
565 */
566
567 if (len != header.length) {
568 debugs(12, 3, "icpHandleIcpV2: ICP message is too small");
569 return;
570 }
571
572 switch (header.opcode) {
573
574 case ICP_QUERY:
575 /* We have a valid packet */
576 doV2Query(fd, from, buf, header);
577 break;
578
579 case ICP_HIT:
580
581 case ICP_DECHO:
582
583 case ICP_MISS:
584
585 case ICP_DENIED:
586
587 case ICP_MISS_NOFETCH:
588 header.handleReply(buf, from);
589 break;
590
591 case ICP_INVALID:
592
593 case ICP_ERR:
594 break;
595
596 default:
597 debugs(12, 0, "icpHandleIcpV2: UNKNOWN OPCODE: " << header.opcode << " from " << from);
598
599 break;
600 }
601 }
602
603 #ifdef ICP_PKT_DUMP
604 static void
605 icpPktDump(icp_common_t * pkt)
606 {
607 Ip::Address a;
608
609 debugs(12, 9, "opcode: " << std::setw(3) << pkt->opcode << " " << icp_opcode_str[pkt->opcode]);
610 debugs(12, 9, "version: "<< std::left << std::setw(8) << pkt->version);
611 debugs(12, 9, "length: "<< std::left << std::setw(8) << ntohs(pkt->length));
612 debugs(12, 9, "reqnum: "<< std::left << std::setw(8) << ntohl(pkt->reqnum));
613 debugs(12, 9, "flags: "<< std::left << std::hex << std::setw(8) << ntohl(pkt->flags));
614 a = (struct in_addr)pkt->shostid;
615 debugs(12, 9, "shostid: " << a );
616 debugs(12, 9, "payload: " << (char *) pkt + sizeof(icp_common_t));
617 }
618
619 #endif
620
621 void
622 icpHandleUdp(int sock, void *data)
623 {
624 int *N = &incoming_sockets_accepted;
625
626 Ip::Address from;
627 LOCAL_ARRAY(char, buf, SQUID_UDP_SO_RCVBUF);
628 int len;
629 int icp_version;
630 int max = INCOMING_ICP_MAX;
631 commSetSelect(sock, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
632
633 while (max--) {
634 len = comm_udp_recvfrom(sock,
635 buf,
636 SQUID_UDP_SO_RCVBUF - 1,
637 0,
638 from);
639
640 if (len == 0)
641 break;
642
643 if (len < 0) {
644 if (ignoreErrno(errno))
645 break;
646
647 #ifdef _SQUID_LINUX_
648 /* Some Linux systems seem to set the FD for reading and then
649 * return ECONNREFUSED when sendto() fails and generates an ICMP
650 * port unreachable message. */
651 /* or maybe an EHOSTUNREACH "No route to host" message */
652 if (errno != ECONNREFUSED && errno != EHOSTUNREACH)
653 #endif
654
655 debugs(50, 1, "icpHandleUdp: FD " << sock << " recvfrom: " << xstrerror());
656
657 break;
658 }
659
660 (*N)++;
661 icpCount(buf, RECV, (size_t) len, 0);
662 buf[len] = '\0';
663 debugs(12, 4, "icpHandleUdp: FD " << sock << ": received " <<
664 (unsigned long int)len << " bytes from " << from);
665
666 #ifdef ICP_PACKET_DUMP
667
668 icpPktDump(buf);
669 #endif
670
671 if ((size_t) len < sizeof(icp_common_t)) {
672 debugs(12, 4, "icpHandleUdp: Ignoring too-small UDP packet");
673 break;
674 }
675
676 icp_version = (int) buf[1]; /* cheat! */
677
678 if (icp_version == ICP_VERSION_2)
679 icpHandleIcpV2(sock, from, buf, len);
680 else if (icp_version == ICP_VERSION_3)
681 icpHandleIcpV3(sock, from, buf, len);
682 else
683 debugs(12, 1, "WARNING: Unused ICP version " << icp_version <<
684 " received from " << from);
685 }
686 }
687
688 void
689 icpConnectionsOpen(void)
690 {
691 uint16_t port;
692
693 if ((port = Config.Port.icp) <= 0)
694 return;
695
696 icpIncomingConn = new Comm::Connection;
697 icpIncomingConn->local = Config.Addrs.udp_incoming;
698 icpIncomingConn->local.SetPort(port);
699
700 if (!Ip::EnableIpv6 && !icpIncomingConn->local.SetIPv4()) {
701 debugs(12, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << icpIncomingConn->local << " is not an IPv4 address.");
702 fatal("ICP port cannot be opened.");
703 }
704 /* split-stack for now requires default IPv4-only ICP */
705 if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && icpIncomingConn->local.IsAnyAddr()) {
706 icpIncomingConn->local.SetIPv4();
707 }
708
709 AsyncCall::Pointer call = asyncCall(12, 2,
710 "icpIncomingConnectionOpened",
711 IcpListeningStartedDialer(&icpIncomingConnectionOpened));
712
713 Ipc::StartListening(SOCK_DGRAM,
714 IPPROTO_UDP,
715 icpIncomingConn,
716 Ipc::fdnInIcpSocket, call, Subscription::Pointer());
717
718 if ( !Config.Addrs.udp_outgoing.IsNoAddr() ) {
719 icpOutgoingConn = new Comm::Connection;
720 icpOutgoingConn->local = Config.Addrs.udp_outgoing;
721 icpOutgoingConn->local.SetPort(port);
722
723 if (!Ip::EnableIpv6 && !icpOutgoingConn->local.SetIPv4()) {
724 debugs(49, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << icpOutgoingConn->local << " is not an IPv4 address.");
725 fatal("ICP port cannot be opened.");
726 }
727 /* split-stack for now requires default IPv4-only ICP */
728 if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && icpOutgoingConn->local.IsAnyAddr()) {
729 icpOutgoingConn->local.SetIPv4();
730 }
731
732 enter_suid();
733 comm_open_listener(SOCK_DGRAM, IPPROTO_UDP, icpOutgoingConn, "Outgoing ICP Port");
734 leave_suid();
735
736 if (!Comm::IsConnOpen(icpOutgoingConn))
737 fatal("Cannot open Outgoing ICP Port");
738
739 debugs(12, DBG_CRITICAL, "Sending ICP messages from " << icpOutgoingConn->local);
740
741 commSetSelect(icpOutgoingConn->fd, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
742 fd_note(theOutIcpConnection, "Outgoing ICP socket");
743 icpGetOutgoingIpAddress();
744 }
745 }
746
747 // Ensure that we have the IP address(es) to use for Host ID.
748 // The outgoing address is used as 'private' host ID used only on packets we send
749 static void
750 icpGetOutgoingIpAddress()
751 {
752 struct addrinfo *xai = NULL;
753 theOutICPAddr.SetEmpty();
754 theIcpPrivateHostID.InitAddrInfo(xai);
755 if (getsockname(icpOutgoingConn->fd, xai->ai_addr, &xai->ai_addrlen) < 0)
756 debugs(50, DBG_IMPORTANT, "ERROR: Unable to identify ICP host ID to use for " << icpOutgoingConn
757 << ": getsockname: " << xstrerror());
758 else
759 theIcpPrivateHostID = *xai;
760 theIcpPrivateHostID.FreeAddrInfo(xai);
761 }
762
763 static void
764 icpIncomingConnectionOpened(int errNo)
765 {
766 if (!Comm::IsConnOpen(icpIncomingConn))
767 fatal("Cannot open ICP Port");
768
769 commSetSelect(icpIncomingConn->fd, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
770
771 for (const wordlist *s = Config.mcast_group_list; s; s = s->next)
772 ipcache_nbgethostbyname(s->key, mcastJoinGroups, NULL); // XXX: pass the icpIncomingConn for mcastJoinGroups usage.
773
774 debugs(12, DBG_IMPORTANT, "Accepting ICP messages on " << icpIncomingConn->local);
775
776 fd_note(icpIncomingConn->fd, "Incoming ICP port");
777
778 if (Config.Addrs.udp_outgoing.IsNoAddr()) {
779 icpOutgoingConn = icpIncomingConn;
780 debugs(12, DBG_IMPORTANT, "Sending ICP messages from " << icpOutgoingConn->local);
781 icpGetOutgoingIpAddress();
782 }
783
784 // Ensure that we have the IP address(es) to use for Host ID.
785 // The listening address is used as 'public' host ID which can be used to contact us
786 struct addrinfo *xai = NULL;
787 theIcpPublicHostID.InitAddrInfo(xai); // reset xai
788 if (getsockname(icpIncomingConn->fd, xai->ai_addr, &xai->ai_addrlen) < 0)
789 debugs(50, DBG_IMPORTANT, "ERROR: Unable to identify ICP host ID to use for " << icpIncomingConn
790 << ": getsockname: " << xstrerror());
791 else
792 theIcpPublicHostID = *xai;
793 theIcpPublicHostID.FreeAddrInfo(xai);
794 }
795
796 /**
797 * icpConnectionShutdown only closes the 'in' socket if it is
798 * different than the 'out' socket.
799 */
800 void
801 icpConnectionShutdown(void)
802 {
803 if (!Comm::IsConnOpen(icpIncomingConn))
804 return;
805
806 debugs(12, DBG_IMPORTANT, "Stop receiving ICP on " << icpIncomingConn->local);
807
808 /** Release the 'in' socket for lazy closure.
809 * in and out sockets may be sharing one same FD.
810 * This prevents this function from executing repeatedly.
811 */
812 icpIncomingConn = NULL;
813
814 /**
815 * Normally we only write to the outgoing ICP socket, but
816 * we also have a read handler there to catch messages sent
817 * to that specific interface. During shutdown, we must
818 * disable reading on the outgoing socket.
819 */
820 assert(Comm::IsConnOpen(icpOutgoingConn));
821
822 commSetSelect(icpOutgoingConn->fd, COMM_SELECT_READ, NULL, NULL, 0);
823 }
824
825 void
826 icpConnectionClose(void)
827 {
828 icpConnectionShutdown();
829
830 if (icpOutgoingConn != NULL) {
831 debugs(12, DBG_IMPORTANT, "Stop sending ICP from " << icpOutgoingConn->local);
832 icpOutgoingConn = NULL;
833 }
834 }
835
836 static void
837 icpCount(void *buf, int which, size_t len, int delay)
838 {
839 icp_common_t *icp = (icp_common_t *) buf;
840
841 if (len < sizeof(*icp))
842 return;
843
844 if (SENT == which) {
845 statCounter.icp.pkts_sent++;
846 kb_incr(&statCounter.icp.kbytes_sent, len);
847
848 if (ICP_QUERY == icp->opcode) {
849 statCounter.icp.queries_sent++;
850 kb_incr(&statCounter.icp.q_kbytes_sent, len);
851 } else {
852 statCounter.icp.replies_sent++;
853 kb_incr(&statCounter.icp.r_kbytes_sent, len);
854 /* this is the sent-reply service time */
855 statHistCount(&statCounter.icp.reply_svc_time, delay);
856 }
857
858 if (ICP_HIT == icp->opcode)
859 statCounter.icp.hits_sent++;
860 } else if (RECV == which) {
861 statCounter.icp.pkts_recv++;
862 kb_incr(&statCounter.icp.kbytes_recv, len);
863
864 if (ICP_QUERY == icp->opcode) {
865 statCounter.icp.queries_recv++;
866 kb_incr(&statCounter.icp.q_kbytes_recv, len);
867 } else {
868 statCounter.icp.replies_recv++;
869 kb_incr(&statCounter.icp.r_kbytes_recv, len);
870 /* statCounter.icp.query_svc_time set in clientUpdateCounters */
871 }
872
873 if (ICP_HIT == icp->opcode)
874 statCounter.icp.hits_recv++;
875 }
876 }
877
878 #define N_QUERIED_KEYS 8192
879 #define N_QUERIED_KEYS_MASK 8191
880 static cache_key queried_keys[N_QUERIED_KEYS][SQUID_MD5_DIGEST_LENGTH];
881
882 int
883 icpSetCacheKey(const cache_key * key)
884 {
885 static int reqnum = 0;
886
887 if (++reqnum < 0)
888 reqnum = 1;
889
890 storeKeyCopy(queried_keys[reqnum & N_QUERIED_KEYS_MASK], key);
891
892 return reqnum;
893 }
894
895 const cache_key *
896 icpGetCacheKey(const char *url, int reqnum)
897 {
898 if (neighbors_do_private_keys && reqnum)
899 return queried_keys[reqnum & N_QUERIED_KEYS_MASK];
900
901 return storeKeyPublic(url, METHOD_GET);
902 }