]> git.ipfire.org Git - thirdparty/squid.git/blob - src/icp_v2.cc
Cleanup: fix most 'unused parameter' warnings
[thirdparty/squid.git] / src / icp_v2.cc
1 /*
2 * Copyright (C) 1996-2014 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 12 Internet Cache Protocol (ICP) */
10
11 /**
12 \defgroup ServerProtocolICPInternal2 ICPv2 Internals
13 \ingroup ServerProtocolICPAPI
14 */
15
16 #include "squid.h"
17 #include "AccessLogEntry.h"
18 #include "acl/Acl.h"
19 #include "acl/FilledChecklist.h"
20 #include "client_db.h"
21 #include "comm.h"
22 #include "comm/Connection.h"
23 #include "comm/Loops.h"
24 #include "comm/UdpOpenDialer.h"
25 #include "fd.h"
26 #include "HttpRequest.h"
27 #include "icmp/net_db.h"
28 #include "ICP.h"
29 #include "ip/Address.h"
30 #include "ip/tools.h"
31 #include "ipcache.h"
32 #include "md5.h"
33 #include "multicast.h"
34 #include "neighbors.h"
35 #include "refresh.h"
36 #include "rfc1738.h"
37 #include "SquidConfig.h"
38 #include "SquidTime.h"
39 #include "StatCounters.h"
40 #include "Store.h"
41 #include "store_key_md5.h"
42 #include "SwapDir.h"
43 #include "tools.h"
44 #include "wordlist.h"
45
46 // for tvSubUsec() which should be in SquidTime.h
47 #include "util.h"
48
49 #include <cerrno>
50
51 static void icpIncomingConnectionOpened(const Comm::ConnectionPointer &conn, int errNo);
52
53 /// \ingroup ServerProtocolICPInternal2
54 static void icpLogIcp(const Ip::Address &, LogTags, int, const char *, int);
55
56 /// \ingroup ServerProtocolICPInternal2
57 static void icpHandleIcpV2(int, Ip::Address &, char *, int);
58
59 /// \ingroup ServerProtocolICPInternal2
60 static void icpCount(void *, int, size_t, int);
61
62 /**
63 \ingroup ServerProtocolICPInternal2
64 * IcpQueueHead is global so comm_incoming() knows whether or not
65 * to call icpUdpSendQueue.
66 */
67 static icpUdpData *IcpQueueHead = NULL;
68 /// \ingroup ServerProtocolICPInternal2
69 static icpUdpData *IcpQueueTail = NULL;
70
71 /// \ingroup ServerProtocolICPInternal2
72 Comm::ConnectionPointer icpIncomingConn = NULL;
73 /// \ingroup ServerProtocolICPInternal2
74 Comm::ConnectionPointer icpOutgoingConn = NULL;
75
76 /* icp_common_t */
77 _icp_common_t::_icp_common_t() :
78 opcode(ICP_INVALID), version(0), length(0), reqnum(0),
79 flags(0), pad(0), shostid(0)
80 {}
81
82 _icp_common_t::_icp_common_t(char *buf, unsigned int len) :
83 opcode(ICP_INVALID), version(0), reqnum(0), flags(0), pad(0), shostid(0)
84 {
85 if (len < sizeof(_icp_common_t)) {
86 /* mark as invalid */
87 length = len + 1;
88 return;
89 }
90
91 memcpy(this, buf, sizeof(icp_common_t));
92 /*
93 * Convert network order sensitive fields
94 */
95 length = ntohs(length);
96 reqnum = ntohl(reqnum);
97 flags = ntohl(flags);
98 pad = ntohl(pad);
99 }
100
101 icp_opcode
102 _icp_common_t::getOpCode() const
103 {
104 if (opcode > (char)ICP_END)
105 return ICP_INVALID;
106
107 return (icp_opcode)opcode;
108 }
109
110 /* ICPState */
111
112 ICPState::ICPState(icp_common_t &aHeader, HttpRequest *aRequest):
113 header(aHeader),
114 request(aRequest),
115 fd(-1),
116 url(NULL)
117 {
118 HTTPMSGLOCK(request);
119 }
120
121 ICPState::~ICPState()
122 {
123 safe_free(url);
124 HTTPMSGUNLOCK(request);
125 }
126
127 /* End ICPState */
128
129 /* ICP2State */
130
131 /// \ingroup ServerProtocolICPInternal2
132 class ICP2State : public ICPState, public StoreClient
133 {
134
135 public:
136 ICP2State(icp_common_t & aHeader, HttpRequest *aRequest):
137 ICPState(aHeader, aRequest),rtt(0),src_rtt(0),flags(0) {}
138
139 ~ICP2State();
140 void created(StoreEntry * newEntry);
141
142 int rtt;
143 int src_rtt;
144 uint32_t flags;
145 };
146
147 ICP2State::~ICP2State()
148 {}
149
150 void
151 ICP2State::created(StoreEntry *newEntry)
152 {
153 StoreEntry *entry = newEntry->isNull () ? NULL : newEntry;
154 debugs(12, 5, "icpHandleIcpV2: OPCODE " << icp_opcode_str[header.opcode]);
155 icp_opcode codeToSend;
156
157 if (icpCheckUdpHit(entry, request)) {
158 codeToSend = ICP_HIT;
159 } else {
160 #if USE_ICMP
161 if (Config.onoff.test_reachability && rtt == 0) {
162 if ((rtt = netdbHostRtt(request->GetHost())) == 0)
163 netdbPingSite(request->GetHost());
164 }
165 #endif /* USE_ICMP */
166
167 if (icpGetCommonOpcode() != ICP_ERR)
168 codeToSend = icpGetCommonOpcode();
169 else if (Config.onoff.test_reachability && rtt == 0)
170 codeToSend = ICP_MISS_NOFETCH;
171 else
172 codeToSend = ICP_MISS;
173 }
174
175 icpCreateAndSend(codeToSend, flags, url, header.reqnum, src_rtt, fd, from);
176 delete this;
177 }
178
179 /* End ICP2State */
180
181 /// \ingroup ServerProtocolICPInternal2
182 static void
183 icpLogIcp(const Ip::Address &caddr, LogTags logcode, int len, const char *url, int delay)
184 {
185 AccessLogEntry::Pointer al = new AccessLogEntry();
186
187 if (LOG_TAG_NONE == logcode)
188 return;
189
190 if (LOG_ICP_QUERY == logcode)
191 return;
192
193 clientdbUpdate(caddr, logcode, AnyP::PROTO_ICP, len);
194
195 if (!Config.onoff.log_udp)
196 return;
197
198 al->icp.opcode = ICP_QUERY;
199
200 al->url = url;
201
202 al->cache.caddr = caddr;
203
204 // XXX: move to use icp.clientReply instead
205 al->http.clientReplySz.payloadData = len;
206
207 al->cache.code = logcode;
208
209 al->cache.trTime.tv_sec = delay;
210
211 accessLogLog(al, NULL);
212 }
213
214 /// \ingroup ServerProtocolICPInternal2
215 void
216 icpUdpSendQueue(int fd, void *)
217 {
218 icpUdpData *q;
219
220 while ((q = IcpQueueHead) != NULL) {
221 int delay = tvSubUsec(q->queue_time, current_time);
222 /* increment delay to prevent looping */
223 const int x = icpUdpSend(fd, q->address, (icp_common_t *) q->msg, q->logcode, ++delay);
224 IcpQueueHead = q->next;
225 xfree(q);
226
227 if (x < 0)
228 break;
229 }
230 }
231
232 _icp_common_t *
233 _icp_common_t::createMessage(
234 icp_opcode opcode,
235 int flags,
236 const char *url,
237 int reqnum,
238 int pad)
239 {
240 char *buf = NULL;
241 icp_common_t *headerp = NULL;
242 char *urloffset = NULL;
243 int buf_len;
244 buf_len = sizeof(icp_common_t) + strlen(url) + 1;
245
246 if (opcode == ICP_QUERY)
247 buf_len += sizeof(uint32_t);
248
249 buf = (char *) xcalloc(buf_len, 1);
250
251 headerp = (icp_common_t *) (void *) buf;
252
253 headerp->opcode = (char) opcode;
254
255 headerp->version = ICP_VERSION_CURRENT;
256
257 headerp->length = (uint16_t) htons(buf_len);
258
259 headerp->reqnum = htonl(reqnum);
260
261 headerp->flags = htonl(flags);
262
263 headerp->pad = htonl(pad);
264
265 headerp->shostid = 0;
266
267 urloffset = buf + sizeof(icp_common_t);
268
269 if (opcode == ICP_QUERY)
270 urloffset += sizeof(uint32_t);
271
272 memcpy(urloffset, url, strlen(url));
273
274 return (icp_common_t *)buf;
275 }
276
277 int
278 icpUdpSend(int fd,
279 const Ip::Address &to,
280 icp_common_t * msg,
281 LogTags logcode,
282 int delay)
283 {
284 icpUdpData *queue;
285 int x;
286 int len;
287 len = (int) ntohs(msg->length);
288 debugs(12, 5, "icpUdpSend: FD " << fd << " sending " <<
289 icp_opcode_str[msg->opcode] << ", " << len << " bytes to " << to);
290
291 x = comm_udp_sendto(fd, to, msg, len);
292
293 if (x >= 0) {
294 /* successfully written */
295 icpLogIcp(to, logcode, len, (char *) (msg + 1), delay);
296 icpCount(msg, SENT, (size_t) len, delay);
297 safe_free(msg);
298 } else if (0 == delay) {
299 /* send failed, but queue it */
300 queue = (icpUdpData *) xcalloc(1, sizeof(icpUdpData));
301 queue->address = to;
302 queue->msg = msg;
303 queue->len = (int) ntohs(msg->length);
304 queue->queue_time = current_time;
305 queue->logcode = logcode;
306
307 if (IcpQueueHead == NULL) {
308 IcpQueueHead = queue;
309 IcpQueueTail = queue;
310 } else if (IcpQueueTail == IcpQueueHead) {
311 IcpQueueTail = queue;
312 IcpQueueHead->next = queue;
313 } else {
314 IcpQueueTail->next = queue;
315 IcpQueueTail = queue;
316 }
317
318 Comm::SetSelect(fd, COMM_SELECT_WRITE, icpUdpSendQueue, NULL, 0);
319 ++statCounter.icp.replies_queued;
320 } else {
321 /* don't queue it */
322 ++statCounter.icp.replies_dropped;
323 }
324
325 return x;
326 }
327
328 int
329 icpCheckUdpHit(StoreEntry * e, HttpRequest * request)
330 {
331 if (e == NULL)
332 return 0;
333
334 if (!e->validToSend())
335 return 0;
336
337 if (Config.onoff.icp_hit_stale)
338 return 1;
339
340 if (refreshCheckICP(e, request))
341 return 0;
342
343 return 1;
344 }
345
346 /**
347 * This routine selects an ICP opcode for ICP misses.
348 *
349 \retval ICP_ERR no opcode selected here
350 \retval ICP_MISS_NOFETCH store is rebuilding, no fetch is possible yet
351 */
352 icp_opcode
353 icpGetCommonOpcode()
354 {
355 /* if store is rebuilding, return a UDP_MISS_NOFETCH */
356
357 if ((StoreController::store_dirs_rebuilding && opt_reload_hit_only) ||
358 hit_only_mode_until > squid_curtime) {
359 return ICP_MISS_NOFETCH;
360 }
361
362 return ICP_ERR;
363 }
364
365 LogTags
366 icpLogFromICPCode(icp_opcode opcode)
367 {
368 if (opcode == ICP_ERR)
369 return LOG_UDP_INVALID;
370
371 if (opcode == ICP_DENIED)
372 return LOG_UDP_DENIED;
373
374 if (opcode == ICP_HIT)
375 return LOG_UDP_HIT;
376
377 if (opcode == ICP_MISS)
378 return LOG_UDP_MISS;
379
380 if (opcode == ICP_MISS_NOFETCH)
381 return LOG_UDP_MISS_NOFETCH;
382
383 fatal("expected ICP opcode\n");
384
385 return LOG_UDP_INVALID;
386 }
387
388 void
389 icpCreateAndSend(icp_opcode opcode, int flags, char const *url, int reqnum, int pad, int fd, const Ip::Address &from)
390 {
391 icp_common_t *reply = _icp_common_t::createMessage(opcode, flags, url, reqnum, pad);
392 icpUdpSend(fd, from, reply, icpLogFromICPCode(opcode), 0);
393 }
394
395 void
396 icpDenyAccess(Ip::Address &from, char *url, int reqnum, int fd)
397 {
398 debugs(12, 2, "icpDenyAccess: Access Denied for " << from << " by " << AclMatchedName << ".");
399
400 if (clientdbCutoffDenied(from)) {
401 /*
402 * count this DENIED query in the clientdb, even though
403 * we're not sending an ICP reply...
404 */
405 clientdbUpdate(from, LOG_UDP_DENIED, AnyP::PROTO_ICP, 0);
406 } else {
407 icpCreateAndSend(ICP_DENIED, 0, url, reqnum, 0, fd, from);
408 }
409 }
410
411 bool
412 icpAccessAllowed(Ip::Address &from, HttpRequest * icp_request)
413 {
414 /* absent any explicit rules, we deny all */
415 if (!Config.accessList.icp)
416 return false;
417
418 ACLFilledChecklist checklist(Config.accessList.icp, icp_request, NULL);
419 checklist.src_addr = from;
420 checklist.my_addr.setNoAddr();
421 return (checklist.fastCheck() == ACCESS_ALLOWED);
422 }
423
424 char const *
425 icpGetUrlToSend(char *url)
426 {
427 if (strpbrk(url, w_space))
428 return rfc1738_escape(url);
429 else
430 return url;
431 }
432
433 HttpRequest *
434 icpGetRequest(char *url, int reqnum, int fd, Ip::Address &from)
435 {
436 if (strpbrk(url, w_space)) {
437 url = rfc1738_escape(url);
438 icpCreateAndSend(ICP_ERR, 0, rfc1738_escape(url), reqnum, 0, fd, from);
439 return NULL;
440 }
441
442 HttpRequest *result;
443
444 if ((result = HttpRequest::CreateFromUrl(url)) == NULL)
445 icpCreateAndSend(ICP_ERR, 0, url, reqnum, 0, fd, from);
446
447 return result;
448
449 }
450
451 static void
452 doV2Query(int fd, Ip::Address &from, char *buf, icp_common_t header)
453 {
454 int rtt = 0;
455 int src_rtt = 0;
456 uint32_t flags = 0;
457 /* We have a valid packet */
458 char *url = buf + sizeof(icp_common_t) + sizeof(uint32_t);
459 HttpRequest *icp_request = icpGetRequest(url, header.reqnum, fd, from);
460
461 if (!icp_request)
462 return;
463
464 HTTPMSGLOCK(icp_request);
465
466 if (!icpAccessAllowed(from, icp_request)) {
467 icpDenyAccess(from, url, header.reqnum, fd);
468 HTTPMSGUNLOCK(icp_request);
469 return;
470 }
471 #if USE_ICMP
472 if (header.flags & ICP_FLAG_SRC_RTT) {
473 rtt = netdbHostRtt(icp_request->GetHost());
474 int hops = netdbHostHops(icp_request->GetHost());
475 src_rtt = ((hops & 0xFFFF) << 16) | (rtt & 0xFFFF);
476
477 if (rtt)
478 flags |= ICP_FLAG_SRC_RTT;
479 }
480 #endif /* USE_ICMP */
481
482 /* The peer is allowed to use this cache */
483 ICP2State *state = new ICP2State(header, icp_request);
484 state->fd = fd;
485 state->from = from;
486 state->url = xstrdup(url);
487 state->flags = flags;
488 state->rtt = rtt;
489 state->src_rtt = src_rtt;
490
491 StoreEntry::getPublic(state, url, Http::METHOD_GET);
492
493 HTTPMSGUNLOCK(icp_request);
494 }
495
496 void
497 _icp_common_t::handleReply(char *buf, Ip::Address &from)
498 {
499 if (neighbors_do_private_keys && reqnum == 0) {
500 debugs(12, DBG_CRITICAL, "icpHandleIcpV2: Neighbor " << from << " returned reqnum = 0");
501 debugs(12, DBG_CRITICAL, "icpHandleIcpV2: Disabling use of private keys");
502 neighbors_do_private_keys = 0;
503 }
504
505 char *url = buf + sizeof(icp_common_t);
506 debugs(12, 3, "icpHandleIcpV2: " << icp_opcode_str[opcode] << " from " << from << " for '" << url << "'");
507
508 const cache_key *key = icpGetCacheKey(url, (int) reqnum);
509 /* call neighborsUdpAck even if ping_status != PING_WAITING */
510 neighborsUdpAck(key, this, from);
511 }
512
513 static void
514 icpHandleIcpV2(int fd, Ip::Address &from, char *buf, int len)
515 {
516 if (len <= 0) {
517 debugs(12, 3, "icpHandleIcpV2: ICP message is too small");
518 return;
519 }
520
521 icp_common_t header(buf, len);
522 /*
523 * Length field should match the number of bytes read
524 */
525
526 if (len != header.length) {
527 debugs(12, 3, "icpHandleIcpV2: ICP message is too small");
528 return;
529 }
530
531 switch (header.opcode) {
532
533 case ICP_QUERY:
534 /* We have a valid packet */
535 doV2Query(fd, from, buf, header);
536 break;
537
538 case ICP_HIT:
539
540 case ICP_DECHO:
541
542 case ICP_MISS:
543
544 case ICP_DENIED:
545
546 case ICP_MISS_NOFETCH:
547 header.handleReply(buf, from);
548 break;
549
550 case ICP_INVALID:
551
552 case ICP_ERR:
553 break;
554
555 default:
556 debugs(12, DBG_CRITICAL, "icpHandleIcpV2: UNKNOWN OPCODE: " << header.opcode << " from " << from);
557
558 break;
559 }
560 }
561
#ifdef ICP_PKT_DUMP
/// Writes every header field and the payload of \p pkt to the debug log
/// (section 12, level 9). Only built when ICP_PKT_DUMP is defined.
static void
icpPktDump(icp_common_t * pkt)
{
    debugs(12, 9, "opcode: " << std::setw(3) << pkt->opcode << " " << icp_opcode_str[pkt->opcode]);
    debugs(12, 9, "version: "<< std::left << std::setw(8) << pkt->version);
    debugs(12, 9, "length: "<< std::left << std::setw(8) << ntohs(pkt->length));
    debugs(12, 9, "reqnum: "<< std::left << std::setw(8) << ntohl(pkt->reqnum));
    debugs(12, 9, "flags: "<< std::left << std::hex << std::setw(8) << ntohl(pkt->flags));

    Ip::Address senderHost;
    senderHost = (struct in_addr)pkt->shostid;
    debugs(12, 9, "shostid: " << senderHost );

    debugs(12, 9, "payload: " << (char *) pkt + sizeof(icp_common_t));
}

