]> git.ipfire.org Git - thirdparty/squid.git/blob - src/icp_v2.cc
Source Format Enforcement (#763)
[thirdparty/squid.git] / src / icp_v2.cc
1 /*
2 * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 12 Internet Cache Protocol (ICP) */
10
11 /**
12 \defgroup ServerProtocolICPInternal2 ICPv2 Internals
13 \ingroup ServerProtocolICPAPI
14 */
15
16 #include "squid.h"
17 #include "AccessLogEntry.h"
18 #include "acl/Acl.h"
19 #include "acl/FilledChecklist.h"
20 #include "client_db.h"
21 #include "comm.h"
22 #include "comm/Connection.h"
23 #include "comm/Loops.h"
24 #include "comm/UdpOpenDialer.h"
25 #include "fd.h"
26 #include "HttpRequest.h"
27 #include "icmp/net_db.h"
28 #include "ICP.h"
29 #include "ip/Address.h"
30 #include "ip/tools.h"
31 #include "ipcache.h"
32 #include "md5.h"
33 #include "multicast.h"
34 #include "neighbors.h"
35 #include "refresh.h"
36 #include "rfc1738.h"
37 #include "SquidConfig.h"
38 #include "SquidTime.h"
39 #include "StatCounters.h"
40 #include "Store.h"
41 #include "store_key_md5.h"
42 #include "tools.h"
43 #include "wordlist.h"
44
45 // for tvSubUsec() which should be in SquidTime.h
46 #include "util.h"
47
48 #include <cerrno>
49
50 /// a delayed icpUdpSend() call
51 class DelayedUdpSend {
52 public:
53 Ip::Address address; ///< remote peer (which may not be a cache_peer)
54 icp_common_t *msg = nullptr; ///< ICP message with network byte order fields
55 DelayedUdpSend *next = nullptr; ///< invasive FIFO queue of delayed ICP messages
56 AccessLogEntryPointer ale; ///< sender's master transaction summary
57 struct timeval queue_time = {0, 0}; ///< queuing timestamp
58 };
59
60 static void icpIncomingConnectionOpened(const Comm::ConnectionPointer &conn, int errNo);
61
62 /// \ingroup ServerProtocolICPInternal2
63 static void icpLogIcp(const Ip::Address &, const LogTags_ot, int, const char *, const int, AccessLogEntryPointer &);
64
65 /// \ingroup ServerProtocolICPInternal2
66 static void icpHandleIcpV2(int, Ip::Address &, char *, int);
67
68 /// \ingroup ServerProtocolICPInternal2
69 static void icpCount(void *, int, size_t, int);
70
71 static LogTags_ot icpLogFromICPCode(icp_opcode);
72
73 static int icpUdpSend(int fd, const Ip::Address &to, icp_common_t * msg, int delay, AccessLogEntryPointer al);
74
75 static void
76 icpSyncAle(AccessLogEntryPointer &al, const Ip::Address &caddr, const char *url, int len, int delay)
77 {
78 if (!al)
79 al = new AccessLogEntry();
80 al->icp.opcode = ICP_QUERY;
81 al->cache.caddr = caddr;
82 al->url = url;
83 al->setVirginUrlForMissingRequest(al->url);
84 // XXX: move to use icp.clientReply instead
85 al->http.clientReplySz.payloadData = len;
86 al->cache.start_time = current_time;
87 al->cache.start_time.tv_sec -= delay;
88 al->cache.trTime.tv_sec = delay;
89 al->cache.trTime.tv_usec = 0;
90 }
91
92 /**
93 \ingroup ServerProtocolICPInternal2
94 * IcpQueueHead is global so comm_incoming() knows whether or not
95 * to call icpUdpSendQueue.
96 */
97 static DelayedUdpSend *IcpQueueHead = NULL;
98 /// \ingroup ServerProtocolICPInternal2
99 static DelayedUdpSend *IcpQueueTail = NULL;
100
101 /// \ingroup ServerProtocolICPInternal2
102 Comm::ConnectionPointer icpIncomingConn = NULL;
103 /// \ingroup ServerProtocolICPInternal2
104 Comm::ConnectionPointer icpOutgoingConn = NULL;
105
106 /* icp_common_t */
107 icp_common_t::icp_common_t() :
108 opcode(ICP_INVALID), version(0), length(0), reqnum(0),
109 flags(0), pad(0), shostid(0)
110 {}
111
112 icp_common_t::icp_common_t(char *buf, unsigned int len) :
113 opcode(ICP_INVALID), version(0), reqnum(0), flags(0), pad(0), shostid(0)
114 {
115 if (len < sizeof(icp_common_t)) {
116 /* mark as invalid */
117 length = len + 1;
118 return;
119 }
120
121 memcpy(this, buf, sizeof(icp_common_t));
122 /*
123 * Convert network order sensitive fields
124 */
125 length = ntohs(length);
126 reqnum = ntohl(reqnum);
127 flags = ntohl(flags);
128 pad = ntohl(pad);
129 }
130
131 icp_opcode
132 icp_common_t::getOpCode() const
133 {
134 if (opcode > static_cast<char>(icp_opcode::ICP_END))
135 return ICP_INVALID;
136
137 return static_cast<icp_opcode>(opcode);
138 }
139
140 /* ICPState */
141
142 ICPState::ICPState(icp_common_t &aHeader, HttpRequest *aRequest):
143 header(aHeader),
144 request(aRequest),
145 fd(-1),
146 url(NULL)
147 {
148 HTTPMSGLOCK(request);
149 }
150
151 ICPState::~ICPState()
152 {
153 safe_free(url);
154 HTTPMSGUNLOCK(request);
155 }
156
157 bool
158 ICPState::isHit() const
159 {
160 const auto e = storeGetPublic(url, Http::METHOD_GET);
161
162 const auto hit = e && confirmAndPrepHit(*e);
163
164 if (e)
165 e->abandon(__FUNCTION__);
166
167 return hit;
168 }
169
170 bool
171 ICPState::confirmAndPrepHit(const StoreEntry &e) const
172 {
173 if (!e.validToSend())
174 return false;
175
176 if (!Config.onoff.icp_hit_stale && refreshCheckICP(&e, request))
177 return false;
178
179 if (e.hittingRequiresCollapsing() && !startCollapsingOn(e, false))
180 return false;
181
182 return true;
183 }
184
185 LogTags *
186 ICPState::loggingTags() const
187 {
188 // calling icpSyncAle(LOG_TAG_NONE) here would not change cache.code
189 if (!al)
190 al = new AccessLogEntry();
191 return &al->cache.code;
192 }
193
194 void
195 ICPState::fillChecklist(ACLFilledChecklist &checklist) const
196 {
197 checklist.setRequest(request);
198 icpSyncAle(al, from, url, 0, 0);
199 checklist.al = al;
200 }
201
202 /* End ICPState */
203
204 /* ICP2State */
205
206 /// \ingroup ServerProtocolICPInternal2
207 class ICP2State: public ICPState
208 {
209
210 public:
211 ICP2State(icp_common_t & aHeader, HttpRequest *aRequest):
212 ICPState(aHeader, aRequest),rtt(0),src_rtt(0),flags(0) {}
213
214 ~ICP2State();
215
216 int rtt;
217 int src_rtt;
218 uint32_t flags;
219 };
220
221 ICP2State::~ICP2State()
222 {}
223
224 /* End ICP2State */
225
226 /// updates ALE (if any) and logs the transaction (if needed)
227 static void
228 icpLogIcp(const Ip::Address &caddr, const LogTags_ot logcode, const int len, const char *url, int delay, AccessLogEntry::Pointer &al)
229 {
230 assert(logcode != LOG_TAG_NONE);
231
232 // Optimization: No premature (ALE creation in) icpSyncAle().
233 if (al) {
234 icpSyncAle(al, caddr, url, len, delay);
235 al->cache.code.update(logcode);
236 }
237
238 if (logcode == LOG_ICP_QUERY)
239 return; // we never log queries
240
241 if (!Config.onoff.log_udp) {
242 clientdbUpdate(caddr, al ? al->cache.code : LogTags(logcode), AnyP::PROTO_ICP, len);
243 return;
244 }
245
246 if (!al) {
247 // The above attempt to optimize ALE creation has failed. We do need it.
248 icpSyncAle(al, caddr, url, len, delay);
249 al->cache.code.update(logcode);
250 }
251 clientdbUpdate(caddr, al->cache.code, AnyP::PROTO_ICP, len);
252 accessLogLog(al, NULL);
253 }
254
255 /// \ingroup ServerProtocolICPInternal2
256 void
257 icpUdpSendQueue(int fd, void *)
258 {
259 DelayedUdpSend *q;
260
261 while ((q = IcpQueueHead) != NULL) {
262 int delay = tvSubUsec(q->queue_time, current_time);
263 /* increment delay to prevent looping */
264 const int x = icpUdpSend(fd, q->address, q->msg, ++delay, q->ale);
265 IcpQueueHead = q->next;
266 delete q;
267
268 if (x < 0)
269 break;
270 }
271 }
272
273 icp_common_t *
274 icp_common_t::CreateMessage(
275 icp_opcode opcode,
276 int flags,
277 const char *url,
278 int reqnum,
279 int pad)
280 {
281 char *buf = NULL;
282 icp_common_t *headerp = NULL;
283 char *urloffset = NULL;
284 int buf_len;
285 buf_len = sizeof(icp_common_t) + strlen(url) + 1;
286
287 if (opcode == ICP_QUERY)
288 buf_len += sizeof(uint32_t);
289
290 buf = (char *) xcalloc(buf_len, 1);
291
292 headerp = (icp_common_t *) (void *) buf;
293
294 headerp->opcode = (char) opcode;
295
296 headerp->version = ICP_VERSION_CURRENT;
297
298 headerp->length = (uint16_t) htons(buf_len);
299
300 headerp->reqnum = htonl(reqnum);
301
302 headerp->flags = htonl(flags);
303
304 headerp->pad = htonl(pad);
305
306 headerp->shostid = 0;
307
308 urloffset = buf + sizeof(icp_common_t);
309
310 if (opcode == ICP_QUERY)
311 urloffset += sizeof(uint32_t);
312
313 memcpy(urloffset, url, strlen(url));
314
315 return (icp_common_t *)buf;
316 }
317
318 // TODO: Move retries to icpCreateAndSend(); the other caller does not retry.
319 /// writes the given UDP msg to the socket; queues a retry on the first failure
320 /// \returns a negative number on failures
321 static int
322 icpUdpSend(int fd,
323 const Ip::Address &to,
324 icp_common_t * msg,
325 int delay,
326 AccessLogEntryPointer al)
327 {
328 int x;
329 int len;
330 len = (int) ntohs(msg->length);
331 debugs(12, 5, "icpUdpSend: FD " << fd << " sending " <<
332 icp_opcode_str[msg->opcode] << ", " << len << " bytes to " << to);
333
334 x = comm_udp_sendto(fd, to, msg, len);
335
336 if (x >= 0) {
337 /* successfully written */
338 const auto logcode = icpLogFromICPCode(static_cast<icp_opcode>(msg->opcode));
339 icpLogIcp(to, logcode, len, (char *) (msg + 1), delay, al);
340 icpCount(msg, SENT, (size_t) len, delay);
341 safe_free(msg);
342 } else if (0 == delay) {
343 /* send failed, but queue it */
344 const auto queue = new DelayedUdpSend();
345 queue->address = to;
346 queue->msg = msg;
347 queue->queue_time = current_time;
348 queue->ale = al;
349
350 if (IcpQueueHead == NULL) {
351 IcpQueueHead = queue;
352 IcpQueueTail = queue;
353 } else if (IcpQueueTail == IcpQueueHead) {
354 IcpQueueTail = queue;
355 IcpQueueHead->next = queue;
356 } else {
357 IcpQueueTail->next = queue;
358 IcpQueueTail = queue;
359 }
360
361 Comm::SetSelect(fd, COMM_SELECT_WRITE, icpUdpSendQueue, NULL, 0);
362 ++statCounter.icp.replies_queued;
363 } else {
364 /* don't queue it */
365 // XXX: safe_free(msg)
366 ++statCounter.icp.replies_dropped;
367 }
368
369 return x;
370 }
371
372 /**
373 * This routine selects an ICP opcode for ICP misses.
374 *
375 \retval ICP_ERR no opcode selected here
376 \retval ICP_MISS_NOFETCH store is rebuilding, no fetch is possible yet
377 */
378 icp_opcode
379 icpGetCommonOpcode()
380 {
381 /* if store is rebuilding, return a UDP_MISS_NOFETCH */
382
383 if ((StoreController::store_dirs_rebuilding && opt_reload_hit_only) ||
384 hit_only_mode_until > squid_curtime) {
385 return ICP_MISS_NOFETCH;
386 }
387
388 return ICP_ERR;
389 }
390
391 static LogTags_ot
392 icpLogFromICPCode(icp_opcode opcode)
393 {
394 if (opcode == ICP_ERR)
395 return LOG_UDP_INVALID;
396
397 if (opcode == ICP_DENIED)
398 return LOG_UDP_DENIED;
399
400 if (opcode == ICP_HIT)
401 return LOG_UDP_HIT;
402
403 if (opcode == ICP_MISS)
404 return LOG_UDP_MISS;
405
406 if (opcode == ICP_MISS_NOFETCH)
407 return LOG_UDP_MISS_NOFETCH;
408
409 if (opcode == ICP_DECHO)
410 return LOG_ICP_QUERY;
411
412 if (opcode == ICP_QUERY)
413 return LOG_ICP_QUERY;
414
415 fatal("expected ICP opcode\n");
416
417 return LOG_UDP_INVALID;
418 }
419
420 void
421 icpCreateAndSend(icp_opcode opcode, int flags, char const *url, int reqnum, int pad, int fd, const Ip::Address &from, AccessLogEntry::Pointer al)
422 {
423 // update potentially shared ALE ASAP; the ICP query itself may be delayed
424 if (al)
425 al->cache.code.update(icpLogFromICPCode(opcode));
426 icp_common_t *reply = icp_common_t::CreateMessage(opcode, flags, url, reqnum, pad);
427 icpUdpSend(fd, from, reply, 0, al);
428 }
429
430 void
431 icpDenyAccess(Ip::Address &from, char *url, int reqnum, int fd)
432 {
433 debugs(12, 2, "icpDenyAccess: Access Denied for " << from << " by " << AclMatchedName << ".");
434
435 if (clientdbCutoffDenied(from)) {
436 /*
437 * count this DENIED query in the clientdb, even though
438 * we're not sending an ICP reply...
439 */
440 clientdbUpdate(from, LogTags(LOG_UDP_DENIED), AnyP::PROTO_ICP, 0);
441 } else {
442 icpCreateAndSend(ICP_DENIED, 0, url, reqnum, 0, fd, from, nullptr);
443 }
444 }
445
446 bool
447 icpAccessAllowed(Ip::Address &from, HttpRequest * icp_request)
448 {
449 /* absent any explicit rules, we deny all */
450 if (!Config.accessList.icp)
451 return false;
452
453 ACLFilledChecklist checklist(Config.accessList.icp, icp_request, NULL);
454 checklist.src_addr = from;
455 checklist.my_addr.setNoAddr();
456 return checklist.fastCheck().allowed();
457 }
458
459 char const *
460 icpGetUrlToSend(char *url)
461 {
462 if (strpbrk(url, w_space))
463 return rfc1738_escape(url);
464 else
465 return url;
466 }
467
468 HttpRequest *
469 icpGetRequest(char *url, int reqnum, int fd, Ip::Address &from)
470 {
471 if (strpbrk(url, w_space)) {
472 url = rfc1738_escape(url);
473 icpCreateAndSend(ICP_ERR, 0, rfc1738_escape(url), reqnum, 0, fd, from, nullptr);
474 return NULL;
475 }
476
477 const MasterXaction::Pointer mx = new MasterXaction(XactionInitiator::initIcp);
478 auto *result = HttpRequest::FromUrlXXX(url, mx);
479 if (!result)
480 icpCreateAndSend(ICP_ERR, 0, url, reqnum, 0, fd, from, nullptr);
481
482 return result;
483
484 }
485
486 static void
487 doV2Query(int fd, Ip::Address &from, char *buf, icp_common_t header)
488 {
489 int rtt = 0;
490 int src_rtt = 0;
491 uint32_t flags = 0;
492 /* We have a valid packet */
493 char *url = buf + sizeof(icp_common_t) + sizeof(uint32_t);
494 HttpRequest *icp_request = icpGetRequest(url, header.reqnum, fd, from);
495
496 if (!icp_request)
497 return;
498
499 HTTPMSGLOCK(icp_request);
500
501 if (!icpAccessAllowed(from, icp_request)) {
502 icpDenyAccess(from, url, header.reqnum, fd);
503 HTTPMSGUNLOCK(icp_request);
504 return;
505 }
506 #if USE_ICMP
507 if (header.flags & ICP_FLAG_SRC_RTT) {
508 rtt = netdbHostRtt(icp_request->url.host());
509 int hops = netdbHostHops(icp_request->url.host());
510 src_rtt = ((hops & 0xFFFF) << 16) | (rtt & 0xFFFF);
511
512 if (rtt)
513 flags |= ICP_FLAG_SRC_RTT;
514 }
515 #endif /* USE_ICMP */
516
517 /* The peer is allowed to use this cache */
518 ICP2State state(header, icp_request);
519 state.fd = fd;
520 state.from = from;
521 state.url = xstrdup(url);
522 state.flags = flags;
523 state.rtt = rtt;
524 state.src_rtt = src_rtt;
525
526 icp_opcode codeToSend;
527
528 if (state.isHit()) {
529 codeToSend = ICP_HIT;
530 } else {
531 #if USE_ICMP
532 if (Config.onoff.test_reachability && state.rtt == 0) {
533 if ((state.rtt = netdbHostRtt(state.request->url.host())) == 0)
534 netdbPingSite(state.request->url.host());
535 }
536 #endif /* USE_ICMP */
537
538 if (icpGetCommonOpcode() != ICP_ERR)
539 codeToSend = icpGetCommonOpcode();
540 else if (Config.onoff.test_reachability && rtt == 0)
541 codeToSend = ICP_MISS_NOFETCH;
542 else
543 codeToSend = ICP_MISS;
544 }
545
546 icpCreateAndSend(codeToSend, flags, url, header.reqnum, src_rtt, fd, from, state.al);
547
548 HTTPMSGUNLOCK(icp_request);
549 }
550
551 void
552 icp_common_t::handleReply(char *buf, Ip::Address &from)
553 {
554 if (neighbors_do_private_keys && reqnum == 0) {
555 debugs(12, DBG_CRITICAL, "icpHandleIcpV2: Neighbor " << from << " returned reqnum = 0");
556 debugs(12, DBG_CRITICAL, "icpHandleIcpV2: Disabling use of private keys");
557 neighbors_do_private_keys = 0;
558 }
559
560 char *url = buf + sizeof(icp_common_t);
561 debugs(12, 3, "icpHandleIcpV2: " << icp_opcode_str[opcode] << " from " << from << " for '" << url << "'");
562
563 const cache_key *key = icpGetCacheKey(url, (int) reqnum);
564 /* call neighborsUdpAck even if ping_status != PING_WAITING */
565 neighborsUdpAck(key, this, from);
566 }
567
568 static void
569 icpHandleIcpV2(int fd, Ip::Address &from, char *buf, int len)
570 {
571 if (len <= 0) {
572 debugs(12, 3, "icpHandleIcpV2: ICP message is too small");
573 return;
574 }
575
576 icp_common_t header(buf, len);
577 /*
578 * Length field should match the number of bytes read
579 */
580
581 if (len != header.length) {
582 debugs(12, 3, "icpHandleIcpV2: ICP message is too small");
583 return;
584 }
585
586 debugs(12, 5, "OPCODE " << icp_opcode_str[header.getOpCode()] << '=' << uint8_t(header.opcode));
587
588 switch (header.opcode) {
589
590 case ICP_QUERY:
591 /* We have a valid packet */
592 doV2Query(fd, from, buf, header);
593 break;
594
595 case ICP_HIT:
596
597 case ICP_DECHO:
598
599 case ICP_MISS:
600
601 case ICP_DENIED:
602
603 case ICP_MISS_NOFETCH:
604 header.handleReply(buf, from);
605 break;
606
607 case ICP_INVALID:
608
609 case ICP_ERR:
610 break;
611
612 default:
613 debugs(12, DBG_CRITICAL, "icpHandleIcpV2: UNKNOWN OPCODE: " << header.opcode << " from " << from);
614
615 break;
616 }
617 }
618
619 #ifdef ICP_PKT_DUMP
620 static void
621 icpPktDump(icp_common_t * pkt)
622 {
623 Ip::Address a;
624
625 debugs(12, 9, "opcode: " << std::setw(3) << pkt->opcode << " " << icp_opcode_str[pkt->opcode]);
626 debugs(12, 9, "version: "<< std::left << std::setw(8) << pkt->version);
627 debugs(12, 9, "length: "<< std::left << std::setw(8) << ntohs(pkt->length));
628 debugs(12, 9, "reqnum: "<< std::left << std::setw(8) << ntohl(pkt->reqnum));
629 debugs(12, 9, "flags: "<< std::left << std::hex << std::setw(8) << ntohl(pkt->flags));
630 a = (struct in_addr)pkt->shostid;
631 debugs(12, 9, "shostid: " << a );
632 debugs(12, 9, "payload: " << (char *) pkt + sizeof(icp_common_t));
633 }
634
635 #endif
636
637 void
638 icpHandleUdp(int sock, void *)
639 {
640 int *N = &incoming_sockets_accepted;
641
642 Ip::Address from;
643 LOCAL_ARRAY(char, buf, SQUID_UDP_SO_RCVBUF);
644 int len;
645 int icp_version;
646 int max = INCOMING_UDP_MAX;
647 Comm::SetSelect(sock, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
648
649 while (max) {
650 --max;
651 len = comm_udp_recvfrom(sock,
652 buf,
653 SQUID_UDP_SO_RCVBUF - 1,
654 0,
655 from);
656
657 if (len == 0)
658 break;
659
660 if (len < 0) {
661 int xerrno = errno;
662 if (ignoreErrno(xerrno))
663 break;
664
665 #if _SQUID_LINUX_
666 /* Some Linux systems seem to set the FD for reading and then
667 * return ECONNREFUSED when sendto() fails and generates an ICMP
668 * port unreachable message. */
669 /* or maybe an EHOSTUNREACH "No route to host" message */
670 if (xerrno != ECONNREFUSED && xerrno != EHOSTUNREACH)
671 #endif
672 debugs(50, DBG_IMPORTANT, "icpHandleUdp: FD " << sock << " recvfrom: " << xstrerr(xerrno));
673
674 break;
675 }
676
677 ++(*N);
678 icpCount(buf, RECV, (size_t) len, 0);
679 buf[len] = '\0';
680 debugs(12, 4, "icpHandleUdp: FD " << sock << ": received " <<
681 (unsigned long int)len << " bytes from " << from);
682
683 #ifdef ICP_PACKET_DUMP
684
685 icpPktDump(buf);
686 #endif
687
688 if ((size_t) len < sizeof(icp_common_t)) {
689 debugs(12, 4, "icpHandleUdp: Ignoring too-small UDP packet");
690 break;
691 }
692
693 icp_version = (int) buf[1]; /* cheat! */
694
695 if (icpOutgoingConn->local == from)
696 // ignore ICP packets which loop back (multicast usually)
697 debugs(12, 4, "icpHandleUdp: Ignoring UDP packet sent by myself");
698 else if (icp_version == ICP_VERSION_2)
699 icpHandleIcpV2(sock, from, buf, len);
700 else if (icp_version == ICP_VERSION_3)
701 icpHandleIcpV3(sock, from, buf, len);
702 else
703 debugs(12, DBG_IMPORTANT, "WARNING: Unused ICP version " << icp_version <<
704 " received from " << from);
705 }
706 }
707
708 void
709 icpOpenPorts(void)
710 {
711 uint16_t port;
712
713 if ((port = Config.Port.icp) <= 0)
714 return;
715
716 icpIncomingConn = new Comm::Connection;
717 icpIncomingConn->local = Config.Addrs.udp_incoming;
718 icpIncomingConn->local.port(port);
719
720 if (!Ip::EnableIpv6 && !icpIncomingConn->local.setIPv4()) {
721 debugs(12, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << icpIncomingConn->local << " is not an IPv4 address.");
722 fatal("ICP port cannot be opened.");
723 }
724 /* split-stack for now requires default IPv4-only ICP */
725 if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && icpIncomingConn->local.isAnyAddr()) {
726 icpIncomingConn->local.setIPv4();
727 }
728
729 AsyncCall::Pointer call = asyncCall(12, 2,
730 "icpIncomingConnectionOpened",
731 Comm::UdpOpenDialer(&icpIncomingConnectionOpened));
732
733 Ipc::StartListening(SOCK_DGRAM,
734 IPPROTO_UDP,
735 icpIncomingConn,
736 Ipc::fdnInIcpSocket, call);
737
738 if ( !Config.Addrs.udp_outgoing.isNoAddr() ) {
739 icpOutgoingConn = new Comm::Connection;
740 icpOutgoingConn->local = Config.Addrs.udp_outgoing;
741 icpOutgoingConn->local.port(port);
742
743 if (!Ip::EnableIpv6 && !icpOutgoingConn->local.setIPv4()) {
744 debugs(49, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << icpOutgoingConn->local << " is not an IPv4 address.");
745 fatal("ICP port cannot be opened.");
746 }
747 /* split-stack for now requires default IPv4-only ICP */
748 if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && icpOutgoingConn->local.isAnyAddr()) {
749 icpOutgoingConn->local.setIPv4();
750 }
751
752 enter_suid();
753 comm_open_listener(SOCK_DGRAM, IPPROTO_UDP, icpOutgoingConn, "Outgoing ICP Port");
754 leave_suid();
755
756 if (!Comm::IsConnOpen(icpOutgoingConn))
757 fatal("Cannot open Outgoing ICP Port");
758
759 debugs(12, DBG_CRITICAL, "Sending ICP messages from " << icpOutgoingConn->local);
760
761 Comm::SetSelect(icpOutgoingConn->fd, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
762 fd_note(icpOutgoingConn->fd, "Outgoing ICP socket");
763 }
764 }
765
766 static void
767 icpIncomingConnectionOpened(const Comm::ConnectionPointer &conn, int)
768 {
769 if (!Comm::IsConnOpen(conn))
770 fatal("Cannot open ICP Port");
771
772 Comm::SetSelect(conn->fd, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
773
774 for (const wordlist *s = Config.mcast_group_list; s; s = s->next)
775 ipcache_nbgethostbyname(s->key, mcastJoinGroups, NULL); // XXX: pass the conn for mcastJoinGroups usage.
776
777 debugs(12, DBG_IMPORTANT, "Accepting ICP messages on " << conn->local);
778
779 fd_note(conn->fd, "Incoming ICP port");
780
781 if (Config.Addrs.udp_outgoing.isNoAddr()) {
782 icpOutgoingConn = conn;
783 debugs(12, DBG_IMPORTANT, "Sending ICP messages from " << icpOutgoingConn->local);
784 }
785 }
786
787 /**
788 * icpConnectionShutdown only closes the 'in' socket if it is
789 * different than the 'out' socket.
790 */
791 void
792 icpConnectionShutdown(void)
793 {
794 if (!Comm::IsConnOpen(icpIncomingConn))
795 return;
796
797 debugs(12, DBG_IMPORTANT, "Stop receiving ICP on " << icpIncomingConn->local);
798
799 /** Release the 'in' socket for lazy closure.
800 * in and out sockets may be sharing one same FD.
801 * This prevents this function from executing repeatedly.
802 */
803 icpIncomingConn = NULL;
804
805 /**
806 * Normally we only write to the outgoing ICP socket, but
807 * we also have a read handler there to catch messages sent
808 * to that specific interface. During shutdown, we must
809 * disable reading on the outgoing socket.
810 */
811 assert(Comm::IsConnOpen(icpOutgoingConn));
812
813 Comm::SetSelect(icpOutgoingConn->fd, COMM_SELECT_READ, NULL, NULL, 0);
814 }
815
816 void
817 icpClosePorts(void)
818 {
819 icpConnectionShutdown();
820
821 if (icpOutgoingConn != NULL) {
822 debugs(12, DBG_IMPORTANT, "Stop sending ICP from " << icpOutgoingConn->local);
823 icpOutgoingConn = NULL;
824 }
825 }
826
827 static void
828 icpCount(void *buf, int which, size_t len, int delay)
829 {
830 icp_common_t *icp = (icp_common_t *) buf;
831
832 if (len < sizeof(*icp))
833 return;
834
835 if (SENT == which) {
836 ++statCounter.icp.pkts_sent;
837 statCounter.icp.kbytes_sent += len;
838
839 if (ICP_QUERY == icp->opcode) {
840 ++statCounter.icp.queries_sent;
841 statCounter.icp.q_kbytes_sent += len;
842 } else {
843 ++statCounter.icp.replies_sent;
844 statCounter.icp.r_kbytes_sent += len;
845 /* this is the sent-reply service time */
846 statCounter.icp.replySvcTime.count(delay);
847 }
848
849 if (ICP_HIT == icp->opcode)
850 ++statCounter.icp.hits_sent;
851 } else if (RECV == which) {
852 ++statCounter.icp.pkts_recv;
853 statCounter.icp.kbytes_recv += len;
854
855 if (ICP_QUERY == icp->opcode) {
856 ++statCounter.icp.queries_recv;
857 statCounter.icp.q_kbytes_recv += len;
858 } else {
859 ++statCounter.icp.replies_recv;
860 statCounter.icp.r_kbytes_recv += len;
861 /* statCounter.icp.querySvcTime set in clientUpdateCounters */
862 }
863
864 if (ICP_HIT == icp->opcode)
865 ++statCounter.icp.hits_recv;
866 }
867 }
868
869 #define N_QUERIED_KEYS 8192
870 #define N_QUERIED_KEYS_MASK 8191
871 static cache_key queried_keys[N_QUERIED_KEYS][SQUID_MD5_DIGEST_LENGTH];
872
873 int
874 icpSetCacheKey(const cache_key * key)
875 {
876 static int reqnum = 0;
877
878 if (++reqnum < 0)
879 reqnum = 1;
880
881 storeKeyCopy(queried_keys[reqnum & N_QUERIED_KEYS_MASK], key);
882
883 return reqnum;
884 }
885
886 const cache_key *
887 icpGetCacheKey(const char *url, int reqnum)
888 {
889 if (neighbors_do_private_keys && reqnum)
890 return queried_keys[reqnum & N_QUERIED_KEYS_MASK];
891
892 return storeKeyPublic(url, Http::METHOD_GET);
893 }
894