]> git.ipfire.org Git - thirdparty/squid.git/blob - src/icp_v2.cc
Various audit updates
[thirdparty/squid.git] / src / icp_v2.cc
1 /*
2 * DEBUG: section 12 Internet Cache Protocol (ICP)
3 * AUTHOR: Duane Wessels
4 *
5 * SQUID Web Proxy Cache http://www.squid-cache.org/
6 * ----------------------------------------------------------
7 *
8 * Squid is the result of efforts by numerous individuals from
9 * the Internet community; see the CONTRIBUTORS file for full
10 * details. Many organizations have provided support for Squid's
11 * development; see the SPONSORS file for full details. Squid is
12 * Copyrighted (C) 2001 by the Regents of the University of
13 * California; see the COPYRIGHT file for full details. Squid
14 * incorporates software developed and/or copyrighted by other
15 * sources; see the CREDITS file for full details.
16 *
17 * This program is free software; you can redistribute it and/or modify
18 * it under the terms of the GNU General Public License as published by
19 * the Free Software Foundation; either version 2 of the License, or
20 * (at your option) any later version.
21 *
22 * This program is distributed in the hope that it will be useful,
23 * but WITHOUT ANY WARRANTY; without even the implied warranty of
24 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
25 * GNU General Public License for more details.
26 *
27 * You should have received a copy of the GNU General Public License
28 * along with this program; if not, write to the Free Software
29 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
30 *
31 */
32
33 /**
34 \defgroup ServerProtocolICPInternal2 ICPv2 Internals
35 \ingroup ServerProtocolICPAPI
36 */
37
38 #include "squid.h"
39 #include "AccessLogEntry.h"
40 #include "acl/Acl.h"
41 #include "acl/FilledChecklist.h"
42 #include "client_db.h"
43 #include "comm.h"
44 #include "comm/Connection.h"
45 #include "comm/Loops.h"
46 #include "comm/UdpOpenDialer.h"
47 #include "fd.h"
48 #include "HttpRequest.h"
49 #include "icmp/net_db.h"
50 #include "ICP.h"
51 #include "ip/Address.h"
52 #include "ip/tools.h"
53 #include "ipcache.h"
54 #include "md5.h"
55 #include "multicast.h"
56 #include "neighbors.h"
57 #include "refresh.h"
58 #include "rfc1738.h"
59 #include "SquidConfig.h"
60 #include "SquidTime.h"
61 #include "StatCounters.h"
62 #include "Store.h"
63 #include "store_key_md5.h"
64 #include "SwapDir.h"
65 #include "tools.h"
66 #include "wordlist.h"
67
68 #include <cerrno>
69
70 static void icpIncomingConnectionOpened(const Comm::ConnectionPointer &conn, int errNo);
71
72 /// \ingroup ServerProtocolICPInternal2
73 static void icpLogIcp(const Ip::Address &, LogTags, int, const char *, int);
74
75 /// \ingroup ServerProtocolICPInternal2
76 static void icpHandleIcpV2(int, Ip::Address &, char *, int);
77
78 /// \ingroup ServerProtocolICPInternal2
79 static void icpCount(void *, int, size_t, int);
80
81 /**
82 \ingroup ServerProtocolICPInternal2
83 * IcpQueueHead is global so comm_incoming() knows whether or not
84 * to call icpUdpSendQueue.
85 */
86 static icpUdpData *IcpQueueHead = NULL;
87 /// \ingroup ServerProtocolICPInternal2
88 static icpUdpData *IcpQueueTail = NULL;
89
90 /// \ingroup ServerProtocolICPInternal2
91 Comm::ConnectionPointer icpIncomingConn = NULL;
92 /// \ingroup ServerProtocolICPInternal2
93 Comm::ConnectionPointer icpOutgoingConn = NULL;
94
95 /* icp_common_t */
96 _icp_common_t::_icp_common_t() :
97 opcode(ICP_INVALID), version(0), length(0), reqnum(0),
98 flags(0), pad(0), shostid(0)
99 {}
100
101 _icp_common_t::_icp_common_t(char *buf, unsigned int len) :
102 opcode(ICP_INVALID), version(0), reqnum(0), flags(0), pad(0), shostid(0)
103 {
104 if (len < sizeof(_icp_common_t)) {
105 /* mark as invalid */
106 length = len + 1;
107 return;
108 }
109
110 memcpy(this, buf, sizeof(icp_common_t));
111 /*
112 * Convert network order sensitive fields
113 */
114 length = ntohs(length);
115 reqnum = ntohl(reqnum);
116 flags = ntohl(flags);
117 pad = ntohl(pad);
118 }
119
120 icp_opcode
121 _icp_common_t::getOpCode() const
122 {
123 if (opcode > (char)ICP_END)
124 return ICP_INVALID;
125
126 return (icp_opcode)opcode;
127 }
128
129 /* ICPState */
130
131 ICPState::ICPState(icp_common_t &aHeader, HttpRequest *aRequest):
132 header(aHeader),
133 request(aRequest),
134 fd(-1),
135 url(NULL)
136 {
137 HTTPMSGLOCK(request);
138 }
139
140 ICPState::~ICPState()
141 {
142 safe_free(url);
143 HTTPMSGUNLOCK(request);
144 }
145
146 /* End ICPState */
147
148 /* ICP2State */
149
150 /// \ingroup ServerProtocolICPInternal2
151 class ICP2State : public ICPState, public StoreClient
152 {
153
154 public:
155 ICP2State(icp_common_t & aHeader, HttpRequest *aRequest):
156 ICPState(aHeader, aRequest),rtt(0),src_rtt(0),flags(0) {}
157
158 ~ICP2State();
159 void created(StoreEntry * newEntry);
160
161 int rtt;
162 int src_rtt;
163 uint32_t flags;
164 };
165
166 ICP2State::~ICP2State()
167 {}
168
169 void
170 ICP2State::created(StoreEntry *newEntry)
171 {
172 StoreEntry *entry = newEntry->isNull () ? NULL : newEntry;
173 debugs(12, 5, "icpHandleIcpV2: OPCODE " << icp_opcode_str[header.opcode]);
174 icp_opcode codeToSend;
175
176 if (icpCheckUdpHit(entry, request)) {
177 codeToSend = ICP_HIT;
178 } else {
179 #if USE_ICMP
180 if (Config.onoff.test_reachability && rtt == 0) {
181 if ((rtt = netdbHostRtt(request->GetHost())) == 0)
182 netdbPingSite(request->GetHost());
183 }
184 #endif /* USE_ICMP */
185
186 if (icpGetCommonOpcode() != ICP_ERR)
187 codeToSend = icpGetCommonOpcode();
188 else if (Config.onoff.test_reachability && rtt == 0)
189 codeToSend = ICP_MISS_NOFETCH;
190 else
191 codeToSend = ICP_MISS;
192 }
193
194 icpCreateAndSend(codeToSend, flags, url, header.reqnum, src_rtt, fd, from);
195 delete this;
196 }
197
198 /* End ICP2State */
199
200 /// \ingroup ServerProtocolICPInternal2
201 static void
202 icpLogIcp(const Ip::Address &caddr, LogTags logcode, int len, const char *url, int delay)
203 {
204 AccessLogEntry::Pointer al = new AccessLogEntry();
205
206 if (LOG_TAG_NONE == logcode)
207 return;
208
209 if (LOG_ICP_QUERY == logcode)
210 return;
211
212 clientdbUpdate(caddr, logcode, AnyP::PROTO_ICP, len);
213
214 if (!Config.onoff.log_udp)
215 return;
216
217 al->icp.opcode = ICP_QUERY;
218
219 al->url = url;
220
221 al->cache.caddr = caddr;
222
223 // XXX: move to use icp.clientReply instead
224 al->http.clientReplySz.payloadData = len;
225
226 al->cache.code = logcode;
227
228 al->cache.msec = delay;
229
230 accessLogLog(al, NULL);
231 }
232
233 /// \ingroup ServerProtocolICPInternal2
234 void
235 icpUdpSendQueue(int fd, void *unused)
236 {
237 icpUdpData *q;
238
239 while ((q = IcpQueueHead) != NULL) {
240 int delay = tvSubUsec(q->queue_time, current_time);
241 /* increment delay to prevent looping */
242 const int x = icpUdpSend(fd, q->address, (icp_common_t *) q->msg, q->logcode, ++delay);
243 IcpQueueHead = q->next;
244 xfree(q);
245
246 if (x < 0)
247 break;
248 }
249 }
250
251 _icp_common_t *
252 _icp_common_t::createMessage(
253 icp_opcode opcode,
254 int flags,
255 const char *url,
256 int reqnum,
257 int pad)
258 {
259 char *buf = NULL;
260 icp_common_t *headerp = NULL;
261 char *urloffset = NULL;
262 int buf_len;
263 buf_len = sizeof(icp_common_t) + strlen(url) + 1;
264
265 if (opcode == ICP_QUERY)
266 buf_len += sizeof(uint32_t);
267
268 buf = (char *) xcalloc(buf_len, 1);
269
270 headerp = (icp_common_t *) (void *) buf;
271
272 headerp->opcode = (char) opcode;
273
274 headerp->version = ICP_VERSION_CURRENT;
275
276 headerp->length = (uint16_t) htons(buf_len);
277
278 headerp->reqnum = htonl(reqnum);
279
280 headerp->flags = htonl(flags);
281
282 headerp->pad = htonl(pad);
283
284 headerp->shostid = 0;
285
286 urloffset = buf + sizeof(icp_common_t);
287
288 if (opcode == ICP_QUERY)
289 urloffset += sizeof(uint32_t);
290
291 memcpy(urloffset, url, strlen(url));
292
293 return (icp_common_t *)buf;
294 }
295
296 int
297 icpUdpSend(int fd,
298 const Ip::Address &to,
299 icp_common_t * msg,
300 LogTags logcode,
301 int delay)
302 {
303 icpUdpData *queue;
304 int x;
305 int len;
306 len = (int) ntohs(msg->length);
307 debugs(12, 5, "icpUdpSend: FD " << fd << " sending " <<
308 icp_opcode_str[msg->opcode] << ", " << len << " bytes to " << to);
309
310 x = comm_udp_sendto(fd, to, msg, len);
311
312 if (x >= 0) {
313 /* successfully written */
314 icpLogIcp(to, logcode, len, (char *) (msg + 1), delay);
315 icpCount(msg, SENT, (size_t) len, delay);
316 safe_free(msg);
317 } else if (0 == delay) {
318 /* send failed, but queue it */
319 queue = (icpUdpData *) xcalloc(1, sizeof(icpUdpData));
320 queue->address = to;
321 queue->msg = msg;
322 queue->len = (int) ntohs(msg->length);
323 queue->queue_time = current_time;
324 queue->logcode = logcode;
325
326 if (IcpQueueHead == NULL) {
327 IcpQueueHead = queue;
328 IcpQueueTail = queue;
329 } else if (IcpQueueTail == IcpQueueHead) {
330 IcpQueueTail = queue;
331 IcpQueueHead->next = queue;
332 } else {
333 IcpQueueTail->next = queue;
334 IcpQueueTail = queue;
335 }
336
337 Comm::SetSelect(fd, COMM_SELECT_WRITE, icpUdpSendQueue, NULL, 0);
338 ++statCounter.icp.replies_queued;
339 } else {
340 /* don't queue it */
341 ++statCounter.icp.replies_dropped;
342 }
343
344 return x;
345 }
346
347 int
348 icpCheckUdpHit(StoreEntry * e, HttpRequest * request)
349 {
350 if (e == NULL)
351 return 0;
352
353 if (!e->validToSend())
354 return 0;
355
356 if (Config.onoff.icp_hit_stale)
357 return 1;
358
359 if (refreshCheckICP(e, request))
360 return 0;
361
362 return 1;
363 }
364
365 /**
366 * This routine selects an ICP opcode for ICP misses.
367 *
368 \retval ICP_ERR no opcode selected here
369 \retval ICP_MISS_NOFETCH store is rebuilding, no fetch is possible yet
370 */
371 icp_opcode
372 icpGetCommonOpcode()
373 {
374 /* if store is rebuilding, return a UDP_MISS_NOFETCH */
375
376 if ((StoreController::store_dirs_rebuilding && opt_reload_hit_only) ||
377 hit_only_mode_until > squid_curtime) {
378 return ICP_MISS_NOFETCH;
379 }
380
381 return ICP_ERR;
382 }
383
384 LogTags
385 icpLogFromICPCode(icp_opcode opcode)
386 {
387 if (opcode == ICP_ERR)
388 return LOG_UDP_INVALID;
389
390 if (opcode == ICP_DENIED)
391 return LOG_UDP_DENIED;
392
393 if (opcode == ICP_HIT)
394 return LOG_UDP_HIT;
395
396 if (opcode == ICP_MISS)
397 return LOG_UDP_MISS;
398
399 if (opcode == ICP_MISS_NOFETCH)
400 return LOG_UDP_MISS_NOFETCH;
401
402 fatal("expected ICP opcode\n");
403
404 return LOG_UDP_INVALID;
405 }
406
407 void
408 icpCreateAndSend(icp_opcode opcode, int flags, char const *url, int reqnum, int pad, int fd, const Ip::Address &from)
409 {
410 icp_common_t *reply = _icp_common_t::createMessage(opcode, flags, url, reqnum, pad);
411 icpUdpSend(fd, from, reply, icpLogFromICPCode(opcode), 0);
412 }
413
414 void
415 icpDenyAccess(Ip::Address &from, char *url, int reqnum, int fd)
416 {
417 debugs(12, 2, "icpDenyAccess: Access Denied for " << from << " by " << AclMatchedName << ".");
418
419 if (clientdbCutoffDenied(from)) {
420 /*
421 * count this DENIED query in the clientdb, even though
422 * we're not sending an ICP reply...
423 */
424 clientdbUpdate(from, LOG_UDP_DENIED, AnyP::PROTO_ICP, 0);
425 } else {
426 icpCreateAndSend(ICP_DENIED, 0, url, reqnum, 0, fd, from);
427 }
428 }
429
430 bool
431 icpAccessAllowed(Ip::Address &from, HttpRequest * icp_request)
432 {
433 /* absent any explicit rules, we deny all */
434 if (!Config.accessList.icp)
435 return false;
436
437 ACLFilledChecklist checklist(Config.accessList.icp, icp_request, NULL);
438 checklist.src_addr = from;
439 checklist.my_addr.setNoAddr();
440 return (checklist.fastCheck() == ACCESS_ALLOWED);
441 }
442
443 char const *
444 icpGetUrlToSend(char *url)
445 {
446 if (strpbrk(url, w_space))
447 return rfc1738_escape(url);
448 else
449 return url;
450 }
451
452 HttpRequest *
453 icpGetRequest(char *url, int reqnum, int fd, Ip::Address &from)
454 {
455 if (strpbrk(url, w_space)) {
456 url = rfc1738_escape(url);
457 icpCreateAndSend(ICP_ERR, 0, rfc1738_escape(url), reqnum, 0, fd, from);
458 return NULL;
459 }
460
461 HttpRequest *result;
462
463 if ((result = HttpRequest::CreateFromUrl(url)) == NULL)
464 icpCreateAndSend(ICP_ERR, 0, url, reqnum, 0, fd, from);
465
466 return result;
467
468 }
469
470 static void
471 doV2Query(int fd, Ip::Address &from, char *buf, icp_common_t header)
472 {
473 int rtt = 0;
474 int src_rtt = 0;
475 uint32_t flags = 0;
476 /* We have a valid packet */
477 char *url = buf + sizeof(icp_common_t) + sizeof(uint32_t);
478 HttpRequest *icp_request = icpGetRequest(url, header.reqnum, fd, from);
479
480 if (!icp_request)
481 return;
482
483 HTTPMSGLOCK(icp_request);
484
485 if (!icpAccessAllowed(from, icp_request)) {
486 icpDenyAccess(from, url, header.reqnum, fd);
487 HTTPMSGUNLOCK(icp_request);
488 return;
489 }
490 #if USE_ICMP
491 if (header.flags & ICP_FLAG_SRC_RTT) {
492 rtt = netdbHostRtt(icp_request->GetHost());
493 int hops = netdbHostHops(icp_request->GetHost());
494 src_rtt = ((hops & 0xFFFF) << 16) | (rtt & 0xFFFF);
495
496 if (rtt)
497 flags |= ICP_FLAG_SRC_RTT;
498 }
499 #endif /* USE_ICMP */
500
501 /* The peer is allowed to use this cache */
502 ICP2State *state = new ICP2State(header, icp_request);
503 state->fd = fd;
504 state->from = from;
505 state->url = xstrdup(url);
506 state->flags = flags;
507 state->rtt = rtt;
508 state->src_rtt = src_rtt;
509
510 StoreEntry::getPublic(state, url, Http::METHOD_GET);
511
512 HTTPMSGUNLOCK(icp_request);
513 }
514
515 void
516 _icp_common_t::handleReply(char *buf, Ip::Address &from)
517 {
518 if (neighbors_do_private_keys && reqnum == 0) {
519 debugs(12, DBG_CRITICAL, "icpHandleIcpV2: Neighbor " << from << " returned reqnum = 0");
520 debugs(12, DBG_CRITICAL, "icpHandleIcpV2: Disabling use of private keys");
521 neighbors_do_private_keys = 0;
522 }
523
524 char *url = buf + sizeof(icp_common_t);
525 debugs(12, 3, "icpHandleIcpV2: " << icp_opcode_str[opcode] << " from " << from << " for '" << url << "'");
526
527 const cache_key *key = icpGetCacheKey(url, (int) reqnum);
528 /* call neighborsUdpAck even if ping_status != PING_WAITING */
529 neighborsUdpAck(key, this, from);
530 }
531
532 static void
533 icpHandleIcpV2(int fd, Ip::Address &from, char *buf, int len)
534 {
535 if (len <= 0) {
536 debugs(12, 3, "icpHandleIcpV2: ICP message is too small");
537 return;
538 }
539
540 icp_common_t header(buf, len);
541 /*
542 * Length field should match the number of bytes read
543 */
544
545 if (len != header.length) {
546 debugs(12, 3, "icpHandleIcpV2: ICP message is too small");
547 return;
548 }
549
550 switch (header.opcode) {
551
552 case ICP_QUERY:
553 /* We have a valid packet */
554 doV2Query(fd, from, buf, header);
555 break;
556
557 case ICP_HIT:
558
559 case ICP_DECHO:
560
561 case ICP_MISS:
562
563 case ICP_DENIED:
564
565 case ICP_MISS_NOFETCH:
566 header.handleReply(buf, from);
567 break;
568
569 case ICP_INVALID:
570
571 case ICP_ERR:
572 break;
573
574 default:
575 debugs(12, DBG_CRITICAL, "icpHandleIcpV2: UNKNOWN OPCODE: " << header.opcode << " from " << from);
576
577 break;
578 }
579 }
580
581 #ifdef ICP_PKT_DUMP
582 static void
583 icpPktDump(icp_common_t * pkt)
584 {
585 Ip::Address a;
586
587 debugs(12, 9, "opcode: " << std::setw(3) << pkt->opcode << " " << icp_opcode_str[pkt->opcode]);
588 debugs(12, 9, "version: "<< std::left << std::setw(8) << pkt->version);
589 debugs(12, 9, "length: "<< std::left << std::setw(8) << ntohs(pkt->length));
590 debugs(12, 9, "reqnum: "<< std::left << std::setw(8) << ntohl(pkt->reqnum));
591 debugs(12, 9, "flags: "<< std::left << std::hex << std::setw(8) << ntohl(pkt->flags));
592 a = (struct in_addr)pkt->shostid;
593 debugs(12, 9, "shostid: " << a );
594 debugs(12, 9, "payload: " << (char *) pkt + sizeof(icp_common_t));
595 }
596
597 #endif
598
599 void
600 icpHandleUdp(int sock, void *data)
601 {
602 int *N = &incoming_sockets_accepted;
603
604 Ip::Address from;
605 LOCAL_ARRAY(char, buf, SQUID_UDP_SO_RCVBUF);
606 int len;
607 int icp_version;
608 int max = INCOMING_UDP_MAX;
609 Comm::SetSelect(sock, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
610
611 while (max) {
612 --max;
613 len = comm_udp_recvfrom(sock,
614 buf,
615 SQUID_UDP_SO_RCVBUF - 1,
616 0,
617 from);
618
619 if (len == 0)
620 break;
621
622 if (len < 0) {
623 if (ignoreErrno(errno))
624 break;
625
626 #if _SQUID_LINUX_
627 /* Some Linux systems seem to set the FD for reading and then
628 * return ECONNREFUSED when sendto() fails and generates an ICMP
629 * port unreachable message. */
630 /* or maybe an EHOSTUNREACH "No route to host" message */
631 if (errno != ECONNREFUSED && errno != EHOSTUNREACH)
632 #endif
633
634 debugs(50, DBG_IMPORTANT, "icpHandleUdp: FD " << sock << " recvfrom: " << xstrerror());
635
636 break;
637 }
638
639 ++(*N);
640 icpCount(buf, RECV, (size_t) len, 0);
641 buf[len] = '\0';
642 debugs(12, 4, "icpHandleUdp: FD " << sock << ": received " <<
643 (unsigned long int)len << " bytes from " << from);
644
645 #ifdef ICP_PACKET_DUMP
646
647 icpPktDump(buf);
648 #endif
649
650 if ((size_t) len < sizeof(icp_common_t)) {
651 debugs(12, 4, "icpHandleUdp: Ignoring too-small UDP packet");
652 break;
653 }
654
655 icp_version = (int) buf[1]; /* cheat! */
656
657 if (icpOutgoingConn->local == from)
658 // ignore ICP packets which loop back (multicast usually)
659 debugs(12, 4, "icpHandleUdp: Ignoring UDP packet sent by myself");
660 else if (icp_version == ICP_VERSION_2)
661 icpHandleIcpV2(sock, from, buf, len);
662 else if (icp_version == ICP_VERSION_3)
663 icpHandleIcpV3(sock, from, buf, len);
664 else
665 debugs(12, DBG_IMPORTANT, "WARNING: Unused ICP version " << icp_version <<
666 " received from " << from);
667 }
668 }
669
670 void
671 icpOpenPorts(void)
672 {
673 uint16_t port;
674
675 if ((port = Config.Port.icp) <= 0)
676 return;
677
678 icpIncomingConn = new Comm::Connection;
679 icpIncomingConn->local = Config.Addrs.udp_incoming;
680 icpIncomingConn->local.port(port);
681
682 if (!Ip::EnableIpv6 && !icpIncomingConn->local.setIPv4()) {
683 debugs(12, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << icpIncomingConn->local << " is not an IPv4 address.");
684 fatal("ICP port cannot be opened.");
685 }
686 /* split-stack for now requires default IPv4-only ICP */
687 if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && icpIncomingConn->local.isAnyAddr()) {
688 icpIncomingConn->local.setIPv4();
689 }
690
691 AsyncCall::Pointer call = asyncCall(12, 2,
692 "icpIncomingConnectionOpened",
693 Comm::UdpOpenDialer(&icpIncomingConnectionOpened));
694
695 Ipc::StartListening(SOCK_DGRAM,
696 IPPROTO_UDP,
697 icpIncomingConn,
698 Ipc::fdnInIcpSocket, call);
699
700 if ( !Config.Addrs.udp_outgoing.isNoAddr() ) {
701 icpOutgoingConn = new Comm::Connection;
702 icpOutgoingConn->local = Config.Addrs.udp_outgoing;
703 icpOutgoingConn->local.port(port);
704
705 if (!Ip::EnableIpv6 && !icpOutgoingConn->local.setIPv4()) {
706 debugs(49, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << icpOutgoingConn->local << " is not an IPv4 address.");
707 fatal("ICP port cannot be opened.");
708 }
709 /* split-stack for now requires default IPv4-only ICP */
710 if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && icpOutgoingConn->local.isAnyAddr()) {
711 icpOutgoingConn->local.setIPv4();
712 }
713
714 enter_suid();
715 comm_open_listener(SOCK_DGRAM, IPPROTO_UDP, icpOutgoingConn, "Outgoing ICP Port");
716 leave_suid();
717
718 if (!Comm::IsConnOpen(icpOutgoingConn))
719 fatal("Cannot open Outgoing ICP Port");
720
721 debugs(12, DBG_CRITICAL, "Sending ICP messages from " << icpOutgoingConn->local);
722
723 Comm::SetSelect(icpOutgoingConn->fd, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
724 fd_note(icpOutgoingConn->fd, "Outgoing ICP socket");
725 }
726 }
727
728 static void
729 icpIncomingConnectionOpened(const Comm::ConnectionPointer &conn, int errNo)
730 {
731 if (!Comm::IsConnOpen(conn))
732 fatal("Cannot open ICP Port");
733
734 Comm::SetSelect(conn->fd, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
735
736 for (const wordlist *s = Config.mcast_group_list; s; s = s->next)
737 ipcache_nbgethostbyname(s->key, mcastJoinGroups, NULL); // XXX: pass the conn for mcastJoinGroups usage.
738
739 debugs(12, DBG_IMPORTANT, "Accepting ICP messages on " << conn->local);
740
741 fd_note(conn->fd, "Incoming ICP port");
742
743 if (Config.Addrs.udp_outgoing.isNoAddr()) {
744 icpOutgoingConn = conn;
745 debugs(12, DBG_IMPORTANT, "Sending ICP messages from " << icpOutgoingConn->local);
746 }
747 }
748
749 /**
750 * icpConnectionShutdown only closes the 'in' socket if it is
751 * different than the 'out' socket.
752 */
753 void
754 icpConnectionShutdown(void)
755 {
756 if (!Comm::IsConnOpen(icpIncomingConn))
757 return;
758
759 debugs(12, DBG_IMPORTANT, "Stop receiving ICP on " << icpIncomingConn->local);
760
761 /** Release the 'in' socket for lazy closure.
762 * in and out sockets may be sharing one same FD.
763 * This prevents this function from executing repeatedly.
764 */
765 icpIncomingConn = NULL;
766
767 /**
768 * Normally we only write to the outgoing ICP socket, but
769 * we also have a read handler there to catch messages sent
770 * to that specific interface. During shutdown, we must
771 * disable reading on the outgoing socket.
772 */
773 assert(Comm::IsConnOpen(icpOutgoingConn));
774
775 Comm::SetSelect(icpOutgoingConn->fd, COMM_SELECT_READ, NULL, NULL, 0);
776 }
777
778 void
779 icpClosePorts(void)
780 {
781 icpConnectionShutdown();
782
783 if (icpOutgoingConn != NULL) {
784 debugs(12, DBG_IMPORTANT, "Stop sending ICP from " << icpOutgoingConn->local);
785 icpOutgoingConn = NULL;
786 }
787 }
788
789 static void
790 icpCount(void *buf, int which, size_t len, int delay)
791 {
792 icp_common_t *icp = (icp_common_t *) buf;
793
794 if (len < sizeof(*icp))
795 return;
796
797 if (SENT == which) {
798 ++statCounter.icp.pkts_sent;
799 kb_incr(&statCounter.icp.kbytes_sent, len);
800
801 if (ICP_QUERY == icp->opcode) {
802 ++statCounter.icp.queries_sent;
803 kb_incr(&statCounter.icp.q_kbytes_sent, len);
804 } else {
805 ++statCounter.icp.replies_sent;
806 kb_incr(&statCounter.icp.r_kbytes_sent, len);
807 /* this is the sent-reply service time */
808 statCounter.icp.replySvcTime.count(delay);
809 }
810
811 if (ICP_HIT == icp->opcode)
812 ++statCounter.icp.hits_sent;
813 } else if (RECV == which) {
814 ++statCounter.icp.pkts_recv;
815 kb_incr(&statCounter.icp.kbytes_recv, len);
816
817 if (ICP_QUERY == icp->opcode) {
818 ++statCounter.icp.queries_recv;
819 kb_incr(&statCounter.icp.q_kbytes_recv, len);
820 } else {
821 ++statCounter.icp.replies_recv;
822 kb_incr(&statCounter.icp.r_kbytes_recv, len);
823 /* statCounter.icp.querySvcTime set in clientUpdateCounters */
824 }
825
826 if (ICP_HIT == icp->opcode)
827 ++statCounter.icp.hits_recv;
828 }
829 }
830
831 #define N_QUERIED_KEYS 8192
832 #define N_QUERIED_KEYS_MASK 8191
833 static cache_key queried_keys[N_QUERIED_KEYS][SQUID_MD5_DIGEST_LENGTH];
834
835 int
836 icpSetCacheKey(const cache_key * key)
837 {
838 static int reqnum = 0;
839
840 if (++reqnum < 0)
841 reqnum = 1;
842
843 storeKeyCopy(queried_keys[reqnum & N_QUERIED_KEYS_MASK], key);
844
845 return reqnum;
846 }
847
848 const cache_key *
849 icpGetCacheKey(const char *url, int reqnum)
850 {
851 if (neighbors_do_private_keys && reqnum)
852 return queried_keys[reqnum & N_QUERIED_KEYS_MASK];
853
854 return storeKeyPublic(url, Http::METHOD_GET);
855 }