#endif
579
580 void
581 icpHandleUdp(int sock, void *)
582 {
583 int *N = &incoming_sockets_accepted;
584
585 Ip::Address from;
586 LOCAL_ARRAY(char, buf, SQUID_UDP_SO_RCVBUF);
587 int len;
588 int icp_version;
589 int max = INCOMING_UDP_MAX;
590 Comm::SetSelect(sock, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
591
592 while (max) {
593 --max;
594 len = comm_udp_recvfrom(sock,
595 buf,
596 SQUID_UDP_SO_RCVBUF - 1,
597 0,
598 from);
599
600 if (len == 0)
601 break;
602
603 if (len < 0) {
604 if (ignoreErrno(errno))
605 break;
606
607 #if _SQUID_LINUX_
608 /* Some Linux systems seem to set the FD for reading and then
609 * return ECONNREFUSED when sendto() fails and generates an ICMP
610 * port unreachable message. */
611 /* or maybe an EHOSTUNREACH "No route to host" message */
612 if (errno != ECONNREFUSED && errno != EHOSTUNREACH)
613 #endif
614
615 debugs(50, DBG_IMPORTANT, "icpHandleUdp: FD " << sock << " recvfrom: " << xstrerror());
616
617 break;
618 }
619
620 ++(*N);
621 icpCount(buf, RECV, (size_t) len, 0);
622 buf[len] = '\0';
623 debugs(12, 4, "icpHandleUdp: FD " << sock << ": received " <<
624 (unsigned long int)len << " bytes from " << from);
625
626 #ifdef ICP_PACKET_DUMP
627
628 icpPktDump(buf);
629 #endif
630
631 if ((size_t) len < sizeof(icp_common_t)) {
632 debugs(12, 4, "icpHandleUdp: Ignoring too-small UDP packet");
633 break;
634 }
635
636 icp_version = (int) buf[1]; /* cheat! */
637
638 if (icpOutgoingConn->local == from)
639 // ignore ICP packets which loop back (multicast usually)
640 debugs(12, 4, "icpHandleUdp: Ignoring UDP packet sent by myself");
641 else if (icp_version == ICP_VERSION_2)
642 icpHandleIcpV2(sock, from, buf, len);
643 else if (icp_version == ICP_VERSION_3)
644 icpHandleIcpV3(sock, from, buf, len);
645 else
646 debugs(12, DBG_IMPORTANT, "WARNING: Unused ICP version " << icp_version <<
647 " received from " << from);
648 }
649 }
650
651 void
652 icpOpenPorts(void)
653 {
654 uint16_t port;
655
656 if ((port = Config.Port.icp) <= 0)
657 return;
658
659 icpIncomingConn = new Comm::Connection;
660 icpIncomingConn->local = Config.Addrs.udp_incoming;
661 icpIncomingConn->local.port(port);
662
663 if (!Ip::EnableIpv6 && !icpIncomingConn->local.setIPv4()) {
664 debugs(12, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << icpIncomingConn->local << " is not an IPv4 address.");
665 fatal("ICP port cannot be opened.");
666 }
667 /* split-stack for now requires default IPv4-only ICP */
668 if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && icpIncomingConn->local.isAnyAddr()) {
669 icpIncomingConn->local.setIPv4();
670 }
671
672 AsyncCall::Pointer call = asyncCall(12, 2,
673 "icpIncomingConnectionOpened",
674 Comm::UdpOpenDialer(&icpIncomingConnectionOpened));
675
676 Ipc::StartListening(SOCK_DGRAM,
677 IPPROTO_UDP,
678 icpIncomingConn,
679 Ipc::fdnInIcpSocket, call);
680
681 if ( !Config.Addrs.udp_outgoing.isNoAddr() ) {
682 icpOutgoingConn = new Comm::Connection;
683 icpOutgoingConn->local = Config.Addrs.udp_outgoing;
684 icpOutgoingConn->local.port(port);
685
686 if (!Ip::EnableIpv6 && !icpOutgoingConn->local.setIPv4()) {
687 debugs(49, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << icpOutgoingConn->local << " is not an IPv4 address.");
688 fatal("ICP port cannot be opened.");
689 }
690 /* split-stack for now requires default IPv4-only ICP */
691 if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && icpOutgoingConn->local.isAnyAddr()) {
692 icpOutgoingConn->local.setIPv4();
693 }
694
695 enter_suid();
696 comm_open_listener(SOCK_DGRAM, IPPROTO_UDP, icpOutgoingConn, "Outgoing ICP Port");
697 leave_suid();
698
699 if (!Comm::IsConnOpen(icpOutgoingConn))
700 fatal("Cannot open Outgoing ICP Port");
701
702 debugs(12, DBG_CRITICAL, "Sending ICP messages from " << icpOutgoingConn->local);
703
704 Comm::SetSelect(icpOutgoingConn->fd, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
705 fd_note(icpOutgoingConn->fd, "Outgoing ICP socket");
706 }
707 }
708
709 static void
710 icpIncomingConnectionOpened(const Comm::ConnectionPointer &conn, int)
711 {
712 if (!Comm::IsConnOpen(conn))
713 fatal("Cannot open ICP Port");
714
715 Comm::SetSelect(conn->fd, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
716
717 for (const wordlist *s = Config.mcast_group_list; s; s = s->next)
718 ipcache_nbgethostbyname(s->key, mcastJoinGroups, NULL); // XXX: pass the conn for mcastJoinGroups usage.
719
720 debugs(12, DBG_IMPORTANT, "Accepting ICP messages on " << conn->local);
721
722 fd_note(conn->fd, "Incoming ICP port");
723
724 if (Config.Addrs.udp_outgoing.isNoAddr()) {
725 icpOutgoingConn = conn;
726 debugs(12, DBG_IMPORTANT, "Sending ICP messages from " << icpOutgoingConn->local);
727 }
728 }
729
730 /**
731 * icpConnectionShutdown only closes the 'in' socket if it is
732 * different than the 'out' socket.
733 */
734 void
735 icpConnectionShutdown(void)
736 {
737 if (!Comm::IsConnOpen(icpIncomingConn))
738 return;
739
740 debugs(12, DBG_IMPORTANT, "Stop receiving ICP on " << icpIncomingConn->local);
741
742 /** Release the 'in' socket for lazy closure.
743 * in and out sockets may be sharing one same FD.
744 * This prevents this function from executing repeatedly.
745 */
746 icpIncomingConn = NULL;
747
748 /**
749 * Normally we only write to the outgoing ICP socket, but
750 * we also have a read handler there to catch messages sent
751 * to that specific interface. During shutdown, we must
752 * disable reading on the outgoing socket.
753 */
754 assert(Comm::IsConnOpen(icpOutgoingConn));
755
756 Comm::SetSelect(icpOutgoingConn->fd, COMM_SELECT_READ, NULL, NULL, 0);
757 }
758
759 void
760 icpClosePorts(void)
761 {
762 icpConnectionShutdown();
763
764 if (icpOutgoingConn != NULL) {
765 debugs(12, DBG_IMPORTANT, "Stop sending ICP from " << icpOutgoingConn->local);
766 icpOutgoingConn = NULL;
767 }
768 }
769
770 static void
771 icpCount(void *buf, int which, size_t len, int delay)
772 {
773 icp_common_t *icp = (icp_common_t *) buf;
774
775 if (len < sizeof(*icp))
776 return;
777
778 if (SENT == which) {
779 ++statCounter.icp.pkts_sent;
780 kb_incr(&statCounter.icp.kbytes_sent, len);
781
782 if (ICP_QUERY == icp->opcode) {
783 ++statCounter.icp.queries_sent;
784 kb_incr(&statCounter.icp.q_kbytes_sent, len);
785 } else {
786 ++statCounter.icp.replies_sent;
787 kb_incr(&statCounter.icp.r_kbytes_sent, len);
788 /* this is the sent-reply service time */
789 statCounter.icp.replySvcTime.count(delay);
790 }
791
792 if (ICP_HIT == icp->opcode)
793 ++statCounter.icp.hits_sent;
794 } else if (RECV == which) {
795 ++statCounter.icp.pkts_recv;
796 kb_incr(&statCounter.icp.kbytes_recv, len);
797
798 if (ICP_QUERY == icp->opcode) {
799 ++statCounter.icp.queries_recv;
800 kb_incr(&statCounter.icp.q_kbytes_recv, len);
801 } else {
802 ++statCounter.icp.replies_recv;
803 kb_incr(&statCounter.icp.r_kbytes_recv, len);
804 /* statCounter.icp.querySvcTime set in clientUpdateCounters */
805 }
806
807 if (ICP_HIT == icp->opcode)
808 ++statCounter.icp.hits_recv;
809 }
810 }
811
812 #define N_QUERIED_KEYS 8192
813 #define N_QUERIED_KEYS_MASK 8191
814 static cache_key queried_keys[N_QUERIED_KEYS][SQUID_MD5_DIGEST_LENGTH];
815
816 int
817 icpSetCacheKey(const cache_key * key)
818 {
819 static int reqnum = 0;
820
821 if (++reqnum < 0)
822 reqnum = 1;
823
824 storeKeyCopy(queried_keys[reqnum & N_QUERIED_KEYS_MASK], key);
825
826 return reqnum;
827 }
828
829 const cache_key *
830 icpGetCacheKey(const char *url, int reqnum)
831 {
832 if (neighbors_do_private_keys && reqnum)
833 return queried_keys[reqnum & N_QUERIED_KEYS_MASK];
834
835 return storeKeyPublic(url, Http::METHOD_GET);
836 }
837