]> git.ipfire.org Git - thirdparty/squid.git/blob - src/icp_v2.cc
Merged from trunk
[thirdparty/squid.git] / src / icp_v2.cc
1 /*
2 * DEBUG: section 12 Internet Cache Protocol (ICP)
3 * AUTHOR: Duane Wessels
4 *
5 * SQUID Web Proxy Cache http://www.squid-cache.org/
6 * ----------------------------------------------------------
7 *
8 * Squid is the result of efforts by numerous individuals from
9 * the Internet community; see the CONTRIBUTORS file for full
10 * details. Many organizations have provided support for Squid's
11 * development; see the SPONSORS file for full details. Squid is
12 * Copyrighted (C) 2001 by the Regents of the University of
13 * California; see the COPYRIGHT file for full details. Squid
14 * incorporates software developed and/or copyrighted by other
15 * sources; see the CREDITS file for full details.
16 *
17 * This program is free software; you can redistribute it and/or modify
18 * it under the terms of the GNU General Public License as published by
19 * the Free Software Foundation; either version 2 of the License, or
20 * (at your option) any later version.
21 *
22 * This program is distributed in the hope that it will be useful,
23 * but WITHOUT ANY WARRANTY; without even the implied warranty of
24 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
25 * GNU General Public License for more details.
26 *
27 * You should have received a copy of the GNU General Public License
28 * along with this program; if not, write to the Free Software
29 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
30 *
31 */
32
33 /**
34 \defgroup ServerProtocolICPInternal2 ICPv2 Internals
35 \ingroup ServerProtocolICPAPI
36 */
37
38 #include "squid.h"
39 #include "AccessLogEntry.h"
40 #include "acl/Acl.h"
41 #include "acl/FilledChecklist.h"
42 #include "client_db.h"
43 #include "comm.h"
44 #include "comm/Connection.h"
45 #include "comm/Loops.h"
46 #include "comm/UdpOpenDialer.h"
47 #include "fd.h"
48 #include "HttpRequest.h"
49 #include "icmp/net_db.h"
50 #include "ICP.h"
51 #include "ip/Address.h"
52 #include "ip/tools.h"
53 #include "ipcache.h"
54 #include "md5.h"
55 #include "neighbors.h"
56 #include "protos.h"
57 #include "refresh.h"
58 #include "rfc1738.h"
59 #include "SquidTime.h"
60 #include "StatCounters.h"
61 #include "Store.h"
62 #include "store_key_md5.h"
63 #include "SwapDir.h"
64 #include "tools.h"
65 #include "wordlist.h"
66
67 #if HAVE_ERRNO_H
68 #include <errno.h>
69 #endif
70
71 static void icpIncomingConnectionOpened(const Comm::ConnectionPointer &conn, int errNo);
72
73 /// \ingroup ServerProtocolICPInternal2
74 static void icpLogIcp(const Ip::Address &, log_type, int, const char *, int);
75
76 /// \ingroup ServerProtocolICPInternal2
77 static void icpHandleIcpV2(int, Ip::Address &, char *, int);
78
79 /// \ingroup ServerProtocolICPInternal2
80 static void icpCount(void *, int, size_t, int);
81
82 /**
83 \ingroup ServerProtocolICPInternal2
84 * IcpQueueHead is global so comm_incoming() knows whether or not
85 * to call icpUdpSendQueue.
86 */
87 static icpUdpData *IcpQueueHead = NULL;
88 /// \ingroup ServerProtocolICPInternal2
89 static icpUdpData *IcpQueueTail = NULL;
90
91 /// \ingroup ServerProtocolICPInternal2
92 Comm::ConnectionPointer icpIncomingConn = NULL;
93 /// \ingroup ServerProtocolICPInternal2
94 Comm::ConnectionPointer icpOutgoingConn = NULL;
95
96 /* icp_common_t */
97 _icp_common_t::_icp_common_t() : opcode(ICP_INVALID), version(0), length(0), reqnum(0), flags(0), pad(0), shostid(0)
98 {}
99
100 _icp_common_t::_icp_common_t(char *buf, unsigned int len)
101 {
102 if (len < sizeof(_icp_common_t)) {
103 /* mark as invalid */
104 length = len + 1;
105 return;
106 }
107
108 memcpy(this, buf, sizeof(icp_common_t));
109 /*
110 * Convert network order sensitive fields
111 */
112 length = ntohs(length);
113 reqnum = ntohl(reqnum);
114 flags = ntohl(flags);
115 pad = ntohl(pad);
116 }
117
118 icp_opcode
119 _icp_common_t::getOpCode() const
120 {
121 if (opcode > (char)ICP_END)
122 return ICP_INVALID;
123
124 return (icp_opcode)opcode;
125 }
126
127 /* ICPState */
128
129 ICPState::ICPState(icp_common_t &aHeader, HttpRequest *aRequest):
130 header(aHeader),
131 request(HTTPMSGLOCK(aRequest)),
132 fd(-1),
133 url(NULL)
134 {}
135
136 ICPState::~ICPState()
137 {
138 safe_free(url);
139 HTTPMSGUNLOCK(request);
140 }
141
142 /* End ICPState */
143
144 /* ICP2State */
145
146 /// \ingroup ServerProtocolICPInternal2
147 class ICP2State : public ICPState, public StoreClient
148 {
149
150 public:
151 ICP2State(icp_common_t & aHeader, HttpRequest *aRequest):
152 ICPState(aHeader, aRequest),rtt(0),src_rtt(0),flags(0) {}
153
154 ~ICP2State();
155 void created(StoreEntry * newEntry);
156
157 int rtt;
158 int src_rtt;
159 uint32_t flags;
160 };
161
162 ICP2State::~ICP2State()
163 {}
164
165 void
166 ICP2State::created(StoreEntry *newEntry)
167 {
168 StoreEntry *entry = newEntry->isNull () ? NULL : newEntry;
169 debugs(12, 5, "icpHandleIcpV2: OPCODE " << icp_opcode_str[header.opcode]);
170 icp_opcode codeToSend;
171
172 if (icpCheckUdpHit(entry, request)) {
173 codeToSend = ICP_HIT;
174 } else {
175 #if USE_ICMP
176 if (Config.onoff.test_reachability && rtt == 0) {
177 if ((rtt = netdbHostRtt(request->GetHost())) == 0)
178 netdbPingSite(request->GetHost());
179 }
180 #endif /* USE_ICMP */
181
182 if (icpGetCommonOpcode() != ICP_ERR)
183 codeToSend = icpGetCommonOpcode();
184 else if (Config.onoff.test_reachability && rtt == 0)
185 codeToSend = ICP_MISS_NOFETCH;
186 else
187 codeToSend = ICP_MISS;
188 }
189
190 icpCreateAndSend(codeToSend, flags, url, header.reqnum, src_rtt, fd, from);
191 delete this;
192 }
193
194 /* End ICP2State */
195
196 /// \ingroup ServerProtocolICPInternal2
197 static void
198 icpLogIcp(const Ip::Address &caddr, log_type logcode, int len, const char *url, int delay)
199 {
200 AccessLogEntry::Pointer al = new AccessLogEntry();
201
202 if (LOG_TAG_NONE == logcode)
203 return;
204
205 if (LOG_ICP_QUERY == logcode)
206 return;
207
208 clientdbUpdate(caddr, logcode, AnyP::PROTO_ICP, len);
209
210 if (!Config.onoff.log_udp)
211 return;
212
213 al->icp.opcode = ICP_QUERY;
214
215 al->url = url;
216
217 al->cache.caddr = caddr;
218
219 al->cache.replySize = len;
220
221 al->cache.code = logcode;
222
223 al->cache.msec = delay;
224
225 accessLogLog(al, NULL);
226 }
227
228 /// \ingroup ServerProtocolICPInternal2
229 void
230 icpUdpSendQueue(int fd, void *unused)
231 {
232 icpUdpData *q;
233
234 while ((q = IcpQueueHead) != NULL) {
235 int delay = tvSubUsec(q->queue_time, current_time);
236 /* increment delay to prevent looping */
237 const int x = icpUdpSend(fd, q->address, (icp_common_t *) q->msg, q->logcode, ++delay);
238 IcpQueueHead = q->next;
239 xfree(q);
240
241 if (x < 0)
242 break;
243 }
244 }
245
246 _icp_common_t *
247 _icp_common_t::createMessage(
248 icp_opcode opcode,
249 int flags,
250 const char *url,
251 int reqnum,
252 int pad)
253 {
254 char *buf = NULL;
255 icp_common_t *headerp = NULL;
256 char *urloffset = NULL;
257 int buf_len;
258 buf_len = sizeof(icp_common_t) + strlen(url) + 1;
259
260 if (opcode == ICP_QUERY)
261 buf_len += sizeof(uint32_t);
262
263 buf = (char *) xcalloc(buf_len, 1);
264
265 headerp = (icp_common_t *) (void *) buf;
266
267 headerp->opcode = (char) opcode;
268
269 headerp->version = ICP_VERSION_CURRENT;
270
271 headerp->length = (uint16_t) htons(buf_len);
272
273 headerp->reqnum = htonl(reqnum);
274
275 headerp->flags = htonl(flags);
276
277 headerp->pad = htonl(pad);
278
279 headerp->shostid = 0;
280
281 urloffset = buf + sizeof(icp_common_t);
282
283 if (opcode == ICP_QUERY)
284 urloffset += sizeof(uint32_t);
285
286 memcpy(urloffset, url, strlen(url));
287
288 return (icp_common_t *)buf;
289 }
290
291 int
292 icpUdpSend(int fd,
293 const Ip::Address &to,
294 icp_common_t * msg,
295 log_type logcode,
296 int delay)
297 {
298 icpUdpData *queue;
299 int x;
300 int len;
301 len = (int) ntohs(msg->length);
302 debugs(12, 5, "icpUdpSend: FD " << fd << " sending " <<
303 icp_opcode_str[msg->opcode] << ", " << len << " bytes to " << to);
304
305 x = comm_udp_sendto(fd, to, msg, len);
306
307 if (x >= 0) {
308 /* successfully written */
309 icpLogIcp(to, logcode, len, (char *) (msg + 1), delay);
310 icpCount(msg, SENT, (size_t) len, delay);
311 safe_free(msg);
312 } else if (0 == delay) {
313 /* send failed, but queue it */
314 queue = (icpUdpData *) xcalloc(1, sizeof(icpUdpData));
315 queue->address = to;
316 queue->msg = msg;
317 queue->len = (int) ntohs(msg->length);
318 queue->queue_time = current_time;
319 queue->logcode = logcode;
320
321 if (IcpQueueHead == NULL) {
322 IcpQueueHead = queue;
323 IcpQueueTail = queue;
324 } else if (IcpQueueTail == IcpQueueHead) {
325 IcpQueueTail = queue;
326 IcpQueueHead->next = queue;
327 } else {
328 IcpQueueTail->next = queue;
329 IcpQueueTail = queue;
330 }
331
332 Comm::SetSelect(fd, COMM_SELECT_WRITE, icpUdpSendQueue, NULL, 0);
333 ++statCounter.icp.replies_queued;
334 } else {
335 /* don't queue it */
336 ++statCounter.icp.replies_dropped;
337 }
338
339 return x;
340 }
341
342 int
343 icpCheckUdpHit(StoreEntry * e, HttpRequest * request)
344 {
345 if (e == NULL)
346 return 0;
347
348 if (!e->validToSend())
349 return 0;
350
351 if (Config.onoff.icp_hit_stale)
352 return 1;
353
354 if (refreshCheckICP(e, request))
355 return 0;
356
357 return 1;
358 }
359
360 /**
361 * This routine selects an ICP opcode for ICP misses.
362 *
363 \retval ICP_ERR no opcode selected here
364 \retval ICP_MISS_NOFETCH store is rebuilding, no fetch is possible yet
365 */
366 icp_opcode
367 icpGetCommonOpcode()
368 {
369 /* if store is rebuilding, return a UDP_MISS_NOFETCH */
370
371 if ((StoreController::store_dirs_rebuilding && opt_reload_hit_only) ||
372 hit_only_mode_until > squid_curtime) {
373 return ICP_MISS_NOFETCH;
374 }
375
376 return ICP_ERR;
377 }
378
379 log_type
380 icpLogFromICPCode(icp_opcode opcode)
381 {
382 if (opcode == ICP_ERR)
383 return LOG_UDP_INVALID;
384
385 if (opcode == ICP_DENIED)
386 return LOG_UDP_DENIED;
387
388 if (opcode == ICP_HIT)
389 return LOG_UDP_HIT;
390
391 if (opcode == ICP_MISS)
392 return LOG_UDP_MISS;
393
394 if (opcode == ICP_MISS_NOFETCH)
395 return LOG_UDP_MISS_NOFETCH;
396
397 fatal("expected ICP opcode\n");
398
399 return LOG_UDP_INVALID;
400 }
401
402 void
403 icpCreateAndSend(icp_opcode opcode, int flags, char const *url, int reqnum, int pad, int fd, const Ip::Address &from)
404 {
405 icp_common_t *reply = _icp_common_t::createMessage(opcode, flags, url, reqnum, pad);
406 icpUdpSend(fd, from, reply, icpLogFromICPCode(opcode), 0);
407 }
408
409 void
410 icpDenyAccess(Ip::Address &from, char *url, int reqnum, int fd)
411 {
412 debugs(12, 2, "icpDenyAccess: Access Denied for " << from << " by " << AclMatchedName << ".");
413
414 if (clientdbCutoffDenied(from)) {
415 /*
416 * count this DENIED query in the clientdb, even though
417 * we're not sending an ICP reply...
418 */
419 clientdbUpdate(from, LOG_UDP_DENIED, AnyP::PROTO_ICP, 0);
420 } else {
421 icpCreateAndSend(ICP_DENIED, 0, url, reqnum, 0, fd, from);
422 }
423 }
424
425 bool
426 icpAccessAllowed(Ip::Address &from, HttpRequest * icp_request)
427 {
428 /* absent an explicit allow, we deny all */
429 if (!Config.accessList.icp)
430 return true;
431
432 ACLFilledChecklist checklist(Config.accessList.icp, icp_request, NULL);
433 checklist.src_addr = from;
434 checklist.my_addr.SetNoAddr();
435 return (checklist.fastCheck() == ACCESS_ALLOWED);
436 }
437
438 char const *
439 icpGetUrlToSend(char *url)
440 {
441 if (strpbrk(url, w_space))
442 return rfc1738_escape(url);
443 else
444 return url;
445 }
446
447 HttpRequest *
448 icpGetRequest(char *url, int reqnum, int fd, Ip::Address &from)
449 {
450 if (strpbrk(url, w_space)) {
451 url = rfc1738_escape(url);
452 icpCreateAndSend(ICP_ERR, 0, rfc1738_escape(url), reqnum, 0, fd, from);
453 return NULL;
454 }
455
456 HttpRequest *result;
457
458 if ((result = HttpRequest::CreateFromUrl(url)) == NULL)
459 icpCreateAndSend(ICP_ERR, 0, url, reqnum, 0, fd, from);
460
461 return result;
462
463 }
464
465 static void
466 doV2Query(int fd, Ip::Address &from, char *buf, icp_common_t header)
467 {
468 int rtt = 0;
469 int src_rtt = 0;
470 uint32_t flags = 0;
471 /* We have a valid packet */
472 char *url = buf + sizeof(icp_common_t) + sizeof(uint32_t);
473 HttpRequest *icp_request = icpGetRequest(url, header.reqnum, fd, from);
474
475 if (!icp_request)
476 return;
477
478 HTTPMSGLOCK(icp_request);
479
480 if (!icpAccessAllowed(from, icp_request)) {
481 icpDenyAccess(from, url, header.reqnum, fd);
482 HTTPMSGUNLOCK(icp_request);
483 return;
484 }
485 #if USE_ICMP
486 if (header.flags & ICP_FLAG_SRC_RTT) {
487 rtt = netdbHostRtt(icp_request->GetHost());
488 int hops = netdbHostHops(icp_request->GetHost());
489 src_rtt = ((hops & 0xFFFF) << 16) | (rtt & 0xFFFF);
490
491 if (rtt)
492 flags |= ICP_FLAG_SRC_RTT;
493 }
494 #endif /* USE_ICMP */
495
496 /* The peer is allowed to use this cache */
497 ICP2State *state = new ICP2State (header, icp_request);
498
499 state->fd = fd;
500
501 state->from = from;
502
503 state->url = xstrdup (url);
504
505 state->flags = flags;
506
507 state->rtt = rtt;
508
509 state->src_rtt = src_rtt;
510
511 StoreEntry::getPublic (state, url, METHOD_GET);
512
513 HTTPMSGUNLOCK(icp_request);
514 }
515
516 void
517 _icp_common_t::handleReply(char *buf, Ip::Address &from)
518 {
519 if (neighbors_do_private_keys && reqnum == 0) {
520 debugs(12, DBG_CRITICAL, "icpHandleIcpV2: Neighbor " << from << " returned reqnum = 0");
521 debugs(12, DBG_CRITICAL, "icpHandleIcpV2: Disabling use of private keys");
522 neighbors_do_private_keys = 0;
523 }
524
525 char *url = buf + sizeof(icp_common_t);
526 debugs(12, 3, "icpHandleIcpV2: " << icp_opcode_str[opcode] << " from " << from << " for '" << url << "'");
527
528 const cache_key *key = icpGetCacheKey(url, (int) reqnum);
529 /* call neighborsUdpAck even if ping_status != PING_WAITING */
530 neighborsUdpAck(key, this, from);
531 }
532
533 static void
534 icpHandleIcpV2(int fd, Ip::Address &from, char *buf, int len)
535 {
536 if (len <= 0) {
537 debugs(12, 3, "icpHandleIcpV2: ICP message is too small");
538 return;
539 }
540
541 icp_common_t header(buf, len);
542 /*
543 * Length field should match the number of bytes read
544 */
545
546 if (len != header.length) {
547 debugs(12, 3, "icpHandleIcpV2: ICP message is too small");
548 return;
549 }
550
551 switch (header.opcode) {
552
553 case ICP_QUERY:
554 /* We have a valid packet */
555 doV2Query(fd, from, buf, header);
556 break;
557
558 case ICP_HIT:
559
560 case ICP_DECHO:
561
562 case ICP_MISS:
563
564 case ICP_DENIED:
565
566 case ICP_MISS_NOFETCH:
567 header.handleReply(buf, from);
568 break;
569
570 case ICP_INVALID:
571
572 case ICP_ERR:
573 break;
574
575 default:
576 debugs(12, DBG_CRITICAL, "icpHandleIcpV2: UNKNOWN OPCODE: " << header.opcode << " from " << from);
577
578 break;
579 }
580 }
581
582 #ifdef ICP_PKT_DUMP
583 static void
584 icpPktDump(icp_common_t * pkt)
585 {
586 Ip::Address a;
587
588 debugs(12, 9, "opcode: " << std::setw(3) << pkt->opcode << " " << icp_opcode_str[pkt->opcode]);
589 debugs(12, 9, "version: "<< std::left << std::setw(8) << pkt->version);
590 debugs(12, 9, "length: "<< std::left << std::setw(8) << ntohs(pkt->length));
591 debugs(12, 9, "reqnum: "<< std::left << std::setw(8) << ntohl(pkt->reqnum));
592 debugs(12, 9, "flags: "<< std::left << std::hex << std::setw(8) << ntohl(pkt->flags));
593 a = (struct in_addr)pkt->shostid;
594 debugs(12, 9, "shostid: " << a );
595 debugs(12, 9, "payload: " << (char *) pkt + sizeof(icp_common_t));
596 }
597
598 #endif
599
600 void
601 icpHandleUdp(int sock, void *data)
602 {
603 int *N = &incoming_sockets_accepted;
604
605 Ip::Address from;
606 LOCAL_ARRAY(char, buf, SQUID_UDP_SO_RCVBUF);
607 int len;
608 int icp_version;
609 int max = INCOMING_UDP_MAX;
610 Comm::SetSelect(sock, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
611
612 while (max) {
613 --max;
614 len = comm_udp_recvfrom(sock,
615 buf,
616 SQUID_UDP_SO_RCVBUF - 1,
617 0,
618 from);
619
620 if (len == 0)
621 break;
622
623 if (len < 0) {
624 if (ignoreErrno(errno))
625 break;
626
627 #if _SQUID_LINUX_
628 /* Some Linux systems seem to set the FD for reading and then
629 * return ECONNREFUSED when sendto() fails and generates an ICMP
630 * port unreachable message. */
631 /* or maybe an EHOSTUNREACH "No route to host" message */
632 if (errno != ECONNREFUSED && errno != EHOSTUNREACH)
633 #endif
634
635 debugs(50, DBG_IMPORTANT, "icpHandleUdp: FD " << sock << " recvfrom: " << xstrerror());
636
637 break;
638 }
639
640 ++(*N);
641 icpCount(buf, RECV, (size_t) len, 0);
642 buf[len] = '\0';
643 debugs(12, 4, "icpHandleUdp: FD " << sock << ": received " <<
644 (unsigned long int)len << " bytes from " << from);
645
646 #ifdef ICP_PACKET_DUMP
647
648 icpPktDump(buf);
649 #endif
650
651 if ((size_t) len < sizeof(icp_common_t)) {
652 debugs(12, 4, "icpHandleUdp: Ignoring too-small UDP packet");
653 break;
654 }
655
656 icp_version = (int) buf[1]; /* cheat! */
657
658 if (icpOutgoingConn->local == from)
659 // ignore ICP packets which loop back (multicast usually)
660 debugs(12, 4, "icpHandleUdp: Ignoring UDP packet sent by myself");
661 else if (icp_version == ICP_VERSION_2)
662 icpHandleIcpV2(sock, from, buf, len);
663 else if (icp_version == ICP_VERSION_3)
664 icpHandleIcpV3(sock, from, buf, len);
665 else
666 debugs(12, DBG_IMPORTANT, "WARNING: Unused ICP version " << icp_version <<
667 " received from " << from);
668 }
669 }
670
671 void
672 icpOpenPorts(void)
673 {
674 uint16_t port;
675
676 if ((port = Config.Port.icp) <= 0)
677 return;
678
679 icpIncomingConn = new Comm::Connection;
680 icpIncomingConn->local = Config.Addrs.udp_incoming;
681 icpIncomingConn->local.SetPort(port);
682
683 if (!Ip::EnableIpv6 && !icpIncomingConn->local.SetIPv4()) {
684 debugs(12, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << icpIncomingConn->local << " is not an IPv4 address.");
685 fatal("ICP port cannot be opened.");
686 }
687 /* split-stack for now requires default IPv4-only ICP */
688 if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && icpIncomingConn->local.IsAnyAddr()) {
689 icpIncomingConn->local.SetIPv4();
690 }
691
692 AsyncCall::Pointer call = asyncCall(12, 2,
693 "icpIncomingConnectionOpened",
694 Comm::UdpOpenDialer(&icpIncomingConnectionOpened));
695
696 Ipc::StartListening(SOCK_DGRAM,
697 IPPROTO_UDP,
698 icpIncomingConn,
699 Ipc::fdnInIcpSocket, call);
700
701 if ( !Config.Addrs.udp_outgoing.IsNoAddr() ) {
702 icpOutgoingConn = new Comm::Connection;
703 icpOutgoingConn->local = Config.Addrs.udp_outgoing;
704 icpOutgoingConn->local.SetPort(port);
705
706 if (!Ip::EnableIpv6 && !icpOutgoingConn->local.SetIPv4()) {
707 debugs(49, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << icpOutgoingConn->local << " is not an IPv4 address.");
708 fatal("ICP port cannot be opened.");
709 }
710 /* split-stack for now requires default IPv4-only ICP */
711 if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && icpOutgoingConn->local.IsAnyAddr()) {
712 icpOutgoingConn->local.SetIPv4();
713 }
714
715 enter_suid();
716 comm_open_listener(SOCK_DGRAM, IPPROTO_UDP, icpOutgoingConn, "Outgoing ICP Port");
717 leave_suid();
718
719 if (!Comm::IsConnOpen(icpOutgoingConn))
720 fatal("Cannot open Outgoing ICP Port");
721
722 debugs(12, DBG_CRITICAL, "Sending ICP messages from " << icpOutgoingConn->local);
723
724 Comm::SetSelect(icpOutgoingConn->fd, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
725 fd_note(icpOutgoingConn->fd, "Outgoing ICP socket");
726 }
727 }
728
729 static void
730 icpIncomingConnectionOpened(const Comm::ConnectionPointer &conn, int errNo)
731 {
732 if (!Comm::IsConnOpen(conn))
733 fatal("Cannot open ICP Port");
734
735 Comm::SetSelect(conn->fd, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
736
737 for (const wordlist *s = Config.mcast_group_list; s; s = s->next)
738 ipcache_nbgethostbyname(s->key, mcastJoinGroups, NULL); // XXX: pass the conn for mcastJoinGroups usage.
739
740 debugs(12, DBG_IMPORTANT, "Accepting ICP messages on " << conn->local);
741
742 fd_note(conn->fd, "Incoming ICP port");
743
744 if (Config.Addrs.udp_outgoing.IsNoAddr()) {
745 icpOutgoingConn = conn;
746 debugs(12, DBG_IMPORTANT, "Sending ICP messages from " << icpOutgoingConn->local);
747 }
748 }
749
750 /**
751 * icpConnectionShutdown only closes the 'in' socket if it is
752 * different than the 'out' socket.
753 */
754 void
755 icpConnectionShutdown(void)
756 {
757 if (!Comm::IsConnOpen(icpIncomingConn))
758 return;
759
760 debugs(12, DBG_IMPORTANT, "Stop receiving ICP on " << icpIncomingConn->local);
761
762 /** Release the 'in' socket for lazy closure.
763 * in and out sockets may be sharing one same FD.
764 * This prevents this function from executing repeatedly.
765 */
766 icpIncomingConn = NULL;
767
768 /**
769 * Normally we only write to the outgoing ICP socket, but
770 * we also have a read handler there to catch messages sent
771 * to that specific interface. During shutdown, we must
772 * disable reading on the outgoing socket.
773 */
774 assert(Comm::IsConnOpen(icpOutgoingConn));
775
776 Comm::SetSelect(icpOutgoingConn->fd, COMM_SELECT_READ, NULL, NULL, 0);
777 }
778
779 void
780 icpClosePorts(void)
781 {
782 icpConnectionShutdown();
783
784 if (icpOutgoingConn != NULL) {
785 debugs(12, DBG_IMPORTANT, "Stop sending ICP from " << icpOutgoingConn->local);
786 icpOutgoingConn = NULL;
787 }
788 }
789
790 static void
791 icpCount(void *buf, int which, size_t len, int delay)
792 {
793 icp_common_t *icp = (icp_common_t *) buf;
794
795 if (len < sizeof(*icp))
796 return;
797
798 if (SENT == which) {
799 ++statCounter.icp.pkts_sent;
800 kb_incr(&statCounter.icp.kbytes_sent, len);
801
802 if (ICP_QUERY == icp->opcode) {
803 ++statCounter.icp.queries_sent;
804 kb_incr(&statCounter.icp.q_kbytes_sent, len);
805 } else {
806 ++statCounter.icp.replies_sent;
807 kb_incr(&statCounter.icp.r_kbytes_sent, len);
808 /* this is the sent-reply service time */
809 statCounter.icp.replySvcTime.count(delay);
810 }
811
812 if (ICP_HIT == icp->opcode)
813 ++statCounter.icp.hits_sent;
814 } else if (RECV == which) {
815 ++statCounter.icp.pkts_recv;
816 kb_incr(&statCounter.icp.kbytes_recv, len);
817
818 if (ICP_QUERY == icp->opcode) {
819 ++statCounter.icp.queries_recv;
820 kb_incr(&statCounter.icp.q_kbytes_recv, len);
821 } else {
822 ++statCounter.icp.replies_recv;
823 kb_incr(&statCounter.icp.r_kbytes_recv, len);
824 /* statCounter.icp.querySvcTime set in clientUpdateCounters */
825 }
826
827 if (ICP_HIT == icp->opcode)
828 ++statCounter.icp.hits_recv;
829 }
830 }
831
832 #define N_QUERIED_KEYS 8192
833 #define N_QUERIED_KEYS_MASK 8191
834 static cache_key queried_keys[N_QUERIED_KEYS][SQUID_MD5_DIGEST_LENGTH];
835
836 int
837 icpSetCacheKey(const cache_key * key)
838 {
839 static int reqnum = 0;
840
841 if (++reqnum < 0)
842 reqnum = 1;
843
844 storeKeyCopy(queried_keys[reqnum & N_QUERIED_KEYS_MASK], key);
845
846 return reqnum;
847 }
848
849 const cache_key *
850 icpGetCacheKey(const char *url, int reqnum)
851 {
852 if (neighbors_do_private_keys && reqnum)
853 return queried_keys[reqnum & N_QUERIED_KEYS_MASK];
854
855 return storeKeyPublic(url, METHOD_GET);
856 }