]> git.ipfire.org Git - thirdparty/squid.git/blob - src/icp_v2.cc
Merged from trunk
[thirdparty/squid.git] / src / icp_v2.cc
1 /*
2 * DEBUG: section 12 Internet Cache Protocol (ICP)
3 * AUTHOR: Duane Wessels
4 *
5 * SQUID Web Proxy Cache http://www.squid-cache.org/
6 * ----------------------------------------------------------
7 *
8 * Squid is the result of efforts by numerous individuals from
9 * the Internet community; see the CONTRIBUTORS file for full
10 * details. Many organizations have provided support for Squid's
11 * development; see the SPONSORS file for full details. Squid is
12 * Copyrighted (C) 2001 by the Regents of the University of
13 * California; see the COPYRIGHT file for full details. Squid
14 * incorporates software developed and/or copyrighted by other
15 * sources; see the CREDITS file for full details.
16 *
17 * This program is free software; you can redistribute it and/or modify
18 * it under the terms of the GNU General Public License as published by
19 * the Free Software Foundation; either version 2 of the License, or
20 * (at your option) any later version.
21 *
22 * This program is distributed in the hope that it will be useful,
23 * but WITHOUT ANY WARRANTY; without even the implied warranty of
24 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
25 * GNU General Public License for more details.
26 *
27 * You should have received a copy of the GNU General Public License
28 * along with this program; if not, write to the Free Software
29 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
30 *
31 */
32
33 /**
34 \defgroup ServerProtocolICPInternal2 ICPv2 Internals
35 \ingroup ServerProtocolICPAPI
36 */
37
38 #include "squid.h"
39 #include "AccessLogEntry.h"
40 #include "acl/Acl.h"
41 #include "acl/FilledChecklist.h"
42 #include "client_db.h"
43 #include "comm.h"
44 #include "comm/Connection.h"
45 #include "comm/Loops.h"
46 #include "comm/UdpOpenDialer.h"
47 #include "fd.h"
48 #include "HttpRequest.h"
49 #include "icmp/net_db.h"
50 #include "ICP.h"
51 #include "ip/Address.h"
52 #include "ip/tools.h"
53 #include "ipcache.h"
54 #include "md5.h"
55 #include "multicast.h"
56 #include "neighbors.h"
57 #include "refresh.h"
58 #include "rfc1738.h"
59 #include "SquidConfig.h"
60 #include "SquidTime.h"
61 #include "StatCounters.h"
62 #include "Store.h"
63 #include "store_key_md5.h"
64 #include "SwapDir.h"
65 #include "tools.h"
66 #include "wordlist.h"
67
68 #if HAVE_ERRNO_H
69 #include <errno.h>
70 #endif
71
72 static void icpIncomingConnectionOpened(const Comm::ConnectionPointer &conn, int errNo);
73
74 /// \ingroup ServerProtocolICPInternal2
75 static void icpLogIcp(const Ip::Address &, log_type, int, const char *, int);
76
77 /// \ingroup ServerProtocolICPInternal2
78 static void icpHandleIcpV2(int, Ip::Address &, char *, int);
79
80 /// \ingroup ServerProtocolICPInternal2
81 static void icpCount(void *, int, size_t, int);
82
83 /**
84 \ingroup ServerProtocolICPInternal2
85 * IcpQueueHead is global so comm_incoming() knows whether or not
86 * to call icpUdpSendQueue.
87 */
88 static icpUdpData *IcpQueueHead = NULL;
89 /// \ingroup ServerProtocolICPInternal2
90 static icpUdpData *IcpQueueTail = NULL;
91
92 /// \ingroup ServerProtocolICPInternal2
93 Comm::ConnectionPointer icpIncomingConn = NULL;
94 /// \ingroup ServerProtocolICPInternal2
95 Comm::ConnectionPointer icpOutgoingConn = NULL;
96
97 /* icp_common_t */
98 _icp_common_t::_icp_common_t() : opcode(ICP_INVALID), version(0), length(0), reqnum(0), flags(0), pad(0), shostid(0)
99 {}
100
101 _icp_common_t::_icp_common_t(char *buf, unsigned int len)
102 {
103 if (len < sizeof(_icp_common_t)) {
104 /* mark as invalid */
105 length = len + 1;
106 return;
107 }
108
109 memcpy(this, buf, sizeof(icp_common_t));
110 /*
111 * Convert network order sensitive fields
112 */
113 length = ntohs(length);
114 reqnum = ntohl(reqnum);
115 flags = ntohl(flags);
116 pad = ntohl(pad);
117 }
118
119 icp_opcode
120 _icp_common_t::getOpCode() const
121 {
122 if (opcode > (char)ICP_END)
123 return ICP_INVALID;
124
125 return (icp_opcode)opcode;
126 }
127
128 /* ICPState */
129
130 ICPState::ICPState(icp_common_t &aHeader, HttpRequest *aRequest):
131 header(aHeader),
132 request(HTTPMSGLOCK(aRequest)),
133 fd(-1),
134 url(NULL)
135 {}
136
137 ICPState::~ICPState()
138 {
139 safe_free(url);
140 HTTPMSGUNLOCK(request);
141 }
142
143 /* End ICPState */
144
145 /* ICP2State */
146
147 /// \ingroup ServerProtocolICPInternal2
148 class ICP2State : public ICPState, public StoreClient
149 {
150
151 public:
152 ICP2State(icp_common_t & aHeader, HttpRequest *aRequest):
153 ICPState(aHeader, aRequest),rtt(0),src_rtt(0),flags(0) {}
154
155 ~ICP2State();
156 void created(StoreEntry * newEntry);
157
158 int rtt;
159 int src_rtt;
160 uint32_t flags;
161 };
162
163 ICP2State::~ICP2State()
164 {}
165
166 void
167 ICP2State::created(StoreEntry *newEntry)
168 {
169 StoreEntry *entry = newEntry->isNull () ? NULL : newEntry;
170 debugs(12, 5, "icpHandleIcpV2: OPCODE " << icp_opcode_str[header.opcode]);
171 icp_opcode codeToSend;
172
173 if (icpCheckUdpHit(entry, request)) {
174 codeToSend = ICP_HIT;
175 } else {
176 #if USE_ICMP
177 if (Config.onoff.test_reachability && rtt == 0) {
178 if ((rtt = netdbHostRtt(request->GetHost())) == 0)
179 netdbPingSite(request->GetHost());
180 }
181 #endif /* USE_ICMP */
182
183 if (icpGetCommonOpcode() != ICP_ERR)
184 codeToSend = icpGetCommonOpcode();
185 else if (Config.onoff.test_reachability && rtt == 0)
186 codeToSend = ICP_MISS_NOFETCH;
187 else
188 codeToSend = ICP_MISS;
189 }
190
191 icpCreateAndSend(codeToSend, flags, url, header.reqnum, src_rtt, fd, from);
192 delete this;
193 }
194
195 /* End ICP2State */
196
197 /// \ingroup ServerProtocolICPInternal2
198 static void
199 icpLogIcp(const Ip::Address &caddr, log_type logcode, int len, const char *url, int delay)
200 {
201 AccessLogEntry::Pointer al = new AccessLogEntry();
202
203 if (LOG_TAG_NONE == logcode)
204 return;
205
206 if (LOG_ICP_QUERY == logcode)
207 return;
208
209 clientdbUpdate(caddr, logcode, AnyP::PROTO_ICP, len);
210
211 if (!Config.onoff.log_udp)
212 return;
213
214 al->icp.opcode = ICP_QUERY;
215
216 al->url = url;
217
218 al->cache.caddr = caddr;
219
220 al->cache.replySize = len;
221
222 al->cache.code = logcode;
223
224 al->cache.msec = delay;
225
226 accessLogLog(al, NULL);
227 }
228
229 /// \ingroup ServerProtocolICPInternal2
230 void
231 icpUdpSendQueue(int fd, void *unused)
232 {
233 icpUdpData *q;
234
235 while ((q = IcpQueueHead) != NULL) {
236 int delay = tvSubUsec(q->queue_time, current_time);
237 /* increment delay to prevent looping */
238 const int x = icpUdpSend(fd, q->address, (icp_common_t *) q->msg, q->logcode, ++delay);
239 IcpQueueHead = q->next;
240 xfree(q);
241
242 if (x < 0)
243 break;
244 }
245 }
246
247 _icp_common_t *
248 _icp_common_t::createMessage(
249 icp_opcode opcode,
250 int flags,
251 const char *url,
252 int reqnum,
253 int pad)
254 {
255 char *buf = NULL;
256 icp_common_t *headerp = NULL;
257 char *urloffset = NULL;
258 int buf_len;
259 buf_len = sizeof(icp_common_t) + strlen(url) + 1;
260
261 if (opcode == ICP_QUERY)
262 buf_len += sizeof(uint32_t);
263
264 buf = (char *) xcalloc(buf_len, 1);
265
266 headerp = (icp_common_t *) (void *) buf;
267
268 headerp->opcode = (char) opcode;
269
270 headerp->version = ICP_VERSION_CURRENT;
271
272 headerp->length = (uint16_t) htons(buf_len);
273
274 headerp->reqnum = htonl(reqnum);
275
276 headerp->flags = htonl(flags);
277
278 headerp->pad = htonl(pad);
279
280 headerp->shostid = 0;
281
282 urloffset = buf + sizeof(icp_common_t);
283
284 if (opcode == ICP_QUERY)
285 urloffset += sizeof(uint32_t);
286
287 memcpy(urloffset, url, strlen(url));
288
289 return (icp_common_t *)buf;
290 }
291
292 int
293 icpUdpSend(int fd,
294 const Ip::Address &to,
295 icp_common_t * msg,
296 log_type logcode,
297 int delay)
298 {
299 icpUdpData *queue;
300 int x;
301 int len;
302 len = (int) ntohs(msg->length);
303 debugs(12, 5, "icpUdpSend: FD " << fd << " sending " <<
304 icp_opcode_str[msg->opcode] << ", " << len << " bytes to " << to);
305
306 x = comm_udp_sendto(fd, to, msg, len);
307
308 if (x >= 0) {
309 /* successfully written */
310 icpLogIcp(to, logcode, len, (char *) (msg + 1), delay);
311 icpCount(msg, SENT, (size_t) len, delay);
312 safe_free(msg);
313 } else if (0 == delay) {
314 /* send failed, but queue it */
315 queue = (icpUdpData *) xcalloc(1, sizeof(icpUdpData));
316 queue->address = to;
317 queue->msg = msg;
318 queue->len = (int) ntohs(msg->length);
319 queue->queue_time = current_time;
320 queue->logcode = logcode;
321
322 if (IcpQueueHead == NULL) {
323 IcpQueueHead = queue;
324 IcpQueueTail = queue;
325 } else if (IcpQueueTail == IcpQueueHead) {
326 IcpQueueTail = queue;
327 IcpQueueHead->next = queue;
328 } else {
329 IcpQueueTail->next = queue;
330 IcpQueueTail = queue;
331 }
332
333 Comm::SetSelect(fd, COMM_SELECT_WRITE, icpUdpSendQueue, NULL, 0);
334 ++statCounter.icp.replies_queued;
335 } else {
336 /* don't queue it */
337 ++statCounter.icp.replies_dropped;
338 }
339
340 return x;
341 }
342
343 int
344 icpCheckUdpHit(StoreEntry * e, HttpRequest * request)
345 {
346 if (e == NULL)
347 return 0;
348
349 if (!e->validToSend())
350 return 0;
351
352 if (Config.onoff.icp_hit_stale)
353 return 1;
354
355 if (refreshCheckICP(e, request))
356 return 0;
357
358 return 1;
359 }
360
361 /**
362 * This routine selects an ICP opcode for ICP misses.
363 *
364 \retval ICP_ERR no opcode selected here
365 \retval ICP_MISS_NOFETCH store is rebuilding, no fetch is possible yet
366 */
367 icp_opcode
368 icpGetCommonOpcode()
369 {
370 /* if store is rebuilding, return a UDP_MISS_NOFETCH */
371
372 if ((StoreController::store_dirs_rebuilding && opt_reload_hit_only) ||
373 hit_only_mode_until > squid_curtime) {
374 return ICP_MISS_NOFETCH;
375 }
376
377 return ICP_ERR;
378 }
379
380 log_type
381 icpLogFromICPCode(icp_opcode opcode)
382 {
383 if (opcode == ICP_ERR)
384 return LOG_UDP_INVALID;
385
386 if (opcode == ICP_DENIED)
387 return LOG_UDP_DENIED;
388
389 if (opcode == ICP_HIT)
390 return LOG_UDP_HIT;
391
392 if (opcode == ICP_MISS)
393 return LOG_UDP_MISS;
394
395 if (opcode == ICP_MISS_NOFETCH)
396 return LOG_UDP_MISS_NOFETCH;
397
398 fatal("expected ICP opcode\n");
399
400 return LOG_UDP_INVALID;
401 }
402
403 void
404 icpCreateAndSend(icp_opcode opcode, int flags, char const *url, int reqnum, int pad, int fd, const Ip::Address &from)
405 {
406 icp_common_t *reply = _icp_common_t::createMessage(opcode, flags, url, reqnum, pad);
407 icpUdpSend(fd, from, reply, icpLogFromICPCode(opcode), 0);
408 }
409
410 void
411 icpDenyAccess(Ip::Address &from, char *url, int reqnum, int fd)
412 {
413 debugs(12, 2, "icpDenyAccess: Access Denied for " << from << " by " << AclMatchedName << ".");
414
415 if (clientdbCutoffDenied(from)) {
416 /*
417 * count this DENIED query in the clientdb, even though
418 * we're not sending an ICP reply...
419 */
420 clientdbUpdate(from, LOG_UDP_DENIED, AnyP::PROTO_ICP, 0);
421 } else {
422 icpCreateAndSend(ICP_DENIED, 0, url, reqnum, 0, fd, from);
423 }
424 }
425
426 bool
427 icpAccessAllowed(Ip::Address &from, HttpRequest * icp_request)
428 {
429 /* absent an explicit allow, we deny all */
430 if (!Config.accessList.icp)
431 return true;
432
433 ACLFilledChecklist checklist(Config.accessList.icp, icp_request, NULL);
434 checklist.src_addr = from;
435 checklist.my_addr.SetNoAddr();
436 return (checklist.fastCheck() == ACCESS_ALLOWED);
437 }
438
439 char const *
440 icpGetUrlToSend(char *url)
441 {
442 if (strpbrk(url, w_space))
443 return rfc1738_escape(url);
444 else
445 return url;
446 }
447
448 HttpRequest *
449 icpGetRequest(char *url, int reqnum, int fd, Ip::Address &from)
450 {
451 if (strpbrk(url, w_space)) {
452 url = rfc1738_escape(url);
453 icpCreateAndSend(ICP_ERR, 0, rfc1738_escape(url), reqnum, 0, fd, from);
454 return NULL;
455 }
456
457 HttpRequest *result;
458
459 if ((result = HttpRequest::CreateFromUrl(url)) == NULL)
460 icpCreateAndSend(ICP_ERR, 0, url, reqnum, 0, fd, from);
461
462 return result;
463
464 }
465
466 static void
467 doV2Query(int fd, Ip::Address &from, char *buf, icp_common_t header)
468 {
469 int rtt = 0;
470 int src_rtt = 0;
471 uint32_t flags = 0;
472 /* We have a valid packet */
473 char *url = buf + sizeof(icp_common_t) + sizeof(uint32_t);
474 HttpRequest *icp_request = icpGetRequest(url, header.reqnum, fd, from);
475
476 if (!icp_request)
477 return;
478
479 HTTPMSGLOCK(icp_request);
480
481 if (!icpAccessAllowed(from, icp_request)) {
482 icpDenyAccess(from, url, header.reqnum, fd);
483 HTTPMSGUNLOCK(icp_request);
484 return;
485 }
486 #if USE_ICMP
487 if (header.flags & ICP_FLAG_SRC_RTT) {
488 rtt = netdbHostRtt(icp_request->GetHost());
489 int hops = netdbHostHops(icp_request->GetHost());
490 src_rtt = ((hops & 0xFFFF) << 16) | (rtt & 0xFFFF);
491
492 if (rtt)
493 flags |= ICP_FLAG_SRC_RTT;
494 }
495 #endif /* USE_ICMP */
496
497 /* The peer is allowed to use this cache */
498 ICP2State *state = new ICP2State (header, icp_request);
499
500 state->fd = fd;
501
502 state->from = from;
503
504 state->url = xstrdup (url);
505
506 state->flags = flags;
507
508 state->rtt = rtt;
509
510 state->src_rtt = src_rtt;
511
512 StoreEntry::getPublic (state, url, Http::METHOD_GET);
513
514 HTTPMSGUNLOCK(icp_request);
515 }
516
517 void
518 _icp_common_t::handleReply(char *buf, Ip::Address &from)
519 {
520 if (neighbors_do_private_keys && reqnum == 0) {
521 debugs(12, DBG_CRITICAL, "icpHandleIcpV2: Neighbor " << from << " returned reqnum = 0");
522 debugs(12, DBG_CRITICAL, "icpHandleIcpV2: Disabling use of private keys");
523 neighbors_do_private_keys = 0;
524 }
525
526 char *url = buf + sizeof(icp_common_t);
527 debugs(12, 3, "icpHandleIcpV2: " << icp_opcode_str[opcode] << " from " << from << " for '" << url << "'");
528
529 const cache_key *key = icpGetCacheKey(url, (int) reqnum);
530 /* call neighborsUdpAck even if ping_status != PING_WAITING */
531 neighborsUdpAck(key, this, from);
532 }
533
534 static void
535 icpHandleIcpV2(int fd, Ip::Address &from, char *buf, int len)
536 {
537 if (len <= 0) {
538 debugs(12, 3, "icpHandleIcpV2: ICP message is too small");
539 return;
540 }
541
542 icp_common_t header(buf, len);
543 /*
544 * Length field should match the number of bytes read
545 */
546
547 if (len != header.length) {
548 debugs(12, 3, "icpHandleIcpV2: ICP message is too small");
549 return;
550 }
551
552 switch (header.opcode) {
553
554 case ICP_QUERY:
555 /* We have a valid packet */
556 doV2Query(fd, from, buf, header);
557 break;
558
559 case ICP_HIT:
560
561 case ICP_DECHO:
562
563 case ICP_MISS:
564
565 case ICP_DENIED:
566
567 case ICP_MISS_NOFETCH:
568 header.handleReply(buf, from);
569 break;
570
571 case ICP_INVALID:
572
573 case ICP_ERR:
574 break;
575
576 default:
577 debugs(12, DBG_CRITICAL, "icpHandleIcpV2: UNKNOWN OPCODE: " << header.opcode << " from " << from);
578
579 break;
580 }
581 }
582
583 #ifdef ICP_PKT_DUMP
584 static void
585 icpPktDump(icp_common_t * pkt)
586 {
587 Ip::Address a;
588
589 debugs(12, 9, "opcode: " << std::setw(3) << pkt->opcode << " " << icp_opcode_str[pkt->opcode]);
590 debugs(12, 9, "version: "<< std::left << std::setw(8) << pkt->version);
591 debugs(12, 9, "length: "<< std::left << std::setw(8) << ntohs(pkt->length));
592 debugs(12, 9, "reqnum: "<< std::left << std::setw(8) << ntohl(pkt->reqnum));
593 debugs(12, 9, "flags: "<< std::left << std::hex << std::setw(8) << ntohl(pkt->flags));
594 a = (struct in_addr)pkt->shostid;
595 debugs(12, 9, "shostid: " << a );
596 debugs(12, 9, "payload: " << (char *) pkt + sizeof(icp_common_t));
597 }
598
599 #endif
600
601 void
602 icpHandleUdp(int sock, void *data)
603 {
604 int *N = &incoming_sockets_accepted;
605
606 Ip::Address from;
607 LOCAL_ARRAY(char, buf, SQUID_UDP_SO_RCVBUF);
608 int len;
609 int icp_version;
610 int max = INCOMING_UDP_MAX;
611 Comm::SetSelect(sock, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
612
613 while (max) {
614 --max;
615 len = comm_udp_recvfrom(sock,
616 buf,
617 SQUID_UDP_SO_RCVBUF - 1,
618 0,
619 from);
620
621 if (len == 0)
622 break;
623
624 if (len < 0) {
625 if (ignoreErrno(errno))
626 break;
627
628 #if _SQUID_LINUX_
629 /* Some Linux systems seem to set the FD for reading and then
630 * return ECONNREFUSED when sendto() fails and generates an ICMP
631 * port unreachable message. */
632 /* or maybe an EHOSTUNREACH "No route to host" message */
633 if (errno != ECONNREFUSED && errno != EHOSTUNREACH)
634 #endif
635
636 debugs(50, DBG_IMPORTANT, "icpHandleUdp: FD " << sock << " recvfrom: " << xstrerror());
637
638 break;
639 }
640
641 ++(*N);
642 icpCount(buf, RECV, (size_t) len, 0);
643 buf[len] = '\0';
644 debugs(12, 4, "icpHandleUdp: FD " << sock << ": received " <<
645 (unsigned long int)len << " bytes from " << from);
646
647 #ifdef ICP_PACKET_DUMP
648
649 icpPktDump(buf);
650 #endif
651
652 if ((size_t) len < sizeof(icp_common_t)) {
653 debugs(12, 4, "icpHandleUdp: Ignoring too-small UDP packet");
654 break;
655 }
656
657 icp_version = (int) buf[1]; /* cheat! */
658
659 if (icpOutgoingConn->local == from)
660 // ignore ICP packets which loop back (multicast usually)
661 debugs(12, 4, "icpHandleUdp: Ignoring UDP packet sent by myself");
662 else if (icp_version == ICP_VERSION_2)
663 icpHandleIcpV2(sock, from, buf, len);
664 else if (icp_version == ICP_VERSION_3)
665 icpHandleIcpV3(sock, from, buf, len);
666 else
667 debugs(12, DBG_IMPORTANT, "WARNING: Unused ICP version " << icp_version <<
668 " received from " << from);
669 }
670 }
671
672 void
673 icpOpenPorts(void)
674 {
675 uint16_t port;
676
677 if ((port = Config.Port.icp) <= 0)
678 return;
679
680 icpIncomingConn = new Comm::Connection;
681 icpIncomingConn->local = Config.Addrs.udp_incoming;
682 icpIncomingConn->local.SetPort(port);
683
684 if (!Ip::EnableIpv6 && !icpIncomingConn->local.SetIPv4()) {
685 debugs(12, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << icpIncomingConn->local << " is not an IPv4 address.");
686 fatal("ICP port cannot be opened.");
687 }
688 /* split-stack for now requires default IPv4-only ICP */
689 if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && icpIncomingConn->local.IsAnyAddr()) {
690 icpIncomingConn->local.SetIPv4();
691 }
692
693 AsyncCall::Pointer call = asyncCall(12, 2,
694 "icpIncomingConnectionOpened",
695 Comm::UdpOpenDialer(&icpIncomingConnectionOpened));
696
697 Ipc::StartListening(SOCK_DGRAM,
698 IPPROTO_UDP,
699 icpIncomingConn,
700 Ipc::fdnInIcpSocket, call);
701
702 if ( !Config.Addrs.udp_outgoing.IsNoAddr() ) {
703 icpOutgoingConn = new Comm::Connection;
704 icpOutgoingConn->local = Config.Addrs.udp_outgoing;
705 icpOutgoingConn->local.SetPort(port);
706
707 if (!Ip::EnableIpv6 && !icpOutgoingConn->local.SetIPv4()) {
708 debugs(49, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << icpOutgoingConn->local << " is not an IPv4 address.");
709 fatal("ICP port cannot be opened.");
710 }
711 /* split-stack for now requires default IPv4-only ICP */
712 if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && icpOutgoingConn->local.IsAnyAddr()) {
713 icpOutgoingConn->local.SetIPv4();
714 }
715
716 enter_suid();
717 comm_open_listener(SOCK_DGRAM, IPPROTO_UDP, icpOutgoingConn, "Outgoing ICP Port");
718 leave_suid();
719
720 if (!Comm::IsConnOpen(icpOutgoingConn))
721 fatal("Cannot open Outgoing ICP Port");
722
723 debugs(12, DBG_CRITICAL, "Sending ICP messages from " << icpOutgoingConn->local);
724
725 Comm::SetSelect(icpOutgoingConn->fd, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
726 fd_note(icpOutgoingConn->fd, "Outgoing ICP socket");
727 }
728 }
729
730 static void
731 icpIncomingConnectionOpened(const Comm::ConnectionPointer &conn, int errNo)
732 {
733 if (!Comm::IsConnOpen(conn))
734 fatal("Cannot open ICP Port");
735
736 Comm::SetSelect(conn->fd, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
737
738 for (const wordlist *s = Config.mcast_group_list; s; s = s->next)
739 ipcache_nbgethostbyname(s->key, mcastJoinGroups, NULL); // XXX: pass the conn for mcastJoinGroups usage.
740
741 debugs(12, DBG_IMPORTANT, "Accepting ICP messages on " << conn->local);
742
743 fd_note(conn->fd, "Incoming ICP port");
744
745 if (Config.Addrs.udp_outgoing.IsNoAddr()) {
746 icpOutgoingConn = conn;
747 debugs(12, DBG_IMPORTANT, "Sending ICP messages from " << icpOutgoingConn->local);
748 }
749 }
750
751 /**
752 * icpConnectionShutdown only closes the 'in' socket if it is
753 * different than the 'out' socket.
754 */
755 void
756 icpConnectionShutdown(void)
757 {
758 if (!Comm::IsConnOpen(icpIncomingConn))
759 return;
760
761 debugs(12, DBG_IMPORTANT, "Stop receiving ICP on " << icpIncomingConn->local);
762
763 /** Release the 'in' socket for lazy closure.
764 * in and out sockets may be sharing one same FD.
765 * This prevents this function from executing repeatedly.
766 */
767 icpIncomingConn = NULL;
768
769 /**
770 * Normally we only write to the outgoing ICP socket, but
771 * we also have a read handler there to catch messages sent
772 * to that specific interface. During shutdown, we must
773 * disable reading on the outgoing socket.
774 */
775 assert(Comm::IsConnOpen(icpOutgoingConn));
776
777 Comm::SetSelect(icpOutgoingConn->fd, COMM_SELECT_READ, NULL, NULL, 0);
778 }
779
780 void
781 icpClosePorts(void)
782 {
783 icpConnectionShutdown();
784
785 if (icpOutgoingConn != NULL) {
786 debugs(12, DBG_IMPORTANT, "Stop sending ICP from " << icpOutgoingConn->local);
787 icpOutgoingConn = NULL;
788 }
789 }
790
791 static void
792 icpCount(void *buf, int which, size_t len, int delay)
793 {
794 icp_common_t *icp = (icp_common_t *) buf;
795
796 if (len < sizeof(*icp))
797 return;
798
799 if (SENT == which) {
800 ++statCounter.icp.pkts_sent;
801 kb_incr(&statCounter.icp.kbytes_sent, len);
802
803 if (ICP_QUERY == icp->opcode) {
804 ++statCounter.icp.queries_sent;
805 kb_incr(&statCounter.icp.q_kbytes_sent, len);
806 } else {
807 ++statCounter.icp.replies_sent;
808 kb_incr(&statCounter.icp.r_kbytes_sent, len);
809 /* this is the sent-reply service time */
810 statCounter.icp.replySvcTime.count(delay);
811 }
812
813 if (ICP_HIT == icp->opcode)
814 ++statCounter.icp.hits_sent;
815 } else if (RECV == which) {
816 ++statCounter.icp.pkts_recv;
817 kb_incr(&statCounter.icp.kbytes_recv, len);
818
819 if (ICP_QUERY == icp->opcode) {
820 ++statCounter.icp.queries_recv;
821 kb_incr(&statCounter.icp.q_kbytes_recv, len);
822 } else {
823 ++statCounter.icp.replies_recv;
824 kb_incr(&statCounter.icp.r_kbytes_recv, len);
825 /* statCounter.icp.querySvcTime set in clientUpdateCounters */
826 }
827
828 if (ICP_HIT == icp->opcode)
829 ++statCounter.icp.hits_recv;
830 }
831 }
832
833 #define N_QUERIED_KEYS 8192
834 #define N_QUERIED_KEYS_MASK 8191
835 static cache_key queried_keys[N_QUERIED_KEYS][SQUID_MD5_DIGEST_LENGTH];
836
837 int
838 icpSetCacheKey(const cache_key * key)
839 {
840 static int reqnum = 0;
841
842 if (++reqnum < 0)
843 reqnum = 1;
844
845 storeKeyCopy(queried_keys[reqnum & N_QUERIED_KEYS_MASK], key);
846
847 return reqnum;
848 }
849
850 const cache_key *
851 icpGetCacheKey(const char *url, int reqnum)
852 {
853 if (neighbors_do_private_keys && reqnum)
854 return queried_keys[reqnum & N_QUERIED_KEYS_MASK];
855
856 return storeKeyPublic(url, Http::METHOD_GET);
857 }